reservation: update reservation state machine

This commit updates the reservation statemachine to
allow for locking and spending of the
initial reservation.
pull/687/head
sputn1ck 8 months ago
parent a838570dce
commit 25faa48ad1
No known key found for this signature in database
GPG Key ID: 671103D881A5F0E4

@ -1,6 +1,8 @@
package reservation
import (
"context"
"github.com/btcsuite/btcd/btcec/v2"
"github.com/btcsuite/btcd/btcutil"
"github.com/lightninglabs/loop/fsm"
@ -86,8 +88,8 @@ func (r *FSM) SubscribeToConfirmationAction(_ fsm.EventContext) fsm.EventType {
r.reservation.InitiationHeight)
confChan, errConfChan, err := r.cfg.ChainNotifier.RegisterConfirmationsNtfn(
r.ctx, nil, pkscript, DefaultConfTarget,
r.reservation.InitiationHeight,
r.ctx, nil, pkscript, 1,
r.reservation.InitiationHeight-1,
)
if err != nil {
r.Errorf("unable to subscribe to conf notification: %v", err)
@ -141,31 +143,85 @@ func (r *FSM) SubscribeToConfirmationAction(_ fsm.EventContext) fsm.EventType {
}
}
// ReservationConfirmedAction waits for the reservation to be either expired or
// waits for other actions to happen.
func (r *FSM) ReservationConfirmedAction(_ fsm.EventContext) fsm.EventType {
blockHeightChan, errEpochChan, err := r.cfg.ChainNotifier.
RegisterBlockEpochNtfn(r.ctx)
// AsyncWaitForExpiredOrSweptAction waits for the reservation to be either
// expired or swept. This is non-blocking and can be used to wait for the
// reservation to expire while expecting other events.
func (f *FSM) AsyncWaitForExpiredOrSweptAction(_ fsm.EventContext,
) fsm.EventType {
notifCtx, cancel := context.WithCancel(f.ctx)
blockHeightChan, errEpochChan, err := f.cfg.ChainNotifier.
RegisterBlockEpochNtfn(notifCtx)
if err != nil {
return r.HandleError(err)
cancel()
return f.HandleError(err)
}
for {
select {
case err := <-errEpochChan:
return r.HandleError(err)
pkScript, err := f.reservation.GetPkScript()
if err != nil {
cancel()
return f.HandleError(err)
}
case blockHeight := <-blockHeightChan:
expired := blockHeight >= int32(r.reservation.Expiry)
if expired {
r.Debugf("Reservation %v expired",
r.reservation.ID)
spendChan, errSpendChan, err := f.cfg.ChainNotifier.RegisterSpendNtfn(
notifCtx, f.reservation.Outpoint, pkScript,
f.reservation.InitiationHeight,
)
if err != nil {
cancel()
return f.HandleError(err)
}
return OnTimedOut
go func() {
defer cancel()
for {
select {
case err := <-errEpochChan:
f.handleAsyncError(err)
return
case err := <-errSpendChan:
f.handleAsyncError(err)
return
case blockHeight := <-blockHeightChan:
expired := blockHeight >= int32(
f.reservation.Expiry,
)
if expired {
f.Debugf("Reservation expired")
err := f.SendEvent(OnTimedOut, nil)
if err != nil {
f.Errorf("Error sending event:"+
" %v", err)
}
return
}
case <-spendChan:
f.Debugf("Reservation spent")
err := f.SendEvent(OnSpent, nil)
if err != nil {
f.Errorf("Error sending event: %v", err)
}
return
case <-f.ctx.Done():
return
}
case <-r.ctx.Done():
return fsm.NoOp
}
}()
return fsm.NoOp
}
func (f *FSM) handleAsyncError(err error) {
f.LastActionError = err
f.Errorf("Error on async action: %v", err)
err2 := f.SendEvent(fsm.OnError, err)
if err2 != nil {
f.Errorf("Error sending event: %v", err2)
}
}

@ -5,6 +5,7 @@ import (
"encoding/hex"
"errors"
"testing"
"time"
"github.com/btcsuite/btcd/btcec/v2"
"github.com/btcsuite/btcd/btcutil"
@ -304,25 +305,31 @@ func TestSubscribeToConfirmationAction(t *testing.T) {
}
}
// TestReservationConfirmedAction tests the ReservationConfirmedAction of the
// reservation state machine.
func TestReservationConfirmedAction(t *testing.T) {
// AsyncWaitForExpiredOrSweptAction tests the AsyncWaitForExpiredOrSweptAction
// of the reservation state machine.
func TestAsyncWaitForExpiredOrSweptAction(t *testing.T) {
tests := []struct {
name string
blockHeight int32
blockErr error
expectedEvent fsm.EventType
spendDetail *chainntnfs.SpendDetail
expectedState fsm.StateType
}{
{
name: "expired",
blockHeight: 100,
expectedEvent: OnTimedOut,
expectedState: TimedOut,
},
{
name: "block error",
blockHeight: 0,
blockErr: errors.New("block error"),
expectedEvent: fsm.OnError,
expectedState: Confirmed,
},
{
name: "spent",
expectedState: Spent,
spendDetail: &chainntnfs.SpendDetail{},
},
}
@ -336,7 +343,9 @@ func TestReservationConfirmedAction(t *testing.T) {
ChainNotifier: chainNotifier,
},
&Reservation{
Expiry: defaultExpiry,
ServerPubkey: defaultPubkey,
ClientPubkey: defaultPubkey,
Expiry: defaultExpiry,
},
)
@ -361,9 +370,31 @@ func TestReservationConfirmedAction(t *testing.T) {
}
}()
eventType := r.ReservationConfirmedAction(nil)
spendChan := make(chan *chainntnfs.SpendDetail)
spendErrChan := make(chan error)
chainNotifier.On(
"RegisterSpendNtfn", mock.Anything,
mock.Anything, mock.Anything,
).Return(
spendChan, spendErrChan, nil,
)
go func() {
// Send the spend notification.
if tc.spendDetail != nil {
spendChan <- tc.spendDetail
}
}()
eventType := r.AsyncWaitForExpiredOrSweptAction(nil)
// Assert that the return value is as expected
require.Equal(t, tc.expectedEvent, eventType)
require.Equal(t, fsm.NoOp, eventType)
// Assert that the state is as expected
r.DefaultObserver.WaitForState(
r.ctx, time.Second*5, tc.expectedState,
)
// Assert that the expected functions were called on the mocks
chainNotifier.AssertExpectations(t)

@ -90,8 +90,16 @@ var (
// Failed is the state where the reservation has failed.
Failed = fsm.StateType("Failed")
// Swept is the state where the reservation has been swept by the server.
// Spent is the state where a spend tx has been confirmed.
Spent = fsm.StateType("Spent")
// Swept is the state where the reservation has been swept by the
// server.
Swept = fsm.StateType("Swept")
// Locked is the state where the reservation is locked and can't be
// used for instant out swaps.
Locked = fsm.StateType("Locked")
)
// Events.
@ -119,6 +127,18 @@ var (
// OnRecover is the event that is triggered when the reservation FSM
// recovers from a restart.
OnRecover = fsm.EventType("OnRecover")
// OnSpent is the event that is triggered when the reservation has been
// spent.
OnSpent = fsm.EventType("OnSpent")
// OnLocked is the event that is triggered when the reservation has
// been locked.
OnLocked = fsm.EventType("OnLocked")
// OnUnlocked is the event that is triggered when the reservation has
// been unlocked.
OnUnlocked = fsm.EventType("OnUnlocked")
)
// GetReservationStates returns the statemap that defines the reservation
@ -149,14 +169,32 @@ func (f *FSM) GetReservationStates() fsm.States {
},
Confirmed: fsm.State{
Transitions: fsm.Transitions{
OnTimedOut: TimedOut,
OnRecover: Confirmed,
OnSpent: Spent,
OnTimedOut: TimedOut,
OnRecover: Confirmed,
OnLocked: Locked,
fsm.OnError: Confirmed,
},
Action: f.AsyncWaitForExpiredOrSweptAction,
},
Locked: fsm.State{
Transitions: fsm.Transitions{
OnUnlocked: Confirmed,
OnTimedOut: TimedOut,
OnRecover: Locked,
OnSpent: Spent,
fsm.OnError: Locked,
},
Action: f.ReservationConfirmedAction,
Action: f.AsyncWaitForExpiredOrSweptAction,
},
TimedOut: fsm.State{
Action: fsm.NoOpAction,
},
Spent: fsm.State{
Action: fsm.NoOpAction,
},
Failed: fsm.State{
Action: fsm.NoOpAction,
},
@ -218,7 +256,7 @@ func (r *FSM) Errorf(format string, args ...interface{}) {
// isFinalState returns true if the state is a final state.
func isFinalState(state fsm.StateType) bool {
switch state {
case Failed, Swept, TimedOut:
case Failed, Swept, TimedOut, Spent:
return true
}
return false

@ -2,6 +2,8 @@ package reservation
import (
"context"
"fmt"
"strings"
"sync"
"time"
@ -263,3 +265,54 @@ func (m *Manager) RecoverReservations(ctx context.Context) error {
func (m *Manager) GetReservations(ctx context.Context) ([]*Reservation, error) {
return m.cfg.Store.ListReservations(ctx)
}
// GetReservation returns the reservation for the given id.
func (m *Manager) GetReservation(ctx context.Context, id ID) (*Reservation,
error) {
return m.cfg.Store.GetReservation(ctx, id)
}
// LockReservation locks the reservation with the given ID.
func (m *Manager) LockReservation(ctx context.Context, id ID) error {
// Try getting the reservation from the active reservations map.
m.Lock()
reservation, ok := m.activeReservations[id]
m.Unlock()
if !ok {
return fmt.Errorf("reservation not found")
}
// Try to send the lock event to the reservation.
err := reservation.SendEvent(OnLocked, nil)
if err != nil {
return err
}
return nil
}
// UnlockReservation unlocks the reservation with the given ID.
func (m *Manager) UnlockReservation(ctx context.Context, id ID) error {
// Try getting the reservation from the active reservations map.
m.Lock()
reservation, ok := m.activeReservations[id]
m.Unlock()
if !ok {
return fmt.Errorf("reservation not found")
}
// Try to send the unlock event to the reservation.
err := reservation.SendEvent(OnUnlocked, nil)
if err != nil && strings.Contains(err.Error(), "config error") {
// If the error is a config error, we can ignore it, as the
// reservation is already unlocked.
return nil
} else if err != nil {
return err
}
return nil
}

@ -2,6 +2,7 @@
stateDiagram-v2
[*] --> Init: OnServerRequest
Confirmed
Confirmed --> SpendBroadcasted: OnSpendBroadcasted
Confirmed --> TimedOut: OnTimedOut
Confirmed --> Confirmed: OnRecover
Failed
@ -9,6 +10,9 @@ Init
Init --> Failed: OnError
Init --> WaitForConfirmation: OnBroadcast
Init --> Failed: OnRecover
SpendBroadcasted
SpendBroadcasted --> SpendConfirmed: OnSpendConfirmed
SpendConfirmed
TimedOut
WaitForConfirmation
WaitForConfirmation --> WaitForConfirmation: OnRecover

Loading…
Cancel
Save