raw_event.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. package borealis
  2. import (
  3. "bytes"
  4. "encoding/base64"
  5. "encoding/json"
  6. "fmt"
  7. "reflect"
  8. "time"
  9. "github.com/fxamacker/cbor/v2"
  10. "github.com/johncgriffin/overflow"
  11. "github.com/segmentio/ksuid"
  12. )
// btcEpoch is the Unix time, in seconds, of the Bitcoin genesis block. It is
// the zero point for RawEvent.TimestampS: NewRawEvent subtracts it from the
// current Unix time and Check adds it back.
const btcEpoch = 1231006505 // Bitcoin genesis, 2009-01-03T18:15:05Z
// RawEvent is the compact, serializable form of an event. Its JSON and CBOR
// encodings are a six-element array holding the fields below in declaration
// order. Check converts a RawEvent into a validated Event.
type RawEvent struct {
	// Marker field carrying the cbor "toarray" struct option; excluded from JSON.
	_ struct{} `cbor:",toarray" json:"-"`
	// Type is the numeric event type; Check converts it to an EventType.
	Type uint16
	// SequentialID is left at zero by NewRawEvent.
	// NOTE(review): presumably assigned later by whatever orders events — confirm.
	SequentialID uint64
	// TimestampS is the event time in whole seconds since btcEpoch.
	TimestampS uint32
	// TimestampMS is the millisecond part of the event time; Check rejects
	// values above 999.
	TimestampMS uint16
	// UniqueID holds the bytes of a KSUID that follow its first four bytes;
	// Check recombines it with the timestamp via ksuid.FromParts.
	UniqueID []byte
	// Payload is the event body. The Unmarshal methods only accept a map with
	// string keys here.
	Payload interface{}
}
  23. func NewRawEvent(type_ uint16, payload interface{}) (*RawEvent, error) {
  24. now := time.Now()
  25. ksuid, err := ksuid.NewRandomWithTime(now)
  26. if err != nil {
  27. return nil, err
  28. }
  29. return &RawEvent{
  30. Type: type_,
  31. SequentialID: 0,
  32. TimestampS: uint32(now.Unix() - btcEpoch),
  33. TimestampMS: uint16(now.UnixMilli() % 1000),
  34. UniqueID: ksuid[4:],
  35. Payload: payload,
  36. }, nil
  37. }
  38. func (rawEvent RawEvent) Check() (*Event, error) {
  39. timestampS, ok := overflow.Add64(btcEpoch, int64(rawEvent.TimestampS))
  40. if !ok || rawEvent.TimestampMS > 999 {
  41. return nil, fmt.Errorf("timestamp overflow")
  42. }
  43. timestampNS := int64(rawEvent.TimestampMS) * 1000000
  44. timestamp := time.Unix(timestampS, timestampNS)
  45. ksuid, err := ksuid.FromParts(timestamp, rawEvent.UniqueID)
  46. if err != nil {
  47. return nil, err
  48. }
  49. event := Event{
  50. Type: EventType(rawEvent.Type),
  51. SequentialID: rawEvent.SequentialID,
  52. Timestamp: timestamp,
  53. UniqueID: ksuid,
  54. Payload: rawEvent.Payload,
  55. }
  56. return &event, nil
  57. }
  58. func (event RawEvent) Equal(other RawEvent) bool {
  59. if event.Type != other.Type ||
  60. event.SequentialID != other.SequentialID ||
  61. event.TimestampS != other.TimestampS ||
  62. event.TimestampMS != other.TimestampMS ||
  63. !bytes.Equal(event.UniqueID, other.UniqueID) {
  64. return false
  65. }
  66. return reflect.DeepEqual(event.Payload, other.Payload)
  67. }
  68. func (event RawEvent) JSON() string {
  69. output, err := json.Marshal(event)
  70. if err != nil {
  71. panic(err)
  72. }
  73. return string(output)
  74. }
  75. func (event RawEvent) MarshalJSON() ([]byte, error) {
  76. return json.Marshal([]interface{}{
  77. event.Type,
  78. event.SequentialID,
  79. event.TimestampS,
  80. event.TimestampMS,
  81. event.UniqueID, // automatically Base64-encoded
  82. event.Payload,
  83. })
  84. }
  85. func (event RawEvent) MarshalCBOR() ([]byte, error) {
  86. return cbor.Marshal([]interface{}{
  87. event.Type,
  88. event.SequentialID,
  89. event.TimestampS,
  90. event.TimestampMS,
  91. event.UniqueID,
  92. event.Payload,
  93. })
  94. }
  95. func (event *RawEvent) UnmarshalJSON(input []byte) error {
  96. var err error
  97. array := []interface{}{}
  98. if err = json.Unmarshal(input, &array); err != nil {
  99. return err
  100. }
  101. if len(array) != 6 {
  102. return fmt.Errorf("event must be an array of length %d, but got %d", 6, len(array))
  103. }
  104. event.Type = uint16(array[0].(float64))
  105. event.SequentialID = uint64(array[1].(float64))
  106. event.TimestampS = uint32(array[2].(float64))
  107. event.TimestampMS = uint16(array[3].(float64))
  108. event.UniqueID, err = base64.StdEncoding.DecodeString(array[4].(string))
  109. if err != nil {
  110. return err
  111. }
  112. payload, ok := array[5].(map[string]interface{})
  113. if !ok {
  114. return fmt.Errorf("event payload must be a map")
  115. }
  116. event.Payload = payload
  117. return nil
  118. }
  119. func (event *RawEvent) UnmarshalCBOR(input []byte) error {
  120. var err error
  121. array := []interface{}{}
  122. if err = cbor.Unmarshal(input, &array); err != nil {
  123. return err
  124. }
  125. if len(array) != 6 {
  126. return fmt.Errorf("event must be an array of length %d, but got %d", 6, len(array))
  127. }
  128. event.Type = uint16(array[0].(uint64))
  129. event.SequentialID = uint64(array[1].(uint64))
  130. event.TimestampS = uint32(array[2].(uint64))
  131. event.TimestampMS = uint16(array[3].(uint64))
  132. event.UniqueID = array[4].([]byte)
  133. payloadIn, ok := array[5].(map[interface{}]interface{})
  134. if !ok {
  135. return fmt.Errorf("event payload must be a map")
  136. }
  137. payloadOut := make(map[string]interface{})
  138. for k, v := range payloadIn {
  139. var key string
  140. if key, ok = k.(string); !ok {
  141. return fmt.Errorf("event payload must be a map with string keys, but got the key %v", k)
  142. }
  143. payloadOut[key] = v
  144. }
  145. event.Payload = payloadOut
  146. return nil
  147. }