#1 Data Analytics Program in India
₹2,499 ₹1,499 · Enroll Now
8 min read
• Question 36 of 41 · hard

Concurrency Patterns

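Advanced concurrent programming with asyncio: a producer-consumer queue, semaphore-based rate limiting, event signaling, and timeouts.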

Concurrency Patterns

Producer-Consumer

code.py (Python)
import asyncio
from asyncio import Queue

async def producer(queue: Queue, n: int):
    """Publish the integers ``0 .. n-1`` on *queue*, then an end marker.

    Each value is put on the queue, reported on stdout, and followed by a
    0.1 second pause. After the last value a single ``None`` is enqueued so
    the reader knows no more items are coming.

    Args:
        queue: Queue that receives the produced values.
        n: Number of values to produce.
    """

    async def emit(i: int) -> None:
        # One production step: enqueue, report, then yield for a short while.
        await queue.put(i)
        print(f"Produced: {i}")
        await asyncio.sleep(0.1)

    for value in range(n):
        await emit(value)

    # ``None`` is the end-of-stream sentinel the consumer stops on.
    await queue.put(None)

async def consumer(queue: Queue):
    """Drain *queue* until the ``None`` end marker is received.

    Every non-``None`` item is reported on stdout and followed by a
    0.2 second pause. The function returns as soon as ``None`` is read;
    anything queued after that marker is left untouched.

    Args:
        queue: Queue to read items from.
    """
    item = await queue.get()
    # Identity check on purpose: falsy payloads such as 0 are real items.
    while item is not None:
        print(f"Consumed: {item}")
        await asyncio.sleep(0.2)
        item = await queue.get()

async def main():
    """Run one producer (five items) and one consumer over a shared queue."""
    channel = Queue()
    # Both coroutines share the same queue and are driven concurrently.
    workers = (producer(channel, 5), consumer(channel))
    await asyncio.gather(*workers)

asyncio.run(main())

Semaphore (Rate Limiting)

code.py (Python)
import asyncio

async def fetch(url: str, semaphore: asyncio.Semaphore):
    """Simulate fetching *url* while holding a slot of *semaphore*.

    Waits for the semaphore, announces the fetch on stdout, sleeps for one
    second in place of real I/O, and releases the semaphore before
    returning.

    Args:
        url: Identifier of the resource to "fetch".
        semaphore: Semaphore acquired for the duration of the fetch.

    Returns:
        The string ``"Data from <url>"``.
    """
    await semaphore.acquire()
    try:
        print(f"Fetching {url}")
        await asyncio.sleep(1)
        payload = f"Data from {url}"
    finally:
        # Give the slot back even if the body raises or is cancelled.
        semaphore.release()
    return payload

async def main():
    """Fetch ten placeholder URLs through one shared ``Semaphore(3)``.

    Returns:
        The list produced by ``asyncio.gather`` for the ten ``fetch`` calls.
    """
    limiter = asyncio.Semaphore(3)  # Max 3 concurrent

    pending = []
    for i in range(10):
        pending.append(fetch(f"url_{i}", limiter))

    return await asyncio.gather(*pending)

asyncio.run(main())

Event Signaling

code.py (Python)
import asyncio

async def waiter(event: asyncio.Event, name: str):
    """Block until *event* is set, announcing both ends of the wait.

    Args:
        event: Event to wait on.
        name: Label used in the two progress messages printed to stdout.
    """
    print(f"{name} waiting...")
    # Suspends here until some other task calls event.set().
    gate = event.wait()
    await gate
    print(f"{name} proceeding!")

async def setter(event: asyncio.Event):
    """Set *event* after a two-second delay, announcing it on stdout.

    Args:
        event: Event to set once the delay has elapsed.
    """
    delay_seconds = 2
    await asyncio.sleep(delay_seconds)
    print("Setting event!")
    event.set()

async def main():
    """Run two waiters and one setter concurrently on a shared event."""
    gate = asyncio.Event()
    waiters = [waiter(gate, label) for label in ("Task 1", "Task 2")]
    await asyncio.gather(*waiters, setter(gate))

asyncio.run(main())

Timeout Pattern

code.py (Python)
import asyncio

async def slow_operation():
    """Stand in for a long-running task: sleep ten seconds, return ``"Done"``."""
    # asyncio.sleep hands back its ``result`` argument once the delay is over.
    return await asyncio.sleep(10, result="Done")

async def main():
    """Run ``slow_operation`` under a two-second deadline.

    Returns:
        The value returned by ``slow_operation`` when it finishes within the
        deadline; ``None`` after printing a notice when the deadline expires.
    """
    try:
        # wait_for cancels the wrapped coroutine once the timeout elapses.
        result = await asyncio.wait_for(slow_operation(), timeout=2.0)
    except asyncio.TimeoutError:
        print("Operation timed out!")
        return None
    # Hand the value back instead of silently discarding it.
    return result

asyncio.run(main())