|
@@ -10,6 +10,8 @@ type Bus struct {
|
|
|
NATS *nats.EncodedConn
|
|
|
}
|
|
|
|
|
|
+type SubscriberHandler func(event *Event)
|
|
|
+
|
|
|
func Connect(url string) (*Bus, error) {
|
|
|
nats.RegisterEncoder("v1", &V1MessageEncoder{})
|
|
|
conn, err := nats.Connect(url)
|
|
@@ -57,3 +59,12 @@ func (bus *Bus) PublishRawEvent(stream string, rawEvent *RawEvent) error {
|
|
|
}
|
|
|
return nil
|
|
|
}
|
|
|
+
|
|
|
+func (bus *Bus) Subscribe(stream string, handler SubscriberHandler) (*nats.Subscription, error) {
|
|
|
+ return bus.NATS.Subscribe(stream, func(rawEvent RawEvent) {
|
|
|
+ event, err := rawEvent.Check()
|
|
|
+ if err == nil { // ignore invalid messages
|
|
|
+ handler(event)
|
|
|
+ }
|
|
|
+ })
|
|
|
+}
|