#1 Data Analytics Program in India
₹2,499₹1,499Enroll Now
9 min read
•Question 30 of 41hard

Async/Await and Asyncio

Asynchronous programming in Python.

What You'll Learn

  • Coroutines, tasks, and the event loop
  • async/await syntax and semantics
  • Concurrent execution with gather and TaskGroup
  • Async context managers and generators
  • When to use asyncio vs threading

Understanding Async Programming

Asyncio provides cooperative multitasking — code explicitly yields control at await points, allowing other code to run.

code.pyPython
import asyncio

# A coroutine function (async def)
async def say_hello():
    print("Hello")
    await asyncio.sleep(1)  # Yield control for 1 second
    print("World")

# Coroutine object (calling async function)
coro = say_hello()  # Returns coroutine, doesn't run it

# Run the coroutine
asyncio.run(say_hello())  # Python 3.7+

Coroutines, Tasks, and Futures

code.pyPython
import asyncio

async def fetch_data(name, delay):
    print(f"{name}: Starting")
    await asyncio.sleep(delay)
    print(f"{name}: Done")
    return f"Result from {name}"

async def main():
    # Coroutine - suspended until awaited
    coro = fetch_data("Coro", 1)

    # Task - scheduled to run immediately
    task = asyncio.create_task(fetch_data("Task", 1))

    # Both complete
    result1 = await coro
    result2 = await task

    print(result1, result2)

asyncio.run(main())

Concurrent Execution

Using asyncio.gather()

code.pyPython
import asyncio

async def fetch(url, delay):
    print(f"Fetching {url}")
    await asyncio.sleep(delay)
    return f"Data from {url}"

async def main():
    # All run concurrently, not sequentially
    results = await asyncio.gather(
        fetch("api/users", 2),
        fetch("api/posts", 1),
        fetch("api/comments", 3)
    )
    # Total time: ~3 seconds (not 6)
    print(results)

asyncio.run(main())

Using TaskGroup (Python 3.11+)

code.pyPython
async def main():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(fetch("api/users", 2))
        task2 = tg.create_task(fetch("api/posts", 1))
        task3 = tg.create_task(fetch("api/comments", 3))

    # All tasks complete when exiting the context
    print(task1.result(), task2.result(), task3.result())

Real-World HTTP Example

code.pyPython
import asyncio
import aiohttp

async def fetch_page(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/delay/1"
    ]

    async with aiohttp.ClientSession() as session:
        tasks = [fetch_page(session, url) for url in urls]
        pages = await asyncio.gather(*tasks)

    # Total time: ~2 seconds (max delay, not sum)
    print(f"Fetched {len(pages)} pages")

asyncio.run(main())

Async Context Managers and Generators

code.pyPython
import asyncio

# Async context manager
class AsyncTimer:
    async def __aenter__(self):
        self.start = asyncio.get_event_loop().time()
        return self

    async def __aexit__(self, *args):
        elapsed = asyncio.get_event_loop().time() - self.start
        print(f"Elapsed: {elapsed:.2f}s")

async def main():
    async with AsyncTimer():
        await asyncio.sleep(1)
    # Elapsed: 1.00s

# Async generator
async def async_countdown(n):
    while n > 0:
        yield n
        await asyncio.sleep(0.5)
        n -= 1

async def main():
    async for count in async_countdown(3):
        print(count)  # 3, 2, 1 (with delays)

asyncio.run(main())

Error Handling

code.pyPython
import asyncio

async def risky_task(name):
    await asyncio.sleep(0.5)
    if name == "bad":
        raise ValueError(f"{name} failed!")
    return f"{name} succeeded"

async def main():
    # Option 1: try/except around await
    try:
        result = await risky_task("bad")
    except ValueError as e:
        print(f"Caught: {e}")

    # Option 2: gather with return_exceptions
    results = await asyncio.gather(
        risky_task("good"),
        risky_task("bad"),
        risky_task("good2"),
        return_exceptions=True
    )

    for r in results:
        if isinstance(r, Exception):
            print(f"Failed: {r}")
        else:
            print(f"Success: {r}")

asyncio.run(main())

Timeouts and Cancellation

code.pyPython
import asyncio

async def slow_operation():
    await asyncio.sleep(10)
    return "Done"

async def main():
    # Timeout using wait_for
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=2.0)
    except asyncio.TimeoutError:
        print("Operation timed out!")

    # Timeout using asyncio.timeout (Python 3.11+)
    try:
        async with asyncio.timeout(2.0):
            await slow_operation()
    except asyncio.TimeoutError:
        print("Timed out!")

    # Manual cancellation
    task = asyncio.create_task(slow_operation())
    await asyncio.sleep(1)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("Task was cancelled")

asyncio.run(main())

Asyncio vs Threading

AspectAsyncioThreading
ConcurrencyCooperativePreemptive
GIL ImpactNot affectedLimited by GIL
Context SwitchAt await pointsAny time
MemoryLower overheadHigher overhead
Use CaseI/O-boundI/O-bound (legacy)
code.pyPython
# Use asyncio for:
# - HTTP requests (aiohttp)
# - Database queries (asyncpg, aiomysql)
# - WebSocket connections
# - File I/O (aiofiles)

# Use threading for:
# - Legacy synchronous libraries
# - Quick scripts
# - CPU-bound with GIL-releasing code

Interview Tip

When asked about asyncio:

  1. async def creates coroutines; await suspends execution
  2. asyncio.gather() runs tasks concurrently
  3. Only I/O-bound tasks benefit; CPU-bound blocks the loop
  4. Use async with for context managers, async for for iteration
  5. Prefer TaskGroup (3.11+) for structured concurrency