Параллельное программирование в Python: asyncio, threading, multiprocessing — когда использовать каждый подход

Параллельное программирование в Python: asyncio, threading, multiprocessing — когда использовать каждый подход

Параллельное и конкурентное программирование в Python это способы выполнения нескольких задач одновременно или по очереди в рамках одной программы. Три основных подхода: threading (потоки), multiprocessing (процессы), asyncio (асинхронный ввод-вывод). Выбор неправильного подхода делает код медленнее, не быстрее.

Ключевой факт: все три подхода решают разные задачи. Threading и asyncio дают concurrency (конкурентность), но не parallelism (параллельность) из-за GIL. Multiprocessing даёт настоящий параллелизм. Python 3.13+ добавил экспериментальный free-threaded build без GIL, что меняет всю картину.

Бенчмарки: CPU-bound задача 54.6 секунды синхронно, 6.2 секунды с multiprocessing (9x ускорение). asyncio самый быстрый для I/O-bound, самый медленный для CPU-bound. Free-threaded Python 3.14: 3.1x ускорение на 4 потоках для CPU-bound.

В статье: что такое GIL и почему это всё важно, когда использовать каждый подход, полные примеры кода для asyncio / threading / multiprocessing / concurrent.futures, гибридные подходы, что изменилось в Python 3.13–3.14.

1. Concurrency vs Parallelism: в чём разница

Большинство путаются с этими терминами. Это разные вещи, и путаница приводит к неправильному выбору подхода.

Concurrency (конкурентность) это когда задачи выполняются перекрывающимися временными отрезками. Одновременно может выполняться только одна. Они переключаются: одна начала, потом пауза, вторая начала, потом пауза, первая продолжила. Для пользователя похоже на параллельность, но на самом деле задачи чередуются.

Parallelism (параллельность) это когда задачи выполняются действительно одновременно на разных CPU ядрах.

ХарактеристикаConcurrencyParallelism
Количество ядер CPU1 достаточноНужны несколько ядер
Одновременное выполнениеНет, чередованиеДа, одновременно
Применимо в Pythonthreading, asynciomultiprocessing
Подходит дляI/O-bound задачиCPU-bound задачи

Аналогия:

Concurrency — один повар готовит 5 блюд. Поставил воду кипятиться, пока ждёт — нарезает овощи. Потом переключается к чему-то другому. Один человек, задачи перемешиваются.

Parallelism — пять поваров, каждый готовит своё блюдо одновременно. Пять ядер CPU, каждое выполняет свою задачу.

2. GIL: почему это всё работает именно так

GIL (Global Interpreter Lock) это мьютекс в CPython, который гарантирует что только один поток может выполнять Python bytecode в любой момент времени. Даже если у вас 16 ядер CPU и 16 потоков.

Почему GIL существует

GIL введён Гидо ван Россумом в 1992 году. Причины:

  • Memory management: Python использует reference counting для управления памятью. Без GIL несколько потоков могут одновременно менять счётчики ссылок, что приводит к race conditions и утечкам памяти

  • C extensions: огромное количество библиотек написаны на C (NumPy, pandas). GIL упрощает их интеграцию, так как не нужно продумывать thread safety для каждой

  • Single-thread performance: без GIL каждая операция должна быть атомарной, что замедляет однопоточный код

Что GIL значит для практики

СценарийВлияние GILРешение
Один потокНичего, GIL не влияетНе нужно ничего делать
Многопоточный, I/O-boundМинимальное. Потоки освобождают GIL во время I/O ожиданияthreading или asyncio работают
Многопоточный, CPU-boundКритическое. Потоки борются за GIL, по одномуmultiprocessing или free-threaded Python 3.13+
C extensions (NumPy, PyTorch)Почти нет. Эти библиотеки освобождают GIL внутри себяОбычно работает из коробки

Частая ошибка: люди запускают CPU-bound задачу в нескольких потоках, думая что это даст ускорение. С GIL потоки выполняются по очереди. Результат — то же время, плюс overhead на context switching. Итог: код стал медленнее.

3. I/O-bound vs CPU-bound: определяете тип задачи

Всё зависит от того, что ограничивает вашу задачу.

I/O-bound (ограничены ввод-выводом)

