8 min read
• Question 36 of 41 · hard · Concurrency Patterns
Advanced concurrent programming.
Concurrency Patterns
Producer-Consumer
code.py (Python)
import asyncio
from asyncio import Queue
async def producer(queue: Queue, n: int):
for i in range(n):
await queue.put(i)
print(f"Produced: {i}")
await asyncio.sleep(0.1)
await queue.put(None) # Signal end
async def consumer(queue: Queue, delay: float = 0.2) -> None:
    """Consume items from *queue* until a ``None`` sentinel is received.

    Every retrieved item, including the sentinel, is acknowledged with
    ``queue.task_done()`` so that ``await queue.join()`` can complete.

    Args:
        queue: Queue to read items from.
        delay: Seconds to sleep after each consumed item. Defaults to 0.2,
            the value this example previously hard-coded.
    """
    while True:
        item = await queue.get()
        try:
            if item is None:
                # Sentinel: no more items will arrive.
                break
            print(f"Consumed: {item}")
            await asyncio.sleep(delay)
        finally:
            # Balance every get() with task_done(), even on break or
            # cancellation, so the queue's unfinished-task count drains.
            queue.task_done()
async def main() -> None:
    """Run one producer (5 items) and one consumer concurrently on a shared queue."""
    queue = Queue()
    # gather() schedules both coroutines and waits until both have finished.
    await asyncio.gather(
        producer(queue, 5),
        consumer(queue),
    )
asyncio.run(main())
Semaphore (Rate Limiting)
code.py (Python)
import asyncio
async def fetch(url: str, semaphore: asyncio.Semaphore, delay: float = 1) -> str:
    """Simulate fetching *url* while holding *semaphore*.

    Args:
        url: Identifier of the resource; only used in the printed message
            and the returned string.
        semaphore: Semaphore acquired for the duration of the simulated
            request and released on exit.
        delay: Seconds to sleep in place of real I/O. Defaults to 1, the
            value this example previously hard-coded.

    Returns:
        The string ``"Data from <url>"``.
    """
    async with semaphore:
        print(f"Fetching {url}")
        await asyncio.sleep(delay)
        return f"Data from {url}"
async def main() -> list:
    """Fetch ten placeholder URLs with at most three fetches in flight.

    Returns:
        The fetch results, in the same order as the generated URLs.
    """
    semaphore = asyncio.Semaphore(3)  # Max 3 concurrent
    urls = [f"url_{i}" for i in range(10)]
    # All ten coroutines are started together; the shared semaphore is what
    # limits how many of them run their body at the same time.
    tasks = [fetch(url, semaphore) for url in urls]
    results = await asyncio.gather(*tasks)
    return results
asyncio.run(main())
Event Signaling
code.py (Python)
import asyncio
async def waiter(event: asyncio.Event, name: str) -> None:
    """Block until *event* is set, printing a message before and after.

    Args:
        event: Event to wait on; returns immediately if it is already set.
        name: Label used in the printed messages.
    """
    print(f"{name} waiting...")
    await event.wait()
    print(f"{name} proceeding!")
async def setter(event: asyncio.Event, delay: float = 2) -> None:
    """Set *event* after sleeping for *delay* seconds.

    Args:
        event: Event to set.
        delay: Seconds to wait before setting the event. Defaults to 2,
            the value this example previously hard-coded.
    """
    await asyncio.sleep(delay)
    print("Setting event!")
    event.set()
async def main() -> None:
    """Start two waiters and one setter that share a single event."""
    event = asyncio.Event()
    # Both waiters block on the same event; the setter releases them together.
    await asyncio.gather(
        waiter(event, "Task 1"),
        waiter(event, "Task 2"),
        setter(event),
    )
asyncio.run(main())
Timeout Pattern
code.py (Python)
import asyncio
async def slow_operation(delay: float = 10) -> str:
    """Sleep for *delay* seconds, then return ``"Done"``.

    Args:
        delay: Seconds to sleep. Defaults to 10, the value this example
            previously hard-coded.
    """
    await asyncio.sleep(delay)
    return "Done"
async def main(timeout: float = 2.0):
    """Run ``slow_operation()`` under a time limit.

    Args:
        timeout: Seconds to wait before giving up. Defaults to 2.0, the
            value this example previously hard-coded.

    Returns:
        The operation's result if it finishes in time; ``None`` after
        printing a message if it times out.
    """
    try:
        # wait_for cancels the wrapped coroutine when the timeout expires.
        result = await asyncio.wait_for(
            slow_operation(),
            timeout=timeout,
        )
    except asyncio.TimeoutError:
        print("Operation timed out!")
        return None
    # Previously the result was assigned and then dropped.
    return result
if __name__ == "__main__":
    # Start the event loop only when run as a script, so importing this
    # module has no side effects.
    asyncio.run(main())