diff --git a/v6d3losyash/run-bot.py b/v6d3losyash/run-bot.py index 59820fa..f711a12 100644 --- a/v6d3losyash/run-bot.py +++ b/v6d3losyash/run-bot.py @@ -3,6 +3,7 @@ import json import re from io import StringIO from pathlib import Path +from traceback import print_exc from typing import AsyncIterable, Iterable, Optional, Union import discord @@ -14,7 +15,7 @@ from v6d3losyash import config loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) -token = loop.run_until_complete(request_token('losyash', 'token')) +token = loop.run_until_complete(request_token("losyash", "token")) client = discord.Client( intents=discord.Intents( members=True, @@ -26,16 +27,16 @@ client = discord.Client( message_content=True, ), ) -ESCAPED = '`_*\'"\\' -nest_db = Db(config.myroot / 'nest.db', kvfactory=KVJson()) -spam_db = Db(config.myroot / 'spam.db', kvfactory=KVJson()) +ESCAPED = "`_*'\"\\" +nest_db = Db(config.myroot / "nest.db", kvfactory=KVJson()) +spam_db = Db(config.myroot / "spam.db", kvfactory=KVJson()) def escape(s: str): res = StringIO() for c in s: if c in ESCAPED: - c = '\\' + c + c = "\\" + c res.write(c) return res.getvalue() @@ -43,69 +44,69 @@ def escape(s: str): class Spam: def __init__(self, mode: str, channel: int) -> None: self.spammed = False - self.root = Path(__file__).parent / '../constitution' - self.path = self.root / f'{mode}.md' + self.root = Path(__file__).parent / "../constitution" + self.path = self.root / f"{mode}.md" self.channel = channel self.lock = asyncio.Lock() def _format_segment(self, segment: str) -> str: segment = re.sub( - r'\[[\d.xX]*?\]', - lambda m: f'\u001b[0;1;36m{m[0]}\u001b[0m', + r"\[[\d.xX]*?\]", + lambda m: f"\u001b[0;1;36m{m[0]}\u001b[0m", segment, ) segment = re.sub( - r'\#[\w-]*', - lambda m: f'\u001b[0;34m{m[0]}\u001b[0m', + r"\#[\w-]*", + lambda m: f"\u001b[0;34m{m[0]}\u001b[0m", segment, ) segment = re.sub( - r'\:\:', - lambda m: f'\u001b[0;31;41m{m[0]}\u001b[0m', + r"\:\:", + lambda m: f"\u001b[0;31;41m{m[0]}\u001b[0m", segment, ) segment = re.sub( - r'\@(\[.*?\])', - lambda m: f'\u001b[0;33m{m[1]}\u001b[0m', + r"\@(\[.*?\])", + lambda m: f"\u001b[0;33m{m[1]}\u001b[0m", segment, ) segment = re.sub( - r'\{([\d;]*)\:(.*?)\}', - lambda m: f'\u001b[0;{m[1]}m{m[2]}\u001b[0m', + r"\{([\d;]*)\:(.*?)\}", + lambda m: f"\u001b[0;{m[1]}m{m[2]}\u001b[0m", segment, ) return segment def format_segment(self, segment: str) -> tuple[str, list[discord.Embed], dict]: - segment, *sflagsl = segment.split('+flags') + segment, *sflagsl = segment.split("+flags") flags = {} for sflags in sflagsl: flags.update(json.loads(sflags)) - segment, *esegments = segment.split('+embed') + segment, *esegments = segment.split("+embed") segment = self._format_segment(segment) embeds = [] for esegment in esegments: ejson: dict = json.loads(esegment) embed = discord.Embed( - colour=ejson.get('colour'), - title=ejson.get('title'), - type=ejson.get('type', 'rich'), - url=ejson.get('url'), - description=ejson.get('description'), + colour=ejson.get("colour"), + title=ejson.get("title"), + type=ejson.get("type", "rich"), + url=ejson.get("url"), + description=ejson.get("description"), ) embeds.append(embed) fjson: dict - for fjson in ejson.get('fields', []): - embed.add_field(name=fjson['name'], value=fjson['value'], inline=fjson.get('inline', False)) + for fjson in ejson.get("fields", []): + embed.add_field(name=fjson["name"], value=fjson["value"], inline=fjson.get("inline", False)) return segment.strip(), embeds, flags def _ru_segments(self) -> Iterable[tuple[str, list[discord.Embed], dict]]: - return map(self.format_segment, self.path.read_text().split('=' * 80)) + return map(self.format_segment, self.path.read_text().split("=" * 80)) async def _ru_segments_checked(self) -> AsyncIterable[tuple[str, list[discord.Embed], dict]]: for i, (segment, embeds, flags) in enumerate(await asyncio.to_thread(self._ru_segments)): match flags: - case {'fix': int() as fix}: + case {"fix": int() as fix}: assert i == fix, (i, fix) yield segment, embeds, flags @@ -116,7 +117,7 @@ class Spam: if self.spammed: return try: - print('spamming') + print("spamming") guild = await client.fetch_guild(config.guild) for role in await guild.fetch_roles(): print(role.colour, role) @@ -128,6 +129,7 @@ class Spam: return await channel.fetch_message(msid) except discord.NotFound: return None + broken_order = False for i, (segment, embeds, flags) in enumerate(await self.ru_segments()): content = segment.strip() @@ -136,8 +138,7 @@ class Spam: messageid: int | None if ( (messageid := spam_db.get(dbkey, None)) is None - or - (message := await fetch(messageid)) is None + or (message := await fetch(messageid)) is None or broken_order ): if broken_order and message is not None: @@ -149,17 +150,15 @@ class Spam: if content != message.content or embeds != message.embeds: await message.edit(content=content, embeds=embeds) emojiid: int - for emojiid in flags.get('emojis', []): + for emojiid in flags.get("emojis", []): emoji = await guild.fetch_emoji(emojiid) await message.add_reaction(emoji) - if (custom_key := flags.get('key', None)) is not None: + if (custom_key := flags.get("key", None)) is not None: await spam_db.set(custom_key, message.id) - except: - from traceback import print_exc print_exc() finally: - print('spammed') + print("spammed") self.spammed = True async def spam(self) -> None: @@ -168,16 +167,18 @@ class Spam: lock = asyncio.Lock() -spam_ru = Spam('ru', 1056432869834240080) -spam_en = Spam('en', 1057006937360842942) +spam_ru = Spam("ru", 1056432869834240080) +spam_en = Spam("en", 1057006937360842942) @client.event async def on_ready(): print("ready") - await client.change_presence(activity=discord.Game( - name='феноменально', - )) + await client.change_presence( + activity=discord.Game( + name="феноменально", + ) + ) task_ru = asyncio.create_task(spam_ru.spam()) task_en = asyncio.create_task(spam_en.spam()) async with lock: @@ -199,7 +200,7 @@ class SimpleEmoji: return NotImplemented @classmethod - def of(cls, emoji: Union[str, discord.Emoji, discord.PartialEmoji]) -> 'SimpleEmoji': + def of(cls, emoji: Union[str, discord.Emoji, discord.PartialEmoji]) -> "SimpleEmoji": if isinstance(emoji, discord.Emoji): return cls(emoji.id) elif isinstance(emoji, str): @@ -230,7 +231,7 @@ class RoleGrant: guild_ = client.get_guild(config.guild) assert guild_ is not None guild: discord.Guild = guild_ - return f'{self.emoji.to()} {guild.get_role(self.role)}' + return f"{self.emoji.to()} {guild.get_role(self.role)}" class MessageDescription: @@ -241,7 +242,7 @@ class MessageDescription: self.mapping: dict[SimpleEmoji, int] = {grant.emoji: grant.role for grant in roles} def description(self): - return self._description + '\n' + '\n'.join(map(RoleGrant.format, self.roles)) + return self._description + "\n" + "\n".join(map(RoleGrant.format, self.roles)) class ChannelDescription: @@ -267,7 +268,7 @@ class State: self.cd = cd async def _load(self): - self.defmap = nest_db.get('state-map', {}) + self.defmap = nest_db.get("state-map", {}) self.defrev.clear() for md in self.cd.mds: msg = await self._get(md.key, md.description()) @@ -309,14 +310,14 @@ class State: async def _save(self): self._reset_defmap() await self._clear_channel() - await nest_db.set('state-map', self.defmap) + await nest_db.set("state-map", self.defmap) async def reload(self): await self._load() await self._save() async def _role_member( - self, msid: int, member_id: int, emoji: SimpleEmoji + self, msid: int, member_id: int, emoji: SimpleEmoji ) -> tuple[Optional[discord.Role], Optional[discord.Member]]: key = self.defrev[msid] md = self.cd.mapping[key] @@ -337,31 +338,31 @@ class State: role, member = await self._role_member(msid, member_id, emoji) if role is None or member is None: return - await member.add_roles(role, reason='emojis go yes') - await log(f'emoji {emoji.to()} +role {role} member {member}') + await member.add_roles(role, reason="emojis go yes") + await log(f"emoji {emoji.to()} +role {role} member {member}") async def role_revoke(self, msid: int, member_id: int, emoji: SimpleEmoji): role, member = await self._role_member(msid, member_id, emoji) if role is None or member is None: return - await member.remove_roles(role, reason='emojis go no') - await log(f'emoji {emoji.to()} -role {role} member {member}') + await member.remove_roles(role, reason="emojis go no") + await log(f"emoji {emoji.to()} -role {role} member {member}") state = State( ChannelDescription( MessageDescription( - 'games', - '''Игровые роли -''', - RoleGrant(989231768341209230, SimpleEmoji('🇬🇮')), - RoleGrant(933103254752088104, SimpleEmoji('🐟')), + "games", + """Игровые роли +""", + RoleGrant(989231768341209230, SimpleEmoji("🇬🇮")), + RoleGrant(933103254752088104, SimpleEmoji("🐟")), RoleGrant(541263011822698506, SimpleEmoji(541257134407417864)), - RoleGrant(542056704842661899, SimpleEmoji('🦆')), - RoleGrant(1017483326593974282, SimpleEmoji('💸')), - RoleGrant(541349294586724365, SimpleEmoji('⛏')), - RoleGrant(934467294371938355, SimpleEmoji('🎩')), - RoleGrant(541263005971644419, SimpleEmoji('🌈')), + RoleGrant(542056704842661899, SimpleEmoji("🦆")), + RoleGrant(1017483326593974282, SimpleEmoji("💸")), + RoleGrant(541349294586724365, SimpleEmoji("⛏")), + RoleGrant(934467294371938355, SimpleEmoji("🎩")), + RoleGrant(541263005971644419, SimpleEmoji("🌈")), ) ) ) @@ -388,8 +389,8 @@ async def grant_citizenship(member_id: int): assert role is not None if role in member.roles: return - await member.add_roles(role, reason='феноменально') - await log(f'{escape(str(member))} {member.id} <:Jesus:586669134406877270>') + await member.add_roles(role, reason="феноменально") + await log(f"{escape(str(member))} {member.id} <:Jesus:586669134406877270>") @client.event @@ -400,8 +401,8 @@ async def on_raw_reaction_add(payload: discord.RawReactionActionEvent): emoji: discord.PartialEmoji = payload.emoji if emoji.id == config.emoji and payload.message_id in [ config.message, - spam_db.get('jesus', 'not an id'), - spam_db.get('jensus', 'not an id'), + spam_db.get("jesus", "not an id"), + spam_db.get("jensus", "not an id"), ]: await grant_citizenship(payload.user_id) if payload.message_id in state.defrev: @@ -426,7 +427,7 @@ async def on_member_join(member: discord.Member): channel_ = guild.get_channel(config.channel) assert isinstance(channel_, discord.TextChannel) channel: discord.TextChannel = channel_ - await channel.send(f'{escape(str(member))} {member.id} joined') + await channel.send(f"{escape(str(member))} {member.id} joined") @client.event @@ -437,18 +438,18 @@ async def on_member_remove(member: discord.Member): channel_ = guild.get_channel(config.channel) assert isinstance(channel_, discord.TextChannel) channel: discord.TextChannel = channel_ - message = await channel.send(f'{escape(str(member))} {member.id} left (joined {member.joined_at})') + message = await channel.send(f"{escape(str(member))} {member.id} left (joined {member.joined_at})") await asyncio.sleep(1) entry: discord.AuditLogEntry async for entry in guild.audit_logs(action=discord.AuditLogAction.kick): assert entry.target is not None if entry.target.id == member.id: - await message.reply(f'latest kick: {entry.created_at} {entry.reason}') + await message.reply(f"latest kick: {entry.created_at} {entry.reason}") break async for entry in guild.audit_logs(action=discord.AuditLogAction.ban): assert entry.target is not None if entry.target.id == member.id: - await message.reply(f'latest ban: {entry.created_at} {entry.reason}') + await message.reply(f"latest ban: {entry.created_at} {entry.reason}") break @@ -458,5 +459,5 @@ async def main(): await client.connect() -if __name__ == '__main__': +if __name__ == "__main__": serve(main(), client, loop)