Browse Source

Lightning Loop initial version

pull/1/head
Joost Jager 2 years ago
parent
commit
21fcd8d94e
No known key found for this signature in database GPG Key ID: A61B9D4C393C59C7
60 changed files with 9236 additions and 8 deletions
  1. +55
    -8
      .gitignore
  2. +96
    -0
      client/README.md
  3. +322
    -0
      client/client.go
  4. +291
    -0
      client/client_test.go
  5. +15
    -0
      client/config.go
  6. +166
    -0
      client/executor.go
  7. +236
    -0
      client/interface.go
  8. +24
    -0
      client/log.go
  9. +125
    -0
      client/server_mock_test.go
  10. +17
    -0
      client/state_type.go
  11. +472
    -0
      client/store.go
  12. +65
    -0
      client/store_interface.go
  13. +123
    -0
      client/store_meta.go
  14. +146
    -0
      client/store_mock_test.go
  15. +131
    -0
      client/store_test.go
  16. +96
    -0
      client/swap.go
  17. +143
    -0
      client/swap_server_client.go
  18. +234
    -0
      client/testcontext_test.go
  19. +675
    -0
      client/uncharge.go
  20. +89
    -0
      client/uncharge_state.go
  21. +100
    -0
      client/uncharge_test.go
  22. +20
    -0
      client/utils.go
  23. +300
    -0
      cmd/swapcli/main.go
  24. +156
    -0
      cmd/swapd/daemon.go
  25. +24
    -0
      cmd/swapd/log.go
  26. +70
    -0
      cmd/swapd/main.go
  27. +7
    -0
      cmd/swapd/rpc/gen_protos.sh
  28. +925
    -0
      cmd/swapd/rpc/swapclient.pb.go
  29. +259
    -0
      cmd/swapd/rpc/swapclient.proto
  30. +247
    -0
      cmd/swapd/swapclient_server.go
  31. +49
    -0
      cmd/swapd/utils.go
  32. +89
    -0
      cmd/swapd/view.go
  33. +44
    -0
      go.mod
  34. +240
    -0
      go.sum
  35. +244
    -0
      lndclient/chainnotifier_client.go
  36. +154
    -0
      lndclient/invoices_client.go
  37. +330
    -0
      lndclient/lightning_client.go
  38. +186
    -0
      lndclient/lnd_services.go
  39. +23
    -0
      lndclient/log.go
  40. +90
    -0
      lndclient/signer_client.go
  41. +180
    -0
      lndclient/walletkit_client.go
  42. +7
    -0
      rpc/gen_protos.sh
  43. +405
    -0
      rpc/server.pb.go
  44. +40
    -0
      rpc/server.proto
  45. +106
    -0
      sweep/sweeper.go
  46. +134
    -0
      test/chainnotifier_mock.go
  47. +240
    -0
      test/context.go
  48. +111
    -0
      test/invoices_mock.go
  49. +17
    -0
      test/keys.go
  50. +152
    -0
      test/lightning_client_mock.go
  51. +176
    -0
      test/lnd_services_mock.go
  52. +23
    -0
      test/log.go
  53. +19
    -0
      test/signer_mock.go
  54. +69
    -0
      test/testutils.go
  55. +31
    -0
      test/timeout.go
  56. +95
    -0
      test/walletkit_mock.go
  57. +9
    -0
      utils/config.go
  58. +180
    -0
      utils/htlc.go
  59. +41
    -0
      utils/swaplog.go
  60. +123
    -0
      utils/utils.go

+ 55
- 8
.gitignore View File

@ -1,12 +1,59 @@
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
# ---> Go
# Compiled Object files, Static and Dynamic libs (Shared Objects)
*.o
*.a
*.so
*.dylib
# Test binary, build with `go test -c`
# Folders
_obj
_test
# Architecture specific extensions/prefixes
*.[568vq]
[568vq].out
*.cgo1.go
*.cgo2.c
_cgo_defun.c
_cgo_gotypes.go
_cgo_export.*
_testmain.go
*.exe
*.test
*.prof
output*.log
swapcli
!swapcli/
*.key
*.hex
# vim
*.swp
*.hex
*.db
*.bin
vendor
*.idea
*.iml
profile.cov
profile.tmp
.DS_Store
.vscode
nautserver
!nautserver/
nautview
!nautview/
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
swapd
!swapd/

+ 96
- 0
client/README.md View File

@ -0,0 +1,96 @@
# Swaplet
## Uncharge swap (off -> on-chain)
```
swapcli uncharge 500
|
|
v
.-----------------------------.
| Swap CLI |
| ./cmd/swapcli |
| |
| |
| .-------------------. | .--------------. .---------------.
| | Swap Client (lib) | | | LND node | | Bitcoin node |
| | ./ |<-------------| |-------------------| |
| | | | | | on-chain | |
| | |------------->| | htlc | |
| | | | off-chain | | | |
| '-------------------' | htlc '--------------' '---------------'
'-----------------|-----------' | ^
| | |
| v |
| .--. .--.
| _ -( )- _ _ -( )- _
| .--,( ),--. .--,( ),--.
initiate| _.-( )-._ _.-( )-._
swap | ( LIGHTNING NETWORK ) ( BITCOIN NETWORK )
| '-._( )_.-' '-._( )_.-'
| '__,( ),__' '__,( ),__'
| - ._(__)_. - - ._(__)_. -
| | ^
| | |
v v |
.--------------------. off-chain .--------------. .---------------.
| Swap Server | htlc | LND node | | Bitcoin node |
| |<-------------| | | |
| | | | on-chain | |
| | | | htlc | |
| |--------------| |----------------->| |
| | | | | |
'--------------------' '--------------' '---------------'
```
## Setup
LND and the swaplet are using go modules. Make sure that the `GO111MODULE` env variable is set to `on`.
In order to execute a swap, LND needs to be rebuilt with sub servers enabled.
### LND
* Checkout branch `master`
- `make install tags="signrpc walletrpc chainrpc"` to build and install lnd with required sub-servers enabled.
- Make sure there are no macaroons in the lnd dir `~/.lnd/data/chain/bitcoin/mainnet`. If there are, lnd has been started before and in that case, it could be that `admin.macaroon` doesn't contain signer permission. Delete `macaroons.db` and `*.macaroon`.
DO NOT DELETE `wallet.db` !
- Start lnd
### Swaplet
- `git clone git@gitlab.com:lightning-labs/swaplet.git`
- `cd swaplet/cmd`
- `go install ./...`
## Execute a swap
* Swaps are executed by a client daemon process. Run:
`swapd`
By default `swapd` attempts to connect to an lnd instance running on `localhost:10009` and reads the macaroon and tls certificate from `~/.lnd`. This can be altered using command line flags. See `swapd --help`.
`swapd` only listens on localhost and uses an unencrypted and unauthenticated connection.
* To initiate a swap, run:
`swapcli uncharge <amt_msat>`
When the swap is initiated successfully, `swapd` will see the process through.
* To query and track the swap status, run `swapcli` without arguments.
## Resume
When `swapd` is terminated (or killed) for whatever reason, it will pickup pending swaps after a restart.
Information about pending swaps is stored persistently in the swap database. Its location is `~/.swaplet/<network>/swapclient.db`.
## Multiple simultaneous swaps
It is possible to execute multiple swaps simultaneously.

