Потоковая передача (Streaming) и асинхронные запросы

45 минут Урок 4

Введение: Скорость восприятия против реальной скорости

Добро пожаловать на урок, который разделит вашу разработку на «до» и «после». Мы переходим от простых скриптов, которые «висят» в ожидании ответа, к профессиональным, отзывчивым приложениям. Сегодняшняя тема — Потоковая передача (Streaming) и асинхронность.

Давайте начнем с реальной проблемы. Представьте, что вы создаете помощника для написания кода. Пользователь просит сгенерировать сложный SQL-запрос. Если вы используете стандартный синхронный запрос, процесс выглядит так:

  1. Пользователь нажимает «Отправить».
  2. Интерфейс замирает или показывает крутящийся спиннер.
  3. Сервер отправляет запрос к Gemini.
  4. Проходит 5... 10... 15 секунд (модель «думает» и генерирует весь ответ целиком).
  5. Бах! Огромный кусок текста мгновенно появляется на экране.

С технической точки зрения, это работает. С точки зрения UX (пользовательского опыта) — это катастрофа. Пользователь начинает думать, что приложение зависло, уже на 3-й секунде. В мире Enterprise-решений, где время — деньги, такое поведение недопустимо.

В этом уроке мы научимся двум вещам:

  • Streaming (Потоковая передача): Показывать ответ по мере его генерации, слово за словом, снижая Time to First Token (TTFT) до миллисекунд.
  • Asynchronous Requests (Асинхронность): Обрабатывать сотни запросов параллельно, не блокируя основной поток выполнения вашего приложения.

Анатомия асинхронности в Python

Прежде чем нырнуть в специфику Gemini 3, давайте синхронизируем (каламбур намерен) наше понимание асинхронности. В Python, когда мы говорим об асинхронности в контексте API, мы чаще всего имеем в виду библиотеку asyncio.

В синхронном коде (blocking I/O) ваша программа похожа на человека в очереди в кофейне. Вы заказали кофе (сделали запрос к API) и стоите у кассы, ничего не делая, пока бариста не отдаст стакан. Вы не можете проверить почту, позвонить другу или сделать другой заказ. Вы заблокированы.

В асинхронном коде (non-blocking I/O) вы делаете заказ, получаете пейджер (awaitable object) и отходите. Пока кофе готовится, вы можете отвечать на сообщения, делать другие заказы или читать новости. Когда пейджер запищит, вы заберете результат.

Почему это критично для LLM?
Сетевые запросы к большим языковым моделям — это «медленные» операции ввода-вывода. Пока ваш процессор ждет ответа от серверов Google, он простаивает миллиарды циклов. Асинхронность позволяет утилизировать это время ожидания для обработки других задач.

python
import asyncio
import time
# Предположим, это наш клиент Gemini 3
from google.genai import GenerativeModel

# СИНХРОННЫЙ ПОДХОД (Блокирующий)
def sync_generate():
    model = GenerativeModel("gemini-3-pro")
    print("Начало синхронного запроса...")
    # Программа 'зависает' здесь, пока не придет полный ответ
    response = model.generate_content("Расскажи кратко о квантовой физике")
    print(f"Готово. Длина ответа: {len(response.text)}")

# АСИНХРОННЫЙ ПОДХОД (Неблокирующий)
async def async_generate():
    model = GenerativeModel("gemini-3-pro")
    print("Начало асинхронного запроса...")
    # Ключевое слово await отдает управление обратно в Event Loop
    # пока мы ждем ответа от сервера
    response = await model.generate_content_async("Расскажи кратко о квантовой физике")
    print(f"Готово. Длина ответа: {len(response.text)}")

# Пример запуска асинхронной функции
# asyncio.run(async_generate())

Потоковая передача (Streaming): Магия живого текста

Теперь поговорим о том, как сделать приложение «живым». Когда вы видите, как ChatGPT или Gemini печатают текст в реальном времени, это не просто визуальный эффект. Это фундаментально другой способ получения данных.

Как это работает под капотом?
LLM генерируют текст токен за токеном (авторегрессионно). В обычном режиме API накапливает все токены в буфер на сервере Google, и только когда генерация завершена (или достигнут лимит токенов), отправляет вам полный JSON.

В режиме Streaming, сервер открывает соединение и начинает «выплевывать» чанки (кусочки данных) сразу же, как только сгенерирован новый токен или группа токенов. Обычно чанк — это объект, содержащий от одного до нескольких слов.

Ключевая метрика: TTFT (Time To First Token)
Это время от момента отправки запроса до появления первого символа на экране.

  • Без стриминга: TTFT = Время генерации всего текста + Сетевая задержка. (Может быть 10-20 секунд).
  • Со стримингом: TTFT = Время генерации первого токена + Сетевая задержка. (Обычно 0.5 - 1.5 секунды).
