import asyncio
import base64
import io
import json
import re
import urllib
from typing import Deque, Dict, Generator, Optional, Type

import aiohttp
from PIL import Image

from maubot import MessageEvent, Plugin
from maubot.handlers import command, event
from mautrix.client import Client
from mautrix.types import EventType, ImageInfo, MessageType, RelationType
from mautrix.types.event.message import BaseFileInfo, Format, TextMessageEventContent
from mautrix.util import markdown
from mautrix.util.config import BaseProxyConfig, ConfigUpdateHelper

from .history import History

# Chat endpoint that `_call_gpt` posts the prompt and history to.
GPT_API_URL = "https://nexra.aryahcr.cc/api/chat/gpt"

# Regex fragments placed around the (escaped) bot name when looking for a
# mention: optional leading whitespace/"@" and a trailing delimiter or end.
_MENTION_PREFIX = r"(^|\s)(@)?"
_MENTION_SUFFIX = r"([ :,.!?]|$)"


class Config(BaseProxyConfig):
    """Plugin configuration proxy."""

    def do_update(self, helper: ConfigUpdateHelper) -> None:
        """Copy every configuration key this plugin knows about via *helper*."""
        helper.copy("respond_to_notice")
        helper.copy("reply_in_thread")
        helper.copy("model")
        helper.copy("name")
        helper.copy("nickname")
        helper.copy("redis_host")
        helper.copy("redis_port")
        helper.copy("haste_url")


