|
|
|
@ -10,8 +10,10 @@ import (
|
|
|
|
|
"github.com/nats-io/nats.go"
|
|
|
|
|
"github.com/rs/zerolog/log"
|
|
|
|
|
"github.com/urfave/cli/v2"
|
|
|
|
|
"github.com/xhit/go-str2duration/v2"
|
|
|
|
|
"net/url"
|
|
|
|
|
"strings"
|
|
|
|
|
"time"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// GetApp returns the scheduler app.
|
|
|
|
@ -32,6 +34,10 @@ func GetApp() *cli.App {
|
|
|
|
|
Usage: "URI to the API server",
|
|
|
|
|
Required: true,
|
|
|
|
|
},
|
|
|
|
|
&cli.StringFlag{
|
|
|
|
|
Name: "refresh-delay",
|
|
|
|
|
Usage: "Duration before allowing crawl of existing resource (none = never)",
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
Action: execute,
|
|
|
|
|
}
|
|
|
|
@ -58,14 +64,14 @@ func execute(ctx *cli.Context) error {
|
|
|
|
|
log.Info().Msg("Successfully initialized tdsh-scheduler. Waiting for URLs")
|
|
|
|
|
|
|
|
|
|
if err := sub.QueueSubscribe(messaging.URLFoundSubject, "schedulers",
|
|
|
|
|
handleMessage(apiClient)); err != nil {
|
|
|
|
|
handleMessage(apiClient, parseRefreshDelay(ctx.String("refresh-delay")))); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func handleMessage(apiClient api.Client) natsutil.MsgHandler {
|
|
|
|
|
func handleMessage(apiClient api.Client, refreshDelay time.Duration) natsutil.MsgHandler {
|
|
|
|
|
return func(nc *nats.Conn, msg *nats.Msg) error {
|
|
|
|
|
var urlMsg messaging.URLFoundMsg
|
|
|
|
|
if err := natsutil.ReadJSON(msg, &urlMsg); err != nil {
|
|
|
|
@ -86,8 +92,18 @@ func handleMessage(apiClient api.Client) natsutil.MsgHandler {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// If we want to allow re-schedule of existing crawled resources we need to retrieve only resources
|
|
|
|
|
// that are newer than now-refreshDelay.
|
|
|
|
|
endDate := time.Time{}
|
|
|
|
|
if refreshDelay != -1 {
|
|
|
|
|
log.Debug().Stringer("delay", refreshDelay).Msg("Existing resources will be crawled again")
|
|
|
|
|
endDate = time.Now().Add(-refreshDelay)
|
|
|
|
|
} else {
|
|
|
|
|
log.Debug().Msg("Existing resources will NOT be crawled again")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
b64URI := base64.URLEncoding.EncodeToString([]byte(u.String()))
|
|
|
|
|
urls, err := apiClient.SearchResources(b64URI, "")
|
|
|
|
|
urls, err := apiClient.SearchResources(b64URI, "", time.Time{}, endDate)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Err(err).Msg("Error while searching URL")
|
|
|
|
|
return err
|
|
|
|
@ -106,3 +122,16 @@ func handleMessage(apiClient api.Client) natsutil.MsgHandler {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func parseRefreshDelay(delay string) time.Duration {
|
|
|
|
|
if delay == "" {
|
|
|
|
|
return -1
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
val, err := str2duration.ParseDuration(delay)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return -1
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return val
|
|
|
|
|
}
|
|
|
|
|