Merge pull request #667 from sputn1ck/expand_fsm

fsm: expand fsm
pull/669/head
Konstantin Nick 5 months ago committed by GitHub
commit 3743a49f27
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -31,7 +31,7 @@ func NewExampleFSMContext(service ExampleService,
service: service,
store: store,
}
exampleFSM.StateMachine = NewStateMachine(exampleFSM.GetStates())
exampleFSM.StateMachine = NewStateMachine(exampleFSM.GetStates(), 10)
return exampleFSM
}
@ -55,7 +55,7 @@ var (
// GetStates returns the states for the example FSM.
func (e *ExampleFSM) GetStates() States {
return States{
Default: State{
EmptyState: State{
Transitions: Transitions{
OnRequestStuff: InitFSM,
},

@ -17,8 +17,8 @@ var (
)
const (
// Default represents the default state of the system.
Default StateType = ""
// EmptyState represents the default state of the system.
EmptyState StateType = ""
// NoOp represents a no-op event.
NoOp EventType = "NoOp"
@ -88,19 +88,19 @@ type StateMachine struct {
// ActionEntryFunc is a function that is called before an action is
// executed.
ActionEntryFunc func()
ActionEntryFunc func(Notification)
// ActionExitFunc is a function that is called after an action is
// executed.
ActionExitFunc func()
// mutex ensures that only 1 event is processed by the state machine at
// any given time.
mutex sync.Mutex
// executed, it is called with the EventType returned by the action.
ActionExitFunc func(NextEvent EventType)
// LastActionError is an error set by the last action executed.
LastActionError error
// DefaultObserver is the default observer that is notified when the
// state machine transitions between states.
DefaultObserver *CachedObserver
// previous represents the previous state.
previous StateType
@ -114,13 +114,35 @@ type StateMachine struct {
// observerMutex ensures that observers are only added or removed
// safely.
observerMutex sync.Mutex
// mutex ensures that only 1 event is processed by the state machine at
// any given time.
mutex sync.Mutex
}
// NewStateMachine creates a new state machine.
func NewStateMachine(states States) *StateMachine {
func NewStateMachine(states States, observerSize int) *StateMachine {
return NewStateMachineWithState(states, EmptyState, observerSize)
}
// NewStateMachineWithState creates a new state machine and sets the initial
// state.
func NewStateMachineWithState(states States, current StateType,
observerSize int) *StateMachine {
observers := []Observer{}
var defaultObserver *CachedObserver
if observerSize > 0 {
defaultObserver = NewCachedObserver(observerSize)
observers = append(observers, defaultObserver)
}
return &StateMachine{
States: states,
observers: make([]Observer, 0),
States: states,
current: current,
DefaultObserver: defaultObserver,
observers: observers,
}
}
@ -189,18 +211,20 @@ func (s *StateMachine) SendEvent(event EventType, eventCtx EventContext) error {
// Notify the state machine's observers.
s.observerMutex.Lock()
notification := Notification{
PreviousState: s.previous,
NextState: s.current,
Event: event,
}
for _, observer := range s.observers {
observer.Notify(Notification{
PreviousState: s.previous,
NextState: s.current,
Event: event,
})
observer.Notify(notification)
}
s.observerMutex.Unlock()
// Execute the state machines ActionEntryFunc.
if s.ActionEntryFunc != nil {
s.ActionEntryFunc()
s.ActionEntryFunc(notification)
}
// Execute the current state's entry function
@ -219,7 +243,7 @@ func (s *StateMachine) SendEvent(event EventType, eventCtx EventContext) error {
// Execute the state machines ActionExitFunc.
if s.ActionExitFunc != nil {
s.ActionExitFunc()
s.ActionExitFunc(nextEvent)
}
// If the next event is a no-op, we're done.

@ -46,9 +46,60 @@ func (c *CachedObserver) GetCachedNotifications() []Notification {
return c.cachedNotifications.Get()
}
// WaitForStateOption is an option that can be passed to the WaitForState
// function.
type WaitForStateOption interface {
apply(*fsmOptions)
}
// fsmOptions is a struct that holds all options that can be passed to the
// WaitForState function.
type fsmOptions struct {
initialWait time.Duration
}
// InitialWaitOption is an option that can be passed to the WaitForState
// function to wait for a given duration before checking the state.
type InitialWaitOption struct {
initialWait time.Duration
}
// WithWaitForStateOption creates a new InitialWaitOption.
func WithWaitForStateOption(initialWait time.Duration) WaitForStateOption {
return &InitialWaitOption{
initialWait,
}
}
// apply implements the WaitForStateOption interface.
func (w *InitialWaitOption) apply(o *fsmOptions) {
o.initialWait = w.initialWait
}
// WaitForState waits for the state machine to reach the given state.
// If the optional initialWait parameter is set, the function will wait for
// the given duration before checking the state. This is useful if the
// function is called immediately after sending an event to the state machine
// and the state machine needs some time to process the event.
func (s *CachedObserver) WaitForState(ctx context.Context,
timeout time.Duration, state StateType) error {
timeout time.Duration, state StateType,
opts ...InitialWaitOption) error {
var options fsmOptions
for _, opt := range opts {
opt.apply(&options)
}
// Wait for the initial wait duration if set.
if options.initialWait > 0 {
select {
case <-time.After(options.initialWait):
case <-ctx.Done():
return ctx.Err()
}
}
timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

Loading…
Cancel
Save