loopd: add reservation handling

pull/632/head
sputn1ck 9 months ago
parent 4d558b1418
commit 49c40d9173
No known key found for this signature in database
GPG Key ID: 671103D881A5F0E4

@ -2,6 +2,7 @@ package reservation
import ( import (
"context" "context"
"fmt"
"sync" "sync"
"time" "time"
@ -24,8 +25,8 @@ type Manager struct {
sync.Mutex sync.Mutex
} }
// NewReservationManager creates a new reservation manager. // NewManager creates a new reservation manager.
func NewReservationManager(cfg *Config) *Manager { func NewManager(cfg *Config) *Manager {
return &Manager{ return &Manager{
cfg: cfg, cfg: cfg,
activeReservations: make(map[ID]*FSM), activeReservations: make(map[ID]*FSM),
@ -71,7 +72,7 @@ func (m *Manager) Run(ctx context.Context, height int32) error {
case reservationRes := <-reservationResChan: case reservationRes := <-reservationResChan:
log.Debugf("Received reservation %x", log.Debugf("Received reservation %x",
reservationRes.ReservationId) reservationRes.ReservationId)
err := m.newReservation( _, err := m.newReservation(
runCtx, uint32(currentHeight), reservationRes, runCtx, uint32(currentHeight), reservationRes,
) )
if err != nil { if err != nil {
@ -90,19 +91,19 @@ func (m *Manager) Run(ctx context.Context, height int32) error {
// newReservation creates a new reservation from the reservation request. // newReservation creates a new reservation from the reservation request.
func (m *Manager) newReservation(ctx context.Context, currentHeight uint32, func (m *Manager) newReservation(ctx context.Context, currentHeight uint32,
req *reservationrpc.ServerReservationNotification) error { req *reservationrpc.ServerReservationNotification) (*FSM, error) {
var reservationID ID var reservationID ID
err := reservationID.FromByteSlice( err := reservationID.FromByteSlice(
req.ReservationId, req.ReservationId,
) )
if err != nil { if err != nil {
return err return nil, err
} }
serverKey, err := btcec.ParsePubKey(req.ServerKey) serverKey, err := btcec.ParsePubKey(req.ServerKey)
if err != nil { if err != nil {
return err return nil, err
} }
// Create the reservation state machine. We need to pass in the runCtx // Create the reservation state machine. We need to pass in the runCtx
@ -136,14 +137,19 @@ func (m *Manager) newReservation(ctx context.Context, currentHeight uint32,
// We'll now wait for the reservation to be in the state where it is // We'll now wait for the reservation to be in the state where it is
// waiting to be confirmed. // waiting to be confirmed.
err = reservationFSM.DefaultObserver.WaitForState( err = reservationFSM.DefaultObserver.WaitForState(
ctx, time.Minute, WaitForConfirmation, ctx, 5*time.Second, WaitForConfirmation,
fsm.WithWaitForStateOption(time.Second), fsm.WithWaitForStateOption(time.Second),
) )
if err != nil { if err != nil {
return err if reservationFSM.LastActionError != nil {
return nil, fmt.Errorf("error waiting for "+
"state: %v, last action error: %v",
err, reservationFSM.LastActionError)
}
return nil, err
} }
return nil return reservationFSM, nil
} }
// RegisterReservationNotifications registers a new reservation notification // RegisterReservationNotifications registers a new reservation notification

@ -18,7 +18,11 @@ import (
"github.com/lightninglabs/loop" "github.com/lightninglabs/loop"
"github.com/lightninglabs/loop/loopd/perms" "github.com/lightninglabs/loop/loopd/perms"
"github.com/lightninglabs/loop/loopdb" "github.com/lightninglabs/loop/loopdb"
"github.com/lightninglabs/loop/looprpc"
"github.com/lightninglabs/loop/instantout/reservation"
loop_looprpc "github.com/lightninglabs/loop/looprpc"
loop_swaprpc "github.com/lightninglabs/loop/swapserverrpc"
"github.com/lightningnetwork/lnd/lntypes" "github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/macaroons" "github.com/lightningnetwork/lnd/macaroons"
"google.golang.org/grpc" "google.golang.org/grpc"
@ -62,6 +66,10 @@ type Daemon struct {
// same process. // same process.
swapClientServer swapClientServer
// reservationManager is the manager that handles all reservation state
// machines.
reservationManager *reservation.Manager
// ErrChan is an error channel that users of the Daemon struct must use // ErrChan is an error channel that users of the Daemon struct must use
// to detect runtime errors and also whether a shutdown is fully // to detect runtime errors and also whether a shutdown is fully
// completed. // completed.
@ -226,7 +234,7 @@ func (d *Daemon) startWebServers() error {
grpc.UnaryInterceptor(unaryInterceptor), grpc.UnaryInterceptor(unaryInterceptor),
grpc.StreamInterceptor(streamInterceptor), grpc.StreamInterceptor(streamInterceptor),
) )
looprpc.RegisterSwapClientServer(d.grpcServer, d) loop_looprpc.RegisterSwapClientServer(d.grpcServer, d)
// Register our debug server if it is compiled in. // Register our debug server if it is compiled in.
d.registerDebugServer() d.registerDebugServer()
@ -286,7 +294,7 @@ func (d *Daemon) startWebServers() error {
restProxyDest, "[::]", "[::1]", 1, restProxyDest, "[::]", "[::1]", 1,
) )
} }
err = looprpc.RegisterSwapClientHandlerFromEndpoint( err = loop_looprpc.RegisterSwapClientHandlerFromEndpoint(
ctx, mux, restProxyDest, proxyOpts, ctx, mux, restProxyDest, proxyOpts,
) )
if err != nil { if err != nil {
@ -399,7 +407,7 @@ func (d *Daemon) initialize(withMacaroonService bool) error {
return err return err
} }
swapDb, _, err := openDatabase(d.cfg, chainParams) swapDb, baseDb, err := openDatabase(d.cfg, chainParams)
if err != nil { if err != nil {
return err return err
} }
@ -413,6 +421,15 @@ func (d *Daemon) initialize(withMacaroonService bool) error {
} }
d.clientCleanup = clientCleanup d.clientCleanup = clientCleanup
// Create a reservation server client.
reservationClient := loop_swaprpc.NewReservationServiceClient(
swapClient.Conn,
)
// Both the client RPC server and the swap server client should stop
// on main context cancel. So we create it early and pass it down.
d.mainCtx, d.mainCtxCancel = context.WithCancel(context.Background())
// Add our debug permissions to our main set of required permissions // Add our debug permissions to our main set of required permissions
// if compiled in. // if compiled in.
for endpoint, perm := range debugRequiredPermissions { for endpoint, perm := range debugRequiredPermissions {
@ -466,17 +483,32 @@ func (d *Daemon) initialize(withMacaroonService bool) error {
} }
} }
// Create the reservation rpc server.
reservationStore := reservation.NewSQLStore(baseDb)
reservationConfig := &reservation.Config{
Store: reservationStore,
Wallet: d.lnd.WalletKit,
ChainNotifier: d.lnd.ChainNotifier,
ReservationClient: reservationClient,
FetchL402: swapClient.Server.FetchL402,
}
d.reservationManager = reservation.NewManager(
reservationConfig,
)
// Now finally fully initialize the swap client RPC server instance. // Now finally fully initialize the swap client RPC server instance.
d.swapClientServer = swapClientServer{ d.swapClientServer = swapClientServer{
config: d.cfg, config: d.cfg,
network: lndclient.Network(d.cfg.Network), network: lndclient.Network(d.cfg.Network),
impl: swapClient, impl: swapClient,
liquidityMgr: getLiquidityManager(swapClient), liquidityMgr: getLiquidityManager(swapClient),
lnd: &d.lnd.LndServices, lnd: &d.lnd.LndServices,
swaps: make(map[lntypes.Hash]loop.SwapInfo), swaps: make(map[lntypes.Hash]loop.SwapInfo),
subscribers: make(map[int]chan<- interface{}), subscribers: make(map[int]chan<- interface{}),
statusChan: make(chan loop.SwapInfo), statusChan: make(chan loop.SwapInfo),
mainCtx: d.mainCtx, mainCtx: d.mainCtx,
reservationManager: d.reservationManager,
} }
// Retrieve all currently existing swaps from the database. // Retrieve all currently existing swaps from the database.
@ -543,6 +575,30 @@ func (d *Daemon) initialize(withMacaroonService bool) error {
log.Info("Liquidity manager stopped") log.Info("Liquidity manager stopped")
}() }()
// Start the reservation manager.
d.wg.Add(1)
go func() {
defer d.wg.Done()
// We need to know the current block height to properly
// initialize the reservation manager.
getInfo, err := d.lnd.Client.GetInfo(d.mainCtx)
if err != nil {
d.internalErrChan <- err
return
}
log.Info("Starting reservation manager")
defer log.Info("Reservation manager stopped")
err = d.reservationManager.Run(
d.mainCtx, int32(getInfo.BlockHeight),
)
if err != nil && !errors.Is(err, context.Canceled) {
d.internalErrChan <- err
}
}()
// Last, start our internal error handler. This will return exactly one // Last, start our internal error handler. This will return exactly one
// error or nil on the main error channel to inform the caller that // error or nil on the main error channel to inform the caller that
// something went wrong or that shutdown is complete. We don't add to // something went wrong or that shutdown is complete. We don't add to

@ -6,6 +6,7 @@ import (
"github.com/lightninglabs/lndclient" "github.com/lightninglabs/lndclient"
"github.com/lightninglabs/loop" "github.com/lightninglabs/loop"
"github.com/lightninglabs/loop/fsm" "github.com/lightninglabs/loop/fsm"
"github.com/lightninglabs/loop/instantout/reservation"
"github.com/lightninglabs/loop/liquidity" "github.com/lightninglabs/loop/liquidity"
"github.com/lightninglabs/loop/loopdb" "github.com/lightninglabs/loop/loopdb"
"github.com/lightningnetwork/lnd" "github.com/lightningnetwork/lnd"
@ -38,6 +39,9 @@ func SetupLoggers(root *build.RotatingLogWriter, intercept signal.Interceptor) {
root, liquidity.Subsystem, intercept, liquidity.UseLogger, root, liquidity.Subsystem, intercept, liquidity.UseLogger,
) )
lnd.AddSubLogger(root, fsm.Subsystem, intercept, fsm.UseLogger) lnd.AddSubLogger(root, fsm.Subsystem, intercept, fsm.UseLogger)
lnd.AddSubLogger(
root, reservation.Subsystem, intercept, reservation.UseLogger,
)
} }
// genSubLogger creates a logger for a subsystem. We provide an instance of // genSubLogger creates a logger for a subsystem. We provide an instance of

@ -96,4 +96,8 @@ var RequiredPermissions = map[string][]bakery.Op{
Entity: "loop", Entity: "loop",
Action: "in", Action: "in",
}}, }},
"/looprpc.SwapClient/ListReservations": {{
Entity: "reservation",
Action: "read",
}},
} }

