Skip to content
Diese Seite wurde mit KI-Unterstützung erstellt und übersetzt. Falls Ihnen Ungenauigkeiten auffallen, helfen Sie gerne bei der Verbesserung. Auf GitHub bearbeiten

Event-Bus

Der PRX-Event-Bus ermoglicht die Kommunikation zwischen Plugins und dem Host-System uber einen themenbasierten Publish/Subscribe-Mechanismus. Plugins konnen Events veroffentlichen, Themen abonnieren und auf Lebenszyklus-Events reagieren -- alles ohne direkte Kopplung zwischen Komponenten.

Ubersicht

Der Event-Bus bietet:

  • Themenbasiertes Routing -- Events werden an hierarchische Themen veroffentlicht und an passende Abonnenten zugestellt
  • Wildcard-Abonnements -- ganze Themen-Unterbaume mit Glob-Mustern abonnieren
  • Payload-Limits -- maximal 64 KB pro Event-Payload, um Ressourcenmissbrauch zu verhindern
  • Rekursionsschutz -- maximal 8 Ebenen von Event-ausgelosten-Event-Tiefe, um Endlosschleifen zu verhindern
  • At-most-once-Zustellung -- Events werden ohne Persistenz oder Wiederholung an Abonnenten zugestellt

Themenstruktur

Themen folgen einer hierarchischen, punktgetrennten Namenskonvention unter dem prx.-Namensraum:

prx.<kategorie>.<event>

Eingebaute Themen

ThemaVeroffentlicht vonBeschreibung
prx.lifecycle.startedHostPRX wurde gestartet und alle Komponenten sind initialisiert
prx.lifecycle.stoppingHostPRX fahrt herunter; Plugins sollten aufraumen
prx.lifecycle.config_reloadedHostKonfiguration wurde hot-reloaded
prx.session.createdHostEine neue Agenten-Sitzung wurde erstellt
prx.session.terminatedHostEine Agenten-Sitzung wurde beendet
prx.session.messageHostEine Nachricht wurde in einer Sitzung gesendet oder empfangen
prx.channel.connectedHostEin Kanal hat eine Verbindung hergestellt
prx.channel.disconnectedHostEin Kanal hat seine Verbindung verloren
prx.channel.errorHostEin Kanal hat einen Fehler festgestellt
prx.tool.before_executeHostEin Werkzeug steht kurz vor der Ausfuhrung (kann abgefangen werden)
prx.tool.after_executeHostEine Werkzeug-Ausfuhrung wurde abgeschlossen
prx.plugin.loadedHostEin Plugin wurde geladen
prx.plugin.unloadedHostEin Plugin wurde entladen
prx.evolution.proposedHostEin Selbstevolutions-Vorschlag wurde generiert
prx.evolution.appliedHostEine Selbstevolutions-Anderung wurde angewendet
prx.evolution.rolled_backHostEine Selbstevolutions-Anderung wurde zuruckgerollt
prx.memory.storedHostEin Gedachtnis-Eintrag wurde gespeichert
prx.memory.recalledHostErinnerungen wurden fur den Kontext abgerufen
prx.cron.tickHostEin Cron-Heartbeat ist aufgetreten

Benutzerdefinierte Themen

Plugins konnen unter ihrem eigenen Namensraum auf benutzerdefinierte Themen veroffentlichen:

prx.plugin.<plugin_name>.<event>

Ein Wetter-Plugin konnte beispielsweise veroffentlichen:

prx.plugin.weather.forecast_updated
prx.plugin.weather.alert_issued

Abonnement-Muster

Exakte Ubereinstimmung

Ein einzelnes spezifisches Thema abonnieren:

rust
event_bus.subscribe("prx.session.created", handler);

Wildcard-Ubereinstimmung

Alle Themen unter einem Unterbaum mit * (einzelne Ebene) oder ** (mehrere Ebenen) abonnieren:

rust
// Alle Sitzungs-Events
event_bus.subscribe("prx.session.*", handler);

