package borealis import ( "fmt" "github.com/nats-io/nats.go" ) type Bus struct { NATS *nats.EncodedConn } type SubscriberHandler func(event *Event) func Connect(url string) (*Bus, error) { nats.RegisterEncoder("v1", &V1MessageEncoder{}) conn, err := nats.Connect(url) if err != nil { return nil, fmt.Errorf("failed to connect to the broker: %w", err) } encodedConn, err := nats.NewEncodedConn(conn, "v1") if err != nil { return nil, fmt.Errorf("failed to configure the bus encoding: %w", err) } return &Bus{NATS: encodedConn}, nil } func (bus *Bus) Close() { bus.NATS.Close() } func (bus *Bus) Drain() error { return bus.NATS.Drain() } func (bus *Bus) SendEmail(request *SendEmail) error { return bus.Publish("email.outbound", request) } func (bus *Bus) RegisterID(request *RegisterID) error { return bus.Publish("id.register", request) } func (bus *Bus) VerifyEmail(request *VerifyEmail) error { return bus.Publish("id.verify", request) } func (bus *Bus) GrantAccess(request *GrantAccess) error { return bus.Publish("access.grant", request) } func (bus *Bus) RevokeAccess(request *RevokeAccess) error { return bus.Publish("access.revoke", request) } func (bus *Bus) Publish(stream string, request interface{}) error { rawEvent, err := NewRawEvent(0, request) // TODO if err != nil { return err } return bus.PublishRawEvent(stream, rawEvent) } func (bus *Bus) PublishRawEvent(stream string, rawEvent *RawEvent) error { if err := bus.NATS.Publish(stream, rawEvent); err != nil { return fmt.Errorf("failed to publish message: %w", err) } return nil } func (bus *Bus) Subscribe(stream string, handler SubscriberHandler) (*nats.Subscription, error) { return bus.NATS.Subscribe(stream, func(rawEvent RawEvent) { event, err := rawEvent.Check() if err == nil { // ignore invalid messages handler(event) } }) }