process: Rework whole flags system

- Turn the flag into Feature system to allow easier configuration.
- Add prefetch flag to event feature
pull/126/head
Aloïs Micard 3 years ago
parent 829afcbb6a
commit 71f82d4aad
No known key found for this signature in database
GPG Key ID: 1A0EB82F071F5EFE

@ -32,29 +32,32 @@ services:
image: creekorful/tdsh-crawler:latest
command: >
--log-level debug
--hub-uri amqp://guest:guest@rabbitmq:5672
--tor-uri torproxy:9050
--config-api-uri http://configapi:8080
--event-srv amqp://guest:guest@rabbitmq:5672
--tor-proxy torproxy:9050
--config-api http://configapi:8080
restart: always
depends_on:
- rabbitmq
- torproxy
- configapi
scheduler:
image: creekorful/tdsh-scheduler:latest
command: >
--log-level debug
--hub-uri amqp://guest:guest@rabbitmq:5672
--config-api-uri http://configapi:8080
--redis-uri redis:6379
--event-srv amqp://guest:guest@rabbitmq:5672
--config-api http://configapi:8080
--redis redis:6379
restart: always
depends_on:
- rabbitmq
- configapi
- redis
indexer-local:
image: creekorful/tdsh-indexer:latest
command: >
--log-level debug
--hub-uri amqp://guest:guest@rabbitmq:5672
--config-api-uri http://configapi:8080
--event-srv amqp://guest:guest@rabbitmq:5672
--config-api http://configapi:8080
--index-driver local
--index-dest /archive
restart: always
@ -67,8 +70,8 @@ services:
image: creekorful/tdsh-indexer:latest
command: >
--log-level debug
--hub-uri amqp://guest:guest@rabbitmq:5672
--config-api-uri http://configapi:8080
--event-srv amqp://guest:guest@rabbitmq:5672
--config-api http://configapi:8080
--index-driver elastic
--index-dest http://elasticsearch:9200
restart: always
@ -80,8 +83,8 @@ services:
image: creekorful/tdsh-configapi:latest
command: >
--log-level debug
--hub-uri amqp://guest:guest@rabbitmq:5672
--redis-uri redis:6379
--event-srv amqp://guest:guest@rabbitmq:5672
--redis redis:6379
--default-value forbidden-hostnames="[]"
--default-value allowed-mime-types="[{\"content-type\":\"text/\",\"extensions\":[\"html\",\"php\",\"aspx\", \"htm\"]}]"
--default-value refresh-delay="{\"delay\": -1}"
@ -96,10 +99,10 @@ services:
image: creekorful/tdsh-blacklister:latest
command: >
--log-level debug
--hub-uri amqp://guest:guest@rabbitmq:5672
--config-api-uri http://configapi:8080
--redis-uri redis:6379
--tor-uri torproxy:9050
--event-srv amqp://guest:guest@rabbitmq:5672
--config-api http://configapi:8080
--redis redis:6379
--tor-proxy torproxy:9050
restart: always
depends_on:
- rabbitmq

