Skip to content

Commit

Permalink
Merge pull request #462 from redis/perf-skip-flush-deploy-block-check
Browse files Browse the repository at this point in the history
perf: skip block check if no deplay after flush
  • Loading branch information
rueian authored Feb 8, 2024
2 parents dd84963 + 9adddbc commit cc73e53
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 10 deletions.
22 changes: 12 additions & 10 deletions pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,15 +410,17 @@ func (p *pipe) _backgroundWrite() (err error) {
runtime.Gosched()
continue
}
// Blocking commands are executed in dedicated client which is aquired from pool.
// So, there is no sense to wait other commands to be written.
// https://github.com/redis/rueidis/issues/379
blocked := ones[0].IsBlock()
for i := 0; i < len(multi) && !blocked; i++ {
blocked = blocked || multi[i].IsBlock()
}
if flushDelay != 0 && !blocked && atomic.LoadInt32(&p.waits) > 1 { // do not delay for sequential usage
time.Sleep(flushDelay - time.Since(flushStart)) // ref: https://github.com/redis/rueidis/issues/156
if flushDelay != 0 && atomic.LoadInt32(&p.waits) > 1 { // do not delay for sequential usage
// Blocking commands are executed in dedicated client which is acquired from pool.
// So, there is no sense to wait other commands to be written.
// https://github.com/redis/rueidis/issues/379
var blocked bool
for i := 0; i < len(multi) && !blocked; i++ {
blocked = blocked || multi[i].IsBlock()
}
if !blocked {
time.Sleep(flushDelay - time.Since(flushStart)) // ref: https://github.com/redis/rueidis/issues/156
}
}
}
}
Expand All @@ -429,7 +431,7 @@ func (p *pipe) _backgroundWrite() (err error) {
err = writeCmd(p.w, cmd.Commands())
}
if err != nil {
if err != ErrClosing { // ignore ErrClosing to allow final PING command to be sent
if err != ErrClosing { // ignore ErrClosing to allow the final PING command to be sent
return
}
runtime.Gosched()
Expand Down
25 changes: 25 additions & 0 deletions pipe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -815,6 +815,31 @@ func TestWriteWithMaxFlushDelay(t *testing.T) {
}
}

func TestBlockWriteWithNoMaxFlushDelay(t *testing.T) {
defer ShouldNotLeaked(SetupLeakDetection())
p, mock, cancel, _ := setup(t, ClientOption{
AlwaysPipelining: true,
MaxFlushDelay: 20 * time.Microsecond,
})
defer cancel()
times := 2000
wg := sync.WaitGroup{}
wg.Add(times)

for i := 0; i < times; i++ {
go func() {
for _, resp := range p.DoMulti(context.Background(),
cmds.NewBlockingCompleted([]string{"PING"}),
cmds.NewBlockingCompleted([]string{"PING"})).s {
ExpectOK(t, resp)
}
}()
}
for i := 0; i < times; i++ {
mock.Expect("PING").ReplyString("OK").Expect("PING").ReplyString("OK")
}
}

func TestWriteMultiFlush(t *testing.T) {
defer ShouldNotLeaked(SetupLeakDetection())
p, mock, cancel, _ := setup(t, ClientOption{})
Expand Down

0 comments on commit cc73e53

Please sign in to comment.