Browse Source

loopd: move global state into server state

pull/140/head
Oliver Gugger 1 year ago
parent
commit
403b4097de
No known key found for this signature in database GPG Key ID: 8E4256593F177720
3 changed files with 58 additions and 46 deletions
  1. +9
    -24
      loopd/daemon.go
  2. +1
    -11
      loopd/start.go
  3. +48
    -11
      loopd/swapclient_server.go

+ 9
- 24
loopd/daemon.go View File

@ -16,6 +16,7 @@ import (
"github.com/lightninglabs/loop"
"github.com/lightninglabs/loop/lndclient"
"github.com/lightninglabs/loop/looprpc"
"github.com/lightningnetwork/lnd/lntypes"
"google.golang.org/grpc"
)
@ -68,14 +69,18 @@ func daemon(config *config, lisCfg *listenerCfg) error {
return err
}
swaps := make(map[lntypes.Hash]loop.SwapInfo)
for _, s := range swapsList {
swaps[s.SwapHash] = *s
}
// Instantiate the loopd gRPC server.
server := swapClientServer{
impl: swapClient,
lnd: &lnd.LndServices,
impl: swapClient,
lnd: &lnd.LndServices,
swaps: swaps,
subscribers: make(map[int]chan<- interface{}),
statusChan: make(chan loop.SwapInfo),
}
serverOpts := []grpc.ServerOption{}
@ -130,8 +135,6 @@ func daemon(config *config, lisCfg *listenerCfg) error {
log.Infof("REST proxy disabled")
}
statusChan := make(chan loop.SwapInfo)
mainCtx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
@ -141,7 +144,7 @@ func daemon(config *config, lisCfg *listenerCfg) error {
defer wg.Done()
log.Infof("Starting swap client")
err := swapClient.Run(mainCtx, statusChan)
err := swapClient.Run(mainCtx, server.statusChan)
if err != nil {
log.Error(err)
}
@ -159,25 +162,7 @@ func daemon(config *config, lisCfg *listenerCfg) error {
defer wg.Done()
log.Infof("Waiting for updates")
for {
select {
case swap := <-statusChan:
swapsLock.Lock()
swaps[swap.SwapHash] = swap
for _, subscriber := range subscribers {
select {
case subscriber <- swap:
case <-mainCtx.Done():
return
}
}
swapsLock.Unlock()
case <-mainCtx.Done():
return
}
}
server.processStatusUpdates(mainCtx)
}()
// Start the grpc server.

+ 1
- 11
loopd/start.go View File

@ -7,26 +7,16 @@ import (
"os"
"path/filepath"
"strings"
"sync"
"github.com/jessevdk/go-flags"
"github.com/lightninglabs/loop"
"github.com/lightninglabs/loop/lndclient"
"github.com/lightningnetwork/lnd/build"
"github.com/lightningnetwork/lnd/lntypes"
)
const (
defaultConfTarget = int32(6)
)
var (
defaultConfTarget = int32(6)
defaultConfigFilename = "loopd.conf"
swaps = make(map[lntypes.Hash]loop.SwapInfo)
subscribers = make(map[int]chan<- interface{})
nextSubscriberID int
swapsLock sync.Mutex
)
// RPCConfig holds optional options that can be used to make the loop daemon

+ 48
- 11
loopd/swapclient_server.go View File

@ -5,8 +5,10 @@ import (
"errors"
"fmt"
"sort"
"sync"
"time"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/queue"
"github.com/lightninglabs/loop"
@ -29,8 +31,13 @@ const (
// swapClientServer implements the grpc service exposed by loopd.
type swapClientServer struct {
impl *loop.Client
lnd *lndclient.LndServices
impl *loop.Client
lnd *lndclient.LndServices
swaps map[lntypes.Hash]loop.SwapInfo
subscribers map[int]chan<- interface{}
statusChan chan loop.SwapInfo
nextSubscriberID int
swapsLock sync.Mutex
}
// LoopOut initiates an loop out swap with the given parameters. The call
@ -162,14 +169,14 @@ func (s *swapClientServer) Monitor(in *looprpc.MonitorRequest,
// Add this subscriber to the global subscriber list. Also create a
// snapshot of all pending and completed swaps within the lock, to
// prevent subscribers from receiving duplicate updates.
swapsLock.Lock()
s.swapsLock.Lock()
id := nextSubscriberID
nextSubscriberID++
subscribers[id] = queue.ChanIn()
id := s.nextSubscriberID
s.nextSubscriberID++
s.subscribers[id] = queue.ChanIn()
var pendingSwaps, completedSwaps []loop.SwapInfo
for _, swap := range swaps {
for _, swap := range s.swaps {
if swap.State.Type() == loopdb.StateTypePending {
pendingSwaps = append(pendingSwaps, swap)
} else {
@ -177,13 +184,13 @@ func (s *swapClientServer) Monitor(in *looprpc.MonitorRequest,
}
}
swapsLock.Unlock()
s.swapsLock.Unlock()
defer func() {
queue.Stop()
swapsLock.Lock()
delete(subscribers, id)
swapsLock.Unlock()
s.swapsLock.Lock()
delete(s.subscribers, id)
s.swapsLock.Unlock()
}()
// Sort completed swaps new to old.
@ -383,6 +390,36 @@ func (s *swapClientServer) GetLsatTokens(ctx context.Context,
return &looprpc.TokensResponse{Tokens: rpcTokens}, nil
}
// processStatusUpdates reads updates on the status channel and processes them.
//
// NOTE: This must run inside a goroutine as it blocks until the main context
// shuts down.
func (s *swapClientServer) processStatusUpdates(mainCtx context.Context) {
for {
select {
// On updates, refresh the server's in-memory state and inform
// subscribers about the changes.
case swp := <-s.statusChan:
s.swapsLock.Lock()
s.swaps[swp.SwapHash] = swp
for _, subscriber := range s.subscribers {
select {
case subscriber <- swp:
case <-mainCtx.Done():
return
}
}
s.swapsLock.Unlock()
// Server is shutting down.
case <-mainCtx.Done():
return
}
}
}
// validateConfTarget ensures the given confirmation target is valid. If one
// isn't specified (0 value), then the default target is used.
func validateConfTarget(target, defaultTarget int32) (int32, error) {

Loading…
Cancel
Save