-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcacheserver.go
136 lines (115 loc) · 3.01 KB
/
cacheserver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package rpc
import (
"log"
"sync"
"github.com/n4mine/cacheserver/cache"
"github.com/n4mine/cacheserver/chunks"
"github.com/n4mine/cacheserver/models"
)
// Push stores every point of ps in the shared cache object.
//
// Points are pushed one at a time through cache.CacheObj.Push. A point that
// fails to push is logged and skipped, so the remaining points of the batch
// are still attempted (best effort).
//
// r.Code is always set to 0 and the returned error is always nil, even when
// some points were rejected; rejected points are only visible in the log.
func (cs *CacheServer) Push(ps []models.Point, r *SimpleRpcResponse) error {
	for _, p := range ps {
		if err := cache.CacheObj.Push(p.Key, p.Timestamp, p.Value); err != nil {
			log.Printf("push obj error, obj: %v, error: %v\n", p, err)
		}
	}

	r.Code = 0
	return nil
}
// Get answers a batch of range queries against the shared cache object.
//
// Every element of req is served concurrently in its own goroutine and yields
// exactly one *DataResp. The responses are appended to *resp in the same order
// as their requests. A failure of an individual request is reported through
// that response's Code and Msg fields; Get itself always returns nil.
func (cs *CacheServer) Get(req []DataReq, resp *[]*DataResp) error {
	if len(req) == 0 {
		return nil
	}

	// One slot per request: each goroutine writes only its own index, so no
	// locking is required and the request order is preserved.
	results := make([]*DataResp, len(req))

	// The query side limits how many series a single call may ask for, so
	// starting one goroutine per request does not get out of hand.
	var wg sync.WaitGroup
	wg.Add(len(req))
	for i := range req {
		go func(i int) {
			defer wg.Done()
			results[i] = lookupSeries(req[i])
		}(i)
	}
	wg.Wait()

	*resp = append(*resp, results...)
	return nil
}

// lookupSeries serves a single DataReq. It always returns a non-nil response
// that echoes the request's Key, From, To, Step and RRA; Data is only set when
// the lookup succeeds.
func lookupSeries(singleReq DataReq) *DataResp {
	singleResp := &DataResp{
		Key:  singleReq.Key,
		From: singleReq.From,
		To:   singleReq.To,
		Step: singleReq.Step,
		RRA:  singleReq.RRA,
	}

	// Reject malformed requests before touching the cache.
	if len(singleReq.Key) == 0 || singleReq.From == 0 || singleReq.To == 0 || singleReq.From >= singleReq.To {
		singleResp.Code = models.CodeUserErr
		singleResp.Msg = "请求参数有错误"
		return singleResp
	}

	data, err := cache.CacheObj.Get(singleReq.Key, singleReq.From, singleReq.To)
	if err != nil {
		// Translate the cache error into a response code.
		switch err {
		case models.ErrNonExistSeries:
			singleResp.Code = models.CodeNonExistSeries
		case models.ErrNonEnoughData:
			singleResp.Code = models.CodeNonEnoughErr
		case models.ErrInternalError:
			singleResp.Code = models.CodeInternalErr
		default:
			// An unrecognized error must not leave Code at its zero value,
			// which DataResp documents as "normal"; report it as an internal
			// error instead.
			singleResp.Code = models.CodeInternalErr
		}
		singleResp.Msg = err.Error()
		return singleResp
	}

	// Successful lookup.
	singleResp.Code = models.CodeSucc
	singleResp.Data = data
	return singleResp
}
// SimpleRpcResponse is the reply type of CacheServer.Push. It mirrors the
// definition used by transfer.
type SimpleRpcResponse struct {
// Code is the status of the call: 0 means normal, a value > 0 means an
// exception.
Code int `msg:"code"`
}
// DataReq is one element of the request batch passed to CacheServer.Get. It
// mirrors the definition used by query.
type DataReq struct {
// Key identifies the series to read. Get rejects a request whose Key is empty.
Key string `msg:"key"`
// From is the lower bound handed to cache.CacheObj.Get. Get rejects a request
// whose From is 0 or not less than To.
// NOTE(review): presumably a Unix timestamp — confirm the unit with the caller.
From int64 `msg:"from"`
// To is the upper bound handed to cache.CacheObj.Get. Get rejects a request
// whose To is 0.
To int64 `msg:"to"`
// Step is not interpreted in this file; Get copies it into the matching
// DataResp.
Step int `msg:"step"`
// RRA is not interpreted in this file; Get copies it into the matching
// DataResp.
RRA int `msg:"rra"`
}
// DataResp is the answer to a single DataReq produced by CacheServer.Get. It
// mirrors the definition used by query.
type DataResp struct {
// Code is the status of this lookup: 0 means normal, a value > 0 means an
// exception.
Code int `msg:"code"`
// Msg carries a description of the failure; Get leaves it empty on success.
Msg string `msg:"msg"`
// Key, From, To, Step and RRA are copied from the DataReq this response
// answers.
Key string `msg:"key"`
From int64 `msg:"from"`
To int64 `msg:"to"`
Step int `msg:"step"`
RRA int `msg:"rra"`
// Data holds the result of cache.CacheObj.Get; Get sets it to nil when the
// request is invalid or the lookup fails.
Data []chunks.Iter `msg:"data"`
}