123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167 |
- package borealis
- import (
- "bytes"
- "encoding/base64"
- "encoding/json"
- "fmt"
- "reflect"
- "time"
- "github.com/fxamacker/cbor/v2"
- "github.com/johncgriffin/overflow"
- "github.com/segmentio/ksuid"
- )
// btcEpoch is the Unix time, in seconds, of the Bitcoin genesis block
// (2009-01-03T18:15:05Z). RawEvent.TimestampS counts seconds from this
// instant rather than from the Unix epoch.
const btcEpoch = 1231006505
// RawEvent is the compact, serializable form of an event. Its JSON and CBOR
// encodings are six-element arrays holding the fields in declaration order.
type RawEvent struct {
	// Blank marker field carrying the encoding tags: array-style CBOR
	// encoding, and exclusion from JSON.
	_ struct{} `cbor:",toarray" json:"-"`

	// Type is the numeric event type code.
	Type uint16
	// SequentialID is a sequence number; NewRawEvent leaves it at zero.
	SequentialID uint64
	// TimestampS is the whole number of seconds elapsed since btcEpoch.
	TimestampS uint32
	// TimestampMS is the millisecond part of the timestamp; Check rejects
	// values above 999.
	TimestampMS uint16
	// UniqueID holds the identifier bytes that Check combines with the
	// timestamp to rebuild a KSUID.
	UniqueID []byte
	// Payload is the event body; the unmarshalers always produce a
	// map[string]interface{} here.
	Payload interface{}
}
- func NewRawEvent(type_ uint16, payload interface{}) (*RawEvent, error) {
- now := time.Now()
- ksuid, err := ksuid.NewRandomWithTime(now)
- if err != nil {
- return nil, err
- }
- return &RawEvent{
- Type: type_,
- SequentialID: 0,
- TimestampS: uint32(now.Unix() - btcEpoch),
- TimestampMS: uint16(now.UnixMilli() % 1000),
- UniqueID: ksuid[4:],
- Payload: payload,
- }, nil
- }
- func (rawEvent RawEvent) Check() (*Event, error) {
- timestampS, ok := overflow.Add64(btcEpoch, int64(rawEvent.TimestampS))
- if !ok || rawEvent.TimestampMS > 999 {
- return nil, fmt.Errorf("timestamp overflow")
- }
- timestampNS := int64(rawEvent.TimestampMS) * 1000000
- timestamp := time.Unix(timestampS, timestampNS)
- ksuid, err := ksuid.FromParts(timestamp, rawEvent.UniqueID)
- if err != nil {
- return nil, err
- }
- event := Event{
- Type: EventType(rawEvent.Type),
- SequentialID: rawEvent.SequentialID,
- Timestamp: timestamp,
- UniqueID: ksuid,
- Payload: rawEvent.Payload,
- }
- return &event, nil
- }
- func (event RawEvent) Equal(other RawEvent) bool {
- if event.Type != other.Type ||
- event.SequentialID != other.SequentialID ||
- event.TimestampS != other.TimestampS ||
- event.TimestampMS != other.TimestampMS ||
- !bytes.Equal(event.UniqueID, other.UniqueID) {
- return false
- }
- return reflect.DeepEqual(event.Payload, other.Payload)
- }
- func (event RawEvent) JSON() string {
- output, err := json.Marshal(event)
- if err != nil {
- panic(err)
- }
- return string(output)
- }
- func (event RawEvent) MarshalJSON() ([]byte, error) {
- return json.Marshal([]interface{}{
- event.Type,
- event.SequentialID,
- event.TimestampS,
- event.TimestampMS,
- event.UniqueID, // automatically Base64-encoded
- event.Payload,
- })
- }
- func (event RawEvent) MarshalCBOR() ([]byte, error) {
- return cbor.Marshal([]interface{}{
- event.Type,
- event.SequentialID,
- event.TimestampS,
- event.TimestampMS,
- event.UniqueID,
- event.Payload,
- })
- }
- func (event *RawEvent) UnmarshalJSON(input []byte) error {
- var err error
- array := []interface{}{}
- if err = json.Unmarshal(input, &array); err != nil {
- return err
- }
- if len(array) != 6 {
- return fmt.Errorf("event must be an array of length %d, but got %d", 6, len(array))
- }
- event.Type = uint16(array[0].(float64))
- event.SequentialID = uint64(array[1].(float64))
- event.TimestampS = uint32(array[2].(float64))
- event.TimestampMS = uint16(array[3].(float64))
- event.UniqueID, err = base64.StdEncoding.DecodeString(array[4].(string))
- if err != nil {
- return err
- }
- payload, ok := array[5].(map[string]interface{})
- if !ok {
- return fmt.Errorf("event payload must be a map")
- }
- event.Payload = payload
- return nil
- }
- func (event *RawEvent) UnmarshalCBOR(input []byte) error {
- var err error
- array := []interface{}{}
- if err = cbor.Unmarshal(input, &array); err != nil {
- return err
- }
- if len(array) != 6 {
- return fmt.Errorf("event must be an array of length %d, but got %d", 6, len(array))
- }
- event.Type = uint16(array[0].(uint64))
- event.SequentialID = uint64(array[1].(uint64))
- event.TimestampS = uint32(array[2].(uint64))
- event.TimestampMS = uint16(array[3].(uint64))
- event.UniqueID = array[4].([]byte)
- payloadIn, ok := array[5].(map[interface{}]interface{})
- if !ok {
- return fmt.Errorf("event payload must be a map")
- }
- payloadOut := make(map[string]interface{})
- for k, v := range payloadIn {
- var key string
- if key, ok = k.(string); !ok {
- return fmt.Errorf("event payload must be a map with string keys, but got the key %v", k)
- }
- payloadOut[key] = v
- }
- event.Payload = payloadOut
- return nil
- }
|