raw_event.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188
  1. package borealis
  2. import (
  3. "bytes"
  4. "encoding/base64"
  5. "encoding/json"
  6. "fmt"
  7. "reflect"
  8. "time"
  9. "github.com/fxamacker/cbor/v2"
  10. "github.com/johncgriffin/overflow"
  11. "github.com/segmentio/ksuid"
  12. )
  13. const (
  14. btcEpoch = 1231006505 // Bitcoin genesis, 2009-01-03T18:15:05Z
  15. V1MessageVersion = 1 // Current version for messages on the bus
  16. DefaultMessageVersion = V1MessageVersion
  17. )
  18. type Envelop struct {
  19. _ struct{} `json:"-"`
  20. Type uint16 `cbor:"event_type"`
  21. SequentialID uint64 `cbor:"sequential_id"`
  22. TimestampS uint32 `cbor:"timestamp_s"`
  23. TimestampMS uint16 `cbor:"timestamp_ms"`
  24. UniqueID [16]byte `cbor:"unique_id"`
  25. }
  26. type RawEvent[T any] struct {
  27. _ struct{} `json:"-"`
  28. Version uint8
  29. Envelop Envelop
  30. Payload T
  31. }
  32. func NewRawEvent[T any](type_ uint16, payload T) (*RawEvent[T], error) {
  33. now := time.Now()
  34. ksuid, err := ksuid.NewRandomWithTime(now)
  35. if err != nil {
  36. return nil, err
  37. }
  38. unique_id := [16]byte{}
  39. copy(unique_id[:], ksuid[4:])
  40. return &RawEvent[T]{
  41. Version: DefaultMessageVersion,
  42. Envelop: Envelop{
  43. Type: type_,
  44. SequentialID: 0,
  45. TimestampS: uint32(now.Unix() - btcEpoch),
  46. TimestampMS: uint16(now.UnixMilli() % 1000),
  47. UniqueID: unique_id,
  48. },
  49. Payload: payload,
  50. }, nil
  51. }
  52. func (rawEvent RawEvent[T]) Check() (*Event[T], error) {
  53. timestampS, ok := overflow.Add64(btcEpoch, int64(rawEvent.Envelop.TimestampS))
  54. if !ok || rawEvent.Envelop.TimestampMS > 999 {
  55. return nil, fmt.Errorf("timestamp overflow")
  56. }
  57. timestampNS := int64(rawEvent.Envelop.TimestampMS) * 1000000
  58. timestamp := time.Unix(timestampS, timestampNS)
  59. ksuid, err := ksuid.FromParts(timestamp, rawEvent.Envelop.UniqueID[:])
  60. if err != nil {
  61. return nil, err
  62. }
  63. event := Event[T]{
  64. Type: EventType(rawEvent.Envelop.Type),
  65. SequentialID: rawEvent.Envelop.SequentialID,
  66. Timestamp: timestamp,
  67. UniqueID: ksuid,
  68. Payload: rawEvent.Payload,
  69. }
  70. return &event, nil
  71. }
  72. func (event RawEvent[T]) Equal(other RawEvent[T]) bool {
  73. if event.Envelop.Type != other.Envelop.Type ||
  74. event.Envelop.SequentialID != other.Envelop.SequentialID ||
  75. event.Envelop.TimestampS != other.Envelop.TimestampS ||
  76. event.Envelop.TimestampMS != other.Envelop.TimestampMS ||
  77. !bytes.Equal(event.Envelop.UniqueID[:], other.Envelop.UniqueID[:]) {
  78. return false
  79. }
  80. return reflect.DeepEqual(event.Payload, other.Payload)
  81. }
  82. func (event RawEvent[T]) JSON() string {
  83. output, err := json.Marshal(event)
  84. if err != nil {
  85. panic(err)
  86. }
  87. return string(output)
  88. }
  89. func (event RawEvent[T]) MarshalJSON() ([]byte, error) {
  90. return json.Marshal([]interface{}{
  91. event.Version,
  92. event.Envelop.Type,
  93. event.Envelop.SequentialID,
  94. event.Envelop.TimestampS,
  95. event.Envelop.TimestampMS,
  96. event.Envelop.UniqueID[:], // automatically Base64-encoded
  97. event.Payload,
  98. })
  99. }
  100. func (event *RawEvent[T]) UnmarshalJSON(input []byte) error {
  101. var err error
  102. array := []interface{}{}
  103. if err = json.Unmarshal(input, &array); err != nil {
  104. return err
  105. }
  106. if len(array) != 7 {
  107. return fmt.Errorf("event must be an array of length %d, but got %d", 6, len(array))
  108. }
  109. event.Version = uint8(array[0].(float64))
  110. event.Envelop.Type = uint16(array[1].(float64))
  111. event.Envelop.SequentialID = uint64(array[2].(float64))
  112. event.Envelop.TimestampS = uint32(array[3].(float64))
  113. event.Envelop.TimestampMS = uint16(array[4].(float64))
  114. unique_id, err := base64.StdEncoding.DecodeString(array[5].(string))
  115. if err != nil {
  116. return err
  117. }
  118. copy(event.Envelop.UniqueID[:], unique_id)
  119. payload, ok := array[6].(T)
  120. if !ok {
  121. return fmt.Errorf("event payload must be a map")
  122. }
  123. event.Payload = payload
  124. return nil
  125. }
  126. func (event RawEvent[T]) EncodeCBOR() ([]byte, error) {
  127. encoded_envelop, err := cbor.Marshal(event.Envelop)
  128. if err != nil {
  129. return nil, err
  130. }
  131. encoded_payload, err := cbor.Marshal(event.Payload)
  132. if err != nil {
  133. return nil, err
  134. }
  135. partial := append([]byte{V1MessageVersion}, encoded_envelop...)
  136. return append(partial, encoded_payload...), nil
  137. }
  138. func (event *RawEvent[T]) DecodeCBOR(input []byte) error {
  139. var err error
  140. if len(input) < 1 {
  141. return fmt.Errorf("Empty message. Missing Version number")
  142. }
  143. version_message := input[0]
  144. switch version_message {
  145. case V1MessageVersion:
  146. decoder := cbor.NewDecoder(bytes.NewReader(input[1:]))
  147. err = decoder.Decode(&event.Envelop)
  148. if err != nil {
  149. return err
  150. }
  151. err = decoder.Decode(&event.Payload)
  152. if err != nil {
  153. return err
  154. }
  155. return nil
  156. default:
  157. return fmt.Errorf("Unsupported version message %d", version_message)
  158. }
  159. }