You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

916 lines
24 KiB

  1. package loop
  2. import (
  3. "context"
  4. "crypto/rand"
  5. "crypto/sha256"
  6. "errors"
  7. "fmt"
  8. "sync"
  9. "time"
  10. "github.com/btcsuite/btcutil"
  11. "github.com/btcsuite/btcd/chaincfg/chainhash"
  12. "github.com/btcsuite/btcd/wire"
  13. "github.com/lightninglabs/lndclient"
  14. "github.com/lightninglabs/loop/labels"
  15. "github.com/lightninglabs/loop/loopdb"
  16. "github.com/lightninglabs/loop/swap"
  17. "github.com/lightningnetwork/lnd/chainntnfs"
  18. "github.com/lightningnetwork/lnd/channeldb"
  19. "github.com/lightningnetwork/lnd/lnrpc/invoicesrpc"
  20. "github.com/lightningnetwork/lnd/lntypes"
  21. "github.com/lightningnetwork/lnd/lnwire"
  22. )
  23. var (
  24. // MaxLoopInAcceptDelta configures the maximum acceptable number of
  25. // remaining blocks until the on-chain htlc expires. This value is used
  26. // to decide whether we want to continue with the swap parameters as
  27. // proposed by the server. It is a protection to prevent the server from
  28. // getting us to lock up our funds to an arbitrary point in the future.
  29. MaxLoopInAcceptDelta = int32(1500)
  30. // MinLoopInPublishDelta defines the minimum number of remaining blocks
  31. // until on-chain htlc expiry required to proceed to publishing the htlc
  32. // tx. This value isn't critical, as we could even safely publish the
  33. // htlc after expiry. The reason we do implement this check is to
  34. // prevent us from publishing an htlc that the server surely wouldn't
  35. // follow up to.
  36. MinLoopInPublishDelta = int32(10)
  37. // TimeoutTxConfTarget defines the confirmation target for the loop in
  38. // timeout tx.
  39. TimeoutTxConfTarget = int32(2)
  40. )
  41. // loopInSwap contains all the in-memory state related to a pending loop in
  42. // swap.
  43. type loopInSwap struct {
  44. swapKit
  45. executeConfig
  46. loopdb.LoopInContract
  47. htlc *swap.Htlc
  48. htlcP2WSH *swap.Htlc
  49. htlcNP2WSH *swap.Htlc
  50. // htlcTxHash is the confirmed htlc tx id.
  51. htlcTxHash *chainhash.Hash
  52. timeoutAddr btcutil.Address
  53. wg sync.WaitGroup
  54. }
  55. // loopInInitResult contains information about a just-initiated loop in swap.
  56. type loopInInitResult struct {
  57. swap *loopInSwap
  58. serverMessage string
  59. }
  60. // newLoopInSwap initiates a new loop in swap.
  61. func newLoopInSwap(globalCtx context.Context, cfg *swapConfig,
  62. currentHeight int32, request *LoopInRequest) (*loopInInitResult,
  63. error) {
  64. // Request current server loop in terms and use these to calculate the
  65. // swap fee that we should subtract from the swap amount in the payment
  66. // request that we send to the server.
  67. quote, err := cfg.server.GetLoopInQuote(globalCtx, request.Amount)
  68. if err != nil {
  69. return nil, fmt.Errorf("loop in terms: %v", err)
  70. }
  71. swapFee := quote.SwapFee
  72. if swapFee > request.MaxSwapFee {
  73. log.Warnf("Swap fee %v exceeding maximum of %v",
  74. swapFee, request.MaxSwapFee)
  75. return nil, ErrSwapFeeTooHigh
  76. }
  77. // Calculate the swap invoice amount. The prepay is added which
  78. // effectively forces the server to pay us back our prepayment on a
  79. // successful swap.
  80. swapInvoiceAmt := request.Amount - swapFee
  81. // Generate random preimage.
  82. var swapPreimage lntypes.Preimage
  83. if _, err := rand.Read(swapPreimage[:]); err != nil {
  84. log.Error("Cannot generate preimage")
  85. }
  86. swapHash := lntypes.Hash(sha256.Sum256(swapPreimage[:]))
  87. // Derive a sender key for this swap.
  88. keyDesc, err := cfg.lnd.WalletKit.DeriveNextKey(
  89. globalCtx, swap.KeyFamily,
  90. )
  91. if err != nil {
  92. return nil, err
  93. }
  94. var senderKey [33]byte
  95. copy(senderKey[:], keyDesc.PubKey.SerializeCompressed())
  96. // Create the swap invoice in lnd.
  97. _, swapInvoice, err := cfg.lnd.Client.AddInvoice(
  98. globalCtx, &invoicesrpc.AddInvoiceData{
  99. Preimage: &swapPreimage,
  100. Value: lnwire.NewMSatFromSatoshis(swapInvoiceAmt),
  101. Memo: "swap",
  102. Expiry: 3600 * 24 * 365,
  103. },
  104. )
  105. if err != nil {
  106. return nil, err
  107. }
  108. // Create the probe invoice in lnd. Derive the payment hash
  109. // deterministically from the swap hash in such a way that the server
  110. // can be sure that we don't know the preimage.
  111. probeHash := lntypes.Hash(sha256.Sum256(swapHash[:]))
  112. probeHash[0] ^= 1
  113. log.Infof("Creating probe invoice %v", probeHash)
  114. probeInvoice, err := cfg.lnd.Invoices.AddHoldInvoice(
  115. globalCtx, &invoicesrpc.AddInvoiceData{
  116. Hash: &probeHash,
  117. Value: lnwire.NewMSatFromSatoshis(swapInvoiceAmt),
  118. Memo: "loop in probe",
  119. Expiry: 3600,
  120. },
  121. )
  122. if err != nil {
  123. return nil, err
  124. }
  125. // Create a cancellable context that is used for monitoring the probe.
  126. probeWaitCtx, probeWaitCancel := context.WithCancel(globalCtx)
  127. // Launch a goroutine to monitor the probe.
  128. probeResult, err := awaitProbe(probeWaitCtx, *cfg.lnd, probeHash)
  129. if err != nil {
  130. probeWaitCancel()
  131. return nil, fmt.Errorf("probe failed: %v", err)
  132. }
  133. // Post the swap parameters to the swap server. The response contains
  134. // the server success key and the expiry height of the on-chain swap
  135. // htlc.
  136. log.Infof("Initiating swap request at height %v", currentHeight)
  137. swapResp, err := cfg.server.NewLoopInSwap(globalCtx, swapHash,
  138. request.Amount, senderKey, swapInvoice, probeInvoice,
  139. request.LastHop, request.Initiator,
  140. )
  141. probeWaitCancel()
  142. if err != nil {
  143. return nil, fmt.Errorf("cannot initiate swap: %v", err)
  144. }
  145. // Because the context is cancelled, it is guaranteed that we will be
  146. // able to read from the probeResult channel.
  147. err = <-probeResult
  148. if err != nil {
  149. return nil, fmt.Errorf("probe error: %v", err)
  150. }
  151. // Validate the response parameters the prevent us continuing with a
  152. // swap that is based on parameters outside our allowed range.
  153. err = validateLoopInContract(cfg.lnd, currentHeight, request, swapResp)
  154. if err != nil {
  155. return nil, err
  156. }
  157. // Instantiate a struct that contains all required data to start the
  158. // swap.
  159. initiationTime := time.Now()
  160. contract := loopdb.LoopInContract{
  161. HtlcConfTarget: request.HtlcConfTarget,
  162. LastHop: request.LastHop,
  163. ExternalHtlc: request.ExternalHtlc,
  164. SwapContract: loopdb.SwapContract{
  165. InitiationHeight: currentHeight,
  166. InitiationTime: initiationTime,
  167. ReceiverKey: swapResp.receiverKey,
  168. SenderKey: senderKey,
  169. Preimage: swapPreimage,
  170. AmountRequested: request.Amount,
  171. CltvExpiry: swapResp.expiry,
  172. MaxMinerFee: request.MaxMinerFee,
  173. MaxSwapFee: request.MaxSwapFee,
  174. Label: request.Label,
  175. ProtocolVersion: loopdb.CurrentInternalProtocolVersion,
  176. },
  177. }
  178. swapKit := newSwapKit(
  179. swapHash, swap.TypeIn,
  180. cfg, &contract.SwapContract,
  181. )
  182. swapKit.lastUpdateTime = initiationTime
  183. swap := &loopInSwap{
  184. LoopInContract: contract,
  185. swapKit: *swapKit,
  186. }
  187. if err := swap.initHtlcs(); err != nil {
  188. return nil, err
  189. }
  190. // Persist the data before exiting this function, so that the caller can
  191. // trust that this swap will be resumed on restart.
  192. err = cfg.store.CreateLoopIn(swapHash, &swap.LoopInContract)
  193. if err != nil {
  194. return nil, fmt.Errorf("cannot store swap: %v", err)
  195. }
  196. if swapResp.serverMessage != "" {
  197. swap.log.Infof("Server message: %v", swapResp.serverMessage)
  198. }
  199. return &loopInInitResult{
  200. swap: swap,
  201. serverMessage: swapResp.serverMessage,
  202. }, nil
  203. }
  204. // awaitProbe waits for a probe payment to arrive and cancels it. This is a
  205. // workaround for the current lack of multi-path probing.
  206. func awaitProbe(ctx context.Context, lnd lndclient.LndServices,
  207. probeHash lntypes.Hash) (chan error, error) {
  208. // Subscribe to the probe invoice.
  209. updateChan, errChan, err := lnd.Invoices.SubscribeSingleInvoice(
  210. ctx, probeHash,
  211. )
  212. if err != nil {
  213. return nil, err
  214. }
  215. // Wait in the background for the probe to arrive.
  216. probeResult := make(chan error, 1)
  217. go func() {
  218. for {
  219. select {
  220. case update := <-updateChan:
  221. switch update.State {
  222. case channeldb.ContractAccepted:
  223. log.Infof("Server probe successful")
  224. probeResult <- nil
  225. // Cancel probe invoice so that the
  226. // server will know that its probe was
  227. // successful.
  228. err := lnd.Invoices.CancelInvoice(
  229. ctx, probeHash,
  230. )
  231. if err != nil {
  232. log.Errorf("Cancel probe "+
  233. "invoice: %v", err)
  234. }
  235. return
  236. case channeldb.ContractCanceled:
  237. probeResult <- errors.New(
  238. "probe invoice expired")
  239. return
  240. case channeldb.ContractSettled:
  241. probeResult <- errors.New(
  242. "impossible that probe " +
  243. "invoice was settled")
  244. return
  245. }
  246. case err := <-errChan:
  247. probeResult <- err
  248. return
  249. case <-ctx.Done():
  250. probeResult <- ctx.Err()
  251. return
  252. }
  253. }
  254. }()
  255. return probeResult, nil
  256. }
  257. // resumeLoopInSwap returns a swap object representing a pending swap that has
  258. // been restored from the database.
  259. func resumeLoopInSwap(reqContext context.Context, cfg *swapConfig,
  260. pend *loopdb.LoopIn) (*loopInSwap, error) {
  261. hash := lntypes.Hash(sha256.Sum256(pend.Contract.Preimage[:]))
  262. log.Infof("Resuming loop in swap %v", hash)
  263. swapKit := newSwapKit(
  264. hash, swap.TypeIn, cfg,
  265. &pend.Contract.SwapContract,
  266. )
  267. swap := &loopInSwap{
  268. LoopInContract: *pend.Contract,
  269. swapKit: *swapKit,
  270. }
  271. if err := swap.initHtlcs(); err != nil {
  272. return nil, err
  273. }
  274. lastUpdate := pend.LastUpdate()
  275. if lastUpdate == nil {
  276. swap.lastUpdateTime = pend.Contract.InitiationTime
  277. } else {
  278. swap.state = lastUpdate.State
  279. swap.lastUpdateTime = lastUpdate.Time
  280. swap.htlcTxHash = lastUpdate.HtlcTxHash
  281. }
  282. return swap, nil
  283. }
  284. // validateLoopInContract validates the contract parameters against our
  285. // request.
  286. func validateLoopInContract(lnd *lndclient.LndServices,
  287. height int32,
  288. request *LoopInRequest,
  289. response *newLoopInResponse) error {
  290. // Verify that we are not forced to publish an htlc that locks up our
  291. // funds for too long in case the server doesn't follow through.
  292. if response.expiry-height > MaxLoopInAcceptDelta {
  293. return ErrExpiryTooFar
  294. }
  295. return nil
  296. }
  297. // initHtlcs creates and updates the native and nested segwit htlcs
  298. // of the loopInSwap.
  299. func (s *loopInSwap) initHtlcs() error {
  300. htlcP2WSH, err := s.swapKit.getHtlc(swap.HtlcP2WSH)
  301. if err != nil {
  302. return err
  303. }
  304. htlcNP2WSH, err := s.swapKit.getHtlc(swap.HtlcNP2WSH)
  305. if err != nil {
  306. return err
  307. }
  308. // Log htlc addresses for debugging.
  309. s.swapKit.log.Infof("Htlc address (P2WSH): %v", htlcP2WSH.Address)
  310. s.swapKit.log.Infof("Htlc address (NP2WSH): %v", htlcNP2WSH.Address)
  311. s.htlcP2WSH = htlcP2WSH
  312. s.htlcNP2WSH = htlcNP2WSH
  313. return nil
  314. }
  315. // sendUpdate reports an update to the swap state.
  316. func (s *loopInSwap) sendUpdate(ctx context.Context) error {
  317. info := s.swapInfo()
  318. s.log.Infof("Loop in swap state: %v", info.State)
  319. info.HtlcAddressP2WSH = s.htlcP2WSH.Address
  320. info.HtlcAddressNP2WSH = s.htlcNP2WSH.Address
  321. info.ExternalHtlc = s.ExternalHtlc
  322. select {
  323. case s.statusChan <- *info:
  324. case <-ctx.Done():
  325. return ctx.Err()
  326. }
  327. return nil
  328. }
  329. // execute starts/resumes the swap. It is a thin wrapper around executeSwap to
  330. // conveniently handle the error case.
  331. func (s *loopInSwap) execute(mainCtx context.Context,
  332. cfg *executeConfig, height int32) error {
  333. defer s.wg.Wait()
  334. s.executeConfig = *cfg
  335. s.height = height
  336. // Create context for our state subscription which we will cancel once
  337. // swap execution has completed, ensuring that we kill the subscribe
  338. // goroutine.
  339. subCtx, cancel := context.WithCancel(mainCtx)
  340. defer cancel()
  341. s.wg.Add(1)
  342. go func() {
  343. defer s.wg.Done()
  344. subscribeAndLogUpdates(
  345. subCtx, s.hash, s.log, s.server.SubscribeLoopInUpdates,
  346. )
  347. }()
  348. // Announce swap by sending out an initial update.
  349. err := s.sendUpdate(mainCtx)
  350. if err != nil {
  351. return err
  352. }
  353. // Execute the swap until it either reaches a final state or a temporary
  354. // error occurs.
  355. err = s.executeSwap(mainCtx)
  356. // Sanity check. If there is no error, the swap must be in a final
  357. // state.
  358. if err == nil && s.state.Type() == loopdb.StateTypePending {
  359. err = fmt.Errorf("swap in non-final state %v", s.state)
  360. }
  361. // If an unexpected error happened, report a temporary failure
  362. // but don't persist the error. Otherwise for example a
  363. // connection error could lead to abandoning the swap
  364. // permanently and losing funds.
  365. if err != nil {
  366. s.log.Errorf("Swap error: %v", err)
  367. s.setState(loopdb.StateFailTemporary)
  368. // If we cannot send out this update, there is nothing we can do.
  369. _ = s.sendUpdate(mainCtx)
  370. return err
  371. }
  372. s.log.Infof("Loop in swap completed: %v "+
  373. "(final cost: server %v, onchain %v, offchain %v)",
  374. s.state,
  375. s.cost.Server,
  376. s.cost.Onchain,
  377. s.cost.Offchain,
  378. )
  379. return nil
  380. }
  381. // executeSwap executes the swap.
  382. func (s *loopInSwap) executeSwap(globalCtx context.Context) error {
  383. var err error
  384. // For loop in, the client takes the first step by publishing the
  385. // on-chain htlc. Only do this is we haven't already done so in a
  386. // previous run.
  387. if s.state == loopdb.StateInitiated {
  388. if s.ExternalHtlc {
  389. // If an external htlc was indicated, we can move to the
  390. // HtlcPublished state directly and wait for
  391. // confirmation.
  392. s.setState(loopdb.StateHtlcPublished)
  393. err = s.persistAndAnnounceState(globalCtx)
  394. if err != nil {
  395. return err
  396. }
  397. } else {
  398. published, err := s.publishOnChainHtlc(globalCtx)
  399. if err != nil {
  400. return err
  401. }
  402. if !published {
  403. return nil
  404. }
  405. }
  406. }
  407. // Wait for the htlc to confirm. After a restart this will pick up a
  408. // previously published tx.
  409. conf, err := s.waitForHtlcConf(globalCtx)
  410. if err != nil {
  411. return err
  412. }
  413. // Determine the htlc outpoint by inspecting the htlc tx.
  414. htlcOutpoint, htlcValue, err := swap.GetScriptOutput(
  415. conf.Tx, s.htlc.PkScript,
  416. )
  417. if err != nil {
  418. return err
  419. }
  420. // Verify that the confirmed (external) htlc value matches the swap
  421. // amount. Otherwise fail the swap immediately.
  422. if htlcValue != s.LoopInContract.AmountRequested {
  423. s.setState(loopdb.StateFailIncorrectHtlcAmt)
  424. return s.persistAndAnnounceState(globalCtx)
  425. }
  426. // TODO: Add miner fee of htlc tx to swap cost balance.
  427. // The server is expected to see the htlc on-chain and knowing that it
  428. // can sweep that htlc with the preimage, it should pay our swap
  429. // invoice, receive the preimage and sweep the htlc. We are waiting for
  430. // this to happen and simultaneously watch the htlc expiry height. When
  431. // the htlc expires, we will publish a timeout tx to reclaim the funds.
  432. err = s.waitForSwapComplete(globalCtx, htlcOutpoint, htlcValue)
  433. if err != nil {
  434. return err
  435. }
  436. // Persist swap outcome.
  437. if err := s.persistAndAnnounceState(globalCtx); err != nil {
  438. return err
  439. }
  440. return nil
  441. }
  442. // waitForHtlcConf watches the chain until the htlc confirms.
  443. func (s *loopInSwap) waitForHtlcConf(globalCtx context.Context) (
  444. *chainntnfs.TxConfirmation, error) {
  445. // Register for confirmation of the htlc. It is essential to specify not
  446. // just the pk script, because an attacker may publish the same htlc
  447. // with a lower value and we don't want to follow through with that tx.
  448. // In the unlikely event that our call to SendOutputs crashes and we
  449. // restart, htlcTxHash will be nil at this point. Then only register
  450. // with PkScript and accept the risk that the call triggers on a
  451. // different htlc outpoint.
  452. s.log.Infof("Register for htlc conf (hh=%v, txid=%v)",
  453. s.InitiationHeight, s.htlcTxHash)
  454. if s.htlcTxHash == nil {
  455. s.log.Warnf("No htlc tx hash available, registering with " +
  456. "just the pkscript")
  457. }
  458. ctx, cancel := context.WithCancel(globalCtx)
  459. defer cancel()
  460. notifier := s.lnd.ChainNotifier
  461. confChanP2WSH, confErrP2WSH, err := notifier.RegisterConfirmationsNtfn(
  462. ctx, s.htlcTxHash, s.htlcP2WSH.PkScript, 1, s.InitiationHeight,
  463. )
  464. if err != nil {
  465. return nil, err
  466. }
  467. confChanNP2WSH, confErrNP2WSH, err := notifier.RegisterConfirmationsNtfn(
  468. ctx, s.htlcTxHash, s.htlcNP2WSH.PkScript, 1, s.InitiationHeight,
  469. )
  470. if err != nil {
  471. return nil, err
  472. }
  473. var conf *chainntnfs.TxConfirmation
  474. for conf == nil {
  475. select {
  476. // P2WSH htlc confirmed.
  477. case conf = <-confChanP2WSH:
  478. s.htlc = s.htlcP2WSH
  479. s.log.Infof("P2WSH htlc confirmed")
  480. // NP2WSH htlc confirmed.
  481. case conf = <-confChanNP2WSH:
  482. s.htlc = s.htlcNP2WSH
  483. s.log.Infof("NP2WSH htlc confirmed")
  484. // Conf ntfn error.
  485. case err := <-confErrP2WSH:
  486. return nil, err
  487. // Conf ntfn error.
  488. case err := <-confErrNP2WSH:
  489. return nil, err
  490. // Keep up with block height.
  491. case notification := <-s.blockEpochChan:
  492. s.height = notification.(int32)
  493. // Cancel.
  494. case <-globalCtx.Done():
  495. return nil, globalCtx.Err()
  496. }
  497. }
  498. // Store htlc tx hash for accounting purposes. Usually this call is a
  499. // no-op because the htlc tx hash was already known. Exceptions are:
  500. //
  501. // - Old pending swaps that were initiated before we persisted the htlc
  502. // tx hash directly after publish.
  503. //
  504. // - Swaps that experienced a crash during their call to SendOutputs. In
  505. // that case, we weren't able to record the tx hash.
  506. txHash := conf.Tx.TxHash()
  507. s.htlcTxHash = &txHash
  508. return conf, nil
  509. }
  510. // publishOnChainHtlc checks whether there are still enough blocks left and if
  511. // so, it publishes the htlc and advances the swap state.
  512. func (s *loopInSwap) publishOnChainHtlc(ctx context.Context) (bool, error) {
  513. var err error
  514. blocksRemaining := s.CltvExpiry - s.height
  515. s.log.Infof("Blocks left until on-chain expiry: %v", blocksRemaining)
  516. // Verify whether it still makes sense to publish the htlc.
  517. if blocksRemaining < MinLoopInPublishDelta {
  518. s.setState(loopdb.StateFailTimeout)
  519. return false, s.persistAndAnnounceState(ctx)
  520. }
  521. // Get fee estimate from lnd.
  522. feeRate, err := s.lnd.WalletKit.EstimateFee(
  523. ctx, s.LoopInContract.HtlcConfTarget,
  524. )
  525. if err != nil {
  526. return false, fmt.Errorf("estimate fee: %v", err)
  527. }
  528. // Transition to state HtlcPublished before calling SendOutputs to
  529. // prevent us from ever paying multiple times after a crash.
  530. s.setState(loopdb.StateHtlcPublished)
  531. err = s.persistAndAnnounceState(ctx)
  532. if err != nil {
  533. return false, err
  534. }
  535. s.log.Infof("Publishing on chain HTLC with fee rate %v", feeRate)
  536. // Internal loop-in is always P2WSH.
  537. tx, err := s.lnd.WalletKit.SendOutputs(
  538. ctx, []*wire.TxOut{{
  539. PkScript: s.htlcP2WSH.PkScript,
  540. Value: int64(s.LoopInContract.AmountRequested),
  541. }}, feeRate, labels.LoopInHtlcLabel(swap.ShortHash(&s.hash)),
  542. )
  543. if err != nil {
  544. return false, fmt.Errorf("send outputs: %v", err)
  545. }
  546. txHash := tx.TxHash()
  547. s.log.Infof("Published on chain HTLC tx %v", txHash)
  548. // Persist the htlc hash so that after a restart we are still waiting
  549. // for our own htlc. We don't need to announce to clients, because the
  550. // state remains unchanged.
  551. //
  552. // TODO(joostjager): Store tx hash before calling SendOutputs. This is
  553. // not yet possible with the current lnd api.
  554. s.htlcTxHash = &txHash
  555. s.lastUpdateTime = time.Now()
  556. if err := s.persistState(); err != nil {
  557. return false, fmt.Errorf("persist htlc tx: %v", err)
  558. }
  559. return true, nil
  560. }
  561. // waitForSwapComplete waits until a spending tx of the htlc gets confirmed and
  562. // the swap invoice is either settled or canceled. If the htlc times out, the
  563. // timeout tx will be published.
  564. func (s *loopInSwap) waitForSwapComplete(ctx context.Context,
  565. htlcOutpoint *wire.OutPoint, htlcValue btcutil.Amount) error {
  566. // Register the htlc spend notification.
  567. rpcCtx, cancel := context.WithCancel(ctx)
  568. defer cancel()
  569. spendChan, spendErr, err := s.lnd.ChainNotifier.RegisterSpendNtfn(
  570. rpcCtx, htlcOutpoint, s.htlc.PkScript, s.InitiationHeight,
  571. )
  572. if err != nil {
  573. return fmt.Errorf("register spend ntfn: %v", err)
  574. }
  575. // Register for swap invoice updates.
  576. rpcCtx, cancel = context.WithCancel(ctx)
  577. defer cancel()
  578. s.log.Infof("Subscribing to swap invoice %v", s.hash)
  579. swapInvoiceChan, swapInvoiceErr, err := s.lnd.Invoices.SubscribeSingleInvoice(
  580. rpcCtx, s.hash,
  581. )
  582. if err != nil {
  583. return fmt.Errorf("subscribe to swap invoice: %v", err)
  584. }
  585. // checkTimeout publishes the timeout tx if the contract has expired.
  586. checkTimeout := func() error {
  587. if s.height >= s.LoopInContract.CltvExpiry {
  588. return s.publishTimeoutTx(ctx, htlcOutpoint, htlcValue)
  589. }
  590. return nil
  591. }
  592. // Check timeout at current height. After a restart we may want to
  593. // publish the tx immediately.
  594. err = checkTimeout()
  595. if err != nil {
  596. return err
  597. }
  598. htlcSpend := false
  599. invoiceFinalized := false
  600. for !htlcSpend || !invoiceFinalized {
  601. select {
  602. // Spend notification error.
  603. case err := <-spendErr:
  604. return err
  605. // Receive block epochs and start publishing the timeout tx
  606. // whenever possible.
  607. case notification := <-s.blockEpochChan:
  608. s.height = notification.(int32)
  609. err := checkTimeout()
  610. if err != nil {
  611. return err
  612. }
  613. // The htlc spend is confirmed. Inspect the spending tx to
  614. // determine the final swap state.
  615. case spendDetails := <-spendChan:
  616. s.log.Infof("Htlc spend by tx: %v",
  617. spendDetails.SpenderTxHash)
  618. err := s.processHtlcSpend(
  619. ctx, spendDetails, htlcValue,
  620. )
  621. if err != nil {
  622. return err
  623. }
  624. htlcSpend = true
  625. // Swap invoice ntfn error.
  626. case err := <-swapInvoiceErr:
  627. return err
  628. // An update to the swap invoice occurred. Check the new state
  629. // and update the swap state accordingly.
  630. case update := <-swapInvoiceChan:
  631. s.log.Infof("Received swap invoice update: %v",
  632. update.State)
  633. switch update.State {
  634. // Swap invoice was paid, so update server cost balance.
  635. case channeldb.ContractSettled:
  636. s.cost.Server -= update.AmtPaid
  637. // If invoice settlement and htlc spend happen
  638. // in the expected order, move the swap to an
  639. // intermediate state that indicates that the
  640. // swap is complete from the user point of view,
  641. // but still incomplete with regards to
  642. // accounting data.
  643. if s.state == loopdb.StateHtlcPublished {
  644. s.setState(loopdb.StateInvoiceSettled)
  645. err := s.persistAndAnnounceState(ctx)
  646. if err != nil {
  647. return err
  648. }
  649. }
  650. invoiceFinalized = true
  651. // Canceled invoice has no effect on server cost
  652. // balance.
  653. case channeldb.ContractCanceled:
  654. invoiceFinalized = true
  655. }
  656. case <-ctx.Done():
  657. return ctx.Err()
  658. }
  659. }
  660. return nil
  661. }
  662. func (s *loopInSwap) processHtlcSpend(ctx context.Context,
  663. spend *chainntnfs.SpendDetail, htlcValue btcutil.Amount) error {
  664. // Determine the htlc input of the spending tx and inspect the witness
  665. // to findout whether a success or a timeout tx spend the htlc.
  666. htlcInput := spend.SpendingTx.TxIn[spend.SpenderInputIndex]
  667. if s.htlc.IsSuccessWitness(htlcInput.Witness) {
  668. s.setState(loopdb.StateSuccess)
  669. // Server swept the htlc. The htlc value can be added to the
  670. // server cost balance.
  671. s.cost.Server += htlcValue
  672. } else {
  673. s.setState(loopdb.StateFailTimeout)
  674. // Now that the timeout tx confirmed, we can safely cancel the
  675. // swap invoice. We still need to query the final invoice state.
  676. // This is not a hodl invoice, so it may be that the invoice was
  677. // already settled. This means that the server didn't succeed in
  678. // sweeping the htlc after paying the invoice.
  679. err := s.lnd.Invoices.CancelInvoice(ctx, s.hash)
  680. if err != nil && err != channeldb.ErrInvoiceAlreadySettled {
  681. return err
  682. }
  683. // TODO: Add miner fee of timeout tx to swap cost balance.
  684. }
  685. return nil
  686. }
  687. // publishTimeoutTx publishes a timeout tx after the on-chain htlc has expired.
  688. // The swap failed and we are reclaiming our funds.
  689. func (s *loopInSwap) publishTimeoutTx(ctx context.Context,
  690. htlcOutpoint *wire.OutPoint, htlcValue btcutil.Amount) error {
  691. if s.timeoutAddr == nil {
  692. var err error
  693. s.timeoutAddr, err = s.lnd.WalletKit.NextAddr(ctx)
  694. if err != nil {
  695. return err
  696. }
  697. }
  698. // Calculate sweep tx fee
  699. fee, err := s.sweeper.GetSweepFee(
  700. ctx, s.htlc.AddTimeoutToEstimator, s.timeoutAddr,
  701. TimeoutTxConfTarget,
  702. )
  703. if err != nil {
  704. return err
  705. }
  706. witnessFunc := func(sig []byte) (wire.TxWitness, error) {
  707. return s.htlc.GenTimeoutWitness(sig), nil
  708. }
  709. sequence := uint32(0)
  710. timeoutTx, err := s.sweeper.CreateSweepTx(
  711. ctx, s.height, sequence, s.htlc, *htlcOutpoint, s.SenderKey,
  712. witnessFunc, htlcValue, fee, s.timeoutAddr,
  713. )
  714. if err != nil {
  715. return err
  716. }
  717. timeoutTxHash := timeoutTx.TxHash()
  718. s.log.Infof("Publishing timeout tx %v with fee %v to addr %v",
  719. timeoutTxHash, fee, s.timeoutAddr)
  720. err = s.lnd.WalletKit.PublishTransaction(
  721. ctx, timeoutTx,
  722. labels.LoopInSweepTimeout(swap.ShortHash(&s.hash)),
  723. )
  724. if err != nil {
  725. s.log.Warnf("publish timeout: %v", err)
  726. }
  727. return nil
  728. }
  729. // persistAndAnnounceState updates the swap state on disk and sends out an
  730. // update notification.
  731. func (s *loopInSwap) persistAndAnnounceState(ctx context.Context) error {
  732. // Update state in store.
  733. if err := s.persistState(); err != nil {
  734. return err
  735. }
  736. // Send out swap update
  737. return s.sendUpdate(ctx)
  738. }
  739. // persistState updates the swap state on disk.
  740. func (s *loopInSwap) persistState() error {
  741. return s.store.UpdateLoopIn(
  742. s.hash, s.lastUpdateTime,
  743. loopdb.SwapStateData{
  744. State: s.state,
  745. Cost: s.cost,
  746. HtlcTxHash: s.htlcTxHash,
  747. },
  748. )
  749. }
  750. // setState updates the swap state and last update timestamp.
  751. func (s *loopInSwap) setState(state loopdb.SwapState) {
  752. s.lastUpdateTime = time.Now()
  753. s.state = state
  754. }