package borealis

import (
	"bytes"
	"encoding/base64"
	"encoding/json"
	"fmt"
	"reflect"
	"time"

	"github.com/fxamacker/cbor/v2"
	"github.com/johncgriffin/overflow"
	"github.com/segmentio/ksuid"
)

const (
	// btcEpoch is the Unix time of the Bitcoin genesis block
	// (2009-01-03T18:15:05Z). RawEvent.TimestampS counts seconds from it.
	btcEpoch = 1231006505

	// rawEventLen is the number of elements in the serialized array form of
	// a RawEvent (both JSON and CBOR).
	rawEventLen = 6
)

// RawEvent is the wire representation of an event. It is serialized as a
// six-element array, in field order, by the Marshal*/Unmarshal* methods below.
type RawEvent struct {
	_ struct{} `cbor:",toarray" json:"-"`

	// Type is the numeric event type; Check converts it to an EventType.
	Type uint16
	// SequentialID is left at 0 by NewRawEvent.
	SequentialID uint64
	// TimestampS is the number of whole seconds since btcEpoch.
	TimestampS uint32
	// TimestampMS is the millisecond part of the timestamp; Check rejects
	// values above 999.
	TimestampMS uint16
	// UniqueID holds the bytes of a KSUID after its first four bytes; Check
	// recombines it with the timestamp via ksuid.FromParts.
	UniqueID []byte
	// Payload is the event body. The Unmarshal* methods require it to be a
	// map with string keys.
	Payload interface{}
}

// NewRawEvent builds a RawEvent of the given type stamped with the current
// time and a freshly generated unique ID.
//
// It returns an error if the current time cannot be represented as a uint32
// number of seconds since btcEpoch, or if the KSUID cannot be generated.
func NewRawEvent(type_ uint16, payload interface{}) (*RawEvent, error) {
	now := time.Now()

	// Guard the narrowing conversion below: a clock set before btcEpoch or
	// too far in the future would otherwise wrap around silently.
	elapsed := now.Unix() - btcEpoch
	if elapsed < 0 || elapsed > int64(^uint32(0)) {
		return nil, fmt.Errorf("timestamp overflow")
	}

	id, err := ksuid.NewRandomWithTime(now)
	if err != nil {
		return nil, err
	}

	return &RawEvent{
		Type:         type_,
		SequentialID: 0,
		TimestampS:   uint32(elapsed),
		TimestampMS:  uint16(now.UnixMilli() % 1000),
		UniqueID:     id[4:],
		Payload:      payload,
	}, nil
}

// Check validates the raw event and converts it into an Event.
//
// It returns an error if the timestamp overflows, if TimestampMS exceeds 999,
// or if ksuid.FromParts rejects the timestamp/UniqueID combination.
func (event RawEvent) Check() (*Event, error) {
	seconds, ok := overflow.Add64(btcEpoch, int64(event.TimestampS))
	if !ok || event.TimestampMS > 999 {
		return nil, fmt.Errorf("timestamp overflow")
	}

	nanos := int64(event.TimestampMS) * int64(time.Millisecond)
	timestamp := time.Unix(seconds, nanos)

	id, err := ksuid.FromParts(timestamp, event.UniqueID)
	if err != nil {
		return nil, err
	}

	return &Event{
		Type:         EventType(event.Type),
		SequentialID: event.SequentialID,
		Timestamp:    timestamp,
		UniqueID:     id,
		Payload:      event.Payload,
	}, nil
}

// Equal reports whether two raw events have identical header fields and
// deeply equal payloads (compared with reflect.DeepEqual).
func (event RawEvent) Equal(other RawEvent) bool {
	return event.Type == other.Type &&
		event.SequentialID == other.SequentialID &&
		event.TimestampS == other.TimestampS &&
		event.TimestampMS == other.TimestampMS &&
		bytes.Equal(event.UniqueID, other.UniqueID) &&
		reflect.DeepEqual(event.Payload, other.Payload)
}

// JSON returns the event encoded as a JSON string.
//
// It panics if the event cannot be marshalled, because the signature has no
// way to report an error.
func (event RawEvent) JSON() string {
	output, err := json.Marshal(event)
	if err != nil {
		panic(err)
	}
	return string(output)
}

// asArray returns the event's fields in wire order, shared by MarshalJSON
// and MarshalCBOR so the two encodings cannot drift apart.
func (event RawEvent) asArray() []interface{} {
	return []interface{}{
		event.Type,
		event.SequentialID,
		event.TimestampS,
		event.TimestampMS,
		event.UniqueID,
		event.Payload,
	}
}

// MarshalJSON encodes the event as a six-element JSON array. UniqueID is a
// byte slice, so encoding/json emits it Base64-encoded automatically.
func (event RawEvent) MarshalJSON() ([]byte, error) {
	return json.Marshal(event.asArray())
}

