Refactor scheduler

pull/79/head
Aloïs Micard 4 years ago
parent ca47be907f
commit 8516c8a00c
No known key found for this signature in database
GPG Key ID: 1A0EB82F071F5EFE

@ -4,14 +4,17 @@ import (
"fmt"
"github.com/creekorful/trandoshan/api"
"github.com/creekorful/trandoshan/internal/duration"
"github.com/creekorful/trandoshan/internal/event"
"github.com/creekorful/trandoshan/internal/logging"
"github.com/creekorful/trandoshan/internal/messaging"
"github.com/creekorful/trandoshan/internal/util"
"github.com/rs/zerolog/log"
"github.com/urfave/cli/v2"
"io"
"net/url"
"os"
"os/signal"
"strings"
"syscall"
"time"
)
@ -56,81 +59,101 @@ func execute(ctx *cli.Context) error {
apiClient := util.GetAPIClient(ctx)
// Create the subscriber
sub, err := messaging.NewSubscriber(ctx.String("hub-uri"))
sub, err := event.NewSubscriber(ctx.String("hub-uri"))
if err != nil {
return err
}
defer sub.Close()
state := state{
apiClient: apiClient,
refreshDelay: refreshDelay,
forbiddenExtensions: ctx.StringSlice("forbidden-extensions"),
}
if err := sub.SubscribeAsync(event.FoundURLExchange, "schedulingQueue", state.handleURLFoundEvent); err != nil {
return err
}
log.Info().Msg("Successfully initialized tdsh-scheduler. Waiting for URLs")
handler := handleMessage(apiClient, refreshDelay, ctx.StringSlice("forbidden-extensions"))
if err := sub.QueueSubscribe(messaging.URLFoundSubject, "schedulers", handler); err != nil {
// Handle graceful shutdown
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
// Block until we receive our signal.
<-c
if err := sub.Close(); err != nil {
return err
}
return nil
}
func handleMessage(apiClient api.Client, refreshDelay time.Duration, forbiddenExtensions []string) messaging.MsgHandler {
return func(sub messaging.Subscriber, msg io.Reader) error {
var urlMsg messaging.URLFoundMsg
if err := sub.ReadMsg(msg, &urlMsg); err != nil {
return err
}
// state bundles the dependencies and settings used by handleURLFoundEvent
// when deciding whether a found URL should be scheduled.
type state struct {
// apiClient is used to search for already-known resources matching a URL.
apiClient api.Client
// refreshDelay controls re-scheduling of existing resources: when it is not
// -1 the resource search uses `now - refreshDelay` as its end date; when it
// is -1 the zero time is passed as the end date instead.
refreshDelay time.Duration
// forbiddenExtensions lists extensions (without the leading dot); URLs whose
// path ends with "." + extension are skipped.
forbiddenExtensions []string
}
log.Trace().Str("url", urlMsg.URL).Msg("Processing URL")
func (state *state) handleURLFoundEvent(subscriber event.Subscriber, body io.Reader) error {
var evt event.FoundURLEvent
if err := subscriber.Read(body, &evt); err != nil {
return err
}
u, err := url.Parse(urlMsg.URL)
if err != nil {
return fmt.Errorf("error while parsing URL: %s", err)
}
log.Trace().Str("url", evt.URL).Msg("Processing URL")
// Make sure URL is valid .onion
if !strings.Contains(u.Host, ".onion") {
log.Trace().Stringer("url", u).Msg("URL is not a valid hidden service")
return nil // Technically not an error
}
u, err := url.Parse(evt.URL)
if err != nil {
return fmt.Errorf("error while parsing URL: %s", err)
}
// Make sure protocol is allowed
if !strings.HasPrefix(u.Scheme, "http") {
log.Trace().Stringer("url", u).Msg("URL has invalid scheme")
return nil // Technically not an error
}
// Make sure URL is valid .onion
if !strings.Contains(u.Host, ".onion") {
log.Trace().Stringer("url", u).Msg("URL is not a valid hidden service")
return nil // Technically not an error
}
// Make sure extension is not forbidden
for _, ext := range forbiddenExtensions {
if strings.HasSuffix(u.Path, "."+ext) {
log.Trace().
Stringer("url", u).
Str("ext", ext).
Msg("Skipping URL with forbidden extension")
return nil // Technically not an error
}
}
// Make sure protocol is allowed
if !strings.HasPrefix(u.Scheme, "http") {
log.Trace().Stringer("url", u).Msg("URL has invalid scheme")
return nil // Technically not an error
}
// If we want to allow re-schedule of existing crawled resources we need to retrieve only resources
// that are newer than `now - refreshDelay`.
endDate := time.Time{}
if refreshDelay != -1 {
endDate = time.Now().Add(-refreshDelay)
// Make sure extension is not forbidden
for _, ext := range state.forbiddenExtensions {
if strings.HasSuffix(u.Path, "."+ext) {
log.Trace().
Stringer("url", u).
Str("ext", ext).
Msg("Skipping URL with forbidden extension")
return nil // Technically not an error
}
}
_, count, err := apiClient.SearchResources(u.String(), "", time.Time{}, endDate, 1, 1)
if err != nil {
return fmt.Errorf("error while searching resource (%s): %s", u, err)
}
// If we want to allow re-schedule of existing crawled resources we need to retrieve only resources
// that are newer than `now - refreshDelay`.
endDate := time.Time{}
if state.refreshDelay != -1 {
endDate = time.Now().Add(-state.refreshDelay)
}
// No matches: schedule!
if count == 0 {
log.Debug().Stringer("url", u).Msg("URL should be scheduled")
if err := sub.PublishMsg(&messaging.URLTodoMsg{URL: urlMsg.URL}); err != nil {
return fmt.Errorf("error while publishing URL: %s", err)
}
} else {
log.Trace().Stringer("url", u).Msg("URL should not be scheduled")
}
_, count, err := state.apiClient.SearchResources(u.String(), "", time.Time{}, endDate, 1, 1)
if err != nil {
return fmt.Errorf("error while searching resource (%s): %s", u, err)
}
return nil
// No matches: schedule!
if count == 0 {
log.Debug().Stringer("url", u).Msg("URL should be scheduled")
if err := subscriber.Publish(&event.NewURLEvent{URL: evt.URL}); err != nil {
return fmt.Errorf("error while publishing URL: %s", err)
}
} else {
log.Trace().Stringer("url", u).Msg("URL should not be scheduled")
}
return nil
}

@ -5,8 +5,8 @@ import (
"fmt"
"github.com/creekorful/trandoshan/api"
"github.com/creekorful/trandoshan/api_mock"
"github.com/creekorful/trandoshan/internal/messaging"
"github.com/creekorful/trandoshan/internal/messaging_mock"
"github.com/creekorful/trandoshan/internal/event"
"github.com/creekorful/trandoshan/internal/event_mock"
"github.com/golang/mock/gomock"
"testing"
"time"
@ -17,15 +17,21 @@ func TestHandleMessageNotOnion(t *testing.T) {
defer mockCtrl.Finish()
apiClientMock := api_mock.NewMockClient(mockCtrl)
subscriberMock := messaging_mock.NewMockSubscriber(mockCtrl)
subscriberMock := event_mock.NewMockSubscriber(mockCtrl)
msg := bytes.NewReader(nil)
subscriberMock.EXPECT().
ReadMsg(msg, &messaging.URLFoundMsg{}).
SetArg(1, messaging.URLFoundMsg{URL: "https://example.org"}).
Read(msg, &event.FoundURLEvent{}).
SetArg(1, event.FoundURLEvent{URL: "https://example.org"}).
Return(nil)
if err := handleMessage(apiClientMock, -1, []string{})(subscriberMock, msg); err != nil {
s := state{
apiClient: apiClientMock,
refreshDelay: -1,
forbiddenExtensions: []string{},
}
if err := s.handleURLFoundEvent(subscriberMock, msg); err != nil {
t.FailNow()
}
}
@ -35,17 +41,23 @@ func TestHandleMessageWrongProtocol(t *testing.T) {
defer mockCtrl.Finish()
apiClientMock := api_mock.NewMockClient(mockCtrl)
subscriberMock := messaging_mock.NewMockSubscriber(mockCtrl)
subscriberMock := event_mock.NewMockSubscriber(mockCtrl)
msg := bytes.NewReader(nil)
s := state{
apiClient: apiClientMock,
refreshDelay: -1,
forbiddenExtensions: []string{},
}
for _, protocol := range []string{"irc", "ftp"} {
subscriberMock.EXPECT().
ReadMsg(msg, &messaging.URLFoundMsg{}).
SetArg(1, messaging.URLFoundMsg{URL: fmt.Sprintf("%s://example.onion", protocol)}).
Read(msg, &event.FoundURLEvent{}).
SetArg(1, event.FoundURLEvent{URL: fmt.Sprintf("%s://example.onion", protocol)}).
Return(nil)
if err := handleMessage(apiClientMock, -1, []string{})(subscriberMock, msg); err != nil {
if err := s.handleURLFoundEvent(subscriberMock, msg); err != nil {
t.FailNow()
}
}
@ -56,19 +68,25 @@ func TestHandleMessageAlreadyCrawled(t *testing.T) {
defer mockCtrl.Finish()
apiClientMock := api_mock.NewMockClient(mockCtrl)
subscriberMock := messaging_mock.NewMockSubscriber(mockCtrl)
subscriberMock := event_mock.NewMockSubscriber(mockCtrl)
msg := bytes.NewReader(nil)
subscriberMock.EXPECT().
ReadMsg(msg, &messaging.URLFoundMsg{}).
SetArg(1, messaging.URLFoundMsg{URL: "https://example.onion"}).
Read(msg, &event.FoundURLEvent{}).
SetArg(1, event.FoundURLEvent{URL: "https://example.onion"}).
Return(nil)
apiClientMock.EXPECT().
SearchResources("https://example.onion", "", time.Time{}, time.Time{}, 1, 1).
Return([]api.ResourceDto{}, int64(1), nil)
if err := handleMessage(apiClientMock, -1, []string{"png"})(subscriberMock, msg); err != nil {
s := state{
apiClient: apiClientMock,
refreshDelay: -1,
forbiddenExtensions: []string{"png"},
}
if err := s.handleURLFoundEvent(subscriberMock, msg); err != nil {
t.FailNow()
}
}
@ -78,15 +96,21 @@ func TestHandleMessageForbiddenExtensions(t *testing.T) {
defer mockCtrl.Finish()
apiClientMock := api_mock.NewMockClient(mockCtrl)
subscriberMock := messaging_mock.NewMockSubscriber(mockCtrl)
subscriberMock := event_mock.NewMockSubscriber(mockCtrl)
msg := bytes.NewReader(nil)
subscriberMock.EXPECT().
ReadMsg(msg, &messaging.URLFoundMsg{}).
SetArg(1, messaging.URLFoundMsg{URL: "https://example.onion/image.png?id=12&test=2"}).
Read(msg, &event.FoundURLEvent{}).
SetArg(1, event.FoundURLEvent{URL: "https://example.onion/image.png?id=12&test=2"}).
Return(nil)
if err := handleMessage(apiClientMock, -1, []string{"png"})(subscriberMock, msg); err != nil {
s := state{
apiClient: apiClientMock,
refreshDelay: -1,
forbiddenExtensions: []string{"png"},
}
if err := s.handleURLFoundEvent(subscriberMock, msg); err != nil {
t.FailNow()
}
}
@ -96,12 +120,12 @@ func TestHandleMessage(t *testing.T) {
defer mockCtrl.Finish()
apiClientMock := api_mock.NewMockClient(mockCtrl)
subscriberMock := messaging_mock.NewMockSubscriber(mockCtrl)
subscriberMock := event_mock.NewMockSubscriber(mockCtrl)
msg := bytes.NewReader(nil)
subscriberMock.EXPECT().
ReadMsg(msg, &messaging.URLFoundMsg{}).
SetArg(1, messaging.URLFoundMsg{URL: "https://example.onion"}).
Read(msg, &event.FoundURLEvent{}).
SetArg(1, event.FoundURLEvent{URL: "https://example.onion"}).
Return(nil)
apiClientMock.EXPECT().
@ -109,10 +133,16 @@ func TestHandleMessage(t *testing.T) {
Return([]api.ResourceDto{}, int64(0), nil)
subscriberMock.EXPECT().
PublishMsg(&messaging.URLTodoMsg{URL: "https://example.onion"}).
Publish(&event.NewURLEvent{URL: "https://example.onion"}).
Return(nil)
if err := handleMessage(apiClientMock, -1, []string{})(subscriberMock, msg); err != nil {
s := state{
apiClient: apiClientMock,
refreshDelay: -1,
forbiddenExtensions: []string{},
}
if err := s.handleURLFoundEvent(subscriberMock, msg); err != nil {
t.FailNow()
}
}

Loading…
Cancel
Save