Refactor base api

pull/94/head
Aloïs Micard 3 years ago
parent 8555c5eb05
commit c6a9038dad
No known key found for this signature in database
GPG Key ID: 1A0EB82F071F5EFE

@ -2,11 +2,12 @@ package main
import (
"github.com/creekorful/trandoshan/internal/api"
"github.com/creekorful/trandoshan/internal/process"
"os"
)
func main() {
app := api.GetApp()
app := process.MakeApp(&api.State{})
if err := app.Run(os.Args); err != nil {
os.Exit(1)
}

@ -1,83 +1,322 @@
package api
import (
"github.com/creekorful/trandoshan/internal/api/auth"
"github.com/creekorful/trandoshan/internal/api/rest"
"github.com/creekorful/trandoshan/internal/api/service"
"github.com/creekorful/trandoshan/internal/logging"
"github.com/creekorful/trandoshan/internal/util"
"github.com/labstack/echo/v4"
"encoding/base64"
"encoding/json"
"github.com/creekorful/trandoshan/api"
"github.com/creekorful/trandoshan/internal/api/database"
"github.com/creekorful/trandoshan/internal/duration"
"github.com/creekorful/trandoshan/internal/event"
"github.com/creekorful/trandoshan/internal/process"
"github.com/gorilla/mux"
"github.com/rs/zerolog/log"
"github.com/urfave/cli/v2"
"net/http"
"strconv"
"strings"
"time"
)
// GetApp return the api app
func GetApp() *cli.App {
return &cli.App{
Name: "tdsh-api",
Version: "0.7.0",
Usage: "Trandoshan API component",
Flags: []cli.Flag{
logging.GetLogFlag(),
util.GetHubURI(),
&cli.StringFlag{
Name: "elasticsearch-uri",
Usage: "URI to the Elasticsearch server",
Required: true,
},
&cli.StringFlag{
Name: "signing-key",
Usage: "Signing key for the JWT token",
Required: true,
},
&cli.StringSliceFlag{
Name: "users",
Usage: "List of API users. (Format user:password)",
Required: false,
},
&cli.StringFlag{
Name: "refresh-delay",
Usage: "Duration before allowing indexation of existing resource (none = never)",
},
var (
defaultPaginationSize = 50
maxPaginationSize = 100
)
type State struct {
db database.Database
pub event.Publisher
refreshDelay time.Duration
}
func (state *State) Name() string {
return "api"
}
func (state *State) CommonFlags() []string {
return []string{process.HubURIFlag}
}
func (state *State) CustomFlags() []cli.Flag {
return []cli.Flag{
&cli.StringFlag{
Name: "elasticsearch-uri",
Usage: "URI to the Elasticsearch server",
Required: true,
},
&cli.StringFlag{
Name: "signing-key",
Usage: "Signing key for the JWT token",
Required: true,
},
&cli.StringSliceFlag{
Name: "users",
Usage: "List of API users. (Format user:password)",
Required: false,
},
&cli.StringFlag{
Name: "refresh-delay",
Usage: "Duration before allowing indexation of existing resource (none = never)",
},
Action: execute,
}
}
func execute(c *cli.Context) error {
logging.ConfigureLogger(c)
func (state *State) Provide(provider process.Provider) error {
db, err := database.NewElasticDB(provider.GetValue("elasticsearch-uri"))
if err != nil {
return err
}
state.db = db
e := echo.New()
e.HTTPErrorHandler = func(err error, c echo.Context) {
log.Err(err).Msg("error while processing API call")
e.DefaultHTTPErrorHandler(err, c)
pub, err := provider.Subscriber()
if err != nil {
return err
}
e.HideBanner = true
state.pub = pub
log.Info().Str("ver", c.App.Version).
Str("elasticsearch-uri", c.String("elasticsearch-uri")).
Str("hub-uri", c.String("hub-uri")).
Msg("Starting tdsh-api")
state.refreshDelay = duration.ParseDuration(provider.GetValue("refresh-delay"))
signingKey := []byte(c.String("signing-key"))
return nil
}
// Create the service
svc, err := service.New(c)
func (state *State) Subscribers() []process.SubscriberDef {
return []process.SubscriberDef{}
}
func (state *State) HTTPHandler() http.Handler {
r := mux.NewRouter()
// TODO auth middleware
// signingKey := []byte(c.String("signing-key"))
// authMiddleware := auth.NewMiddleware(signingKey)
// e.Use(authMiddleware.Middleware())
r.HandleFunc("/v1/resources", state.searchResources).Methods(http.MethodGet)
r.HandleFunc("/v1/resources", state.addResource).Methods(http.MethodPost)
r.HandleFunc("/v1/urls", state.scheduleURL).Methods(http.MethodPost)
return r
}
func (state *State) searchResources(w http.ResponseWriter, r *http.Request) {
searchParams, err := getSearchParams(r)
if err != nil {
log.Err(err).Msg("error while creating API service")
return err
log.Err(err).Msg("error while getting search params")
w.WriteHeader(http.StatusBadRequest)
return
}
totalCount, err := state.db.CountResources(searchParams)
if err != nil {
log.Err(err).Msg("error while counting on ES")
w.WriteHeader(http.StatusInternalServerError)
return
}
res, err := state.db.SearchResources(searchParams)
if err != nil {
log.Err(err).Msg("error while searching on ES")
w.WriteHeader(http.StatusInternalServerError)
return
}
var resources []api.ResourceDto
for _, r := range res {
resources = append(resources, api.ResourceDto{
URL: r.URL,
Body: r.Body,
Title: r.Title,
Time: r.Time,
})
}
w.WriteHeader(http.StatusOK)
// Write pagination headers
writePagination(w, searchParams, totalCount)
// Write body
if err := json.NewEncoder(w).Encode(resources); err != nil {
log.Err(err).Msg("error while encoding response")
w.WriteHeader(http.StatusInternalServerError)
}
}
func (state *State) addResource(w http.ResponseWriter, r *http.Request) {
var res api.ResourceDto
if err := json.NewDecoder(r.Body).Decode(&res); err != nil {
log.Warn().Str("err", err.Error()).Msg("error while decoding request body")
w.WriteHeader(http.StatusUnprocessableEntity)
return
}
// Hacky stuff to prevent from adding 'duplicate resource'
// the thing is: even with the scheduler preventing from crawling 'duplicates' URL by adding a refresh period
// and checking if the resource is not already indexed, this implementation may not work if the URLs was published
// before the resource is saved. And this happen a LOT of time.
// therefore the best thing to do is to make the API check if the resource should **really** be added by checking if
// it isn't present on the database. This may sounds hacky, but it's the best solution i've come up at this time.
endDate := time.Time{}
if state.refreshDelay != -1 {
endDate = time.Now().Add(-state.refreshDelay)
}
count, err := state.db.CountResources(&api.ResSearchParams{
URL: res.URL,
EndDate: endDate,
PageSize: 1,
PageNumber: 1,
})
if err != nil {
log.Err(err).Msg("error while searching for resource")
w.WriteHeader(http.StatusInternalServerError)
return
}
// Setup middlewares
authMiddleware := auth.NewMiddleware(signingKey)
e.Use(authMiddleware.Middleware())
if count > 0 {
// Not an error
log.Debug().Str("url", res.URL).Msg("Skipping duplicate resource")
w.WriteHeader(http.StatusOK)
return
}
// Create Elasticsearch document
doc := database.ResourceIdx{
URL: res.URL,
Body: res.Body,
Time: res.Time,
Title: res.Title,
Meta: res.Meta,
Description: res.Description,
Headers: res.Headers,
}
if err := state.db.AddResource(doc); err != nil {
log.Err(err).Msg("Error while adding resource")
w.WriteHeader(http.StatusInternalServerError)
return
}
log.Info().Str("url", res.URL).Msg("Successfully saved resource")
// Add endpoints
e.GET("/v1/resources", rest.SearchResources(svc))
e.POST("/v1/resources", rest.AddResource(svc))
e.POST("/v1/urls", rest.ScheduleURL(svc))
w.WriteHeader(http.StatusOK)
if err := json.NewEncoder(w).Encode(res); err != nil {
log.Err(err).Msg("error while encoding response")
w.WriteHeader(http.StatusInternalServerError)
}
}
func (state *State) scheduleURL(w http.ResponseWriter, r *http.Request) {
var url string
if err := json.NewDecoder(r.Body).Decode(&url); err != nil {
log.Warn().Str("err", err.Error()).Msg("error while decoding request body")
w.WriteHeader(http.StatusUnprocessableEntity)
return
}
if err := state.pub.PublishEvent(&event.FoundURLEvent{URL: url}); err != nil {
log.Err(err).Msg("unable to schedule URL")
w.WriteHeader(http.StatusInternalServerError)
return
}
log.Info().Str("url", url).Msg("successfully scheduled URL")
w.WriteHeader(http.StatusOK)
}
func getSearchParams(r *http.Request) (*api.ResSearchParams, error) {
params := &api.ResSearchParams{}
if param := r.URL.Query()["keyword"]; len(param) == 1 {
params.Keyword = param[0]
}
if param := r.URL.Query()["with-body"]; len(param) == 1 {
params.WithBody = param[0] == "true"
}
// extract raw query params (unescaped to keep + sign when parsing date)
rawQueryParams := getRawQueryParam(r.URL.RawQuery)
if val, exist := rawQueryParams["start-date"]; exist {
d, err := time.Parse(time.RFC3339, val)
if err == nil {
params.StartDate = d
} else {
return nil, err
}
}
log.Info().Msg("Successfully initialized tdsh-api. Waiting for requests")
if val, exist := rawQueryParams["end-date"]; exist {
d, err := time.Parse(time.RFC3339, val)
if err == nil {
params.EndDate = d
} else {
return nil, err
}
}
// base64decode the URL
if param := r.URL.Query()["url"]; len(param) == 1 {
b, err := base64.URLEncoding.DecodeString(param[0])
if err != nil {
return nil, err
}
params.URL = string(b)
}
// Acquire pagination
page, size := getPagination(r)
params.PageNumber = page
params.PageSize = size
return params, nil
}
func writePagination(w http.ResponseWriter, searchParams *api.ResSearchParams, total int64) {
w.Header().Set(api.PaginationPageHeader, strconv.Itoa(searchParams.PageNumber))
w.Header().Set(api.PaginationSizeHeader, strconv.Itoa(searchParams.PageSize))
w.Header().Set(api.PaginationCountHeader, strconv.FormatInt(total, 10))
}
func getPagination(r *http.Request) (page int, size int) {
page = 1
size = defaultPaginationSize
// Get pagination page
if param := r.URL.Query()[api.PaginationPageQueryParam]; len(param) == 1 {
if val, err := strconv.Atoi(param[0]); err == nil {
page = val
}
}
// Get pagination size
if param := r.URL.Query()[api.PaginationSizeQueryParam]; len(param) == 1 {
if val, err := strconv.Atoi(param[0]); err == nil {
size = val
}
}
// Prevent too much results from being returned
if size > maxPaginationSize {
size = maxPaginationSize
}
return page, size
}
func getRawQueryParam(url string) map[string]string {
if url == "" {
return map[string]string{}
}
val := map[string]string{}
parts := strings.Split(url, "&")
for _, part := range parts {
p := strings.Split(part, "=")
val[p[0]] = p[1]
}
return e.Start(":8080")
return val
}

@ -0,0 +1,349 @@
package api
import (
"bytes"
"encoding/json"
"fmt"
"github.com/creekorful/trandoshan/api"
"github.com/creekorful/trandoshan/internal/api/database"
"github.com/creekorful/trandoshan/internal/api/database_mock"
"github.com/creekorful/trandoshan/internal/event"
"github.com/creekorful/trandoshan/internal/event_mock"
"github.com/golang/mock/gomock"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
)
func TestWritePagination(t *testing.T) {
rec := httptest.NewRecorder()
searchParams := &api.ResSearchParams{
PageSize: 15,
PageNumber: 7,
}
total := int64(1200)
writePagination(rec, searchParams, total)
if rec.Header().Get(api.PaginationPageHeader) != "7" {
t.Fail()
}
if rec.Header().Get(api.PaginationSizeHeader) != "15" {
t.Fail()
}
if rec.Header().Get(api.PaginationCountHeader) != "1200" {
t.Fail()
}
}
func TestReadPagination(t *testing.T) {
// valid params
req := httptest.NewRequest(http.MethodGet, "/index.php?pagination-page=1&pagination-size=10", nil)
if page, size := getPagination(req); page != 1 || size != 10 {
t.Errorf("wanted page: 1, size: 10 (got %d, %d)", page, size)
}
// make sure invalid parameter are set as wanted
req = httptest.NewRequest(http.MethodGet, "/index.php?pagination-page=abcd&pagination-size=lol", nil)
if page, size := getPagination(req); page != 1 || size != defaultPaginationSize {
t.Errorf("wanted page: 1, size: %d (got %d, %d)", defaultPaginationSize, page, size)
}
// make sure we prevent too much results from being returned
target := fmt.Sprintf("/index.php?pagination-page=10&pagination-size=%d", maxPaginationSize+1)
req = httptest.NewRequest(http.MethodGet, target, nil)
if page, size := getPagination(req); page != 10 || size != maxPaginationSize {
t.Errorf("wanted page: 10, size: %d (got %d, %d)", maxPaginationSize, page, size)
}
// make sure no parameter we set to default
req = httptest.NewRequest(http.MethodGet, "/index.php", nil)
if page, size := getPagination(req); page != 1 || size != defaultPaginationSize {
t.Errorf("wanted page: 1, size: %d (got %d, %d)", defaultPaginationSize, page, size)
}
}
func TestGetSearchParams(t *testing.T) {
startDate := time.Now()
target := fmt.Sprintf("/resources?with-body=true&pagination-page=1&keyword=keyword&url=dXJs&start-date=%s", startDate.Format(time.RFC3339))
req := httptest.NewRequest(http.MethodPost, target, nil)
params, err := getSearchParams(req)
if err != nil {
t.Errorf("error while parsing search params: %s", err)
t.FailNow()
}
if !params.WithBody {
t.Errorf("wrong withBody: %v", params.WithBody)
}
if params.PageSize != 50 {
t.Errorf("wrong pagination-size: %d", params.PageSize)
}
if params.PageNumber != 1 {
t.Errorf("wrong pagination-page: %d", params.PageNumber)
}
if params.Keyword != "keyword" {
t.Errorf("wrong keyword: %s", params.Keyword)
}
if params.StartDate.Year() != startDate.Year() {
t.Errorf("wrong start-date (year)")
}
if params.StartDate.Month() != startDate.Month() {
t.Errorf("wrong start-date (month)")
}
if params.StartDate.Day() != startDate.Day() {
t.Errorf("wrong start-date (day)")
}
if params.StartDate.Hour() != startDate.Hour() {
t.Errorf("wrong start-date (hour)")
}
if params.StartDate.Minute() != startDate.Minute() {
t.Errorf("wrong start-date (minute)")
}
if params.StartDate.Second() != startDate.Second() {
t.Errorf("wrong start-date (second)")
}
if params.URL != "url" {
t.Errorf("wrong url: %s", params.URL)
}
}
func TestScheduleURL(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
// The requests
req := httptest.NewRequest(http.MethodPost, "/v1/urls", strings.NewReader("\"https://google.onion\""))
rec := httptest.NewRecorder()
// Mocking status
pubMock := event_mock.NewMockPublisher(mockCtrl)
s := State{pub: pubMock}
pubMock.EXPECT().PublishEvent(&event.FoundURLEvent{URL: "https://google.onion"}).Return(nil)
s.scheduleURL(rec, req)
if rec.Code != http.StatusOK {
t.Fail()
}
}
func TestAddResource(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
body := api.ResourceDto{
URL: "https://example.onion",
Body: "TheBody",
Title: "Example",
Time: time.Time{},
Meta: map[string]string{"content": "content-meta"},
Description: "the description",
Headers: map[string]string{"Content-Type": "application/html", "Server": "Traefik"},
}
bodyBytes, err := json.Marshal(body)
if err != nil {
t.FailNow()
}
// The requests
req := httptest.NewRequest(http.MethodPost, "/v1/resources", bytes.NewReader(bodyBytes))
rec := httptest.NewRecorder()
dbMock := database_mock.NewMockDatabase(mockCtrl)
dbMock.EXPECT().CountResources(&searchParamsMatcher{target: api.ResSearchParams{
URL: "https://example.onion",
PageSize: 1,
PageNumber: 1,
}}).Return(int64(0), nil)
dbMock.EXPECT().AddResource(database.ResourceIdx{
URL: "https://example.onion",
Body: "TheBody",
Title: "Example",
Time: time.Time{},
Meta: map[string]string{"content": "content-meta"},
Description: "the description",
Headers: map[string]string{"Content-Type": "application/html", "Server": "Traefik"},
})
s := State{db: dbMock, refreshDelay: 5 * time.Hour}
s.addResource(rec, req)
if rec.Code != http.StatusOK {
t.FailNow()
}
var res api.ResourceDto
if err := json.NewDecoder(rec.Body).Decode(&res); err != nil {
t.FailNow()
}
if res.URL != "https://example.onion" {
t.FailNow()
}
if res.Body != "TheBody" {
t.FailNow()
}
if res.Title != "Example" {
t.FailNow()
}
if !res.Time.IsZero() {
t.FailNow()
}
if res.Meta["content"] != "content-meta" {
t.FailNow()
}
if res.Description != "the description" {
t.FailNow()
}
if res.Headers["Content-Type"] != "application/html" {
t.FailNow()
}
if res.Headers["Server"] != "Traefik" {
t.FailNow()
}
}
func TestAddResourceDuplicateNotAllowed(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
body := api.ResourceDto{
URL: "https://example.onion",
Body: "TheBody",
Title: "Example",
Time: time.Time{},
Meta: map[string]string{"content": "content-meta"},
Description: "the description",
Headers: map[string]string{"Content-Type": "application/html", "Server": "Traefik"},
}
bodyBytes, err := json.Marshal(body)
if err != nil {
t.FailNow()
}
// The requests
req := httptest.NewRequest(http.MethodPost, "/v1/resources", bytes.NewReader(bodyBytes))
rec := httptest.NewRecorder()
dbMock := database_mock.NewMockDatabase(mockCtrl)
dbMock.EXPECT().CountResources(&searchParamsMatcher{target: api.ResSearchParams{
URL: "https://example.onion",
PageSize: 1,
PageNumber: 1,
}, endDateZero: true}).Return(int64(1), nil)
s := State{db: dbMock, refreshDelay: -1}
s.addResource(rec, req)
if rec.Code != http.StatusOK {
t.FailNow()
}
}
func TestAddResourceTooYoung(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
body := api.ResourceDto{
URL: "https://example.onion",
Body: "TheBody",
Title: "Example",
Time: time.Time{},
Meta: map[string]string{"content": "content-meta"},
Description: "the description",
Headers: map[string]string{"Content-Type": "application/html", "Server": "Traefik"},
}
bodyBytes, err := json.Marshal(body)
if err != nil {
t.FailNow()
}
// The requests
req := httptest.NewRequest(http.MethodPost, "/v1/resources", bytes.NewReader(bodyBytes))
rec := httptest.NewRecorder()
dbMock := database_mock.NewMockDatabase(mockCtrl)
dbMock.EXPECT().CountResources(&searchParamsMatcher{target: api.ResSearchParams{
URL: "https://example.onion",
EndDate: time.Now().Add(-10 * time.Minute),
PageSize: 1,
PageNumber: 1,
}}).Return(int64(1), nil)
s := State{db: dbMock, refreshDelay: -10 * time.Minute}
s.addResource(rec, req)
if rec.Code != http.StatusOK {
t.FailNow()
}
}
func TestSearchResources(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
// The requests
req := httptest.NewRequest(http.MethodPost, "/v1/resources?keyword=example", nil)
rec := httptest.NewRecorder()
dbMock := database_mock.NewMockDatabase(mockCtrl)
dbMock.EXPECT().CountResources(gomock.Any()).Return(int64(150), nil)
dbMock.EXPECT().SearchResources(gomock.Any()).Return([]database.ResourceIdx{
{
URL: "example-1.onion",
Body: "Example 1",
Title: "Example 1",
Time: time.Time{},
},
{
URL: "example-2.onion",
Body: "Example 2",
Title: "Example 2",
Time: time.Time{},
},
}, nil)
s := State{db: dbMock}
s.searchResources(rec, req)
if rec.Header().Get(api.PaginationCountHeader) != "150" {
t.Fail()
}
var resources []api.ResourceDto
if err := json.NewDecoder(rec.Body).Decode(&resources); err != nil {
t.Fatalf("error while decoding body: %s", err)
}
if len(resources) != 2 {
t.Errorf("got %d resources want 2", len(resources))
}
}
// custom matcher to ignore time field when doing comparison ;(
// todo: do less crappy?
type searchParamsMatcher struct {
target api.ResSearchParams
endDateZero bool
}
func (sm *searchParamsMatcher) Matches(x interface{}) bool {
arg := x.(*api.ResSearchParams)
return arg.URL == sm.target.URL && arg.PageSize == sm.target.PageSize && arg.PageNumber == sm.target.PageNumber &&
sm.endDateZero == arg.EndDate.IsZero()
}
func (sm *searchParamsMatcher) String() string {
return "is valid search params"
}

@ -1,18 +1,12 @@
package database
import (
"context"
"encoding/json"
"github.com/creekorful/trandoshan/api"
"github.com/olivere/elastic/v7"
"github.com/rs/zerolog/log"
"time"
)
//go:generate mockgen -destination=../database_mock/database_mock.go -package=database_mock . Database
var resourcesIndex = "resources"
// ResourceIdx represent a resource as stored in elasticsearch
type ResourceIdx struct {
URL string `json:"url"`
@ -31,139 +25,3 @@ type Database interface {
CountResources(params *api.ResSearchParams) (int64, error)
AddResource(res ResourceIdx) error
}
type elasticSearchDB struct {
client *elastic.Client
}
// NewElasticDB create a new Database based on ES instance
func NewElasticDB(uri string) (Database, error) {
// Create Elasticsearch client
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
ec, err := elastic.DialContext(ctx,
elastic.SetURL(uri),
elastic.SetSniff(false),
elastic.SetHealthcheck(false),
)
if err != nil {
return nil, err
}
if err := setupElasticSearch(ctx, ec); err != nil {
return nil, err
}
return &elasticSearchDB{
client: ec,
}, nil
}
func (e *elasticSearchDB) SearchResources(params *api.ResSearchParams) ([]ResourceIdx, error) {
q := buildSearchQuery(params)
from := (params.PageNumber - 1) * params.PageSize
res, err := e.client.Search().
Index(resourcesIndex).
Query(q).
From(from).
Size(params.PageSize).
Do(context.Background())
if err != nil {
return nil, err
}
var resources []ResourceIdx
for _, hit := range res.Hits.Hits {
var resource ResourceIdx
if err := json.Unmarshal(hit.Source, &resource); err != nil {
log.Warn().Str("err", err.Error()).Msg("Error while un-marshaling resource")
continue
}
// Remove body if not wanted
if !params.WithBody {
resource.Body = ""
}
resources = append(resources, resource)
}
return resources, nil
}
func (e *elasticSearchDB) CountResources(params *api.ResSearchParams) (int64, error) {
q := buildSearchQuery(params)
count, err := e.client.Count(resourcesIndex).Query(q).Do(context.Background())
if err != nil {
return 0, err
}
return count, nil
}
func (e *elasticSearchDB) AddResource(res ResourceIdx) error {
_, err := e.client.Index().
Index(resourcesIndex).
BodyJson(res).
Do(context.Background())
return err
}
func buildSearchQuery(params *api.ResSearchParams) elastic.Query {
var queries []elastic.Query
if params.URL != "" {
log.Trace().Str("url", params.URL).Msg("SearchQuery: Setting url")
queries = append(queries, elastic.NewTermQuery("url.keyword", params.URL))
}
if params.Keyword != "" {
log.Trace().Str("body", params.Keyword).Msg("SearchQuery: Setting body")
queries = append(queries, elastic.NewMatchQuery("body", params.Keyword))
}
if !params.StartDate.IsZero() || !params.EndDate.IsZero() {
timeQuery := elastic.NewRangeQuery("time")
if !params.StartDate.IsZero() {
log.Trace().
Str("startDate", params.StartDate.Format(time.RFC3339)).
Msg("SearchQuery: Setting startDate")
timeQuery.Gte(params.StartDate.Format(time.RFC3339))
}
if !params.EndDate.IsZero() {
log.Trace().
Str("endDate", params.EndDate.Format(time.RFC3339)).
Msg("SearchQuery: Setting endDate")
timeQuery.Lte(params.EndDate.Format(time.RFC3339))
}
queries = append(queries, timeQuery)
}
// Handle specific case
if len(queries) == 0 {
return elastic.NewMatchAllQuery()
}
if len(queries) == 1 {
return queries[0]
}
// otherwise AND combine them
return elastic.NewBoolQuery().Must(queries...)
}
func setupElasticSearch(ctx context.Context, es *elastic.Client) error {
// Setup index if doesn't exist
exist, err := es.IndexExists(resourcesIndex).Do(ctx)
if err != nil {
return err
}
if !exist {
log.Debug().Str("index", resourcesIndex).Msg("Creating missing index")
if _, err := es.CreateIndex(resourcesIndex).Do(ctx); err != nil {
return err
}
}
return nil
}

@ -0,0 +1,148 @@
package database
import (
"context"
"encoding/json"
"github.com/creekorful/trandoshan/api"
"github.com/olivere/elastic/v7"
"github.com/rs/zerolog/log"
"time"
)
var resourcesIndex = "resources"
type elasticSearchDB struct {
client *elastic.Client
}
// NewElasticDB create a new Database based on ES instance
func NewElasticDB(uri string) (Database, error) {
// Create Elasticsearch client
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
ec, err := elastic.DialContext(ctx,
elastic.SetURL(uri),
elastic.SetSniff(false),
elastic.SetHealthcheck(false),
)
if err != nil {
return nil, err
}
if err := setupElasticSearch(ctx, ec); err != nil {
return nil, err
}
return &elasticSearchDB{
client: ec,
}, nil
}
func (e *elasticSearchDB) SearchResources(params *api.ResSearchParams) ([]ResourceIdx, error) {
q := buildSearchQuery(params)
from := (params.PageNumber - 1) * params.PageSize
res, err := e.client.Search().
Index(resourcesIndex).
Query(q).
From(from).
Size(params.PageSize).
Do(context.Background())
if err != nil {
return nil, err
}
var resources []ResourceIdx
for _, hit := range res.Hits.Hits {
var resource ResourceIdx
if err := json.Unmarshal(hit.Source, &resource); err != nil {
log.Warn().Str("err", err.Error()).Msg("Error while un-marshaling resource")
continue
}
// Remove body if not wanted
if !params.WithBody {
resource.Body = ""
}
resources = append(resources, resource)
}
return resources, nil
}
func (e *elasticSearchDB) CountResources(params *api.ResSearchParams) (int64, error) {
q := buildSearchQuery(params)
count, err := e.client.Count(resourcesIndex).Query(q).Do(context.Background())
if err != nil {
return 0, err
}
return count, nil
}
func (e *elasticSearchDB) AddResource(res ResourceIdx) error {
_, err := e.client.Index().
Index(resourcesIndex).
BodyJson(res).
Do(context.Background())
return err
}
func buildSearchQuery(params *api.ResSearchParams) elastic.Query {
var queries []elastic.Query
if params.URL != "" {
log.Trace().Str("url", params.URL).Msg("SearchQuery: Setting url")
queries = append(queries, elastic.NewTermQuery("url.keyword", params.URL))
}
if params.Keyword != "" {
log.Trace().Str("body", params.Keyword).Msg("SearchQuery: Setting body")
queries = append(queries, elastic.NewMatchQuery("body", params.Keyword))
}
if !params.StartDate.IsZero() || !params.EndDate.IsZero() {
timeQuery := elastic.NewRangeQuery("time")
if !params.StartDate.IsZero() {
log.Trace().
Str("startDate", params.StartDate.Format(time.RFC3339)).
Msg("SearchQuery: Setting startDate")
timeQuery.Gte(params.StartDate.Format(time.RFC3339))
}
if !params.EndDate.IsZero() {
log.Trace().
Str("endDate", params.EndDate.Format(time.RFC3339)).
Msg("SearchQuery: Setting endDate")
timeQuery.Lte(params.EndDate.Format(time.RFC3339))
}
queries = append(queries, timeQuery)
}
// Handle specific case
if len(queries) == 0 {
return elastic.NewMatchAllQuery()
}
if len(queries) == 1 {
return queries[0]
}
// otherwise AND combine them
return elastic.NewBoolQuery().Must(queries...)
}
func setupElasticSearch(ctx context.Context, es *elastic.Client) error {
// Setup index if doesn't exist
exist, err := es.IndexExists(resourcesIndex).Do(ctx)
if err != nil {
return err
}
if !exist {
log.Debug().Str("index", resourcesIndex).Msg("Creating missing index")
if _, err := es.CreateIndex(resourcesIndex).Do(ctx); err != nil {
return err
}
}
return nil
}

@ -1,150 +0,0 @@
package rest
import (
"encoding/base64"
"github.com/creekorful/trandoshan/api"
"github.com/creekorful/trandoshan/internal/api/service"
"github.com/labstack/echo/v4"
"net/http"
"strconv"
"strings"
"time"
)
var (
defaultPaginationSize = 50
maxPaginationSize = 100
)
// SearchResources allows to search resources
func SearchResources(s *service.Service) echo.HandlerFunc {
return func(c echo.Context) error {
searchParams, err := newSearchParams(c)
if err != nil {
return err
}
resources, total, err := s.SearchResources(searchParams)
if err != nil {
return err
}
writePagination(c, searchParams, total)
return c.JSON(http.StatusOK, resources)
}
}
// AddResource persist a new resource
func AddResource(s *service.Service) echo.HandlerFunc {
return func(c echo.Context) error {
var res api.ResourceDto
if err := c.Bind(&res); err != nil {
return err
}
res, err := s.AddResource(res)
if err != nil {
return err
}
return c.JSON(http.StatusCreated, res)
}
}
// ScheduleURL schedule given URL for crawling
func ScheduleURL(s *service.Service) echo.HandlerFunc {
return func(c echo.Context) error {
var url string
if err := c.Bind(&url); err != nil {
return err
}
return s.ScheduleURL(url)
}
}
func readPagination(c echo.Context) (int, int) {
paginationPage, err := strconv.Atoi(c.QueryParam(api.PaginationPageQueryParam))
if err != nil {
paginationPage = 1
}
paginationSize, err := strconv.Atoi(c.QueryParam(api.PaginationSizeQueryParam))
if err != nil {
paginationSize = defaultPaginationSize
}
// Prevent too much results from being returned
if paginationSize > maxPaginationSize {
paginationSize = maxPaginationSize
}
return paginationPage, paginationSize
}
func writePagination(c echo.Context, s *api.ResSearchParams, totalCount int64) {
c.Response().Header().Set(api.PaginationPageHeader, strconv.Itoa(s.PageNumber))
c.Response().Header().Set(api.PaginationSizeHeader, strconv.Itoa(s.PageSize))
c.Response().Header().Set(api.PaginationCountHeader, strconv.FormatInt(totalCount, 10))
}
func newSearchParams(c echo.Context) (*api.ResSearchParams, error) {
params := &api.ResSearchParams{}
params.Keyword = c.QueryParam("keyword")
if c.QueryParam("with-body") == "true" {
params.WithBody = true
}
// extract raw query params (unescaped to keep + sign when parsing date)
rawQueryParams := getRawQueryParam(c.QueryString())
if val, exist := rawQueryParams["start-date"]; exist {
d, err := time.Parse(time.RFC3339, val)
if err == nil {
params.StartDate = d
} else {
return nil, err
}
}
if val, exist := rawQueryParams["end-date"]; exist {
d, err := time.Parse(time.RFC3339, val)
if err == nil {
params.EndDate = d
} else {
return nil, err
}
}
// First of all base64decode the URL
b64URL := c.QueryParam("url")
b, err := base64.URLEncoding.DecodeString(b64URL)
if err != nil {
return nil, err
}
params.URL = string(b)
// Acquire pagination
page, size := readPagination(c)
params.PageNumber = page
params.PageSize = size
return params, nil
}
func getRawQueryParam(url string) map[string]string {
if url == "" {
return map[string]string{}
}
val := map[string]string{}
parts := strings.Split(url, "&")
for _, part := range parts {
p := strings.Split(part, "=")
val[p[0]] = p[1]
}
return val
}

@ -1,62 +0,0 @@
package rest
import (
"fmt"
"github.com/labstack/echo/v4"
"net/http"
"net/http/httptest"
"testing"
"time"
)
func TestNewSearchParams(t *testing.T) {
e := echo.New()
startDate := time.Now()
target := fmt.Sprintf("/resources?with-body=true&pagination-page=1&keyword=keyword&url=dXJs&start-date=%s", startDate.Format(time.RFC3339))
req := httptest.NewRequest(http.MethodPost, target, nil)
rec := httptest.NewRecorder()
c := e.NewContext(req, rec)
params, err := newSearchParams(c)
if err != nil {
t.Errorf("error while parsing search params: %s", err)
t.FailNow()
}
if !params.WithBody {
t.Errorf("wrong withBody: %v", params.WithBody)
}
if params.PageSize != 50 {
t.Errorf("wrong pagination-size: %d", params.PageSize)
}
if params.PageNumber != 1 {
t.Errorf("wrong pagination-page: %d", params.PageNumber)
}
if params.Keyword != "keyword" {
t.Errorf("wrong keyword: %s", params.Keyword)
}
if params.StartDate.Year() != startDate.Year() {
t.Errorf("wrong start-date (year)")
}
if params.StartDate.Month() != startDate.Month() {
t.Errorf("wrong start-date (month)")
}
if params.StartDate.Day() != startDate.Day() {
t.Errorf("wrong start-date (day)")
}
if params.StartDate.Hour() != startDate.Hour() {
t.Errorf("wrong start-date (hour)")
}
if params.StartDate.Minute() != startDate.Minute() {
t.Errorf("wrong start-date (minute)")
}
if params.StartDate.Second() != startDate.Second() {
t.Errorf("wrong start-date (second)")
}
if params.URL != "url" {
t.Errorf("wrong url: %s", params.URL)
}
}

@ -1,136 +0,0 @@
package service
import (
"fmt"
"github.com/creekorful/trandoshan/api"
"github.com/creekorful/trandoshan/internal/api/database"
"github.com/creekorful/trandoshan/internal/duration"
"github.com/creekorful/trandoshan/internal/event"
"github.com/rs/zerolog/log"
"github.com/urfave/cli/v2"
"time"
)
// Service represent the functionality the API expose
type Service struct {
db database.Database
pub event.Publisher
refreshDelay time.Duration
}
// New create a new Service instance
func New(c *cli.Context) (*Service, error) {
// Connect to the messaging server
pub, err := event.NewPublisher(c.String("hub-uri"))
if err != nil {
return nil, fmt.Errorf("error while connecting to hub server: %s", err)
}
// Create Elasticsearch client
db, err := database.NewElasticDB(c.String("elasticsearch-uri"))
if err != nil {
return nil, fmt.Errorf("error while connecting to the database: %s", err)
}
refreshDelay := duration.ParseDuration(c.String("refresh-delay"))
return &Service{
db: db,
pub: pub,
refreshDelay: refreshDelay,
}, nil
}
// SearchResources allows to search resources using given params
func (s *Service) SearchResources(params *api.ResSearchParams) ([]api.ResourceDto, int64, error) {
totalCount, err := s.db.CountResources(params)
if err != nil {
log.Err(err).Msg("Error while counting on ES")
return nil, 0, err
}
res, err := s.db.SearchResources(params)
if err != nil {
log.Err(err).Msg("Error while searching on ES")
return nil, 0, err
}
var resources []api.ResourceDto
for _, r := range res {
resources = append(resources, api.ResourceDto{
URL: r.URL,
Body: r.Body,
Title: r.Title,
Time: r.Time,
})
}
return resources, totalCount, nil
}
// AddResource allows to add given resource
func (s *Service) AddResource(res api.ResourceDto) (api.ResourceDto, error) {
// Hacky stuff to prevent from adding 'duplicate resource'
// the thing is: even with the scheduler preventing from crawling 'duplicates' URL by adding a refresh period
// and checking if the resource is not already indexed, this implementation may not work if the URLs was published
// before the resource is saved. And this happen a LOT of time.
// therefore the best thing to do is to make the API check if the resource should **really** be added by checking if
// it isn't present on the database. This may sounds hacky, but it's the best solution i've come up at this time.
endDate := time.Time{}
if s.refreshDelay != -1 {
endDate = time.Now().Add(-s.refreshDelay)
}
count, err := s.db.CountResources(&api.ResSearchParams{
URL: res.URL,
EndDate: endDate,
PageSize: 1,
PageNumber: 1,
})
if err != nil {
log.Err(err).Msg("error while searching for resource")
return api.ResourceDto{}, nil
}
if count > 0 {
// Not an error
log.Debug().Str("url", res.URL).Msg("Skipping duplicate resource")
return res, nil
}
// Create Elasticsearch document
doc := database.ResourceIdx{
URL: res.URL,
Body: res.Body,
Time: res.Time,
Title: res.Title,
Meta: res.Meta,
Description: res.Description,
Headers: res.Headers,
}
if err := s.db.AddResource(doc); err != nil {
log.Err(err).Msg("Error while adding resource")
return api.ResourceDto{}, err
}
log.Debug().Str("url", res.URL).Msg("Successfully saved resource")
return res, nil
}
// ScheduleURL schedule given url for crawling
func (s *Service) ScheduleURL(url string) error {
// Publish the URL
if err := s.pub.PublishEvent(&event.FoundURLEvent{URL: url}); err != nil {
log.Err(err).Msg("Unable to publish URL")
return err
}
log.Debug().Str("url", url).Msg("Successfully published URL")
return nil
}
// Close disconnect the service consumer
func (s *Service) Close() {
s.pub.Close()
}

@ -1,202 +0,0 @@
package service
import (
"github.com/creekorful/trandoshan/api"
"github.com/creekorful/trandoshan/internal/api/database"
"github.com/creekorful/trandoshan/internal/api/database_mock"
"github.com/creekorful/trandoshan/internal/event"
"github.com/creekorful/trandoshan/internal/event_mock"
"github.com/golang/mock/gomock"
"testing"
"time"
)
func TestSearchResources(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
params := &api.ResSearchParams{Keyword: "example"}
dbMock := database_mock.NewMockDatabase(mockCtrl)
dbMock.EXPECT().CountResources(params).Return(int64(150), nil)
dbMock.EXPECT().SearchResources(params).Return([]database.ResourceIdx{
{
URL: "example-1.onion",
Body: "Example 1",
Title: "Example 1",
Time: time.Time{},
},
{
URL: "example-2.onion",
Body: "Example 2",
Title: "Example 2",
Time: time.Time{},
},
}, nil)
s := Service{db: dbMock}
res, count, err := s.SearchResources(params)
if err != nil {
t.FailNow()
}
if count != 150 {
t.Error()
}
if len(res) != 2 {
t.Error()
}
}
func TestAddResource(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
dbMock := database_mock.NewMockDatabase(mockCtrl)
dbMock.EXPECT().CountResources(&searchParamsMatcher{target: api.ResSearchParams{
URL: "https://example.onion",
PageSize: 1,
PageNumber: 1,
}}).Return(int64(0), nil)
dbMock.EXPECT().AddResource(database.ResourceIdx{
URL: "https://example.onion",
Body: "TheBody",
Title: "Example",
Time: time.Time{},
Meta: map[string]string{"content": "content-meta"},
Description: "the description",
Headers: map[string]string{"Content-Type": "application/html", "Server": "Traefik"},
})
s := Service{db: dbMock, refreshDelay: 5 * time.Hour}
res, err := s.AddResource(api.ResourceDto{
URL: "https://example.onion",
Body: "TheBody",
Title: "Example",
Time: time.Time{},
Meta: map[string]string{"content": "content-meta"},
Description: "the description",
Headers: map[string]string{"Content-Type": "application/html", "Server": "Traefik"},
})
if err != nil {
t.FailNow()
}
if res.URL != "https://example.onion" {
t.FailNow()
}
if res.Body != "TheBody" {
t.FailNow()
}
if res.Title != "Example" {
t.FailNow()
}
if !res.Time.IsZero() {
t.FailNow()
}
if res.Meta["content"] != "content-meta" {
t.FailNow()
}
if res.Description != "the description" {
t.FailNow()
}
if res.Headers["Content-Type"] != "application/html" {
t.FailNow()
}
if res.Headers["Server"] != "Traefik" {
t.FailNow()
}
}
func TestAddResourceDuplicateNotAllowed(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
dbMock := database_mock.NewMockDatabase(mockCtrl)
dbMock.EXPECT().CountResources(&searchParamsMatcher{target: api.ResSearchParams{
URL: "https://example.onion",
PageSize: 1,
PageNumber: 1,
}, endDateZero: true}).Return(int64(1), nil)
s := Service{db: dbMock, refreshDelay: -1}
_, err := s.AddResource(api.ResourceDto{
URL: "https://example.onion",
Body: "TheBody",
Title: "Example",
Time: time.Time{},
Meta: map[string]string{"content": "content-meta"},
Description: "the description",
Headers: map[string]string{"Content-Type": "application/html", "Server": "Traefik"},
})
if err != nil {
t.FailNow()
}
}
func TestAddResourceTooYoung(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
dbMock := database_mock.NewMockDatabase(mockCtrl)
dbMock.EXPECT().CountResources(&searchParamsMatcher{target: api.ResSearchParams{
URL: "https://example.onion",
EndDate: time.Now().Add(-10 * time.Minute),
PageSize: 1,
PageNumber: 1,
}}).Return(int64(1), nil)
s := Service{db: dbMock, refreshDelay: -10 * time.Minute}
_, err := s.AddResource(api.ResourceDto{
URL: "https://example.onion",
Body: "TheBody",
Title: "Example",
Time: time.Time{},
Meta: map[string]string{"content": "content-meta"},
Description: "the description",
Headers: map[string]string{"Content-Type": "application/html", "Server": "Traefik"},
})
if err != nil {
t.FailNow()
}
}
func TestScheduleURL(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
pubMock := event_mock.NewMockPublisher(mockCtrl)
s := Service{pub: pubMock}
pubMock.EXPECT().PublishEvent(&event.FoundURLEvent{URL: "https://example.onion"})
if err := s.ScheduleURL("https://example.onion"); err != nil {
t.FailNow()
}
}
// custom matcher to ignore time field when doing comparison ;(
// todo: do less crappy?
type searchParamsMatcher struct {
target api.ResSearchParams
endDateZero bool
}
func (sm *searchParamsMatcher) Matches(x interface{}) bool {
arg := x.(*api.ResSearchParams)
return arg.URL == sm.target.URL && arg.PageSize == sm.target.PageSize && arg.PageNumber == sm.target.PageNumber &&
sm.endDateZero == arg.EndDate.IsZero()
}
func (sm *searchParamsMatcher) String() string {
return "is valid search params"
}

@ -30,6 +30,7 @@ type Provider interface {
ConfigClient(keys []string) (configapi.Client, error)
APIClient() (api.API, error)
Subscriber() (event.Subscriber, error)
Publisher() (event.Publisher, error)
GetValue(key string) string
GetValues(key string) []string
}
@ -63,6 +64,10 @@ func (p *defaultProvider) Subscriber() (event.Subscriber, error) {
return event.NewSubscriber(p.ctx.String(HubURIFlag))
}
func (p *defaultProvider) Publisher() (event.Publisher, error) {
return event.NewPublisher(p.ctx.String(HubURIFlag))
}
func (p *defaultProvider) GetValue(key string) string {
return p.ctx.String(key)
}

Loading…
Cancel
Save