+ 322
- 0
client/client.go View File

@ -0,0 +1,322 @@
package client
import (
"context"
"encoding/hex"
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/btcsuite/btcutil"
"github.com/lightninglabs/nautilus/lndclient"
"github.com/lightninglabs/nautilus/sweep"
"github.com/lightninglabs/nautilus/utils"
"github.com/lightningnetwork/lnd/lntypes"
)
var (
// ErrSwapFeeTooHigh is returned when the swap invoice amount is too
// high.
ErrSwapFeeTooHigh = errors.New("swap fee too high")
// ErrPrepayAmountTooHigh is returned when the prepay invoice amount is
// too high.
ErrPrepayAmountTooHigh = errors.New("prepay amount too high")
// ErrSwapAmountTooLow is returned when the requested swap amount is
// less than the server minimum.
ErrSwapAmountTooLow = errors.New("swap amount too low")
// ErrSwapAmountTooHigh is returned when the requested swap amount is
// more than the server maximum.
ErrSwapAmountTooHigh = errors.New("swap amount too high")
// ErrExpiryTooSoon is returned when the server proposes an expiry that
// is too soon for us.
ErrExpiryTooSoon = errors.New("swap expiry too soon")
// ErrExpiryTooFar is returned when the server proposes an expiry that
// is too soon for us.
ErrExpiryTooFar = errors.New("swap expiry too far")
serverRPCTimeout = 30 * time.Second
republishDelay = 10 * time.Second
)
// Client performs the client side part of swaps. This interface exists to
// be able to implement a stub.
type Client struct {
started uint32 // To be used atomically.
errChan chan error
lndServices *lndclient.LndServices
sweeper *sweep.Sweeper
executor *executor
resumeReady chan struct{}
wg sync.WaitGroup
clientConfig
}
// NewClient returns a new instance to initiate swaps with.
func NewClient(dbDir string, serverAddress string, insecure bool,
lnd *lndclient.LndServices) (*Client, func(), error) {
store, err := newBoltSwapClientStore(dbDir)
if err != nil {
return nil, nil, err
}
swapServerClient, err := newSwapServerClient(serverAddress, insecure)
if err != nil {
return nil, nil, err
}
config := &clientConfig{
LndServices: lnd,
Server: swapServerClient,
Store: store,
CreateExpiryTimer: func(d time.Duration) <-chan time.Time {
return time.NewTimer(d).C
},
}
sweeper := &sweep.Sweeper{
Lnd: lnd,
}
executor := newExecutor(&executorConfig{
lnd: lnd,
store: store,
sweeper: sweeper,
createExpiryTimer: config.CreateExpiryTimer,
})
client := &Client{
errChan: make(chan error),
clientConfig: *config,
lndServices: lnd,
sweeper: sweeper,
executor: executor,
resumeReady: make(chan struct{}),
}
cleanup := func() {
swapServerClient.Close()
}
return client, cleanup, nil
}
// GetUnchargeSwaps returns a list of all swaps currently in the database.
func (s *Client) GetUnchargeSwaps() ([]*PersistentUncharge, error) {
return s.Store.getUnchargeSwaps()
}
// Run is a blocking call that executes all swaps. Any pending swaps are
// restored from persistent storage and resumed. Subsequent updates
// will be sent through the passed in statusChan. The function can be
// terminated by cancelling the context.
func (s *Client) Run(ctx context.Context,
statusChan chan<- SwapInfo) error {
if !atomic.CompareAndSwapUint32(&s.started, 0, 1) {
return errors.New("swap client can only be started once")
}
// Log connected node.
info, err := s.lndServices.Client.GetInfo(ctx)
if err != nil {
return fmt.Errorf("GetInfo error: %v", err)
}
logger.Infof("Connected to lnd node %v with pubkey %v",
info.Alias, hex.EncodeToString(info.IdentityPubkey[:]),
)
// Setup main context used for cancelation.
mainCtx, mainCancel := context.WithCancel(ctx)
defer mainCancel()
// Query store before starting event loop to prevent new swaps from
// being treated as swaps that need to be resumed.
pendingSwaps, err := s.Store.getUnchargeSwaps()
if err != nil {
return err
}
// Start goroutine to deliver all pending swaps to the main loop.
s.wg.Add(1)
go func() {
defer s.wg.Done()
s.resumeSwaps(mainCtx, pendingSwaps)
// Signal that new requests can be accepted. Otherwise the new
// swap could already have been added to the store and read in
// this goroutine as being a swap that needs to be resumed.
// Resulting in two goroutines executing the same swap.
close(s.resumeReady)
}()
// Main event loop.
err = s.executor.run(mainCtx, statusChan)
// Consider canceled as happy flow.
if err == context.Canceled {
err = nil
}
if err != nil {
logger.Errorf("Swap client terminating: %v", err)
} else {
logger.Info("Swap client terminating")
}
// Cancel all remaining active goroutines.
mainCancel()
// Wait for all to finish.
logger.Debug("Wait for executor to finish")
s.executor.waitFinished()
logger.Debug("Wait for goroutines to finish")
s.wg.Wait()
logger.Info("Swap client terminated")
return err
}
// resumeSwaps restarts all pending swaps from the provided list.
func (s *Client) resumeSwaps(ctx context.Context,
swaps []*PersistentUncharge) {
for _, pend := range swaps {
if pend.State().Type() != StateTypePending {
continue
}
swapCfg := &swapConfig{
lnd: s.lndServices,
store: s.Store,
}
swap, err := resumeUnchargeSwap(ctx, swapCfg, pend)
if err != nil {
logger.Errorf("resuming swap: %v", err)
continue
}
s.executor.initiateSwap(ctx, swap)
}
}
// Uncharge initiates a uncharge swap. It blocks until the swap is
// initiation with the swap server is completed (typically this takes
// only a short amount of time). From there on further status
// information can be acquired through the status channel returned from
// the Run call.
//
// When the call returns, the swap has been persisted and will be
// resumed automatically after restarts.
//
// The return value is a hash that uniquely identifies the new swap.
func (s *Client) Uncharge(globalCtx context.Context,
request *UnchargeRequest) (*lntypes.Hash, error) {
logger.Infof("Uncharge %v to %v (channel: %v)",
request.Amount, request.DestAddr,
request.UnchargeChannel,
)
if err := s.waitForInitialized(globalCtx); err != nil {
return nil, err
}
// Create a new swap object for this swap.
initiationHeight := s.executor.height()
swapCfg := &swapConfig{
lnd: s.lndServices,
store: s.Store,
server: s.Server,
}
swap, err := newUnchargeSwap(
globalCtx, swapCfg, initiationHeight, request,
)
if err != nil {
return nil, err
}
// Post swap to the main loop.
s.executor.initiateSwap(globalCtx, swap)
// Return hash so that the caller can identify this swap in the updates
// stream.
return &swap.hash, nil
}
// UnchargeQuote takes a Uncharge amount and returns a break down of estimated
// costs for the client. Both the swap server and the on-chain fee estimator are
// queried to get to build the quote response.
func (s *Client) UnchargeQuote(ctx context.Context,
request *UnchargeQuoteRequest) (*UnchargeQuote, error) {
terms, err := s.Server.GetUnchargeTerms(ctx)
if err != nil {
return nil, err
}
if request.Amount < terms.MinSwapAmount {
return nil, ErrSwapAmountTooLow
}
if request.Amount > terms.MaxSwapAmount {
return nil, ErrSwapAmountTooHigh
}
logger.Infof("Offchain swap destination: %x", terms.SwapPaymentDest)
swapFee := utils.CalcFee(
request.Amount, terms.SwapFeeBase, terms.SwapFeeRate,
)
minerFee, err := s.sweeper.GetSweepFee(
ctx, utils.QuoteHtlc.MaxSuccessWitnessSize,
request.SweepConfTarget,
)
if err != nil {
return nil, err
}
return &UnchargeQuote{
SwapFee: swapFee,
MinerFee: minerFee,
PrepayAmount: btcutil.Amount(terms.PrepayAmt),
}, nil
}
// UnchargeTerms returns the terms on which the server executes swaps.
func (s *Client) UnchargeTerms(ctx context.Context) (
*UnchargeTerms, error) {
return s.Server.GetUnchargeTerms(ctx)
}
// waitForInitialized for swaps to be resumed and executor ready.
func (s *Client) waitForInitialized(ctx context.Context) error {
select {
case <-s.executor.ready:
case <-ctx.Done():
return ctx.Err()
}
select {
case <-s.resumeReady:
case <-ctx.Done():
return ctx.Err()
}
return nil
}

