464 lines
16 KiB
Python
464 lines
16 KiB
Python
import asyncio
|
|
import json
|
|
import re
|
|
from io import StringIO
|
|
from pathlib import Path
|
|
from traceback import print_exc
|
|
from typing import AsyncIterable, Iterable, Optional, Union
|
|
|
|
import discord
|
|
|
|
from ptvp35 import Db, KVJson
|
|
from v6d1tokens.client import *
|
|
from v6d2ctx.serve import *
|
|
from v6d3losyash import config
|
|
|
|
loop = asyncio.new_event_loop()
|
|
asyncio.set_event_loop(loop)
|
|
token = loop.run_until_complete(request_token("losyash", "token"))
|
|
client = discord.Client(
|
|
intents=discord.Intents(
|
|
members=True,
|
|
guilds=True,
|
|
emojis=True,
|
|
reactions=True,
|
|
invites=True,
|
|
bans=True,
|
|
message_content=True,
|
|
),
|
|
)
|
|
ESCAPED = "`_*'\"\\"
|
|
nest_db = Db(config.myroot / "nest.db", kvfactory=KVJson())
|
|
spam_db = Db(config.myroot / "spam.db", kvfactory=KVJson())
|
|
|
|
|
|
def escape(s: str):
|
|
res = StringIO()
|
|
for c in s:
|
|
if c in ESCAPED:
|
|
c = "\\" + c
|
|
res.write(c)
|
|
return res.getvalue()
|
|
|
|
|
|
class Spam:
|
|
def __init__(self, mode: str, channel: int) -> None:
|
|
self.spammed = False
|
|
self.root = Path(__file__).parent / "../constitution"
|
|
self.path = self.root / f"{mode}.md"
|
|
self.channel = channel
|
|
self.lock = asyncio.Lock()
|
|
|
|
def _format_segment(self, segment: str) -> str:
|
|
segment = re.sub(
|
|
r"\[[\d.xX]*?\]",
|
|
lambda m: f"\u001b[0;1;36m{m[0]}\u001b[0m",
|
|
segment,
|
|
)
|
|
segment = re.sub(
|
|
r"\#[\w-]*",
|
|
lambda m: f"\u001b[0;34m{m[0]}\u001b[0m",
|
|
segment,
|
|
)
|
|
segment = re.sub(
|
|
r"\:\:",
|
|
lambda m: f"\u001b[0;31;41m{m[0]}\u001b[0m",
|
|
segment,
|
|
)
|
|
segment = re.sub(
|
|
r"\@(\[.*?\])",
|
|
lambda m: f"\u001b[0;33m{m[1]}\u001b[0m",
|
|
segment,
|
|
)
|
|
segment = re.sub(
|
|
r"\{([\d;]*)\:(.*?)\}",
|
|
lambda m: f"\u001b[0;{m[1]}m{m[2]}\u001b[0m",
|
|
segment,
|
|
)
|
|
return segment
|
|
|
|
def format_segment(self, segment: str) -> tuple[str, list[discord.Embed], dict]:
|
|
segment, *sflagsl = segment.split("+flags")
|
|
flags = {}
|
|
for sflags in sflagsl:
|
|
flags.update(json.loads(sflags))
|
|
segment, *esegments = segment.split("+embed")
|
|
segment = self._format_segment(segment)
|
|
embeds = []
|
|
for esegment in esegments:
|
|
ejson: dict = json.loads(esegment)
|
|
embed = discord.Embed(
|
|
colour=ejson.get("colour"),
|
|
title=ejson.get("title"),
|
|
type=ejson.get("type", "rich"),
|
|
url=ejson.get("url"),
|
|
description=ejson.get("description"),
|
|
)
|
|
embeds.append(embed)
|
|
fjson: dict
|
|
for fjson in ejson.get("fields", []):
|
|
embed.add_field(name=fjson["name"], value=fjson["value"], inline=fjson.get("inline", False))
|
|
return segment.strip(), embeds, flags
|
|
|
|
def _ru_segments(self) -> Iterable[tuple[str, list[discord.Embed], dict]]:
|
|
return map(self.format_segment, self.path.read_text().split("=" * 80))
|
|
|
|
async def _ru_segments_checked(self) -> AsyncIterable[tuple[str, list[discord.Embed], dict]]:
|
|
for i, (segment, embeds, flags) in enumerate(await asyncio.to_thread(self._ru_segments)):
|
|
match flags:
|
|
case {"fix": int() as fix}:
|
|
assert i == fix, (i, fix)
|
|
yield segment, embeds, flags
|
|
|
|
async def ru_segments(self) -> Iterable[tuple[str, list[discord.Embed], dict]]:
|
|
return [t async for t in self._ru_segments_checked()]
|
|
|
|
async def _spam(self) -> None:
|
|
if self.spammed:
|
|
return
|
|
try:
|
|
print("spamming")
|
|
guild = await client.fetch_guild(config.guild)
|
|
for role in await guild.fetch_roles():
|
|
print(role.colour, role)
|
|
channel = await guild.fetch_channel(self.channel)
|
|
assert isinstance(channel, discord.abc.Messageable)
|
|
|
|
async def fetch(msid: int) -> discord.Message | None:
|
|
try:
|
|
return await channel.fetch_message(msid)
|
|
except discord.NotFound:
|
|
return None
|
|
|
|
broken_order = False
|
|
for i, (segment, embeds, flags) in enumerate(await self.ru_segments()):
|
|
content = segment.strip()
|
|
dbkey = (channel.id, i)
|
|
message = None
|
|
messageid: int | None
|
|
if (
|
|
(messageid := spam_db.get(dbkey, None)) is None
|
|
or (message := await fetch(messageid)) is None
|
|
or broken_order
|
|
):
|
|
if broken_order and message is not None:
|
|
await message.delete()
|
|
message = await channel.send(content=content, embeds=embeds)
|
|
await spam_db.set(dbkey, message.id)
|
|
broken_order = True
|
|
else:
|
|
if content != message.content or embeds != message.embeds:
|
|
await message.edit(content=content, embeds=embeds)
|
|
emojiid: int
|
|
for emojiid in flags.get("emojis", []):
|
|
emoji = await guild.fetch_emoji(emojiid)
|
|
await message.add_reaction(emoji)
|
|
if (custom_key := flags.get("key", None)) is not None:
|
|
await spam_db.set(custom_key, message.id)
|
|
except:
|
|
print_exc()
|
|
finally:
|
|
print("spammed")
|
|
self.spammed = True
|
|
|
|
async def spam(self) -> None:
|
|
async with self.lock:
|
|
await self._spam()
|
|
|
|
|
|
lock = asyncio.Lock()
|
|
spam_ru = Spam("ru", 1056432869834240080)
|
|
spam_en = Spam("en", 1057006937360842942)
|
|
|
|
|
|
@client.event
|
|
async def on_ready():
|
|
print("ready")
|
|
await client.change_presence(
|
|
activity=discord.Game(
|
|
name="феноменально",
|
|
)
|
|
)
|
|
task_ru = asyncio.create_task(spam_ru.spam())
|
|
task_en = asyncio.create_task(spam_en.spam())
|
|
async with lock:
|
|
await state.reload()
|
|
await task_ru
|
|
await task_en
|
|
|
|
|
|
class SimpleEmoji:
|
|
def __init__(self, ref: Union[str, int]):
|
|
self.ref = ref
|
|
|
|
def __hash__(self):
|
|
return hash(self.ref)
|
|
|
|
def __eq__(self, other):
|
|
if isinstance(other, SimpleEmoji):
|
|
return self.ref == other.ref
|
|
return NotImplemented
|
|
|
|
@classmethod
|
|
def of(cls, emoji: Union[str, discord.Emoji, discord.PartialEmoji]) -> "SimpleEmoji":
|
|
if isinstance(emoji, discord.Emoji):
|
|
return cls(emoji.id)
|
|
elif isinstance(emoji, str):
|
|
return cls(emoji)
|
|
elif isinstance(emoji, discord.PartialEmoji):
|
|
if emoji.id is None:
|
|
return cls(emoji.name)
|
|
else:
|
|
return cls(emoji.id)
|
|
else:
|
|
raise TypeError
|
|
|
|
def to(self) -> Union[str, discord.Emoji]:
|
|
if isinstance(self.ref, str):
|
|
return self.ref
|
|
else:
|
|
emoji = client.get_emoji(self.ref)
|
|
assert emoji is not None
|
|
return emoji
|
|
|
|
|
|
class RoleGrant:
|
|
def __init__(self, role: int, emoji: SimpleEmoji):
|
|
self.role = role
|
|
self.emoji = emoji
|
|
|
|
def format(self) -> str:
|
|
guild_ = client.get_guild(config.guild)
|
|
assert guild_ is not None
|
|
guild: discord.Guild = guild_
|
|
return f"{self.emoji.to()} {guild.get_role(self.role)}"
|
|
|
|
|
|
class MessageDescription:
|
|
def __init__(self, key: str, description: str, *roles: RoleGrant):
|
|
self.key = key
|
|
self._description = description
|
|
self.roles = roles
|
|
self.mapping: dict[SimpleEmoji, int] = {grant.emoji: grant.role for grant in roles}
|
|
|
|
def description(self):
|
|
return self._description + "\n" + "\n".join(map(RoleGrant.format, self.roles))
|
|
|
|
|
|
class ChannelDescription:
|
|
def __init__(self, *mds: MessageDescription):
|
|
self.mds = mds
|
|
self.mapping: dict[str, MessageDescription] = {md.key: md for md in mds}
|
|
|
|
|
|
def role_channel() -> discord.TextChannel:
|
|
guild_ = client.get_guild(config.guild)
|
|
assert guild_ is not None
|
|
guild: discord.Guild = guild_
|
|
channel_ = guild.get_channel(config.role_channel)
|
|
assert isinstance(channel_, discord.TextChannel)
|
|
channel: discord.TextChannel = channel_
|
|
return channel
|
|
|
|
|
|
class State:
|
|
def __init__(self, cd: ChannelDescription):
|
|
self.defmap: dict[str, int] = {}
|
|
self.defrev: dict[int, str] = {}
|
|
self.cd = cd
|
|
|
|
async def _load(self):
|
|
self.defmap = nest_db.get("state-map", {})
|
|
self.defrev.clear()
|
|
for md in self.cd.mds:
|
|
msg = await self._get(md.key, md.description())
|
|
for reaction in msg.reactions:
|
|
emoji = SimpleEmoji.of(reaction.emoji)
|
|
if emoji not in md.mapping:
|
|
await reaction.clear()
|
|
for emoji in md.mapping.keys():
|
|
await msg.add_reaction(emoji.to())
|
|
|
|
async def _get_raw(self, key: str, content: str) -> discord.Message:
|
|
channel = role_channel()
|
|
if key in self.defmap:
|
|
try:
|
|
msg = await channel.fetch_message(self.defmap[key])
|
|
if msg.content != content:
|
|
await msg.edit(content=content)
|
|
return msg
|
|
except discord.NotFound:
|
|
pass
|
|
return await channel.send(content)
|
|
|
|
async def _get(self, key: str, content: str) -> discord.Message:
|
|
msg = await self._get_raw(key, content)
|
|
self.defmap[key] = msg.id
|
|
self.defrev[msg.id] = key
|
|
return msg
|
|
|
|
def _reset_defmap(self):
|
|
self.defmap = {key: msid for msid, key in self.defrev.items()}
|
|
|
|
async def _clear_channel(self):
|
|
assert client.user is not None
|
|
msg: discord.Message
|
|
async for msg in role_channel().history(limit=100):
|
|
if msg.author.id == client.user.id and msg.id not in self.defrev:
|
|
await msg.delete()
|
|
|
|
async def _save(self):
|
|
self._reset_defmap()
|
|
await self._clear_channel()
|
|
await nest_db.set("state-map", self.defmap)
|
|
|
|
async def reload(self):
|
|
await self._load()
|
|
await self._save()
|
|
|
|
async def _role_member(
|
|
self, msid: int, member_id: int, emoji: SimpleEmoji
|
|
) -> tuple[Optional[discord.Role], Optional[discord.Member]]:
|
|
key = self.defrev[msid]
|
|
md = self.cd.mapping[key]
|
|
role_id = md.mapping.get(emoji)
|
|
if role_id is None:
|
|
channel = role_channel()
|
|
message: discord.Message = await channel.fetch_message(msid)
|
|
await message.clear_reaction(emoji.to())
|
|
return None, None
|
|
guild: discord.Guild = await client.fetch_guild(config.guild)
|
|
role_ = guild.get_role(role_id)
|
|
assert role_ is not None
|
|
role: discord.Role = role_
|
|
member: discord.Member = await guild.fetch_member(member_id)
|
|
return role, member
|
|
|
|
async def role_grant(self, msid: int, member_id: int, emoji: SimpleEmoji):
|
|
role, member = await self._role_member(msid, member_id, emoji)
|
|
if role is None or member is None:
|
|
return
|
|
await member.add_roles(role, reason="emojis go yes")
|
|
await log(f"emoji {emoji.to()} +role {role} member {member}")
|
|
|
|
async def role_revoke(self, msid: int, member_id: int, emoji: SimpleEmoji):
|
|
role, member = await self._role_member(msid, member_id, emoji)
|
|
if role is None or member is None:
|
|
return
|
|
await member.remove_roles(role, reason="emojis go no")
|
|
await log(f"emoji {emoji.to()} -role {role} member {member}")
|
|
|
|
|
|
state = State(
|
|
ChannelDescription(
|
|
MessageDescription(
|
|
"games",
|
|
"""Игровые роли
|
|
""",
|
|
RoleGrant(989231768341209230, SimpleEmoji("🇬🇮")),
|
|
RoleGrant(933103254752088104, SimpleEmoji("🐟")),
|
|
RoleGrant(541263011822698506, SimpleEmoji(541257134407417864)),
|
|
RoleGrant(542056704842661899, SimpleEmoji("🦆")),
|
|
RoleGrant(1017483326593974282, SimpleEmoji("💸")),
|
|
RoleGrant(541349294586724365, SimpleEmoji("⛏")),
|
|
RoleGrant(934467294371938355, SimpleEmoji("🎩")),
|
|
RoleGrant(541263005971644419, SimpleEmoji("🌈")),
|
|
)
|
|
)
|
|
)
|
|
|
|
|
|
async def log(msg: str):
|
|
guild_ = client.get_guild(config.guild)
|
|
assert guild_ is not None
|
|
guild: discord.Guild = guild_
|
|
channel_ = guild.get_channel(config.channel)
|
|
assert isinstance(channel_, discord.TextChannel)
|
|
channel: discord.TextChannel = channel_
|
|
await channel.send(msg)
|
|
|
|
|
|
async def grant_citizenship(member_id: int):
|
|
guild_ = client.get_guild(config.guild)
|
|
assert guild_ is not None
|
|
guild: discord.Guild = guild_
|
|
member_ = guild.get_member(member_id)
|
|
assert member_ is not None
|
|
member: discord.Member = member_
|
|
role = guild.get_role(config.role)
|
|
assert role is not None
|
|
if role in member.roles:
|
|
return
|
|
await member.add_roles(role, reason="феноменально")
|
|
await log(f"{escape(str(member))} {member.id} <:Jesus:586669134406877270>")
|
|
|
|
|
|
@client.event
|
|
async def on_raw_reaction_add(payload: discord.RawReactionActionEvent):
|
|
assert client.user is not None
|
|
if payload.user_id == client.user.id:
|
|
return
|
|
emoji: discord.PartialEmoji = payload.emoji
|
|
if emoji.id == config.emoji and payload.message_id in [
|
|
config.message,
|
|
spam_db.get("jesus", "not an id"),
|
|
spam_db.get("jensus", "not an id"),
|
|
]:
|
|
await grant_citizenship(payload.user_id)
|
|
if payload.message_id in state.defrev:
|
|
await state.role_grant(payload.message_id, payload.user_id, SimpleEmoji.of(emoji))
|
|
|
|
|
|
@client.event
|
|
async def on_raw_reaction_remove(payload: discord.RawReactionActionEvent):
|
|
assert client.user is not None
|
|
if payload.user_id == client.user.id:
|
|
return
|
|
emoji: discord.PartialEmoji = payload.emoji
|
|
if payload.message_id in state.defrev:
|
|
await state.role_revoke(payload.message_id, payload.user_id, SimpleEmoji.of(emoji))
|
|
|
|
|
|
@client.event
|
|
async def on_member_join(member: discord.Member):
|
|
guild: discord.Guild = member.guild
|
|
if guild.id != config.guild:
|
|
return
|
|
channel_ = guild.get_channel(config.channel)
|
|
assert isinstance(channel_, discord.TextChannel)
|
|
channel: discord.TextChannel = channel_
|
|
await channel.send(f"{escape(str(member))} {member.id} joined")
|
|
|
|
|
|
@client.event
|
|
async def on_member_remove(member: discord.Member):
|
|
guild: discord.Guild = member.guild
|
|
if guild.id != config.guild:
|
|
return
|
|
channel_ = guild.get_channel(config.channel)
|
|
assert isinstance(channel_, discord.TextChannel)
|
|
channel: discord.TextChannel = channel_
|
|
message = await channel.send(f"{escape(str(member))} {member.id} left (joined {member.joined_at})")
|
|
await asyncio.sleep(1)
|
|
entry: discord.AuditLogEntry
|
|
async for entry in guild.audit_logs(action=discord.AuditLogAction.kick):
|
|
assert entry.target is not None
|
|
if entry.target.id == member.id:
|
|
await message.reply(f"latest kick: {entry.created_at} {entry.reason}")
|
|
break
|
|
async for entry in guild.audit_logs(action=discord.AuditLogAction.ban):
|
|
assert entry.target is not None
|
|
if entry.target.id == member.id:
|
|
await message.reply(f"latest ban: {entry.created_at} {entry.reason}")
|
|
break
|
|
|
|
|
|
async def main():
|
|
async with nest_db, spam_db:
|
|
await client.login(token)
|
|
await client.connect()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
serve(main(), client, loop)
|