From a902583f3807b5746c9f28c084a711e30ce0b612 Mon Sep 17 00:00:00 2001 From: Daniel Fernandes Date: Mon, 14 Nov 2022 15:12:53 +0000 Subject: [PATCH 01/11] Implemented tracer middleware class --- src/workflows/transport/middleware/tracer.py | 70 ++++++++++++++++++++ 1 file changed, 70 insertions(+) create mode 100644 src/workflows/transport/middleware/tracer.py diff --git a/src/workflows/transport/middleware/tracer.py b/src/workflows/transport/middleware/tracer.py new file mode 100644 index 0000000..f05817e --- /dev/null +++ b/src/workflows/transport/middleware/tracer.py @@ -0,0 +1,70 @@ +from __future__ import annotations + +import functools +from typing import Callable + +from . import BaseTransportMiddleware + +from opentelemetry import trace +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.trace import Status, StatusCode +from opentelemetry.sdk.trace.export import( + BatchSpanProcessor, + ConsoleSpanExporter, + ) +from opentelemetry.sdk.resources import SERVICE_NAME, Resource +from opentelemetry.trace.propagation.tracecontext \ + import TraceContextTextMapPropagator +from opentelemetry.exporter.otlp.proto.http.trace_exporter \ + import OTLPSpanExporter + +resource = Resource(attributes={ + SERVICE_NAME: "Common Service" +}) + +processor = BatchSpanProcessor(OTLPSpanExporter( \ + endpoint="http://localhost:4318/v1/traces")) +provider = TracerProvider(resource = resource) +# A provider provides tracers: +provider = TracerProvider(resource=resource) +provider.add_span_processor(processor) +# In python trace is global: +trace.set_tracer_provider(provider) +tracer = trace.get_tracer(__name__) + +class TracerMiddleware(BaseTransportMiddleware): + def __init__(self, service_name: str): + self.service_name = service_name + + def _get_trace_context(self, message): + """If a trace context exists in the recipe wrapper's environment, get it""" + try: + #carrier = message['environment']['trace_context'] + carrier = message['trace_context'] + ctx = TraceContextTextMapPropagator().extract(carrier=carrier) + return ctx + except KeyError: + return {} + + def subscribe(self, call_next: Callable, channel, callback, **kwargs) -> int: + print(f"call_next: {call_next}, channel: {channel}, callback: {callback}, kwargs: {kwargs}") + + @functools.wraps(callback) + def wrapped_callback(header, message): + print(f"wrapped_callback header: {header}, message: {message}") + + ctx = self._get_trace_context(message) + with tracer.start_as_current_span(self.service_name, context=ctx) as span: + if ctx == {}: + print("inserting trace_context into message") + message['trace_context'] = "foo" + return callback(header, message) + + return call_next(channel, wrapped_callback, **kwargs) + + def send(self, call_next: Callable, destination, message, **kwargs): + carrier = {} + TraceContextTextMapPropagator().inject(carrier) + message['trace_context'] = carrier + + call_next(destination, message, **kwargs) From 7cee3b57a02f9ad5241ef08af156e5258119a467 Mon Sep 17 00:00:00 2001 From: Daniel Fernandes Date: Mon, 14 Nov 2022 15:24:58 +0000 Subject: [PATCH 02/11] Refactored workflows.transport.middleware.tracer for readability --- src/workflows/transport/middleware/tracer.py | 29 ++++++++++---------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/src/workflows/transport/middleware/tracer.py b/src/workflows/transport/middleware/tracer.py index f05817e..ae09e8f 100644 --- a/src/workflows/transport/middleware/tracer.py +++ b/src/workflows/transport/middleware/tracer.py @@ -36,35 +36,36 @@ class TracerMiddleware(BaseTransportMiddleware): def __init__(self, service_name: str): self.service_name = service_name - def _get_trace_context(self, message): - """If a trace context exists in the recipe wrapper's environment, get it""" + def _extract_trace_context(self, message): + """Retrieves span context from message""" try: - #carrier = message['environment']['trace_context'] carrier = message['trace_context'] ctx = TraceContextTextMapPropagator().extract(carrier=carrier) + print(f"Extracting {ctx}") return ctx except KeyError: + print(f"Extracted nothing") return {} - def subscribe(self, call_next: Callable, channel, callback, **kwargs) -> int: - print(f"call_next: {call_next}, channel: {channel}, callback: {callback}, kwargs: {kwargs}") + def _inject_trace_context(self, message): + """Inserts trace context into message""" + carrier = {} + TraceContextTextMapPropagator().inject(carrier) + message['trace_context'] = carrier + print(f"injecting {carrier}") + + def subscribe(self, call_next: Callable, channel, callback, **kwargs) -> int: @functools.wraps(callback) def wrapped_callback(header, message): - print(f"wrapped_callback header: {header}, message: {message}") - - ctx = self._get_trace_context(message) + ctx = self._extract_trace_context(message) with tracer.start_as_current_span(self.service_name, context=ctx) as span: if ctx == {}: - print("inserting trace_context into message") - message['trace_context'] = "foo" + self._inject_trace_context(message) return callback(header, message) return call_next(channel, wrapped_callback, **kwargs) def send(self, call_next: Callable, destination, message, **kwargs): - carrier = {} - TraceContextTextMapPropagator().inject(carrier) - message['trace_context'] = carrier - + self._inject_trace_context(message) call_next(destination, message, **kwargs) From cb7f9c92d4ef2280fc7c1e5877ae0ebbe02aab1b Mon Sep 17 00:00:00 2001 From: Daniel Fernandes Date: Mon, 14 Nov 2022 15:31:19 +0000 Subject: [PATCH 03/11] Refactored tracer initialization into its own function --- src/workflows/transport/middleware/tracer.py | 33 +++++++++++--------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/src/workflows/transport/middleware/tracer.py b/src/workflows/transport/middleware/tracer.py index ae09e8f..d5b89dc 100644 --- a/src/workflows/transport/middleware/tracer.py +++ b/src/workflows/transport/middleware/tracer.py @@ -18,23 +18,26 @@ from opentelemetry.exporter.otlp.proto.http.trace_exporter \ import OTLPSpanExporter -resource = Resource(attributes={ - SERVICE_NAME: "Common Service" -}) - -processor = BatchSpanProcessor(OTLPSpanExporter( \ - endpoint="http://localhost:4318/v1/traces")) -provider = TracerProvider(resource = resource) -# A provider provides tracers: -provider = TracerProvider(resource=resource) -provider.add_span_processor(processor) -# In python trace is global: -trace.set_tracer_provider(provider) -tracer = trace.get_tracer(__name__) - class TracerMiddleware(BaseTransportMiddleware): def __init__(self, service_name: str): self.service_name = service_name + self._initiate_tracers(service_name) + + def _initiate_tracers(self, service_name): + resource = Resource(attributes={ + SERVICE_NAME: service_name + }) + + processor = BatchSpanProcessor(OTLPSpanExporter( \ + endpoint="http://localhost:4318/v1/traces")) + # A provider provides tracers: + provider = TracerProvider(resource=resource) + provider.add_span_processor(processor) + # In python trace is global: + trace.set_tracer_provider(provider) + self.tracer = trace.get_tracer(__name__) + + def _extract_trace_context(self, message): """Retrieves span context from message""" @@ -59,7 +62,7 @@ def subscribe(self, call_next: Callable, channel, callback, **kwargs) -> int: @functools.wraps(callback) def wrapped_callback(header, message): ctx = self._extract_trace_context(message) - with tracer.start_as_current_span(self.service_name, context=ctx) as span: + with self.tracer.start_as_current_span(self.service_name, context=ctx) as span: if ctx == {}: self._inject_trace_context(message) return callback(header, message) From 5d151999f022f8a511c2d8f8f8a712e8d84b67f4 Mon Sep 17 00:00:00 2001 From: Daniel Fernandes Date: Mon, 14 Nov 2022 15:48:06 +0000 Subject: [PATCH 04/11] Added additional comments to workflows.transport.middleware.tracers --- src/workflows/transport/middleware/tracer.py | 27 ++++++++++++-------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/src/workflows/transport/middleware/tracer.py b/src/workflows/transport/middleware/tracer.py index d5b89dc..35d0911 100644 --- a/src/workflows/transport/middleware/tracer.py +++ b/src/workflows/transport/middleware/tracer.py @@ -18,47 +18,52 @@ from opentelemetry.exporter.otlp.proto.http.trace_exporter \ import OTLPSpanExporter + class TracerMiddleware(BaseTransportMiddleware): def __init__(self, service_name: str): self.service_name = service_name self._initiate_tracers(service_name) def _initiate_tracers(self, service_name): + """Initiates everything needed for tracing""" + # Label this resource as its service: resource = Resource(attributes={ SERVICE_NAME: service_name }) - + # Export to OpenTelemetry Collector: processor = BatchSpanProcessor(OTLPSpanExporter( \ endpoint="http://localhost:4318/v1/traces")) # A provider provides tracers: provider = TracerProvider(resource=resource) provider.add_span_processor(processor) - # In python trace is global: + # A tracer provides traces: trace.set_tracer_provider(provider) self.tracer = trace.get_tracer(__name__) - - def _extract_trace_context(self, message): - """Retrieves span context from message""" + """Retrieves Context object from message""" try: + # Deserialise serialised context into a Context object: carrier = message['trace_context'] ctx = TraceContextTextMapPropagator().extract(carrier=carrier) - print(f"Extracting {ctx}") return ctx except KeyError: - print(f"Extracted nothing") + # If no context, leave empty: return {} def _inject_trace_context(self, message): - """Inserts trace context into message""" + """Inserts serialized trace context into message""" carrier = {} + # If called outside of a span context, just leave carrier empty + # (very safe!) TraceContextTextMapPropagator().inject(carrier) message['trace_context'] = carrier - print(f"injecting {carrier}") - def subscribe(self, call_next: Callable, channel, callback, **kwargs) -> int: + """The callback includes 'everything' that happens in a service that + we care about, so we wrap it in a span context. + To link the current span context with others from the same request + we inject/extract the serialized trace context in the recipe message""" @functools.wraps(callback) def wrapped_callback(header, message): ctx = self._extract_trace_context(message) @@ -70,5 +75,7 @@ def wrapped_callback(header, message): return call_next(channel, wrapped_callback, **kwargs) def send(self, call_next: Callable, destination, message, **kwargs): + # Because send is usually called within a callback, it is inside a span + # context, so we can inject its trace context into the message: self._inject_trace_context(message) call_next(destination, message, **kwargs) From 0e496ed70d744fde78eabe578ea9ad67c6eff443 Mon Sep 17 00:00:00 2001 From: Daniel Fernandes Date: Mon, 14 Nov 2022 15:50:14 +0000 Subject: [PATCH 05/11] Added tracer middleware to services.common_service.py --- src/workflows/services/common_service.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/workflows/services/common_service.py b/src/workflows/services/common_service.py index bf7f93c..9337685 100644 --- a/src/workflows/services/common_service.py +++ b/src/workflows/services/common_service.py @@ -203,6 +203,10 @@ def start_transport(self): else: self.log.debug("No transport layer defined for service. Skipping.") + from workflows.transport.middleware.tracer import TracerMiddleware + instrument = TracerMiddleware(self.__class__.__name__) + self._transport.add_middleware(instrument) + def stop_transport(self): """If a transport object has been defined then tear it down.""" if self.transport: From 5e3ff7c79108a175c6dbd65284023cce3b823418 Mon Sep 17 00:00:00 2001 From: Daniel Fernandes Date: Mon, 14 Nov 2022 15:55:40 +0000 Subject: [PATCH 06/11] Removed bad exception from _extract_trace_context() --- src/workflows/transport/middleware/tracer.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/workflows/transport/middleware/tracer.py b/src/workflows/transport/middleware/tracer.py index 35d0911..1343f72 100644 --- a/src/workflows/transport/middleware/tracer.py +++ b/src/workflows/transport/middleware/tracer.py @@ -42,14 +42,13 @@ def _initiate_tracers(self, service_name): def _extract_trace_context(self, message): """Retrieves Context object from message""" - try: + carrier = message.get('trace_context') + if carrier: # Deserialise serialised context into a Context object: - carrier = message['trace_context'] ctx = TraceContextTextMapPropagator().extract(carrier=carrier) return ctx - except KeyError: - # If no context, leave empty: - return {} + # If no context, leave empty: + return {} def _inject_trace_context(self, message): """Inserts serialized trace context into message""" From 43e6df81ced86fee4af4bc3c7d1104f30a211ab1 Mon Sep 17 00:00:00 2001 From: Daniel Fernandes Date: Mon, 14 Nov 2022 17:03:26 +0000 Subject: [PATCH 07/11] Added string checking to context injection --- src/workflows/transport/middleware/tracer.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/src/workflows/transport/middleware/tracer.py b/src/workflows/transport/middleware/tracer.py index 1343f72..90f4616 100644 --- a/src/workflows/transport/middleware/tracer.py +++ b/src/workflows/transport/middleware/tracer.py @@ -25,7 +25,7 @@ def __init__(self, service_name: str): self._initiate_tracers(service_name) def _initiate_tracers(self, service_name): - """Initiates everything needed for tracing""" + """Initiates everything needed for tracing.""" # Label this resource as its service: resource = Resource(attributes={ SERVICE_NAME: service_name @@ -41,7 +41,7 @@ def _initiate_tracers(self, service_name): self.tracer = trace.get_tracer(__name__) def _extract_trace_context(self, message): - """Retrieves Context object from message""" + """Retrieves Context object from message.""" carrier = message.get('trace_context') if carrier: # Deserialise serialised context into a Context object: @@ -51,7 +51,10 @@ def _extract_trace_context(self, message): return {} def _inject_trace_context(self, message): - """Inserts serialized trace context into message""" + """Inserts serialized trace context into message.""" + if type(message) == str: + print("Warning: string message received") + return carrier = {} # If called outside of a span context, just leave carrier empty # (very safe!) @@ -60,9 +63,9 @@ def _inject_trace_context(self, message): def subscribe(self, call_next: Callable, channel, callback, **kwargs) -> int: """The callback includes 'everything' that happens in a service that - we care about, so we wrap it in a span context. - To link the current span context with others from the same request - we inject/extract the serialized trace context in the recipe message""" + we care about, so we wrap it in a span context. + To link the current span context with others from the same request + we inject/extract the serialized trace context in the recipe message.""" @functools.wraps(callback) def wrapped_callback(header, message): ctx = self._extract_trace_context(message) @@ -74,7 +77,7 @@ def wrapped_callback(header, message): return call_next(channel, wrapped_callback, **kwargs) def send(self, call_next: Callable, destination, message, **kwargs): - # Because send is usually called within a callback, it is inside a span - # context, so we can inject its trace context into the message: + """Because send is usually called within a callback, it is inside a span + context, so we can inject its trace context into the message.""" self._inject_trace_context(message) call_next(destination, message, **kwargs) From ada006aa6113a88625d010012d4fbc66a776314b Mon Sep 17 00:00:00 2001 From: Daniel Fernandes Date: Wed, 16 Nov 2022 16:16:04 +0000 Subject: [PATCH 08/11] Added span attributes to tracer middleware --- src/workflows/transport/middleware/tracer.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/workflows/transport/middleware/tracer.py b/src/workflows/transport/middleware/tracer.py index 90f4616..dcfab15 100644 --- a/src/workflows/transport/middleware/tracer.py +++ b/src/workflows/transport/middleware/tracer.py @@ -72,6 +72,12 @@ def wrapped_callback(header, message): with self.tracer.start_as_current_span(self.service_name, context=ctx) as span: if ctx == {}: self._inject_trace_context(message) + + # Insert header and message info: + for key, value in header.items(): + span.set_attribute(str(key), str(value)) + for key, value in message.items(): + span.set_attribute(str(key), str(value)) return callback(header, message) return call_next(channel, wrapped_callback, **kwargs) From 608f51e9147ba96a49ac9782b145b131fdc41ec9 Mon Sep 17 00:00:00 2001 From: Daniel Fernandes Date: Tue, 22 Nov 2022 17:03:21 +0000 Subject: [PATCH 09/11] Added basic logging for tracer.py --- src/workflows/transport/middleware/tracer.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/workflows/transport/middleware/tracer.py b/src/workflows/transport/middleware/tracer.py index dcfab15..6a45e2c 100644 --- a/src/workflows/transport/middleware/tracer.py +++ b/src/workflows/transport/middleware/tracer.py @@ -2,6 +2,7 @@ import functools from typing import Callable +import logging from . import BaseTransportMiddleware @@ -18,6 +19,7 @@ from opentelemetry.exporter.otlp.proto.http.trace_exporter \ import OTLPSpanExporter +logger = logging.getLogger(__name__) class TracerMiddleware(BaseTransportMiddleware): def __init__(self, service_name: str): @@ -39,6 +41,7 @@ def _initiate_tracers(self, service_name): # A tracer provides traces: trace.set_tracer_provider(provider) self.tracer = trace.get_tracer(__name__) + logger.info(f"initialized tracer as {service_name}") def _extract_trace_context(self, message): """Retrieves Context object from message.""" @@ -46,20 +49,23 @@ def _extract_trace_context(self, message): if carrier: # Deserialise serialised context into a Context object: ctx = TraceContextTextMapPropagator().extract(carrier=carrier) + logger.info(f"extracted trace context from {self.service_name}") return ctx # If no context, leave empty: + logger.warning(f"no context found for {self.service_name}, could not extract") return {} def _inject_trace_context(self, message): """Inserts serialized trace context into message.""" if type(message) == str: - print("Warning: string message received") + logger.warning(f"received string message in {self.service_name}, could not extract trace context") return carrier = {} # If called outside of a span context, just leave carrier empty # (very safe!) TraceContextTextMapPropagator().inject(carrier) message['trace_context'] = carrier + logger.info(f"injected trace context into {self.service_name}") def subscribe(self, call_next: Callable, channel, callback, **kwargs) -> int: """The callback includes 'everything' that happens in a service that From e2c4532f660da745a421f028072fedee2e4d9ca1 Mon Sep 17 00:00:00 2001 From: Nicholas Devenish Date: Tue, 21 Jan 2025 16:49:05 +0000 Subject: [PATCH 10/11] Run pre-commits --- src/workflows/services/common_service.py | 1 + src/workflows/transport/middleware/tracer.py | 47 ++++++++++---------- 2 files changed, 25 insertions(+), 23 deletions(-) diff --git a/src/workflows/services/common_service.py b/src/workflows/services/common_service.py index f927a79..4545071 100644 --- a/src/workflows/services/common_service.py +++ b/src/workflows/services/common_service.py @@ -204,6 +204,7 @@ def start_transport(self): self.log.debug("No transport layer defined for service. Skipping.") from workflows.transport.middleware.tracer import TracerMiddleware + instrument = TracerMiddleware(self.__class__.__name__) self._transport.add_middleware(instrument) diff --git a/src/workflows/transport/middleware/tracer.py b/src/workflows/transport/middleware/tracer.py index 6a45e2c..09f5fff 100644 --- a/src/workflows/transport/middleware/tracer.py +++ b/src/workflows/transport/middleware/tracer.py @@ -1,26 +1,23 @@ from __future__ import annotations import functools -from typing import Callable import logging - -from . import BaseTransportMiddleware +from typing import Callable from opentelemetry import trace -from opentelemetry.sdk.trace import TracerProvider -from opentelemetry.trace import Status, StatusCode -from opentelemetry.sdk.trace.export import( - BatchSpanProcessor, - ConsoleSpanExporter, - ) +from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter from opentelemetry.sdk.resources import SERVICE_NAME, Resource -from opentelemetry.trace.propagation.tracecontext \ - import TraceContextTextMapPropagator -from opentelemetry.exporter.otlp.proto.http.trace_exporter \ - import OTLPSpanExporter +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import ( + BatchSpanProcessor, +) +from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator + +from . import BaseTransportMiddleware logger = logging.getLogger(__name__) + class TracerMiddleware(BaseTransportMiddleware): def __init__(self, service_name: str): self.service_name = service_name @@ -29,12 +26,11 @@ def __init__(self, service_name: str): def _initiate_tracers(self, service_name): """Initiates everything needed for tracing.""" # Label this resource as its service: - resource = Resource(attributes={ - SERVICE_NAME: service_name - }) + resource = Resource(attributes={SERVICE_NAME: service_name}) # Export to OpenTelemetry Collector: - processor = BatchSpanProcessor(OTLPSpanExporter( \ - endpoint="http://localhost:4318/v1/traces")) + processor = BatchSpanProcessor( + OTLPSpanExporter(endpoint="http://localhost:4318/v1/traces") + ) # A provider provides tracers: provider = TracerProvider(resource=resource) provider.add_span_processor(processor) @@ -45,10 +41,10 @@ def _initiate_tracers(self, service_name): def _extract_trace_context(self, message): """Retrieves Context object from message.""" - carrier = message.get('trace_context') + carrier = message.get("trace_context") if carrier: # Deserialise serialised context into a Context object: - ctx = TraceContextTextMapPropagator().extract(carrier=carrier) + ctx = TraceContextTextMapPropagator().extract(carrier=carrier) logger.info(f"extracted trace context from {self.service_name}") return ctx # If no context, leave empty: @@ -58,13 +54,15 @@ def _extract_trace_context(self, message): def _inject_trace_context(self, message): """Inserts serialized trace context into message.""" if type(message) == str: - logger.warning(f"received string message in {self.service_name}, could not extract trace context") + logger.warning( + f"received string message in {self.service_name}, could not extract trace context" + ) return carrier = {} # If called outside of a span context, just leave carrier empty # (very safe!) TraceContextTextMapPropagator().inject(carrier) - message['trace_context'] = carrier + message["trace_context"] = carrier logger.info(f"injected trace context into {self.service_name}") def subscribe(self, call_next: Callable, channel, callback, **kwargs) -> int: @@ -72,10 +70,13 @@ def subscribe(self, call_next: Callable, channel, callback, **kwargs) -> int: we care about, so we wrap it in a span context. To link the current span context with others from the same request we inject/extract the serialized trace context in the recipe message.""" + @functools.wraps(callback) def wrapped_callback(header, message): ctx = self._extract_trace_context(message) - with self.tracer.start_as_current_span(self.service_name, context=ctx) as span: + with self.tracer.start_as_current_span( + self.service_name, context=ctx + ) as span: if ctx == {}: self._inject_trace_context(message) From daf57469eb80d4ab77cdc53ffd95f67dc326aac1 Mon Sep 17 00:00:00 2001 From: Nicholas Devenish Date: Tue, 21 Jan 2025 16:50:10 +0000 Subject: [PATCH 11/11] Fix lint --- src/workflows/transport/middleware/tracer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/workflows/transport/middleware/tracer.py b/src/workflows/transport/middleware/tracer.py index 09f5fff..5c1430d 100644 --- a/src/workflows/transport/middleware/tracer.py +++ b/src/workflows/transport/middleware/tracer.py @@ -51,9 +51,9 @@ def _extract_trace_context(self, message): logger.warning(f"no context found for {self.service_name}, could not extract") return {} - def _inject_trace_context(self, message): + def _inject_trace_context(self, message) -> None: """Inserts serialized trace context into message.""" - if type(message) == str: + if isinstance(message, str): logger.warning( f"received string message in {self.service_name}, could not extract trace context" )