+ 291
- 0
client/client_test.go View File

@ -0,0 +1,291 @@
package client
import (
"bytes"
"context"
"crypto/sha256"
"errors"
"testing"
"github.com/btcsuite/btcutil"
"github.com/lightninglabs/nautilus/lndclient"
"github.com/lightninglabs/nautilus/test"
"github.com/lightningnetwork/lnd/lntypes"
)
var (
testAddr, _ = btcutil.DecodeAddress(
"rbsHiPKwAgxeo1EQYiyzJTkA8XEmWSVAKx", nil)
testRequest = &UnchargeRequest{
Amount: btcutil.Amount(50000),
DestAddr: testAddr,
MaxMinerFee: 50000,
SweepConfTarget: 2,
MaxSwapFee: 1050,
MaxPrepayAmount: 100,
MaxPrepayRoutingFee: 75000,
MaxSwapRoutingFee: 70000,
}
swapInvoiceDesc = "swap"
prepayInvoiceDesc = "prepay"
)
// TestSuccess tests the uncharge happy flow.
func TestSuccess(t *testing.T) {
defer test.Guard(t)()
ctx := createClientTestContext(t, nil)
// Initiate uncharge.
hash, err := ctx.swapClient.Uncharge(context.Background(), testRequest)
if err != nil {
t.Fatal(err)
}
ctx.assertStored()
ctx.assertStatus(StateInitiated)
signalSwapPaymentResult := ctx.AssertPaid(swapInvoiceDesc)
signalPrepaymentResult := ctx.AssertPaid(prepayInvoiceDesc)
// Expect client to register for conf
confIntent := ctx.AssertRegisterConf()
testSuccess(ctx, testRequest.Amount, *hash,
signalPrepaymentResult, signalSwapPaymentResult, false,
confIntent,
)
}
// TestFailOffchain tests the handling of swap for which the server failed the
// payments.
func TestFailOffchain(t *testing.T) {
defer test.Guard(t)()
ctx := createClientTestContext(t, nil)
_, err := ctx.swapClient.Uncharge(context.Background(), testRequest)
if err != nil {
t.Fatal(err)
}
ctx.assertStored()
ctx.assertStatus(StateInitiated)
signalSwapPaymentResult := ctx.AssertPaid(swapInvoiceDesc)
signalPrepaymentResult := ctx.AssertPaid(prepayInvoiceDesc)
ctx.AssertRegisterConf()
signalSwapPaymentResult(
errors.New(lndclient.PaymentResultUnknownPaymentHash),
)
signalPrepaymentResult(
errors.New(lndclient.PaymentResultUnknownPaymentHash),
)
ctx.assertStatus(StateFailOffchainPayments)
ctx.assertStoreFinished(StateFailOffchainPayments)
ctx.finish()
}
// TestWrongAmount asserts that the client checks the server invoice amounts.
func TestFailWrongAmount(t *testing.T) {
defer test.Guard(t)()
test := func(t *testing.T, modifier func(*serverMock),
expectedErr error) {
ctx := createClientTestContext(t, nil)
// Modify mock for this subtest.
modifier(ctx.serverMock)
_, err := ctx.swapClient.Uncharge(
context.Background(), testRequest,
)
if err != expectedErr {
t.Fatalf("Expected %v, but got %v", expectedErr, err)
}
ctx.finish()
}
t.Run("swap fee too high", func(t *testing.T) {
test(t, func(m *serverMock) {
m.swapInvoiceAmt += 10
}, ErrSwapFeeTooHigh)
})
t.Run("prepay amount too high", func(t *testing.T) {
test(t, func(m *serverMock) {
// Keep total swap fee unchanged, but increase prepaid
// portion.
m.swapInvoiceAmt -= 10
m.prepayInvoiceAmt += 10
}, ErrPrepayAmountTooHigh)
})
}
// TestResume tests that swaps in various states are properly resumed after a
// restart.
func TestResume(t *testing.T) {
defer test.Guard(t)()
t.Run("not expired", func(t *testing.T) {
testResume(t, false, false, true)
})
t.Run("expired not revealed", func(t *testing.T) {
testResume(t, true, false, false)
})
t.Run("expired revealed", func(t *testing.T) {
testResume(t, true, true, true)
})
}
func testResume(t *testing.T, expired, preimageRevealed, expectSuccess bool) {
defer test.Guard(t)()
preimage := testPreimage
hash := sha256.Sum256(preimage[:])
dest := test.GetDestAddr(t, 0)
amt := btcutil.Amount(50000)
swapPayReq, err := getInvoice(hash, amt, swapInvoiceDesc)
if err != nil {
t.Fatal(err)
}
prePayReq, err := getInvoice(hash, 100, prepayInvoiceDesc)
if err != nil {
t.Fatal(err)
}
_, senderPubKey := test.CreateKey(1)
var senderKey [33]byte
copy(senderKey[:], senderPubKey.SerializeCompressed())
_, receiverPubKey := test.CreateKey(2)
var receiverKey [33]byte
copy(receiverKey[:], receiverPubKey.SerializeCompressed())
state := StateInitiated
if preimageRevealed {
state = StatePreimageRevealed
}
pendingSwap := &PersistentUncharge{
Contract: &UnchargeContract{
DestAddr: dest,
SwapInvoice: swapPayReq,
SweepConfTarget: 2,
MaxSwapRoutingFee: 70000,
SwapContract: SwapContract{
Preimage: preimage,
AmountRequested: amt,
CltvExpiry: 744,
ReceiverKey: receiverKey,
SenderKey: senderKey,
MaxSwapFee: 60000,
PrepayInvoice: prePayReq,
MaxMinerFee: 50000,
},
},
Events: []*PersistentUnchargeEvent{
{
State: state,
},
},
Hash: hash,
}
if expired {
// Set cltv expiry so that it has already expired at the test
// block height.
pendingSwap.Contract.CltvExpiry = 610
}
ctx := createClientTestContext(t, []*PersistentUncharge{pendingSwap})
if preimageRevealed {
ctx.assertStatus(StatePreimageRevealed)
} else {
ctx.assertStatus(StateInitiated)
}
signalSwapPaymentResult := ctx.AssertPaid(swapInvoiceDesc)
signalPrepaymentResult := ctx.AssertPaid(prepayInvoiceDesc)
// Expect client to register for conf
confIntent := ctx.AssertRegisterConf()
signalSwapPaymentResult(nil)
signalPrepaymentResult(nil)
if !expectSuccess {
ctx.assertStatus(StateFailTimeout)
ctx.assertStoreFinished(StateFailTimeout)
ctx.finish()
return
}
// Because there is no reliable payment yet, an invoice is assumed to be
// paid after resume.
testSuccess(ctx, amt, hash,
func(r error) {},
func(r error) {},
preimageRevealed,
confIntent,
)
}
func testSuccess(ctx *testContext, amt btcutil.Amount, hash lntypes.Hash,
signalPrepaymentResult, signalSwapPaymentResult func(error),
preimageRevealed bool, confIntent *test.ConfRegistration) {
htlcOutpoint := ctx.publishHtlc(confIntent.PkScript, amt)
signalPrepaymentResult(nil)
ctx.AssertRegisterSpendNtfn(confIntent.PkScript)
// Publish tick.
ctx.expiryChan <- testTime
if !preimageRevealed {
ctx.assertStatus(StatePreimageRevealed)
ctx.assertStorePreimageReveal()
}
// Expect client on-chain sweep of HTLC.
sweepTx := ctx.ReceiveTx()
if !bytes.Equal(sweepTx.TxIn[0].PreviousOutPoint.Hash[:],
htlcOutpoint.Hash[:]) {
ctx.T.Fatalf("client not sweeping from htlc tx")
}
// Check preimage.
clientPreImage := sweepTx.TxIn[0].Witness[1]
clientPreImageHash := sha256.Sum256(clientPreImage)
if clientPreImageHash != hash {
ctx.T.Fatalf("incorrect preimage")
}
// Simulate server pulling payment.
signalSwapPaymentResult(nil)
ctx.NotifySpend(sweepTx, 0)
ctx.assertStatus(StateSuccess)
ctx.assertStoreFinished(StateSuccess)
ctx.finish()
}

