You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

528 lines
13 KiB

  1. package loop
  2. import (
  3. "context"
  4. "encoding/hex"
  5. "errors"
  6. "fmt"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. "github.com/btcsuite/btcutil"
  11. "github.com/lightninglabs/loop/lndclient"
  12. "github.com/lightninglabs/loop/loopdb"
  13. "github.com/lightninglabs/loop/lsat"
  14. "github.com/lightninglabs/loop/swap"
  15. "github.com/lightninglabs/loop/sweep"
  16. "github.com/lightningnetwork/lnd/lntypes"
  17. )
  18. var (
  19. // ErrSwapFeeTooHigh is returned when the swap invoice amount is too
  20. // high.
  21. ErrSwapFeeTooHigh = errors.New("swap fee too high")
  22. // ErrPrepayAmountTooHigh is returned when the prepay invoice amount is
  23. // too high.
  24. ErrPrepayAmountTooHigh = errors.New("prepay amount too high")
  25. // ErrSwapAmountTooLow is returned when the requested swap amount is
  26. // less than the server minimum.
  27. ErrSwapAmountTooLow = errors.New("swap amount too low")
  28. // ErrSwapAmountTooHigh is returned when the requested swap amount is
  29. // more than the server maximum.
  30. ErrSwapAmountTooHigh = errors.New("swap amount too high")
  31. // ErrExpiryTooSoon is returned when the server proposes an expiry that
  32. // is too soon for us.
  33. ErrExpiryTooSoon = errors.New("swap expiry too soon")
  34. // ErrExpiryTooFar is returned when the server proposes an expiry that
  35. // is too soon for us.
  36. ErrExpiryTooFar = errors.New("swap expiry too far")
  37. // ErrSweepConfTargetTooFar is returned when the client proposes a
  38. // confirmation target to sweep the on-chain HTLC of a Loop Out that is
  39. // beyond the expiration height proposed by the server.
  40. ErrSweepConfTargetTooFar = errors.New("sweep confirmation target is " +
  41. "beyond swap expiration height")
  42. // serverRPCTimeout is the maximum time a gRPC request to the server
  43. // should be allowed to take.
  44. serverRPCTimeout = 30 * time.Second
  45. // globalCallTimeout is the maximum time any call of the client to the
  46. // server is allowed to take, including the time it may take to get
  47. // and pay for an LSAT token.
  48. globalCallTimeout = serverRPCTimeout + lsat.PaymentTimeout
  49. republishDelay = 10 * time.Second
  50. )
