-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathdecoder.go
200 lines (180 loc) · 4.5 KB
/
decoder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
package gosmparse
import (
"bytes"
"compress/zlib"
"encoding/binary"
"fmt"
"io"
"runtime"
"sync"
"github.com/thomersch/gosmparse/OSMPBF"
)
// A Decoder reads and decodes OSM data from an input stream.
type Decoder struct {
	// QueueSize tunes memory usage versus parse speed.
	// A larger QueueSize will consume more memory, but may speed up the parsing process.
	// It is the buffer size of the channel that carries raw blobs from the
	// reading goroutine to the decoding workers in Parse.
	QueueSize int
	// Workers is the number of goroutines Parse uses to decode blobs
	// concurrently. If it is zero, Parse sets it to runtime.GOMAXPROCS(0).
	Workers int
	// r is the input stream that blocks are read from.
	r io.Reader
	// o receives the decoded elements; it is set by Parse.
	o OSMReader
	// denseInfoFn and infoFn build the Info metadata of decoded elements.
	// NewDecoder installs versions that always return nil;
	// NewDecoderWithInfo installs denseInfo and info.
	denseInfoFn denseInfoFn
	infoFn      infoFn
}
// Callbacks that produce the optional Info metadata attached to decoded
// elements. The Decoder constructors choose between real implementations
// and variants that always return nil.
type (
	// denseInfoFn builds the Info for one entry of a DenseNodes group.
	// NOTE(review): index presumably selects the entry within i and ds
	// carries state shared across entries — confirm against denseNode.
	denseInfoFn func(i *OSMPBF.DenseInfo, ds *denseState, index int) *Info

	// infoFn builds the Info for a way or relation.
	// NOTE(review): gran presumably is the block's date granularity and st
	// its string table — confirm against way/relation.
	infoFn func(i *OSMPBF.Info, gran int64, st []string) *Info
)
// NewDecoder returns a new decoder that reads from r.
//
// The returned decoder does not populate the Info field of decoded
// elements: both of its info callbacks return nil. Use NewDecoderWithInfo
// when that metadata is needed.
func NewDecoder(r io.Reader) *Decoder {
	skipDenseInfo := func(*OSMPBF.DenseInfo, *denseState, int) *Info { return nil }
	skipInfo := func(*OSMPBF.Info, int64, []string) *Info { return nil }

	return &Decoder{
		r:           r,
		QueueSize:   200,
		denseInfoFn: skipDenseInfo,
		infoFn:      skipInfo,
	}
}
// NewDecoderWithInfo returns a new decoder that reads from r. Unlike
// NewDecoder, it populates the Info field in the decoded elements, so use
// it when you need metadata.
func NewDecoderWithInfo(r io.Reader) *Decoder {
	dec := &Decoder{r: r, QueueSize: 200}
	dec.denseInfoFn, dec.infoFn = denseInfo, info
	return dec
}
// Parse starts the parsing process that will stream data into the given OSMReader.
//
// The first block of the input must be an OSMHeader block. The remaining
// blocks are read sequentially by one feeder goroutine and decoded by
// d.Workers worker goroutines, so readElements (and therefore o) may be
// invoked from several goroutines at once.
//
// Parse returns nil once the input is exhausted and every block has been
// decoded. Otherwise it returns the first error encountered. In both cases every
// goroutine started by Parse has exited before Parse returns. No further
// callbacks are made on o, and d's reader is no longer read. On failure,
// Parse waits for a read of d's reader that is already in progress to
// complete.
func (d *Decoder) Parse(o OSMReader) error {
	d.o = o

	header, _, err := d.block()
	if err != nil {
		return err
	}
	// TODO: parser checks
	if header.GetType() != "OSMHeader" {
		return fmt.Errorf("Invalid header of first data block. Wanted: OSMHeader, have: %s", header.GetType())
	}

	// With zero or fewer workers nobody would drain the queue, so fall back
	// to one worker per usable CPU.
	if d.Workers <= 0 {
		d.Workers = runtime.GOMAXPROCS(0)
	}
	// make panics on a negative buffer size; treat that as unbuffered.
	queueSize := d.QueueSize
	if queueSize < 0 {
		queueSize = 0
	}

	// done is closed on the first failure to tell every goroutine to stop.
	// firstErr is written exactly once (inside once.Do) and read only after
	// wg.Wait, which orders the write before the read.
	var (
		once     sync.Once
		firstErr error
		wg       sync.WaitGroup
	)
	done := make(chan struct{})
	fail := func(err error) {
		once.Do(func() {
			firstErr = err
			close(done)
		})
	}

	blobs := make(chan *OSMPBF.Blob, queueSize)

	// Feeder: reads blobs from d.r in order. io.EOF at a block boundary is
	// the normal end of input, not an error.
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer close(blobs)
		for {
			select {
			case <-done:
				return
			default:
			}
			_, blob, err := d.block()
			if err != nil {
				if err != io.EOF {
					fail(err)
				}
				return
			}
			// Never block forever on a full queue once the workers have
			// stopped because of an error.
			select {
			case blobs <- blob:
			case <-done:
				return
			}
		}
	}()

	// Workers: decode blobs and hand the elements to o.
	for i := 0; i < d.Workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for blob := range blobs {
				// Stop delivering elements once any goroutine has failed.
				select {
				case <-done:
					return
				default:
				}
				if err := d.readElements(blob); err != nil {
					fail(err)
					return
				}
			}
		}()
	}

	wg.Wait()
	return firstErr
}
// block reads the next (BlobHeader, Blob) pair from d.r.
//
// On the wire, each pair consists of three parts in order:
//   - a 4-byte big-endian BlobHeader length,
//   - the BlobHeader itself,
//   - a Blob of BlobHeader.Datasize bytes.
//
// block returns io.EOF only when the stream ends exactly at a block
// boundary. If the stream ends partway through a block, it returns
// io.ErrUnexpectedEOF, so callers that treat io.EOF as "done" do not
// silently accept a truncated file. Length fields that are negative or that
// exceed the OSM PBF format limits are rejected before any buffer is
// allocated for them.
func (d *Decoder) block() (*OSMPBF.BlobHeader, *OSMPBF.Blob, error) {
	const (
		// The PBF format requires a BlobHeader to be smaller than 64 KiB
		// and a Blob to be smaller than 32 MiB.
		maxHeaderSize = 64 * 1024
		maxBlobSize   = 32 * 1024 * 1024
	)
	// Once part of a block has been consumed, running out of input is a
	// truncation rather than a clean end of stream.
	truncated := func(err error) error {
		if err == io.EOF {
			return io.ErrUnexpectedEOF
		}
		return err
	}

	// BlobHeaderLength
	var sizeBuf [4]byte
	if _, err := io.ReadFull(d.r, sizeBuf[:]); err != nil {
		// io.EOF here means no bytes were left at all: a clean end.
		return nil, nil, err
	}
	headerSize := binary.BigEndian.Uint32(sizeBuf[:])
	if headerSize > maxHeaderSize {
		return nil, nil, fmt.Errorf("blob header size %d exceeds limit of %d bytes", headerSize, maxHeaderSize)
	}

	// BlobHeader
	headerBuf := make([]byte, headerSize)
	if _, err := io.ReadFull(d.r, headerBuf); err != nil {
		return nil, nil, truncated(err)
	}
	blobHeader := new(OSMPBF.BlobHeader)
	if err := blobHeader.UnmarshalVT(headerBuf); err != nil {
		return nil, nil, err
	}

	// Blob
	dataSize := blobHeader.GetDatasize()
	if dataSize < 0 || dataSize > maxBlobSize {
		return nil, nil, fmt.Errorf("invalid blob size %d (limit %d bytes)", dataSize, maxBlobSize)
	}
	blobBuf := make([]byte, dataSize)
	if _, err := io.ReadFull(d.r, blobBuf); err != nil {
		return nil, nil, truncated(err)
	}
	blob := new(OSMPBF.Blob)
	if err := blob.UnmarshalVT(blobBuf); err != nil {
		return nil, nil, err
	}
	return blobHeader, blob, nil
}
// readElements decodes one data blob and hands each primitive group to the
// matching element decoder, which reports to d.o. It returns the first
// error from decoding the blob or from a group. Groups containing plain
// (non-dense) nodes, or no supported data at all, are rejected.
func (d *Decoder) readElements(blob *OSMPBF.Blob) error {
	block, blockErr := d.blobData(blob)
	if blockErr != nil {
		return blockErr
	}

	for _, group := range block.Primitivegroup {
		var err error
		switch {
		case group.Dense != nil:
			// NOTE(review): denseNode's result (if it has one) is not
			// checked here — confirm it cannot fail.
			denseNode(d.o, block, group.Dense, d.denseInfoFn)
		case len(group.Ways) != 0:
			err = way(d.o, block, group.Ways, d.infoFn)
		case len(group.Relations) != 0:
			err = relation(d.o, block, group.Relations, d.infoFn)
		case len(group.Nodes) != 0:
			err = fmt.Errorf("Nodes are not supported")
		default:
			err = fmt.Errorf("no supported data in primitive group")
		}
		if err != nil {
			return err
		}
	}
	return nil
}
// blobData extracts the payload of blob, decompressing it if it is
// zlib-compressed, and unmarshals it into a PrimitiveBlock.
//
// It reads only its argument and no Decoder state, so it is safe to call
// from several worker goroutines at once. A zlib blob must declare a
// RawSize between 0 and 32 MiB (the OSM PBF limit). This is checked before
// the output buffer is allocated, so corrupt sizes cannot trigger a panic
// or a huge allocation.
func (d *Decoder) blobData(blob *OSMPBF.Blob) (*OSMPBF.PrimitiveBlock, error) {
	// The PBF format requires the uncompressed blob to be smaller than 32 MiB.
	const maxRawSize = 32 * 1024 * 1024

	var buf []byte
	switch {
	case blob.Raw != nil:
		// Uncompressed payload: use it directly, no copy needed.
		buf = blob.Raw
	case blob.ZlibData != nil:
		rawSize := blob.GetRawSize()
		if rawSize < 0 || rawSize > maxRawSize {
			return nil, fmt.Errorf("invalid blob raw size %d (limit %d bytes)", rawSize, maxRawSize)
		}
		r, err := zlib.NewReader(bytes.NewReader(blob.GetZlibData()))
		if err != nil {
			return nil, err
		}
		defer r.Close()
		buf = make([]byte, rawSize)
		// io.ReadFull fails unless exactly len(buf) bytes were read, so a
		// short stream is already reported here.
		if _, err := io.ReadFull(r, buf); err != nil {
			return nil, err
		}
	default:
		return nil, fmt.Errorf("found block with unknown data")
	}

	primitiveBlock := &OSMPBF.PrimitiveBlock{}
	err := primitiveBlock.UnmarshalVT(buf)
	return primitiveBlock, err
}