+ 15
- 0
client/config.go View File

@ -0,0 +1,15 @@
package client
import (
"time"
"github.com/lightninglabs/nautilus/lndclient"
)
// clientConfig contains config items for the swap client.
type clientConfig struct {
LndServices *lndclient.LndServices
Server swapServerClient
Store swapClientStore
CreateExpiryTimer func(expiry time.Duration) <-chan time.Time
}

+ 166
- 0
client/executor.go View File

@ -0,0 +1,166 @@
package client
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/lightninglabs/nautilus/lndclient"
"github.com/lightninglabs/nautilus/sweep"
"github.com/lightningnetwork/lnd/queue"
)
// executorConfig contains executor configuration data.
type executorConfig struct {
lnd *lndclient.LndServices
sweeper *sweep.Sweeper
store swapClientStore
createExpiryTimer func(expiry time.Duration) <-chan time.Time
}
// executor is responsible for executing swaps.
type executor struct {
wg sync.WaitGroup
newSwaps chan genericSwap
currentHeight uint32
ready chan struct{}
executorConfig
}
// newExecutor returns a new swap executor instance.
func newExecutor(cfg *executorConfig) *executor {
return &executor{
executorConfig: *cfg,
newSwaps: make(chan genericSwap),
ready: make(chan struct{}),
}
}
// run starts the executor event loop. It accepts and executes new swaps,
// providing them with required config data.
func (s *executor) run(mainCtx context.Context,
statusChan chan<- SwapInfo) error {
blockEpochChan, blockErrorChan, err :=
s.lnd.ChainNotifier.RegisterBlockEpochNtfn(mainCtx)
if err != nil {
return err
}
// Before starting, make sure we have an up to date block height.
// Otherwise we might reveal a preimage for a swap that is already
// expired.
logger.Infof("Wait for first block ntfn")
var height int32
setHeight := func(h int32) {
height = h
atomic.StoreUint32(&s.currentHeight, uint32(h))
}
select {
case h := <-blockEpochChan:
setHeight(int32(h))
case err := <-blockErrorChan:
return err
case <-mainCtx.Done():
return mainCtx.Err()
}
// Start main event loop.
logger.Infof("Starting event loop at height %v", height)
// Signal that executor being ready with an up to date block height.
close(s.ready)
// Use a map to administer the individual notification queues for the
// swaps.
blockEpochQueues := make(map[int]*queue.ConcurrentQueue)
// On exit, stop all queue goroutines.
defer func() {
for _, queue := range blockEpochQueues {
queue.Stop()
}
}()
swapDoneChan := make(chan int)
nextSwapID := 0
for {
select {
case newSwap := <-s.newSwaps:
queue := queue.NewConcurrentQueue(10)
queue.Start()
swapID := nextSwapID
blockEpochQueues[swapID] = queue
s.wg.Add(1)
go func() {
defer s.wg.Done()
newSwap.execute(mainCtx, &executeConfig{
statusChan: statusChan,
sweeper: s.sweeper,
blockEpochChan: queue.ChanOut(),
timerFactory: s.executorConfig.createExpiryTimer,
}, height)
select {
case swapDoneChan <- swapID:
case <-mainCtx.Done():
}
}()
nextSwapID++
case doneID := <-swapDoneChan:
queue, ok := blockEpochQueues[doneID]
if !ok {
return fmt.Errorf(
"swap id %v not found in queues",
doneID)
}
queue.Stop()
delete(blockEpochQueues, doneID)
case h := <-blockEpochChan:
setHeight(int32(h))
for _, queue := range blockEpochQueues {
select {
case queue.ChanIn() <- int32(h):
case <-mainCtx.Done():
return mainCtx.Err()
}
}
case err := <-blockErrorChan:
return fmt.Errorf("block error: %v", err)
case <-mainCtx.Done():
return mainCtx.Err()
}
}
}
// initiateSwap delivers a new swap to the executor main loop.
func (s *executor) initiateSwap(ctx context.Context,
swap genericSwap) {
select {
case s.newSwaps <- swap:
case <-ctx.Done():
return
}
}
// height returns the current height known to the swap server.
func (s *executor) height() int32 {
return int32(atomic.LoadUint32(&s.currentHeight))
}
// waitFinished waits for all swap goroutines to finish.
func (s *executor) waitFinished() {
s.wg.Wait()
}

