From 403b4097defd4f68e2a3b25e7f38154b3a320503 Mon Sep 17 00:00:00 2001 From: Oliver Gugger Date: Fri, 31 Jan 2020 13:57:21 +0100 Subject: [PATCH] loopd: move global state into server state --- loopd/daemon.go | 33 ++++++--------------- loopd/start.go | 12 +------- loopd/swapclient_server.go | 59 +++++++++++++++++++++++++++++++------- 3 files changed, 58 insertions(+), 46 deletions(-) diff --git a/loopd/daemon.go b/loopd/daemon.go index 4989579..533698d 100644 --- a/loopd/daemon.go +++ b/loopd/daemon.go @@ -16,6 +16,7 @@ import ( "github.com/lightninglabs/loop" "github.com/lightninglabs/loop/lndclient" "github.com/lightninglabs/loop/looprpc" + "github.com/lightningnetwork/lnd/lntypes" "google.golang.org/grpc" ) @@ -68,14 +69,18 @@ func daemon(config *config, lisCfg *listenerCfg) error { return err } + swaps := make(map[lntypes.Hash]loop.SwapInfo) for _, s := range swapsList { swaps[s.SwapHash] = *s } // Instantiate the loopd gRPC server. server := swapClientServer{ - impl: swapClient, - lnd: &lnd.LndServices, + impl: swapClient, + lnd: &lnd.LndServices, + swaps: swaps, + subscribers: make(map[int]chan<- interface{}), + statusChan: make(chan loop.SwapInfo), } serverOpts := []grpc.ServerOption{} @@ -130,8 +135,6 @@ func daemon(config *config, lisCfg *listenerCfg) error { log.Infof("REST proxy disabled") } - statusChan := make(chan loop.SwapInfo) - mainCtx, cancel := context.WithCancel(context.Background()) var wg sync.WaitGroup @@ -141,7 +144,7 @@ func daemon(config *config, lisCfg *listenerCfg) error { defer wg.Done() log.Infof("Starting swap client") - err := swapClient.Run(mainCtx, statusChan) + err := swapClient.Run(mainCtx, server.statusChan) if err != nil { log.Error(err) } @@ -159,25 +162,7 @@ func daemon(config *config, lisCfg *listenerCfg) error { defer wg.Done() log.Infof("Waiting for updates") - for { - select { - case swap := <-statusChan: - swapsLock.Lock() - swaps[swap.SwapHash] = swap - - for _, subscriber := range subscribers { - select { - case subscriber <- swap: - case <-mainCtx.Done(): - return - } - } - - swapsLock.Unlock() - case <-mainCtx.Done(): - return - } - } + server.processStatusUpdates(mainCtx) }() // Start the grpc server. diff --git a/loopd/start.go b/loopd/start.go index 321ed5c..1acc1ae 100644 --- a/loopd/start.go +++ b/loopd/start.go @@ -7,26 +7,16 @@ import ( "os" "path/filepath" "strings" - "sync" "github.com/jessevdk/go-flags" "github.com/lightninglabs/loop" "github.com/lightninglabs/loop/lndclient" "github.com/lightningnetwork/lnd/build" - "github.com/lightningnetwork/lnd/lntypes" ) const ( - defaultConfTarget = int32(6) -) - -var ( + defaultConfTarget = int32(6) defaultConfigFilename = "loopd.conf" - - swaps = make(map[lntypes.Hash]loop.SwapInfo) - subscribers = make(map[int]chan<- interface{}) - nextSubscriberID int - swapsLock sync.Mutex ) // RPCConfig holds optional options that can be used to make the loop daemon diff --git a/loopd/swapclient_server.go b/loopd/swapclient_server.go index 4f704ab..8966c7e 100644 --- a/loopd/swapclient_server.go +++ b/loopd/swapclient_server.go @@ -5,8 +5,10 @@ import ( "errors" "fmt" "sort" + "sync" "time" + "github.com/lightningnetwork/lnd/lntypes" "github.com/lightningnetwork/lnd/queue" "github.com/lightninglabs/loop" @@ -29,8 +31,13 @@ const ( // swapClientServer implements the grpc service exposed by loopd. type swapClientServer struct { - impl *loop.Client - lnd *lndclient.LndServices + impl *loop.Client + lnd *lndclient.LndServices + swaps map[lntypes.Hash]loop.SwapInfo + subscribers map[int]chan<- interface{} + statusChan chan loop.SwapInfo + nextSubscriberID int + swapsLock sync.Mutex } // LoopOut initiates an loop out swap with the given parameters. The call @@ -162,14 +169,14 @@ func (s *swapClientServer) Monitor(in *looprpc.MonitorRequest, // Add this subscriber to the global subscriber list. Also create a // snapshot of all pending and completed swaps within the lock, to // prevent subscribers from receiving duplicate updates. - swapsLock.Lock() + s.swapsLock.Lock() - id := nextSubscriberID - nextSubscriberID++ - subscribers[id] = queue.ChanIn() + id := s.nextSubscriberID + s.nextSubscriberID++ + s.subscribers[id] = queue.ChanIn() var pendingSwaps, completedSwaps []loop.SwapInfo - for _, swap := range swaps { + for _, swap := range s.swaps { if swap.State.Type() == loopdb.StateTypePending { pendingSwaps = append(pendingSwaps, swap) } else { @@ -177,13 +184,13 @@ func (s *swapClientServer) Monitor(in *looprpc.MonitorRequest, } } - swapsLock.Unlock() + s.swapsLock.Unlock() defer func() { queue.Stop() - swapsLock.Lock() - delete(subscribers, id) - swapsLock.Unlock() + s.swapsLock.Lock() + delete(s.subscribers, id) + s.swapsLock.Unlock() }() // Sort completed swaps new to old. @@ -383,6 +390,36 @@ func (s *swapClientServer) GetLsatTokens(ctx context.Context, return &looprpc.TokensResponse{Tokens: rpcTokens}, nil } +// processStatusUpdates reads updates on the status channel and processes them. +// +// NOTE: This must run inside a goroutine as it blocks until the main context +// shuts down. +func (s *swapClientServer) processStatusUpdates(mainCtx context.Context) { + for { + select { + // On updates, refresh the server's in-memory state and inform + // subscribers about the changes. + case swp := <-s.statusChan: + s.swapsLock.Lock() + s.swaps[swp.SwapHash] = swp + + for _, subscriber := range s.subscribers { + select { + case subscriber <- swp: + case <-mainCtx.Done(): + return + } + } + + s.swapsLock.Unlock() + + // Server is shutting down. + case <-mainCtx.Done(): + return + } + } +} + // validateConfTarget ensures the given confirmation target is valid. If one // isn't specified (0 value), then the default target is used. func validateConfTarget(target, defaultTarget int32) (int32, error) {