Задача тормозит потому что ждёт внешних ресурсов. CPU при этом почти не загружен.

Примеры:

  • HTTP запросы к API

  • Чтение/запись файлов

  • Запросы к базе данных

  • Парсинг веб-страниц

  • Чтение из сети (sockets)

CPU-bound (ограничены процессором)

Задача тормозит потому что выполняет тяжёлые вычисления. CPU загружен на 100%.

Примеры:

  • Матричные вычисления (если без NumPy)

  • Генерация простых чисел

  • Обработка изображений (если без C-библиотек)

  • Архивация файлов

  • Cryptography, hashing

  • Machine learning (если без PyTorch/TensorFlow)

Как определить тип задачи

import time
import psutil  # pip install psutil

def measure_task(func):
    """Определяет тип задачи: I/O-bound или CPU-bound"""
    start_cpu = psutil.cpu_percent(interval=0.1)
    start_time = time.perf_counter()

    func()

    elapsed = time.perf_counter() - start_time
    avg_cpu = psutil.cpu_percent(interval=0.1)

    print(f"Время: {elapsed:.2f}s")
    print(f"CPU загрузка: {avg_cpu}%")

    if avg_cpu < 30:
        print("→ Похоже на I/O-bound задачу")
    else:
        print("→ Похоже на CPU-bound задачу")

4. Threading: потоки для I/O-bound задач

Что такое threading

Несколько потоков в одном процессе. Делят одну память. OS управляет переключением потоков (preemptive multitasking). Потоки автоматически переключаются когда один ждёт I/O.

Когда использовать threading:

  • I/O-bound задачи: downloading файлов, HTTP запросы через blocking библиотеки (requests), чтение файлов

  • Существующие библиотеки не поддерживают asyncio (запрашивающие blocking API)

  • Background tasks: мониторинг, periodic jobs

  • GUI приложения: UI thread + worker thread

Когда НЕ использовать threading:

  • CPU-bound задачи (GIL блокирует параллельное выполнение)

  • Когда можно использовать asyncio (asyncio эффективнее для большого числа I/O задач)

Базовый пример: threading

import threading
import time
import requests

def download_file(url: str, filename: str):
    """Скачивает файл по URL"""
    print(f"[{filename}] Начало загрузки...")
    response = requests.get(url)
    with open(filename, 'wb') as f:
        f.write(response.content)
    print(f"[{filename}] Загрузка завершена: {len(response.content)} байт")

def main():
    urls = [
        ("https://example.com/file1.zip", "file1.zip"),
        ("https://example.com/file2.zip", "file2.zip"),
        ("https://example.com/file3.zip", "file3.zip"),
    ]

    start = time.perf_counter()
    threads = []

    for url, filename in urls:
        t = threading.Thread(target=download_file, args=(url, filename))
        threads.append(t)
        t.start()

    for t in threads:
        t.join()  # Ждём завершения всех потоков

    elapsed = time.perf_counter() - start
    print(f"Все файлы скачаны за {elapsed:.2f}s")
    # Без threading: 3 файла × ~3s = ~9s
    # С threading: ~3s (все параллельно)

Thread safety: Race conditions и Lock

Потоки делят память. Если два потока одновременно изменяют одну переменную — race condition.

import threading

class Counter:
    def __init__(self):
        self.value = 0
        self.lock = threading.Lock()  # Мьютекс для thread safety

    def increment(self):
        with self.lock:  # Только один поток в этом блоке
            self.value += 1

counter = Counter()
threads = []

for _ in range(1000):
    t = threading.Thread(target=counter.increment)
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(f"Counter: {counter.value}")  # Всегда 1000, даже с 1000 потоками

Без Lock: counter.value может оказаться меньше 1000. Два потока одновременно читают значение, оба добавляют 1, оба записывают результат. Одно +=1 теряется. Это race condition.

5. Asyncio: для высокой конкурентности I/O

Что такое asyncio

Один поток, event loop управляет задачами. Когда задача ждёт I/O, она добровольно передаёт управление event loop (cooperative multitasking). Event loop запускает следующую задачу. Так обрабатываются тысячи одновременных I/O операций в одном потоке.

