Browse Source

Hook up the message encoder/decoder.

Arto Bendiken 3 years ago
parent
commit
a44378680d
4 changed files with 63 additions and 25 deletions
  1. 20 24
      bus.go
  2. 19 0
      message.go
  3. 17 1
      raw_event.go
  4. 7 0
      raw_event_test.go

+ 20 - 24
bus.go

@@ -11,11 +11,12 @@ type Bus struct {
 }
 }
 
 
 func Connect(url string) (*Bus, error) {
 func Connect(url string) (*Bus, error) {
+	nats.RegisterEncoder("v1", &MessageEncoderV1{})
 	conn, err := nats.Connect(url)
 	conn, err := nats.Connect(url)
 	if err != nil {
 	if err != nil {
 		return nil, fmt.Errorf("failed to connect to the broker: %w", err)
 		return nil, fmt.Errorf("failed to connect to the broker: %w", err)
 	}
 	}
-	encodedConn, err := nats.NewEncodedConn(conn, nats.JSON_ENCODER) // TODO
+	encodedConn, err := nats.NewEncodedConn(conn, "v1")
 	if err != nil {
 	if err != nil {
 		return nil, fmt.Errorf("failed to configure the bus encoding: %w", err)
 		return nil, fmt.Errorf("failed to configure the bus encoding: %w", err)
 	}
 	}
@@ -23,41 +24,36 @@ func Connect(url string) (*Bus, error) {
 }
 }
 
 
 func (bus *Bus) SendEmail(request *SendEmail) error {
 func (bus *Bus) SendEmail(request *SendEmail) error {
-	envelope := request // TODO
-	if err := bus.NATS.Publish("email.outbound", envelope); err != nil {
-		return fmt.Errorf("failed to enqueue outbound email: %w", err)
-	}
-	return nil
+	return bus.Publish("email.outbound", request)
 }
 }
 
 
 func (bus *Bus) RegisterID(request *RegisterID) error {
 func (bus *Bus) RegisterID(request *RegisterID) error {
-	envelope := request // TODO
-	if err := bus.NATS.Publish("id.register", envelope); err != nil {
-		return fmt.Errorf("failed to enqueue ID registration: %w", err)
-	}
-	return nil
+	return bus.Publish("id.register", request)
 }
 }
 
 
 func (bus *Bus) VerifyEmail(request *VerifyEmail) error {
 func (bus *Bus) VerifyEmail(request *VerifyEmail) error {
-	envelope := request // TODO
-	if err := bus.NATS.Publish("id.verify", envelope); err != nil {
-		return fmt.Errorf("failed to enqueue ID verification: %w", err)
-	}
-	return nil
+	return bus.Publish("id.verify", request)
 }
 }
 
 
 func (bus *Bus) GrantAccess(request *GrantAccess) error {
 func (bus *Bus) GrantAccess(request *GrantAccess) error {
-	envelope := request // TODO
-	if err := bus.NATS.Publish("access.grant", envelope); err != nil {
-		return fmt.Errorf("failed to enqueue access grant: %w", err)
-	}
-	return nil
+	return bus.Publish("access.grant", request)
 }
 }
 
 
 func (bus *Bus) RevokeAccess(request *RevokeAccess) error {
 func (bus *Bus) RevokeAccess(request *RevokeAccess) error {
-	envelope := request // TODO
-	if err := bus.NATS.Publish("access.revoke", envelope); err != nil {
-		return fmt.Errorf("failed to enqueue access revocation: %w", err)
+	return bus.Publish("access.revoke", request)
+}
+
+func (bus *Bus) Publish(stream string, request interface{}) error {
+	rawEvent, err := NewRawEvent(0, request) // TODO
+	if err != nil {
+		return err
+	}
+	return bus.PublishRawEvent(stream, rawEvent)
+}
+
+func (bus *Bus) PublishRawEvent(stream string, rawEvent *RawEvent) error {
+	if err := bus.NATS.Publish(stream, rawEvent); err != nil {
+		return fmt.Errorf("failed to publish message: %w", err)
 	}
 	}
 	return nil
 	return nil
 }
 }

+ 19 - 0
message.go

@@ -0,0 +1,19 @@
+package borealis
+
+import "github.com/fxamacker/cbor/v2"
+
+type MessageEncoderV1 struct{}
+
+func (encoder *MessageEncoderV1) Encode(stream string, message interface{}) ([]byte, error) {
+	return cbor.Marshal(message)
+}
+
+func (encoder *MessageEncoderV1) Decode(stream string, input []byte, messagePtr interface{}) error {
+	switch message := messagePtr.(type) {
+	case *[]byte:
+		*message = input
+		return nil
+	default:
+		return cbor.Unmarshal(input, message)
+	}
+}

+ 17 - 1
raw_event.go

@@ -22,7 +22,23 @@ type RawEvent struct {
 	TimestampS   uint32
 	TimestampS   uint32
 	TimestampMS  uint16
 	TimestampMS  uint16
 	UniqueID     []byte
 	UniqueID     []byte
-	Payload      map[string]interface{}
+	Payload      interface{}
+}
+
+func NewRawEvent(type_ uint16, payload interface{}) (*RawEvent, error) {
+	now := time.Now()
+	ksuid, err := ksuid.NewRandomWithTime(now)
+	if err != nil {
+		return nil, err
+	}
+	return &RawEvent{
+		Type:         type_,
+		SequentialID: 0,
+		TimestampS:   uint32(now.Unix() - btcEpoch),
+		TimestampMS:  uint16(now.UnixMilli() % 1000),
+		UniqueID:     ksuid[4:],
+		Payload:      payload,
+	}, nil
 }
 }
 
 
 func (rawEvent RawEvent) Check() (*Event, error) {
 func (rawEvent RawEvent) Check() (*Event, error) {

+ 7 - 0
raw_event_test.go

@@ -30,6 +30,13 @@ var expectedEventRaw = RawEvent{
 var expectedEventJSON = "[1,2,3,4,\"AAECAwQFBgcICQoLDA0ODw==\",{}]"
 var expectedEventJSON = "[1,2,3,4,\"AAECAwQFBgcICQoLDA0ODw==\",{}]"
 var expectedEventCBOR = []byte{0x86, 1, 2, 3, 4, 0x50, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 0xa0}
 var expectedEventCBOR = []byte{0x86, 1, 2, 3, 4, 0x50, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 0xa0}
 
 
+func TestNewRawEvent(t *testing.T) {
+	_, err := NewRawEvent(0x1234, make(map[string]interface{}))
+	if err != nil {
+		t.Error(err)
+	}
+}
+
 func TestCheckEvent(t *testing.T) {
 func TestCheckEvent(t *testing.T) {
 	actualEvent, err := expectedEventRaw.Check()
 	actualEvent, err := expectedEventRaw.Check()
 	if err != nil {
 	if err != nil {