diff --git a/pkg/formatters/event.go b/pkg/formatters/event.go index c0c904c4..f6df18d8 100644 --- a/pkg/formatters/event.go +++ b/pkg/formatters/event.go @@ -16,8 +16,6 @@ import ( flattener "github.com/karimra/go-map-flattener" "github.com/openconfig/gnmi/proto/gnmi" - - "github.com/openconfig/gnmic/pkg/path" ) // EventMsg represents a gNMI update message, @@ -41,58 +39,23 @@ func ResponseToEventMsgs(name string, rsp *gnmi.SubscribeResponse, meta map[stri if rsp == nil { return nil, nil } - evs := make([]*EventMsg, 0) + evs := make([]*EventMsg, 0, len(rsp.GetUpdate().GetUpdate())+len(rsp.GetUpdate().GetDelete())) switch rsp := rsp.Response.(type) { case *gnmi.SubscribeResponse_Update: namePrefix, prefixTags := tagsFromGNMIPath(rsp.Update.GetPrefix()) // notification updates - for _, upd := range rsp.Update.GetUpdate() { - e, err := updateToEvent(name, namePrefix, rsp.Update.Timestamp, upd, prefixTags) - if err != nil { - return nil, err - } - for k, v := range meta { - if k == "format" { - continue - } - if _, ok := e.Tags[k]; ok { - e.Tags[fmt.Sprintf("meta_%s", k)] = v - continue - } - e.Tags[k] = v - } - if (e != nil && e != &EventMsg{}) { - evs = append(evs, e) - } + uevs, err := updatesToEvent(name, namePrefix, rsp.Update.GetTimestamp(), rsp.Update.GetUpdate(), prefixTags, meta) + if err != nil { + return nil, err } + evs = append(evs, uevs...) // notification deletes - numDeletes := len(rsp.Update.GetDelete()) - if numDeletes > 0 { - e := &EventMsg{ - Name: name, - Timestamp: rsp.Update.GetTimestamp(), - Tags: make(map[string]string), - Deletes: make([]string, 0, numDeletes), - } - // build tags - for k, v := range prefixTags { - e.Tags[k] = v - } - for k, v := range meta { - if k == "format" { - continue - } - if _, ok := e.Tags[k]; ok { - e.Tags[fmt.Sprintf("meta_%s", k)] = v - continue - } - e.Tags[k] = v - } - // add paths - for _, del := range rsp.Update.GetDelete() { - e.Deletes = append(e.Deletes, path.GnmiPathToXPath(del, false)) + for _, del := range rsp.Update.GetDelete() { + e := deleteToEvent(name, namePrefix, rsp.Update.GetTimestamp(), del, prefixTags) + addMetaTags(e, meta) + if (e != nil && e != &EventMsg{}) { + evs = append(evs, e) } - evs = append(evs, e) } for _, ep := range eps { @@ -106,30 +69,31 @@ func GetResponseToEventMsgs(rsp *gnmi.GetResponse, meta map[string]string, eps . if rsp == nil { return nil, nil } - evs := make([]*EventMsg, 0) + evs := make([]*EventMsg, 0, len(rsp.GetNotification())) for _, notif := range rsp.GetNotification() { namePrefix, prefixTags := tagsFromGNMIPath(notif.GetPrefix()) - for _, upd := range notif.GetUpdate() { - e, err := updateToEvent("get-request", namePrefix, notif.GetTimestamp(), upd, prefixTags) - if err != nil { - return nil, err - } - for k, v := range meta { - if k == "format" { - continue - } - if _, ok := e.Tags[k]; ok { - e.Tags["meta:"+k] = v - continue - } - e.Tags[k] = v - } - if (e != nil && e != &EventMsg{}) { - evs = append(evs, e) - } + uevs, err := updatesToEvent("get-request", namePrefix, notif.GetTimestamp(), notif.GetUpdate(), prefixTags, meta) + if err != nil { + return nil, err } - for _, ep := range eps { - evs = ep.Apply(evs...) + evs = append(evs, uevs...) + } + for _, ep := range eps { + evs = ep.Apply(evs...) + } + return evs, nil +} + +func updatesToEvent(name, prefix string, ts int64, upds []*gnmi.Update, tags, meta map[string]string) ([]*EventMsg, error) { + evs := make([]*EventMsg, 0, len(upds)) + for _, upd := range upds { + e, err := updateToEvent(name, prefix, ts, upd, tags) + if err != nil { + return nil, err + } + addMetaTags(e, meta) + if (e != nil && e != &EventMsg{}) { + evs = append(evs, e) } } return evs, nil @@ -168,6 +132,35 @@ func updateToEvent(name, prefix string, ts int64, upd *gnmi.Update, tags map[str return e, nil } +func deleteToEvent(name, prefix string, ts int64, del *gnmi.Path, tags map[string]string) *EventMsg { + e := &EventMsg{ + Name: name, + Timestamp: ts, + Tags: make(map[string]string), + Deletes: make([]string, 0, 1), + } + for k, v := range tags { + e.Tags[k] = v + } + pathName, pTags := tagsFromGNMIPath(del) + psb := strings.Builder{} + psb.WriteString(strings.TrimRight(prefix, "/")) + psb.WriteString("/") + psb.WriteString(strings.TrimLeft(pathName, "/")) + pathName = psb.String() + for k, v := range pTags { + if vv, ok := e.Tags[k]; ok { + if v != vv { + e.Tags[fmt.Sprintf("%s_%s", pathName, k)] = v + } + continue + } + e.Tags[k] = v + } + e.Deletes = append(e.Deletes, pathName) + return e +} + // tagsFromGNMIPath returns a string representation of the gNMI path without keys, // as well as a map of the keys in the path. // the key map will also contain a target value if present in the gNMI path. @@ -393,3 +386,16 @@ func num64(n interface{}) interface{} { } return nil } + +func addMetaTags(e *EventMsg, meta map[string]string) { + for k, v := range meta { + if k == "format" { + continue + } + if _, ok := e.Tags[k]; ok { + e.Tags[fmt.Sprintf("meta_%s", k)] = v + continue + } + e.Tags[k] = v + } +} diff --git a/pkg/formatters/event_test.go b/pkg/formatters/event_test.go index 793e467c..f2f4f65b 100644 --- a/pkg/formatters/event_test.go +++ b/pkg/formatters/event_test.go @@ -12,7 +12,6 @@ import ( "fmt" "reflect" "testing" - "time" "github.com/google/go-cmp/cmp" "github.com/openconfig/gnmi/proto/gnmi" @@ -136,48 +135,6 @@ func TestFromMap(t *testing.T) { } } -func TestResponseToEventMsgs(t *testing.T) { - rsp := &gnmi.SubscribeResponse{ - Response: &gnmi.SubscribeResponse_Update{ - Update: &gnmi.Notification{ - Timestamp: time.Now().UnixNano(), - Prefix: &gnmi.Path{ - Elem: []*gnmi.PathElem{ - {Name: "a"}, - }, - }, - Update: []*gnmi.Update{ - { - Path: &gnmi.Path{ - Elem: []*gnmi.PathElem{ - {Name: "b", - Key: map[string]string{ - "k1": "v1", - }, - }, - {Name: "c", - Key: map[string]string{ - "k2": "v2", - }}, - }, - }, - Val: &gnmi.TypedValue{ - Value: &gnmi.TypedValue_StringVal{ - StringVal: "value", - }, - }, - }, - }, - }, - }, - } - evs, err := ResponseToEventMsgs("subname", rsp, map[string]string{"k1": "v0"}) - if err != nil { - t.Error(err) - } - t.Logf("%v", evs) -} - func TestTagsFromGNMIPath(t *testing.T) { type args struct { p *gnmi.Path @@ -394,3 +351,344 @@ func Test_getValueFlat(t *testing.T) { }) } } + +func TestResponseToEventMsgs(t *testing.T) { + type args struct { + name string + rsp *gnmi.SubscribeResponse + meta map[string]string + eps []EventProcessor + } + tests := []struct { + name string + args args + want []*EventMsg + wantErr bool + }{ + { + name: "sync_response", + args: args{ + name: "sub1", + rsp: &gnmi.SubscribeResponse{ + Response: &gnmi.SubscribeResponse_SyncResponse{ + SyncResponse: true, + }, + }, + }, + want: []*EventMsg{}, + wantErr: false, + }, + { + name: "single_update_ascii_value", + args: args{ + name: "sub1", + rsp: &gnmi.SubscribeResponse{ + Response: &gnmi.SubscribeResponse_Update{ + Update: &gnmi.Notification{ + Timestamp: 42, + Update: []*gnmi.Update{ + { + Path: &gnmi.Path{ + Elem: []*gnmi.PathElem{ + { + Name: "interface", + Key: map[string]string{ + "name": "ethernet-1/1", + }, + }, + {Name: "oper-state"}, + }, + }, + Val: &gnmi.TypedValue{ + Value: &gnmi.TypedValue_AsciiVal{AsciiVal: "up"}, + }, + }, + }, + }, + }, + }, + }, + want: []*EventMsg{ + { + Name: "sub1", + Timestamp: 42, + Tags: map[string]string{ + "interface_name": "ethernet-1/1", + }, + Values: map[string]interface{}{ + "/interface/oper-state": "up", + }, + }, + }, + wantErr: false, + }, + { + name: "single_update_string_json_value", + args: args{ + name: "sub1", + rsp: &gnmi.SubscribeResponse{ + Response: &gnmi.SubscribeResponse_Update{ + Update: &gnmi.Notification{ + Timestamp: 42, + Update: []*gnmi.Update{ + { + Path: &gnmi.Path{ + Elem: []*gnmi.PathElem{ + { + Name: "interface", + Key: map[string]string{ + "name": "ethernet-1/1", + }, + }, + {Name: "oper-state"}, + }, + }, + Val: &gnmi.TypedValue{ + Value: &gnmi.TypedValue_JsonVal{JsonVal: []byte("\"up\"")}, + }, + }, + }, + }, + }, + }, + }, + want: []*EventMsg{ + { + Name: "sub1", + Timestamp: 42, + Tags: map[string]string{ + "interface_name": "ethernet-1/1", + }, + Values: map[string]interface{}{ + "/interface/oper-state": "up", + }, + }, + }, + wantErr: false, + }, + { + name: "single_update_object_json_value", + args: args{ + name: "sub1", + rsp: &gnmi.SubscribeResponse{ + Response: &gnmi.SubscribeResponse_Update{ + Update: &gnmi.Notification{ + Timestamp: 42, + Update: []*gnmi.Update{ + { + Path: &gnmi.Path{ + Elem: []*gnmi.PathElem{ + { + Name: "interface", + Key: map[string]string{ + "name": "ethernet-1/1", + }, + }, + {Name: "statistics"}, + }, + }, + Val: &gnmi.TypedValue{ + Value: &gnmi.TypedValue_JsonVal{JsonVal: []byte(`{"in-octets":"10","out-octets":"11"}`)}, + }, + }, + }, + }, + }, + }, + }, + want: []*EventMsg{ + { + Name: "sub1", + Timestamp: 42, + Tags: map[string]string{ + "interface_name": "ethernet-1/1", + }, + Values: map[string]interface{}{ + "/interface/statistics/in-octets": "10", + "/interface/statistics/out-octets": "11", + }, + }, + }, + wantErr: false, + }, + { + name: "multiple_updates_single_ascii_values", + args: args{ + name: "sub1", + rsp: &gnmi.SubscribeResponse{ + Response: &gnmi.SubscribeResponse_Update{ + Update: &gnmi.Notification{ + Timestamp: 42, + Update: []*gnmi.Update{ + { + Path: &gnmi.Path{ + Elem: []*gnmi.PathElem{ + { + Name: "interface", + Key: map[string]string{ + "name": "ethernet-1/1", + }, + }, + {Name: "admin-state"}, + }, + }, + Val: &gnmi.TypedValue{ + Value: &gnmi.TypedValue_AsciiVal{AsciiVal: "enable"}, + }, + }, + { + Path: &gnmi.Path{ + Elem: []*gnmi.PathElem{ + { + Name: "interface", + Key: map[string]string{ + "name": "ethernet-1/1", + }, + }, + {Name: "oper-state"}, + }, + }, + Val: &gnmi.TypedValue{ + Value: &gnmi.TypedValue_AsciiVal{AsciiVal: "up"}, + }, + }, + }, + }, + }, + }, + }, + want: []*EventMsg{ + { + Name: "sub1", + Timestamp: 42, + Tags: map[string]string{ + "interface_name": "ethernet-1/1", + }, + Values: map[string]interface{}{ + "/interface/admin-state": "enable", + }, + }, + { + Name: "sub1", + Timestamp: 42, + Tags: map[string]string{ + "interface_name": "ethernet-1/1", + }, + Values: map[string]interface{}{ + "/interface/oper-state": "up", + }, + }, + }, + wantErr: false, + }, + { + name: "with_single_delete", + args: args{ + name: "sub1", + rsp: &gnmi.SubscribeResponse{ + Response: &gnmi.SubscribeResponse_Update{ + Update: &gnmi.Notification{ + Timestamp: 42, + Delete: []*gnmi.Path{ + { + Elem: []*gnmi.PathElem{ + { + Name: "interface", + Key: map[string]string{ + "name": "ethernet-1/1", + }, + }, + }, + }, + }, + }, + }, + }, + }, + want: []*EventMsg{ + { + Name: "sub1", + Timestamp: 42, + Tags: map[string]string{ + "interface_name": "ethernet-1/1", + }, + Deletes: []string{ + "/interface", + }, + }, + }, + wantErr: false, + }, + { + name: "with_2_deletes", + args: args{ + name: "sub1", + rsp: &gnmi.SubscribeResponse{ + Response: &gnmi.SubscribeResponse_Update{ + Update: &gnmi.Notification{ + Timestamp: 42, + Delete: []*gnmi.Path{ + { + Elem: []*gnmi.PathElem{ + { + Name: "interface", + Key: map[string]string{ + "name": "ethernet-1/1", + }, + }, + }, + }, + { + Elem: []*gnmi.PathElem{ + { + Name: "interface", + Key: map[string]string{ + "name": "ethernet-1/2", + }, + }, + }, + }, + }, + }, + }, + }, + }, + want: []*EventMsg{ + { + Name: "sub1", + Timestamp: 42, + Tags: map[string]string{ + "interface_name": "ethernet-1/1", + }, + Deletes: []string{ + "/interface", + }, + }, + { + Name: "sub1", + Timestamp: 42, + Tags: map[string]string{ + "interface_name": "ethernet-1/2", + }, + Deletes: []string{ + "/interface", + }, + }, + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := ResponseToEventMsgs(tt.args.name, tt.args.rsp, tt.args.meta, tt.args.eps...) + if (err != nil) != tt.wantErr { + t.Errorf("ResponseToEventMsgs() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("ResponseToEventMsgs() got = %v", got) + t.Errorf("ResponseToEventMsgs() want= %v", tt.want) + } + }) + } +}