8 min read
• Question 27 of 41 · medium · Multithreading in Python
Concurrent execution with threads.
What You'll Learn
- Creating and managing threads
- Thread synchronization with locks
- Thread pools with concurrent.futures
- Understanding the GIL
- When to use threading vs multiprocessing
Basic Threading
code.py (Python)
import threading
import time

def task(name, delay):
    """Announce start, pretend to work for *delay* seconds, and return a result tag."""
    current = threading.current_thread().name
    print(f"{name} starting on {current}")
    time.sleep(delay)
    print(f"{name} finished")
    return f"{name} result"
# Create two named worker threads running the same target
t1 = threading.Thread(target=task, args=("Task 1", 2), name="Worker-1")
t2 = threading.Thread(target=task, args=("Task 2", 1), name="Worker-2")

# Start both workers
for worker in (t1, t2):
    worker.start()

# A started-but-unfinished thread reports as alive
print(f"t1 alive: {t1.is_alive()}")  # True

# Block until each worker has finished
for worker in (t1, t2):
    worker.join()
print("All tasks completed")

Thread Safety and Race Conditions
Without synchronization, shared data can become corrupted:
code.py (Python)
import threading

# UNSAFE: concurrent unsynchronized writes corrupt the shared counter
counter = 0

def increment_unsafe():
    """Bump the shared counter 100,000 times with no locking."""
    global counter
    for _ in range(100000):
        counter += 1  # read-modify-write: not atomic!

workers = [threading.Thread(target=increment_unsafe) for _ in range(5)]
for w in workers:
    w.start()
for w in workers:
    w.join()
print(counter)  # usually less than 500000 - updates were lost to the race
# SAFE: every update happens while holding the lock
counter = 0
lock = threading.Lock()

def increment_safe():
    """Bump the shared counter 100,000 times, one locked update at a time."""
    global counter
    for _ in range(100000):
        with lock:  # acquire/release handled by the context manager
            counter += 1

workers = [threading.Thread(target=increment_safe) for _ in range(5)]
for w in workers:
    w.start()
for w in workers:
    w.join()
print(counter)  # Exactly 500000

Synchronization Primitives
code.py (Python)
import threading

# Lock - plain mutual exclusion: one thread at a time
lock = threading.Lock()
with lock:
    # Critical section
    pass

# RLock - reentrant: the thread that owns it may acquire it again
rlock = threading.RLock()
with rlock:
    with rlock:  # fine - same thread reacquiring
        pass

# Semaphore - caps how many threads hold it concurrently
semaphore = threading.Semaphore(3)  # at most 3 holders at once
with semaphore:
    # Up to 3 threads can be in this section simultaneously
    pass
# Event - Thread signaling
import time  # required by setter(); missing from the original snippet (NameError at runtime)

event = threading.Event()

def waiter():
    """Block until another thread sets the event, then report it."""
    print("Waiting for event...")
    event.wait()  # Block until set
    print("Event received!")

def setter():
    """Set the event after a 2-second delay, releasing every waiter."""
    time.sleep(2)
    event.set()  # Signal all waiters

threading.Thread(target=waiter).start()
threading.Thread(target=setter).start()
# Condition - Complex synchronization
condition = threading.Condition()
def consumer():
with condition:
condition.wait() # Wait for notification
print("Consumed!")
def producer():
with condition:
# Produce something
condition.notify_all() # Wake up waitersThread Pools with concurrent.futures
code.py (Python)
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def download(url):
    """Pretend to fetch *url*: block ~1 second like real network I/O, then report."""
    time.sleep(1)  # Simulate I/O
    return f"Downloaded {url}"
urls = [f"https://example.com/{i}" for i in range(5)]
# Using map (ordered results)
with ThreadPoolExecutor(max_workers=3) as executor:
results = list(executor.map(download, urls))
print(results)
# Using submit (get futures)
with ThreadPoolExecutor(max_workers=3) as executor:
futures = [executor.submit(download, url) for url in urls]
# Process as completed (unordered)
for future in as_completed(futures):
result = future.result()
print(result)
# With timeout
with ThreadPoolExecutor() as executor:
future = executor.submit(download, "https://slow.com")
try:
result = future.result(timeout=0.5)
except TimeoutError:
print("Task timed out!")The GIL (Global Interpreter Lock)
The GIL prevents multiple threads from executing Python bytecode simultaneously:
code.py (Python)
import threading
import time

# CPU-bound task - the GIL serializes this, so threads give no speedup
def cpu_bound(n):
    """Sum of squares of 0..n-1: pure Python bytecode that holds the GIL."""
    return sum(i * i for i in range(n))

# I/O-bound task - the GIL is dropped while blocked
def io_bound(seconds):
    """Sleep for *seconds* (GIL released during the wait), then report done."""
    time.sleep(seconds)  # GIL released
    return "done"

# For I/O-bound: threading is effective
# For CPU-bound: use multiprocessing instead

Threading vs Multiprocessing
| Aspect | Threading | Multiprocessing |
|---|---|---|
| Memory | Shared | Separate |
| GIL | Affected | Not affected |
| Overhead | Low | Higher |
| Best for | I/O-bound | CPU-bound |
| Communication | Direct | IPC (Queue, Pipe) |
code.py (Python)
from multiprocessing import Pool, cpu_count

def cpu_task(n):
    """Sum of squares of 0..n-1: a pure CPU workload."""
    return sum(i * i for i in range(n))

# Use multiprocessing for CPU-bound work: each worker is a separate
# process with its own interpreter (and its own GIL).
if __name__ == "__main__":
    with Pool(cpu_count()) as pool:
        results = pool.map(cpu_task, [10**6] * 4)
        print(results)

Thread-Local Data
code.py (Python)
import threading
# Thread-local storage
local_data = threading.local()
def worker(value):
local_data.x = value # Each thread has its own x
print(f"Thread {threading.current_thread().name}: x = {local_data.x}")
threads = [
threading.Thread(target=worker, args=(i,), name=f"Worker-{i}")
for i in range(3)
]
for t in threads:
t.start()
for t in threads:
t.join()Interview Tip
When asked about multithreading:
- The GIL limits CPU parallelism: only one thread executes Python bytecode at a time
- Threading is good for I/O-bound tasks (network, file)
- Use multiprocessing for CPU-bound tasks
- Always protect shared data with locks
- ThreadPoolExecutor is the modern, high-level API