+ 236
- 0
client/interface.go View File

@ -0,0 +1,236 @@
package client
import (
"time"
"github.com/btcsuite/btcutil"
"github.com/lightningnetwork/lnd/lntypes"
)
// UnchargeRequest contains the required parameters for the swap.
type UnchargeRequest struct {
// Amount specifies the requested swap amount in sat. This does not
// include the swap and miner fee.
Amount btcutil.Amount
// Destination address for the swap.
DestAddr btcutil.Address
// MaxSwapRoutingFee is the maximum off-chain fee in msat that may be
// paid for payment to the server. This limit is applied during path
// finding. Typically this value is taken from the response of the
// UnchargeQuote call.
MaxSwapRoutingFee btcutil.Amount
// MaxPrepayRoutingFee is the maximum off-chain fee in msat that may be
// paid for payment to the server. This limit is applied during path
// finding. Typically this value is taken from the response of the
// UnchargeQuote call.
MaxPrepayRoutingFee btcutil.Amount
// MaxSwapFee is the maximum we are willing to pay the server for the
// swap. This value is not disclosed in the swap initiation call, but if
// the server asks for a higher fee, we abort the swap. Typically this
// value is taken from the response of the UnchargeQuote call. It
// includes the prepay amount.
MaxSwapFee btcutil.Amount
// MaxPrepayAmount is the maximum amount of the swap fee that may be
// charged as a prepayment.
MaxPrepayAmount btcutil.Amount
// MaxMinerFee is the maximum in on-chain fees that we are willing to
// spent. If we want to sweep the on-chain htlc and the fee estimate
// turns out higher than this value, we cancel the swap. If the fee
// estimate is lower, we publish the sweep tx.
//
// If the sweep tx isn't confirmed, we are forced to ratchet up fees
// until it is swept. Possibly even exceeding MaxMinerFee if we get
// close to the htlc timeout. Because the initial publication revealed
// the preimage, we have no other choice. The server may already have
// pulled the off-chain htlc. Only when the fee becomes higher than the
// swap amount, we can only wait for fees to come down and hope - if we
// are past the timeout - that the server isn't publishing the
// revocation.
//
// MaxMinerFee is typically taken from the response of the
// UnchargeQuote call.
MaxMinerFee btcutil.Amount
// SweepConfTarget specifies the targeted confirmation target for the
// client sweep tx.
SweepConfTarget int32
// UnchargeChannel optionally specifies the short channel id of the
// channel to uncharge.
UnchargeChannel *uint64
}
// UnchargeContract contains the data that is serialized to persistent storage for
// pending swaps.
type UnchargeContract struct {
SwapContract
DestAddr btcutil.Address
SwapInvoice string
// MaxSwapRoutingFee is the maximum off-chain fee in msat that may be
// paid for the swap payment to the server.
MaxSwapRoutingFee btcutil.Amount
// SweepConfTarget specifies the targeted confirmation target for the
// client sweep tx.
SweepConfTarget int32
// UnchargeChannel is the channel to uncharge. If zero, any channel may
// be used.
UnchargeChannel *uint64
}
// UnchargeSwapInfo contains status information for a uncharge swap.
type UnchargeSwapInfo struct {
UnchargeContract
SwapInfoKit
// State where the swap is in.
State SwapState
}
// SwapCost is a breakdown of the final swap costs.
type SwapCost struct {
// Swap is the amount paid to the server.
Server btcutil.Amount
// Onchain is the amount paid to miners for the onchain tx.
Onchain btcutil.Amount
}
// UnchargeQuoteRequest specifies the swap parameters for which a quote is
// requested.
type UnchargeQuoteRequest struct {
// Amount specifies the requested swap amount in sat. This does not
// include the swap and miner fee.
Amount btcutil.Amount
// SweepConfTarget specifies the targeted confirmation target for the
// client sweep tx.
SweepConfTarget int32
// TODO: Add argument to specify confirmation target for server
// publishing htlc. This may influence the swap fee quote, because the
// server needs to pay more for faster confirmations.
//
// TODO: Add arguments to specify maximum total time locks for the
// off-chain swap payment and prepayment. This may influence the
// available routes and off-chain fee estimates. To apply these maximum
// values properly, the server needs to be queried for its required
// final cltv delta values for the off-chain payments.
}
// UnchargeQuote contains estimates for the fees making up the total swap cost
// for the client.
type UnchargeQuote struct {
// SwapFee is the fee that the swap server is charging for the swap.
SwapFee btcutil.Amount
// PrepayAmount is the part of the swap fee that is requested as a
// prepayment.
PrepayAmount btcutil.Amount
// MinerFee is an estimate of the on-chain fee that needs to be paid to
// sweep the htlc.
MinerFee btcutil.Amount
}
// UnchargeTerms are the server terms on which it executes swaps.
type UnchargeTerms struct {
// SwapFeeBase is the fixed per-swap base fee.
SwapFeeBase btcutil.Amount
// SwapFeeRate is the variable fee in parts per million.
SwapFeeRate int64
// PrepayAmt is the fixed part of the swap fee that needs to be prepaid.
PrepayAmt btcutil.Amount
// MinSwapAmount is the minimum amount that the server requires for a
// swap.
MinSwapAmount btcutil.Amount
// MaxSwapAmount is the maximum amount that the server accepts for a
// swap.
MaxSwapAmount btcutil.Amount
// Time lock delta relative to current block height that swap server
// will accept on the swap initiation call.
CltvDelta int32
// SwapPaymentDest is the node pubkey where to swap payment needs to be
// sent to.
SwapPaymentDest [33]byte
}
// SwapContract contains the base data that is serialized to persistent storage
// for pending swaps.
type SwapContract struct {
Preimage lntypes.Preimage
AmountRequested btcutil.Amount
PrepayInvoice string
SenderKey [33]byte
ReceiverKey [33]byte
CltvExpiry int32
// MaxPrepayRoutingFee is the maximum off-chain fee in msat that may be
// paid for the prepayment to the server.
MaxPrepayRoutingFee btcutil.Amount
// MaxSwapFee is the maximum we are willing to pay the server for the
// swap.
MaxSwapFee btcutil.Amount
// MaxMinerFee is the maximum in on-chain fees that we are willing to
// spend.
MaxMinerFee btcutil.Amount
// InitiationHeight is the block height at which the swap was initiated.
InitiationHeight int32
// InitiationTime is the time at which the swap was initiated.
InitiationTime time.Time
}
// SwapInfoKit contains common swap info fields.
type SwapInfoKit struct {
// Hash is the sha256 hash of the preimage that unlocks the htlcs. It is
// used to uniquely identify this swap.
Hash lntypes.Hash
// LastUpdateTime is the time of the last update of this swap.
LastUpdateTime time.Time
}
// SwapType indicates the type of swap.
type SwapType uint8
const (
// SwapTypeCharge is a charge swap.
SwapTypeCharge SwapType = iota
// SwapTypeUncharge is an uncharge swap.
SwapTypeUncharge
)
// SwapInfo exposes common info fields for charge and uncharge swaps.
type SwapInfo struct {
LastUpdate time.Time
SwapHash lntypes.Hash
State SwapState
SwapType SwapType
SwapContract
}

