Rewrite event support
parent
91a0dbb0ba
commit
5ad83d57a0
@ -0,0 +1,50 @@
|
||||
package event
|
||||
|
||||
//go:generate mockgen -destination=../event_mock/event_mock.go -package=event_mock . Publisher,Subscriber
|
||||
|
||||
const (
|
||||
// NewURLExchange is the subject used when an URL is schedule for crawling
|
||||
NewURLExchange = "url.new"
|
||||
// FoundURLExchange is the subject used when an URL is extracted from resource
|
||||
FoundURLExchange = "url.found"
|
||||
// NewResourceExchange is the subject used when a new resource has been crawled
|
||||
NewResourceExchange = "resource.new"
|
||||
)
|
||||
|
||||
// Event represent a event
|
||||
type Event interface {
|
||||
// Exchange returns the exchange where event should be push
|
||||
Exchange() string
|
||||
}
|
||||
|
||||
// NewURLEvent represent an URL to crawl
|
||||
type NewURLEvent struct {
|
||||
URL string `json:"url"`
|
||||
}
|
||||
|
||||
// Exchange returns the exchange where event should be push
|
||||
func (msg *NewURLEvent) Exchange() string {
|
||||
return NewURLExchange
|
||||
}
|
||||
|
||||
// FoundURLEvent represent a found URL
|
||||
type FoundURLEvent struct {
|
||||
URL string `json:"url"`
|
||||
}
|
||||
|
||||
// Exchange returns the exchange where event should be push
|
||||
func (msg *FoundURLEvent) Exchange() string {
|
||||
return FoundURLExchange
|
||||
}
|
||||
|
||||
// NewResourceEvent represent a crawled resource
|
||||
type NewResourceEvent struct {
|
||||
URL string `json:"url"`
|
||||
Body string `json:"body"`
|
||||
Headers map[string]string `json:"headers"`
|
||||
}
|
||||
|
||||
// Exchange returns the exchange where event should be push
|
||||
func (msg *NewResourceEvent) Exchange() string {
|
||||
return NewResourceExchange
|
||||
}
|
@ -0,0 +1,55 @@
|
||||
package event
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/streadway/amqp"
|
||||
)
|
||||
|
||||
// Publisher is something that push an event
|
||||
type Publisher interface {
|
||||
Publish(event Event) error
|
||||
Close() error
|
||||
}
|
||||
|
||||
type publisher struct {
|
||||
channel *amqp.Channel
|
||||
}
|
||||
|
||||
// NewPublisher create a new Publisher instance
|
||||
func NewPublisher(amqpURI string) (Publisher, error) {
|
||||
conn, err := amqp.Dial(amqpURI)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
c, err := conn.Channel()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &publisher{
|
||||
channel: c,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (p *publisher) Publish(event Event) error {
|
||||
return publishJSON(p.channel, event.Exchange(), event)
|
||||
}
|
||||
|
||||
func (p *publisher) Close() error {
|
||||
return p.channel.Close()
|
||||
}
|
||||
|
||||
func publishJSON(rc *amqp.Channel, exchange string, event interface{}) error {
|
||||
evtBytes, err := json.Marshal(event)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error while encoding event: %s", err)
|
||||
}
|
||||
|
||||
return rc.Publish(exchange, "", false, false, amqp.Publishing{
|
||||
ContentType: "application/json",
|
||||
Body: evtBytes,
|
||||
DeliveryMode: amqp.Persistent,
|
||||
})
|
||||
}
|
@ -0,0 +1,105 @@
|
||||
package event
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/rs/zerolog/log"
|
||||
"github.com/streadway/amqp"
|
||||
"io"
|
||||
)
|
||||
|
||||
// Handler represent an event handler
|
||||
type Handler func(s Subscriber, body io.Reader) error
|
||||
|
||||
// Subscriber is something that read msg from an event queue
|
||||
type Subscriber interface {
|
||||
Publisher
|
||||
|
||||
Read(body io.Reader, event Event) error
|
||||
SubscribeAsync(exchange, queue string, handler Handler) error
|
||||
}
|
||||
|
||||
// Subscriber represent a subscriber
|
||||
type subscriber struct {
|
||||
channel *amqp.Channel
|
||||
}
|
||||
|
||||
// NewSubscriber create a new subscriber and connect it to given server
|
||||
func NewSubscriber(amqpURI string) (Subscriber, error) {
|
||||
conn, err := amqp.Dial(amqpURI)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
c, err := conn.Channel()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := c.Qos(1, 0, false); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &subscriber{
|
||||
channel: c,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *subscriber) Publish(event Event) error {
|
||||
return publishJSON(s.channel, event.Exchange(), event)
|
||||
}
|
||||
|
||||
func (s *subscriber) Close() error {
|
||||
return s.channel.Close()
|
||||
}
|
||||
|
||||
func (s *subscriber) Read(body io.Reader, event Event) error {
|
||||
return readJSON(body, event)
|
||||
}
|
||||
|
||||
func (s *subscriber) SubscribeAsync(exchange, queue string, handler Handler) error {
|
||||
// First of all declare the exchange
|
||||
if err := s.channel.ExchangeDeclare(exchange, amqp.ExchangeFanout, true, false, false, false, nil); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Then declare the queue
|
||||
q, err := s.channel.QueueDeclare(queue, true, false, false, false, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Bind the queue to the exchange
|
||||
if err := s.channel.QueueBind(q.Name, "", exchange, false, nil); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Start consuming asynchronously
|
||||
deliveries, err := s.channel.Consume(q.Name, "", false, false, false, false, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
go func() {
|
||||
for delivery := range deliveries {
|
||||
if err := handler(s, bytes.NewReader(delivery.Body)); err != nil {
|
||||
log.Err(err).Msg("error while processing event")
|
||||
}
|
||||
|
||||
// Ack no matter what happen since we doesn't care about failing event (yet?)
|
||||
if err := delivery.Ack(false); err != nil {
|
||||
log.Err(err).Msg("error while acknowledging event")
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func readJSON(body io.Reader, event interface{}) error {
|
||||
if err := json.NewDecoder(body).Decode(event); err != nil {
|
||||
return fmt.Errorf("error while decoding event: %s", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
@ -1,50 +0,0 @@
|
||||
package messaging
|
||||
|
||||
//go:generate mockgen -destination=../messaging_mock/publisher_mock.go -package=messaging_mock . Publisher,Subscriber
|
||||
|
||||
const (
|
||||
// URLTodoSubject is the subject used when an URL is schedule for crawling
|
||||
URLTodoSubject = "url.todo"
|
||||
// URLFoundSubject is the subject used when an URL is extracted from resource
|
||||
URLFoundSubject = "url.found"
|
||||
// NewResourceSubject is the subject used when a new resource has been crawled
|
||||
NewResourceSubject = "resource.new"
|
||||
)
|
||||
|
||||
// Msg represent a message send-able trough queuing
|
||||
type Msg interface {
|
||||
// Subject returns the subject where message should be push
|
||||
Subject() string
|
||||
}
|
||||
|
||||
// URLTodoMsg represent an URL to crawl
|
||||
type URLTodoMsg struct {
|
||||
URL string `json:"url"`
|
||||
}
|
||||
|
||||
// Subject returns the subject where message should be push
|
||||
func (msg *URLTodoMsg) Subject() string {
|
||||
return URLTodoSubject
|
||||
}
|
||||
|
||||
// URLFoundMsg represent a found URL
|
||||
type URLFoundMsg struct {
|
||||
URL string `json:"url"`
|
||||
}
|
||||
|
||||
// Subject returns the subject where message should be push
|
||||
func (msg *URLFoundMsg) Subject() string {
|
||||
return URLFoundSubject
|
||||
}
|
||||
|
||||
// NewResourceMsg represent a crawled resource
|
||||
type NewResourceMsg struct {
|
||||
URL string `json:"url"`
|
||||
Body string `json:"body"`
|
||||
Headers map[string]string `json:"headers"`
|
||||
}
|
||||
|
||||
// Subject returns the subject where message should be push
|
||||
func (msg *NewResourceMsg) Subject() string {
|
||||
return NewResourceSubject
|
||||
}
|
@ -1,55 +0,0 @@
|
||||
package messaging
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/streadway/amqp"
|
||||
)
|
||||
|
||||
// Publisher is something that push msg to an event queue
|
||||
type Publisher interface {
|
||||
PublishMsg(msg Msg) error
|
||||
Close()
|
||||
}
|
||||
|
||||
type publisher struct {
|
||||
rc *amqp.Channel
|
||||
}
|
||||
|
||||
// NewPublisher create a new Publisher instance
|
||||
func NewPublisher(amqpURI string) (Publisher, error) {
|
||||
conn, err := amqp.Dial(amqpURI)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
c, err := conn.Channel()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &publisher{
|
||||
rc: c,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (p *publisher) PublishMsg(msg Msg) error {
|
||||
return publishJSON(p.rc, msg.Subject(), msg)
|
||||
}
|
||||
|
||||
func (p *publisher) Close() {
|
||||
_ = p.rc.Close()
|
||||
}
|
||||
|
||||
func publishJSON(rc *amqp.Channel, subject string, msg interface{}) error {
|
||||
msgBytes, err := json.Marshal(msg)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error while encoding message: %s", err)
|
||||
}
|
||||
|
||||
return rc.Publish("", subject, false, false, amqp.Publishing{
|
||||
ContentType: "application/json",
|
||||
Body: msgBytes,
|
||||
DeliveryMode: amqp.Persistent,
|
||||
})
|
||||
}
|
@ -1,92 +0,0 @@
|
||||
package messaging
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/rs/zerolog/log"
|
||||
"github.com/streadway/amqp"
|
||||
"io"
|
||||
)
|
||||
|
||||
// MsgHandler represent an handler for a subscriber
|
||||
type MsgHandler func(s Subscriber, body io.Reader) error
|
||||
|
||||
// Subscriber is something that read msg from an event queue
|
||||
type Subscriber interface {
|
||||
Publisher
|
||||
|
||||
ReadMsg(body io.Reader, msg Msg) error
|
||||
QueueSubscribe(subject, queue string, handler MsgHandler) error
|
||||
Close()
|
||||
}
|
||||
|
||||
// Subscriber represent a subscriber
|
||||
type subscriber struct {
|
||||
rc *amqp.Channel
|
||||
}
|
||||
|
||||
// NewSubscriber create a new subscriber and connect it to given server
|
||||
func NewSubscriber(amqpURI string) (Subscriber, error) {
|
||||
conn, err := amqp.Dial(amqpURI)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
c, err := conn.Channel()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := c.Qos(1, 0, false); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &subscriber{
|
||||
rc: c,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *subscriber) ReadMsg(body io.Reader, msg Msg) error {
|
||||
return readJSON(body, msg)
|
||||
}
|
||||
|
||||
func (s *subscriber) QueueSubscribe(subject, queue string, handler MsgHandler) error {
|
||||
q, err := s.rc.QueueDeclare(subject, true, false, false, false, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error while declaring queue: %s", err)
|
||||
}
|
||||
|
||||
deliveries, err := s.rc.Consume(q.Name, "", false, false, false, false, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error while consuming queue: %s", err)
|
||||
}
|
||||
|
||||
for d := range deliveries {
|
||||
if err := handler(s, bytes.NewReader(d.Body)); err != nil {
|
||||
log.Err(err).Msg("error while processing message")
|
||||
}
|
||||
|
||||
// Ack no matter what since we doesn't care about failing messages
|
||||
if err := d.Ack(false); err != nil {
|
||||
log.Err(err).Msg("error while ack`ing message")
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *subscriber) PublishMsg(msg Msg) error {
|
||||
return publishJSON(s.rc, msg.Subject(), msg)
|
||||
}
|
||||
|
||||
func (s *subscriber) Close() {
|
||||
_ = s.rc.Close()
|
||||
}
|
||||
|
||||
func readJSON(body io.Reader, where interface{}) error {
|
||||
if err := json.NewDecoder(body).Decode(where); err != nil {
|
||||
return fmt.Errorf("error while decoding message: %s", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
Loading…
Reference in New Issue