Потоковая передача (Streaming) и асинхронные запросы
Введение: Скорость восприятия против реальной скорости
Добро пожаловать на урок, который разделит вашу разработку на «до» и «после». Мы переходим от простых скриптов, которые «висят» в ожидании ответа, к профессиональным, отзывчивым приложениям. Сегодняшняя тема — Потоковая передача (Streaming) и асинхронность.
Давайте начнем с реальной проблемы. Представьте, что вы создаете помощника для написания кода. Пользователь просит сгенерировать сложный SQL-запрос. Если вы используете стандартный синхронный запрос, процесс выглядит так:
- Пользователь нажимает «Отправить».
- Интерфейс замирает или показывает крутящийся спиннер.
- Сервер отправляет запрос к Gemini.
- Проходит 5... 10... 15 секунд (модель «думает» и генерирует весь ответ целиком).
- Бах! Огромный кусок текста мгновенно появляется на экране.
С технической точки зрения, это работает. С точки зрения UX (пользовательского опыта) — это катастрофа. Пользователь начинает думать, что приложение зависло, уже на 3-й секунде. В мире Enterprise-решений, где время — деньги, такое поведение недопустимо.
В этом уроке мы научимся двум вещам:
- Streaming (Потоковая передача): Показывать ответ по мере его генерации, слово за словом, снижая Time to First Token (TTFT) до миллисекунд.
- Asynchronous Requests (Асинхронность): Обрабатывать сотни запросов параллельно, не блокируя основной поток выполнения вашего приложения.
Анатомия асинхронности в Python
Прежде чем нырнуть в специфику Gemini 3, давайте синхронизируем (каламбур намерен) наше понимание асинхронности. В Python, когда мы говорим об асинхронности в контексте API, мы чаще всего имеем в виду библиотеку asyncio.
В синхронном коде (blocking I/O) ваша программа похожа на человека в очереди в кофейне. Вы заказали кофе (сделали запрос к API) и стоите у кассы, ничего не делая, пока бариста не отдаст стакан. Вы не можете проверить почту, позвонить другу или сделать другой заказ. Вы заблокированы.
В асинхронном коде (non-blocking I/O) вы делаете заказ, получаете пейджер (awaitable object) и отходите. Пока кофе готовится, вы можете отвечать на сообщения, делать другие заказы или читать новости. Когда пейджер запищит, вы заберете результат.
Почему это критично для LLM?
Сетевые запросы к большим языковым моделям — это «медленные» операции ввода-вывода. Пока ваш процессор ждет ответа от серверов Google, он простаивает миллиарды циклов. Асинхронность позволяет утилизировать это время ожидания для обработки других задач.
import asyncio
import time
# Предположим, это наш клиент Gemini 3
from google.genai import GenerativeModel
# СИНХРОННЫЙ ПОДХОД (Блокирующий)
def sync_generate():
model = GenerativeModel("gemini-3-pro")
print("Начало синхронного запроса...")
# Программа 'зависает' здесь, пока не придет полный ответ
response = model.generate_content("Расскажи кратко о квантовой физике")
print(f"Готово. Длина ответа: {len(response.text)}")
# АСИНХРОННЫЙ ПОДХОД (Неблокирующий)
async def async_generate():
model = GenerativeModel("gemini-3-pro")
print("Начало асинхронного запроса...")
# Ключевое слово await отдает управление обратно в Event Loop
# пока мы ждем ответа от сервера
response = await model.generate_content_async("Расскажи кратко о квантовой физике")
print(f"Готово. Длина ответа: {len(response.text)}")
# Пример запуска асинхронной функции
# asyncio.run(async_generate())
Потоковая передача (Streaming): Магия живого текста
Теперь поговорим о том, как сделать приложение «живым». Когда вы видите, как ChatGPT или Gemini печатают текст в реальном времени, это не просто визуальный эффект. Это фундаментально другой способ получения данных.
Как это работает под капотом?
LLM генерируют текст токен за токеном (авторегрессионно). В обычном режиме API накапливает все токены в буфер на сервере Google, и только когда генерация завершена (или достигнут лимит токенов), отправляет вам полный JSON.
В режиме Streaming, сервер открывает соединение и начинает «выплевывать» чанки (кусочки данных) сразу же, как только сгенерирован новый токен или группа токенов. Обычно чанк — это объект, содержащий от одного до нескольких слов.
Ключевая метрика: TTFT (Time To First Token)
Это время от момента отправки запроса до появления первого символа на экране.
- Без стриминга: TTFT = Время генерации всего текста + Сетевая задержка. (Может быть 10-20 секунд).
- Со стримингом: TTFT = Время генерации первого токена + Сетевая задержка. (Обычно 0.5 - 1.5 секунды).
# Пример потоковой передачи (Синхронный итератор)
# В Gemini 3 API стриминг реализован через итераторы
def stream_example():
model = GenerativeModel("gemini-3-pro")
# stream=True меняет тип возвращаемого значения
# Теперь это не Response, а итератор чанков
response_stream = model.generate_content(
"Напиши эссе о будущем ИИ",
stream=True
)
print("Ответ модели:", end=" ")
# Мы итерируемся по потоку по мере поступления данных
for chunk in response_stream:
# Каждый chunk содержит часть текста
# flush=True важен для консоли, чтобы видеть вывод сразу
print(chunk.text, end="", flush=True)
# Здесь можно обрабатывать метаданные, например safety_ratings
if chunk.candidates[0].finish_reason:
print(f"\n[Генерация завершена: {chunk.candidates[0].finish_reason}]")
Высший пилотаж: Асинхронный стриминг
Мы подошли к самой мощной технике в арсенале разработчика Gemini — объединению асинхронности и стриминга. Это стандарт де-факто для высоконагруженных чат-ботов и веб-приложений.
Представьте веб-сервер (например, на FastAPI), который обслуживает 1000 пользователей одновременно. Если вы будете использовать синхронный стриминг, один долгий запрос от пользователя заблокирует рабочий процесс (worker), и остальные пользователи будут ждать. Асинхронный стриминг позволяет одному процессу обслуживать множество соединений, переключаясь между ними в моменты ожидания новых пакетов данных от API.
В Python для этого используется синтаксис async for. Это позволяет перебирать элементы итератора, который подгружает данные асинхронно.
- Обработка ошибок внутри потока: Сеть может разорваться посередине генерации. Ваш код должен быть готов перехватить исключение внутри цикла
async for. - Агрегация: Если вам нужно сохранить полный ответ в базу данных после показа пользователю, вам придется собирать текст вручную, конкатенируя чанки.
import asyncio
from google.genai import GenerativeModel
async def async_stream_handler():
model = GenerativeModel("gemini-3-ultra")
prompt = "Разработай архитектуру микросервисов для онлайн-магазина."
full_response_log = []
print("--- Начало потока ---")
try:
# Обратите внимание: метод generate_content_async с stream=True
response_iterator = await model.generate_content_async(
prompt,
stream=True
)
# Использование async for для неблокирующего получения чанков
async for chunk in response_iterator:
# Симуляция отправки чанка на фронтенд (например, через WebSocket)
content = chunk.text
if content:
print(content, end="", flush=True)
full_response_log.append(content)
# Имитация небольшой задержки обработки, чтобы показать суть async
# В реальности здесь может быть логика сохранения в БД
await asyncio.sleep(0.01)
except Exception as e:
print(f"\n[Ошибка во время стриминга: {e}]")
finally:
print("\n--- Конец потока ---")
# Агрегированный результат для сохранения
final_text = "".join(full_response_log)
return final_text
# Для запуска в jupyter или скрипте:
# asyncio.run(async_stream_handler())
Практические советы и паттерны
Работая с Gemini 3 API в продакшене, вы столкнетесь с нюансами, о которых не пишут в "Hello World" туториалах.
- UI/UX дрожание: Чанки приходят неравномерно. Иногда прилетает один символ, иногда — целое предложение. Для гладкого UI часто используют небольшой буфер на стороне клиента или фронтенда, который «печатает» текст с равномерной скоростью, сглаживая рывки сети.
- Таймауты: В асинхронном стриминге важно настраивать таймауты не на весь запрос, а на получение следующего чанка. Если поток «замолчал» на 30 секунд — это повод разорвать соединение и повторить попытку.
- Отмена генерации: Пользователи часто нажимают «Стоп», когда видят, что ответ идет не туда. В асинхронном коде вы должны уметь корректно прерывать цикл
async forи отправлять сигнал отмены (если API это поддерживает) или просто закрывать соединение, экономя токены (хотя модель уже могла их сгенерировать).
Создайте простой консольный чат-бот с памятью контекста, который использует асинхронный стриминг. Бот должен:<br>1. Принимать ввод пользователя в бесконечном цикле.<br>2. Отправлять запрос к модели, учитывая историю переписки (ChatSession).<br>3. Выводить ответ потоком (stream) по мере поступления.<br>4. Корректно завершать работу по команде 'exit'.<br><br>Используйте 'asyncio' и класс 'ChatSession' (или его аналог start_chat) из SDK.
Какое главное преимущество использования стриминга (stream=True) с точки зрения пользовательского опыта (UX)?