@ -18,6 +18,7 @@ import (
"github.com/lightninglabs/aperture/lsat" "github.com/lightninglabs/aperture/lsat"
"github.com/lightninglabs/lndclient" "github.com/lightninglabs/lndclient"
"github.com/lightninglabs/loop" "github.com/lightninglabs/loop"
"github.com/lightninglabs/loop/instantout/reservation"
"github.com/lightninglabs/loop/labels" "github.com/lightninglabs/loop/labels"
"github.com/lightninglabs/loop/liquidity" "github.com/lightninglabs/loop/liquidity"
"github.com/lightninglabs/loop/loopdb" "github.com/lightninglabs/loop/loopdb"
@ -74,17 +75,18 @@ type swapClientServer struct {
clientrpc.UnimplementedSwapClientServer clientrpc.UnimplementedSwapClientServer
clientrpc.UnimplementedDebugServer clientrpc.UnimplementedDebugServer
config *Config config *Config
network lndclient.Network network lndclient.Network
impl *loop.Client impl *loop.Client
liquidityMgr *liquidity.Manager liquidityMgr *liquidity.Manager
lnd *lndclient.LndServices lnd *lndclient.LndServices
swaps map[lntypes.Hash]loop.SwapInfo reservationManager *reservation.Manager
subscribers map[int]chan<- interface{} swaps map[lntypes.Hash]loop.SwapInfo
statusChan chan loop.SwapInfo subscribers map[int]chan<- interface{}
nextSubscriberID int statusChan chan loop.SwapInfo
swapsLock sync.Mutex nextSubscriberID int
mainCtx context.Context swapsLock sync.Mutex
mainCtx context.Context
} }
// LoopOut initiates a loop out swap with the given parameters. The call returns // LoopOut initiates a loop out swap with the given parameters. The call returns
@ -1138,6 +1140,25 @@ func (s *swapClientServer) SuggestSwaps(ctx context.Context,
return resp, nil return resp, nil
} }
// ListReservations lists all existing reservations the client has ever made.
func (s *swapClientServer) ListReservations(ctx context.Context,
_ *clientrpc.ListReservationsRequest) (
*clientrpc.ListReservationsResponse, error) {
reservations, err := s.reservationManager.GetReservations(
ctx,
)
if err != nil {
return nil, err
}
return &clientrpc.ListReservationsResponse{
Reservations: ToClientReservations(
reservations,
),
}, nil
}
func rpcAutoloopReason(reason liquidity.Reason) (clientrpc.AutoReason, error) { func rpcAutoloopReason(reason liquidity.Reason) (clientrpc.AutoReason, error) {
switch reason { switch reason {
case liquidity.ReasonNone: case liquidity.ReasonNone:
@ -1397,3 +1418,40 @@ func getPublicationDeadline(unixTimestamp uint64) time.Time {
return time.Unix(int64(unixTimestamp), 0) return time.Unix(int64(unixTimestamp), 0)
} }
} }
// ToClientReservations converts a slice of server
// reservations to a slice of client reservations.
func ToClientReservations(
res []*reservation.Reservation) []*clientrpc.ClientReservation {
var result []*clientrpc.ClientReservation
for _, r := range res {
result = append(result, toClientReservation(r))
}
return result
}
// toClientReservation converts a server reservation to a
// client reservation.
func toClientReservation(
res *reservation.Reservation) *clientrpc.ClientReservation {
var (
txid []byte
vout uint32
)
if res.Outpoint != nil {
txid = res.Outpoint.Hash[:]
vout = res.Outpoint.Index
}
return &clientrpc.ClientReservation{
ReservationId: res.ID[:],
State: string(res.State),
Amount: uint64(res.Value),
TxId: txid,
Vout: vout,
Expiry: res.Expiry,
}
}

Loading…
Cancel
Save