// MarshalCBOR encodes the event as a six-element CBOR array.
func (event RawEvent) MarshalCBOR() ([]byte, error) {
	return cbor.Marshal(event.asArray())
}

// unmarshalJSONField decodes one array element into dst, rejecting null
// (which encoding/json would otherwise accept as a silent no-op) and
// prefixing any decoding error with the field name.
func unmarshalJSONField(raw json.RawMessage, name string, dst interface{}) error {
	if string(bytes.TrimSpace(raw)) == "null" {
		return fmt.Errorf("event %s must not be null", name)
	}
	if err := json.Unmarshal(raw, dst); err != nil {
		return fmt.Errorf("event %s: %w", name, err)
	}
	return nil
}

// UnmarshalJSON decodes a six-element JSON array into the event.
//
// Each element is decoded directly into its typed field, so malformed input
// (wrong types, negative, fractional or out-of-range numbers) yields an error
// instead of a panic or silent truncation, and SequentialID keeps full uint64
// precision. The receiver is modified only when decoding succeeds.
func (event *RawEvent) UnmarshalJSON(input []byte) error {
	var array []json.RawMessage
	if err := json.Unmarshal(input, &array); err != nil {
		return err
	}
	if len(array) != rawEventLen {
		return fmt.Errorf("event must be an array of length %d, but got %d", rawEventLen, len(array))
	}

	var (
		decoded  RawEvent
		uniqueID string
	)
	fields := []struct {
		name string
		dst  interface{}
	}{
		{"type", &decoded.Type},
		{"sequential ID", &decoded.SequentialID},
		{"timestamp seconds", &decoded.TimestampS},
		{"timestamp milliseconds", &decoded.TimestampMS},
		{"unique ID", &uniqueID},
	}
	for i, field := range fields {
		if err := unmarshalJSONField(array[i], field.name, field.dst); err != nil {
			return err
		}
	}

	var err error
	decoded.UniqueID, err = base64.StdEncoding.DecodeString(uniqueID)
	if err != nil {
		return err
	}

	// Decode the payload generically, then insist on a JSON object; a null
	// payload decodes to nil and is rejected by the same check.
	var payload interface{}
	if err = json.Unmarshal(array[5], &payload); err != nil {
		return err
	}
	payloadMap, ok := payload.(map[string]interface{})
	if !ok {
		return fmt.Errorf("event payload must be a map")
	}
	decoded.Payload = payloadMap

	*event = decoded
	return nil
}

// cborUint checks that a generically decoded CBOR value is an unsigned
// integer no larger than limit and returns it.
func cborUint(value interface{}, name string, limit uint64) (uint64, error) {
	n, ok := value.(uint64)
	if !ok {
		return 0, fmt.Errorf("event %s must be an unsigned integer, but got %T", name, value)
	}
	if n > limit {
		return 0, fmt.Errorf("event %s is out of range: %d exceeds %d", name, n, limit)
	}
	return n, nil
}

// UnmarshalCBOR decodes a six-element CBOR array into the event.
//
// Every element is type- and range-checked, so malformed input yields an
// error instead of a panic or silent truncation. The payload must be a map
// whose keys are all strings. The receiver is modified only when decoding
// succeeds.
func (event *RawEvent) UnmarshalCBOR(input []byte) error {
	var array []interface{}
	if err := cbor.Unmarshal(input, &array); err != nil {
		return err
	}
	if len(array) != rawEventLen {
		return fmt.Errorf("event must be an array of length %d, but got %d", rawEventLen, len(array))
	}

	eventType, err := cborUint(array[0], "type", 1<<16-1)
	if err != nil {
		return err
	}
	sequentialID, err := cborUint(array[1], "sequential ID", 1<<64-1)
	if err != nil {
		return err
	}
	timestampS, err := cborUint(array[2], "timestamp seconds", 1<<32-1)
	if err != nil {
		return err
	}
	timestampMS, err := cborUint(array[3], "timestamp milliseconds", 1<<16-1)
	if err != nil {
		return err
	}

	uniqueID, ok := array[4].([]byte)
	if !ok {
		return fmt.Errorf("event unique ID must be a byte string, but got %T", array[4])
	}

	payloadIn, ok := array[5].(map[interface{}]interface{})
	if !ok {
		return fmt.Errorf("event payload must be a map")
	}
	payloadOut := make(map[string]interface{}, len(payloadIn))
	for k, v := range payloadIn {
		key, isString := k.(string)
		if !isString {
			return fmt.Errorf("event payload must be a map with string keys, but got the key %v", k)
		}
		payloadOut[key] = v
	}

	*event = RawEvent{
		Type:         uint16(eventType),
		SequentialID: sequentialID,
		TimestampS:   uint32(timestampS),
		TimestampMS:  uint16(timestampMS),
		UniqueID:     uniqueID,
		Payload:      payloadOut,
	}
	return nil
}