-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpubzap_test.go
104 lines (92 loc) · 2.26 KB
/
pubzap_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package pubzap_test
import (
"context"
"fmt"
"log"
"strings"
"testing"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"gocloud.dev/pubsub"
_ "gocloud.dev/pubsub/mempubsub"
)
// exampleObj is a small fixture type that the test attaches to every log
// entry as a structured field (see its MarshalLogObject method).
type exampleObj struct {
	// A is emitted as an integer field.
	A int
	// B is emitted as a string field.
	B string
	// C is emitted as a byte-string field.
	C []byte
}
// MarshalLogObject writes the three fields of the receiver to enc under the
// keys "A", "B" and "C", in that order, so the value can be passed to
// zap.Object. It never fails and always returns nil.
func (obj exampleObj) MarshalLogObject(enc zapcore.ObjectEncoder) error {
	enc.AddInt("A", obj.A)
	enc.AddString("B", obj.B)
	// C is handed to the encoder as raw bytes via AddByteString.
	enc.AddByteString("C", obj.C)

	return nil
}
// TestPublishAndRecieveLogs checks that entries written through a zap logger
// whose sink is opened from "mem://topicA" show up as messages on a gocloud
// pubsub subscription to the same in-memory topic.
//
// It logs wantCount entries from concurrent goroutines, then receives from the
// subscription until wantCount messages have been acknowledged or Receive
// returns an error, and fails if the final count differs from wantCount.
//
// NOTE(review): the zap sink for the "mem" scheme is presumably registered
// elsewhere in this package; the registration is not visible in this file.
func TestPublishAndRecieveLogs(t *testing.T) {
	ctx := context.Background()

	topic, err := pubsub.OpenTopic(ctx, "mem://topicA")
	if err != nil {
		t.Fatal(err)
	}
	defer topic.Shutdown(ctx)

	sub, err := pubsub.OpenSubscription(ctx, "mem://topicA")
	if err != nil {
		t.Fatal(err)
	}
	defer sub.Shutdown(ctx)

	// The cleanup function returned by zap.Open is nil when err is non-nil,
	// so it must only be deferred after the error check; deferring it first
	// would panic while t.Fatal unwinds and hide the real failure. It is
	// named closeSink so that it does not shadow the builtin close.
	topicInfo, closeSink, err := zap.Open("mem://topicA?publishTimeout=10s")
	if err != nil {
		t.Fatal(err)
	}
	defer closeSink()

	encoderCfg := zapcore.EncoderConfig{
		MessageKey:     "msg",
		LevelKey:       "severity",
		NameKey:        "logger",
		LineEnding:     zapcore.DefaultLineEnding,
		EncodeLevel:    zapcore.LowercaseLevelEncoder,
		EncodeTime:     zapcore.RFC3339NanoTimeEncoder,
		EncodeDuration: zapcore.StringDurationEncoder,
	}
	pbencoder := zapcore.NewJSONEncoder(encoderCfg)
	core := zapcore.NewTee(
		zapcore.NewCore(pbencoder, topicInfo, zapcore.InfoLevel),
	)
	logger := zap.New(core)
	defer logger.Sync()

	// Emit wantCount entries concurrently, one goroutine per entry.
	wantCount := 10000
	for i := 1; i <= wantCount; i++ {
		go func(v int) {
			logger.Info(
				fmt.Sprintf("message %d", v),
				zap.Int("key1", v),
				zap.Int("key2", v),
				zap.Int("key3", v),
				zap.String("stringKey", strings.Repeat("value", 90)),
				zap.Object("exampleObj", exampleObj{B: strings.Repeat("B", 100), A: 9999999, C: []byte(strings.Repeat("C", 100))}),
			)
		}(i)
	}

	// NOTE(review): this context is only cancelled when the test returns, so
	// if fewer than wantCount messages ever arrive the loop below blocks
	// until the go test timeout fires.
	recvCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	count := 0
	// Loop on received messages.
	for {
		msg, err := sub.Receive(recvCtx)
		if err != nil {
			// Errors from Receive indicate that Receive will no longer succeed.
			log.Printf("Receiving message: %v", err)
			break
		}
		// Messages must always be acknowledged with Ack.
		msg.Ack()
		count++
		if count >= wantCount {
			break
		}
	}

	if count != wantCount {
		t.Fatalf("expect count to be %d, got: %d", wantCount, count)
	}
}