bus.go 1.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859
  1. package borealis
  2. import (
  3. "fmt"
  4. "github.com/nats-io/nats.go"
  5. )
  6. type Bus struct {
  7. NATS *nats.EncodedConn
  8. }
  9. func Connect(url string) (*Bus, error) {
  10. nats.RegisterEncoder("v1", &V1MessageEncoder{})
  11. conn, err := nats.Connect(url)
  12. if err != nil {
  13. return nil, fmt.Errorf("failed to connect to the broker: %w", err)
  14. }
  15. encodedConn, err := nats.NewEncodedConn(conn, "v1")
  16. if err != nil {
  17. return nil, fmt.Errorf("failed to configure the bus encoding: %w", err)
  18. }
  19. return &Bus{NATS: encodedConn}, nil
  20. }
  21. func (bus *Bus) SendEmail(request *SendEmail) error {
  22. return bus.Publish("email.outbound", request)
  23. }
  24. func (bus *Bus) RegisterID(request *RegisterID) error {
  25. return bus.Publish("id.register", request)
  26. }
  27. func (bus *Bus) VerifyEmail(request *VerifyEmail) error {
  28. return bus.Publish("id.verify", request)
  29. }
  30. func (bus *Bus) GrantAccess(request *GrantAccess) error {
  31. return bus.Publish("access.grant", request)
  32. }
  33. func (bus *Bus) RevokeAccess(request *RevokeAccess) error {
  34. return bus.Publish("access.revoke", request)
  35. }
  36. func (bus *Bus) Publish(stream string, request interface{}) error {
  37. rawEvent, err := NewRawEvent(0, request) // TODO
  38. if err != nil {
  39. return err
  40. }
  41. return bus.PublishRawEvent(stream, rawEvent)
  42. }
  43. func (bus *Bus) PublishRawEvent(stream string, rawEvent *RawEvent) error {
  44. if err := bus.NATS.Publish(stream, rawEvent); err != nil {
  45. return fmt.Errorf("failed to publish message: %w", err)
  46. }
  47. return nil
  48. }