@ -27,9 +27,9 @@ func (state *State) Name() string {
return "blacklister"
}
// CommonFlags return process common flags
func (state *State) CommonFlags() []string {
return []string{process.HubURIFlag, process.ConfigAPIURIFlag, process.RedisURIFlag, process.UserAgentFlag, process.TorURIFlag}
// Features return the process features
func (state *State) Features() []process.Feature {
return []process.Feature{process.EventFeature, process.ConfigFeature, process.CacheFeature, process.CrawlingFeature}
}
// CustomFlags return process custom flags

@ -24,9 +24,9 @@ func TestState_Name(t *testing.T) {
}
}
func TestState_CommonFlags(t *testing.T) {
func TestState_Features(t *testing.T) {
s := State{}
test.CheckProcessCommonFlags(t, &s, []string{process.HubURIFlag, process.ConfigAPIURIFlag, process.RedisURIFlag, process.UserAgentFlag, process.TorURIFlag})
test.CheckProcessFeatures(t, &s, []process.Feature{process.EventFeature, process.ConfigFeature, process.CacheFeature, process.CrawlingFeature})
}
func TestState_CustomFlags(t *testing.T) {

@ -24,9 +24,9 @@ func (state *State) Name() string {
return "configapi"
}
// CommonFlags return process common flags
func (state *State) CommonFlags() []string {
return []string{process.HubURIFlag, process.RedisURIFlag}
// Features return the process features
func (state *State) Features() []process.Feature {
return []process.Feature{process.EventFeature, process.CacheFeature}
}
// CustomFlags return process custom flags

@ -24,9 +24,9 @@ func TestState_Name(t *testing.T) {
}
}
func TestState_CommonFlags(t *testing.T) {
func TestState_Features(t *testing.T) {
s := State{}
test.CheckProcessCommonFlags(t, &s, []string{process.HubURIFlag, process.RedisURIFlag})
test.CheckProcessFeatures(t, &s, []process.Feature{process.EventFeature, process.CacheFeature})
}
func TestState_CustomFlags(t *testing.T) {

@ -32,9 +32,9 @@ func (state *State) Name() string {
return "crawler"
}
// CommonFlags return process common flags
func (state *State) CommonFlags() []string {
return []string{process.HubURIFlag, process.ConfigAPIURIFlag, process.UserAgentFlag, process.TorURIFlag}
// Features return the process features
func (state *State) Features() []process.Feature {
return []process.Feature{process.EventFeature, process.ConfigFeature, process.CrawlingFeature}
}
// CustomFlags return process custom flags

@ -25,9 +25,9 @@ func TestState_Name(t *testing.T) {
}
}
func TestState_CommonFlags(t *testing.T) {
func TestState_Features(t *testing.T) {
s := State{}
test.CheckProcessCommonFlags(t, &s, []string{process.HubURIFlag, process.ConfigAPIURIFlag, process.UserAgentFlag, process.TorURIFlag})
test.CheckProcessFeatures(t, &s, []process.Feature{process.EventFeature, process.ConfigFeature, process.CrawlingFeature})
}
func TestState_CustomFlags(t *testing.T) {

@ -36,7 +36,7 @@ type subscriber struct {
}
// NewSubscriber create a new subscriber and connect it to given server
func NewSubscriber(amqpURI string) (Subscriber, error) {
func NewSubscriber(amqpURI string, prefetch int) (Subscriber, error) {
conn, err := amqp.Dial(amqpURI)
if err != nil {
return nil, err
@ -46,7 +46,7 @@ func NewSubscriber(amqpURI string) (Subscriber, error) {
if err != nil {
return nil, err
}
if err := c.Qos(1, 0, false); err != nil {
if err := c.Qos(prefetch, 0, false); err != nil {
return nil, err
}

@ -26,9 +26,9 @@ func (state *State) Name() string {
return "indexer"
}
// CommonFlags return process common flags
func (state *State) CommonFlags() []string {
return []string{process.HubURIFlag, process.ConfigAPIURIFlag}
// Features return the process features
func (state *State) Features() []process.Feature {
return []process.Feature{process.EventFeature, process.ConfigFeature}
}
// CustomFlags return process custom flags

@ -22,9 +22,9 @@ func TestState_Name(t *testing.T) {
}
}
func TestState_CommonFlags(t *testing.T) {
func TestState_Features(t *testing.T) {
s := State{}
test.CheckProcessCommonFlags(t, &s, []string{process.HubURIFlag, process.ConfigAPIURIFlag})
test.CheckProcessFeatures(t, &s, []process.Feature{process.EventFeature, process.ConfigFeature})
}
func TestState_CustomFlags(t *testing.T) {

@ -23,22 +23,27 @@ import (
"time"
)
// Feature represent a process feature
type Feature int
const (
version = "0.10.0"
// APIURIFlag is the api-uri flag
APIURIFlag = "api-uri"
// APITokenFlag is the api-token flag
APITokenFlag = "api-token"
// HubURIFlag is the hub-uri flag
HubURIFlag = "hub-uri"
// ConfigAPIURIFlag is the config-api-uri flag
ConfigAPIURIFlag = "config-api-uri"
// RedisURIFlag is the redis-uri flag
RedisURIFlag = "redis-uri"
// TorURIFlag is the tor-uri flag
TorURIFlag = "tor-uri"
// UserAgentFlag is the user-agent flag
UserAgentFlag = "user-agent"
// EventFeature is the feature to plug the process to the event server
EventFeature Feature = iota
// ConfigFeature is the feature to plug the process to the ConfigAPI framework
ConfigFeature
// CacheFeature is the feature to plug the process to the cache server
CacheFeature
// CrawlingFeature is the feature to plug the process with a tor-compatible HTTP client
CrawlingFeature
eventURIFlag = "event-srv"
eventPrefetchFlag = "event-prefetch"
configAPIURIFlag = "config-api"
redisURIFlag = "redis"
torURIFlag = "tor-proxy"
userAgentFlag = "user-agent"
)
// Provider is the implementation provider
@ -80,30 +85,30 @@ func (p *defaultProvider) ConfigClient(keys []string) (configapi.Client, error)
return nil, err
}
return configapi.NewConfigClient(p.ctx.String(ConfigAPIURIFlag), sub, keys)
return configapi.NewConfigClient(p.ctx.String(configAPIURIFlag), sub, keys)
}
func (p *defaultProvider) Subscriber() (event.Subscriber, error) {
return event.NewSubscriber(p.ctx.String(HubURIFlag))
return event.NewSubscriber(p.ctx.String(eventURIFlag), p.ctx.Int(eventPrefetchFlag))
}
func (p *defaultProvider) Publisher() (event.Publisher, error) {
return event.NewPublisher(p.ctx.String(HubURIFlag))
return event.NewPublisher(p.ctx.String(eventURIFlag))
}
func (p *defaultProvider) Cache(keyPrefix string) (cache.Cache, error) {
return cache.NewRedisCache(p.ctx.String(RedisURIFlag), keyPrefix)
return cache.NewRedisCache(p.ctx.String(redisURIFlag), keyPrefix)
}
func (p *defaultProvider) HTTPClient() (chttp.Client, error) {
return chttp.NewFastHTTPClient(&fasthttp.Client{
// Use given TOR proxy to reach the hidden services
Dial: fasthttpproxy.FasthttpSocksDialer(p.ctx.String(TorURIFlag)),
Dial: fasthttpproxy.FasthttpSocksDialer(p.ctx.String(torURIFlag)),
// Disable SSL verification since we do not really care about this
TLSConfig: &tls.Config{InsecureSkipVerify: true},
ReadTimeout: time.Second * 5,
WriteTimeout: time.Second * 5,
Name: p.ctx.String(UserAgentFlag),
Name: p.ctx.String(userAgentFlag),
}), nil
}
@ -125,7 +130,7 @@ type SubscriberDef struct {
// Process is a component of Trandoshan
type Process interface {
Name() string
CommonFlags() []string
Features() []Feature
CustomFlags() []cli.Flag
Initialize(provider Provider) error
Subscribers() []SubscriberDef
@ -148,11 +153,11 @@ func MakeApp(process Process) *cli.App {
Action: execute(process),
}
// Add common flags
flags := getCustomFlags()
for _, flag := range process.CommonFlags() {
if customFlag, contains := flags[flag]; contains {
app.Flags = append(app.Flags, customFlag)
// Add features flags
featureFlags := getFeaturesFlags()
for _, feature := range process.Features() {
if values, exist := featureFlags[feature]; exist {
app.Flags = append(app.Flags, values...)
}
}
@ -236,43 +241,49 @@ func execute(process Process) cli.ActionFunc {
}
}
func getCustomFlags() map[string]cli.Flag {
flags := map[string]cli.Flag{}
func getFeaturesFlags() map[Feature][]cli.Flag {
flags := map[Feature][]cli.Flag{}
flags[HubURIFlag] = &cli.StringFlag{
Name: HubURIFlag,
Usage: "URI to the hub (event) server",
Required: true,
}
flags[APIURIFlag] = &cli.StringFlag{
Name: APIURIFlag,
Usage: "URI to the API server",
Required: true,
}
flags[APITokenFlag] = &cli.StringFlag{
Name: APITokenFlag,
Usage: "Token to use to authenticate against the API",
Required: true,
}
flags[ConfigAPIURIFlag] = &cli.StringFlag{
Name: ConfigAPIURIFlag,
Usage: "URI to the ConfigAPI server",
Required: true,
flags[EventFeature] = []cli.Flag{
&cli.StringFlag{
Name: eventURIFlag,
Usage: "URI to the event server",
Required: true,
},
&cli.IntFlag{
Name: eventPrefetchFlag,
Usage: "Prefetch for the event subscriber",
Value: 1,
},
}
flags[RedisURIFlag] = &cli.StringFlag{
Name: RedisURIFlag,
Usage: "URI to the Redis server",
Required: true,
flags[ConfigFeature] = []cli.Flag{
&cli.StringFlag{
Name: configAPIURIFlag,
Usage: "URI to the ConfigAPI server",
Required: true,
},
}
flags[TorURIFlag] = &cli.StringFlag{
Name: TorURIFlag,
Usage: "URI to the TOR SOCKS proxy",
Required: true,
flags[CacheFeature] = []cli.Flag{
&cli.StringFlag{
Name: redisURIFlag,
Usage: "URI to the Redis server",
Required: true,
},
}
flags[UserAgentFlag] = &cli.StringFlag{
Name: UserAgentFlag,
Usage: "User agent to use",
Value: "Mozilla/5.0 (Windows NT 10.0; rv:68.0) Gecko/20100101 Firefox/68.0",
flags[CrawlingFeature] = []cli.Flag{
&cli.StringFlag{
Name: torURIFlag,
Usage: "URI to the TOR SOCKS proxy",
Required: true,
},
&cli.StringFlag{
Name: userAgentFlag,
Usage: "User agent to use",
Value: "Mozilla/5.0 (Windows NT 10.0; rv:68.0) Gecko/20100101 Firefox/68.0",
},
}
return flags

@ -39,9 +39,9 @@ func (state *State) Name() string {
return "scheduler"
}
// CommonFlags return process common flags
func (state *State) CommonFlags() []string {
return []string{process.HubURIFlag, process.ConfigAPIURIFlag, process.RedisURIFlag}
// Features return the process features
func (state *State) Features() []process.Feature {
return []process.Feature{process.EventFeature, process.ConfigFeature, process.CacheFeature}
}
// CustomFlags return process custom flags

@ -23,9 +23,9 @@ func TestState_Name(t *testing.T) {
}
}
func TestState_CommonFlags(t *testing.T) {
func TestState_Features(t *testing.T) {
s := State{}
test.CheckProcessCommonFlags(t, &s, []string{process.HubURIFlag, process.ConfigAPIURIFlag, process.RedisURIFlag})
test.CheckProcessFeatures(t, &s, []process.Feature{process.EventFeature, process.ConfigFeature, process.CacheFeature})
}
func TestState_CustomFlags(t *testing.T) {

@ -14,10 +14,10 @@ type SubscriberDef struct {
Exchange string
}
// CheckProcessCommonFlags check process defined common flags
func CheckProcessCommonFlags(t *testing.T, p process.Process, wantFlags []string) {
if !checkListEquals(p.CommonFlags(), wantFlags) {
t.Errorf("Differents flags: %v %v", p.CommonFlags(), wantFlags)
// CheckProcessFeatures check process defined features
func CheckProcessFeatures(t *testing.T, p process.Process, wantFeatures []process.Feature) {
if !reflect.DeepEqual(p.Features(), wantFeatures) {
t.Errorf("Differents flags: %v %v", p.Features(), wantFeatures)
}
}
@ -28,7 +28,7 @@ func CheckProcessCustomFlags(t *testing.T, p process.Process, wantFlags []string
names = append(names, customFlag.Names()[0])
}
if !checkListEquals(names, wantFlags) {
if !reflect.DeepEqual(names, wantFlags) {
t.Errorf("Differents flags: %v %v", names, wantFlags)
}
}
@ -62,7 +62,3 @@ func CheckProcessSubscribers(t *testing.T, p process.Process, subscribers []Subs
}
// TODO HTTPHandler
func checkListEquals(a []string, b []string) bool {
return reflect.DeepEqual(a, b)
}

Loading…
Cancel
Save