diff --git a/README.md b/README.md index 135adbb..a8e9104 100644 --- a/README.md +++ b/README.md @@ -30,50 +30,16 @@ and wait for all containers to start. # How to initiate crawling -Since the API is exposed on localhost:15005, one can use it to start crawling: +One can use the RabbitMQ dashhboard available at localhost:15003, and publish a new JSON object in the **crawlingQueue**. -using trandoshanctl executable: +The object should look like this: -```sh -$ trandoshanctl --api-token schedule https://www.facebookcorewwwi.onion -``` - -or using the docker image: - -```sh -$ docker run creekorful/trandoshanctl --api-token --api-uri schedule https://www.facebookcorewwwi.onion -``` - -(you'll need to specify the api uri if you use the docker container) - -this will schedule given URL for crawling. - -## Example token - -Here's a working API token that you can use with trandoshanctl if you haven't changed the API signing key: - -``` -eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VybmFtZSI6InRyYW5kb3NoYW5jdGwiLCJyaWdodHMiOnsiUE9TVCI6WyIvdjEvdXJscyJdLCJHRVQiOlsiL3YxL3Jlc291cmNlcyJdfX0.jGA8WODYKtKy7ZijngoV8C3iWi1eTvMitA8Z1Is2GUg -``` - -This token is the representation of the following payload: - -``` +```json { - "username": "trandoshanctl", - "rights": { - "POST": [ - "/v1/urls" - ], - "GET": [ - "/v1/resources" - ] - } + "url": "https://facebookcorewwwi.onion" } ``` -you may create your own tokens with the rights needed. In the future a CLI tool will allow token generation easily. - ## How to speed up crawling If one want to speed up the crawling, he can scale the instance of crawling component in order to increase performances. @@ -87,14 +53,6 @@ this will set the number of crawler instance to 5. # How to view results -## Using trandoshanctl - -```sh -$ trandoshanctl search -``` - -## Using kibana - You can use the Kibana dashboard available at http://localhost:15004. You will need to create an index pattern named ' resources', and when it asks for the time field, choose 'time'. diff --git a/build/docker/Dockerfile.tdsh-archiver b/build/docker/Dockerfile.tdsh-archiver deleted file mode 100644 index 8041833..0000000 --- a/build/docker/Dockerfile.tdsh-archiver +++ /dev/null @@ -1,24 +0,0 @@ -# build image -FROM golang:1.15.0-alpine as builder - -RUN apk update && apk upgrade && \ - apk add --no-cache bash git openssh - -WORKDIR /app - -# Copy and download dependencies to cache them and faster build time -COPY go.mod go.sum ./ -RUN go mod download - -COPY . . - -# Test then build app -RUN go build -v github.com/creekorful/trandoshan/cmd/tdsh-archiver - -# runtime image -FROM alpine:latest -COPY --from=builder /app/tdsh-archiver /app/ - -WORKDIR /app/ - -ENTRYPOINT ["./tdsh-archiver"] \ No newline at end of file diff --git a/build/docker/Dockerfile.trandoshanctl b/build/docker/Dockerfile.trandoshanctl deleted file mode 100644 index 11a4b90..0000000 --- a/build/docker/Dockerfile.trandoshanctl +++ /dev/null @@ -1,24 +0,0 @@ -# build image -FROM golang:1.15.0-alpine as builder - -RUN apk update && apk upgrade && \ - apk add --no-cache bash git openssh - -WORKDIR /app - -# Copy and download dependencies to cache them and faster build time -COPY go.mod go.sum ./ -RUN go mod download - -COPY . . - -# Test then build app -RUN go build -v github.com/creekorful/trandoshan/cmd/trandoshanctl - -# runtime image -FROM alpine:latest -COPY --from=builder /app/trandoshanctl /app/ - -WORKDIR /app/ - -ENTRYPOINT ["./trandoshanctl"] \ No newline at end of file diff --git a/cmd/tdsh-archiver/tdsh-archiver.go b/cmd/tdsh-archiver/tdsh-archiver.go deleted file mode 100644 index 6b38e09..0000000 --- a/cmd/tdsh-archiver/tdsh-archiver.go +++ /dev/null @@ -1,14 +0,0 @@ -package main - -import ( - "github.com/creekorful/trandoshan/internal/archiver" - "github.com/creekorful/trandoshan/internal/process" - "os" -) - -func main() { - app := process.MakeApp(&archiver.State{}) - if err := app.Run(os.Args); err != nil { - os.Exit(1) - } -} diff --git a/cmd/trandoshanctl/trandoshanctl.go b/cmd/trandoshanctl/trandoshanctl.go deleted file mode 100644 index afe6c14..0000000 --- a/cmd/trandoshanctl/trandoshanctl.go +++ /dev/null @@ -1,13 +0,0 @@ -package main - -import ( - "github.com/creekorful/trandoshan/internal/trandoshanctl" - "os" -) - -func main() { - app := trandoshanctl.GetApp() - if err := app.Run(os.Args); err != nil { - os.Exit(1) - } -} diff --git a/deployments/docker/docker-compose.yml b/deployments/docker/docker-compose.yml index 0e5bb9f..d9ebbde 100644 --- a/deployments/docker/docker-compose.yml +++ b/deployments/docker/docker-compose.yml @@ -45,47 +45,47 @@ services: --log-level debug --hub-uri amqp://guest:guest@rabbitmq:5672 --config-api-uri http://configapi:8080 + --redis-uri redis:6379 restart: always depends_on: - rabbitmq - archiver: - image: creekorful/tdsh-archiver:latest + indexer-local: + image: creekorful/tdsh-indexer:latest command: > --log-level debug --hub-uri amqp://guest:guest@rabbitmq:5672 - --storage-dir /archive + --config-api-uri http://configapi:8080 + --index-driver local + --index-dest /archive restart: always volumes: - archiverdata:/archive depends_on: - rabbitmq - indexer: + - configapi + indexer-es: image: creekorful/tdsh-indexer:latest command: > --log-level debug --hub-uri amqp://guest:guest@rabbitmq:5672 - --elasticsearch-uri http://elasticsearch:9200 - --signing-key K==M5RsU_DQa4_XSbkX?L27s^xWmde25 --config-api-uri http://configapi:8080 - --redis-uri redis:6379 + --index-driver elastic + --index-dest http://elasticsearch:9200 restart: always depends_on: - rabbitmq - elasticsearch - configapi - - redis - ports: - - 15005:8080 configapi: image: creekorful/tdsh-configapi:latest command: > --log-level debug --hub-uri amqp://guest:guest@rabbitmq:5672 --redis-uri redis:6379 - --default-value forbidden-hostnames="[{\"hostname\": \"facebookcorewwwi.onion\"}, {\"hostname\": \"nytimes3xbfgragh.onion\"}]" + --default-value forbidden-hostnames="[]" --default-value allowed-mime-types="[{\"content-type\":\"text/\",\"extensions\":[\"html\",\"php\",\"aspx\", \"htm\"]}]" --default-value refresh-delay="{\"delay\": -1}" - --default-value blacklist-threshold="{\"threshold\": 10}" + --default-value blacklist-threshold="{\"threshold\": 5}" restart: always depends_on: - rabbitmq @@ -99,11 +99,13 @@ services: --hub-uri amqp://guest:guest@rabbitmq:5672 --config-api-uri http://configapi:8080 --redis-uri redis:6379 + --tor-uri torproxy:9050 restart: always depends_on: - rabbitmq - configapi - redis + - torproxy volumes: esdata: @@ -113,4 +115,4 @@ volumes: archiverdata: driver: local redisdata: - driver: local \ No newline at end of file + driver: local diff --git a/docs/architecture.png b/docs/architecture.png index 7a4090f..af552ee 100644 Binary files a/docs/architecture.png and b/docs/architecture.png differ diff --git a/internal/archiver/archiver.go b/internal/archiver/archiver.go deleted file mode 100644 index 4c2d795..0000000 --- a/internal/archiver/archiver.go +++ /dev/null @@ -1,99 +0,0 @@ -package archiver - -import ( - "fmt" - "github.com/creekorful/trandoshan/internal/archiver/storage" - "github.com/creekorful/trandoshan/internal/event" - "github.com/creekorful/trandoshan/internal/process" - "github.com/rs/zerolog/log" - "github.com/urfave/cli/v2" - "net/http" - "strings" -) - -// State represent the application state -type State struct { - storage storage.Storage -} - -// Name return the process name -func (state *State) Name() string { - return "archiver" -} - -// CommonFlags return process common flags -func (state *State) CommonFlags() []string { - return []string{process.HubURIFlag} -} - -// CustomFlags return process custom flags -func (state *State) CustomFlags() []cli.Flag { - return []cli.Flag{ - &cli.StringFlag{ - Name: "storage-dir", - Usage: "Path to the storage directory", - Required: true, - }, - } -} - -// Initialize the process -func (state *State) Initialize(provider process.Provider) error { - st, err := storage.NewLocalStorage(provider.GetValue("storage-dir")) - if err != nil { - return err - } - state.storage = st - - return nil -} - -// Subscribers return the process subscribers -func (state *State) Subscribers() []process.SubscriberDef { - return []process.SubscriberDef{ - {Exchange: event.NewIndexExchange, Queue: "archivingQueue", Handler: state.handleNewIndexEvent}, - } -} - -// HTTPHandler returns the HTTP API the process expose -func (state *State) HTTPHandler(provider process.Provider) http.Handler { - return nil -} - -func (state *State) handleNewIndexEvent(subscriber event.Subscriber, msg event.RawMessage) error { - var evt event.NewIndexEvent - if err := subscriber.Read(&msg, &evt); err != nil { - return err - } - - res, err := formatResource(&evt) - if err != nil { - return fmt.Errorf("error while formatting resource: %s", err) - } - - if err := state.storage.Store(evt.URL, evt.Time, res); err != nil { - return fmt.Errorf("error while storing resource: %s", err) - } - - log.Debug().Str("url", evt.URL).Msg("Successfully archived resource") - - return nil -} - -func formatResource(evt *event.NewIndexEvent) ([]byte, error) { - builder := strings.Builder{} - - // First URL - builder.WriteString(fmt.Sprintf("%s\n\n", evt.URL)) - - // Then headers - for key, value := range evt.Headers { - builder.WriteString(fmt.Sprintf("%s: %s\n", key, value)) - } - builder.WriteString("\n") - - // Then body - builder.WriteString(evt.Body) - - return []byte(builder.String()), nil -} diff --git a/internal/archiver/archiver_test.go b/internal/archiver/archiver_test.go deleted file mode 100644 index 19b01e5..0000000 --- a/internal/archiver/archiver_test.go +++ /dev/null @@ -1,55 +0,0 @@ -package archiver - -import ( - "github.com/creekorful/trandoshan/internal/archiver/storage_mock" - "github.com/creekorful/trandoshan/internal/event" - "github.com/creekorful/trandoshan/internal/event_mock" - "github.com/golang/mock/gomock" - "testing" - "time" -) - -func TestHandleNewResourceEvent(t *testing.T) { - mockCtrl := gomock.NewController(t) - defer mockCtrl.Finish() - - subscriberMock := event_mock.NewMockSubscriber(mockCtrl) - storageMock := storage_mock.NewMockStorage(mockCtrl) - - tn := time.Now() - - msg := event.RawMessage{} - subscriberMock.EXPECT(). - Read(&msg, &event.NewIndexEvent{}). - SetArg(1, event.NewIndexEvent{ - URL: "https://example.onion", - Body: "Hello, world", - Headers: map[string]string{"Server": "Traefik", "Content-Type": "application/html"}, - Time: tn, - }).Return(nil) - - storageMock.EXPECT().Store("https://example.onion", tn, []byte("https://example.onion\n\nServer: Traefik\nContent-Type: application/html\n\nHello, world")).Return(nil) - - s := State{storage: storageMock} - if err := s.handleNewIndexEvent(subscriberMock, msg); err != nil { - t.Fail() - } -} - -func TestFormatResource(t *testing.T) { - evt := &event.NewIndexEvent{ - URL: "https://google.com", - Body: "Hello, world", - Headers: map[string]string{"Server": "Traefik", "Content-Type": "text/html"}, - Time: time.Now(), - } - - res, err := formatResource(evt) - if err != nil { - t.FailNow() - } - - if string(res) != "https://google.com\n\nServer: Traefik\nContent-Type: text/html\n\nHello, world" { - t.Fail() - } -} diff --git a/internal/archiver/storage/storage.go b/internal/archiver/storage/storage.go deleted file mode 100644 index d1525a3..0000000 --- a/internal/archiver/storage/storage.go +++ /dev/null @@ -1,11 +0,0 @@ -package storage - -import "time" - -//go:generate mockgen -destination=../storage_mock/storage_mock.go -package=storage_mock . Storage - -// Storage is a abstraction layer where we store resource -type Storage interface { - // Store the resource - Store(url string, time time.Time, body []byte) error -} diff --git a/internal/blacklister/blacklister.go b/internal/blacklister/blacklister.go index d31fdf6..bde6651 100644 --- a/internal/blacklister/blacklister.go +++ b/internal/blacklister/blacklister.go @@ -5,6 +5,7 @@ import ( "github.com/creekorful/trandoshan/internal/cache" configapi "github.com/creekorful/trandoshan/internal/configapi/client" "github.com/creekorful/trandoshan/internal/event" + chttp "github.com/creekorful/trandoshan/internal/http" "github.com/creekorful/trandoshan/internal/process" "github.com/rs/zerolog/log" "github.com/urfave/cli/v2" @@ -12,10 +13,13 @@ import ( "net/url" ) +var errAlreadyBlacklisted = fmt.Errorf("hostname is already blacklisted") + // State represent the application state type State struct { configClient configapi.Client hostnameCache cache.Cache + httpClient chttp.Client } // Name return the process name @@ -25,7 +29,7 @@ func (state *State) Name() string { // CommonFlags return process common flags func (state *State) CommonFlags() []string { - return []string{process.HubURIFlag, process.ConfigAPIURIFlag, process.RedisURIFlag} + return []string{process.HubURIFlag, process.ConfigAPIURIFlag, process.RedisURIFlag, process.UserAgentFlag, process.TorURIFlag} } // CustomFlags return process custom flags @@ -35,17 +39,23 @@ func (state *State) CustomFlags() []cli.Flag { // Initialize the process func (state *State) Initialize(provider process.Provider) error { - hostnameCache, err := provider.Cache() + hostnameCache, err := provider.Cache("down-hostname") if err != nil { return err } state.hostnameCache = hostnameCache - client, err := provider.ConfigClient([]string{configapi.ForbiddenHostnamesKey, configapi.BlackListThresholdKey}) + configClient, err := provider.ConfigClient([]string{configapi.ForbiddenHostnamesKey, configapi.BlackListThresholdKey}) + if err != nil { + return err + } + state.configClient = configClient + + httpClient, err := provider.HTTPClient() if err != nil { return err } - state.configClient = client + state.httpClient = httpClient return nil } @@ -58,7 +68,7 @@ func (state *State) Subscribers() []process.SubscriberDef { } // HTTPHandler returns the HTTP API the process expose -func (state *State) HTTPHandler(provider process.Provider) http.Handler { +func (state *State) HTTPHandler() http.Handler { return nil } @@ -73,12 +83,41 @@ func (state *State) handleTimeoutURLEvent(subscriber event.Subscriber, msg event return err } + // Make sure hostname is not already 'blacklisted' + forbiddenHostnames, err := state.configClient.GetForbiddenHostnames() + if err != nil { + return err + } + + // prevent duplicates + found := false + for _, hostname := range forbiddenHostnames { + if hostname.Hostname == u.Hostname() { + found = true + break + } + } + + if found { + return fmt.Errorf("%s %w", u.Hostname(), errAlreadyBlacklisted) + } + + // Check by ourselves if the hostname doesn't respond + _, err = state.httpClient.Get(fmt.Sprintf("%s://%s", u.Scheme, u.Host)) + if err == nil || err != chttp.ErrTimeout { + return nil + } + + log.Debug(). + Str("hostname", u.Hostname()). + Msg("Timeout confirmed") + threshold, err := state.configClient.GetBlackListThreshold() if err != nil { return err } - cacheKey := fmt.Sprintf("hostnames:%s", u.Hostname()) + cacheKey := u.Hostname() count, err := state.hostnameCache.GetInt64(cacheKey) if err != nil && err != cache.ErrNIL { return err @@ -86,11 +125,6 @@ func (state *State) handleTimeoutURLEvent(subscriber event.Subscriber, msg event count++ if count >= threshold.Threshold { - log.Info(). - Str("hostname", u.Hostname()). - Int64("count", count). - Msg("Blacklisting hostname") - forbiddenHostnames, err := state.configClient.GetForbiddenHostnames() if err != nil { return err @@ -106,8 +140,13 @@ func (state *State) handleTimeoutURLEvent(subscriber event.Subscriber, msg event } if found { - log.Trace().Str("hostname", u.Hostname()).Msg("skipping duplicate hostname") + log.Trace().Str("hostname", u.Hostname()).Msg("Skipping duplicate hostname") } else { + log.Info(). + Str("hostname", u.Hostname()). + Int64("count", count). + Msg("Blacklisting hostname") + forbiddenHostnames = append(forbiddenHostnames, configapi.ForbiddenHostname{Hostname: u.Hostname()}) if err := state.configClient.Set(configapi.ForbiddenHostnamesKey, forbiddenHostnames); err != nil { return err diff --git a/internal/blacklister/blacklister_test.go b/internal/blacklister/blacklister_test.go index 8132d63..fe3a78f 100644 --- a/internal/blacklister/blacklister_test.go +++ b/internal/blacklister/blacklister_test.go @@ -1,16 +1,80 @@ package blacklister import ( + "errors" "github.com/creekorful/trandoshan/internal/cache" "github.com/creekorful/trandoshan/internal/cache_mock" configapi "github.com/creekorful/trandoshan/internal/configapi/client" "github.com/creekorful/trandoshan/internal/configapi/client_mock" "github.com/creekorful/trandoshan/internal/event" "github.com/creekorful/trandoshan/internal/event_mock" + "github.com/creekorful/trandoshan/internal/http" + "github.com/creekorful/trandoshan/internal/http_mock" + "github.com/creekorful/trandoshan/internal/process" + "github.com/creekorful/trandoshan/internal/process_mock" + "github.com/creekorful/trandoshan/internal/test" "github.com/golang/mock/gomock" "testing" ) +func TestState_Name(t *testing.T) { + s := State{} + if s.Name() != "blacklister" { + t.Fail() + } +} + +func TestState_CommonFlags(t *testing.T) { + s := State{} + test.CheckProcessCommonFlags(t, &s, []string{process.HubURIFlag, process.ConfigAPIURIFlag, process.RedisURIFlag, process.UserAgentFlag, process.TorURIFlag}) +} + +func TestState_CustomFlags(t *testing.T) { + s := State{} + test.CheckProcessCustomFlags(t, &s, nil) +} + +func TestState_Initialize(t *testing.T) { + test.CheckInitialize(t, &State{}, func(p *process_mock.MockProviderMockRecorder) { + p.Cache("down-hostname") + p.ConfigClient([]string{configapi.ForbiddenHostnamesKey, configapi.BlackListThresholdKey}) + p.HTTPClient() + }) +} + +func TestState_Subscribers(t *testing.T) { + s := State{} + test.CheckProcessSubscribers(t, &s, []test.SubscriberDef{ + {Queue: "blacklistingQueue", Exchange: "url.timeout"}, + }) +} + +func TestHandleTimeoutURLEventNoTimeout(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + subscriberMock := event_mock.NewMockSubscriber(mockCtrl) + configClientMock := client_mock.NewMockClient(mockCtrl) + hostnameCacheMock := cache_mock.NewMockCache(mockCtrl) + httpClientMock := http_mock.NewMockClient(mockCtrl) + httpResponseMock := http_mock.NewMockResponse(mockCtrl) + + msg := event.RawMessage{} + subscriberMock.EXPECT(). + Read(&msg, &event.TimeoutURLEvent{}). + SetArg(1, event.TimeoutURLEvent{ + URL: "https://down-example.onion:8080/reset-password?username=test", + }).Return(nil) + + httpClientMock.EXPECT().Get("https://down-example.onion:8080").Return(httpResponseMock, nil) + configClientMock.EXPECT().GetForbiddenHostnames().Return([]configapi.ForbiddenHostname{}, nil) + + s := State{configClient: configClientMock, hostnameCache: hostnameCacheMock, httpClient: httpClientMock} + if err := s.handleTimeoutURLEvent(subscriberMock, msg); err != nil { + t.Fail() + } +} + func TestHandleTimeoutURLEventNoDispatch(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() @@ -18,20 +82,24 @@ func TestHandleTimeoutURLEventNoDispatch(t *testing.T) { subscriberMock := event_mock.NewMockSubscriber(mockCtrl) configClientMock := client_mock.NewMockClient(mockCtrl) hostnameCacheMock := cache_mock.NewMockCache(mockCtrl) + httpClientMock := http_mock.NewMockClient(mockCtrl) + httpResponseMock := http_mock.NewMockResponse(mockCtrl) msg := event.RawMessage{} subscriberMock.EXPECT(). Read(&msg, &event.TimeoutURLEvent{}). SetArg(1, event.TimeoutURLEvent{ - URL: "https://down-example.onion", + URL: "https://down-example.onion/login.php", }).Return(nil) + httpClientMock.EXPECT().Get("https://down-example.onion").Return(httpResponseMock, http.ErrTimeout) + configClientMock.EXPECT().GetForbiddenHostnames().Return([]configapi.ForbiddenHostname{}, nil) configClientMock.EXPECT().GetBlackListThreshold().Return(configapi.BlackListThreshold{Threshold: 10}, nil) - hostnameCacheMock.EXPECT().GetInt64("hostnames:down-example.onion").Return(int64(0), cache.ErrNIL) - hostnameCacheMock.EXPECT().SetInt64("hostnames:down-example.onion", int64(1), cache.NoTTL).Return(nil) + hostnameCacheMock.EXPECT().GetInt64("down-example.onion").Return(int64(0), cache.ErrNIL) + hostnameCacheMock.EXPECT().SetInt64("down-example.onion", int64(1), cache.NoTTL).Return(nil) - s := State{configClient: configClientMock, hostnameCache: hostnameCacheMock} + s := State{configClient: configClientMock, hostnameCache: hostnameCacheMock, httpClient: httpClientMock} if err := s.handleTimeoutURLEvent(subscriberMock, msg); err != nil { t.Fail() } @@ -44,17 +112,21 @@ func TestHandleTimeoutURLEvent(t *testing.T) { subscriberMock := event_mock.NewMockSubscriber(mockCtrl) configClientMock := client_mock.NewMockClient(mockCtrl) hostnameCacheMock := cache_mock.NewMockCache(mockCtrl) + httpClientMock := http_mock.NewMockClient(mockCtrl) + httpResponseMock := http_mock.NewMockResponse(mockCtrl) msg := event.RawMessage{} subscriberMock.EXPECT(). Read(&msg, &event.TimeoutURLEvent{}). SetArg(1, event.TimeoutURLEvent{ - URL: "https://down-example.onion", + URL: "https://down-example.onion/test.html", }).Return(nil) + httpClientMock.EXPECT().Get("https://down-example.onion").Return(httpResponseMock, http.ErrTimeout) + configClientMock.EXPECT().GetForbiddenHostnames().Return([]configapi.ForbiddenHostname{}, nil) configClientMock.EXPECT().GetBlackListThreshold().Return(configapi.BlackListThreshold{Threshold: 10}, nil) - hostnameCacheMock.EXPECT().GetInt64("hostnames:down-example.onion").Return(int64(9), nil) + hostnameCacheMock.EXPECT().GetInt64("down-example.onion").Return(int64(9), nil) configClientMock.EXPECT(). GetForbiddenHostnames(). @@ -66,9 +138,9 @@ func TestHandleTimeoutURLEvent(t *testing.T) { }). Return(nil) - hostnameCacheMock.EXPECT().SetInt64("hostnames:down-example.onion", int64(10), cache.NoTTL).Return(nil) + hostnameCacheMock.EXPECT().SetInt64("down-example.onion", int64(10), cache.NoTTL).Return(nil) - s := State{configClient: configClientMock, hostnameCache: hostnameCacheMock} + s := State{configClient: configClientMock, hostnameCache: hostnameCacheMock, httpClient: httpClientMock} if err := s.handleTimeoutURLEvent(subscriberMock, msg); err != nil { t.Fail() } @@ -86,23 +158,13 @@ func TestHandleTimeoutURLEventNoDuplicates(t *testing.T) { subscriberMock.EXPECT(). Read(&msg, &event.TimeoutURLEvent{}). SetArg(1, event.TimeoutURLEvent{ - URL: "https://facebookcorewwwi.onion", + URL: "https://facebookcorewwwi.onion/morning-routine.php?id=12", }).Return(nil) - configClientMock.EXPECT().GetBlackListThreshold().Return(configapi.BlackListThreshold{Threshold: 3}, nil) - - hostnameCacheMock.EXPECT().GetInt64("hostnames:facebookcorewwwi.onion").Return(int64(2), nil) - - configClientMock.EXPECT(). - GetForbiddenHostnames(). - Return([]configapi.ForbiddenHostname{{Hostname: "facebookcorewwwi.onion"}}, nil) - // Config not updated since hostname is already 'blacklisted' - // this may due because of change in threshold - - hostnameCacheMock.EXPECT().SetInt64("hostnames:facebookcorewwwi.onion", int64(3), cache.NoTTL).Return(nil) + configClientMock.EXPECT().GetForbiddenHostnames().Return([]configapi.ForbiddenHostname{{Hostname: "facebookcorewwwi.onion"}}, nil) s := State{configClient: configClientMock, hostnameCache: hostnameCacheMock} - if err := s.handleTimeoutURLEvent(subscriberMock, msg); err != nil { + if err := s.handleTimeoutURLEvent(subscriberMock, msg); !errors.Is(err, errAlreadyBlacklisted) { t.Fail() } } diff --git a/internal/cache/cache.go b/internal/cache/cache.go index 3a5b676..d2a975d 100644 --- a/internal/cache/cache.go +++ b/internal/cache/cache.go @@ -1,5 +1,7 @@ package cache +//go:generate mockgen -destination=../cache_mock/cache_mock.go -package=cache_mock . Cache + import ( "errors" "time" @@ -12,8 +14,6 @@ var ( ErrNIL = errors.New("value is nil") ) -//go:generate mockgen -destination=../cache_mock/cache_mock.go -package=cache_mock . Cache - // Cache represent a KV database type Cache interface { GetBytes(key string) ([]byte, error) diff --git a/internal/cache/redis.go b/internal/cache/redis.go index 07bc9fe..a1252e5 100644 --- a/internal/cache/redis.go +++ b/internal/cache/redis.go @@ -2,26 +2,29 @@ package cache import ( "context" + "fmt" "github.com/go-redis/redis/v8" "time" ) type redisCache struct { - client *redis.Client + client *redis.Client + keyPrefix string } // NewRedisCache return a new Cache using redis as backend -func NewRedisCache(URI string) (Cache, error) { +func NewRedisCache(URI string, keyPrefix string) (Cache, error) { return &redisCache{ client: redis.NewClient(&redis.Options{ Addr: URI, DB: 0, }), + keyPrefix: keyPrefix, }, nil } func (rc *redisCache) GetBytes(key string) ([]byte, error) { - val, err := rc.client.Get(context.Background(), key).Bytes() + val, err := rc.client.Get(context.Background(), rc.getKey(key)).Bytes() if err == redis.Nil { err = ErrNIL } @@ -30,11 +33,11 @@ func (rc *redisCache) GetBytes(key string) ([]byte, error) { } func (rc *redisCache) SetBytes(key string, value []byte, TTL time.Duration) error { - return rc.client.Set(context.Background(), key, value, TTL).Err() + return rc.client.Set(context.Background(), rc.getKey(key), value, TTL).Err() } func (rc *redisCache) GetInt64(key string) (int64, error) { - val, err := rc.client.Get(context.Background(), key).Int64() + val, err := rc.client.Get(context.Background(), rc.getKey(key)).Int64() if err == redis.Nil { err = ErrNIL } @@ -43,5 +46,13 @@ func (rc *redisCache) GetInt64(key string) (int64, error) { } func (rc *redisCache) SetInt64(key string, value int64, TTL time.Duration) error { - return rc.client.Set(context.Background(), key, value, TTL).Err() + return rc.client.Set(context.Background(), rc.getKey(key), value, TTL).Err() +} + +func (rc *redisCache) getKey(key string) string { + if rc.keyPrefix == "" { + return key + } + + return fmt.Sprintf("%s:%s", rc.keyPrefix, key) } diff --git a/internal/cache/redis_test.go b/internal/cache/redis_test.go new file mode 100644 index 0000000..bdd8a52 --- /dev/null +++ b/internal/cache/redis_test.go @@ -0,0 +1,15 @@ +package cache + +import "testing" + +func TestRedisCache_GetKey(t *testing.T) { + rc := redisCache{} + if got := rc.getKey("user"); got != "user" { + t.Errorf("got %s want %s", got, "user") + } + + rc.keyPrefix = "config" + if got := rc.getKey("user"); got != "config:user" { + t.Errorf("got %s want %s", got, "config:user") + } +} diff --git a/internal/configapi/client/client.go b/internal/configapi/client/client.go index 8889824..268dc9a 100644 --- a/internal/configapi/client/client.go +++ b/internal/configapi/client/client.go @@ -1,5 +1,7 @@ package client +//go:generate mockgen -destination=../client_mock/client_mock.go -package=client_mock . Client + import ( "bytes" "encoding/json" @@ -12,8 +14,6 @@ import ( "time" ) -//go:generate mockgen -destination=../client_mock/client_mock.go -package=client_mock . Client - const ( // AllowedMimeTypesKey is the key to access the allowed mime types config AllowedMimeTypesKey = "allowed-mime-types" diff --git a/internal/configapi/configapi.go b/internal/configapi/configapi.go index d382f4c..7bdbee8 100644 --- a/internal/configapi/configapi.go +++ b/internal/configapi/configapi.go @@ -41,7 +41,7 @@ func (state *State) CustomFlags() []cli.Flag { // Initialize the process func (state *State) Initialize(provider process.Provider) error { - configCache, err := provider.Cache() + configCache, err := provider.Cache("configuration") if err != nil { return err } @@ -76,7 +76,7 @@ func (state *State) Subscribers() []process.SubscriberDef { } // HTTPHandler returns the HTTP API the process expose -func (state *State) HTTPHandler(provider process.Provider) http.Handler { +func (state *State) HTTPHandler() http.Handler { r := mux.NewRouter() r.HandleFunc("/config/{key}", state.getConfiguration).Methods(http.MethodGet) r.HandleFunc("/config/{key}", state.setConfiguration).Methods(http.MethodPut) @@ -90,7 +90,7 @@ func (state *State) getConfiguration(w http.ResponseWriter, r *http.Request) { log.Debug().Str("key", key).Msg("Getting key") - b, err := state.configCache.GetBytes("conf:" + key) + b, err := state.configCache.GetBytes(key) if err != nil { log.Err(err).Msg("error while retrieving configuration") w.WriteHeader(http.StatusInternalServerError) @@ -114,7 +114,7 @@ func (state *State) setConfiguration(w http.ResponseWriter, r *http.Request) { log.Debug().Str("key", key).Bytes("value", b).Msg("Setting key") - if err := state.configCache.SetBytes("conf:"+key, b, cache.NoTTL); err != nil { + if err := state.configCache.SetBytes(key, b, cache.NoTTL); err != nil { log.Err(err).Msg("error while setting configuration") w.WriteHeader(http.StatusInternalServerError) return @@ -135,8 +135,8 @@ func (state *State) setConfiguration(w http.ResponseWriter, r *http.Request) { func setDefaultValues(configCache cache.Cache, values map[string]string) error { for key, value := range values { - if _, err := configCache.GetBytes("conf:" + key); err == cache.ErrNIL { - if err := configCache.SetBytes("conf:"+key, []byte(value), cache.NoTTL); err != nil { + if _, err := configCache.GetBytes(key); err == cache.ErrNIL { + if err := configCache.SetBytes(key, []byte(value), cache.NoTTL); err != nil { return fmt.Errorf("error while setting default value of %s: %s", key, err) } } diff --git a/internal/configapi/configapi_test.go b/internal/configapi/configapi_test.go index 7986741..71d0405 100644 --- a/internal/configapi/configapi_test.go +++ b/internal/configapi/configapi_test.go @@ -5,6 +5,9 @@ import ( "github.com/creekorful/trandoshan/internal/cache_mock" "github.com/creekorful/trandoshan/internal/event" "github.com/creekorful/trandoshan/internal/event_mock" + "github.com/creekorful/trandoshan/internal/process" + "github.com/creekorful/trandoshan/internal/process_mock" + "github.com/creekorful/trandoshan/internal/test" "github.com/golang/mock/gomock" "github.com/gorilla/mux" "io/ioutil" @@ -14,12 +17,42 @@ import ( "testing" ) +func TestState_Name(t *testing.T) { + s := State{} + if s.Name() != "configapi" { + t.Fail() + } +} + +func TestState_CommonFlags(t *testing.T) { + s := State{} + test.CheckProcessCommonFlags(t, &s, []string{process.HubURIFlag, process.RedisURIFlag}) +} + +func TestState_CustomFlags(t *testing.T) { + s := State{} + test.CheckProcessCustomFlags(t, &s, []string{"default-value"}) +} + +func TestState_Initialize(t *testing.T) { + test.CheckInitialize(t, &State{}, func(p *process_mock.MockProviderMockRecorder) { + p.Cache("configuration") + p.Publisher() + p.GetValues("default-value") + }) +} + +func TestState_Subscribers(t *testing.T) { + s := State{} + test.CheckProcessSubscribers(t, &s, nil) +} + func TestGetConfiguration(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() configCacheMock := cache_mock.NewMockCache(mockCtrl) - configCacheMock.EXPECT().GetBytes("conf:hello").Return([]byte("{\"ttl\": \"10s\"}"), nil) + configCacheMock.EXPECT().GetBytes("hello").Return([]byte("{\"ttl\": \"10s\"}"), nil) req := httptest.NewRequest(http.MethodGet, "/config/hello", nil) req = mux.SetURLVars(req, map[string]string{"key": "hello"}) @@ -53,7 +86,7 @@ func TestSetConfiguration(t *testing.T) { configCacheMock := cache_mock.NewMockCache(mockCtrl) pubMock := event_mock.NewMockPublisher(mockCtrl) - configCacheMock.EXPECT().SetBytes("conf:hello", []byte("{\"ttl\": \"10s\"}"), cache.NoTTL).Return(nil) + configCacheMock.EXPECT().SetBytes("hello", []byte("{\"ttl\": \"10s\"}"), cache.NoTTL).Return(nil) pubMock.EXPECT().PublishJSON("config", event.RawMessage{ Body: []byte("{\"ttl\": \"10s\"}"), Headers: map[string]interface{}{"Config-Key": "hello"}, diff --git a/internal/constraint/hostname.go b/internal/constraint/hostname.go index 0e3ee8d..dacf4af 100644 --- a/internal/constraint/hostname.go +++ b/internal/constraint/hostname.go @@ -6,7 +6,7 @@ import ( "strings" ) -// CheckHostnameAllowed check if given URL hostname is allowed for crawling +// CheckHostnameAllowed check if given URL hostname is allowed func CheckHostnameAllowed(configClient configapi.Client, rawurl string) (bool, error) { u, err := url.Parse(rawurl) if err != nil { diff --git a/internal/crawler/crawler.go b/internal/crawler/crawler.go index 2514dc9..161a52f 100644 --- a/internal/crawler/crawler.go +++ b/internal/crawler/crawler.go @@ -1,22 +1,18 @@ package crawler import ( - "crypto/tls" "fmt" "github.com/creekorful/trandoshan/internal/clock" configapi "github.com/creekorful/trandoshan/internal/configapi/client" "github.com/creekorful/trandoshan/internal/constraint" - chttp "github.com/creekorful/trandoshan/internal/crawler/http" "github.com/creekorful/trandoshan/internal/event" + chttp "github.com/creekorful/trandoshan/internal/http" "github.com/creekorful/trandoshan/internal/process" "github.com/rs/zerolog/log" "github.com/urfave/cli/v2" - "github.com/valyala/fasthttp" - "github.com/valyala/fasthttp/fasthttpproxy" "io/ioutil" "net/http" "strings" - "time" ) var ( @@ -38,36 +34,21 @@ func (state *State) Name() string { // CommonFlags return process common flags func (state *State) CommonFlags() []string { - return []string{process.HubURIFlag, process.ConfigAPIURIFlag} + return []string{process.HubURIFlag, process.ConfigAPIURIFlag, process.UserAgentFlag, process.TorURIFlag} } // CustomFlags return process custom flags func (state *State) CustomFlags() []cli.Flag { - return []cli.Flag{ - &cli.StringFlag{ - Name: "tor-uri", - Usage: "URI to the TOR SOCKS proxy", - Required: true, - }, - &cli.StringFlag{ - Name: "user-agent", - Usage: "User agent to use", - Value: "Mozilla/5.0 (Windows NT 10.0; rv:68.0) Gecko/20100101 Firefox/68.0", - }, - } + return []cli.Flag{} } // Initialize the process func (state *State) Initialize(provider process.Provider) error { - state.httpClient = chttp.NewFastHTTPClient(&fasthttp.Client{ - // Use given TOR proxy to reach the hidden services - Dial: fasthttpproxy.FasthttpSocksDialer(provider.GetValue("tor-uri")), - // Disable SSL verification since we do not really care about this - TLSConfig: &tls.Config{InsecureSkipVerify: true}, - ReadTimeout: time.Second * 5, - WriteTimeout: time.Second * 5, - Name: provider.GetValue("user-agent"), - }) + httpClient, err := provider.HTTPClient() + if err != nil { + return err + } + state.httpClient = httpClient cl, err := provider.Clock() if err != nil { @@ -92,7 +73,7 @@ func (state *State) Subscribers() []process.SubscriberDef { } // HTTPHandler returns the HTTP API the process expose -func (state *State) HTTPHandler(provider process.Provider) http.Handler { +func (state *State) HTTPHandler() http.Handler { return nil } @@ -108,7 +89,7 @@ func (state *State) handleNewURLEvent(subscriber event.Subscriber, msg event.Raw return err } else if !allowed { log.Debug().Str("url", evt.URL).Msg("Skipping forbidden hostname") - return errHostnameNotAllowed + return fmt.Errorf("%s %w", evt.URL, errHostnameNotAllowed) } r, err := state.httpClient.Get(evt.URL) diff --git a/internal/crawler/crawler_test.go b/internal/crawler/crawler_test.go index 485f140..eeb895f 100644 --- a/internal/crawler/crawler_test.go +++ b/internal/crawler/crawler_test.go @@ -5,16 +5,51 @@ import ( "github.com/creekorful/trandoshan/internal/clock_mock" "github.com/creekorful/trandoshan/internal/configapi/client" "github.com/creekorful/trandoshan/internal/configapi/client_mock" - "github.com/creekorful/trandoshan/internal/crawler/http" - "github.com/creekorful/trandoshan/internal/crawler/http_mock" "github.com/creekorful/trandoshan/internal/event" "github.com/creekorful/trandoshan/internal/event_mock" + "github.com/creekorful/trandoshan/internal/http" + "github.com/creekorful/trandoshan/internal/http_mock" + "github.com/creekorful/trandoshan/internal/process" + "github.com/creekorful/trandoshan/internal/process_mock" + "github.com/creekorful/trandoshan/internal/test" "github.com/golang/mock/gomock" "strings" "testing" "time" ) +func TestState_Name(t *testing.T) { + s := State{} + if s.Name() != "crawler" { + t.Fail() + } +} + +func TestState_CommonFlags(t *testing.T) { + s := State{} + test.CheckProcessCommonFlags(t, &s, []string{process.HubURIFlag, process.ConfigAPIURIFlag, process.UserAgentFlag, process.TorURIFlag}) +} + +func TestState_CustomFlags(t *testing.T) { + s := State{} + test.CheckProcessCustomFlags(t, &s, nil) +} + +func TestState_Initialize(t *testing.T) { + test.CheckInitialize(t, &State{}, func(p *process_mock.MockProviderMockRecorder) { + p.HTTPClient() + p.Clock() + p.ConfigClient([]string{client.AllowedMimeTypesKey, client.ForbiddenHostnamesKey}) + }) +} + +func TestState_Subscribers(t *testing.T) { + s := State{} + test.CheckProcessSubscribers(t, &s, []test.SubscriberDef{ + {Queue: "crawlingQueue", Exchange: "url.new"}, + }) +} + func TestHandleNewURLEvent(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() @@ -167,7 +202,7 @@ func TestHandleNewURLEventHostnameForbidden(t *testing.T) { configClientMock.EXPECT().GetForbiddenHostnames(). Return([]client.ForbiddenHostname{{Hostname: "facebookcorewwwi.onion"}}, nil) - if err := s.handleNewURLEvent(subscriberMock, msg); err != errHostnameNotAllowed { + if err := s.handleNewURLEvent(subscriberMock, msg); !errors.Is(err, errHostnameNotAllowed) { t.Fail() } } diff --git a/internal/event/event.go b/internal/event/event.go index 0f40fec..5309860 100644 --- a/internal/event/event.go +++ b/internal/event/event.go @@ -1,20 +1,16 @@ package event -import "time" - //go:generate mockgen -destination=../event_mock/event_mock.go -package=event_mock . Publisher,Subscriber +import "time" + const ( // NewURLExchange is the exchange used when an URL is schedule for crawling NewURLExchange = "url.new" - // FoundURLExchange is the exchange used when an URL is extracted from resource - FoundURLExchange = "url.found" // TimeoutURLExchange is the exchange used when a crawling fail because of timeout TimeoutURLExchange = "url.timeout" // NewResourceExchange is the exchange used when a new resource has been crawled NewResourceExchange = "resource.new" - // NewIndexExchange is the exchange used when a resource has been indexed - NewIndexExchange = "index.new" // ConfigExchange is the exchange used to dispatch new configuration ConfigExchange = "config" ) @@ -35,16 +31,6 @@ func (msg *NewURLEvent) Exchange() string { return NewURLExchange } -// FoundURLEvent represent a found URL -type FoundURLEvent struct { - URL string `json:"url"` -} - -// Exchange returns the exchange where event should be push -func (msg *FoundURLEvent) Exchange() string { - return FoundURLExchange -} - // TimeoutURLEvent represent a failed crawling because of timeout type TimeoutURLEvent struct { URL string `json:"url"` @@ -67,19 +53,3 @@ type NewResourceEvent struct { func (msg *NewResourceEvent) Exchange() string { return NewResourceExchange } - -// NewIndexEvent represent a indexed resource -type NewIndexEvent struct { - URL string `json:"url"` - Body string `json:"body"` - Time time.Time `json:"time"` - Title string `json:"title"` - Meta map[string]string `json:"meta"` - Description string `json:"description"` - Headers map[string]string `json:"headers"` -} - -// Exchange returns the exchange where event should be push -func (msg *NewIndexEvent) Exchange() string { - return NewIndexExchange -} diff --git a/internal/crawler/http/client.go b/internal/http/client.go similarity index 100% rename from internal/crawler/http/client.go rename to internal/http/client.go diff --git a/internal/crawler/http/response.go b/internal/http/response.go similarity index 100% rename from internal/crawler/http/response.go rename to internal/http/response.go diff --git a/internal/indexer/auth/auth.go b/internal/indexer/auth/auth.go deleted file mode 100644 index 4d3ff0f..0000000 --- a/internal/indexer/auth/auth.go +++ /dev/null @@ -1,124 +0,0 @@ -package auth - -import ( - "context" - "fmt" - "github.com/dgrijalva/jwt-go" - "github.com/gorilla/mux" - "github.com/rs/zerolog/log" - "net/http" - "strings" -) - -type key int - -const ( - usernameKey key = iota -) - -// Token is the authentication token used by processes when dialing with the API -type Token struct { - // Username used for logging purposes - Username string `json:"username"` - - // Rights that the token provides - // Format is: METHOD - list of paths - Rights map[string][]string `json:"rights"` -} - -// Middleware is the authentication middleware -type Middleware struct { - signingKey []byte -} - -// NewMiddleware create a new Middleware instance with given secret token signing key -func NewMiddleware(signingKey []byte) *Middleware { - return &Middleware{signingKey: signingKey} -} - -// Middleware return an net/http compatible middleware func to use -func (m *Middleware) Middleware() mux.MiddlewareFunc { - return func(next http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - // Extract authorization header - tokenStr := r.Header.Get("Authorization") - if tokenStr == "" { - log.Warn().Msg("missing token") - w.WriteHeader(http.StatusUnauthorized) - return - } - - tokenStr = strings.TrimPrefix(tokenStr, "Bearer ") - - // Decode the JWT token - token, err := jwt.Parse(tokenStr, func(t *jwt.Token) (interface{}, error) { - // Validate expected alg - if v, ok := t.Method.(*jwt.SigningMethodHMAC); !ok || v.Name != "HS256" { - return nil, fmt.Errorf("unexpected signing method: %s", t.Header["alg"]) - } - - // Return signing secret - return m.signingKey, nil - }) - if err != nil { - log.Err(err).Msg("error while decoding JWT token") - w.WriteHeader(http.StatusUnauthorized) - return - } - - // From here we have a valid JWT token, extract claims - claims, ok := token.Claims.(jwt.MapClaims) - if !ok { - log.Err(err).Msg("error while decoding token claims") - w.WriteHeader(http.StatusInternalServerError) - return - } - - rights := map[string][]string{} - for method, paths := range claims["rights"].(map[string]interface{}) { - for _, path := range paths.([]interface{}) { - rights[method] = append(rights[method], path.(string)) - } - } - - t := Token{ - Username: claims["username"].(string), - Rights: rights, - } - - // Validate rights - paths, contains := t.Rights[r.Method] - if !contains { - log.Warn(). - Str("username", t.Username). - Str("method", r.Method). - Str("resource", r.URL.Path). - Msg("Access to resources is unauthorized") - w.WriteHeader(http.StatusUnauthorized) - return - } - - authorized := false - for _, path := range paths { - if path == r.URL.Path { - authorized = true - break - } - } - - if !authorized { - log.Warn(). - Str("username", t.Username). - Str("method", r.Method). - Str("resource", r.URL.Path). - Msg("Access to resources is unauthorized") - w.WriteHeader(http.StatusUnauthorized) - return - } - - // Everything's fine, call next handler ;D - ctx := context.WithValue(r.Context(), usernameKey, t.Username) - next.ServeHTTP(w, r.WithContext(ctx)) - }) - } -} diff --git a/internal/indexer/auth/auth_test.go b/internal/indexer/auth/auth_test.go deleted file mode 100644 index cd87694..0000000 --- a/internal/indexer/auth/auth_test.go +++ /dev/null @@ -1,85 +0,0 @@ -package auth - -import ( - "fmt" - "io/ioutil" - "net/http" - "net/http/httptest" - "testing" -) - -func TestMiddleware_NoTokenShouldReturnUnauthorized(t *testing.T) { - m := (&Middleware{signingKey: []byte("test")}).Middleware()(okHandler()) - - // no token shouldn't be able to access - req := httptest.NewRequest(http.MethodGet, "/users", nil) - rec := httptest.NewRecorder() - - m.ServeHTTP(rec, req) - - if rec.Code != http.StatusUnauthorized { - t.Errorf("StatusUnauthorized was expected") - } -} - -func TestMiddleware_InvalidTokenShouldReturnUnauthorized(t *testing.T) { - m := (&Middleware{signingKey: []byte("test")}).Middleware()(okHandler()) - - req := httptest.NewRequest(http.MethodGet, "/users", nil) - req.Header.Add("Authorization", "zarBR") - rec := httptest.NewRecorder() - - m.ServeHTTP(rec, req) - - if rec.Code != http.StatusUnauthorized { - t.Errorf("StatusUnauthorized was expected") - } -} - -func TestMiddleware_BadRightsShouldReturnUnauthorized(t *testing.T) { - m := (&Middleware{signingKey: []byte("test")}).Middleware()(okHandler()) - - req := httptest.NewRequest(http.MethodPost, "/users", nil) - req.Header.Add("Authorization", "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VybmFtZSI6IkpvaG4gRG9lIiwicmlnaHRzIjp7IkdFVCI6WyIvdXNlcnMiXSwiUE9TVCI6WyIvc2VhcmNoIl19fQ.fRx0Q66ZgnY_rKCf-9Vaz6gzGKH_tKSgkVHhoQMtKfM") - rec := httptest.NewRecorder() - - m.ServeHTTP(rec, req) - - if rec.Code != http.StatusUnauthorized { - t.Errorf("StatusUnauthorized was expected") - } -} - -func TestMiddleware(t *testing.T) { - m := (&Middleware{signingKey: []byte("test")}).Middleware()(okHandler()) - - req := httptest.NewRequest(http.MethodGet, "/users?id=10", nil) - req.Header.Add("Authorization", "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VybmFtZSI6IkpvaG4gRG9lIiwicmlnaHRzIjp7IkdFVCI6WyIvdXNlcnMiXSwiUE9TVCI6WyIvc2VhcmNoIl19fQ.fRx0Q66ZgnY_rKCf-9Vaz6gzGKH_tKSgkVHhoQMtKfM") - rec := httptest.NewRecorder() - - m.ServeHTTP(rec, req) - - if rec.Code != http.StatusOK { - t.Errorf("StatusUnauthorized was expected") - } - - b, err := ioutil.ReadAll(rec.Body) - if err != nil { - t.Fail() - } - if string(b) != "Hello, John Doe" { - t.Fail() - } -} - -func okHandler() http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - if username := r.Context().Value(usernameKey).(string); username != "" { - w.WriteHeader(http.StatusOK) - _, _ = w.Write([]byte(fmt.Sprintf("Hello, %s", username))) - return - } - - w.WriteHeader(http.StatusNoContent) - } -} diff --git a/internal/indexer/client/client.go b/internal/indexer/client/client.go deleted file mode 100644 index f5e2b49..0000000 --- a/internal/indexer/client/client.go +++ /dev/null @@ -1,141 +0,0 @@ -package client - -import ( - "encoding/base64" - "fmt" - "github.com/go-resty/resty/v2" - "strconv" - "time" -) - -//go:generate mockgen -destination=../client_mock/client_mock.go -package=client_mock . Client - -const ( - // PaginationPageHeader is the header to determinate current page in paginated endpoint - PaginationPageHeader = "X-Pagination-Page" - // PaginationSizeHeader is the header to determinate page size in paginated endpoint - PaginationSizeHeader = "X-Pagination-Size" - // PaginationCountHeader is the header to determinate total count of element in paginated endpoint - PaginationCountHeader = "X-Pagination-Count" - // PaginationPageQueryParam is the query parameter used to set current page in paginated endpoint - PaginationPageQueryParam = "pagination-page" - // PaginationSizeQueryParam is the query parameter used to set page size in paginated endpoint - PaginationSizeQueryParam = "pagination-size" -) - -// ResourceDto represent a resource as given by the API -type ResourceDto struct { - URL string `json:"url"` - Body string `json:"body"` - Time time.Time `json:"time"` - Title string `json:"title"` - Meta map[string]string `json:"meta"` - Description string `json:"description"` - Headers map[string]string `json:"headers"` -} - -// CredentialsDto represent the credential when logging in the API -type CredentialsDto struct { - Username string `json:"username"` - Password string `json:"password"` -} - -// ResSearchParams is the search params used -type ResSearchParams struct { - URL string - Keyword string - StartDate time.Time - EndDate time.Time - WithBody bool - PageSize int - PageNumber int - // TODO allow searching by meta - // TODO allow searching by headers -} - -// Client is the interface to interact with the indexer API -type Client interface { - SearchResources(params *ResSearchParams) ([]ResourceDto, int64, error) - ScheduleURL(url string) error -} - -type client struct { - httpClient *resty.Client - baseURL string -} - -func (c *client) SearchResources(params *ResSearchParams) ([]ResourceDto, int64, error) { - targetEndpoint := fmt.Sprintf("%s/v1/resources?", c.baseURL) - - req := c.httpClient.R() - - if params.URL != "" { - b64URL := base64.URLEncoding.EncodeToString([]byte(params.URL)) - req.SetQueryParam("url", b64URL) - } - - if params.Keyword != "" { - req.SetQueryParam("keyword", params.Keyword) - } - - if !params.StartDate.IsZero() { - req.SetQueryParam("start-date", params.StartDate.Format(time.RFC3339)) - } - - if !params.EndDate.IsZero() { - req.SetQueryParam("end-date", params.EndDate.Format(time.RFC3339)) - } - - if params.PageNumber != 0 { - req.Header.Set(PaginationPageHeader, strconv.Itoa(params.PageNumber)) - } - if params.PageSize != 0 { - req.Header.Set(PaginationSizeHeader, strconv.Itoa(params.PageSize)) - } - - var resources []ResourceDto - req.SetResult(&resources) - - res, err := req.Get(targetEndpoint) - if err != nil { - return nil, 0, err - } - - count, err := strconv.ParseInt(res.Header().Get(PaginationCountHeader), 10, 64) - if err != nil { - return nil, 0, err - } - - return resources, count, nil -} - -func (c *client) ScheduleURL(url string) error { - targetEndpoint := fmt.Sprintf("%s/v1/urls", c.baseURL) - - req := c.httpClient.R() - req.SetHeader("Content-Type", "application/json") - req.SetBody(fmt.Sprintf("\"%s\"", url)) - - _, err := req.Post(targetEndpoint) - return err -} - -// NewClient create a new API client using given details -func NewClient(baseURL, token string) Client { - httpClient := resty.New() - httpClient.SetAuthScheme("Bearer") - httpClient.SetAuthToken(token) - httpClient.OnAfterResponse(func(c *resty.Client, r *resty.Response) error { - if r.StatusCode() > 302 { - return fmt.Errorf("error when making HTTP request: %s", r.Status()) - } - return nil - }) - - client := &client{ - httpClient: httpClient, - baseURL: baseURL, - } - - return client -} diff --git a/internal/indexer/index/elastic.go b/internal/indexer/index/elastic.go index 25c035d..a920921 100644 --- a/internal/indexer/index/elastic.go +++ b/internal/indexer/index/elastic.go @@ -2,10 +2,10 @@ package index import ( "context" - "encoding/json" - "github.com/creekorful/trandoshan/internal/indexer/client" + "github.com/PuerkitoBio/goquery" "github.com/olivere/elastic/v7" "github.com/rs/zerolog/log" + "strings" "time" ) @@ -56,12 +56,21 @@ const mapping = ` } }` -type elasticSearchDB struct { +type resourceIdx struct { + URL string `json:"url"` + Body string `json:"body"` + Time time.Time `json:"time"` + Title string `json:"title"` + Meta map[string]string `json:"meta"` + Description string `json:"description"` + Headers map[string]string `json:"headers"` +} + +type elasticSearchIndex struct { client *elastic.Client } -// NewElasticIndex create a new index based on ES instance -func NewElasticIndex(uri string) (Index, error) { +func newElasticIndex(uri string) (Index, error) { // Create Elasticsearch client ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() @@ -79,103 +88,24 @@ func NewElasticIndex(uri string) (Index, error) { return nil, err } - return &elasticSearchDB{ + return &elasticSearchIndex{ client: ec, }, nil } -func (e *elasticSearchDB) SearchResources(params *client.ResSearchParams) ([]ResourceIdx, error) { - q := buildSearchQuery(params) - from := (params.PageNumber - 1) * params.PageSize - - res, err := e.client.Search(). - Index(resourcesIndex). - Query(q). - From(from). - Size(params.PageSize). - Do(context.Background()) - if err != nil { - return nil, err - } - - var resources []ResourceIdx - for _, hit := range res.Hits.Hits { - var resource ResourceIdx - if err := json.Unmarshal(hit.Source, &resource); err != nil { - log.Warn().Str("err", err.Error()).Msg("Error while un-marshaling resource") - continue - } - - // Remove body if not wanted - if !params.WithBody { - resource.Body = "" - } - - resources = append(resources, resource) - } - - return resources, nil -} - -func (e *elasticSearchDB) CountResources(params *client.ResSearchParams) (int64, error) { - q := buildSearchQuery(params) - - count, err := e.client.Count(resourcesIndex).Query(q).Do(context.Background()) +func (e *elasticSearchIndex) IndexResource(url string, time time.Time, body string, headers map[string]string) error { + res, err := extractResource(url, time, body, headers) if err != nil { - return 0, err + return err } - return count, nil -} - -func (e *elasticSearchDB) AddResource(res ResourceIdx) error { - _, err := e.client.Index(). + _, err = e.client.Index(). Index(resourcesIndex). BodyJson(res). Do(context.Background()) return err } -func buildSearchQuery(params *client.ResSearchParams) elastic.Query { - var queries []elastic.Query - if params.URL != "" { - log.Trace().Str("url", params.URL).Msg("SearchQuery: Setting url") - queries = append(queries, elastic.NewTermQuery("url.keyword", params.URL)) - } - if params.Keyword != "" { - log.Trace().Str("body", params.Keyword).Msg("SearchQuery: Setting body") - queries = append(queries, elastic.NewMatchQuery("body", params.Keyword)) - } - if !params.StartDate.IsZero() || !params.EndDate.IsZero() { - timeQuery := elastic.NewRangeQuery("time") - - if !params.StartDate.IsZero() { - log.Trace(). - Str("startDate", params.StartDate.Format(time.RFC3339)). - Msg("SearchQuery: Setting startDate") - timeQuery.Gte(params.StartDate.Format(time.RFC3339)) - } - if !params.EndDate.IsZero() { - log.Trace(). - Str("endDate", params.EndDate.Format(time.RFC3339)). - Msg("SearchQuery: Setting endDate") - timeQuery.Lte(params.EndDate.Format(time.RFC3339)) - } - queries = append(queries, timeQuery) - } - - // Handle specific case - if len(queries) == 0 { - return elastic.NewMatchAllQuery() - } - if len(queries) == 1 { - return queries[0] - } - - // otherwise AND combine them - return elastic.NewBoolQuery().Must(queries...) -} - func setupElasticSearch(ctx context.Context, es *elastic.Client) error { // Setup index if doesn't exist exist, err := es.IndexExists(resourcesIndex).Do(ctx) @@ -193,3 +123,46 @@ func setupElasticSearch(ctx context.Context, es *elastic.Client) error { return nil } + +func extractResource(url string, time time.Time, body string, headers map[string]string) (*resourceIdx, error) { + doc, err := goquery.NewDocumentFromReader(strings.NewReader(body)) + if err != nil { + return nil, err + } + + // Get resource title + title := doc.Find("title").First().Text() + + // Get meta values + meta := map[string]string{} + doc.Find("meta").Each(func(i int, s *goquery.Selection) { + name, _ := s.Attr("name") + value, _ := s.Attr("content") + + // if name is empty then try to lookup using property + if name == "" { + name, _ = s.Attr("property") + if name == "" { + return + } + } + + meta[strings.ToLower(name)] = value + }) + + // Lowercase headers + lowerCasedHeaders := map[string]string{} + for key, value := range headers { + lowerCasedHeaders[strings.ToLower(key)] = value + } + + return &resourceIdx{ + URL: url, + Body: body, + Time: time, + Title: title, + Meta: meta, + Description: meta["description"], + Headers: lowerCasedHeaders, + }, nil +} diff --git a/internal/indexer/index/elastic_test.go b/internal/indexer/index/elastic_test.go new file mode 100644 index 0000000..a4e145b --- /dev/null +++ b/internal/indexer/index/elastic_test.go @@ -0,0 +1,56 @@ +package index + +import ( + "github.com/creekorful/trandoshan/internal/event" + "testing" + "time" +) + +func TestExtractResource(t *testing.T) { + body := ` +Creekorful Inc + +This is sparta + + + + + +` + + msg := event.NewResourceEvent{ + URL: "https://example.org/300", + Body: body, + } + + resDto, err := extractResource("https://example.org/300", time.Time{}, body, map[string]string{"Content-Type": "application/json"}) + if err != nil { + t.FailNow() + } + + if resDto.URL != "https://example.org/300" { + t.Fail() + } + if resDto.Title != "Creekorful Inc" { + t.Fail() + } + if resDto.Body != msg.Body { + t.Fail() + } + + if resDto.Description != "Zhello world" { + t.Fail() + } + + if resDto.Meta["description"] != "Zhello world" { + t.Fail() + } + + if resDto.Meta["og:url"] != "https://example.org" { + t.Fail() + } + + if resDto.Headers["content-type"] != "application/json" { + t.Fail() + } +} diff --git a/internal/indexer/index/index.go b/internal/indexer/index/index.go index 3446612..a3da953 100644 --- a/internal/indexer/index/index.go +++ b/internal/indexer/index/index.go @@ -1,27 +1,33 @@ package index +//go:generate mockgen -destination=../index_mock/index_mock.go -package=index_mock . Index + import ( - "github.com/creekorful/trandoshan/internal/indexer/client" + "fmt" "time" ) -//go:generate mockgen -destination=../index_mock/index_mock.go -package=index_mock . Index - -// ResourceIdx represent a resource as stored in elasticsearch -type ResourceIdx struct { - URL string `json:"url"` - Body string `json:"body"` - Time time.Time `json:"time"` - Title string `json:"title"` - Meta map[string]string `json:"meta"` - Description string `json:"description"` - Headers map[string]string `json:"headers"` -} +const ( + // Elastic is an Index backed by ES instance + Elastic = "elastic" + // Local is an Index backed by local FS instance + Local = "local" +) // Index is the interface used to abstract communication // with the persistence unit type Index interface { - SearchResources(params *client.ResSearchParams) ([]ResourceIdx, error) - CountResources(params *client.ResSearchParams) (int64, error) - AddResource(res ResourceIdx) error + IndexResource(url string, time time.Time, body string, headers map[string]string) error +} + +// NewIndex create a new index using given driver, destination +func NewIndex(driver string, dest string) (Index, error) { + switch driver { + case Elastic: + return newElasticIndex(dest) + case Local: + return newLocalIndex(dest) + default: + return nil, fmt.Errorf("no driver named %s found", driver) + } } diff --git a/internal/archiver/storage/local.go b/internal/indexer/index/local.go similarity index 57% rename from internal/archiver/storage/local.go rename to internal/indexer/index/local.go index c498c0b..44f14f9 100644 --- a/internal/archiver/storage/local.go +++ b/internal/indexer/index/local.go @@ -1,4 +1,4 @@ -package storage +package index import ( "fmt" @@ -12,21 +12,25 @@ import ( "time" ) -type localStorage struct { +type localIndex struct { baseDir string } -// NewLocalStorage returns a new Storage that use local file system -func NewLocalStorage(root string) (Storage, error) { - return &localStorage{baseDir: root}, nil +func newLocalIndex(root string) (Index, error) { + return &localIndex{baseDir: root}, nil } -func (s *localStorage) Store(url string, time time.Time, body []byte) error { +func (s *localIndex) IndexResource(url string, time time.Time, body string, headers map[string]string) error { path, err := formatPath(url, time) if err != nil { return err } + content, err := formatResource(url, body, headers) + if err != nil { + return err + } + fullPath := filepath.Join(s.baseDir, path) dir := filepath.Dir(fullPath) @@ -34,13 +38,31 @@ func (s *localStorage) Store(url string, time time.Time, body []byte) error { return err } - if err := ioutil.WriteFile(fullPath, body, 0640); err != nil { + if err := ioutil.WriteFile(fullPath, content, 0640); err != nil { return err } return nil } +func formatResource(url string, body string, headers map[string]string) ([]byte, error) { + builder := strings.Builder{} + + // First URL + builder.WriteString(fmt.Sprintf("%s\n\n", url)) + + // Then headers + for key, value := range headers { + builder.WriteString(fmt.Sprintf("%s: %s\n", key, value)) + } + builder.WriteString("\n") + + // Then body + builder.WriteString(body) + + return []byte(builder.String()), nil +} + func formatPath(rawURL string, time time.Time) (string, error) { b := strings.Builder{} diff --git a/internal/archiver/storage/local_test.go b/internal/indexer/index/local_test.go similarity index 63% rename from internal/archiver/storage/local_test.go rename to internal/indexer/index/local_test.go index a16912a..e3763b6 100644 --- a/internal/archiver/storage/local_test.go +++ b/internal/indexer/index/local_test.go @@ -1,4 +1,4 @@ -package storage +package index import ( "io/ioutil" @@ -47,18 +47,18 @@ func TestFormatPath(t *testing.T) { } } -func TestLocalStorage_Store(t *testing.T) { +func TestLocalIndex_IndexResource(t *testing.T) { d, err := ioutil.TempDir("", "") if err != nil { t.FailNow() } defer os.RemoveAll(d) - s := localStorage{baseDir: d} + s := localIndex{baseDir: d} ti := time.Date(2020, time.October, 29, 12, 4, 9, 0, time.UTC) - if err := s.Store("https://google.com", ti, []byte("Hello, world")); err != nil { + if err := s.IndexResource("https://google.com", ti, "Hello, world", map[string]string{"Server": "Traefik"}); err != nil { t.Fail() } @@ -76,7 +76,18 @@ func TestLocalStorage_Store(t *testing.T) { if err != nil { t.Fail() } - if string(b) != "Hello, world" { + if string(b) != "https://google.com\n\nServer: Traefik\n\nHello, world" { t.Fail() } } + +func TestFormatResource(t *testing.T) { + res, err := formatResource("https://google.com", "Hello, world", map[string]string{"Server": "Traefik", "Content-Type": "text/html"}) + if err != nil { + t.FailNow() + } + + if string(res) != "https://google.com\n\nServer: Traefik\nContent-Type: text/html\n\nHello, world" { + t.Errorf("got %s want %s", string(res), "https://google.com\n\nServer: Traefik\nContent-Type: text/html\n\nHello, world") + } +} diff --git a/internal/indexer/indexer.go b/internal/indexer/indexer.go index bc5966a..79abfc9 100644 --- a/internal/indexer/indexer.go +++ b/internal/indexer/indexer.go @@ -1,43 +1,24 @@ package indexer import ( - "encoding/base64" - "encoding/json" "fmt" - "github.com/PuerkitoBio/goquery" - "github.com/PuerkitoBio/purell" - "github.com/creekorful/trandoshan/internal/cache" configapi "github.com/creekorful/trandoshan/internal/configapi/client" + "github.com/creekorful/trandoshan/internal/constraint" "github.com/creekorful/trandoshan/internal/event" - "github.com/creekorful/trandoshan/internal/indexer/auth" - "github.com/creekorful/trandoshan/internal/indexer/client" "github.com/creekorful/trandoshan/internal/indexer/index" "github.com/creekorful/trandoshan/internal/process" - "github.com/gorilla/mux" "github.com/rs/zerolog/log" "github.com/urfave/cli/v2" - "mvdan.cc/xurls/v2" "net/http" - "net/url" - "strconv" - "strings" - "time" ) -var ( - defaultPaginationSize = 50 - maxPaginationSize = 100 - - errHostnameNotAllowed = fmt.Errorf("hostname is not allowed") - errAlreadyIndexed = fmt.Errorf("resource is already indexed") -) +var errHostnameNotAllowed = fmt.Errorf("hostname is not allowed") // State represent the application state type State struct { index index.Index - pub event.Publisher + indexDriver string configClient configapi.Client - urlCache cache.Cache } // Name return the process name @@ -47,20 +28,20 @@ func (state *State) Name() string { // CommonFlags return process common flags func (state *State) CommonFlags() []string { - return []string{process.HubURIFlag, process.ConfigAPIURIFlag, process.RedisURIFlag} + return []string{process.HubURIFlag, process.ConfigAPIURIFlag} } // CustomFlags return process custom flags func (state *State) CustomFlags() []cli.Flag { return []cli.Flag{ &cli.StringFlag{ - Name: "elasticsearch-uri", - Usage: "URI to the Elasticsearch server", + Name: "index-driver", + Usage: "Name of the storage driver", Required: true, }, &cli.StringFlag{ - Name: "signing-key", - Usage: "Signing key for the JWT token", + Name: "index-dest", + Usage: "Destination (config) passed to the driver", Required: true, }, } @@ -68,108 +49,33 @@ func (state *State) CustomFlags() []cli.Flag { // Initialize the process func (state *State) Initialize(provider process.Provider) error { - db, err := index.NewElasticIndex(provider.GetValue("elasticsearch-uri")) - if err != nil { - return err - } - state.index = db - - pub, err := provider.Subscriber() + indexDriver := provider.GetValue("index-driver") + idx, err := index.NewIndex(indexDriver, provider.GetValue("index-dest")) if err != nil { return err } - state.pub = pub + state.index = idx + state.indexDriver = indexDriver - configClient, err := provider.ConfigClient([]string{configapi.RefreshDelayKey, configapi.ForbiddenHostnamesKey}) + configClient, err := provider.ConfigClient([]string{configapi.ForbiddenHostnamesKey}) if err != nil { return err } state.configClient = configClient - urlCache, err := provider.Cache() - if err != nil { - return err - } - state.urlCache = urlCache - return nil } // Subscribers return the process subscribers func (state *State) Subscribers() []process.SubscriberDef { return []process.SubscriberDef{ - {Exchange: event.NewResourceExchange, Queue: "indexingQueue", Handler: state.handleNewResourceEvent}, + {Exchange: event.NewResourceExchange, Queue: fmt.Sprintf("%sIndexingQueue", state.indexDriver), Handler: state.handleNewResourceEvent}, } } // HTTPHandler returns the HTTP API the process expose -func (state *State) HTTPHandler(provider process.Provider) http.Handler { - r := mux.NewRouter() - - signingKey := []byte(provider.GetValue("signing-key")) - authMiddleware := auth.NewMiddleware(signingKey) - r.Use(authMiddleware.Middleware()) - - r.HandleFunc("/v1/resources", state.searchResources).Methods(http.MethodGet) - r.HandleFunc("/v1/urls", state.scheduleURL).Methods(http.MethodPost) - - return r -} - -func (state *State) searchResources(w http.ResponseWriter, r *http.Request) { - searchParams, err := getSearchParams(r) - if err != nil { - log.Err(err).Msg("error while getting search params") - w.WriteHeader(http.StatusBadRequest) - return - } - - totalCount, err := state.index.CountResources(searchParams) - if err != nil { - log.Err(err).Msg("error while counting on ES") - w.WriteHeader(http.StatusInternalServerError) - return - } - - res, err := state.index.SearchResources(searchParams) - if err != nil { - log.Err(err).Msg("error while searching on ES") - w.WriteHeader(http.StatusInternalServerError) - return - } - - var resources []client.ResourceDto - for _, r := range res { - resources = append(resources, client.ResourceDto{ - URL: r.URL, - Body: r.Body, - Title: r.Title, - Time: r.Time, - }) - } - - // Write pagination headers - writePagination(w, searchParams, totalCount) - - // Write body - writeJSON(w, resources) -} - -func (state *State) scheduleURL(w http.ResponseWriter, r *http.Request) { - var url string - if err := json.NewDecoder(r.Body).Decode(&url); err != nil { - log.Warn().Str("err", err.Error()).Msg("error while decoding request body") - w.WriteHeader(http.StatusUnprocessableEntity) - return - } - - if err := state.pub.PublishEvent(&event.FoundURLEvent{URL: url}); err != nil { - log.Err(err).Msg("unable to schedule URL") - w.WriteHeader(http.StatusInternalServerError) - return - } - - log.Info().Str("url", url).Msg("successfully scheduled URL") +func (state *State) HTTPHandler() http.Handler { + return nil } func (state *State) handleNewResourceEvent(subscriber event.Subscriber, msg event.RawMessage) error { @@ -178,288 +84,16 @@ func (state *State) handleNewResourceEvent(subscriber event.Subscriber, msg even return err } - // Extract & process resource - resDto, urls, err := extractResource(evt) - if err != nil { - return fmt.Errorf("error while extracting resource: %s", err) - } - - // Lowercase headers - resDto.Headers = map[string]string{} - for key, value := range evt.Headers { - resDto.Headers[strings.ToLower(key)] = value - } - - // Get current refresh delay - refreshDelay := time.Duration(-1) - if val, err := state.configClient.GetRefreshDelay(); err == nil { - refreshDelay = val.Delay - } - - // Save resource - if err := state.tryAddResource(resDto); err != nil { - return err - } - - log.Info().Str("url", evt.URL).Msg("Successfully indexed resource") - - // Finally push found URLs - for _, u := range urls { - // make sure url has not been published (yet) - count, err := state.urlCache.GetInt64(fmt.Sprintf("urls:%s", u)) - if err != nil && err != cache.ErrNIL { - log.Err(err). - Str("url", u). - Msg("error while checking URL cache") - continue - } - if count > 0 { - log.Trace(). - Str("url", u). - Msg("skipping already published URL") - continue - } - - // Update cache - ttl := refreshDelay - if refreshDelay == -1 { - ttl = cache.NoTTL - } - - if err := state.urlCache.SetInt64(fmt.Sprintf("urls:%s", u), count+1, ttl); err != nil { - log.Err(err).Msg("error while updating URL cache") - } - - if err := subscriber.PublishEvent(&event.FoundURLEvent{URL: u}); err != nil { - log.Warn(). - Str("url", u). - Str("err", err.Error()). - Msg("Error while publishing URL") - } - - log.Trace(). - Str("url", u). - Msg("Published found URL") - } - - return nil -} - -func (state *State) tryAddResource(res *client.ResourceDto) error { - forbiddenHostnames, err := state.configClient.GetForbiddenHostnames() - if err != nil { - return err - } - - u, err := url.Parse(res.URL) - if err != nil { - return err - } - // make sure hostname hasn't been flagged as forbidden - for _, hostname := range forbiddenHostnames { - if strings.Contains(u.Hostname(), hostname.Hostname) { - return errHostnameNotAllowed - } + if allowed, err := constraint.CheckHostnameAllowed(state.configClient, evt.URL); !allowed || err != nil { + return fmt.Errorf("%s %w", evt.URL, errHostnameNotAllowed) } - // Create Elasticsearch document - doc := index.ResourceIdx{ - URL: res.URL, - Body: res.Body, - Time: res.Time, - Title: res.Title, - Meta: res.Meta, - Description: res.Description, - Headers: res.Headers, + if err := state.index.IndexResource(evt.URL, evt.Time, evt.Body, evt.Headers); err != nil { + return fmt.Errorf("error while indexing resource: %s", err) } - if err := state.index.AddResource(doc); err != nil { - return err - } - - // Publish linked event - if err := state.pub.PublishEvent(&event.NewIndexEvent{ - URL: res.URL, - Body: res.Body, - Time: res.Time, - Title: res.Title, - Meta: res.Meta, - Description: res.Description, - Headers: res.Headers, - }); err != nil { - return err - } + log.Info().Str("url", evt.URL).Msg("Successfully indexed resource") return nil } - -func getSearchParams(r *http.Request) (*client.ResSearchParams, error) { - params := &client.ResSearchParams{} - - if param := r.URL.Query()["keyword"]; len(param) == 1 { - params.Keyword = param[0] - } - - if param := r.URL.Query()["with-body"]; len(param) == 1 { - params.WithBody = param[0] == "true" - } - - // extract raw query params (unescaped to keep + sign when parsing date) - rawQueryParams := getRawQueryParam(r.URL.RawQuery) - - if val, exist := rawQueryParams["start-date"]; exist { - d, err := time.Parse(time.RFC3339, val) - if err == nil { - params.StartDate = d - } else { - return nil, err - } - } - - if val, exist := rawQueryParams["end-date"]; exist { - d, err := time.Parse(time.RFC3339, val) - if err == nil { - params.EndDate = d - } else { - return nil, err - } - } - - // base64decode the URL - if param := r.URL.Query()["url"]; len(param) == 1 { - b, err := base64.URLEncoding.DecodeString(param[0]) - if err != nil { - return nil, err - } - params.URL = string(b) - } - - // Acquire pagination - page, size := getPagination(r) - params.PageNumber = page - params.PageSize = size - - return params, nil -} - -func writePagination(w http.ResponseWriter, searchParams *client.ResSearchParams, total int64) { - w.Header().Set(client.PaginationPageHeader, strconv.Itoa(searchParams.PageNumber)) - w.Header().Set(client.PaginationSizeHeader, strconv.Itoa(searchParams.PageSize)) - w.Header().Set(client.PaginationCountHeader, strconv.FormatInt(total, 10)) -} - -func getPagination(r *http.Request) (page int, size int) { - page = 1 - size = defaultPaginationSize - - // Get pagination page - if param := r.URL.Query()[client.PaginationPageQueryParam]; len(param) == 1 { - if val, err := strconv.Atoi(param[0]); err == nil { - page = val - } - } - - // Get pagination size - if param := r.URL.Query()[client.PaginationSizeQueryParam]; len(param) == 1 { - if val, err := strconv.Atoi(param[0]); err == nil { - size = val - } - } - - // Prevent too much results from being returned - if size > maxPaginationSize { - size = maxPaginationSize - } - - return page, size -} - -func getRawQueryParam(url string) map[string]string { - if url == "" { - return map[string]string{} - } - - val := map[string]string{} - parts := strings.Split(url, "&") - - for _, part := range parts { - p := strings.Split(part, "=") - val[p[0]] = p[1] - } - - return val -} - -func writeJSON(w http.ResponseWriter, body interface{}) { - b, err := json.Marshal(body) - if err != nil { - log.Err(err).Msg("error while serializing body") - w.WriteHeader(http.StatusInternalServerError) - return - } - - w.Header().Set("Content-Type", "application/json") - _, _ = w.Write(b) -} - -func extractResource(msg event.NewResourceEvent) (*client.ResourceDto, []string, error) { - doc, err := goquery.NewDocumentFromReader(strings.NewReader(msg.Body)) - if err != nil { - return nil, nil, err - } - - // Get resource title - title := doc.Find("title").First().Text() - - // Get meta values - meta := map[string]string{} - doc.Find("meta").Each(func(i int, s *goquery.Selection) { - name, _ := s.Attr("name") - value, _ := s.Attr("content") - - // if name is empty then try to lookup using property - if name == "" { - name, _ = s.Attr("property") - if name == "" { - return - } - } - - meta[strings.ToLower(name)] = value - }) - - // Extract & normalize URLs - xu := xurls.Strict() - urls := xu.FindAllString(msg.Body, -1) - - var normalizedURLS []string - - for _, u := range urls { - normalizedURL, err := normalizeURL(u) - if err != nil { - continue - } - - normalizedURLS = append(normalizedURLS, normalizedURL) - } - - return &client.ResourceDto{ - URL: msg.URL, - Body: msg.Body, - Time: msg.Time, - Title: title, - Meta: meta, - Description: meta["description"], - }, normalizedURLS, nil -} - -func normalizeURL(u string) (string, error) { - normalizedURL, err := purell.NormalizeURLString(u, purell.FlagsUsuallySafeGreedy| - purell.FlagRemoveDirectoryIndex|purell.FlagRemoveFragment|purell.FlagRemoveDuplicateSlashes) - if err != nil { - return "", fmt.Errorf("error while normalizing URL %s: %s", u, err) - } - - return normalizedURL, nil -} diff --git a/internal/indexer/indexer_test.go b/internal/indexer/indexer_test.go index 3a27e93..023f89d 100644 --- a/internal/indexer/indexer_test.go +++ b/internal/indexer/indexer_test.go @@ -1,325 +1,55 @@ package indexer import ( - "encoding/json" - "fmt" - "github.com/creekorful/trandoshan/internal/cache" - "github.com/creekorful/trandoshan/internal/cache_mock" + "errors" "github.com/creekorful/trandoshan/internal/configapi/client" "github.com/creekorful/trandoshan/internal/configapi/client_mock" "github.com/creekorful/trandoshan/internal/event" "github.com/creekorful/trandoshan/internal/event_mock" - client2 "github.com/creekorful/trandoshan/internal/indexer/client" - "github.com/creekorful/trandoshan/internal/indexer/index" "github.com/creekorful/trandoshan/internal/indexer/index_mock" + "github.com/creekorful/trandoshan/internal/process" + "github.com/creekorful/trandoshan/internal/process_mock" + "github.com/creekorful/trandoshan/internal/test" "github.com/golang/mock/gomock" - "net/http" - "net/http/httptest" - "strings" "testing" "time" ) -func TestWritePagination(t *testing.T) { - rec := httptest.NewRecorder() - searchParams := &client2.ResSearchParams{ - PageSize: 15, - PageNumber: 7, - } - total := int64(1200) - - writePagination(rec, searchParams, total) - - if rec.Header().Get(client2.PaginationPageHeader) != "7" { - t.Fail() - } - if rec.Header().Get(client2.PaginationSizeHeader) != "15" { - t.Fail() - } - if rec.Header().Get(client2.PaginationCountHeader) != "1200" { +func TestState_Name(t *testing.T) { + s := State{} + if s.Name() != "indexer" { t.Fail() } } -func TestReadPagination(t *testing.T) { - // valid params - req := httptest.NewRequest(http.MethodGet, "/index.php?pagination-page=1&pagination-size=10", nil) - if page, size := getPagination(req); page != 1 || size != 10 { - t.Errorf("wanted page: 1, size: 10 (got %d, %d)", page, size) - } - - // make sure invalid parameter are set as wanted - req = httptest.NewRequest(http.MethodGet, "/index.php?pagination-page=abcd&pagination-size=lol", nil) - if page, size := getPagination(req); page != 1 || size != defaultPaginationSize { - t.Errorf("wanted page: 1, size: %d (got %d, %d)", defaultPaginationSize, page, size) - } - - // make sure we prevent too much results from being returned - target := fmt.Sprintf("/index.php?pagination-page=10&pagination-size=%d", maxPaginationSize+1) - req = httptest.NewRequest(http.MethodGet, target, nil) - if page, size := getPagination(req); page != 10 || size != maxPaginationSize { - t.Errorf("wanted page: 10, size: %d (got %d, %d)", maxPaginationSize, page, size) - } - - // make sure no parameter we set to default - req = httptest.NewRequest(http.MethodGet, "/index.php", nil) - if page, size := getPagination(req); page != 1 || size != defaultPaginationSize { - t.Errorf("wanted page: 1, size: %d (got %d, %d)", defaultPaginationSize, page, size) - } +func TestState_CommonFlags(t *testing.T) { + s := State{} + test.CheckProcessCommonFlags(t, &s, []string{process.HubURIFlag, process.ConfigAPIURIFlag}) } -func TestGetSearchParams(t *testing.T) { - startDate := time.Now() - target := fmt.Sprintf("/resources?with-body=true&pagination-page=1&keyword=keyword&url=dXJs&start-date=%s", startDate.Format(time.RFC3339)) - - req := httptest.NewRequest(http.MethodPost, target, nil) - - params, err := getSearchParams(req) - if err != nil { - t.Errorf("error while parsing search params: %s", err) - t.FailNow() - } - - if !params.WithBody { - t.Errorf("wrong withBody: %v", params.WithBody) - } - if params.PageSize != 50 { - t.Errorf("wrong pagination-size: %d", params.PageSize) - } - if params.PageNumber != 1 { - t.Errorf("wrong pagination-page: %d", params.PageNumber) - } - if params.Keyword != "keyword" { - t.Errorf("wrong keyword: %s", params.Keyword) - } - if params.StartDate.Year() != startDate.Year() { - t.Errorf("wrong start-date (year)") - } - if params.StartDate.Month() != startDate.Month() { - t.Errorf("wrong start-date (month)") - } - if params.StartDate.Day() != startDate.Day() { - t.Errorf("wrong start-date (day)") - } - if params.StartDate.Hour() != startDate.Hour() { - t.Errorf("wrong start-date (hour)") - } - if params.StartDate.Minute() != startDate.Minute() { - t.Errorf("wrong start-date (minute)") - } - if params.StartDate.Second() != startDate.Second() { - t.Errorf("wrong start-date (second)") - } - if params.URL != "url" { - t.Errorf("wrong url: %s", params.URL) - } -} - -func TestScheduleURL(t *testing.T) { - mockCtrl := gomock.NewController(t) - defer mockCtrl.Finish() - - // The requests - req := httptest.NewRequest(http.MethodPost, "/v1/urls", strings.NewReader("\"https://google.onion\"")) - rec := httptest.NewRecorder() - - // Mocking status - pubMock := event_mock.NewMockPublisher(mockCtrl) - - s := State{pub: pubMock} - - pubMock.EXPECT().PublishEvent(&event.FoundURLEvent{URL: "https://google.onion"}).Return(nil) - - s.scheduleURL(rec, req) - - if rec.Code != http.StatusOK { - t.Fail() - } +func TestState_CustomFlags(t *testing.T) { + s := State{} + test.CheckProcessCustomFlags(t, &s, []string{"index-driver", "index-dest"}) } -func TestAddResource(t *testing.T) { - mockCtrl := gomock.NewController(t) - defer mockCtrl.Finish() - - body := client2.ResourceDto{ - URL: "https://example.onion", - Body: "TheBody", - Title: "Example", - Time: time.Time{}, - Meta: map[string]string{"content": "content-meta"}, - Description: "the description", - Headers: map[string]string{"Content-Type": "application/html", "Server": "Traefik"}, - } - - indexMock := index_mock.NewMockIndex(mockCtrl) - configClientMock := client_mock.NewMockClient(mockCtrl) - pubMock := event_mock.NewMockPublisher(mockCtrl) - - indexMock.EXPECT().AddResource(index.ResourceIdx{ - URL: "https://example.onion", - Body: "TheBody", - Title: "Example", - Time: time.Time{}, - Meta: map[string]string{"content": "content-meta"}, - Description: "the description", - Headers: map[string]string{"Content-Type": "application/html", "Server": "Traefik"}, - }) - - pubMock.EXPECT().PublishEvent(&event.NewIndexEvent{ - URL: "https://example.onion", - Body: "TheBody", - Title: "Example", - Time: time.Time{}, - Meta: map[string]string{"content": "content-meta"}, - Description: "the description", - Headers: map[string]string{"Content-Type": "application/html", "Server": "Traefik"}, +func TestState_Initialize(t *testing.T) { + s := State{} + test.CheckInitialize(t, &s, func(p *process_mock.MockProviderMockRecorder) { + p.GetValue("index-driver").Return("local") + p.GetValue("index-dest") + p.ConfigClient([]string{client.ForbiddenHostnamesKey}) }) - configClientMock.EXPECT().GetForbiddenHostnames().Return([]client.ForbiddenHostname{}, nil) - - s := State{index: indexMock, configClient: configClientMock, pub: pubMock} - if err := s.tryAddResource(&body); err != nil { - t.FailNow() - } -} - -func TestAddResourceForbiddenHostname(t *testing.T) { - mockCtrl := gomock.NewController(t) - defer mockCtrl.Finish() - - body := client2.ResourceDto{ - URL: "https://example.onion", - Body: "TheBody", - Title: "Example", - Time: time.Time{}, - Meta: map[string]string{"content": "content-meta"}, - Description: "the description", - Headers: map[string]string{"Content-Type": "application/html", "Server": "Traefik"}, - } - - configClientMock := client_mock.NewMockClient(mockCtrl) - - configClientMock.EXPECT().GetForbiddenHostnames().Return([]client.ForbiddenHostname{{Hostname: "example.onion"}}, nil) - - s := State{configClient: configClientMock} - - if err := s.tryAddResource(&body); err != errHostnameNotAllowed { - t.FailNow() - } -} - -func TestSearchResources(t *testing.T) { - mockCtrl := gomock.NewController(t) - defer mockCtrl.Finish() - - // The requests - req := httptest.NewRequest(http.MethodPost, "/v1/resources?keyword=example", nil) - rec := httptest.NewRecorder() - - indexMock := index_mock.NewMockIndex(mockCtrl) - - indexMock.EXPECT().CountResources(gomock.Any()).Return(int64(150), nil) - indexMock.EXPECT().SearchResources(gomock.Any()).Return([]index.ResourceIdx{ - { - URL: "example-1.onion", - Body: "Example 1", - Title: "Example 1", - Time: time.Time{}, - }, - { - URL: "example-2.onion", - Body: "Example 2", - Title: "Example 2", - Time: time.Time{}, - }, - }, nil) - - s := State{index: indexMock} - s.searchResources(rec, req) - - if rec.Code != http.StatusOK { - t.Fail() - } - if rec.Header().Get("Content-Type") != "application/json" { - t.Fail() - } - if rec.Header().Get(client2.PaginationCountHeader) != "150" { - t.Fail() - } - - var resources []client2.ResourceDto - if err := json.NewDecoder(rec.Body).Decode(&resources); err != nil { - t.Fatalf("error while decoding body: %s", err) - } - if len(resources) != 2 { - t.Errorf("got %d resources want 2", len(resources)) - } -} - -func TestExtractResource(t *testing.T) { - body := ` -Creekorful Inc - -This is sparta - - - - - -` - - msg := event.NewResourceEvent{ - URL: "https://example.org/300", - Body: body, - } - - resDto, urls, err := extractResource(msg) - if err != nil { - t.FailNow() - } - - if resDto.URL != "https://example.org/300" { - t.Fail() - } - if resDto.Title != "Creekorful Inc" { - t.Fail() - } - if resDto.Body != msg.Body { - t.Fail() - } - - if len(urls) != 2 { - t.FailNow() - } - if urls[0] != "https://google.com/test?test=test" { - t.Fail() - } - if urls[1] != "https://example.org" { - t.Fail() - } - - if resDto.Description != "Zhello world" { - t.Fail() - } - - if resDto.Meta["description"] != "Zhello world" { - t.Fail() - } - - if resDto.Meta["og:url"] != "https://example.org" { - t.Fail() + if s.indexDriver != "local" { + t.Errorf("wrong driver: got: %s want: %s", s.indexDriver, "local") } } -func TestNormalizeURL(t *testing.T) { - url, err := normalizeURL("https://this-is-sparta.de?url=url-query-param#fragment-23") - if err != nil { - t.FailNow() - } - - if url != "https://this-is-sparta.de?url=url-query-param" { - t.Fail() - } +func TestState_Subscribers(t *testing.T) { + s := State{indexDriver: "elastic"} + test.CheckProcessSubscribers(t, &s, []test.SubscriberDef{ + {Queue: "elasticIndexingQueue", Exchange: "resource.new"}, + }) } func TestHandleNewResourceEvent(t *testing.T) { @@ -341,8 +71,6 @@ Thanks to https://help.facebook.onion/ for the hosting :D subscriberMock := event_mock.NewMockSubscriber(mockCtrl) configClientMock := client_mock.NewMockClient(mockCtrl) indexMock := index_mock.NewMockIndex(mockCtrl) - pubMock := event_mock.NewMockPublisher(mockCtrl) - urlCacheMock := cache_mock.NewMockCache(mockCtrl) tn := time.Now() @@ -357,49 +85,9 @@ Thanks to https://help.facebook.onion/ for the hosting :D }).Return(nil) configClientMock.EXPECT().GetForbiddenHostnames().Return([]client.ForbiddenHostname{{Hostname: "example2.onion"}}, nil) - configClientMock.EXPECT().GetRefreshDelay().Times(1).Return(client.RefreshDelay{Delay: -1}, nil) - - // make sure we are creating the resource - indexMock.EXPECT().AddResource(index.ResourceIdx{ - URL: "https://example.onion", - Body: body, - Title: "Creekorful Inc", - Meta: map[string]string{"description": "Zhello world", "og:url": "https://example.org"}, - Description: "Zhello world", - Headers: map[string]string{"server": "Traefik", "content-type": "application/html"}, - Time: tn, - }).Return(nil) - - pubMock.EXPECT().PublishEvent(&event.NewIndexEvent{ - URL: "https://example.onion", - Body: body, - Title: "Creekorful Inc", - Meta: map[string]string{"description": "Zhello world", "og:url": "https://example.org"}, - Description: "Zhello world", - Headers: map[string]string{"server": "Traefik", "content-type": "application/html"}, - Time: tn, - }).Return(nil) - - // make sure we are pushing found URLs (but only if refresh delay elapsed) - urlCacheMock.EXPECT().GetInt64("urls:https://example.org").Return(int64(0), cache.ErrNIL) - - urlCacheMock.EXPECT().GetInt64("urls:https://example.org").Return(int64(1), nil) - - urlCacheMock.EXPECT().GetInt64("urls:https://help.facebook.onion").Return(int64(1), nil) - - urlCacheMock.EXPECT().GetInt64("urls:https://google.com/test?test=test").Return(int64(0), cache.ErrNIL) - - subscriberMock.EXPECT(). - PublishEvent(&event.FoundURLEvent{URL: "https://example.org"}). - Return(nil) - urlCacheMock.EXPECT().SetInt64("urls:https://example.org", int64(1), cache.NoTTL).Return(nil) - - subscriberMock.EXPECT(). - PublishEvent(&event.FoundURLEvent{URL: "https://google.com/test?test=test"}). - Return(nil) - urlCacheMock.EXPECT().SetInt64("urls:https://google.com/test?test=test", int64(1), cache.NoTTL).Return(nil) + indexMock.EXPECT().IndexResource("https://example.onion", tn, body, map[string]string{"Server": "Traefik", "Content-Type": "application/html"}) - s := State{index: indexMock, configClient: configClientMock, pub: pubMock, urlCache: urlCacheMock} + s := State{index: indexMock, configClient: configClientMock} if err := s.handleNewResourceEvent(subscriberMock, msg); err != nil { t.FailNow() } @@ -434,11 +122,10 @@ This is sparta (hosted on https://example.org) Time: tn, }).Return(nil) - configClientMock.EXPECT().GetRefreshDelay().Return(client.RefreshDelay{Delay: -1}, nil) configClientMock.EXPECT().GetForbiddenHostnames().Return([]client.ForbiddenHostname{{Hostname: "example.onion"}}, nil) s := State{configClient: configClientMock} - if err := s.handleNewResourceEvent(subscriberMock, msg); err != errHostnameNotAllowed { + if err := s.handleNewResourceEvent(subscriberMock, msg); !errors.Is(err, errHostnameNotAllowed) { t.FailNow() } } diff --git a/internal/logging/log.go b/internal/logging/log.go deleted file mode 100644 index 76a0e20..0000000 --- a/internal/logging/log.go +++ /dev/null @@ -1,31 +0,0 @@ -package logging - -import ( - "github.com/rs/zerolog" - "github.com/rs/zerolog/log" - "github.com/urfave/cli/v2" - "os" -) - -// GetLogFlag return the CLI flag parameter used to setup application log level -func GetLogFlag() *cli.StringFlag { - return &cli.StringFlag{ - Name: "log-level", - Usage: "Set the application log level", - Value: "info", - } -} - -// ConfigureLogger configure the logger using given log level (read from cli context) -func ConfigureLogger(ctx *cli.Context) { - log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr}) - - // Set application log level - if lvl, err := zerolog.ParseLevel(ctx.String("log-level")); err == nil { - zerolog.SetGlobalLevel(lvl) - } else { - zerolog.SetGlobalLevel(zerolog.InfoLevel) - } - - log.Debug().Stringer("lvl", zerolog.GlobalLevel()).Msg("Setting log level") -} diff --git a/internal/logging/log_test.go b/internal/logging/log_test.go deleted file mode 100644 index 8e09e9e..0000000 --- a/internal/logging/log_test.go +++ /dev/null @@ -1,18 +0,0 @@ -package logging - -import ( - "testing" -) - -func TestGetLogFlag(t *testing.T) { - flag := GetLogFlag() - if flag.Name != "log-level" { - t.Fail() - } - if flag.Usage != "Set the application log level" { - t.Fail() - } - if flag.Value != "info" { - t.Fail() - } -} diff --git a/internal/process/process.go b/internal/process/process.go index efc3eb2..0344e53 100644 --- a/internal/process/process.go +++ b/internal/process/process.go @@ -1,16 +1,21 @@ package process +//go:generate mockgen -destination=../process_mock/process_mock.go -package=process_mock . Provider + import ( "context" + "crypto/tls" "fmt" "github.com/creekorful/trandoshan/internal/cache" "github.com/creekorful/trandoshan/internal/clock" configapi "github.com/creekorful/trandoshan/internal/configapi/client" "github.com/creekorful/trandoshan/internal/event" - "github.com/creekorful/trandoshan/internal/indexer/client" - "github.com/creekorful/trandoshan/internal/logging" + chttp "github.com/creekorful/trandoshan/internal/http" + "github.com/rs/zerolog" "github.com/rs/zerolog/log" "github.com/urfave/cli/v2" + "github.com/valyala/fasthttp" + "github.com/valyala/fasthttp/fasthttpproxy" "net/http" "os" "os/signal" @@ -30,6 +35,10 @@ const ( ConfigAPIURIFlag = "config-api-uri" // RedisURIFlag is the redis-uri flag RedisURIFlag = "redis-uri" + // TorURIFlag is the tor-uri flag + TorURIFlag = "tor-uri" + // UserAgentFlag is the user-agent flag + UserAgentFlag = "user-agent" ) // Provider is the implementation provider @@ -38,14 +47,14 @@ type Provider interface { Clock() (clock.Clock, error) // ConfigClient return a new configured configapi.Client ConfigClient(keys []string) (configapi.Client, error) - // IndexerClient return a new configured indexer client - IndexerClient() (client.Client, error) // Subscriber return a new configured subscriber Subscriber() (event.Subscriber, error) // Publisher return a new configured publisher Publisher() (event.Publisher, error) // Cache return a new configured cache - Cache() (cache.Cache, error) + Cache(keyPrefix string) (cache.Cache, error) + // HTTPClient return a new configured http client + HTTPClient() (chttp.Client, error) // GetValue return value for given key GetValue(key string) string // GetValue return values for given key @@ -74,10 +83,6 @@ func (p *defaultProvider) ConfigClient(keys []string) (configapi.Client, error) return configapi.NewConfigClient(p.ctx.String(ConfigAPIURIFlag), sub, keys) } -func (p *defaultProvider) IndexerClient() (client.Client, error) { - return client.NewClient(p.ctx.String(APIURIFlag), p.ctx.String(APITokenFlag)), nil -} - func (p *defaultProvider) Subscriber() (event.Subscriber, error) { return event.NewSubscriber(p.ctx.String(HubURIFlag)) } @@ -86,8 +91,20 @@ func (p *defaultProvider) Publisher() (event.Publisher, error) { return event.NewPublisher(p.ctx.String(HubURIFlag)) } -func (p *defaultProvider) Cache() (cache.Cache, error) { - return cache.NewRedisCache(p.ctx.String(RedisURIFlag)) +func (p *defaultProvider) Cache(keyPrefix string) (cache.Cache, error) { + return cache.NewRedisCache(p.ctx.String(RedisURIFlag), keyPrefix) +} + +func (p *defaultProvider) HTTPClient() (chttp.Client, error) { + return chttp.NewFastHTTPClient(&fasthttp.Client{ + // Use given TOR proxy to reach the hidden services + Dial: fasthttpproxy.FasthttpSocksDialer(p.ctx.String(TorURIFlag)), + // Disable SSL verification since we do not really care about this + TLSConfig: &tls.Config{InsecureSkipVerify: true}, + ReadTimeout: time.Second * 5, + WriteTimeout: time.Second * 5, + Name: p.ctx.String(UserAgentFlag), + }), nil } func (p *defaultProvider) GetValue(key string) string { @@ -112,7 +129,7 @@ type Process interface { CustomFlags() []cli.Flag Initialize(provider Provider) error Subscribers() []SubscriberDef - HTTPHandler(provider Provider) http.Handler + HTTPHandler() http.Handler } // MakeApp return cli.App corresponding for given Process @@ -122,7 +139,11 @@ func MakeApp(process Process) *cli.App { Version: version, Usage: fmt.Sprintf("Trandoshan %s component", process.Name()), Flags: []cli.Flag{ - logging.GetLogFlag(), + &cli.StringFlag{ + Name: "log-level", + Usage: "Set the application log level", + Value: "info", + }, }, Action: execute(process), } @@ -148,7 +169,7 @@ func execute(process Process) cli.ActionFunc { provider := NewDefaultProvider(c) // Common setup - logging.ConfigureLogger(c) + configureLogger(c) // Custom setup if err := process.Initialize(provider); err != nil { @@ -178,7 +199,7 @@ func execute(process Process) cli.ActionFunc { var srv *http.Server // Expose HTTP API if any - if h := process.HTTPHandler(provider); h != nil { + if h := process.HTTPHandler(); h != nil { srv = &http.Server{ Addr: "0.0.0.0:8080", // Good practice to set timeouts to avoid Slowloris attacks. @@ -243,6 +264,29 @@ func getCustomFlags() map[string]cli.Flag { Usage: "URI to the Redis server", Required: true, } + flags[TorURIFlag] = &cli.StringFlag{ + Name: TorURIFlag, + Usage: "URI to the TOR SOCKS proxy", + Required: true, + } + flags[UserAgentFlag] = &cli.StringFlag{ + Name: UserAgentFlag, + Usage: "User agent to use", + Value: "Mozilla/5.0 (Windows NT 10.0; rv:68.0) Gecko/20100101 Firefox/68.0", + } return flags } + +func configureLogger(ctx *cli.Context) { + log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr}) + + // Set application log level + if lvl, err := zerolog.ParseLevel(ctx.String("log-level")); err == nil { + zerolog.SetGlobalLevel(lvl) + } else { + zerolog.SetGlobalLevel(zerolog.InfoLevel) + } + + log.Debug().Stringer("lvl", zerolog.GlobalLevel()).Msg("Setting log level") +} diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index 691a815..9372711 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -3,12 +3,15 @@ package scheduler import ( "errors" "fmt" + "github.com/PuerkitoBio/purell" + "github.com/creekorful/trandoshan/internal/cache" configapi "github.com/creekorful/trandoshan/internal/configapi/client" "github.com/creekorful/trandoshan/internal/constraint" "github.com/creekorful/trandoshan/internal/event" "github.com/creekorful/trandoshan/internal/process" "github.com/rs/zerolog/log" "github.com/urfave/cli/v2" + "mvdan.cc/xurls/v2" "net/http" "net/url" "regexp" @@ -20,6 +23,7 @@ var ( errProtocolNotAllowed = errors.New("protocol is not allowed") errExtensionNotAllowed = errors.New("extension is not allowed") errHostnameNotAllowed = errors.New("hostname is not allowed") + errAlreadyScheduled = errors.New("URL is already scheduled") extensionRegex = regexp.MustCompile("\\.[\\w]+") ) @@ -27,6 +31,7 @@ var ( // State represent the application state type State struct { configClient configapi.Client + urlCache cache.Cache } // Name return the process name @@ -36,7 +41,7 @@ func (state *State) Name() string { // CommonFlags return process common flags func (state *State) CommonFlags() []string { - return []string{process.HubURIFlag, process.ConfigAPIURIFlag} + return []string{process.HubURIFlag, process.ConfigAPIURIFlag, process.RedisURIFlag} } // CustomFlags return process custom flags @@ -46,37 +51,58 @@ func (state *State) CustomFlags() []cli.Flag { // Initialize the process func (state *State) Initialize(provider process.Provider) error { - keys := []string{configapi.AllowedMimeTypesKey, configapi.ForbiddenHostnamesKey} + keys := []string{configapi.AllowedMimeTypesKey, configapi.ForbiddenHostnamesKey, configapi.RefreshDelayKey} configClient, err := provider.ConfigClient(keys) if err != nil { return err } state.configClient = configClient + urlCache, err := provider.Cache("url") + if err != nil { + return err + } + state.urlCache = urlCache + return nil } // Subscribers return the process subscribers func (state *State) Subscribers() []process.SubscriberDef { return []process.SubscriberDef{ - {Exchange: event.FoundURLExchange, Queue: "schedulingQueue", Handler: state.handleURLFoundEvent}, + {Exchange: event.NewResourceExchange, Queue: "schedulingQueue", Handler: state.handleNewResourceEvent}, } } // HTTPHandler returns the HTTP API the process expose -func (state *State) HTTPHandler(provider process.Provider) http.Handler { +func (state *State) HTTPHandler() http.Handler { return nil } -func (state *State) handleURLFoundEvent(subscriber event.Subscriber, msg event.RawMessage) error { - var evt event.FoundURLEvent +func (state *State) handleNewResourceEvent(subscriber event.Subscriber, msg event.RawMessage) error { + var evt event.NewResourceEvent if err := subscriber.Read(&msg, &evt); err != nil { return err } - log.Trace().Str("url", evt.URL).Msg("Processing URL") + log.Trace().Str("url", evt.URL).Msg("Processing new resource") + + urls, err := extractURLS(&evt) + if err != nil { + return fmt.Errorf("error while extracting URLs") + } + + for _, u := range urls { + if err := state.processURL(u, subscriber); err != nil { + log.Err(err).Msg("error while processing URL") + } + } + + return nil +} - u, err := url.Parse(evt.URL) +func (state *State) processURL(rawURL string, pub event.Publisher) error { + u, err := url.Parse(rawURL) if err != nil { return fmt.Errorf("error while parsing URL: %s", err) } @@ -123,18 +149,71 @@ func (state *State) handleURLFoundEvent(subscriber event.Subscriber, msg event.R } // Make sure hostname is not forbidden - if allowed, err := constraint.CheckHostnameAllowed(state.configClient, evt.URL); err != nil { + if allowed, err := constraint.CheckHostnameAllowed(state.configClient, rawURL); err != nil { return err } else if !allowed { - log.Debug().Str("url", evt.URL).Msg("Skipping forbidden hostname") + log.Debug().Str("url", rawURL).Msg("Skipping forbidden hostname") return fmt.Errorf("%s %w", u, errHostnameNotAllowed) } + // Check if URL should be scheduled + count, err := state.urlCache.GetInt64(rawURL) + if err != nil && err != cache.ErrNIL { + return err + } + if count > 0 { + return fmt.Errorf("%s %w", u, errAlreadyScheduled) + } + log.Debug().Stringer("url", u).Msg("URL should be scheduled") - if err := subscriber.PublishEvent(&event.NewURLEvent{URL: evt.URL}); err != nil { + // Update URL cache + delay, err := state.configClient.GetRefreshDelay() + if err != nil { + return err + } + + ttl := delay.Delay + if ttl == -1 { + ttl = cache.NoTTL + } + + if err := state.urlCache.SetInt64(rawURL, count+1, ttl); err != nil { + return err + } + + if err := pub.PublishEvent(&event.NewURLEvent{URL: rawURL}); err != nil { return fmt.Errorf("error while publishing URL: %s", err) } return nil } + +func extractURLS(msg *event.NewResourceEvent) ([]string, error) { + // Extract & normalize URLs + xu := xurls.Strict() + urls := xu.FindAllString(msg.Body, -1) + + var normalizedURLS []string + + for _, u := range urls { + normalizedURL, err := normalizeURL(u) + if err != nil { + continue + } + + normalizedURLS = append(normalizedURLS, normalizedURL) + } + + return normalizedURLS, nil +} + +func normalizeURL(u string) (string, error) { + normalizedURL, err := purell.NormalizeURLString(u, purell.FlagsUsuallySafeGreedy| + purell.FlagRemoveDirectoryIndex|purell.FlagRemoveFragment|purell.FlagRemoveDuplicateSlashes) + if err != nil { + return "", fmt.Errorf("error while normalizing URL %s: %s", u, err) + } + + return normalizedURL, nil +} diff --git a/internal/scheduler/scheduler_test.go b/internal/scheduler/scheduler_test.go index f5a731c..aca19bd 100644 --- a/internal/scheduler/scheduler_test.go +++ b/internal/scheduler/scheduler_test.go @@ -2,109 +2,122 @@ package scheduler import ( "errors" - "fmt" + "github.com/creekorful/trandoshan/internal/cache" + "github.com/creekorful/trandoshan/internal/cache_mock" "github.com/creekorful/trandoshan/internal/configapi/client" "github.com/creekorful/trandoshan/internal/configapi/client_mock" "github.com/creekorful/trandoshan/internal/event" "github.com/creekorful/trandoshan/internal/event_mock" + "github.com/creekorful/trandoshan/internal/process" + "github.com/creekorful/trandoshan/internal/process_mock" + "github.com/creekorful/trandoshan/internal/test" "github.com/golang/mock/gomock" "testing" + "time" ) -func TestHandleMessageNotOnion(t *testing.T) { - mockCtrl := gomock.NewController(t) - defer mockCtrl.Finish() +func TestState_Name(t *testing.T) { + s := State{} + if s.Name() != "scheduler" { + t.Fail() + } +} - subscriberMock := event_mock.NewMockSubscriber(mockCtrl) - configClientMock := client_mock.NewMockClient(mockCtrl) +func TestState_CommonFlags(t *testing.T) { + s := State{} + test.CheckProcessCommonFlags(t, &s, []string{process.HubURIFlag, process.ConfigAPIURIFlag, process.RedisURIFlag}) +} - urls := []string{"https://example.org", "https://pastebin.onionsearchengine.com"} +func TestState_CustomFlags(t *testing.T) { + s := State{} + test.CheckProcessCustomFlags(t, &s, nil) +} - for _, url := range urls { - msg := event.RawMessage{} - subscriberMock.EXPECT(). - Read(&msg, &event.FoundURLEvent{}). - SetArg(1, event.FoundURLEvent{URL: url}). - Return(nil) - - s := State{ - configClient: configClientMock, - } +func TestState_Initialize(t *testing.T) { + test.CheckInitialize(t, &State{}, func(p *process_mock.MockProviderMockRecorder) { + p.Cache("url") + p.ConfigClient([]string{client.AllowedMimeTypesKey, client.ForbiddenHostnamesKey, client.RefreshDelayKey}) + }) +} - if err := s.handleURLFoundEvent(subscriberMock, msg); !errors.Is(err, errNotOnionHostname) { - t.FailNow() - } +func TestState_Subscribers(t *testing.T) { + s := State{} + test.CheckProcessSubscribers(t, &s, []test.SubscriberDef{ + {Queue: "schedulingQueue", Exchange: "resource.new"}, + }) +} + +func TestNormalizeURL(t *testing.T) { + url, err := normalizeURL("https://this-is-sparta.de?url=url-query-param#fragment-23") + if err != nil { + t.FailNow() + } + + if url != "https://this-is-sparta.de?url=url-query-param" { + t.Fail() } } -func TestHandleMessageWrongProtocol(t *testing.T) { +func TestProcessURL_NotDotOnion(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() - subscriberMock := event_mock.NewMockSubscriber(mockCtrl) - configClientMock := client_mock.NewMockClient(mockCtrl) - - msg := event.RawMessage{} + urls := []string{"https://example.org", "https://pastebin.onionsearchengine.com"} - s := State{ - configClient: configClientMock, + for _, url := range urls { + state := State{} + if err := state.processURL(url, nil); !errors.Is(err, errNotOnionHostname) { + t.Fail() + } } +} - for _, protocol := range []string{"irc", "ftp"} { - subscriberMock.EXPECT(). - Read(&msg, &event.FoundURLEvent{}). - SetArg(1, event.FoundURLEvent{URL: fmt.Sprintf("%s://example.onion", protocol)}). - Return(nil) +func TestProcessURL_ProtocolForbidden(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + urls := []string{"ftp://example.onion", "irc://example.onion"} - if err := s.handleURLFoundEvent(subscriberMock, msg); !errors.Is(err, errProtocolNotAllowed) { - t.FailNow() + for _, url := range urls { + state := State{} + if err := state.processURL(url, nil); !errors.Is(err, errProtocolNotAllowed) { + t.Fail() } } } -func TestHandleMessageForbiddenExtensions(t *testing.T) { +func TestProcessURL_ExtensionForbidden(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() - subscriberMock := event_mock.NewMockSubscriber(mockCtrl) configClientMock := client_mock.NewMockClient(mockCtrl) urls := []string{"https://example.onion/image.PNG?id=12&test=2", "https://example.onion/favicon.ico"} for _, url := range urls { - msg := event.RawMessage{} - subscriberMock.EXPECT(). - Read(&msg, &event.FoundURLEvent{}). - SetArg(1, event.FoundURLEvent{URL: url}). - Return(nil) - - configClientMock.EXPECT().GetAllowedMimeTypes().Return([]client.MimeType{{Extensions: []string{"php", "html"}}}, nil) - - s := State{ - configClient: configClientMock, - } + configClientMock.EXPECT().GetAllowedMimeTypes().Return([]client.MimeType{{Extensions: []string{"html", "php"}}}, nil) - if err := s.handleURLFoundEvent(subscriberMock, msg); !errors.Is(err, errExtensionNotAllowed) { - t.FailNow() + state := State{configClient: configClientMock} + if err := state.processURL(url, nil); !errors.Is(err, errExtensionNotAllowed) { + t.Fail() } } } -func TestHandleMessageHostnameForbidden(t *testing.T) { +func TestProcessURL_HostnameForbidden(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() - subscriberMock := event_mock.NewMockSubscriber(mockCtrl) configClientMock := client_mock.NewMockClient(mockCtrl) - type test struct { + type testDef struct { url string forbiddenHostnames []client.ForbiddenHostname } - tests := []test{ + tests := []testDef{ { - url: "https://facebookcorewwwi.onion/image.png?id=12&test=2", + url: "https://facebookcorewwwi.onion/login.html?id=12&test=2", forbiddenHostnames: []client.ForbiddenHostname{{Hostname: "facebookcorewwwi.onion"}}, }, { @@ -121,56 +134,57 @@ func TestHandleMessageHostnameForbidden(t *testing.T) { }, } - for _, test := range tests { - msg := event.RawMessage{} - subscriberMock.EXPECT(). - Read(&msg, &event.FoundURLEvent{}). - SetArg(1, event.FoundURLEvent{URL: test.url}). - Return(nil) - - configClientMock.EXPECT().GetAllowedMimeTypes().Return([]client.MimeType{{Extensions: []string{"png", "php"}}}, nil) - configClientMock.EXPECT().GetForbiddenHostnames().Return(test.forbiddenHostnames, nil) + for _, tst := range tests { + configClientMock.EXPECT().GetAllowedMimeTypes().Return([]client.MimeType{{Extensions: []string{"html", "php"}}}, nil) + configClientMock.EXPECT().GetForbiddenHostnames().Return(tst.forbiddenHostnames, nil) - s := State{ - configClient: configClientMock, + state := State{configClient: configClientMock} + if err := state.processURL(tst.url, nil); !errors.Is(err, errHostnameNotAllowed) { + t.Fail() } + } +} - if err := s.handleURLFoundEvent(subscriberMock, msg); !errors.Is(err, errHostnameNotAllowed) { - t.Errorf("%s has not returned errHostnameNotAllowed", test.url) - } +func TestProcessURL_AlreadyScheduled(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + urlCacheMock := cache_mock.NewMockCache(mockCtrl) + configClientMock := client_mock.NewMockClient(mockCtrl) + + urlCacheMock.EXPECT().GetInt64("https://facebookcorewwi.onion/test.php?id=12").Return(int64(1), nil) + configClientMock.EXPECT().GetAllowedMimeTypes().Return([]client.MimeType{{Extensions: []string{"html", "php"}}}, nil) + configClientMock.EXPECT().GetForbiddenHostnames().Return([]client.ForbiddenHostname{}, nil) + + state := State{urlCache: urlCacheMock, configClient: configClientMock} + if err := state.processURL("https://facebookcorewwi.onion/test.php?id=12", nil); !errors.Is(err, errAlreadyScheduled) { + t.Fail() } } -func TestHandleMessage(t *testing.T) { +func TestHandleNewResourceEvent(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() - subscriberMock := event_mock.NewMockSubscriber(mockCtrl) + urlCacheMock := cache_mock.NewMockCache(mockCtrl) configClientMock := client_mock.NewMockClient(mockCtrl) + pubMock := event_mock.NewMockPublisher(mockCtrl) urls := []string{"https://example.onion/index.php", "http://google.onion/admin.secret/login.html", "https://example.onion", "https://www.facebookcorewwwi.onion/recover.now/initiate?ars=facebook_login"} - for _, u := range urls { - msg := event.RawMessage{} - subscriberMock.EXPECT(). - Read(&msg, &event.FoundURLEvent{}). - SetArg(1, event.FoundURLEvent{URL: u}). - Return(nil) - - subscriberMock.EXPECT(). - PublishEvent(&event.NewURLEvent{URL: u}). - Return(nil) - + for _, url := range urls { + urlCacheMock.EXPECT().GetInt64(url).Return(int64(0), cache.ErrNIL) configClientMock.EXPECT().GetAllowedMimeTypes().Return([]client.MimeType{{Extensions: []string{"html", "php"}}}, nil) configClientMock.EXPECT().GetForbiddenHostnames().Return([]client.ForbiddenHostname{}, nil) + configClientMock.EXPECT().GetRefreshDelay().Return(client.RefreshDelay{Delay: 10 * time.Hour}, nil) - s := State{ - configClient: configClientMock, - } + urlCacheMock.EXPECT().SetInt64(url, int64(1), time.Duration(10*time.Hour)).Return(nil) + pubMock.EXPECT().PublishEvent(&event.NewURLEvent{URL: url}).Return(nil) - if err := s.handleURLFoundEvent(subscriberMock, msg); err != nil { - t.FailNow() + state := State{urlCache: urlCacheMock, configClient: configClientMock} + if err := state.processURL(url, pubMock); err != nil { + t.Fail() } } } diff --git a/internal/test/process.go b/internal/test/process.go new file mode 100644 index 0000000..93ff18f --- /dev/null +++ b/internal/test/process.go @@ -0,0 +1,68 @@ +package test + +import ( + "github.com/creekorful/trandoshan/internal/process" + "github.com/creekorful/trandoshan/internal/process_mock" + "github.com/golang/mock/gomock" + "reflect" + "testing" +) + +// SubscriberDef is use to test subscriber definition +type SubscriberDef struct { + Queue string + Exchange string +} + +// CheckProcessCommonFlags check process defined common flags +func CheckProcessCommonFlags(t *testing.T, p process.Process, wantFlags []string) { + if !checkListEquals(p.CommonFlags(), wantFlags) { + t.Errorf("Differents flags: %v %v", p.CommonFlags(), wantFlags) + } +} + +// CheckProcessCustomFlags check process defined custom flags +func CheckProcessCustomFlags(t *testing.T, p process.Process, wantFlags []string) { + var names []string + for _, customFlag := range p.CustomFlags() { + names = append(names, customFlag.Names()[0]) + } + + if !checkListEquals(names, wantFlags) { + t.Errorf("Differents flags: %v %v", names, wantFlags) + } +} + +// CheckInitialize check process initialization phase +func CheckInitialize(t *testing.T, p process.Process, callback func(provider *process_mock.MockProviderMockRecorder)) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + providerMock := process_mock.NewMockProvider(mockCtrl) + callback(providerMock.EXPECT()) + + if err := p.Initialize(providerMock); err != nil { + t.Errorf("Error while Initializing process: %s", err) + } +} + +// CheckProcessSubscribers check process defined subscribers +func CheckProcessSubscribers(t *testing.T, p process.Process, subscribers []SubscriberDef) { + var defs []SubscriberDef + for _, sub := range p.Subscribers() { + defs = append(defs, SubscriberDef{ + Queue: sub.Queue, + Exchange: sub.Exchange, + }) + } + + if !reflect.DeepEqual(defs, subscribers) { + t.Errorf("Differents subscribers: %v %v", defs, subscribers) + } +} + +// TODO HTTPHandler + +func checkListEquals(a []string, b []string) bool { + return reflect.DeepEqual(a, b) +} diff --git a/internal/trandoshanctl/trandoshanctl.go b/internal/trandoshanctl/trandoshanctl.go deleted file mode 100644 index f14478a..0000000 --- a/internal/trandoshanctl/trandoshanctl.go +++ /dev/null @@ -1,119 +0,0 @@ -package trandoshanctl - -import ( - "fmt" - "github.com/creekorful/trandoshan/internal/indexer/client" - "github.com/creekorful/trandoshan/internal/logging" - "github.com/olekukonko/tablewriter" - "github.com/rs/zerolog/log" - "github.com/urfave/cli/v2" - "os" - "time" -) - -// GetApp returns the Trandoshan CLI app -func GetApp() *cli.App { - return &cli.App{ - Name: "trandoshanctl", - Version: "0.9.0", - Usage: "Trandoshan CLI", - Flags: []cli.Flag{ - logging.GetLogFlag(), - &cli.StringFlag{ - Name: "api-uri", - Usage: "URI to the API server", - Value: "http://localhost:15005", - Required: false, - }, - &cli.StringFlag{ - Name: "api-token", - Usage: "Token to use to authenticate against the API", - Required: true, - }, - }, - Commands: []*cli.Command{ - { - Name: "schedule", - Usage: "Schedule crawling for given URL", - Action: schedule, - ArgsUsage: "URL", - }, - { - Name: "search", - Usage: "Search for specific resources", - ArgsUsage: "keyword", - Action: search, - }, - }, - Before: before, - } -} - -func before(ctx *cli.Context) error { - logging.ConfigureLogger(ctx) - return nil -} - -func schedule(c *cli.Context) error { - if c.NArg() == 0 { - return fmt.Errorf("missing argument URL") - } - - url := c.Args().First() - - // Create the API client - apiClient := client.NewClient(c.String("api-uri"), c.String("api-token")) - - if err := apiClient.ScheduleURL(url); err != nil { - log.Err(err).Str("url", url).Msg("Unable to schedule crawling for URL") - return err - } - - log.Info().Str("url", url).Msg("Successfully schedule crawling") - - return nil -} - -func search(c *cli.Context) error { - keyword := c.Args().First() - - // Create the API client - apiClient := client.NewClient(c.String("api-uri"), c.String("api-token")) - - params := client.ResSearchParams{ - Keyword: keyword, - WithBody: false, - PageSize: 1, - PageNumber: 10, - } - res, count, err := apiClient.SearchResources(¶ms) - if err != nil { - log.Err(err).Str("keyword", keyword).Msg("Unable to search resources") - return err - } - - if len(res) == 0 { - fmt.Println("No resources crawled (yet).") - } - - table := tablewriter.NewWriter(os.Stdout) - table.SetHeader([]string{"Time", "URL", "Title"}) - - for _, v := range res { - table.Append([]string{v.Time.Format(time.RFC822), shortenURL(v.URL), v.Title}) - } - table.Render() - - fmt.Printf("Total: %d\n", count) - - return nil -} - -func shortenURL(url string) string { - if len(url) > 125 { - url := url[0:125] - return url + "..." - } - - return url -} diff --git a/scripts/blacklist-hostnames.py b/scripts/blacklist-hostnames.py new file mode 100755 index 0000000..f6ddc3a --- /dev/null +++ b/scripts/blacklist-hostnames.py @@ -0,0 +1,56 @@ +import json +import sys +from typing import List + +import requests + +# This script is used to import list of hostnames to 'blacklist' +# it will pull hostnames from the CT log source (see url variable) & custom define ones +# and blacklist them to prevent useless crawling + +url = "https://raw.githubusercontent.com/alecmuffett/real-world-onion-sites/master/ct-log.txt" +custom_hostnames = [ + 'gamebombfak3pwnh.onion', # gaming forum, lot of noise + 'metagerv65pwclop2rsfzg4jwowpavpwd6grhhlvdgsswvo6ii4akgyd.onion' # search engine, lot of noise +] +config_api_uri = sys.argv[1] + + +def add_if_not_exist(a: List[dict], b: str): + found = False + for i in a: + if i['hostname'] == b: + found = True + + if not found: + a.append({'hostname': b}) + + +# Get up-to-date list of real-world / legit .onion +r = requests.get(url) +new_hostnames = [] +for hostname in r.text.splitlines(): + new_hostnames.append({'hostname': hostname}) +print("pulled {} real world hostnames from ct-log.txt".format(len(new_hostnames))) + +# Append custom hostnames ignore list +for custom_hostname in custom_hostnames: + add_if_not_exist(new_hostnames, custom_hostname) +print("added {} custom hostnames".format(len(custom_hostnames))) + +# Query existing blacklisted hostnames from ConfigAPI +r = requests.get(config_api_uri + "/config/forbidden-hostnames") +forbidden_hostnames = r.json() +print("there is {} forbidden hostnames defined in ConfigAPI".format(len(forbidden_hostnames))) + +# Merge the lists while preventing duplicates +for forbidden_hostname in forbidden_hostnames: + add_if_not_exist(new_hostnames, forbidden_hostname['hostname']) +print("there is {} forbidden hostnames now".format(len(forbidden_hostnames))) + +# Update ConfigAPI +headers = {'Content-Type': 'application/json', 'Accept': 'application/json'} +r = requests.put(config_api_uri + "/config/forbidden-hostnames", json.dumps(forbidden_hostnames), headers=headers) + +if r.ok: + print("successfully updated forbidden hostnames")