From 64d0388015808bc4adb070aaefbfd9e47b63050a Mon Sep 17 00:00:00 2001 From: Edouard Paris Date: Fri, 29 Mar 2019 17:02:52 +0100 Subject: [PATCH] fix pubsub --- cli/cli.go | 9 +++++++- events/events.go | 1 + pubsub/pubsub.go | 53 +++++++++++++++++++++++++----------------------- 3 files changed, 37 insertions(+), 26 deletions(-) diff --git a/cli/cli.go b/cli/cli.go index 962b63d..d5dd559 100644 --- a/cli/cli.go +++ b/cli/cli.go @@ -80,7 +80,14 @@ func pubsubRun(c *cli.Context) error { } events := make(chan *events.Event) - return pubsub.Run(context.Background(), app, events) + err = pubsub.Run(context.Background(), app, events) + if err != nil { + return err + } + //ev := <-events + //app.Logger.Info("events quit ", logging.String("type", ev.Type)) + + return nil } func getNetworkFromConfig(c *cli.Context) (*network.Network, error) { diff --git a/events/events.go b/events/events.go index 6f68c70..9329853 100644 --- a/events/events.go +++ b/events/events.go @@ -1,6 +1,7 @@ package events const ( + Quit = "quit" InvoiceCreated = "invoice.created" InvoiceSettled = "invoice.settled" ) diff --git a/pubsub/pubsub.go b/pubsub/pubsub.go index c69e40b..41b2290 100644 --- a/pubsub/pubsub.go +++ b/pubsub/pubsub.go @@ -5,6 +5,7 @@ import ( "os" "os/signal" "sync" + "time" "github.com/edouardparis/lntop/app" "github.com/edouardparis/lntop/events" @@ -33,33 +34,35 @@ func (p *pubSub) invoices(ctx context.Context, sub chan *events.Event) { p.wg.Add(2) invoices := make(chan *models.Invoice) + go func() { + for invoice := range invoices { + p.logger.Debug("receive invoice", logging.Object("invoice", invoice)) + if invoice.Settled { + sub <- events.New(events.InvoiceSettled) + } else { + sub <- events.New(events.InvoiceCreated) + } + } + p.wg.Done() + }() + go func() { for { select { case <-p.stop: + p.logger.Info("stop invoices") + close(invoices) + p.logger.Info("close invoices") p.wg.Done() return default: - invoice := <-invoices - p.logger.Debug("receive invoice", logging.Object("invoice", invoice)) - if invoice.Settled { - sub <- events.New(events.InvoiceSettled) - } else { - sub <- events.New(events.InvoiceCreated) - } - } - } - }() - - go func() { - select { - case <-p.stop: - p.wg.Done() - return - default: - err := p.network.SubscribeInvoice(ctx, invoices) - if err != nil { - p.logger.Error("SubscribeInvoice returned an error", logging.Error(err)) + time.Sleep(3 * time.Second) + p.logger.Info("loop") + //err := p.network.SubscribeInvoice(ctx, invoices) + //if err != nil { + // + //p.logger.Error("SubscribeInvoice returned an error", logging.Error(err)) + //} } } }() @@ -70,11 +73,11 @@ func (p *pubSub) wait() { signal.Notify(c, os.Interrupt) p.wg.Add(1) go func() { - for sig := range c { - p.logger.Debug("Received signal, gracefully stopping", logging.String("sig", sig.String())) - p.wg.Done() - close(p.stop) - } + sig := <-c + p.logger.Debug("Received signal, gracefully stopping", logging.String("sig", sig.String())) + p.stop <- true + close(p.stop) + p.wg.Done() }() p.wg.Wait() }