maubot-gpt-bot/gpt/gpt.py

141 lines
5.1 KiB
Python
Raw Normal View History

2024-04-06 23:51:14 +02:00
import json
import urllib
import requests
import io
import base64
import asyncio
import requests
import re
from PIL import Image
from typing import Type, Deque, Dict, Generator
from mautrix.types import ImageInfo, EventType, MessageType, RelationType
from mautrix.types.event.message import BaseFileInfo, Format, TextMessageEventContent
from mautrix.util.config import BaseProxyConfig, ConfigUpdateHelper
from mautrix.util import markdown
from maubot import Plugin, MessageEvent
from maubot.handlers import event
from mautrix.client import Client
from .history import History
class Config(BaseProxyConfig):
    """Configuration wrapper listing the settings this plugin knows about."""

    def do_update(self, helper: ConfigUpdateHelper) -> None:
        """Hand every supported setting name to ``helper.copy``, in order."""
        setting_names = (
            "respond_to_notice",
            "reply_in_thread",
            "model",
            "name",
            "nickname",
            "redis_host",
            "redis_port",
        )
        for setting in setting_names:
            helper.copy(setting)
class Gpt(Plugin):
    """Maubot plugin that answers room messages through a remote GPT HTTP API.

    A message is answered when the bot is addressed (through the message's
    ``m.mentions`` list, or by its configured name or nickname in the body)
    or when the room has exactly two joined members.
    """

    # Loaded from the "name" config key; matched against mentions and bodies.
    name: str
    # Loaded from the "nickname" config key; matched against message bodies.
    nickname: str
    # History helper built from the "redis_host" / "redis_port" config keys.
    history: History

    # Optional "@" at the start of the text or right after whitespace.
    _NAME_PREFIX = r"(^|\s)(@)?"
    # Punctuation, a space, or end of text directly after the name.
    _NAME_SUFFIX = r"([ :,.!?]|$)"
    _API_URL = "https://nexra.aryahcr.cc/api/chat/gpt"
    # Upper bound (seconds) for one API request, so a dead server cannot
    # keep a handler waiting forever.
    _API_TIMEOUT = 120

    async def start(self) -> None:
        """Load the configuration and set up name, nickname and history."""
        self.config.load_and_update()
        self.name = self.config["name"]
        self.nickname = self.config["nickname"]
        self.history = History(self.config["redis_host"], self.config["redis_port"])

    @classmethod
    def get_config_class(cls) -> Type[BaseProxyConfig]:
        """Return the configuration class used by this plugin."""
        return Config

    @classmethod
    def _name_pattern(cls, label: str) -> str:
        """Build the regex that detects *label* being used to address the bot.

        The label is escaped so that regex metacharacters in a configured
        name (for example ".") are matched literally.
        """
        return cls._NAME_PREFIX + re.escape(label) + cls._NAME_SUFFIX

    @staticmethod
    def _mentioned_user_ids(event: MessageEvent) -> list:
        """Return the string entries of ``m.mentions.user_ids``.

        Returns an empty list when the event has no ``m.mentions``, when it
        carries no ``user_ids`` (e.g. an empty ``m.mentions`` object), or
        when the value is not a list.
        """
        if "m.mentions" not in event.content:
            return []
        try:
            user_ids = event.content["m.mentions"]["user_ids"]
        except (KeyError, TypeError):
            return []
        if not isinstance(user_ids, (list, tuple)):
            return []
        return [user_id for user_id in user_ids if isinstance(user_id, str)]

    def _addressed_as(self, event: MessageEvent) -> "str | None":
        """Work out how the message addresses the bot, if at all.

        Returns the text that should be removed from the body before it is
        used as a prompt: the nickname when the bot is listed in
        ``m.mentions`` or addressed by nickname, the name when addressed by
        name, or ``None`` when the bot is not addressed.
        """
        body = event.content.body
        name_pattern = self._name_pattern(self.name)
        # Clients that send m.mentions: look at every mentioned user id,
        # not only the first one.
        if any(re.search(name_pattern, user_id, re.IGNORECASE)
               for user_id in self._mentioned_user_ids(event)):
            return self.nickname
        # Clients that put the bot's name into the body.
        if re.search(name_pattern, body, re.IGNORECASE):
            return self.name
        # Clients that put the bot's nickname into the body.
        if re.search(self._name_pattern(self.nickname), body, re.IGNORECASE):
            return self.nickname
        return None

    async def should_respond(self, event: MessageEvent) -> bool:
        """Decide whether the bot should answer *event*.

        Never answers its own messages, bodies starting with "!", messages
        whose type is not TEXT, or events whose relation type is REPLACE.
        Otherwise answers when the bot is addressed or when the room has
        exactly two joined members.
        """
        if (event.sender == self.client.mxid or
                event.content.body.startswith("!") or
                event.content["msgtype"] != MessageType.TEXT or
                event.content.relates_to["rel_type"] == RelationType.REPLACE):
            return False
        if self._addressed_as(event) is not None:
            return True
        # Reply to all DMs
        return len(await self.client.get_joined_members(event.room_id)) == 2

    @event.on(EventType.ROOM_MESSAGE)
    async def on_message(self, event: MessageEvent) -> None:
        """Answer a room message with the GPT reply and record the exchange.

        The text that addressed the bot is removed from the body first, so
        it does not end up in the prompt. Failures are logged and never
        propagate to the caller; the typing indicator is always cleared.
        """
        if not await self.should_respond(event):
            return
        label = self._addressed_as(event)
        if label:
            # Matching is case-insensitive, so the removal has to be as well.
            event.content.body = re.sub(
                re.escape(label), "", event.content.body, flags=re.IGNORECASE
            )
        try:
            await event.mark_read()
            # Show a typing indicator while waiting; it is cleared in `finally`.
            await self.client.set_typing(event.room_id, timeout=99999)
            messages = await self.history.get(event)
            self.log.debug("History passed to the GPT API: %s", messages)
            prompt = event.content["body"]
            # Call the GPT API to get the reply text
            response = await self._call_gpt(prompt, messages)
            if not response:
                # _call_gpt has already logged why there is no usable reply.
                return
            # Send the response back to the chat room
            content = TextMessageEventContent(
                msgtype=MessageType.NOTICE,
                body=response,
                format=Format.HTML,
                formatted_body=markdown.render(response),
            )
            await event.respond(content, in_thread=self.config['reply_in_thread'])
            # Send this to our cache
            await self.history.add(event, prompt, response)
        except Exception as e:
            self.log.exception(f"We have failed somewhere? {e}")
        finally:
            # Reset our typing status on every path, including failures.
            try:
                await self.client.set_typing(event.room_id, timeout=0)
            except Exception:
                self.log.warning("Could not reset the typing status", exc_info=True)

    async def _call_gpt(self, prompt, messages):
        """Send *prompt* and *messages* to the GPT API and return the reply.

        Returns the reply text, or ``None`` when the server answers with a
        status other than 200 or the response cannot be interpreted.
        Network errors (including timeouts) raised by ``requests`` are
        propagated to the caller.
        """
        headers = {"Content-Type": "application/json"}
        payload = json.dumps({
            "messages": messages,
            "model": self.config["model"],
            "prompt": prompt,
        })
        # requests is blocking; run it in a worker thread so the event loop
        # can keep serving other events while the API is working.
        response = await asyncio.to_thread(
            requests.post,
            self._API_URL,
            headers=headers,
            data=payload,
            timeout=self._API_TIMEOUT,
        )
        if response.status_code != 200:
            self.log.warning(f"Unexpected status sending request to nexra.aryahcr.cc: {response.status_code}")
            return None
        return self._extract_reply(response.text)

    def _extract_reply(self, text):
        """Pull the "gpt" field out of the API response body *text*.

        Everything before the first "{" is skipped before parsing
        (presumably the service pads its responses - TODO confirm).
        Returns ``None`` and logs a warning when no JSON object is found,
        the JSON is invalid, or the "gpt" field is missing or not a string.
        """
        json_start = text.find("{")
        if json_start == -1:
            self.log.warning("Response from nexra.aryahcr.cc contains no JSON object")
            return None
        try:
            reply = json.loads(text[json_start:])["gpt"]
        except (json.JSONDecodeError, KeyError, TypeError) as err:
            self.log.warning(f"Could not read reply from nexra.aryahcr.cc response: {err!r}")
            return None
        if not isinstance(reply, str):
            self.log.warning("Response from nexra.aryahcr.cc has a non-text 'gpt' field")
            return None
        return reply