fix pubsub

pull/1/head
Edouard Paris 5 years ago
parent c6eb89150d
commit 64d0388015

@ -80,7 +80,14 @@ func pubsubRun(c *cli.Context) error {
} }
events := make(chan *events.Event) events := make(chan *events.Event)
return pubsub.Run(context.Background(), app, events) err = pubsub.Run(context.Background(), app, events)
if err != nil {
return err
}
//ev := <-events
//app.Logger.Info("events quit ", logging.String("type", ev.Type))
return nil
} }
func getNetworkFromConfig(c *cli.Context) (*network.Network, error) { func getNetworkFromConfig(c *cli.Context) (*network.Network, error) {

@ -1,6 +1,7 @@
package events package events
const ( const (
Quit = "quit"
InvoiceCreated = "invoice.created" InvoiceCreated = "invoice.created"
InvoiceSettled = "invoice.settled" InvoiceSettled = "invoice.settled"
) )

@ -5,6 +5,7 @@ import (
"os" "os"
"os/signal" "os/signal"
"sync" "sync"
"time"
"github.com/edouardparis/lntop/app" "github.com/edouardparis/lntop/app"
"github.com/edouardparis/lntop/events" "github.com/edouardparis/lntop/events"
@ -33,33 +34,35 @@ func (p *pubSub) invoices(ctx context.Context, sub chan *events.Event) {
p.wg.Add(2) p.wg.Add(2)
invoices := make(chan *models.Invoice) invoices := make(chan *models.Invoice)
go func() {
for invoice := range invoices {
p.logger.Debug("receive invoice", logging.Object("invoice", invoice))
if invoice.Settled {
sub <- events.New(events.InvoiceSettled)
} else {
sub <- events.New(events.InvoiceCreated)
}
}
p.wg.Done()
}()
go func() { go func() {
for { for {
select { select {
case <-p.stop: case <-p.stop:
p.logger.Info("stop invoices")
close(invoices)
p.logger.Info("close invoices")
p.wg.Done() p.wg.Done()
return return
default: default:
invoice := <-invoices time.Sleep(3 * time.Second)
p.logger.Debug("receive invoice", logging.Object("invoice", invoice)) p.logger.Info("loop")
if invoice.Settled { //err := p.network.SubscribeInvoice(ctx, invoices)
sub <- events.New(events.InvoiceSettled) //if err != nil {
} else { //
sub <- events.New(events.InvoiceCreated) //p.logger.Error("SubscribeInvoice returned an error", logging.Error(err))
} //}
}
}
}()
go func() {
select {
case <-p.stop:
p.wg.Done()
return
default:
err := p.network.SubscribeInvoice(ctx, invoices)
if err != nil {
p.logger.Error("SubscribeInvoice returned an error", logging.Error(err))
} }
} }
}() }()
@ -70,11 +73,11 @@ func (p *pubSub) wait() {
signal.Notify(c, os.Interrupt) signal.Notify(c, os.Interrupt)
p.wg.Add(1) p.wg.Add(1)
go func() { go func() {
for sig := range c { sig := <-c
p.logger.Debug("Received signal, gracefully stopping", logging.String("sig", sig.String())) p.logger.Debug("Received signal, gracefully stopping", logging.String("sig", sig.String()))
p.wg.Done() p.stop <- true
close(p.stop) close(p.stop)
} p.wg.Done()
}() }()
p.wg.Wait() p.wg.Wait()
} }

Loading…
Cancel
Save