event.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. package borealis
  2. import (
  3. "bytes"
  4. "encoding/base64"
  5. "encoding/json"
  6. "fmt"
  7. "reflect"
  8. "github.com/fxamacker/cbor/v2"
  9. )
  10. type Event struct { // TODO: EventEnvelope or RawEvent
  11. Type uint16
  12. TimestampS uint32
  13. TimestampMS uint16
  14. SequentialID uint64
  15. UniqueID []byte
  16. Payload map[string]interface{}
  17. }
  18. func (event Event) Equal(other Event) bool {
  19. if event.Type != other.Type ||
  20. event.TimestampS != other.TimestampS ||
  21. event.TimestampMS != other.TimestampMS ||
  22. event.SequentialID != other.SequentialID ||
  23. !bytes.Equal(event.UniqueID, other.UniqueID) {
  24. return false
  25. }
  26. return reflect.DeepEqual(event.Payload, other.Payload)
  27. }
  28. func (event Event) MarshalJSON() ([]byte, error) {
  29. return json.Marshal([]interface{}{
  30. event.Type,
  31. event.TimestampS,
  32. event.TimestampMS,
  33. event.SequentialID,
  34. event.UniqueID, // automatically Base64-encoded
  35. event.Payload,
  36. })
  37. }
  38. func (event Event) MarshalCBOR() ([]byte, error) {
  39. return cbor.Marshal([]interface{}{
  40. event.Type,
  41. event.TimestampS,
  42. event.TimestampMS,
  43. event.SequentialID,
  44. event.UniqueID,
  45. event.Payload,
  46. })
  47. }
  48. func (event *Event) UnmarshalJSON(input []byte) error {
  49. var err error
  50. array := []interface{}{}
  51. if err = json.Unmarshal(input, &array); err != nil {
  52. return err
  53. }
  54. if len(array) != 6 {
  55. return fmt.Errorf("event must be an array of length %d, but got %d", 6, len(array))
  56. }
  57. event.Type = uint16(array[0].(float64))
  58. event.TimestampS = uint32(array[1].(float64))
  59. event.TimestampMS = uint16(array[2].(float64))
  60. event.SequentialID = uint64(array[3].(float64))
  61. event.UniqueID, err = base64.StdEncoding.DecodeString(array[4].(string))
  62. if err != nil {
  63. return err
  64. }
  65. payload, ok := array[5].(map[string]interface{})
  66. if !ok {
  67. return fmt.Errorf("event payload must be a map")
  68. }
  69. event.Payload = payload
  70. return nil
  71. }
  72. func (event *Event) UnmarshalCBOR(input []byte) error {
  73. var err error
  74. array := []interface{}{}
  75. if err = cbor.Unmarshal(input, &array); err != nil {
  76. return err
  77. }
  78. if len(array) != 6 {
  79. return fmt.Errorf("event must be an array of length %d, but got %d", 6, len(array))
  80. }
  81. event.Type = uint16(array[0].(uint64))
  82. event.TimestampS = uint32(array[1].(uint64))
  83. event.TimestampMS = uint16(array[2].(uint64))
  84. event.SequentialID = uint64(array[3].(uint64))
  85. event.UniqueID = array[4].([]byte)
  86. payloadIn, ok := array[5].(map[interface{}]interface{})
  87. if !ok {
  88. return fmt.Errorf("event payload must be a map")
  89. }
  90. payloadOut := make(map[string]interface{})
  91. for k, v := range payloadIn {
  92. var key string
  93. if key, ok = k.(string); !ok {
  94. return fmt.Errorf("event payload must be a map with string keys, but got the key %v", k)
  95. }
  96. payloadOut[key] = v
  97. }
  98. event.Payload = payloadOut
  99. return nil
  100. }