-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy pathencoder.go
163 lines (140 loc) · 3.34 KB
/
encoder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
package astiencoder
import (
"sync"
"fmt"
"github.com/asticode/go-astitools/worker"
"github.com/pkg/errors"
)
// Errors
var (
ErrWorkflowNotFound = errors.New("workflow.not.found")
)
// Encoder represents an encoder
type Encoder struct {
b WorkflowBuilder
cfg *Configuration
e *exposer
ee *eventEmitter
m *sync.Mutex
w *astiworker.Worker
ws map[string]*Workflow
wsStarted map[string]bool
}
// WorkflowBuilder represents an object that can build a workflow based on a job
type WorkflowBuilder interface {
BuildWorkflow(j Job, w *Workflow) error
}
// NewEncoder creates a new encoder
func NewEncoder(cfg *Configuration) (e *Encoder) {
aw := astiworker.NewWorker()
ee := newEventEmitter()
e = &Encoder{
cfg: cfg,
ee: ee,
m: &sync.Mutex{},
w: aw,
ws: make(map[string]*Workflow),
wsStarted: make(map[string]bool),
}
e.e = newExposer(e)
ee.addHandler(e.handleEvent)
return
}
// Close implements the io.Closer interface
func (e *Encoder) Close() error {
return nil
}
// Stop stops the encoder
func (e *Encoder) Stop() {
e.w.Stop()
}
// HandleSignals handles signals
func (e *Encoder) HandleSignals() {
e.w.HandleSignals()
}
// Wait is a blocking pattern
func (e *Encoder) Wait() {
e.w.Wait()
}
// AddEventHandler adds an event handler
func (e *Encoder) AddEventHandler(h EventHandler) {
e.ee.addHandler(h)
}
// SetWorkflowBuilder sets the workflow builder
func (e *Encoder) SetWorkflowBuilder(b WorkflowBuilder) {
e.b = b
}
// Serve creates and starts the server
func (e *Encoder) Serve() (err error) {
var s *server
if s, err = newServer(e.cfg.Server, e.e); err != nil {
err = errors.Wrap(err, "astiencoder: creating new server failed")
return
}
e.AddEventHandler(s.handleEvent)
e.w.Serve(e.cfg.Server.Addr, s.handler())
return
}
// NewWorkflow creates a new workflow based on a job
func (e *Encoder) NewWorkflow(name string, j Job) (w *Workflow, err error) {
// No workflow builder
if e.b == nil {
err = errors.New("astiencoder: no workflow builder")
return
}
// Name is already used
e.m.Lock()
_, ok := e.ws[name]
e.m.Unlock()
if ok {
err = fmt.Errorf("astiencoder: workflow name %s is already used", name)
return
}
// Create closer
c := newCloser()
// Create workflow
w = newWorkflow(name, j, e.w.Context(), e.ee.emit, e.w.NewTask, c)
// Build workflow
if err = e.b.BuildWorkflow(j, w); err != nil {
err = errors.Wrapf(err, "astiencoder: building workflow for job %+v failed", j)
return
}
// Index nodes in workflow
w.indexNodes()
// Store workflow
e.m.Lock()
e.ws[name] = w
e.m.Unlock()
return
}
// Workflow returns a specific workflow
func (e *Encoder) Workflow(name string) (w *Workflow, err error) {
e.m.Lock()
defer e.m.Unlock()
var ok bool
if w, ok = e.ws[name]; !ok {
err = ErrWorkflowNotFound
return
}
return
}
func (e *Encoder) handleEvent() (bool, func(Event)) {
return false, func(evt Event) {
switch evt.Name {
case EventNameWorkflowStarted:
e.m.Lock()
defer e.m.Unlock()
if _, ok := e.ws[evt.Payload.(string)]; !ok {
return
}
e.wsStarted[evt.Payload.(string)] = true
case EventNameWorkflowStopped:
e.m.Lock()
defer e.m.Unlock()
delete(e.wsStarted, evt.Payload.(string))
if e.cfg.Exec.StopWhenWorkflowsAreStopped && len(e.wsStarted) == 0 {
e.Stop()
}
}
}
}