Refactor to use RabbitMQ

pull/53/head
Aloïs Micard 4 years ago
parent 09f38acf71
commit 0dc70f63f7
No known key found for this signature in database
GPG Key ID: 1A0EB82F071F5EFE

@ -1,8 +1,10 @@
version: '3'
services:
nats:
image: nats:2.1.9-alpine3.12
rabbitmq:
image: rabbitmq:3.8.9-management-alpine
ports:
- 15672:15672
torproxy:
image: dperson/torproxy:latest
elasticsearch:
@ -22,17 +24,17 @@ services:
image: creekorful/tdsh-crawler:latest
command: >
--log-level debug
--nats-uri nats
--event-srv-uri amqp://guest:guest@rabbitmq:5672
--tor-uri torproxy:9050
restart: always
depends_on:
- nats
- rabbitmq
- torproxy
scheduler:
image: creekorful/tdsh-scheduler:latest
command: >
--log-level debug
--nats-uri nats
--event-srv-uri amqp://guest:guest@rabbitmq:5672
--api-uri http://api:8080
--api-token eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VybmFtZSI6InNjaGVkdWxlciIsInJpZ2h0cyI6eyJHRVQiOlsiL3YxL3Jlc291cmNlcyJdfX0.dBR6KLQp2h2srY-By3zikEznhQplLCtDrvOkcXP6USY
--forbidden-extensions png
@ -42,24 +44,24 @@ services:
--forbidden-extensions bmp
restart: always
depends_on:
- nats
- rabbitmq
- api
extractor:
image: creekorful/tdsh-extractor:latest
command: >
--log-level debug
--nats-uri nats
--event-srv-uri amqp://guest:guest@rabbitmq:5672
--api-uri http://api:8080
--api-token eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VybmFtZSI6ImV4dHJhY3RvciIsInJpZ2h0cyI6eyJQT1NUIjpbIi92MS9yZXNvdXJjZXMiXX19.mytGd_9zyK8y_T3fsWAmH8FnaBNr6qWefwCPDOx4in0
restart: always
depends_on:
- nats
- rabbitmq
- api
api:
image: creekorful/tdsh-api:latest
command: >
--log-level debug
--nats-uri nats
--event-srv-uri amqp://guest:guest@rabbitmq:5672
--elasticsearch-uri http://elasticsearch:9200
--signing-key K==M5RsU_DQa4_XSbkX?L27s^xWmde25
restart: always

@ -9,16 +9,14 @@ require (
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/go-resty/resty/v2 v2.3.0
github.com/golang/mock v1.4.4
github.com/golang/protobuf v1.4.2 // indirect
github.com/labstack/echo/v4 v4.1.16
github.com/nats-io/nats-server/v2 v2.1.8 // indirect
github.com/nats-io/nats.go v1.10.0
github.com/olekukonko/tablewriter v0.0.4
github.com/olivere/elastic/v7 v7.0.20
github.com/rs/zerolog v1.20.0
github.com/streadway/amqp v1.0.0
github.com/urfave/cli/v2 v2.2.0
github.com/valyala/fasthttp v1.9.0
github.com/xhit/go-str2duration/v2 v2.0.0
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59 // indirect
mvdan.cc/xurls/v2 v2.1.0
)

