Catching Async Deadlocks in Python asyncio Before They Freeze Your App

June 10, 2026

Your asyncio application is running fine, then suddenly it stops doing anything. No exception, no traceback, no log output: just silence. You've hit an async deadlock, and Python isn't going to tell you where it came from.

Async deadlocks are harder to spot than their threading counterparts because there's no stack of blocked threads to inspect. Everything lives on a single event loop, and when that loop stalls, your whole app stalls with it.

What you'll learn

  • How async deadlocks form in an event loop and why they're hard to see
  • The most common patterns that cause them in real asyncio code
  • How to detect a stalled event loop at runtime
  • Debugging techniques using asyncio built-ins and third-party tools
  • Defensive coding patterns that prevent deadlocks from forming in the first place

Prerequisites

You should be comfortable writing coroutines with async def and await, and know what the event loop does at a high level. All examples target Python 3.10+ where the asyncio API is stable and expressive.

How Async Deadlocks Actually Form

In threaded code, a deadlock happens when two threads each hold a lock the other needs. In asyncio, the mechanism is different but the result is the same: nothing makes progress.

The event loop runs one coroutine at a time. A coroutine yields control by hitting an await expression, which lets the loop pick up the next waiting task. A deadlock occurs when every task is waiting for something that only another task can provide, and none of them can run.

The most common trigger is a coroutine that blocks the event loop entirely, starving all other tasks. When task A is waiting for task B to produce a result, and task B can never run because task A never yields, you're stuck.

Pattern 1: Calling Blocking Code Without an Executor

This is the single most common cause of a frozen asyncio app. You call a synchronous, blocking function directly inside a coroutine without wrapping it in an executor.

import asyncio
import time

async def fetch_data():
    time.sleep(5)  # blocks the entire event loop for 5 seconds
    return "done"

async def main():
    await asyncio.gather(
        fetch_data(),
        fetch_data(),
    )

asyncio.run(main())

Both coroutines look concurrent, but time.sleep blocks the OS thread the event loop runs on. The loop cannot switch to the second coroutine until the first sleep finishes. Replace blocking calls with their async equivalents, or offload them to a thread pool:

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor()

async def fetch_data():
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(executor, time.sleep, 5)
    return "done"

For CPU-bound work that takes real time, use ProcessPoolExecutor instead so you aren't competing with the GIL.
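On Python 3.9+, asyncio.to_thread is a shorter way to write the thread offload shown above: it runs a synchronous function in the loop's default thread pool. The sketch below is only an illustration; blocking_io is a hypothetical stand-in for whatever synchronous call you need to make.

```python
import asyncio
import time


def blocking_io(delay):
    # Hypothetical stand-in for any synchronous, blocking call.
    time.sleep(delay)
    return "done"


async def main():
    start = time.monotonic()
    # Each call runs in the default thread pool, so the two sleeps overlap
    # instead of running back to back on the event loop thread.
    results = await asyncio.gather(
        asyncio.to_thread(blocking_io, 0.2),
        asyncio.to_thread(blocking_io, 0.2),
    )
    elapsed = time.monotonic() - start
    return results, elapsed


if __name__ == "__main__":
    results, elapsed = asyncio.run(main())
    print(results, f"{elapsed:.2f}s")
```

If the two calls were made directly inside the coroutines, the total would be at least 0.4 seconds; offloaded, it should land close to 0.2.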

Pattern 2: Awaiting a Future That Nobody Will Resolve

A bare asyncio.Future is a promise that some other piece of code will call future.set_result() or future.set_exception(). If that code never runs, or fails before it reaches that call, any coroutine awaiting the future waits forever.

import asyncio

async def main():
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    # Forgot to schedule anything that calls future.set_result()
    result = await future  # hangs indefinitely
    print(result)

asyncio.run(main())

Always pair a future with the coroutine responsible for resolving it, and use asyncio.wait_for with a timeout as a safety net:

import asyncio

async def resolver(future):
    await asyncio.sleep(1)
    future.set_result("resolved")

async def main():
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    # Keep a reference so the task cannot be garbage-collected mid-flight
    resolver_task = asyncio.create_task(resolver(future))

    try:
        result = await asyncio.wait_for(future, timeout=3.0)
        print(result)
    except asyncio.TimeoutError:
        print("future was never resolved")

asyncio.run(main())

Pattern 3: Lock Acquisition Cycles

Asyncio provides asyncio.Lock, and it can deadlock just like a threading lock if you create a cycle. Task A holds lock 1 and waits for lock 2. Task B holds lock 2 and waits for lock 1. Both suspend at the await lock.acquire() call, and the loop sits idle because neither task can ever be woken.

import asyncio

lock1 = asyncio.Lock()
lock2 = asyncio.Lock()

async def task_a():
    async with lock1:
        await asyncio.sleep(0)  # yield to let task_b acquire lock2
        async with lock2:  # now waits forever
            print("task_a done")

async def task_b():
    async with lock2:
        await asyncio.sleep(0)
        async with lock1:  # now waits forever
            print("task_b done")

async def main():
    await asyncio.gather(task_a(), task_b())

asyncio.run(main())

The fix is consistent lock ordering: always acquire locks in the same global order across all coroutines. If every task acquires lock1 before lock2, a cycle cannot form.
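A minimal sketch of that ordering rule, reusing the lock1 and lock2 names from the example above. The worker coroutine and the log list are hypothetical, and the outer wait_for is only there so a regression would surface as a timeout rather than a hang.

```python
import asyncio


async def worker(name, lock1, lock2, log):
    # Every task takes lock1 first, then lock2: the same global order.
    async with lock1:
        await asyncio.sleep(0)  # yield while holding lock1
        async with lock2:
            log.append(name)


async def main():
    lock1 = asyncio.Lock()
    lock2 = asyncio.Lock()
    log = []
    await asyncio.wait_for(
        asyncio.gather(
            worker("task_a", lock1, lock2, log),
            worker("task_b", lock1, lock2, log),
        ),
        timeout=1.0,
    )
    return log


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Because the second task cannot get lock1 until the first has released both locks, neither task can end up holding one lock while waiting on the other.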

Pattern 4: Queues With No Consumer

Calling await queue.join() on an asyncio.Queue waits until every item that was put has been marked complete with task_done(). If the consumer task crashes or is never started, the producer hangs at join() indefinitely.

import asyncio

async def producer(queue):
    for i in range(5):
        await queue.put(i)
    await queue.join()  # blocks if consumer never calls task_done()
    print("all items processed")

async def main():
    queue = asyncio.Queue()
    # Oops: forgot to start the consumer task
    await producer(queue)

asyncio.run(main())

Always create consumer tasks before calling queue.join(), and consider wrapping producers in a timeout so a missing consumer surfaces as an error rather than a hang.
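One way this could look, as a sketch: the consumer is started before anything waits on join(), and the join itself is wrapped in a timeout. The consumer coroutine, the processed list, and the two-second limit are assumptions made for illustration.

```python
import asyncio


async def consumer(queue, processed):
    while True:
        item = await queue.get()
        try:
            processed.append(item)  # stand-in for real processing
        finally:
            # Always mark the item done, even if processing raised.
            queue.task_done()


async def main():
    queue = asyncio.Queue()
    processed = []
    # Start the consumer before anything waits on join().
    consumer_task = asyncio.create_task(consumer(queue, processed))
    for i in range(5):
        await queue.put(i)
    try:
        # A missing or dead consumer now raises TimeoutError instead of hanging.
        await asyncio.wait_for(queue.join(), timeout=2.0)
    finally:
        consumer_task.cancel()
    return processed


if __name__ == "__main__":
    print(asyncio.run(main()))
```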

Detecting a Stalled Event Loop at Runtime

You can't always reproduce a deadlock locally. For production systems, you need runtime detection.

Enable asyncio debug mode

Set the environment variable PYTHONASYNCIODEBUG=1 or call asyncio.run(main(), debug=True). In debug mode, asyncio logs a warning whenever a single step of a coroutine holds the event loop for longer than 100 milliseconds. The warning is written after the slow step finishes, so it won't report a call that blocks forever, but it does point you at the blocking calls that eventually return.

PYTHONASYNCIODEBUG=1 python myapp.py
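Debug mode can also be switched on from code, and the 100 ms threshold is adjustable through the loop's slow_callback_duration attribute. The sketch below assumes a deliberately blocking coroutine (blocking_step, a hypothetical name) and a 50 ms threshold chosen only for the demonstration.

```python
import asyncio
import logging
import time


async def blocking_step():
    time.sleep(0.2)  # deliberately blocks the loop


async def main():
    loop = asyncio.get_running_loop()
    # Report any step that holds the loop longer than 50 ms (default is 0.1 s).
    loop.slow_callback_duration = 0.05
    await blocking_step()


def run_with_debug():
    # asyncio reports slow steps through the "asyncio" logger at WARNING level.
    logging.basicConfig(level=logging.WARNING)
    asyncio.run(main(), debug=True)


if __name__ == "__main__":
    run_with_debug()
```

The logged message names the task and the time it held the loop, which is usually enough to locate the offending call.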

Use a watchdog task

A lightweight ticker coroutine runs alongside your application and records a timestamp every second to show the loop is still making progress. If the loop stalls, the ticker stops being scheduled too, so the check itself runs on a separate thread that notices when the timestamp goes stale.

import asyncio
import threading
import time

_last_tick = time.monotonic()

async def loop_ticker():
    global _last_tick
    while True:
        _last_tick = time.monotonic()
        await asyncio.sleep(1)

def watchdog(threshold_seconds=5):
    while True:
        time.sleep(threshold_seconds)
        age = time.monotonic() - _last_tick
        if age > threshold_seconds:
            print(f"WARNING: event loop may be stalled ({age:.1f}s since last tick)")

async def main():
    ticker = asyncio.create_task(loop_ticker())  # keep a reference so the task isn't garbage-collected
    threading.Thread(target=watchdog, daemon=True).start()
    # ... rest of your application
    await asyncio.sleep(60)

asyncio.run(main())

Dump all running tasks on demand

When you suspect a hang, asyncio.all_tasks() gives you every task currently scheduled. Print their stack frames to see where each one is stuck:

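import asyncio
import signal
import sys

def dump_tasks(signum, frame):
    # Signal handlers run in the main thread, where the loop is running
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        print("no running event loop", file=sys.stderr)
        return
    for task in asyncio.all_tasks(loop):
        task.print_stack(file=sys.stderr)

signal.signal(signal.SIGUSR1, dump_tasks)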

Send SIGUSR1 to the process while it's hung and you'll get a full coroutine stack dump in your logs. On Windows, use signal.SIGBREAK instead.

Using Third-Party Tools

The aiomonitor library attaches a telnet console to a running asyncio application. You can connect to it while the app is live and inspect tasks, stack traces, and loop state without restarting. It's particularly useful for long-running services where you cannot reproduce the deadlock in a test environment. Install it with pip install aiomonitor and start it alongside your app loop.

The py-spy sampling profiler works at the OS level and can attach to a running Python process without modifying your code. Run py-spy dump --pid <pid> to get a snapshot of every thread's current call stack. Because it reads the process from the outside and does not need the GIL, it works even when the event loop thread is completely blocked.

Common Pitfalls to Watch For

  • Mixing asyncio.run calls: Calling asyncio.run from inside an already-running event loop raises a RuntimeError, and loop.run_until_complete fails the same way on a loop that is already running. Environments that patch asyncio to allow nested loops can mask that error and leave you with a stall instead. Call either one only from synchronous code where you know no loop is running.
  • Synchronous teardown in __del__: Object destructors run outside the event loop's control. A __del__ method cannot await, so if it waits on async work through a synchronous wrapper, it blocks whichever thread runs the garbage collector. If that is the event loop thread, the work it is waiting for can never run.
  • Forgetting await on a coroutine call: Calling a coroutine without await returns a coroutine object but does not schedule it. This is a logic bug that looks like a deadlock when another task is waiting for work that never starts. Enable Python's -W error::RuntimeWarning flag to catch unawaited coroutines during development.
  • Shield misuse: asyncio.shield protects a coroutine from cancellation, but if you shield something that itself blocks indefinitely, you lose the ability to cancel it as an escape valve. Use shield conservatively.
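  • Long-lived asyncio.gather without error handling: If one task in a gather call raises an exception and you haven't set return_exceptions=True, the exception propagates to the caller right away, but the other tasks are not cancelled and keep running in the background. With return_exceptions=True, gather waits for every task, so if one task deadlocks silently, the gather never returns and the exceptions from the other tasks are never surfaced.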
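The forgotten-await pitfall from the list above is easy to reproduce. In this sketch (do_work and results are hypothetical names), calling the coroutine function only creates a coroutine object; its body does not run until something awaits or schedules it.

```python
import asyncio


async def do_work(results):
    results.append("ran")


async def main():
    results = []
    coro = do_work(results)  # missing await: nothing is scheduled yet
    await asyncio.sleep(0)   # give the loop a chance to run other work
    ran_without_await = list(results)
    await coro               # awaiting is what actually runs the body
    return ran_without_await, results


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Any task waiting on the side effects of do_work would sit idle for as long as the await is missing, which is why this bug tends to be reported as a hang.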

Defensive Patterns That Prevent Deadlocks

The most effective approach is to make deadlocks structurally impossible rather than relying on detection after the fact.

Put a timeout on every external wait. Wrapping every await that touches I/O, a lock, or a queue with asyncio.wait_for(coro, timeout=N) means a stall becomes a TimeoutError you can log and handle, not a silent freeze.
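As a sketch of what this looks like for a lock, the hypothetical helper below (acquire_or_give_up) turns a stalled acquire into a boolean the caller can act on. The 0.1 second timeout is arbitrary and chosen only to keep the example fast.

```python
import asyncio


async def acquire_or_give_up(lock, timeout):
    # Hypothetical helper: returns True if the lock was acquired in time.
    try:
        await asyncio.wait_for(lock.acquire(), timeout=timeout)
    except asyncio.TimeoutError:
        return False
    return True


async def main():
    lock = asyncio.Lock()
    await lock.acquire()  # simulate a holder that is not releasing
    stalled = await acquire_or_give_up(lock, 0.1)
    lock.release()
    recovered = await acquire_or_give_up(lock, 0.1)
    return stalled, recovered


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Note that a timeout can only interrupt a coroutine that is suspended at an await; it cannot rescue a synchronous call that is blocking the loop itself.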

Prefer asyncio.TaskGroup (available since Python 3.11) over bare gather. When one task in the group fails, the group cancels the sibling tasks and raises once they have finished, so a failure can't stay hidden behind a hung sibling. A task that hangs on its own still needs a timeout.

import asyncio

async def main():
    # worker_one and worker_two are your own coroutines
    async with asyncio.TaskGroup() as tg:  # requires Python 3.11+
        tg.create_task(worker_one())
        tg.create_task(worker_two())

Keep critical sections short. Acquire a lock, do the minimum necessary work, and release it immediately. The longer you hold a lock, the wider the window for a cycle to form.

Audit every synchronous call inside a coroutine. If a function doesn't have an async signature, it might be blocking. Check its implementation or wrap it in run_in_executor as a precaution.

Wrapping Up

Async deadlocks are fixable once you know the patterns behind them. Here are concrete steps you can take right now:

  1. Run your application with PYTHONASYNCIODEBUG=1 and fix every blocking-call warning it surfaces.
  2. Add a watchdog thread to your production service so a stalled loop becomes an observable alert rather than a silent outage.
  3. Add asyncio.wait_for timeouts to any await that touches a lock, queue, or external resource.
  4. Replace bare asyncio.gather with asyncio.TaskGroup in Python 3.11+ to get automatic exception propagation and task cancellation.
  5. Wire up a SIGUSR1 handler that dumps all task stacks so you can diagnose a live hang without a restart.

None of these changes require a major refactor. Start with debug mode and timeouts. Those two alone will catch the majority of deadlocks before they reach users.
