Merge remote-tracking branch 'origin/develop' into 124-improve-scheduler-speed

pull/127/head
Aloïs Micard 3 years ago
commit 4e33813b21
No known key found for this signature in database
GPG Key ID: 1A0EB82F071F5EFE

@ -32,29 +32,32 @@ services:
image: creekorful/tdsh-crawler:latest
command: >
--log-level debug
--hub-uri amqp://guest:guest@rabbitmq:5672
--tor-uri torproxy:9050
--config-api-uri http://configapi:8080
--event-srv amqp://guest:guest@rabbitmq:5672
--tor-proxy torproxy:9050
--config-api http://configapi:8080
restart: always
depends_on:
- rabbitmq
- torproxy
- configapi
scheduler:
image: creekorful/tdsh-scheduler:latest
command: >
--log-level debug
--hub-uri amqp://guest:guest@rabbitmq:5672
--config-api-uri http://configapi:8080
--redis-uri redis:6379
--event-srv amqp://guest:guest@rabbitmq:5672
--config-api http://configapi:8080
--redis redis:6379
restart: always
depends_on:
- rabbitmq
- configapi
- redis
indexer-local:
image: creekorful/tdsh-indexer:latest
command: >
--log-level debug
--hub-uri amqp://guest:guest@rabbitmq:5672
--config-api-uri http://configapi:8080
--event-srv amqp://guest:guest@rabbitmq:5672
--config-api http://configapi:8080
--index-driver local
--index-dest /archive
restart: always
@ -67,8 +70,9 @@ services:
image: creekorful/tdsh-indexer:latest
command: >
--log-level debug
--hub-uri amqp://guest:guest@rabbitmq:5672
--config-api-uri http://configapi:8080
--event-srv amqp://guest:guest@rabbitmq:5672
--event-prefetch 20
--config-api http://configapi:8080
--index-driver elastic
--index-dest http://elasticsearch:9200
restart: always
@ -80,8 +84,8 @@ services:
image: creekorful/tdsh-configapi:latest
command: >
--log-level debug
--hub-uri amqp://guest:guest@rabbitmq:5672
--redis-uri redis:6379
--event-srv amqp://guest:guest@rabbitmq:5672
--redis redis:6379
--default-value forbidden-hostnames="[]"
--default-value allowed-mime-types="[{\"content-type\":\"text/\",\"extensions\":[\"html\",\"php\",\"aspx\", \"htm\"]}]"
--default-value refresh-delay="{\"delay\": -1}"
@ -96,10 +100,10 @@ services:
image: creekorful/tdsh-blacklister:latest
command: >
--log-level debug
--hub-uri amqp://guest:guest@rabbitmq:5672
--config-api-uri http://configapi:8080
--redis-uri redis:6379
--tor-uri torproxy:9050
--event-srv amqp://guest:guest@rabbitmq:5672
--config-api http://configapi:8080
--redis redis:6379
--tor-proxy torproxy:9050
restart: always
depends_on:
- rabbitmq

