instantout: add instantout manager

pull/651/head
sputn1ck 6 months ago
parent ee0309f942
commit b7c1e68f46
No known key found for this signature in database
GPG Key ID: 671103D881A5F0E4

@ -0,0 +1,201 @@
package instantout
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/lightninglabs/loop/instantout/reservation"
"github.com/lightningnetwork/lnd/lntypes"
)
var (
defaultStateWaitTime = 30 * time.Second
defaultCltv = 100
ErrSwapDoesNotExist = errors.New("swap does not exist")
)
// Manager manages the instantout state machines.
type Manager struct {
// cfg contains all the services that the reservation manager needs to
// operate.
cfg *Config
// activeInstantOuts contains all the active instantouts.
activeInstantOuts map[lntypes.Hash]*FSM
// currentHeight stores the currently best known block height.
currentHeight int32
// blockEpochChan receives new block heights.
blockEpochChan chan int32
runCtx context.Context
sync.Mutex
}
// NewInstantOutManager creates a new instantout manager.
func NewInstantOutManager(cfg *Config) *Manager {
return &Manager{
cfg: cfg,
activeInstantOuts: make(map[lntypes.Hash]*FSM),
blockEpochChan: make(chan int32),
}
}
// Run runs the instantout manager.
func (m *Manager) Run(ctx context.Context, initChan chan struct{},
height int32) error {
log.Debugf("Starting instantout manager")
defer func() {
log.Debugf("Stopping instantout manager")
}()
runCtx, cancel := context.WithCancel(ctx)
defer cancel()
m.runCtx = runCtx
m.currentHeight = height
err := m.recoverInstantOuts(runCtx)
if err != nil {
close(initChan)
return err
}
newBlockChan, newBlockErrChan, err := m.cfg.ChainNotifier.
RegisterBlockEpochNtfn(ctx)
if err != nil {
close(initChan)
return err
}
close(initChan)
for {
select {
case <-runCtx.Done():
return nil
case height := <-newBlockChan:
m.Lock()
m.currentHeight = height
m.Unlock()
case err := <-newBlockErrChan:
return err
}
}
}
// recoverInstantOuts recovers all the active instantouts from the database.
func (m *Manager) recoverInstantOuts(ctx context.Context) error {
// Fetch all the active instantouts from the database.
activeInstantOuts, err := m.cfg.Store.ListInstantLoopOuts(ctx)
if err != nil {
return err
}
for _, instantOut := range activeInstantOuts {
if isFinalState(instantOut.State) {
continue
}
log.Debugf("Recovering instantout %v", instantOut.SwapHash)
instantOutFSM, err := NewFSMFromInstantOut(
ctx, m.cfg, instantOut,
)
if err != nil {
return err
}
m.activeInstantOuts[instantOut.SwapHash] = instantOutFSM
// As SendEvent can block, we'll start a goroutine to process
// the event.
go func() {
err := instantOutFSM.SendEvent(OnRecover, nil)
if err != nil {
log.Errorf("FSM %v Error sending recover "+
"event %v, state: %v",
instantOutFSM.InstantOut.SwapHash, err,
instantOutFSM.InstantOut.State)
}
}()
}
return nil
}
// NewInstantOut creates a new instantout.
func (m *Manager) NewInstantOut(ctx context.Context,
reservations []reservation.ID) (*FSM, error) {
m.Lock()
// Create the instantout request.
request := &InitInstantOutCtx{
cltvExpiry: m.currentHeight + int32(defaultCltv),
reservations: reservations,
initationHeight: m.currentHeight,
protocolVersion: CurrentProtocolVersion(),
}
instantOut, err := NewFSM(
m.runCtx, m.cfg, ProtocolVersionFullReservation,
)
if err != nil {
m.Unlock()
return nil, err
}
m.activeInstantOuts[instantOut.InstantOut.SwapHash] = instantOut
m.Unlock()
// Start the instantout FSM.
go func() {
err := instantOut.SendEvent(OnStart, request)
if err != nil {
log.Errorf("Error sending event: %v", err)
}
}()
// If everything went well, we'll wait for the instant out to be
// waiting for sweepless sweep to be confirmed.
err = instantOut.DefaultObserver.WaitForState(
ctx, defaultStateWaitTime, WaitForSweeplessSweepConfirmed,
)
if err != nil {
if instantOut.LastActionError != nil {
return instantOut, fmt.Errorf(
"error waiting for sweepless sweep "+
"confirmed: %w", instantOut.LastActionError,
)
}
return instantOut, nil
}
return instantOut, nil
}
// GetActiveInstantOut returns an active instant out.
func (m *Manager) GetActiveInstantOut(swapHash lntypes.Hash) (*FSM, error) {
m.Lock()
defer m.Unlock()
fsm, ok := m.activeInstantOuts[swapHash]
if !ok {
return nil, ErrSwapDoesNotExist
}
// If the instant out is in a final state, we'll remove it from the
// active instant outs.
if isFinalState(fsm.InstantOut.State) {
delete(m.activeInstantOuts, swapHash)
}
return fsm, nil
}

