From b7c1e68f46b588167949a1610bf50e832cf35678 Mon Sep 17 00:00:00 2001 From: sputn1ck Date: Wed, 25 Oct 2023 23:30:53 +0200 Subject: [PATCH] instantout: add instantout manager --- instantout/manager.go | 201 ++++++++++++++++++++++++++++++ instantout/reservation/manager.go | 49 ++++---- 2 files changed, 226 insertions(+), 24 deletions(-) create mode 100644 instantout/manager.go diff --git a/instantout/manager.go b/instantout/manager.go new file mode 100644 index 0000000..ab4aad4 --- /dev/null +++ b/instantout/manager.go @@ -0,0 +1,201 @@ +package instantout + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + "github.com/lightninglabs/loop/instantout/reservation" + "github.com/lightningnetwork/lnd/lntypes" +) + +var ( + defaultStateWaitTime = 30 * time.Second + defaultCltv = 100 + ErrSwapDoesNotExist = errors.New("swap does not exist") +) + +// Manager manages the instantout state machines. +type Manager struct { + // cfg contains all the services that the reservation manager needs to + // operate. + cfg *Config + + // activeInstantOuts contains all the active instantouts. + activeInstantOuts map[lntypes.Hash]*FSM + + // currentHeight stores the currently best known block height. + currentHeight int32 + + // blockEpochChan receives new block heights. + blockEpochChan chan int32 + + runCtx context.Context + + sync.Mutex +} + +// NewInstantOutManager creates a new instantout manager. +func NewInstantOutManager(cfg *Config) *Manager { + return &Manager{ + cfg: cfg, + activeInstantOuts: make(map[lntypes.Hash]*FSM), + blockEpochChan: make(chan int32), + } +} + +// Run runs the instantout manager. +func (m *Manager) Run(ctx context.Context, initChan chan struct{}, + height int32) error { + + log.Debugf("Starting instantout manager") + defer func() { + log.Debugf("Stopping instantout manager") + }() + + runCtx, cancel := context.WithCancel(ctx) + defer cancel() + + m.runCtx = runCtx + m.currentHeight = height + + err := m.recoverInstantOuts(runCtx) + if err != nil { + close(initChan) + return err + } + + newBlockChan, newBlockErrChan, err := m.cfg.ChainNotifier. + RegisterBlockEpochNtfn(ctx) + if err != nil { + close(initChan) + return err + } + + close(initChan) + + for { + select { + case <-runCtx.Done(): + return nil + + case height := <-newBlockChan: + m.Lock() + m.currentHeight = height + m.Unlock() + + case err := <-newBlockErrChan: + return err + } + } +} + +// recoverInstantOuts recovers all the active instantouts from the database. +func (m *Manager) recoverInstantOuts(ctx context.Context) error { + // Fetch all the active instantouts from the database. + activeInstantOuts, err := m.cfg.Store.ListInstantLoopOuts(ctx) + if err != nil { + return err + } + + for _, instantOut := range activeInstantOuts { + if isFinalState(instantOut.State) { + continue + } + + log.Debugf("Recovering instantout %v", instantOut.SwapHash) + + instantOutFSM, err := NewFSMFromInstantOut( + ctx, m.cfg, instantOut, + ) + if err != nil { + return err + } + + m.activeInstantOuts[instantOut.SwapHash] = instantOutFSM + + // As SendEvent can block, we'll start a goroutine to process + // the event. + go func() { + err := instantOutFSM.SendEvent(OnRecover, nil) + if err != nil { + log.Errorf("FSM %v Error sending recover "+ + "event %v, state: %v", + instantOutFSM.InstantOut.SwapHash, err, + instantOutFSM.InstantOut.State) + } + }() + } + + return nil +} + +// NewInstantOut creates a new instantout. +func (m *Manager) NewInstantOut(ctx context.Context, + reservations []reservation.ID) (*FSM, error) { + + m.Lock() + // Create the instantout request. + request := &InitInstantOutCtx{ + cltvExpiry: m.currentHeight + int32(defaultCltv), + reservations: reservations, + initationHeight: m.currentHeight, + protocolVersion: CurrentProtocolVersion(), + } + + instantOut, err := NewFSM( + m.runCtx, m.cfg, ProtocolVersionFullReservation, + ) + if err != nil { + m.Unlock() + return nil, err + } + m.activeInstantOuts[instantOut.InstantOut.SwapHash] = instantOut + m.Unlock() + + // Start the instantout FSM. + go func() { + err := instantOut.SendEvent(OnStart, request) + if err != nil { + log.Errorf("Error sending event: %v", err) + } + }() + + // If everything went well, we'll wait for the instant out to be + // waiting for sweepless sweep to be confirmed. + err = instantOut.DefaultObserver.WaitForState( + ctx, defaultStateWaitTime, WaitForSweeplessSweepConfirmed, + ) + if err != nil { + if instantOut.LastActionError != nil { + return instantOut, fmt.Errorf( + "error waiting for sweepless sweep "+ + "confirmed: %w", instantOut.LastActionError, + ) + } + return instantOut, nil + } + + return instantOut, nil +} + +// GetActiveInstantOut returns an active instant out. +func (m *Manager) GetActiveInstantOut(swapHash lntypes.Hash) (*FSM, error) { + m.Lock() + defer m.Unlock() + + fsm, ok := m.activeInstantOuts[swapHash] + if !ok { + return nil, ErrSwapDoesNotExist + } + + // If the instant out is in a final state, we'll remove it from the + // active instant outs. + if isFinalState(fsm.InstantOut.State) { + delete(m.activeInstantOuts, swapHash) + } + + return fsm, nil +} diff --git a/instantout/reservation/manager.go b/instantout/reservation/manager.go index 655b5f9..58cec31 100644 --- a/instantout/reservation/manager.go +++ b/instantout/reservation/manager.go @@ -23,6 +23,8 @@ type Manager struct { // activeReservations contains all the active reservationsFSMs. activeReservations map[ID]*FSM + runCtx context.Context + sync.Mutex } @@ -41,6 +43,7 @@ func (m *Manager) Run(ctx context.Context, height int32) error { runCtx, cancel := context.WithCancel(ctx) defer cancel() + m.runCtx = runCtx currentHeight := height err := m.RecoverReservations(runCtx) @@ -58,7 +61,7 @@ func (m *Manager) Run(ctx context.Context, height int32) error { chan *reservationrpc.ServerReservationNotification, ) - err = m.RegisterReservationNotifications(runCtx, reservationResChan) + err = m.RegisterReservationNotifications(reservationResChan) if err != nil { return err } @@ -155,25 +158,29 @@ func (m *Manager) newReservation(ctx context.Context, currentHeight uint32, // RegisterReservationNotifications registers a new reservation notification // stream. func (m *Manager) RegisterReservationNotifications( - ctx context.Context, reservationChan chan *reservationrpc. - ServerReservationNotification) error { + reservationChan chan *reservationrpc.ServerReservationNotification) error { // In order to create a valid lsat we first are going to call // the FetchL402 method. - err := m.cfg.FetchL402(ctx) + err := m.cfg.FetchL402(m.runCtx) if err != nil { return err } + ctx, cancel := context.WithCancel(m.runCtx) + // We'll now subscribe to the reservation notifications. reservationStream, err := m.cfg.ReservationClient. ReservationNotificationStream( ctx, &reservationrpc.ReservationNotificationRequest{}, ) if err != nil { + cancel() return err } + log.Debugf("Successfully subscribed to reservation notifications") + // We'll now start a goroutine that will forward all the reservation // notifications to the reservationChan. go func() { @@ -188,36 +195,30 @@ func (m *Manager) RegisterReservationNotifications( log.Errorf("Error receiving "+ "reservation: %v", err) - reconnectTimer := time.NewTimer(time.Second * 10) + cancel() // If we encounter an error, we'll // try to reconnect. for { select { - case <-ctx.Done(): + case <-m.runCtx.Done(): return - case <-reconnectTimer.C: + + case <-time.After(time.Second * 10): + log.Debugf("Reconnecting to " + + "reservation notifications") err = m.RegisterReservationNotifications( - ctx, reservationChan, + reservationChan, ) - if err == nil { - log.Debugf( - "Successfully " + - "reconnected", - ) - reconnectTimer.Stop() - // If we were able to - // reconnect, we'll - // return. - return + if err != nil { + log.Errorf("Error "+ + "reconnecting: %v", err) + continue } - log.Errorf("Error "+ - "reconnecting: %v", - err) - reconnectTimer.Reset( - time.Second * 10, - ) + // If we were able to reconnect, we'll + // return. + return } } }