Паттерны асинхронной обработки и Streaming API
Введение: Иллюзия скорости и реальность блокировок
Добро пожаловать на один из самых критичных уроков первого модуля. Сегодня мы не просто пишем код, мы работаем над восприятием вашего продукта пользователем. Когда речь заходит о больших языковых моделях (LLM), таких как Gemini 3, время отклика — это ваш главный враг.
Представьте сценарий: пользователь просит вашего AI-ассистента написать эссе на 2000 слов. В синхронном мире (blocking I/O) ваше приложение «зависает» на 15–20 секунд, пока модель генерирует весь текст целиком на серверах Google. Для пользователя это выглядит как сломанный интерфейс. Он нажимает кнопку, и ничего не происходит. Это UX-катастрофа.
В этом уроке мы разберем два фундаментальных подхода, которые решают эту проблему:
- Асинхронность (Async/Await): Позволяет вашему приложению делать несколько дел одновременно, не дожидаясь завершения запроса к API.
- Стриминг (Streaming): Позволяет показывать пользователю ответ по частям, по мере его генерации, создавая эффект мгновенной реакции.
Мы переходим от парадигмы «Запрос -> Долгое ожидание -> Ответ» к парадигме «Запрос -> Мгновенный поток данных».
Анатомия задержки: TTFT vs Total Latency
Прежде чем писать код, давайте определимся с метриками. В мире LLM есть два ключевых показателя:
- TTFT (Time To First Token): Время от отправки запроса до появления первого символа на экране. Это то, что определяет «ощущение» скорости.
- Total Latency (Общая задержка): Время до полного завершения генерации ответа.
Синхронные запросы убивают TTFT. Вы получаете первый токен одновременно с последним. Streaming API позволяет сократить TTFT до сотен миллисекунд, даже если Total Latency составляет минуты.
Давайте посмотрим, как выглядит классический синхронный (блокирующий) запрос, которого мы стараемся избегать в высоконагруженных системах.
import time
import google.generativeai as genai
import os
# Настройка API ключа (предполагаем, что он в переменных окружения)
genai.configure(api_key=os.environ["GEMINI_API_KEY"])
def blocking_request():
model = genai.GenerativeModel('gemini-3-pro') # Используем условную версию Gemini 3
print("Начинаем запрос (Синхронно)...")
start_time = time.time()
# Этот вызов блокирует выполнение программы до полной готовности ответа
response = model.generate_content("Напиши подробную историю о колонизации Марса (около 500 слов).")
end_time = time.time()
print(f"Запрос завершен. Прошло времени: {end_time - start_time:.2f} сек.")
print(f"Длина ответа: {len(response.text)} символов.")
# При запуске вы увидите паузу в 10+ секунд, прежде чем появится хоть какой-то вывод
# blocking_request()
Стриминг: Психология ожидания
Теперь давайте изменим подход. Стриминг в Gemini API работает через Server-Sent Events (SSE) или аналогичные механизмы под капотом. Вместо одного огромного JSON-объекта в конце, сервер отправляет серию небольших пакетов (chunks), каждый из которых содержит несколько сгенерированных токенов.
Почему это важно для Gemini 3?
Современные модели имеют огромное контекстное окно. Если вы просите модель проанализировать книгу и дать резюме, генерация может занять много времени. Стриминг позволяет пользователю начать читать первый абзац резюме, пока модель все еще «додумывает» выводы в конце.
В Python SDK Google это реализуется через аргумент stream=True. Обратите внимание: объект ответа превращается в итератор.
def streaming_request():
model = genai.GenerativeModel('gemini-3-pro')
print("Начинаем запрос (Streaming)...")
# Включаем stream=True
response_stream = model.generate_content(
"Объясни квантовую запутанность простыми словами.",
stream=True
)
print("Ответ:", end=" ")
# Мы итерируемся по потоку. Как только приходит кусок данных — мы его выводим.
for chunk in response_stream:
# flush=True важен, чтобы текст появлялся в консоли мгновенно
print(chunk.text, end="", flush=True)
print("\n[Генерация завершена]")
# streaming_request()
Асинхронность (Asyncio): Параллельная магия
Стриминг решает проблему восприятия одного пользователя. Но что, если вы строите бэкенд, который должен обслуживать 1000 пользователей одновременно? Или вам нужно запустить 5 агентов, которые спорят друг с другом?
Если вы используете синхронный код (даже со стримингом), ваш Python-скрипт будет занят обслуживанием одного соединения. Здесь на сцену выходит asyncio.
Ключевое отличие:
В синхронном коде вы ждете (waiting). В асинхронном вы ожидаете (awaiting), освобождая процессор для других задач, пока данные летят по сети.
Google Gen AI SDK предоставляет нативные асинхронные методы: generate_content_async и stream_generate_content_async (названия могут варьироваться в зависимости от версии SDK, но принцип await неизменен).
Ниже приведен паттерн «Gathering» (Сбор). Это классический прием, когда нам нужно выполнить несколько независимых задач параллельно и собрать их результаты.
import asyncio
async def analyze_topic(topic_id, topic_name):
model = genai.GenerativeModel('gemini-3-flash') # Используем быструю модель для параллельных задач
print(f"[{topic_id}] Начинаю анализ темы: {topic_name}")
# Используем generate_content_async
response = await model.generate_content_async(
f"Дай одно предложение с ключевым фактом про: {topic_name}"
)
print(f"[{topic_id}] Готово!")
return f"{topic_name}: {response.text.strip()}"
async def main_parallel():
topics = ["Черные дыры", "Фотосинтез", "Римская Империя", "Нейросети", "Графен"]
# Создаем список задач (Tasks). Они запускаются мгновенно при создании.
tasks = [
analyze_topic(i, topic)
for i, topic in enumerate(topics)
]
print("Запускаем пакетную обработку...")
# gather ждет завершения ВСЕХ задач параллельно
results = await asyncio.gather(*tasks)
print("\n--- Результаты ---")
for res in results:
print(res)
# Для запуска в Jupyter Notebook используйте await main_parallel()
# Для запуска в скрипте: asyncio.run(main_parallel())
Паттерн: Асинхронный Стриминг (The Ultimate Combo)
Высший пилотаж — это объединение асинхронности и стриминга. Это необходимо, например, при создании веб-сокетов для чат-бота, где сервер должен одновременно держать соединение с клиентом открытым, принимать данные от Gemini API по кусочкам и сразу пересылать их клиенту.
Здесь мы используем конструкцию async for. Это асинхронный итератор. Он не блокирует цикл событий (Event Loop) между получением токенов.
Важное предупреждение (Rate Limiting):
Когда вы начинаете использовать асинхронность, очень легко упереться в лимиты API (RPM - Requests Per Minute). Gemini 3 API мощный, но не бесконечный. Если вы запустите 100 задач через asyncio.gather, вы, скорее всего, получите ошибку 429 Resource Exhausted.
Для решения этой проблемы в продакшене используется asyncio.Semaphore. Это «вышибала» на входе в клуб, который пускает только определенное количество запросов одновременно.
async def safe_stream_generator(prompt, sem):
# Семафор ограничивает количество одновременных входов в этот блок
async with sem:
model = genai.GenerativeModel('gemini-3-pro')
stream = await model.generate_content_async(prompt, stream=True)
collected_text = ""
async for chunk in stream:
# Здесь мы могли бы отправлять данные в веб-сокет
# await websocket.send(chunk.text)
collected_text += chunk.text
# Имитация работы
return collected_text[:50] + "..." # Возвращаем начало для демонстрации
async def main_controlled():
# Разрешаем только 3 одновременных запроса
sem = asyncio.Semaphore(3)
prompts = [f"Расскажи факт номер {i}" for i in range(10)]
tasks = [safe_stream_generator(p, sem) for p in prompts]
# По мере освобождения слотов, будут запускаться новые задачи
results = await asyncio.gather(*tasks)
print(f"Обработано {len(results)} запросов с контролем потока.")
Обработка ошибок в потоке
Одна из самых коварных частей стриминга — ошибки возникают в процессе.
В обычном запросе, если вы словили Exception, у вас просто нет ответа. В стриминге вы могли уже показать пользователю половину предложения, а потом отвалился интернет или сработал фильтр безопасности (Safety Filter).
Как обрабатывать `FinishReason`:
В Gemini API каждый чанк и финальный ответ содержат метаданные. Если поток прерывается, важно проверить candidate.finish_reason. Если он равен SAFETY, модель отказалась продолжать текст из-за нарушения политик. Ваш UI должен уметь корректно это обработать, не оставляя пользователя с оборванным на полуслове текстом.
Создайте асинхронный скрипт 'AI-Дебатный клуб'.<br>1. Скрипт должен иметь две роли (два разных системных промпта): 'Оптимист' и 'Пессимист'.<br>2. Тема спора: 'Будущее искусственного интеллекта'.<br>3. Запустите генерацию аргументов для обеих сторон ПАРАЛЛЕЛЬНО (асинхронно).<br>4. Используйте streaming для вывода аргументов в консоль, но добавьте префикс перед каждой строкой (например, [ОПТИМИСТ]: ...).<br>5. Реализуйте простую обработку ошибок: если один из потоков падает, второй должен завершиться.
При использовании asyncio.gather() для отправки 50 запросов к Gemini API одновременно, вы получаете ошибку 429 Resource Exhausted. Какое решение является архитектурно наиболее правильным?