From 688b00bed3f7b541bb3fd01c8e8eb46ee44aad60 Mon Sep 17 00:00:00 2001 From: lesismal Date: Thu, 18 Jan 2024 16:10:24 +0800 Subject: [PATCH 1/6] nbio.Conn: change writeBuffer to writeList, support async sendfile --- conn_unix.go | 292 +++++++++++++++++++++++++++++++-------------- go.mod | 3 +- go.sum | 17 ++- nbhttp/response.go | 43 ++++--- nbio_test.go | 3 +- poller_epoll.go | 2 +- sendfile_unix.go | 109 +++++++++++++++++ writev_bsd.go | 28 +++++ writev_linux.go | 14 +++ 9 files changed, 397 insertions(+), 114 deletions(-) create mode 100644 sendfile_unix.go create mode 100644 writev_bsd.go create mode 100644 writev_linux.go diff --git a/conn_unix.go b/conn_unix.go index f164f618..dc2dd815 100644 --- a/conn_unix.go +++ b/conn_unix.go @@ -19,6 +19,50 @@ import ( "github.com/lesismal/nbio/mempool" ) +var ( + emptyToWrite = toWrite{} + + poolToWrite = sync.Pool{ + New: func() interface{} { + return &toWrite{} + }, + } +) + +func newToWriteBuf(buf []byte) *toWrite { + t := poolToWrite.New().(*toWrite) + b := mempool.Malloc(len(buf)) + copy(b, buf) + t.buf = b + return t +} + +func newToWriteFile(fd int, offset, remain int64) *toWrite { + t := poolToWrite.New().(*toWrite) + t.fd = fd + t.offset = offset + t.remain = remain + return t +} + +func releaseToWrite(t *toWrite) { + if t.buf != nil { + mempool.Free(t.buf) + } + if t.fd > 0 { + syscall.Close(t.fd) + } + *t = emptyToWrite + poolToWrite.Put(t) +} + +type toWrite struct { + fd int + buf []byte + offset int64 + remain int64 +} + // Conn implements net.Conn. type Conn struct { mux sync.Mutex @@ -32,7 +76,8 @@ type Conn struct { rTimer *time.Timer wTimer *time.Timer - writeBuffer []byte + left int + writeList []*toWrite typ ConnType closed bool @@ -46,8 +91,6 @@ type Conn struct { session interface{} - chWaitWrite chan struct{} - execList []func() DataHandler func(c *Conn, data []byte) @@ -159,7 +202,7 @@ func (c *Conn) Write(b []byte) (int, error) { return n, err } - if len(c.writeBuffer) == 0 { + if len(c.writeList) == 0 { if c.wTimer != nil { c.wTimer.Stop() c.wTimer = nil @@ -197,7 +240,7 @@ func (c *Conn) Writev(in [][]byte) (int, error) { c.closeWithErrorWithoutLock(err) return n, err } - if len(c.writeBuffer) == 0 { + if len(c.writeList) == 0 { if c.wTimer != nil { c.wTimer.Stop() c.wTimer = nil @@ -387,7 +430,7 @@ func (c *Conn) write(b []byte) (int, error) { return -1, errOverflow } - if len(c.writeBuffer) == 0 { + if len(c.writeList) == 0 { n, err := c.doWrite(b) if err != nil && !errors.Is(err, syscall.EINTR) && !errors.Is(err, syscall.EAGAIN) { return n, err @@ -397,113 +440,183 @@ func (c *Conn) write(b []byte) (int, error) { } left := len(b) - n if left > 0 && c.typ == ConnTypeTCP { - c.writeBuffer = mempool.Malloc(left) - copy(c.writeBuffer, b[n:]) - c.modWrite() + t := newToWriteBuf(b[n:]) + c.appendWrite(t) } return len(b), nil } - c.writeBuffer = mempool.Append(c.writeBuffer, b...) + + t := newToWriteBuf(b) + c.appendWrite(t) return len(b), nil } -func (c *Conn) flush() error { - c.mux.Lock() - if c.closed { - c.mux.Unlock() - return net.ErrClosed +func (c *Conn) writev(in [][]byte) (int, error) { + size := 0 + for _, v := range in { + size += len(v) } - - if len(c.writeBuffer) == 0 { - if c.chWaitWrite != nil { - select { - case c.chWaitWrite <- struct{}{}: - default: - } + if c.overflow(size) { + return -1, errOverflow + } + if len(c.writeList) > 0 { + for _, v := range in { + t := newToWriteBuf(v) + c.appendWrite(t) } - c.mux.Unlock() - return nil + return size, nil } - old := c.writeBuffer - - n, err := c.doWrite(old) + n, err := writev(c.fd, in) if err != nil && !errors.Is(err, syscall.EINTR) && !errors.Is(err, syscall.EAGAIN) { - c.closed = true - c.mux.Unlock() - c.closeWithErrorWithoutLock(err) - return err - } - if n < 0 { - n = 0 + return n, err } - left := len(old) - n - if left > 0 { - if n > 0 { - c.writeBuffer = mempool.Malloc(left) - copy(c.writeBuffer, old[n:]) - mempool.Free(old) - } - // c.modWrite() - } else { - mempool.Free(old) - c.writeBuffer = nil - if c.wTimer != nil { - c.wTimer.Stop() - c.wTimer = nil - } - c.resetRead() - if c.chWaitWrite != nil { - select { - case c.chWaitWrite <- struct{}{}: - default: + nwrite := 0 + if n > 0 { + nwrite += n + if n < size { + for i := 0; i < len(in) && n > 0; i++ { + b := in[i] + if n >= len(b) { + n -= len(b) + t := newToWriteBuf(b) + c.appendWrite(t) + } else { + t := newToWriteBuf(b[len(b)-n:]) + c.appendWrite(t) + } } + c.left += (size - n) } } - c.mux.Unlock() - return nil + return nwrite, nil } -func (c *Conn) writev(in [][]byte) (int, error) { - size := 0 - for _, v := range in { - size += len(v) +func (c *Conn) appendWrite(t *toWrite) { + c.writeList = append(c.writeList, t) + if t.buf != nil { + c.left += len(t.buf) } - if c.overflow(size) { - return -1, errOverflow +} + +func (c *Conn) flush() error { + c.mux.Lock() + defer c.mux.Unlock() + if c.closed { + return net.ErrClosed } - if len(c.writeBuffer) > 0 { - for _, v := range in { - c.writeBuffer = mempool.Append(c.writeBuffer, v...) + + if len(c.writeList) == 0 { + return nil + } + + iovc := make([][]byte, 4)[0:0] + writeBuffers := func() error { + var ( + n int + err error + head *toWrite + ) + + if len(c.writeList) == 1 { + head = c.writeList[0] + buf := head.buf[head.offset:] + for len(buf) > 0 && err == nil { + n, err = syscall.Write(c.fd, buf) + if n > 0 { + c.left -= n + head.offset += int64(n) + buf = buf[n:] + if len(buf) == 0 { + releaseToWrite(head) + c.writeList = nil + } + } + } + return err } - return size, nil + + iovc = iovc[0:0] + for i := 0; i < len(c.writeList); i++ { + head = c.writeList[i] + if head.buf != nil { + iovc = append(iovc, head.buf[head.offset:]) + } else { + break + } + } + + for len(iovc) > 0 && err == nil { + n, err = writev(c.fd, iovc) + if n > 0 { + c.left -= n + for n > 0 { + head = c.writeList[0] + headLeft := len(head.buf) - int(head.offset) + if n < headLeft { + head.offset += int64(n) + iovc[0] = iovc[0][n:] + break + } else { + releaseToWrite(head) + c.writeList = c.writeList[1:] + if len(c.writeList) == 0 { + c.writeList = nil + } + iovc = iovc[1:] + n -= headLeft + } + } + } + } + return err } - if len(in) > 1 && size <= 65536 { - b := mempool.Malloc(size) - copied := 0 - for _, v := range in { - copy(b[copied:], v) - copied += len(v) + writeFile := func() error { + v := c.writeList[0] + for v.remain > 0 { + var offset = v.offset + n, err := syscall.Sendfile(c.fd, v.fd, &offset, int(v.remain)) + if n > 0 { + v.remain -= int64(n) + v.offset += int64(n) + if v.remain <= 0 { + releaseToWrite(c.writeList[0]) + c.writeList = c.writeList[1:] + } + } + if err != nil { + return err + } } - n, err := c.write(b) - mempool.Free(b) - return n, err + return nil } - nwrite := 0 - for _, b := range in { - n, err := c.write(b) - if n > 0 { - nwrite += n + for len(c.writeList) > 0 { + var err error + if c.writeList[0].fd == 0 { + err = writeBuffers() + } else { + err = writeFile() + } + + if errors.Is(err, syscall.EINTR) { + continue + } + if errors.Is(err, syscall.EAGAIN) { + c.modWrite() + return nil } if err != nil { - return nwrite, err + c.closed = true + c.closeWithErrorWithoutLock(err) + return err } } - return nwrite, nil + + return nil } func (c *Conn) doWrite(b []byte) (int, error) { @@ -524,7 +637,7 @@ func (c *Conn) doWrite(b []byte) (int, error) { func (c *Conn) overflow(n int) bool { g := c.p.g - return g.maxWriteBufferSize > 0 && (len(c.writeBuffer)+n > g.maxWriteBufferSize) + return g.maxWriteBufferSize > 0 && (c.left+n > g.maxWriteBufferSize) } func (c *Conn) closeWithError(err error) error { @@ -551,16 +664,11 @@ func (c *Conn) closeWithError(err error) error { func (c *Conn) closeWithErrorWithoutLock(err error) error { c.closeErr = err - if c.writeBuffer != nil { - mempool.Free(c.writeBuffer) - c.writeBuffer = nil - } - - if c.chWaitWrite != nil { - select { - case c.chWaitWrite <- struct{}{}: - default: + if c.writeList != nil { + for _, t := range c.writeList { + releaseToWrite(t) } + c.writeList = nil } if c.p.g != nil { diff --git a/go.mod b/go.mod index 708d05dc..dd31169f 100644 --- a/go.mod +++ b/go.mod @@ -4,5 +4,6 @@ go 1.16 require ( github.com/lesismal/llib v1.1.12 - golang.org/x/crypto v0.6.0 // indirect + golang.org/x/crypto v0.17.0 // indirect + golang.org/x/sys v0.16.0 ) diff --git a/go.sum b/go.sum index 8cb26f12..a43de983 100644 --- a/go.sum +++ b/go.sum @@ -4,33 +4,44 @@ github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5t golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20210513122933-cd7d49e622d5/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.6.0 h1:qfktjS5LUO+fFKeJXZ+ikTRijMmljikvG68fpMMruSc= -golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= +golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k= +golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210510120150-4163338589ed/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= +golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= +golang.org/x/term v0.15.0/go.mod h1:BDl952bC7+uMoWR75FIrCDx79TPU9oHkTZ9yRbYOrX0= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/nbhttp/response.go b/nbhttp/response.go index 8fa32131..fd779825 100644 --- a/nbhttp/response.go +++ b/nbhttp/response.go @@ -132,15 +132,12 @@ func (res *Response) Write(data []byte) (int, error) { res.buffer = buf return l, nil } - nw, err := conn.Write(buf) + _, err = conn.Write(buf) mempool.Free(buf) - if err == nil { - return l, nil - } - if nw >= 4 { - return nw - 4, err + if err != nil { + return 0, err } - return -1, err + return l, nil } if len(res.header[contentLengthHeader]) > 0 { @@ -148,13 +145,16 @@ func (res *Response) Write(data []byte) (int, error) { buf := res.buffer res.buffer = nil - if buf == nil { - return conn.Write(buf) - } + // if buf == nil { + // return conn.Write(buf) + // } buf = mempool.Append(buf, data...) - nw, err := conn.Write(buf) + _, err := conn.Write(buf) mempool.Free(buf) - return nw, err + if err != nil { + return 0, err + } + return l, err } if res.bodyBuffer == nil { res.bodyBuffer = mempool.Malloc(l)[0:0] @@ -191,9 +191,22 @@ func (res *Response) ReadFrom(r io.Reader) (n int64, err error) { f, ok := r.(*os.File) if ok { - nc, ok := c.(interface { + rc := c + if hc, ok := c.(*Conn); ok { + rc = hc.Conn + } + nc, ok := rc.(interface { Sendfile(f *os.File, remain int64) (int64, error) }) + if !ok { + hc, ok2 := c.(*Conn) + if ok2 { + nc, ok = hc.Conn.(interface { + Sendfile(f *os.File, remain int64) (int64, error) + }) + } + + } if ok { ns, err := nc.Sendfile(f, lr.N) return ns, err @@ -257,9 +270,7 @@ func (res *Response) eoncodeHead() { const contentType = "Content-Type: text/plain; charset=utf-8\r\n" data = mempool.AppendString(data, contentType) } - - const contentLenthKey = "Content-Length" - if !res.chunked && len(res.header[contentLenthKey]) == 0 { + if !res.chunked && len(res.header[contentLengthHeader]) == 0 { const contentLenthPrefix = "Content-Length: " if !res.hasBody { data = mempool.AppendString(data, contentLenthPrefix) diff --git a/nbio_test.go b/nbio_test.go index a2fe9de0..d3117fc8 100644 --- a/nbio_test.go +++ b/nbio_test.go @@ -140,7 +140,8 @@ func TestSendfile(t *testing.T) { log.Panicf("write 'sendfile' failed: %v", err) } - if _, err := io.ReadFull(conn, buf); err != nil { + _, err := io.ReadFull(conn, buf) + if err != nil { log.Panicf("read file failed: %v", err) } } diff --git a/poller_epoll.go b/poller_epoll.go index a6f4e94b..a42e5e0e 100644 --- a/poller_epoll.go +++ b/poller_epoll.go @@ -349,7 +349,7 @@ func (c *Conn) ResetPollerEvent() { g := p.g fd := c.fd if !c.closed && g.epollMod == EPOLLET && g.epollOneshot == EPOLLONESHOT { - if len(c.writeBuffer) == 0 { + if len(c.writeList) == 0 { p.resetRead(fd) } else { p.modWrite(fd) diff --git a/sendfile_unix.go b/sendfile_unix.go new file mode 100644 index 00000000..77863ff1 --- /dev/null +++ b/sendfile_unix.go @@ -0,0 +1,109 @@ +// Copyright 2020 lesismal. All rights reserved. +// Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file. + +//go:build linux || darwin || netbsd || freebsd || openbsd || dragonfly +// +build linux darwin netbsd freebsd openbsd dragonfly + +package nbio + +import ( + "errors" + "io" + "net" + "os" + "syscall" +) + +const maxSendfileSize = 4 << 20 + +// Sendfile . +func (c *Conn) Sendfile(f *os.File, remain int64) (int64, error) { + if f == nil { + return 0, nil + } + + c.mux.Lock() + defer c.mux.Unlock() + if c.closed { + return 0, net.ErrClosed + } + + offset, err := f.Seek(0, io.SeekCurrent) + if err != nil { + return 0, err + } + stat, err := f.Stat() + if err != nil { + c.closeWithErrorWithoutLock(err) + return 0, err + } + size := stat.Size() + if (remain <= 0) || (remain > size-offset) { + remain = size - offset + } + + if len(c.writeList) > 0 { + src, err := syscall.Dup(int(f.Fd())) + if err != nil { + c.closeWithErrorWithoutLock(err) + return 0, err + } + t := newToWriteFile(src, offset, remain) + c.appendWrite(t) + return remain, nil + } + + c.p.g.beforeWrite(c) + + var ( + n int + src = int(f.Fd()) + dst = c.fd + total = remain + ) + + err = syscall.SetNonblock(src, true) + if err != nil { + c.closeWithErrorWithoutLock(err) + return 0, err + } + + for remain > 0 { + n = maxSendfileSize + if int64(n) > remain { + n = int(remain) + } + var tmpOffset = offset + n, err = syscall.Sendfile(dst, src, &tmpOffset, n) + if n > 0 { + remain -= int64(n) + offset += int64(n) + } else if n == 0 && err == nil { + break + } + if errors.Is(err, syscall.EINTR) { + continue + } + if errors.Is(err, syscall.EAGAIN) { + src, err = syscall.Dup(src) + if err == nil { + t := newToWriteFile(src, offset, remain) + c.appendWrite(t) + // err = syscall.SetNonblock(src, true) + // if err != nil { + // c.closeWithErrorWithoutLock(err) + // return 0, err + // } + c.modWrite() + } + break + } + if err != nil { + c.closeWithErrorWithoutLock(err) + return 0, err + } + } + + return total, nil +} diff --git a/writev_bsd.go b/writev_bsd.go new file mode 100644 index 00000000..40516086 --- /dev/null +++ b/writev_bsd.go @@ -0,0 +1,28 @@ +// Copyright 2020 lesismal. All rights reserved. +// Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file. + +//go:build darwin || netbsd || freebsd || openbsd || dragonfly +// +build darwin netbsd freebsd openbsd dragonfly + +package nbio + +import ( + "syscall" + + "github.com/lesismal/nbio/mempool" +) + +func writev(fd int, iovs [][]byte) (int, error) { + size := 0 + for _, v := range iovs { + size += len(v) + } + buf := mempool.Malloc(size)[:0] + for _, v := range iovs { + buf = append(buf, v...) + } + n, err := syscall.Write(fd, buf) + mempool.Free(buf) + return n, err +} diff --git a/writev_linux.go b/writev_linux.go new file mode 100644 index 00000000..cc463364 --- /dev/null +++ b/writev_linux.go @@ -0,0 +1,14 @@ +// Copyright 2020 lesismal. All rights reserved. +// Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file. + +//go:build linux +// +build linux + +package nbio + +import "golang.org/x/sys/unix" + +func writev(fd int, iovs [][]byte) (int, error) { + return unix.Writev(fd, iovs) +} From 9725a36dc21457f4a8dfa564ad9498ee85720aa8 Mon Sep 17 00:00:00 2001 From: lesismal Date: Thu, 18 Jan 2024 16:10:46 +0800 Subject: [PATCH 2/6] tidy --- sendfile_bsd.go | 62 ----------------------------- sendfile_linux.go | 99 ----------------------------------------------- 2 files changed, 161 deletions(-) delete mode 100644 sendfile_bsd.go delete mode 100644 sendfile_linux.go diff --git a/sendfile_bsd.go b/sendfile_bsd.go deleted file mode 100644 index 07f03aae..00000000 --- a/sendfile_bsd.go +++ /dev/null @@ -1,62 +0,0 @@ -// Copyright 2020 lesismal. All rights reserved. -// Use of this source code is governed by an MIT-style -// license that can be found in the LICENSE file. - -//go:build darwin || netbsd || freebsd || openbsd || dragonfly -// +build darwin netbsd freebsd openbsd dragonfly - -package nbio - -import ( - "io" - "os" - - "github.com/lesismal/nbio/mempool" -) - -// Sendfile . -func (c *Conn) Sendfile(f *os.File, remain int64) (written int64, err error) { - if f == nil { - return 0, nil - } - - if remain <= 0 { - stat, err := f.Stat() - if err != nil { - return 0, err - } - remain = stat.Size() - } - - for remain > 0 { - bufLen := 1024 * 32 - if bufLen > int(remain) { - bufLen = int(remain) - } - buf := mempool.Malloc(bufLen) - nr, er := f.Read(buf) - if nr > 0 { - nw, ew := c.Write(buf[0:nr]) - if nw < 0 { - nw = 0 - } - remain -= int64(nw) - written += int64(nw) - if ew != nil { - err = ew - break - } - if nr != nw { - err = io.ErrShortWrite - break - } - } - if er != nil { - if er != io.EOF { - err = er - } - break - } - } - return written, err -} diff --git a/sendfile_linux.go b/sendfile_linux.go deleted file mode 100644 index 188eb3fb..00000000 --- a/sendfile_linux.go +++ /dev/null @@ -1,99 +0,0 @@ -// Copyright 2020 lesismal. All rights reserved. -// Use of this source code is governed by an MIT-style -// license that can be found in the LICENSE file. - -//go:build linux -// +build linux - -package nbio - -import ( - "errors" - "net" - "os" - "syscall" -) - -const maxSendfileSize = 4 << 20 - -// Sendfile . -func (c *Conn) Sendfile(f *os.File, remain int64) (int64, error) { - if f == nil { - return 0, nil - } - c.mux.Lock() - if c.closed { - c.mux.Unlock() - return -1, net.ErrClosed - } - - if remain <= 0 { - stat, err := f.Stat() - if err != nil { - return 0, err - } - remain = stat.Size() - } - - if len(c.writeBuffer) > 0 { - if c.chWaitWrite == nil { - c.chWaitWrite = make(chan struct{}, 1) - } - c.mux.Unlock() - <-c.chWaitWrite - if c.closed { - c.chWaitWrite = nil - return -1, net.ErrClosed - } - c.mux.Lock() - } - - c.p.g.beforeWrite(c) - - var ( - err error - n int - src = int(f.Fd()) - dst = c.fd - total = remain - ) - - for remain > 0 { - n = maxSendfileSize - if int64(n) > remain { - n = int(remain) - } - n, err = syscall.Sendfile(dst, src, nil, n) - if n > 0 { - remain -= int64(n) - } else if n == 0 && err == nil { - break - } - if errors.Is(err, syscall.EINTR) { - continue - } - if errors.Is(err, syscall.EAGAIN) { - c.modWrite() - if c.chWaitWrite == nil { - c.chWaitWrite = make(chan struct{}, 1) - } - c.mux.Unlock() - <-c.chWaitWrite - c.chWaitWrite = nil - if c.closed { - return total - remain, err - } - c.mux.Lock() - continue - } - if err != nil { - c.closeWithErrorWithoutLock(err) - c.mux.Unlock() - return total - remain, err - } - } - - c.chWaitWrite = nil - c.mux.Unlock() - return total - remain, err -} From d4d83ebea89adc98472c6a6e31a01379e3506ac5 Mon Sep 17 00:00:00 2001 From: lesismal Date: Thu, 18 Jan 2024 16:51:50 +0800 Subject: [PATCH 3/6] lint --- conn_unix.go | 4 +++- lmux/lmux.go | 4 ++-- nbhttp/engine.go | 4 ++-- poller_epoll.go | 4 ++-- poller_kqueue.go | 16 +++++++++------- poller_std.go | 6 ++++-- sendfile_unix.go | 4 ++-- 7 files changed, 24 insertions(+), 18 deletions(-) diff --git a/conn_unix.go b/conn_unix.go index dc2dd815..2ad7e012 100644 --- a/conn_unix.go +++ b/conn_unix.go @@ -606,7 +606,7 @@ func (c *Conn) flush() error { continue } if errors.Is(err, syscall.EAGAIN) { - c.modWrite() + // c.modWrite() return nil } if err != nil { @@ -616,6 +616,8 @@ func (c *Conn) flush() error { } } + c.resetRead() + return nil } diff --git a/lmux/lmux.go b/lmux/lmux.go index 58791dd7..c9c6091c 100644 --- a/lmux/lmux.go +++ b/lmux/lmux.go @@ -75,8 +75,8 @@ func (lm *ListenerMux) Start() { c, err := l.Accept() if err != nil { var ne net.Error - if ok := errors.As(err, &ne); ok && ne.Temporary() { - logging.Error("Accept failed: temporary error, retrying...") + if ok := errors.As(err, &ne); ok && ne.Timeout() { + logging.Error("Accept failed: timeout error, retrying...") time.Sleep(time.Second / 20) continue } else { diff --git a/nbhttp/engine.go b/nbhttp/engine.go index c0a2fb76..b275313d 100644 --- a/nbhttp/engine.go +++ b/nbhttp/engine.go @@ -298,8 +298,8 @@ func (e *Engine) listen(ln net.Listener, tlsConfig *tls.Config, addConn func(*Co addConn(&Conn{Conn: conn}, tlsConfig, decrease) } else { var ne net.Error - if ok := errors.As(err, &ne); ok && ne.Temporary() { - logging.Error("Accept failed: temporary error, retrying...") + if ok := errors.As(err, &ne); ok && ne.Timeout() { + logging.Error("Accept failed: timeout error, retrying...") time.Sleep(time.Second / 20) } else { if !e.shutdown { diff --git a/poller_epoll.go b/poller_epoll.go index a42e5e0e..ac606c6a 100644 --- a/poller_epoll.go +++ b/poller_epoll.go @@ -140,8 +140,8 @@ func (p *poller) acceptorLoop() { p.g.pollers[c.Hash()%len(p.g.pollers)].addConn(c) } else { var ne net.Error - if ok := errors.As(err, &ne); ok && ne.Temporary() { - logging.Error("NBIO[%v][%v_%v] Accept failed: temporary error, retrying...", p.g.Name, p.pollType, p.index) + if ok := errors.As(err, &ne); ok && ne.Timeout() { + logging.Error("NBIO[%v][%v_%v] Accept failed: timeout error, retrying...", p.g.Name, p.pollType, p.index) time.Sleep(time.Second / 20) } else { if !p.shutdown { diff --git a/poller_kqueue.go b/poller_kqueue.go index 22a6c1b6..d65cfb7a 100644 --- a/poller_kqueue.go +++ b/poller_kqueue.go @@ -8,6 +8,7 @@ package nbio import ( + "errors" "fmt" "io" "net" @@ -32,7 +33,6 @@ const ( ) const ( - // for build IPPROTO_TCP = 0 TCP_KEEPINTVL = 0 TCP_KEEPIDLE = 0 @@ -147,10 +147,10 @@ func (p *poller) readWrite(ev *syscall.Kevent_t) { p.g.onData(rc, buffer[:n]) } p.g.payback(c, buffer) - if err == syscall.EINTR { + if errors.Is(err, syscall.EINTR) { continue } - if err == syscall.EAGAIN { + if errors.Is(err, syscall.EAGAIN) { return } if (err != nil || n == 0) && ev.Flags&syscall.EV_DELETE == 0 { @@ -205,15 +205,17 @@ func (p *poller) acceptorLoop() { for !p.shutdown { conn, err := p.listener.Accept() if err == nil { - c, err := NBConn(conn) + var c *Conn + c, err = NBConn(conn) if err != nil { conn.Close() continue } p.g.pollers[c.Hash()%len(p.g.pollers)].addConn(c) } else { - if ne, ok := err.(net.Error); ok && ne.Temporary() { - logging.Error("NBIO[%v][%v_%v] Accept failed: temporary error, retrying...", p.g.Name, p.pollType, p.index) + var ne net.Error + if ok := errors.As(err, &ne); ok && ne.Timeout() { + logging.Error("NBIO[%v][%v_%v] Accept failed: timeout error, retrying...", p.g.Name, p.pollType, p.index) time.Sleep(time.Second / 20) } else { if !p.shutdown { @@ -241,7 +243,7 @@ func (p *poller) readWriteLoop() { p.eventList = nil p.mux.Unlock() n, err := syscall.Kevent(p.kfd, changes, events, nil) - if err != nil && err != syscall.EINTR { + if err != nil && errors.Is(err, syscall.EINTR) { return } diff --git a/poller_std.go b/poller_std.go index 8b8b8f04..01b8a934 100644 --- a/poller_std.go +++ b/poller_std.go @@ -8,6 +8,7 @@ package nbio import ( + "errors" "net" "runtime" "time" @@ -109,8 +110,9 @@ func (p *poller) start() { for !p.shutdown { err = p.accept() if err != nil { - if ne, ok := err.(net.Error); ok && ne.Temporary() { - logging.Error("NBIO[%v][%v_%v] Accept failed: temporary error, retrying...", p.g.Name, p.pollType, p.index) + var ne net.Error + if ok := errors.As(err, &ne); ok && ne.Timeout() { + logging.Error("NBIO[%v][%v_%v] Accept failed: timeout error, retrying...", p.g.Name, p.pollType, p.index) time.Sleep(time.Second / 20) } else { if !p.shutdown { diff --git a/sendfile_unix.go b/sendfile_unix.go index 77863ff1..d32c43b5 100644 --- a/sendfile_unix.go +++ b/sendfile_unix.go @@ -43,8 +43,9 @@ func (c *Conn) Sendfile(f *os.File, remain int64) (int64, error) { remain = size - offset } + var src int if len(c.writeList) > 0 { - src, err := syscall.Dup(int(f.Fd())) + src, err = syscall.Dup(int(f.Fd())) if err != nil { c.closeWithErrorWithoutLock(err) return 0, err @@ -58,7 +59,6 @@ func (c *Conn) Sendfile(f *os.File, remain int64) (int64, error) { var ( n int - src = int(f.Fd()) dst = c.fd total = remain ) From d1c756e5cde99c6e194e6aed48ccc7215c18debe Mon Sep 17 00:00:00 2001 From: lesismal Date: Thu, 18 Jan 2024 16:52:42 +0800 Subject: [PATCH 4/6] lint --- .github/workflows/golangci-lint.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/golangci-lint.yml b/.github/workflows/golangci-lint.yml index faebaa11..b0736da9 100644 --- a/.github/workflows/golangci-lint.yml +++ b/.github/workflows/golangci-lint.yml @@ -21,7 +21,7 @@ jobs: uses: golangci/golangci-lint-action@v3.4.0 with: # Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version. - version: v1.52.2 + version: v1.55.2 # Optional: working directory, useful for monorepos # working-directory: somedir From 39b405058132510fbc84410320145cbc47e0d04e Mon Sep 17 00:00:00 2001 From: lesismal Date: Thu, 18 Jan 2024 17:16:08 +0800 Subject: [PATCH 5/6] tidy --- sendfile_unix.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sendfile_unix.go b/sendfile_unix.go index d32c43b5..180a8c08 100644 --- a/sendfile_unix.go +++ b/sendfile_unix.go @@ -43,9 +43,9 @@ func (c *Conn) Sendfile(f *os.File, remain int64) (int64, error) { remain = size - offset } - var src int + src := int(f.Fd()) if len(c.writeList) > 0 { - src, err = syscall.Dup(int(f.Fd())) + src, err = syscall.Dup(src) if err != nil { c.closeWithErrorWithoutLock(err) return 0, err From 38f45dbae5e1f542f7f3e6007c08c429331443d2 Mon Sep 17 00:00:00 2001 From: lesismal Date: Thu, 18 Jan 2024 18:02:40 +0800 Subject: [PATCH 6/6] lint --- nbhttp/client.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/nbhttp/client.go b/nbhttp/client.go index 6398f5f8..058a0173 100644 --- a/nbhttp/client.go +++ b/nbhttp/client.go @@ -360,6 +360,8 @@ func (s *proxySocks5) Dial(network, addr string) (net.Conn, error) { return conn, nil } +const errProxyAtSocks5Prefix = "proxy: SOCKS5 proxy at " + func (s *proxySocks5) connect(conn net.Conn, target string) error { host, portStr, err := net.SplitHostPort(target) if err != nil { @@ -392,10 +394,10 @@ func (s *proxySocks5) connect(conn net.Conn, target string) error { return errors.New("proxy: failed to read greeting from SOCKS5 proxy at " + s.addr + ": " + err.Error()) } if buf[0] != 5 { - return errors.New("proxy: SOCKS5 proxy at " + s.addr + " has unexpected version " + strconv.Itoa(int(buf[0]))) + return errors.New(errProxyAtSocks5Prefix + s.addr + " has unexpected version " + strconv.Itoa(int(buf[0]))) } if buf[1] == 0xff { - return errors.New("proxy: SOCKS5 proxy at " + s.addr + " requires authentication") + return errors.New(errProxyAtSocks5Prefix + s.addr + " requires authentication") } // See RFC 1929 @@ -416,7 +418,7 @@ func (s *proxySocks5) connect(conn net.Conn, target string) error { } if buf[1] != 0 { - return errors.New("proxy: SOCKS5 proxy at " + s.addr + " rejected username/password") + return errors.New(errProxyAtSocks5Prefix + s.addr + " rejected username/password") } } @@ -455,7 +457,7 @@ func (s *proxySocks5) connect(conn net.Conn, target string) error { } if len(failure) > 0 { - return errors.New("proxy: SOCKS5 proxy at " + s.addr + " failed to connect: " + failure) + return errors.New(errProxyAtSocks5Prefix + s.addr + " failed to connect: " + failure) } var bytesToDiscard int