@ -30,17 +30,7 @@ github.com/golang/mock v1.4.4 h1:l75CXGRSwbaYNpl/Z2X1XIIAMSCquvXgpVZDhwEIJsc=
github.com/golang/mock v1.4.4/go.mod h1:l3mdAwkq5BuhzHwde/uurv3sEJeZMXNpwsxVWU71h+4=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0=
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.2 h1:X2ev0eStA3AbceY54o37/0PQ/UWqKEiiO2dKL5OPaFM=
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik=
@ -65,17 +55,6 @@ github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHX
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-runewidth v0.0.7 h1:Ei8KR0497xHyKJPAv59M1dkC+rOZCMBJ+t3fZ+twI54=
github.com/mattn/go-runewidth v0.0.7/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/nats-io/jwt v0.3.2 h1:+RB5hMpXUUA2dfxuhBTEkMOrYmM+gKIZYS1KjSostMI=
github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU=
github.com/nats-io/nats-server/v2 v2.1.8 h1:d5GoJA6W7vQkmt99Nfdeie3pEFFUEjIwt1YZp50DkIQ=
github.com/nats-io/nats-server/v2 v2.1.8/go.mod h1:rbRrRE/Iv93O/rUvZ9dh4NfT0Cm9HWjW/BqOWLGgYiE=
github.com/nats-io/nats.go v1.10.0 h1:L8qnKaofSfNFbXg0C5F71LdjPRnmQwSsA4ukmkt1TvY=
github.com/nats-io/nats.go v1.10.0/go.mod h1:AjGArbfyR50+afOUotNX2Xs5SYHf+CoOa5HH1eEl2HE=
github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
github.com/nats-io/nkeys v0.1.4 h1:aEsHIssIk6ETN5m2/MD8Y4B2X7FfXrBAUdkyRvbVYzA=
github.com/nats-io/nkeys v0.1.4/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/olekukonko/tablewriter v0.0.4 h1:vHD/YYe1Wolo78koG299f7V/VAS08c6IpCLn+Ejf/w8=
github.com/olekukonko/tablewriter v0.0.4/go.mod h1:zq6QwlOf5SlnkVbMSr5EoBv3636FWnp+qbPhuoO21uA=
github.com/olivere/elastic/v7 v7.0.20 h1:5FFpGPVJlBSlWBOdict406Y3yNTIpVpAiUvdFZeSbAo=
@ -96,6 +75,8 @@ github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeV
github.com/smartystreets/assertions v1.1.1/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo=
github.com/smartystreets/go-aws-auth v0.0.0-20180515143844-0c1422d1fdb9/go.mod h1:SnhjPscd9TpLiy1LpzGSKh3bXCfxxXuqd9xmQJy3slM=
github.com/smartystreets/gunit v1.4.2/go.mod h1:ZjM1ozSIMJlAz/ay4SG8PeKF00ckUp+zMHZXV9/bvak=
github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo=
github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
@ -116,7 +97,6 @@ github.com/xhit/go-str2duration/v2 v2.0.0 h1:uFtk6FWB375bP7ewQl+/1wBcn840GPhnySO
github.com/xhit/go-str2duration/v2 v2.0.0/go.mod h1:ohY8p+0f07DiV6Em5LKB0s2YpLtXVyJfNt1+BlmyAsU=
go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200221231518-2aa609cf4a9d/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59 h1:3zb4D3T4G8jdExgVU/95+vQXfpEPiMdCaZgmGVxjNHM=
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
@ -148,7 +128,6 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae h1:/WDfKMnPU+m5M4xB+6x4kaepxRw6jWvR5iDRdvjHgy8=
@ -175,14 +154,6 @@ google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoA
google.golang.org/genproto v0.0.0-20190425155659-357c62f0e4bb/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM=
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=

