on-request optimisation
This commit is contained in:
parent
4c5a034187
commit
1b2102db3f
@ -51,41 +51,45 @@ class Increment:
|
|||||||
return self.__current
|
return self.__current
|
||||||
|
|
||||||
|
|
||||||
class AlreadyProcessed(Exception):
|
class EventuallyConsistent(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class AlreadyRequested(EventuallyConsistent):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class AlreadyProcessed(EventuallyConsistent):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
class MessageState:
|
class MessageState:
|
||||||
def __init__(self) -> None:
|
def __init__(self) -> None:
|
||||||
self.increment = Increment()
|
self.increment = Increment()
|
||||||
self.current = self.increment.next()
|
self.requested = self.processing = self.done = self._next()
|
||||||
self.__lock = asyncio.Lock()
|
self.__lock = asyncio.Lock()
|
||||||
|
|
||||||
def _pull(self):
|
def _next(self):
|
||||||
return self.increment.next()
|
return self.increment.next()
|
||||||
|
|
||||||
def _push(self, token: int):
|
|
||||||
self.current = token
|
|
||||||
|
|
||||||
def _before(self, token: int):
|
|
||||||
return self.current < token
|
|
||||||
|
|
||||||
async def __aenter__(self) -> AsyncContextManager[None]:
|
async def __aenter__(self) -> AsyncContextManager[None]:
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def inner():
|
async def inner():
|
||||||
token = self._pull()
|
if self.processing < self.requested:
|
||||||
|
raise AlreadyRequested()
|
||||||
|
self.requested = current = self._next()
|
||||||
async with self.__lock:
|
async with self.__lock:
|
||||||
if self._before(token):
|
if self.done < current:
|
||||||
to_push = self._pull()
|
self.processing = self._next()
|
||||||
yield
|
yield
|
||||||
self._push(to_push)
|
self.done = self.processing
|
||||||
else:
|
else:
|
||||||
raise AlreadyProcessed()
|
raise AlreadyProcessed()
|
||||||
|
|
||||||
return inner()
|
return inner()
|
||||||
|
|
||||||
async def __aexit__(self, et, ev, tb, /):
|
async def __aexit__(self, et, ev, tb, /):
|
||||||
return et and issubclass(et, AlreadyProcessed)
|
return et and issubclass(et, EventuallyConsistent)
|
||||||
|
|
||||||
|
|
||||||
def states(state: StarState):
|
def states(state: StarState):
|
||||||
@ -171,14 +175,12 @@ class StarEventCtx:
|
|||||||
return await event_channel.fetch_message(self.message_id)
|
return await event_channel.fetch_message(self.message_id)
|
||||||
|
|
||||||
async def _on(self) -> None:
|
async def _on(self) -> None:
|
||||||
star_channel, message = await asyncio.gather(
|
star_channel, message = await asyncio.gather(self._get_channel(self.star_channel_id), self._get_message())
|
||||||
self._get_channel(self.star_channel_id), self._get_message()
|
|
||||||
)
|
|
||||||
await StarMessageCtx(message, star_channel, self.count).on()
|
await StarMessageCtx(message, star_channel, self.count).on()
|
||||||
|
|
||||||
async def on(self) -> None:
|
async def on(self) -> None:
|
||||||
async with self.states.reserve(self.message_id) as state, state as guard, guard:
|
async with self.states.reserve(self.message_id) as state, state as guard, guard:
|
||||||
await self._on()
|
await asyncio.create_task(self._on())
|
||||||
|
|
||||||
|
|
||||||
class ReactionCtx:
|
class ReactionCtx:
|
||||||
|
Loading…
Reference in New Issue
Block a user