Rework event system: add RawMessage

pull/89/head
Aloïs Micard 3 years ago
parent c50c4bb9c9
commit a9e1d44e6c
No known key found for this signature in database
GPG Key ID: 1A0EB82F071F5EFE

@ -8,7 +8,6 @@ import (
"github.com/creekorful/trandoshan/internal/util"
"github.com/rs/zerolog/log"
"github.com/urfave/cli/v2"
"io"
"os"
"os/signal"
"strings"
@ -84,9 +83,9 @@ type state struct {
storage storage.Storage
}
func (state *state) handleNewResourceEvent(subscriber event.Subscriber, body io.Reader) error {
func (state *state) handleNewResourceEvent(subscriber event.Subscriber, msg event.RawMessage) error {
var evt event.NewResourceEvent
if err := subscriber.Read(body, &evt); err != nil {
if err := subscriber.Read(&msg, &evt); err != nil {
return err
}

@ -1,7 +1,6 @@
package archiver
import (
"bytes"
"github.com/creekorful/trandoshan/internal/archiver/storage_mock"
"github.com/creekorful/trandoshan/internal/event"
"github.com/creekorful/trandoshan/internal/event_mock"
@ -19,9 +18,9 @@ func TestHandleNewResourceEvent(t *testing.T) {
tn := time.Now()
msg := bytes.NewReader(nil)
msg := event.RawMessage{}
subscriberMock.EXPECT().
Read(msg, &event.NewResourceEvent{}).
Read(&msg, &event.NewResourceEvent{}).
SetArg(1, event.NewResourceEvent{
URL: "https://example.onion",
Body: "Hello, world",

@ -6,7 +6,6 @@ import (
"github.com/creekorful/trandoshan/internal/configapi/api"
"github.com/creekorful/trandoshan/internal/event"
"github.com/rs/zerolog/log"
"io"
"io/ioutil"
"net/http"
"sync"
@ -60,6 +59,7 @@ type client struct {
configAPIURL string
sub event.Subscriber
mutexes map[string]*sync.RWMutex
keys []string
forbiddenMimeTypes []ForbiddenMimeType
forbiddenHostnames []ForbiddenHostname
@ -72,6 +72,7 @@ func NewConfigClient(configAPIURL, processName string, subscriber event.Subscrib
configAPIURL: configAPIURL,
sub: subscriber,
mutexes: map[string]*sync.RWMutex{},
keys: keys,
}
// Pre-load wanted keys & create mutex
@ -201,6 +202,6 @@ func (c *client) setValue(key string, value []byte) error {
return nil
}
func (c *client) handleConfigEvent(subscriber event.Subscriber, body io.Reader) error {
func (c *client) handleConfigEvent(subscriber event.Subscriber, msg event.RawMessage) error {
return nil // TODO
}

@ -36,5 +36,8 @@ func (s *service) Set(key string, value []byte) error {
}
// publish event to notify config changed
return s.pub.PublishJSON(fmt.Sprintf("config.%s", key), value)
return s.pub.PublishJSON(fmt.Sprintf("config.%s", key), event.RawMessage{
Body: value,
Headers: map[string]interface{}{"Config-Key": key},
})
}

@ -2,6 +2,7 @@ package service
import (
"github.com/creekorful/trandoshan/internal/configapi/database_mock"
"github.com/creekorful/trandoshan/internal/event"
"github.com/creekorful/trandoshan/internal/event_mock"
"github.com/golang/mock/gomock"
"testing"
@ -35,7 +36,10 @@ func TestService_Set(t *testing.T) {
pubMock := event_mock.NewMockPublisher(mockCtrl)
dbMock.EXPECT().Set("test-key", []byte("hello")).Return(nil)
pubMock.EXPECT().PublishJSON("config.test-key", []byte("hello")).Return(nil)
pubMock.EXPECT().PublishJSON("config.test-key", event.RawMessage{
Body: []byte("hello"),
Headers: map[string]interface{}{"Config-Key": "test-key"},
}).Return(nil)
s := service{
db: dbMock,

@ -12,7 +12,6 @@ import (
"github.com/urfave/cli/v2"
"github.com/valyala/fasthttp"
"github.com/valyala/fasthttp/fasthttpproxy"
"io"
"io/ioutil"
"os"
"os/signal"
@ -112,9 +111,9 @@ type state struct {
clock clock.Clock
}
func (state *state) handleNewURLEvent(subscriber event.Subscriber, body io.Reader) error {
func (state *state) handleNewURLEvent(subscriber event.Subscriber, msg event.RawMessage) error {
var evt event.NewURLEvent
if err := subscriber.Read(body, &evt); err != nil {
if err := subscriber.Read(&msg, &evt); err != nil {
return err
}

@ -1,7 +1,6 @@
package crawler
import (
"bytes"
"github.com/creekorful/trandoshan/internal/clock_mock"
"github.com/creekorful/trandoshan/internal/crawler/http_mock"
"github.com/creekorful/trandoshan/internal/event"
@ -98,9 +97,9 @@ func TestHandleNewURLEvent(t *testing.T) {
httpResponseMock := http_mock.NewMockResponse(mockCtrl)
clockMock := clock_mock.NewMockClock(mockCtrl)
msg := bytes.NewReader(nil)
msg := event.RawMessage{}
subscriberMock.EXPECT().
Read(msg, &event.NewURLEvent{}).
Read(&msg, &event.NewURLEvent{}).
SetArg(1, event.NewURLEvent{URL: "https://example.onion/image.png?id=12&test=2"}).
Return(nil)

@ -9,7 +9,7 @@ import (
// Publisher is something that push an event
type Publisher interface {
PublishEvent(event Event) error
PublishJSON(exchange string, event []byte) error
PublishJSON(exchange string, msg RawMessage) error
Close() error
}
@ -40,14 +40,15 @@ func (p *publisher) PublishEvent(event Event) error {
return fmt.Errorf("error while encoding event: %s", err)
}
return p.PublishJSON(event.Exchange(), evtBytes)
return p.PublishJSON(event.Exchange(), RawMessage{Body: evtBytes})
}
func (p *publisher) PublishJSON(exchange string, event []byte) error {
func (p *publisher) PublishJSON(exchange string, msg RawMessage) error {
return p.channel.Publish(exchange, "", false, false, amqp.Publishing{
ContentType: "application/json",
Body: event,
Body: msg.Body,
DeliveryMode: amqp.Persistent,
Headers: msg.Headers,
})
}

@ -1,22 +1,26 @@
package event
import (
"bytes"
"encoding/json"
"fmt"
"github.com/rs/zerolog/log"
"github.com/streadway/amqp"
"io"
)
// RawMessage is a raw message as viewed by the messaging system
type RawMessage struct {
Body []byte
Headers map[string]interface{}
}
// Handler represent an event handler
type Handler func(Subscriber, io.Reader) error
type Handler func(Subscriber, RawMessage) error
// Subscriber is something that read msg from an event queue
type Subscriber interface {
Publisher
Read(body io.Reader, event Event) error
Read(msg *RawMessage, event Event) error
SubscribeAsync(exchange, queue string, handler Handler) error
}
@ -51,14 +55,15 @@ func (s *subscriber) PublishEvent(event Event) error {
return fmt.Errorf("error while encoding event: %s", err)
}
return s.PublishJSON(event.Exchange(), evtBytes)
return s.PublishJSON(event.Exchange(), RawMessage{Body: evtBytes})
}
func (s *subscriber) PublishJSON(exchange string, event []byte) error {
func (s *subscriber) PublishJSON(exchange string, msg RawMessage) error {
return s.channel.Publish(exchange, "", false, false, amqp.Publishing{
ContentType: "application/json",
Body: event,
Body: msg.Body,
DeliveryMode: amqp.Persistent,
Headers: msg.Headers,
})
}
@ -66,8 +71,12 @@ func (s *subscriber) Close() error {
return s.channel.Close()
}
func (s *subscriber) Read(body io.Reader, event Event) error {
return readJSON(body, event)
func (s *subscriber) Read(msg *RawMessage, event Event) error {
if err := json.Unmarshal(msg.Body, event); err != nil {
return err
}
return nil
}
func (s *subscriber) SubscribeAsync(exchange, queue string, handler Handler) error {
@ -95,7 +104,11 @@ func (s *subscriber) SubscribeAsync(exchange, queue string, handler Handler) err
go func() {
for delivery := range deliveries {
if err := handler(s, bytes.NewReader(delivery.Body)); err != nil {
msg := RawMessage{
Body: delivery.Body,
Headers: delivery.Headers,
}
if err := handler(s, msg); err != nil {
log.Err(err).Msg("error while processing event")
}
@ -108,11 +121,3 @@ func (s *subscriber) SubscribeAsync(exchange, queue string, handler Handler) err
return nil
}
func readJSON(body io.Reader, event interface{}) error {
if err := json.NewDecoder(body).Decode(event); err != nil {
return fmt.Errorf("error while decoding event: %s", err)
}
return nil
}

@ -10,7 +10,6 @@ import (
"github.com/creekorful/trandoshan/internal/util"
"github.com/rs/zerolog/log"
"github.com/urfave/cli/v2"
"io"
"mvdan.cc/xurls/v2"
"os"
"os/signal"
@ -78,9 +77,9 @@ type state struct {
apiClient api.API
}
func (state *state) handleNewResourceEvent(subscriber event.Subscriber, body io.Reader) error {
func (state *state) handleNewResourceEvent(subscriber event.Subscriber, msg event.RawMessage) error {
var evt event.NewResourceEvent
if err := subscriber.Read(body, &evt); err != nil {
if err := subscriber.Read(&msg, &evt); err != nil {
return err
}

@ -1,7 +1,6 @@
package extractor
import (
"bytes"
"github.com/creekorful/trandoshan/api"
"github.com/creekorful/trandoshan/api_mock"
"github.com/creekorful/trandoshan/internal/event"
@ -96,9 +95,9 @@ This is sparta (hosted on https://example.org)
tn := time.Now()
msg := bytes.NewReader(nil)
msg := event.RawMessage{}
subscriberMock.EXPECT().
Read(msg, &event.NewResourceEvent{}).
Read(&msg, &event.NewResourceEvent{}).
SetArg(1, event.NewResourceEvent{
URL: "https://example.onion",
Body: body,

@ -10,7 +10,6 @@ import (
"github.com/creekorful/trandoshan/internal/util"
"github.com/rs/zerolog/log"
"github.com/urfave/cli/v2"
"io"
"net/url"
"os"
"os/signal"
@ -102,9 +101,9 @@ type state struct {
configClient client.Client
}
func (state *state) handleURLFoundEvent(subscriber event.Subscriber, body io.Reader) error {
func (state *state) handleURLFoundEvent(subscriber event.Subscriber, msg event.RawMessage) error {
var evt event.FoundURLEvent
if err := subscriber.Read(body, &evt); err != nil {
if err := subscriber.Read(&msg, &evt); err != nil {
return err
}

@ -1,7 +1,6 @@
package scheduler
import (
"bytes"
"errors"
"fmt"
"github.com/creekorful/trandoshan/api"
@ -22,9 +21,9 @@ func TestHandleMessageNotOnion(t *testing.T) {
subscriberMock := event_mock.NewMockSubscriber(mockCtrl)
configClientMock := client_mock.NewMockClient(mockCtrl)
msg := bytes.NewReader(nil)
msg := event.RawMessage{}
subscriberMock.EXPECT().
Read(msg, &event.FoundURLEvent{}).
Read(&msg, &event.FoundURLEvent{}).
SetArg(1, event.FoundURLEvent{URL: "https://example.org"}).
Return(nil)
@ -46,7 +45,7 @@ func TestHandleMessageWrongProtocol(t *testing.T) {
subscriberMock := event_mock.NewMockSubscriber(mockCtrl)
configClientMock := client_mock.NewMockClient(mockCtrl)
msg := bytes.NewReader(nil)
msg := event.RawMessage{}
s := state{
apiClient: apiClientMock,
@ -55,7 +54,7 @@ func TestHandleMessageWrongProtocol(t *testing.T) {
for _, protocol := range []string{"irc", "ftp"} {
subscriberMock.EXPECT().
Read(msg, &event.FoundURLEvent{}).
Read(&msg, &event.FoundURLEvent{}).
SetArg(1, event.FoundURLEvent{URL: fmt.Sprintf("%s://example.onion", protocol)}).
Return(nil)
@ -73,9 +72,9 @@ func TestHandleMessageAlreadyCrawled(t *testing.T) {
subscriberMock := event_mock.NewMockSubscriber(mockCtrl)
configClientMock := client_mock.NewMockClient(mockCtrl)
msg := bytes.NewReader(nil)
msg := event.RawMessage{}
subscriberMock.EXPECT().
Read(msg, &event.FoundURLEvent{}).
Read(&msg, &event.FoundURLEvent{}).
SetArg(1, event.FoundURLEvent{URL: "https://example.onion"}).
Return(nil)
@ -110,9 +109,9 @@ func TestHandleMessageForbiddenExtensions(t *testing.T) {
subscriberMock := event_mock.NewMockSubscriber(mockCtrl)
configClientMock := client_mock.NewMockClient(mockCtrl)
msg := bytes.NewReader(nil)
msg := event.RawMessage{}
subscriberMock.EXPECT().
Read(msg, &event.FoundURLEvent{}).
Read(&msg, &event.FoundURLEvent{}).
SetArg(1, event.FoundURLEvent{URL: "https://example.onion/image.png?id=12&test=2"}).
Return(nil)
@ -161,9 +160,9 @@ func TestHandleMessageHostnameForbidden(t *testing.T) {
}
for _, test := range tests {
msg := bytes.NewReader(nil)
msg := event.RawMessage{}
subscriberMock.EXPECT().
Read(msg, &event.FoundURLEvent{}).
Read(&msg, &event.FoundURLEvent{}).
SetArg(1, event.FoundURLEvent{URL: test.url}).
Return(nil)
@ -189,9 +188,9 @@ func TestHandleMessage(t *testing.T) {
subscriberMock := event_mock.NewMockSubscriber(mockCtrl)
configClientMock := client_mock.NewMockClient(mockCtrl)
msg := bytes.NewReader(nil)
msg := event.RawMessage{}
subscriberMock.EXPECT().
Read(msg, &event.FoundURLEvent{}).
Read(&msg, &event.FoundURLEvent{}).
SetArg(1, event.FoundURLEvent{URL: "https://example.onion"}).
Return(nil)

Loading…
Cancel
Save