264 lines
8.6 KiB
Python
264 lines
8.6 KiB
Python
import asyncio
|
|
from typing import Optional, TypedDict
|
|
|
|
import discord
|
|
from v6d3vote.config import myroot, prefix
|
|
|
|
from ptvp35 import *
|
|
from v6d1tokens.client import *
|
|
from v6d2ctx.at_of import *
|
|
from v6d2ctx.context import *
|
|
from v6d2ctx.handle_content import *
|
|
from v6d2ctx.lock_for import *
|
|
from v6d2ctx.pain import *
|
|
from v6d2ctx.serve import *
|
|
|
|
loop = asyncio.new_event_loop()
|
|
asyncio.set_event_loop(loop)
|
|
token = loop.run_until_complete(request_token('vote', 'token'))
|
|
client = discord.Client(
|
|
intents=discord.Intents(
|
|
members=True,
|
|
guilds=True,
|
|
bans=True,
|
|
emojis=True,
|
|
invites=True,
|
|
guild_messages=True,
|
|
reactions=True,
|
|
message_content=True,
|
|
)
|
|
)
|
|
vote_db = Db(myroot / 'vote.db', kvfactory=KVJson())
|
|
|
|
|
|
@client.event
|
|
async def on_ready():
|
|
print("ready")
|
|
await client.change_presence(
|
|
activity=discord.Game(
|
|
name='феноменально'
|
|
)
|
|
)
|
|
|
|
|
|
at_of: AtOf[str, command_type] = AtOf()
|
|
at, of = at_of()
|
|
|
|
|
|
@at('help')
|
|
async def help_(ctx: Context, args: list[str]) -> None:
|
|
match args:
|
|
case []:
|
|
await ctx.reply('poll bot')
|
|
case [name]:
|
|
await ctx.reply(f'help for {name}: `{name} help`')
|
|
|
|
|
|
class SavedPoll(TypedDict):
|
|
votes: dict[str, str]
|
|
emojis: dict[str, str]
|
|
options: list[str]
|
|
title: str
|
|
|
|
|
|
class Poll:
|
|
def __init__(
|
|
self,
|
|
message: discord.Message,
|
|
votes: dict[discord.Member, str],
|
|
emojis: dict[str, str],
|
|
options: list[str],
|
|
title: str
|
|
):
|
|
self.message = message
|
|
self.votes = votes
|
|
self.emojis = emojis
|
|
self.reverse: dict[str, str] = {emoji: option for option, emoji in emojis.items()}
|
|
self.options = options
|
|
self.title = title
|
|
|
|
def saved(self):
|
|
return {
|
|
'votes': {
|
|
str(member.id): option for member, option in self.votes.items()
|
|
},
|
|
'emojis': self.emojis,
|
|
'options': self.options,
|
|
'title': self.title
|
|
}
|
|
|
|
def content(self):
|
|
count: dict[str, int] = {}
|
|
for _, option in self.votes.items():
|
|
count[option] = count.get(option, 0) + 1
|
|
return (
|
|
f'{self.title}\n'
|
|
+
|
|
'\n'.join(f'{self.emojis[option]} `{count.get(option, 0)}` {option}' for option in self.options)
|
|
)
|
|
|
|
async def save(self):
|
|
await vote_db.set(
|
|
self.message.id,
|
|
self.saved()
|
|
)
|
|
await self.message.edit(
|
|
content=self.content()
|
|
)
|
|
|
|
@classmethod
|
|
async def create(
|
|
cls, ctx: Context, options: list[tuple[str, discord.Emoji | str]], title: str,
|
|
votes: dict[discord.Member, str]
|
|
):
|
|
message: discord.Message = await ctx.channel.send('creating poll...')
|
|
async with lock_for(message, 'failed to create poll'):
|
|
if len(set(emoji for option, emoji in options)) != len(options):
|
|
raise Explicit('duplicate emojis')
|
|
if len(set(option for option, emoji in options)) != len(options):
|
|
raise Explicit('duplicate options')
|
|
poll = cls(
|
|
message,
|
|
votes,
|
|
{option: str(emoji) for option, emoji in options},
|
|
[option for option, _ in options],
|
|
title
|
|
)
|
|
for _, emoji in options:
|
|
await message.add_reaction(emoji)
|
|
await poll.save()
|
|
await ctx.message.delete()
|
|
|
|
@staticmethod
|
|
async def load_votes(guild: discord.Guild, votes: dict[str, str]) -> dict[discord.Member, str]:
|
|
loaded: dict[discord.Member, str] = {}
|
|
for member, option in votes.items():
|
|
try:
|
|
loaded[guild.get_member(int(member)) or await guild.fetch_member(int(member))] = option
|
|
except (ValueError, discord.HTTPException):
|
|
pass
|
|
return loaded
|
|
|
|
@classmethod
|
|
async def load(cls, message: discord.Message) -> Optional['Poll']:
|
|
saved: Optional[SavedPoll] = vote_db.get(message.id, None)
|
|
if saved is None:
|
|
return None
|
|
assert message.guild is not None
|
|
guild: discord.Guild = message.guild
|
|
return cls(
|
|
message,
|
|
await cls.load_votes(guild, saved['votes']),
|
|
saved['emojis'],
|
|
saved['options'],
|
|
saved.get('title', 'unnamed')
|
|
)
|
|
|
|
@classmethod
|
|
async def global_vote(cls, rrae: discord.RawReactionActionEvent):
|
|
assert client.user is not None
|
|
if rrae.user_id == client.user.id:
|
|
return
|
|
assert rrae.guild_id is not None
|
|
assert rrae.channel_id is not None
|
|
guild: discord.Guild = client.get_guild(rrae.guild_id) or await client.fetch_guild(rrae.guild_id)
|
|
member: discord.Member = guild.get_member(rrae.user_id) or await guild.fetch_member(rrae.user_id)
|
|
_channel = guild.get_channel(rrae.channel_id)
|
|
assert isinstance(_channel, discord.TextChannel)
|
|
channel: discord.TextChannel = _channel
|
|
message: discord.Message = await channel.fetch_message(rrae.message_id)
|
|
if message.author != client.user:
|
|
return
|
|
async with lock_for(message, 'no message'):
|
|
poll = await cls.load(message)
|
|
if poll is None:
|
|
return
|
|
await poll.vote(member, rrae.emoji, rrae.event_type == 'REACTION_REMOVE')
|
|
await poll.save()
|
|
|
|
async def vote(self, member: discord.Member, emoji: discord.PartialEmoji | str, remove: bool):
|
|
if str(emoji) in self.reverse:
|
|
option = self.reverse[str(emoji)]
|
|
if remove:
|
|
if self.votes.get(member) == option:
|
|
del self.votes[member]
|
|
else:
|
|
self.votes[member] = option
|
|
for other_reaction in self.message.reactions:
|
|
if str(other_reaction.emoji) != str(emoji):
|
|
await self.message.remove_reaction(other_reaction.emoji, member)
|
|
|
|
|
|
async def poll_options(args: list[str]) -> list[tuple[str, discord.Emoji | str]]:
|
|
options: list[tuple[str, discord.Emoji | str]] = []
|
|
while args:
|
|
match args:
|
|
case [emoji, option, *args]:
|
|
if '<' in emoji and '>' in emoji:
|
|
try:
|
|
emoji = client.get_emoji(
|
|
int(''.join(c for c in emoji.rsplit(':', 1)[-1] if c.isdecimal()))
|
|
) or emoji
|
|
except (discord.NotFound, ValueError):
|
|
pass
|
|
options.append((option, emoji))
|
|
case _:
|
|
raise Explicit('option not specified')
|
|
return options
|
|
|
|
|
|
@at('poll')
|
|
async def create_poll(ctx: Context, args: list[str]) -> None:
|
|
match args:
|
|
case ['help']:
|
|
await ctx.reply('`poll title emoji option [emoji option ...]`')
|
|
await ctx.reply('`poll emoji option [emoji option ...]` (reply fork)')
|
|
case []:
|
|
raise Explicit('no options')
|
|
case [*args] if ctx.message.reference is not None and isinstance(ctx.message.reference.resolved, discord.Message):
|
|
refd: discord.Message = ctx.message.reference.resolved
|
|
poll = await Poll.load(refd)
|
|
if poll is None:
|
|
raise Explicit('referenced message is not a poll')
|
|
await Poll.create(ctx, await poll_options(args), poll.title, poll.votes)
|
|
case [title, *args]:
|
|
await Poll.create(ctx, await poll_options(args), title, {})
|
|
|
|
|
|
@client.event
|
|
async def on_message(message: discord.Message) -> None:
|
|
await handle_content(of, message, message.content, prefix, client)
|
|
|
|
|
|
@client.event
|
|
async def on_raw_reaction_add(rrae: discord.RawReactionActionEvent) -> None:
|
|
await Poll.global_vote(rrae)
|
|
|
|
|
|
@client.event
|
|
async def on_raw_reaction_remove(rrae: discord.RawReactionActionEvent) -> None:
|
|
await Poll.global_vote(rrae)
|
|
|
|
|
|
async def main():
|
|
async with vote_db:
|
|
await client.login(token)
|
|
await client.connect()
|
|
print('exited')
|
|
|
|
|
|
if __name__ == '__main__':
|
|
from contextlib import ExitStack
|
|
with ExitStack() as es:
|
|
ALog(client, 'connect').enter(es)
|
|
ALog(client, 'close').enter(es)
|
|
ALog(Db, '__aenter__').enter(es)
|
|
ALog(Db, '__aexit__').enter(es)
|
|
ALog(Db, 'aclose').enter(es)
|
|
SLog(Db, '_build_file_sync').enter(es)
|
|
SLog(Db, '_finish_recovery_sync').enter(es)
|
|
SLog(Db, '_copy_sync').enter(es)
|
|
ALog(loop, 'run_in_executor').enter(es)
|
|
serve(main(), client, loop)
|
|
print('after serve')
|