123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778 |
- package borealis
- import (
- "fmt"
- "github.com/nats-io/nats.go"
- )
- type Bus struct {
- NATS *nats.EncodedConn
- }
- type SubscriberHandler func(event *Event)
- func Connect(url string) (*Bus, error) {
- nats.RegisterEncoder("v1", &V1MessageEncoder{})
- conn, err := nats.Connect(url)
- if err != nil {
- return nil, fmt.Errorf("failed to connect to the broker: %w", err)
- }
- encodedConn, err := nats.NewEncodedConn(conn, "v1")
- if err != nil {
- return nil, fmt.Errorf("failed to configure the bus encoding: %w", err)
- }
- return &Bus{NATS: encodedConn}, nil
- }
- func (bus *Bus) Close() {
- bus.NATS.Close()
- }
- func (bus *Bus) Drain() error {
- return bus.NATS.Drain()
- }
- func (bus *Bus) SendEmail(request *SendEmail) error {
- return bus.Publish("email.outbound", request)
- }
- func (bus *Bus) RegisterID(request *RegisterID) error {
- return bus.Publish("id.register", request)
- }
- func (bus *Bus) VerifyEmail(request *VerifyEmail) error {
- return bus.Publish("id.verify", request)
- }
- func (bus *Bus) GrantAccess(request *GrantAccess) error {
- return bus.Publish("access.grant", request)
- }
- func (bus *Bus) RevokeAccess(request *RevokeAccess) error {
- return bus.Publish("access.revoke", request)
- }
- func (bus *Bus) Publish(stream string, request interface{}) error {
- rawEvent, err := NewRawEvent(0, request) // TODO
- if err != nil {
- return err
- }
- return bus.PublishRawEvent(stream, rawEvent)
- }
- func (bus *Bus) PublishRawEvent(stream string, rawEvent *RawEvent) error {
- if err := bus.NATS.Publish(stream, rawEvent); err != nil {
- return fmt.Errorf("failed to publish message: %w", err)
- }
- return nil
- }
- func (bus *Bus) Subscribe(stream string, handler SubscriberHandler) (*nats.Subscription, error) {
- return bus.NATS.Subscribe(stream, func(rawEvent RawEvent) {
- event, err := rawEvent.Check()
- if err == nil { // ignore invalid messages
- handler(event)
- }
- })
- }
|