Ключевые понятия:

  • Coroutine (корутина): функция объявленная с async def. Может быть остановлена и возобновлена

  • Event loop: цикл событий, который запускает корутины и переключается между ними

  • await: точка где корутина говорит "я жду, можете запустить другую"

  • Task: обёртка над корутиной для concurrent выполнения

Когда использовать asyncio:

  • Много одновременных I/O операций: парсинг, API запросы, thousand+ HTTP запросов

  • Веб серверы (FastAPI, aiohttp)

  • Real-time приложения: чаты, WebSockets

  • Database запросы через async drivers (asyncpg, aiosqlite)

Когда НЕ использовать asyncio:

  • CPU-bound задачи (event loop блокируется, всё встаёт)

  • Когда существующие библиотеки не поддерживают async (используйте threading)

  • Simple одноразовые задачи где overhead event loop не оправдан

Базовый пример: asyncio

import asyncio
import aiohttp  # pip install aiohttp
import time

async def fetch_url(session: aiohttp.ClientSession, url: str) -> dict:
    """Скачивает URL асинхронно"""
    async with session.get(url) as response:
        content = await response.text()
        return {"url": url, "length": len(content), "status": response.status}

async def main():
    urls = [
        "https://example.com/page1",
        "https://example.com/page2",
        "https://example.com/page3",
        "https://example.com/page4",
        "https://example.com/page5",
    ]

    start = time.perf_counter()

    async with aiohttp.ClientSession() as session:
        # asyncio.gather запускает все задачи "параллельно" (конкурентно)
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)

    elapsed = time.perf_counter() - start

    for r in results:
        print(f"{r['url']}: {r['length']} chars, status {r['status']}")
    print(f"Все запросы за {elapsed:.2f}s")
    # Без asyncio: 5 запросов × ~1s = ~5s
    # С asyncio: ~1s (все ждут одновременно)

asyncio.run(main())

Частые ошибки с asyncio

Ошибка 1: blocking код в async функции

async def bad_example():
    # ❌ НЕЛЬЗЯ: requests.get блокирует event loop
    # Пока waiting на network, НИКАКИЕ другие задачи не выполняются
    response = requests.get("https://example.com")
    return response.text

async def good_example():
    # ✅ ПРАВИЛЬНО: aiohttp не блокирует event loop
    async with aiohttp.ClientSession() as session:
        async with session.get("https://example.com") as resp:
            return await resp.text()

Ошибка 2: забыли await

async def fetch():
    return "data"

async def bad():
    result = fetch()       # ❌ Это coroutine объект, не результат!
    print(result)          # Выведет: <coroutine object fetch at 0x...>

async def good():
    result = await fetch()  # ✅ await запускает корутину и ждёт результат
    print(result)          # Выведет: data

Ошибка 3: CPU-bound задача в async функции

async def bad_cpu_task():
    # ❌ Тяжёлое вычисление блокирует event loop
    # Пока считается, другие задачи НЕ работают
    result = 0
    for i in range(10_000_000):
        result += i ** 2
    return result

async def good_cpu_task():
    # ✅ Отдаём CPU-bound в executor (thread pool или process pool)
    loop = asyncio.get_event_loop()
    with ProcessPoolExecutor() as executor:
        result = await loop.run_in_executor(executor, heavy_computation)
    return result

Пример: парсинг 1000 URL с ограничением concurrency

import asyncio
import aiohttp

async def fetch(session, url, semaphore):
    async with semaphore:  # Максимум 50 одновременных запросов
        async with session.get(url) as resp:
            return await resp.text()

async def scrape_all(urls: list[str]):
    semaphore = asyncio.Semaphore(50)  # Лимит concurrency

    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url, semaphore) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)

    # Отделяем успешные от ошибок
    successes = [r for r in results if not isinstance(r, Exception)]
    errors = [r for r in results if isinstance(r, Exception)]

    print(f"Успешно: {len(successes)}, Ошибки: {len(errors)}")
    return successes

6. Multiprocessing: настоящий параллелизм

Что такое multiprocessing

Несколько процессов, каждый со своим интерпретатором Python и своей памятью. GIL не влияет — каждый процесс имеет свой GIL. Задачи выполняются действительно параллельно на разных CPU ядрах.

