bus.go 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. package borealis
  2. import (
  3. "fmt"
  4. "github.com/nats-io/nats.go"
  5. )
  6. type Bus struct {
  7. NATS *nats.EncodedConn
  8. }
  9. type SubscriberHandler func(event *Event[any])
  10. func Connect[T any](url string) (*Bus, error) {
  11. nats.RegisterEncoder("v1", &V1MessageCodec[T]{})
  12. conn, err := nats.Connect(url)
  13. if err != nil {
  14. return nil, fmt.Errorf("failed to connect to the broker: %w", err)
  15. }
  16. encodedConn, err := nats.NewEncodedConn(conn, "v1")
  17. if err != nil {
  18. return nil, fmt.Errorf("failed to configure the bus encoding: %w", err)
  19. }
  20. return &Bus{NATS: encodedConn}, nil
  21. }
  22. func (bus *Bus) Close() {
  23. bus.NATS.Close()
  24. }
  25. func (bus *Bus) Drain() error {
  26. return bus.NATS.Drain()
  27. }
  28. func (bus *Bus) SendEmail(request *SendEmail) error {
  29. return bus.Publish("email.outbound", request)
  30. }
  31. func (bus *Bus) RegisterID(request *RegisterID) error {
  32. return bus.Publish("id.register", request)
  33. }
  34. func (bus *Bus) VerifyEmail(request *VerifyEmail) error {
  35. return bus.Publish("id.verify", request)
  36. }
  37. func (bus *Bus) GrantAccess(request *GrantAccess) error {
  38. return bus.Publish("access.grant", request)
  39. }
  40. func (bus *Bus) RevokeAccess(request *RevokeAccess) error {
  41. return bus.Publish("access.revoke", request)
  42. }
  43. func (bus *Bus) Publish(stream string, request interface{}) error {
  44. rawEvent, err := NewRawEvent(0, request) // TODO
  45. if err != nil {
  46. return err
  47. }
  48. return bus.PublishRawEvent(stream, rawEvent)
  49. }
  50. func (bus *Bus) PublishRawEvent(stream string, rawEvent *RawEvent[any]) error {
  51. if err := bus.NATS.Publish(stream, rawEvent); err != nil {
  52. return fmt.Errorf("failed to publish message: %w", err)
  53. }
  54. return nil
  55. }
  56. func (bus *Bus) Subscribe(stream string, handler SubscriberHandler) (*nats.Subscription, error) {
  57. return bus.NATS.Subscribe(stream, func(rawEvent RawEvent[any]) {
  58. event, err := rawEvent.Check()
  59. if err == nil { // ignore invalid messages
  60. handler(event)
  61. }
  62. })
  63. }