@ -22,7 +22,7 @@ func GetApp() *cli.App {
Usage: "Trandoshan API component",
Flags: []cli.Flag{
logging.GetLogFlag(),
util.GetNATSURIFlag(),
util.GetEventSrvURI(),
&cli.StringFlag{
Name: "elasticsearch-uri",
Usage: "URI to the Elasticsearch server",
@ -55,7 +55,7 @@ func execute(c *cli.Context) error {
log.Info().Str("ver", c.App.Version).
Str("elasticsearch-uri", c.String("elasticsearch-uri")).
Str("nats-uri", c.String("nats-uri")).
Str("event-srv-uri", c.String("event-srv-uri")).
Msg("Starting tdsh-api")
signingKey := []byte(c.String("signing-key"))

@ -21,10 +21,10 @@ type svc struct {
}
func newService(c *cli.Context) (service, error) {
// Connect to the NATS server
pub, err := messaging.NewPublisher(c.String("nats-uri"))
// Connect to the messaging server
pub, err := messaging.NewPublisher(c.String("event-srv-uri"))
if err != nil {
log.Err(err).Str("uri", c.String("nats-uri")).Msg("Error while connecting to NATS server")
log.Err(err).Str("uri", c.String("event-srv-uri")).Msg("Error while connecting to event server")
return nil, err
}

@ -6,11 +6,11 @@ import (
"github.com/creekorful/trandoshan/internal/logging"
"github.com/creekorful/trandoshan/internal/messaging"
"github.com/creekorful/trandoshan/internal/util"
"github.com/nats-io/nats.go"
"github.com/rs/zerolog/log"
"github.com/urfave/cli/v2"
"github.com/valyala/fasthttp"
"github.com/valyala/fasthttp/fasthttpproxy"
"io"
"strings"
"time"
)
@ -25,7 +25,7 @@ func GetApp() *cli.App {
Usage: "Trandoshan crawler component",
Flags: []cli.Flag{
logging.GetLogFlag(),
util.GetNATSURIFlag(),
util.GetEventSrvURI(),
&cli.StringFlag{
Name: "tor-uri",
Usage: "URI to the TOR SOCKS proxy",
@ -51,7 +51,7 @@ func execute(ctx *cli.Context) error {
log.Info().
Str("ver", ctx.App.Version).
Str("nats-uri", ctx.String("nats-uri")).
Str("event-srv-uri", ctx.String("event-srv-uri")).
Str("tor-uri", ctx.String("tor-uri")).
Strs("allowed-content-types", ctx.StringSlice("allowed-ct")).
Msg("Starting tdsh-crawler")
@ -67,8 +67,8 @@ func execute(ctx *cli.Context) error {
Name: ctx.String("user-agent"),
}
// Create the NATS subscriber
sub, err := messaging.NewSubscriber(ctx.String("nats-uri"))
// Create the subscriber
sub, err := messaging.NewSubscriber(ctx.String("event-srv-uri"))
if err != nil {
return err
}
@ -85,7 +85,7 @@ func execute(ctx *cli.Context) error {
}
func handleMessage(httpClient *fasthttp.Client, allowedContentTypes []string) messaging.MsgHandler {
return func(sub messaging.Subscriber, msg *nats.Msg) error {
return func(sub messaging.Subscriber, msg io.Reader) error {
var urlMsg messaging.URLTodoMsg
if err := sub.ReadMsg(msg, &urlMsg); err != nil {
return err

@ -8,9 +8,9 @@ import (
"github.com/creekorful/trandoshan/internal/logging"
"github.com/creekorful/trandoshan/internal/messaging"
"github.com/creekorful/trandoshan/internal/util"
"github.com/nats-io/nats.go"
"github.com/rs/zerolog/log"
"github.com/urfave/cli/v2"
"io"
"mvdan.cc/xurls/v2"
"regexp"
"strings"
@ -29,7 +29,7 @@ func GetApp() *cli.App {
Usage: "Trandoshan extractor component",
Flags: []cli.Flag{
logging.GetLogFlag(),
util.GetNATSURIFlag(),
util.GetEventSrvURI(),
util.GetAPIURIFlag(),
util.GetAPITokenFlag(),
},
@ -42,14 +42,14 @@ func execute(ctx *cli.Context) error {
log.Info().
Str("ver", ctx.App.Version).
Str("nats-uri", ctx.String("nats-uri")).
Str("event-srv-uri", ctx.String("event-srv-uri")).
Str("api-uri", ctx.String("api-uri")).
Msg("Starting tdsh-extractor")
apiClient := util.GetAPIClient(ctx)
// Create the NATS subscriber
sub, err := messaging.NewSubscriber(ctx.String("nats-uri"))
// Create the event subscriber
sub, err := messaging.NewSubscriber(ctx.String("event-srv-uri"))
if err != nil {
return err
}
@ -66,7 +66,7 @@ func execute(ctx *cli.Context) error {
}
func handleMessage(apiClient api.Client) messaging.MsgHandler {
return func(sub messaging.Subscriber, msg *nats.Msg) error {
return func(sub messaging.Subscriber, msg io.Reader) error {
var resMsg messaging.NewResourceMsg
if err := sub.ReadMsg(msg, &resMsg); err != nil {
return err

@ -1,12 +1,12 @@
package extractor
import (
"bytes"
"github.com/creekorful/trandoshan/api"
"github.com/creekorful/trandoshan/api_mock"
"github.com/creekorful/trandoshan/internal/messaging"
"github.com/creekorful/trandoshan/internal/messaging_mock"
"github.com/golang/mock/gomock"
"github.com/nats-io/nats.go"
"testing"
)
@ -90,9 +90,9 @@ This is sparta
apiClientMock := api_mock.NewMockClient(mockCtrl)
subscriberMock := messaging_mock.NewMockSubscriber(mockCtrl)
msg := nats.Msg{}
msg := bytes.NewReader(nil)
subscriberMock.EXPECT().
ReadMsg(&msg, &messaging.NewResourceMsg{}).
ReadMsg(msg, &messaging.NewResourceMsg{}).
SetArg(1, messaging.NewResourceMsg{URL: "https://example.onion", Body: body}).
Return(nil)
@ -113,7 +113,7 @@ This is sparta
PublishMsg(&messaging.URLFoundMsg{URL: "https://google.com/test?test=test"}).
Return(nil)
if err := handleMessage(apiClientMock)(subscriberMock, &msg); err != nil {
if err := handleMessage(apiClientMock)(subscriberMock, msg); err != nil {
t.FailNow()
}
}

@ -1,7 +0,0 @@
package messaging
// Msg represent a message send-able trough NATS
type Msg interface {
// Subject returns the subject where message should be push
Subject() string
}

@ -11,6 +11,12 @@ const (
NewResourceSubject = "resource.new"
)
// Msg represent a message send-able trough queuing
type Msg interface {
// Subject returns the subject where message should be push
Subject() string
}
// URLTodoMsg represent an URL to crawl
type URLTodoMsg struct {
URL string `json:"url"`

@ -3,7 +3,7 @@ package messaging
import (
"encoding/json"
"fmt"
"github.com/nats-io/nats.go"
"github.com/streadway/amqp"
)
// Publisher is something that push msg to an event queue
@ -13,34 +13,42 @@ type Publisher interface {
}
type publisher struct {
nc *nats.Conn
rc *amqp.Channel
}
// NewPublisher create a new Publisher instance
func NewPublisher(natsURI string) (Publisher, error) {
nc, err := nats.Connect(natsURI)
func NewPublisher(amqpURI string) (Publisher, error) {
conn, err := amqp.Dial(amqpURI)
if err != nil {
return nil, err
}
c, err := conn.Channel()
if err != nil {
return nil, err
}
return &publisher{
nc: nc,
rc: c,
}, nil
}
func (p *publisher) PublishMsg(msg Msg) error {
return publishJSON(p.nc, msg.Subject(), msg)
return publishJSON(p.rc, msg.Subject(), msg)
}
func (p *publisher) Close() {
p.nc.Close()
_ = p.rc.Close()
}
func publishJSON(nc *nats.Conn, subject string, msg interface{}) error {
func publishJSON(rc *amqp.Channel, subject string, msg interface{}) error {
msgBytes, err := json.Marshal(msg)
if err != nil {
return fmt.Errorf("error while encoding message: %s", err)
}
return nc.Publish(subject, msgBytes)
return rc.Publish("", subject, false, false, amqp.Publishing{
ContentType: "application/json",
Body: msgBytes,
})
}

@ -1,79 +1,92 @@
package messaging
import (
"context"
"bytes"
"encoding/json"
"fmt"
"github.com/nats-io/nats.go"
"github.com/rs/zerolog/log"
"github.com/streadway/amqp"
"io"
)
// MsgHandler represent an handler for a NATS subscriber
type MsgHandler func(s Subscriber, msg *nats.Msg) error
// MsgHandler represent an handler for a subscriber
type MsgHandler func(s Subscriber, body io.Reader) error
// Subscriber is something that read msg from an event queue
type Subscriber interface {
Publisher
ReadMsg(natsMsg *nats.Msg, msg Msg) error
ReadMsg(body io.Reader, msg Msg) error
QueueSubscribe(subject, queue string, handler MsgHandler) error
Close()
}
// Subscriber represent a NATS subscriber
// Subscriber represent a subscriber
type subscriber struct {
nc *nats.Conn
rc *amqp.Channel
}
// NewSubscriber create a new subscriber and connect it to given NATS server
func NewSubscriber(address string) (Subscriber, error) {
nc, err := nats.Connect(address)
// NewSubscriber create a new subscriber and connect it to given server
func NewSubscriber(amqpURI string) (Subscriber, error) {
conn, err := amqp.Dial(amqpURI)
if err != nil {
return nil, err
}
c, err := conn.Channel()
if err != nil {
return nil, err
}
if err := c.Qos(1, 0, false); err != nil {
return nil, err
}
return &subscriber{
nc: nc,
rc: c,
}, nil
}
func (s *subscriber) ReadMsg(natsMsg *nats.Msg, msg Msg) error {
return readJSON(natsMsg, msg)
func (s *subscriber) ReadMsg(body io.Reader, msg Msg) error {
return readJSON(body, msg)
}
func (s *subscriber) QueueSubscribe(subject, queue string, handler MsgHandler) error {
// Create the subscriber
sub, err := s.nc.QueueSubscribeSync(subject, queue)
q, err := s.rc.QueueDeclare(subject, true, false, false, false, nil)
if err != nil {
return err
return fmt.Errorf("error while declaring queue: %s", err)
}
for {
// Read incoming message
msg, err := sub.NextMsgWithContext(context.Background())
if err != nil {
log.Warn().Str("err", err.Error()).Msg("error while reading incoming message, skipping it")
deliveries, err := s.rc.Consume(q.Name, "", false, false, false, false, nil)
if err != nil {
return fmt.Errorf("error while consuming queue: %s", err)
}
for d := range deliveries {
if err := handler(s, bytes.NewReader(d.Body)); err != nil {
log.Err(err).Msg("error while processing message")
continue
}
// ... And process it
if err := handler(s, msg); err != nil {
log.Err(err).Msg("error while processing message")
// Ack no matter what since we doesn't care about failing messages
if err := d.Ack(false); err != nil {
log.Err(err).Msg("error while ack`ing message")
continue
}
}
return nil
}
func (s *subscriber) PublishMsg(msg Msg) error {
return publishJSON(s.nc, msg.Subject(), msg)
return publishJSON(s.rc, msg.Subject(), msg)
}
func (s *subscriber) Close() {
s.nc.Close()
_ = s.rc.Close()
}
func readJSON(msg *nats.Msg, body interface{}) error {
if err := json.Unmarshal(msg.Data, body); err != nil {
func readJSON(body io.Reader, where interface{}) error {
if err := json.NewDecoder(body).Decode(where); err != nil {
return fmt.Errorf("error while decoding message: %s", err)
}

@ -6,10 +6,10 @@ import (
"github.com/creekorful/trandoshan/internal/logging"
"github.com/creekorful/trandoshan/internal/messaging"
"github.com/creekorful/trandoshan/internal/util"
"github.com/nats-io/nats.go"
"github.com/rs/zerolog/log"
"github.com/urfave/cli/v2"
"github.com/xhit/go-str2duration/v2"
"io"
"net/url"
"strings"
"time"
@ -23,7 +23,7 @@ func GetApp() *cli.App {
Usage: "Trandoshan scheduler component",
Flags: []cli.Flag{
logging.GetLogFlag(),
util.GetNATSURIFlag(),
util.GetEventSrvURI(),
util.GetAPIURIFlag(),
util.GetAPITokenFlag(),
&cli.StringFlag{
@ -46,7 +46,7 @@ func execute(ctx *cli.Context) error {
log.Info().
Str("ver", ctx.App.Version).
Str("nats-uri", ctx.String("nats-uri")).
Str("event-srv-uri", ctx.String("event-srv-uri")).
Str("api-uri", ctx.String("api-uri")).
Strs("forbidden-exts", ctx.StringSlice("forbidden-extensions")).
Dur("refresh-delay", refreshDelay).
@ -55,8 +55,8 @@ func execute(ctx *cli.Context) error {
// Create the API client
apiClient := util.GetAPIClient(ctx)
// Create the NATS subscriber
sub, err := messaging.NewSubscriber(ctx.String("nats-uri"))
// Create the subscriber
sub, err := messaging.NewSubscriber(ctx.String("event-srv-uri"))
if err != nil {
return err
}
@ -73,7 +73,7 @@ func execute(ctx *cli.Context) error {
}
func handleMessage(apiClient api.Client, refreshDelay time.Duration, forbiddenExtensions []string) messaging.MsgHandler {
return func(sub messaging.Subscriber, msg *nats.Msg) error {
return func(sub messaging.Subscriber, msg io.Reader) error {
var urlMsg messaging.URLFoundMsg
if err := sub.ReadMsg(msg, &urlMsg); err != nil {
return err

@ -1,12 +1,12 @@
package scheduler
import (
"bytes"
"github.com/creekorful/trandoshan/api"
"github.com/creekorful/trandoshan/api_mock"
"github.com/creekorful/trandoshan/internal/messaging"
"github.com/creekorful/trandoshan/internal/messaging_mock"
"github.com/golang/mock/gomock"
"github.com/nats-io/nats.go"
"testing"
"time"
)
@ -36,13 +36,13 @@ func TestHandleMessageNotOnion(t *testing.T) {
apiClientMock := api_mock.NewMockClient(mockCtrl)
subscriberMock := messaging_mock.NewMockSubscriber(mockCtrl)
msg := nats.Msg{}
msg := bytes.NewReader(nil)
subscriberMock.EXPECT().
ReadMsg(&msg, &messaging.URLFoundMsg{}).
ReadMsg(msg, &messaging.URLFoundMsg{}).
SetArg(1, messaging.URLFoundMsg{URL: "https://example.org"}).
Return(nil)
if err := handleMessage(apiClientMock, -1, []string{})(subscriberMock, &msg); err != nil {
if err := handleMessage(apiClientMock, -1, []string{})(subscriberMock, msg); err != nil {
t.FailNow()
}
}
@ -54,9 +54,9 @@ func TestHandleMessageAlreadyCrawled(t *testing.T) {
apiClientMock := api_mock.NewMockClient(mockCtrl)
subscriberMock := messaging_mock.NewMockSubscriber(mockCtrl)
msg := nats.Msg{}
msg := bytes.NewReader(nil)
subscriberMock.EXPECT().
ReadMsg(&msg, &messaging.URLFoundMsg{}).
ReadMsg(msg, &messaging.URLFoundMsg{}).
SetArg(1, messaging.URLFoundMsg{URL: "https://example.onion"}).
Return(nil)
@ -64,7 +64,7 @@ func TestHandleMessageAlreadyCrawled(t *testing.T) {
SearchResources("https://example.onion", "", time.Time{}, time.Time{}, 1, 1).
Return([]api.ResourceDto{}, int64(1), nil)
if err := handleMessage(apiClientMock, -1, []string{"png"})(subscriberMock, &msg); err != nil {
if err := handleMessage(apiClientMock, -1, []string{"png"})(subscriberMock, msg); err != nil {
t.FailNow()
}
}
@ -76,13 +76,13 @@ func TestHandleMessageForbiddenExtensions(t *testing.T) {
apiClientMock := api_mock.NewMockClient(mockCtrl)
subscriberMock := messaging_mock.NewMockSubscriber(mockCtrl)
msg := nats.Msg{}
msg := bytes.NewReader(nil)
subscriberMock.EXPECT().
ReadMsg(&msg, &messaging.URLFoundMsg{}).
ReadMsg(msg, &messaging.URLFoundMsg{}).
SetArg(1, messaging.URLFoundMsg{URL: "https://example.onion/image.png?id=12&test=2"}).
Return(nil)
if err := handleMessage(apiClientMock, -1, []string{"png"})(subscriberMock, &msg); err != nil {
if err := handleMessage(apiClientMock, -1, []string{"png"})(subscriberMock, msg); err != nil {
t.FailNow()
}
}
@ -94,9 +94,9 @@ func TestHandleMessage(t *testing.T) {
apiClientMock := api_mock.NewMockClient(mockCtrl)
subscriberMock := messaging_mock.NewMockSubscriber(mockCtrl)
msg := nats.Msg{}
msg := bytes.NewReader(nil)
subscriberMock.EXPECT().
ReadMsg(&msg, &messaging.URLFoundMsg{}).
ReadMsg(msg, &messaging.URLFoundMsg{}).
SetArg(1, messaging.URLFoundMsg{URL: "https://example.onion"}).
Return(nil)
@ -108,7 +108,7 @@ func TestHandleMessage(t *testing.T) {
PublishMsg(&messaging.URLTodoMsg{URL: "https://example.onion"}).
Return(nil)
if err := handleMessage(apiClientMock, -1, []string{})(subscriberMock, &msg); err != nil {
if err := handleMessage(apiClientMock, -1, []string{})(subscriberMock, msg); err != nil {
t.FailNow()
}
}

@ -0,0 +1,12 @@
package util
import "github.com/urfave/cli/v2"
// GetEventSrvURI return the URI of the event server
func GetEventSrvURI() *cli.StringFlag {
return &cli.StringFlag{
Name: "event-srv-uri",
Usage: "URI to the event server",
Required: true,
}
}

@ -1,12 +0,0 @@
package util
import "github.com/urfave/cli/v2"
// GetNATSURIFlag return the nats uri from cli flag
func GetNATSURIFlag() *cli.StringFlag {
return &cli.StringFlag{
Name: "nats-uri",
Usage: "URI to the NATS server",
Required: true,
}
}
Loading…
Cancel
Save