forked from qedus/osmpbf
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathgosmonaut.go
583 lines (505 loc) · 12 KB
/
gosmonaut.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
package gosmonaut
import (
"errors"
"fmt"
"io"
"runtime"
"sync"
"time"
)
/* Gosmonaut */
// osmPair is the element type of the output stream. Each pair carries either a
// decoded entity or an error: streamEntity() sends pairs with a nil error and
// streamError() sends pairs with a nil entity.
type osmPair struct {
// i is the decoded entity (nil if the pair reports an error).
i OSMEntity
// e is the reported error (nil if the pair carries an entity).
e error
}
// Config defines the configuration for Gosmonaut. NewGosmonaut() uses the
// zero value if no Config is passed.
type Config struct {
// DebugMode prints duration and memory info and runs the garbage collector
// after every processing step if enabled.
DebugMode bool
// PrintWarnings prints warnings to stdout if enabled. Possible warnings
// include missing referenced entities or unsupported features.
PrintWarnings bool
// NumProcessors sets the number of processes that are used for decoding.
// If it is less than 1 the amount of available logical CPUs will be used.
NumProcessors int
// Decoder sets the PBF blob decoder. Defaults to `FastDecoder`.
Decoder DecoderType
}
// Gosmonaut is responsible for decoding an OpenStreetMap pbf file.
// For creating an instance the NewGosmonaut() function must be used.
type Gosmonaut struct {
// dec is the underlying PBF decoder created by NewGosmonaut().
dec *decoder
// stream is the output channel polled by Next(). A new channel is created
// by every Start() call and closed when that run has finished.
stream chan osmPair
// lock is taken by Start() and released when the decoding goroutine is
// done, so that only one run is active at a time.
lock sync.Mutex
// Store header block
header Header
// Defined by caller
types OSMTypeSet
funcEntityNeeded func(OSMType, OSMTags) bool
// ID trackers: IDs of the nodes/ways that other needed entities refer to
nodeIDTracker, wayIDTracker idTracker
// Entity caches: referenced nodes/ways and their tags (keyed by entity ID;
// only non-empty tags are stored)
nodeCache *binaryNodeEntityMap
wayCache *binaryWayEntityMap
nodeTags map[int64]OSMTags
wayTags map[int64]OSMTags
// For debug mode and warnings
debugMode bool
printWarnings bool
// timeStarted is the start of the current run, timeLast the end of the
// last debug output (used to measure the duration of each step).
timeStarted time.Time
timeLast time.Time
}
// NewGosmonaut builds a Gosmonaut instance for the given PBF file. The meta
// information (Header) of the file is obtained while the decoder is created
// and can be queried with Header() afterwards.
//
// The variadic `config` parameter only exists to make the configuration
// optional: pass no `Config` to use the defaults or exactly one `Config`.
// Passing more than one `Config` results in an error.
func NewGosmonaut(file io.ReadSeeker, config ...Config) (*Gosmonaut, error) {
	// Resolve the optional configuration
	var conf Config
	switch len(config) {
	case 0:
		// Keep the zero value
	case 1:
		conf = config[0]
	default:
		return nil, errors.New("only 1 Config object is allowed")
	}

	// Fall back to the number of logical CPUs if no valid amount was given
	nProcs := conf.NumProcessors
	if nProcs < 1 {
		nProcs = runtime.NumCPU()
	}

	// Create decoder (also yields the header of the file)
	dec, header, err := newDecoder(file, nProcs, conf.Decoder)
	if err != nil {
		return nil, err
	}

	g := &Gosmonaut{
		dec:           dec,
		header:        header,
		debugMode:     conf.DebugMode,
		printWarnings: conf.PrintWarnings,
	}
	return g, nil
}
// Header returns the meta information of the PBF file that was stored when
// the Gosmonaut instance was created.
func (g *Gosmonaut) Header() Header {
	header := g.header
	return header
}
// entitiesPerPrimitiveBlock is the number of OSM entities that a typical
// PrimitiveBlock contains (8k). It is used as the buffer size of the output
// stream channel created by Start().
const entitiesPerPrimitiveBlock = 8000
// Start kicks off a decoding run in a new goroutine. If a previous run is
// still in progress the call blocks until that run has finished.
//
// `types` selects the OSM types that are sent to the caller.
// `funcEntityNeeded` is consulted for entities of those types to determine if
// the caller needs a specific OSM entity.
//
// Found entities and encountered errors can be received by polling the Next()
// method.
func (g *Gosmonaut) Start(
	types OSMTypeSet,
	funcEntityNeeded func(OSMType, OSMTags) bool,
) {
	// The lock is held for the whole run; acquiring it waits for the previous
	// run to finish
	g.lock.Lock()

	// Store the parameters of this run and create a fresh output stream
	g.types = types
	g.funcEntityNeeded = funcEntityNeeded
	g.stream = make(chan osmPair, entitiesPerPrimitiveBlock)

	go func() {
		// Deferred calls run in reverse order: the stream is closed first,
		// then the lock is released for the next run
		defer g.lock.Unlock()
		defer close(g.stream)

		g.decode()
	}()
}
// decode performs one complete decoding run and sends the requested entities
// to the output stream. If a step fails the error is sent to the stream and
// the run is aborted.
//
// The run consists of several passes over the file:
//  1. relations: track the IDs of the ways and nodes they refer to
//  2. ways: track the IDs of the nodes they refer to
//  3. nodes: cache tracked nodes, stream requested nodes
//  4. ways: cache tracked ways, stream requested ways
//  5. relations: stream requested relations (built from the caches)
//
// A pass is skipped if neither the requested types nor the tracked IDs
// require it.
func (g *Gosmonaut) decode() {
	// Release all trackers and caches when the run ends. This also covers the
	// early returns on error, which would otherwise keep this data referenced
	// by g until the next run.
	defer func() {
		g.nodeIDTracker, g.wayIDTracker = nil, nil
		g.nodeCache, g.wayCache = nil, nil
		g.nodeTags, g.wayTags = nil, nil
	}()

	// Init debug vars
	timeNow := time.Now()
	g.timeStarted = timeNow
	g.timeLast = timeNow
	g.printDebugInfo("Decoding started")

	// Scan relation dependencies
	g.nodeIDTracker = newBitsetIDTracker()
	g.wayIDTracker = newBitsetIDTracker()
	if g.types.Get(RelationType) {
		if err := g.scanRelationDependencies(); err != nil {
			g.streamError(err)
			return
		}
		g.printDebugInfo(fmt.Sprintf("Scanned relation dependencies [length: %d]", g.wayIDTracker.len()))
	}

	// Scan way dependencies
	if g.types.Get(WayType) || g.wayIDTracker.len() != 0 {
		if err := g.scanWayDependencies(); err != nil {
			g.streamError(err)
			return
		}
		g.printDebugInfo(fmt.Sprintf("Scanned way dependencies [length: %d]", g.nodeIDTracker.len()))
	}

	// The number of tracked node IDs is passed to the node cache constructor
	g.nodeCache = newBinaryNodeEntityMap(g.nodeIDTracker.len())
	g.nodeTags = make(map[int64]OSMTags)
	g.printDebugInfo("Created node cache")

	// Scan nodes
	if g.types.Get(NodeType) || g.nodeIDTracker.len() != 0 {
		if err := g.scanNodes(); err != nil {
			g.streamError(err)
			return
		}
		g.printDebugInfo("Scanned nodes")
	}

	// The node IDs are not needed anymore once the nodes are cached
	g.nodeIDTracker = nil
	g.printDebugInfo("Deleted node ID tracker")

	g.nodeCache.prepare()
	g.printDebugInfo("Prepared node cache")

	// The number of tracked way IDs is passed to the way cache constructor
	g.wayCache = newBinaryWayEntityMap(g.wayIDTracker.len())
	g.wayTags = make(map[int64]OSMTags)
	g.printDebugInfo("Created way cache")

	// Scan ways
	if g.types.Get(WayType) || g.wayIDTracker.len() != 0 {
		if err := g.scanWays(); err != nil {
			g.streamError(err)
			return
		}
		g.printDebugInfo("Scanned ways")
	}

	// The way IDs are not needed anymore once the ways are cached
	g.wayIDTracker = nil
	g.printDebugInfo("Deleted way ID tracker")

	g.wayCache.prepare()
	g.printDebugInfo("Prepared way cache")

	// Scan relations
	if g.types.Get(RelationType) {
		if err := g.scanRelations(); err != nil {
			g.streamError(err)
			return
		}
		g.printDebugInfo("Scanned relations")
	}

	// Delete the caches explicitly (instead of relying on the deferred
	// cleanup) so that the following debug output reflects the freed memory
	g.wayCache = nil
	g.nodeCache = nil
	g.wayTags = nil
	g.nodeTags = nil
	g.printDebugInfo("Deleted entity caches")

	if g.debugMode {
		fmt.Println("Elapsed time:", time.Since(g.timeStarted))
	}
}
// streamError sends the given error (without an entity) to the output stream.
func (g *Gosmonaut) streamError(err error) {
	g.stream <- osmPair{e: err}
}
// streamEntity sends the given entity (without an error) to the output stream.
func (g *Gosmonaut) streamEntity(i OSMEntity) {
	g.stream <- osmPair{i: i}
}
// Next waits for the next element of the output stream and returns either a
// decoded entity or an error (never both).
// The error io.EOF signals that the stream has been closed, i.e. the run has
// ended and there is nothing left to read.
// Any other error means that decoding has been stopped due to that error.
func (g *Gosmonaut) Next() (OSMEntity, error) {
	if pair, open := <-g.stream; open {
		return pair.i, pair.e
	}
	// Stream closed and drained
	return nil, io.EOF
}
// entityNeeded reports whether an entity of type `t` with the given tags has
// to be sent to the caller. This is the case if the type has been enabled in
// Start() and the callback of the caller accepts the entity.
//
// A nil callback is treated as "no filter", i.e. every entity of an enabled
// type is needed. Calling a nil function would panic inside the decoding
// goroutine, where the caller has no chance to recover.
func (g *Gosmonaut) entityNeeded(t OSMType, tags OSMTags) bool {
	if !g.types.Get(t) {
		return false
	}
	if g.funcEntityNeeded == nil {
		return true
	}
	return g.funcEntityNeeded(t, tags)
}
// scanRelationDependencies scans all relations and marks the way and node
// members of every relation needed by the caller in the ID trackers.
// Relation members (sub-relations) are ignored.
func (g *Gosmonaut) scanRelationDependencies() error {
	trackMembers := func(_ int64, tags OSMTags, v entityParser) error {
		// Relations that are not streamed have no dependencies
		if !g.entityNeeded(RelationType, tags) {
			return nil
		}

		rp, ok := v.(relationParser)
		if !ok {
			return fmt.Errorf("got invalid relation parser (%T)", v)
		}

		// Member IDs and member types are parallel lists
		memberIDs, err := rp.ids()
		if err != nil {
			return err
		}
		memberTypes, err := rp.types()
		if err != nil {
			return err
		}
		if len(memberIDs) != len(memberTypes) {
			return errors.New("length of relation ids and types differs")
		}

		// Mark members in the tracker of their type
		for idx := range memberIDs {
			memberID := memberIDs[idx]
			if memberTypes[idx] == WayType {
				g.wayIDTracker.set(memberID)
			} else if memberTypes[idx] == NodeType {
				g.nodeIDTracker.set(memberID)
			}
			// We don't support sub-relations yet
		}
		return nil
	}
	return g.scan(RelationType, trackMembers)
}
// scanWayDependencies scans all ways and marks the nodes of every way that is
// either tracked (referenced by a needed relation) or needed by the caller in
// the node ID tracker.
func (g *Gosmonaut) scanWayDependencies() error {
	trackNodes := func(id int64, tags OSMTags, v entityParser) error {
		// Tracked ways are always needed; the caller is only asked about
		// ways that are not tracked
		needed := g.wayIDTracker.get(id) || g.entityNeeded(WayType, tags)
		if !needed {
			return nil
		}

		wp, ok := v.(wayParser)
		if !ok {
			return fmt.Errorf("got invalid way parser (%T)", v)
		}

		nodeIDs, err := wp.refs()
		if err != nil {
			return err
		}
		for _, nodeID := range nodeIDs {
			g.nodeIDTracker.set(nodeID)
		}
		return nil
	}
	return g.scan(WayType, trackNodes)
}
// scanNodes scans all nodes. Tracked nodes (referenced by needed ways or
// relations) are stored in the node cache together with their non-empty tags.
// Nodes needed by the caller are sent to the output stream.
func (g *Gosmonaut) scanNodes() error {
	handleNode := func(id int64, tags OSMTags, v entityParser) error {
		// A node can be needed for the cache, the stream, or both
		forCache := g.nodeIDTracker.get(id)
		forStream := g.entityNeeded(NodeType, tags)
		if !(forCache || forStream) {
			return nil
		}

		np, ok := v.(nodeParser)
		if !ok {
			return fmt.Errorf("got invalid node parser (%T)", v)
		}

		lat, lon, err := np.coords()
		if err != nil {
			return err
		}

		if forCache {
			cached := rawNode{
				id:  id,
				lat: newCoordinate(lat),
				lon: newCoordinate(lon),
			}
			g.nodeCache.add(cached)

			// Empty tags are not stored
			if tags.Len() != 0 {
				g.nodeTags[id] = tags
			}
		}

		if forStream {
			g.streamEntity(Node{ID: id, Lat: lat, Lon: lon, Tags: tags})
		}
		return nil
	}
	return g.scan(NodeType, handleNode)
}
// scanWays scans all ways. Tracked ways (referenced by needed relations) are
// stored in the way cache together with their non-empty tags. Ways needed by
// the caller are completed with their nodes from the node cache and sent to
// the output stream; a node that cannot be resolved aborts the scan with an
// error.
func (g *Gosmonaut) scanWays() error {
	handleWay := func(id int64, tags OSMTags, v entityParser) error {
		// A way can be needed for the cache, the stream, or both
		forCache := g.wayIDTracker.get(id)
		forStream := g.entityNeeded(WayType, tags)
		if !(forCache || forStream) {
			return nil
		}

		wp, ok := v.(wayParser)
		if !ok {
			return fmt.Errorf("got invalid way parser (%T)", v)
		}

		refs, err := wp.refs()
		if err != nil {
			return err
		}

		if forCache {
			g.wayCache.add(rawWay{id: id, refs: refs})

			// Empty tags are not stored
			if tags.Len() != 0 {
				g.wayTags[id] = tags
			}
		}

		if !forStream {
			return nil
		}

		// Resolve the node references before streaming
		nodes, err := g.uncacheNodes(refs)
		if err != nil {
			return err
		}
		g.streamEntity(Way{ID: id, Tags: tags, Nodes: nodes})
		return nil
	}
	return g.scan(WayType, handleWay)
}
// scanRelations scans all relations and sends the relations needed by the
// caller to the output stream. The members of a relation are resolved from
// the way and node caches.
//
// Members that cannot be resolved (way or node not in the cache) and
// sub-relations are skipped with a warning, so a streamed relation can have
// fewer members than the relation in the file. A cached way whose nodes cannot
// be resolved aborts the scan with an error.
func (g *Gosmonaut) scanRelations() error {
	return g.scan(RelationType, func(id int64, tags OSMTags, v entityParser) error {
		// Needed by stream?
		if !g.entityNeeded(RelationType, tags) {
			return nil
		}

		// Get parser
		d, ok := v.(relationParser)
		if !ok {
			return fmt.Errorf("got invalid relation parser (%T)", v)
		}

		// Get properties (IDs, types and roles are parallel lists)
		ids, err := d.ids()
		if err != nil {
			return err
		}
		types, err := d.types()
		if err != nil {
			return err
		}
		roles, err := d.roles()
		if err != nil {
			return err
		}
		count := len(ids)
		if len(types) != count || len(roles) != count {
			return errors.New("length of relation ids, roles and types differs")
		}

		// Build relation
		members := make([]Member, 0, count)
		for i, mid := range ids {
			var e OSMEntity
			switch types[i] {
			case WayType:
				rw, ok := g.wayCache.get(mid)
				if !ok {
					g.printWarning(fmt.Sprintf("Way #%d is not in file for relation #%d", mid, id))
					continue
				}
				nodes, err := g.uncacheNodes(rw.refs)
				if err != nil {
					return err
				}
				e = Way{
					ID:    rw.id,
					Tags:  g.wayTags[rw.id],
					Nodes: nodes,
				}
			case NodeType:
				node, err := g.uncacheNode(mid)
				if err != nil {
					g.printWarning(err.Error())
					continue
				}
				e = node
			default:
				// We don't support sub-relations yet
				g.printWarning(fmt.Sprintf("Skipping sub-relation #%d in relation #%d (not supported)", mid, id))
				continue
			}
			members = append(members, Member{roles[i], e})
		}

		// Send to output stream
		g.streamEntity(Relation{
			ID:      id,
			Tags:    tags,
			Members: members,
		})
		return nil
	})
}
// scan runs one pass over the file for the entity type `t` and hands every
// entity to `receiver` (ID, tags and the parser that yielded the entity, which
// gives access to the type specific properties).
//
// The pass ends without an error when the decoder reports io.EOF. Any other
// error of the decoder, a parser or the receiver aborts the pass and is
// returned.
func (g *Gosmonaut) scan(t OSMType,
	receiver func(int64, OSMTags, entityParser) error,
) error {
	if err := g.dec.Start(t); err != nil {
		return err
	}

	for {
		// Fetch the next batch of parsers
		parsers, err := g.dec.nextPair()
		if err != nil {
			if err == io.EOF {
				// File completely decoded
				return nil
			}
			return err
		}

		for _, parser := range parsers {
			// Drain the parser; io.EOF marks its end
			for {
				id, tags, err := parser.next()
				if err != nil {
					if err == io.EOF {
						break
					}
					return err
				}

				if err := receiver(id, tags, parser); err != nil {
					return err
				}
			}
		}
	}
}
/* Cache */
// uncacheNode builds the node with the given ID from the node cache and the
// stored node tags. An error is returned if the node is not in the cache.
// Nodes without stored tags get the zero value of OSMTags.
func (g *Gosmonaut) uncacheNode(id int64) (Node, error) {
	rn, ok := g.nodeCache.get(id)
	if !ok {
		// The message is also printed as a warning by scanRelations(), hence
		// the sentence-style capitalization
		return Node{}, fmt.Errorf("Node #%d is not in file", id)
	}
	return Node{
		ID:   rn.id,
		Lat:  rn.lat.float(),
		Lon:  rn.lon.float(),
		Tags: g.nodeTags[id],
	}, nil
}
// uncacheNodes resolves the given node IDs with uncacheNode() and returns the
// nodes in the same order. The first ID that cannot be resolved aborts the
// lookup; in that case no nodes are returned, only the error.
func (g *Gosmonaut) uncacheNodes(refs []int64) ([]Node, error) {
	resolved := make([]Node, 0, len(refs))
	for _, ref := range refs {
		node, err := g.uncacheNode(ref)
		if err != nil {
			return nil, err
		}
		resolved = append(resolved, node)
	}
	return resolved, nil
}
/* Debug Mode */
// printWarning prints the given warning to stdout, prefixed with "Warning:".
// Nothing is printed unless warnings have been enabled in the configuration.
func (g *Gosmonaut) printWarning(warning string) {
	if !g.printWarnings {
		return
	}
	fmt.Println("Warning:", warning)
}
// printDebugInfo reports the end of a processing step if debug mode is
// enabled: it runs the garbage collector, prints the current memory stats and
// prints the time that has passed since the previous report together with
// `state`, the description of the finished step.
func (g *Gosmonaut) printDebugInfo(state string) {
	if !g.debugMode {
		return
	}

	// Take the time first so that the garbage collection and the output below
	// are not attributed to the step
	secs := time.Since(g.timeLast).Seconds()

	// Collect garbage before reading the stats
	runtime.GC()

	var stats runtime.MemStats
	runtime.ReadMemStats(&stats)

	// Memory stats are printed in whole MiB
	const bytesPerMiB = 1024 * 1024
	fmt.Printf("Alloc = %v MiB\tTotalAlloc = %v MiB\tSys = %v MiB\tNumGC = %v\n",
		stats.Alloc/bytesPerMiB,
		stats.TotalAlloc/bytesPerMiB,
		stats.Sys/bytesPerMiB,
		stats.NumGC,
	)

	fmt.Printf("%.4fs - %v\n", secs, state)

	// The next step starts now
	g.timeLast = time.Now()
}