Skip to content

Commit

Permalink
Merge pull request #386 from lesismal/conn_send_list
Browse files Browse the repository at this point in the history
nbio.Conn: change writeBuffer to writeList, support async sendfile
  • Loading branch information
lesismal authored Jan 18, 2024
2 parents a496ba6 + 38f45db commit 1c97b6e
Show file tree
Hide file tree
Showing 17 changed files with 425 additions and 295 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/golangci-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ jobs:
uses: golangci/[email protected]
with:
# Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version.
version: v1.52.2
version: v1.55.2
# Optional: working directory, useful for monorepos
# working-directory: somedir

Expand Down
294 changes: 202 additions & 92 deletions conn_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,50 @@ import (
"github.com/lesismal/nbio/mempool"
)

var (
emptyToWrite = toWrite{}

poolToWrite = sync.Pool{
New: func() interface{} {
return &toWrite{}
},
}
)

func newToWriteBuf(buf []byte) *toWrite {
t := poolToWrite.New().(*toWrite)
b := mempool.Malloc(len(buf))
copy(b, buf)
t.buf = b
return t
}

func newToWriteFile(fd int, offset, remain int64) *toWrite {
t := poolToWrite.New().(*toWrite)
t.fd = fd
t.offset = offset
t.remain = remain
return t
}

func releaseToWrite(t *toWrite) {
if t.buf != nil {
mempool.Free(t.buf)
}
if t.fd > 0 {
syscall.Close(t.fd)
}
*t = emptyToWrite
poolToWrite.Put(t)
}

type toWrite struct {
fd int
buf []byte
offset int64
remain int64
}

