From 71f82d4aadcaf531fac74f59a53420e529c5b0be Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alo=C3=AFs=20Micard?= Date: Thu, 7 Jan 2021 10:33:25 +0100 Subject: [PATCH 01/11] process: Rework whole flags system - Turn the flag into Feature system to allow easier configuration. - Add prefetch flag to event feature --- deployments/docker/docker-compose.yml | 35 +++--- internal/blacklister/blacklister.go | 6 +- internal/blacklister/blacklister_test.go | 4 +- internal/configapi/configapi.go | 6 +- internal/configapi/configapi_test.go | 4 +- internal/crawler/crawler.go | 6 +- internal/crawler/crawler_test.go | 4 +- internal/event/subscriber.go | 4 +- internal/indexer/indexer.go | 6 +- internal/indexer/indexer_test.go | 4 +- internal/process/process.go | 129 ++++++++++++----------- internal/scheduler/scheduler.go | 6 +- internal/scheduler/scheduler_test.go | 4 +- internal/test/process.go | 14 +-- 14 files changed, 121 insertions(+), 111 deletions(-) diff --git a/deployments/docker/docker-compose.yml b/deployments/docker/docker-compose.yml index d9ebbde..10e20c9 100644 --- a/deployments/docker/docker-compose.yml +++ b/deployments/docker/docker-compose.yml @@ -32,29 +32,32 @@ services: image: creekorful/tdsh-crawler:latest command: > --log-level debug - --hub-uri amqp://guest:guest@rabbitmq:5672 - --tor-uri torproxy:9050 - --config-api-uri http://configapi:8080 + --event-srv amqp://guest:guest@rabbitmq:5672 + --tor-proxy torproxy:9050 + --config-api http://configapi:8080 restart: always depends_on: - rabbitmq - torproxy + - configapi scheduler: image: creekorful/tdsh-scheduler:latest command: > --log-level debug - --hub-uri amqp://guest:guest@rabbitmq:5672 - --config-api-uri http://configapi:8080 - --redis-uri redis:6379 + --event-srv amqp://guest:guest@rabbitmq:5672 + --config-api http://configapi:8080 + --redis redis:6379 restart: always depends_on: - rabbitmq + - configapi + - redis indexer-local: image: creekorful/tdsh-indexer:latest command: > --log-level debug - --hub-uri 
amqp://guest:guest@rabbitmq:5672 - --config-api-uri http://configapi:8080 + --event-srv amqp://guest:guest@rabbitmq:5672 + --config-api http://configapi:8080 --index-driver local --index-dest /archive restart: always @@ -67,8 +70,8 @@ services: image: creekorful/tdsh-indexer:latest command: > --log-level debug - --hub-uri amqp://guest:guest@rabbitmq:5672 - --config-api-uri http://configapi:8080 + --event-srv amqp://guest:guest@rabbitmq:5672 + --config-api http://configapi:8080 --index-driver elastic --index-dest http://elasticsearch:9200 restart: always @@ -80,8 +83,8 @@ services: image: creekorful/tdsh-configapi:latest command: > --log-level debug - --hub-uri amqp://guest:guest@rabbitmq:5672 - --redis-uri redis:6379 + --event-srv amqp://guest:guest@rabbitmq:5672 + --redis redis:6379 --default-value forbidden-hostnames="[]" --default-value allowed-mime-types="[{\"content-type\":\"text/\",\"extensions\":[\"html\",\"php\",\"aspx\", \"htm\"]}]" --default-value refresh-delay="{\"delay\": -1}" @@ -96,10 +99,10 @@ services: image: creekorful/tdsh-blacklister:latest command: > --log-level debug - --hub-uri amqp://guest:guest@rabbitmq:5672 - --config-api-uri http://configapi:8080 - --redis-uri redis:6379 - --tor-uri torproxy:9050 + --event-srv amqp://guest:guest@rabbitmq:5672 + --config-api http://configapi:8080 + --redis redis:6379 + --tor-proxy torproxy:9050 restart: always depends_on: - rabbitmq diff --git a/internal/blacklister/blacklister.go b/internal/blacklister/blacklister.go index bde6651..40be668 100644 --- a/internal/blacklister/blacklister.go +++ b/internal/blacklister/blacklister.go @@ -27,9 +27,9 @@ func (state *State) Name() string { return "blacklister" } -// CommonFlags return process common flags -func (state *State) CommonFlags() []string { - return []string{process.HubURIFlag, process.ConfigAPIURIFlag, process.RedisURIFlag, process.UserAgentFlag, process.TorURIFlag} +// Features return the process features +func (state *State) Features() 
[]process.Feature { + return []process.Feature{process.EventFeature, process.ConfigFeature, process.CacheFeature, process.CrawlingFeature} } // CustomFlags return process custom flags diff --git a/internal/blacklister/blacklister_test.go b/internal/blacklister/blacklister_test.go index fe3a78f..c8ef709 100644 --- a/internal/blacklister/blacklister_test.go +++ b/internal/blacklister/blacklister_test.go @@ -24,9 +24,9 @@ func TestState_Name(t *testing.T) { } } -func TestState_CommonFlags(t *testing.T) { +func TestState_Features(t *testing.T) { s := State{} - test.CheckProcessCommonFlags(t, &s, []string{process.HubURIFlag, process.ConfigAPIURIFlag, process.RedisURIFlag, process.UserAgentFlag, process.TorURIFlag}) + test.CheckProcessFeatures(t, &s, []process.Feature{process.EventFeature, process.ConfigFeature, process.CacheFeature, process.CrawlingFeature}) } func TestState_CustomFlags(t *testing.T) { diff --git a/internal/configapi/configapi.go b/internal/configapi/configapi.go index 7bdbee8..21cbbb8 100644 --- a/internal/configapi/configapi.go +++ b/internal/configapi/configapi.go @@ -24,9 +24,9 @@ func (state *State) Name() string { return "configapi" } -// CommonFlags return process common flags -func (state *State) CommonFlags() []string { - return []string{process.HubURIFlag, process.RedisURIFlag} +// Features return the process features +func (state *State) Features() []process.Feature { + return []process.Feature{process.EventFeature, process.CacheFeature} } // CustomFlags return process custom flags diff --git a/internal/configapi/configapi_test.go b/internal/configapi/configapi_test.go index 71d0405..923ea8a 100644 --- a/internal/configapi/configapi_test.go +++ b/internal/configapi/configapi_test.go @@ -24,9 +24,9 @@ func TestState_Name(t *testing.T) { } } -func TestState_CommonFlags(t *testing.T) { +func TestState_Features(t *testing.T) { s := State{} - test.CheckProcessCommonFlags(t, &s, []string{process.HubURIFlag, process.RedisURIFlag}) + 
test.CheckProcessFeatures(t, &s, []process.Feature{process.EventFeature, process.CacheFeature}) } func TestState_CustomFlags(t *testing.T) { diff --git a/internal/crawler/crawler.go b/internal/crawler/crawler.go index 161a52f..48a33f7 100644 --- a/internal/crawler/crawler.go +++ b/internal/crawler/crawler.go @@ -32,9 +32,9 @@ func (state *State) Name() string { return "crawler" } -// CommonFlags return process common flags -func (state *State) CommonFlags() []string { - return []string{process.HubURIFlag, process.ConfigAPIURIFlag, process.UserAgentFlag, process.TorURIFlag} +// Features return the process features +func (state *State) Features() []process.Feature { + return []process.Feature{process.EventFeature, process.ConfigFeature, process.CrawlingFeature} } // CustomFlags return process custom flags diff --git a/internal/crawler/crawler_test.go b/internal/crawler/crawler_test.go index eeb895f..6a35ea9 100644 --- a/internal/crawler/crawler_test.go +++ b/internal/crawler/crawler_test.go @@ -25,9 +25,9 @@ func TestState_Name(t *testing.T) { } } -func TestState_CommonFlags(t *testing.T) { +func TestState_Features(t *testing.T) { s := State{} - test.CheckProcessCommonFlags(t, &s, []string{process.HubURIFlag, process.ConfigAPIURIFlag, process.UserAgentFlag, process.TorURIFlag}) + test.CheckProcessFeatures(t, &s, []process.Feature{process.EventFeature, process.ConfigFeature, process.CrawlingFeature}) } func TestState_CustomFlags(t *testing.T) { diff --git a/internal/event/subscriber.go b/internal/event/subscriber.go index b559308..ae9b5b0 100644 --- a/internal/event/subscriber.go +++ b/internal/event/subscriber.go @@ -36,7 +36,7 @@ type subscriber struct { } // NewSubscriber create a new subscriber and connect it to given server -func NewSubscriber(amqpURI string) (Subscriber, error) { +func NewSubscriber(amqpURI string, prefetch int) (Subscriber, error) { conn, err := amqp.Dial(amqpURI) if err != nil { return nil, err @@ -46,7 +46,7 @@ func NewSubscriber(amqpURI 
string) (Subscriber, error) { if err != nil { return nil, err } - if err := c.Qos(1, 0, false); err != nil { + if err := c.Qos(prefetch, 0, false); err != nil { return nil, err } diff --git a/internal/indexer/indexer.go b/internal/indexer/indexer.go index 79abfc9..43caa28 100644 --- a/internal/indexer/indexer.go +++ b/internal/indexer/indexer.go @@ -26,9 +26,9 @@ func (state *State) Name() string { return "indexer" } -// CommonFlags return process common flags -func (state *State) CommonFlags() []string { - return []string{process.HubURIFlag, process.ConfigAPIURIFlag} +// Features return the process features +func (state *State) Features() []process.Feature { + return []process.Feature{process.EventFeature, process.ConfigFeature} } // CustomFlags return process custom flags diff --git a/internal/indexer/indexer_test.go b/internal/indexer/indexer_test.go index 023f89d..bebc5de 100644 --- a/internal/indexer/indexer_test.go +++ b/internal/indexer/indexer_test.go @@ -22,9 +22,9 @@ func TestState_Name(t *testing.T) { } } -func TestState_CommonFlags(t *testing.T) { +func TestState_Features(t *testing.T) { s := State{} - test.CheckProcessCommonFlags(t, &s, []string{process.HubURIFlag, process.ConfigAPIURIFlag}) + test.CheckProcessFeatures(t, &s, []process.Feature{process.EventFeature, process.ConfigFeature}) } func TestState_CustomFlags(t *testing.T) { diff --git a/internal/process/process.go b/internal/process/process.go index cbe3fc9..f92e751 100644 --- a/internal/process/process.go +++ b/internal/process/process.go @@ -23,22 +23,27 @@ import ( "time" ) +// Feature represent a process feature +type Feature int + const ( version = "0.10.0" - // APIURIFlag is the api-uri flag - APIURIFlag = "api-uri" - // APITokenFlag is the api-token flag - APITokenFlag = "api-token" - // HubURIFlag is the hub-uri flag - HubURIFlag = "hub-uri" - // ConfigAPIURIFlag is the config-api-uri flag - ConfigAPIURIFlag = "config-api-uri" - // RedisURIFlag is the redis-uri flag - RedisURIFlag = 
"redis-uri" - // TorURIFlag is the tor-uri flag - TorURIFlag = "tor-uri" - // UserAgentFlag is the user-agent flag - UserAgentFlag = "user-agent" + + // EventFeature is the feature to plug the process to the event server + EventFeature Feature = iota + // ConfigFeature is the feature to plug the process to the ConfigAPI framework + ConfigFeature + // CacheFeature is the feature to plug the process to the cache server + CacheFeature + // CrawlingFeature is the feature to plug the process with a tor-compatible HTTP client + CrawlingFeature + + eventURIFlag = "event-srv" + eventPrefetchFlag = "event-prefetch" + configAPIURIFlag = "config-api" + redisURIFlag = "redis" + torURIFlag = "tor-proxy" + userAgentFlag = "user-agent" ) // Provider is the implementation provider @@ -80,30 +85,30 @@ func (p *defaultProvider) ConfigClient(keys []string) (configapi.Client, error) return nil, err } - return configapi.NewConfigClient(p.ctx.String(ConfigAPIURIFlag), sub, keys) + return configapi.NewConfigClient(p.ctx.String(configAPIURIFlag), sub, keys) } func (p *defaultProvider) Subscriber() (event.Subscriber, error) { - return event.NewSubscriber(p.ctx.String(HubURIFlag)) + return event.NewSubscriber(p.ctx.String(eventURIFlag), p.ctx.Int(eventPrefetchFlag)) } func (p *defaultProvider) Publisher() (event.Publisher, error) { - return event.NewPublisher(p.ctx.String(HubURIFlag)) + return event.NewPublisher(p.ctx.String(eventURIFlag)) } func (p *defaultProvider) Cache(keyPrefix string) (cache.Cache, error) { - return cache.NewRedisCache(p.ctx.String(RedisURIFlag), keyPrefix) + return cache.NewRedisCache(p.ctx.String(redisURIFlag), keyPrefix) } func (p *defaultProvider) HTTPClient() (chttp.Client, error) { return chttp.NewFastHTTPClient(&fasthttp.Client{ // Use given TOR proxy to reach the hidden services - Dial: fasthttpproxy.FasthttpSocksDialer(p.ctx.String(TorURIFlag)), + Dial: fasthttpproxy.FasthttpSocksDialer(p.ctx.String(torURIFlag)), // Disable SSL verification since we do not 
really care about this TLSConfig: &tls.Config{InsecureSkipVerify: true}, ReadTimeout: time.Second * 5, WriteTimeout: time.Second * 5, - Name: p.ctx.String(UserAgentFlag), + Name: p.ctx.String(userAgentFlag), }), nil } @@ -125,7 +130,7 @@ type SubscriberDef struct { // Process is a component of Trandoshan type Process interface { Name() string - CommonFlags() []string + Features() []Feature CustomFlags() []cli.Flag Initialize(provider Provider) error Subscribers() []SubscriberDef @@ -148,11 +153,11 @@ func MakeApp(process Process) *cli.App { Action: execute(process), } - // Add common flags - flags := getCustomFlags() - for _, flag := range process.CommonFlags() { - if customFlag, contains := flags[flag]; contains { - app.Flags = append(app.Flags, customFlag) + // Add features flags + featureFlags := getFeaturesFlags() + for _, feature := range process.Features() { + if values, exist := featureFlags[feature]; exist { + app.Flags = append(app.Flags, values...) } } @@ -236,43 +241,49 @@ func execute(process Process) cli.ActionFunc { } } -func getCustomFlags() map[string]cli.Flag { - flags := map[string]cli.Flag{} +func getFeaturesFlags() map[Feature][]cli.Flag { + flags := map[Feature][]cli.Flag{} - flags[HubURIFlag] = &cli.StringFlag{ - Name: HubURIFlag, - Usage: "URI to the hub (event) server", - Required: true, - } - flags[APIURIFlag] = &cli.StringFlag{ - Name: APIURIFlag, - Usage: "URI to the API server", - Required: true, - } - flags[APITokenFlag] = &cli.StringFlag{ - Name: APITokenFlag, - Usage: "Token to use to authenticate against the API", - Required: true, - } - flags[ConfigAPIURIFlag] = &cli.StringFlag{ - Name: ConfigAPIURIFlag, - Usage: "URI to the ConfigAPI server", - Required: true, + flags[EventFeature] = []cli.Flag{ + &cli.StringFlag{ + Name: eventURIFlag, + Usage: "URI to the event server", + Required: true, + }, + &cli.IntFlag{ + Name: eventPrefetchFlag, + Usage: "Prefetch for the event subscriber", + Value: 1, + }, } - flags[RedisURIFlag] = 
&cli.StringFlag{ - Name: RedisURIFlag, - Usage: "URI to the Redis server", - Required: true, + + flags[ConfigFeature] = []cli.Flag{ + &cli.StringFlag{ + Name: configAPIURIFlag, + Usage: "URI to the ConfigAPI server", + Required: true, + }, } - flags[TorURIFlag] = &cli.StringFlag{ - Name: TorURIFlag, - Usage: "URI to the TOR SOCKS proxy", - Required: true, + + flags[CacheFeature] = []cli.Flag{ + &cli.StringFlag{ + Name: redisURIFlag, + Usage: "URI to the Redis server", + Required: true, + }, } - flags[UserAgentFlag] = &cli.StringFlag{ - Name: UserAgentFlag, - Usage: "User agent to use", - Value: "Mozilla/5.0 (Windows NT 10.0; rv:68.0) Gecko/20100101 Firefox/68.0", + + flags[CrawlingFeature] = []cli.Flag{ + &cli.StringFlag{ + Name: torURIFlag, + Usage: "URI to the TOR SOCKS proxy", + Required: true, + }, + &cli.StringFlag{ + Name: userAgentFlag, + Usage: "User agent to use", + Value: "Mozilla/5.0 (Windows NT 10.0; rv:68.0) Gecko/20100101 Firefox/68.0", + }, } return flags diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index 9372711..cfd3235 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -39,9 +39,9 @@ func (state *State) Name() string { return "scheduler" } -// CommonFlags return process common flags -func (state *State) CommonFlags() []string { - return []string{process.HubURIFlag, process.ConfigAPIURIFlag, process.RedisURIFlag} +// Features return the process features +func (state *State) Features() []process.Feature { + return []process.Feature{process.EventFeature, process.ConfigFeature, process.CacheFeature} } // CustomFlags return process custom flags diff --git a/internal/scheduler/scheduler_test.go b/internal/scheduler/scheduler_test.go index aca19bd..3cc08f9 100644 --- a/internal/scheduler/scheduler_test.go +++ b/internal/scheduler/scheduler_test.go @@ -23,9 +23,9 @@ func TestState_Name(t *testing.T) { } } -func TestState_CommonFlags(t *testing.T) { +func TestState_Features(t *testing.T) 
{ s := State{} - test.CheckProcessCommonFlags(t, &s, []string{process.HubURIFlag, process.ConfigAPIURIFlag, process.RedisURIFlag}) + test.CheckProcessFeatures(t, &s, []process.Feature{process.EventFeature, process.ConfigFeature, process.CacheFeature}) } func TestState_CustomFlags(t *testing.T) { diff --git a/internal/test/process.go b/internal/test/process.go index 93ff18f..e4309af 100644 --- a/internal/test/process.go +++ b/internal/test/process.go @@ -14,10 +14,10 @@ type SubscriberDef struct { Exchange string } -// CheckProcessCommonFlags check process defined common flags -func CheckProcessCommonFlags(t *testing.T, p process.Process, wantFlags []string) { - if !checkListEquals(p.CommonFlags(), wantFlags) { - t.Errorf("Differents flags: %v %v", p.CommonFlags(), wantFlags) +// CheckProcessFeatures check process defined features +func CheckProcessFeatures(t *testing.T, p process.Process, wantFeatures []process.Feature) { + if !reflect.DeepEqual(p.Features(), wantFeatures) { + t.Errorf("Differents flags: %v %v", p.Features(), wantFeatures) } } @@ -28,7 +28,7 @@ func CheckProcessCustomFlags(t *testing.T, p process.Process, wantFlags []string names = append(names, customFlag.Names()[0]) } - if !checkListEquals(names, wantFlags) { + if !reflect.DeepEqual(names, wantFlags) { t.Errorf("Differents flags: %v %v", names, wantFlags) } } @@ -62,7 +62,3 @@ func CheckProcessSubscribers(t *testing.T, p process.Process, subscribers []Subs } // TODO HTTPHandler - -func checkListEquals(a []string, b []string) bool { - return reflect.DeepEqual(a, b) -} From 9b46dc205e62acb7d719e1944fdc3211b0919e27 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alo=C3=AFs=20Micard?= Date: Thu, 7 Jan 2021 11:13:14 +0100 Subject: [PATCH 02/11] Indexer: support buffered indexing --- deployments/docker/docker-compose.yml | 1 + internal/configapi/configapi.go | 2 +- internal/configapi/configapi_test.go | 2 +- internal/indexer/index/elastic.go | 46 +++++--- internal/indexer/index/elastic_test.go | 23 ++-- 
internal/indexer/index/index.go | 14 ++- internal/indexer/index/local.go | 18 +++- internal/indexer/index/local_test.go | 49 ++++++++- internal/indexer/indexer.go | 50 ++++++++- internal/indexer/indexer_test.go | 139 ++++++++++++++++++++++++- internal/process/process.go | 36 ++++--- 11 files changed, 324 insertions(+), 56 deletions(-) diff --git a/deployments/docker/docker-compose.yml b/deployments/docker/docker-compose.yml index 10e20c9..97fcc77 100644 --- a/deployments/docker/docker-compose.yml +++ b/deployments/docker/docker-compose.yml @@ -71,6 +71,7 @@ services: command: > --log-level debug --event-srv amqp://guest:guest@rabbitmq:5672 + --event-prefetch 20 --config-api http://configapi:8080 --index-driver elastic --index-dest http://elasticsearch:9200 diff --git a/internal/configapi/configapi.go b/internal/configapi/configapi.go index 21cbbb8..b8affe1 100644 --- a/internal/configapi/configapi.go +++ b/internal/configapi/configapi.go @@ -54,7 +54,7 @@ func (state *State) Initialize(provider process.Provider) error { state.pub = pub defaultValues := map[string]string{} - for _, value := range provider.GetValues("default-value") { + for _, value := range provider.GetStrValues("default-value") { parts := strings.Split(value, "=") if len(parts) == 2 { diff --git a/internal/configapi/configapi_test.go b/internal/configapi/configapi_test.go index 923ea8a..e1a89ce 100644 --- a/internal/configapi/configapi_test.go +++ b/internal/configapi/configapi_test.go @@ -38,7 +38,7 @@ func TestState_Initialize(t *testing.T) { test.CheckInitialize(t, &State{}, func(p *process_mock.MockProviderMockRecorder) { p.Cache("configuration") p.Publisher() - p.GetValues("default-value") + p.GetStrValues("default-value") }) } diff --git a/internal/indexer/index/elastic.go b/internal/indexer/index/elastic.go index a920921..96a3275 100644 --- a/internal/indexer/index/elastic.go +++ b/internal/indexer/index/elastic.go @@ -9,8 +9,7 @@ import ( "time" ) -var resourcesIndex = "resources" - +const 
resourcesIndexName = "resources" const mapping = ` { "settings": { @@ -93,29 +92,48 @@ func newElasticIndex(uri string) (Index, error) { }, nil } -func (e *elasticSearchIndex) IndexResource(url string, time time.Time, body string, headers map[string]string) error { - res, err := extractResource(url, time, body, headers) +func (e *elasticSearchIndex) IndexResource(resource Resource) error { + res, err := indexResource(resource) if err != nil { return err } _, err = e.client.Index(). - Index(resourcesIndex). + Index(resourcesIndexName). BodyJson(res). Do(context.Background()) return err } +func (e *elasticSearchIndex) IndexResources(resources []Resource) error { + bulkRequest := e.client.Bulk() + + for _, resource := range resources { + resourceIndex, err := indexResource(resource) + if err != nil { + return err + } + + req := elastic.NewBulkIndexRequest(). + Index(resourcesIndexName). + Doc(resourceIndex) + bulkRequest.Add(req) + } + + _, err := bulkRequest.Do(context.Background()) + return err +} + func setupElasticSearch(ctx context.Context, es *elastic.Client) error { // Setup index if doesn't exist - exist, err := es.IndexExists(resourcesIndex).Do(ctx) + exist, err := es.IndexExists(resourcesIndexName).Do(ctx) if err != nil { return err } if !exist { - log.Debug().Str("index", resourcesIndex).Msg("Creating missing index") + log.Debug().Str("index", resourcesIndexName).Msg("Creating missing index") - q := es.CreateIndex(resourcesIndex).BodyString(mapping) + q := es.CreateIndex(resourcesIndexName).BodyString(mapping) if _, err := q.Do(ctx); err != nil { return err } @@ -124,8 +142,8 @@ func setupElasticSearch(ctx context.Context, es *elastic.Client) error { return nil } -func extractResource(url string, time time.Time, body string, headers map[string]string) (*resourceIdx, error) { - doc, err := goquery.NewDocumentFromReader(strings.NewReader(body)) +func indexResource(resource Resource) (*resourceIdx, error) { + doc, err := 
goquery.NewDocumentFromReader(strings.NewReader(resource.Body)) if err != nil { return nil, err } @@ -152,14 +170,14 @@ func extractResource(url string, time time.Time, body string, headers map[string // Lowercase headers lowerCasedHeaders := map[string]string{} - for key, value := range headers { + for key, value := range resource.Headers { lowerCasedHeaders[strings.ToLower(key)] = value } return &resourceIdx{ - URL: url, - Body: body, - Time: time, + URL: resource.URL, + Body: resource.Body, + Time: resource.Time, Title: title, Meta: meta, Description: meta["description"], diff --git a/internal/indexer/index/elastic_test.go b/internal/indexer/index/elastic_test.go index a4e145b..89affd1 100644 --- a/internal/indexer/index/elastic_test.go +++ b/internal/indexer/index/elastic_test.go @@ -6,7 +6,7 @@ import ( "time" ) -func TestExtractResource(t *testing.T) { +func TestIndexResource(t *testing.T) { body := ` Creekorful Inc @@ -23,34 +23,39 @@ This is sparta Body: body, } - resDto, err := extractResource("https://example.org/300", time.Time{}, body, map[string]string{"Content-Type": "application/json"}) + resIdx, err := indexResource(Resource{ + URL: "https://example.org/300", + Time: time.Time{}, + Body: body, + Headers: map[string]string{"Content-Type": "application/json"}, + }) if err != nil { t.FailNow() } - if resDto.URL != "https://example.org/300" { + if resIdx.URL != "https://example.org/300" { t.Fail() } - if resDto.Title != "Creekorful Inc" { + if resIdx.Title != "Creekorful Inc" { t.Fail() } - if resDto.Body != msg.Body { + if resIdx.Body != msg.Body { t.Fail() } - if resDto.Description != "Zhello world" { + if resIdx.Description != "Zhello world" { t.Fail() } - if resDto.Meta["description"] != "Zhello world" { + if resIdx.Meta["description"] != "Zhello world" { t.Fail() } - if resDto.Meta["og:url"] != "https://example.org" { + if resIdx.Meta["og:url"] != "https://example.org" { t.Fail() } - if resDto.Headers["content-type"] != "application/json" { + if 
resIdx.Headers["content-type"] != "application/json" { t.Fail() } } diff --git a/internal/indexer/index/index.go b/internal/indexer/index/index.go index a3da953..42d2825 100644 --- a/internal/indexer/index/index.go +++ b/internal/indexer/index/index.go @@ -14,10 +14,18 @@ const ( Local = "local" ) -// Index is the interface used to abstract communication -// with the persistence unit +// Resource represent a resource that should be indexed +type Resource struct { + URL string + Time time.Time + Body string + Headers map[string]string +} + +// Index is the interface used to abstract communication with the persistence unit type Index interface { - IndexResource(url string, time time.Time, body string, headers map[string]string) error + IndexResource(resource Resource) error + IndexResources(resources []Resource) error } // NewIndex create a new index using given driver, destination diff --git a/internal/indexer/index/local.go b/internal/indexer/index/local.go index 44f14f9..9b84f92 100644 --- a/internal/indexer/index/local.go +++ b/internal/indexer/index/local.go @@ -20,13 +20,13 @@ func newLocalIndex(root string) (Index, error) { return &localIndex{baseDir: root}, nil } -func (s *localIndex) IndexResource(url string, time time.Time, body string, headers map[string]string) error { - path, err := formatPath(url, time) +func (s *localIndex) IndexResource(resource Resource) error { + path, err := formatPath(resource.URL, resource.Time) if err != nil { return err } - content, err := formatResource(url, body, headers) + content, err := formatResource(resource.URL, resource.Body, resource.Headers) if err != nil { return err } @@ -45,6 +45,18 @@ func (s *localIndex) IndexResource(url string, time time.Time, body string, head return nil } +func (s *localIndex) IndexResources(resources []Resource) error { + // No specific implementation for the local driver. 
+ // we simply call IndexResource n-times + for _, resource := range resources { + if err := s.IndexResource(resource); err != nil { + return err + } + } + + return nil +} + func formatResource(url string, body string, headers map[string]string) ([]byte, error) { builder := strings.Builder{} diff --git a/internal/indexer/index/local_test.go b/internal/indexer/index/local_test.go index e3763b6..00bca29 100644 --- a/internal/indexer/index/local_test.go +++ b/internal/indexer/index/local_test.go @@ -57,8 +57,55 @@ func TestLocalIndex_IndexResource(t *testing.T) { s := localIndex{baseDir: d} ti := time.Date(2020, time.October, 29, 12, 4, 9, 0, time.UTC) + if err := s.IndexResource(Resource{ + URL: "https://google.com", + Time: ti, + Body: "Hello, world", + Headers: map[string]string{"Server": "Traefik"}, + }); err != nil { + t.Fail() + } + + p := filepath.Join(d, "https", "google.com", "1603973049") + + inf, err := os.Stat(p) + if err != nil { + t.Fail() + } + if inf.Mode() != 0640 { + t.Fail() + } + + b, err := ioutil.ReadFile(p) + if err != nil { + t.Fail() + } + if string(b) != "https://google.com\n\nServer: Traefik\n\nHello, world" { + t.Fail() + } +} + +func TestLocalIndex_IndexResources(t *testing.T) { + d, err := ioutil.TempDir("", "") + if err != nil { + t.FailNow() + } + defer os.RemoveAll(d) + + s := localIndex{baseDir: d} + + ti := time.Date(2020, time.October, 29, 12, 4, 9, 0, time.UTC) + + resources := []Resource{ + { + URL: "https://google.com", + Time: ti, + Body: "Hello, world", + Headers: map[string]string{"Server": "Traefik"}, + }, + } - if err := s.IndexResource("https://google.com", ti, "Hello, world", map[string]string{"Server": "Traefik"}); err != nil { + if err := s.IndexResources(resources); err != nil { t.Fail() } diff --git a/internal/indexer/indexer.go b/internal/indexer/indexer.go index 43caa28..a550638 100644 --- a/internal/indexer/indexer.go +++ b/internal/indexer/indexer.go @@ -19,6 +19,9 @@ type State struct { index index.Index 
indexDriver string configClient configapi.Client + + bufferThreshold int + resources []index.Resource } // Name return the process name @@ -49,13 +52,14 @@ func (state *State) CustomFlags() []cli.Flag { // Initialize the process func (state *State) Initialize(provider process.Provider) error { - indexDriver := provider.GetValue("index-driver") - idx, err := index.NewIndex(indexDriver, provider.GetValue("index-dest")) + indexDriver := provider.GetStrValue("index-driver") + idx, err := index.NewIndex(indexDriver, provider.GetStrValue("index-dest")) if err != nil { return err } state.index = idx state.indexDriver = indexDriver + state.bufferThreshold = provider.GetIntValue(process.EventPrefetchFlag) configClient, err := provider.ConfigClient([]string{configapi.ForbiddenHostnamesKey}) if err != nil { @@ -89,11 +93,47 @@ func (state *State) handleNewResourceEvent(subscriber event.Subscriber, msg even return fmt.Errorf("%s %w", evt.URL, errHostnameNotAllowed) } - if err := state.index.IndexResource(evt.URL, evt.Time, evt.Body, evt.Headers); err != nil { - return fmt.Errorf("error while indexing resource: %s", err) + // Direct saving (no buffering) + if state.bufferThreshold == 1 { + if err := state.index.IndexResource(index.Resource{ + URL: evt.URL, + Time: evt.Time, + Body: evt.Body, + Headers: evt.Headers, + }); err != nil { + return fmt.Errorf("error while indexing resource: %s", err) + } + + log.Info(). + Str("url", evt.URL). + Msg("Successfully indexed resource") + + return nil } - log.Info().Str("url", evt.URL).Msg("Successfully indexed resource") + // Otherwise we are in buffered saving mode + state.resources = append(state.resources, index.Resource{ + URL: evt.URL, + Time: evt.Time, + Body: evt.Body, + Headers: evt.Headers, + }) + + log.Debug().Str("url", evt.URL).Msg("Successfully stored resource in buffer") + + if len(state.resources) >= state.bufferThreshold { + // Time to save! 
+ if err := state.index.IndexResources(state.resources); err != nil { + return fmt.Errorf("error while indexing resources: %s", err) + } + + log.Info(). + Int("count", len(state.resources)). + Msg("Successfully indexed buffered resources") + + // Clear cache + state.resources = []index.Resource{} + } return nil } diff --git a/internal/indexer/indexer_test.go b/internal/indexer/indexer_test.go index bebc5de..7931737 100644 --- a/internal/indexer/indexer_test.go +++ b/internal/indexer/indexer_test.go @@ -6,11 +6,13 @@ import ( "github.com/creekorful/trandoshan/internal/configapi/client_mock" "github.com/creekorful/trandoshan/internal/event" "github.com/creekorful/trandoshan/internal/event_mock" + "github.com/creekorful/trandoshan/internal/indexer/index" "github.com/creekorful/trandoshan/internal/indexer/index_mock" "github.com/creekorful/trandoshan/internal/process" "github.com/creekorful/trandoshan/internal/process_mock" "github.com/creekorful/trandoshan/internal/test" "github.com/golang/mock/gomock" + "reflect" "testing" "time" ) @@ -35,14 +37,18 @@ func TestState_CustomFlags(t *testing.T) { func TestState_Initialize(t *testing.T) { s := State{} test.CheckInitialize(t, &s, func(p *process_mock.MockProviderMockRecorder) { - p.GetValue("index-driver").Return("local") - p.GetValue("index-dest") + p.GetStrValue("index-driver").Return("local") + p.GetStrValue("index-dest") + p.GetIntValue(process.EventPrefetchFlag).Return(10) p.ConfigClient([]string{client.ForbiddenHostnamesKey}) }) if s.indexDriver != "local" { t.Errorf("wrong driver: got: %s want: %s", s.indexDriver, "local") } + if s.bufferThreshold != 10 { + t.Errorf("wrong buffer threshold: got: %d want: %d", s.bufferThreshold, 10) + } } func TestState_Subscribers(t *testing.T) { @@ -52,7 +58,53 @@ func TestState_Subscribers(t *testing.T) { }) } -func TestHandleNewResourceEvent(t *testing.T) { +func TestHandleNewResourceEvent_NoBuffering(t *testing.T) { + body := ` +Creekorful Inc + +This is sparta (hosted on 
https://example.org) + + + +Thanks to https://help.facebook.onion/ for the hosting :D + + +` + + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + subscriberMock := event_mock.NewMockSubscriber(mockCtrl) + configClientMock := client_mock.NewMockClient(mockCtrl) + indexMock := index_mock.NewMockIndex(mockCtrl) + + tn := time.Now() + + msg := event.RawMessage{} + subscriberMock.EXPECT(). + Read(&msg, &event.NewResourceEvent{}). + SetArg(1, event.NewResourceEvent{ + URL: "https://example.onion", + Body: body, + Headers: map[string]string{"Server": "Traefik", "Content-Type": "application/html"}, + Time: tn, + }).Return(nil) + + configClientMock.EXPECT().GetForbiddenHostnames().Return([]client.ForbiddenHostname{{Hostname: "example2.onion"}}, nil) + indexMock.EXPECT().IndexResource(index.Resource{ + URL: "https://example.onion", + Time: tn, + Body: body, + Headers: map[string]string{"Server": "Traefik", "Content-Type": "application/html"}, + }) + + s := State{index: indexMock, configClient: configClientMock, bufferThreshold: 1} + if err := s.handleNewResourceEvent(subscriberMock, msg); err != nil { + t.FailNow() + } +} + +func TestHandleNewResourceEvent_Buffering_NoDispatch(t *testing.T) { body := ` Creekorful Inc @@ -85,12 +137,89 @@ Thanks to https://help.facebook.onion/ for the hosting :D }).Return(nil) configClientMock.EXPECT().GetForbiddenHostnames().Return([]client.ForbiddenHostname{{Hostname: "example2.onion"}}, nil) - indexMock.EXPECT().IndexResource("https://example.onion", tn, body, map[string]string{"Server": "Traefik", "Content-Type": "application/html"}) - s := State{index: indexMock, configClient: configClientMock} + s := State{index: indexMock, configClient: configClientMock, bufferThreshold: 5} if err := s.handleNewResourceEvent(subscriberMock, msg); err != nil { t.FailNow() } + + if len(s.resources) != 1 { + t.FailNow() + } + + if s.resources[0].URL != "https://example.onion" { + t.Fail() + } + if s.resources[0].Body != body { + t.Fail() 
+ } + if !reflect.DeepEqual(s.resources[0].Headers, map[string]string{"Server": "Traefik", "Content-Type": "application/html"}) { + t.Fail() + } + if s.resources[0].Time != tn { + t.Fail() + } +} + +func TestHandleNewResourceEvent_Buffering_Dispatch(t *testing.T) { + body := ` +Creekorful Inc + +This is sparta (hosted on https://example.org) + + + +Thanks to https://help.facebook.onion/ for the hosting :D + + +` + + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + subscriberMock := event_mock.NewMockSubscriber(mockCtrl) + configClientMock := client_mock.NewMockClient(mockCtrl) + indexMock := index_mock.NewMockIndex(mockCtrl) + + tn := time.Now() + + msg := event.RawMessage{} + subscriberMock.EXPECT(). + Read(&msg, &event.NewResourceEvent{}). + SetArg(1, event.NewResourceEvent{ + URL: "https://example.onion", + Body: body, + Headers: map[string]string{"Server": "Traefik", "Content-Type": "application/html"}, + Time: tn, + }).Return(nil) + + configClientMock.EXPECT().GetForbiddenHostnames().Return([]client.ForbiddenHostname{{Hostname: "example2.onion"}}, nil) + indexMock.EXPECT().IndexResources([]index.Resource{ + { + URL: "https://google.onion", + }, + { + URL: "https://example.onion", + Time: tn, + Body: body, + Headers: map[string]string{"Server": "Traefik", "Content-Type": "application/html"}, + }, + }) + + s := State{ + index: indexMock, + configClient: configClientMock, + bufferThreshold: 2, + resources: []index.Resource{{URL: "https://google.onion"}}, + } + if err := s.handleNewResourceEvent(subscriberMock, msg); err != nil { + t.FailNow() + } + + // should be reset + if len(s.resources) != 0 { + t.Fail() + } } func TestHandleMessageForbiddenHostname(t *testing.T) { diff --git a/internal/process/process.go b/internal/process/process.go index f92e751..0047d3f 100644 --- a/internal/process/process.go +++ b/internal/process/process.go @@ -38,12 +38,14 @@ const ( // CrawlingFeature is the feature to plug the process with a tor-compatible HTTP 
client CrawlingFeature - eventURIFlag = "event-srv" - eventPrefetchFlag = "event-prefetch" - configAPIURIFlag = "config-api" - redisURIFlag = "redis" - torURIFlag = "tor-proxy" - userAgentFlag = "user-agent" + // EventPrefetchFlag is the prefetch count for the event subscriber + EventPrefetchFlag = "event-prefetch" + + eventURIFlag = "event-srv" + configAPIURIFlag = "config-api" + redisURIFlag = "redis" + torURIFlag = "tor-proxy" + userAgentFlag = "user-agent" ) // Provider is the implementation provider @@ -60,10 +62,12 @@ type Provider interface { Cache(keyPrefix string) (cache.Cache, error) // HTTPClient return a new configured http client HTTPClient() (chttp.Client, error) - // GetValue return value for given key - GetValue(key string) string - // GetValue return values for given key - GetValues(key string) []string + // GetStrValue return string value for given key + GetStrValue(key string) string + // GetStrValues return string slice for given key + GetStrValues(key string) []string + // GetIntValue return int value for given key + GetIntValue(key string) int } type defaultProvider struct { @@ -89,7 +93,7 @@ func (p *defaultProvider) ConfigClient(keys []string) (configapi.Client, error) } func (p *defaultProvider) Subscriber() (event.Subscriber, error) { - return event.NewSubscriber(p.ctx.String(eventURIFlag), p.ctx.Int(eventPrefetchFlag)) + return event.NewSubscriber(p.ctx.String(eventURIFlag), p.ctx.Int(EventPrefetchFlag)) } func (p *defaultProvider) Publisher() (event.Publisher, error) { @@ -112,14 +116,18 @@ func (p *defaultProvider) HTTPClient() (chttp.Client, error) { }), nil } -func (p *defaultProvider) GetValue(key string) string { +func (p *defaultProvider) GetStrValue(key string) string { return p.ctx.String(key) } -func (p *defaultProvider) GetValues(key string) []string { +func (p *defaultProvider) GetStrValues(key string) []string { return p.ctx.StringSlice(key) } +func (p *defaultProvider) GetIntValue(key string) int { + return p.ctx.Int(key) +} 
+ // SubscriberDef is the subscriber definition type SubscriberDef struct { Exchange string @@ -251,7 +259,7 @@ func getFeaturesFlags() map[Feature][]cli.Flag { Required: true, }, &cli.IntFlag{ - Name: eventPrefetchFlag, + Name: EventPrefetchFlag, Usage: "Prefetch for the event subscriber", Value: 1, }, From 7820820fa9416fb9a3927bbbb616295de1e6b22c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alo=C3=AFs=20Micard?= Date: Thu, 7 Jan 2021 15:20:38 +0100 Subject: [PATCH 03/11] scheduler: add batch support for dialing with cache --- internal/blacklister/blacklister.go | 2 +- internal/blacklister/blacklister_test.go | 2 +- internal/cache/cache.go | 6 +- internal/cache/redis.go | 55 ++++++++++++++-- internal/configapi/configapi.go | 7 +- internal/scheduler/scheduler.go | 47 +++++++------ internal/scheduler/scheduler_test.go | 84 +++++++++++++++++++----- 7 files changed, 154 insertions(+), 49 deletions(-) diff --git a/internal/blacklister/blacklister.go b/internal/blacklister/blacklister.go index bde6651..da17166 100644 --- a/internal/blacklister/blacklister.go +++ b/internal/blacklister/blacklister.go @@ -119,7 +119,7 @@ func (state *State) handleTimeoutURLEvent(subscriber event.Subscriber, msg event cacheKey := u.Hostname() count, err := state.hostnameCache.GetInt64(cacheKey) - if err != nil && err != cache.ErrNIL { + if err != nil { return err } count++ diff --git a/internal/blacklister/blacklister_test.go b/internal/blacklister/blacklister_test.go index fe3a78f..b023566 100644 --- a/internal/blacklister/blacklister_test.go +++ b/internal/blacklister/blacklister_test.go @@ -96,7 +96,7 @@ func TestHandleTimeoutURLEventNoDispatch(t *testing.T) { configClientMock.EXPECT().GetForbiddenHostnames().Return([]configapi.ForbiddenHostname{}, nil) configClientMock.EXPECT().GetBlackListThreshold().Return(configapi.BlackListThreshold{Threshold: 10}, nil) - hostnameCacheMock.EXPECT().GetInt64("down-example.onion").Return(int64(0), cache.ErrNIL) + 
hostnameCacheMock.EXPECT().GetInt64("down-example.onion").Return(int64(0), nil) hostnameCacheMock.EXPECT().SetInt64("down-example.onion", int64(1), cache.NoTTL).Return(nil) s := State{configClient: configClientMock, hostnameCache: hostnameCacheMock, httpClient: httpClientMock} diff --git a/internal/cache/cache.go b/internal/cache/cache.go index d2a975d..a2e8285 100644 --- a/internal/cache/cache.go +++ b/internal/cache/cache.go @@ -3,15 +3,12 @@ package cache //go:generate mockgen -destination=../cache_mock/cache_mock.go -package=cache_mock . Cache import ( - "errors" "time" ) var ( // NoTTL define an entry that lives forever NoTTL = time.Duration(0) - // ErrNIL is returned when there's no value for given key - ErrNIL = errors.New("value is nil") ) // Cache represent a KV database @@ -21,4 +18,7 @@ type Cache interface { GetInt64(key string) (int64, error) SetInt64(key string, value int64, TTL time.Duration) error + + GetManyInt64(keys []string) (map[string]int64, error) + SetManyInt64(values map[string]int64, TTL time.Duration) error } diff --git a/internal/cache/redis.go b/internal/cache/redis.go index a1252e5..41d58cb 100644 --- a/internal/cache/redis.go +++ b/internal/cache/redis.go @@ -25,11 +25,11 @@ func NewRedisCache(URI string, keyPrefix string) (Cache, error) { func (rc *redisCache) GetBytes(key string) ([]byte, error) { val, err := rc.client.Get(context.Background(), rc.getKey(key)).Bytes() - if err == redis.Nil { - err = ErrNIL + if err != nil && err != redis.Nil { + return nil, err } - return val, err + return val, nil } func (rc *redisCache) SetBytes(key string, value []byte, TTL time.Duration) error { @@ -38,17 +38,60 @@ func (rc *redisCache) SetBytes(key string, value []byte, TTL time.Duration) erro func (rc *redisCache) GetInt64(key string) (int64, error) { val, err := rc.client.Get(context.Background(), rc.getKey(key)).Int64() - if err == redis.Nil { - err = ErrNIL + if err != nil && err != redis.Nil { + return 0, err } - return val, err + return 
val, nil } func (rc *redisCache) SetInt64(key string, value int64, TTL time.Duration) error { return rc.client.Set(context.Background(), rc.getKey(key), value, TTL).Err() } +func (rc *redisCache) GetManyInt64(keys []string) (map[string]int64, error) { + pipeline := rc.client.Pipeline() + + // Queue commands on the pipeline and keep a pointer to each of them + commands := map[string]*redis.StringCmd{} + for _, key := range keys { + commands[key] = pipeline.Get(context.Background(), rc.getKey(key)) + } + + // Execute pipeline + if _, err := pipeline.Exec(context.Background()); err != nil && err != redis.Nil { + return nil, err + } + + // Get back values + values := map[string]int64{} + for _, key := range keys { + val, err := commands[key].Int64() + if err != nil { + // If it's a real error + if err != redis.Nil { + return nil, err + } + } else { + // Only returns entry if there's one + values[key] = val + } + } + + return values, nil +} + +func (rc *redisCache) SetManyInt64(values map[string]int64, TTL time.Duration) error { + pipeline := rc.client.TxPipeline() + + for key, value := range values { + pipeline.Set(context.Background(), rc.getKey(key), value, TTL) + } + + _, err := pipeline.Exec(context.Background()) + return err +} + func (rc *redisCache) getKey(key string) string { if rc.keyPrefix == "" { return key diff --git a/internal/configapi/configapi.go b/internal/configapi/configapi.go index 7bdbee8..0ca7446 100644 --- a/internal/configapi/configapi.go +++ b/internal/configapi/configapi.go @@ -135,7 +135,12 @@ func (state *State) setConfiguration(w http.ResponseWriter, r *http.Request) { func setDefaultValues(configCache cache.Cache, values map[string]string) error { for key, value := range values { - if _, err := configCache.GetBytes(key); err == cache.ErrNIL { + b, err := configCache.GetBytes(key) + if err != nil { + return err + } + + if b == nil { if err := configCache.SetBytes(key, []byte(value), cache.NoTTL); err != nil { return fmt.Errorf("error while setting default value of 
%s: %s", key, err) } diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index 9372711..b074bad 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -92,16 +92,38 @@ func (state *State) handleNewResourceEvent(subscriber event.Subscriber, msg even return fmt.Errorf("error while extracting URLs") } + // Load values in batch + urlCache, err := state.urlCache.GetManyInt64(urls) + if err != nil { + return err + } + for _, u := range urls { - if err := state.processURL(u, subscriber); err != nil { + if err := state.processURL(u, subscriber, urlCache); err != nil { log.Err(err).Msg("error while processing URL") } } + // Update URL cache + delay, err := state.configClient.GetRefreshDelay() + if err != nil { + return err + } + + ttl := delay.Delay + if ttl == -1 { + ttl = cache.NoTTL + } + + // Update values in batch + if err := state.urlCache.SetManyInt64(urlCache, ttl); err != nil { + return err + } + return nil } -func (state *State) processURL(rawURL string, pub event.Publisher) error { +func (state *State) processURL(rawURL string, pub event.Publisher, urlCache map[string]int64) error { u, err := url.Parse(rawURL) if err != nil { return fmt.Errorf("error while parsing URL: %s", err) @@ -157,30 +179,13 @@ func (state *State) processURL(rawURL string, pub event.Publisher) error { } // Check if URL should be scheduled - count, err := state.urlCache.GetInt64(rawURL) - if err != nil && err != cache.ErrNIL { - return err - } - if count > 0 { + if urlCache[rawURL] > 0 { return fmt.Errorf("%s %w", u, errAlreadyScheduled) } log.Debug().Stringer("url", u).Msg("URL should be scheduled") - // Update URL cache - delay, err := state.configClient.GetRefreshDelay() - if err != nil { - return err - } - - ttl := delay.Delay - if ttl == -1 { - ttl = cache.NoTTL - } - - if err := state.urlCache.SetInt64(rawURL, count+1, ttl); err != nil { - return err - } + urlCache[rawURL]++ if err := pub.PublishEvent(&event.NewURLEvent{URL: 
rawURL}); err != nil { return fmt.Errorf("error while publishing URL: %s", err) diff --git a/internal/scheduler/scheduler_test.go b/internal/scheduler/scheduler_test.go index aca19bd..c8009d7 100644 --- a/internal/scheduler/scheduler_test.go +++ b/internal/scheduler/scheduler_test.go @@ -13,7 +13,6 @@ import ( "github.com/creekorful/trandoshan/internal/test" "github.com/golang/mock/gomock" "testing" - "time" ) func TestState_Name(t *testing.T) { @@ -66,7 +65,7 @@ func TestProcessURL_NotDotOnion(t *testing.T) { for _, url := range urls { state := State{} - if err := state.processURL(url, nil); !errors.Is(err, errNotOnionHostname) { + if err := state.processURL(url, nil, nil); !errors.Is(err, errNotOnionHostname) { t.Fail() } } @@ -80,7 +79,7 @@ func TestProcessURL_ProtocolForbidden(t *testing.T) { for _, url := range urls { state := State{} - if err := state.processURL(url, nil); !errors.Is(err, errProtocolNotAllowed) { + if err := state.processURL(url, nil, nil); !errors.Is(err, errProtocolNotAllowed) { t.Fail() } } @@ -98,7 +97,7 @@ func TestProcessURL_ExtensionForbidden(t *testing.T) { configClientMock.EXPECT().GetAllowedMimeTypes().Return([]client.MimeType{{Extensions: []string{"html", "php"}}}, nil) state := State{configClient: configClientMock} - if err := state.processURL(url, nil); !errors.Is(err, errExtensionNotAllowed) { + if err := state.processURL(url, nil, nil); !errors.Is(err, errExtensionNotAllowed) { t.Fail() } } @@ -139,7 +138,7 @@ func TestProcessURL_HostnameForbidden(t *testing.T) { configClientMock.EXPECT().GetForbiddenHostnames().Return(tst.forbiddenHostnames, nil) state := State{configClient: configClientMock} - if err := state.processURL(tst.url, nil); !errors.Is(err, errHostnameNotAllowed) { + if err := state.processURL(tst.url, nil, nil); !errors.Is(err, errHostnameNotAllowed) { t.Fail() } } @@ -149,42 +148,95 @@ func TestProcessURL_AlreadyScheduled(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() - urlCacheMock := 
cache_mock.NewMockCache(mockCtrl) configClientMock := client_mock.NewMockClient(mockCtrl) - urlCacheMock.EXPECT().GetInt64("https://facebookcorewwi.onion/test.php?id=12").Return(int64(1), nil) configClientMock.EXPECT().GetAllowedMimeTypes().Return([]client.MimeType{{Extensions: []string{"html", "php"}}}, nil) configClientMock.EXPECT().GetForbiddenHostnames().Return([]client.ForbiddenHostname{}, nil) - state := State{urlCache: urlCacheMock, configClient: configClientMock} - if err := state.processURL("https://facebookcorewwi.onion/test.php?id=12", nil); !errors.Is(err, errAlreadyScheduled) { + urlCache := map[string]int64{"https://facebookcorewwi.onion/test.php?id=12": 1} + state := State{configClient: configClientMock} + if err := state.processURL("https://facebookcorewwi.onion/test.php?id=12", nil, urlCache); !errors.Is(err, errAlreadyScheduled) { t.Fail() } } -func TestHandleNewResourceEvent(t *testing.T) { +func TestProcessURL(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() - urlCacheMock := cache_mock.NewMockCache(mockCtrl) configClientMock := client_mock.NewMockClient(mockCtrl) pubMock := event_mock.NewMockPublisher(mockCtrl) urls := []string{"https://example.onion/index.php", "http://google.onion/admin.secret/login.html", "https://example.onion", "https://www.facebookcorewwwi.onion/recover.now/initiate?ars=facebook_login"} + // pre fill cache + urlCache := map[string]int64{} for _, url := range urls { - urlCacheMock.EXPECT().GetInt64(url).Return(int64(0), cache.ErrNIL) configClientMock.EXPECT().GetAllowedMimeTypes().Return([]client.MimeType{{Extensions: []string{"html", "php"}}}, nil) configClientMock.EXPECT().GetForbiddenHostnames().Return([]client.ForbiddenHostname{}, nil) - configClientMock.EXPECT().GetRefreshDelay().Return(client.RefreshDelay{Delay: 10 * time.Hour}, nil) - urlCacheMock.EXPECT().SetInt64(url, int64(1), time.Duration(10*time.Hour)).Return(nil) pubMock.EXPECT().PublishEvent(&event.NewURLEvent{URL: 
url}).Return(nil) - state := State{urlCache: urlCacheMock, configClient: configClientMock} - if err := state.processURL(url, pubMock); err != nil { + state := State{configClient: configClientMock} + if err := state.processURL(url, pubMock, urlCache); err != nil { t.Fail() } + + if val, exist := urlCache[url]; !exist || val != 1 { + t.Fail() + } + } +} + +func TestHandleNewResourceEvent(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + subscriberMock := event_mock.NewMockSubscriber(mockCtrl) + urlCacheMock := cache_mock.NewMockCache(mockCtrl) + configClientMock := client_mock.NewMockClient(mockCtrl) + + msg := event.RawMessage{} + subscriberMock.EXPECT(). + Read(&msg, &event.NewResourceEvent{}). + SetArg(1, event.NewResourceEvent{ + URL: "https://l.facebookcorewwwi.onion/test.php", + Body: ` +This is a little test. +Check out https://google.onion. This is an image https://example.onion/test.png +This domain is blacklisted: https://m.fbi.onion/test.php +`, + }). + Return(nil) + + urlCacheMock.EXPECT(). + GetManyInt64([]string{"https://facebook.onion/test.php?id=1", "https://google.onion", "https://example.onion/test.png", "https://m.fbi.onion/test.php"}). + Return(map[string]int64{ + "https://google.onion": 1, + }, nil) + + configClientMock.EXPECT().GetAllowedMimeTypes(). + Times(4). + Return([]client.MimeType{{Extensions: []string{"php"}}}, nil) + configClientMock.EXPECT().GetForbiddenHostnames(). + Times(3). 
+ Return([]client.ForbiddenHostname{ + {Hostname: "fbi.onion"}, + }, nil) + configClientMock.EXPECT().GetRefreshDelay().Return(client.RefreshDelay{Delay: -1}, nil) + + subscriberMock.EXPECT().PublishEvent(&event.NewURLEvent{ + URL: "https://facebook.onion/test.php?id=1", + }) + + urlCacheMock.EXPECT().SetManyInt64(map[string]int64{ + "https://google.onion": 1, + "https://facebook.onion/test.php?id=1": 1, + }, cache.NoTTL).Return(nil) + + s := State{urlCache: urlCacheMock, configClient: configClientMock} + if err := s.handleNewResourceEvent(subscriberMock, msg); err != nil { + t.Fail() } } From afed403e6a5f8e426c48ea382f690cf50d1a3813 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alo=C3=AFs=20Micard?= Date: Thu, 7 Jan 2021 18:56:43 +0100 Subject: [PATCH 04/11] Remove useless regex --- internal/scheduler/scheduler.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index 16315b8..9c99ff1 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -14,7 +14,6 @@ import ( "mvdan.cc/xurls/v2" "net/http" "net/url" - "regexp" "strings" ) @@ -24,8 +23,6 @@ var ( errExtensionNotAllowed = errors.New("extension is not allowed") errHostnameNotAllowed = errors.New("hostname is not allowed") errAlreadyScheduled = errors.New("URL is already scheduled") - - extensionRegex = regexp.MustCompile("\\.[\\w]+") ) // State represent the application state @@ -161,7 +158,7 @@ func (state *State) processURL(rawURL string, pub event.Publisher, urlCache map[ } // generally no extension means text/* content-type - if !extensionRegex.MatchString(components[lastIdx]) { + if !strings.Contains(components[lastIdx], ".") { allowed = true } } From 84a28c5be0262a64cc25bf36611d5eefdb430023 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alo=C3=AFs=20Micard?= Date: Thu, 7 Jan 2021 18:59:45 +0100 Subject: [PATCH 05/11] scheduler: increase event prefetch --- deployments/docker/docker-compose.yml | 1 + 1 
file changed, 1 insertion(+) diff --git a/deployments/docker/docker-compose.yml b/deployments/docker/docker-compose.yml index 97fcc77..88d9029 100644 --- a/deployments/docker/docker-compose.yml +++ b/deployments/docker/docker-compose.yml @@ -45,6 +45,7 @@ services: command: > --log-level debug --event-srv amqp://guest:guest@rabbitmq:5672 + --event-prefetch 20 --config-api http://configapi:8080 --redis redis:6379 restart: always From faee8b48c14775b4d8e2a2a863bc87c1297ed6fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alo=C3=AFs=20Micard?= Date: Thu, 7 Jan 2021 20:58:33 +0100 Subject: [PATCH 06/11] indexer: sort headers to have deterministic output --- internal/indexer/index/local.go | 12 ++++++++++-- internal/indexer/index/local_test.go | 2 +- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/internal/indexer/index/local.go b/internal/indexer/index/local.go index 9b84f92..8d4c8af 100644 --- a/internal/indexer/index/local.go +++ b/internal/indexer/index/local.go @@ -7,6 +7,7 @@ import ( "net/url" "os" "path/filepath" + "sort" "strconv" "strings" "time" @@ -63,9 +64,16 @@ func formatResource(url string, body string, headers map[string]string) ([]byte, // First URL builder.WriteString(fmt.Sprintf("%s\n\n", url)) + // Sort headers to have deterministic output + var headerNames []string + for headerName := range headers { + headerNames = append(headerNames, headerName) + } + sort.Strings(headerNames) + // Then headers - for key, value := range headers { - builder.WriteString(fmt.Sprintf("%s: %s\n", key, value)) + for _, name := range headerNames { + builder.WriteString(fmt.Sprintf("%s: %s\n", name, headers[name])) } builder.WriteString("\n") diff --git a/internal/indexer/index/local_test.go b/internal/indexer/index/local_test.go index 00bca29..b349e8e 100644 --- a/internal/indexer/index/local_test.go +++ b/internal/indexer/index/local_test.go @@ -134,7 +134,7 @@ func TestFormatResource(t *testing.T) { t.FailNow() } - if string(res) != 
"https://google.com\n\nServer: Traefik\nContent-Type: text/html\n\nHello, world" { + if string(res) != "https://google.com\n\nContent-Type: text/html\nServer: Traefik\n\nHello, world" { t.Errorf("got %s want %s", string(res), "https://google.com\n\nServer: Traefik\nContent-Type: text/html\n\nHello, world") } } From e07ed8156e39ebae647b372fef1ea157a6332d50 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alo=C3=AFs=20Micard?= Date: Fri, 8 Jan 2021 13:00:39 +0100 Subject: [PATCH 07/11] scheduler: hash url before caching it --- internal/scheduler/scheduler.go | 27 ++++++++++++++++++++++++--- internal/scheduler/scheduler_test.go | 21 +++++++++++++++------ 2 files changed, 39 insertions(+), 9 deletions(-) diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index 9c99ff1..b81e64b 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -11,9 +11,11 @@ import ( "github.com/creekorful/trandoshan/internal/process" "github.com/rs/zerolog/log" "github.com/urfave/cli/v2" + "hash/fnv" "mvdan.cc/xurls/v2" "net/http" "net/url" + "strconv" "strings" ) @@ -89,8 +91,20 @@ func (state *State) handleNewResourceEvent(subscriber event.Subscriber, msg even return fmt.Errorf("error while extracting URLs") } + // We are working using URL hash to reduce memory consumption. 
+ // See: https://github.com/creekorful/trandoshan/issues/130 + var urlHashes []string + for _, u := range urls { + c := fnv.New64() + if _, err := c.Write([]byte(u)); err != nil { + return fmt.Errorf("error while computing url hash: %s", err) + } + + urlHashes = append(urlHashes, strconv.FormatUint(c.Sum64(), 10)) + } + // Load values in batch - urlCache, err := state.urlCache.GetManyInt64(urls) + urlCache, err := state.urlCache.GetManyInt64(urlHashes) if err != nil { return err } @@ -175,14 +189,21 @@ func (state *State) processURL(rawURL string, pub event.Publisher, urlCache map[ return fmt.Errorf("%s %w", u, errHostnameNotAllowed) } + // Compute url hash + c := fnv.New64() + if _, err := c.Write([]byte(rawURL)); err != nil { + return fmt.Errorf("error while computing url hash: %s", err) + } + urlHash := strconv.FormatUint(c.Sum64(), 10) + // Check if URL should be scheduled - if urlCache[rawURL] > 0 { + if urlCache[urlHash] > 0 { return fmt.Errorf("%s %w", u, errAlreadyScheduled) } log.Debug().Stringer("url", u).Msg("URL should be scheduled") - urlCache[rawURL]++ + urlCache[urlHash]++ if err := pub.PublishEvent(&event.NewURLEvent{URL: rawURL}); err != nil { return fmt.Errorf("error while publishing URL: %s", err) diff --git a/internal/scheduler/scheduler_test.go b/internal/scheduler/scheduler_test.go index dbbff27..7e7807b 100644 --- a/internal/scheduler/scheduler_test.go +++ b/internal/scheduler/scheduler_test.go @@ -12,6 +12,8 @@ import ( "github.com/creekorful/trandoshan/internal/process_mock" "github.com/creekorful/trandoshan/internal/test" "github.com/golang/mock/gomock" + "hash/fnv" + "strconv" "testing" ) @@ -153,7 +155,7 @@ func TestProcessURL_AlreadyScheduled(t *testing.T) { configClientMock.EXPECT().GetAllowedMimeTypes().Return([]client.MimeType{{Extensions: []string{"html", "php"}}}, nil) configClientMock.EXPECT().GetForbiddenHostnames().Return([]client.ForbiddenHostname{}, nil) - urlCache := 
map[string]int64{"https://facebookcorewwi.onion/test.php?id=12": 1} + urlCache := map[string]int64{"3056224523184958": 1} state := State{configClient: configClientMock} if err := state.processURL("https://facebookcorewwi.onion/test.php?id=12", nil, urlCache); !errors.Is(err, errAlreadyScheduled) { t.Fail() @@ -183,7 +185,14 @@ func TestProcessURL(t *testing.T) { t.Fail() } - if val, exist := urlCache[url]; !exist || val != 1 { + // Compute url hash + c := fnv.New64() + if _, err := c.Write([]byte(url)); err != nil { + t.Error(err) + } + urlHash := strconv.FormatUint(c.Sum64(), 10) + + if val, exist := urlCache[urlHash]; !exist || val != 1 { t.Fail() } } @@ -211,9 +220,9 @@ This domain is blacklisted: https://m.fbi.onion/test.php Return(nil) urlCacheMock.EXPECT(). - GetManyInt64([]string{"https://facebook.onion/test.php?id=1", "https://google.onion", "https://example.onion/test.png", "https://m.fbi.onion/test.php"}). + GetManyInt64([]string{"15038381360563270096", "17173291053643777680", "14332094874591870497", "5985629257333875968"}). Return(map[string]int64{ - "https://google.onion": 1, + "17173291053643777680": 1, }, nil) configClientMock.EXPECT().GetAllowedMimeTypes(). 
@@ -231,8 +240,8 @@ This domain is blacklisted: https://m.fbi.onion/test.php }) urlCacheMock.EXPECT().SetManyInt64(map[string]int64{ - "https://google.onion": 1, - "https://facebook.onion/test.php?id=1": 1, + "17173291053643777680": 1, + "15038381360563270096": 1, }, cache.NoTTL).Return(nil) s := State{urlCache: urlCacheMock, configClient: configClientMock} From 69352f7237750d8cd3f49aecb42e14a8a944eda7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alo=C3=AFs=20Micard?= Date: Thu, 7 Jan 2021 20:58:33 +0100 Subject: [PATCH 08/11] indexer: sort headers to have deterministic output --- internal/indexer/index/local.go | 12 ++++++++++-- internal/indexer/index/local_test.go | 2 +- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/internal/indexer/index/local.go b/internal/indexer/index/local.go index 9b84f92..8d4c8af 100644 --- a/internal/indexer/index/local.go +++ b/internal/indexer/index/local.go @@ -7,6 +7,7 @@ import ( "net/url" "os" "path/filepath" + "sort" "strconv" "strings" "time" @@ -63,9 +64,16 @@ func formatResource(url string, body string, headers map[string]string) ([]byte, // First URL builder.WriteString(fmt.Sprintf("%s\n\n", url)) + // Sort headers to have deterministic output + var headerNames []string + for headerName := range headers { + headerNames = append(headerNames, headerName) + } + sort.Strings(headerNames) + // Then headers - for key, value := range headers { - builder.WriteString(fmt.Sprintf("%s: %s\n", key, value)) + for _, name := range headerNames { + builder.WriteString(fmt.Sprintf("%s: %s\n", name, headers[name])) } builder.WriteString("\n") diff --git a/internal/indexer/index/local_test.go b/internal/indexer/index/local_test.go index 00bca29..b349e8e 100644 --- a/internal/indexer/index/local_test.go +++ b/internal/indexer/index/local_test.go @@ -134,7 +134,7 @@ func TestFormatResource(t *testing.T) { t.FailNow() } - if string(res) != "https://google.com\n\nServer: Traefik\nContent-Type: text/html\n\nHello, world" { + if 
string(res) != "https://google.com\n\nContent-Type: text/html\nServer: Traefik\n\nHello, world" { t.Errorf("got %s want %s", string(res), "https://google.com\n\nServer: Traefik\nContent-Type: text/html\n\nHello, world") } } From 871daf1bcc5641b919d1ef3c6be154270d1de760 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alo=C3=AFs=20Micard?= Date: Fri, 8 Jan 2021 13:00:39 +0100 Subject: [PATCH 09/11] scheduler: hash url before caching it --- internal/scheduler/scheduler.go | 27 ++++++++++++++++++++++++--- internal/scheduler/scheduler_test.go | 21 +++++++++++++++------ 2 files changed, 39 insertions(+), 9 deletions(-) diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index 9c99ff1..b81e64b 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -11,9 +11,11 @@ import ( "github.com/creekorful/trandoshan/internal/process" "github.com/rs/zerolog/log" "github.com/urfave/cli/v2" + "hash/fnv" "mvdan.cc/xurls/v2" "net/http" "net/url" + "strconv" "strings" ) @@ -89,8 +91,20 @@ func (state *State) handleNewResourceEvent(subscriber event.Subscriber, msg even return fmt.Errorf("error while extracting URLs") } + // We are working using URL hash to reduce memory consumption. 
+ // See: https://github.com/creekorful/trandoshan/issues/130 + var urlHashes []string + for _, u := range urls { + c := fnv.New64() + if _, err := c.Write([]byte(u)); err != nil { + return fmt.Errorf("error while computing url hash: %s", err) + } + + urlHashes = append(urlHashes, strconv.FormatUint(c.Sum64(), 10)) + } + // Load values in batch - urlCache, err := state.urlCache.GetManyInt64(urls) + urlCache, err := state.urlCache.GetManyInt64(urlHashes) if err != nil { return err } @@ -175,14 +189,21 @@ func (state *State) processURL(rawURL string, pub event.Publisher, urlCache map[ return fmt.Errorf("%s %w", u, errHostnameNotAllowed) } + // Compute url hash + c := fnv.New64() + if _, err := c.Write([]byte(rawURL)); err != nil { + return fmt.Errorf("error while computing url hash: %s", err) + } + urlHash := strconv.FormatUint(c.Sum64(), 10) + // Check if URL should be scheduled - if urlCache[rawURL] > 0 { + if urlCache[urlHash] > 0 { return fmt.Errorf("%s %w", u, errAlreadyScheduled) } log.Debug().Stringer("url", u).Msg("URL should be scheduled") - urlCache[rawURL]++ + urlCache[urlHash]++ if err := pub.PublishEvent(&event.NewURLEvent{URL: rawURL}); err != nil { return fmt.Errorf("error while publishing URL: %s", err) diff --git a/internal/scheduler/scheduler_test.go b/internal/scheduler/scheduler_test.go index dbbff27..7e7807b 100644 --- a/internal/scheduler/scheduler_test.go +++ b/internal/scheduler/scheduler_test.go @@ -12,6 +12,8 @@ import ( "github.com/creekorful/trandoshan/internal/process_mock" "github.com/creekorful/trandoshan/internal/test" "github.com/golang/mock/gomock" + "hash/fnv" + "strconv" "testing" ) @@ -153,7 +155,7 @@ func TestProcessURL_AlreadyScheduled(t *testing.T) { configClientMock.EXPECT().GetAllowedMimeTypes().Return([]client.MimeType{{Extensions: []string{"html", "php"}}}, nil) configClientMock.EXPECT().GetForbiddenHostnames().Return([]client.ForbiddenHostname{}, nil) - urlCache := 
map[string]int64{"https://facebookcorewwi.onion/test.php?id=12": 1} + urlCache := map[string]int64{"3056224523184958": 1} state := State{configClient: configClientMock} if err := state.processURL("https://facebookcorewwi.onion/test.php?id=12", nil, urlCache); !errors.Is(err, errAlreadyScheduled) { t.Fail() @@ -183,7 +185,14 @@ func TestProcessURL(t *testing.T) { t.Fail() } - if val, exist := urlCache[url]; !exist || val != 1 { + // Compute url hash + c := fnv.New64() + if _, err := c.Write([]byte(url)); err != nil { + t.Error(err) + } + urlHash := strconv.FormatUint(c.Sum64(), 10) + + if val, exist := urlCache[urlHash]; !exist || val != 1 { t.Fail() } } @@ -211,9 +220,9 @@ This domain is blacklisted: https://m.fbi.onion/test.php Return(nil) urlCacheMock.EXPECT(). - GetManyInt64([]string{"https://facebook.onion/test.php?id=1", "https://google.onion", "https://example.onion/test.png", "https://m.fbi.onion/test.php"}). + GetManyInt64([]string{"15038381360563270096", "17173291053643777680", "14332094874591870497", "5985629257333875968"}). Return(map[string]int64{ - "https://google.onion": 1, + "17173291053643777680": 1, }, nil) configClientMock.EXPECT().GetAllowedMimeTypes(). 
@@ -231,8 +240,8 @@ This domain is blacklisted: https://m.fbi.onion/test.php }) urlCacheMock.EXPECT().SetManyInt64(map[string]int64{ - "https://google.onion": 1, - "https://facebook.onion/test.php?id=1": 1, + "17173291053643777680": 1, + "15038381360563270096": 1, }, cache.NoTTL).Return(nil) s := State{urlCache: urlCacheMock, configClient: configClientMock} From 6c678478a19ff49abdcaec285eacd677bc3d1185 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alo=C3=AFs=20Micard?= Date: Fri, 8 Jan 2021 20:28:35 +0100 Subject: [PATCH 10/11] Implement better blacklist config --- deployments/docker/docker-compose.yml | 4 +-- internal/blacklister/blacklister.go | 26 +++++++++++---- internal/blacklister/blacklister_test.go | 22 +++++++++---- internal/cache/cache.go | 2 ++ internal/cache/redis.go | 4 +++ internal/configapi/client/client.go | 40 +++++++++++++----------- internal/scheduler/scheduler.go | 7 +---- internal/scheduler/scheduler_test.go | 2 +- 8 files changed, 68 insertions(+), 39 deletions(-) diff --git a/deployments/docker/docker-compose.yml b/deployments/docker/docker-compose.yml index 88d9029..3ceb156 100644 --- a/deployments/docker/docker-compose.yml +++ b/deployments/docker/docker-compose.yml @@ -89,8 +89,8 @@ services: --redis redis:6379 --default-value forbidden-hostnames="[]" --default-value allowed-mime-types="[{\"content-type\":\"text/\",\"extensions\":[\"html\",\"php\",\"aspx\", \"htm\"]}]" - --default-value refresh-delay="{\"delay\": -1}" - --default-value blacklist-threshold="{\"threshold\": 5}" + --default-value refresh-delay="{\"delay\": 0}" + --default-value blacklist-config="{\"threshold\": 5, \"ttl\": 1200}" restart: always depends_on: - rabbitmq diff --git a/internal/blacklister/blacklister.go b/internal/blacklister/blacklister.go index a69140b..c9147e6 100644 --- a/internal/blacklister/blacklister.go +++ b/internal/blacklister/blacklister.go @@ -45,7 +45,7 @@ func (state *State) Initialize(provider process.Provider) error { } state.hostnameCache = 
hostnameCache - configClient, err := provider.ConfigClient([]string{configapi.ForbiddenHostnamesKey, configapi.BlackListThresholdKey}) + configClient, err := provider.ConfigClient([]string{configapi.ForbiddenHostnamesKey, configapi.BlackListConfigKey}) if err != nil { return err } @@ -104,7 +104,22 @@ func (state *State) handleTimeoutURLEvent(subscriber event.Subscriber, msg event // Check by ourselves if the hostname doesn't respond _, err = state.httpClient.Get(fmt.Sprintf("%s://%s", u.Scheme, u.Host)) - if err == nil || err != chttp.ErrTimeout { + if err != nil && err != chttp.ErrTimeout { + return err + } + + cacheKey := u.Hostname() + + if err == nil { + log.Debug(). + Str("hostname", u.Hostname()). + Msg("Response received.") + + // Host is not down, remove it from cache + if err := state.hostnameCache.Remove(cacheKey); err != nil { + return err + } + return nil } @@ -112,19 +127,18 @@ func (state *State) handleTimeoutURLEvent(subscriber event.Subscriber, msg event Str("hostname", u.Hostname()). 
Msg("Timeout confirmed") - threshold, err := state.configClient.GetBlackListThreshold() + blackListConfig, err := state.configClient.GetBlackListConfig() if err != nil { return err } - cacheKey := u.Hostname() count, err := state.hostnameCache.GetInt64(cacheKey) if err != nil { return err } count++ - if count >= threshold.Threshold { + if count >= blackListConfig.Threshold { forbiddenHostnames, err := state.configClient.GetForbiddenHostnames() if err != nil { return err @@ -155,7 +169,7 @@ func (state *State) handleTimeoutURLEvent(subscriber event.Subscriber, msg event } // Update count - if err := state.hostnameCache.SetInt64(cacheKey, count, cache.NoTTL); err != nil { + if err := state.hostnameCache.SetInt64(cacheKey, count, blackListConfig.TTL); err != nil { return err } diff --git a/internal/blacklister/blacklister_test.go b/internal/blacklister/blacklister_test.go index a2e26df..64f13f1 100644 --- a/internal/blacklister/blacklister_test.go +++ b/internal/blacklister/blacklister_test.go @@ -2,7 +2,6 @@ package blacklister import ( "errors" - "github.com/creekorful/trandoshan/internal/cache" "github.com/creekorful/trandoshan/internal/cache_mock" configapi "github.com/creekorful/trandoshan/internal/configapi/client" "github.com/creekorful/trandoshan/internal/configapi/client_mock" @@ -15,6 +14,7 @@ import ( "github.com/creekorful/trandoshan/internal/test" "github.com/golang/mock/gomock" "testing" + "time" ) func TestState_Name(t *testing.T) { @@ -37,7 +37,7 @@ func TestState_CustomFlags(t *testing.T) { func TestState_Initialize(t *testing.T) { test.CheckInitialize(t, &State{}, func(p *process_mock.MockProviderMockRecorder) { p.Cache("down-hostname") - p.ConfigClient([]string{configapi.ForbiddenHostnamesKey, configapi.BlackListThresholdKey}) + p.ConfigClient([]string{configapi.ForbiddenHostnamesKey, configapi.BlackListConfigKey}) p.HTTPClient() }) } @@ -69,6 +69,8 @@ func TestHandleTimeoutURLEventNoTimeout(t *testing.T) { 
httpClientMock.EXPECT().Get("https://down-example.onion:8080").Return(httpResponseMock, nil) configClientMock.EXPECT().GetForbiddenHostnames().Return([]configapi.ForbiddenHostname{}, nil) + hostnameCacheMock.EXPECT().Remove("down-example.onion") + s := State{configClient: configClientMock, hostnameCache: hostnameCacheMock, httpClient: httpClientMock} if err := s.handleTimeoutURLEvent(subscriberMock, msg); err != nil { t.Fail() @@ -94,10 +96,13 @@ func TestHandleTimeoutURLEventNoDispatch(t *testing.T) { httpClientMock.EXPECT().Get("https://down-example.onion").Return(httpResponseMock, http.ErrTimeout) configClientMock.EXPECT().GetForbiddenHostnames().Return([]configapi.ForbiddenHostname{}, nil) - configClientMock.EXPECT().GetBlackListThreshold().Return(configapi.BlackListThreshold{Threshold: 10}, nil) + configClientMock.EXPECT().GetBlackListConfig().Return(configapi.BlackListConfig{ + Threshold: 10, + TTL: 5, + }, nil) hostnameCacheMock.EXPECT().GetInt64("down-example.onion").Return(int64(0), nil) - hostnameCacheMock.EXPECT().SetInt64("down-example.onion", int64(1), cache.NoTTL).Return(nil) + hostnameCacheMock.EXPECT().SetInt64("down-example.onion", int64(1), time.Duration(5)).Return(nil) s := State{configClient: configClientMock, hostnameCache: hostnameCacheMock, httpClient: httpClientMock} if err := s.handleTimeoutURLEvent(subscriberMock, msg); err != nil { @@ -124,7 +129,10 @@ func TestHandleTimeoutURLEvent(t *testing.T) { httpClientMock.EXPECT().Get("https://down-example.onion").Return(httpResponseMock, http.ErrTimeout) configClientMock.EXPECT().GetForbiddenHostnames().Return([]configapi.ForbiddenHostname{}, nil) - configClientMock.EXPECT().GetBlackListThreshold().Return(configapi.BlackListThreshold{Threshold: 10}, nil) + configClientMock.EXPECT().GetBlackListConfig().Return(configapi.BlackListConfig{ + Threshold: 10, + TTL: 5, + }, nil) hostnameCacheMock.EXPECT().GetInt64("down-example.onion").Return(int64(9), nil) @@ -138,7 +146,9 @@ func 
TestHandleTimeoutURLEvent(t *testing.T) { }). Return(nil) - hostnameCacheMock.EXPECT().SetInt64("down-example.onion", int64(10), cache.NoTTL).Return(nil) + hostnameCacheMock.EXPECT(). + SetInt64("down-example.onion", int64(10), time.Duration(5)). + Return(nil) s := State{configClient: configClientMock, hostnameCache: hostnameCacheMock, httpClient: httpClientMock} if err := s.handleTimeoutURLEvent(subscriberMock, msg); err != nil { diff --git a/internal/cache/cache.go b/internal/cache/cache.go index a2e8285..0a550fb 100644 --- a/internal/cache/cache.go +++ b/internal/cache/cache.go @@ -21,4 +21,6 @@ type Cache interface { GetManyInt64(keys []string) (map[string]int64, error) SetManyInt64(values map[string]int64, TTL time.Duration) error + + Remove(key string) error } diff --git a/internal/cache/redis.go b/internal/cache/redis.go index 41d58cb..1b78608 100644 --- a/internal/cache/redis.go +++ b/internal/cache/redis.go @@ -92,6 +92,10 @@ func (rc *redisCache) SetManyInt64(values map[string]int64, TTL time.Duration) e return err } +func (rc *redisCache) Remove(key string) error { + return rc.client.Del(context.Background(), rc.getKey(key)).Err() +} + func (rc *redisCache) getKey(key string) string { if rc.keyPrefix == "" { return key diff --git a/internal/configapi/client/client.go b/internal/configapi/client/client.go index 268dc9a..77106a0 100644 --- a/internal/configapi/client/client.go +++ b/internal/configapi/client/client.go @@ -21,8 +21,8 @@ const ( ForbiddenHostnamesKey = "forbidden-hostnames" // RefreshDelayKey is the key to access the refresh delay config RefreshDelayKey = "refresh-delay" - // BlackListThresholdKey is the key to access the blacklist threshold config - BlackListThresholdKey = "blacklist-threshold" + // BlackListConfigKey is the key to access the blacklist configuration + BlackListConfigKey = "blacklist-config" ) // MimeType is the mime type as represented in the config @@ -43,9 +43,10 @@ type RefreshDelay struct { Delay time.Duration 
`json:"delay"` } -// BlackListThreshold is the threshold to reach before blacklisting domain -type BlackListThreshold struct { - Threshold int64 `json:"threshold"` +// BlackListConfig is the config used for hostname blacklisting +type BlackListConfig struct { + Threshold int64 `json:"threshold"` + TTL time.Duration `json:"ttl"` } // Client is a nice client interface for the ConfigAPI @@ -53,7 +54,7 @@ type Client interface { GetAllowedMimeTypes() ([]MimeType, error) GetForbiddenHostnames() ([]ForbiddenHostname, error) GetRefreshDelay() (RefreshDelay, error) - GetBlackListThreshold() (BlackListThreshold, error) + GetBlackListConfig() (BlackListConfig, error) Set(key string, value interface{}) error } @@ -68,7 +69,7 @@ type client struct { allowedMimeTypes []MimeType forbiddenHostnames []ForbiddenHostname refreshDelay RefreshDelay - blackListThreshold BlackListThreshold + blackListConfig BlackListConfig } // NewConfigClient create a new client for the ConfigAPI. @@ -150,18 +151,21 @@ func (c *client) setRefreshDelay(value RefreshDelay) error { return nil } -func (c *client) GetBlackListThreshold() (BlackListThreshold, error) { - c.mutexes[BlackListThresholdKey].RLock() - defer c.mutexes[BlackListThresholdKey].RUnlock() +func (c *client) GetBlackListConfig() (BlackListConfig, error) { + c.mutexes[BlackListConfigKey].RLock() + defer c.mutexes[BlackListConfigKey].RUnlock() - return c.blackListThreshold, nil + return c.blackListConfig, nil } -func (c *client) setBlackListThreshold(value BlackListThreshold) error { - c.mutexes[BlackListThresholdKey].Lock() - defer c.mutexes[BlackListThresholdKey].Unlock() +func (c *client) setBlackListConfig(value BlackListConfig) error { + c.mutexes[BlackListConfigKey].Lock() + defer c.mutexes[BlackListConfigKey].Unlock() - c.blackListThreshold = value + c.blackListConfig = BlackListConfig{ + Threshold: value.Threshold, + TTL: value.TTL * time.Second, // TTL is in seconds + } return nil } @@ -232,12 +236,12 @@ func (c *client) 
setValue(key string, value []byte) error { return err } break - case BlackListThresholdKey: - var val BlackListThreshold + case BlackListConfigKey: + var val BlackListConfig if err := json.Unmarshal(value, &val); err != nil { return err } - if err := c.setBlackListThreshold(val); err != nil { + if err := c.setBlackListConfig(val); err != nil { return err } break diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index b81e64b..10efc1b 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -121,13 +121,8 @@ func (state *State) handleNewResourceEvent(subscriber event.Subscriber, msg even return err } - ttl := delay.Delay - if ttl == -1 { - ttl = cache.NoTTL - } - // Update values in batch - if err := state.urlCache.SetManyInt64(urlCache, ttl); err != nil { + if err := state.urlCache.SetManyInt64(urlCache, delay.Delay); err != nil { return err } diff --git a/internal/scheduler/scheduler_test.go b/internal/scheduler/scheduler_test.go index 7e7807b..b4e54eb 100644 --- a/internal/scheduler/scheduler_test.go +++ b/internal/scheduler/scheduler_test.go @@ -233,7 +233,7 @@ This domain is blacklisted: https://m.fbi.onion/test.php Return([]client.ForbiddenHostname{ {Hostname: "fbi.onion"}, }, nil) - configClientMock.EXPECT().GetRefreshDelay().Return(client.RefreshDelay{Delay: -1}, nil) + configClientMock.EXPECT().GetRefreshDelay().Return(client.RefreshDelay{Delay: 0}, nil) subscriberMock.EXPECT().PublishEvent(&event.NewURLEvent{ URL: "https://facebook.onion/test.php?id=1", From b657123d915cc317120fcae8753b1006feb451d0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alo=C3=AFs=20Micard?= Date: Fri, 8 Jan 2021 23:56:06 +0100 Subject: [PATCH 11/11] Bump version --- internal/process/process.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/process/process.go b/internal/process/process.go index 0047d3f..b284135 100644 --- a/internal/process/process.go +++ b/internal/process/process.go @@ -27,7 +27,7 @@ 
import ( type Feature int const ( - version = "0.10.0" + version = "0.11.0" // EventFeature is the feature to plug the process to the event server EventFeature Feature = iota