diff --git a/instantout/reservation/manager.go b/instantout/reservation/manager.go index 64c88e4..58baeae 100644 --- a/instantout/reservation/manager.go +++ b/instantout/reservation/manager.go @@ -2,6 +2,7 @@ package reservation import ( "context" + "fmt" "sync" "time" @@ -24,8 +25,8 @@ type Manager struct { sync.Mutex } -// NewReservationManager creates a new reservation manager. -func NewReservationManager(cfg *Config) *Manager { +// NewManager creates a new reservation manager. +func NewManager(cfg *Config) *Manager { return &Manager{ cfg: cfg, activeReservations: make(map[ID]*FSM), @@ -71,7 +72,7 @@ func (m *Manager) Run(ctx context.Context, height int32) error { case reservationRes := <-reservationResChan: log.Debugf("Received reservation %x", reservationRes.ReservationId) - err := m.newReservation( + _, err := m.newReservation( runCtx, uint32(currentHeight), reservationRes, ) if err != nil { @@ -90,19 +91,19 @@ func (m *Manager) Run(ctx context.Context, height int32) error { // newReservation creates a new reservation from the reservation request. func (m *Manager) newReservation(ctx context.Context, currentHeight uint32, - req *reservationrpc.ServerReservationNotification) error { + req *reservationrpc.ServerReservationNotification) (*FSM, error) { var reservationID ID err := reservationID.FromByteSlice( req.ReservationId, ) if err != nil { - return err + return nil, err } serverKey, err := btcec.ParsePubKey(req.ServerKey) if err != nil { - return err + return nil, err } // Create the reservation state machine. We need to pass in the runCtx @@ -136,14 +137,19 @@ func (m *Manager) newReservation(ctx context.Context, currentHeight uint32, // We'll now wait for the reservation to be in the state where it is // waiting to be confirmed. err = reservationFSM.DefaultObserver.WaitForState( - ctx, time.Minute, WaitForConfirmation, + ctx, 5*time.Second, WaitForConfirmation, fsm.WithWaitForStateOption(time.Second), ) if err != nil { - return err + if reservationFSM.LastActionError != nil { + return nil, fmt.Errorf("error waiting for "+ + "state: %v, last action error: %v", + err, reservationFSM.LastActionError) + } + return nil, err } - return nil + return reservationFSM, nil } // RegisterReservationNotifications registers a new reservation notification diff --git a/loopd/daemon.go b/loopd/daemon.go index 0acbb01..d3f48be 100644 --- a/loopd/daemon.go +++ b/loopd/daemon.go @@ -18,7 +18,11 @@ import ( "github.com/lightninglabs/loop" "github.com/lightninglabs/loop/loopd/perms" "github.com/lightninglabs/loop/loopdb" - "github.com/lightninglabs/loop/looprpc" + + "github.com/lightninglabs/loop/instantout/reservation" + loop_looprpc "github.com/lightninglabs/loop/looprpc" + + loop_swaprpc "github.com/lightninglabs/loop/swapserverrpc" "github.com/lightningnetwork/lnd/lntypes" "github.com/lightningnetwork/lnd/macaroons" "google.golang.org/grpc" @@ -62,6 +66,10 @@ type Daemon struct { // same process. swapClientServer + // reservationManager is the manager that handles all reservation state + // machines. + reservationManager *reservation.Manager + // ErrChan is an error channel that users of the Daemon struct must use // to detect runtime errors and also whether a shutdown is fully // completed. @@ -226,7 +234,7 @@ func (d *Daemon) startWebServers() error { grpc.UnaryInterceptor(unaryInterceptor), grpc.StreamInterceptor(streamInterceptor), ) - looprpc.RegisterSwapClientServer(d.grpcServer, d) + loop_looprpc.RegisterSwapClientServer(d.grpcServer, d) // Register our debug server if it is compiled in. d.registerDebugServer() @@ -286,7 +294,7 @@ func (d *Daemon) startWebServers() error { restProxyDest, "[::]", "[::1]", 1, ) } - err = looprpc.RegisterSwapClientHandlerFromEndpoint( + err = loop_looprpc.RegisterSwapClientHandlerFromEndpoint( ctx, mux, restProxyDest, proxyOpts, ) if err != nil { @@ -399,7 +407,7 @@ func (d *Daemon) initialize(withMacaroonService bool) error { return err } - swapDb, _, err := openDatabase(d.cfg, chainParams) + swapDb, baseDb, err := openDatabase(d.cfg, chainParams) if err != nil { return err } @@ -413,6 +421,15 @@ func (d *Daemon) initialize(withMacaroonService bool) error { } d.clientCleanup = clientCleanup + // Create a reservation server client. + reservationClient := loop_swaprpc.NewReservationServiceClient( + swapClient.Conn, + ) + + // Both the client RPC server and the swap server client should stop + // on main context cancel. So we create it early and pass it down. + d.mainCtx, d.mainCtxCancel = context.WithCancel(context.Background()) + // Add our debug permissions to our main set of required permissions // if compiled in. for endpoint, perm := range debugRequiredPermissions { @@ -466,17 +483,32 @@ func (d *Daemon) initialize(withMacaroonService bool) error { } } + // Create the reservation rpc server. + reservationStore := reservation.NewSQLStore(baseDb) + reservationConfig := &reservation.Config{ + Store: reservationStore, + Wallet: d.lnd.WalletKit, + ChainNotifier: d.lnd.ChainNotifier, + ReservationClient: reservationClient, + FetchL402: swapClient.Server.FetchL402, + } + + d.reservationManager = reservation.NewManager( + reservationConfig, + ) + // Now finally fully initialize the swap client RPC server instance. d.swapClientServer = swapClientServer{ - config: d.cfg, - network: lndclient.Network(d.cfg.Network), - impl: swapClient, - liquidityMgr: getLiquidityManager(swapClient), - lnd: &d.lnd.LndServices, - swaps: make(map[lntypes.Hash]loop.SwapInfo), - subscribers: make(map[int]chan<- interface{}), - statusChan: make(chan loop.SwapInfo), - mainCtx: d.mainCtx, + config: d.cfg, + network: lndclient.Network(d.cfg.Network), + impl: swapClient, + liquidityMgr: getLiquidityManager(swapClient), + lnd: &d.lnd.LndServices, + swaps: make(map[lntypes.Hash]loop.SwapInfo), + subscribers: make(map[int]chan<- interface{}), + statusChan: make(chan loop.SwapInfo), + mainCtx: d.mainCtx, + reservationManager: d.reservationManager, } // Retrieve all currently existing swaps from the database. @@ -543,6 +575,30 @@ func (d *Daemon) initialize(withMacaroonService bool) error { log.Info("Liquidity manager stopped") }() + // Start the reservation manager. + d.wg.Add(1) + go func() { + defer d.wg.Done() + + // We need to know the current block height to properly + // initialize the reservation manager. + getInfo, err := d.lnd.Client.GetInfo(d.mainCtx) + if err != nil { + d.internalErrChan <- err + return + } + + log.Info("Starting reservation manager") + defer log.Info("Reservation manager stopped") + + err = d.reservationManager.Run( + d.mainCtx, int32(getInfo.BlockHeight), + ) + if err != nil && !errors.Is(err, context.Canceled) { + d.internalErrChan <- err + } + }() + // Last, start our internal error handler. This will return exactly one // error or nil on the main error channel to inform the caller that // something went wrong or that shutdown is complete. We don't add to diff --git a/loopd/log.go b/loopd/log.go index 7a0ce47..b9ccb74 100644 --- a/loopd/log.go +++ b/loopd/log.go @@ -6,6 +6,7 @@ import ( "github.com/lightninglabs/lndclient" "github.com/lightninglabs/loop" "github.com/lightninglabs/loop/fsm" + "github.com/lightninglabs/loop/instantout/reservation" "github.com/lightninglabs/loop/liquidity" "github.com/lightninglabs/loop/loopdb" "github.com/lightningnetwork/lnd" @@ -38,6 +39,9 @@ func SetupLoggers(root *build.RotatingLogWriter, intercept signal.Interceptor) { root, liquidity.Subsystem, intercept, liquidity.UseLogger, ) lnd.AddSubLogger(root, fsm.Subsystem, intercept, fsm.UseLogger) + lnd.AddSubLogger( + root, reservation.Subsystem, intercept, reservation.UseLogger, + ) } // genSubLogger creates a logger for a subsystem. We provide an instance of diff --git a/loopd/perms/perms.go b/loopd/perms/perms.go index 90b7aa2..6e2d078 100644 --- a/loopd/perms/perms.go +++ b/loopd/perms/perms.go @@ -96,4 +96,8 @@ var RequiredPermissions = map[string][]bakery.Op{ Entity: "loop", Action: "in", }}, + "/looprpc.SwapClient/ListReservations": {{ + Entity: "reservation", + Action: "read", + }}, } diff --git a/loopd/swapclient_server.go b/loopd/swapclient_server.go index 14de0eb..31b27db 100644 --- a/loopd/swapclient_server.go +++ b/loopd/swapclient_server.go @@ -18,6 +18,7 @@ import ( "github.com/lightninglabs/aperture/lsat" "github.com/lightninglabs/lndclient" "github.com/lightninglabs/loop" + "github.com/lightninglabs/loop/instantout/reservation" "github.com/lightninglabs/loop/labels" "github.com/lightninglabs/loop/liquidity" "github.com/lightninglabs/loop/loopdb" @@ -74,17 +75,18 @@ type swapClientServer struct { clientrpc.UnimplementedSwapClientServer clientrpc.UnimplementedDebugServer - config *Config - network lndclient.Network - impl *loop.Client - liquidityMgr *liquidity.Manager - lnd *lndclient.LndServices - swaps map[lntypes.Hash]loop.SwapInfo - subscribers map[int]chan<- interface{} - statusChan chan loop.SwapInfo - nextSubscriberID int - swapsLock sync.Mutex - mainCtx context.Context + config *Config + network lndclient.Network + impl *loop.Client + liquidityMgr *liquidity.Manager + lnd *lndclient.LndServices + reservationManager *reservation.Manager + swaps map[lntypes.Hash]loop.SwapInfo + subscribers map[int]chan<- interface{} + statusChan chan loop.SwapInfo + nextSubscriberID int + swapsLock sync.Mutex + mainCtx context.Context } // LoopOut initiates a loop out swap with the given parameters. The call returns @@ -1138,6 +1140,25 @@ func (s *swapClientServer) SuggestSwaps(ctx context.Context, return resp, nil } +// ListReservations lists all existing reservations the client has ever made. +func (s *swapClientServer) ListReservations(ctx context.Context, + _ *clientrpc.ListReservationsRequest) ( + *clientrpc.ListReservationsResponse, error) { + + reservations, err := s.reservationManager.GetReservations( + ctx, + ) + if err != nil { + return nil, err + } + + return &clientrpc.ListReservationsResponse{ + Reservations: ToClientReservations( + reservations, + ), + }, nil +} + func rpcAutoloopReason(reason liquidity.Reason) (clientrpc.AutoReason, error) { switch reason { case liquidity.ReasonNone: @@ -1397,3 +1418,40 @@ func getPublicationDeadline(unixTimestamp uint64) time.Time { return time.Unix(int64(unixTimestamp), 0) } } + +// ToClientReservations converts a slice of server +// reservations to a slice of client reservations. +func ToClientReservations( + res []*reservation.Reservation) []*clientrpc.ClientReservation { + + var result []*clientrpc.ClientReservation + for _, r := range res { + result = append(result, toClientReservation(r)) + } + + return result +} + +// toClientReservation converts a server reservation to a +// client reservation. +func toClientReservation( + res *reservation.Reservation) *clientrpc.ClientReservation { + + var ( + txid []byte + vout uint32 + ) + if res.Outpoint != nil { + txid = res.Outpoint.Hash[:] + vout = res.Outpoint.Index + } + + return &clientrpc.ClientReservation{ + ReservationId: res.ID[:], + State: string(res.State), + Amount: uint64(res.Value), + TxId: txid, + Vout: vout, + Expiry: res.Expiry, + } +}