+ 24
- 0
client/log.go View File

@ -0,0 +1,24 @@
package client
import (
"github.com/btcsuite/btclog"
"os"
)
// log is a logger that is initialized with no output filters. This
// means the package will not perform any logging by default until the caller
// requests it.
var (
backendLog = btclog.NewBackend(logWriter{})
logger = backendLog.Logger("CLIENT")
servicesLogger = backendLog.Logger("SERVICES")
)
// logWriter implements an io.Writer that outputs to both standard output and
// the write-end pipe of an initialized log rotator.
type logWriter struct{}
func (logWriter) Write(p []byte) (n int, err error) {
os.Stdout.Write(p)
return len(p), nil
}

+ 125
- 0
client/server_mock_test.go View File

@ -0,0 +1,125 @@
package client
import (
"context"
"errors"
"testing"
"time"
"github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcutil"
"github.com/lightninglabs/nautilus/test"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/zpay32"
)
var (
testTime = time.Date(2018, time.January, 9, 14, 00, 00, 0, time.UTC)
testUnchargeOnChainCltvDelta = int32(30)
testCltvDelta = 50
testSwapFeeBase = btcutil.Amount(21)
testSwapFeeRate = int64(100)
testInvoiceExpiry = 180 * time.Second
testFixedPrepayAmount = btcutil.Amount(100)
testMinSwapAmount = btcutil.Amount(10000)
testMaxSwapAmount = btcutil.Amount(1000000)
testTxConfTarget = 2
testRepublishDelay = 10 * time.Second
)
// serverMock is used in client unit tests to simulate swap server behaviour.
type serverMock struct {
t *testing.T
expectedSwapAmt btcutil.Amount
swapInvoiceAmt btcutil.Amount
prepayInvoiceAmt btcutil.Amount
height int32
swapInvoice string
swapHash lntypes.Hash
}
func newServerMock() *serverMock {
return &serverMock{
expectedSwapAmt: 50000,
// Total swap fee: 1000 + 0.01 * 50000 = 1050
swapInvoiceAmt: 50950,
prepayInvoiceAmt: 100,
height: 600,
}
}
func (s *serverMock) NewUnchargeSwap(ctx context.Context,
swapHash lntypes.Hash, amount btcutil.Amount,
receiverKey [33]byte) (
*newUnchargeResponse, error) {
_, senderKey := test.CreateKey(100)
if amount != s.expectedSwapAmt {
return nil, errors.New("unexpected test swap amount")
}
swapPayReqString, err := getInvoice(swapHash, s.swapInvoiceAmt,
swapInvoiceDesc)
if err != nil {
return nil, err
}
prePayReqString, err := getInvoice(swapHash, s.prepayInvoiceAmt,
prepayInvoiceDesc)
if err != nil {
return nil, err
}
var senderKeyArray [33]byte
copy(senderKeyArray[:], senderKey.SerializeCompressed())
return &newUnchargeResponse{
senderKey: senderKeyArray,
swapInvoice: swapPayReqString,
prepayInvoice: prePayReqString,
expiry: s.height + testUnchargeOnChainCltvDelta,
}, nil
}
func (s *serverMock) GetUnchargeTerms(ctx context.Context) (
*UnchargeTerms, error) {
dest := [33]byte{1, 2, 3}
return &UnchargeTerms{
SwapFeeBase: testSwapFeeBase,
SwapFeeRate: testSwapFeeRate,
SwapPaymentDest: dest,
CltvDelta: testUnchargeOnChainCltvDelta,
MinSwapAmount: testMinSwapAmount,
MaxSwapAmount: testMaxSwapAmount,
PrepayAmt: testFixedPrepayAmount,
}, nil
}
func getInvoice(hash lntypes.Hash, amt btcutil.Amount, memo string) (string, error) {
req, err := zpay32.NewInvoice(
&chaincfg.TestNet3Params, hash, testTime,
zpay32.Description(memo),
zpay32.Amount(lnwire.MilliSatoshi(1000*amt)),
)
if err != nil {
return "", err
}
reqString, err := test.EncodePayReq(req)
if err != nil {
return "", err
}
return reqString, nil
}