Это колоссальная разница для восприятия пользователя.

python
# Пример потоковой передачи (Синхронный итератор)
# В Gemini 3 API стриминг реализован через итераторы

def stream_example():
    model = GenerativeModel("gemini-3-pro")
    
    # stream=True меняет тип возвращаемого значения
    # Теперь это не Response, а итератор чанков
    response_stream = model.generate_content(
        "Напиши эссе о будущем ИИ",
        stream=True
    )
    
    print("Ответ модели:", end=" ")
    
    # Мы итерируемся по потоку по мере поступления данных
    for chunk in response_stream:
        # Каждый chunk содержит часть текста
        # flush=True важен для консоли, чтобы видеть вывод сразу
        print(chunk.text, end="", flush=True)
        
        # Здесь можно обрабатывать метаданные, например safety_ratings
        if chunk.candidates[0].finish_reason:
            print(f"\n[Генерация завершена: {chunk.candidates[0].finish_reason}]")

Высший пилотаж: Асинхронный стриминг

Мы подошли к самой мощной технике в арсенале разработчика Gemini — объединению асинхронности и стриминга. Это стандарт де-факто для высоконагруженных чат-ботов и веб-приложений.

Представьте веб-сервер (например, на FastAPI), который обслуживает 1000 пользователей одновременно. Если вы будете использовать синхронный стриминг, один долгий запрос от пользователя заблокирует рабочий процесс (worker), и остальные пользователи будут ждать. Асинхронный стриминг позволяет одному процессу обслуживать множество соединений, переключаясь между ними в моменты ожидания новых пакетов данных от API.

В Python для этого используется синтаксис async for. Это позволяет перебирать элементы итератора, который подгружает данные асинхронно.

Важные нюансы реализации:
  • Обработка ошибок внутри потока: Сеть может разорваться посередине генерации. Ваш код должен быть готов перехватить исключение внутри цикла async for.
  • Агрегация: Если вам нужно сохранить полный ответ в базу данных после показа пользователю, вам придется собирать текст вручную, конкатенируя чанки.

python
import asyncio
from google.genai import GenerativeModel

async def async_stream_handler():
    model = GenerativeModel("gemini-3-ultra")
    prompt = "Разработай архитектуру микросервисов для онлайн-магазина."
    
    full_response_log = []
    
    print("--- Начало потока ---")
    
    try:
        # Обратите внимание: метод generate_content_async с stream=True
        response_iterator = await model.generate_content_async(
            prompt,
            stream=True
        )
        
        # Использование async for для неблокирующего получения чанков
        async for chunk in response_iterator:
            # Симуляция отправки чанка на фронтенд (например, через WebSocket)
            content = chunk.text
            if content:
                print(content, end="", flush=True)
                full_response_log.append(content)
                
                # Имитация небольшой задержки обработки, чтобы показать суть async
                # В реальности здесь может быть логика сохранения в БД
                await asyncio.sleep(0.01) 
                
    except Exception as e:
        print(f"\n[Ошибка во время стриминга: {e}]")
    finally:
        print("\n--- Конец потока ---")
        
    # Агрегированный результат для сохранения
    final_text = "".join(full_response_log)
    return final_text

# Для запуска в jupyter или скрипте:
# asyncio.run(async_stream_handler())

Практические советы и паттерны

Работая с Gemini 3 API в продакшене, вы столкнетесь с нюансами, о которых не пишут в "Hello World" туториалах.

  1. UI/UX дрожание: Чанки приходят неравномерно. Иногда прилетает один символ, иногда — целое предложение. Для гладкого UI часто используют небольшой буфер на стороне клиента или фронтенда, который «печатает» текст с равномерной скоростью, сглаживая рывки сети.
  2. Таймауты: В асинхронном стриминге важно настраивать таймауты не на весь запрос, а на получение следующего чанка. Если поток «замолчал» на 30 секунд — это повод разорвать соединение и повторить попытку.
  3. Отмена генерации: Пользователи часто нажимают «Стоп», когда видят, что ответ идет не туда. В асинхронном коде вы должны уметь корректно прерывать цикл async for и отправлять сигнал отмены (если API это поддерживает) или просто закрывать соединение, экономя токены (хотя модель уже могла их сгенерировать).

Упражнение

Создайте простой консольный чат-бот с памятью контекста, который использует асинхронный стриминг. Бот должен:<br>1. Принимать ввод пользователя в бесконечном цикле.<br>2. Отправлять запрос к модели, учитывая историю переписки (ChatSession).<br>3. Выводить ответ потоком (stream) по мере поступления.<br>4. Корректно завершать работу по команде 'exit'.<br><br>Используйте 'asyncio' и класс 'ChatSession' (или его аналог start_chat) из SDK.

Вопрос

Какое главное преимущество использования стриминга (stream=True) с точки зрения пользовательского опыта (UX)?