from __future__ import annotations import asyncio import re from io import BytesIO from textwrap import dedent from typing import Annotated import discord import sympy from aiohttp import ClientSession from discord.ext import commands from PIL import Image # https://github.com/python-discord/bot-core/blob/main/pydis_core/utils/regex.py#L29 FORMATTED_CODE_REGEX = re.compile( r"(?P(?P```)|``?)" r"(?(block)(?:(?P[a-z]+)\n)?)" r"(?:[ \t]*\n)*" r"(?P.*?)" r"\s*" r"(?P=delim)", flags=re.DOTALL | re.IGNORECASE, ) # https://github.com/python-discord/bot-core/blob/main/pydis_core/utils/regex.py#L44C1-L49C2 RAW_CODE_REGEX = re.compile(r"^(?:[ \t]*\n)*" r"(?P.*?)" r"\s*$", flags=re.DOTALL) async def threads(channel: discord.TextChannel): for t in channel.threads: yield t async for t in channel.archived_threads(): yield t # https://github.com/python-discord/bot/blob/main/bot/exts/utils/snekbox/_cog.py#L97 class CodeblockConverter(commands.Converter): @classmethod async def convert(cls, ctx: commands.Context, code: str) -> list[str]: if match := list(FORMATTED_CODE_REGEX.finditer(code)): blocks = [block for block in match if block.group("block")] if len(blocks) > 1: codeblocks = [block.group("code") for block in blocks] info = "several code blocks" else: match = match[0] if len(blocks) == 0 else blocks[0] code, block, lang, delim = match.group("code", "block", "lang", "delim") codeblocks = [dedent(code)] if block: info = (f"'{lang}' highlighted" if lang else "plain") + " code block" else: info = f"{delim}-enclosed inline code" else: m = RAW_CODE_REGEX.fullmatch(code) if m is None: raise RuntimeError("what") codeblocks = [dedent(m.group("code"))] info = "unformatted or badly formatted code" code = "\n".join(codeblocks) print(f"Extracted {info} for evaluation:\n{code}") return codeblocks class Night(commands.Cog): @commands.hybrid_command() @commands.is_owner() async def reload(self, ctx: commands.Context): print("reload") bot: commands.Bot = ctx.bot try: await bot.reload_extension("nightly.night") except commands.ExtensionNotLoaded: await ctx.reply("not loaded") print("reloaded") await ctx.reply("reloaded") @commands.hybrid_command() @commands.is_owner() async def sync(self, ctx: commands.Context): await ctx.bot.tree.sync() print("synced") await ctx.reply("synced") async def leetcode_one(self, ctx: commands.Context, /, url: str): match = re.search(r"/problems/([a-z0-9\-]*)", url) if not match: await ctx.reply("invalid url") return name = match.group(1) async with ClientSession() as session: async with session.post( "https://leetcode.com/graphql/", json={ "query": "\n query consolePanelConfig($titleSlug: String!) {\n question(titleSlug: $titleSlug) {\n questionId\n questionFrontendId\n questionTitle\n enableDebugger\n enableRunCode\n enableSubmit\n enableTestMode\n exampleTestcaseList\n metaData\n }\n}\n ", "variables": {"titleSlug": name}, "operationName": "consolePanelConfig", }, ) as response: if not response.ok: await ctx.reply("error (idk)") return match await response.json(): case {"data": {"question": {"questionFrontendId": qfi, "questionTitle": qt}}}: title = f"{qfi}. {qt}" print(title) case _: await ctx.reply("error (leetcode format wrong)") return if not isinstance(ctx.channel, discord.TextChannel): await ctx.reply("not available outside text channels") return async for thread in threads(ctx.channel): if thread.name == title: message = await ctx.reply(f"exists: {thread.jump_url}", delete_after=60) return thread = await ctx.channel.create_thread(name=title, type=discord.ChannelType.public_thread) message = await thread.send(f"https://leetcode.com/problems/{name}/") if not ctx.bot_permissions.manage_messages: await ctx.reply("cannot pin messages (missing permissions)", mention_author=False) return await message.pin() @commands.command(aliases=["lc"]) async def leetcode(self, ctx: commands.Context, *urls: str): async with asyncio.TaskGroup() as tg: for url in urls: tg.create_task(self.leetcode_one(ctx, url)) @commands.command() @commands.is_owner() async def lc_cleanup(self, ctx: commands.Context): async for message in ctx.channel.history(limit=1000): if message.author == ctx.me and message.content.startswith("exists:"): await message.delete() await ctx.reply("done") @commands.hybrid_command() async def tex(self, ctx: commands.Context, *, code: Annotated[list[str], CodeblockConverter]): print(code) buf = BytesIO() def preview(): sympy.preview( code[0], viewer="BytesIO", euler=False, outputbuffer=buf, dvioptions=["-fg", "White", "-bg", "Black", "-D", "200"], extra_preamble=r""" \usepackage{xcolor} \color{white} """, ) await asyncio.to_thread(preview) buf.seek(0) im0 = Image.open(buf) print(im0.size) im1 = Image.new(im0.mode, (im0.width + 40, im0.height + 40)) im1.paste(im0, (20, 20)) buf = BytesIO() im1.save(buf, format="png") buf.seek(0) await ctx.reply(file=discord.File(buf, filename="tex.png")) async def setup(bot: commands.Bot): global cog cog = Night() await bot.add_cog(cog) async def teardown(bot: commands.Bot): global cog await bot.remove_cog(cog.qualified_name) del cog print("torn down")