commit
7cbfbb7794
@ -0,0 +1,24 @@
|
||||
# build image
|
||||
FROM golang:1.15.0-alpine as builder
|
||||
|
||||
RUN apk update && apk upgrade && \
|
||||
apk add --no-cache bash git openssh
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
# Copy and download dependencies to cache them and faster build time
|
||||
COPY go.mod go.sum ./
|
||||
RUN go mod download
|
||||
|
||||
COPY . .
|
||||
|
||||
# Test then build app
|
||||
RUN go build -v github.com/creekorful/trandoshan/cmd/tdsh-archiver
|
||||
|
||||
# runtime image
|
||||
FROM alpine:latest
|
||||
COPY --from=builder /app/tdsh-archiver /app/
|
||||
|
||||
WORKDIR /app/
|
||||
|
||||
ENTRYPOINT ["./tdsh-archiver"]
|
@ -0,0 +1,13 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"github.com/creekorful/trandoshan/internal/archiver"
|
||||
"os"
|
||||
)
|
||||
|
||||
func main() {
|
||||
app := archiver.GetApp()
|
||||
if err := app.Run(os.Args); err != nil {
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
@ -1,4 +1,4 @@
|
||||
package api
|
||||
package rest
|
||||
|
||||
import (
|
||||
"fmt"
|
@ -1,105 +0,0 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"github.com/creekorful/trandoshan/api"
|
||||
"github.com/creekorful/trandoshan/internal/api/database"
|
||||
"github.com/creekorful/trandoshan/internal/messaging"
|
||||
"github.com/rs/zerolog/log"
|
||||
"github.com/urfave/cli/v2"
|
||||
)
|
||||
|
||||
type service interface {
|
||||
searchResources(params *database.ResSearchParams) ([]api.ResourceDto, int64, error)
|
||||
addResource(res api.ResourceDto) (api.ResourceDto, error)
|
||||
scheduleURL(url string) error
|
||||
close()
|
||||
}
|
||||
|
||||
type svc struct {
|
||||
db database.Database
|
||||
pub messaging.Publisher
|
||||
}
|
||||
|
||||
func newService(c *cli.Context) (service, error) {
|
||||
// Connect to the NATS server
|
||||
pub, err := messaging.NewPublisher(c.String("nats-uri"))
|
||||
if err != nil {
|
||||
log.Err(err).Str("uri", c.String("nats-uri")).Msg("Error while connecting to NATS server")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Create Elasticsearch client
|
||||
db, err := database.NewElasticDB(c.String("elasticsearch-uri"))
|
||||
if err != nil {
|
||||
log.Err(err).Msg("Error while connecting to the database")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &svc{
|
||||
db: db,
|
||||
pub: pub,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *svc) searchResources(params *database.ResSearchParams) ([]api.ResourceDto, int64, error) {
|
||||
totalCount, err := s.db.CountResources(params)
|
||||
if err != nil {
|
||||
log.Err(err).Msg("Error while counting on ES")
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
res, err := s.db.SearchResources(params)
|
||||
if err != nil {
|
||||
log.Err(err).Msg("Error while searching on ES")
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
var resources []api.ResourceDto
|
||||
for _, r := range res {
|
||||
resources = append(resources, api.ResourceDto{
|
||||
URL: r.URL,
|
||||
Body: r.Body,
|
||||
Title: r.Title,
|
||||
Time: r.Time,
|
||||
})
|
||||
}
|
||||
|
||||
return resources, totalCount, nil
|
||||
}
|
||||
|
||||
func (s *svc) addResource(res api.ResourceDto) (api.ResourceDto, error) {
|
||||
log.Debug().Str("url", res.URL).Msg("Saving resource")
|
||||
|
||||
// Create Elasticsearch document
|
||||
doc := database.ResourceIdx{
|
||||
URL: res.URL,
|
||||
Body: res.Body,
|
||||
Time: res.Time,
|
||||
Title: res.Title,
|
||||
Meta: res.Meta,
|
||||
Description: res.Description,
|
||||
}
|
||||
|
||||
if err := s.db.AddResource(doc); err != nil {
|
||||
log.Err(err).Msg("Error while adding resource")
|
||||
return api.ResourceDto{}, err
|
||||
}
|
||||
|
||||
log.Debug().Str("url", res.URL).Msg("Successfully saved resource")
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (s *svc) scheduleURL(url string) error {
|
||||
// Publish the URL
|
||||
if err := s.pub.PublishMsg(&messaging.URLFoundMsg{URL: url}); err != nil {
|
||||
log.Err(err).Msg("Unable to publish URL")
|
||||
return err
|
||||
}
|
||||
|
||||
log.Debug().Str("url", url).Msg("Successfully published URL")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *svc) close() {
|
||||
s.pub.Close()
|
||||
}
|
@ -0,0 +1,138 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/creekorful/trandoshan/api"
|
||||
"github.com/creekorful/trandoshan/internal/api/database"
|
||||
"github.com/creekorful/trandoshan/internal/duration"
|
||||
"github.com/creekorful/trandoshan/internal/event"
|
||||
"github.com/rs/zerolog/log"
|
||||
"github.com/urfave/cli/v2"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Service represent the functionality the API expose
|
||||
type Service struct {
|
||||
db database.Database
|
||||
pub event.Publisher
|
||||
refreshDelay time.Duration
|
||||
}
|
||||
|
||||
// New create a new Service instance
|
||||
func New(c *cli.Context) (*Service, error) {
|
||||
// Connect to the messaging server
|
||||
pub, err := event.NewPublisher(c.String("hub-uri"))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error while connecting to hub server: %s", err)
|
||||
}
|
||||
|
||||
// Create Elasticsearch client
|
||||
db, err := database.NewElasticDB(c.String("elasticsearch-uri"))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error while connecting to the database: %s", err)
|
||||
}
|
||||
|
||||
refreshDelay := duration.ParseDuration(c.String("refresh-delay"))
|
||||
|
||||
return &Service{
|
||||
db: db,
|
||||
pub: pub,
|
||||
refreshDelay: refreshDelay,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// SearchResources allows to search resources using given params
|
||||
func (s *Service) SearchResources(params *api.ResSearchParams) ([]api.ResourceDto, int64, error) {
|
||||
totalCount, err := s.db.CountResources(params)
|
||||
if err != nil {
|
||||
log.Err(err).Msg("Error while counting on ES")
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
res, err := s.db.SearchResources(params)
|
||||
if err != nil {
|
||||
log.Err(err).Msg("Error while searching on ES")
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
var resources []api.ResourceDto
|
||||
for _, r := range res {
|
||||
resources = append(resources, api.ResourceDto{
|
||||
URL: r.URL,
|
||||
Body: r.Body,
|
||||
Title: r.Title,
|
||||
Time: r.Time,
|
||||
})
|
||||
}
|
||||
|
||||
return resources, totalCount, nil
|
||||
}
|
||||
|
||||
// AddResource allows to add given resource
|
||||
func (s *Service) AddResource(res api.ResourceDto) (api.ResourceDto, error) {
|
||||
log.Debug().Str("url", res.URL).Msg("Saving resource")
|
||||
|
||||
// Hacky stuff to prevent from adding 'duplicate resource'
|
||||
// the thing is: even with the scheduler preventing from crawling 'duplicates' URL by adding a refresh period
|
||||
// and checking if the resource is not already indexed, this implementation may not work if the URLs was published
|
||||
// before the resource is saved. And this happen a LOT of time.
|
||||
// therefore the best thing to do is to make the API check if the resource should **really** be added by checking if
|
||||
// it isn't present on the database. This may sounds hacky, but it's the best solution i've come up at this time.
|
||||
endDate := time.Time{}
|
||||
if s.refreshDelay != -1 {
|
||||
endDate = time.Now().Add(-s.refreshDelay)
|
||||
}
|
||||
|
||||
count, err := s.db.CountResources(&api.ResSearchParams{
|
||||
URL: res.URL,
|
||||
EndDate: endDate,
|
||||
PageSize: 1,
|
||||
PageNumber: 1,
|
||||
})
|
||||
if err != nil {
|
||||
log.Err(err).Msg("error while searching for resource")
|
||||
return api.ResourceDto{}, nil
|
||||
}
|
||||
|
||||
if count > 0 {
|
||||
// Not an error
|
||||
log.Debug().Str("url", res.URL).Msg("Skipping duplicate resource")
|
||||
return res, nil
|
||||
}
|
||||
|
||||
// Create Elasticsearch document
|
||||
doc := database.ResourceIdx{
|
||||
URL: res.URL,
|
||||
Body: res.Body,
|
||||
Time: res.Time,
|
||||
Title: res.Title,
|
||||
Meta: res.Meta,
|
||||
Description: res.Description,
|
||||
Headers: res.Headers,
|
||||
}
|
||||
|
||||
if err := s.db.AddResource(doc); err != nil {
|
||||
log.Err(err).Msg("Error while adding resource")
|
||||
return api.ResourceDto{}, err
|
||||
}
|
||||
|
||||
log.Debug().Str("url", res.URL).Msg("Successfully saved resource")
|
||||
return res, nil
|
||||
}
|
||||
|
||||
// ScheduleURL schedule given url for crawling
|
||||
func (s *Service) ScheduleURL(url string) error {
|
||||
// Publish the URL
|
||||
if err := s.pub.Publish(&event.FoundURLEvent{URL: url}); err != nil {
|
||||
log.Err(err).Msg("Unable to publish URL")
|
||||
return err
|
||||
}
|
||||
|
||||
log.Debug().Str("url", url).Msg("Successfully published URL")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close disconnect the service consumer
|
||||
func (s *Service) Close() {
|
||||
s.pub.Close()
|
||||
}
|
@ -0,0 +1,202 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"github.com/creekorful/trandoshan/api"
|
||||
"github.com/creekorful/trandoshan/internal/api/database"
|
||||
"github.com/creekorful/trandoshan/internal/api/database_mock"
|
||||
"github.com/creekorful/trandoshan/internal/event"
|
||||
"github.com/creekorful/trandoshan/internal/event_mock"
|
||||
"github.com/golang/mock/gomock"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestSearchResources(t *testing.T) {
|
||||
mockCtrl := gomock.NewController(t)
|
||||
defer mockCtrl.Finish()
|
||||
|
||||
params := &api.ResSearchParams{Keyword: "example"}
|
||||
|
||||
dbMock := database_mock.NewMockDatabase(mockCtrl)
|
||||
|
||||
dbMock.EXPECT().CountResources(params).Return(int64(150), nil)
|
||||
dbMock.EXPECT().SearchResources(params).Return([]database.ResourceIdx{
|
||||
{
|
||||
URL: "example-1.onion",
|
||||
Body: "Example 1",
|
||||
Title: "Example 1",
|
||||
Time: time.Time{},
|
||||
},
|
||||
{
|
||||
URL: "example-2.onion",
|
||||
Body: "Example 2",
|
||||
Title: "Example 2",
|
||||
Time: time.Time{},
|
||||
},
|
||||
}, nil)
|
||||
|
||||
s := Service{db: dbMock}
|
||||
|
||||
res, count, err := s.SearchResources(params)
|
||||
if err != nil {
|
||||
t.FailNow()
|
||||
}
|
||||
if count != 150 {
|
||||
t.Error()
|
||||
}
|
||||
if len(res) != 2 {
|
||||
t.Error()
|
||||
}
|
||||
}
|
||||
|
||||
func TestAddResource(t *testing.T) {
|
||||
mockCtrl := gomock.NewController(t)
|
||||
defer mockCtrl.Finish()
|
||||
|
||||
dbMock := database_mock.NewMockDatabase(mockCtrl)
|
||||
|
||||
dbMock.EXPECT().CountResources(&searchParamsMatcher{target: api.ResSearchParams{
|
||||
URL: "https://example.onion",
|
||||
PageSize: 1,
|
||||
PageNumber: 1,
|
||||
}}).Return(int64(0), nil)
|
||||
|
||||
dbMock.EXPECT().AddResource(database.ResourceIdx{
|
||||
URL: "https://example.onion",
|
||||
Body: "TheBody",
|
||||
Title: "Example",
|
||||
Time: time.Time{},
|
||||
Meta: map[string]string{"content": "content-meta"},
|
||||
Description: "the description",
|
||||
Headers: map[string]string{"Content-Type": "application/html", "Server": "Traefik"},
|
||||
})
|
||||
|
||||
s := Service{db: dbMock, refreshDelay: 5 * time.Hour}
|
||||
|
||||
res, err := s.AddResource(api.ResourceDto{
|
||||
URL: "https://example.onion",
|
||||
Body: "TheBody",
|
||||
Title: "Example",
|
||||
Time: time.Time{},
|
||||
Meta: map[string]string{"content": "content-meta"},
|
||||
Description: "the description",
|
||||
Headers: map[string]string{"Content-Type": "application/html", "Server": "Traefik"},
|
||||
})
|
||||
if err != nil {
|
||||
t.FailNow()
|
||||
}
|
||||
|
||||
if res.URL != "https://example.onion" {
|
||||
t.FailNow()
|
||||
}
|
||||
if res.Body != "TheBody" {
|
||||
t.FailNow()
|
||||
}
|
||||
if res.Title != "Example" {
|
||||
t.FailNow()
|
||||
}
|
||||
if !res.Time.IsZero() {
|
||||
t.FailNow()
|
||||
}
|
||||
if res.Meta["content"] != "content-meta" {
|
||||
t.FailNow()
|
||||
}
|
||||
if res.Description != "the description" {
|
||||
t.FailNow()
|
||||
}
|
||||
if res.Headers["Content-Type"] != "application/html" {
|
||||
t.FailNow()
|
||||
}
|
||||
if res.Headers["Server"] != "Traefik" {
|
||||
t.FailNow()
|
||||
}
|
||||
}
|
||||
|
||||
func TestAddResourceDuplicateNotAllowed(t *testing.T) {
|
||||
mockCtrl := gomock.NewController(t)
|
||||
defer mockCtrl.Finish()
|
||||
|
||||
dbMock := database_mock.NewMockDatabase(mockCtrl)
|
||||
|
||||
dbMock.EXPECT().CountResources(&searchParamsMatcher{target: api.ResSearchParams{
|
||||
URL: "https://example.onion",
|
||||
PageSize: 1,
|
||||
PageNumber: 1,
|
||||
}, endDateZero: true}).Return(int64(1), nil)
|
||||
|
||||
s := Service{db: dbMock, refreshDelay: -1}
|
||||
|
||||
_, err := s.AddResource(api.ResourceDto{
|
||||
URL: "https://example.onion",
|
||||
Body: "TheBody",
|
||||
Title: "Example",
|
||||
Time: time.Time{},
|
||||
Meta: map[string]string{"content": "content-meta"},
|
||||
Description: "the description",
|
||||
Headers: map[string]string{"Content-Type": "application/html", "Server": "Traefik"},
|
||||
})
|
||||
if err != nil {
|
||||
t.FailNow()
|
||||
}
|
||||
}
|
||||
|
||||
func TestAddResourceTooYoung(t *testing.T) {
|
||||
mockCtrl := gomock.NewController(t)
|
||||
defer mockCtrl.Finish()
|
||||
|
||||
dbMock := database_mock.NewMockDatabase(mockCtrl)
|
||||
|
||||
dbMock.EXPECT().CountResources(&searchParamsMatcher{target: api.ResSearchParams{
|
||||
URL: "https://example.onion",
|
||||
EndDate: time.Now().Add(-10 * time.Minute),
|
||||
PageSize: 1,
|
||||
PageNumber: 1,
|
||||
}}).Return(int64(1), nil)
|
||||
|
||||
s := Service{db: dbMock, refreshDelay: -10 * time.Minute}
|
||||
|
||||
_, err := s.AddResource(api.ResourceDto{
|
||||
URL: "https://example.onion",
|
||||
Body: "TheBody",
|
||||
Title: "Example",
|
||||
Time: time.Time{},
|
||||
Meta: map[string]string{"content": "content-meta"},
|
||||
Description: "the description",
|
||||
Headers: map[string]string{"Content-Type": "application/html", "Server": "Traefik"},
|
||||
})
|
||||
if err != nil {
|
||||
t.FailNow()
|
||||
}
|
||||
}
|
||||
|
||||
func TestScheduleURL(t *testing.T) {
|
||||
mockCtrl := gomock.NewController(t)
|
||||
defer mockCtrl.Finish()
|
||||
|
||||
pubMock := event_mock.NewMockPublisher(mockCtrl)
|
||||
|
||||
s := Service{pub: pubMock}
|
||||
|
||||
pubMock.EXPECT().Publish(&event.FoundURLEvent{URL: "https://example.onion"})
|
||||
|
||||
if err := s.ScheduleURL("https://example.onion"); err != nil {
|
||||
t.FailNow()
|
||||
}
|
||||
}
|
||||
|
||||
// custom matcher to ignore time field when doing comparison ;(
|
||||
// todo: do less crappy?
|
||||
type searchParamsMatcher struct {
|
||||
target api.ResSearchParams
|
||||
endDateZero bool
|
||||
}
|
||||
|
||||
func (sm *searchParamsMatcher) Matches(x interface{}) bool {
|
||||
arg := x.(*api.ResSearchParams)
|
||||
return arg.URL == sm.target.URL && arg.PageSize == sm.target.PageSize && arg.PageNumber == sm.target.PageNumber &&
|
||||
sm.endDateZero == arg.EndDate.IsZero()
|
||||
}
|
||||
|
||||
func (sm *searchParamsMatcher) String() string {
|
||||
return "is valid search params"
|
||||
}
|
@ -1,114 +0,0 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"github.com/creekorful/trandoshan/api"
|
||||
"github.com/creekorful/trandoshan/internal/api/database"
|
||||
"github.com/creekorful/trandoshan/internal/api/database_mock"
|
||||
"github.com/creekorful/trandoshan/internal/messaging"
|
||||
"github.com/creekorful/trandoshan/internal/messaging_mock"
|
||||
"github.com/golang/mock/gomock"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestSearchResources(t *testing.T) {
|
||||
mockCtrl := gomock.NewController(t)
|
||||
defer mockCtrl.Finish()
|
||||
|
||||
params := &database.ResSearchParams{Keyword: "example"}
|
||||
|
||||
dbMock := database_mock.NewMockDatabase(mockCtrl)
|
||||
|
||||
dbMock.EXPECT().CountResources(params).Return(int64(150), nil)
|
||||
dbMock.EXPECT().SearchResources(params).Return([]database.ResourceIdx{
|
||||
{
|
||||
URL: "example-1.onion",
|
||||
Body: "Example 1",
|
||||
Title: "Example 1",
|
||||
Time: time.Time{},
|
||||
},
|
||||
{
|
||||
URL: "example-2.onion",
|
||||
Body: "Example 2",
|
||||
Title: "Example 2",
|
||||
Time: time.Time{},
|
||||
},
|
||||
}, nil)
|
||||
|
||||
s := svc{db: dbMock}
|
||||
|
||||
res, count, err := s.searchResources(params)
|
||||
if err != nil {
|
||||
t.FailNow()
|
||||
}
|
||||
if count != 150 {
|
||||
t.Error()
|
||||
}
|
||||
if len(res) != 2 {
|
||||
t.Error()
|
||||
}
|
||||
}
|
||||
|
||||
func TestAddResource(t *testing.T) {
|
||||
mockCtrl := gomock.NewController(t)
|
||||
defer mockCtrl.Finish()
|
||||
|
||||
dbMock := database_mock.NewMockDatabase(mockCtrl)
|
||||
|
||||
dbMock.EXPECT().AddResource(database.ResourceIdx{
|
||||
URL: "https://example.onion",
|
||||
Body: "TheBody",
|
||||
Title: "Example",
|
||||
Time: time.Time{},
|
||||
Meta: map[string]string{"content": "content-meta"},
|
||||
Description: "the description",
|
||||
})
|
||||
|
||||
s := svc{db: dbMock}
|
||||
|
||||
res, err := s.addResource(api.ResourceDto{
|
||||
URL: "https://example.onion",
|
||||
Body: "TheBody",
|
||||
Title: "Example",
|
||||
Time: time.Time{},
|
||||
Meta: map[string]string{"content": "content-meta"},
|
||||
Description: "the description",
|
||||
})
|
||||
if err != nil {
|
||||
t.FailNow()
|
||||
}
|
||||
|
||||
if res.URL != "https://example.onion" {
|
||||
t.FailNow()
|
||||
}
|
||||
if res.Body != "TheBody" {
|
||||
t.FailNow()
|
||||
}
|
||||
if res.Title != "Example" {
|
||||
t.FailNow()
|
||||
}
|
||||
if !res.Time.IsZero() {
|
||||
t.FailNow()
|
||||
}
|
||||
if res.Meta["content"] != "content-meta" {
|
||||
t.FailNow()
|
||||
}
|
||||
if res.Description != "the description" {
|
||||
t.FailNow()
|
||||
}
|
||||
}
|
||||
|
||||
func TestScheduleURL(t *testing.T) {
|
||||
mockCtrl := gomock.NewController(t)
|
||||
defer mockCtrl.Finish()
|
||||
|
||||
pubMock := messaging_mock.NewMockPublisher(mockCtrl)
|
||||
|
||||
s := svc{pub: pubMock}
|
||||
|
||||
pubMock.EXPECT().PublishMsg(&messaging.URLFoundMsg{URL: "https://example.onion"})
|
||||
|
||||
if err := s.scheduleURL("https://example.onion"); err != nil {
|
||||
t.FailNow()
|
||||
}
|
||||
}
|
@ -0,0 +1,122 @@
|
||||
package archiver
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/creekorful/trandoshan/internal/archiver/storage"
|
||||
"github.com/creekorful/trandoshan/internal/event"
|
||||
"github.com/creekorful/trandoshan/internal/logging"
|
||||
"github.com/creekorful/trandoshan/internal/util"
|
||||
"github.com/rs/zerolog/log"
|
||||
"github.com/urfave/cli/v2"
|
||||
"io"
|
||||
"os"
|
||||
"os/signal"
|
||||
"strings"
|
||||
"syscall"
|
||||
)
|
||||
|
||||
// GetApp return the crawler app
|
||||
func GetApp() *cli.App {
|
||||
return &cli.App{
|
||||
Name: "tdsh-archiver",
|
||||
Version: "0.7.0",
|
||||
Usage: "Trandoshan archiver component",
|
||||
Flags: []cli.Flag{
|
||||
logging.GetLogFlag(),
|
||||
util.GetHubURI(),
|
||||
&cli.StringFlag{
|
||||
Name: "storage-dir",
|
||||
Usage: "Path to the storage directory",
|
||||
Required: true,
|
||||
},
|
||||
},
|
||||
Action: execute,
|
||||
}
|
||||
}
|
||||
|
||||
func execute(ctx *cli.Context) error {
|
||||
logging.ConfigureLogger(ctx)
|
||||
|
||||
log.Info().
|
||||
Str("ver", ctx.App.Version).
|
||||
Str("hub-uri", ctx.String("hub-uri")).
|
||||
Str("storage-dir", ctx.String("storage-dir")).
|
||||
Msg("Starting tdsh-archiver")
|
||||
|
||||
// Create the subscriber
|
||||
sub, err := event.NewSubscriber(ctx.String("hub-uri"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer sub.Close()
|
||||
|
||||
// Create local storage
|
||||
st, err := storage.NewLocalStorage(ctx.String("storage-dir"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
state := state{
|
||||
storage: st,
|
||||
}
|
||||
|
||||
if err := sub.SubscribeAsync(event.NewResourceExchange, "archivingQueue", state.handleNewResourceEvent); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Info().Msg("Successfully initialized tdsh-archiver. Waiting for resources")
|
||||
|
||||
// Handle graceful shutdown
|
||||
c := make(chan os.Signal, 1)
|
||||
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
|
||||
|
||||
// Block until we receive our signal.
|
||||
<-c
|
||||
|
||||
if err := sub.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type state struct {
|
||||
storage storage.Storage
|
||||
}
|
||||
|
||||
func (state *state) handleNewResourceEvent(subscriber event.Subscriber, body io.Reader) error {
|
||||
var evt event.NewResourceEvent
|
||||
if err := subscriber.Read(body, &evt); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Debug().Str("url", evt.URL).Msg("Processing new resource")
|
||||
|
||||
res, err := formatResource(&evt)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error while formatting resource: %s", err)
|
||||
}
|
||||
|
||||
if err := state.storage.Store(evt.URL, evt.Time, res); err != nil {
|
||||
return fmt.Errorf("error while storing resource: %s", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func formatResource(evt *event.NewResourceEvent) ([]byte, error) {
|
||||
builder := strings.Builder{}
|
||||
|
||||
// First headers
|
||||
for key, value := range evt.Headers {
|
||||
builder.WriteString(fmt.Sprintf("%s: %s\r\n", key, value))
|
||||
}
|
||||
|
||||
// Then separator for body
|
||||
builder.WriteString("\r\n")
|
||||
|
||||
// Then body
|
||||
builder.WriteString(evt.Body)
|
||||
|
||||
return []byte(builder.String()), nil
|
||||
}
|
@ -0,0 +1,56 @@
|
||||
package archiver
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"github.com/creekorful/trandoshan/internal/archiver/storage_mock"
|
||||
"github.com/creekorful/trandoshan/internal/event"
|
||||
"github.com/creekorful/trandoshan/internal/event_mock"
|
||||
"github.com/golang/mock/gomock"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestHandleNewResourceEvent(t *testing.T) {
|
||||
mockCtrl := gomock.NewController(t)
|
||||
defer mockCtrl.Finish()
|
||||
|
||||
subscriberMock := event_mock.NewMockSubscriber(mockCtrl)
|
||||
storageMock := storage_mock.NewMockStorage(mockCtrl)
|
||||
|
||||
tn := time.Now()
|
||||
|
||||
msg := bytes.NewReader(nil)
|
||||
subscriberMock.EXPECT().
|
||||
Read(msg, &event.NewResourceEvent{}).
|
||||
SetArg(1, event.NewResourceEvent{
|
||||
URL: "https://example.onion",
|
||||
Body: "Hello, world",
|
||||
Headers: map[string]string{"Server": "Traefik", "Content-Type": "application/html"},
|
||||
Time: tn,
|
||||
}).Return(nil)
|
||||
|
||||
storageMock.EXPECT().Store("https://example.onion", tn, []byte("Server: Traefik\r\nContent-Type: application/html\r\n\r\nHello, world")).Return(nil)
|
||||
|
||||
s := state{storage: storageMock}
|
||||
if err := s.handleNewResourceEvent(subscriberMock, msg); err != nil {
|
||||
t.Fail()
|
||||
}
|
||||
}
|
||||
|
||||
func TestFormatResource(t *testing.T) {
|
||||
evt := &event.NewResourceEvent{
|
||||
URL: "https://google.com",
|
||||
Body: "Hello, world",
|
||||
Headers: map[string]string{"Server": "Traefik", "Content-Type": "text/html"},
|
||||
Time: time.Now(),
|
||||
}
|
||||
|
||||
res, err := formatResource(evt)
|
||||
if err != nil {
|
||||
t.FailNow()
|
||||
}
|
||||
|
||||
if string(res) != "Server: Traefik\r\nContent-Type: text/html\r\n\r\nHello, world" {
|
||||
t.Fail()
|
||||
}
|
||||
}
|
@ -0,0 +1,68 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/url"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
type localStorage struct {
|
||||
baseDir string
|
||||
}
|
||||
|
||||
// NewLocalStorage returns a new Storage that use local file system
|
||||
func NewLocalStorage(root string) (Storage, error) {
|
||||
return &localStorage{baseDir: root}, nil
|
||||
}
|
||||
|
||||
func (s *localStorage) Store(url string, time time.Time, body []byte) error {
|
||||
path, err := formatPath(url, time)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
fullPath := filepath.Join(s.baseDir, path)
|
||||
dir := filepath.Dir(fullPath)
|
||||
|
||||
if err := os.MkdirAll(dir, 0750); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := ioutil.WriteFile(fullPath, body, 0640); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func formatPath(rawURL string, time time.Time) (string, error) {
|
||||
b := strings.Builder{}
|
||||
|
||||
u, err := url.Parse(rawURL)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
// Protocol
|
||||
b.WriteString(u.Scheme)
|
||||
b.WriteRune(os.PathSeparator)
|
||||
|
||||
// Hostname
|
||||
b.WriteString(u.Host)
|
||||
b.WriteRune(os.PathSeparator)
|
||||
|
||||
// Write path
|
||||
if uri := u.RequestURI(); uri != "/" {
|
||||
b.WriteString(strings.TrimPrefix(u.RequestURI(), "/"))
|
||||
b.WriteRune(os.PathSeparator)
|
||||
}
|
||||
|
||||
// Write unix time
|
||||
b.WriteString(fmt.Sprintf("%d", time.Unix()))
|
||||
|
||||
return b.String(), nil
|
||||
}
|
@ -0,0 +1,82 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestFormatPath(t *testing.T) {
|
||||
type test struct {
|
||||
url string
|
||||
time time.Time
|
||||
path string
|
||||
}
|
||||
|
||||
ti := time.Date(2020, time.October, 29, 12, 4, 9, 0, time.UTC)
|
||||
|
||||
tests := []test{
|
||||
{
|
||||
url: "https://google.com",
|
||||
time: ti,
|
||||
path: "https/google.com/1603973049",
|
||||
},
|
||||
{
|
||||
url: "http://facebook.com/admin/login.php?username=admin",
|
||||
time: ti,
|
||||
path: "http/facebook.com/admin/login.php?username=admin/1603973049",
|
||||
},
|
||||
{
|
||||
url: "http://thisisalonghostname.onion/admin/tools/list-accounts.php?token=123223453&username=test",
|
||||
time: ti,
|
||||
path: "http/thisisalonghostname.onion/admin/tools/list-accounts.php?token=123223453&username=test/1603973049",
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
res, err := formatPath(test.url, test.time)
|
||||
if err != nil {
|
||||
t.Error()
|
||||
}
|
||||
|
||||
if res != test.path {
|
||||
t.Errorf("got: %s, want: %s", res, test.path)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestLocalStorage_Store(t *testing.T) {
|
||||
d, err := ioutil.TempDir("", "")
|
||||
if err != nil {
|
||||
t.FailNow()
|
||||
}
|
||||
defer os.RemoveAll(d)
|
||||
|
||||
s := localStorage{baseDir: d}
|
||||
|
||||
ti := time.Date(2020, time.October, 29, 12, 4, 9, 0, time.UTC)
|
||||
|
||||
if err := s.Store("https://google.com", ti, []byte("Hello, world")); err != nil {
|
||||
t.Fail()
|
||||
}
|
||||
|
||||
p := filepath.Join(d, "https", "google.com", "1603973049")
|
||||
|
||||
inf, err := os.Stat(p)
|
||||
if err != nil {
|
||||
t.Fail()
|
||||
}
|
||||
if inf.Mode() != 0640 {
|
||||
t.Fail()
|
||||
}
|
||||
|
||||
b, err := ioutil.ReadFile(p)
|
||||
if err != nil {
|
||||
t.Fail()
|
||||
}
|
||||
if string(b) != "Hello, world" {
|
||||
t.Fail()
|
||||
}
|
||||
}
|
@ -0,0 +1,11 @@
|
||||
package storage
|
||||
|
||||
import "time"
|
||||
|
||||
//go:generate mockgen -destination=../storage_mock/storage_mock.go -package=storage_mock . Storage
|
||||
|
||||
// Storage is a abstraction layer where we store resource
|
||||
type Storage interface {
|
||||
// Store the resource
|
||||
Store(url string, time time.Time, body []byte) error
|
||||
}
|
@ -0,0 +1,20 @@
|
||||
package clock
|
||||
|
||||
//go:generate mockgen -destination=../clock_mock/client_mock.go -package=clock_mock . Clock
|
||||
|
||||
import "time"
|
||||
|
||||
// Clock is an interface to ease unit testing
|
||||
type Clock interface {
|
||||
// Now return current time
|
||||
Now() time.Time
|
||||
}
|
||||
|
||||
// SystemClock is a clock that use system time
|
||||
type SystemClock struct {
|
||||
}
|
||||
|
||||
// Now return now from system clock
|
||||
func (clock *SystemClock) Now() time.Time {
|
||||
return time.Now()
|
||||
}
|
@ -1 +1,130 @@
|
||||
package crawler
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"github.com/creekorful/trandoshan/internal/clock_mock"
|
||||
"github.com/creekorful/trandoshan/internal/crawler/http_mock"
|
||||
"github.com/creekorful/trandoshan/internal/event"
|
||||
"github.com/creekorful/trandoshan/internal/event_mock"
|
||||
"github.com/golang/mock/gomock"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestCrawlURLForbiddenContentType(t *testing.T) {
|
||||
mockCtrl := gomock.NewController(t)
|
||||
defer mockCtrl.Finish()
|
||||
|
||||
httpClientMock := http_mock.NewMockClient(mockCtrl)
|
||||
url := "https://example.onion"
|
||||
allowedContentTypes := []string{"text/plain"}
|
||||
|
||||
httpResponseMock := http_mock.NewMockResponse(mockCtrl)
|
||||
httpResponseMock.EXPECT().Headers().Return(map[string]string{"Content-Type": "image/png"})
|
||||
|
||||
httpClientMock.EXPECT().Get(url).Return(httpResponseMock, nil)
|
||||
|
||||
body, headers, err := crawURL(httpClientMock, url, allowedContentTypes)
|
||||
if body != "" || headers != nil || err == nil {
|
||||
t.Fail()
|
||||
}
|
||||
}
|
||||
|
||||
func TestCrawlURLSameContentType(t *testing.T) {
|
||||
mockCtrl := gomock.NewController(t)
|
||||
defer mockCtrl.Finish()
|
||||
|
||||
httpClientMock := http_mock.NewMockClient(mockCtrl)
|
||||
url := "https://example.onion"
|
||||
allowedContentTypes := []string{"text/plain"}
|
||||
|
||||
httpResponseMock := http_mock.NewMockResponse(mockCtrl)
|
||||
httpResponseMock.EXPECT().Headers().Times(2).Return(map[string]string{"Content-Type": "text/plain"})
|
||||
httpResponseMock.EXPECT().Body().Return(strings.NewReader("Hello"))
|
||||
|
||||
httpClientMock.EXPECT().Get(url).Return(httpResponseMock, nil)
|
||||
|
||||
body, headers, err := crawURL(httpClientMock, url, allowedContentTypes)
|
||||
if err != nil {
|
||||
t.Fail()
|
||||
}
|
||||
if body != "Hello" {
|
||||
t.Fail()
|
||||
}
|
||||
if len(headers) != 1 {
|
||||
t.Fail()
|
||||
}
|
||||
if headers["Content-Type"] != "text/plain" {
|
||||
t.Fail()
|
||||
}
|
||||
}
|
||||
|
||||
func TestCrawlURLNoContentTypeFiltering(t *testing.T) {
|
||||
mockCtrl := gomock.NewController(t)
|
||||
defer mockCtrl.Finish()
|
||||
|
||||
httpClientMock := http_mock.NewMockClient(mockCtrl)
|
||||
url := "https://example.onion"
|
||||
allowedContentTypes := []string{""}
|
||||
|
||||
httpResponseMock := http_mock.NewMockResponse(mockCtrl)
|
||||
httpResponseMock.EXPECT().Headers().Times(2).Return(map[string]string{"Content-Type": "text/plain"})
|
||||
httpResponseMock.EXPECT().Body().Return(strings.NewReader("Hello"))
|
||||
|
||||
httpClientMock.EXPECT().Get(url).Return(httpResponseMock, nil)
|
||||
|
||||
body, headers, err := crawURL(httpClientMock, url, allowedContentTypes)
|
||||
if err != nil {
|
||||
t.Fail()
|
||||
}
|
||||
if body != "Hello" {
|
||||
t.Fail()
|
||||
}
|
||||
if len(headers) != 1 {
|
||||
t.Fail()
|
||||
}
|
||||
if headers["Content-Type"] != "text/plain" {
|
||||
t.Fail()
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandleNewURLEvent(t *testing.T) {
|
||||
mockCtrl := gomock.NewController(t)
|
||||
defer mockCtrl.Finish()
|
||||
|
||||
subscriberMock := event_mock.NewMockSubscriber(mockCtrl)
|
||||
httpClientMock := http_mock.NewMockClient(mockCtrl)
|
||||
httpResponseMock := http_mock.NewMockResponse(mockCtrl)
|
||||
clockMock := clock_mock.NewMockClock(mockCtrl)
|
||||
|
||||
msg := bytes.NewReader(nil)
|
||||
subscriberMock.EXPECT().
|
||||
Read(msg, &event.NewURLEvent{}).
|
||||
SetArg(1, event.NewURLEvent{URL: "https://example.onion/image.png?id=12&test=2"}).
|
||||
Return(nil)
|
||||
|
||||
httpResponseMock.EXPECT().Headers().Times(2).Return(map[string]string{"Content-Type": "text/plain", "Server": "Debian"})
|
||||
httpResponseMock.EXPECT().Body().Return(strings.NewReader("Hello"))
|
||||
|
||||
httpClientMock.EXPECT().Get("https://example.onion/image.png?id=12&test=2").Return(httpResponseMock, nil)
|
||||
|
||||
tn := time.Now()
|
||||
clockMock.EXPECT().Now().Return(tn)
|
||||
|
||||
subscriberMock.EXPECT().Publish(&event.NewResourceEvent{
|
||||
URL: "https://example.onion/image.png?id=12&test=2",
|
||||
Body: "Hello",
|
||||
Headers: map[string]string{"Content-Type": "text/plain", "Server": "Debian"},
|
||||
Time: tn,
|
||||
}).Return(nil)
|
||||
|
||||
s := state{
|
||||
httpClient: httpClientMock,
|
||||
allowedContentTypes: []string{"text/plain", "text/css"},
|
||||
clock: clockMock,
|
||||
}
|
||||
if err := s.handleNewURLEvent(subscriberMock, msg); err != nil {
|
||||
t.Fail()
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,52 @@
|
||||
package http
|
||||
|
||||
//go:generate mockgen -destination=../http_mock/client_mock.go -package=http_mock . Client
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/valyala/fasthttp"
|
||||
)
|
||||
|
||||
// Client is an HTTP client
|
||||
type Client interface {
|
||||
// Get the corresponding URL
|
||||
// this methods follows redirections
|
||||
Get(URL string) (Response, error)
|
||||
}
|
||||
|
||||
type client struct {
|
||||
c *fasthttp.Client
|
||||
}
|
||||
|
||||
// NewFastHTTPClient create a new Client using fasthttp.Client as backend
|
||||
func NewFastHTTPClient(c *fasthttp.Client) Client {
|
||||
return &client{c: c}
|
||||
}
|
||||
|
||||
func (c *client) Get(URL string) (Response, error) {
|
||||
req := fasthttp.AcquireRequest()
|
||||
resp := fasthttp.AcquireResponse()
|
||||
defer fasthttp.ReleaseRequest(req)
|
||||
defer fasthttp.ReleaseResponse(resp)
|
||||
|
||||
req.SetRequestURI(URL)
|
||||
|
||||
if err := c.c.Do(req, resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
switch code := resp.StatusCode(); {
|
||||
case code > 302:
|
||||
return nil, fmt.Errorf("non-managed error code %d", code)
|
||||
// follow redirect
|
||||
case code == 301 || code == 302:
|
||||
if location := string(resp.Header.Peek("Location")); location != "" {
|
||||
return c.Get(location)
|
||||
}
|
||||
}
|
||||
|
||||
r := &response{}
|
||||
resp.CopyTo(&r.raw)
|
||||
|
||||
return r, nil
|
||||
}
|
@ -0,0 +1,33 @@
|
||||
package http
|
||||
|
||||
//go:generate mockgen -destination=../http_mock/response_mock.go -package=http_mock . Response
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"github.com/valyala/fasthttp"
|
||||
"io"
|
||||
)
|
||||
|
||||
// Response is an HTTP response
|
||||
type Response interface {
|
||||
// Headers returns the response headers
|
||||
Headers() map[string]string
|
||||
// Body return the response body
|
||||
Body() io.Reader
|
||||
}
|
||||
|
||||
type response struct {
|
||||
raw fasthttp.Response
|
||||
}
|
||||
|
||||
func (r *response) Headers() map[string]string {
|
||||
headers := map[string]string{}
|
||||
r.raw.Header.VisitAll(func(key, value []byte) {
|
||||
headers[string(key)] = string(value) // TODO manage multiple values?
|
||||
})
|
||||
return headers
|
||||
}
|
||||
|
||||
func (r *response) Body() io.Reader {
|
||||
return bytes.NewReader(r.raw.Body())
|
||||
}
|
@ -0,0 +1,21 @@
|
||||
package duration
|
||||
|
||||
import (
|
||||
"github.com/xhit/go-str2duration/v2"
|
||||
"time"
|
||||
)
|
||||
|
||||
// ParseDuration parse given duration into time.Duration
|
||||
// or returns -1 if fails
|
||||
func ParseDuration(duration string) time.Duration {
|
||||
if duration == "" {
|
||||
return -1
|
||||
}
|
||||
|
||||
val, err := str2duration.ParseDuration(duration)
|
||||
if err != nil {
|
||||
return -1
|
||||
}
|
||||
|
||||
return val
|
||||
}
|
@ -0,0 +1,24 @@
|
||||
package duration
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestParseDuration(t *testing.T) {
|
||||
if ParseDuration("") != -1 {
|
||||
t.Fail()
|
||||
}
|
||||
if ParseDuration("50s") != time.Second*50 {
|
||||
t.Fail()
|
||||
}
|
||||
if ParseDuration("50m") != time.Minute*50 {
|
||||
t.Fail()
|
||||
}
|
||||
if ParseDuration("50h") != time.Hour*50 {
|
||||
t.Fail()
|
||||
}
|
||||
if ParseDuration("50d") != time.Hour*24*50 {
|
||||
t.Fail()
|
||||
}
|
||||
}
|
@ -0,0 +1,53 @@
|
||||
package event
|
||||
|
||||
import "time"
|
||||
|
||||
//go:generate mockgen -destination=../event_mock/event_mock.go -package=event_mock . Publisher,Subscriber
|
||||
|
||||
const (
|
||||
// NewURLExchange is the subject used when an URL is schedule for crawling
|
||||
NewURLExchange = "url.new"
|
||||
// FoundURLExchange is the subject used when an URL is extracted from resource
|
||||
FoundURLExchange = "url.found"
|
||||
// NewResourceExchange is the subject used when a new resource has been crawled
|
||||
NewResourceExchange = "resource.new"
|
||||
)
|
||||
|
||||
// Event represent a event
|
||||
type Event interface {
|
||||
// Exchange returns the exchange where event should be push
|
||||
Exchange() string
|
||||
}
|
||||
|
||||
// NewURLEvent represent an URL to crawl
|
||||
type NewURLEvent struct {
|
||||
URL string `json:"url"`
|
||||
}
|
||||
|
||||
// Exchange returns the exchange where event should be push
|
||||
func (msg *NewURLEvent) Exchange() string {
|
||||
return NewURLExchange
|
||||
}
|
||||
|
||||
// FoundURLEvent represent a found URL
|
||||
type FoundURLEvent struct {
|
||||
URL string `json:"url"`
|
||||
}
|
||||
|
||||
// Exchange returns the exchange where event should be push
|
||||
func (msg *FoundURLEvent) Exchange() string {
|
||||
return FoundURLExchange
|
||||
}
|
||||
|
||||
// NewResourceEvent represent a crawled resource
|
||||
type NewResourceEvent struct {
|
||||
URL string `json:"url"`
|
||||
Body string `json:"body"`
|
||||
Headers map[string]string `json:"headers"`
|
||||
Time time.Time `json:"time"`
|
||||
}
|
||||
|
||||
// Exchange returns the exchange where event should be push
|
||||
func (msg *NewResourceEvent) Exchange() string {
|
||||
return NewResourceExchange
|
||||
}
|
@ -0,0 +1,55 @@
|
||||
package event
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/streadway/amqp"
|
||||
)
|
||||
|
||||
// Publisher is something that push an event
|
||||
type Publisher interface {
|
||||
Publish(event Event) error
|
||||
Close() error
|
||||
}
|
||||
|
||||
type publisher struct {
|
||||
channel *amqp.Channel
|
||||
}
|
||||
|
||||
// NewPublisher create a new Publisher instance
|
||||
func NewPublisher(amqpURI string) (Publisher, error) {
|
||||
conn, err := amqp.Dial(amqpURI)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
c, err := conn.Channel()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &publisher{
|
||||
channel: c,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (p *publisher) Publish(event Event) error {
|
||||
return publishJSON(p.channel, event.Exchange(), event)
|
||||
}
|
||||
|
||||
func (p *publisher) Close() error {
|
||||
return p.channel.Close()
|
||||
}
|
||||
|
||||
func publishJSON(rc *amqp.Channel, exchange string, event interface{}) error {
|
||||
evtBytes, err := json.Marshal(event)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error while encoding event: %s", err)
|
||||
}
|
||||
|
||||
return rc.Publish(exchange, "", false, false, amqp.Publishing{
|
||||
ContentType: "application/json",
|
||||
Body: evtBytes,
|
||||
DeliveryMode: amqp.Persistent,
|
||||
})
|
||||
}
|
@ -0,0 +1,105 @@
|
||||
package event
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/rs/zerolog/log"
|
||||
"github.com/streadway/amqp"
|
||||
"io"
|
||||
)
|
||||
|
||||
// Handler represent an event handler
|
||||
type Handler func(Subscriber, io.Reader) error
|
||||
|
||||
// Subscriber is something that read msg from an event queue
|
||||
type Subscriber interface {
|
||||
Publisher
|
||||
|
||||
Read(body io.Reader, event Event) error
|
||||
SubscribeAsync(exchange, queue string, handler Handler) error
|
||||
}
|
||||
|
||||
// Subscriber represent a subscriber
|
||||
type subscriber struct {
|
||||
channel *amqp.Channel
|
||||
}
|
||||
|
||||
// NewSubscriber create a new subscriber and connect it to given server
|
||||
func NewSubscriber(amqpURI string) (Subscriber, error) {
|
||||
conn, err := amqp.Dial(amqpURI)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
c, err := conn.Channel()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := c.Qos(1, 0, false); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &subscriber{
|
||||
channel: c,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *subscriber) Publish(event Event) error {
|
||||
return publishJSON(s.channel, event.Exchange(), event)
|
||||
}
|
||||
|
||||
func (s *subscriber) Close() error {
|
||||
return s.channel.Close()
|
||||
}
|
||||
|
||||
func (s *subscriber) Read(body io.Reader, event Event) error {
|
||||
return readJSON(body, event)
|
||||
}
|
||||
|
||||
func (s *subscriber) SubscribeAsync(exchange, queue string, handler Handler) error {
|
||||
// First of all declare the exchange
|
||||
if err := s.channel.ExchangeDeclare(exchange, amqp.ExchangeFanout, true, false, false, false, nil); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Then declare the queue
|
||||
q, err := s.channel.QueueDeclare(queue, true, false, false, false, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Bind the queue to the exchange
|
||||
if err := s.channel.QueueBind(q.Name, "", exchange, false, nil); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Start consuming asynchronously
|
||||
deliveries, err := s.channel.Consume(q.Name, "", false, false, false, false, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
go func() {
|
||||
for delivery := range deliveries {
|
||||
if err := handler(s, bytes.NewReader(delivery.Body)); err != nil {
|
||||
log.Err(err).Msg("error while processing event")
|
||||
}
|
||||
|
||||
// Ack no matter what happen since we doesn't care about failing event (yet?)
|
||||
if err := delivery.Ack(false); err != nil {
|
||||
log.Err(err).Msg("error while acknowledging event")
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func readJSON(body io.Reader, event interface{}) error {
|
||||
if err := json.NewDecoder(body).Decode(event); err != nil {
|
||||
return fmt.Errorf("error while decoding event: %s", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
@ -1,7 +0,0 @@
|
||||
package messaging
|
||||
|
||||
// Msg represent a message send-able trough NATS
|
||||
type Msg interface {
|
||||
// Subject returns the subject where message should be push
|
||||
Subject() string
|
||||
}
|
@ -1,43 +0,0 @@
|
||||
package messaging
|
||||
|
||||
//go:generate mockgen -destination=../messaging_mock/publisher_mock.go -package=messaging_mock . Publisher,Subscriber
|
||||
|
||||
const (
|
||||
// URLTodoSubject is the subject used when an URL is schedule for crawling
|
||||
URLTodoSubject = "url.todo"
|
||||
// URLFoundSubject is the subject used when an URL is extracted from resource
|
||||
URLFoundSubject = "url.found"
|
||||
// NewResourceSubject is the subject used when a new resource has been crawled
|
||||
NewResourceSubject = "resource.new"
|
||||
)
|
||||
|
||||
// URLTodoMsg represent an URL to crawl
|
||||
type URLTodoMsg struct {
|
||||
URL string `json:"url"`
|
||||
}
|
||||
|
||||
// Subject returns the subject where message should be push
|
||||
func (msg *URLTodoMsg) Subject() string {
|
||||
return URLTodoSubject
|
||||
}
|
||||
|
||||
// URLFoundMsg represent a found URL
|
||||
type URLFoundMsg struct {
|
||||
URL string `json:"url"`
|
||||
}
|
||||
|
||||
// Subject returns the subject where message should be push
|
||||
func (msg *URLFoundMsg) Subject() string {
|
||||
return URLFoundSubject
|
||||
}
|
||||
|
||||
// NewResourceMsg represent a crawled resource
|
||||
type NewResourceMsg struct {
|
||||
URL string `json:"url"`
|
||||
Body string `json:"body"`
|
||||
}
|
||||
|
||||
// Subject returns the subject where message should be push
|
||||
func (msg *NewResourceMsg) Subject() string {
|
||||
return NewResourceSubject
|
||||
}
|
@ -1,46 +0,0 @@
|
||||
package messaging
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/nats-io/nats.go"
|
||||
)
|
||||
|
||||
// Publisher is something that push msg to an event queue
|
||||
type Publisher interface {
|
||||
PublishMsg(msg Msg) error
|
||||
Close()
|
||||
}
|
||||
|
||||
type publisher struct {
|
||||
nc *nats.Conn
|
||||
}
|
||||
|
||||
// NewPublisher create a new Publisher instance
|
||||
func NewPublisher(natsURI string) (Publisher, error) {
|
||||
nc, err := nats.Connect(natsURI)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &publisher{
|
||||
nc: nc,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (p *publisher) PublishMsg(msg Msg) error {
|
||||
return publishJSON(p.nc, msg.Subject(), msg)
|
||||
}
|
||||
|
||||
func (p *publisher) Close() {
|
||||
p.nc.Close()
|
||||
}
|
||||
|
||||
func publishJSON(nc *nats.Conn, subject string, msg interface{}) error {
|
||||
msgBytes, err := json.Marshal(msg)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error while encoding message: %s", err)
|
||||
}
|
||||
|
||||
return nc.Publish(subject, msgBytes)
|
||||
}
|
@ -1,81 +0,0 @@
|
||||
package messaging
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
// MsgHandler represent an handler for a NATS subscriber
|
||||
type MsgHandler func(s Subscriber, msg *nats.Msg) error
|
||||
|
||||
// Subscriber is something that read msg from an event queue
|
||||
type Subscriber interface {
|
||||
Publisher
|
||||
|
||||
ReadMsg(natsMsg *nats.Msg, msg Msg) error
|
||||
QueueSubscribe(subject, queue string, handler MsgHandler) error
|
||||
Close()
|
||||
}
|
||||
|
||||
// Subscriber represent a NATS subscriber
|
||||
type subscriber struct {
|
||||
nc *nats.Conn
|
||||
}
|
||||
|
||||
// NewSubscriber create a new subscriber and connect it to given NATS server
|
||||
func NewSubscriber(address string) (Subscriber, error) {
|
||||
nc, err := nats.Connect(address)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &subscriber{
|
||||
nc: nc,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *subscriber) ReadMsg(natsMsg *nats.Msg, msg Msg) error {
|
||||
return readJSON(natsMsg, msg)
|
||||
}
|
||||
|
||||
func (s *subscriber) QueueSubscribe(subject, queue string, handler MsgHandler) error {
|
||||
// Create the subscriber
|
||||
sub, err := s.nc.QueueSubscribeSync(subject, queue)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for {
|
||||
// Read incoming message
|
||||
msg, err := sub.NextMsgWithContext(context.Background())
|
||||
if err != nil {
|
||||
log.Warn().Str("err", err.Error()).Msg("error while reading incoming message, skipping it")
|
||||
continue
|
||||
}
|
||||
|
||||
// ... And process it
|
||||
if err := handler(s, msg); err != nil {
|
||||
log.Err(err).Msg("error while processing message")
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *subscriber) PublishMsg(msg Msg) error {
|
||||
return publishJSON(s.nc, msg.Subject(), msg)
|
||||
}
|
||||
|
||||
func (s *subscriber) Close() {
|
||||
s.nc.Close()
|
||||
}
|
||||
|
||||
func readJSON(msg *nats.Msg, body interface{}) error {
|
||||
if err := json.Unmarshal(msg.Data, body); err != nil {
|
||||
return fmt.Errorf("error while decoding message: %s", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
@ -0,0 +1,12 @@
|
||||
package util
|
||||
|
||||
import "github.com/urfave/cli/v2"
|
||||
|
||||
// GetHubURI return the URI of the hub (event) server
|
||||
func GetHubURI() *cli.StringFlag {
|
||||
return &cli.StringFlag{
|
||||
Name: "hub-uri",
|
||||
Usage: "URI to the hub (event) server",
|
||||
Required: true,
|
||||
}
|
||||
}
|
@ -1,12 +0,0 @@
|
||||
package util
|
||||
|
||||
import "github.com/urfave/cli/v2"
|
||||
|
||||
// GetNATSURIFlag return the nats uri from cli flag
|
||||
func GetNATSURIFlag() *cli.StringFlag {
|
||||
return &cli.StringFlag{
|
||||
Name: "nats-uri",
|
||||
Usage: "URI to the NATS server",
|
||||
Required: true,
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue