From cc04928d6612c9cb7071a6a09e351238c22a5e8c Mon Sep 17 00:00:00 2001 From: Roman Volosatovs Date: Thu, 11 Jun 2020 14:53:19 +0200 Subject: [PATCH] ns: Refactor uplink metrics - Remove `uplink_unique_total` - Introduce `uplink_duplicates_total` - Introduce `uplink_processed_total` --- pkg/networkserver/grpc_gsns.go | 6 ++-- pkg/networkserver/observability.go | 55 +++++++++++++++++++----------- 2 files changed, 39 insertions(+), 22 deletions(-) diff --git a/pkg/networkserver/grpc_gsns.go b/pkg/networkserver/grpc_gsns.go index 8f9e8c29c7..8991284a22 100644 --- a/pkg/networkserver/grpc_gsns.go +++ b/pkg/networkserver/grpc_gsns.go @@ -904,9 +904,9 @@ func (ns *NetworkServer) handleDataUplink(ctx context.Context, up *ttnpb.UplinkM } if !ok { queuedEvents = append(queuedEvents, evtDropDataUplink(ctx, matched.Device.EndDeviceIdentifiers, errDuplicate)) + registerReceiveDuplicateUplink(ctx, up) return nil } - registerReceiveUniqueUplink(ctx, up) publishEvents(ctx, queuedEvents...) queuedEvents = nil @@ -1024,6 +1024,7 @@ func (ns *NetworkServer) handleDataUplink(ctx context.Context, up *ttnpb.UplinkM }) } queuedEvents = append(queuedEvents, evtProcessDataUplink(ctx, matched.Device.EndDeviceIdentifiers, up)) + registerProcessUplink(ctx, up) return nil } @@ -1159,9 +1160,9 @@ func (ns *NetworkServer) handleJoinRequest(ctx context.Context, up *ttnpb.Uplink } if !ok { queuedEvents = append(queuedEvents, evtDropJoinRequest(ctx, matched.EndDeviceIdentifiers, errDuplicate)) + registerReceiveDuplicateUplink(ctx, up) return nil } - registerReceiveUniqueUplink(ctx, up) devAddr := ns.newDevAddr(ctx, matched) for matched.Session != nil && devAddr.Equal(matched.Session.DevAddr) { @@ -1281,6 +1282,7 @@ func (ns *NetworkServer) handleJoinRequest(ctx context.Context, up *ttnpb.Uplink }, }) queuedEvents = append(queuedEvents, evtProcessJoinRequest(ctx, matched.EndDeviceIdentifiers, up)) + registerProcessUplink(ctx, up) return nil } diff --git a/pkg/networkserver/observability.go b/pkg/networkserver/observability.go index 03ecdc6cc0..f7b31d2db3 100644 --- a/pkg/networkserver/observability.go +++ b/pkg/networkserver/observability.go @@ -206,19 +206,19 @@ var nsMetrics = &messageMetrics{ }, []string{messageType}, ), - uplinkUniqueReceived: metrics.NewContextualCounterVec( + uplinkDuplicates: metrics.NewContextualCounterVec( prometheus.CounterOpts{ Subsystem: subsystem, - Name: "uplink_unique_received_total", - Help: "Total number of received unique uplinks (without duplicates)", + Name: "uplink_duplicates_total", + Help: "Total number of duplicate uplinks", }, []string{messageType}, ), - uplinkForwarded: metrics.NewContextualCounterVec( + uplinkProcessed: metrics.NewContextualCounterVec( prometheus.CounterOpts{ Subsystem: subsystem, - Name: "uplink_forwarded_total", - Help: "Total number of forwarded uplinks", + Name: "uplink_processed_total", + Help: "Total number of processed uplinks", }, []string{messageType}, ), @@ -230,6 +230,14 @@ var nsMetrics = &messageMetrics{ }, []string{messageType, "error"}, ), + uplinkForwarded: metrics.NewContextualCounterVec( + prometheus.CounterOpts{ + Subsystem: subsystem, + Name: "uplink_forwarded_total", + Help: "Total number of forwarded uplinks", + }, + []string{messageType}, + ), uplinkGateways: metrics.NewContextualHistogramVec( prometheus.HistogramOpts{ Subsystem: subsystem, @@ -246,16 +254,18 @@ func init() { } type messageMetrics struct { - uplinkReceived *metrics.ContextualCounterVec - uplinkUniqueReceived *metrics.ContextualCounterVec - uplinkForwarded *metrics.ContextualCounterVec - uplinkDropped *metrics.ContextualCounterVec - uplinkGateways *metrics.ContextualHistogramVec + uplinkReceived *metrics.ContextualCounterVec + uplinkDuplicates *metrics.ContextualCounterVec + uplinkProcessed *metrics.ContextualCounterVec + uplinkForwarded *metrics.ContextualCounterVec + uplinkDropped *metrics.ContextualCounterVec + uplinkGateways *metrics.ContextualHistogramVec } func (m messageMetrics) Describe(ch chan<- *prometheus.Desc) { m.uplinkReceived.Describe(ch) - m.uplinkUniqueReceived.Describe(ch) + m.uplinkDuplicates.Describe(ch) + m.uplinkProcessed.Describe(ch) m.uplinkForwarded.Describe(ch) m.uplinkDropped.Describe(ch) m.uplinkGateways.Describe(ch) @@ -263,7 +273,8 @@ func (m messageMetrics) Describe(ch chan<- *prometheus.Desc) { func (m messageMetrics) Collect(ch chan<- prometheus.Metric) { m.uplinkReceived.Collect(ch) - m.uplinkUniqueReceived.Collect(ch) + m.uplinkDuplicates.Collect(ch) + m.uplinkProcessed.Collect(ch) m.uplinkForwarded.Collect(ch) m.uplinkDropped.Collect(ch) m.uplinkGateways.Collect(ch) @@ -273,17 +284,16 @@ func uplinkMTypeLabel(mType ttnpb.MType) string { return strings.ToLower(mType.String()) } -func registerReceiveUniqueUplink(ctx context.Context, msg *ttnpb.UplinkMessage) { - nsMetrics.uplinkUniqueReceived.WithLabelValues(ctx, uplinkMTypeLabel(msg.Payload.MType)).Inc() -} - func registerReceiveUplink(ctx context.Context, msg *ttnpb.UplinkMessage) { nsMetrics.uplinkReceived.WithLabelValues(ctx, uplinkMTypeLabel(msg.Payload.MType)).Inc() } -func registerMergeMetadata(ctx context.Context, msg *ttnpb.UplinkMessage) { - gtwCount, _ := rxMetadataStats(ctx, msg.RxMetadata) - nsMetrics.uplinkGateways.WithLabelValues(ctx).Observe(float64(gtwCount)) +func registerReceiveDuplicateUplink(ctx context.Context, msg *ttnpb.UplinkMessage) { + nsMetrics.uplinkDuplicates.WithLabelValues(ctx, uplinkMTypeLabel(msg.Payload.MType)).Inc() +} + +func registerProcessUplink(ctx context.Context, msg *ttnpb.UplinkMessage) { + nsMetrics.uplinkProcessed.WithLabelValues(ctx, uplinkMTypeLabel(msg.Payload.MType)).Inc() } func registerForwardDataUplink(ctx context.Context, msg *ttnpb.ApplicationUplink) { @@ -305,3 +315,8 @@ func registerDropUplink(ctx context.Context, msg *ttnpb.UplinkMessage, err error } nsMetrics.uplinkDropped.WithLabelValues(ctx, uplinkMTypeLabel(msg.Payload.MType), cause).Inc() } + +func registerMergeMetadata(ctx context.Context, msg *ttnpb.UplinkMessage) { + gtwCount, _ := rxMetadataStats(ctx, msg.RxMetadata) + nsMetrics.uplinkGateways.WithLabelValues(ctx).Observe(float64(gtwCount)) +}