diff --git a/liquidity/liquidity.go b/liquidity/liquidity.go index 361736a..97548b9 100644 --- a/liquidity/liquidity.go +++ b/liquidity/liquidity.go @@ -1084,17 +1084,28 @@ func (m *Manager) currentSwapTraffic(loopOut []*loopdb.LoopOut, } for _, in := range loopIn { - // Skip completed swaps, they can't affect our channel balances. - if in.State().State.Type() != loopdb.StateTypePending { - continue - } - // Skip over swaps that may come through any peer. if in.Contract.LastHop == nil { continue } - traffic.ongoingLoopIn[*in.Contract.LastHop] = true + pubkey := *in.Contract.LastHop + + switch { + // Include any pending swaps in our ongoing set of swaps. + case in.State().State.Type() == loopdb.StateTypePending: + traffic.ongoingLoopIn[pubkey] = true + + // If a swap failed with an on-chain timeout, the server could + // not route to us. We add it to our backoff list so that + // there's some time for routing conditions to improve. + case in.State().State == loopdb.StateFailTimeout: + failedAt := in.LastUpdate().Time + + if failedAt.After(failureCutoff) { + traffic.failedLoopIn[pubkey] = failedAt + } + } } return traffic @@ -1105,6 +1116,7 @@ type swapTraffic struct { ongoingLoopOut map[lnwire.ShortChannelID]bool ongoingLoopIn map[route.Vertex]bool failedLoopOut map[lnwire.ShortChannelID]time.Time + failedLoopIn map[route.Vertex]time.Time } func newSwapTraffic() *swapTraffic { @@ -1112,6 +1124,7 @@ func newSwapTraffic() *swapTraffic { ongoingLoopOut: make(map[lnwire.ShortChannelID]bool), ongoingLoopIn: make(map[route.Vertex]bool), failedLoopOut: make(map[lnwire.ShortChannelID]time.Time), + failedLoopIn: make(map[route.Vertex]time.Time), } } diff --git a/liquidity/liquidity_test.go b/liquidity/liquidity_test.go index 076d473..203e2c4 100644 --- a/liquidity/liquidity_test.go +++ b/liquidity/liquidity_test.go @@ -1817,3 +1817,184 @@ func testSuggestSwaps(t *testing.T, setup *testSuggestSwapsSetup, require.Equal(t, expectedErr, err) require.Equal(t, expected, actual) } + +// TestCurrentTraffic tests recording of our current set of ongoing swaps. +func TestCurrentTraffic(t *testing.T) { + var ( + backoff = time.Hour * 5 + withinBackoff = testTime.Add(time.Hour * -1) + outsideBackoff = testTime.Add(backoff * -2) + + success = []*loopdb.LoopEvent{ + { + SwapStateData: loopdb.SwapStateData{ + State: loopdb.StateSuccess, + }, + }, + } + + failedInBackoff = []*loopdb.LoopEvent{ + { + SwapStateData: loopdb.SwapStateData{ + State: loopdb.StateFailOffchainPayments, + }, + Time: withinBackoff, + }, + } + + failedOutsideBackoff = []*loopdb.LoopEvent{ + { + SwapStateData: loopdb.SwapStateData{ + State: loopdb.StateFailOffchainPayments, + }, + Time: outsideBackoff, + }, + } + + failedTimeoutInBackoff = []*loopdb.LoopEvent{ + { + SwapStateData: loopdb.SwapStateData{ + State: loopdb.StateFailTimeout, + }, + Time: withinBackoff, + }, + } + + failedTimeoutOutsideBackoff = []*loopdb.LoopEvent{ + { + SwapStateData: loopdb.SwapStateData{ + State: loopdb.StateFailTimeout, + }, + Time: outsideBackoff, + }, + } + ) + + tests := []struct { + name string + loopOut []*loopdb.LoopOut + loopIn []*loopdb.LoopIn + expected *swapTraffic + }{ + { + name: "completed swaps ignored", + loopOut: []*loopdb.LoopOut{ + { + Loop: loopdb.Loop{ + Events: success, + }, + Contract: &loopdb.LoopOutContract{}, + }, + }, + loopIn: []*loopdb.LoopIn{ + { + Loop: loopdb.Loop{ + Events: success, + }, + Contract: &loopdb.LoopInContract{}, + }, + }, + expected: newSwapTraffic(), + }, + { + // No events indicates that the swap is still pending. + name: "pending swaps included", + loopOut: []*loopdb.LoopOut{ + { + Contract: &loopdb.LoopOutContract{ + OutgoingChanSet: []uint64{ + chanID1.ToUint64(), + }, + }, + }, + }, + loopIn: []*loopdb.LoopIn{ + { + Contract: &loopdb.LoopInContract{ + LastHop: &peer2, + }, + }, + }, + expected: &swapTraffic{ + ongoingLoopOut: map[lnwire.ShortChannelID]bool{ + chanID1: true, + }, + ongoingLoopIn: map[route.Vertex]bool{ + peer2: true, + }, + // Make empty maps so that we can assert equal. + failedLoopOut: make( + map[lnwire.ShortChannelID]time.Time, + ), + failedLoopIn: make(map[route.Vertex]time.Time), + }, + }, + { + name: "failure backoff included", + loopOut: []*loopdb.LoopOut{ + { + Contract: &loopdb.LoopOutContract{ + OutgoingChanSet: []uint64{ + chanID1.ToUint64(), + }, + }, + Loop: loopdb.Loop{ + Events: failedInBackoff, + }, + }, + { + Contract: &loopdb.LoopOutContract{ + OutgoingChanSet: []uint64{ + chanID2.ToUint64(), + }, + }, + Loop: loopdb.Loop{ + Events: failedOutsideBackoff, + }, + }, + }, + loopIn: []*loopdb.LoopIn{ + { + Contract: &loopdb.LoopInContract{ + LastHop: &peer1, + }, + Loop: loopdb.Loop{ + Events: failedTimeoutInBackoff, + }, + }, + { + Contract: &loopdb.LoopInContract{ + LastHop: &peer2, + }, + Loop: loopdb.Loop{ + Events: failedTimeoutOutsideBackoff, + }, + }, + }, + expected: &swapTraffic{ + ongoingLoopOut: make( + map[lnwire.ShortChannelID]bool, + ), + ongoingLoopIn: make(map[route.Vertex]bool), + failedLoopOut: map[lnwire.ShortChannelID]time.Time{ + chanID1: withinBackoff, + }, + failedLoopIn: map[route.Vertex]time.Time{ + peer1: withinBackoff, + }, + }, + }, + } + + for _, testCase := range tests { + cfg, _ := newTestConfig() + m := NewManager(cfg) + + params := m.GetParameters() + params.FailureBackOff = backoff + require.NoError(t, m.SetParameters(context.Background(), params)) + + actual := m.currentSwapTraffic(testCase.loopOut, testCase.loopIn) + require.Equal(t, testCase.expected, actual) + } +}