// Conn implements net.Conn.
type Conn struct {
mux sync.Mutex
Expand All @@ -32,7 +76,8 @@ type Conn struct {
rTimer *time.Timer
wTimer *time.Timer

writeBuffer []byte
left int
writeList []*toWrite

typ ConnType
closed bool
Expand All @@ -46,8 +91,6 @@ type Conn struct {

session interface{}

chWaitWrite chan struct{}

execList []func()

DataHandler func(c *Conn, data []byte)
Expand Down Expand Up @@ -159,7 +202,7 @@ func (c *Conn) Write(b []byte) (int, error) {
return n, err
}

if len(c.writeBuffer) == 0 {
if len(c.writeList) == 0 {
if c.wTimer != nil {
c.wTimer.Stop()
c.wTimer = nil
Expand Down Expand Up @@ -197,7 +240,7 @@ func (c *Conn) Writev(in [][]byte) (int, error) {
c.closeWithErrorWithoutLock(err)
return n, err
}
if len(c.writeBuffer) == 0 {
if len(c.writeList) == 0 {
if c.wTimer != nil {
c.wTimer.Stop()
c.wTimer = nil
Expand Down Expand Up @@ -387,7 +430,7 @@ func (c *Conn) write(b []byte) (int, error) {
return -1, errOverflow
}

if len(c.writeBuffer) == 0 {
if len(c.writeList) == 0 {
n, err := c.doWrite(b)
if err != nil && !errors.Is(err, syscall.EINTR) && !errors.Is(err, syscall.EAGAIN) {
return n, err
Expand All @@ -397,113 +440,185 @@ func (c *Conn) write(b []byte) (int, error) {
}
left := len(b) - n
if left > 0 && c.typ == ConnTypeTCP {
c.writeBuffer = mempool.Malloc(left)
copy(c.writeBuffer, b[n:])
c.modWrite()
t := newToWriteBuf(b[n:])
c.appendWrite(t)
}
return len(b), nil
}
c.writeBuffer = mempool.Append(c.writeBuffer, b...)

t := newToWriteBuf(b)
c.appendWrite(t)

return len(b), nil
}

func (c *Conn) flush() error {
c.mux.Lock()
if c.closed {
c.mux.Unlock()
return net.ErrClosed
func (c *Conn) writev(in [][]byte) (int, error) {
size := 0
for _, v := range in {
size += len(v)
}

if len(c.writeBuffer) == 0 {
if c.chWaitWrite != nil {
select {
case c.chWaitWrite <- struct{}{}:
default:
}
if c.overflow(size) {
return -1, errOverflow
}
if len(c.writeList) > 0 {
for _, v := range in {
t := newToWriteBuf(v)
c.appendWrite(t)
}
c.mux.Unlock()
return nil
return size, nil
}

old := c.writeBuffer

n, err := c.doWrite(old)
n, err := writev(c.fd, in)
if err != nil && !errors.Is(err, syscall.EINTR) && !errors.Is(err, syscall.EAGAIN) {
c.closed = true
c.mux.Unlock()
c.closeWithErrorWithoutLock(err)
return err
}
if n < 0 {
n = 0
return n, err
}
left := len(old) - n
if left > 0 {
if n > 0 {
c.writeBuffer = mempool.Malloc(left)
copy(c.writeBuffer, old[n:])
mempool.Free(old)
}
// c.modWrite()
} else {
mempool.Free(old)
c.writeBuffer = nil
if c.wTimer != nil {
c.wTimer.Stop()
c.wTimer = nil
}
c.resetRead()
if c.chWaitWrite != nil {
select {
case c.chWaitWrite <- struct{}{}:
default:
nwrite := 0
if n > 0 {
nwrite += n
if n < size {
for i := 0; i < len(in) && n > 0; i++ {
b := in[i]
if n >= len(b) {
n -= len(b)
t := newToWriteBuf(b)
c.appendWrite(t)
} else {
t := newToWriteBuf(b[len(b)-n:])
c.appendWrite(t)
}
}
c.left += (size - n)
}
}

c.mux.Unlock()
return nil
return nwrite, nil
}

func (c *Conn) writev(in [][]byte) (int, error) {
size := 0
for _, v := range in {
size += len(v)
func (c *Conn) appendWrite(t *toWrite) {
c.writeList = append(c.writeList, t)
if t.buf != nil {
c.left += len(t.buf)
}
if c.overflow(size) {
return -1, errOverflow
}

func (c *Conn) flush() error {
c.mux.Lock()
defer c.mux.Unlock()
if c.closed {
return net.ErrClosed
}
if len(c.writeBuffer) > 0 {
for _, v := range in {
c.writeBuffer = mempool.Append(c.writeBuffer, v...)

if len(c.writeList) == 0 {
return nil
}

iovc := make([][]byte, 4)[0:0]
writeBuffers := func() error {
var (
n int
err error
head *toWrite
)

if len(c.writeList) == 1 {
head = c.writeList[0]
buf := head.buf[head.offset:]
for len(buf) > 0 && err == nil {
n, err = syscall.Write(c.fd, buf)
if n > 0 {
c.left -= n
head.offset += int64(n)
buf = buf[n:]
if len(buf) == 0 {
releaseToWrite(head)
c.writeList = nil
}
}
}
return err
}
return size, nil

iovc = iovc[0:0]
for i := 0; i < len(c.writeList); i++ {
head = c.writeList[i]
if head.buf != nil {
iovc = append(iovc, head.buf[head.offset:])
} else {
break
}
}

for len(iovc) > 0 && err == nil {
n, err = writev(c.fd, iovc)
if n > 0 {
c.left -= n
for n > 0 {
head = c.writeList[0]
headLeft := len(head.buf) - int(head.offset)
if n < headLeft {
head.offset += int64(n)
iovc[0] = iovc[0][n:]
break
} else {
releaseToWrite(head)
c.writeList = c.writeList[1:]
if len(c.writeList) == 0 {
c.writeList = nil
}
iovc = iovc[1:]
n -= headLeft
}
}
}
}
return err
}

if len(in) > 1 && size <= 65536 {
b := mempool.Malloc(size)
copied := 0
for _, v := range in {
copy(b[copied:], v)
copied += len(v)
writeFile := func() error {
v := c.writeList[0]
for v.remain > 0 {
var offset = v.offset
n, err := syscall.Sendfile(c.fd, v.fd, &offset, int(v.remain))
if n > 0 {
v.remain -= int64(n)
v.offset += int64(n)
if v.remain <= 0 {
releaseToWrite(c.writeList[0])
c.writeList = c.writeList[1:]
}
}
if err != nil {
return err
}
}
n, err := c.write(b)
mempool.Free(b)
return n, err
return nil
}

nwrite := 0
for _, b := range in {
n, err := c.write(b)
if n > 0 {
nwrite += n
for len(c.writeList) > 0 {
var err error
if c.writeList[0].fd == 0 {
err = writeBuffers()
} else {
err = writeFile()
}

if errors.Is(err, syscall.EINTR) {
continue
}
if errors.Is(err, syscall.EAGAIN) {
// c.modWrite()
return nil
}
if err != nil {
return nwrite, err
c.closed = true
c.closeWithErrorWithoutLock(err)
return err
}
}
return nwrite, nil

c.resetRead()

return nil
}

func (c *Conn) doWrite(b []byte) (int, error) {
Expand All @@ -524,7 +639,7 @@ func (c *Conn) doWrite(b []byte) (int, error) {

func (c *Conn) overflow(n int) bool {
g := c.p.g
return g.maxWriteBufferSize > 0 && (len(c.writeBuffer)+n > g.maxWriteBufferSize)
return g.maxWriteBufferSize > 0 && (c.left+n > g.maxWriteBufferSize)
}

func (c *Conn) closeWithError(err error) error {
Expand All @@ -551,16 +666,11 @@ func (c *Conn) closeWithError(err error) error {
func (c *Conn) closeWithErrorWithoutLock(err error) error {
c.closeErr = err

if c.writeBuffer != nil {
mempool.Free(c.writeBuffer)
c.writeBuffer = nil
}

if c.chWaitWrite != nil {
select {
case c.chWaitWrite <- struct{}{}:
default:
if c.writeList != nil {
for _, t := range c.writeList {
releaseToWrite(t)
}
c.writeList = nil
}

if c.p.g != nil {
Expand Down
Loading

0 comments on commit 1c97b6e

Please sign in to comment.