-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathbucket.go
86 lines (76 loc) · 2.12 KB
/
bucket.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package observe
import (
"log"
"sync"
"time"
)
// Flusher is the sink that an EventBucket drains its TraceEvents into.
type Flusher interface {
// Flush delivers one batch of events. EventBucket calls it once per batch
// when a scheduled flush runs; a non-nil error is logged by the bucket and
// the remaining batches are still attempted.
Flush(events []TraceEvent) error
}
// EventBucket accumulates outgoing TraceEvents until they are flushed.
//
// A flush is scheduled only when an event lands in an empty bucket, so an
// event waits roughly flushPeriod at most before its flush begins. When the
// flush runs, the accumulated events are handed to the Flusher in batches
// of up to batchSize.
type EventBucket struct {
	// Configuration, set once by NewEventBucket.
	flushPeriod time.Duration // delay between scheduling a flush and running it
	batchSize   int           // maximum number of events per Flush call

	// mu guards bucket.
	mu     sync.Mutex
	bucket []TraceEvent // events waiting for the next flush

	// wg counts flush goroutines so that Wait can block on them.
	wg sync.WaitGroup
}
// NewEventBucket returns an empty EventBucket that delays each flush by
// flushPeriod and hands events to the Flusher in batches of at most
// batchSize.
func NewEventBucket(batchSize int, flushPeriod time.Duration) *EventBucket {
	b := new(EventBucket)
	b.batchSize = batchSize
	b.flushPeriod = flushPeriod
	return b
}
// addEvent appends e to the bucket. If the bucket was empty beforehand, it
// also schedules a flush to f; otherwise a flush is already pending and e
// will go out with it.
func (b *EventBucket) addEvent(e TraceEvent, f Flusher) {
	b.mu.Lock()
	queued := len(b.bucket)
	b.bucket = append(b.bucket, e)
	b.mu.Unlock()

	// Only the event that fills an empty bucket triggers a flush.
	if queued != 0 {
		return
	}
	b.scheduleFlush(f)
}
// Wait blocks until the bucket's wait group counter reaches zero, that is,
// until every flush that has been registered with the wait group has
// finished. It does not prevent later events from scheduling new flushes.
func (b *EventBucket) Wait() {
b.wg.Wait()
}
// scheduleFlush starts a goroutine that sleeps for flushPeriod and then
// drains the bucket to f in batches of at most batchSize events. Events
// added while the goroutine sleeps are sent by the same flush.
//
// The flush is registered with the wait group before the goroutine starts,
// so a Wait call made after scheduleFlush returns always blocks until this
// flush has finished. Errors returned by f.Flush are logged and the
// remaining batches are still attempted.
func (b *EventBucket) scheduleFlush(f Flusher) {
	// Add must run before the goroutine is created. If it ran inside the
	// goroutine, a concurrent Wait could observe a zero counter and return
	// before the flush was ever registered.
	b.wg.Add(1)

	go func() {
		defer b.wg.Done()

		// Sleeping first is what turns this into a scheduled flush: events
		// keep accumulating in the bucket until flushPeriod has elapsed.
		time.Sleep(b.flushPeriod)

		// Take ownership of the pending events and leave the bucket empty,
		// so that the next addEvent schedules a fresh flush.
		b.mu.Lock()
		pending := b.bucket
		b.bucket = nil
		b.mu.Unlock()

		// A non-positive batchSize would keep the loop below from advancing
		// (0) or produce inverted slice bounds (negative). Fall back to
		// sending everything as a single batch.
		size := b.batchSize
		if size <= 0 {
			size = len(pending)
		}

		for start := 0; start < len(pending); start += size {
			end := start + size
			if end > len(pending) {
				end = len(pending)
			}
			// Cap the batch's capacity so that a Flusher appending to its
			// batch cannot overwrite the events of the next batch.
			// TODO retry logic?
			if err := f.Flush(pending[start:end:end]); err != nil {
				log.Println(err)
			}
		}
	}()
}