Bladeren bron

Distinguish between `RawEvent` and `Event`.

Arto Bendiken 3 jaren geleden
bovenliggende
commit
12bbaf9d59
6 gewijzigde bestanden met toevoegingen van 237 en 160 verwijderingen
  1. 7 101
      event.go
  2. 0 59
      event_test.go
  3. 2 0
      go.mod
  4. 4 0
      go.sum
  5. 143 0
      raw_event.go
  6. 81 0
      raw_event_test.go

+ 7 - 101
event.go

@@ -1,114 +1,20 @@
 package borealis
 
 import (
-	"bytes"
-	"encoding/base64"
-	"encoding/json"
-	"fmt"
 	"reflect"
+	"time"
 
-	"github.com/fxamacker/cbor/v2"
+	"github.com/segmentio/ksuid"
 )
 
-type Event struct { // TODO: EventEnvelope or RawEvent
+type Event struct {
 	Type         uint16
-	TimestampS   uint32
-	TimestampMS  uint16
 	SequentialID uint64
-	UniqueID     []byte
-	Payload      map[string]interface{}
+	Timestamp    time.Time
+	UniqueID     ksuid.KSUID
+	Payload      interface{}
 }
 
 func (event Event) Equal(other Event) bool {
-	if event.Type != other.Type ||
-		event.TimestampS != other.TimestampS ||
-		event.TimestampMS != other.TimestampMS ||
-		event.SequentialID != other.SequentialID ||
-		!bytes.Equal(event.UniqueID, other.UniqueID) {
-		return false
-	}
-	return reflect.DeepEqual(event.Payload, other.Payload)
-}
-
-func (event Event) MarshalJSON() ([]byte, error) {
-	return json.Marshal([]interface{}{
-		event.Type,
-		event.TimestampS,
-		event.TimestampMS,
-		event.SequentialID,
-		event.UniqueID, // automatically Base64-encoded
-		event.Payload,
-	})
-}
-
-func (event Event) MarshalCBOR() ([]byte, error) {
-	return cbor.Marshal([]interface{}{
-		event.Type,
-		event.TimestampS,
-		event.TimestampMS,
-		event.SequentialID,
-		event.UniqueID,
-		event.Payload,
-	})
-}
-
-func (event *Event) UnmarshalJSON(input []byte) error {
-	var err error
-	array := []interface{}{}
-	if err = json.Unmarshal(input, &array); err != nil {
-		return err
-	}
-	if len(array) != 6 {
-		return fmt.Errorf("event must be an array of length %d, but got %d", 6, len(array))
-	}
-
-	event.Type = uint16(array[0].(float64))
-	event.TimestampS = uint32(array[1].(float64))
-	event.TimestampMS = uint16(array[2].(float64))
-	event.SequentialID = uint64(array[3].(float64))
-	event.UniqueID, err = base64.StdEncoding.DecodeString(array[4].(string))
-	if err != nil {
-		return err
-	}
-
-	payload, ok := array[5].(map[string]interface{})
-	if !ok {
-		return fmt.Errorf("event payload must be a map")
-	}
-	event.Payload = payload
-
-	return nil
-}
-
-func (event *Event) UnmarshalCBOR(input []byte) error {
-	var err error
-	array := []interface{}{}
-	if err = cbor.Unmarshal(input, &array); err != nil {
-		return err
-	}
-	if len(array) != 6 {
-		return fmt.Errorf("event must be an array of length %d, but got %d", 6, len(array))
-	}
-
-	event.Type = uint16(array[0].(uint64))
-	event.TimestampS = uint32(array[1].(uint64))
-	event.TimestampMS = uint16(array[2].(uint64))
-	event.SequentialID = uint64(array[3].(uint64))
-	event.UniqueID = array[4].([]byte)
-
-	payloadIn, ok := array[5].(map[interface{}]interface{})
-	if !ok {
-		return fmt.Errorf("event payload must be a map")
-	}
-	payloadOut := make(map[string]interface{})
-	for k, v := range payloadIn {
-		var key string
-		if key, ok = k.(string); !ok {
-			return fmt.Errorf("event payload must be a map with string keys, but got the key %v", k)
-		}
-		payloadOut[key] = v
-	}
-	event.Payload = payloadOut
-
-	return nil
+	return reflect.DeepEqual(event, other)
 }

+ 0 - 59
event_test.go

@@ -1,60 +1 @@
 package borealis
-
-import (
-	"bytes"
-	"encoding/json"
-	"testing"
-
-	"github.com/fxamacker/cbor/v2"
-)
-
-var event = Event{
-	Type:         1,
-	TimestampS:   2,
-	TimestampMS:  3,
-	SequentialID: 4,
-	UniqueID:     []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0xA, 0xB, 0xC, 0xD, 0xE, 0xF},
-	Payload:      map[string]interface{}{},
-}
-var eventJSON = "[1,2,3,4,\"AAECAwQFBgcICQoLDA0ODw==\",{}]"
-var eventCBOR = []byte{0x86, 1, 2, 3, 4, 0x50, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 0xa0}
-
-func TestEncodeEventJSON(t *testing.T) {
-	output, err := json.Marshal(event)
-	if err != nil {
-		t.Error(err)
-	}
-	actual := string(output)
-	if actual != eventJSON {
-		t.Errorf("expected %v, but got %v", eventJSON, actual)
-	}
-}
-
-func TestEncodeEventCBOR(t *testing.T) {
-	actual, err := cbor.Marshal(event)
-	if err != nil {
-		t.Error(err)
-	}
-	if !bytes.Equal(actual, eventCBOR) {
-		t.Errorf("expected %v, but got %v", eventCBOR, actual)
-	}
-}
-
-func TestDecodeEventJSON(t *testing.T) {
-	var actual Event
-	if err := json.Unmarshal([]byte(eventJSON), &actual); err != nil {
-		t.Error(err)
-	}
-	if !event.Equal(actual) {
-		t.Errorf("expected %v, but got %v", event, actual)
-	}
-}
-func TestDecodeEventCBOR(t *testing.T) {
-	var actual Event
-	if err := cbor.Unmarshal([]byte(eventCBOR), &actual); err != nil {
-		t.Error(err)
-	}
-	if !event.Equal(actual) {
-		t.Errorf("expected %v, but got %v", event, actual)
-	}
-}