Когда использовать multiprocessing:

  • CPU-bound задачи: вычисления,image processing, cryptography

  • Machine learning: обучение моделей

  • Data processing: обработка больших массивов данных

  • Когда нужно изолировать процессы (сбой одного не ломает другие)

Когда НЕ использовать multiprocessing:

  • I/O-bound задачи (overhead процессов > benefit)

  • Задачи требующие частого обмена данными между процессами

  • Маленькие задачи (overhead запуска процесса > время выполнения)

Базовый пример: multiprocessing

import multiprocessing
import time

def compute_primes(start: int, end: int) -> list[int]:
    """Находит простые числа в диапазоне"""
    primes = []
    for num in range(start, end):
        if is_prime(num):
            primes.append(num)
    return primes

def is_prime(n: int) -> bool:
    if n < 2:
        return False
    for i in range(2, int(n ** 0.5) + 1):
        if n % i == 0:
            return False
    return True

def main():
    # Делим работу на 4 части (по количеству ядер)
    num_workers = multiprocessing.cpu_count()
    chunk_size = 1_000_000 // num_workers

    start = time.perf_counter()

    with multiprocessing.Pool(num_workers) as pool:
        ranges = [
            (i * chunk_size, (i + 1) * chunk_size)
            for i in range(num_workers)
        ]
        results = pool.starmap(compute_primes, ranges)

    # Собираем результаты из всех процессов
    all_primes = [p for chunk in results for p in chunk]

    elapsed = time.perf_counter() - start
    print(f"Найдено {len(all_primes)} простых чисел за {elapsed:.2f}s")

if __name__ == '__main__':  # Обязательно! На Windows без этого crash
    main()

Передача данных между процессами

Процессы не делят память. Для обмена данных есть специальные механизмы:

import multiprocessing
from multiprocessing import Queue, Value, Array

# 1. Queue — для передачи объектов между процессами
def producer(queue: Queue):
    for i in range(5):
        queue.put(f"item_{i}")
    queue.put(None)  # Сигнал завершения

def consumer(queue: Queue):
    while True:
        item = queue.get()
        if item is None:
            break
        print(f"Обработали: {item}")

# 2. Value — для одиночных значений (shared memory)
def increment(counter: Value):
    with counter.get_lock():
        counter.value += 1

# 3. Array — для массивов (shared memory)
shared_array = Array('i', [0] * 10)  # массив из 10 int

Ограничения multiprocessing:

  • Объекты между процессами должны быть picklable (сериализуемы). Lambda, generator, local functions не работают

  • На Windows процесс запускается через spawn. Код должен быть под if __name__ == '__main__'

  • Запуск процесса тяжелее запуска потока (~50-100ms vs ~1ms)

7. concurrent.futures: высокий уровень

concurrent.futures это модуль стандартной библиотеки, который даёт простой интерфейс для threading и multiprocessing. В большинстве случаев используйте его вместо threading и multiprocessing напрямую.

ThreadPoolExecutor: потоки через pool

from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
import time

def fetch_url(url: str) -> dict:
    response = requests.get(url)
    return {"url": url, "status": response.status_code, "length": len(response.text)}

urls = [f"https://example.com/page{i}" for i in range(20)]

start = time.perf_counter()

# max_workers=10: максимум 10 потоков одновременно
with ThreadPoolExecutor(max_workers=10) as executor:
    # Отправляем задачи
    future_to_url = {executor.submit(fetch_url, url): url for url in urls}

    # Обрабатываем результаты по мере завершения
    for future in as_completed(future_to_url):
        url = future_to_url[future]
        try:
            result = future.result()
            print(f"✓ {result['url']}: {result['status']}")
        except Exception as e:
            print(f"✗ {url}: {e}")

elapsed = time.perf_counter() - start
print(f"Всего за {elapsed:.2f}s")

ProcessPoolExecutor: процессы через pool

from concurrent.futures import ProcessPoolExecutor
import time
from multiprocessing import cpu_count

def heavy_computation(n: int) -> int:
    """Тяжёлое вычисление"""
    result = 0
    for i in range(n):
        result += i ** 2
    return result

