raw_event.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. package borealis
  2. import (
  3. "bytes"
  4. "encoding/base64"
  5. "encoding/json"
  6. "fmt"
  7. "reflect"
  8. "time"
  9. "github.com/fxamacker/cbor/v2"
  10. "github.com/johncgriffin/overflow"
  11. "github.com/segmentio/ksuid"
  12. )
  13. const btcEpoch = 1231006505 // Bitcoin genesis, 2009-01-03T18:15:05Z
  14. type RawEvent struct {
  15. _ struct{} `cbor:",toarray" json:"-"`
  16. Type uint16
  17. SequentialID uint64
  18. TimestampS uint32
  19. TimestampMS uint16
  20. UniqueID []byte
  21. Payload interface{}
  22. }
  23. func NewRawEvent(type_ uint16, payload interface{}) (*RawEvent, error) {
  24. now := time.Now()
  25. ksuid, err := ksuid.NewRandomWithTime(now)
  26. if err != nil {
  27. return nil, err
  28. }
  29. return &RawEvent{
  30. Type: type_,
  31. SequentialID: 0,
  32. TimestampS: uint32(now.Unix() - btcEpoch),
  33. TimestampMS: uint16(now.UnixMilli() % 1000),
  34. UniqueID: ksuid[4:],
  35. Payload: payload,
  36. }, nil
  37. }
  38. func (rawEvent RawEvent) Check() (*Event, error) {
  39. timestampS, ok := overflow.Add64(btcEpoch, int64(rawEvent.TimestampS))
  40. if !ok || rawEvent.TimestampMS > 999 {
  41. return nil, fmt.Errorf("timestamp overflow")
  42. }
  43. timestampNS := int64(rawEvent.TimestampMS) * 1000000
  44. timestamp := time.Unix(timestampS, timestampNS)
  45. ksuid, err := ksuid.FromParts(timestamp, rawEvent.UniqueID)
  46. if err != nil {
  47. return nil, err
  48. }
  49. event := Event{
  50. Type: rawEvent.Type,
  51. SequentialID: rawEvent.SequentialID,
  52. Timestamp: timestamp,
  53. UniqueID: ksuid,
  54. Payload: rawEvent.Payload,
  55. }
  56. return &event, nil
  57. }
  58. func (event RawEvent) Equal(other RawEvent) bool {
  59. if event.Type != other.Type ||
  60. event.SequentialID != other.SequentialID ||
  61. event.TimestampS != other.TimestampS ||
  62. event.TimestampMS != other.TimestampMS ||
  63. !bytes.Equal(event.UniqueID, other.UniqueID) {
  64. return false
  65. }
  66. return reflect.DeepEqual(event.Payload, other.Payload)
  67. }
  68. func (event RawEvent) MarshalJSON() ([]byte, error) {
  69. return json.Marshal([]interface{}{
  70. event.Type,
  71. event.SequentialID,
  72. event.TimestampS,
  73. event.TimestampMS,
  74. event.UniqueID, // automatically Base64-encoded
  75. event.Payload,
  76. })
  77. }
  78. func (event RawEvent) MarshalCBOR() ([]byte, error) {
  79. return cbor.Marshal([]interface{}{
  80. event.Type,
  81. event.SequentialID,
  82. event.TimestampS,
  83. event.TimestampMS,
  84. event.UniqueID,
  85. event.Payload,
  86. })
  87. }
  88. func (event *RawEvent) UnmarshalJSON(input []byte) error {
  89. var err error
  90. array := []interface{}{}
  91. if err = json.Unmarshal(input, &array); err != nil {
  92. return err
  93. }
  94. if len(array) != 6 {
  95. return fmt.Errorf("event must be an array of length %d, but got %d", 6, len(array))
  96. }
  97. event.Type = uint16(array[0].(float64))
  98. event.SequentialID = uint64(array[1].(float64))
  99. event.TimestampS = uint32(array[2].(float64))
  100. event.TimestampMS = uint16(array[3].(float64))
  101. event.UniqueID, err = base64.StdEncoding.DecodeString(array[4].(string))
  102. if err != nil {
  103. return err
  104. }
  105. payload, ok := array[5].(map[string]interface{})
  106. if !ok {
  107. return fmt.Errorf("event payload must be a map")
  108. }
  109. event.Payload = payload
  110. return nil
  111. }
  112. func (event *RawEvent) UnmarshalCBOR(input []byte) error {
  113. var err error
  114. array := []interface{}{}
  115. if err = cbor.Unmarshal(input, &array); err != nil {
  116. return err
  117. }
  118. if len(array) != 6 {
  119. return fmt.Errorf("event must be an array of length %d, but got %d", 6, len(array))
  120. }
  121. event.Type = uint16(array[0].(uint64))
  122. event.SequentialID = uint64(array[1].(uint64))
  123. event.TimestampS = uint32(array[2].(uint64))
  124. event.TimestampMS = uint16(array[3].(uint64))
  125. event.UniqueID = array[4].([]byte)
  126. payloadIn, ok := array[5].(map[interface{}]interface{})
  127. if !ok {
  128. return fmt.Errorf("event payload must be a map")
  129. }
  130. payloadOut := make(map[string]interface{})
  131. for k, v := range payloadIn {
  132. var key string
  133. if key, ok = k.(string); !ok {
  134. return fmt.Errorf("event payload must be a map with string keys, but got the key %v", k)
  135. }
  136. payloadOut[key] = v
  137. }
  138. event.Payload = payloadOut
  139. return nil
  140. }