Потоковая передача (Streaming) и асинхронная обработка запросов

45 минут Урок 5

Введение: Битва за миллисекунды

Добро пожаловать на урок, который разделяет просто работающие скрипты и профессиональные высоконагруженные системы. Когда вы строите архитектуру ИИ-решений, особенно на базе мощных моделей вроде Gemini 3, вы сталкиваетесь с двумя главными врагами: задержкой (Latency) и блокировкой ресурсов.

Представьте ситуацию: пользователь задает вашему боту сложный аналитический вопрос. Модели требуется 10 секунд на размышление и генерацию полного ответа. В синхронном мире без потоковой передачи (streaming) пользователь будет смотреть на пустой экран 10 секунд. Для современного веба это вечность. Пользователь решит, что сервис завис, и уйдет.

В этом уроке мы разберем два инструмента, которые решают эту проблему:

  1. Streaming (Потоковая передача): Позволяет отдавать пользователю ответ по частям (токенам) по мере их генерации, создавая эффект «живого набора текста» и радикально снижая TTFT (Time To First Token).
  2. Asyncio (Асинхронность): Позволяет вашему серверу обрабатывать тысячи других запросов, пока Gemini «думает» над ответом для одного пользователя.

Мы переходим от написания скриптов к проектированию систем.

Часть 1: Механика Streaming в Gemini API

По умолчанию API работает в режиме request-response: вы отправляете промпт, сервер Google обрабатывает его целиком и возвращает полный JSON. Это просто, но неэффективно для длинных текстов.

При включении Streaming, сервер Google использует механизм Chunked Transfer Encoding. Он начинает отправлять пакеты данных (chunks) сразу, как только сгенерированы первые токены. В Python SDK для Gemini это реализовано через возвращение итератора, а не статического объекта.

Ключевые преимущества стриминга:

  • Психологический комфорт: Пользователь видит активность через 0.5–1 сек, а не через 10 сек.
  • Возможность прерывания: Если пользователь видит, что ответ идет «не туда», он может остановить генерацию, сэкономив ваши деньги и лимиты.

python
import os
import google.generativeai as genai

# Настройка API ключа
genai.configure(api_key=os.environ["GEMINI_API_KEY"])

def stream_example():
    model = genai.GenerativeModel('gemini-1.5-pro')
    
    print("--- Начало генерации (Streaming) ---")
    
    # Ключевой параметр: stream=True
    response = model.generate_content(
        "Напиши короткую, но детализированную историю о киберпанк-детективе.",
        stream=True
    )
    
    # response теперь не объект с текстом, а итератор
    for chunk in response:
        # Каждый chunk содержит часть сгенерированного текста
        # end='' используется, чтобы не делать лишние переносы строк
        print(chunk.text, end='', flush=True)
        
    print("\n--- Генерация завершена ---")

# stream_example()

Часть 2: Асинхронность (Asyncio) — спасение для I/O операций

Стриминг решает проблему восприятия пользователем, но не решает проблему простоя вашего сервера. В стандартном синхронном коде (blocking I/O), пока ваш скрипт ждет ответа от Google (даже первого чанка), он блокируется. Он не может принять новый запрос от другого пользователя.

Для архитектора ИИ-решений это недопустимо. Вызовы к LLM — это классические I/O-bound операции (связанные с вводом-выводом). Процессор вашего сервера простаивает, ожидая сетевой пакет.

Использование asyncio и асинхронных методов клиента Gemini позволяет «отпустить» управление на время ожидания. Пока один запрос летит к Google, ваш Event Loop может обработать сотню других входящих соединений.

Важно: В Python SDK для Gemini асинхронные методы обычно имеют суффикс _async или находятся в отдельном пространстве имен.

python
import asyncio
import google.generativeai as genai
import time

async def async_generate_example():
    model = genai.GenerativeModel('gemini-1.5-flash')
    
    print(f"[{time.strftime('%X')}] Отправляем запрос...")
    
    # Используем generate_content_async вместо generate_content
    # Обратите внимание на ключевое слово await
    response = await model.generate_content_async(
        "Объясни концепцию Event Loop в 3 предложениях."
    )
    
    print(f"[{time.strftime('%X')}] Ответ получен:")
    print(response.text)

