Skip to content

Commit

Permalink
feat: add AZ to replica info (#721)
Browse files Browse the repository at this point in the history
* feat: add AZ to replica info

* refactor: alwyas send HELLO

* refactor: use hello 2 explicit

* perf: use only when replica exist

* feat: add enable az lookup

* refactor: add comments to az lookup

* refactor: do not use mutex

* refactor: rename option
  • Loading branch information
proost authored Jan 18, 2025
1 parent 88c57e7 commit f14b451
Show file tree
Hide file tree
Showing 8 changed files with 227 additions and 0 deletions.
8 changes: 8 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type mockConn struct {
DoMultiStreamFn func(cmd ...Completed) MultiRedisResultStream
InfoFn func() map[string]RedisMessage
VersionFn func() int
AZFn func() string
ErrorFn func() error
CloseFn func()
DialFn func() error
Expand Down Expand Up @@ -163,6 +164,13 @@ func (m *mockConn) Version() int {
return 0
}

func (m *mockConn) AZ() string {
if m.AZFn != nil {
return m.AZFn()
}
return ""
}

func (m *mockConn) Error() error {
if m.ErrorFn != nil {
return m.ErrorFn()
Expand Down
20 changes: 20 additions & 0 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,26 @@ func (c *clusterClient) _refresh() (err error) {
}
if len(g.nodes) > 1 {
n := len(g.nodes) - 1

if c.opt.EnableReplicaInfoAZ {
var wg sync.WaitGroup
for i := 0; i < n; i += 4 { // batch AZ() for every 4 connections
for j := i; j < i+4 && j < n; j++ {
replica := g.nodes[j+1]
rConn := conns[replica.Addr].conn

wg.Add(1)
go func(j int, rConn conn) {
defer wg.Done()

g.nodes[j+1].AZ = rConn.AZ()
}(j, rConn)
}

wg.Wait()
}
}

for _, slot := range g.slots {
for i := slot[0]; i <= slot[1] && i >= 0 && i < 16384; i++ {
pslots[i] = conns[master].conn
Expand Down
105 changes: 105 additions & 0 deletions cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1418,6 +1418,111 @@ func TestClusterClientInit(t *testing.T) {
t.Fatalf("unexpected node assigned to rslot 16383")
}
})

t.Run("Refresh cluster which has multi replicas with az", func(t *testing.T) {
primaryNodeConn := &mockConn{
DoFn: func(cmd Completed) RedisResult {
if strings.Join(cmd.Commands(), " ") == "CLUSTER SLOTS" {
return slotsMultiRespWithMultiReplicas
}
return RedisResult{
err: errors.New("unexpected call"),
}
},
AZFn: func() string {
return "us-west-1a"
},
}
replicaNodeConn1 := &mockConn{
DoFn: func(cmd Completed) RedisResult {
return RedisResult{
err: errors.New("unexpected call"),
}
},
AZFn: func() string {
return "us-west-1a"
},
}
replicaNodeConn2 := &mockConn{
DoFn: func(cmd Completed) RedisResult {
return RedisResult{
err: errors.New("unexpected call"),
}
},
AZFn: func() string {
return "us-west-1b"
},
}
replicaNodeConn3 := &mockConn{
DoFn: func(cmd Completed) RedisResult {
return RedisResult{
err: errors.New("unexpected call"),
}
},
AZFn: func() string {
return "us-west-1c"
},
}

client, err := newClusterClient(
&ClientOption{
InitAddress: []string{"127.0.0.1:0"},
SendToReplicas: func(cmd Completed) bool {
return true
},
ReplicaSelector: func(slot uint16, replicas []ReplicaInfo) int {
for i, replica := range replicas {
if replica.AZ == "us-west-1b" {
return i
}
}
return -1
},
EnableReplicaInfoAZ: true,
},
func(dst string, opt *ClientOption) conn {
switch {
case dst == "127.0.0.2:1" || dst == "127.0.1.2:1":
return replicaNodeConn1
case dst == "127.0.0.3:2" || dst == "127.0.1.3:2":
return replicaNodeConn2
case dst == "127.0.0.4:3" || dst == "127.0.1.4:3":
return replicaNodeConn3
default:
return primaryNodeConn
}
},
newRetryer(defaultRetryDelayFn),
)
if err != nil {
t.Fatalf("unexpected err %v", err)
}

if client.pslots[0] != primaryNodeConn {
t.Fatalf("unexpected node assigned to pslot 0")
}
if client.pslots[8192] != primaryNodeConn {
t.Fatalf("unexpected node assigned to pslot 8192")
}
if client.pslots[8193] != primaryNodeConn {
t.Fatalf("unexpected node assigned to pslot 8193")
}
if client.pslots[16383] != primaryNodeConn {
t.Fatalf("unexpected node assigned to pslot 16383")
}
if client.rslots[0] != replicaNodeConn2 {
t.Fatalf("unexpected node assigned to rslot 0")
}
if client.rslots[8192] != replicaNodeConn2 {
t.Fatalf("unexpected node assigned to rslot 8192")
}
if client.rslots[8193] != replicaNodeConn2 {
t.Fatalf("unexpected node assigned to rslot 8193")
}
if client.rslots[16383] != replicaNodeConn2 {
t.Fatalf("unexpected node assigned to rslot 16383")
}
})
}

//gocyclo:ignore
Expand Down
5 changes: 5 additions & 0 deletions mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type conn interface {
DoMultiStream(ctx context.Context, multi ...Completed) MultiRedisResultStream
Info() map[string]RedisMessage
Version() int
AZ() string
Error() error
Close()
Dial() error
Expand Down Expand Up @@ -190,6 +191,10 @@ func (m *mux) Version() int {
return m.pipe(0).Version()
}

func (m *mux) AZ() string {
return m.pipe(0).AZ()
}

func (m *mux) Error() error {
return m.pipe(0).Error()
}
Expand Down
23 changes: 23 additions & 0 deletions mux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,21 @@ func TestMuxDelegation(t *testing.T) {
}
})

t.Run("wire az", func(t *testing.T) {
m, checkClean := setupMux([]*mockWire{
{
AZFn: func() string {
return "az"
},
},
})
defer checkClean(t)
defer m.Close()
if az := m.AZ(); az != "az" {
t.Fatalf("unexpected az %v", az)
}
})

t.Run("wire err", func(t *testing.T) {
e := errors.New("err")
m, checkClean := setupMux([]*mockWire{
Expand Down Expand Up @@ -1042,6 +1057,7 @@ type mockWire struct {
DoStreamFn func(pool *pool, cmd Completed) RedisResultStream
DoMultiStreamFn func(pool *pool, cmd ...Completed) MultiRedisResultStream
InfoFn func() map[string]RedisMessage
AZFn func() string
VersionFn func() int
ErrorFn func() error
CloseFn func()
Expand Down Expand Up @@ -1133,6 +1149,13 @@ func (m *mockWire) Version() int {
return 0
}

func (m *mockWire) AZ() string {
if m.AZFn != nil {
return m.AZFn()
}
return ""
}

func (m *mockWire) Error() error {
if m == nil {
return ErrClosing
Expand Down
12 changes: 12 additions & 0 deletions pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type wire interface {
DoMultiStream(ctx context.Context, pool *pool, multi ...Completed) MultiRedisResultStream
Info() map[string]RedisMessage
Version() int
AZ() string
Error() error
Close()

Expand Down Expand Up @@ -257,6 +258,7 @@ func _newPipe(connFn func() (net.Conn, error), option *ClientOption, r2ps, nobg
return nil, ErrNoCache
}
init = init[:0]
init = append(init, []string{"HELLO", "2"})
if password != "" && username == "" {
init = append(init, []string{"AUTH", password})
} else if username != "" {
Expand Down Expand Up @@ -304,9 +306,15 @@ func _newPipe(connFn func() (net.Conn, error), option *ClientOption, r2ps, nobg
continue
}
if err = r.Error(); err != nil {
if re, ok := err.(*RedisError); ok && noHello.MatchString(re.string) {
continue
}
p.Close()
return nil, err
}
if i == 0 {
p.info, err = r.AsMap()
}
}
}
}
Expand Down Expand Up @@ -842,6 +850,10 @@ func (p *pipe) Version() int {
return int(p.version)
}

func (p *pipe) AZ() string {
return p.info["availability_zone"].string
}

func (p *pipe) Do(ctx context.Context, cmd Completed) (resp RedisResult) {
if err := ctx.Err(); err != nil {
return newErrResult(err)
Expand Down
49 changes: 49 additions & 0 deletions pipe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,8 @@ func TestNewPipe(t *testing.T) {
values: []RedisMessage{
{typ: '+', string: "proto"},
{typ: ':', integer: 3},
{typ: '+', string: "availability_zone"},
{typ: '+', string: "us-west-1a"},
},
})
mock.Expect("CLIENT", "TRACKING", "ON", "OPTIN").
Expand Down Expand Up @@ -238,6 +240,9 @@ func TestNewPipe(t *testing.T) {
t.Fatalf("pipe setup failed: %v", err)
}
go func() { mock.Expect("PING").ReplyString("OK") }()
if p.AZ() != "us-west-1a" {
t.Fatalf("unexpected az: %v", p.AZ())
}
p.Close()
mock.Close()
n1.Close()
Expand All @@ -247,6 +252,16 @@ func TestNewPipe(t *testing.T) {
n1, n2 := net.Pipe()
mock := &redisMock{buf: bufio.NewReader(n2), conn: n2, t: t}
go func() {
mock.Expect("HELLO", "2").
Reply(RedisMessage{
typ: '*',
values: []RedisMessage{
{typ: '+', string: "proto"},
{typ: ':', integer: 2},
{typ: '+', string: "availability_zone"},
{typ: '+', string: "us-west-1a"},
},
})
mock.Expect("AUTH", "pa").
ReplyString("OK")
mock.Expect("CLIENT", "SETNAME", "cn").
Expand Down Expand Up @@ -276,6 +291,9 @@ func TestNewPipe(t *testing.T) {
t.Fatalf("pipe setup failed: %v", err)
}
go func() { mock.Expect("PING").ReplyString("OK") }()
if p.AZ() != "us-west-1a" {
t.Fatalf("unexpected az: %v", p.AZ())
}
p.Close()
mock.Close()
n1.Close()
Expand Down Expand Up @@ -312,6 +330,9 @@ func TestNewPipe(t *testing.T) {
t.Fatalf("pipe setup failed: %v", err)
}
go func() { mock.Expect("PING").ReplyString("OK") }()
if p.AZ() != "" {
t.Fatalf("unexpected az: %v", p.AZ())
}
p.Close()
mock.Close()
n1.Close()
Expand Down Expand Up @@ -578,6 +599,21 @@ func TestNewRESP2Pipe(t *testing.T) {
{typ: '+', string: "redis"},
{typ: '+', string: "proto"},
{typ: ':', integer: 2},
{typ: '+', string: "availability_zone"},
{typ: '+', string: "us-west-1a"},
}})
mock.Expect("CLIENT", "SETINFO", "LIB-NAME", LibName).
ReplyError("UNKNOWN COMMAND")
mock.Expect("CLIENT", "SETINFO", "LIB-VER", LibVer).
ReplyError("UNKNOWN COMMAND")
mock.Expect("HELLO", "2").
Reply(RedisMessage{typ: '*', values: []RedisMessage{
{typ: '+', string: "server"},
{typ: '+', string: "redis"},
{typ: '+', string: "proto"},
{typ: ':', integer: 2},
{typ: '+', string: "availability_zone"},
{typ: '+', string: "us-west-1a"},
}})
mock.Expect("CLIENT", "SETINFO", "LIB-NAME", LibName).
ReplyError("UNKNOWN COMMAND")
Expand All @@ -593,6 +629,9 @@ func TestNewRESP2Pipe(t *testing.T) {
if p.version >= 6 {
t.Fatalf("unexpected p.version: %v", p.version)
}
if p.AZ() != "us-west-1a" {
t.Fatalf("unexpected az: %v", p.AZ())
}
go func() { mock.Expect("PING").ReplyString("OK") }()
p.Close()
mock.Close()
Expand All @@ -611,6 +650,8 @@ func TestNewRESP2Pipe(t *testing.T) {
ReplyError("UNKNOWN COMMAND")
mock.Expect("CLIENT", "SETINFO", "LIB-VER", LibVer).
ReplyError("UNKNOWN COMMAND")
mock.Expect("HELLO", "2").
ReplyError("ERR unknown command `HELLO`")
mock.Expect("AUTH", "pa").
ReplyString("OK")
mock.Expect("CLIENT", "SETNAME", "cn").
Expand Down Expand Up @@ -652,6 +693,8 @@ func TestNewRESP2Pipe(t *testing.T) {
ReplyError("UNKNOWN COMMAND")
mock.Expect("CLIENT", "SETINFO", "LIB-VER", LibVer).
ReplyError("UNKNOWN COMMAND")
mock.Expect("HELLO", "2").
ReplyError("ERR unknown command `HELLO`")
mock.Expect("AUTH", "ua", "pa").
ReplyString("OK")
mock.Expect("CLIENT", "SETNAME", "cn").
Expand Down Expand Up @@ -696,6 +739,8 @@ func TestNewRESP2Pipe(t *testing.T) {
ReplyError("UNKNOWN COMMAND")
mock.Expect("CLIENT", "SETINFO", "LIB-VER", LibVer).
ReplyError("UNKNOWN COMMAND")
mock.Expect("HELLO", "2").
ReplyError("ERR unknown command `HELLO`")
mock.Expect("AUTH", "pa").
ReplyString("OK")
mock.Expect("CLIENT", "SETNAME", "cn").
Expand Down Expand Up @@ -742,6 +787,8 @@ func TestNewRESP2Pipe(t *testing.T) {
ReplyError("UNKNOWN COMMAND")
mock.Expect("CLIENT", "SETINFO", "LIB-VER", LibVer).
ReplyError("UNKNOWN COMMAND")
mock.Expect("HELLO", "2").
ReplyError("ERR unknown command `HELLO`")
mock.Expect("AUTH", "pa").
ReplyString("OK")
mock.Expect("CLIENT", "SETNAME", "cn").
Expand Down Expand Up @@ -806,6 +853,8 @@ func TestNewRESP2Pipe(t *testing.T) {
go func() {
mock.Expect("HELLO", "3").
ReplyError("ERR unknown command `HELLO`")
mock.Expect("HELLO", "2").
ReplyError("ERR unknown command `HELLO`")
}()
p, err := newPipe(func() (net.Conn, error) { return n1, nil }, &ClientOption{
DisableCache: true,
Expand Down
Loading

0 comments on commit f14b451

Please sign in to comment.