|
|
|
@ -4,17 +4,12 @@ import (
|
|
|
|
|
"errors"
|
|
|
|
|
"fmt"
|
|
|
|
|
"github.com/creekorful/trandoshan/api"
|
|
|
|
|
"github.com/creekorful/trandoshan/internal/configapi/client"
|
|
|
|
|
configapi "github.com/creekorful/trandoshan/internal/configapi/client"
|
|
|
|
|
"github.com/creekorful/trandoshan/internal/event"
|
|
|
|
|
"github.com/creekorful/trandoshan/internal/logging"
|
|
|
|
|
"github.com/creekorful/trandoshan/internal/util"
|
|
|
|
|
"github.com/creekorful/trandoshan/internal/process"
|
|
|
|
|
"github.com/rs/zerolog/log"
|
|
|
|
|
"github.com/urfave/cli/v2"
|
|
|
|
|
"net/url"
|
|
|
|
|
"os"
|
|
|
|
|
"os/signal"
|
|
|
|
|
"strings"
|
|
|
|
|
"syscall"
|
|
|
|
|
"time"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
@ -26,78 +21,43 @@ var (
|
|
|
|
|
errHostnameNotAllowed = errors.New("hostname is not allowed")
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// GetApp return the scheduler app
|
|
|
|
|
func GetApp() *cli.App {
|
|
|
|
|
return &cli.App{
|
|
|
|
|
Name: "tdsh-scheduler",
|
|
|
|
|
Version: "0.7.0",
|
|
|
|
|
Usage: "Trandoshan scheduler component",
|
|
|
|
|
Flags: []cli.Flag{
|
|
|
|
|
logging.GetLogFlag(),
|
|
|
|
|
util.GetHubURI(),
|
|
|
|
|
util.GetAPIURIFlag(),
|
|
|
|
|
util.GetAPITokenFlag(),
|
|
|
|
|
util.GetConfigAPIURIFlag(),
|
|
|
|
|
},
|
|
|
|
|
Action: execute,
|
|
|
|
|
}
|
|
|
|
|
type State struct {
|
|
|
|
|
apiClient api.API
|
|
|
|
|
configClient configapi.Client
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func execute(ctx *cli.Context) error {
|
|
|
|
|
logging.ConfigureLogger(ctx)
|
|
|
|
|
|
|
|
|
|
log.Info().
|
|
|
|
|
Str("ver", ctx.App.Version).
|
|
|
|
|
Str("hub-uri", ctx.String("hub-uri")).
|
|
|
|
|
Str("api-uri", ctx.String("api-uri")).
|
|
|
|
|
Str("config-api-uri", ctx.String("config-api-uri")).
|
|
|
|
|
Msg("Starting tdsh-scheduler")
|
|
|
|
|
func (state *State) Name() string {
|
|
|
|
|
return "scheduler"
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Create the API client
|
|
|
|
|
apiClient := util.GetAPIClient(ctx)
|
|
|
|
|
func (state *State) Flags() []string {
|
|
|
|
|
return []string{process.HubURIFlag, process.APIURIFlag, process.APITokenFlag, process.ConfigAPIURIFlag}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Create the subscriber
|
|
|
|
|
sub, err := event.NewSubscriber(ctx.String("hub-uri"))
|
|
|
|
|
func (state *State) Provide(provider process.Provider) error {
|
|
|
|
|
apiClient, err := provider.APIClient()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
defer sub.Close()
|
|
|
|
|
state.apiClient = apiClient
|
|
|
|
|
|
|
|
|
|
// Create the ConfigAPI client
|
|
|
|
|
keys := []string{client.ForbiddenMimeTypesKey, client.ForbiddenHostnamesKey, client.RefreshDelayKey}
|
|
|
|
|
configClient, err := client.NewConfigClient(ctx.String("config-api-uri"), sub, keys)
|
|
|
|
|
keys := []string{configapi.ForbiddenMimeTypesKey, configapi.ForbiddenHostnamesKey, configapi.RefreshDelayKey}
|
|
|
|
|
configClient, err := provider.ConfigClient(keys)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Err(err).Msg("error while creating config client")
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
state := state{
|
|
|
|
|
apiClient: apiClient,
|
|
|
|
|
configClient: configClient,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if err := sub.Subscribe(event.FoundURLExchange, "schedulingQueue", state.handleURLFoundEvent); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
log.Info().Msg("Successfully initialized tdsh-scheduler. Waiting for URLs")
|
|
|
|
|
|
|
|
|
|
// Handle graceful shutdown
|
|
|
|
|
c := make(chan os.Signal, 1)
|
|
|
|
|
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
|
|
|
|
|
|
|
|
|
|
// Block until we receive our signal.
|
|
|
|
|
<-c
|
|
|
|
|
state.configClient = configClient
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type state struct {
|
|
|
|
|
apiClient api.API
|
|
|
|
|
configClient client.Client
|
|
|
|
|
func (state *State) Subscribers() []process.SubscriberDef {
|
|
|
|
|
return []process.SubscriberDef{
|
|
|
|
|
{Exchange: event.FoundURLExchange, Queue: "schedulingQueue", Handler: state.handleURLFoundEvent},
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (state *state) handleURLFoundEvent(subscriber event.Subscriber, msg event.RawMessage) error {
|
|
|
|
|
func (state *State) handleURLFoundEvent(subscriber event.Subscriber, msg event.RawMessage) error {
|
|
|
|
|
var evt event.FoundURLEvent
|
|
|
|
|
if err := subscriber.Read(&msg, &evt); err != nil {
|
|
|
|
|
return err
|
|
|
|
|