forked from devsisters/goquic
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstream.go
107 lines (91 loc) · 3.26 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package goquic
// #include <stddef.h>
// #include "src/adaptor.h"
import "C"
import (
"net/http"
"unsafe"
)
// DataStreamProcessor is the user-level handler attached to a single stream
// (for Quic(Server|Client)Stream). The exported DataStreamProcessor*
// callbacks in this file forward stream events to it, always passing the
// stream the event belongs to as the first argument.
type DataStreamProcessor interface {
	// ProcessData receives the stream and a Go-side copy of the bytes handed
	// over by the C caller. The int result is converted to uint32 and
	// returned to that caller.
	// NOTE(review): presumably the number of bytes consumed -- confirm against the C adaptor.
	ProcessData(writer QuicStream, buffer []byte) int
	// Called when there's nothing to read. Called on server XXX(serialx): Not called on client
	OnFinRead(writer QuicStream)
	// Called when the connection is closed. Called on client XXX(serialx): Not called on server
	OnClose(writer QuicStream)
}
// IncomingDataStreamCreator produces the DataStreamProcessor for a new
// incoming stream (for QuicServerSession). The exported
// CreateIncomingDynamicStream callback in this file invokes a method of this
// name on the session's streamCreator field.
type IncomingDataStreamCreator interface {
	// CreateIncomingDynamicStream returns the processor to attach to the
	// stream identified by streamId.
	CreateIncomingDynamicStream(streamId uint32) DataStreamProcessor
}
// OutgoingDataStreamCreator produces the DataStreamProcessor for a new
// outgoing stream (for QuicClientSession).
type OutgoingDataStreamCreator interface {
	// CreateOutgoingDynamicStream returns the processor to attach to the
	// newly created outgoing stream.
	CreateOutgoingDynamicStream() DataStreamProcessor
}
// QuicStream is the stream handle given to DataStreamProcessor callbacks.
// The exported callbacks in this file use *QuicServerStream and
// *QuicClientStream as QuicStream values.
type QuicStream interface {
	// UserStream returns the DataStreamProcessor attached to this stream.
	UserStream() DataStreamProcessor
	// WriteHeader writes header to the stream.
	// NOTE(review): is_body_empty presumably signals that no body will follow -- confirm.
	WriteHeader(header http.Header, is_body_empty bool)
	// WriteOrBufferData writes body to the stream.
	// NOTE(review): fin presumably marks this as the final write on the stream -- confirm.
	WriteOrBufferData(body []byte, fin bool)
	// CloseReadSide closes the read side of the stream.
	// NOTE(review): exact effect is defined by the implementations, not visible here.
	CloseReadSide()
}
/*
          (Incoming/Outgoing)DataStreamCreator (a.k.a. Session)
                                   |
                                   | creates a domain-specific stream (e.g. spdy, ...)
                                   v
   QuicStream -- owns --> DataStreamProcessor
*/
// CreateIncomingDynamicStream is exported to C. It asks the session's stream
// creator for a processor for stream_id, wraps that processor in a new
// QuicServerStream tied to the session and to wrapper_c, records the stream
// in the session, and returns a pointer to it.
//
//export CreateIncomingDynamicStream
func CreateIncomingDynamicStream(session_c unsafe.Pointer, stream_id uint32, wrapper_c unsafe.Pointer) unsafe.Pointer {
	serverSession := (*QuicServerSession)(session_c)
	processor := serverSession.streamCreator.CreateIncomingDynamicStream(stream_id)

	created := new(QuicServerStream)
	created.userStream = processor
	created.session = serverSession
	created.wrapper = wrapper_c

	// Holding the stream in the session's map keeps it reachable so it is not
	// garbage collected. The entry is removed again by
	// UnregisterQuicServerStreamFromSession.
	serverSession.quicServerStreams[created] = true

	return unsafe.Pointer(created)
}
//export DataStreamProcessorProcessData
func DataStreamProcessorProcessData(go_data_stream_processor_c unsafe.Pointer, data unsafe.Pointer, data_len uint32, isServer int) uint32 {
var stream QuicStream
if isServer > 0 {
stream = (*QuicServerStream)(go_data_stream_processor_c)
} else {
stream = (*QuicClientStream)(go_data_stream_processor_c)
}
buf := C.GoBytes(data, C.int(data_len))
return uint32(stream.UserStream().ProcessData(stream, buf))
}
// DataStreamProcessorOnFinRead is exported to C. It forwards the event to
// OnFinRead on the stream's DataStreamProcessor. go_data_stream_processor_c is
// interpreted as a *QuicServerStream when isServer is positive and as a
// *QuicClientStream otherwise.
//
//export DataStreamProcessorOnFinRead
func DataStreamProcessorOnFinRead(go_data_stream_processor_c unsafe.Pointer, isServer int) {
	var target QuicStream
	switch {
	case isServer > 0:
		target = (*QuicServerStream)(go_data_stream_processor_c)
	default:
		target = (*QuicClientStream)(go_data_stream_processor_c)
	}

	processor := target.UserStream()
	processor.OnFinRead(target)
}
// DataStreamProcessorOnClose is exported to C. It forwards the event to
// OnClose on the stream's DataStreamProcessor. go_data_stream_processor_c is
// interpreted as a *QuicServerStream when isServer is positive and as a
// *QuicClientStream otherwise.
//
//export DataStreamProcessorOnClose
func DataStreamProcessorOnClose(go_data_stream_processor_c unsafe.Pointer, isServer int) {
	var target QuicStream
	switch {
	case isServer > 0:
		target = (*QuicServerStream)(go_data_stream_processor_c)
	default:
		target = (*QuicClientStream)(go_data_stream_processor_c)
	}

	processor := target.UserStream()
	processor.OnClose(target)
}
// UnregisterQuicServerStreamFromSession is exported to C. It treats
// go_stream_c as a *QuicServerStream and deletes that stream from its
// session's quicServerStreams map, dropping the reference that
// CreateIncomingDynamicStream stored there.
//
//export UnregisterQuicServerStreamFromSession
func UnregisterQuicServerStreamFromSession(go_stream_c unsafe.Pointer) {
	serverStream := (*QuicServerStream)(go_stream_c)
	registry := serverStream.session.quicServerStreams
	delete(registry, serverStream)
}
// UnregisterQuicClientStreamFromSession is exported to C. It treats
// go_stream_c as a *QuicClientStream and deletes that stream from its
// session's quicClientStreams map.
//
//export UnregisterQuicClientStreamFromSession
func UnregisterQuicClientStreamFromSession(go_stream_c unsafe.Pointer) {
	clientStream := (*QuicClientStream)(go_stream_c)
	registry := clientStream.session.quicClientStreams
	delete(registry, clientStream)
}