// Client performs the client side part of swaps. It is a concrete struct that
// bundles the lnd connection, the sweeper and the executor driving the swaps,
// together with the embedded clientConfig.
type Client struct {
	// started is flipped from 0 to 1 with an atomic compare-and-swap in
	// Run, so that the client can only be started once.
	started uint32 // To be used atomically.

	// errChan is created unbuffered in NewClient.
	// NOTE(review): no sender or receiver is visible in this part of the
	// file — confirm its users.
	errChan chan error

	// lndServices is the lnd handle used for node info, chain parameters
	// and on-chain fee estimation.
	lndServices *lndclient.LndServices

	// sweeper estimates the sweep fee for loop out quotes and is also
	// handed to the executor.
	sweeper *sweep.Sweeper

	// executor runs the main event loop and receives every initiated or
	// resumed swap.
	executor *executor

	// resumeReady is closed by the goroutine started in Run once all
	// pending swaps have been handed to the executor. New swap requests
	// wait on it in waitForInitialized.
	resumeReady chan struct{}

	// wg tracks the goroutine started by Run; Run waits on it before
	// returning.
	wg sync.WaitGroup

	// clientConfig is embedded; this file accesses Store and Server
	// through it.
	clientConfig
}
  63. // NewClient returns a new instance to initiate swaps with.
  64. func NewClient(dbDir string, serverAddress string, insecure bool,
  65. tlsPathServer string, lnd *lndclient.LndServices, maxLSATCost,
  66. maxLSATFee btcutil.Amount) (*Client, func(), error) {
  67. store, err := loopdb.NewBoltSwapStore(dbDir, lnd.ChainParams)
  68. if err != nil {
  69. return nil, nil, err
  70. }
  71. lsatStore, err := lsat.NewFileStore(dbDir)
  72. if err != nil {
  73. return nil, nil, err
  74. }
  75. swapServerClient, err := newSwapServerClient(
  76. serverAddress, insecure, tlsPathServer, lsatStore, lnd,
  77. maxLSATCost, maxLSATFee,
  78. )
  79. if err != nil {
  80. return nil, nil, err
  81. }
  82. config := &clientConfig{
  83. LndServices: lnd,
  84. Server: swapServerClient,
  85. Store: store,
  86. LsatStore: lsatStore,
  87. CreateExpiryTimer: func(d time.Duration) <-chan time.Time {
  88. return time.NewTimer(d).C
  89. },
  90. }
  91. sweeper := &sweep.Sweeper{
  92. Lnd: lnd,
  93. }
  94. executor := newExecutor(&executorConfig{
  95. lnd: lnd,
  96. store: store,
  97. sweeper: sweeper,
  98. createExpiryTimer: config.CreateExpiryTimer,
  99. })
  100. client := &Client{
  101. errChan: make(chan error),
  102. clientConfig: *config,
  103. lndServices: lnd,
  104. sweeper: sweeper,
  105. executor: executor,
  106. resumeReady: make(chan struct{}),
  107. }
  108. cleanup := func() {
  109. swapServerClient.Close()
  110. }
  111. return client, cleanup, nil
  112. }
  113. // FetchSwaps returns all loop in and out swaps currently in the database.
  114. func (s *Client) FetchSwaps() ([]*SwapInfo, error) {
  115. loopOutSwaps, err := s.Store.FetchLoopOutSwaps()
  116. if err != nil {
  117. return nil, err
  118. }
  119. loopInSwaps, err := s.Store.FetchLoopInSwaps()
  120. if err != nil {
  121. return nil, err
  122. }
  123. swaps := make([]*SwapInfo, 0, len(loopInSwaps)+len(loopOutSwaps))
  124. for _, swp := range loopOutSwaps {
  125. htlc, err := swap.NewHtlc(
  126. swp.Contract.CltvExpiry, swp.Contract.SenderKey,
  127. swp.Contract.ReceiverKey, swp.Hash, swap.HtlcP2WSH,
  128. s.lndServices.ChainParams,
  129. )
  130. if err != nil {
  131. return nil, err
  132. }
  133. swaps = append(swaps, &SwapInfo{
  134. SwapType: swap.TypeOut,
  135. SwapContract: swp.Contract.SwapContract,
  136. SwapStateData: swp.State(),
  137. SwapHash: swp.Hash,
  138. LastUpdate: swp.LastUpdateTime(),
  139. HtlcAddress: htlc.Address,
  140. })
  141. }
  142. for _, swp := range loopInSwaps {
  143. htlc, err := swap.NewHtlc(
  144. swp.Contract.CltvExpiry, swp.Contract.SenderKey,
  145. swp.Contract.ReceiverKey, swp.Hash, swap.HtlcNP2WSH,
  146. s.lndServices.ChainParams,
  147. )
  148. if err != nil {
  149. return nil, err
  150. }
  151. swaps = append(swaps, &SwapInfo{
  152. SwapType: swap.TypeIn,
  153. SwapContract: swp.Contract.SwapContract,
  154. SwapStateData: swp.State(),
  155. SwapHash: swp.Hash,
  156. LastUpdate: swp.LastUpdateTime(),
  157. HtlcAddress: htlc.Address,
  158. })
  159. }
  160. return swaps, nil
  161. }
  162. // Run is a blocking call that executes all swaps. Any pending swaps are
  163. // restored from persistent storage and resumed. Subsequent updates will be
  164. // sent through the passed in statusChan. The function can be terminated by
  165. // cancelling the context.
  166. func (s *Client) Run(ctx context.Context,
  167. statusChan chan<- SwapInfo) error {
  168. if !atomic.CompareAndSwapUint32(&s.started, 0, 1) {
  169. return errors.New("swap client can only be started once")
  170. }
  171. // Log connected node.
  172. info, err := s.lndServices.Client.GetInfo(ctx)
  173. if err != nil {
  174. return fmt.Errorf("GetInfo error: %v", err)
  175. }
  176. log.Infof("Connected to lnd node %v with pubkey %v",
  177. info.Alias, hex.EncodeToString(info.IdentityPubkey[:]),
  178. )
  179. // Setup main context used for cancelation.
  180. mainCtx, mainCancel := context.WithCancel(ctx)
  181. defer mainCancel()
  182. // Query store before starting event loop to prevent new swaps from
  183. // being treated as swaps that need to be resumed.
  184. pendingLoopOutSwaps, err := s.Store.FetchLoopOutSwaps()
  185. if err != nil {
  186. return err
  187. }
  188. pendingLoopInSwaps, err := s.Store.FetchLoopInSwaps()
  189. if err != nil {
  190. return err
  191. }
  192. // Start goroutine to deliver all pending swaps to the main loop.
  193. s.wg.Add(1)
  194. go func() {
  195. defer s.wg.Done()
  196. s.resumeSwaps(mainCtx, pendingLoopOutSwaps, pendingLoopInSwaps)
  197. // Signal that new requests can be accepted. Otherwise the new
  198. // swap could already have been added to the store and read in
  199. // this goroutine as being a swap that needs to be resumed.
  200. // Resulting in two goroutines executing the same swap.
  201. close(s.resumeReady)
  202. }()
  203. // Main event loop.
  204. err = s.executor.run(mainCtx, statusChan)
  205. // Consider canceled as happy flow.
  206. if err == context.Canceled {
  207. err = nil
  208. }
  209. if err != nil {
  210. log.Errorf("Swap client terminating: %v", err)
  211. } else {
  212. log.Info("Swap client terminating")
  213. }
  214. // Cancel all remaining active goroutines.
  215. mainCancel()
  216. // Wait for all to finish.
  217. log.Debug("Wait for executor to finish")
  218. s.executor.waitFinished()
  219. log.Debug("Wait for goroutines to finish")
  220. s.wg.Wait()
  221. log.Info("Swap client terminated")
  222. return err
  223. }
  224. // resumeSwaps restarts all pending swaps from the provided list.
  225. func (s *Client) resumeSwaps(ctx context.Context,
  226. loopOutSwaps []*loopdb.LoopOut, loopInSwaps []*loopdb.LoopIn) {
  227. swapCfg := &swapConfig{
  228. lnd: s.lndServices,
  229. store: s.Store,
  230. }
  231. for _, pend := range loopOutSwaps {
  232. if pend.State().State.Type() != loopdb.StateTypePending {
  233. continue
  234. }
  235. swap, err := resumeLoopOutSwap(ctx, swapCfg, pend)
  236. if err != nil {
  237. log.Errorf("resuming loop out swap: %v", err)
  238. continue
  239. }
  240. s.executor.initiateSwap(ctx, swap)
  241. }
  242. for _, pend := range loopInSwaps {
  243. if pend.State().State.Type() != loopdb.StateTypePending {
  244. continue
  245. }
  246. swap, err := resumeLoopInSwap(ctx, swapCfg, pend)
  247. if err != nil {
  248. log.Errorf("resuming loop in swap: %v", err)
  249. continue
  250. }
  251. s.executor.initiateSwap(ctx, swap)
  252. }
  253. }
  254. // LoopOut initiates a loop out swap. It blocks until the swap is initiation
  255. // with the swap server is completed (typically this takes only a short amount
  256. // of time). From there on further status information can be acquired through
  257. // the status channel returned from the Run call.
  258. //
  259. // When the call returns, the swap has been persisted and will be resumed
  260. // automatically after restarts.
  261. //
  262. // The return value is a hash that uniquely identifies the new swap.
  263. func (s *Client) LoopOut(globalCtx context.Context,
  264. request *OutRequest) (*lntypes.Hash, btcutil.Address, error) {
  265. log.Infof("LoopOut %v to %v (channel: %v)",
  266. request.Amount, request.DestAddr,
  267. request.LoopOutChannel,
  268. )
  269. if err := s.waitForInitialized(globalCtx); err != nil {
  270. return nil, nil, err
  271. }
  272. // Create a new swap object for this swap.
  273. initiationHeight := s.executor.height()
  274. swapCfg := &swapConfig{
  275. lnd: s.lndServices,
  276. store: s.Store,
  277. server: s.Server,
  278. }
  279. swap, err := newLoopOutSwap(
  280. globalCtx, swapCfg, initiationHeight, request,
  281. )
  282. if err != nil {
  283. return nil, nil, err
  284. }
  285. // Post swap to the main loop.
  286. s.executor.initiateSwap(globalCtx, swap)
  287. // Return hash so that the caller can identify this swap in the updates
  288. // stream.
  289. return &swap.hash, swap.htlc.Address, nil
  290. }
  291. // LoopOutQuote takes a LoopOut amount and returns a break down of estimated
  292. // costs for the client. Both the swap server and the on-chain fee estimator
  293. // are queried to get to build the quote response.
  294. func (s *Client) LoopOutQuote(ctx context.Context,
  295. request *LoopOutQuoteRequest) (*LoopOutQuote, error) {
  296. terms, err := s.Server.GetLoopOutTerms(ctx)
  297. if err != nil {
  298. return nil, err
  299. }
  300. if request.Amount < terms.MinSwapAmount {
  301. return nil, ErrSwapAmountTooLow
  302. }
  303. if request.Amount > terms.MaxSwapAmount {
  304. return nil, ErrSwapAmountTooHigh
  305. }
  306. quote, err := s.Server.GetLoopOutQuote(
  307. ctx, request.Amount, request.SwapPublicationDeadline,
  308. )
  309. if err != nil {
  310. return nil, err
  311. }
  312. log.Infof("Offchain swap destination: %x", quote.SwapPaymentDest)
  313. swapFee := quote.SwapFee
  314. // Generate dummy p2wsh address for fee estimation. The p2wsh address
  315. // type is chosen because it adds the most weight of all output types
  316. // and we want the quote to return a worst case value.
  317. wsh := [32]byte{}
  318. p2wshAddress, err := btcutil.NewAddressWitnessScriptHash(
  319. wsh[:], s.lndServices.ChainParams,
  320. )
  321. if err != nil {
  322. return nil, err
  323. }
  324. minerFee, err := s.sweeper.GetSweepFee(
  325. ctx, swap.QuoteHtlc.AddSuccessToEstimator,
  326. p2wshAddress, request.SweepConfTarget,
  327. )
  328. if err != nil {
  329. return nil, err
  330. }
  331. return &LoopOutQuote{
  332. SwapFee: swapFee,
  333. MinerFee: minerFee,
  334. PrepayAmount: quote.PrepayAmount,
  335. SwapPaymentDest: quote.SwapPaymentDest,
  336. CltvDelta: quote.CltvDelta,
  337. }, nil
  338. }
  339. // LoopOutTerms returns the terms on which the server executes swaps.
  340. func (s *Client) LoopOutTerms(ctx context.Context) (
  341. *LoopOutTerms, error) {
  342. return s.Server.GetLoopOutTerms(ctx)
  343. }
  344. // waitForInitialized for swaps to be resumed and executor ready.
  345. func (s *Client) waitForInitialized(ctx context.Context) error {
  346. select {
  347. case <-s.executor.ready:
  348. case <-ctx.Done():
  349. return ctx.Err()
  350. }
  351. select {
  352. case <-s.resumeReady:
  353. case <-ctx.Done():
  354. return ctx.Err()
  355. }
  356. return nil
  357. }
  358. // LoopIn initiates a loop in swap.
  359. func (s *Client) LoopIn(globalCtx context.Context,
  360. request *LoopInRequest) (*lntypes.Hash, btcutil.Address, error) {
  361. log.Infof("Loop in %v (last hop: %v)",
  362. request.Amount,
  363. request.LastHop,
  364. )
  365. if err := s.waitForInitialized(globalCtx); err != nil {
  366. return nil, nil, err
  367. }
  368. // Create a new swap object for this swap.
  369. initiationHeight := s.executor.height()
  370. swapCfg := swapConfig{
  371. lnd: s.lndServices,
  372. store: s.Store,
  373. server: s.Server,
  374. }
  375. swap, err := newLoopInSwap(
  376. globalCtx, &swapCfg, initiationHeight, request,
  377. )
  378. if err != nil {
  379. return nil, nil, err
  380. }
  381. // Post swap to the main loop.
  382. s.executor.initiateSwap(globalCtx, swap)
  383. // Return hash so that the caller can identify this swap in the updates
  384. // stream.
  385. return &swap.hash, swap.htlc.Address, nil
  386. }
  387. // LoopInQuote takes an amount and returns a break down of estimated
  388. // costs for the client. Both the swap server and the on-chain fee estimator are
  389. // queried to get to build the quote response.
  390. func (s *Client) LoopInQuote(ctx context.Context,
  391. request *LoopInQuoteRequest) (*LoopInQuote, error) {
  392. // Retrieve current server terms to calculate swap fee.
  393. terms, err := s.Server.GetLoopInTerms(ctx)
  394. if err != nil {
  395. return nil, err
  396. }
  397. // Check amount limits.
  398. if request.Amount < terms.MinSwapAmount {
  399. return nil, ErrSwapAmountTooLow
  400. }
  401. if request.Amount > terms.MaxSwapAmount {
  402. return nil, ErrSwapAmountTooHigh
  403. }
  404. quote, err := s.Server.GetLoopInQuote(ctx, request.Amount)
  405. if err != nil {
  406. return nil, err
  407. }
  408. swapFee := quote.SwapFee
  409. // We don't calculate the on-chain fee if the HTLC is going to be
  410. // published externally.
  411. if request.ExternalHtlc {
  412. return &LoopInQuote{
  413. SwapFee: swapFee,
  414. MinerFee: 0,
  415. }, nil
  416. }
  417. // Get estimate for miner fee.
  418. minerFee, err := s.lndServices.Client.EstimateFeeToP2WSH(
  419. ctx, request.Amount, request.HtlcConfTarget,
  420. )
  421. if err != nil {
  422. return nil, err
  423. }
  424. return &LoopInQuote{
  425. SwapFee: swapFee,
  426. MinerFee: minerFee,
  427. CltvDelta: quote.CltvDelta,
  428. }, nil
  429. }
  430. // LoopInTerms returns the terms on which the server executes swaps.
  431. func (s *Client) LoopInTerms(ctx context.Context) (
  432. *LoopInTerms, error) {
  433. return s.Server.GetLoopInTerms(ctx)
  434. }