@ -27,9 +27,9 @@ func (state *State) Name() string {
return "blacklister"
}
// CommonFlags return process common flags
func (state *State) CommonFlags() []string {
return []string{process.HubURIFlag, process.ConfigAPIURIFlag, process.RedisURIFlag, process.UserAgentFlag, process.TorURIFlag}
// Features return the process features
func (state *State) Features() []process.Feature {
return []process.Feature{process.EventFeature, process.ConfigFeature, process.CacheFeature, process.CrawlingFeature}
}
// CustomFlags return process custom flags

@ -24,9 +24,9 @@ func TestState_Name(t *testing.T) {
}
}
func TestState_CommonFlags(t *testing.T) {
func TestState_Features(t *testing.T) {
s := State{}
test.CheckProcessCommonFlags(t, &s, []string{process.HubURIFlag, process.ConfigAPIURIFlag, process.RedisURIFlag, process.UserAgentFlag, process.TorURIFlag})
test.CheckProcessFeatures(t, &s, []process.Feature{process.EventFeature, process.ConfigFeature, process.CacheFeature, process.CrawlingFeature})
}
func TestState_CustomFlags(t *testing.T) {

@ -24,9 +24,9 @@ func (state *State) Name() string {
return "configapi"
}
// CommonFlags return process common flags
func (state *State) CommonFlags() []string {
return []string{process.HubURIFlag, process.RedisURIFlag}
// Features return the process features
func (state *State) Features() []process.Feature {
return []process.Feature{process.EventFeature, process.CacheFeature}
}
// CustomFlags return process custom flags
@ -54,7 +54,7 @@ func (state *State) Initialize(provider process.Provider) error {
state.pub = pub
defaultValues := map[string]string{}
for _, value := range provider.GetValues("default-value") {
for _, value := range provider.GetStrValues("default-value") {
parts := strings.Split(value, "=")
if len(parts) == 2 {

@ -24,9 +24,9 @@ func TestState_Name(t *testing.T) {
}
}
func TestState_CommonFlags(t *testing.T) {
func TestState_Features(t *testing.T) {
s := State{}
test.CheckProcessCommonFlags(t, &s, []string{process.HubURIFlag, process.RedisURIFlag})
test.CheckProcessFeatures(t, &s, []process.Feature{process.EventFeature, process.CacheFeature})
}
func TestState_CustomFlags(t *testing.T) {
@ -38,7 +38,7 @@ func TestState_Initialize(t *testing.T) {
test.CheckInitialize(t, &State{}, func(p *process_mock.MockProviderMockRecorder) {
p.Cache("configuration")
p.Publisher()
p.GetValues("default-value")
p.GetStrValues("default-value")
})
}

@ -32,9 +32,9 @@ func (state *State) Name() string {
return "crawler"
}
// CommonFlags return process common flags
func (state *State) CommonFlags() []string {
return []string{process.HubURIFlag, process.ConfigAPIURIFlag, process.UserAgentFlag, process.TorURIFlag}
// Features return the process features
func (state *State) Features() []process.Feature {
return []process.Feature{process.EventFeature, process.ConfigFeature, process.CrawlingFeature}
}
// CustomFlags return process custom flags

@ -25,9 +25,9 @@ func TestState_Name(t *testing.T) {
}
}
func TestState_CommonFlags(t *testing.T) {
func TestState_Features(t *testing.T) {
s := State{}
test.CheckProcessCommonFlags(t, &s, []string{process.HubURIFlag, process.ConfigAPIURIFlag, process.UserAgentFlag, process.TorURIFlag})
test.CheckProcessFeatures(t, &s, []process.Feature{process.EventFeature, process.ConfigFeature, process.CrawlingFeature})
}
func TestState_CustomFlags(t *testing.T) {

@ -36,7 +36,7 @@ type subscriber struct {
}
// NewSubscriber create a new subscriber and connect it to given server
func NewSubscriber(amqpURI string) (Subscriber, error) {
func NewSubscriber(amqpURI string, prefetch int) (Subscriber, error) {
conn, err := amqp.Dial(amqpURI)
if err != nil {
return nil, err
@ -46,7 +46,7 @@ func NewSubscriber(amqpURI string) (Subscriber, error) {
if err != nil {
return nil, err
}
if err := c.Qos(1, 0, false); err != nil {
if err := c.Qos(prefetch, 0, false); err != nil {
return nil, err
}

@ -9,8 +9,7 @@ import (
"time"
)
var resourcesIndex = "resources"
const resourcesIndexName = "resources"
const mapping = `
{
"settings": {
@ -93,29 +92,48 @@ func newElasticIndex(uri string) (Index, error) {
}, nil
}
func (e *elasticSearchIndex) IndexResource(url string, time time.Time, body string, headers map[string]string) error {
res, err := extractResource(url, time, body, headers)
func (e *elasticSearchIndex) IndexResource(resource Resource) error {
res, err := indexResource(resource)
if err != nil {
return err
}
_, err = e.client.Index().
Index(resourcesIndex).
Index(resourcesIndexName).
BodyJson(res).
Do(context.Background())
return err
}
func (e *elasticSearchIndex) IndexResources(resources []Resource) error {
bulkRequest := e.client.Bulk()
for _, resource := range resources {
resourceIndex, err := indexResource(resource)
if err != nil {
return err
}
req := elastic.NewBulkIndexRequest().
Index(resourcesIndexName).
Doc(resourceIndex)
bulkRequest.Add(req)
}
_, err := bulkRequest.Do(context.Background())
return err
}
func setupElasticSearch(ctx context.Context, es *elastic.Client) error {
// Setup index if doesn't exist
exist, err := es.IndexExists(resourcesIndex).Do(ctx)
exist, err := es.IndexExists(resourcesIndexName).Do(ctx)
if err != nil {
return err
}
if !exist {
log.Debug().Str("index", resourcesIndex).Msg("Creating missing index")
log.Debug().Str("index", resourcesIndexName).Msg("Creating missing index")
q := es.CreateIndex(resourcesIndex).BodyString(mapping)
q := es.CreateIndex(resourcesIndexName).BodyString(mapping)
if _, err := q.Do(ctx); err != nil {
return err
}
@ -124,8 +142,8 @@ func setupElasticSearch(ctx context.Context, es *elastic.Client) error {
return nil
}
func extractResource(url string, time time.Time, body string, headers map[string]string) (*resourceIdx, error) {
doc, err := goquery.NewDocumentFromReader(strings.NewReader(body))
func indexResource(resource Resource) (*resourceIdx, error) {
doc, err := goquery.NewDocumentFromReader(strings.NewReader(resource.Body))
if err != nil {
return nil, err
}
@ -152,14 +170,14 @@ func extractResource(url string, time time.Time, body string, headers map[string
// Lowercase headers
lowerCasedHeaders := map[string]string{}
for key, value := range headers {
for key, value := range resource.Headers {
lowerCasedHeaders[strings.ToLower(key)] = value
}
return &resourceIdx{
URL: url,
Body: body,
Time: time,
URL: resource.URL,
Body: resource.Body,
Time: resource.Time,
Title: title,
Meta: meta,
Description: meta["description"],

@ -6,7 +6,7 @@ import (
"time"
)
func TestExtractResource(t *testing.T) {
func TestIndexResource(t *testing.T) {
body := `
<title>Creekorful Inc</title>
@ -23,34 +23,39 @@ This is sparta
Body: body,
}
resDto, err := extractResource("https://example.org/300", time.Time{}, body, map[string]string{"Content-Type": "application/json"})
resIdx, err := indexResource(Resource{
URL: "https://example.org/300",
Time: time.Time{},
Body: body,
Headers: map[string]string{"Content-Type": "application/json"},
})
if err != nil {
t.FailNow()
}
if resDto.URL != "https://example.org/300" {
if resIdx.URL != "https://example.org/300" {
t.Fail()
}
if resDto.Title != "Creekorful Inc" {
if resIdx.Title != "Creekorful Inc" {
t.Fail()
}
if resDto.Body != msg.Body {
if resIdx.Body != msg.Body {
t.Fail()
}
if resDto.Description != "Zhello world" {
if resIdx.Description != "Zhello world" {
t.Fail()
}
if resDto.Meta["description"] != "Zhello world" {
if resIdx.Meta["description"] != "Zhello world" {
t.Fail()
}
if resDto.Meta["og:url"] != "https://example.org" {
if resIdx.Meta["og:url"] != "https://example.org" {
t.Fail()
}
if resDto.Headers["content-type"] != "application/json" {
if resIdx.Headers["content-type"] != "application/json" {
t.Fail()
}
}

@ -14,10 +14,18 @@ const (
Local = "local"
)
// Index is the interface used to abstract communication
// with the persistence unit
// Resource represent a resource that should be indexed
type Resource struct {
URL string
Time time.Time
Body string
Headers map[string]string
}
// Index is the interface used to abstract communication with the persistence unit
type Index interface {
IndexResource(url string, time time.Time, body string, headers map[string]string) error
IndexResource(resource Resource) error
IndexResources(resources []Resource) error
}
// NewIndex create a new index using given driver, destination

@ -20,13 +20,13 @@ func newLocalIndex(root string) (Index, error) {
return &localIndex{baseDir: root}, nil
}
func (s *localIndex) IndexResource(url string, time time.Time, body string, headers map[string]string) error {
path, err := formatPath(url, time)
func (s *localIndex) IndexResource(resource Resource) error {
path, err := formatPath(resource.URL, resource.Time)
if err != nil {
return err
}
content, err := formatResource(url, body, headers)
content, err := formatResource(resource.URL, resource.Body, resource.Headers)
if err != nil {
return err
}
@ -45,6 +45,18 @@ func (s *localIndex) IndexResource(url string, time time.Time, body string, head
return nil
}
func (s *localIndex) IndexResources(resources []Resource) error {
// No specific implementation for the local driver.
// we simply call IndexResource n-times
for _, resource := range resources {
if err := s.IndexResource(resource); err != nil {
return err
}
}
return nil
}
func formatResource(url string, body string, headers map[string]string) ([]byte, error) {
builder := strings.Builder{}

@ -57,8 +57,55 @@ func TestLocalIndex_IndexResource(t *testing.T) {
s := localIndex{baseDir: d}
ti := time.Date(2020, time.October, 29, 12, 4, 9, 0, time.UTC)
if err := s.IndexResource(Resource{
URL: "https://google.com",
Time: ti,
Body: "Hello, world",
Headers: map[string]string{"Server": "Traefik"},
}); err != nil {
t.Fail()
}
p := filepath.Join(d, "https", "google.com", "1603973049")
inf, err := os.Stat(p)
if err != nil {
t.Fail()
}
if inf.Mode() != 0640 {
t.Fail()
}
b, err := ioutil.ReadFile(p)
if err != nil {
t.Fail()
}
if string(b) != "https://google.com\n\nServer: Traefik\n\nHello, world" {
t.Fail()
}
}
func TestLocalIndex_IndexResources(t *testing.T) {
d, err := ioutil.TempDir("", "")
if err != nil {
t.FailNow()
}
defer os.RemoveAll(d)
s := localIndex{baseDir: d}
ti := time.Date(2020, time.October, 29, 12, 4, 9, 0, time.UTC)
resources := []Resource{
{
URL: "https://google.com",
Time: ti,
Body: "Hello, world",
Headers: map[string]string{"Server": "Traefik"},
},
}
if err := s.IndexResource("https://google.com", ti, "Hello, world", map[string]string{"Server": "Traefik"}); err != nil {
if err := s.IndexResources(resources); err != nil {
t.Fail()
}

@ -19,6 +19,9 @@ type State struct {
index index.Index
indexDriver string
configClient configapi.Client
bufferThreshold int
resources []index.Resource
}
// Name return the process name
@ -26,9 +29,9 @@ func (state *State) Name() string {
return "indexer"
}
// CommonFlags return process common flags
func (state *State) CommonFlags() []string {
return []string{process.HubURIFlag, process.ConfigAPIURIFlag}
// Features return the process features
func (state *State) Features() []process.Feature {
return []process.Feature{process.EventFeature, process.ConfigFeature}
}
// CustomFlags return process custom flags
@ -49,13 +52,14 @@ func (state *State) CustomFlags() []cli.Flag {
// Initialize the process
func (state *State) Initialize(provider process.Provider) error {
indexDriver := provider.GetValue("index-driver")
idx, err := index.NewIndex(indexDriver, provider.GetValue("index-dest"))
indexDriver := provider.GetStrValue("index-driver")
idx, err := index.NewIndex(indexDriver, provider.GetStrValue("index-dest"))
if err != nil {
return err
}
state.index = idx
state.indexDriver = indexDriver
state.bufferThreshold = provider.GetIntValue(process.EventPrefetchFlag)
configClient, err := provider.ConfigClient([]string{configapi.ForbiddenHostnamesKey})
if err != nil {
@ -89,11 +93,47 @@ func (state *State) handleNewResourceEvent(subscriber event.Subscriber, msg even
return fmt.Errorf("%s %w", evt.URL, errHostnameNotAllowed)
}
if err := state.index.IndexResource(evt.URL, evt.Time, evt.Body, evt.Headers); err != nil {
return fmt.Errorf("error while indexing resource: %s", err)
// Direct saving (no buffering)
if state.bufferThreshold == 1 {
if err := state.index.IndexResource(index.Resource{
URL: evt.URL,
Time: evt.Time,
Body: evt.Body,
Headers: evt.Headers,
}); err != nil {
return fmt.Errorf("error while indexing resource: %s", err)
}
log.Info().
Str("url", evt.URL).
Msg("Successfully indexed resource")
return nil
}
log.Info().Str("url", evt.URL).Msg("Successfully indexed resource")
// Otherwise we are in buffered saving mode
state.resources = append(state.resources, index.Resource{
URL: evt.URL,
Time: evt.Time,
Body: evt.Body,
Headers: evt.Headers,
})
log.Debug().Str("url", evt.URL).Msg("Successfully stored resource in buffer")
if len(state.resources) >= state.bufferThreshold {
// Time to save!
if err := state.index.IndexResources(state.resources); err != nil {
return fmt.Errorf("error while indexing resources: %s", err)
}
log.Info().
Int("count", len(state.resources)).
Msg("Successfully indexed buffered resources")
// Clear cache
state.resources = []index.Resource{}
}
return nil
}

@ -6,11 +6,13 @@ import (
"github.com/creekorful/trandoshan/internal/configapi/client_mock"
"github.com/creekorful/trandoshan/internal/event"
"github.com/creekorful/trandoshan/internal/event_mock"
"github.com/creekorful/trandoshan/internal/indexer/index"
"github.com/creekorful/trandoshan/internal/indexer/index_mock"
"github.com/creekorful/trandoshan/internal/process"
"github.com/creekorful/trandoshan/internal/process_mock"
"github.com/creekorful/trandoshan/internal/test"
"github.com/golang/mock/gomock"
"reflect"
"testing"
"time"
)
@ -22,9 +24,9 @@ func TestState_Name(t *testing.T) {
}
}
func TestState_CommonFlags(t *testing.T) {
func TestState_Features(t *testing.T) {
s := State{}
test.CheckProcessCommonFlags(t, &s, []string{process.HubURIFlag, process.ConfigAPIURIFlag})
test.CheckProcessFeatures(t, &s, []process.Feature{process.EventFeature, process.ConfigFeature})
}
func TestState_CustomFlags(t *testing.T) {
@ -35,14 +37,18 @@ func TestState_CustomFlags(t *testing.T) {
func TestState_Initialize(t *testing.T) {
s := State{}
test.CheckInitialize(t, &s, func(p *process_mock.MockProviderMockRecorder) {
p.GetValue("index-driver").Return("local")
p.GetValue("index-dest")
p.GetStrValue("index-driver").Return("local")
p.GetStrValue("index-dest")
p.GetIntValue(process.EventPrefetchFlag).Return(10)
p.ConfigClient([]string{client.ForbiddenHostnamesKey})
})
if s.indexDriver != "local" {
t.Errorf("wrong driver: got: %s want: %s", s.indexDriver, "local")
}
if s.bufferThreshold != 10 {
t.Errorf("wrong buffer threshold: got: %d want: %d", s.bufferThreshold, 10)
}
}
func TestState_Subscribers(t *testing.T) {
@ -52,7 +58,53 @@ func TestState_Subscribers(t *testing.T) {
})
}
func TestHandleNewResourceEvent(t *testing.T) {
func TestHandleNewResourceEvent_NoBuffering(t *testing.T) {
body := `
<title>Creekorful Inc</title>
This is sparta (hosted on https://example.org)
<a href="https://google.com/test?test=test#12">
Thanks to https://help.facebook.onion/ for the hosting :D
<meta name="DescriptIon" content="Zhello world">
<meta property="og:url" content="https://example.org">`
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
subscriberMock := event_mock.NewMockSubscriber(mockCtrl)
configClientMock := client_mock.NewMockClient(mockCtrl)
indexMock := index_mock.NewMockIndex(mockCtrl)
tn := time.Now()
msg := event.RawMessage{}
subscriberMock.EXPECT().
Read(&msg, &event.NewResourceEvent{}).
SetArg(1, event.NewResourceEvent{
URL: "https://example.onion",
Body: body,
Headers: map[string]string{"Server": "Traefik", "Content-Type": "application/html"},
Time: tn,
}).Return(nil)
configClientMock.EXPECT().GetForbiddenHostnames().Return([]client.ForbiddenHostname{{Hostname: "example2.onion"}}, nil)
indexMock.EXPECT().IndexResource(index.Resource{
URL: "https://example.onion",
Time: tn,
Body: body,
Headers: map[string]string{"Server": "Traefik", "Content-Type": "application/html"},
})
s := State{index: indexMock, configClient: configClientMock, bufferThreshold: 1}
if err := s.handleNewResourceEvent(subscriberMock, msg); err != nil {
t.FailNow()
}
}
func TestHandleNewResourceEvent_Buffering_NoDispatch(t *testing.T) {
body := `
<title>Creekorful Inc</title>
@ -85,12 +137,89 @@ Thanks to https://help.facebook.onion/ for the hosting :D
}).Return(nil)
configClientMock.EXPECT().GetForbiddenHostnames().Return([]client.ForbiddenHostname{{Hostname: "example2.onion"}}, nil)
indexMock.EXPECT().IndexResource("https://example.onion", tn, body, map[string]string{"Server": "Traefik", "Content-Type": "application/html"})
s := State{index: indexMock, configClient: configClientMock}
s := State{index: indexMock, configClient: configClientMock, bufferThreshold: 5}
if err := s.handleNewResourceEvent(subscriberMock, msg); err != nil {
t.FailNow()
}
if len(s.resources) != 1 {
t.FailNow()
}
if s.resources[0].URL != "https://example.onion" {
t.Fail()
}
if s.resources[0].Body != body {
t.Fail()
}
if !reflect.DeepEqual(s.resources[0].Headers, map[string]string{"Server": "Traefik", "Content-Type": "application/html"}) {
t.Fail()
}
if s.resources[0].Time != tn {
t.Fail()
}
}
func TestHandleNewResourceEvent_Buffering_Dispatch(t *testing.T) {
body := `
<title>Creekorful Inc</title>
This is sparta (hosted on https://example.org)
<a href="https://google.com/test?test=test#12">
Thanks to https://help.facebook.onion/ for the hosting :D
<meta name="DescriptIon" content="Zhello world">
<meta property="og:url" content="https://example.org">`
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
subscriberMock := event_mock.NewMockSubscriber(mockCtrl)
configClientMock := client_mock.NewMockClient(mockCtrl)
indexMock := index_mock.NewMockIndex(mockCtrl)
tn := time.Now()
msg := event.RawMessage{}
subscriberMock.EXPECT().
Read(&msg, &event.NewResourceEvent{}).
SetArg(1, event.NewResourceEvent{
URL: "https://example.onion",
Body: body,
Headers: map[string]string{"Server": "Traefik", "Content-Type": "application/html"},
Time: tn,
}).Return(nil)
configClientMock.EXPECT().GetForbiddenHostnames().Return([]client.ForbiddenHostname{{Hostname: "example2.onion"}}, nil)
indexMock.EXPECT().IndexResources([]index.Resource{
{
URL: "https://google.onion",
},
{
URL: "https://example.onion",
Time: tn,
Body: body,
Headers: map[string]string{"Server": "Traefik", "Content-Type": "application/html"},
},
})
s := State{
index: indexMock,
configClient: configClientMock,
bufferThreshold: 2,
resources: []index.Resource{{URL: "https://google.onion"}},
}
if err := s.handleNewResourceEvent(subscriberMock, msg); err != nil {
t.FailNow()
}
// should be reset
if len(s.resources) != 0 {
t.Fail()
}
}
func TestHandleMessageForbiddenHostname(t *testing.T) {

@ -23,22 +23,29 @@ import (
"time"
)
// Feature represent a process feature
type Feature int
const (
version = "0.10.0"
// APIURIFlag is the api-uri flag
APIURIFlag = "api-uri"
// APITokenFlag is the api-token flag
APITokenFlag = "api-token"
// HubURIFlag is the hub-uri flag
HubURIFlag = "hub-uri"
// ConfigAPIURIFlag is the config-api-uri flag
ConfigAPIURIFlag = "config-api-uri"
// RedisURIFlag is the redis-uri flag
RedisURIFlag = "redis-uri"
// TorURIFlag is the tor-uri flag
TorURIFlag = "tor-uri"
// UserAgentFlag is the user-agent flag
UserAgentFlag = "user-agent"
// EventFeature is the feature to plug the process to the event server
EventFeature Feature = iota
// ConfigFeature is the feature to plug the process to the ConfigAPI framework
ConfigFeature
// CacheFeature is the feature to plug the process to the cache server
CacheFeature
// CrawlingFeature is the feature to plug the process with a tor-compatible HTTP client
CrawlingFeature
// EventPrefetchFlag is the prefetch count for the event subscriber
EventPrefetchFlag = "event-prefetch"
eventURIFlag = "event-srv"
configAPIURIFlag = "config-api"
redisURIFlag = "redis"
torURIFlag = "tor-proxy"
userAgentFlag = "user-agent"
)
// Provider is the implementation provider
@ -55,10 +62,12 @@ type Provider interface {
Cache(keyPrefix string) (cache.Cache, error)
// HTTPClient return a new configured http client
HTTPClient() (chttp.Client, error)
// GetValue return value for given key
GetValue(key string) string
// GetValue return values for given key
GetValues(key string) []string
// GetStrValue return string value for given key
GetStrValue(key string) string
// GetStrValues return string slice for given key
GetStrValues(key string) []string
// GetIntValue return int value for given key
GetIntValue(key string) int
}
type defaultProvider struct {
@ -80,41 +89,45 @@ func (p *defaultProvider) ConfigClient(keys []string) (configapi.Client, error)
return nil, err
}
return configapi.NewConfigClient(p.ctx.String(ConfigAPIURIFlag), sub, keys)
return configapi.NewConfigClient(p.ctx.String(configAPIURIFlag), sub, keys)
}
func (p *defaultProvider) Subscriber() (event.Subscriber, error) {
return event.NewSubscriber(p.ctx.String(HubURIFlag))
return event.NewSubscriber(p.ctx.String(eventURIFlag), p.ctx.Int(EventPrefetchFlag))
}
func (p *defaultProvider) Publisher() (event.Publisher, error) {
return event.NewPublisher(p.ctx.String(HubURIFlag))
return event.NewPublisher(p.ctx.String(eventURIFlag))
}
func (p *defaultProvider) Cache(keyPrefix string) (cache.Cache, error) {
return cache.NewRedisCache(p.ctx.String(RedisURIFlag), keyPrefix)
return cache.NewRedisCache(p.ctx.String(redisURIFlag), keyPrefix)
}
func (p *defaultProvider) HTTPClient() (chttp.Client, error) {
return chttp.NewFastHTTPClient(&fasthttp.Client{
// Use given TOR proxy to reach the hidden services
Dial: fasthttpproxy.FasthttpSocksDialer(p.ctx.String(TorURIFlag)),
Dial: fasthttpproxy.FasthttpSocksDialer(p.ctx.String(torURIFlag)),
// Disable SSL verification since we do not really care about this
TLSConfig: &tls.Config{InsecureSkipVerify: true},
ReadTimeout: time.Second * 5,
WriteTimeout: time.Second * 5,
Name: p.ctx.String(UserAgentFlag),
Name: p.ctx.String(userAgentFlag),
}), nil
}
func (p *defaultProvider) GetValue(key string) string {
func (p *defaultProvider) GetStrValue(key string) string {
return p.ctx.String(key)
}
func (p *defaultProvider) GetValues(key string) []string {
func (p *defaultProvider) GetStrValues(key string) []string {
return p.ctx.StringSlice(key)
}
func (p *defaultProvider) GetIntValue(key string) int {
return p.ctx.Int(key)
}
// SubscriberDef is the subscriber definition
type SubscriberDef struct {
Exchange string
@ -125,7 +138,7 @@ type SubscriberDef struct {
// Process is a component of Trandoshan
type Process interface {
Name() string
CommonFlags() []string
Features() []Feature
CustomFlags() []cli.Flag
Initialize(provider Provider) error
Subscribers() []SubscriberDef
@ -148,11 +161,11 @@ func MakeApp(process Process) *cli.App {
Action: execute(process),
}
// Add common flags
flags := getCustomFlags()
for _, flag := range process.CommonFlags() {
if customFlag, contains := flags[flag]; contains {
app.Flags = append(app.Flags, customFlag)
// Add features flags
featureFlags := getFeaturesFlags()
for _, feature := range process.Features() {
if values, exist := featureFlags[feature]; exist {
app.Flags = append(app.Flags, values...)
}
}
@ -236,43 +249,49 @@ func execute(process Process) cli.ActionFunc {
}
}
func getCustomFlags() map[string]cli.Flag {
flags := map[string]cli.Flag{}
func getFeaturesFlags() map[Feature][]cli.Flag {
flags := map[Feature][]cli.Flag{}
flags[HubURIFlag] = &cli.StringFlag{
Name: HubURIFlag,
Usage: "URI to the hub (event) server",
Required: true,
}
flags[APIURIFlag] = &cli.StringFlag{
Name: APIURIFlag,
Usage: "URI to the API server",
Required: true,
}
flags[APITokenFlag] = &cli.StringFlag{
Name: APITokenFlag,
Usage: "Token to use to authenticate against the API",
Required: true,
}
flags[ConfigAPIURIFlag] = &cli.StringFlag{
Name: ConfigAPIURIFlag,
Usage: "URI to the ConfigAPI server",
Required: true,
flags[EventFeature] = []cli.Flag{
&cli.StringFlag{
Name: eventURIFlag,
Usage: "URI to the event server",
Required: true,
},
&cli.IntFlag{
Name: EventPrefetchFlag,
Usage: "Prefetch for the event subscriber",
Value: 1,
},
}
flags[RedisURIFlag] = &cli.StringFlag{
Name: RedisURIFlag,
Usage: "URI to the Redis server",
Required: true,
flags[ConfigFeature] = []cli.Flag{
&cli.StringFlag{
Name: configAPIURIFlag,
Usage: "URI to the ConfigAPI server",
Required: true,
},
}
flags[TorURIFlag] = &cli.StringFlag{
Name: TorURIFlag,
Usage: "URI to the TOR SOCKS proxy",
Required: true,
flags[CacheFeature] = []cli.Flag{
&cli.StringFlag{
Name: redisURIFlag,
Usage: "URI to the Redis server",
Required: true,
},
}
flags[UserAgentFlag] = &cli.StringFlag{
Name: UserAgentFlag,
Usage: "User agent to use",
Value: "Mozilla/5.0 (Windows NT 10.0; rv:68.0) Gecko/20100101 Firefox/68.0",
flags[CrawlingFeature] = []cli.Flag{
&cli.StringFlag{
Name: torURIFlag,
Usage: "URI to the TOR SOCKS proxy",
Required: true,
},
&cli.StringFlag{
Name: userAgentFlag,
Usage: "User agent to use",
Value: "Mozilla/5.0 (Windows NT 10.0; rv:68.0) Gecko/20100101 Firefox/68.0",
},
}
return flags

@ -39,9 +39,9 @@ func (state *State) Name() string {
return "scheduler"
}
// CommonFlags return process common flags
func (state *State) CommonFlags() []string {
return []string{process.HubURIFlag, process.ConfigAPIURIFlag, process.RedisURIFlag}
// Features return the process features
func (state *State) Features() []process.Feature {
return []process.Feature{process.EventFeature, process.ConfigFeature, process.CacheFeature}
}
// CustomFlags return process custom flags

@ -22,9 +22,9 @@ func TestState_Name(t *testing.T) {
}
}
func TestState_CommonFlags(t *testing.T) {
func TestState_Features(t *testing.T) {
s := State{}
test.CheckProcessCommonFlags(t, &s, []string{process.HubURIFlag, process.ConfigAPIURIFlag, process.RedisURIFlag})
test.CheckProcessFeatures(t, &s, []process.Feature{process.EventFeature, process.ConfigFeature, process.CacheFeature})
}
func TestState_CustomFlags(t *testing.T) {

@ -14,10 +14,10 @@ type SubscriberDef struct {
Exchange string
}
// CheckProcessCommonFlags check process defined common flags
func CheckProcessCommonFlags(t *testing.T, p process.Process, wantFlags []string) {
if !checkListEquals(p.CommonFlags(), wantFlags) {
t.Errorf("Differents flags: %v %v", p.CommonFlags(), wantFlags)
// CheckProcessFeatures check process defined features
func CheckProcessFeatures(t *testing.T, p process.Process, wantFeatures []process.Feature) {
if !reflect.DeepEqual(p.Features(), wantFeatures) {
t.Errorf("Differents flags: %v %v", p.Features(), wantFeatures)
}
}
@ -28,7 +28,7 @@ func CheckProcessCustomFlags(t *testing.T, p process.Process, wantFlags []string
names = append(names, customFlag.Names()[0])
}
if !checkListEquals(names, wantFlags) {
if !reflect.DeepEqual(names, wantFlags) {
t.Errorf("Differents flags: %v %v", names, wantFlags)
}
}
@ -62,7 +62,3 @@ func CheckProcessSubscribers(t *testing.T, p process.Process, subscribers []Subs
}
// TODO HTTPHandler
func checkListEquals(a []string, b []string) bool {
return reflect.DeepEqual(a, b)
}

Loading…
Cancel
Save