BusDK Update

Integraatiotyöntekijät saavat yhteisen runtime-kirjaston

bus-integration kokoaa Bus Events -pohjaisten integraatiotyöntekijöiden yhteisen runtime-koodin yhteen Go-kirjastoon. Se tarjoaa worker-loopin, response-eventien publisher-adapterin ja request/reply-korrelaation.

Kirjasto on tarkoituksella kapea. Pilvi-API:t, SSH-yhteydet, Podman-skriptit, konttien ajaminen ja HTTP-controllerit pysyvät omissa moduuleissaan.

26.4.2026bus-integrationBus EventsIntegration runtime

Tiiviisti

TL;DR

  • bus-integration on library-only-moduuli bus-integration-*-työntekijöille.
  • RunWorker sitoo Events API -kuuntelun handleriin ja antaa handlerille yhden Publisher-rajapinnan vastausten julkaisuun.
  • Requester hoitaa request/reply-korrelaation, ja Host voi ajaa useita worker-rekisteröintejä samassa prosessissa.
  • Yhteiset retry/readiness-asetukset tekevät vaaditusta listeneristä valmiin heti, kun Events API -stream avautuu, eivät vasta ensimmäisen eventin jälkeen.

Integraatiotyöntekijä voi käynnistää tapahtumakuuntelun ilman omaa loop-koodia. Worker kertoo kuunneltavat event-nimet, work-ryhmän ja consumerin; handler saa sisään tulevan eventin ja publisherin:

err := integration.RunWorker(ctx, integration.WorkerOptions{
    Client:   eventsClient,
    Names:    []string{"bus.vm.status.request"},
    Group:    "bus-integration-upcloud",
    Consumer: "worker-1",
    Once:     true,
}, func(ctx context.Context, publisher integration.Publisher, event events.Event) error {
    return publisher.Publish(ctx, events.Event{
        Version:       events.EnvelopeVersionV1,
        Name:          "bus.vm.status.response",
        CorrelationID: event.CorrelationID,
        Payload:       []byte(`{"status":{"state":"ready"}}`),
    })
})

Request/reply-puolella caller julkaisee pyynnön ja odottaa vastausta samalla correlation ID:llä. Jos vastaus tulee väärällä nimellä, payload sisältää event-errorin tai timeout täyttyy, caller saa virheen sen sijaan, että integraatiomoduulin tarvitsee rakentaa oma pending-map jokaiseen worker-prosessiin:

requester := integration.NewRequester(eventsClient, "bus-integration")
payload, err := requester.Request(
    ctx,
    "bus.vm.status.request",
    "bus.vm.status.response",
    map[string]string{"account_id": accountID},
    30*time.Second,
)

Vaadittujen response-listenerien valmius käyttää samaa kirjastoa. BUS_EVENTS_LISTENER_RETRY ohjaa reconnect-politiikkaa, ja BUS_EVENTS_LISTENER_REQUIRED=1 antaa providerille readiness-tilan, jonka se voi julkaista omassa /readyz-vastauksessaan. Tila muuttuu valmiiksi, kun Events API hyväksyy streamin; tyhjä mutta auki oleva stream riittää, eikä palvelun tarvitse odottaa ensimmäistä response-eventtiä.

Kun worker-moduuli haluaa olla sekä itsenäinen binääri että osa jaettua host-prosessia, se voi palauttaa WorkerRegistration-arvon. Host ajaa yhden tai useamman rekisteröinnin samalla Events API -clientilla ja palauttaa ensimmäisen worker-virheen.

Tämän runtime-kerroksen arvo näkyy heti UpCloud-polussa. bus-api-provider-vm ja bus-api-provider-containers voivat pysyä HTTP- ja JWT-provider-moduuleina, bus-integration-upcloud voi omistaa UpCloud-työn, ja bus-integration-ssh-runner voi omistaa SSH-transportin. Yhteinen event-worker-mekaniikka ei valu niihin kaikkiin kopioituna.

Moduulin nykyinen pinta löytyy bus-integration-dokumentaatiosta. Events API -pohja on kuvattu bus-eventsin dokumentaatiossa, ja UpCloud-workerin provider-raja löytyy bus-integration-upcloudin dokumentaatiosta.