diff --git a/nbhttp/engine.go b/nbhttp/engine.go index a1584d2d..af0a7c76 100644 --- a/nbhttp/engine.go +++ b/nbhttp/engine.go @@ -251,6 +251,7 @@ type Engine struct { BaseCtx context.Context Cancel func() + SyncCall func(f func()) ExecuteClient func(f func()) // isOneshot bool @@ -1044,6 +1045,7 @@ func NewEngine(conf Config) *Engine { } conf.Handler = handler + var serverCall = func(f func()) { f() } var serverExecutor = conf.ServerExecutor var messageHandlerExecutePool *taskpool.TaskPool if serverExecutor == nil { @@ -1053,6 +1055,7 @@ func NewEngine(conf Config) *Engine { nativeSize := conf.MessageHandlerPoolSize - 1 messageHandlerExecutePool = taskpool.New(nativeSize, 1024*64) serverExecutor = messageHandlerExecutePool.Go + serverCall = messageHandlerExecutePool.Call } var clientExecutor = conf.ClientExecutor @@ -1135,6 +1138,7 @@ func NewEngine(conf Config) *Engine { emptyRequest: (&http.Request{}).WithContext(baseCtx), BaseCtx: baseCtx, Cancel: cancel, + SyncCall: serverCall, } // shouldSupportTLS := !conf.SupportServerOnly || len(conf.AddrsTLS) > 0 diff --git a/nbhttp/websocket/conn.go b/nbhttp/websocket/conn.go index 9dfe3bdd..d523e77e 100644 --- a/nbhttp/websocket/conn.go +++ b/nbhttp/websocket/conn.go @@ -167,7 +167,9 @@ func (c *Conn) handleDataFrame(opcode MessageType, fin bool, pbody *[]byte) { if c.releasePayload { defer c.Engine.BodyAllocator.Free(pbody) } - h(c, opcode, fin, *pbody) + c.Engine.SyncCall(func() { + h(c, opcode, fin, *pbody) + }) } else { if !c.Execute(func() { if c.releasePayload { @@ -188,7 +190,9 @@ func (c *Conn) handleMessage(opcode MessageType, pbody *[]byte) { if c.releasePayload { defer c.Engine.BodyAllocator.Free(pbody) } - c.handleWsMessage(opcode, *pbody) + c.Engine.SyncCall(func() { + c.handleWsMessage(opcode, *pbody) + }) } else { if !c.Execute(func() { if c.releasePayload { @@ -209,7 +213,9 @@ func (c *Conn) handleProtocolMessage(opcode MessageType, pbody *[]byte) { if c.releasePayload { defer c.Engine.BodyAllocator.Free(pbody) } - c.handleWsMessage(opcode, *pbody) + c.Engine.SyncCall(func() { + c.handleWsMessage(opcode, *pbody) + }) } else { if !c.Execute(func() { if c.releasePayload { diff --git a/taskpool/iotaskpool.go b/taskpool/iotaskpool.go index 720926be..7ee43283 100644 --- a/taskpool/iotaskpool.go +++ b/taskpool/iotaskpool.go @@ -10,6 +10,17 @@ type IOTaskPool struct { pool sync.Pool } +// Call . +// +//go:norace +func (tp *IOTaskPool) Call(f func([]byte)) { + tp.task.Call(func() { + pbuf := tp.pool.Get().(*[]byte) + f(*pbuf) + tp.pool.Put(pbuf) + }) +} + // Go . // //go:norace diff --git a/taskpool/taskpool.go b/taskpool/taskpool.go index 171d77e4..b368f8b3 100644 --- a/taskpool/taskpool.go +++ b/taskpool/taskpool.go @@ -45,6 +45,13 @@ func (tp *TaskPool) fork(f func()) bool { return false } +// Call . +// +//go:norace +func (tp *TaskPool) Call(f func()) { + tp.caller(f) +} + // Go . // //go:norace