forked from devsisters/goquic
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdatastream_client.go
127 lines (107 loc) · 3.04 KB
/
datastream_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package goquic
import (
"bytes"
"errors"
"io"
"net/http"
"time"
"github.com/devsisters/goquic/spdy"
"github.com/oleiade/lane"
)
// SpdyClientSession implements OutgoingDataStreamCreator for the client side.
// It holds the connection that is handed to every stream it creates.
type SpdyClientSession struct {
	// conn is copied into each SpdyClientStream built by
	// CreateOutgoingDynamicStream.
	conn *Conn
}
// CreateOutgoingDynamicStream builds a new client-side stream processor bound
// to this session's connection. Only the conn field is populated here; every
// other field of the returned SpdyClientStream is left at its zero value.
func (c *SpdyClientSession) CreateOutgoingDynamicStream() DataStreamProcessor {
	stream := new(SpdyClientStream)
	stream.conn = c.conn
	return stream
}
// SpdyClientStream implements DataStreamProcessor for the client side.
// Incoming chunks are queued by ProcessData and later consumed by ReadHeader
// and Read; outgoing data is forwarded to the underlying QuicClientStream.
type SpdyClientStream struct {
	// conn is used to wait for and process connection events while reading.
	conn *Conn
	// quicClientStream is the stream that WriteHeader, Write and FinWrite
	// forward to.
	quicClientStream *QuicClientStream
	// pendingReads queues the []byte chunks delivered to ProcessData that
	// have not yet been consumed by ReadHeader or Read.
	pendingReads *lane.Queue
	// buf accumulates received bytes until ReadHeader can parse a header
	// from them.
	buf bytes.Buffer
	// header caches the result of the first successful header parse.
	header http.Header
	// headerParsed is true once header holds a successfully parsed header.
	headerParsed bool
	// True writeFinished means that this stream is half-closed on our side
	writeFinished bool
	// True when connection is closed fully (set by OnClose)
	closed bool
}
// ProcessData queues an incoming chunk for later consumption by ReadHeader or
// Read and reports the whole chunk as accepted. The slice is stored as-is
// (no copy is made), and the writer argument is unused.
func (s *SpdyClientStream) ProcessData(writer QuicStream, buffer []byte) int {
	accepted := len(buffer)
	s.pendingReads.Enqueue(buffer)
	return accepted
}
// OnFinRead is the hook for the read side of the stream finishing. It is
// intentionally a no-op: no state is updated here.
func (s *SpdyClientStream) OnFinRead(writer QuicStream) {
	// XXX(serialx): This does not seem to be called at all?
}
// OnClose marks the stream as closed. Read checks this flag, together with an
// empty pendingReads queue, to decide when to report io.EOF.
func (s *SpdyClientStream) OnClose(writer QuicStream) {
	s.closed = true
}
// ReadHeader returns the response headers for this stream, blocking on
// connection events until enough data has arrived to parse them. The parsed
// header is cached, so later calls return immediately.
//
// It returns io.ErrUnexpectedEOF if the stream is closed before a complete
// header could be parsed, and any error reported while buffering the
// received data.
func (s *SpdyClientStream) ReadHeader() (http.Header, error) {
	if s.headerParsed {
		return s.header, nil
	}

	// Keep feeding received chunks into s.buf until parsing succeeds.
	for {
		for s.pendingReads.Empty() {
			// Nothing queued and the stream is closed: no more data can
			// arrive, so waiting again would never return a header.
			if s.closed {
				return nil, io.ErrUnexpectedEOF
			}
			s.conn.waitForEvents()
		}

		if _, err := s.buf.Write(s.pendingReads.Dequeue().([]byte)); err != nil {
			return nil, err
		}

		// Parse from a separate reader over the accumulated bytes so that a
		// failed (incomplete) attempt does not consume anything from s.buf.
		headerBuf := bytes.NewBuffer(s.buf.Bytes())
		header, err := spdy.ParseHeaders(headerBuf)
		if err != nil {
			// Treated as "header not complete yet"; wait for more data.
			continue
		}

		// XXX(serialx): Is it correct to assume headers are in proper packet frame boundary?
		//               What if theres some parts of body left in headerBuf?
		s.header = header
		s.headerParsed = true
		return s.header, nil
	}
}
// Read copies the next queued body chunk into buf and returns the number of
// bytes copied. If the headers have not been parsed yet it reads them first.
// It blocks on connection events until a chunk is available.
//
// It returns io.EOF once the stream is closed and no queued data remains, and
// propagates any error from ReadHeader. A zero-length buf returns (0, nil)
// without consuming queued data.
//
// Known limitation: if buf is shorter than the dequeued chunk, the remainder
// of that chunk is discarded.
func (s *SpdyClientStream) Read(buf []byte) (int, error) {
	s.conn.processEventsWithDeadline(time.Now()) // Process any pending events

	// We made sure we've processed all events. So pendingReads.Empty() means that it is really empty
	if s.closed && s.pendingReads.Empty() {
		return 0, io.EOF
	}

	// A zero-length read must not dequeue (and thereby drop) a whole chunk.
	if len(buf) == 0 {
		return 0, nil
	}

	if !s.headerParsed {
		// Surface header failures instead of silently continuing to the body.
		if _, err := s.ReadHeader(); err != nil {
			return 0, err
		}
	}

	// Wait for body
	for s.pendingReads.Empty() {
		s.conn.waitForEvents()
		if s.closed && s.pendingReads.Empty() {
			return 0, io.EOF
		}
	}

	buffer := s.pendingReads.Dequeue().([]byte)
	return copy(buf, buffer), nil // XXX(serialx): Must do buffering to respect io.Reader specs
}
// WriteHeader forwards the request headers to the underlying QUIC stream.
// When isBodyEmpty is true the write side is marked as finished, so later
// Write or FinWrite calls are rejected.
func (s *SpdyClientStream) WriteHeader(header http.Header, isBodyEmpty bool) {
	s.quicClientStream.WriteHeader(header, isBodyEmpty)
	// Only ever flips the flag to true; an already-finished stream stays so.
	s.writeFinished = s.writeFinished || isBodyEmpty
}
// Write hands buf to the underlying QUIC stream without setting the fin flag
// and reports the whole slice as written. It fails if the write side of the
// stream has already been finished.
func (s *SpdyClientStream) Write(buf []byte) (int, error) {
	if s.writeFinished {
		err := errors.New("Write already finished")
		return 0, err
	}

	written := len(buf)
	s.quicClientStream.WriteOrBufferData(buf, false)
	return written, nil
}
// FinWrite half-closes the stream on our side by sending an empty write with
// the fin flag set, then records that writing is finished. It fails if the
// write side has already been finished.
func (s *SpdyClientStream) FinWrite() error {
	if s.writeFinished {
		return errors.New("Write already finished")
	}

	const fin = true
	s.quicClientStream.WriteOrBufferData(nil, fin)
	s.writeFinished = true
	return nil
}