commit
c21783d436
@ -1,83 +1,332 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"github.com/creekorful/trandoshan/api"
|
||||
"github.com/creekorful/trandoshan/internal/api/auth"
|
||||
"github.com/creekorful/trandoshan/internal/api/rest"
|
||||
"github.com/creekorful/trandoshan/internal/api/service"
|
||||
"github.com/creekorful/trandoshan/internal/logging"
|
||||
"github.com/creekorful/trandoshan/internal/util"
|
||||
"github.com/labstack/echo/v4"
|
||||
"github.com/creekorful/trandoshan/internal/api/database"
|
||||
configapi "github.com/creekorful/trandoshan/internal/configapi/client"
|
||||
"github.com/creekorful/trandoshan/internal/event"
|
||||
"github.com/creekorful/trandoshan/internal/process"
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/rs/zerolog/log"
|
||||
"github.com/urfave/cli/v2"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// GetApp return the api app
|
||||
func GetApp() *cli.App {
|
||||
return &cli.App{
|
||||
Name: "tdsh-api",
|
||||
Version: "0.7.0",
|
||||
Usage: "Trandoshan API component",
|
||||
Flags: []cli.Flag{
|
||||
logging.GetLogFlag(),
|
||||
util.GetHubURI(),
|
||||
&cli.StringFlag{
|
||||
Name: "elasticsearch-uri",
|
||||
Usage: "URI to the Elasticsearch server",
|
||||
Required: true,
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "signing-key",
|
||||
Usage: "Signing key for the JWT token",
|
||||
Required: true,
|
||||
},
|
||||
&cli.StringSliceFlag{
|
||||
Name: "users",
|
||||
Usage: "List of API users. (Format user:password)",
|
||||
Required: false,
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "refresh-delay",
|
||||
Usage: "Duration before allowing indexation of existing resource (none = never)",
|
||||
},
|
||||
},
|
||||
Action: execute,
|
||||
}
|
||||
var (
|
||||
defaultPaginationSize = 50
|
||||
maxPaginationSize = 100
|
||||
)
|
||||
|
||||
// State represent the application state
|
||||
type State struct {
|
||||
db database.Database
|
||||
pub event.Publisher
|
||||
configClient configapi.Client
|
||||
}
|
||||
|
||||
// Name return the process name
|
||||
func (state *State) Name() string {
|
||||
return "api"
|
||||
}
|
||||
|
||||
func execute(c *cli.Context) error {
|
||||
logging.ConfigureLogger(c)
|
||||
// CommonFlags return process common flags
|
||||
func (state *State) CommonFlags() []string {
|
||||
return []string{process.HubURIFlag, process.ConfigAPIURIFlag}
|
||||
}
|
||||
|
||||
e := echo.New()
|
||||
e.HTTPErrorHandler = func(err error, c echo.Context) {
|
||||
log.Err(err).Msg("error while processing API call")
|
||||
e.DefaultHTTPErrorHandler(err, c)
|
||||
// CustomFlags return process custom flags
|
||||
func (state *State) CustomFlags() []cli.Flag {
|
||||
return []cli.Flag{
|
||||
&cli.StringFlag{
|
||||
Name: "elasticsearch-uri",
|
||||
Usage: "URI to the Elasticsearch server",
|
||||
Required: true,
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "signing-key",
|
||||
Usage: "Signing key for the JWT token",
|
||||
Required: true,
|
||||
},
|
||||
&cli.StringSliceFlag{
|
||||
Name: "users",
|
||||
Usage: "List of API users. (Format user:password)",
|
||||
Required: false,
|
||||
},
|
||||
}
|
||||
e.HideBanner = true
|
||||
}
|
||||
|
||||
log.Info().Str("ver", c.App.Version).
|
||||
Str("elasticsearch-uri", c.String("elasticsearch-uri")).
|
||||
Str("hub-uri", c.String("hub-uri")).
|
||||
Msg("Starting tdsh-api")
|
||||
// Initialize the process
|
||||
func (state *State) Initialize(provider process.Provider) error {
|
||||
db, err := database.NewElasticDB(provider.GetValue("elasticsearch-uri"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
state.db = db
|
||||
|
||||
signingKey := []byte(c.String("signing-key"))
|
||||
pub, err := provider.Subscriber()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
state.pub = pub
|
||||
|
||||
// Create the service
|
||||
svc, err := service.New(c)
|
||||
configClient, err := provider.ConfigClient([]string{configapi.RefreshDelayKey})
|
||||
if err != nil {
|
||||
log.Err(err).Msg("error while creating API service")
|
||||
return err
|
||||
}
|
||||
state.configClient = configClient
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Subscribers return the process subscribers
|
||||
func (state *State) Subscribers() []process.SubscriberDef {
|
||||
return []process.SubscriberDef{}
|
||||
}
|
||||
|
||||
// Setup middlewares
|
||||
// HTTPHandler returns the HTTP API the process expose
|
||||
func (state *State) HTTPHandler(provider process.Provider) http.Handler {
|
||||
r := mux.NewRouter()
|
||||
|
||||
signingKey := []byte(provider.GetValue("signing-key"))
|
||||
authMiddleware := auth.NewMiddleware(signingKey)
|
||||
e.Use(authMiddleware.Middleware())
|
||||
r.Use(authMiddleware.Middleware())
|
||||
|
||||
r.HandleFunc("/v1/resources", state.searchResources).Methods(http.MethodGet)
|
||||
r.HandleFunc("/v1/resources", state.addResource).Methods(http.MethodPost)
|
||||
r.HandleFunc("/v1/urls", state.scheduleURL).Methods(http.MethodPost)
|
||||
|
||||
return r
|
||||
}
|
||||
|
||||
func (state *State) searchResources(w http.ResponseWriter, r *http.Request) {
|
||||
searchParams, err := getSearchParams(r)
|
||||
if err != nil {
|
||||
log.Err(err).Msg("error while getting search params")
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
totalCount, err := state.db.CountResources(searchParams)
|
||||
if err != nil {
|
||||
log.Err(err).Msg("error while counting on ES")
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
res, err := state.db.SearchResources(searchParams)
|
||||
if err != nil {
|
||||
log.Err(err).Msg("error while searching on ES")
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
var resources []api.ResourceDto
|
||||
for _, r := range res {
|
||||
resources = append(resources, api.ResourceDto{
|
||||
URL: r.URL,
|
||||
Body: r.Body,
|
||||
Title: r.Title,
|
||||
Time: r.Time,
|
||||
})
|
||||
}
|
||||
|
||||
// Add endpoints
|
||||
e.GET("/v1/resources", rest.SearchResources(svc))
|
||||
e.POST("/v1/resources", rest.AddResource(svc))
|
||||
e.POST("/v1/urls", rest.ScheduleURL(svc))
|
||||
// Write pagination headers
|
||||
writePagination(w, searchParams, totalCount)
|
||||
|
||||
log.Info().Msg("Successfully initialized tdsh-api. Waiting for requests")
|
||||
// Write body
|
||||
writeJSON(w, resources)
|
||||
}
|
||||
|
||||
func (state *State) addResource(w http.ResponseWriter, r *http.Request) {
|
||||
var res api.ResourceDto
|
||||
if err := json.NewDecoder(r.Body).Decode(&res); err != nil {
|
||||
log.Warn().Str("err", err.Error()).Msg("error while decoding request body")
|
||||
w.WriteHeader(http.StatusUnprocessableEntity)
|
||||
return
|
||||
}
|
||||
|
||||
// Hacky stuff to prevent from adding 'duplicate resource'
|
||||
// the thing is: even with the scheduler preventing from crawling 'duplicates' URL by adding a refresh period
|
||||
// and checking if the resource is not already indexed, this implementation may not work if the URLs was published
|
||||
// before the resource is saved. And this happen a LOT of time.
|
||||
// therefore the best thing to do is to make the API check if the resource should **really** be added by checking if
|
||||
// it isn't present on the database. This may sounds hacky, but it's the best solution i've come up at this time.
|
||||
endDate := time.Time{}
|
||||
if refreshDelay, err := state.configClient.GetRefreshDelay(); err == nil {
|
||||
if refreshDelay.Delay != -1 {
|
||||
endDate = time.Now().Add(-refreshDelay.Delay)
|
||||
}
|
||||
}
|
||||
|
||||
count, err := state.db.CountResources(&api.ResSearchParams{
|
||||
URL: res.URL,
|
||||
EndDate: endDate,
|
||||
PageSize: 1,
|
||||
PageNumber: 1,
|
||||
})
|
||||
if err != nil {
|
||||
log.Err(err).Msg("error while searching for resource")
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
if count > 0 {
|
||||
// Not an error
|
||||
log.Debug().Str("url", res.URL).Msg("Skipping duplicate resource")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
return
|
||||
}
|
||||
|
||||
// Create Elasticsearch document
|
||||
doc := database.ResourceIdx{
|
||||
URL: res.URL,
|
||||
Body: res.Body,
|
||||
Time: res.Time,
|
||||
Title: res.Title,
|
||||
Meta: res.Meta,
|
||||
Description: res.Description,
|
||||
Headers: res.Headers,
|
||||
}
|
||||
|
||||
if err := state.db.AddResource(doc); err != nil {
|
||||
log.Err(err).Msg("Error while adding resource")
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
log.Info().Str("url", res.URL).Msg("Successfully saved resource")
|
||||
|
||||
writeJSON(w, res)
|
||||
}
|
||||
|
||||
func (state *State) scheduleURL(w http.ResponseWriter, r *http.Request) {
|
||||
var url string
|
||||
if err := json.NewDecoder(r.Body).Decode(&url); err != nil {
|
||||
log.Warn().Str("err", err.Error()).Msg("error while decoding request body")
|
||||
w.WriteHeader(http.StatusUnprocessableEntity)
|
||||
return
|
||||
}
|
||||
|
||||
if err := state.pub.PublishEvent(&event.FoundURLEvent{URL: url}); err != nil {
|
||||
log.Err(err).Msg("unable to schedule URL")
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
log.Info().Str("url", url).Msg("successfully scheduled URL")
|
||||
}
|
||||
|
||||
func getSearchParams(r *http.Request) (*api.ResSearchParams, error) {
|
||||
params := &api.ResSearchParams{}
|
||||
|
||||
if param := r.URL.Query()["keyword"]; len(param) == 1 {
|
||||
params.Keyword = param[0]
|
||||
}
|
||||
|
||||
if param := r.URL.Query()["with-body"]; len(param) == 1 {
|
||||
params.WithBody = param[0] == "true"
|
||||
}
|
||||
|
||||
// extract raw query params (unescaped to keep + sign when parsing date)
|
||||
rawQueryParams := getRawQueryParam(r.URL.RawQuery)
|
||||
|
||||
if val, exist := rawQueryParams["start-date"]; exist {
|
||||
d, err := time.Parse(time.RFC3339, val)
|
||||
if err == nil {
|
||||
params.StartDate = d
|
||||
} else {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if val, exist := rawQueryParams["end-date"]; exist {
|
||||
d, err := time.Parse(time.RFC3339, val)
|
||||
if err == nil {
|
||||
params.EndDate = d
|
||||
} else {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// base64decode the URL
|
||||
if param := r.URL.Query()["url"]; len(param) == 1 {
|
||||
b, err := base64.URLEncoding.DecodeString(param[0])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
params.URL = string(b)
|
||||
}
|
||||
|
||||
// Acquire pagination
|
||||
page, size := getPagination(r)
|
||||
params.PageNumber = page
|
||||
params.PageSize = size
|
||||
|
||||
return params, nil
|
||||
}
|
||||
|
||||
func writePagination(w http.ResponseWriter, searchParams *api.ResSearchParams, total int64) {
|
||||
w.Header().Set(api.PaginationPageHeader, strconv.Itoa(searchParams.PageNumber))
|
||||
w.Header().Set(api.PaginationSizeHeader, strconv.Itoa(searchParams.PageSize))
|
||||
w.Header().Set(api.PaginationCountHeader, strconv.FormatInt(total, 10))
|
||||
}
|
||||
|
||||
func getPagination(r *http.Request) (page int, size int) {
|
||||
page = 1
|
||||
size = defaultPaginationSize
|
||||
|
||||
// Get pagination page
|
||||
if param := r.URL.Query()[api.PaginationPageQueryParam]; len(param) == 1 {
|
||||
if val, err := strconv.Atoi(param[0]); err == nil {
|
||||
page = val
|
||||
}
|
||||
}
|
||||
|
||||
// Get pagination size
|
||||
if param := r.URL.Query()[api.PaginationSizeQueryParam]; len(param) == 1 {
|
||||
if val, err := strconv.Atoi(param[0]); err == nil {
|
||||
size = val
|
||||
}
|
||||
}
|
||||
|
||||
// Prevent too much results from being returned
|
||||
if size > maxPaginationSize {
|
||||
size = maxPaginationSize
|
||||
}
|
||||
|
||||
return page, size
|
||||
}
|
||||
|
||||
func getRawQueryParam(url string) map[string]string {
|
||||
if url == "" {
|
||||
return map[string]string{}
|
||||
}
|
||||
|
||||
val := map[string]string{}
|
||||
parts := strings.Split(url, "&")
|
||||
|
||||
for _, part := range parts {
|
||||
p := strings.Split(part, "=")
|
||||
val[p[0]] = p[1]
|
||||
}
|
||||
|
||||
return val
|
||||
}
|
||||
|
||||
func writeJSON(w http.ResponseWriter, body interface{}) {
|
||||
b, err := json.Marshal(body)
|
||||
if err != nil {
|
||||
log.Err(err).Msg("error while serializing body")
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
return e.Start(":8080")
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
_, _ = w.Write(b)
|
||||
}
|
||||
|
@ -0,0 +1,369 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/creekorful/trandoshan/api"
|
||||
"github.com/creekorful/trandoshan/internal/api/database"
|
||||
"github.com/creekorful/trandoshan/internal/api/database_mock"
|
||||
"github.com/creekorful/trandoshan/internal/configapi/client"
|
||||
"github.com/creekorful/trandoshan/internal/configapi/client_mock"
|
||||
"github.com/creekorful/trandoshan/internal/event"
|
||||
"github.com/creekorful/trandoshan/internal/event_mock"
|
||||
"github.com/golang/mock/gomock"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestWritePagination(t *testing.T) {
|
||||
rec := httptest.NewRecorder()
|
||||
searchParams := &api.ResSearchParams{
|
||||
PageSize: 15,
|
||||
PageNumber: 7,
|
||||
}
|
||||
total := int64(1200)
|
||||
|
||||
writePagination(rec, searchParams, total)
|
||||
|
||||
if rec.Header().Get(api.PaginationPageHeader) != "7" {
|
||||
t.Fail()
|
||||
}
|
||||
if rec.Header().Get(api.PaginationSizeHeader) != "15" {
|
||||
t.Fail()
|
||||
}
|
||||
if rec.Header().Get(api.PaginationCountHeader) != "1200" {
|
||||
t.Fail()
|
||||
}
|
||||
}
|
||||
|
||||
func TestReadPagination(t *testing.T) {
|
||||
// valid params
|
||||
req := httptest.NewRequest(http.MethodGet, "/index.php?pagination-page=1&pagination-size=10", nil)
|
||||
if page, size := getPagination(req); page != 1 || size != 10 {
|
||||
t.Errorf("wanted page: 1, size: 10 (got %d, %d)", page, size)
|
||||
}
|
||||
|
||||
// make sure invalid parameter are set as wanted
|
||||
req = httptest.NewRequest(http.MethodGet, "/index.php?pagination-page=abcd&pagination-size=lol", nil)
|
||||
if page, size := getPagination(req); page != 1 || size != defaultPaginationSize {
|
||||
t.Errorf("wanted page: 1, size: %d (got %d, %d)", defaultPaginationSize, page, size)
|
||||
}
|
||||
|
||||
// make sure we prevent too much results from being returned
|
||||
target := fmt.Sprintf("/index.php?pagination-page=10&pagination-size=%d", maxPaginationSize+1)
|
||||
req = httptest.NewRequest(http.MethodGet, target, nil)
|
||||
if page, size := getPagination(req); page != 10 || size != maxPaginationSize {
|
||||
t.Errorf("wanted page: 10, size: %d (got %d, %d)", maxPaginationSize, page, size)
|
||||
}
|
||||
|
||||
// make sure no parameter we set to default
|
||||
req = httptest.NewRequest(http.MethodGet, "/index.php", nil)
|
||||
if page, size := getPagination(req); page != 1 || size != defaultPaginationSize {
|
||||
t.Errorf("wanted page: 1, size: %d (got %d, %d)", defaultPaginationSize, page, size)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetSearchParams(t *testing.T) {
|
||||
startDate := time.Now()
|
||||
target := fmt.Sprintf("/resources?with-body=true&pagination-page=1&keyword=keyword&url=dXJs&start-date=%s", startDate.Format(time.RFC3339))
|
||||
|
||||
req := httptest.NewRequest(http.MethodPost, target, nil)
|
||||
|
||||
params, err := getSearchParams(req)
|
||||
if err != nil {
|
||||
t.Errorf("error while parsing search params: %s", err)
|
||||
t.FailNow()
|
||||
}
|
||||
|
||||
if !params.WithBody {
|
||||
t.Errorf("wrong withBody: %v", params.WithBody)
|
||||
}
|
||||
if params.PageSize != 50 {
|
||||
t.Errorf("wrong pagination-size: %d", params.PageSize)
|
||||
}
|
||||
if params.PageNumber != 1 {
|
||||
t.Errorf("wrong pagination-page: %d", params.PageNumber)
|
||||
}
|
||||
if params.Keyword != "keyword" {
|
||||
t.Errorf("wrong keyword: %s", params.Keyword)
|
||||
}
|
||||
if params.StartDate.Year() != startDate.Year() {
|
||||
t.Errorf("wrong start-date (year)")
|
||||
}
|
||||
if params.StartDate.Month() != startDate.Month() {
|
||||
t.Errorf("wrong start-date (month)")
|
||||
}
|
||||
if params.StartDate.Day() != startDate.Day() {
|
||||
t.Errorf("wrong start-date (day)")
|
||||
}
|
||||
if params.StartDate.Hour() != startDate.Hour() {
|
||||
t.Errorf("wrong start-date (hour)")
|
||||
}
|
||||
if params.StartDate.Minute() != startDate.Minute() {
|
||||
t.Errorf("wrong start-date (minute)")
|
||||
}
|
||||
if params.StartDate.Second() != startDate.Second() {
|
||||
t.Errorf("wrong start-date (second)")
|
||||
}
|
||||
if params.URL != "url" {
|
||||
t.Errorf("wrong url: %s", params.URL)
|
||||
}
|
||||
}
|
||||
|
||||
func TestScheduleURL(t *testing.T) {
|
||||
mockCtrl := gomock.NewController(t)
|
||||
defer mockCtrl.Finish()
|
||||
|
||||
// The requests
|
||||
req := httptest.NewRequest(http.MethodPost, "/v1/urls", strings.NewReader("\"https://google.onion\""))
|
||||
rec := httptest.NewRecorder()
|
||||
|
||||
// Mocking status
|
||||
pubMock := event_mock.NewMockPublisher(mockCtrl)
|
||||
|
||||
s := State{pub: pubMock}
|
||||
|
||||
pubMock.EXPECT().PublishEvent(&event.FoundURLEvent{URL: "https://google.onion"}).Return(nil)
|
||||
|
||||
s.scheduleURL(rec, req)
|
||||
|
||||
if rec.Code != http.StatusOK {
|
||||
t.Fail()
|
||||
}
|
||||
}
|
||||
|
||||
func TestAddResource(t *testing.T) {
|
||||
mockCtrl := gomock.NewController(t)
|
||||
defer mockCtrl.Finish()
|
||||
|
||||
body := api.ResourceDto{
|
||||
URL: "https://example.onion",
|
||||
Body: "TheBody",
|
||||
Title: "Example",
|
||||
Time: time.Time{},
|
||||
Meta: map[string]string{"content": "content-meta"},
|
||||
Description: "the description",
|
||||
Headers: map[string]string{"Content-Type": "application/html", "Server": "Traefik"},
|
||||
}
|
||||
bodyBytes, err := json.Marshal(body)
|
||||
if err != nil {
|
||||
t.FailNow()
|
||||
}
|
||||
|
||||
// The requests
|
||||
req := httptest.NewRequest(http.MethodPost, "/v1/resources", bytes.NewReader(bodyBytes))
|
||||
rec := httptest.NewRecorder()
|
||||
|
||||
dbMock := database_mock.NewMockDatabase(mockCtrl)
|
||||
configClientMock := client_mock.NewMockClient(mockCtrl)
|
||||
|
||||
dbMock.EXPECT().CountResources(&searchParamsMatcher{target: api.ResSearchParams{
|
||||
URL: "https://example.onion",
|
||||
PageSize: 1,
|
||||
PageNumber: 1,
|
||||
}}).Return(int64(0), nil)
|
||||
|
||||
dbMock.EXPECT().AddResource(database.ResourceIdx{
|
||||
URL: "https://example.onion",
|
||||
Body: "TheBody",
|
||||
Title: "Example",
|
||||
Time: time.Time{},
|
||||
Meta: map[string]string{"content": "content-meta"},
|
||||
Description: "the description",
|
||||
Headers: map[string]string{"Content-Type": "application/html", "Server": "Traefik"},
|
||||
})
|
||||
|
||||
configClientMock.EXPECT().GetRefreshDelay().Return(client.RefreshDelay{Delay: 5 * time.Hour}, nil)
|
||||
|
||||
s := State{db: dbMock, configClient: configClientMock}
|
||||
|
||||
s.addResource(rec, req)
|
||||
if rec.Code != http.StatusOK {
|
||||
t.FailNow()
|
||||
}
|
||||
if rec.Header().Get("Content-Type") != "application/json" {
|
||||
t.Fail()
|
||||
}
|
||||
|
||||
var res api.ResourceDto
|
||||
if err := json.NewDecoder(rec.Body).Decode(&res); err != nil {
|
||||
t.FailNow()
|
||||
}
|
||||
|
||||
if res.URL != "https://example.onion" {
|
||||
t.FailNow()
|
||||
}
|
||||
if res.Body != "TheBody" {
|
||||
t.FailNow()
|
||||
}
|
||||
if res.Title != "Example" {
|
||||
t.FailNow()
|
||||
}
|
||||
if !res.Time.IsZero() {
|
||||
t.FailNow()
|
||||
}
|
||||
if res.Meta["content"] != "content-meta" {
|
||||
t.FailNow()
|
||||
}
|
||||
if res.Description != "the description" {
|
||||
t.FailNow()
|
||||
}
|
||||
if res.Headers["Content-Type"] != "application/html" {
|
||||
t.FailNow()
|
||||
}
|
||||
if res.Headers["Server"] != "Traefik" {
|
||||
t.FailNow()
|
||||
}
|
||||
}
|
||||
|
||||
func TestAddResourceDuplicateNotAllowed(t *testing.T) {
|
||||
mockCtrl := gomock.NewController(t)
|
||||
defer mockCtrl.Finish()
|
||||
|
||||
body := api.ResourceDto{
|
||||
URL: "https://example.onion",
|
||||
Body: "TheBody",
|
||||
Title: "Example",
|
||||
Time: time.Time{},
|
||||
Meta: map[string]string{"content": "content-meta"},
|
||||
Description: "the description",
|
||||
Headers: map[string]string{"Content-Type": "application/html", "Server": "Traefik"},
|
||||
}
|
||||
bodyBytes, err := json.Marshal(body)
|
||||
if err != nil {
|
||||
t.FailNow()
|
||||
}
|
||||
|
||||
// The requests
|
||||
req := httptest.NewRequest(http.MethodPost, "/v1/resources", bytes.NewReader(bodyBytes))
|
||||
rec := httptest.NewRecorder()
|
||||
|
||||
dbMock := database_mock.NewMockDatabase(mockCtrl)
|
||||
configClientMock := client_mock.NewMockClient(mockCtrl)
|
||||
|
||||
dbMock.EXPECT().CountResources(&searchParamsMatcher{target: api.ResSearchParams{
|
||||
URL: "https://example.onion",
|
||||
PageSize: 1,
|
||||
PageNumber: 1,
|
||||
}, endDateZero: true}).Return(int64(1), nil)
|
||||
|
||||
configClientMock.EXPECT().GetRefreshDelay().Return(client.RefreshDelay{Delay: -1}, nil)
|
||||
|
||||
s := State{db: dbMock, configClient: configClientMock}
|
||||
|
||||
s.addResource(rec, req)
|
||||
if rec.Code != http.StatusOK {
|
||||
t.FailNow()
|
||||
}
|
||||
}
|
||||
|
||||
func TestAddResourceTooYoung(t *testing.T) {
|
||||
mockCtrl := gomock.NewController(t)
|
||||
defer mockCtrl.Finish()
|
||||
|
||||
body := api.ResourceDto{
|
||||
URL: "https://example.onion",
|
||||
Body: "TheBody",
|
||||
Title: "Example",
|
||||
Time: time.Time{},
|
||||
Meta: map[string]string{"content": "content-meta"},
|
||||
Description: "the description",
|
||||
Headers: map[string]string{"Content-Type": "application/html", "Server": "Traefik"},
|
||||
}
|
||||
bodyBytes, err := json.Marshal(body)
|
||||
if err != nil {
|
||||
t.FailNow()
|
||||
}
|
||||
|
||||
// The requests
|
||||
req := httptest.NewRequest(http.MethodPost, "/v1/resources", bytes.NewReader(bodyBytes))
|
||||
rec := httptest.NewRecorder()
|
||||
|
||||
dbMock := database_mock.NewMockDatabase(mockCtrl)
|
||||
configClientMock := client_mock.NewMockClient(mockCtrl)
|
||||
|
||||
dbMock.EXPECT().CountResources(&searchParamsMatcher{target: api.ResSearchParams{
|
||||
URL: "https://example.onion",
|
||||
EndDate: time.Now().Add(-10 * time.Minute),
|
||||
PageSize: 1,
|
||||
PageNumber: 1,
|
||||
}}).Return(int64(1), nil)
|
||||
|
||||
configClientMock.EXPECT().GetRefreshDelay().Return(client.RefreshDelay{Delay: 10 * time.Minute}, nil)
|
||||
|
||||
s := State{db: dbMock, configClient: configClientMock}
|
||||
|
||||
s.addResource(rec, req)
|
||||
if rec.Code != http.StatusOK {
|
||||
t.FailNow()
|
||||
}
|
||||
}
|
||||
|
||||
func TestSearchResources(t *testing.T) {
|
||||
mockCtrl := gomock.NewController(t)
|
||||
defer mockCtrl.Finish()
|
||||
|
||||
// The requests
|
||||
req := httptest.NewRequest(http.MethodPost, "/v1/resources?keyword=example", nil)
|
||||
rec := httptest.NewRecorder()
|
||||
|
||||
dbMock := database_mock.NewMockDatabase(mockCtrl)
|
||||
|
||||
dbMock.EXPECT().CountResources(gomock.Any()).Return(int64(150), nil)
|
||||
dbMock.EXPECT().SearchResources(gomock.Any()).Return([]database.ResourceIdx{
|
||||
{
|
||||
URL: "example-1.onion",
|
||||
Body: "Example 1",
|
||||
Title: "Example 1",
|
||||
Time: time.Time{},
|
||||
},
|
||||
{
|
||||
URL: "example-2.onion",
|
||||
Body: "Example 2",
|
||||
Title: "Example 2",
|
||||
Time: time.Time{},
|
||||
},
|
||||
}, nil)
|
||||
|
||||
s := State{db: dbMock}
|
||||
s.searchResources(rec, req)
|
||||
|
||||
if rec.Code != http.StatusOK {
|
||||
t.Fail()
|
||||
}
|
||||
if rec.Header().Get("Content-Type") != "application/json" {
|
||||
t.Fail()
|
||||
}
|
||||
if rec.Header().Get(api.PaginationCountHeader) != "150" {
|
||||
t.Fail()
|
||||
}
|
||||
|
||||
var resources []api.ResourceDto
|
||||
if err := json.NewDecoder(rec.Body).Decode(&resources); err != nil {
|
||||
t.Fatalf("error while decoding body: %s", err)
|
||||
}
|
||||
if len(resources) != 2 {
|
||||
t.Errorf("got %d resources want 2", len(resources))
|
||||
}
|
||||
}
|
||||
|
||||
// custom matcher to ignore time field when doing comparison ;(
|
||||
// todo: do less crappy?
|
||||
type searchParamsMatcher struct {
|
||||
target api.ResSearchParams
|
||||
endDateZero bool
|
||||
}
|
||||
|
||||
func (sm *searchParamsMatcher) Matches(x interface{}) bool {
|
||||
arg := x.(*api.ResSearchParams)
|
||||
return arg.URL == sm.target.URL && arg.PageSize == sm.target.PageSize && arg.PageNumber == sm.target.PageNumber &&
|
||||
sm.endDateZero == arg.EndDate.IsZero()
|
||||
}
|
||||
|
||||
func (sm *searchParamsMatcher) String() string {
|
||||
return "is valid search params"
|
||||
}
|
@ -0,0 +1,148 @@
|
||||
package database
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"github.com/creekorful/trandoshan/api"
|
||||
"github.com/olivere/elastic/v7"
|
||||
"github.com/rs/zerolog/log"
|
||||
"time"
|
||||
)
|
||||
|
||||
var resourcesIndex = "resources"
|
||||
|
||||
type elasticSearchDB struct {
|
||||
client *elastic.Client
|
||||
}
|
||||
|
||||
// NewElasticDB create a new Database based on ES instance
|
||||
func NewElasticDB(uri string) (Database, error) {
|
||||
// Create Elasticsearch client
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
ec, err := elastic.DialContext(ctx,
|
||||
elastic.SetURL(uri),
|
||||
elastic.SetSniff(false),
|
||||
elastic.SetHealthcheck(false),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := setupElasticSearch(ctx, ec); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &elasticSearchDB{
|
||||
client: ec,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (e *elasticSearchDB) SearchResources(params *api.ResSearchParams) ([]ResourceIdx, error) {
|
||||
q := buildSearchQuery(params)
|
||||
from := (params.PageNumber - 1) * params.PageSize
|
||||
|
||||
res, err := e.client.Search().
|
||||
Index(resourcesIndex).
|
||||
Query(q).
|
||||
From(from).
|
||||
Size(params.PageSize).
|
||||
Do(context.Background())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var resources []ResourceIdx
|
||||
for _, hit := range res.Hits.Hits {
|
||||
var resource ResourceIdx
|
||||
if err := json.Unmarshal(hit.Source, &resource); err != nil {
|
||||
log.Warn().Str("err", err.Error()).Msg("Error while un-marshaling resource")
|
||||
continue
|
||||
}
|
||||
|
||||
// Remove body if not wanted
|
||||
if !params.WithBody {
|
||||
resource.Body = ""
|
||||
}
|
||||
|
||||
resources = append(resources, resource)
|
||||
}
|
||||
|
||||
return resources, nil
|
||||
}
|
||||
|
||||
func (e *elasticSearchDB) CountResources(params *api.ResSearchParams) (int64, error) {
|
||||
q := buildSearchQuery(params)
|
||||
|
||||
count, err := e.client.Count(resourcesIndex).Query(q).Do(context.Background())
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return count, nil
|
||||
}
|
||||
|
||||
func (e *elasticSearchDB) AddResource(res ResourceIdx) error {
|
||||
_, err := e.client.Index().
|
||||
Index(resourcesIndex).
|
||||
BodyJson(res).
|
||||
Do(context.Background())
|
||||
return err
|
||||
}
|
||||
|
||||
func buildSearchQuery(params *api.ResSearchParams) elastic.Query {
|
||||
var queries []elastic.Query
|
||||
if params.URL != "" {
|
||||
log.Trace().Str("url", params.URL).Msg("SearchQuery: Setting url")
|
||||
queries = append(queries, elastic.NewTermQuery("url.keyword", params.URL))
|
||||
}
|
||||
if params.Keyword != "" {
|
||||
log.Trace().Str("body", params.Keyword).Msg("SearchQuery: Setting body")
|
||||
queries = append(queries, elastic.NewMatchQuery("body", params.Keyword))
|
||||
}
|
||||
if !params.StartDate.IsZero() || !params.EndDate.IsZero() {
|
||||
timeQuery := elastic.NewRangeQuery("time")
|
||||
|
||||
if !params.StartDate.IsZero() {
|
||||
log.Trace().
|
||||
Str("startDate", params.StartDate.Format(time.RFC3339)).
|
||||
Msg("SearchQuery: Setting startDate")
|
||||
timeQuery.Gte(params.StartDate.Format(time.RFC3339))
|
||||
}
|
||||
if !params.EndDate.IsZero() {
|
||||
log.Trace().
|
||||
Str("endDate", params.EndDate.Format(time.RFC3339)).
|
||||
Msg("SearchQuery: Setting endDate")
|
||||
timeQuery.Lte(params.EndDate.Format(time.RFC3339))
|
||||
}
|
||||
queries = append(queries, timeQuery)
|
||||
}
|
||||
|
||||
// Handle specific case
|
||||
if len(queries) == 0 {
|
||||
return elastic.NewMatchAllQuery()
|
||||
}
|
||||
if len(queries) == 1 {
|
||||
return queries[0]
|
||||
}
|
||||
|
||||
// otherwise AND combine them
|
||||
return elastic.NewBoolQuery().Must(queries...)
|
||||
}
|
||||
|
||||
func setupElasticSearch(ctx context.Context, es *elastic.Client) error {
|
||||
// Setup index if doesn't exist
|
||||
exist, err := es.IndexExists(resourcesIndex).Do(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !exist {
|
||||
log.Debug().Str("index", resourcesIndex).Msg("Creating missing index")
|
||||
if _, err := es.CreateIndex(resourcesIndex).Do(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
@ -1,150 +0,0 @@
|
||||
package rest
|
||||
|
||||
import (
|
||||
"encoding/base64"
|
||||
"github.com/creekorful/trandoshan/api"
|
||||
"github.com/creekorful/trandoshan/internal/api/service"
|
||||
"github.com/labstack/echo/v4"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
defaultPaginationSize = 50
|
||||
maxPaginationSize = 100
|
||||
)
|
||||
|
||||
// SearchResources allows to search resources
|
||||
func SearchResources(s *service.Service) echo.HandlerFunc {
|
||||
return func(c echo.Context) error {
|
||||
searchParams, err := newSearchParams(c)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
resources, total, err := s.SearchResources(searchParams)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
writePagination(c, searchParams, total)
|
||||
|
||||
return c.JSON(http.StatusOK, resources)
|
||||
}
|
||||
}
|
||||
|
||||
// AddResource persist a new resource
|
||||
func AddResource(s *service.Service) echo.HandlerFunc {
|
||||
return func(c echo.Context) error {
|
||||
var res api.ResourceDto
|
||||
if err := c.Bind(&res); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
res, err := s.AddResource(res)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return c.JSON(http.StatusCreated, res)
|
||||
}
|
||||
}
|
||||
|
||||
// ScheduleURL schedule given URL for crawling
|
||||
func ScheduleURL(s *service.Service) echo.HandlerFunc {
|
||||
return func(c echo.Context) error {
|
||||
var url string
|
||||
if err := c.Bind(&url); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return s.ScheduleURL(url)
|
||||
}
|
||||
}
|
||||
|
||||
func readPagination(c echo.Context) (int, int) {
|
||||
paginationPage, err := strconv.Atoi(c.QueryParam(api.PaginationPageQueryParam))
|
||||
if err != nil {
|
||||
paginationPage = 1
|
||||
}
|
||||
paginationSize, err := strconv.Atoi(c.QueryParam(api.PaginationSizeQueryParam))
|
||||
if err != nil {
|
||||
paginationSize = defaultPaginationSize
|
||||
}
|
||||
// Prevent too much results from being returned
|
||||
if paginationSize > maxPaginationSize {
|
||||
paginationSize = maxPaginationSize
|
||||
}
|
||||
|
||||
return paginationPage, paginationSize
|
||||
}
|
||||
|
||||
func writePagination(c echo.Context, s *api.ResSearchParams, totalCount int64) {
|
||||
c.Response().Header().Set(api.PaginationPageHeader, strconv.Itoa(s.PageNumber))
|
||||
c.Response().Header().Set(api.PaginationSizeHeader, strconv.Itoa(s.PageSize))
|
||||
c.Response().Header().Set(api.PaginationCountHeader, strconv.FormatInt(totalCount, 10))
|
||||
}
|
||||
|
||||
func newSearchParams(c echo.Context) (*api.ResSearchParams, error) {
|
||||
params := &api.ResSearchParams{}
|
||||
|
||||
params.Keyword = c.QueryParam("keyword")
|
||||
|
||||
if c.QueryParam("with-body") == "true" {
|
||||
params.WithBody = true
|
||||
}
|
||||
|
||||
// extract raw query params (unescaped to keep + sign when parsing date)
|
||||
rawQueryParams := getRawQueryParam(c.QueryString())
|
||||
|
||||
if val, exist := rawQueryParams["start-date"]; exist {
|
||||
d, err := time.Parse(time.RFC3339, val)
|
||||
if err == nil {
|
||||
params.StartDate = d
|
||||
} else {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if val, exist := rawQueryParams["end-date"]; exist {
|
||||
d, err := time.Parse(time.RFC3339, val)
|
||||
if err == nil {
|
||||
params.EndDate = d
|
||||
} else {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// First of all base64decode the URL
|
||||
b64URL := c.QueryParam("url")
|
||||
b, err := base64.URLEncoding.DecodeString(b64URL)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
params.URL = string(b)
|
||||
|
||||
// Acquire pagination
|
||||
page, size := readPagination(c)
|
||||
params.PageNumber = page
|
||||
params.PageSize = size
|
||||
|
||||
return params, nil
|
||||
}
|
||||
|
||||
func getRawQueryParam(url string) map[string]string {
|
||||
if url == "" {
|
||||
return map[string]string{}
|
||||
}
|
||||
|
||||
val := map[string]string{}
|
||||
parts := strings.Split(url, "&")
|
||||
|
||||
for _, part := range parts {
|
||||
p := strings.Split(part, "=")
|
||||
val[p[0]] = p[1]
|
||||
}
|
||||
|
||||
return val
|
||||
}
|
@ -1,62 +0,0 @@
|
||||
package rest
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/labstack/echo/v4"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestNewSearchParams(t *testing.T) {
|
||||
e := echo.New()
|
||||
|
||||
startDate := time.Now()
|
||||
target := fmt.Sprintf("/resources?with-body=true&pagination-page=1&keyword=keyword&url=dXJs&start-date=%s", startDate.Format(time.RFC3339))
|
||||
|
||||
req := httptest.NewRequest(http.MethodPost, target, nil)
|
||||
rec := httptest.NewRecorder()
|
||||
|
||||
c := e.NewContext(req, rec)
|
||||
|
||||
params, err := newSearchParams(c)
|
||||
if err != nil {
|
||||
t.Errorf("error while parsing search params: %s", err)
|
||||
t.FailNow()
|
||||
}
|
||||
|
||||
if !params.WithBody {
|
||||
t.Errorf("wrong withBody: %v", params.WithBody)
|
||||
}
|
||||
if params.PageSize != 50 {
|
||||
t.Errorf("wrong pagination-size: %d", params.PageSize)
|
||||
}
|
||||
if params.PageNumber != 1 {
|
||||
t.Errorf("wrong pagination-page: %d", params.PageNumber)
|
||||
}
|
||||
if params.Keyword != "keyword" {
|
||||
t.Errorf("wrong keyword: %s", params.Keyword)
|
||||
}
|
||||
if params.StartDate.Year() != startDate.Year() {
|
||||
t.Errorf("wrong start-date (year)")
|
||||
}
|
||||
if params.StartDate.Month() != startDate.Month() {
|
||||
t.Errorf("wrong start-date (month)")
|
||||
}
|
||||
if params.StartDate.Day() != startDate.Day() {
|
||||
t.Errorf("wrong start-date (day)")
|
||||
}
|
||||
if params.StartDate.Hour() != startDate.Hour() {
|
||||
t.Errorf("wrong start-date (hour)")
|
||||
}
|
||||
if params.StartDate.Minute() != startDate.Minute() {
|
||||
t.Errorf("wrong start-date (minute)")
|
||||
}
|
||||
if params.StartDate.Second() != startDate.Second() {
|
||||
t.Errorf("wrong start-date (second)")
|
||||
}
|
||||
if params.URL != "url" {
|
||||
t.Errorf("wrong url: %s", params.URL)
|
||||
}
|
||||
}
|
@ -1,136 +0,0 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/creekorful/trandoshan/api"
|
||||
"github.com/creekorful/trandoshan/internal/api/database"
|
||||
"github.com/creekorful/trandoshan/internal/duration"
|
||||
"github.com/creekorful/trandoshan/internal/event"
|
||||
"github.com/rs/zerolog/log"
|
||||
"github.com/urfave/cli/v2"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Service represent the functionality the API expose
|
||||
type Service struct {
|
||||
db database.Database
|
||||
pub event.Publisher
|
||||
refreshDelay time.Duration
|
||||
}
|
||||
|
||||
// New create a new Service instance
|
||||
func New(c *cli.Context) (*Service, error) {
|
||||
// Connect to the messaging server
|
||||
pub, err := event.NewPublisher(c.String("hub-uri"))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error while connecting to hub server: %s", err)
|
||||
}
|
||||
|
||||
// Create Elasticsearch client
|
||||
db, err := database.NewElasticDB(c.String("elasticsearch-uri"))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error while connecting to the database: %s", err)
|
||||
}
|
||||
|
||||
refreshDelay := duration.ParseDuration(c.String("refresh-delay"))
|
||||
|
||||
return &Service{
|
||||
db: db,
|
||||
pub: pub,
|
||||
refreshDelay: refreshDelay,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// SearchResources allows to search resources using given params
|
||||
func (s *Service) SearchResources(params *api.ResSearchParams) ([]api.ResourceDto, int64, error) {
|
||||
totalCount, err := s.db.CountResources(params)
|
||||
if err != nil {
|
||||
log.Err(err).Msg("Error while counting on ES")
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
res, err := s.db.SearchResources(params)
|
||||
if err != nil {
|
||||
log.Err(err).Msg("Error while searching on ES")
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
var resources []api.ResourceDto
|
||||
for _, r := range res {
|
||||
resources = append(resources, api.ResourceDto{
|
||||
URL: r.URL,
|
||||
Body: r.Body,
|
||||
Title: r.Title,
|
||||
Time: r.Time,
|
||||
})
|
||||
}
|
||||
|
||||
return resources, totalCount, nil
|
||||
}
|
||||
|
||||
// AddResource allows to add given resource
|
||||
func (s *Service) AddResource(res api.ResourceDto) (api.ResourceDto, error) {
|
||||
// Hacky stuff to prevent from adding 'duplicate resource'
|
||||
// the thing is: even with the scheduler preventing from crawling 'duplicates' URL by adding a refresh period
|
||||
// and checking if the resource is not already indexed, this implementation may not work if the URLs was published
|
||||
// before the resource is saved. And this happen a LOT of time.
|
||||
// therefore the best thing to do is to make the API check if the resource should **really** be added by checking if
|
||||
// it isn't present on the database. This may sounds hacky, but it's the best solution i've come up at this time.
|
||||
endDate := time.Time{}
|
||||
if s.refreshDelay != -1 {
|
||||
endDate = time.Now().Add(-s.refreshDelay)
|
||||
}
|
||||
|
||||
count, err := s.db.CountResources(&api.ResSearchParams{
|
||||
URL: res.URL,
|
||||
EndDate: endDate,
|
||||
PageSize: 1,
|
||||
PageNumber: 1,
|
||||
})
|
||||
if err != nil {
|
||||
log.Err(err).Msg("error while searching for resource")
|
||||
return api.ResourceDto{}, nil
|
||||
}
|
||||
|
||||
if count > 0 {
|
||||
// Not an error
|
||||
log.Debug().Str("url", res.URL).Msg("Skipping duplicate resource")
|
||||
return res, nil
|
||||
}
|
||||
|
||||
// Create Elasticsearch document
|
||||
doc := database.ResourceIdx{
|
||||
URL: res.URL,
|
||||
Body: res.Body,
|
||||
Time: res.Time,
|
||||
Title: res.Title,
|
||||
Meta: res.Meta,
|
||||
Description: res.Description,
|
||||
Headers: res.Headers,
|
||||
}
|
||||
|
||||
if err := s.db.AddResource(doc); err != nil {
|
||||
log.Err(err).Msg("Error while adding resource")
|
||||
return api.ResourceDto{}, err
|
||||
}
|
||||
|
||||
log.Debug().Str("url", res.URL).Msg("Successfully saved resource")
|
||||
return res, nil
|
||||
}
|
||||
|
||||
// ScheduleURL schedule given url for crawling
|
||||
func (s *Service) ScheduleURL(url string) error {
|
||||
// Publish the URL
|
||||
if err := s.pub.PublishEvent(&event.FoundURLEvent{URL: url}); err != nil {
|
||||
log.Err(err).Msg("Unable to publish URL")
|
||||
return err
|
||||
}
|
||||
|
||||
log.Debug().Str("url", url).Msg("Successfully published URL")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close disconnect the service consumer
|
||||
func (s *Service) Close() {
|
||||
s.pub.Close()
|
||||
}
|
@ -1,202 +0,0 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"github.com/creekorful/trandoshan/api"
|
||||
"github.com/creekorful/trandoshan/internal/api/database"
|
||||
"github.com/creekorful/trandoshan/internal/api/database_mock"
|
||||
"github.com/creekorful/trandoshan/internal/event"
|
||||
"github.com/creekorful/trandoshan/internal/event_mock"
|
||||
"github.com/golang/mock/gomock"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestSearchResources(t *testing.T) {
|
||||
mockCtrl := gomock.NewController(t)
|
||||
defer mockCtrl.Finish()
|
||||
|
||||
params := &api.ResSearchParams{Keyword: "example"}
|
||||
|
||||
dbMock := database_mock.NewMockDatabase(mockCtrl)
|
||||
|
||||
dbMock.EXPECT().CountResources(params).Return(int64(150), nil)
|
||||
dbMock.EXPECT().SearchResources(params).Return([]database.ResourceIdx{
|
||||
{
|
||||
URL: "example-1.onion",
|
||||
Body: "Example 1",
|
||||
Title: "Example 1",
|
||||
Time: time.Time{},
|
||||
},
|
||||
{
|
||||
URL: "example-2.onion",
|
||||
Body: "Example 2",
|
||||
Title: "Example 2",
|
||||
Time: time.Time{},
|
||||
},
|
||||
}, nil)
|
||||
|
||||
s := Service{db: dbMock}
|
||||
|
||||
res, count, err := s.SearchResources(params)
|
||||
if err != nil {
|
||||
t.FailNow()
|
||||
}
|
||||
if count != 150 {
|
||||
t.Error()
|
||||
}
|
||||
if len(res) != 2 {
|
||||
t.Error()
|
||||
}
|
||||
}
|
||||
|
||||
func TestAddResource(t *testing.T) {
|
||||
mockCtrl := gomock.NewController(t)
|
||||
defer mockCtrl.Finish()
|
||||
|
||||
dbMock := database_mock.NewMockDatabase(mockCtrl)
|
||||
|
||||
dbMock.EXPECT().CountResources(&searchParamsMatcher{target: api.ResSearchParams{
|
||||
URL: "https://example.onion",
|
||||
PageSize: 1,
|
||||
PageNumber: 1,
|
||||
}}).Return(int64(0), nil)
|
||||
|
||||
dbMock.EXPECT().AddResource(database.ResourceIdx{
|
||||
URL: "https://example.onion",
|
||||
Body: "TheBody",
|
||||
Title: "Example",
|
||||
Time: time.Time{},
|
||||
Meta: map[string]string{"content": "content-meta"},
|
||||
Description: "the description",
|
||||
Headers: map[string]string{"Content-Type": "application/html", "Server": "Traefik"},
|
||||
})
|
||||
|
||||
s := Service{db: dbMock, refreshDelay: 5 * time.Hour}
|
||||
|
||||
res, err := s.AddResource(api.ResourceDto{
|
||||
URL: "https://example.onion",
|
||||
Body: "TheBody",
|
||||
Title: "Example",
|
||||
Time: time.Time{},
|
||||
Meta: map[string]string{"content": "content-meta"},
|
||||
Description: "the description",
|
||||
Headers: map[string]string{"Content-Type": "application/html", "Server": "Traefik"},
|
||||
})
|
||||
if err != nil {
|
||||
t.FailNow()
|
||||
}
|
||||
|
||||
if res.URL != "https://example.onion" {
|
||||
t.FailNow()
|
||||
}
|
||||
if res.Body != "TheBody" {
|
||||
t.FailNow()
|
||||
}
|
||||
if res.Title != "Example" {
|
||||
t.FailNow()
|
||||
}
|
||||
if !res.Time.IsZero() {
|
||||
t.FailNow()
|
||||
}
|
||||
if res.Meta["content"] != "content-meta" {
|
||||
t.FailNow()
|
||||
}
|
||||
if res.Description != "the description" {
|
||||
t.FailNow()
|
||||
}
|
||||
if res.Headers["Content-Type"] != "application/html" {
|
||||
t.FailNow()
|
||||
}
|
||||
if res.Headers["Server"] != "Traefik" {
|
||||
t.FailNow()
|
||||
}
|
||||
}
|
||||
|
||||
func TestAddResourceDuplicateNotAllowed(t *testing.T) {
|
||||
mockCtrl := gomock.NewController(t)
|
||||
defer mockCtrl.Finish()
|
||||
|
||||
dbMock := database_mock.NewMockDatabase(mockCtrl)
|
||||
|
||||
dbMock.EXPECT().CountResources(&searchParamsMatcher{target: api.ResSearchParams{
|
||||
URL: "https://example.onion",
|
||||
PageSize: 1,
|
||||
PageNumber: 1,
|
||||
}, endDateZero: true}).Return(int64(1), nil)
|
||||
|
||||
s := Service{db: dbMock, refreshDelay: -1}
|
||||
|
||||
_, err := s.AddResource(api.ResourceDto{
|
||||
URL: "https://example.onion",
|
||||
Body: "TheBody",
|
||||
Title: "Example",
|
||||
Time: time.Time{},
|
||||
Meta: map[string]string{"content": "content-meta"},
|
||||
Description: "the description",
|
||||
Headers: map[string]string{"Content-Type": "application/html", "Server": "Traefik"},
|
||||
})
|
||||
if err != nil {
|
||||
t.FailNow()
|
||||
}
|
||||
}
|
||||
|
||||
func TestAddResourceTooYoung(t *testing.T) {
|
||||
mockCtrl := gomock.NewController(t)
|
||||
defer mockCtrl.Finish()
|
||||
|
||||
dbMock := database_mock.NewMockDatabase(mockCtrl)
|
||||
|
||||
dbMock.EXPECT().CountResources(&searchParamsMatcher{target: api.ResSearchParams{
|
||||
URL: "https://example.onion",
|
||||
EndDate: time.Now().Add(-10 * time.Minute),
|
||||
PageSize: 1,
|
||||
PageNumber: 1,
|
||||
}}).Return(int64(1), nil)
|
||||
|
||||
s := Service{db: dbMock, refreshDelay: -10 * time.Minute}
|
||||
|
||||
_, err := s.AddResource(api.ResourceDto{
|
||||
URL: "https://example.onion",
|
||||
Body: "TheBody",
|
||||
Title: "Example",
|
||||
Time: time.Time{},
|
||||
Meta: map[string]string{"content": "content-meta"},
|
||||
Description: "the description",
|
||||
Headers: map[string]string{"Content-Type": "application/html", "Server": "Traefik"},
|
||||
})
|
||||
if err != nil {
|
||||
t.FailNow()
|
||||
}
|
||||
}
|
||||
|
||||
func TestScheduleURL(t *testing.T) {
|
||||
mockCtrl := gomock.NewController(t)
|
||||
defer mockCtrl.Finish()
|
||||
|
||||
pubMock := event_mock.NewMockPublisher(mockCtrl)
|
||||
|
||||
s := Service{pub: pubMock}
|
||||
|
||||
pubMock.EXPECT().PublishEvent(&event.FoundURLEvent{URL: "https://example.onion"})
|
||||
|
||||
if err := s.ScheduleURL("https://example.onion"); err != nil {
|
||||
t.FailNow()
|
||||
}
|
||||
}
|
||||
|
||||
// custom matcher to ignore time field when doing comparison ;(
|
||||
// todo: do less crappy?
|
||||
type searchParamsMatcher struct {
|
||||
target api.ResSearchParams
|
||||
endDateZero bool
|
||||
}
|
||||
|
||||
func (sm *searchParamsMatcher) Matches(x interface{}) bool {
|
||||
arg := x.(*api.ResSearchParams)
|
||||
return arg.URL == sm.target.URL && arg.PageSize == sm.target.PageSize && arg.PageNumber == sm.target.PageNumber &&
|
||||
sm.endDateZero == arg.EndDate.IsZero()
|
||||
}
|
||||
|
||||
func (sm *searchParamsMatcher) String() string {
|
||||
return "is valid search params"
|
||||
}
|
@ -0,0 +1,229 @@
|
||||
package process
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/creekorful/trandoshan/api"
|
||||
"github.com/creekorful/trandoshan/internal/clock"
|
||||
configapi "github.com/creekorful/trandoshan/internal/configapi/client"
|
||||
"github.com/creekorful/trandoshan/internal/event"
|
||||
"github.com/creekorful/trandoshan/internal/logging"
|
||||
"github.com/rs/zerolog/log"
|
||||
"github.com/urfave/cli/v2"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
version = "0.7.0"
|
||||
// APIURIFlag is the api-uri flag
|
||||
APIURIFlag = "api-uri"
|
||||
// APITokenFlag is the api-token flag
|
||||
APITokenFlag = "api-token"
|
||||
// HubURIFlag is the hub-uri flag
|
||||
HubURIFlag = "hub-uri"
|
||||
// ConfigAPIURIFlag is the config-api-uri flag
|
||||
ConfigAPIURIFlag = "config-api-uri"
|
||||
)
|
||||
|
||||
// Provider is the implementation provider
|
||||
type Provider interface {
|
||||
// Clock return a clock implementation
|
||||
Clock() (clock.Clock, error)
|
||||
// ConfigClient return a new configured configapi.Client
|
||||
ConfigClient(keys []string) (configapi.Client, error)
|
||||
// APIClient return a new configured api.API (client)
|
||||
APIClient() (api.API, error)
|
||||
// Subscriber return a new configured subscriber
|
||||
Subscriber() (event.Subscriber, error)
|
||||
// Publisher return a new configured publisher
|
||||
Publisher() (event.Publisher, error)
|
||||
// GetValue return value for given key
|
||||
GetValue(key string) string
|
||||
// GetValue return values for given key
|
||||
GetValues(key string) []string
|
||||
}
|
||||
|
||||
type defaultProvider struct {
|
||||
ctx *cli.Context
|
||||
}
|
||||
|
||||
// NewDefaultProvider create a brand new default provider using given cli.Context
|
||||
func NewDefaultProvider(ctx *cli.Context) Provider {
|
||||
return &defaultProvider{ctx: ctx}
|
||||
}
|
||||
|
||||
func (p *defaultProvider) Clock() (clock.Clock, error) {
|
||||
return &clock.SystemClock{}, nil
|
||||
}
|
||||
|
||||
func (p *defaultProvider) ConfigClient(keys []string) (configapi.Client, error) {
|
||||
sub, err := p.Subscriber()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return configapi.NewConfigClient(p.ctx.String(ConfigAPIURIFlag), sub, keys)
|
||||
}
|
||||
|
||||
func (p *defaultProvider) APIClient() (api.API, error) {
|
||||
return api.NewClient(p.ctx.String(APIURIFlag), p.ctx.String(APITokenFlag)), nil
|
||||
}
|
||||
|
||||
func (p *defaultProvider) Subscriber() (event.Subscriber, error) {
|
||||
return event.NewSubscriber(p.ctx.String(HubURIFlag))
|
||||
}
|
||||
|
||||
func (p *defaultProvider) Publisher() (event.Publisher, error) {
|
||||
return event.NewPublisher(p.ctx.String(HubURIFlag))
|
||||
}
|
||||
|
||||
func (p *defaultProvider) GetValue(key string) string {
|
||||
return p.ctx.String(key)
|
||||
}
|
||||
|
||||
func (p *defaultProvider) GetValues(key string) []string {
|
||||
return p.ctx.StringSlice(key)
|
||||
}
|
||||
|
||||
// SubscriberDef is the subscriber definition
|
||||
type SubscriberDef struct {
|
||||
Exchange string
|
||||
Queue string
|
||||
Handler event.Handler
|
||||
}
|
||||
|
||||
// Process is a component of Trandoshan
|
||||
type Process interface {
|
||||
Name() string
|
||||
CommonFlags() []string
|
||||
CustomFlags() []cli.Flag
|
||||
Initialize(provider Provider) error
|
||||
Subscribers() []SubscriberDef
|
||||
HTTPHandler(provider Provider) http.Handler
|
||||
}
|
||||
|
||||
// MakeApp return cli.App corresponding for given Process
|
||||
func MakeApp(process Process) *cli.App {
|
||||
app := &cli.App{
|
||||
Name: fmt.Sprintf("tdsh-%s", process.Name()),
|
||||
Version: version,
|
||||
Usage: fmt.Sprintf("Trandoshan %s component", process.Name()),
|
||||
Flags: []cli.Flag{
|
||||
logging.GetLogFlag(),
|
||||
},
|
||||
Action: execute(process),
|
||||
}
|
||||
|
||||
// Add common flags
|
||||
flags := getCustomFlags()
|
||||
for _, flag := range process.CommonFlags() {
|
||||
if customFlag, contains := flags[flag]; contains {
|
||||
app.Flags = append(app.Flags, customFlag)
|
||||
}
|
||||
}
|
||||
|
||||
// Add custom flags
|
||||
for _, flag := range process.CustomFlags() {
|
||||
app.Flags = append(app.Flags, flag)
|
||||
}
|
||||
|
||||
return app
|
||||
}
|
||||
|
||||
func execute(process Process) cli.ActionFunc {
|
||||
return func(c *cli.Context) error {
|
||||
provider := NewDefaultProvider(c)
|
||||
|
||||
// Common setup
|
||||
logging.ConfigureLogger(c)
|
||||
|
||||
// Custom setup
|
||||
if err := process.Initialize(provider); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Create subscribers if any
|
||||
if len(process.Subscribers()) > 0 {
|
||||
sub, err := provider.Subscriber()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// TODO sub.Close()
|
||||
|
||||
for _, subscriberDef := range process.Subscribers() {
|
||||
if err := sub.Subscribe(subscriberDef.Exchange, subscriberDef.Queue, subscriberDef.Handler); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var srv *http.Server
|
||||
|
||||
// Expose HTTP API if any
|
||||
if h := process.HTTPHandler(provider); h != nil {
|
||||
srv = &http.Server{
|
||||
Addr: "0.0.0.0:8080",
|
||||
// Good practice to set timeouts to avoid Slowloris attacks.
|
||||
WriteTimeout: time.Second * 15,
|
||||
ReadTimeout: time.Second * 15,
|
||||
IdleTimeout: time.Second * 60,
|
||||
Handler: h, // Pass our instance of gorilla/mux in.
|
||||
}
|
||||
|
||||
go func() {
|
||||
_ = srv.ListenAndServe()
|
||||
}()
|
||||
}
|
||||
|
||||
log.Info().
|
||||
Str("ver", c.App.Version).
|
||||
Msg(fmt.Sprintf("Started %s", c.App.Name))
|
||||
|
||||
// Handle graceful shutdown
|
||||
ch := make(chan os.Signal, 1)
|
||||
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
|
||||
|
||||
// Block until we receive our signal.
|
||||
<-ch
|
||||
|
||||
// Close HTTP API if any
|
||||
if srv != nil {
|
||||
_ = srv.Shutdown(context.Background())
|
||||
}
|
||||
|
||||
// Connections are deferred here
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func getCustomFlags() map[string]cli.Flag {
|
||||
flags := map[string]cli.Flag{}
|
||||
|
||||
flags[HubURIFlag] = &cli.StringFlag{
|
||||
Name: HubURIFlag,
|
||||
Usage: "URI to the hub (event) server",
|
||||
Required: true,
|
||||
}
|
||||
flags[APIURIFlag] = &cli.StringFlag{
|
||||
Name: APIURIFlag,
|
||||
Usage: "URI to the API server",
|
||||
Required: true,
|
||||
}
|
||||
flags[APITokenFlag] = &cli.StringFlag{
|
||||
Name: APITokenFlag,
|
||||
Usage: "Token to use to authenticate against the API",
|
||||
Required: true,
|
||||
}
|
||||
flags[ConfigAPIURIFlag] = &cli.StringFlag{
|
||||
Name: ConfigAPIURIFlag,
|
||||
Usage: "URI to the ConfigAPI server",
|
||||
Required: true,
|
||||
}
|
||||
|
||||
return flags
|
||||
}
|
@ -1,12 +0,0 @@
|
||||
package util
|
||||
|
||||
import "github.com/urfave/cli/v2"
|
||||
|
||||
// GetHubURI return the URI of the hub (event) server
|
||||
func GetHubURI() *cli.StringFlag {
|
||||
return &cli.StringFlag{
|
||||
Name: "hub-uri",
|
||||
Usage: "URI to the hub (event) server",
|
||||
Required: true,
|
||||
}
|
||||
}
|
@ -1,29 +0,0 @@
|
||||
package util
|
||||
|
||||
import (
|
||||
"github.com/creekorful/trandoshan/api"
|
||||
"github.com/urfave/cli/v2"
|
||||
)
|
||||
|
||||
// GetAPITokenFlag return the cli flag to provide API token
|
||||
func GetAPITokenFlag() *cli.StringFlag {
|
||||
return &cli.StringFlag{
|
||||
Name: "api-token",
|
||||
Usage: "Token to use to authenticate against the API",
|
||||
Required: true,
|
||||
}
|
||||
}
|
||||
|
||||
// GetAPIURIFlag return the cli flag to set api uri
|
||||
func GetAPIURIFlag() *cli.StringFlag {
|
||||
return &cli.StringFlag{
|
||||
Name: "api-uri",
|
||||
Usage: "URI to the API server",
|
||||
Required: true,
|
||||
}
|
||||
}
|
||||
|
||||
// GetAPIClient return a new configured API client
|
||||
func GetAPIClient(c *cli.Context) api.API {
|
||||
return api.NewClient(c.String("api-uri"), c.String("api-token"))
|
||||
}
|
@ -1,12 +0,0 @@
|
||||
package util
|
||||
|
||||
import "github.com/urfave/cli/v2"
|
||||
|
||||
// GetConfigAPIURIFlag return the cli flag to set config api uri
|
||||
func GetConfigAPIURIFlag() *cli.StringFlag {
|
||||
return &cli.StringFlag{
|
||||
Name: "config-api-uri",
|
||||
Usage: "URI to the ConfigAPI server",
|
||||
Required: true,
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue