-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathproto.go
166 lines (153 loc) · 4.75 KB
/
proto.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
package quacfka
import (
"context"
"errors"
"fmt"
"log"
"sync"
"github.com/apache/arrow-go/v18/arrow/memory"
bufa "github.com/loicalleyne/bufarrow"
"github.com/panjf2000/ants/v2"
"google.golang.org/protobuf/proto"
)
// Sentinel errors reported while configuring the message processor.
// Compare against them with errors.Is.
var (
	// ErrWaitGroupIsNil reports a nil *sync.WaitGroup.
	// NOTE(review): not returned anywhere in this file — confirm where it is used.
	ErrWaitGroupIsNil = errors.New("*sync.waitgroup is nil")
	// ErrMessageChanCapacityZero reports a message channel capacity below 1.
	// NOTE(review): not returned anywhere in this file — confirm where it is used.
	ErrMessageChanCapacityZero = errors.New("chan []byte must have capacity > 0")
	// ErrRecordChanCapacityZero is returned by ConfigureProcessor when rChanCap < 1.
	ErrRecordChanCapacityZero = errors.New("chan []arrow.record must have capacity > 0")
	// ErrProcessingFuncIsNil is returned when the unmarshal function passed to
	// ConfigureProcessor (or configureProcessor) is nil.
	ErrProcessingFuncIsNil = errors.New("func([]byte, *bufa.Schema[T]) error is nil")
	// ErrProcessingRoutineCountInvalid is returned by ConfigureProcessor when
	// routineCount < 1.
	ErrProcessingRoutineCountInvalid = errors.New("deserializer routine count not > 0")
	// ErrRowGroupSizeMultiplier is returned by ConfigureProcessor when
	// rowGroupSizeMultiplier < 1.
	ErrRowGroupSizeMultiplier = errors.New("row group size multiplier not > 0")
)
// processorConf is the per-worker configuration handed to convertMessages.
// ProcessMessages creates one instance per worker, each with its own schema
// clone; ConfigureProcessor also stores one on the Orchestrator as a template
// holding the unmarshal function.
type processorConf[T proto.Message] struct {
	// parent is the Orchestrator that created this configuration. Workers read
	// messages from parent.mChan, send records to parent.rChan, and record
	// errors and metrics on it.
	parent *Orchestrator[T]
	// ctx is consulted by the worker after each message to detect cancellation.
	ctx context.Context
	// wg is signalled (Done) by the worker when it exits. It is nil on the
	// template stored by ConfigureProcessor.
	wg *sync.WaitGroup
	// s is the bufarrow schema this worker appends to and builds records from;
	// the worker releases it when it finishes.
	s *bufa.Schema[T]
	// unmarshalFunc is called with each raw message and s (passed as any).
	unmarshalFunc func([]byte, any) error
	// rChanCap is the record channel capacity given to ConfigureProcessor.
	// NOTE(review): not read anywhere in this file — confirm where it is consumed.
	rChanCap int
}
// configureProcessor builds a processorConf bound to this Orchestrator.
//
// ctx and wg are stored as given (wg may be nil). The schema field s and
// rChanCap are left at their zero values for the caller to fill in.
// It returns ErrProcessingFuncIsNil when unmarshalFunc is nil.
func (o *Orchestrator[T]) configureProcessor(ctx context.Context, wg *sync.WaitGroup, unmarshalFunc func([]byte, any) error) (*processorConf[T], error) {
	if unmarshalFunc == nil {
		return nil, ErrProcessingFuncIsNil
	}
	conf := &processorConf[T]{
		parent:        o,
		ctx:           ctx,
		wg:            wg,
		unmarshalFunc: unmarshalFunc,
	}
	return conf, nil
}
// ConfigureProcessor validates and stores the settings used by ProcessMessages.
//
// Parameters:
//   - rChanCap: record channel capacity; must be >= 1.
//   - rowGroupSizeMultiplier: multiplies the base batch size used by the
//     workers before a record is emitted; must be >= 1.
//   - routineCount: number of processing workers; must be >= 1.
//   - unmarshalFunc: called for every message with the raw bytes and the
//     worker's schema; must be non-nil.
//
// It returns ErrRecordChanCapacityZero, ErrProcessingRoutineCountInvalid,
// ErrRowGroupSizeMultiplier or ErrProcessingFuncIsNil for the first invalid
// argument, checked in that order. All arguments are validated before any
// field of the Orchestrator is touched, so a failed call leaves the existing
// configuration unchanged.
func (o *Orchestrator[T]) ConfigureProcessor(rChanCap, rowGroupSizeMultiplier, routineCount int, unmarshalFunc func([]byte, any) error) error {
	switch {
	case rChanCap < 1:
		return ErrRecordChanCapacityZero
	case routineCount < 1:
		return ErrProcessingRoutineCountInvalid
	case rowGroupSizeMultiplier < 1:
		return ErrRowGroupSizeMultiplier
	case unmarshalFunc == nil:
		return ErrProcessingFuncIsNil
	}

	// Build the template first: if it fails, nothing has been modified yet.
	d, err := o.configureProcessor(context.Background(), nil, unmarshalFunc)
	if err != nil {
		return err
	}
	d.rChanCap = rChanCap

	// Commit.
	o.msgProcessorsCount.Store(int32(routineCount))
	o.rowGroupSizeMultiplier = rowGroupSizeMultiplier
	o.mungeFunc = unmarshalFunc
	o.processorConf = d
	return nil
}
// ProcessMessages starts a pool of MsgProcessorsCount() worker goroutines
// running convertMessages, each with its own clone of the bufarrow schema, and
// blocks until every started worker has returned.
//
// wg must be non-nil; wg.Done is called and o.rChan is closed on every exit
// path, and o.rChanClosed is set to true just before that. The function has no
// return value: failures (missing configuration, pool creation, schema clone,
// task submission) are stored in o.err.
//
// When launching a worker fails, no further workers are started, but the
// function still waits for the ones already running, so o.rChan is never
// closed while a worker may still send on it.
func (o *Orchestrator[T]) ProcessMessages(ctx context.Context, wg *sync.WaitGroup) {
	defer close(o.rChan)
	defer wg.Done()
	// Deferred last so it runs first: the flag is raised before the channel is
	// closed, on every exit path.
	defer func() { o.rChanClosed = true }()

	if o.processorConf == nil {
		o.err = fmt.Errorf("quacfka: processor not configured: %w", ErrProcessingFuncIsNil)
		return
	}

	workers := o.MsgProcessorsCount()
	cpool, err := ants.NewPoolWithFunc(workers, convertMessages[T], ants.WithPreAlloc(true))
	if err != nil {
		o.err = fmt.Errorf("quacfka: processing pool: %w", err)
		return
	}
	defer cpool.Release()

	var cwg sync.WaitGroup
	// Count launches directly rather than polling the pool's running count, so
	// the number of workers started does not depend on how quickly earlier
	// workers finish.
	for i := 0; i < workers && ctx.Err() == nil; i++ {
		sc, err := o.bufArrowSchema.Clone(memory.DefaultAllocator)
		if err != nil {
			o.err = fmt.Errorf("quacfka: bufarrow schema clone: %w", err)
			break
		}
		c, err := o.configureProcessor(ctx, &cwg, o.processorConf.unmarshalFunc)
		if err != nil {
			sc.Release()
			o.err = fmt.Errorf("quacfka: configure processor: %w", err)
			break
		}
		c.s = sc

		cwg.Add(1)
		if err := cpool.Invoke(c); err != nil {
			// The task never ran, so nobody else will call Done or release
			// the schema clone.
			cwg.Done()
			sc.Release()
			o.err = fmt.Errorf("quacfka: processing pool invoke: %w", err)
			break
		}
		if debugLog != nil {
			debugLog("quacfka: processing pool size %d\n", cpool.Running())
		}
	}

	cwg.Wait()
	if debugLog != nil {
		debugLog("quacfka: processing closing rChan: %v recs\n", o.Metrics.recordsProcessed.Load())
	}
}
// convertMessages is the worker function run by the processing pool. c must be
// a *processorConf[T]; any other value is logged and ignored.
//
// The worker reads messages from parent.mChan and passes each one, together
// with its schema, to unmarshalFunc. After every
// 122880*rowGroupSizeMultiplier successfully unmarshalled messages it sends a
// Record built from the schema to parent.rChan. It stops when mChan is closed
// or, after finishing the current message, when ctx is cancelled; any partial
// batch is flushed as a final Record.
//
// Unmarshal errors are logged, stored in parent.err, and the message is
// skipped. Panics are recovered and stored in parent.err. On every exit path
// the schema is released and wg.Done is called — the latter only after a
// recovered error has been recorded.
func convertMessages[T proto.Message](c any) {
	conf, ok := c.(*processorConf[T])
	if !ok || conf == nil {
		if errorLog != nil {
			errorLog("quacfka: processing - unexpected worker argument %T\n", c)
		}
		return
	}

	// Registered first, so it runs last: recover, record the error, then
	// signal completion.
	defer func() {
		if e := recover(); e != nil {
			var err error
			switch x := e.(type) {
			case error:
				err = x
			case string:
				err = errors.New(x)
			default:
				// Previously dropped silently; keep the value visible.
				err = fmt.Errorf("%v", x)
			}
			conf.parent.err = fmt.Errorf("recover convertmessages: %w", err)
			if errorLog != nil {
				errorLog("recover convertmessages: %v\n", err)
			}
		}
		if conf.wg != nil {
			conf.wg.Done()
		}
	}()
	// Release the schema on the panic path as well as on normal return.
	defer conf.s.Release()

	// Base number of messages per emitted record.
	// NOTE(review): 122880 presumably matches the downstream row group size — confirm.
	const rowGroupSize = 122880

	// emit sends the rows accumulated in the schema as one Record. Both flush
	// sites log the running rChanRecs counter.
	emit := func() {
		conf.parent.rChan <- Record{
			Raw:  conf.s.NewRecord(),
			Norm: conf.s.NewNormalizerRecord(),
		}
		conf.parent.rChanRecs.Add(1)
		if debugLog != nil {
			debugLog("quacfka: new arrow record - %d\n", conf.parent.rChanRecs.Load())
		}
	}

	batch := 0
	for m := range conf.parent.mChan {
		if err := conf.unmarshalFunc(m, conf.s); err != nil {
			log.Println("unmarshal ", err)
			conf.parent.err = fmt.Errorf("unmarshal error: %w", err)
			if errorLog != nil {
				errorLog("quacfka: processing - unmarshal error: %v\n", err)
			}
		} else {
			batch++
			conf.parent.Metrics.recordsProcessed.Add(1)
			if batch == rowGroupSize*conf.parent.rowGroupSizeMultiplier {
				emit()
				batch = 0
			}
		}
		// A `break` inside `select` only leaves the select, so test the
		// context directly to actually leave the loop on cancellation.
		if conf.ctx.Err() != nil {
			break
		}
	}
	if batch != 0 {
		emit()
	}
}