async def main_task():
    # Запускаем 3 запроса "параллельно" (конкурентно)
    # В синхронном коде это заняло бы время T1 + T2 + T3
    # В асинхронном это займет max(T1, T2, T3) + небольшие накладные расходы
    await asyncio.gather(
        async_generate_example(),
        async_generate_example(),
        async_generate_example()
    )

# Для запуска в скрипте:
# asyncio.run(main_task())

Часть 3: Высший пилотаж — Async Streaming

Теперь объединим эти концепции. Это именно тот паттерн, который вы будете использовать при создании чат-ботов, RAG-систем или веб-интерфейсов для работы с Gemini.

Асинхронный стриминг (Async Iterator) позволяет вам не блокировать поток, пока вы ждете следующего кусочка текста. Это критически важно, если вы, например, транслируете ответ модели через WebSocket на фронтенд. Если вы заблокируетесь на ожидании чанка, вы можете пропустить

python
import asyncio
import google.generativeai as genai

async def async_stream_chat(prompt, user_id):
    model = genai.GenerativeModel('gemini-1.5-pro')
    
    # 1. Инициализация асинхронного стрима
    response_stream = await model.generate_content_async(
        prompt,
        stream=True
    )
    
    print(f"User {user_id}: Ждем первого токена...")
    
    # 2. Асинхронный цикл for (async for)
    # Скрипт может переключаться на другие задачи между приходом чанков
    collected_text = ""
    async for chunk in response_stream:
        if chunk.text:
            # В реальном приложении здесь была бы отправка в WebSocket
            # await websocket.send(chunk.text)
            print(f"[User {user_id} stream]: {chunk.text[:10]}..." ) # Логируем начало чанка
            collected_text += chunk.text
            
    print(f"\nUser {user_id}: Генерация завершена. Длина: {len(collected_text)} символов.")
    return collected_text

async def main_server_simulation():
    # Симуляция одновременного обращения двух пользователей
    prompt1 = "Напиши рецепт пасты карбонара."
    prompt2 = "Напиши список 5 лучших языков программирования."
    
    await asyncio.gather(
        async_stream_chat(prompt1, 1),
        async_stream_chat(prompt2, 2)
    )

# asyncio.run(main_server_simulation())

Обработка ошибок и архитектурные нюансы

При работе с потоками (особенно асинхронными) важно учитывать, что сетевое соединение может разорваться посередине генерации.

Что нужно учесть архитектору:

  • Агрегация ответа: Всегда сохраняйте полный ответ на стороне сервера (как в переменной collected_text в примере выше), даже если вы сразу стримите его клиенту. Это нужно для сохранения истории диалога (контекста) для следующих запросов.
  • Safety Filters (Фильтры безопасности): Иногда Gemini может начать генерацию, но прервать её на середине, если встретит контент, нарушающий политики безопасности. В этом случае стрим завершится, а свойство finish_reason укажет на SAFETY. Ваш код должен уметь обрабатывать ситуацию, когда chunk.text не существует или пуст.
  • Timeouts: В asyncio всегда оборачивайте вызовы во внешние API в asyncio.wait_for, чтобы избежать вечного ожидания зависших соединений.

Упражнение

Создайте асинхронный скрипт 'AI-Интервьюер'. Скрипт должен:<br>1. Иметь список из 3-х вопросов для кандидата (например, про Python, SQL и Git).<br>2. Одновременно (конкурентно) запустить 3 независимых сессии генерации ответов на эти вопросы от лица разных 'экспертов' (используйте разные system instructions или просто разные промпты).<br>3. Выводить ответы в консоль в реальном времени (streaming), добавляя префикс эксперта к каждому чанку (например, '[Эксперт по SQL]: ...').<br><br>Цель: Увидеть, как чанки от разных запросов перемешиваются в консоли, доказывая асинхронность.

Вопрос

Почему использование `async for chunk in response` предпочтительнее обычного `for chunk in response` при разработке высоконагруженного веб-сервера?