def main():
    tasks = [1_000_000] * 8  # 8 одинаковых задач

    start = time.perf_counter()

    with ProcessPoolExecutor(max_workers=cpu_count()) as executor:
        # map(): простой способ для одинаковых задач
        results = list(executor.map(heavy_computation, tasks))

    elapsed = time.perf_counter() - start
    print(f"Результаты: {results}")
    print(f"Время: {elapsed:.2f}s")
    # Синхронно: ~54s
    # С ProcessPoolExecutor (8 ядер): ~6s (9x ускорение)

if __name__ == '__main__':
    main()

Когда использовать concurrent.futures vs threading/multiprocessing напрямую

СитуацияЧто использовать
Простые I/O задачи с пулом потоковThreadPoolExecutor
Простые CPU-bound задачиProcessPoolExecutor
Нужна Fine-grained контроль над потоками (Lock, Event, Condition)threading напрямую
Нужна shared memory между процессами (Value, Array)multiprocessing напрямую
Producer-consumer с Queuemultiprocessing.Queue
CPU-bound + нужно запустить из asyncioloop.run_in_executor(ProcessPoolExecutor)

8. Гибридный подход: asyncio + multiprocessing

Реальные приложения часто комбинируют I/O-bound и CPU-bound задачи. Например: скачивать данные по сети (I/O), потом обрабатывать их (CPU). Asyncio + multiprocessing вместе дают максимальную производительность.

Паттерн: async I/O + CPU-bound в процессах

import asyncio
from concurrent.futures import ProcessPoolExecutor
import aiohttp
import json

def process_data(data: dict) -> dict:
    """CPU-bound обработка (выполняется в отдельном процессе)"""
    # Тяжёлые вычисления над данными
    result = sum(x ** 2 for x in data.get("values", []))
    return {"id": data["id"], "result": result}

async def fetch_and_process(session, url, executor, loop):
    # Шаг 1: Async I/O — скачиваем данные (не блокирует event loop)
    async with session.get(url) as resp:
        data = await resp.json()

    # Шаг 2: CPU-bound — отдаём в ProcessPoolExecutor
    # await позволяет event loop работать пока считается
    result = await loop.run_in_executor(executor, process_data, data)

    return result

async def main():
    urls = [f"https://api.example.com/data/{i}" for i in range(100)]
    loop = asyncio.get_event_loop()

    with ProcessPoolExecutor() as executor:
        async with aiohttp.ClientSession() as session:
            tasks = [
                fetch_and_process(session, url, executor, loop)
                for url in urls
            ]
            # Все 100 задач: I/O параллельно, CPU параллельно
            results = await asyncio.gather(*tasks)

    print(f"Обработано {len(results)} задач")

asyncio.run(main())

Что происходит:

  • 100 HTTP запросов летят параллельно через asyncio (event loop)

  • Когда данные приходят, CPU-обработка идёт в отдельных процессах через ProcessPoolExecutor

  • Event loop не блокируется ни на I/O, ни на вычисления

  • Результат: максимальная скорость и для I/O, и для CPU

9. Python 3.13–3.14: free-threaded build без GIL

Что изменилось

PEP 703 одобрен июль 2023. Python 3.13 (октябрь 2024) добавил экспериментальный free-threaded build где GIL отключён. Это самое крупное изменение внутренней архитектуры Python за десятки лет.

Что это значит на практике

ИнтерпретаторSingle-thread скоростьMulti-thread (4 потока) CPU-bound
Python 3.12 (GIL)100% (baseline)1.0x (GIL блокирует)
Python 3.13 (GIL)~98%1.0x
Python 3.13t (free-threaded)~60% (overhead)

2.2x

Python 3.14t (free-threaded)~91% (overhead снижен)

3.1x

Реальный бенчмарк (CPU-bound, подсчёт простых чисел):

# Python 3.13 standard (с GIL):
# Single-threaded:  7.51 секунд
# Multi-threaded (5 потоков): 7.07 секунд  ← почти то же самое (GIL)

# Python 3.13t free-threaded (без GIL):
# Single-threaded:  1.86 секунд
# Multi-threaded (5 потоков): 0.39 секунд  ← 4.8x быстрее!

Как подключить free-threaded build

# Инсталл (macOS/Windows, Python 3.13+)
# При установке с python.org выбрать "Download free-threaded binaries"

# Или через source:
# ./configure --disable-gil
# make
# make install

# Запуск:
# python3.13t main.py  ← free-threaded build

