Refactor crawler

pull/79/head
Aloïs Micard 3 years ago
parent 5ad83d57a0
commit 4ff76ed552
No known key found for this signature in database
GPG Key ID: 1A0EB82F071F5EFE

@ -4,8 +4,8 @@ import (
"crypto/tls"
"fmt"
"github.com/creekorful/trandoshan/internal/crawler/http"
"github.com/creekorful/trandoshan/internal/event"
"github.com/creekorful/trandoshan/internal/logging"
"github.com/creekorful/trandoshan/internal/messaging"
"github.com/creekorful/trandoshan/internal/util"
"github.com/rs/zerolog/log"
"github.com/urfave/cli/v2"
@ -13,7 +13,10 @@ import (
"github.com/valyala/fasthttp/fasthttpproxy"
"io"
"io/ioutil"
"os"
"os/signal"
"strings"
"syscall"
"time"
)
@ -70,7 +73,7 @@ func execute(ctx *cli.Context) error {
})
// Create the subscriber
sub, err := messaging.NewSubscriber(ctx.String("hub-uri"))
sub, err := event.NewSubscriber(ctx.String("hub-uri"))
if err != nil {
return err
}
@ -78,38 +81,56 @@ func execute(ctx *cli.Context) error {
log.Info().Msg("Successfully initialized tdsh-crawler. Waiting for URLs")
handler := handleMessage(httpClient, ctx.StringSlice("allowed-ct"))
if err := sub.QueueSubscribe(messaging.URLTodoSubject, "crawlers", handler); err != nil {
s := State{
httpClient: httpClient,
allowedContentTypes: ctx.StringSlice("allowed-ct"),
}
if err := sub.SubscribeAsync(event.NewURLExchange, "crawlingQueue", s.handleNewURLEvent); err != nil {
return err
}
// Handle graceful shutdown
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
// Block until we receive our signal.
<-c
if err := sub.Close(); err != nil {
return err
}
return nil
}
func handleMessage(httpClient http.Client, allowedContentTypes []string) messaging.MsgHandler {
return func(sub messaging.Subscriber, msg io.Reader) error {
var urlMsg messaging.URLTodoMsg
if err := sub.ReadMsg(msg, &urlMsg); err != nil {
return err
}
// State bundles the dependencies used by the crawler's event handler.
type State struct {
// httpClient is the HTTP client handed to crawURL to fetch each URL.
httpClient http.Client
// allowedContentTypes is handed to crawURL together with the URL; it is
// filled from the "allowed-ct" command-line flag.
allowedContentTypes []string
}
body, headers, err := crawURL(httpClient, urlMsg.URL, allowedContentTypes)
if err != nil {
return fmt.Errorf("error while crawling URL: %s", err)
}
// handleNewURLEvent processes one event delivered by the subscriber: it decodes
// body into an event.NewURLEvent, crawls the event's URL using the state's HTTP
// client and allowed content types, and publishes the crawled body and headers
// as an event.NewResourceEvent.
//
// NOTE(review): the messaging.* statements interleaved below look like the
// removed side of this diff view rather than part of the new method — confirm
// against the actual file before relying on them.
func (state *State) handleNewURLEvent(subscriber event.Subscriber, body io.Reader) error {
var evt event.NewURLEvent
if err := subscriber.Read(body, &evt); err != nil {
return err
}
// Publish resource body
res := messaging.NewResourceMsg{
URL: urlMsg.URL,
Body: body,
Headers: headers,
}
if err := sub.PublishMsg(&res); err != nil {
return fmt.Errorf("error while publishing resource: %s", err)
}
// Fetch the URL carried by the event.
b, headers, err := crawURL(state.httpClient, evt.URL, state.allowedContentTypes)
if err != nil {
return err
}
return nil
// Forward the crawled body and headers, keyed by the originating URL.
res := event.NewResourceEvent{
URL: evt.URL,
Body: b,
Headers: headers,
}
if err := subscriber.Publish(&res); err != nil {
return err
}
return nil
}
func crawURL(httpClient http.Client, url string, allowedContentTypes []string) (string, map[string]string, error) {

@ -3,8 +3,8 @@ package crawler
import (
"bytes"
"github.com/creekorful/trandoshan/internal/crawler/http_mock"
"github.com/creekorful/trandoshan/internal/messaging"
"github.com/creekorful/trandoshan/internal/messaging_mock"
"github.com/creekorful/trandoshan/internal/event"
"github.com/creekorful/trandoshan/internal/event_mock"
"github.com/golang/mock/gomock"
"strings"
"testing"
@ -91,14 +91,14 @@ func TestHandleMessage(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
subscriberMock := messaging_mock.NewMockSubscriber(mockCtrl)
subscriberMock := event_mock.NewMockSubscriber(mockCtrl)
httpClientMock := http_mock.NewMockClient(mockCtrl)
httpResponseMock := http_mock.NewMockResponse(mockCtrl)
msg := bytes.NewReader(nil)
subscriberMock.EXPECT().
ReadMsg(msg, &messaging.URLTodoMsg{}).
SetArg(1, messaging.URLTodoMsg{URL: "https://example.onion/image.png?id=12&test=2"}).
Read(msg, &event.NewURLEvent{}).
SetArg(1, event.NewURLEvent{URL: "https://example.onion/image.png?id=12&test=2"}).
Return(nil)
httpResponseMock.EXPECT().Headers().Times(2).Return(map[string]string{"Content-Type": "text/plain", "Server": "Debian"})
@ -106,13 +106,14 @@ func TestHandleMessage(t *testing.T) {
httpClientMock.EXPECT().Get("https://example.onion/image.png?id=12&test=2").Return(httpResponseMock, nil)
subscriberMock.EXPECT().PublishMsg(&messaging.NewResourceMsg{
subscriberMock.EXPECT().Publish(&event.NewResourceEvent{
URL: "https://example.onion/image.png?id=12&test=2",
Body: "Hello",
Headers: map[string]string{"Content-Type": "text/plain", "Server": "Debian"},
}).Return(nil)
if err := handleMessage(httpClientMock, []string{"text/plain", "text/css"})(subscriberMock, msg); err != nil {
s := State{httpClient: httpClientMock, allowedContentTypes: []string{"text/plain", "text/css"}}
if err := s.handleNewURLEvent(subscriberMock, msg); err != nil {
t.Fail()
}
}

@ -10,7 +10,7 @@ import (
)
// Handler represents an event handler
type Handler func(s Subscriber, body io.Reader) error
type Handler func(Subscriber, io.Reader) error
// Subscriber is something that reads messages from an event queue
type Subscriber interface {

Loading…
Cancel
Save