+ 2 - 0
go.mod

@@ -7,8 +7,10 @@ replace github.com/aurora-is-near/aurora-events/go v0.0.0 => ./events/go
 require (
 	github.com/aurora-is-near/aurora-events/go v0.0.0
 	github.com/fxamacker/cbor/v2 v2.3.0
+	github.com/johncgriffin/overflow v0.0.0-20211019200055-46fa312c352c
 	github.com/mr-tron/base58 v1.2.0
 	github.com/nats-io/nats.go v1.13.1-0.20211122170419-d7c1d78a50fc
+	github.com/segmentio/ksuid v1.0.4
 )
 
 require (

+ 4 - 0
go.sum

@@ -14,6 +14,8 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
 github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
 github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/johncgriffin/overflow v0.0.0-20211019200055-46fa312c352c h1:2n/HCxBM7oa5PNCPKIhV26EtJkaPXFfcVojPAT3ujTU=
+github.com/johncgriffin/overflow v0.0.0-20211019200055-46fa312c352c/go.mod h1:B9OPZOhZ3FIi6bu54lAgCMzXLh11Z7ilr3rOr/ClP+E=
 github.com/klauspost/compress v1.13.4 h1:0zhec2I8zGnjWcKyLl6i3gPqKANCCn5e9xmviEEeX6s=
 github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
 github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz/0=
@@ -30,6 +32,8 @@ github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
 github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
 github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
 github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
+github.com/segmentio/ksuid v1.0.4 h1:sBo2BdShXjmcugAMwjugoGUdUV0pcxY5mW4xKRn3v4c=
+github.com/segmentio/ksuid v1.0.4/go.mod h1:/XUiZBD3kVx5SmUOl55voK5yeAbBNNIed+2O73XgrPE=
 github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
 github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg=
 golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=

+ 143 - 0
raw_event.go

@@ -0,0 +1,143 @@
+package borealis
+
+import (
+	"bytes"
+	"encoding/base64"
+	"encoding/json"
+	"fmt"
+	"reflect"
+	"time"
+
+	"github.com/fxamacker/cbor/v2"
+	"github.com/johncgriffin/overflow"
+	"github.com/segmentio/ksuid"
+)
+
+const btcEpoch = 1231006505 // Bitcoin genesis, 2009-01-03T18:15:05Z
+
+type RawEvent struct {
+	_            struct{} `cbor:",toarray" json:"-"`
+	Type         uint16
+	SequentialID uint64
+	TimestampS   uint32
+	TimestampMS  uint16
+	UniqueID     []byte
+	Payload      map[string]interface{}
+}
+
+func (rawEvent RawEvent) Check() (*Event, error) {
+	timestampS, ok := overflow.Add64(btcEpoch, int64(rawEvent.TimestampS))
+	if !ok || rawEvent.TimestampMS > 999 {
+		return nil, fmt.Errorf("timestamp overflow")
+	}
+	timestampNS := int64(rawEvent.TimestampMS) * 1000000
+	timestamp := time.Unix(timestampS, timestampNS)
+
+	ksuid, err := ksuid.FromParts(timestamp, rawEvent.UniqueID)
+	if err != nil {
+		return nil, err
+	}
+
+	event := Event{
+		Type:         rawEvent.Type,
+		SequentialID: rawEvent.SequentialID,
+		Timestamp:    timestamp,
+		UniqueID:     ksuid,
+		Payload:      rawEvent.Payload,
+	}
+	return &event, nil
+}
+
+func (event RawEvent) Equal(other RawEvent) bool {
+	if event.Type != other.Type ||
+		event.SequentialID != other.SequentialID ||
+		event.TimestampS != other.TimestampS ||
+		event.TimestampMS != other.TimestampMS ||
+		!bytes.Equal(event.UniqueID, other.UniqueID) {
+		return false
+	}
+	return reflect.DeepEqual(event.Payload, other.Payload)
+}
+
+func (event RawEvent) MarshalJSON() ([]byte, error) {
+	return json.Marshal([]interface{}{
+		event.Type,
+		event.SequentialID,
+		event.TimestampS,
+		event.TimestampMS,
+		event.UniqueID, // automatically Base64-encoded
+		event.Payload,
+	})
+}
+
+func (event RawEvent) MarshalCBOR() ([]byte, error) {
+	return cbor.Marshal([]interface{}{
+		event.Type,
+		event.SequentialID,
+		event.TimestampS,
+		event.TimestampMS,
+		event.UniqueID,
+		event.Payload,
+	})
+}
+
+func (event *RawEvent) UnmarshalJSON(input []byte) error {
+	var err error
+	array := []interface{}{}
+	if err = json.Unmarshal(input, &array); err != nil {
+		return err
+	}
+	if len(array) != 6 {
+		return fmt.Errorf("event must be an array of length %d, but got %d", 6, len(array))
+	}
+
+	event.Type = uint16(array[0].(float64))
+	event.SequentialID = uint64(array[1].(float64))
+	event.TimestampS = uint32(array[2].(float64))
+	event.TimestampMS = uint16(array[3].(float64))
+	event.UniqueID, err = base64.StdEncoding.DecodeString(array[4].(string))
+	if err != nil {
+		return err
+	}
+
+	payload, ok := array[5].(map[string]interface{})
+	if !ok {
+		return fmt.Errorf("event payload must be a map")
+	}
+	event.Payload = payload
+
+	return nil
+}
+
+func (event *RawEvent) UnmarshalCBOR(input []byte) error {
+	var err error
+	array := []interface{}{}
+	if err = cbor.Unmarshal(input, &array); err != nil {
+		return err
+	}
+	if len(array) != 6 {
+		return fmt.Errorf("event must be an array of length %d, but got %d", 6, len(array))
+	}
+
+	event.Type = uint16(array[0].(uint64))
+	event.SequentialID = uint64(array[1].(uint64))
+	event.TimestampS = uint32(array[2].(uint64))
+	event.TimestampMS = uint16(array[3].(uint64))
+	event.UniqueID = array[4].([]byte)
+
+	payloadIn, ok := array[5].(map[interface{}]interface{})
+	if !ok {
+		return fmt.Errorf("event payload must be a map")
+	}
+	payloadOut := make(map[string]interface{})
+	for k, v := range payloadIn {
+		var key string
+		if key, ok = k.(string); !ok {
+			return fmt.Errorf("event payload must be a map with string keys, but got the key %v", k)
+		}
+		payloadOut[key] = v
+	}
+	event.Payload = payloadOut
+
+	return nil
+}

+ 81 - 0
raw_event_test.go

@@ -0,0 +1,81 @@
+package borealis
+
+import (
+	"bytes"
+	"encoding/json"
+	"testing"
+	"time"
+
+	"github.com/fxamacker/cbor/v2"
+	"github.com/segmentio/ksuid"
+)
+
+var expectedEvent = Event{
+	Type:         1,
+	SequentialID: 2,
+	Timestamp:    time.UnixMicro(1231006508004000), // 2009-01-03T18:15:08.004Z
+	UniqueID:     ksuid.KSUID{245, 237, 93, 44, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15},
+	Payload:      map[string]interface{}{},
+}
+
+var expectedEventRaw = RawEvent{
+	Type:         1,
+	SequentialID: 2,
+	TimestampS:   3,
+	TimestampMS:  4,
+	UniqueID:     []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0xA, 0xB, 0xC, 0xD, 0xE, 0xF},
+	Payload:      map[string]interface{}{},
+}
+
+var expectedEventJSON = "[1,2,3,4,\"AAECAwQFBgcICQoLDA0ODw==\",{}]"
+var expectedEventCBOR = []byte{0x86, 1, 2, 3, 4, 0x50, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 0xa0}
+
+func TestCheckEvent(t *testing.T) {
+	actualEvent, err := expectedEventRaw.Check()
+	if err != nil {
+		t.Error(err)
+	}
+	if !expectedEvent.Equal(*actualEvent) {
+		t.Errorf("expected %v, but got %v", expectedEvent, actualEvent)
+	}
+}
+
+func TestEncodeEventJSON(t *testing.T) {
+	output, err := json.Marshal(expectedEventRaw)
+	if err != nil {
+		t.Error(err)
+	}
+	actual := string(output)
+	if actual != expectedEventJSON {
+		t.Errorf("expected %v, but got %v", expectedEventJSON, actual)
+	}
+}
+
+func TestEncodeEventCBOR(t *testing.T) {
+	actual, err := cbor.Marshal(expectedEventRaw)
+	if err != nil {
+		t.Error(err)
+	}
+	if !bytes.Equal(actual, expectedEventCBOR) {
+		t.Errorf("expected %v, but got %v", expectedEventCBOR, actual)
+	}
+}
+
+func TestDecodeEventJSON(t *testing.T) {
+	var actual RawEvent
+	if err := json.Unmarshal([]byte(expectedEventJSON), &actual); err != nil {
+		t.Error(err)
+	}
+	if !expectedEventRaw.Equal(actual) {
+		t.Errorf("expected %v, but got %v", expectedEventRaw, actual)
+	}
+}
+func TestDecodeEventCBOR(t *testing.T) {
+	var actual RawEvent
+	if err := cbor.Unmarshal([]byte(expectedEventCBOR), &actual); err != nil {
+		t.Error(err)
+	}
+	if !expectedEventRaw.Equal(actual) {
+		t.Errorf("expected %v, but got %v", expectedEventRaw, actual)
+	}
+}