diff --git a/cmd/loop/main.go b/cmd/loop/main.go index abb1536..2baec3e 100644 --- a/cmd/loop/main.go +++ b/cmd/loop/main.go @@ -4,6 +4,7 @@ import ( "bytes" "errors" "fmt" + "io/ioutil" "os" "strconv" "time" @@ -14,11 +15,14 @@ import ( "github.com/lightninglabs/protobuf-hex-display/json" "github.com/lightninglabs/protobuf-hex-display/jsonpb" "github.com/lightninglabs/protobuf-hex-display/proto" + "github.com/lightningnetwork/lnd/macaroons" "github.com/btcsuite/btcutil" "github.com/urfave/cli" "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "gopkg.in/macaroon.v2" ) var ( @@ -34,6 +38,21 @@ var ( // maxMsgRecvSize is the largest message our client will receive. We // set this to 200MiB atm. maxMsgRecvSize = grpc.MaxCallRecvMsgSize(1 * 1024 * 1024 * 200) + + // defaultMacaroonTimeout is the default macaroon timeout in seconds + // that we set when sending it over the line. + defaultMacaroonTimeout int64 = 60 + + tlsCertFlag = cli.StringFlag{ + Name: "tlscertpath", + Usage: "path to loop's TLS certificate, only needed if loop " + + "runs in the same process as lnd", + } + macaroonPathFlag = cli.StringFlag{ + Name: "macaroonpath", + Usage: "path to macaroon file, only needed if loop runs " + + "in the same process as lnd", + } ) func printJSON(resp interface{}) { @@ -84,6 +103,8 @@ func main() { Value: "localhost:11010", Usage: "loopd daemon address host:port", }, + tlsCertFlag, + macaroonPathFlag, } app.Commands = []cli.Command{ loopOutCommand, loopInCommand, termsCommand, @@ -99,7 +120,9 @@ func main() { func getClient(ctx *cli.Context) (looprpc.SwapClientClient, func(), error) { rpcServer := ctx.GlobalString("rpcserver") - conn, err := getClientConn(rpcServer) + tlsCertPath := ctx.GlobalString(tlsCertFlag.Name) + macaroonPath := ctx.GlobalString(macaroonPathFlag.Name) + conn, err := getClientConn(rpcServer, tlsCertPath, macaroonPath) if err != nil { return nil, nil, err } @@ -256,12 +279,36 @@ func logSwap(swap *looprpc.SwapStatus) { fmt.Println() } -func getClientConn(address string) (*grpc.ClientConn, error) { +func getClientConn(address, tlsCertPath, macaroonPath string) (*grpc.ClientConn, + error) { + opts := []grpc.DialOption{ - grpc.WithInsecure(), grpc.WithDefaultCallOptions(maxMsgRecvSize), } + switch { + // If a TLS certificate file is specified, we need to load it and build + // transport credentials with it. + case tlsCertPath != "": + creds, err := credentials.NewClientTLSFromFile(tlsCertPath, "") + if err != nil { + fatal(err) + } + + // Macaroons are only allowed to be transmitted over a TLS + // enabled connection. + if macaroonPath != "" { + opts = append(opts, readMacaroon(macaroonPath)) + } + + opts = append(opts, grpc.WithTransportCredentials(creds)) + + // By default, if no certificate is supplied, we assume the RPC server + // runs without TLS. + default: + opts = append(opts, grpc.WithInsecure()) + } + conn, err := grpc.Dial(address, opts...) if err != nil { return nil, fmt.Errorf("unable to connect to RPC server: %v", err) @@ -269,3 +316,42 @@ func getClientConn(address string) (*grpc.ClientConn, error) { return conn, nil } + +// readMacaroon tries to read the macaroon file at the specified path and create +// gRPC dial options from it. +func readMacaroon(macPath string) grpc.DialOption { + // Load the specified macaroon file. + macBytes, err := ioutil.ReadFile(macPath) + if err != nil { + fatal(fmt.Errorf("unable to read macaroon path : %v", err)) + } + + mac := &macaroon.Macaroon{} + if err = mac.UnmarshalBinary(macBytes); err != nil { + fatal(fmt.Errorf("unable to decode macaroon: %v", err)) + } + + macConstraints := []macaroons.Constraint{ + // We add a time-based constraint to prevent replay of the + // macaroon. It's good for 60 seconds by default to make up for + // any discrepancy between client and server clocks, but leaking + // the macaroon before it becomes invalid makes it possible for + // an attacker to reuse the macaroon. In addition, the validity + // time of the macaroon is extended by the time the server clock + // is behind the client clock, or shortened by the time the + // server clock is ahead of the client clock (or invalid + // altogether if, in the latter case, this time is more than 60 + // seconds). + macaroons.TimeoutConstraint(defaultMacaroonTimeout), + } + + // Apply constraints to the macaroon. + constrainedMac, err := macaroons.AddConstraints(mac, macConstraints...) + if err != nil { + fatal(err) + } + + // Now we append the macaroon credentials to the dial options. + cred := macaroons.NewMacaroonCredential(constrainedMac) + return grpc.WithPerRPCCredentials(cred) +} diff --git a/cmd/loopd/main.go b/cmd/loopd/main.go index 0310198..d256553 100644 --- a/cmd/loopd/main.go +++ b/cmd/loopd/main.go @@ -8,8 +8,8 @@ import ( func main() { cfg := loopd.RPCConfig{} - err := loopd.Start(cfg) + err := loopd.Run(cfg) if err != nil { - fmt.Println(err) + fmt.Printf("loopd exited with an error: %v\n", err) } } diff --git a/loopd/config.go b/loopd/config.go index 403cb6c..36fc459 100644 --- a/loopd/config.go +++ b/loopd/config.go @@ -28,8 +28,8 @@ type lndConfig struct { type viewParameters struct{} -type config struct { - ShowVersion bool `short:"V" long:"version" description:"Display version information and exit"` +type Config struct { + ShowVersion bool `long:"version" description:"Display version information and exit"` Insecure bool `long:"insecure" description:"disable tls"` Network string `long:"network" description:"network to run on" choice:"regtest" choice:"testnet" choice:"mainnet" choice:"simnet"` SwapServer string `long:"swapserver" description:"swap server address host:port"` @@ -42,7 +42,7 @@ type config struct { MaxLogFiles int `long:"maxlogfiles" description:"Maximum logfiles to keep (0 for no rotation)"` MaxLogFileSize int `long:"maxlogfilesize" description:"Maximum logfile size in MB"` - DebugLevel string `short:"d" long:"debuglevel" description:"Logging level for all subsystems {trace, debug, info, warn, error, critical} -- You may also specify =,=,... to set the log level for individual subsystems -- Use show to list available subsystems"` + DebugLevel string `long:"debuglevel" description:"Logging level for all subsystems {trace, debug, info, warn, error, critical} -- You may also specify =,=,... to set the log level for individual subsystems -- Use show to list available subsystems"` MaxLSATCost uint32 `long:"maxlsatcost" description:"Maximum cost in satoshis that loopd is going to pay for an LSAT token automatically. Does not include routing fees."` MaxLSATFee uint32 `long:"maxlsatfee" description:"Maximum routing fee in satoshis that we are willing to pay while paying for an LSAT token."` @@ -59,19 +59,22 @@ const ( testnetServer = "test.swap.lightning.today:11010" ) -var defaultConfig = config{ - Network: "mainnet", - RPCListen: "localhost:11010", - RESTListen: "localhost:8081", - Insecure: false, - LogDir: defaultLogDir, - MaxLogFiles: defaultMaxLogFiles, - MaxLogFileSize: defaultMaxLogFileSize, - DebugLevel: defaultLogLevel, - MaxLSATCost: lsat.DefaultMaxCostSats, - MaxLSATFee: lsat.DefaultMaxRoutingFeeSats, - LoopOutMaxParts: defaultLoopOutMaxParts, - Lnd: &lndConfig{ - Host: "localhost:10009", - }, +// DefaultConfig returns all default values for the Config struct. +func DefaultConfig() Config { + return Config{ + Network: "mainnet", + RPCListen: "localhost:11010", + RESTListen: "localhost:8081", + Insecure: false, + LogDir: defaultLogDir, + MaxLogFiles: defaultMaxLogFiles, + MaxLogFileSize: defaultMaxLogFileSize, + DebugLevel: defaultLogLevel, + MaxLSATCost: lsat.DefaultMaxCostSats, + MaxLSATFee: lsat.DefaultMaxRoutingFeeSats, + LoopOutMaxParts: defaultLoopOutMaxParts, + Lnd: &lndConfig{ + Host: "localhost:10009", + }, + } } diff --git a/loopd/daemon.go b/loopd/daemon.go index 919e27c..2486ec0 100644 --- a/loopd/daemon.go +++ b/loopd/daemon.go @@ -6,11 +6,8 @@ import ( "fmt" "net" "net/http" - "os" - "os/signal" - "runtime/pprof" "sync" - "time" + "sync/atomic" proxy "github.com/grpc-ecosystem/grpc-gateway/runtime" "github.com/lightninglabs/loop" @@ -24,6 +21,10 @@ var ( // maxMsgRecvSize is the largest message our REST proxy will receive. We // set this to 200MiB atm. maxMsgRecvSize = grpc.MaxCallRecvMsgSize(1 * 1024 * 1024 * 200) + + // errOnlyStartOnce is the error that is returned if the daemon is + // started more than once. + errOnlyStartOnce = fmt.Errorf("daemon can only be started once") ) // listenerCfg holds closures used to retrieve listeners for the gRPC services. @@ -38,72 +39,145 @@ type listenerCfg struct { getLnd func(string, *lndConfig) (*lndclient.GrpcLndServices, error) } -// daemon runs loopd in daemon mode. It will listen for grpc connections, -// execute commands and pass back swap status information. -func daemon(config *config, lisCfg *listenerCfg) error { - lnd, err := lisCfg.getLnd(config.Network, config.Lnd) - if err != nil { - return err - } - defer lnd.Close() +// Daemon is the struct that holds one instance of the loop client daemon. +type Daemon struct { + // To be used atomically. Declared first to optimize struct alignment. + started int32 + + // swapClientServer is the embedded RPC server that satisfies the client + // RPC interface. We embed this struct so the Daemon itself can be + // registered to an existing grpc.Server to run as a subserver in the + // same process. + swapClientServer + + // ErrChan is an error channel that users of the Daemon struct must use + // to detect runtime errors and also whether a shutdown is fully + // completed. + ErrChan chan error + + cfg *Config + listenerCfg *listenerCfg + internalErrChan chan error + + lnd *lndclient.GrpcLndServices + clientCleanup func() + + wg sync.WaitGroup + quit chan struct{} + stopOnce sync.Once + + mainCtx context.Context + mainCtxCancel func() + + grpcServer *grpc.Server + grpcListener net.Listener + restServer *http.Server + restListener net.Listener + restCtxCancel func() +} - // If no swap server is specified, use the default addresses for mainnet - // and testnet. - if config.SwapServer == "" { - // TODO(wilmer): Use onion service addresses when proxy is - // active. - switch config.Network { - case "mainnet": - config.SwapServer = mainnetServer - case "testnet": - config.SwapServer = testnetServer - default: - return errors.New("no swap server address specified") - } +// New creates a new instance of the loop client daemon. +func New(config *Config, lisCfg *listenerCfg) *Daemon { + return &Daemon{ + // We send exactly one error on this channel if something goes + // wrong at runtime. Or a nil value if the shutdown was + // successful. But in case nobody's listening, we don't want to + // block on it so we buffer it. + ErrChan: make(chan error, 1), + + quit: make(chan struct{}), + cfg: config, + listenerCfg: lisCfg, + + // We have 3 goroutines that could potentially send an error. + // We react on the first error but in case more than one exits + // with an error we don't want them to block. + internalErrChan: make(chan error, 3), } +} - log.Infof("Swap server address: %v", config.SwapServer) +// Start starts loopd in daemon mode. It will listen for grpc connections, +// execute commands and pass back swap status information. +func (d *Daemon) Start() error { + // There should be no reason to start the daemon twice. Therefore return + // an error if that's tried. This is mostly to guard against Start and + // StartAsSubserver both being called. + if atomic.AddInt32(&d.started, 1) != 1 { + return errOnlyStartOnce + } - // Create an instance of the loop client library. - swapClient, cleanup, err := getClient(config, &lnd.LndServices) + var err error + d.lnd, err = d.listenerCfg.getLnd(d.cfg.Network, d.cfg.Lnd) if err != nil { return err } - defer cleanup() - // Retrieve all currently existing swaps from the database. - swapsList, err := swapClient.FetchSwaps() + // With lnd connected, initialize everything else, such as the swap + // server client, the swap client RPC server instance and our main swap + // and error handlers. If this fails, then nothing has been started yet + // and we can just return the error. + err = d.initialize() if err != nil { return err } - swaps := make(map[lntypes.Hash]loop.SwapInfo) - for _, s := range swapsList { - swaps[s.SwapHash] = *s + // If we get here, we already have started several goroutines. So if + // anything goes wrong now, we need to cleanly shut down again. + startErr := d.startWebServers() + if startErr != nil { + log.Errorf("Error while starting daemon: %v", err) + d.Stop() + stopErr := <-d.ErrChan + if stopErr != nil { + log.Errorf("Error while stopping daemon: %v", stopErr) + } + return startErr } - // Instantiate the loopd gRPC server. - server := swapClientServer{ - impl: swapClient, - lnd: &lnd.LndServices, - swaps: swaps, - subscribers: make(map[int]chan<- interface{}), - statusChan: make(chan loop.SwapInfo), + return nil +} + +// StartAsSubserver is an alternative to Start where the RPC server does not +// create its own gRPC server but registers to an existing one. The same goes +// for REST (if enabled), instead of creating an own mux and HTTP server, we +// register to an existing one. +func (d *Daemon) StartAsSubserver(lndGrpc *lndclient.GrpcLndServices) error { + // There should be no reason to start the daemon twice. Therefore return + // an error if that's tried. This is mostly to guard against Start and + // StartAsSubserver both being called. + if atomic.AddInt32(&d.started, 1) != 1 { + return errOnlyStartOnce } + // When starting as a subserver, we get passed in an already established + // connection to lnd that might be shared among other subservers. + d.lnd = lndGrpc + + // With lnd already pre-connected, initialize everything else, such as + // the swap server client, the RPC server instance and our main swap + // handlers. If this fails, then nothing has been started yet and we can + // just return the error. + return d.initialize() +} + +// startWebServers starts the gRPC and REST servers in goroutines. +func (d *Daemon) startWebServers() error { + var err error + + // With our client created, let's now finish setting up and start our + // RPC server. serverOpts := []grpc.ServerOption{} - grpcServer := grpc.NewServer(serverOpts...) - looprpc.RegisterSwapClientServer(grpcServer, &server) + d.grpcServer = grpc.NewServer(serverOpts...) + looprpc.RegisterSwapClientServer(d.grpcServer, d) // Next, start the gRPC server listening for HTTP/2 connections. log.Infof("Starting gRPC listener") - grpcListener, err := lisCfg.grpcListener() + d.grpcListener, err = d.listenerCfg.grpcListener() if err != nil { - return fmt.Errorf("RPC server unable to listen on %s", - config.RPCListen) + return fmt.Errorf("RPC server unable to listen on %s: %v", + d.cfg.RPCListen, err) } - defer grpcListener.Close() // The default JSON marshaler of the REST proxy only sets OrigName to // true, which instructs it to use the same field names as specified in @@ -119,117 +193,238 @@ func daemon(config *config, lisCfg *listenerCfg) error { // We'll also create and start an accompanying proxy to serve clients // through REST. ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + d.restCtxCancel = cancel mux := proxy.NewServeMux(customMarshalerOption) var restHandler http.Handler = mux - if config.CORSOrigin != "" { - restHandler = allowCORS(restHandler, config.CORSOrigin) + if d.cfg.CORSOrigin != "" { + restHandler = allowCORS(restHandler, d.cfg.CORSOrigin) } proxyOpts := []grpc.DialOption{ grpc.WithInsecure(), grpc.WithDefaultCallOptions(maxMsgRecvSize), } err = looprpc.RegisterSwapClientHandlerFromEndpoint( - ctx, mux, config.RPCListen, proxyOpts, + ctx, mux, d.cfg.RPCListen, proxyOpts, ) if err != nil { return err } - restListener, err := lisCfg.restListener() + d.restListener, err = d.listenerCfg.restListener() if err != nil { - return fmt.Errorf("REST proxy unable to listen on %s", - config.RESTListen) + return fmt.Errorf("REST proxy unable to listen on %s: %v", + d.cfg.RESTListen, err) } // A nil listener indicates REST is disabled. - if restListener != nil { + if d.restListener != nil { log.Infof("Starting REST proxy listener") - defer restListener.Close() - proxy := &http.Server{Handler: restHandler} + d.restServer = &http.Server{Handler: restHandler} + d.wg.Add(1) go func() { - err := proxy.Serve(restListener) + defer d.wg.Done() + + log.Infof("REST proxy listening on %s", + d.restListener.Addr()) + err := d.restServer.Serve(d.restListener) // ErrServerClosed is always returned when the proxy is // shut down, so don't log it. if err != nil && err != http.ErrServerClosed { - log.Error(err) + // Notify the main error handler goroutine that + // we exited unexpectedly here. We don't have to + // worry about blocking as the internal error + // channel is sufficiently buffered. + d.internalErrChan <- err } }() } else { log.Infof("REST proxy disabled") } - mainCtx, cancel := context.WithCancel(context.Background()) - var wg sync.WaitGroup + // Start the grpc server. + d.wg.Add(1) + go func() { + defer d.wg.Done() + + log.Infof("RPC server listening on %s", d.grpcListener.Addr()) + err = d.grpcServer.Serve(d.grpcListener) + if err != nil && err != grpc.ErrServerStopped { + // Notify the main error handler goroutine that + // we exited unexpectedly here. We don't have to + // worry about blocking as the internal error + // channel is sufficiently buffered. + d.internalErrChan <- err + } + }() + + return nil +} + +// initialize creates and initializes an instance of the swap server client, +// the swap client RPC server instance and our main swap and error handlers. If +// this method fails with an error then no goroutine was started yet and no +// cleanup is necessary. If it succeeds, then goroutines have been spawned. +func (d *Daemon) initialize() error { + // If no swap server is specified, use the default addresses for mainnet + // and testnet. + if d.cfg.SwapServer == "" { + // TODO(wilmer): Use onion service addresses when proxy is + // active. + switch d.cfg.Network { + case "mainnet": + d.cfg.SwapServer = mainnetServer + case "testnet": + d.cfg.SwapServer = testnetServer + default: + return errors.New("no swap server address specified") + } + } + + log.Infof("Swap server address: %v", d.cfg.SwapServer) + + // Create an instance of the loop client library. + swapclient, clientCleanup, err := getClient(d.cfg, &d.lnd.LndServices) + if err != nil { + return err + } + d.clientCleanup = clientCleanup + + // Both the client RPC server and and the swap server client should + // stop on main context cancel. So we create it early and pass it down. + d.mainCtx, d.mainCtxCancel = context.WithCancel(context.Background()) + + // Now finally fully initialize the swap client RPC server instance. + d.swapClientServer = swapClientServer{ + impl: swapclient, + lnd: &d.lnd.LndServices, + swaps: make(map[lntypes.Hash]loop.SwapInfo), + subscribers: make(map[int]chan<- interface{}), + statusChan: make(chan loop.SwapInfo), + mainCtx: d.mainCtx, + } + + // Retrieve all currently existing swaps from the database. + swapsList, err := d.impl.FetchSwaps() + if err != nil { + // The client is the only thing we started yet, so if we clean + // up its connection now, nothing else needs to be shut down at + // this point. + clientCleanup() + return err + } + + for _, s := range swapsList { + d.swaps[s.SwapHash] = *s + } // Start the swap client itself. - wg.Add(1) + d.wg.Add(1) go func() { - defer wg.Done() + defer d.wg.Done() log.Infof("Starting swap client") - err := swapClient.Run(mainCtx, server.statusChan) + err := d.impl.Run(d.mainCtx, d.statusChan) if err != nil { - log.Error(err) + // Notify the main error handler goroutine that + // we exited unexpectedly here. We don't have to + // worry about blocking as the internal error + // channel is sufficiently buffered. + d.internalErrChan <- err } log.Infof("Swap client stopped") - - log.Infof("Stopping gRPC server") - grpcServer.Stop() - - cancel() }() // Start a goroutine that broadcasts swap updates to clients. - wg.Add(1) + d.wg.Add(1) go func() { - defer wg.Done() + defer d.wg.Done() log.Infof("Waiting for updates") - server.processStatusUpdates(mainCtx) + d.processStatusUpdates(d.mainCtx) }() - // Start the grpc server. - wg.Add(1) + // Last, start our internal error handler. This will return exactly one + // error or nil on the main error channel to inform the caller that + // something went wrong or that shutdown is complete. We don't add to + // the wait group here because this goroutine will itself wait for the + // stop to complete and signal its completion through the main error + // channel. go func() { - defer wg.Done() - - log.Infof("RPC server listening on %s", grpcListener.Addr()) - - if restListener != nil { - log.Infof("REST proxy listening on %s", restListener.Addr()) + var runtimeErr error + + // There are only two ways this goroutine can exit. Either there + // is an internal error or the caller requests shutdown. In both + // cases we wait for the stop to complete before we signal the + // caller that we're done. + select { + case runtimeErr = <-d.internalErrChan: + log.Errorf("Runtime error in daemon, shutting down: "+ + "%v", runtimeErr) + + case <-d.quit: } - err = grpcServer.Serve(grpcListener) - if err != nil { - log.Error(err) - } + // We need to shutdown before sending the error on the channel, + // otherwise a caller might exit the process too early. + d.stop() + log.Info("Daemon exited") + + // The caller expects exactly one message. So we send the error + // even if it's nil because we cleanly shut down. + d.ErrChan <- runtimeErr }() - interruptChannel := make(chan os.Signal, 1) - signal.Notify(interruptChannel, os.Interrupt) + return nil +} - // Run until the users terminates loopd or an error occurred. - select { - case <-interruptChannel: - log.Infof("Received SIGINT (Ctrl+C).") +// Stop tries to gracefully shut down the daemon. A caller needs to wait for a +// message on the main error channel indicating that the shutdown is completed. +func (d *Daemon) Stop() { + d.stopOnce.Do(func() { + close(d.quit) + }) +} - // TODO: Remove debug code. - // Debug code to dump goroutines on hanging exit. - go func() { - time.Sleep(5 * time.Second) - _ = pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) - }() +// stop does the actual shutdown and blocks until all goroutines have exit. +func (d *Daemon) stop() { + // First of all, we can cancel the main context that all event handlers + // are using. This should stop all swap activity and all event handlers + // should exit. + if d.mainCtxCancel != nil { + d.mainCtxCancel() + } - cancel() - case <-mainCtx.Done(): + // As there is no swap activity anymore, we can forcefully shutdown the + // gRPC and HTTP servers now. + log.Infof("Stopping gRPC server") + if d.grpcServer != nil { + d.grpcServer.Stop() + } + log.Infof("Stopping REST server") + if d.restServer != nil { + // Don't return the error here, we first want to give everything + // else a chance to shut down cleanly. + err := d.restServer.Close() + if err != nil { + log.Errorf("Error stopping REST server: %v", err) + } + } + if d.restCtxCancel != nil { + d.restCtxCancel() } - wg.Wait() + // Next, shut down the connections to lnd and the swap server. + if d.lnd != nil { + d.lnd.Close() + } + if d.clientCleanup != nil { + d.clientCleanup() + } - return nil + // Everything should be shutting down now, wait for completion. + d.wg.Wait() } // allowCORS wraps the given http.Handler with a function that adds the diff --git a/loopd/start.go b/loopd/run.go similarity index 87% rename from loopd/start.go rename to loopd/run.go index 030f608..650bdaa 100644 --- a/loopd/start.go +++ b/loopd/run.go @@ -13,6 +13,7 @@ import ( "github.com/lightninglabs/loop/lndclient" "github.com/lightningnetwork/lnd/build" "github.com/lightningnetwork/lnd/lnrpc/verrpc" + "github.com/lightningnetwork/lnd/signal" ) const defaultConfigFilename = "loopd.conf" @@ -47,7 +48,7 @@ type RPCConfig struct { // newListenerCfg creates and returns a new listenerCfg from the passed config // and RPCConfig. -func newListenerCfg(config *config, rpcCfg RPCConfig) *listenerCfg { +func newListenerCfg(config *Config, rpcCfg RPCConfig) *listenerCfg { return &listenerCfg{ grpcListener: func() (net.Listener, error) { // If a custom RPC listener is set, we will listen on @@ -91,8 +92,9 @@ func newListenerCfg(config *config, rpcCfg RPCConfig) *listenerCfg { } } -func Start(rpcCfg RPCConfig) error { - config := defaultConfig +// Run starts the loop daemon and blocks until it's shut down again. +func Run(rpcCfg RPCConfig) error { + config := DefaultConfig() // Parse command line flags. parser := flags.NewParser(&config, flags.Default) @@ -168,7 +170,26 @@ func Start(rpcCfg RPCConfig) error { // Execute command. if parser.Active == nil { - return daemon(&config, lisCfg) + signal.Intercept() + + daemon := New(&config, lisCfg) + if err := daemon.Start(); err != nil { + return err + } + + select { + case <-signal.ShutdownChannel(): + log.Infof("Received SIGINT (Ctrl+C).") + daemon.Stop() + + // The above stop will return immediately. But we'll be + // notified on the error channel once the process is + // complete. + return <-daemon.ErrChan + + case err := <-daemon.ErrChan: + return err + } } if parser.Active.Name == "view" { diff --git a/loopd/swapclient_server.go b/loopd/swapclient_server.go index b2ada65..ad2be6f 100644 --- a/loopd/swapclient_server.go +++ b/loopd/swapclient_server.go @@ -39,6 +39,7 @@ type swapClientServer struct { statusChan chan loop.SwapInfo nextSubscriberID int swapsLock sync.Mutex + mainCtx context.Context } // LoopOut initiates an loop out swap with the given parameters. The call @@ -264,8 +265,14 @@ func (s *swapClientServer) Monitor(in *looprpc.MonitorRequest, if err := send(swap); err != nil { return err } + + // The client cancels the subscription. case <-server.Context().Done(): return nil + + // The server is shutting down. + case <-s.mainCtx.Done(): + return fmt.Errorf("server is shutting down") } } } diff --git a/loopd/utils.go b/loopd/utils.go index 0af6eab..0141693 100644 --- a/loopd/utils.go +++ b/loopd/utils.go @@ -10,7 +10,7 @@ import ( ) // getClient returns an instance of the swap client. -func getClient(config *config, lnd *lndclient.LndServices) (*loop.Client, +func getClient(config *Config, lnd *lndclient.LndServices) (*loop.Client, func(), error) { storeDir, err := getStoreDir(config.Network) diff --git a/loopd/view.go b/loopd/view.go index aa2d250..ff5084e 100644 --- a/loopd/view.go +++ b/loopd/view.go @@ -10,7 +10,7 @@ import ( ) // view prints all swaps currently in the database. -func view(config *config, lisCfg *listenerCfg) error { +func view(config *Config, lisCfg *listenerCfg) error { chainParams, err := swap.ChainParamsFromNetwork(config.Network) if err != nil { return err