+ 17
- 0
client/state_type.go View File

@ -0,0 +1,17 @@
package client
// SwapStateType defines the types of swap states that exist. Every swap state
// defined as type SwapState above, falls into one of these SwapStateType
// categories.
type SwapStateType uint8
const (
// StateTypePending indicates that the swap is still pending.
StateTypePending SwapStateType = iota
// StateTypeSuccess indicates that the swap has completed successfully.
StateTypeSuccess
// StateTypeFail indicates that the swap has failed.
StateTypeFail
)

+ 472
- 0
client/store.go View File

@ -0,0 +1,472 @@
package client
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"time"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
"github.com/coreos/bbolt"
"github.com/lightninglabs/nautilus/utils"
"github.com/lightningnetwork/lnd/lntypes"
)
var (
dbFileName = "swapclient.db"
// unchargeSwapsBucketKey is a bucket that contains all swaps that are
// currently pending or completed.
//
// maps: swap_hash -> UnchargeContract
unchargeSwapsBucketKey = []byte("uncharge-swaps")
// unchargeUpdatesBucketKey is a bucket that contains all updates
// pertaining to a swap. This list only ever grows.
//
// maps: update_nr -> time | state
updatesBucketKey = []byte("updates")
// contractKey is the key that stores the serialized swap contract.
contractKey = []byte("contract")
byteOrder = binary.BigEndian
keyLength = 33
)
// boltSwapClientStore stores swap data in boltdb.
type boltSwapClientStore struct {
db *bbolt.DB
}
// newBoltSwapClientStore creates a new client swap store.
func newBoltSwapClientStore(dbPath string) (*boltSwapClientStore, error) {
if !utils.FileExists(dbPath) {
if err := os.MkdirAll(dbPath, 0700); err != nil {
return nil, err
}
}
path := filepath.Join(dbPath, dbFileName)
bdb, err := bbolt.Open(path, 0600, nil)
if err != nil {
return nil, err
}
err = bdb.Update(func(tx *bbolt.Tx) error {
_, err := tx.CreateBucketIfNotExists(unchargeSwapsBucketKey)
if err != nil {
return err
}
_, err = tx.CreateBucketIfNotExists(updatesBucketKey)
if err != nil {
return err
}
_, err = tx.CreateBucketIfNotExists(metaBucket)
if err != nil {
return err
}
return nil
})
if err != nil {
return nil, err
}
err = syncVersions(bdb)
if err != nil {
return nil, err
}
return &boltSwapClientStore{
db: bdb,
}, nil
}
// getUnchargeSwaps returns all swaps currently in the store.
func (s *boltSwapClientStore) getUnchargeSwaps() ([]*PersistentUncharge, error) {
var swaps []*PersistentUncharge
err := s.db.View(func(tx *bbolt.Tx) error {
bucket := tx.Bucket(unchargeSwapsBucketKey)
if bucket == nil {
return errors.New("bucket does not exist")
}
err := bucket.ForEach(func(k, _ []byte) error {
swapBucket := bucket.Bucket(k)
if swapBucket == nil {
return fmt.Errorf("swap bucket %x not found",
k)
}
contractBytes := swapBucket.Get(contractKey)
if contractBytes == nil {
return errors.New("contract not found")
}
contract, err := deserializeUnchargeContract(
contractBytes,
)
if err != nil {
return err
}
stateBucket := swapBucket.Bucket(updatesBucketKey)
if stateBucket == nil {
return errors.New("updates bucket not found")
}
var updates []*PersistentUnchargeEvent
err = stateBucket.ForEach(func(k, v []byte) error {
event, err := deserializeUnchargeUpdate(v)
if err != nil {
return err
}
updates = append(updates, event)
return nil
})
if err != nil {
return err
}
var hash lntypes.Hash
copy(hash[:], k)
swap := PersistentUncharge{
Contract: contract,
Hash: hash,
Events: updates,
}
swaps = append(swaps, &swap)
return nil
})
if err != nil {
return err
}
return nil
})
if err != nil {
return nil, err
}
return swaps, nil
}
// createUncharge adds an initiated swap to the store.
func (s *boltSwapClientStore) createUncharge(hash lntypes.Hash,
swap *UnchargeContract) error {
if hash != swap.Preimage.Hash() {
return errors.New("hash and preimage do not match")
}
return s.db.Update(func(tx *bbolt.Tx) error {
bucket, err := tx.CreateBucketIfNotExists(unchargeSwapsBucketKey)
if err != nil {
return err
}
if bucket.Get(hash[:]) != nil {
return fmt.Errorf("swap %v already exists", swap.Preimage)
}
// Create bucket for swap.
swapBucket, err := bucket.CreateBucket(hash[:])
if err != nil {
return err
}
contract, err := serializeUnchargeContract(swap)
if err != nil {
return err
}
// Store contact.
if err := swapBucket.Put(contractKey, contract); err != nil {
return err
}
// Create empty updates bucket.
_, err = swapBucket.CreateBucket(updatesBucketKey)
return err
})
}
// updateUncharge stores a swap updateUncharge.
func (s *boltSwapClientStore) updateUncharge(hash lntypes.Hash, time time.Time,
state SwapState) error {
return s.db.Update(func(tx *bbolt.Tx) error {
bucket := tx.Bucket(unchargeSwapsBucketKey)
if bucket == nil {
return errors.New("bucket does not exist")
}
swapBucket := bucket.Bucket(hash[:])
if swapBucket == nil {
return errors.New("swap not found")
}
updateBucket := swapBucket.Bucket(updatesBucketKey)
if updateBucket == nil {
return errors.New("udpate bucket not found")
}
id, err := updateBucket.NextSequence()
if err != nil {
return err
}
updateValue, err := serializeUnchargeUpdate(time, state)
if err != nil {
return err
}
return updateBucket.Put(itob(id), updateValue)
})
}
// Close closes the underlying bolt db.
func (s *boltSwapClientStore) close() error {
return s.db.Close()
}
func deserializeUnchargeContract(value []byte) (*UnchargeContract, error) {
r := bytes.NewReader(value)
contract, err := deserializeContract(r)
if err != nil {
return nil, err
}
swap := UnchargeContract{
SwapContract: *contract,
}
addr, err := wire.ReadVarString(r, 0)
if err != nil {
return nil, err
}
swap.DestAddr, err = btcutil.DecodeAddress(addr, nil)
if err != nil {
return nil, err
}
swap.SwapInvoice, err = wire.ReadVarString(r, 0)
if err != nil {
return nil, err
}
if err := binary.Read(r, byteOrder, &swap.SweepConfTarget); err != nil {
return nil, err
}
if err := binary.Read(r, byteOrder, &swap.MaxSwapRoutingFee); err != nil {
return nil, err
}
var unchargeChannel uint64
if err := binary.Read(r, byteOrder, &unchargeChannel); err != nil {
return nil, err
}
if unchargeChannel != 0 {
swap.UnchargeChannel = &unchargeChannel
}
return &swap, nil
}
func serializeUnchargeContract(swap *UnchargeContract) (
[]byte, error) {
var b bytes.Buffer
serializeContract(&swap.SwapContract, &b)
addr := swap.DestAddr.String()
if err := wire.WriteVarString(&b, 0, addr); err != nil {
return nil, err
}
if err := wire.WriteVarString(&b, 0, swap.SwapInvoice); err != nil {
return nil, err
}
if err := binary.Write(&b, byteOrder, swap.SweepConfTarget); err != nil {
return nil, err
}
if err := binary.Write(&b, byteOrder, swap.MaxSwapRoutingFee); err != nil {
return nil, err
}
var unchargeChannel uint64
if swap.UnchargeChannel != nil {
unchargeChannel = *swap.UnchargeChannel
}
if err := binary.Write(&b, byteOrder, unchargeChannel); err != nil {
return nil, err
}
return b.Bytes(), nil
}
func deserializeContract(r io.Reader) (*SwapContract, error) {
swap := SwapContract{}
var err error
var unixNano int64
if err := binary.Read(r, byteOrder, &unixNano); err != nil {
return nil, err
}
swap.InitiationTime = time.Unix(0, unixNano)
if err := binary.Read(r, byteOrder, &swap.Preimage); err != nil {
return nil, err
}
binary.Read(r, byteOrder, &swap.AmountRequested)
swap.PrepayInvoice, err = wire.ReadVarString(r, 0)
if err != nil {
return nil, err
}
n, err := r.Read(swap.SenderKey[:])
if err != nil {
return nil, err
}
if n != keyLength {
return nil, fmt.Errorf("sender key has invalid length")
}
n, err = r.Read(swap.ReceiverKey[:])
if err != nil {
return nil, err
}
if n != keyLength {
return nil, fmt.Errorf("receiver key has invalid length")
}
if err := binary.Read(r, byteOrder, &swap.CltvExpiry); err != nil {
return nil, err
}
if err := binary.Read(r, byteOrder, &swap.MaxMinerFee); err != nil {
return nil, err
}
if err := binary.Read(r, byteOrder, &swap.MaxSwapFee); err != nil {
return nil, err
}
if err := binary.Read(r, byteOrder, &swap.MaxPrepayRoutingFee); err != nil {
return nil, err
}
if err := binary.Read(r, byteOrder, &swap.InitiationHeight); err != nil {
return nil, err
}
return &swap, nil
}
func serializeContract(swap *SwapContract, b *bytes.Buffer) error {
if err := binary.Write(b, byteOrder, swap.InitiationTime.UnixNano()); err != nil {
return err
}
if err := binary.Write(b, byteOrder, swap.Preimage); err != nil {
return err
}