Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions 6 pkg/networkserver/grpc_gsns.go
Original file line number Diff line number Diff line change
Expand Up @@ -904,9 +904,9 @@ func (ns *NetworkServer) handleDataUplink(ctx context.Context, up *ttnpb.UplinkM
}
if !ok {
queuedEvents = append(queuedEvents, evtDropDataUplink(ctx, matched.Device.EndDeviceIdentifiers, errDuplicate))
registerReceiveDuplicateUplink(ctx, up)
return nil
}
registerReceiveUniqueUplink(ctx, up)

publishEvents(ctx, queuedEvents...)
queuedEvents = nil
Expand Down Expand Up @@ -1024,6 +1024,7 @@ func (ns *NetworkServer) handleDataUplink(ctx context.Context, up *ttnpb.UplinkM
})
}
queuedEvents = append(queuedEvents, evtProcessDataUplink(ctx, matched.Device.EndDeviceIdentifiers, up))
registerProcessUplink(ctx, up)
return nil
}

Expand Down Expand Up @@ -1159,9 +1160,9 @@ func (ns *NetworkServer) handleJoinRequest(ctx context.Context, up *ttnpb.Uplink
}
if !ok {
queuedEvents = append(queuedEvents, evtDropJoinRequest(ctx, matched.EndDeviceIdentifiers, errDuplicate))
registerReceiveDuplicateUplink(ctx, up)
return nil
}
registerReceiveUniqueUplink(ctx, up)

devAddr := ns.newDevAddr(ctx, matched)
for matched.Session != nil && devAddr.Equal(matched.Session.DevAddr) {
Expand Down Expand Up @@ -1281,6 +1282,7 @@ func (ns *NetworkServer) handleJoinRequest(ctx context.Context, up *ttnpb.Uplink
},
})
queuedEvents = append(queuedEvents, evtProcessJoinRequest(ctx, matched.EndDeviceIdentifiers, up))
registerProcessUplink(ctx, up)
return nil
}

Expand Down
55 changes: 35 additions & 20 deletions 55 pkg/networkserver/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,19 +206,19 @@ var nsMetrics = &messageMetrics{
},
[]string{messageType},
),
uplinkUniqueReceived: metrics.NewContextualCounterVec(
uplinkDuplicates: metrics.NewContextualCounterVec(
prometheus.CounterOpts{
Subsystem: subsystem,
Name: "uplink_unique_received_total",
Help: "Total number of received unique uplinks (without duplicates)",
Name: "uplink_duplicates_total",
Help: "Total number of duplicate uplinks",
},
[]string{messageType},
),
uplinkForwarded: metrics.NewContextualCounterVec(
uplinkProcessed: metrics.NewContextualCounterVec(
prometheus.CounterOpts{
Subsystem: subsystem,
Name: "uplink_forwarded_total",
Help: "Total number of forwarded uplinks",
Name: "uplink_processed_total",
Help: "Total number of processed uplinks",
},
[]string{messageType},
),
Expand All @@ -230,6 +230,14 @@ var nsMetrics = &messageMetrics{
},
[]string{messageType, "error"},
),
uplinkForwarded: metrics.NewContextualCounterVec(
prometheus.CounterOpts{
Subsystem: subsystem,
Name: "uplink_forwarded_total",
Help: "Total number of forwarded uplinks",
},
[]string{messageType},
),
uplinkGateways: metrics.NewContextualHistogramVec(
prometheus.HistogramOpts{
Subsystem: subsystem,
Expand All @@ -246,24 +254,27 @@ func init() {
}

type messageMetrics struct {
uplinkReceived *metrics.ContextualCounterVec
uplinkUniqueReceived *metrics.ContextualCounterVec
uplinkForwarded *metrics.ContextualCounterVec
uplinkDropped *metrics.ContextualCounterVec
uplinkGateways *metrics.ContextualHistogramVec
uplinkReceived *metrics.ContextualCounterVec
uplinkDuplicates *metrics.ContextualCounterVec
uplinkProcessed *metrics.ContextualCounterVec
uplinkForwarded *metrics.ContextualCounterVec
uplinkDropped *metrics.ContextualCounterVec
uplinkGateways *metrics.ContextualHistogramVec
}

func (m messageMetrics) Describe(ch chan<- *prometheus.Desc) {
m.uplinkReceived.Describe(ch)
m.uplinkUniqueReceived.Describe(ch)
m.uplinkDuplicates.Describe(ch)
m.uplinkProcessed.Describe(ch)
m.uplinkForwarded.Describe(ch)
m.uplinkDropped.Describe(ch)
m.uplinkGateways.Describe(ch)
}

func (m messageMetrics) Collect(ch chan<- prometheus.Metric) {
m.uplinkReceived.Collect(ch)
m.uplinkUniqueReceived.Collect(ch)
m.uplinkDuplicates.Collect(ch)
m.uplinkProcessed.Collect(ch)
m.uplinkForwarded.Collect(ch)
m.uplinkDropped.Collect(ch)
m.uplinkGateways.Collect(ch)
Expand All @@ -273,17 +284,16 @@ func uplinkMTypeLabel(mType ttnpb.MType) string {
return strings.ToLower(mType.String())
}

func registerReceiveUniqueUplink(ctx context.Context, msg *ttnpb.UplinkMessage) {
nsMetrics.uplinkUniqueReceived.WithLabelValues(ctx, uplinkMTypeLabel(msg.Payload.MType)).Inc()
}

func registerReceiveUplink(ctx context.Context, msg *ttnpb.UplinkMessage) {
nsMetrics.uplinkReceived.WithLabelValues(ctx, uplinkMTypeLabel(msg.Payload.MType)).Inc()
}

func registerMergeMetadata(ctx context.Context, msg *ttnpb.UplinkMessage) {
gtwCount, _ := rxMetadataStats(ctx, msg.RxMetadata)
nsMetrics.uplinkGateways.WithLabelValues(ctx).Observe(float64(gtwCount))
func registerReceiveDuplicateUplink(ctx context.Context, msg *ttnpb.UplinkMessage) {
nsMetrics.uplinkDuplicates.WithLabelValues(ctx, uplinkMTypeLabel(msg.Payload.MType)).Inc()
}

func registerProcessUplink(ctx context.Context, msg *ttnpb.UplinkMessage) {
nsMetrics.uplinkProcessed.WithLabelValues(ctx, uplinkMTypeLabel(msg.Payload.MType)).Inc()
}

func registerForwardDataUplink(ctx context.Context, msg *ttnpb.ApplicationUplink) {
Expand All @@ -305,3 +315,8 @@ func registerDropUplink(ctx context.Context, msg *ttnpb.UplinkMessage, err error
}
nsMetrics.uplinkDropped.WithLabelValues(ctx, uplinkMTypeLabel(msg.Payload.MType), cause).Inc()
}

func registerMergeMetadata(ctx context.Context, msg *ttnpb.UplinkMessage) {
gtwCount, _ := rxMetadataStats(ctx, msg.RxMetadata)
nsMetrics.uplinkGateways.WithLabelValues(ctx).Observe(float64(gtwCount))
}
Morty Proxy This is a proxified and sanitized view of the page, visit original site.