package borealis import ( "bytes" "encoding/base64" "encoding/json" "fmt" "reflect" "time" "github.com/fxamacker/cbor/v2" "github.com/johncgriffin/overflow" "github.com/segmentio/ksuid" ) const ( btcEpoch = 1231006505 // Bitcoin genesis, 2009-01-03T18:15:05Z V1MessageVersion = 1 // Current version for messages on the bus DefaultMessageVersion = V1MessageVersion ) type Envelop struct { _ struct{} `json:"-"` Type uint16 `cbor:"event_type"` SequentialID uint64 `cbor:"sequential_id"` TimestampS uint32 `cbor:"timestamp_s"` TimestampMS uint16 `cbor:"timestamp_ms"` UniqueID [16]byte `cbor:"unique_id"` } type RawEvent[T any] struct { _ struct{} `json:"-"` Version uint8 Envelop Envelop Payload T } func NewRawEvent[T any](type_ uint16, payload T) (*RawEvent[T], error) { now := time.Now() ksuid, err := ksuid.NewRandomWithTime(now) if err != nil { return nil, err } unique_id := [16]byte{} copy(unique_id[:], ksuid[4:]) return &RawEvent[T]{ Version: DefaultMessageVersion, Envelop: Envelop{ Type: type_, SequentialID: 0, TimestampS: uint32(now.Unix() - btcEpoch), TimestampMS: uint16(now.UnixMilli() % 1000), UniqueID: unique_id, }, Payload: payload, }, nil } func (rawEvent RawEvent[T]) Check() (*Event[T], error) { timestampS, ok := overflow.Add64(btcEpoch, int64(rawEvent.Envelop.TimestampS)) if !ok || rawEvent.Envelop.TimestampMS > 999 { return nil, fmt.Errorf("timestamp overflow") } timestampNS := int64(rawEvent.Envelop.TimestampMS) * 1000000 timestamp := time.Unix(timestampS, timestampNS) ksuid, err := ksuid.FromParts(timestamp, rawEvent.Envelop.UniqueID[:]) if err != nil { return nil, err } event := Event[T]{ Type: EventType(rawEvent.Envelop.Type), SequentialID: rawEvent.Envelop.SequentialID, Timestamp: timestamp, UniqueID: ksuid, Payload: rawEvent.Payload, } return &event, nil } func (event RawEvent[T]) Equal(other RawEvent[T]) bool { if event.Envelop.Type != other.Envelop.Type || event.Envelop.SequentialID != other.Envelop.SequentialID || event.Envelop.TimestampS != other.Envelop.TimestampS || event.Envelop.TimestampMS != other.Envelop.TimestampMS || !bytes.Equal(event.Envelop.UniqueID[:], other.Envelop.UniqueID[:]) { return false } return reflect.DeepEqual(event.Payload, other.Payload) } func (event RawEvent[T]) JSON() string { output, err := json.Marshal(event) if err != nil { panic(err) } return string(output) } func (event RawEvent[T]) MarshalJSON() ([]byte, error) { return json.Marshal([]interface{}{ event.Version, event.Envelop.Type, event.Envelop.SequentialID, event.Envelop.TimestampS, event.Envelop.TimestampMS, event.Envelop.UniqueID[:], // automatically Base64-encoded event.Payload, }) } func (event *RawEvent[T]) UnmarshalJSON(input []byte) error { var err error array := []interface{}{} if err = json.Unmarshal(input, &array); err != nil { return err } if len(array) != 7 { return fmt.Errorf("event must be an array of length %d, but got %d", 6, len(array)) } event.Version = uint8(array[0].(float64)) event.Envelop.Type = uint16(array[1].(float64)) event.Envelop.SequentialID = uint64(array[2].(float64)) event.Envelop.TimestampS = uint32(array[3].(float64)) event.Envelop.TimestampMS = uint16(array[4].(float64)) unique_id, err := base64.StdEncoding.DecodeString(array[5].(string)) if err != nil { return err } copy(event.Envelop.UniqueID[:], unique_id) payload, ok := array[6].(T) if !ok { return fmt.Errorf("event payload must be a map") } event.Payload = payload return nil } func (event RawEvent[T]) EncodeCBOR() ([]byte, error) { encoded_envelop, err := cbor.Marshal(event.Envelop) if err != nil { return nil, err } encoded_payload, err := cbor.Marshal(event.Payload) if err != nil { return nil, err } partial := append([]byte{V1MessageVersion}, encoded_envelop...) return append(partial, encoded_payload...), nil } func (event *RawEvent[T]) DecodeCBOR(input []byte) error { var err error if len(input) < 1 { return fmt.Errorf("Empty message. Missing Version number") } version_message := input[0] switch version_message { case V1MessageVersion: decoder := cbor.NewDecoder(bytes.NewReader(input[1:])) err = decoder.Decode(&event.Envelop) if err != nil { return err } err = decoder.Decode(&event.Payload) if err != nil { return err } return nil default: return fmt.Errorf("Unsupported version message %d", version_message) } }