class Gpt(Plugin):
    """Maubot plugin that answers room messages using a remote GPT HTTP API."""

    # Value of the "name" config key; matched against mentions and bodies.
    name: str
    # Value of the "nickname" config key; matched against message bodies.
    nickname: str
    # Conversation store built from the redis_host/redis_port/haste_url keys.
    history: History

    async def start(self) -> None:
        """Load the configuration and build the history store from it."""
        self.config.load_and_update()
        self.name = self.config["name"]
        self.nickname = self.config["nickname"]
        self.history = History(
            self.config["redis_host"],
            self.config["redis_port"],
            self.config["haste_url"],
        )

    @classmethod
    def get_config_class(cls) -> Type[BaseProxyConfig]:
        """Return the configuration class used by this plugin."""
        return Config

    @staticmethod
    def _addressed(name: str, text: str, *, anchored: bool = True) -> bool:
        """Return True if *name* occurs in *text* followed by a delimiter.

        The name is escaped so regex metacharacters in the configured value
        are matched literally. With ``anchored=False`` the leading
        whitespace/"@" prefix is not required. Matching ignores case.
        """
        prefix = _MENTION_PREFIX if anchored else ""
        pattern = prefix + re.escape(name) + _MENTION_SUFFIX
        return re.search(pattern, text, re.IGNORECASE) is not None

    @staticmethod
    def _first_mentioned_user(content) -> str:
        """Return the first entry of ``m.mentions.user_ids`` or ``""``.

        An absent or empty ``user_ids`` list (or a non-string entry) yields
        the empty string instead of raising.
        """
        try:
            user_id = content["m.mentions"]["user_ids"][0]
        except (KeyError, IndexError, TypeError):
            return ""
        return user_id if isinstance(user_id, str) else ""

    async def should_respond(self, event: MessageEvent) -> bool:
        """Decide whether the bot should answer *event*.

        Returns False for the bot's own messages, non-text messages, bodies
        starting with "!" and edits. Returns True when the message carries
        ``m.mentions`` and the bot's name or nickname is found in the first
        mentioned user id or in the body, or when the room has exactly two
        joined members.
        """
        content = event.content
        if event.sender == self.client.mxid:
            return False
        if content["msgtype"] != MessageType.TEXT:
            return False
        if content.body.startswith("!"):
            return False
        if content.relates_to["rel_type"] == RelationType.REPLACE:
            return False

        if "m.mentions" in content:
            mentioned = (
                # Structured mention: name matched against the mentioned id.
                self._addressed(self.name, self._first_mentioned_user(content))
                # Name written directly in the body.
                or self._addressed(self.name, content.body)
                # Nickname written in the body.
                or self._addressed(self.nickname, content.body)
            )
            if mentioned:
                return True

        # Reply to everything in rooms with exactly two joined members.
        members = await self.client.get_joined_members(event.room_id)
        return len(members) == 2

    def _strip_bot_name(self, content) -> None:
        """Remove the bot's name or nickname from ``content.body`` in place.

        Only acts when the content carries ``m.mentions``; the first matching
        case decides which string is removed from the body.
        """
        if "m.mentions" not in content:
            return
        if self._addressed(self.name, self._first_mentioned_user(content)):
            # Mentioned via user id: the nickname is what gets removed.
            content.body = content.body.replace(self.nickname, "")
        elif self._addressed(self.name, content.body):
            content.body = content.body.replace(self.name, "")
        elif self._addressed(self.nickname, content.body, anchored=False):
            content.body = content.body.replace(self.nickname, "")

    @event.on(EventType.ROOM_MESSAGE)
    async def on_message(self, event: MessageEvent) -> None:
        """Answer a room message with the GPT API's reply.

        Strips the bot's name from the body, fetches the stored history,
        calls the API, posts the reply as an HTML-formatted notice and records
        the exchange in the history. Failures are logged, not raised, and the
        typing indicator is cleared whether or not a reply was produced.
        """
        if not await self.should_respond(event):
            return

        self._strip_bot_name(event.content)

        try:
            await event.mark_read()
            await self.client.set_typing(event.room_id, timeout=99999)
            try:
                messages = await self.history.get(event)
                response = await self._call_gpt(event.content["body"], messages)
                if response is not None:
                    content = TextMessageEventContent(
                        msgtype=MessageType.NOTICE,
                        body=response,
                        format=Format.HTML,
                        formatted_body=markdown.render(response),
                    )
                    await event.respond(content, in_thread=self.config["reply_in_thread"])
            finally:
                # Always clear the typing status, even when the call failed.
                await self.client.set_typing(event.room_id, timeout=0)

            # `_call_gpt` already logged why there is no reply; only store
            # exchanges that actually produced one.
            if response is not None:
                await self.history.add(event, event.content["body"], response)
        except Exception as e:
            self.log.exception(f"We have failed somewhere? {e}")

    async def _call_gpt(self, prompt, messages) -> Optional[str]:
        """POST *prompt* and *messages* to the GPT API and return its reply.

        Args:
            prompt: The text to send as the "prompt" field.
            messages: Value sent as the "messages" field (whatever
                ``History.get`` returned; must be JSON-serializable).

        Returns:
            The string under the "gpt" key of the JSON response, or ``None``
            when the status is not 200 or the body cannot be interpreted.
        """
        headers = {"Content-Type": "application/json"}
        payload = {
            "messages": messages,
            "model": self.config["model"],
            "prompt": prompt,
        }

        async with aiohttp.request(
            "POST", GPT_API_URL, headers=headers, data=json.dumps(payload)
        ) as response:
            if response.status != 200:
                self.log.warning(
                    f"Unexpected status sending request to nexra.aryahcr.cc: {response.status}"
                )
                return None
            response_text = await response.text()

        # Parse from the first "{" so any text preceding the JSON object does
        # not break decoding; with no "{" at all the whole body is parsed.
        start = max(response_text.find("{"), 0)
        try:
            content = json.loads(response_text[start:])["gpt"]
        except (ValueError, KeyError, TypeError) as e:
            self.log.warning(f"Could not read reply from nexra.aryahcr.cc response: {e!r}")
            return None

        if not isinstance(content, str):
            self.log.warning(f"Unexpected 'gpt' value in nexra.aryahcr.cc response: {content!r}")
            return None
        return content

    @command.new(name="export")
    async def export(self, event: MessageEvent) -> None:
        """Reply with the result of ``History.dump`` for *event*."""
        await self.client.set_typing(event.room_id, timeout=9999)
        try:
            result = await self.history.dump(event)
            content = TextMessageEventContent(
                msgtype=MessageType.NOTICE, body=result, format=Format.HTML
            )
            await event.respond(content, in_thread=self.config["reply_in_thread"])
        finally:
            # Clear the typing status even if the dump or the reply failed.
            await self.client.set_typing(event.room_id, timeout=0)