Indexer: support buffered indexing

pull/126/head
Aloïs Micard 3 years ago
parent 71f82d4aad
commit 9b46dc205e
No known key found for this signature in database
GPG Key ID: 1A0EB82F071F5EFE

@ -71,6 +71,7 @@ services:
command: >
--log-level debug
--event-srv amqp://guest:guest@rabbitmq:5672
--event-prefetch 20
--config-api http://configapi:8080
--index-driver elastic
--index-dest http://elasticsearch:9200

@ -54,7 +54,7 @@ func (state *State) Initialize(provider process.Provider) error {
state.pub = pub
defaultValues := map[string]string{}
for _, value := range provider.GetValues("default-value") {
for _, value := range provider.GetStrValues("default-value") {
parts := strings.Split(value, "=")
if len(parts) == 2 {

@ -38,7 +38,7 @@ func TestState_Initialize(t *testing.T) {
test.CheckInitialize(t, &State{}, func(p *process_mock.MockProviderMockRecorder) {
p.Cache("configuration")
p.Publisher()
p.GetValues("default-value")
p.GetStrValues("default-value")
})
}

@ -9,8 +9,7 @@ import (
"time"
)
var resourcesIndex = "resources"
const resourcesIndexName = "resources"
const mapping = `
{
"settings": {
@ -93,29 +92,48 @@ func newElasticIndex(uri string) (Index, error) {
}, nil
}
func (e *elasticSearchIndex) IndexResource(url string, time time.Time, body string, headers map[string]string) error {
res, err := extractResource(url, time, body, headers)
func (e *elasticSearchIndex) IndexResource(resource Resource) error {
res, err := indexResource(resource)
if err != nil {
return err
}
_, err = e.client.Index().
Index(resourcesIndex).
Index(resourcesIndexName).
BodyJson(res).
Do(context.Background())
return err
}
// IndexResources indexes the given resources using a single Elasticsearch bulk request.
//
// Every resource is first converted with indexResource; the first conversion
// error aborts the whole batch before anything is sent. An empty (or nil)
// batch is a successful no-op.
//
// It returns the error of the bulk request itself, or, when the request went
// through but Elasticsearch rejected some documents, an error describing the
// first rejected document.
func (e *elasticSearchIndex) IndexResources(resources []Resource) error {
	// The bulk service refuses to run without any action queued,
	// so an empty batch is handled here instead of being reported as a failure.
	if len(resources) == 0 {
		return nil
	}

	bulkRequest := e.client.Bulk()
	for _, resource := range resources {
		resourceIndex, err := indexResource(resource)
		if err != nil {
			return err
		}

		req := elastic.NewBulkIndexRequest().
			Index(resourcesIndexName).
			Doc(resourceIndex)
		bulkRequest.Add(req)
	}

	resp, err := bulkRequest.Do(context.Background())
	if err != nil {
		return err
	}

	// A bulk request can succeed as a whole while individual documents are
	// rejected: those failures are only visible in the response items.
	if failed := resp.Failed(); len(failed) > 0 {
		return &elastic.Error{Status: failed[0].Status, Details: failed[0].Error}
	}

	return nil
}
func setupElasticSearch(ctx context.Context, es *elastic.Client) error {
// Set up the index if it doesn't exist
exist, err := es.IndexExists(resourcesIndex).Do(ctx)
exist, err := es.IndexExists(resourcesIndexName).Do(ctx)
if err != nil {
return err
}
if !exist {
log.Debug().Str("index", resourcesIndex).Msg("Creating missing index")
log.Debug().Str("index", resourcesIndexName).Msg("Creating missing index")
q := es.CreateIndex(resourcesIndex).BodyString(mapping)
q := es.CreateIndex(resourcesIndexName).BodyString(mapping)
if _, err := q.Do(ctx); err != nil {
return err
}
@ -124,8 +142,8 @@ func setupElasticSearch(ctx context.Context, es *elastic.Client) error {
return nil
}
func extractResource(url string, time time.Time, body string, headers map[string]string) (*resourceIdx, error) {
doc, err := goquery.NewDocumentFromReader(strings.NewReader(body))
func indexResource(resource Resource) (*resourceIdx, error) {
doc, err := goquery.NewDocumentFromReader(strings.NewReader(resource.Body))
if err != nil {
return nil, err
}
@ -152,14 +170,14 @@ func extractResource(url string, time time.Time, body string, headers map[string
// Lowercase headers
lowerCasedHeaders := map[string]string{}
for key, value := range headers {
for key, value := range resource.Headers {
lowerCasedHeaders[strings.ToLower(key)] = value
}
return &resourceIdx{
URL: url,
Body: body,
Time: time,
URL: resource.URL,
Body: resource.Body,
Time: resource.Time,
Title: title,
Meta: meta,
Description: meta["description"],

@ -6,7 +6,7 @@ import (
"time"
)
func TestExtractResource(t *testing.T) {
func TestIndexResource(t *testing.T) {
body := `
<title>Creekorful Inc</title>
@ -23,34 +23,39 @@ This is sparta
Body: body,
}
resDto, err := extractResource("https://example.org/300", time.Time{}, body, map[string]string{"Content-Type": "application/json"})
resIdx, err := indexResource(Resource{
URL: "https://example.org/300",
Time: time.Time{},
Body: body,
Headers: map[string]string{"Content-Type": "application/json"},
})
if err != nil {
t.FailNow()
}
if resDto.URL != "https://example.org/300" {
if resIdx.URL != "https://example.org/300" {
t.Fail()
}
if resDto.Title != "Creekorful Inc" {
if resIdx.Title != "Creekorful Inc" {
t.Fail()
}
if resDto.Body != msg.Body {
if resIdx.Body != msg.Body {
t.Fail()
}
if resDto.Description != "Zhello world" {
if resIdx.Description != "Zhello world" {
t.Fail()
}
if resDto.Meta["description"] != "Zhello world" {
if resIdx.Meta["description"] != "Zhello world" {
t.Fail()
}
if resDto.Meta["og:url"] != "https://example.org" {
if resIdx.Meta["og:url"] != "https://example.org" {
t.Fail()
}
if resDto.Headers["content-type"] != "application/json" {
if resIdx.Headers["content-type"] != "application/json" {
t.Fail()
}
}

@ -14,10 +14,18 @@ const (
Local = "local"
)
// Index is the interface used to abstract communication
// with the persistence unit
// Resource represents a resource that should be indexed.
// It is the unit of work handed to an Index implementation.
type Resource struct {
	// URL is the URL of the resource.
	URL string
	// Time is the timestamp attached to the resource.
	// NOTE(review): presumably the time the resource was crawled — confirm with the event producer.
	Time time.Time
	// Body is the raw body of the resource (the elastic driver parses it as HTML).
	Body string
	// Headers are the headers attached to the resource, keyed by header name.
	// NOTE(review): presumably the HTTP response headers — confirm with the event producer.
	Headers map[string]string
}
// Index is the interface used to abstract communication with the persistence unit
type Index interface {
IndexResource(url string, time time.Time, body string, headers map[string]string) error
IndexResource(resource Resource) error
IndexResources(resources []Resource) error
}
// NewIndex create a new index using given driver, destination

@ -20,13 +20,13 @@ func newLocalIndex(root string) (Index, error) {
return &localIndex{baseDir: root}, nil
}
func (s *localIndex) IndexResource(url string, time time.Time, body string, headers map[string]string) error {
path, err := formatPath(url, time)
func (s *localIndex) IndexResource(resource Resource) error {
path, err := formatPath(resource.URL, resource.Time)
if err != nil {
return err
}
content, err := formatResource(url, body, headers)
content, err := formatResource(resource.URL, resource.Body, resource.Headers)
if err != nil {
return err
}
@ -45,6 +45,18 @@ func (s *localIndex) IndexResource(url string, time time.Time, body string, head
return nil
}
// IndexResources stores the given resources one after the other, in order,
// and stops at (and returns) the first error encountered.
func (s *localIndex) IndexResources(resources []Resource) error {
	// The local driver has no batch primitive:
	// each resource is simply delegated to IndexResource.
	for i := range resources {
		err := s.IndexResource(resources[i])
		if err != nil {
			return err
		}
	}

	return nil
}
func formatResource(url string, body string, headers map[string]string) ([]byte, error) {
builder := strings.Builder{}

@ -57,8 +57,55 @@ func TestLocalIndex_IndexResource(t *testing.T) {
s := localIndex{baseDir: d}
ti := time.Date(2020, time.October, 29, 12, 4, 9, 0, time.UTC)
if err := s.IndexResource(Resource{
URL: "https://google.com",
Time: ti,
Body: "Hello, world",
Headers: map[string]string{"Server": "Traefik"},
}); err != nil {
t.Fail()
}
p := filepath.Join(d, "https", "google.com", "1603973049")
inf, err := os.Stat(p)
if err != nil {
t.Fail()
}
if inf.Mode() != 0640 {
t.Fail()
}
b, err := ioutil.ReadFile(p)
if err != nil {
t.Fail()
}
if string(b) != "https://google.com\n\nServer: Traefik\n\nHello, world" {
t.Fail()
}
}
func TestLocalIndex_IndexResources(t *testing.T) {
d, err := ioutil.TempDir("", "")
if err != nil {
t.FailNow()
}
defer os.RemoveAll(d)
s := localIndex{baseDir: d}
ti := time.Date(2020, time.October, 29, 12, 4, 9, 0, time.UTC)
resources := []Resource{
{
URL: "https://google.com",
Time: ti,
Body: "Hello, world",
Headers: map[string]string{"Server": "Traefik"},
},
}
if err := s.IndexResource("https://google.com", ti, "Hello, world", map[string]string{"Server": "Traefik"}); err != nil {
if err := s.IndexResources(resources); err != nil {
t.Fail()
}

@ -19,6 +19,9 @@ type State struct {
index index.Index
indexDriver string
configClient configapi.Client
bufferThreshold int
resources []index.Resource
}
// Name return the process name
@ -49,13 +52,14 @@ func (state *State) CustomFlags() []cli.Flag {
// Initialize the process
func (state *State) Initialize(provider process.Provider) error {
indexDriver := provider.GetValue("index-driver")
idx, err := index.NewIndex(indexDriver, provider.GetValue("index-dest"))
indexDriver := provider.GetStrValue("index-driver")
idx, err := index.NewIndex(indexDriver, provider.GetStrValue("index-dest"))
if err != nil {
return err
}
state.index = idx
state.indexDriver = indexDriver
state.bufferThreshold = provider.GetIntValue(process.EventPrefetchFlag)
configClient, err := provider.ConfigClient([]string{configapi.ForbiddenHostnamesKey})
if err != nil {
@ -89,11 +93,47 @@ func (state *State) handleNewResourceEvent(subscriber event.Subscriber, msg even
return fmt.Errorf("%s %w", evt.URL, errHostnameNotAllowed)
}
if err := state.index.IndexResource(evt.URL, evt.Time, evt.Body, evt.Headers); err != nil {
return fmt.Errorf("error while indexing resource: %s", err)
// Direct saving (no buffering)
if state.bufferThreshold == 1 {
if err := state.index.IndexResource(index.Resource{
URL: evt.URL,
Time: evt.Time,
Body: evt.Body,
Headers: evt.Headers,
}); err != nil {
return fmt.Errorf("error while indexing resource: %s", err)
}
log.Info().
Str("url", evt.URL).
Msg("Successfully indexed resource")
return nil
}
log.Info().Str("url", evt.URL).Msg("Successfully indexed resource")
// Otherwise we are in buffered saving mode
state.resources = append(state.resources, index.Resource{
URL: evt.URL,
Time: evt.Time,
Body: evt.Body,
Headers: evt.Headers,
})
log.Debug().Str("url", evt.URL).Msg("Successfully stored resource in buffer")
if len(state.resources) >= state.bufferThreshold {
// Time to save!
if err := state.index.IndexResources(state.resources); err != nil {
return fmt.Errorf("error while indexing resources: %s", err)
}
log.Info().
Int("count", len(state.resources)).
Msg("Successfully indexed buffered resources")
// Clear cache
state.resources = []index.Resource{}
}
return nil
}

@ -6,11 +6,13 @@ import (
"github.com/creekorful/trandoshan/internal/configapi/client_mock"
"github.com/creekorful/trandoshan/internal/event"
"github.com/creekorful/trandoshan/internal/event_mock"
"github.com/creekorful/trandoshan/internal/indexer/index"
"github.com/creekorful/trandoshan/internal/indexer/index_mock"
"github.com/creekorful/trandoshan/internal/process"
"github.com/creekorful/trandoshan/internal/process_mock"
"github.com/creekorful/trandoshan/internal/test"
"github.com/golang/mock/gomock"
"reflect"
"testing"
"time"
)
@ -35,14 +37,18 @@ func TestState_CustomFlags(t *testing.T) {
func TestState_Initialize(t *testing.T) {
s := State{}
test.CheckInitialize(t, &s, func(p *process_mock.MockProviderMockRecorder) {
p.GetValue("index-driver").Return("local")
p.GetValue("index-dest")
p.GetStrValue("index-driver").Return("local")
p.GetStrValue("index-dest")
p.GetIntValue(process.EventPrefetchFlag).Return(10)
p.ConfigClient([]string{client.ForbiddenHostnamesKey})
})
if s.indexDriver != "local" {
t.Errorf("wrong driver: got: %s want: %s", s.indexDriver, "local")
}
if s.bufferThreshold != 10 {
t.Errorf("wrong buffer threshold: got: %d want: %d", s.bufferThreshold, 10)
}
}
func TestState_Subscribers(t *testing.T) {
@ -52,7 +58,53 @@ func TestState_Subscribers(t *testing.T) {
})
}
func TestHandleNewResourceEvent(t *testing.T) {
// TestHandleNewResourceEvent_NoBuffering checks that, with a buffer threshold of 1,
// an incoming resource event is handed directly to Index.IndexResource.
func TestHandleNewResourceEvent_NoBuffering(t *testing.T) {
	body := `
<title>Creekorful Inc</title>
This is sparta (hosted on https://example.org)
<a href="https://google.com/test?test=test#12">
Thanks to https://help.facebook.onion/ for the hosting :D
<meta name="DescriptIon" content="Zhello world">
<meta property="og:url" content="https://example.org">`

	mockCtrl := gomock.NewController(t)
	defer mockCtrl.Finish()

	subscriberMock := event_mock.NewMockSubscriber(mockCtrl)
	configClientMock := client_mock.NewMockClient(mockCtrl)
	indexMock := index_mock.NewMockIndex(mockCtrl)

	tn := time.Now()

	// The subscriber decodes the raw message into the event below
	msg := event.RawMessage{}
	subscriberMock.EXPECT().
		Read(&msg, &event.NewResourceEvent{}).
		SetArg(1, event.NewResourceEvent{
			URL:     "https://example.onion",
			Body:    body,
			Headers: map[string]string{"Server": "Traefik", "Content-Type": "application/html"},
			Time:    tn,
		}).Return(nil)

	// The event hostname is not part of the forbidden ones
	configClientMock.EXPECT().GetForbiddenHostnames().Return([]client.ForbiddenHostname{{Hostname: "example2.onion"}}, nil)

	// The resource must reach the index untouched
	indexMock.EXPECT().IndexResource(index.Resource{
		URL:     "https://example.onion",
		Time:    tn,
		Body:    body,
		Headers: map[string]string{"Server": "Traefik", "Content-Type": "application/html"},
	})

	s := State{index: indexMock, configClient: configClientMock, bufferThreshold: 1}
	if err := s.handleNewResourceEvent(subscriberMock, msg); err != nil {
		t.Fatalf("handleNewResourceEvent() returned an unexpected error: %v", err)
	}
}
func TestHandleNewResourceEvent_Buffering_NoDispatch(t *testing.T) {
body := `
<title>Creekorful Inc</title>
@ -85,12 +137,89 @@ Thanks to https://help.facebook.onion/ for the hosting :D
}).Return(nil)
configClientMock.EXPECT().GetForbiddenHostnames().Return([]client.ForbiddenHostname{{Hostname: "example2.onion"}}, nil)
indexMock.EXPECT().IndexResource("https://example.onion", tn, body, map[string]string{"Server": "Traefik", "Content-Type": "application/html"})
s := State{index: indexMock, configClient: configClientMock}
s := State{index: indexMock, configClient: configClientMock, bufferThreshold: 5}
if err := s.handleNewResourceEvent(subscriberMock, msg); err != nil {
t.FailNow()
}
if len(s.resources) != 1 {
t.FailNow()
}
if s.resources[0].URL != "https://example.onion" {
t.Fail()
}
if s.resources[0].Body != body {
t.Fail()
}
if !reflect.DeepEqual(s.resources[0].Headers, map[string]string{"Server": "Traefik", "Content-Type": "application/html"}) {
t.Fail()
}
if s.resources[0].Time != tn {
t.Fail()
}
}
// TestHandleNewResourceEvent_Buffering_Dispatch checks that when an incoming event
// makes the buffer reach its threshold, the buffered resources are sent
// through Index.IndexResources and the buffer is emptied afterwards.
func TestHandleNewResourceEvent_Buffering_Dispatch(t *testing.T) {
	body := `
<title>Creekorful Inc</title>
This is sparta (hosted on https://example.org)
<a href="https://google.com/test?test=test#12">
Thanks to https://help.facebook.onion/ for the hosting :D
<meta name="DescriptIon" content="Zhello world">
<meta property="og:url" content="https://example.org">`

	mockCtrl := gomock.NewController(t)
	defer mockCtrl.Finish()

	subscriberMock := event_mock.NewMockSubscriber(mockCtrl)
	configClientMock := client_mock.NewMockClient(mockCtrl)
	indexMock := index_mock.NewMockIndex(mockCtrl)

	tn := time.Now()

	// The subscriber decodes the raw message into the event below
	msg := event.RawMessage{}
	subscriberMock.EXPECT().
		Read(&msg, &event.NewResourceEvent{}).
		SetArg(1, event.NewResourceEvent{
			URL:     "https://example.onion",
			Body:    body,
			Headers: map[string]string{"Server": "Traefik", "Content-Type": "application/html"},
			Time:    tn,
		}).Return(nil)

	// The event hostname is not part of the forbidden ones
	configClientMock.EXPECT().GetForbiddenHostnames().Return([]client.ForbiddenHostname{{Hostname: "example2.onion"}}, nil)

	// The already buffered resource comes first, followed by the new one
	indexMock.EXPECT().IndexResources([]index.Resource{
		{
			URL: "https://google.onion",
		},
		{
			URL:     "https://example.onion",
			Time:    tn,
			Body:    body,
			Headers: map[string]string{"Server": "Traefik", "Content-Type": "application/html"},
		},
	})

	// One resource is already buffered: the incoming event reaches the threshold
	s := State{
		index:           indexMock,
		configClient:    configClientMock,
		bufferThreshold: 2,
		resources:       []index.Resource{{URL: "https://google.onion"}},
	}
	if err := s.handleNewResourceEvent(subscriberMock, msg); err != nil {
		t.Fatalf("handleNewResourceEvent() returned an unexpected error: %v", err)
	}

	// should be reset
	if len(s.resources) != 0 {
		t.Errorf("buffer not reset after dispatch: got %d resources, want 0", len(s.resources))
	}
}
func TestHandleMessageForbiddenHostname(t *testing.T) {

@ -38,12 +38,14 @@ const (
// CrawlingFeature is the feature to plug the process with a tor-compatible HTTP client
CrawlingFeature
eventURIFlag = "event-srv"
eventPrefetchFlag = "event-prefetch"
configAPIURIFlag = "config-api"
redisURIFlag = "redis"
torURIFlag = "tor-proxy"
userAgentFlag = "user-agent"
// EventPrefetchFlag is the name of the flag that sets the prefetch count for the event subscriber
EventPrefetchFlag = "event-prefetch"
eventURIFlag = "event-srv"
configAPIURIFlag = "config-api"
redisURIFlag = "redis"
torURIFlag = "tor-proxy"
userAgentFlag = "user-agent"
)
// Provider is the implementation provider
@ -60,10 +62,12 @@ type Provider interface {
Cache(keyPrefix string) (cache.Cache, error)
// HTTPClient return a new configured http client
HTTPClient() (chttp.Client, error)
// GetValue return value for given key
GetValue(key string) string
// GetValue return values for given key
GetValues(key string) []string
// GetStrValue return string value for given key
GetStrValue(key string) string
// GetStrValues return string slice for given key
GetStrValues(key string) []string
// GetIntValue return int value for given key
GetIntValue(key string) int
}
type defaultProvider struct {
@ -89,7 +93,7 @@ func (p *defaultProvider) ConfigClient(keys []string) (configapi.Client, error)
}
func (p *defaultProvider) Subscriber() (event.Subscriber, error) {
return event.NewSubscriber(p.ctx.String(eventURIFlag), p.ctx.Int(eventPrefetchFlag))
return event.NewSubscriber(p.ctx.String(eventURIFlag), p.ctx.Int(EventPrefetchFlag))
}
func (p *defaultProvider) Publisher() (event.Publisher, error) {
@ -112,14 +116,18 @@ func (p *defaultProvider) HTTPClient() (chttp.Client, error) {
}), nil
}
func (p *defaultProvider) GetValue(key string) string {
// GetStrValue looks up the given key in the provider context
// and returns its value as a string.
func (p *defaultProvider) GetStrValue(key string) string {
	value := p.ctx.String(key)
	return value
}
func (p *defaultProvider) GetValues(key string) []string {
// GetStrValues looks up the given key in the provider context
// and returns its value as a string slice.
func (p *defaultProvider) GetStrValues(key string) []string {
	values := p.ctx.StringSlice(key)
	return values
}
// GetIntValue looks up the given key in the provider context
// and returns its value as an int.
func (p *defaultProvider) GetIntValue(key string) int {
	value := p.ctx.Int(key)
	return value
}
// SubscriberDef is the subscriber definition
type SubscriberDef struct {
Exchange string
@ -251,7 +259,7 @@ func getFeaturesFlags() map[Feature][]cli.Flag {
Required: true,
},
&cli.IntFlag{
Name: eventPrefetchFlag,
Name: EventPrefetchFlag,
Usage: "Prefetch for the event subscriber",
Value: 1,
},

Loading…
Cancel
Save