Skip to content
Эта страница создана и переведена с помощью ИИ. Если вы заметили неточности, помогите нам улучшить её. Редактировать на GitHub

Шина событий

Шина событий PRX обеспечивает коммуникацию между плагинами и хост-системой через механизм публикации/подписки на основе топиков. Плагины могут публиковать события, подписываться на топики и реагировать на события жизненного цикла -- всё без прямой связи между компонентами.

Обзор

Шина событий обеспечивает:

  • Маршрутизация по топикам -- события публикуются в иерархические топики и доставляются соответствующим подписчикам
  • Подстановочные подписки -- подписка на целые поддеревья топиков с помощью glob-паттернов
  • Ограничения полезной нагрузки -- максимум 64 КБ на полезную нагрузку события для предотвращения злоупотребления ресурсами
  • Защита от рекурсии -- максимум 8 уровней глубины событие-порождает-событие для предотвращения бесконечных циклов
  • Доставка at-most-once -- события доставляются подписчикам без персистентности или повторных попыток

Структура топиков

Топики следуют иерархической точечно-разделённой конвенции именования в пространстве имён prx.:

prx.<категория>.<событие>

Встроенные топики

ТопикПубликуетсяОписание
prx.lifecycle.startedХостPRX запущен и все компоненты инициализированы
prx.lifecycle.stoppingХостPRX завершает работу; плагины должны выполнить очистку
prx.lifecycle.config_reloadedХостКонфигурация была перезагружена на лету
prx.session.createdХостСоздана новая сессия агента
prx.session.terminatedХостСессия агента завершена
prx.session.messageХостСообщение отправлено или получено в сессии
prx.channel.connectedХостКанал установил соединение
prx.channel.disconnectedХостКанал потерял соединение
prx.channel.errorХостКанал столкнулся с ошибкой
prx.tool.before_executeХостИнструмент будет выполнен (может быть перехвачен)
prx.tool.after_executeХостВыполнение инструмента завершено
prx.plugin.loadedХостПлагин загружен
prx.plugin.unloadedХостПлагин выгружен
prx.evolution.proposedХостСгенерировано предложение самоэволюции
prx.evolution.appliedХостПрименено изменение самоэволюции
prx.evolution.rolled_backХостИзменение самоэволюции откачено
prx.memory.storedХостЗапись памяти сохранена
prx.memory.recalledХостВоспоминания вспомнены для контекста
prx.cron.tickХостПроизошёл тик cron

Пользовательские топики

Плагины могут публиковать в пользовательские топики под собственным пространством имён:

prx.plugin.<имя_плагина>.<событие>

Например, погодный плагин может публиковать:

prx.plugin.weather.forecast_updated
prx.plugin.weather.alert_issued

Паттерны подписки

Точное совпадение

Подписка на один конкретный топик:

rust
event_bus.subscribe("prx.session.created", handler);

Подстановочное совпадение

Подписка на все топики в поддереве с помощью * (один уровень) или ** (несколько уровней):

rust
// Все события сессии
event_bus.subscribe("prx.session.*", handler);

// Все события жизненного цикла
event_bus.subscribe("prx.lifecycle.*", handler);

// Все события конкретного плагина
event_bus.subscribe("prx.plugin.weather.*", handler);

// Все события (используйте осторожно)
event_bus.subscribe("prx.**", handler);
ПаттернСовпадаетНе совпадает
prx.session.*prx.session.created, prx.session.terminatedprx.session.message.sent
prx.session.**prx.session.created, prx.session.message.sentprx.channel.connected
prx.*.connectedprx.channel.connectedprx.channel.error
prx.**Всё под prx.Топики вне пространства имён prx.

Структура события

Каждое событие содержит:

ПолеТипОписание
topicStringПолный путь топика (напр., prx.session.created)
payloadVec<u8>Сериализованные данные события (JSON по соглашению, макс. 64 КБ)
sourceStringИдентификатор публикатора (напр., host, plugin:weather)
timestampu64Unix-временная метка в миллисекундах
correlation_idOption<String>Опциональный ID для трассировки связанных событий

Формат полезной нагрузки

Полезные нагрузки сериализуются как JSON по соглашению. Каждый топик определяет собственную схему. Например:

prx.session.created:

json
{
  "session_id": "sess_abc123",
  "channel": "telegram",
  "user_id": "user:telegram:123456789"
}

prx.tool.after_execute:

json
{
  "session_id": "sess_abc123",
  "tool_name": "shell",
  "command": "ls -la /tmp",
  "duration_ms": 45,
  "success": true
}

Конфигурация

toml
[plugins.event_bus]
enabled = true
max_payload_bytes = 65536           # 64 КБ
max_recursion_depth = 8             # предотвращение бесконечных циклов событий
max_subscribers_per_topic = 64      # лимит подписчиков на топик
channel_capacity = 1024             # ёмкость внутренней очереди событий
delivery_timeout_ms = 5000          # таймаут для медленных подписчиков