// Alle Lebenszyklus-Events
event_bus.subscribe("prx.lifecycle.*", handler);

// Alle Events eines bestimmten Plugins
event_bus.subscribe("prx.plugin.weather.*", handler);

// Alle Events (sparsam verwenden)
event_bus.subscribe("prx.**", handler);
MusterTrifft zuTrifft nicht zu
prx.session.*prx.session.created, prx.session.terminatedprx.session.message.sent
prx.session.**prx.session.created, prx.session.message.sentprx.channel.connected
prx.*.connectedprx.channel.connectedprx.channel.error
prx.**Alles unter prx.Themen ausserhalb des prx.-Namensraums

Event-Struktur

Jedes Event enthalt:

FeldTypBeschreibung
topicStringDer vollstandige Themenpfad (z.B. prx.session.created)
payloadVec<u8>Serialisierte Event-Daten (JSON nach Konvention, max. 64 KB)
sourceStringDie Identitat des Veroffentlichers (z.B. host, plugin:weather)
timestampu64Unix-Zeitstempel in Millisekunden
correlation_idOption<String>Optionale ID zur Nachverfolgung zusammenhangender Events

Payload-Format

Payloads werden nach Konvention als JSON serialisiert. Jedes Thema definiert sein eigenes Payload-Schema. Beispiel:

prx.session.created:

json
{
  "session_id": "sess_abc123",
  "channel": "telegram",
  "user_id": "user:telegram:123456789"
}

prx.tool.after_execute:

json
{
  "session_id": "sess_abc123",
  "tool_name": "shell",
  "command": "ls -la /tmp",
  "duration_ms": 45,
  "success": true
}

Konfiguration

toml
[plugins.event_bus]
enabled = true
max_payload_bytes = 65536           # 64 KB
max_recursion_depth = 8             # Endlose Event-Schleifen verhindern
max_subscribers_per_topic = 64      # Abonnenten pro Thema begrenzen
channel_capacity = 1024             # Interne Event-Queue-Kapazitat
delivery_timeout_ms = 5000          # Timeout fur langsame Abonnenten

Konfigurationsreferenz

FeldTypStandardBeschreibung
enabledbooltrueEvent-Bus aktivieren oder deaktivieren
max_payload_bytesusize65536Maximale Event-Payload-Grosse (64 KB)
max_recursion_depthu88Maximale Tiefe von Event-ausgelosten-Event-Ketten
max_subscribers_per_topicusize64Maximale Abonnenten pro exaktem Thema
channel_capacityusize1024Begrenzte Kanalkapazitat fur die Event-Queue
delivery_timeout_msu645000Maximale Wartezeit, bis ein Abonnent ein Event verarbeitet

Event-Bus in Plugins verwenden

PDK (Plugin Development Kit)

Das PRX-PDK bietet Hilfsfunktionen fur die Event-Bus-Interaktion innerhalb von WASM-Plugins:

rust
use prx_pdk::event_bus;

// Events abonnieren
event_bus::subscribe("prx.session.created", |event| {
    let payload: SessionCreated = serde_json::from_slice(&event.payload)?;
    log::info!("New session: {}", payload.session_id);
    Ok(())
})?;

// Ein Event veroffentlichen
let payload = serde_json::to_vec(&MyEvent { data: "hello" })?;
event_bus::publish("prx.plugin.my_plugin.my_event", &payload)?;

Abonnements im Plugin-Manifest

Plugins deklarieren ihre Abonnements in der Manifest-Datei:

toml
# plugin.toml
[plugin]
name = "my-plugin"
version = "1.0.0"

[permissions]
event_bus_subscribe = [
    "prx.session.*",
    "prx.tool.after_execute",
]
event_bus_publish = [
    "prx.plugin.my_plugin.*",
]

Der Host erzwingt diese Berechtigungsdeklarationen. Ein Plugin kann keine Themen ausserhalb seiner deklarierten Berechtigungen abonnieren oder darin veroffentlichen.

Zustellungsgarantien