@ -23,6 +23,8 @@ type Manager struct {
// activeReservations contains all the active reservationsFSMs.
activeReservations map[ID]*FSM
runCtx context.Context
sync.Mutex
}
@ -41,6 +43,7 @@ func (m *Manager) Run(ctx context.Context, height int32) error {
runCtx, cancel := context.WithCancel(ctx)
defer cancel()
m.runCtx = runCtx
currentHeight := height
err := m.RecoverReservations(runCtx)
@ -58,7 +61,7 @@ func (m *Manager) Run(ctx context.Context, height int32) error {
chan *reservationrpc.ServerReservationNotification,
)
err = m.RegisterReservationNotifications(runCtx, reservationResChan)
err = m.RegisterReservationNotifications(reservationResChan)
if err != nil {
return err
}
@ -155,25 +158,29 @@ func (m *Manager) newReservation(ctx context.Context, currentHeight uint32,
// RegisterReservationNotifications registers a new reservation notification
// stream.
func (m *Manager) RegisterReservationNotifications(
ctx context.Context, reservationChan chan *reservationrpc.
ServerReservationNotification) error {
reservationChan chan *reservationrpc.ServerReservationNotification) error {
// In order to create a valid lsat we first are going to call
// the FetchL402 method.
err := m.cfg.FetchL402(ctx)
err := m.cfg.FetchL402(m.runCtx)
if err != nil {
return err
}
ctx, cancel := context.WithCancel(m.runCtx)
// We'll now subscribe to the reservation notifications.
reservationStream, err := m.cfg.ReservationClient.
ReservationNotificationStream(
ctx, &reservationrpc.ReservationNotificationRequest{},
)
if err != nil {
cancel()
return err
}
log.Debugf("Successfully subscribed to reservation notifications")
// We'll now start a goroutine that will forward all the reservation
// notifications to the reservationChan.
go func() {
@ -188,36 +195,30 @@ func (m *Manager) RegisterReservationNotifications(
log.Errorf("Error receiving "+
"reservation: %v", err)
reconnectTimer := time.NewTimer(time.Second * 10)
cancel()
// If we encounter an error, we'll
// try to reconnect.
for {
select {
case <-ctx.Done():
case <-m.runCtx.Done():
return
case <-reconnectTimer.C:
case <-time.After(time.Second * 10):
log.Debugf("Reconnecting to " +
"reservation notifications")
err = m.RegisterReservationNotifications(
ctx, reservationChan,
reservationChan,
)
if err == nil {
log.Debugf(
"Successfully " +
"reconnected",
)
reconnectTimer.Stop()
// If we were able to
// reconnect, we'll
// return.
return
if err != nil {
log.Errorf("Error "+
"reconnecting: %v", err)
continue
}
log.Errorf("Error "+
"reconnecting: %v",
err)
reconnectTimer.Reset(
time.Second * 10,
)
// If we were able to reconnect, we'll
// return.
return
}
}
}

Loading…
Cancel
Save