Справочник конфигурации

ПолеТипПо умолчаниюОписание
enabledbooltrueВключить или отключить шину событий
max_payload_bytesusize65536Максимальный размер полезной нагрузки события (64 КБ)
max_recursion_depthu88Максимальная глубина цепочек событие-порождает-событие
max_subscribers_per_topicusize64Максимум подписчиков на точный топик
channel_capacityusize1024Ёмкость ограниченного канала для очереди событий
delivery_timeout_msu645000Максимальное время ожидания обработки события подписчиком

Использование шины событий в плагинах

PDK (Plugin Development Kit)

PRX PDK предоставляет вспомогательные функции для взаимодействия с шиной событий внутри WASM-плагинов:

rust
use prx_pdk::event_bus;

// Подписка на события
event_bus::subscribe("prx.session.created", |event| {
    let payload: SessionCreated = serde_json::from_slice(&event.payload)?;
    log::info!("New session: {}", payload.session_id);
    Ok(())
})?;

// Публикация события
let payload = serde_json::to_vec(&MyEvent { data: "hello" })?;
event_bus::publish("prx.plugin.my_plugin.my_event", &payload)?;

Подписка в манифесте плагина

Плагины объявляют свои подписки в файле манифеста:

toml
# plugin.toml
[plugin]
name = "my-plugin"
version = "1.0.0"

[permissions]
event_bus_subscribe = [
    "prx.session.*",
    "prx.tool.after_execute",
]
event_bus_publish = [
    "prx.plugin.my_plugin.*",
]

Хост обеспечивает соблюдение этих объявлений разрешений. Плагин не может подписываться на или публиковать топики за пределами объявленных разрешений.

Гарантии доставки

Шина событий обеспечивает доставку at-most-once:

  • События диспетчеризуются всем соответствующим подписчикам асинхронно
  • Если подписчик медленный или не отвечает, событие отбрасывается после delivery_timeout_ms
  • Если внутренняя очередь событий заполнена (достигнута channel_capacity), новые события отбрасываются с предупреждением
  • Нет механизма персистентности, повторных попыток или подтверждения

Для сценариев, требующих гарантированной доставки, рассмотрите использование системы вебхуков или внешней очереди сообщений.

Защита от рекурсии

Когда обработчик события публикует другое событие, создаётся цепочка. Шина событий отслеживает глубину рекурсии и обеспечивает соблюдение max_recursion_depth:

prx.session.created           <- глубина 0
  -> обработчик публикует prx.plugin.audit.session_log    <- глубина 1
    -> обработчик публикует prx.plugin.metrics.counter     <- глубина 2
      -> ...

Если глубина превышает лимит, событие отбрасывается и записывается предупреждение:

WARN event_bus: Recursion depth 8 exceeded for topic prx.plugin.metrics.counter, event dropped

Перехват выполнения инструментов

Событие prx.tool.before_execute поддерживает перехват. Подписчики могут модифицировать или отменить вызов инструмента до его выполнения:

rust
event_bus::subscribe("prx.tool.before_execute", |event| {
    let mut payload: ToolBeforeExecute = serde_json::from_slice(&event.payload)?;

    // Блокировка опасных команд
    if payload.tool_name == "shell" && payload.args.contains("rm -rf") {
        return Err(EventBusError::Rejected("Dangerous command blocked".into()));
    }

    Ok(())
})?;

Когда любой подписчик возвращает ошибку, выполнение инструмента отменяется и ошибка сообщается агенту.

Мониторинг

CLI

bash
# Просмотр недавней активности шины событий
prx events --tail 50

# Фильтрация по паттерну топика
prx events --topic "prx.session.*"

# Показать полезные нагрузки событий
prx events --verbose

# Просмотр количества подписчиков
prx events stats

Метрики

Шина событий предоставляет метрики Prometheus:

МетрикаТипОписание
prx_event_bus_published_totalCounterВсего опубликованных событий по топикам
prx_event_bus_delivered_totalCounterВсего доставленных событий подписчикам
prx_event_bus_dropped_totalCounterОтброшенные события (очередь полна, таймаут, рекурсия)
prx_event_bus_delivery_duration_secondsHistogramВремя доставки событий подписчикам
prx_event_bus_subscribersGaugeТекущее количество подписчиков по топикам

Ограничения

  • Доставка at-most-once означает, что события могут быть потеряны при заполненной очереди или медленных подписчиках
  • Шина событий локальна для процесса PRX; события не распределяются между узлами
  • Размер полезной нагрузки ограничен 64 КБ; большие данные следует ссылать по ID, а не встраивать
  • Подстановочные подписки (особенно prx.**) могут создавать значительную нагрузку; используйте осторожно
  • Обработчики событий плагинов работают в песочнице WASM и не могут напрямую обращаться к файловой системе или сети
  • Порядок событий обеспечивается по мере возможности; подписчики могут получать события не по порядку при высокой нагрузке

Связанные страницы

Released under the Apache-2.0 License.