# Проверить что GIL отключён:

import sys
print(sys.version)  # содержит "experimental free-threading build"
print(sys._is_gil_enabled())  # False = GIL отключён

Когда использовать free-threaded build в 2025:

  • CPU-bound multi-threaded задачи где multiprocessing слишком тяжёл

  • AI/ML pipelines: PyTorch DataLoader, feature preprocessing

  • Эксперименты и бенчмарки

Когда НЕ использовать пока:

  • Production приложения (ещё экспериментально)

  • Когда key библиотеки не поддержали free-threading (Django, pandas частично)

  • I/O-bound или single-threaded код (overhead без пользы)

10. Сравнительная таблица: итоговый выбор

Критерийasynciothreadingmultiprocessing

Тип задачи

I/O-boundI/O-boundCPU-bound

Модель

Cooperative (задача сама отдаёт контроль)Preemptive (OS переключает)Полная изоляция процессов

Настоящий параллелизм

Нет (один поток)Нет (GIL)

Да

Общая память

Да (один поток)Да (shared memory)Нет (нужны Queue/Pipe/Value)

Overhead запуска

МинимальныйМалый (~1ms)Большой (~50-100ms)

Скоординированность

Тысячи задач легкоДесятки потоков (больше = тяжелее)Десятки процессов (= кол-во ядер)

Сложность

Средняя (async/await + правильные библиотеки)Средняя (locks, race conditions)Высокая (serialization, IPC)

Когда использовать

Web scraping, API calls, real-time appsBlocking libraries, background tasks, GUIТяжёлые вычисления, image processing, ML

Правило Python community для I/O-bound задач: «Use asyncio when you can, threading when you must.» Asyncio даёт лучшее ускорение для I/O, но требует async-friendly библиотек (aiohttp вместо requests, asyncpg вместо psycopg2).

11. Чек-лист: как выбрать подход

Шаг 1: определите тип задачи

  1. ☐ Задача ждёт сетевой ответ, чтения файла, DB запроса? → I/O-bound

  2. ☐ Задача считает, обрабатывает данные, CPU загружен на 100%? → CPU-bound

Шаг 2: I/O-bound — выбираете между asyncio и threading

  1. ☐ Есть async-ready библиотеки для всех I/O операций (aiohttp, asyncpg, aiofiles)? → asyncio

  2. ☐ Нужно использовать blocking библиотеки (requests, psycopg2)? → ThreadPoolExecutor

  3. ☐ Количество одновременных задач > 100? → asyncio (потоки слишком тяжелее)

  4. ☐ Background задача (monitoring, periodic job)? → threading.Thread

  5. ☐ GUI приложение? → threading.Thread (UI thread + worker)

Шаг 3: CPU-bound — выбираете между multiprocessing и free-threading

  1. ☐ Нужны отделённые процессы? → ProcessPoolExecutor

  2. ☐ Нужно запустить CPU-bound из async? → loop.run_in_executor(ProcessPoolExecutor)

  3. ☐ Python 3.13+ и задача подходит? → попробуйте free-threaded build

  4. ☐ Тяжёлые вычисления через NumPy/PyTorch? → они уже освобождают GIL внутри себя, threading может работать

Шаг 4: проверка

  1. ☐ Запустили — замерли время (perf_counter)

  2. ☐ Сравнили с синхронной версией

  3. ☐ Если не быстрее, выбрали неправильный подход

  4. ☐ Проверили что объекты передаются между процессами корректно (pickling)

  5. ☐ Протестировали edge cases (exceptions, timeouts, empty data)

Красные флаги

  1. ☐ Использовали threading для CPU-bound (GIL блокирует, нет ускорения)

  2. ☐ Использовали asyncio для CPU-bound (event loop блокируется)

  3. ☐ Использовали multiprocessing для простой I/O задачи (overhead > benefit)

  4. ☐ Забыли if __name__ == '__main__' при multiprocessing на Windows

  5. ☐ Пытаетесь передать non-picklable объект между процессами

  6. ☐ Создаёте тысячи потоков (потоки тяжелее, используйте ThreadPoolExecutor или asyncio)

  7. ☐ Не используете Lock для shared state в threading

А лучшие вакансии для разработчиков ищите на hirehi.ru