Browse Source

Prefix a version byte to messages.

Arto Bendiken 2 years ago
parent
commit
780e3dbcc9
2 changed files with 27 additions and 8 deletions
  1. 1 1
      bus.go
  2. 26 7
      message.go

+ 1 - 1
bus.go

@@ -11,7 +11,7 @@ type Bus struct {
 }
 
 func Connect(url string) (*Bus, error) {
-	nats.RegisterEncoder("v1", &MessageEncoderV1{})
+	nats.RegisterEncoder("v1", &V1MessageEncoder{})
 	conn, err := nats.Connect(url)
 	if err != nil {
 		return nil, fmt.Errorf("failed to connect to the broker: %w", err)

+ 26 - 7
message.go

@@ -1,19 +1,38 @@
 package borealis
 
-import "github.com/fxamacker/cbor/v2"
+import (
+	"fmt"
 
-type MessageEncoderV1 struct{}
+	"github.com/fxamacker/cbor/v2"
+)
 
-func (encoder *MessageEncoderV1) Encode(stream string, message interface{}) ([]byte, error) {
-	return cbor.Marshal(message)
+const (
+	V1MessageVersion = 1
+	V1MessageMinLen  = 23
+)
+
+type V1MessageEncoder struct{}
+
+func (encoder *V1MessageEncoder) Encode(stream string, message interface{}) ([]byte, error) {
+	output, err := cbor.Marshal(message)
+	if err != nil {
+		return nil, err
+	}
+	return append([]byte{V1MessageVersion}, output...), nil
 }
 
-func (encoder *MessageEncoderV1) Decode(stream string, input []byte, messagePtr interface{}) error {
+func (encoder *V1MessageEncoder) Decode(stream string, input []byte, messagePtr interface{}) error {
+	if len(input) < V1MessageMinLen {
+		return fmt.Errorf("expected message length %d+, but got %d", V1MessageMinLen, len(input))
+	}
+	if input[0] != V1MessageVersion {
+		return fmt.Errorf("expected message version %d, but got %d", V1MessageVersion, input[0])
+	}
 	switch message := messagePtr.(type) {
 	case *[]byte:
-		*message = input
+		*message = input[1:]
 		return nil
 	default:
-		return cbor.Unmarshal(input, message)
+		return cbor.Unmarshal(input[1:], message)
 	}
 }