import asyncio


class DeadlockError(Exception):
    """Error raised if DeadlockDetectingLock detects deadlock"""


class DeadlockDetectingLock:
    """
    Lock that detects deadlock when it is about to be acquired by the same
    task that already holds it.

    Use it as an async context manager (``async with lock:``). It wraps an
    ``asyncio.Lock`` and remembers which task entered it; if that same task
    tries to enter again before leaving, ``DeadlockError`` is raised instead
    of waiting on a lock that the task itself would have to release.
    """

    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        # Task that currently holds the lock, or None while nobody is
        # recorded as the holder.
        self._owner = None

    def locked(self) -> bool:
        """Return True if the underlying lock is currently held."""
        return self._lock.locked()

    async def __aenter__(self) -> "DeadlockDetectingLock":
        """
        Acquire the lock and record the current task as its owner.

        Returns:
            This lock instance, so ``async with lock as l`` binds the lock.

        Raises:
            DeadlockError: if the current task already holds this lock.
        """
        current = asyncio.current_task()
        # Ownership is a question of identity, not equality. The explicit
        # None check matters: current_task() returns None when no task is
        # running, and an unheld lock also has ``_owner is None`` -- that
        # coincidence must not be reported as a deadlock.
        if current is not None and self._owner is current:
            raise DeadlockError(
                f"{current!r} attempted to acquire a lock it already holds"
            )
        await self._lock.acquire()
        # Only set after a successful acquire, so a cancelled or failed wait
        # never leaves a stale owner behind.
        self._owner = current
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        """
        Forget the owner and release the lock.

        Returns None, so an exception raised inside the ``async with`` body
        is never suppressed.
        """
        # Clear the owner before releasing so that whoever acquires next
        # never observes the previous holder.
        self._owner = None
        self._lock.release()