From 1b2102db3fb74c1ce765720b004c1e6dff01bdd3 Mon Sep 17 00:00:00 2001 From: timofey Date: Fri, 25 Aug 2023 02:12:10 +0000 Subject: [PATCH] on-request optimisation --- starbot/starbot/stars.py | 38 ++++++++++++++++++++------------------ 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/starbot/starbot/stars.py b/starbot/starbot/stars.py index 50c7018..ff58193 100644 --- a/starbot/starbot/stars.py +++ b/starbot/starbot/stars.py @@ -51,41 +51,45 @@ class Increment: return self.__current -class AlreadyProcessed(Exception): +class EventuallyConsistent(Exception): + pass + + +class AlreadyRequested(EventuallyConsistent): + pass + + +class AlreadyProcessed(EventuallyConsistent): pass class MessageState: def __init__(self) -> None: self.increment = Increment() - self.current = self.increment.next() + self.requested = self.processing = self.done = self._next() self.__lock = asyncio.Lock() - def _pull(self): + def _next(self): return self.increment.next() - def _push(self, token: int): - self.current = token - - def _before(self, token: int): - return self.current < token - async def __aenter__(self) -> AsyncContextManager[None]: @asynccontextmanager async def inner(): - token = self._pull() + if self.processing < self.requested: + raise AlreadyRequested() + self.requested = current = self._next() async with self.__lock: - if self._before(token): - to_push = self._pull() + if self.done < current: + self.processing = self._next() yield - self._push(to_push) + self.done = self.processing else: raise AlreadyProcessed() return inner() async def __aexit__(self, et, ev, tb, /): - return et and issubclass(et, AlreadyProcessed) + return et and issubclass(et, EventuallyConsistent) def states(state: StarState): @@ -171,14 +175,12 @@ class StarEventCtx: return await event_channel.fetch_message(self.message_id) async def _on(self) -> None: - star_channel, message = await asyncio.gather( - self._get_channel(self.star_channel_id), self._get_message() - ) + star_channel, message = await asyncio.gather(self._get_channel(self.star_channel_id), self._get_message()) await StarMessageCtx(message, star_channel, self.count).on() async def on(self) -> None: async with self.states.reserve(self.message_id) as state, state as guard, guard: - await self._on() + await asyncio.create_task(self._on()) class ReactionCtx: