Шина событий
Шина событий PRX обеспечивает коммуникацию между плагинами и хост-системой через механизм публикации/подписки на основе топиков. Плагины могут публиковать события, подписываться на топики и реагировать на события жизненного цикла -- всё без прямой связи между компонентами.
Обзор
Шина событий обеспечивает:
- Маршрутизация по топикам -- события публикуются в иерархические топики и доставляются соответствующим подписчикам
- Подстановочные подписки -- подписка на целые поддеревья топиков с помощью glob-паттернов
- Ограничения полезной нагрузки -- максимум 64 КБ на полезную нагрузку события для предотвращения злоупотребления ресурсами
- Защита от рекурсии -- максимум 8 уровней глубины событие-порождает-событие для предотвращения бесконечных циклов
- Доставка at-most-once -- события доставляются подписчикам без персистентности или повторных попыток
Структура топиков
Топики следуют иерархической точечно-разделённой конвенции именования в пространстве имён prx.:
prx.<категория>.<событие>Встроенные топики
| Топик | Публикуется | Описание |
|---|---|---|
prx.lifecycle.started | Хост | PRX запущен и все компоненты инициализированы |
prx.lifecycle.stopping | Хост | PRX завершает работу; плагины должны выполнить очистку |
prx.lifecycle.config_reloaded | Хост | Конфигурация была перезагружена на лету |
prx.session.created | Хост | Создана новая сессия агента |
prx.session.terminated | Хост | Сессия агента завершена |
prx.session.message | Хост | Сообщение отправлено или получено в сессии |
prx.channel.connected | Хост | Канал установил соединение |
prx.channel.disconnected | Хост | Канал потерял соединение |
prx.channel.error | Хост | Канал столкнулся с ошибкой |
prx.tool.before_execute | Хост | Инструмент будет выполнен (может быть перехвачен) |
prx.tool.after_execute | Хост | Выполнение инструмента завершено |
prx.plugin.loaded | Хост | Плагин загружен |
prx.plugin.unloaded | Хост | Плагин выгружен |
prx.evolution.proposed | Хост | Сгенерировано предложение самоэволюции |
prx.evolution.applied | Хост | Применено изменение самоэволюции |
prx.evolution.rolled_back | Хост | Изменение самоэволюции откачено |
prx.memory.stored | Хост | Запись памяти сохранена |
prx.memory.recalled | Хост | Воспоминания вспомнены для контекста |
prx.cron.tick | Хост | Произошёл тик cron |
Пользовательские топики
Плагины могут публиковать в пользовательские топики под собственным пространством имён:
prx.plugin.<имя_плагина>.<событие>Например, погодный плагин может публиковать:
prx.plugin.weather.forecast_updated
prx.plugin.weather.alert_issuedПаттерны подписки
Точное совпадение
Подписка на один конкретный топик:
event_bus.subscribe("prx.session.created", handler);Подстановочное совпадение
Подписка на все топики в поддереве с помощью * (один уровень) или ** (несколько уровней):
// Все события сессии
event_bus.subscribe("prx.session.*", handler);
// Все события жизненного цикла
event_bus.subscribe("prx.lifecycle.*", handler);
// Все события конкретного плагина
event_bus.subscribe("prx.plugin.weather.*", handler);
// Все события (используйте осторожно)
event_bus.subscribe("prx.**", handler);| Паттерн | Совпадает | Не совпадает |
|---|---|---|
prx.session.* | prx.session.created, prx.session.terminated | prx.session.message.sent |
prx.session.** | prx.session.created, prx.session.message.sent | prx.channel.connected |
prx.*.connected | prx.channel.connected | prx.channel.error |
prx.** | Всё под prx. | Топики вне пространства имён prx. |
Структура события
Каждое событие содержит:
| Поле | Тип | Описание |
|---|---|---|
topic | String | Полный путь топика (напр., prx.session.created) |
payload | Vec<u8> | Сериализованные данные события (JSON по соглашению, макс. 64 КБ) |
source | String | Идентификатор публикатора (напр., host, plugin:weather) |
timestamp | u64 | Unix-временная метка в миллисекундах |
correlation_id | Option<String> | Опциональный ID для трассировки связанных событий |
Формат полезной нагрузки
Полезные нагрузки сериализуются как JSON по соглашению. Каждый топик определяет собственную схему. Например:
prx.session.created:
{
"session_id": "sess_abc123",
"channel": "telegram",
"user_id": "user:telegram:123456789"
}prx.tool.after_execute:
{
"session_id": "sess_abc123",
"tool_name": "shell",
"command": "ls -la /tmp",
"duration_ms": 45,
"success": true
}Конфигурация
[plugins.event_bus]
enabled = true
max_payload_bytes = 65536 # 64 КБ
max_recursion_depth = 8 # предотвращение бесконечных циклов событий
max_subscribers_per_topic = 64 # лимит подписчиков на топик
channel_capacity = 1024 # ёмкость внутренней очереди событий
delivery_timeout_ms = 5000 # таймаут для медленных подписчиковСправочник конфигурации
| Поле | Тип | По умолчанию | Описание |
|---|---|---|---|
enabled | bool | true | Включить или отключить шину событий |
max_payload_bytes | usize | 65536 | Максимальный размер полезной нагрузки события (64 КБ) |
max_recursion_depth | u8 | 8 | Максимальная глубина цепочек событие-порождает-событие |
max_subscribers_per_topic | usize | 64 | Максимум подписчиков на точный топик |
channel_capacity | usize | 1024 | Ёмкость ограниченного канала для очереди событий |
delivery_timeout_ms | u64 | 5000 | Максимальное время ожидания обработки события подписчиком |
Использование шины событий в плагинах
PDK (Plugin Development Kit)
PRX PDK предоставляет вспомогательные функции для взаимодействия с шиной событий внутри WASM-плагинов:
use prx_pdk::event_bus;
// Подписка на события
event_bus::subscribe("prx.session.created", |event| {
let payload: SessionCreated = serde_json::from_slice(&event.payload)?;
log::info!("New session: {}", payload.session_id);
Ok(())
})?;
// Публикация события
let payload = serde_json::to_vec(&MyEvent { data: "hello" })?;
event_bus::publish("prx.plugin.my_plugin.my_event", &payload)?;Подписка в манифесте плагина
Плагины объявляют свои подписки в файле манифеста:
# plugin.toml
[plugin]
name = "my-plugin"
version = "1.0.0"
[permissions]
event_bus_subscribe = [
"prx.session.*",
"prx.tool.after_execute",
]
event_bus_publish = [
"prx.plugin.my_plugin.*",
]Хост обеспечивает соблюдение этих объявлений разрешений. Плагин не может подписываться на или публиковать топики за пределами объявленных разрешений.
Гарантии доставки
Шина событий обеспечивает доставку at-most-once:
- События диспетчеризуются всем соответствующим подписчикам асинхронно
- Если подписчик медленный или не отвечает, событие отбрасывается после
delivery_timeout_ms - Если внутренняя очередь событий заполнена (достигнута
channel_capacity), новые события отбрасываются с предупреждением - Нет механизма персистентности, повторных попыток или подтверждения
Для сценариев, требующих гарантированной доставки, рассмотрите использование системы вебхуков или внешней очереди сообщений.
Защита от рекурсии
Когда обработчик события публикует другое событие, создаётся цепочка. Шина событий отслеживает глубину рекурсии и обеспечивает соблюдение max_recursion_depth:
prx.session.created <- глубина 0
-> обработчик публикует prx.plugin.audit.session_log <- глубина 1
-> обработчик публикует prx.plugin.metrics.counter <- глубина 2
-> ...Если глубина превышает лимит, событие отбрасывается и записывается предупреждение:
WARN event_bus: Recursion depth 8 exceeded for topic prx.plugin.metrics.counter, event droppedПерехват выполнения инструментов
Событие prx.tool.before_execute поддерживает перехват. Подписчики могут модифицировать или отменить вызов инструмента до его выполнения:
event_bus::subscribe("prx.tool.before_execute", |event| {
let mut payload: ToolBeforeExecute = serde_json::from_slice(&event.payload)?;
// Блокировка опасных команд
if payload.tool_name == "shell" && payload.args.contains("rm -rf") {
return Err(EventBusError::Rejected("Dangerous command blocked".into()));
}
Ok(())
})?;Когда любой подписчик возвращает ошибку, выполнение инструмента отменяется и ошибка сообщается агенту.
Мониторинг
CLI
# Просмотр недавней активности шины событий
prx events --tail 50
# Фильтрация по паттерну топика
prx events --topic "prx.session.*"
# Показать полезные нагрузки событий
prx events --verbose
# Просмотр количества подписчиков
prx events statsМетрики
Шина событий предоставляет метрики Prometheus:
| Метрика | Тип | Описание |
|---|---|---|
prx_event_bus_published_total | Counter | Всего опубликованных событий по топикам |
prx_event_bus_delivered_total | Counter | Всего доставленных событий подписчикам |
prx_event_bus_dropped_total | Counter | Отброшенные события (очередь полна, таймаут, рекурсия) |
prx_event_bus_delivery_duration_seconds | Histogram | Время доставки событий подписчикам |
prx_event_bus_subscribers | Gauge | Текущее количество подписчиков по топикам |
Ограничения
- Доставка at-most-once означает, что события могут быть потеряны при заполненной очереди или медленных подписчиках
- Шина событий локальна для процесса PRX; события не распределяются между узлами
- Размер полезной нагрузки ограничен 64 КБ; большие данные следует ссылать по ID, а не встраивать
- Подстановочные подписки (особенно
prx.**) могут создавать значительную нагрузку; используйте осторожно - Обработчики событий плагинов работают в песочнице WASM и не могут напрямую обращаться к файловой системе или сети
- Порядок событий обеспечивается по мере возможности; подписчики могут получать события не по порядку при высокой нагрузке
Связанные страницы
- Обзор системы плагинов
- Архитектура плагинов -- среда выполнения WASM и граница хост-гость
- Руководство разработчика -- создание плагинов с PDK
- Хост-функции -- хост-функции, доступные плагинам
- Вебхуки -- для гарантированной доставки внешним системам