Merge pull request #202 from guggero/grub

loopd+config: prepare to be used as a library
pull/214/head
Oliver Gugger 4 years ago committed by GitHub
commit c281cab8a0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -4,6 +4,7 @@ import (
"bytes" "bytes"
"errors" "errors"
"fmt" "fmt"
"io/ioutil"
"os" "os"
"strconv" "strconv"
"time" "time"
@ -14,11 +15,14 @@ import (
"github.com/lightninglabs/protobuf-hex-display/json" "github.com/lightninglabs/protobuf-hex-display/json"
"github.com/lightninglabs/protobuf-hex-display/jsonpb" "github.com/lightninglabs/protobuf-hex-display/jsonpb"
"github.com/lightninglabs/protobuf-hex-display/proto" "github.com/lightninglabs/protobuf-hex-display/proto"
"github.com/lightningnetwork/lnd/macaroons"
"github.com/btcsuite/btcutil" "github.com/btcsuite/btcutil"
"github.com/urfave/cli" "github.com/urfave/cli"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"gopkg.in/macaroon.v2"
) )
var ( var (
@ -34,6 +38,21 @@ var (
// maxMsgRecvSize is the largest message our client will receive. We // maxMsgRecvSize is the largest message our client will receive. We
// set this to 200MiB atm. // set this to 200MiB atm.
maxMsgRecvSize = grpc.MaxCallRecvMsgSize(1 * 1024 * 1024 * 200) maxMsgRecvSize = grpc.MaxCallRecvMsgSize(1 * 1024 * 1024 * 200)
// defaultMacaroonTimeout is the default macaroon timeout in seconds
// that we set when sending it over the line.
defaultMacaroonTimeout int64 = 60
tlsCertFlag = cli.StringFlag{
Name: "tlscertpath",
Usage: "path to loop's TLS certificate, only needed if loop " +
"runs in the same process as lnd",
}
macaroonPathFlag = cli.StringFlag{
Name: "macaroonpath",
Usage: "path to macaroon file, only needed if loop runs " +
"in the same process as lnd",
}
) )
func printJSON(resp interface{}) { func printJSON(resp interface{}) {
@ -84,6 +103,8 @@ func main() {
Value: "localhost:11010", Value: "localhost:11010",
Usage: "loopd daemon address host:port", Usage: "loopd daemon address host:port",
}, },
tlsCertFlag,
macaroonPathFlag,
} }
app.Commands = []cli.Command{ app.Commands = []cli.Command{
loopOutCommand, loopInCommand, termsCommand, loopOutCommand, loopInCommand, termsCommand,
@ -99,7 +120,9 @@ func main() {
func getClient(ctx *cli.Context) (looprpc.SwapClientClient, func(), error) { func getClient(ctx *cli.Context) (looprpc.SwapClientClient, func(), error) {
rpcServer := ctx.GlobalString("rpcserver") rpcServer := ctx.GlobalString("rpcserver")
conn, err := getClientConn(rpcServer) tlsCertPath := ctx.GlobalString(tlsCertFlag.Name)
macaroonPath := ctx.GlobalString(macaroonPathFlag.Name)
conn, err := getClientConn(rpcServer, tlsCertPath, macaroonPath)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
@ -256,12 +279,36 @@ func logSwap(swap *looprpc.SwapStatus) {
fmt.Println() fmt.Println()
} }
func getClientConn(address string) (*grpc.ClientConn, error) { func getClientConn(address, tlsCertPath, macaroonPath string) (*grpc.ClientConn,
error) {
opts := []grpc.DialOption{ opts := []grpc.DialOption{
grpc.WithInsecure(),
grpc.WithDefaultCallOptions(maxMsgRecvSize), grpc.WithDefaultCallOptions(maxMsgRecvSize),
} }
switch {
// If a TLS certificate file is specified, we need to load it and build
// transport credentials with it.
case tlsCertPath != "":
creds, err := credentials.NewClientTLSFromFile(tlsCertPath, "")
if err != nil {
fatal(err)
}
// Macaroons are only allowed to be transmitted over a TLS
// enabled connection.
if macaroonPath != "" {
opts = append(opts, readMacaroon(macaroonPath))
}
opts = append(opts, grpc.WithTransportCredentials(creds))
// By default, if no certificate is supplied, we assume the RPC server
// runs without TLS.
default:
opts = append(opts, grpc.WithInsecure())
}
conn, err := grpc.Dial(address, opts...) conn, err := grpc.Dial(address, opts...)
if err != nil { if err != nil {
return nil, fmt.Errorf("unable to connect to RPC server: %v", err) return nil, fmt.Errorf("unable to connect to RPC server: %v", err)
@ -269,3 +316,42 @@ func getClientConn(address string) (*grpc.ClientConn, error) {
return conn, nil return conn, nil
} }
// readMacaroon tries to read the macaroon file at the specified path and create
// gRPC dial options from it.
func readMacaroon(macPath string) grpc.DialOption {
// Load the specified macaroon file.
macBytes, err := ioutil.ReadFile(macPath)
if err != nil {
fatal(fmt.Errorf("unable to read macaroon path : %v", err))
}
mac := &macaroon.Macaroon{}
if err = mac.UnmarshalBinary(macBytes); err != nil {
fatal(fmt.Errorf("unable to decode macaroon: %v", err))
}
macConstraints := []macaroons.Constraint{
// We add a time-based constraint to prevent replay of the
// macaroon. It's good for 60 seconds by default to make up for
// any discrepancy between client and server clocks, but leaking
// the macaroon before it becomes invalid makes it possible for
// an attacker to reuse the macaroon. In addition, the validity
// time of the macaroon is extended by the time the server clock
// is behind the client clock, or shortened by the time the
// server clock is ahead of the client clock (or invalid
// altogether if, in the latter case, this time is more than 60
// seconds).
macaroons.TimeoutConstraint(defaultMacaroonTimeout),
}
// Apply constraints to the macaroon.
constrainedMac, err := macaroons.AddConstraints(mac, macConstraints...)
if err != nil {
fatal(err)
}
// Now we append the macaroon credentials to the dial options.
cred := macaroons.NewMacaroonCredential(constrainedMac)
return grpc.WithPerRPCCredentials(cred)
}

@ -8,8 +8,8 @@ import (
func main() { func main() {
cfg := loopd.RPCConfig{} cfg := loopd.RPCConfig{}
err := loopd.Start(cfg) err := loopd.Run(cfg)
if err != nil { if err != nil {
fmt.Println(err) fmt.Printf("loopd exited with an error: %v\n", err)
} }
} }

@ -28,8 +28,8 @@ type lndConfig struct {
type viewParameters struct{} type viewParameters struct{}
type config struct { type Config struct {
ShowVersion bool `short:"V" long:"version" description:"Display version information and exit"` ShowVersion bool `long:"version" description:"Display version information and exit"`
Insecure bool `long:"insecure" description:"disable tls"` Insecure bool `long:"insecure" description:"disable tls"`
Network string `long:"network" description:"network to run on" choice:"regtest" choice:"testnet" choice:"mainnet" choice:"simnet"` Network string `long:"network" description:"network to run on" choice:"regtest" choice:"testnet" choice:"mainnet" choice:"simnet"`
SwapServer string `long:"swapserver" description:"swap server address host:port"` SwapServer string `long:"swapserver" description:"swap server address host:port"`
@ -42,7 +42,7 @@ type config struct {
MaxLogFiles int `long:"maxlogfiles" description:"Maximum logfiles to keep (0 for no rotation)"` MaxLogFiles int `long:"maxlogfiles" description:"Maximum logfiles to keep (0 for no rotation)"`
MaxLogFileSize int `long:"maxlogfilesize" description:"Maximum logfile size in MB"` MaxLogFileSize int `long:"maxlogfilesize" description:"Maximum logfile size in MB"`
DebugLevel string `short:"d" long:"debuglevel" description:"Logging level for all subsystems {trace, debug, info, warn, error, critical} -- You may also specify <subsystem>=<level>,<subsystem2>=<level>,... to set the log level for individual subsystems -- Use show to list available subsystems"` DebugLevel string `long:"debuglevel" description:"Logging level for all subsystems {trace, debug, info, warn, error, critical} -- You may also specify <subsystem>=<level>,<subsystem2>=<level>,... to set the log level for individual subsystems -- Use show to list available subsystems"`
MaxLSATCost uint32 `long:"maxlsatcost" description:"Maximum cost in satoshis that loopd is going to pay for an LSAT token automatically. Does not include routing fees."` MaxLSATCost uint32 `long:"maxlsatcost" description:"Maximum cost in satoshis that loopd is going to pay for an LSAT token automatically. Does not include routing fees."`
MaxLSATFee uint32 `long:"maxlsatfee" description:"Maximum routing fee in satoshis that we are willing to pay while paying for an LSAT token."` MaxLSATFee uint32 `long:"maxlsatfee" description:"Maximum routing fee in satoshis that we are willing to pay while paying for an LSAT token."`
@ -59,19 +59,22 @@ const (
testnetServer = "test.swap.lightning.today:11010" testnetServer = "test.swap.lightning.today:11010"
) )
var defaultConfig = config{ // DefaultConfig returns all default values for the Config struct.
Network: "mainnet", func DefaultConfig() Config {
RPCListen: "localhost:11010", return Config{
RESTListen: "localhost:8081", Network: "mainnet",
Insecure: false, RPCListen: "localhost:11010",
LogDir: defaultLogDir, RESTListen: "localhost:8081",
MaxLogFiles: defaultMaxLogFiles, Insecure: false,
MaxLogFileSize: defaultMaxLogFileSize, LogDir: defaultLogDir,
DebugLevel: defaultLogLevel, MaxLogFiles: defaultMaxLogFiles,
MaxLSATCost: lsat.DefaultMaxCostSats, MaxLogFileSize: defaultMaxLogFileSize,
MaxLSATFee: lsat.DefaultMaxRoutingFeeSats, DebugLevel: defaultLogLevel,
LoopOutMaxParts: defaultLoopOutMaxParts, MaxLSATCost: lsat.DefaultMaxCostSats,
Lnd: &lndConfig{ MaxLSATFee: lsat.DefaultMaxRoutingFeeSats,
Host: "localhost:10009", LoopOutMaxParts: defaultLoopOutMaxParts,
}, Lnd: &lndConfig{
Host: "localhost:10009",
},
}
} }

@ -6,11 +6,8 @@ import (
"fmt" "fmt"
"net" "net"
"net/http" "net/http"
"os"
"os/signal"
"runtime/pprof"
"sync" "sync"
"time" "sync/atomic"
proxy "github.com/grpc-ecosystem/grpc-gateway/runtime" proxy "github.com/grpc-ecosystem/grpc-gateway/runtime"
"github.com/lightninglabs/loop" "github.com/lightninglabs/loop"
@ -24,6 +21,10 @@ var (
// maxMsgRecvSize is the largest message our REST proxy will receive. We // maxMsgRecvSize is the largest message our REST proxy will receive. We
// set this to 200MiB atm. // set this to 200MiB atm.
maxMsgRecvSize = grpc.MaxCallRecvMsgSize(1 * 1024 * 1024 * 200) maxMsgRecvSize = grpc.MaxCallRecvMsgSize(1 * 1024 * 1024 * 200)
// errOnlyStartOnce is the error that is returned if the daemon is
// started more than once.
errOnlyStartOnce = fmt.Errorf("daemon can only be started once")
) )
// listenerCfg holds closures used to retrieve listeners for the gRPC services. // listenerCfg holds closures used to retrieve listeners for the gRPC services.
@ -38,72 +39,145 @@ type listenerCfg struct {
getLnd func(string, *lndConfig) (*lndclient.GrpcLndServices, error) getLnd func(string, *lndConfig) (*lndclient.GrpcLndServices, error)
} }
// daemon runs loopd in daemon mode. It will listen for grpc connections, // Daemon is the struct that holds one instance of the loop client daemon.
// execute commands and pass back swap status information. type Daemon struct {
func daemon(config *config, lisCfg *listenerCfg) error { // To be used atomically. Declared first to optimize struct alignment.
lnd, err := lisCfg.getLnd(config.Network, config.Lnd) started int32
if err != nil {
return err // swapClientServer is the embedded RPC server that satisfies the client
} // RPC interface. We embed this struct so the Daemon itself can be
defer lnd.Close() // registered to an existing grpc.Server to run as a subserver in the
// same process.
swapClientServer
// ErrChan is an error channel that users of the Daemon struct must use
// to detect runtime errors and also whether a shutdown is fully
// completed.
ErrChan chan error
cfg *Config
listenerCfg *listenerCfg
internalErrChan chan error
lnd *lndclient.GrpcLndServices
clientCleanup func()
wg sync.WaitGroup
quit chan struct{}
stopOnce sync.Once
mainCtx context.Context
mainCtxCancel func()
grpcServer *grpc.Server
grpcListener net.Listener
restServer *http.Server
restListener net.Listener
restCtxCancel func()
}
// If no swap server is specified, use the default addresses for mainnet // New creates a new instance of the loop client daemon.
// and testnet. func New(config *Config, lisCfg *listenerCfg) *Daemon {
if config.SwapServer == "" { return &Daemon{
// TODO(wilmer): Use onion service addresses when proxy is // We send exactly one error on this channel if something goes
// active. // wrong at runtime. Or a nil value if the shutdown was
switch config.Network { // successful. But in case nobody's listening, we don't want to
case "mainnet": // block on it so we buffer it.
config.SwapServer = mainnetServer ErrChan: make(chan error, 1),
case "testnet":
config.SwapServer = testnetServer quit: make(chan struct{}),
default: cfg: config,
return errors.New("no swap server address specified") listenerCfg: lisCfg,
}
// We have 3 goroutines that could potentially send an error.
// We react on the first error but in case more than one exits
// with an error we don't want them to block.
internalErrChan: make(chan error, 3),
} }
}
log.Infof("Swap server address: %v", config.SwapServer) // Start starts loopd in daemon mode. It will listen for grpc connections,
// execute commands and pass back swap status information.
func (d *Daemon) Start() error {
// There should be no reason to start the daemon twice. Therefore return
// an error if that's tried. This is mostly to guard against Start and
// StartAsSubserver both being called.
if atomic.AddInt32(&d.started, 1) != 1 {
return errOnlyStartOnce
}
// Create an instance of the loop client library. var err error
swapClient, cleanup, err := getClient(config, &lnd.LndServices) d.lnd, err = d.listenerCfg.getLnd(d.cfg.Network, d.cfg.Lnd)
if err != nil { if err != nil {
return err return err
} }
defer cleanup()
// Retrieve all currently existing swaps from the database. // With lnd connected, initialize everything else, such as the swap
swapsList, err := swapClient.FetchSwaps() // server client, the swap client RPC server instance and our main swap
// and error handlers. If this fails, then nothing has been started yet
// and we can just return the error.
err = d.initialize()
if err != nil { if err != nil {
return err return err
} }
swaps := make(map[lntypes.Hash]loop.SwapInfo) // If we get here, we already have started several goroutines. So if
for _, s := range swapsList { // anything goes wrong now, we need to cleanly shut down again.
swaps[s.SwapHash] = *s startErr := d.startWebServers()
if startErr != nil {
log.Errorf("Error while starting daemon: %v", err)
d.Stop()
stopErr := <-d.ErrChan
if stopErr != nil {
log.Errorf("Error while stopping daemon: %v", stopErr)
}
return startErr
} }
// Instantiate the loopd gRPC server. return nil
server := swapClientServer{ }
impl: swapClient,
lnd: &lnd.LndServices, // StartAsSubserver is an alternative to Start where the RPC server does not
swaps: swaps, // create its own gRPC server but registers to an existing one. The same goes
subscribers: make(map[int]chan<- interface{}), // for REST (if enabled), instead of creating an own mux and HTTP server, we
statusChan: make(chan loop.SwapInfo), // register to an existing one.
func (d *Daemon) StartAsSubserver(lndGrpc *lndclient.GrpcLndServices) error {
// There should be no reason to start the daemon twice. Therefore return
// an error if that's tried. This is mostly to guard against Start and
// StartAsSubserver both being called.
if atomic.AddInt32(&d.started, 1) != 1 {
return errOnlyStartOnce
} }
// When starting as a subserver, we get passed in an already established
// connection to lnd that might be shared among other subservers.
d.lnd = lndGrpc
// With lnd already pre-connected, initialize everything else, such as
// the swap server client, the RPC server instance and our main swap
// handlers. If this fails, then nothing has been started yet and we can
// just return the error.
return d.initialize()
}
// startWebServers starts the gRPC and REST servers in goroutines.
func (d *Daemon) startWebServers() error {
var err error
// With our client created, let's now finish setting up and start our
// RPC server.
serverOpts := []grpc.ServerOption{} serverOpts := []grpc.ServerOption{}
grpcServer := grpc.NewServer(serverOpts...) d.grpcServer = grpc.NewServer(serverOpts...)
looprpc.RegisterSwapClientServer(grpcServer, &server) looprpc.RegisterSwapClientServer(d.grpcServer, d)
// Next, start the gRPC server listening for HTTP/2 connections. // Next, start the gRPC server listening for HTTP/2 connections.
log.Infof("Starting gRPC listener") log.Infof("Starting gRPC listener")
grpcListener, err := lisCfg.grpcListener() d.grpcListener, err = d.listenerCfg.grpcListener()
if err != nil { if err != nil {
return fmt.Errorf("RPC server unable to listen on %s", return fmt.Errorf("RPC server unable to listen on %s: %v",
config.RPCListen) d.cfg.RPCListen, err)
} }
defer grpcListener.Close()
// The default JSON marshaler of the REST proxy only sets OrigName to // The default JSON marshaler of the REST proxy only sets OrigName to
// true, which instructs it to use the same field names as specified in // true, which instructs it to use the same field names as specified in
@ -119,117 +193,238 @@ func daemon(config *config, lisCfg *listenerCfg) error {
// We'll also create and start an accompanying proxy to serve clients // We'll also create and start an accompanying proxy to serve clients
// through REST. // through REST.
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() d.restCtxCancel = cancel
mux := proxy.NewServeMux(customMarshalerOption) mux := proxy.NewServeMux(customMarshalerOption)
var restHandler http.Handler = mux var restHandler http.Handler = mux
if config.CORSOrigin != "" { if d.cfg.CORSOrigin != "" {
restHandler = allowCORS(restHandler, config.CORSOrigin) restHandler = allowCORS(restHandler, d.cfg.CORSOrigin)
} }
proxyOpts := []grpc.DialOption{ proxyOpts := []grpc.DialOption{
grpc.WithInsecure(), grpc.WithInsecure(),
grpc.WithDefaultCallOptions(maxMsgRecvSize), grpc.WithDefaultCallOptions(maxMsgRecvSize),
} }
err = looprpc.RegisterSwapClientHandlerFromEndpoint( err = looprpc.RegisterSwapClientHandlerFromEndpoint(
ctx, mux, config.RPCListen, proxyOpts, ctx, mux, d.cfg.RPCListen, proxyOpts,
) )
if err != nil { if err != nil {
return err return err
} }
restListener, err := lisCfg.restListener() d.restListener, err = d.listenerCfg.restListener()
if err != nil { if err != nil {
return fmt.Errorf("REST proxy unable to listen on %s", return fmt.Errorf("REST proxy unable to listen on %s: %v",
config.RESTListen) d.cfg.RESTListen, err)
} }
// A nil listener indicates REST is disabled. // A nil listener indicates REST is disabled.
if restListener != nil { if d.restListener != nil {
log.Infof("Starting REST proxy listener") log.Infof("Starting REST proxy listener")
defer restListener.Close() d.restServer = &http.Server{Handler: restHandler}
proxy := &http.Server{Handler: restHandler}
d.wg.Add(1)
go func() { go func() {
err := proxy.Serve(restListener) defer d.wg.Done()
log.Infof("REST proxy listening on %s",
d.restListener.Addr())
err := d.restServer.Serve(d.restListener)
// ErrServerClosed is always returned when the proxy is // ErrServerClosed is always returned when the proxy is
// shut down, so don't log it. // shut down, so don't log it.
if err != nil && err != http.ErrServerClosed { if err != nil && err != http.ErrServerClosed {
log.Error(err) // Notify the main error handler goroutine that
// we exited unexpectedly here. We don't have to
// worry about blocking as the internal error
// channel is sufficiently buffered.
d.internalErrChan <- err
} }
}() }()
} else { } else {
log.Infof("REST proxy disabled") log.Infof("REST proxy disabled")
} }
mainCtx, cancel := context.WithCancel(context.Background()) // Start the grpc server.
var wg sync.WaitGroup d.wg.Add(1)
go func() {
defer d.wg.Done()
log.Infof("RPC server listening on %s", d.grpcListener.Addr())
err = d.grpcServer.Serve(d.grpcListener)
if err != nil && err != grpc.ErrServerStopped {
// Notify the main error handler goroutine that
// we exited unexpectedly here. We don't have to
// worry about blocking as the internal error
// channel is sufficiently buffered.
d.internalErrChan <- err
}
}()
return nil
}
// initialize creates and initializes an instance of the swap server client,
// the swap client RPC server instance and our main swap and error handlers. If
// this method fails with an error then no goroutine was started yet and no
// cleanup is necessary. If it succeeds, then goroutines have been spawned.
func (d *Daemon) initialize() error {
// If no swap server is specified, use the default addresses for mainnet
// and testnet.
if d.cfg.SwapServer == "" {
// TODO(wilmer): Use onion service addresses when proxy is
// active.
switch d.cfg.Network {
case "mainnet":
d.cfg.SwapServer = mainnetServer
case "testnet":
d.cfg.SwapServer = testnetServer
default:
return errors.New("no swap server address specified")
}
}
log.Infof("Swap server address: %v", d.cfg.SwapServer)
// Create an instance of the loop client library.
swapclient, clientCleanup, err := getClient(d.cfg, &d.lnd.LndServices)
if err != nil {
return err
}
d.clientCleanup = clientCleanup
// Both the client RPC server and and the swap server client should
// stop on main context cancel. So we create it early and pass it down.
d.mainCtx, d.mainCtxCancel = context.WithCancel(context.Background())
// Now finally fully initialize the swap client RPC server instance.
d.swapClientServer = swapClientServer{
impl: swapclient,
lnd: &d.lnd.LndServices,
swaps: make(map[lntypes.Hash]loop.SwapInfo),
subscribers: make(map[int]chan<- interface{}),
statusChan: make(chan loop.SwapInfo),
mainCtx: d.mainCtx,
}
// Retrieve all currently existing swaps from the database.
swapsList, err := d.impl.FetchSwaps()
if err != nil {
// The client is the only thing we started yet, so if we clean
// up its connection now, nothing else needs to be shut down at
// this point.
clientCleanup()
return err
}
for _, s := range swapsList {
d.swaps[s.SwapHash] = *s
}
// Start the swap client itself. // Start the swap client itself.
wg.Add(1) d.wg.Add(1)
go func() { go func() {
defer wg.Done() defer d.wg.Done()
log.Infof("Starting swap client") log.Infof("Starting swap client")
err := swapClient.Run(mainCtx, server.statusChan) err := d.impl.Run(d.mainCtx, d.statusChan)
if err != nil { if err != nil {
log.Error(err) // Notify the main error handler goroutine that
// we exited unexpectedly here. We don't have to
// worry about blocking as the internal error
// channel is sufficiently buffered.
d.internalErrChan <- err
} }
log.Infof("Swap client stopped") log.Infof("Swap client stopped")
log.Infof("Stopping gRPC server")
grpcServer.Stop()
cancel()
}() }()
// Start a goroutine that broadcasts swap updates to clients. // Start a goroutine that broadcasts swap updates to clients.
wg.Add(1) d.wg.Add(1)
go func() { go func() {
defer wg.Done() defer d.wg.Done()
log.Infof("Waiting for updates") log.Infof("Waiting for updates")
server.processStatusUpdates(mainCtx) d.processStatusUpdates(d.mainCtx)
}() }()
// Start the grpc server. // Last, start our internal error handler. This will return exactly one
wg.Add(1) // error or nil on the main error channel to inform the caller that
// something went wrong or that shutdown is complete. We don't add to
// the wait group here because this goroutine will itself wait for the
// stop to complete and signal its completion through the main error
// channel.
go func() { go func() {
defer wg.Done() var runtimeErr error
log.Infof("RPC server listening on %s", grpcListener.Addr()) // There are only two ways this goroutine can exit. Either there
// is an internal error or the caller requests shutdown. In both
if restListener != nil { // cases we wait for the stop to complete before we signal the
log.Infof("REST proxy listening on %s", restListener.Addr()) // caller that we're done.
select {
case runtimeErr = <-d.internalErrChan:
log.Errorf("Runtime error in daemon, shutting down: "+
"%v", runtimeErr)
case <-d.quit:
} }
err = grpcServer.Serve(grpcListener) // We need to shutdown before sending the error on the channel,
if err != nil { // otherwise a caller might exit the process too early.
log.Error(err) d.stop()
} log.Info("Daemon exited")
// The caller expects exactly one message. So we send the error
// even if it's nil because we cleanly shut down.
d.ErrChan <- runtimeErr
}() }()
interruptChannel := make(chan os.Signal, 1) return nil
signal.Notify(interruptChannel, os.Interrupt) }
// Run until the users terminates loopd or an error occurred. // Stop tries to gracefully shut down the daemon. A caller needs to wait for a
select { // message on the main error channel indicating that the shutdown is completed.
case <-interruptChannel: func (d *Daemon) Stop() {
log.Infof("Received SIGINT (Ctrl+C).") d.stopOnce.Do(func() {
close(d.quit)
})
}
// TODO: Remove debug code. // stop does the actual shutdown and blocks until all goroutines have exit.
// Debug code to dump goroutines on hanging exit. func (d *Daemon) stop() {
go func() { // First of all, we can cancel the main context that all event handlers
time.Sleep(5 * time.Second) // are using. This should stop all swap activity and all event handlers
_ = pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) // should exit.
}() if d.mainCtxCancel != nil {
d.mainCtxCancel()
}
cancel() // As there is no swap activity anymore, we can forcefully shutdown the
case <-mainCtx.Done(): // gRPC and HTTP servers now.
log.Infof("Stopping gRPC server")
if d.grpcServer != nil {
d.grpcServer.Stop()
}
log.Infof("Stopping REST server")
if d.restServer != nil {
// Don't return the error here, we first want to give everything
// else a chance to shut down cleanly.
err := d.restServer.Close()
if err != nil {
log.Errorf("Error stopping REST server: %v", err)
}
}
if d.restCtxCancel != nil {
d.restCtxCancel()
} }
wg.Wait() // Next, shut down the connections to lnd and the swap server.
if d.lnd != nil {
d.lnd.Close()
}
if d.clientCleanup != nil {
d.clientCleanup()
}
return nil // Everything should be shutting down now, wait for completion.
d.wg.Wait()
} }
// allowCORS wraps the given http.Handler with a function that adds the // allowCORS wraps the given http.Handler with a function that adds the

@ -13,6 +13,7 @@ import (
"github.com/lightninglabs/loop/lndclient" "github.com/lightninglabs/loop/lndclient"
"github.com/lightningnetwork/lnd/build" "github.com/lightningnetwork/lnd/build"
"github.com/lightningnetwork/lnd/lnrpc/verrpc" "github.com/lightningnetwork/lnd/lnrpc/verrpc"
"github.com/lightningnetwork/lnd/signal"
) )
const defaultConfigFilename = "loopd.conf" const defaultConfigFilename = "loopd.conf"
@ -47,7 +48,7 @@ type RPCConfig struct {
// newListenerCfg creates and returns a new listenerCfg from the passed config // newListenerCfg creates and returns a new listenerCfg from the passed config
// and RPCConfig. // and RPCConfig.
func newListenerCfg(config *config, rpcCfg RPCConfig) *listenerCfg { func newListenerCfg(config *Config, rpcCfg RPCConfig) *listenerCfg {
return &listenerCfg{ return &listenerCfg{
grpcListener: func() (net.Listener, error) { grpcListener: func() (net.Listener, error) {
// If a custom RPC listener is set, we will listen on // If a custom RPC listener is set, we will listen on
@ -91,8 +92,9 @@ func newListenerCfg(config *config, rpcCfg RPCConfig) *listenerCfg {
} }
} }
func Start(rpcCfg RPCConfig) error { // Run starts the loop daemon and blocks until it's shut down again.
config := defaultConfig func Run(rpcCfg RPCConfig) error {
config := DefaultConfig()
// Parse command line flags. // Parse command line flags.
parser := flags.NewParser(&config, flags.Default) parser := flags.NewParser(&config, flags.Default)
@ -168,7 +170,26 @@ func Start(rpcCfg RPCConfig) error {
// Execute command. // Execute command.
if parser.Active == nil { if parser.Active == nil {
return daemon(&config, lisCfg) signal.Intercept()
daemon := New(&config, lisCfg)
if err := daemon.Start(); err != nil {
return err
}
select {
case <-signal.ShutdownChannel():
log.Infof("Received SIGINT (Ctrl+C).")
daemon.Stop()
// The above stop will return immediately. But we'll be
// notified on the error channel once the process is
// complete.
return <-daemon.ErrChan
case err := <-daemon.ErrChan:
return err
}
} }
if parser.Active.Name == "view" { if parser.Active.Name == "view" {

@ -39,6 +39,7 @@ type swapClientServer struct {
statusChan chan loop.SwapInfo statusChan chan loop.SwapInfo
nextSubscriberID int nextSubscriberID int
swapsLock sync.Mutex swapsLock sync.Mutex
mainCtx context.Context
} }
// LoopOut initiates an loop out swap with the given parameters. The call // LoopOut initiates an loop out swap with the given parameters. The call
@ -264,8 +265,14 @@ func (s *swapClientServer) Monitor(in *looprpc.MonitorRequest,
if err := send(swap); err != nil { if err := send(swap); err != nil {
return err return err
} }
// The client cancels the subscription.
case <-server.Context().Done(): case <-server.Context().Done():
return nil return nil
// The server is shutting down.
case <-s.mainCtx.Done():
return fmt.Errorf("server is shutting down")
} }
} }
} }

@ -10,7 +10,7 @@ import (
) )
// getClient returns an instance of the swap client. // getClient returns an instance of the swap client.
func getClient(config *config, lnd *lndclient.LndServices) (*loop.Client, func getClient(config *Config, lnd *lndclient.LndServices) (*loop.Client,
func(), error) { func(), error) {
storeDir, err := getStoreDir(config.Network) storeDir, err := getStoreDir(config.Network)

@ -10,7 +10,7 @@ import (
) )
// view prints all swaps currently in the database. // view prints all swaps currently in the database.
func view(config *config, lisCfg *listenerCfg) error { func view(config *Config, lisCfg *listenerCfg) error {
chainParams, err := swap.ChainParamsFromNetwork(config.Network) chainParams, err := swap.ChainParamsFromNetwork(config.Network)
if err != nil { if err != nil {
return err return err

Loading…
Cancel
Save