Параллельное и конкурентное программирование в Python это способы выполнения нескольких задач одновременно или по очереди в рамках одной программы. Три основных подхода: threading (потоки), multiprocessing (процессы), asyncio (асинхронный ввод-вывод). Выбор неправильного подхода делает код медленнее, не быстрее.
Ключевой факт: все три подхода решают разные задачи. Threading и asyncio дают concurrency (конкурентность), но не parallelism (параллельность) из-за GIL. Multiprocessing даёт настоящий параллелизм. Python 3.13+ добавил экспериментальный free-threaded build без GIL, что меняет всю картину.
Бенчмарки: CPU-bound задача 54.6 секунды синхронно, 6.2 секунды с multiprocessing (9x ускорение). asyncio самый быстрый для I/O-bound, самый медленный для CPU-bound. Free-threaded Python 3.14: 3.1x ускорение на 4 потоках для CPU-bound.
В статье: что такое GIL и почему это всё важно, когда использовать каждый подход, полные примеры кода для asyncio / threading / multiprocessing / concurrent.futures, гибридные подходы, что изменилось в Python 3.13–3.14.
1. Concurrency vs Parallelism: в чём разница
Большинство путаются с этими терминами. Это разные вещи, и путаница приводит к неправильному выбору подхода.
Concurrency (конкурентность) это когда задачи выполняются перекрывающимися временными отрезками. Одновременно может выполняться только одна. Они переключаются: одна начала, потом пауза, вторая начала, потом пауза, первая продолжила. Для пользователя похоже на параллельность, но на самом деле задачи чередуются.
Parallelism (параллельность) это когда задачи выполняются действительно одновременно на разных CPU ядрах.
| Характеристика | Concurrency | Parallelism |
|---|---|---|
| Количество ядер CPU | 1 достаточно | Нужны несколько ядер |
| Одновременное выполнение | Нет, чередование | Да, одновременно |
| Применимо в Python | threading, asyncio | multiprocessing |
| Подходит для | I/O-bound задачи | CPU-bound задачи |
Аналогия:
Concurrency — один повар готовит 5 блюд. Поставил воду кипятиться, пока ждёт — нарезает овощи. Потом переключается к чему-то другому. Один человек, задачи перемешиваются.
Parallelism — пять поваров, каждый готовит своё блюдо одновременно. Пять ядер CPU, каждое выполняет свою задачу.
2. GIL: почему это всё работает именно так
GIL (Global Interpreter Lock) это мьютекс в CPython, который гарантирует что только один поток может выполнять Python bytecode в любой момент времени. Даже если у вас 16 ядер CPU и 16 потоков.
Почему GIL существует
GIL введён Гидо ван Россумом в 1992 году. Причины:
Memory management: Python использует reference counting для управления памятью. Без GIL несколько потоков могут одновременно менять счётчики ссылок, что приводит к race conditions и утечкам памяти
C extensions: огромное количество библиотек написаны на C (NumPy, pandas). GIL упрощает их интеграцию, так как не нужно продумывать thread safety для каждой
Single-thread performance: без GIL каждая операция должна быть атомарной, что замедляет однопоточный код
Что GIL значит для практики
| Сценарий | Влияние GIL | Решение |
|---|---|---|
| Один поток | Ничего, GIL не влияет | Не нужно ничего делать |
| Многопоточный, I/O-bound | Минимальное. Потоки освобождают GIL во время I/O ожидания | threading или asyncio работают |
| Многопоточный, CPU-bound | Критическое. Потоки борются за GIL, по одному | multiprocessing или free-threaded Python 3.13+ |
| C extensions (NumPy, PyTorch) | Почти нет. Эти библиотеки освобождают GIL внутри себя | Обычно работает из коробки |
Частая ошибка: люди запускают CPU-bound задачу в нескольких потоках, думая что это даст ускорение. С GIL потоки выполняются по очереди. Результат — то же время, плюс overhead на context switching. Итог: код стал медленнее.
3. I/O-bound vs CPU-bound: определяете тип задачи
Всё зависит от того, что ограничивает вашу задачу.
I/O-bound (ограничены ввод-выводом)
Задача тормозит потому что ждёт внешних ресурсов. CPU при этом почти не загружен.
Примеры:
HTTP запросы к API
Чтение/запись файлов
Запросы к базе данных
Парсинг веб-страниц
Чтение из сети (sockets)
CPU-bound (ограничены процессором)
Задача тормозит потому что выполняет тяжёлые вычисления. CPU загружен на 100%.
Примеры:
Матричные вычисления (если без NumPy)
Генерация простых чисел
Обработка изображений (если без C-библиотек)
Архивация файлов
Cryptography, hashing
Machine learning (если без PyTorch/TensorFlow)
Как определить тип задачи
import time
import psutil # pip install psutil
def measure_task(func):
"""Определяет тип задачи: I/O-bound или CPU-bound"""
start_cpu = psutil.cpu_percent(interval=0.1)
start_time = time.perf_counter()
func()
elapsed = time.perf_counter() - start_time
avg_cpu = psutil.cpu_percent(interval=0.1)
print(f"Время: {elapsed:.2f}s")
print(f"CPU загрузка: {avg_cpu}%")
if avg_cpu < 30:
print("→ Похоже на I/O-bound задачу")
else:
print("→ Похоже на CPU-bound задачу")
4. Threading: потоки для I/O-bound задач
Что такое threading
Несколько потоков в одном процессе. Делят одну память. OS управляет переключением потоков (preemptive multitasking). Потоки автоматически переключаются когда один ждёт I/O.
Когда использовать threading:
I/O-bound задачи: downloading файлов, HTTP запросы через blocking библиотеки (requests), чтение файлов
Существующие библиотеки не поддерживают asyncio (запрашивающие blocking API)
Background tasks: мониторинг, periodic jobs
GUI приложения: UI thread + worker thread
Когда НЕ использовать threading:
CPU-bound задачи (GIL блокирует параллельное выполнение)
Когда можно использовать asyncio (asyncio эффективнее для большого числа I/O задач)
Базовый пример: threading
import threading
import time
import requests
def download_file(url: str, filename: str):
"""Скачивает файл по URL"""
print(f"[{filename}] Начало загрузки...")
response = requests.get(url)
with open(filename, 'wb') as f:
f.write(response.content)
print(f"[{filename}] Загрузка завершена: {len(response.content)} байт")
def main():
urls = [
("https://example.com/file1.zip", "file1.zip"),
("https://example.com/file2.zip", "file2.zip"),
("https://example.com/file3.zip", "file3.zip"),
]
start = time.perf_counter()
threads = []
for url, filename in urls:
t = threading.Thread(target=download_file, args=(url, filename))
threads.append(t)
t.start()
for t in threads:
t.join() # Ждём завершения всех потоков
elapsed = time.perf_counter() - start
print(f"Все файлы скачаны за {elapsed:.2f}s")
# Без threading: 3 файла × ~3s = ~9s
# С threading: ~3s (все параллельно)
Thread safety: Race conditions и Lock
Потоки делят память. Если два потока одновременно изменяют одну переменную — race condition.
import threading
class Counter:
def __init__(self):
self.value = 0
self.lock = threading.Lock() # Мьютекс для thread safety
def increment(self):
with self.lock: # Только один поток в этом блоке
self.value += 1
counter = Counter()
threads = []
for _ in range(1000):
t = threading.Thread(target=counter.increment)
threads.append(t)
t.start()
for t in threads:
t.join()
print(f"Counter: {counter.value}") # Всегда 1000, даже с 1000 потоками
Без Lock: counter.value может оказаться меньше 1000. Два потока одновременно читают значение, оба добавляют 1, оба записывают результат. Одно +=1 теряется. Это race condition.
5. Asyncio: для высокой конкурентности I/O
Что такое asyncio
Один поток, event loop управляет задачами. Когда задача ждёт I/O, она добровольно передаёт управление event loop (cooperative multitasking). Event loop запускает следующую задачу. Так обрабатываются тысячи одновременных I/O операций в одном потоке.
Ключевые понятия:
Coroutine (корутина): функция объявленная с
async def. Может быть остановлена и возобновленаEvent loop: цикл событий, который запускает корутины и переключается между ними
await: точка где корутина говорит "я жду, можете запустить другую"
Task: обёртка над корутиной для concurrent выполнения
Когда использовать asyncio:
Много одновременных I/O операций: парсинг, API запросы, thousand+ HTTP запросов
Веб серверы (FastAPI, aiohttp)
Real-time приложения: чаты, WebSockets
Database запросы через async drivers (asyncpg, aiosqlite)
Когда НЕ использовать asyncio:
CPU-bound задачи (event loop блокируется, всё встаёт)
Когда существующие библиотеки не поддерживают async (используйте threading)
Simple одноразовые задачи где overhead event loop не оправдан
Базовый пример: asyncio
import asyncio
import aiohttp # pip install aiohttp
import time
async def fetch_url(session: aiohttp.ClientSession, url: str) -> dict:
"""Скачивает URL асинхронно"""
async with session.get(url) as response:
content = await response.text()
return {"url": url, "length": len(content), "status": response.status}
async def main():
urls = [
"https://example.com/page1",
"https://example.com/page2",
"https://example.com/page3",
"https://example.com/page4",
"https://example.com/page5",
]
start = time.perf_counter()
async with aiohttp.ClientSession() as session:
# asyncio.gather запускает все задачи "параллельно" (конкурентно)
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
elapsed = time.perf_counter() - start
for r in results:
print(f"{r['url']}: {r['length']} chars, status {r['status']}")
print(f"Все запросы за {elapsed:.2f}s")
# Без asyncio: 5 запросов × ~1s = ~5s
# С asyncio: ~1s (все ждут одновременно)
asyncio.run(main())
Частые ошибки с asyncio
Ошибка 1: blocking код в async функции
async def bad_example():
# ❌ НЕЛЬЗЯ: requests.get блокирует event loop
# Пока waiting на network, НИКАКИЕ другие задачи не выполняются
response = requests.get("https://example.com")
return response.text
async def good_example():
# ✅ ПРАВИЛЬНО: aiohttp не блокирует event loop
async with aiohttp.ClientSession() as session:
async with session.get("https://example.com") as resp:
return await resp.text()
Ошибка 2: забыли await
async def fetch():
return "data"
async def bad():
result = fetch() # ❌ Это coroutine объект, не результат!
print(result) # Выведет: <coroutine object fetch at 0x...>
async def good():
result = await fetch() # ✅ await запускает корутину и ждёт результат
print(result) # Выведет: data
Ошибка 3: CPU-bound задача в async функции
async def bad_cpu_task():
# ❌ Тяжёлое вычисление блокирует event loop
# Пока считается, другие задачи НЕ работают
result = 0
for i in range(10_000_000):
result += i ** 2
return result
async def good_cpu_task():
# ✅ Отдаём CPU-bound в executor (thread pool или process pool)
loop = asyncio.get_event_loop()
with ProcessPoolExecutor() as executor:
result = await loop.run_in_executor(executor, heavy_computation)
return result
Пример: парсинг 1000 URL с ограничением concurrency
import asyncio
import aiohttp
async def fetch(session, url, semaphore):
async with semaphore: # Максимум 50 одновременных запросов
async with session.get(url) as resp:
return await resp.text()
async def scrape_all(urls: list[str]):
semaphore = asyncio.Semaphore(50) # Лимит concurrency
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, url, semaphore) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Отделяем успешные от ошибок
successes = [r for r in results if not isinstance(r, Exception)]
errors = [r for r in results if isinstance(r, Exception)]
print(f"Успешно: {len(successes)}, Ошибки: {len(errors)}")
return successes
6. Multiprocessing: настоящий параллелизм
Что такое multiprocessing
Несколько процессов, каждый со своим интерпретатором Python и своей памятью. GIL не влияет — каждый процесс имеет свой GIL. Задачи выполняются действительно параллельно на разных CPU ядрах.
Когда использовать multiprocessing:
CPU-bound задачи: вычисления,image processing, cryptography
Machine learning: обучение моделей
Data processing: обработка больших массивов данных
Когда нужно изолировать процессы (сбой одного не ломает другие)
Когда НЕ использовать multiprocessing:
I/O-bound задачи (overhead процессов > benefit)
Задачи требующие частого обмена данными между процессами
Маленькие задачи (overhead запуска процесса > время выполнения)
Базовый пример: multiprocessing
import multiprocessing
import time
def compute_primes(start: int, end: int) -> list[int]:
"""Находит простые числа в диапазоне"""
primes = []
for num in range(start, end):
if is_prime(num):
primes.append(num)
return primes
def is_prime(n: int) -> bool:
if n < 2:
return False
for i in range(2, int(n ** 0.5) + 1):
if n % i == 0:
return False
return True
def main():
# Делим работу на 4 части (по количеству ядер)
num_workers = multiprocessing.cpu_count()
chunk_size = 1_000_000 // num_workers
start = time.perf_counter()
with multiprocessing.Pool(num_workers) as pool:
ranges = [
(i * chunk_size, (i + 1) * chunk_size)
for i in range(num_workers)
]
results = pool.starmap(compute_primes, ranges)
# Собираем результаты из всех процессов
all_primes = [p for chunk in results for p in chunk]
elapsed = time.perf_counter() - start
print(f"Найдено {len(all_primes)} простых чисел за {elapsed:.2f}s")
if __name__ == '__main__': # Обязательно! На Windows без этого crash
main()
Передача данных между процессами
Процессы не делят память. Для обмена данных есть специальные механизмы:
import multiprocessing
from multiprocessing import Queue, Value, Array
# 1. Queue — для передачи объектов между процессами
def producer(queue: Queue):
for i in range(5):
queue.put(f"item_{i}")
queue.put(None) # Сигнал завершения
def consumer(queue: Queue):
while True:
item = queue.get()
if item is None:
break
print(f"Обработали: {item}")
# 2. Value — для одиночных значений (shared memory)
def increment(counter: Value):
with counter.get_lock():
counter.value += 1
# 3. Array — для массивов (shared memory)
shared_array = Array('i', [0] * 10) # массив из 10 int
Ограничения multiprocessing:
Объекты между процессами должны быть picklable (сериализуемы). Lambda, generator, local functions не работают
На Windows процесс запускается через
spawn. Код должен быть подif __name__ == '__main__'Запуск процесса тяжелее запуска потока (~50-100ms vs ~1ms)
7. concurrent.futures: высокий уровень
concurrent.futures это модуль стандартной библиотеки, который даёт простой интерфейс для threading и multiprocessing. В большинстве случаев используйте его вместо threading и multiprocessing напрямую.
ThreadPoolExecutor: потоки через pool
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
import time
def fetch_url(url: str) -> dict:
response = requests.get(url)
return {"url": url, "status": response.status_code, "length": len(response.text)}
urls = [f"https://example.com/page{i}" for i in range(20)]
start = time.perf_counter()
# max_workers=10: максимум 10 потоков одновременно
with ThreadPoolExecutor(max_workers=10) as executor:
# Отправляем задачи
future_to_url = {executor.submit(fetch_url, url): url for url in urls}
# Обрабатываем результаты по мере завершения
for future in as_completed(future_to_url):
url = future_to_url[future]
try:
result = future.result()
print(f"✓ {result['url']}: {result['status']}")
except Exception as e:
print(f"✗ {url}: {e}")
elapsed = time.perf_counter() - start
print(f"Всего за {elapsed:.2f}s")
ProcessPoolExecutor: процессы через pool
from concurrent.futures import ProcessPoolExecutor
import time
from multiprocessing import cpu_count
def heavy_computation(n: int) -> int:
"""Тяжёлое вычисление"""
result = 0
for i in range(n):
result += i ** 2
return result
def main():
tasks = [1_000_000] * 8 # 8 одинаковых задач
start = time.perf_counter()
with ProcessPoolExecutor(max_workers=cpu_count()) as executor:
# map(): простой способ для одинаковых задач
results = list(executor.map(heavy_computation, tasks))
elapsed = time.perf_counter() - start
print(f"Результаты: {results}")
print(f"Время: {elapsed:.2f}s")
# Синхронно: ~54s
# С ProcessPoolExecutor (8 ядер): ~6s (9x ускорение)
if __name__ == '__main__':
main()
Когда использовать concurrent.futures vs threading/multiprocessing напрямую
| Ситуация | Что использовать |
|---|---|
| Простые I/O задачи с пулом потоков | ThreadPoolExecutor |
| Простые CPU-bound задачи | ProcessPoolExecutor |
| Нужна Fine-grained контроль над потоками (Lock, Event, Condition) | threading напрямую |
| Нужна shared memory между процессами (Value, Array) | multiprocessing напрямую |
| Producer-consumer с Queue | multiprocessing.Queue |
| CPU-bound + нужно запустить из asyncio | loop.run_in_executor(ProcessPoolExecutor) |
8. Гибридный подход: asyncio + multiprocessing
Реальные приложения часто комбинируют I/O-bound и CPU-bound задачи. Например: скачивать данные по сети (I/O), потом обрабатывать их (CPU). Asyncio + multiprocessing вместе дают максимальную производительность.
Паттерн: async I/O + CPU-bound в процессах
import asyncio
from concurrent.futures import ProcessPoolExecutor
import aiohttp
import json
def process_data(data: dict) -> dict:
"""CPU-bound обработка (выполняется в отдельном процессе)"""
# Тяжёлые вычисления над данными
result = sum(x ** 2 for x in data.get("values", []))
return {"id": data["id"], "result": result}
async def fetch_and_process(session, url, executor, loop):
# Шаг 1: Async I/O — скачиваем данные (не блокирует event loop)
async with session.get(url) as resp:
data = await resp.json()
# Шаг 2: CPU-bound — отдаём в ProcessPoolExecutor
# await позволяет event loop работать пока считается
result = await loop.run_in_executor(executor, process_data, data)
return result
async def main():
urls = [f"https://api.example.com/data/{i}" for i in range(100)]
loop = asyncio.get_event_loop()
with ProcessPoolExecutor() as executor:
async with aiohttp.ClientSession() as session:
tasks = [
fetch_and_process(session, url, executor, loop)
for url in urls
]
# Все 100 задач: I/O параллельно, CPU параллельно
results = await asyncio.gather(*tasks)
print(f"Обработано {len(results)} задач")
asyncio.run(main())
Что происходит:
100 HTTP запросов летят параллельно через asyncio (event loop)
Когда данные приходят, CPU-обработка идёт в отдельных процессах через ProcessPoolExecutor
Event loop не блокируется ни на I/O, ни на вычисления
Результат: максимальная скорость и для I/O, и для CPU
9. Python 3.13–3.14: free-threaded build без GIL
Что изменилось
PEP 703 одобрен июль 2023. Python 3.13 (октябрь 2024) добавил экспериментальный free-threaded build где GIL отключён. Это самое крупное изменение внутренней архитектуры Python за десятки лет.
Что это значит на практике
| Интерпретатор | Single-thread скорость | Multi-thread (4 потока) CPU-bound |
|---|---|---|
| Python 3.12 (GIL) | 100% (baseline) | 1.0x (GIL блокирует) |
| Python 3.13 (GIL) | ~98% | 1.0x |
| Python 3.13t (free-threaded) | ~60% (overhead) | 2.2x |
| Python 3.14t (free-threaded) | ~91% (overhead снижен) | 3.1x |
Реальный бенчмарк (CPU-bound, подсчёт простых чисел):
# Python 3.13 standard (с GIL):
# Single-threaded: 7.51 секунд
# Multi-threaded (5 потоков): 7.07 секунд ← почти то же самое (GIL)
# Python 3.13t free-threaded (без GIL):
# Single-threaded: 1.86 секунд
# Multi-threaded (5 потоков): 0.39 секунд ← 4.8x быстрее!
Как подключить free-threaded build
# Инсталл (macOS/Windows, Python 3.13+)
# При установке с python.org выбрать "Download free-threaded binaries"
# Или через source:
# ./configure --disable-gil
# make
# make install
# Запуск:
# python3.13t main.py ← free-threaded build
# Проверить что GIL отключён:
import sys
print(sys.version) # содержит "experimental free-threading build"
print(sys._is_gil_enabled()) # False = GIL отключён
Когда использовать free-threaded build в 2025:
CPU-bound multi-threaded задачи где multiprocessing слишком тяжёл
AI/ML pipelines: PyTorch DataLoader, feature preprocessing
Эксперименты и бенчмарки
Когда НЕ использовать пока:
Production приложения (ещё экспериментально)
Когда key библиотеки не поддержали free-threading (Django, pandas частично)
I/O-bound или single-threaded код (overhead без пользы)
10. Сравнительная таблица: итоговый выбор
| Критерий | asyncio | threading | multiprocessing |
|---|---|---|---|
Тип задачи | I/O-bound | I/O-bound | CPU-bound |
Модель | Cooperative (задача сама отдаёт контроль) | Preemptive (OS переключает) | Полная изоляция процессов |
Настоящий параллелизм | Нет (один поток) | Нет (GIL) | Да |
Общая память | Да (один поток) | Да (shared memory) | Нет (нужны Queue/Pipe/Value) |
Overhead запуска | Минимальный | Малый (~1ms) | Большой (~50-100ms) |
Скоординированность | Тысячи задач легко | Десятки потоков (больше = тяжелее) | Десятки процессов (= кол-во ядер) |
Сложность | Средняя (async/await + правильные библиотеки) | Средняя (locks, race conditions) | Высокая (serialization, IPC) |
Когда использовать | Web scraping, API calls, real-time apps | Blocking libraries, background tasks, GUI | Тяжёлые вычисления, image processing, ML |
Правило Python community для I/O-bound задач: «Use asyncio when you can, threading when you must.» Asyncio даёт лучшее ускорение для I/O, но требует async-friendly библиотек (aiohttp вместо requests, asyncpg вместо psycopg2).
11. Чек-лист: как выбрать подход
Шаг 1: определите тип задачи
☐ Задача ждёт сетевой ответ, чтения файла, DB запроса? → I/O-bound
☐ Задача считает, обрабатывает данные, CPU загружен на 100%? → CPU-bound
Шаг 2: I/O-bound — выбираете между asyncio и threading
☐ Есть async-ready библиотеки для всех I/O операций (aiohttp, asyncpg, aiofiles)? → asyncio
☐ Нужно использовать blocking библиотеки (requests, psycopg2)? → ThreadPoolExecutor
☐ Количество одновременных задач > 100? → asyncio (потоки слишком тяжелее)
☐ Background задача (monitoring, periodic job)? → threading.Thread
☐ GUI приложение? → threading.Thread (UI thread + worker)
Шаг 3: CPU-bound — выбираете между multiprocessing и free-threading
☐ Нужны отделённые процессы? → ProcessPoolExecutor
☐ Нужно запустить CPU-bound из async? → loop.run_in_executor(ProcessPoolExecutor)
☐ Python 3.13+ и задача подходит? → попробуйте free-threaded build
☐ Тяжёлые вычисления через NumPy/PyTorch? → они уже освобождают GIL внутри себя, threading может работать
Шаг 4: проверка
☐ Запустили — замерли время (perf_counter)
☐ Сравнили с синхронной версией
☐ Если не быстрее, выбрали неправильный подход
☐ Проверили что объекты передаются между процессами корректно (pickling)
☐ Протестировали edge cases (exceptions, timeouts, empty data)
Красные флаги
☐ Использовали threading для CPU-bound (GIL блокирует, нет ускорения)
☐ Использовали asyncio для CPU-bound (event loop блокируется)
☐ Использовали multiprocessing для простой I/O задачи (overhead > benefit)
☐ Забыли
if __name__ == '__main__'при multiprocessing на Windows☐ Пытаетесь передать non-picklable объект между процессами
☐ Создаёте тысячи потоков (потоки тяжелее, используйте ThreadPoolExecutor или asyncio)
☐ Не используете Lock для shared state в threading
А лучшие вакансии для разработчиков ищите на hirehi.ru