123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188 |
- package borealis
- import (
- "bytes"
- "encoding/base64"
- "encoding/json"
- "fmt"
- "reflect"
- "time"
- "github.com/fxamacker/cbor/v2"
- "github.com/johncgriffin/overflow"
- "github.com/segmentio/ksuid"
- )
// btcEpoch is the Unix time of the Bitcoin genesis block
// (2009-01-03T18:15:05Z). Envelope timestamps are stored as an offset in
// seconds from this instant.
const btcEpoch = 1231006505

// Message format versions.
const (
	// V1MessageVersion is the current version for messages on the bus.
	V1MessageVersion = 1

	// DefaultMessageVersion is the version given to newly built events.
	DefaultMessageVersion = V1MessageVersion
)
// Envelop is the fixed-size header carried by every event. The cbor tags give
// the key used for each field when the envelope is CBOR-encoded.
type Envelop struct {
	// NOTE(review): blank placeholder field; its purpose is not visible in
	// this file (presumably it discourages unkeyed literals) — confirm.
	_ struct{} `json:"-"`
	// Type is the numeric event type; Check converts it to an EventType.
	Type uint16 `cbor:"event_type"`
	// SequentialID is initialised to 0 by NewRawEvent.
	// NOTE(review): presumably assigned later by the bus — confirm.
	SequentialID uint64 `cbor:"sequential_id"`
	// TimestampS is the number of whole seconds elapsed since btcEpoch.
	TimestampS uint32 `cbor:"timestamp_s"`
	// TimestampMS is the millisecond part of the timestamp; Check rejects
	// values above 999.
	TimestampMS uint16 `cbor:"timestamp_ms"`
	// UniqueID holds the 16 bytes that follow the first 4 bytes of a KSUID.
	UniqueID [16]byte `cbor:"unique_id"`
}
// RawEvent is the wire-level form of an event: a format version, the
// envelope header and a payload of type T. Check turns it into an Event[T].
type RawEvent[T any] struct {
	// NOTE(review): blank placeholder field; its purpose is not visible in
	// this file — confirm.
	_ struct{} `json:"-"`
	// Version is the message format version (DefaultMessageVersion for
	// events built with NewRawEvent).
	Version uint8
	// Envelop is the event header.
	Envelop Envelop
	// Payload is the event body.
	Payload T
}
- func NewRawEvent[T any](type_ uint16, payload T) (*RawEvent[T], error) {
- now := time.Now()
- ksuid, err := ksuid.NewRandomWithTime(now)
- if err != nil {
- return nil, err
- }
- unique_id := [16]byte{}
- copy(unique_id[:], ksuid[4:])
- return &RawEvent[T]{
- Version: DefaultMessageVersion,
- Envelop: Envelop{
- Type: type_,
- SequentialID: 0,
- TimestampS: uint32(now.Unix() - btcEpoch),
- TimestampMS: uint16(now.UnixMilli() % 1000),
- UniqueID: unique_id,
- },
- Payload: payload,
- }, nil
- }
- func (rawEvent RawEvent[T]) Check() (*Event[T], error) {
- timestampS, ok := overflow.Add64(btcEpoch, int64(rawEvent.Envelop.TimestampS))
- if !ok || rawEvent.Envelop.TimestampMS > 999 {
- return nil, fmt.Errorf("timestamp overflow")
- }
- timestampNS := int64(rawEvent.Envelop.TimestampMS) * 1000000
- timestamp := time.Unix(timestampS, timestampNS)
- ksuid, err := ksuid.FromParts(timestamp, rawEvent.Envelop.UniqueID[:])
- if err != nil {
- return nil, err
- }
- event := Event[T]{
- Type: EventType(rawEvent.Envelop.Type),
- SequentialID: rawEvent.Envelop.SequentialID,
- Timestamp: timestamp,
- UniqueID: ksuid,
- Payload: rawEvent.Payload,
- }
- return &event, nil
- }
- func (event RawEvent[T]) Equal(other RawEvent[T]) bool {
- if event.Envelop.Type != other.Envelop.Type ||
- event.Envelop.SequentialID != other.Envelop.SequentialID ||
- event.Envelop.TimestampS != other.Envelop.TimestampS ||
- event.Envelop.TimestampMS != other.Envelop.TimestampMS ||
- !bytes.Equal(event.Envelop.UniqueID[:], other.Envelop.UniqueID[:]) {
- return false
- }
- return reflect.DeepEqual(event.Payload, other.Payload)
- }
- func (event RawEvent[T]) JSON() string {
- output, err := json.Marshal(event)
- if err != nil {
- panic(err)
- }
- return string(output)
- }
- func (event RawEvent[T]) MarshalJSON() ([]byte, error) {
- return json.Marshal([]interface{}{
- event.Version,
- event.Envelop.Type,
- event.Envelop.SequentialID,
- event.Envelop.TimestampS,
- event.Envelop.TimestampMS,
- event.Envelop.UniqueID[:], // automatically Base64-encoded
- event.Payload,
- })
- }
- func (event *RawEvent[T]) UnmarshalJSON(input []byte) error {
- var err error
- array := []interface{}{}
- if err = json.Unmarshal(input, &array); err != nil {
- return err
- }
- if len(array) != 7 {
- return fmt.Errorf("event must be an array of length %d, but got %d", 6, len(array))
- }
- event.Version = uint8(array[0].(float64))
- event.Envelop.Type = uint16(array[1].(float64))
- event.Envelop.SequentialID = uint64(array[2].(float64))
- event.Envelop.TimestampS = uint32(array[3].(float64))
- event.Envelop.TimestampMS = uint16(array[4].(float64))
- unique_id, err := base64.StdEncoding.DecodeString(array[5].(string))
- if err != nil {
- return err
- }
- copy(event.Envelop.UniqueID[:], unique_id)
- payload, ok := array[6].(T)
- if !ok {
- return fmt.Errorf("event payload must be a map")
- }
- event.Payload = payload
- return nil
- }
- func (event RawEvent[T]) EncodeCBOR() ([]byte, error) {
- encoded_envelop, err := cbor.Marshal(event.Envelop)
- if err != nil {
- return nil, err
- }
- encoded_payload, err := cbor.Marshal(event.Payload)
- if err != nil {
- return nil, err
- }
- partial := append([]byte{V1MessageVersion}, encoded_envelop...)
- return append(partial, encoded_payload...), nil
- }
- func (event *RawEvent[T]) DecodeCBOR(input []byte) error {
- var err error
- if len(input) < 1 {
- return fmt.Errorf("Empty message. Missing Version number")
- }
- version_message := input[0]
- switch version_message {
- case V1MessageVersion:
- decoder := cbor.NewDecoder(bytes.NewReader(input[1:]))
- err = decoder.Decode(&event.Envelop)
- if err != nil {
- return err
- }
- err = decoder.Decode(&event.Payload)
- if err != nil {
- return err
- }
- return nil
- default:
- return fmt.Errorf("Unsupported version message %d", version_message)
- }
- }
|