Implement persister

Add kibana & elasticsearch dependencies.
pull/3/head
Aloïs Micard 4 years ago
parent 7d2e666ba9
commit 5f1dd4bec8
No known key found for this signature in database
GPG Key ID: 1A0EB82F071F5EFE

@ -9,6 +9,21 @@ services:
image: dperson/torproxy:latest
logging:
driver: none
elasticsearch:
image: elasticsearch:7.5.1
logging:
driver: none
environment:
- discovery.type=single-node
- JAVA_OPTS=-Xms2g -Xmx2g
kibana:
image: kibana:7.5.1
logging:
driver: none
depends_on:
- elasticsearch
ports:
- 15004:5601
crawler:
image: trandoshan.io/crawler:latest
command: --log-level debug --nats-uri nats --tor-uri torproxy:9050
@ -27,7 +42,7 @@ services:
- nats
persister:
image: trandoshan.io/persister:latest
command: --log-level debug --nats-uri nats
command: --log-level debug --nats-uri nats --elasticsearch-uri http://elasticsearch:9200
restart: always
depends_on:
- nats

@ -5,6 +5,7 @@ go 1.14
require (
github.com/PuerkitoBio/purell v1.1.1
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
github.com/elastic/go-elasticsearch/v7 v7.6.0 // indirect
github.com/nats-io/nats.go v1.9.2
github.com/sirupsen/logrus v1.5.0
github.com/urfave/cli/v2 v2.2.0

@ -6,6 +6,9 @@ github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdko
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d h1:U+s90UTSYgptZMwQh2aRr3LuazLJIa+Pg3Kc1ylSYVY=
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/elastic/go-elasticsearch v0.0.0 h1:Pd5fqOuBxKxv83b0+xOAJDAkziWYwFinWnBO0y+TZaA=
github.com/elastic/go-elasticsearch/v7 v7.6.0 h1:sYpGLpEFHgLUKLsZUBfuaVI9QgHjS3JdH9fX4/z8QI8=
github.com/elastic/go-elasticsearch/v7 v7.6.0/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4=
github.com/klauspost/compress v1.8.2 h1:Bx0qjetmNjdFXASH02NSAREKpiaDwkO1DRZ3dV2KCcs=
github.com/klauspost/compress v1.8.2/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/cpuid v1.2.1 h1:vJi+O/nMdFt0vqm8NZBI6wzALWdA2X+egi0ogNyrC/w=

@ -105,6 +105,6 @@ func handleMessage(httpClient *fasthttp.Client) natsutil.MsgHandler {
}
}
return nil // TODO
return nil
}
}

@ -1,14 +1,28 @@
package persister
import (
"bytes"
"context"
"encoding/json"
"github.com/creekorful/trandoshan/internal/log"
"github.com/creekorful/trandoshan/internal/natsutil"
"github.com/creekorful/trandoshan/pkg/proto"
"github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/nats-io/nats.go"
"github.com/sirupsen/logrus"
"github.com/urfave/cli/v2"
"strings"
"time"
)
type resourceIndex struct {
Url string `json:"url"`
Body string `json:"body"`
Title string `json:"title"`
Time time.Time `json:"time"`
}
// GetApp return the persister app
func GetApp() *cli.App {
return &cli.App{
@ -22,6 +36,11 @@ func GetApp() *cli.App {
Usage: "URI to the NATS server",
Required: true,
},
&cli.StringFlag{
Name: "elasticsearch-uri",
Usage: "URI to the Elasticsearch server",
Required: true,
},
},
Action: execute,
}
@ -34,6 +53,13 @@ func execute(ctx *cli.Context) error {
logrus.Debugf("Using NATS server at: %s", ctx.String("nats-uri"))
// Create Elasticsearch client
es, err := elasticsearch.NewClient(elasticsearch.Config{Addresses: []string{ctx.String("elasticsearch-uri")}})
if err != nil {
logrus.Errorf("Error while creating elasticsearch client: %s", err)
return err
}
// Create the NATS subscriber
sub, err := natsutil.NewSubscriber(ctx.String("nats-uri"))
if err != nil {
@ -43,14 +69,14 @@ func execute(ctx *cli.Context) error {
logrus.Info("Successfully initialized trandoshan-persister. Waiting for resources")
if err := sub.QueueSubscribe(proto.ResourceSubject, "persisters", handleMessage()); err != nil {
if err := sub.QueueSubscribe(proto.ResourceSubject, "persisters", handleMessage(es)); err != nil {
return err
}
return nil
}
func handleMessage() natsutil.MsgHandler {
func handleMessage(es *elasticsearch.Client) natsutil.MsgHandler {
return func(nc *nats.Conn, msg *nats.Msg) error {
var resMsg proto.ResourceMsg
if err := natsutil.ReadJSON(msg, &resMsg); err != nil {
@ -59,6 +85,47 @@ func handleMessage() natsutil.MsgHandler {
logrus.Debugf("Processing resource: %s", resMsg.URL)
// TODO store on file system
// Create Elasticsearch document
doc := resourceIndex{
Url: resMsg.URL,
Body: resMsg.Body,
Title: extractTitle(resMsg.Body),
Time: time.Now(),
}
// Serialize document into json
docBytes, err := json.Marshal(&doc)
if err != nil {
logrus.Warnf("Error while serializing document into json: %s", err)
}
// Use Elasticsearch to index document
req := esapi.IndexRequest{
Index: "resources",
Body: bytes.NewReader(docBytes),
Refresh: "true",
}
res, err := req.Do(context.Background(), es)
if err != nil {
logrus.Warnf("Error while creating elasticsearch index: %s", err)
}
defer res.Body.Close()
return nil
}
}
// extract title from html body
func extractTitle(body string) string {
cleanBody := strings.ToLower(body)
startPos := strings.Index(cleanBody, "<title>") + len("<title>")
endPos := strings.Index(cleanBody, "</title>")
// html tag absent of malformed
if startPos == -1 || endPos == -1 {
return ""
}
return body[startPos:endPos]
}

Loading…
Cancel
Save