event: add PublishJson method

pull/89/head
Aloïs Micard 4 years ago
parent 278f8e58c2
commit 54ff87a130
No known key found for this signature in database
GPG Key ID: 1A0EB82F071F5EFE

@ -121,7 +121,7 @@ func (s *Service) AddResource(res api.ResourceDto) (api.ResourceDto, error) {
// ScheduleURL schedule given url for crawling
func (s *Service) ScheduleURL(url string) error {
// Publish the URL
if err := s.pub.Publish(&event.FoundURLEvent{URL: url}); err != nil {
if err := s.pub.PublishEvent(&event.FoundURLEvent{URL: url}); err != nil {
log.Err(err).Msg("Unable to publish URL")
return err
}

@ -177,7 +177,7 @@ func TestScheduleURL(t *testing.T) {
s := Service{pub: pubMock}
pubMock.EXPECT().Publish(&event.FoundURLEvent{URL: "https://example.onion"})
pubMock.EXPECT().PublishEvent(&event.FoundURLEvent{URL: "https://example.onion"})
if err := s.ScheduleURL("https://example.onion"); err != nil {
t.FailNow()

@ -130,7 +130,7 @@ func (state *state) handleNewURLEvent(subscriber event.Subscriber, body io.Reade
Time: state.clock.Now(),
}
if err := subscriber.Publish(&res); err != nil {
if err := subscriber.PublishEvent(&res); err != nil {
return err
}

@ -112,7 +112,7 @@ func TestHandleNewURLEvent(t *testing.T) {
tn := time.Now()
clockMock.EXPECT().Now().Return(tn)
subscriberMock.EXPECT().Publish(&event.NewResourceEvent{
subscriberMock.EXPECT().PublishEvent(&event.NewResourceEvent{
URL: "https://example.onion/image.png?id=12&test=2",
Body: "Hello",
Headers: map[string]string{"Content-Type": "text/plain", "Server": "Debian"},

@ -8,7 +8,8 @@ import (
// Publisher is something that push an event
type Publisher interface {
Publish(event Event) error
PublishEvent(event Event) error
PublishJSON(exchange string, event []byte) error
Close() error
}
@ -33,23 +34,23 @@ func NewPublisher(amqpURI string) (Publisher, error) {
}, nil
}
func (p *publisher) Publish(event Event) error {
return publishJSON(p.channel, event.Exchange(), event)
}
func (p *publisher) Close() error {
return p.channel.Close()
}
func publishJSON(rc *amqp.Channel, exchange string, event interface{}) error {
func (p *publisher) PublishEvent(event Event) error {
evtBytes, err := json.Marshal(event)
if err != nil {
return fmt.Errorf("error while encoding event: %s", err)
}
return rc.Publish(exchange, "", false, false, amqp.Publishing{
return p.PublishJSON(event.Exchange(), evtBytes)
}
func (p *publisher) PublishJSON(exchange string, event []byte) error {
return p.channel.Publish(exchange, "", false, false, amqp.Publishing{
ContentType: "application/json",
Body: evtBytes,
Body: event,
DeliveryMode: amqp.Persistent,
})
}
func (p *publisher) Close() error {
return p.channel.Close()
}

@ -45,8 +45,21 @@ func NewSubscriber(amqpURI string) (Subscriber, error) {
}, nil
}
func (s *subscriber) Publish(event Event) error {
return publishJSON(s.channel, event.Exchange(), event)
func (s *subscriber) PublishEvent(event Event) error {
evtBytes, err := json.Marshal(event)
if err != nil {
return fmt.Errorf("error while encoding event: %s", err)
}
return s.PublishJSON(event.Exchange(), evtBytes)
}
func (p *subscriber) PublishJSON(exchange string, event []byte) error {
return p.channel.Publish(exchange, "", false, false, amqp.Publishing{
ContentType: "application/json",
Body: event,
DeliveryMode: amqp.Persistent,
})
}
func (s *subscriber) Close() error {

@ -118,7 +118,7 @@ func (state *state) handleNewResourceEvent(subscriber event.Subscriber, body io.
Str("url", url).
Msg("Publishing found URL")
if err := subscriber.Publish(&event.FoundURLEvent{URL: url}); err != nil {
if err := subscriber.PublishEvent(&event.FoundURLEvent{URL: url}); err != nil {
log.Warn().
Str("url", url).
Str("err", err.Error()).

@ -121,10 +121,10 @@ This is sparta (hosted on https://example.org)
// should be called only one time
subscriberMock.EXPECT().
Publish(&event.FoundURLEvent{URL: "https://example.org"}).
PublishEvent(&event.FoundURLEvent{URL: "https://example.org"}).
Return(nil)
subscriberMock.EXPECT().
Publish(&event.FoundURLEvent{URL: "https://google.com/test?test=test"}).
PublishEvent(&event.FoundURLEvent{URL: "https://google.com/test?test=test"}).
Return(nil)
s := state{apiClient: apiClientMock}

@ -172,7 +172,7 @@ func (state *state) handleURLFoundEvent(subscriber event.Subscriber, body io.Rea
// No matches: schedule!
log.Debug().Stringer("url", u).Msg("URL should be scheduled")
if err := subscriber.Publish(&event.NewURLEvent{URL: evt.URL}); err != nil {
if err := subscriber.PublishEvent(&event.NewURLEvent{URL: evt.URL}); err != nil {
return fmt.Errorf("error while publishing URL: %s", err)
}

@ -205,7 +205,7 @@ func TestHandleMessage(t *testing.T) {
Return([]api.ResourceDto{}, int64(0), nil)
subscriberMock.EXPECT().
Publish(&event.NewURLEvent{URL: "https://example.onion"}).
PublishEvent(&event.NewURLEvent{URL: "https://example.onion"}).
Return(nil)
configClientMock.EXPECT().GetForbiddenMimeTypes().Return([]client.ForbiddenMimeType{}, nil)

Loading…
Cancel
Save