Der Event-Bus bietet At-most-once-Zustellung:

  • Events werden asynchron an alle passenden Abonnenten verteilt
  • Wenn ein Abonnent langsam oder nicht reagierend ist, wird das Event nach delivery_timeout_ms verworfen
  • Wenn die interne Event-Queue voll ist (channel_capacity erreicht), werden neue Events mit einer Warnung verworfen
  • Es gibt keinen Persistenz-, Wiederholungs- oder Bestatigungs-Mechanismus

Fur Anwendungsfalle, die garantierte Zustellung erfordern, sollten Sie das Webhook-System oder eine externe Nachrichtenwarteschlange verwenden.

Rekursionsschutz

Wenn ein Event-Handler ein weiteres Event veroffentlicht, entsteht eine Kette. Der Event-Bus verfolgt die Rekursionstiefe und erzwingt max_recursion_depth:

prx.session.created           ← Tiefe 0
  → Handler veroffentlicht prx.plugin.audit.session_log    ← Tiefe 1
    → Handler veroffentlicht prx.plugin.metrics.counter     ← Tiefe 2
      → ...

Wenn die Tiefe das Limit uberschreitet, wird das Event verworfen und eine Warnung protokolliert:

WARN event_bus: Recursion depth 8 exceeded for topic prx.plugin.metrics.counter, event dropped

Werkzeug-Ausfuhrung abfangen

Das prx.tool.before_execute-Event unterstutzt Abfangen. Abonnenten konnen einen Werkzeugaufruf modifizieren oder abbrechen, bevor er ausgefuhrt wird:

rust
event_bus::subscribe("prx.tool.before_execute", |event| {
    let mut payload: ToolBeforeExecute = serde_json::from_slice(&event.payload)?;

    // Gefahrliche Befehle blockieren
    if payload.tool_name == "shell" && payload.args.contains("rm -rf") {
        return Err(EventBusError::Rejected("Dangerous command blocked".into()));
    }

    Ok(())
})?;

Wenn ein Abonnent einen Fehler zuruckgibt, wird die Werkzeug-Ausfuhrung abgebrochen und der Fehler dem Agenten gemeldet.

Uberwachung

CLI

bash
# Letzte Event-Bus-Aktivitat anzeigen
prx events --tail 50

# Nach Themenmuster filtern
prx events --topic "prx.session.*"

# Event-Payloads anzeigen
prx events --verbose

# Abonnenten-Zahlen anzeigen
prx events stats

Metriken

Der Event-Bus stellt Prometheus-Metriken bereit:

MetrikTypBeschreibung
prx_event_bus_published_totalCounterGesamtzahl veroffentlichter Events nach Thema
prx_event_bus_delivered_totalCounterGesamtzahl an Abonnenten zugestellter Events
prx_event_bus_dropped_totalCounterVerworfene Events (Queue voll, Timeout, Rekursion)
prx_event_bus_delivery_duration_secondsHistogramZeit zur Zustellung von Events an Abonnenten
prx_event_bus_subscribersGaugeAktuelle Abonnenten-Anzahl nach Thema

Einschrankungen

  • At-most-once-Zustellung bedeutet, dass Events verloren gehen konnen, wenn die Queue voll ist oder Abonnenten langsam sind
  • Der Event-Bus ist lokal fur den PRX-Prozess; Events werden nicht uber Nodes verteilt
  • Die Payload-Grosse ist auf 64 KB begrenzt; grosse Daten sollten uber ID referenziert statt eingebettet werden
  • Wildcard-Abonnements (insbesondere prx.**) konnen erhebliche Last erzeugen; sparsam verwenden
  • Plugin-Event-Handler laufen in der WASM-Sandbox und konnen nicht direkt auf das Dateisystem oder Netzwerk zugreifen
  • Die Event-Reihenfolge ist best-effort; Abonnenten konnen Events unter hoher Last in falscher Reihenfolge empfangen

Verwandte Seiten

Released under the Apache-2.0 License.