Browse Source

handle units with same base Type, tests

master
Chakib Benziane 1 year ago
parent
commit
10fd30ba32
3 changed files with 126 additions and 10 deletions
  1. 3
    1
      README.md
  2. 41
    9
      manager.go
  3. 82
    0
      manager_test.go

+ 3
- 1
README.md View File

@@ -35,7 +35,7 @@ import (
35 35
 
36 36
 type Worker struct{}
37 37
 
38
-// Example loop, it will be spwaned in a goroutine
38
+// Example loop, will be spwaned inside a goroutine
39 39
 func (w *Worker) Spawn(um UnitManager) {
40 40
     ticker := time.NewTicker(time.Second)
41 41
 
@@ -75,9 +75,11 @@ func main() {
75 75
 
76 76
     // NewWorker returns a type implementing WorkUnit interface unit :=
77 77
     worker := NewWorker()
78
+    worker2 := NewWorker()
78 79
 
79 80
     // Register the unit with the manager
80 81
     manager.AddUnit(worker)
82
+    manager.AddUnit(worker2)
81 83
 
82 84
     // Start the manager
83 85
     go manager.Start()

+ 41
- 9
manager.go View File

@@ -5,9 +5,12 @@ import (
5 5
 	"os"
6 6
 	"os/signal"
7 7
 	"reflect"
8
+	"strconv"
8 9
 	"strings"
9 10
 )
10 11
 
12
+var idGen = IdGenerator()
13
+
11 14
 type WorkUnit interface {
12 15
 	Spawn(UnitManager)
13 16
 	Shutdown()
@@ -43,7 +46,9 @@ func (w *WorkUnitManager) Panic(err error) {
43 46
 }
44 47
 
45 48
 type Manager struct {
46
-	signal chan os.Signal
49
+	signalIn chan os.Signal
50
+
51
+	shutdownSigs []os.Signal
47 52
 
48 53
 	workers map[string]*WorkUnitManager
49 54
 
@@ -62,8 +67,9 @@ func (m *Manager) Start() {
62 67
 
63 68
 	for {
64 69
 		select {
65
-		case sig := <-m.signal:
66
-			if sig != os.Interrupt {
70
+		case sig := <-m.signalIn:
71
+
72
+			if !in(m.shutdownSigs, sig) {
67 73
 				break
68 74
 			}
69 75
 
@@ -120,7 +126,21 @@ func (m *Manager) Start() {
120 126
 }
121 127
 
122 128
 func (m *Manager) ShutdownOn(sig os.Signal) {
123
-	signal.Notify(m.signal, sig)
129
+	signal.Notify(m.signalIn, sig)
130
+
131
+	m.shutdownSigs = append(m.shutdownSigs, sig)
132
+}
133
+
134
+type IDGenerator func(string) int
135
+
136
+func IdGenerator() IDGenerator {
137
+	ids := make(map[string]int)
138
+
139
+	return func(unit string) int {
140
+		ret := ids[unit]
141
+		ids[unit]++
142
+		return ret
143
+	}
124 144
 }
125 145
 
126 146
 func (m *Manager) AddUnit(unit WorkUnit) {
@@ -135,17 +155,29 @@ func (m *Manager) AddUnit(unit WorkUnit) {
135 155
 	unitType := reflect.TypeOf(unit)
136 156
 	unitName := strings.Split(unitType.String(), ".")[1]
137 157
 
158
+	unitId := idGen(unitName)
159
+	unitName += strconv.Itoa(unitId)
160
+
138 161
 	log.Println("Adding unit ", unitName)
139 162
 
140 163
 	m.workers[unitName] = workUnitManager
141
-	log.Println(m.workers)
142 164
 }
143 165
 
144 166
 func NewManager() *Manager {
145 167
 	return &Manager{
146
-		signal:  make(chan os.Signal, 1),
147
-		Quit:    make(chan bool, 1),
148
-		workers: make(map[string]*WorkUnitManager),
149
-		panic:   make(chan error, 1),
168
+		signalIn: make(chan os.Signal, 1),
169
+		Quit:     make(chan bool, 1),
170
+		workers:  make(map[string]*WorkUnitManager),
171
+		panic:    make(chan error, 1),
172
+	}
173
+}
174
+
175
+// Test if signal is in array
176
+func in(arr []os.Signal, sig os.Signal) bool {
177
+	for _, s := range arr {
178
+		if s == sig {
179
+			return true
180
+		}
150 181
 	}
182
+	return false
151 183
 }

+ 82
- 0
manager_test.go View File

@@ -0,0 +1,82 @@
1
+package gum
2
+
3
+import (
4
+	"log"
5
+	"os"
6
+	"syscall"
7
+	"testing"
8
+	"time"
9
+)
10
+
11
+var WorkerID int
12
+
13
+type Worker struct{}
14
+
15
+// Example loop, it will be spwaned in a goroutine
16
+func (w *Worker) Spawn(um UnitManager) {
17
+	ticker := time.NewTicker(time.Second)
18
+
19
+	// Worker's loop
20
+	for {
21
+		select {
22
+		case <-ticker.C:
23
+			log.Println("tick")
24
+
25
+		// Read from channel if this worker unit should stop
26
+		case <-um.ShouldStop():
27
+
28
+			// Shutdown work for current unit
29
+			w.Shutdown()
30
+
31
+			// Notify manager that this unit is done.
32
+			um.Done()
33
+		}
34
+	}
35
+}
36
+
37
+func (w *Worker) Shutdown() {
38
+	// Do shutdown procedure for worker
39
+	return
40
+}
41
+
42
+func NewWorker() *Worker {
43
+	return &Worker{}
44
+}
45
+
46
+func DoRunMain(pid chan int, quit chan<- bool) {
47
+
48
+	pid <- os.Getpid()
49
+
50
+	// Create a unit manager
51
+	manager := NewManager()
52
+
53
+	// Shutdown all units on SIGINT
54
+	manager.ShutdownOn(os.Interrupt)
55
+
56
+	// NewWorker returns a type implementing WorkUnit interface unit :=
57
+	worker1 := NewWorker()
58
+	worker2 := NewWorker()
59
+
60
+	// Register the unit with the manager
61
+	manager.AddUnit(worker1)
62
+	manager.AddUnit(worker2)
63
+
64
+	// Start the manager
65
+	go manager.Start()
66
+
67
+	// Wait for all units to shutdown gracefully through their `Shutdown` method
68
+	<-manager.Quit
69
+	quit <- true
70
+}
71
+
72
+func TestRunMain(t *testing.T) {
73
+	mainPid := make(chan int, 1)
74
+	quit := make(chan bool)
75
+	go DoRunMain(mainPid, quit)
76
+
77
+	time.Sleep(3 * time.Second)
78
+
79
+	syscall.Kill(<-mainPid, syscall.SIGINT)
80
+	<-quit
81
+
82
+}

Loading…
Cancel
Save