From 6acab90dc78a9201f0f54b2c4ac86ec33dcc1ebc Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Tue, 5 May 2026 14:05:29 +0200 Subject: [PATCH 1/7] feat(grpc): Support span streaming --- sentry_sdk/integrations/grpc/aio/client.py | 125 ++- sentry_sdk/integrations/grpc/aio/server.py | 71 +- sentry_sdk/integrations/grpc/client.py | 120 ++- sentry_sdk/integrations/grpc/server.py | 43 +- tests/integrations/grpc/test_grpc.py | 751 ++++++++++++++----- tests/integrations/grpc/test_grpc.py.err | 736 ++++++++++++++++++ tests/integrations/grpc/test_grpc_aio.py | 558 ++++++++++---- tests/integrations/grpc/test_grpc_aio.py.err | 557 ++++++++++++++ 8 files changed, 2520 insertions(+), 441 deletions(-) create mode 100644 tests/integrations/grpc/test_grpc.py.err create mode 100644 tests/integrations/grpc/test_grpc_aio.py.err diff --git a/sentry_sdk/integrations/grpc/aio/client.py b/sentry_sdk/integrations/grpc/aio/client.py index 2edad83aff..58ce4e4430 100644 --- a/sentry_sdk/integrations/grpc/aio/client.py +++ b/sentry_sdk/integrations/grpc/aio/client.py @@ -4,6 +4,7 @@ from sentry_sdk.consts import OP from sentry_sdk.integrations import DidNotEnable from sentry_sdk.integrations.grpc.consts import SPAN_ORIGIN +from sentry_sdk.tracing_utils import has_span_streaming_enabled try: from grpc.aio import ( @@ -49,23 +50,51 @@ async def intercept_unary_unary( ) -> "Union[UnaryUnaryCall, Message]": method = client_call_details.method - with sentry_sdk.start_span( - op=OP.GRPC_CLIENT, - name="unary unary call to %s" % method.decode(), - origin=SPAN_ORIGIN, - ) as span: - span.set_data("type", "unary unary") - span.set_data("method", method) - - client_call_details = self._update_client_call_details_metadata_from_scope( - client_call_details - ) - - response = await continuation(client_call_details, request) - status_code = await response.code() - span.set_data("code", status_code.name) - - return response + client = sentry_sdk.get_client() + span_streaming = has_span_streaming_enabled(client.options) + + if span_streaming: + with sentry_sdk.traces.start_span( + name="unary unary call to %s" % method.decode(), + attributes={ + "sentry.op": OP.GRPC_CLIENT, + "sentry.origin": SPAN_ORIGIN, + }, + ) as span: + span.set_attribute("type", "unary unary") + span.set_attribute("rpc.method", method.decode()) + + client_call_details = ( + self._update_client_call_details_metadata_from_scope( + client_call_details + ) + ) + + response = await continuation(client_call_details, request) + status_code = await response.code() + span.set_attribute("code", status_code.name) + + return response + else: + with sentry_sdk.start_span( + op=OP.GRPC_CLIENT, + name="unary unary call to %s" % method.decode(), + origin=SPAN_ORIGIN, + ) as span: + span.set_data("type", "unary unary") + span.set_data("method", method) + + client_call_details = ( + self._update_client_call_details_metadata_from_scope( + client_call_details + ) + ) + + response = await continuation(client_call_details, request) + status_code = await response.code() + span.set_data("code", status_code.name) + + return response class SentryUnaryStreamClientInterceptor( @@ -80,20 +109,48 @@ async def intercept_unary_stream( ) -> "Union[AsyncIterable[Any], UnaryStreamCall]": method = client_call_details.method - with sentry_sdk.start_span( - op=OP.GRPC_CLIENT, - name="unary stream call to %s" % method.decode(), - origin=SPAN_ORIGIN, - ) as span: - span.set_data("type", "unary stream") - span.set_data("method", method) - - client_call_details = self._update_client_call_details_metadata_from_scope( - client_call_details - ) - - response = await continuation(client_call_details, request) - # status_code = await response.code() - # span.set_data("code", status_code) - - return response + client = sentry_sdk.get_client() + span_streaming = has_span_streaming_enabled(client.options) + + if span_streaming: + with sentry_sdk.traces.start_span( + name="unary stream call to %s" % method.decode(), + attributes={ + "sentry.op": OP.GRPC_CLIENT, + "sentry.origin": SPAN_ORIGIN, + }, + ) as span: + span.set_attribute("type", "unary stream") + span.set_attribute("rpc.method", method.decode()) + + client_call_details = ( + self._update_client_call_details_metadata_from_scope( + client_call_details + ) + ) + + response = await continuation(client_call_details, request) + # status_code = await response.code() + # span.set_data("code", status_code) + + return response + else: + with sentry_sdk.start_span( + op=OP.GRPC_CLIENT, + name="unary stream call to %s" % method.decode(), + origin=SPAN_ORIGIN, + ) as span: + span.set_data("type", "unary stream") + span.set_data("method", method) + + client_call_details = ( + self._update_client_call_details_metadata_from_scope( + client_call_details + ) + ) + + response = await continuation(client_call_details, request) + # status_code = await response.code() + # span.set_data("code", status_code) + + return response diff --git a/sentry_sdk/integrations/grpc/aio/server.py b/sentry_sdk/integrations/grpc/aio/server.py index f77f14833f..3c5667685c 100644 --- a/sentry_sdk/integrations/grpc/aio/server.py +++ b/sentry_sdk/integrations/grpc/aio/server.py @@ -4,6 +4,7 @@ from sentry_sdk.integrations.grpc.consts import SPAN_ORIGIN from sentry_sdk.tracing import TransactionSource from sentry_sdk.utils import event_from_exception +from sentry_sdk.tracing_utils import has_span_streaming_enabled from typing import TYPE_CHECKING @@ -52,27 +53,55 @@ async def wrapped(request: "Any", context: "ServicerContext") -> "Any": if not name: return await handler(request, context) - # What if the headers are empty? - transaction = sentry_sdk.continue_trace( - dict(context.invocation_metadata()), - op=OP.GRPC_SERVER, - name=name, - source=TransactionSource.CUSTOM, - origin=SPAN_ORIGIN, - ) - - with sentry_sdk.start_transaction(transaction=transaction): - try: - return await handler.unary_unary(request, context) - except AbortError: - raise - except Exception as exc: - event, hint = event_from_exception( - exc, - mechanism={"type": "grpc", "handled": False}, - ) - sentry_sdk.capture_event(event, hint=hint) - raise + client = sentry_sdk.get_client() + span_streaming = has_span_streaming_enabled(client.options) + if span_streaming: + # What if the headers are empty? + sentry_sdk.traces.continue_trace( + dict(context.invocation_metadata()) + ) + + with sentry_sdk.traces.start_span( + name=name, + attributes={ + "sentry.op": OP.GRPC_SERVER, + "sentry.span.source": TransactionSource.CUSTOM, + "sentry.origin": SPAN_ORIGIN, + }, + ): + try: + return await handler.unary_unary(request, context) + except AbortError: + raise + except Exception as exc: + event, hint = event_from_exception( + exc, + mechanism={"type": "grpc", "handled": False}, + ) + sentry_sdk.capture_event(event, hint=hint) + raise + else: + # What if the headers are empty? + transaction = sentry_sdk.continue_trace( + dict(context.invocation_metadata()), + op=OP.GRPC_SERVER, + name=name, + source=TransactionSource.CUSTOM, + origin=SPAN_ORIGIN, + ) + + with sentry_sdk.start_transaction(transaction=transaction): + try: + return await handler.unary_unary(request, context) + except AbortError: + raise + except Exception as exc: + event, hint = event_from_exception( + exc, + mechanism={"type": "grpc", "handled": False}, + ) + sentry_sdk.capture_event(event, hint=hint) + raise elif not handler.request_streaming and handler.response_streaming: handler_factory = grpc.unary_stream_rpc_method_handler diff --git a/sentry_sdk/integrations/grpc/client.py b/sentry_sdk/integrations/grpc/client.py index b6cbc54f10..ef2710af5c 100644 --- a/sentry_sdk/integrations/grpc/client.py +++ b/sentry_sdk/integrations/grpc/client.py @@ -2,6 +2,7 @@ from sentry_sdk.consts import OP from sentry_sdk.integrations import DidNotEnable from sentry_sdk.integrations.grpc.consts import SPAN_ORIGIN +from sentry_sdk.tracing_utils import has_span_streaming_enabled from typing import TYPE_CHECKING @@ -30,22 +31,48 @@ def intercept_unary_unary( ) -> "_UnaryOutcome": method = client_call_details.method - with sentry_sdk.start_span( - op=OP.GRPC_CLIENT, - name="unary unary call to %s" % method, - origin=SPAN_ORIGIN, - ) as span: - span.set_data("type", "unary unary") - span.set_data("method", method) - - client_call_details = self._update_client_call_details_metadata_from_scope( - client_call_details - ) - - response = continuation(client_call_details, request) - span.set_data("code", response.code().name) - - return response + client = sentry_sdk.get_client() + span_streaming = has_span_streaming_enabled(client.options) + if span_streaming: + with sentry_sdk.traces.start_span( + name="unary unary call to %s" % method, + attributes={ + "sentry.op": OP.GRPC_CLIENT, + "sentry.origin": SPAN_ORIGIN, + }, + ) as span: + span.set_attribute("type", "unary unary") + span.set_attribute("method", method) + + client_call_details = ( + self._update_client_call_details_metadata_from_scope( + client_call_details + ) + ) + + response = continuation(client_call_details, request) + span.set_attribute("code", response.code().name) + + return response + else: + with sentry_sdk.start_span( + op=OP.GRPC_CLIENT, + name="unary unary call to %s" % method, + origin=SPAN_ORIGIN, + ) as span: + span.set_data("type", "unary unary") + span.set_data("method", method) + + client_call_details = ( + self._update_client_call_details_metadata_from_scope( + client_call_details + ) + ) + + response = continuation(client_call_details, request) + span.set_data("code", response.code().name) + + return response def intercept_unary_stream( self: "ClientInterceptor", @@ -55,23 +82,50 @@ def intercept_unary_stream( ) -> "Union[Iterator[Message], Call]": method = client_call_details.method - with sentry_sdk.start_span( - op=OP.GRPC_CLIENT, - name="unary stream call to %s" % method, - origin=SPAN_ORIGIN, - ) as span: - span.set_data("type", "unary stream") - span.set_data("method", method) - - client_call_details = self._update_client_call_details_metadata_from_scope( - client_call_details - ) - - response: "UnaryStreamCall" = continuation(client_call_details, request) - # Setting code on unary-stream leads to execution getting stuck - # span.set_data("code", response.code().name) - - return response + client = sentry_sdk.get_client() + span_streaming = has_span_streaming_enabled(client.options) + if span_streaming: + with sentry_sdk.traces.start_span( + name="unary stream call to %s" % method, + attributes={ + "sentry.op": OP.GRPC_CLIENT, + "sentry.origin": SPAN_ORIGIN, + }, + ) as span: + span.set_attribute("type", "unary stream") + span.set_attribute("method", method) + + client_call_details = ( + self._update_client_call_details_metadata_from_scope( + client_call_details + ) + ) + + response: "UnaryStreamCall" = continuation(client_call_details, request) + # Setting code on unary-stream leads to execution getting stuck + # span.set_data("code", response.code().name) + + return response + else: + with sentry_sdk.start_span( + op=OP.GRPC_CLIENT, + name="unary stream call to %s" % method, + origin=SPAN_ORIGIN, + ) as span: + span.set_data("type", "unary stream") + span.set_data("method", method) + + client_call_details = ( + self._update_client_call_details_metadata_from_scope( + client_call_details + ) + ) + + response: "UnaryStreamCall" = continuation(client_call_details, request) + # Setting code on unary-stream leads to execution getting stuck + # span.set_data("code", response.code().name) + + return response @staticmethod def _update_client_call_details_metadata_from_scope( diff --git a/sentry_sdk/integrations/grpc/server.py b/sentry_sdk/integrations/grpc/server.py index af01f37fb2..94c9f900d1 100644 --- a/sentry_sdk/integrations/grpc/server.py +++ b/sentry_sdk/integrations/grpc/server.py @@ -3,6 +3,7 @@ from sentry_sdk.integrations import DidNotEnable from sentry_sdk.integrations.grpc.consts import SPAN_ORIGIN from sentry_sdk.tracing import TransactionSource +from sentry_sdk.tracing_utils import has_span_streaming_enabled from typing import TYPE_CHECKING @@ -45,19 +46,37 @@ def behavior(request: "Message", context: "ServicerContext") -> "Message": if name: metadata = dict(context.invocation_metadata()) - transaction = sentry_sdk.continue_trace( - metadata, - op=OP.GRPC_SERVER, - name=name, - source=TransactionSource.CUSTOM, - origin=SPAN_ORIGIN, - ) + client = sentry_sdk.get_client() + span_streaming = has_span_streaming_enabled(client.options) + if span_streaming: + sentry_sdk.traces.continue_trace(metadata) - with sentry_sdk.start_transaction(transaction=transaction): - try: - return handler.unary_unary(request, context) - except BaseException as e: - raise e + with sentry_sdk.traces.start_span( + name=name, + attributes={ + "sentry.op": OP.GRPC_SERVER, + "sentry.span.source": TransactionSource.CUSTOM, + "sentry.origin": SPAN_ORIGIN, + }, + ): + try: + return handler.unary_unary(request, context) + except BaseException as e: + raise e + else: + transaction = sentry_sdk.continue_trace( + metadata, + op=OP.GRPC_SERVER, + name=name, + source=TransactionSource.CUSTOM, + origin=SPAN_ORIGIN, + ) + + with sentry_sdk.start_transaction(transaction=transaction): + try: + return handler.unary_unary(request, context) + except BaseException as e: + raise e else: return handler.unary_unary(request, context) diff --git a/tests/integrations/grpc/test_grpc.py b/tests/integrations/grpc/test_grpc.py index 25436d9feb..92a6c195e7 100644 --- a/tests/integrations/grpc/test_grpc.py +++ b/tests/integrations/grpc/test_grpc.py @@ -3,9 +3,13 @@ from concurrent import futures from typing import List, Optional, Tuple + +from unittest import mock from unittest.mock import Mock +import sentry_sdk from sentry_sdk import start_span, start_transaction +from sentry_sdk.tracing_utils import has_span_streaming_enabled from sentry_sdk.consts import OP from sentry_sdk.integrations.grpc import GRPCIntegration from sentry_sdk.integrations.grpc.client import ClientInterceptor @@ -51,35 +55,74 @@ def _tear_down(server: grpc.Server): @pytest.mark.forked -def test_grpc_server_starts_transaction(sentry_init, capture_events_forksafe): - sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()]) - events = capture_events_forksafe() +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_grpc_server_starts_transaction( + sentry_init, + capture_events_forksafe, + capture_items_forksafe, + span_streaming, +): + sentry_init( + traces_sample_rate=1.0, + integrations=[GRPCIntegration()], + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, + ) server, channel = _set_up() # Use the provided channel stub = gRPCTestServiceStub(channel) - stub.TestServe(gRPCTestMessage(text="test")) - _tear_down(server=server) + if span_streaming: + items = capture_items_forksafe("span") + + stub.TestServe(gRPCTestMessage(text="test")) + + _tear_down(server=server) - events.write_file.close() - event = events.read_event() - span = event["spans"][0] + sentry_sdk.flush() + items.write_file.close() + items = items.read_event() + spans = [item["payload"] for item in items if item["type"] == "span"] + span = spans[0] - assert event["type"] == "transaction" - assert event["transaction_info"] == { - "source": "custom", - } - assert event["contexts"]["trace"]["op"] == OP.GRPC_SERVER - assert span["op"] == "test" + assert spans[1]["attributes"]["sentry.span.source"] == "custom" + assert spans[1]["attributes"]["sentry.op"] == OP.GRPC_SERVER + assert span["attributes"]["sentry.op"] == "test" + else: + events = capture_events_forksafe() + + stub.TestServe(gRPCTestMessage(text="test")) + + _tear_down(server=server) + + events.write_file.close() + event = events.read_event() + span = event["spans"][0] + + assert event["type"] == "transaction" + assert event["transaction_info"] == { + "source": "custom", + } + assert event["contexts"]["trace"]["op"] == OP.GRPC_SERVER + assert span["op"] == "test" @pytest.mark.forked -def test_grpc_server_other_interceptors(sentry_init, capture_events_forksafe): +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_grpc_server_other_interceptors( + sentry_init, + capture_events_forksafe, + capture_items_forksafe, + span_streaming, +): """Ensure compatibility with additional server interceptors.""" - sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()]) - events = capture_events_forksafe() + sentry_init( + traces_sample_rate=1.0, + integrations=[GRPCIntegration()], + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, + ) + mock_intercept = lambda continuation, handler_call_details: continuation( handler_call_details ) @@ -90,135 +133,304 @@ def test_grpc_server_other_interceptors(sentry_init, capture_events_forksafe): # Use the provided channel stub = gRPCTestServiceStub(channel) - stub.TestServe(gRPCTestMessage(text="test")) - _tear_down(server=server) + if span_streaming: + items = capture_items_forksafe("span") + + stub.TestServe(gRPCTestMessage(text="test")) + + _tear_down(server=server) + + mock_interceptor.intercept_service.assert_called_once() + + sentry_sdk.flush() + items.write_file.close() + items = items.read_event() + spans = [item["payload"] for item in items if item["type"] == "span"] + span = spans[0] - mock_interceptor.intercept_service.assert_called_once() + assert spans[1]["attributes"]["sentry.span.source"] == "custom" + assert spans[1]["attributes"]["sentry.op"] == OP.GRPC_SERVER + assert span["attributes"]["sentry.op"] == "test" + else: + events = capture_events_forksafe() - events.write_file.close() - event = events.read_event() - span = event["spans"][0] + stub.TestServe(gRPCTestMessage(text="test")) + + _tear_down(server=server) + + mock_interceptor.intercept_service.assert_called_once() + + events.write_file.close() + event = events.read_event() + span = event["spans"][0] - assert event["type"] == "transaction" - assert event["transaction_info"] == { - "source": "custom", - } - assert event["contexts"]["trace"]["op"] == OP.GRPC_SERVER - assert span["op"] == "test" + assert event["type"] == "transaction" + assert event["transaction_info"] == { + "source": "custom", + } + assert event["contexts"]["trace"]["op"] == OP.GRPC_SERVER + assert span["op"] == "test" @pytest.mark.forked -def test_grpc_server_continues_transaction(sentry_init, capture_events_forksafe): - sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()]) - events = capture_events_forksafe() +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_grpc_server_continues_transaction( + sentry_init, + capture_events_forksafe, + capture_items_forksafe, + span_streaming, +): + sentry_init( + traces_sample_rate=1.0, + integrations=[GRPCIntegration()], + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, + ) server, channel = _set_up() # Use the provided channel stub = gRPCTestServiceStub(channel) - with start_transaction() as transaction: - metadata = ( - ( - "baggage", - "sentry-trace_id={trace_id},sentry-environment=test," - "sentry-transaction=test-transaction,sentry-sample_rate=1.0".format( - trace_id=transaction.trace_id + if span_streaming: + items = capture_items_forksafe("span") + + with sentry_sdk.traces.start_span(name="custom parent") as segment_span: + metadata = ( + ( + "baggage", + "sentry-trace_id={trace_id},sentry-environment=test," + "sentry-transaction=test-transaction,sentry-sample_rate=1.0".format( + trace_id=segment_span.trace_id + ), ), - ), - ( - "sentry-trace", - "{trace_id}-{parent_span_id}-{sampled}".format( - trace_id=transaction.trace_id, - parent_span_id=transaction.span_id, - sampled=1, + ( + "sentry-trace", + "{trace_id}-{parent_span_id}-{sampled}".format( + trace_id=segment_span.trace_id, + parent_span_id=segment_span.span_id, + sampled=1, + ), ), - ), - ) - stub.TestServe(gRPCTestMessage(text="test"), metadata=metadata) + ) + + stub.TestServe(gRPCTestMessage(text="test"), metadata=metadata) + + _tear_down(server=server) + + sentry_sdk.flush() + items.write_file.close() + items = items.read_event() + spans = [item["payload"] for item in items if item["type"] == "span"] + span = spans[0] + + assert spans[1]["attributes"]["sentry.span.source"] == "custom" + assert spans[1]["attributes"]["sentry.op"] == OP.GRPC_SERVER + assert spans[1]["trace_id"] == segment_span.trace_id + assert span["attributes"]["sentry.op"] == "test" + else: + events = capture_events_forksafe() + + with start_transaction() as transaction: + metadata = ( + ( + "baggage", + "sentry-trace_id={trace_id},sentry-environment=test," + "sentry-transaction=test-transaction,sentry-sample_rate=1.0".format( + trace_id=transaction.trace_id + ), + ), + ( + "sentry-trace", + "{trace_id}-{parent_span_id}-{sampled}".format( + trace_id=transaction.trace_id, + parent_span_id=transaction.span_id, + sampled=1, + ), + ), + ) - _tear_down(server=server) + stub.TestServe(gRPCTestMessage(text="test"), metadata=metadata) - events.write_file.close() - event = events.read_event() - span = event["spans"][0] + _tear_down(server=server) - assert event["type"] == "transaction" - assert event["transaction_info"] == { - "source": "custom", - } - assert event["contexts"]["trace"]["op"] == OP.GRPC_SERVER - assert event["contexts"]["trace"]["trace_id"] == transaction.trace_id - assert span["op"] == "test" + events.write_file.close() + event = events.read_event() + span = event["spans"][0] + + assert event["type"] == "transaction" + assert event["transaction_info"] == { + "source": "custom", + } + assert event["contexts"]["trace"]["op"] == OP.GRPC_SERVER + assert event["contexts"]["trace"]["trace_id"] == transaction.trace_id + assert span["op"] == "test" @pytest.mark.forked -def test_grpc_client_starts_span(sentry_init, capture_events_forksafe): - sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()]) - events = capture_events_forksafe() +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_grpc_client_starts_span( + sentry_init, + capture_events_forksafe, + capture_items_forksafe, + span_streaming, +): + sentry_init( + traces_sample_rate=1.0, + integrations=[GRPCIntegration()], + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, + ) server, channel = _set_up() # Use the provided channel stub = gRPCTestServiceStub(channel) - with start_transaction(): - stub.TestServe(gRPCTestMessage(text="test")) + if span_streaming: + items = capture_items_forksafe("span") - _tear_down(server=server) + with sentry_sdk.traces.start_span(name="custom parent"): + stub.TestServe(gRPCTestMessage(text="test")) - events.write_file.close() - events.read_event() - local_transaction = events.read_event() - span = local_transaction["spans"][0] + _tear_down(server=server) - assert len(local_transaction["spans"]) == 1 - assert span["op"] == OP.GRPC_CLIENT - assert ( - span["description"] - == "unary unary call to /grpc_test_server.gRPCTestService/TestServe" - ) - assert span["data"] == ApproxDict( - { - "type": "unary unary", - "method": "/grpc_test_server.gRPCTestService/TestServe", - "code": "OK", - } - ) + sentry_sdk.flush() + items.write_file.close() + items = items.read_event() + spans = [item["payload"] for item in items if item["type"] == "span"] + span = spans[2] + + assert len(spans) == 4 + assert span["attributes"]["sentry.op"] == OP.GRPC_CLIENT + assert ( + span["name"] + == "unary unary call to /grpc_test_server.gRPCTestService/TestServe" + ) + assert span["attributes"] == ApproxDict( + { + "type": "unary unary", + "method": "/grpc_test_server.gRPCTestService/TestServe", + "sentry.environment": mock.ANY, + "sentry.op": "grpc.client", + "sentry.origin": "auto.grpc.grpc", + "sentry.release": mock.ANY, + "sentry.sdk.name": "sentry.python", + "sentry.sdk.version": mock.ANY, + "sentry.segment.id": mock.ANY, + "sentry.segment.name": "custom parent", + "server.address": mock.ANY, + "thread.id": mock.ANY, + "thread.name": mock.ANY, + "code": "OK", + } + ) + else: + events = capture_events_forksafe() + + with start_transaction(): + stub.TestServe(gRPCTestMessage(text="test")) + + _tear_down(server=server) + + events.write_file.close() + events.read_event() + local_transaction = events.read_event() + span = local_transaction["spans"][0] + + assert len(local_transaction["spans"]) == 1 + assert span["op"] == OP.GRPC_CLIENT + assert ( + span["description"] + == "unary unary call to /grpc_test_server.gRPCTestService/TestServe" + ) + assert span["data"] == ApproxDict( + { + "type": "unary unary", + "method": "/grpc_test_server.gRPCTestService/TestServe", + "code": "OK", + } + ) @pytest.mark.forked -def test_grpc_client_unary_stream_starts_span(sentry_init, capture_events_forksafe): - sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()]) - events = capture_events_forksafe() +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_grpc_client_unary_stream_starts_span( + sentry_init, + capture_events_forksafe, + capture_items_forksafe, + span_streaming, +): + sentry_init( + traces_sample_rate=1.0, + integrations=[GRPCIntegration()], + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, + ) server, channel = _set_up() # Use the provided channel stub = gRPCTestServiceStub(channel) - with start_transaction(): - [el for el in stub.TestUnaryStream(gRPCTestMessage(text="test"))] + if span_streaming: + items = capture_items_forksafe("span") - _tear_down(server=server) + with sentry_sdk.traces.start_span(name="custom parent"): + [el for el in stub.TestUnaryStream(gRPCTestMessage(text="test"))] - events.write_file.close() - local_transaction = events.read_event() - span = local_transaction["spans"][0] + _tear_down(server=server) - assert len(local_transaction["spans"]) == 1 - assert span["op"] == OP.GRPC_CLIENT - assert ( - span["description"] - == "unary stream call to /grpc_test_server.gRPCTestService/TestUnaryStream" - ) - assert span["data"] == ApproxDict( - { - "type": "unary stream", - "method": "/grpc_test_server.gRPCTestService/TestUnaryStream", - } - ) + sentry_sdk.flush() + items.write_file.close() + items = items.read_event() + spans = [item["payload"] for item in items if item["type"] == "span"] + span = spans[0] + + assert len(spans) == 2 + assert span["attributes"]["sentry.op"] == OP.GRPC_CLIENT + assert ( + span["name"] + == "unary stream call to /grpc_test_server.gRPCTestService/TestUnaryStream" + ) + assert span["attributes"] == ApproxDict( + { + "type": "unary stream", + "method": "/grpc_test_server.gRPCTestService/TestUnaryStream", + "sentry.environment": mock.ANY, + "sentry.op": "grpc.client", + "sentry.origin": "auto.grpc.grpc", + "sentry.release": mock.ANY, + "sentry.sdk.name": "sentry.python", + "sentry.sdk.version": mock.ANY, + "sentry.segment.id": mock.ANY, + "sentry.segment.name": "custom parent", + "server.address": mock.ANY, + "thread.id": mock.ANY, + "thread.name": mock.ANY, + } + ) + else: + events = capture_events_forksafe() + with start_transaction(): + [el for el in stub.TestUnaryStream(gRPCTestMessage(text="test"))] + + _tear_down(server=server) + + events.write_file.close() + local_transaction = events.read_event() + span = local_transaction["spans"][0] + + assert len(local_transaction["spans"]) == 1 + assert span["op"] == OP.GRPC_CLIENT + assert ( + span["description"] + == "unary stream call to /grpc_test_server.gRPCTestService/TestUnaryStream" + ) + assert span["data"] == ApproxDict( + { + "type": "unary stream", + "method": "/grpc_test_server.gRPCTestService/TestUnaryStream", + } + ) # using unittest.mock.Mock not possible because grpc verifies @@ -232,10 +444,19 @@ def intercept_unary_unary(self, continuation, client_call_details, request): @pytest.mark.forked -def test_grpc_client_other_interceptor(sentry_init, capture_events_forksafe): +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_grpc_client_other_interceptor( + sentry_init, + capture_events_forksafe, + capture_items_forksafe, + span_streaming, +): """Ensure compatibility with additional client interceptors.""" - sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()]) - events = capture_events_forksafe() + sentry_init( + traces_sample_rate=1.0, + integrations=[GRPCIntegration()], + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, + ) server, channel = _set_up() @@ -243,37 +464,89 @@ def test_grpc_client_other_interceptor(sentry_init, capture_events_forksafe): channel = grpc.intercept_channel(channel, MockClientInterceptor()) stub = gRPCTestServiceStub(channel) - with start_transaction(): - stub.TestServe(gRPCTestMessage(text="test")) + if span_streaming: + items = capture_items_forksafe("span") - _tear_down(server=server) + with sentry_sdk.traces.start_span(name="custom parent"): + stub.TestServe(gRPCTestMessage(text="test")) - assert MockClientInterceptor.call_counter == 1 + _tear_down(server=server) - events.write_file.close() - events.read_event() - local_transaction = events.read_event() - span = local_transaction["spans"][0] + assert MockClientInterceptor.call_counter == 1 - assert len(local_transaction["spans"]) == 1 - assert span["op"] == OP.GRPC_CLIENT - assert ( - span["description"] - == "unary unary call to /grpc_test_server.gRPCTestService/TestServe" - ) - assert span["data"] == ApproxDict( - { - "type": "unary unary", - "method": "/grpc_test_server.gRPCTestService/TestServe", - "code": "OK", - } - ) + sentry_sdk.flush() + items.write_file.close() + items = items.read_event() + spans = [item["payload"] for item in items if item["type"] == "span"] + span = spans[2] + + assert len(spans) == 4 + assert span["attributes"]["sentry.op"] == OP.GRPC_CLIENT + assert ( + span["name"] + == "unary unary call to /grpc_test_server.gRPCTestService/TestServe" + ) + assert span["attributes"] == ApproxDict( + { + "type": "unary unary", + "method": "/grpc_test_server.gRPCTestService/TestServe", + "sentry.environment": mock.ANY, + "sentry.op": "grpc.client", + "sentry.origin": "auto.grpc.grpc", + "sentry.release": mock.ANY, + "sentry.sdk.name": "sentry.python", + "sentry.sdk.version": mock.ANY, + "sentry.segment.id": mock.ANY, + "sentry.segment.name": "custom parent", + "server.address": mock.ANY, + "thread.id": mock.ANY, + "thread.name": mock.ANY, + "code": "OK", + } + ) + else: + events = capture_events_forksafe() + + with start_transaction(): + stub.TestServe(gRPCTestMessage(text="test")) + + _tear_down(server=server) + + assert MockClientInterceptor.call_counter == 1 + + events.write_file.close() + events.read_event() + local_transaction = events.read_event() + span = local_transaction["spans"][0] + + assert len(local_transaction["spans"]) == 1 + assert span["op"] == OP.GRPC_CLIENT + assert ( + span["description"] + == "unary unary call to /grpc_test_server.gRPCTestService/TestServe" + ) + assert span["data"] == ApproxDict( + { + "type": "unary unary", + "method": "/grpc_test_server.gRPCTestService/TestServe", + "code": "OK", + } + ) @pytest.mark.forked -def test_prevent_dual_client_interceptor(sentry_init, capture_events_forksafe): - sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()]) - events = capture_events_forksafe() +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_prevent_dual_client_interceptor( + sentry_init, + capture_events_forksafe, + capture_items_forksafe, + span_streaming, +): + sentry_init( + traces_sample_rate=1.0, + integrations=[GRPCIntegration()], + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, + ) server, channel = _set_up() @@ -281,56 +554,111 @@ def test_prevent_dual_client_interceptor(sentry_init, capture_events_forksafe): channel = grpc.intercept_channel(channel, ClientInterceptor()) stub = gRPCTestServiceStub(channel) - with start_transaction(): - stub.TestServe(gRPCTestMessage(text="test")) + if span_streaming: + items = capture_items_forksafe("span") - _tear_down(server=server) + with sentry_sdk.traces.start_span(name="custom parent"): + stub.TestServe(gRPCTestMessage(text="test")) - events.write_file.close() - events.read_event() - local_transaction = events.read_event() - span = local_transaction["spans"][0] + _tear_down(server=server) - assert len(local_transaction["spans"]) == 1 - assert span["op"] == OP.GRPC_CLIENT - assert ( - span["description"] - == "unary unary call to /grpc_test_server.gRPCTestService/TestServe" - ) - assert span["data"] == ApproxDict( - { - "type": "unary unary", - "method": "/grpc_test_server.gRPCTestService/TestServe", - "code": "OK", - } - ) + sentry_sdk.flush() + items.write_file.close() + items = items.read_event() + spans = [item["payload"] for item in items if item["type"] == "span"] + span = spans[2] + + assert len(spans) == 4 + assert span["attributes"]["sentry.op"] == OP.GRPC_CLIENT + assert ( + span["name"] + == "unary unary call to /grpc_test_server.gRPCTestService/TestServe" + ) + assert span["attributes"] == ApproxDict( + { + "type": "unary unary", + "method": "/grpc_test_server.gRPCTestService/TestServe", + "code": "OK", + } + ) + else: + events = capture_events_forksafe() + + with start_transaction(): + stub.TestServe(gRPCTestMessage(text="test")) + + _tear_down(server=server) + + events.write_file.close() + events.read_event() + local_transaction = events.read_event() + span = local_transaction["spans"][0] + + assert len(local_transaction["spans"]) == 1 + assert span["op"] == OP.GRPC_CLIENT + assert ( + span["description"] + == "unary unary call to /grpc_test_server.gRPCTestService/TestServe" + ) + assert span["data"] == ApproxDict( + { + "type": "unary unary", + "method": "/grpc_test_server.gRPCTestService/TestServe", + "code": "OK", + } + ) @pytest.mark.forked +@pytest.mark.parametrize("span_streaming", [True, False]) def test_grpc_client_and_servers_interceptors_integration( - sentry_init, capture_events_forksafe + sentry_init, + capture_events_forksafe, + capture_items_forksafe, + span_streaming, ): - sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()]) - events = capture_events_forksafe() + sentry_init( + traces_sample_rate=1.0, + integrations=[GRPCIntegration()], + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, + ) server, channel = _set_up() # Use the provided channel stub = gRPCTestServiceStub(channel) - with start_transaction(): - stub.TestServe(gRPCTestMessage(text="test")) + if span_streaming: + items = capture_items_forksafe("span") - _tear_down(server=server) + with sentry_sdk.traces.start_span(name="custom parent"): + stub.TestServe(gRPCTestMessage(text="test")) - events.write_file.close() - server_transaction = events.read_event() - local_transaction = events.read_event() + _tear_down(server=server) - assert ( - server_transaction["contexts"]["trace"]["trace_id"] - == local_transaction["contexts"]["trace"]["trace_id"] - ) + sentry_sdk.flush() + items.write_file.close() + items = items.read_event() + + spans = [item["payload"] for item in items if item["type"] == "span"] + + assert spans[0]["trace_id"] == spans[1]["trace_id"] + else: + events = capture_events_forksafe() + + with start_transaction(): + stub.TestServe(gRPCTestMessage(text="test")) + + _tear_down(server=server) + + events.write_file.close() + server_transaction = events.read_event() + local_transaction = events.read_event() + + assert ( + server_transaction["contexts"]["trace"]["trace_id"] + == local_transaction["contexts"]["trace"]["trace_id"] + ) @pytest.mark.forked @@ -365,35 +693,69 @@ def test_stream_unary(sentry_init): @pytest.mark.forked -def test_span_origin(sentry_init, capture_events_forksafe): - sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()]) - events = capture_events_forksafe() +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_span_origin( + sentry_init, + capture_events_forksafe, + capture_items_forksafe, + span_streaming, +): + sentry_init( + traces_sample_rate=1.0, + integrations=[GRPCIntegration()], + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, + ) server, channel = _set_up() # Use the provided channel stub = gRPCTestServiceStub(channel) - with start_transaction(name="custom_transaction"): - stub.TestServe(gRPCTestMessage(text="test")) + if span_streaming: + items = capture_items_forksafe("span") - _tear_down(server=server) + with sentry_sdk.traces.start_span(name="custom parent"): + stub.TestServe(gRPCTestMessage(text="test")) - events.write_file.close() + _tear_down(server=server) - transaction_from_integration = events.read_event() - custom_transaction = events.read_event() + sentry_sdk.flush() + items.write_file.close() + items = items.read_event() - assert ( - transaction_from_integration["contexts"]["trace"]["origin"] == "auto.grpc.grpc" - ) - assert ( - transaction_from_integration["spans"][0]["origin"] - == "auto.grpc.grpc.TestService" - ) # manually created in TestService, not the instrumentation + spans = [item["payload"] for item in items if item["type"] == "span"] + + assert spans[1]["attributes"]["sentry.origin"] == "auto.grpc.grpc" + assert ( + spans[0]["attributes"]["sentry.origin"] == "auto.grpc.grpc.TestService" + ) # manually created in TestService, not the instrumentation + + assert spans[3]["attributes"]["sentry.origin"] == "manual" + assert spans[2]["attributes"]["sentry.origin"] == "auto.grpc.grpc" + else: + events = capture_events_forksafe() + + with start_transaction(name="custom_transaction"): + stub.TestServe(gRPCTestMessage(text="test")) + + _tear_down(server=server) + + events.write_file.close() + + transaction_from_integration = events.read_event() + custom_transaction = events.read_event() + + assert ( + transaction_from_integration["contexts"]["trace"]["origin"] + == "auto.grpc.grpc" + ) + assert ( + transaction_from_integration["spans"][0]["origin"] + == "auto.grpc.grpc.TestService" + ) # manually created in TestService, not the instrumentation - assert custom_transaction["contexts"]["trace"]["origin"] == "manual" - assert custom_transaction["spans"][0]["origin"] == "auto.grpc.grpc" + assert custom_transaction["contexts"]["trace"]["origin"] == "manual" + assert custom_transaction["spans"][0]["origin"] == "auto.grpc.grpc" class TestService(gRPCTestServiceServicer): @@ -401,12 +763,25 @@ class TestService(gRPCTestServiceServicer): @staticmethod def TestServe(request, context): # noqa: N802 - with start_span( - op="test", - name="test", - origin="auto.grpc.grpc.TestService", - ): - pass + client = sentry_sdk.get_client() + span_streaming = has_span_streaming_enabled(client.options) + + if span_streaming: + with sentry_sdk.traces.start_span( + name="test", + attributes={ + "sentry.op": "test", + "sentry.origin": "auto.grpc.grpc.TestService", + }, + ): + pass + else: + with start_span( + op="test", + name="test", + origin="auto.grpc.grpc.TestService", + ): + pass return gRPCTestMessage(text=request.text) diff --git a/tests/integrations/grpc/test_grpc.py.err b/tests/integrations/grpc/test_grpc.py.err new file mode 100644 index 0000000000..99d3c5cf3d --- /dev/null +++ b/tests/integrations/grpc/test_grpc.py.err @@ -0,0 +1,736 @@ +import grpc +import pytest + +from concurrent import futures +from typing import List, Optional, Tuple + +from unittest import mock +from unittest.mock import Mock + +import sentry_sdk +from sentry_sdk import start_span, start_transaction +from sentry_sdk.consts import OP +from sentry_sdk.integrations.grpc import GRPCIntegration +from sentry_sdk.integrations.grpc.client import ClientInterceptor +from tests.conftest import ApproxDict +from tests.integrations.grpc.grpc_test_service_pb2 import gRPCTestMessage +from tests.integrations.grpc.grpc_test_service_pb2_grpc import ( + add_gRPCTestServiceServicer_to_server, + gRPCTestServiceServicer, + gRPCTestServiceStub, +) + + +# Set up in-memory channel instead of network-based +def _set_up( + interceptors: Optional[List[grpc.ServerInterceptor]] = None, +) -> Tuple[grpc.Server, grpc.Channel]: + """ + Sets up a gRPC server and returns both the server and a channel connected to it. + This eliminates network dependencies and makes tests more reliable. + """ + # Create server with thread pool + server = grpc.server( + futures.ThreadPoolExecutor(max_workers=2), + interceptors=interceptors, + ) + + # Add our test service to the server + servicer = TestService() + add_gRPCTestServiceServicer_to_server(servicer, server) + + # Use dynamic port allocation instead of hardcoded port + port = server.add_insecure_port("[::]:0") # Let gRPC choose an available port + server.start() + + # Create channel connected to our server + channel = grpc.insecure_channel(f"localhost:{port}") # noqa: E231 + + return server, channel + + +def _tear_down(server: grpc.Server): + server.stop(grace=None) # Immediate shutdown + + +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_grpc_server_starts_transaction( + sentry_init, + capture_events_forksafe, + capture_items_forksafe, + span_streaming, +): + if span_streaming: + sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()], _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}) + items = capture_items_forksafe("span") + else: + sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()]) + events = capture_events_forksafe() + + server, channel = _set_up() + + # Use the provided channel + stub = gRPCTestServiceStub(channel) + stub.TestServe(gRPCTestMessage(text="test")) + + _tear_down(server=server) + + if span_streaming: + sentry_sdk.flush() + items.write_file.close() + items = items.read_event() + spans = [item["payload"] for item in items if item["type"] == "span"] + span = spans[0] + + assert spans[1]["attributes"]["sentry.span.source"] == "custom" + assert spans[1]["attributes"]["sentry.op"] == OP.GRPC_SERVER + assert span["attributes"]["sentry.op"] == "test" + else: + events.write_file.close() + event = events.read_event() + span = event["spans"][0] + + assert event["type"] == "transaction" + assert event["transaction_info"] == { + "source": "custom", + } + assert event["contexts"]["trace"]["op"] == OP.GRPC_SERVER + assert span["op"] == "test" + + +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_grpc_server_other_interceptors( + sentry_init, + capture_events_forksafe, + capture_items_forksafe, + span_streaming, +): + """Ensure compatibility with additional server interceptors.""" + if span_streaming: + sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()], _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}) + items = capture_items_forksafe("span") + else: + sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()]) + events = capture_events_forksafe() + mock_intercept = lambda continuation, handler_call_details: continuation( + handler_call_details + ) + mock_interceptor = Mock() + mock_interceptor.intercept_service.side_effect = mock_intercept + + server, channel = _set_up(interceptors=[mock_interceptor]) + + # Use the provided channel + stub = gRPCTestServiceStub(channel) + stub.TestServe(gRPCTestMessage(text="test")) + + _tear_down(server=server) + + mock_interceptor.intercept_service.assert_called_once() + + if span_streaming: + sentry_sdk.flush() + items.write_file.close() + items = items.read_event() + spans = [item["payload"] for item in items if item["type"] == "span"] + span = spans[0] + + assert spans[1]["attributes"]["sentry.span.source"] == "custom" + assert spans[1]["attributes"]["sentry.op"] == OP.GRPC_SERVER + assert span["attributes"]["sentry.op"] == "test" + else: + events.write_file.close() + event = events.read_event() + span = event["spans"][0] + + assert event["type"] == "transaction" + assert event["transaction_info"] == { + "source": "custom", + } + assert event["contexts"]["trace"]["op"] == OP.GRPC_SERVER + assert span["op"] == "test" + + +@pytest.mark.forked +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_grpc_server_continues_transaction( + sentry_init, + capture_events_forksafe, + capture_items_forksafe, + span_streaming, +): + if span_streaming: + sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()], _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}) + items = capture_items_forksafe("span") + else: + sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()]) + events = capture_events_forksafe() + + server, channel = _set_up() + + # Use the provided channel + stub = gRPCTestServiceStub(channel) + + if span_streaming: + with sentry_sdk.traces.start_span(name="custom parent") as segment_span: + metadata = ( + ( + "baggage", + "sentry-trace_id={trace_id},sentry-environment=test," + "sentry-transaction=test-transaction,sentry-sample_rate=1.0".format( + trace_id=segment_span.trace_id + ), + ), + ( + "sentry-trace", + "{trace_id}-{parent_span_id}-{sampled}".format( + trace_id=segment_span.trace_id, + parent_span_id=segment_span.span_id, + sampled=1, + ), + ), + ) + else: + with start_transaction() as transaction: + metadata = ( + ( + "baggage", + "sentry-trace_id={trace_id},sentry-environment=test," + "sentry-transaction=test-transaction,sentry-sample_rate=1.0".format( + trace_id=transaction.trace_id + ), + ), + ( + "sentry-trace", + "{trace_id}-{parent_span_id}-{sampled}".format( + trace_id=transaction.trace_id, + parent_span_id=transaction.span_id, + sampled=1, + ), + ), + ) + stub.TestServe(gRPCTestMessage(text="test"), metadata=metadata) + + _tear_down(server=server) + + if span_streaming: + sentry_sdk.flush() + items.write_file.close() + items = items.read_event() + spans = [item["payload"] for item in items if item["type"] == "span"] + span = spans[0] + + assert spans[1]["attributes"]["sentry.span.source"] == "custom" + assert spans[1]["attributes"]["sentry.op"] == OP.GRPC_SERVER + assert spans[1]["trace_id"] == segment_span.trace_id + assert span["attributes"]["sentry.op"] == "test" + else: + events.write_file.close() + event = events.read_event() + span = event["spans"][0] + + assert event["type"] == "transaction" + assert event["transaction_info"] == { + "source": "custom", + } + assert event["contexts"]["trace"]["op"] == OP.GRPC_SERVER + assert event["contexts"]["trace"]["trace_id"] == transaction.trace_id + assert span["op"] == "test" + + +@pytest.mark.forked +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_grpc_client_starts_span( + sentry_init, + capture_events_forksafe, + capture_items_forksafe, + span_streaming, +): + if span_streaming: + sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()], _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}) + items = capture_items_forksafe("span") + else: + sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()]) + events = capture_events_forksafe() + + server, channel = _set_up() + + # Use the provided channel + stub = gRPCTestServiceStub(channel) + + if span_streaming: + with sentry_sdk.traces.start_span(name="custom parent"): + else: + with start_transaction(): + stub.TestServe(gRPCTestMessage(text="test")) + + _tear_down(server=server) + + if span_streaming: + sentry_sdk.flush() + items.write_file.close() + items = items.read_event() + spans = [item["payload"] for item in items if item["type"] == "span"] + span = spans[2] + + assert len(spans) == 4 + assert span["attributes"]["sentry.op"] == OP.GRPC_CLIENT + assert ( + span["name"] + == "unary unary call to /grpc_test_server.gRPCTestService/TestServe" + ) + assert span["attributes"] == ApproxDict( + { + "type": "unary unary", + "method": "/grpc_test_server.gRPCTestService/TestServe", + "sentry.environment": mock.ANY, + "sentry.op": "grpc.client", + "sentry.origin": "auto.grpc.grpc", + "sentry.release": mock.ANY, + "sentry.sdk.name": "sentry.python", + "sentry.sdk.version": mock.ANY, + "sentry.segment.id": mock.ANY, + "sentry.segment.name": "custom parent", + "server.address": mock.ANY, + "thread.id": mock.ANY, + "thread.name": mock.ANY, + "code": "OK", + } + ) + else: + events.write_file.close() + events.read_event() + local_transaction = events.read_event() + span = local_transaction["spans"][0] + + assert len(local_transaction["spans"]) == 1 + assert span["op"] == OP.GRPC_CLIENT + assert ( + span["description"] + == "unary unary call to /grpc_test_server.gRPCTestService/TestServe" + ) + assert span["data"] == ApproxDict( + { + "type": "unary unary", + "method": "/grpc_test_server.gRPCTestService/TestServe", + "code": "OK", + } + ) + + +@pytest.mark.forked +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_grpc_client_unary_stream_starts_span( + sentry_init, + capture_events_forksafe, + capture_items_forksafe, + span_streaming, +): + if span_streaming: + sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()], _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}) + items = capture_items_forksafe("span") + else: + sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()]) + events = capture_events_forksafe() + + server, channel = _set_up() + + # Use the provided channel + stub = gRPCTestServiceStub(channel) + + if span_streaming: + with sentry_sdk.traces.start_span(name="custom parent"): + else: + with start_transaction(): + [el for el in stub.TestUnaryStream(gRPCTestMessage(text="test"))] + + _tear_down(server=server) + + if span_streaming: + sentry_sdk.flush() + items.write_file.close() + items = items.read_event() + spans = [item["payload"] for item in items if item["type"] == "span"] + span = spans[0] + + assert len(spans) == 2 + assert span["attributes"]["sentry.op"] == OP.GRPC_CLIENT + assert ( + span["name"] + == "unary stream call to /grpc_test_server.gRPCTestService/TestUnaryStream" + ) + assert span["attributes"] == ApproxDict( + { + "type": "unary stream", + "method": "/grpc_test_server.gRPCTestService/TestUnaryStream", + "sentry.environment": mock.ANY, + "sentry.op": "grpc.client", + "sentry.origin": "auto.grpc.grpc", + "sentry.release": mock.ANY, + "sentry.sdk.name": "sentry.python", + "sentry.sdk.version": mock.ANY, + "sentry.segment.id": mock.ANY, + "sentry.segment.name": "custom parent", + "server.address": mock.ANY, + "thread.id": mock.ANY, + "thread.name": mock.ANY, + } + ) + else: + events.write_file.close() + local_transaction = events.read_event() + span = local_transaction["spans"][0] + + assert len(local_transaction["spans"]) == 1 + assert span["op"] == OP.GRPC_CLIENT + assert ( + span["description"] + == "unary stream call to /grpc_test_server.gRPCTestService/TestUnaryStream" + ) + assert span["data"] == ApproxDict( + { + "type": "unary stream", + "method": "/grpc_test_server.gRPCTestService/TestUnaryStream", + } + ) + + +# using unittest.mock.Mock not possible because grpc verifies +# that the interceptor is of the correct type +class MockClientInterceptor(grpc.UnaryUnaryClientInterceptor): + call_counter = 0 + + def intercept_unary_unary(self, continuation, client_call_details, request): + self.__class__.call_counter += 1 + return continuation(client_call_details, request) + + +@pytest.mark.forked +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_grpc_client_other_interceptor( + sentry_init, + capture_events_forksafe, + capture_items_forksafe, + span_streaming, +): + """Ensure compatibility with additional client interceptors.""" + if span_streaming: + sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()], _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}) + items = capture_items_forksafe("span") + else: + sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()]) + events = capture_events_forksafe() + + server, channel = _set_up() + + # Intercept the channel + channel = grpc.intercept_channel(channel, MockClientInterceptor()) + stub = gRPCTestServiceStub(channel) + + if span_streaming: + with sentry_sdk.traces.start_span(name="custom parent"): + else: + with start_transaction(): + stub.TestServe(gRPCTestMessage(text="test")) + + _tear_down(server=server) + + assert MockClientInterceptor.call_counter == 1 + + if span_streaming: + sentry_sdk.flush() + items.write_file.close() + items = items.read_event() + spans = [item["payload"] for item in items if item["type"] == "span"] + span = spans[2] + + assert len(spans) == 4 + assert span["attributes"]["sentry.op"] == OP.GRPC_CLIENT + assert ( + span["name"] + == "unary unary call to /grpc_test_server.gRPCTestService/TestServe" + ) + assert span["attributes"] == ApproxDict( + { + "type": "unary unary", + "method": "/grpc_test_server.gRPCTestService/TestServe", + "sentry.environment": mock.ANY, + "sentry.op": "grpc.client", + "sentry.origin": "auto.grpc.grpc", + "sentry.release": mock.ANY, + "sentry.sdk.name": "sentry.python", + "sentry.sdk.version": mock.ANY, + "sentry.segment.id": mock.ANY, + "sentry.segment.name": "custom parent", + "server.address": mock.ANY, + "thread.id": mock.ANY, + "thread.name": mock.ANY, + "code": "OK", + } + ) + else: + events.write_file.close() + events.read_event() + local_transaction = events.read_event() + span = local_transaction["spans"][0] + + assert len(local_transaction["spans"]) == 1 + assert span["op"] == OP.GRPC_CLIENT + assert ( + span["description"] + == "unary unary call to /grpc_test_server.gRPCTestService/TestServe" + ) + assert span["data"] == ApproxDict( + { + "type": "unary unary", + "method": "/grpc_test_server.gRPCTestService/TestServe", + "code": "OK", + } + ) + + +@pytest.mark.forked +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_prevent_dual_client_interceptor( + sentry_init, + capture_events_forksafe, + capture_items_forksafe, + span_streaming, +): + if span_streaming: + sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()], _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}) + items = capture_items_forksafe("span") + else: + sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()]) + events = capture_events_forksafe() + + server, channel = _set_up() + + # Intercept the channel + channel = grpc.intercept_channel(channel, ClientInterceptor()) + stub = gRPCTestServiceStub(channel) + + if span_streaming: + with sentry_sdk.traces.start_span(name="custom parent"): + else: + with start_transaction(): + stub.TestServe(gRPCTestMessage(text="test")) + + _tear_down(server=server) + + if span_streaming: + sentry_sdk.flush() + items.write_file.close() + items = items.read_event() + spans = [item["payload"] for item in items if item["type"] == "span"] + span = spans[2] + + assert len(spans) == 4 + assert span["attributes"]["sentry.op"] == OP.GRPC_CLIENT + assert ( + span["name"] + == "unary unary call to /grpc_test_server.gRPCTestService/TestServe" + ) + assert span["attributes"] == ApproxDict( + { + "type": "unary unary", + "method": "/grpc_test_server.gRPCTestService/TestServe", + "code": "OK", + } + ) + else: + events.write_file.close() + events.read_event() + local_transaction = events.read_event() + span = local_transaction["spans"][0] + + assert len(local_transaction["spans"]) == 1 + assert span["op"] == OP.GRPC_CLIENT + assert ( + span["description"] + == "unary unary call to /grpc_test_server.gRPCTestService/TestServe" + ) + assert span["data"] == ApproxDict( + { + "type": "unary unary", + "method": "/grpc_test_server.gRPCTestService/TestServe", + "code": "OK", + } + ) + + +@pytest.mark.forked +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_grpc_client_and_servers_interceptors_integration( + sentry_init, + capture_events_forksafe, + capture_items_forksafe, + span_streaming, +): + if span_streaming: + sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()], _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}) + items = capture_items_forksafe("span") + else: + sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()]) + events = capture_events_forksafe() + + server, channel = _set_up() + + # Use the provided channel + stub = gRPCTestServiceStub(channel) + + if span_streaming: + with sentry_sdk.traces.start_span(name="custom parent"): + else: + with start_transaction(): + stub.TestServe(gRPCTestMessage(text="test")) + + _tear_down(server=server) + + if span_streaming: + sentry_sdk.flush() + items.write_file.close() + items = items.read_event() + + spans = [item["payload"] for item in items if item["type"] == "span"] + + assert ( + spans[0]["trace_id"] + == spans[1]["trace_id"] + ) + else: + events.write_file.close() + server_transaction = events.read_event() + local_transaction = events.read_event() + + assert ( + server_transaction["contexts"]["trace"]["trace_id"] + == local_transaction["contexts"]["trace"]["trace_id"] + ) + + +@pytest.mark.forked +def test_stream_stream(sentry_init): + sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()]) + server, channel = _set_up() + + # Use the provided channel + stub = gRPCTestServiceStub(channel) + response_iterator = stub.TestStreamStream(iter((gRPCTestMessage(text="test"),))) + for response in response_iterator: + assert response.text == "test" + + _tear_down(server=server) + + +@pytest.mark.forked +def test_stream_unary(sentry_init): + """ + Test to verify stream-stream works. + Tracing not supported for it yet. + """ + sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()]) + server, channel = _set_up() + + # Use the provided channel + stub = gRPCTestServiceStub(channel) + response = stub.TestStreamUnary(iter((gRPCTestMessage(text="test"),))) + assert response.text == "test" + + _tear_down(server=server) + + +@pytest.mark.forked +@pytest.mark.parametrize("span_streaming", [True, False]) +def test_span_origin( + sentry_init, + capture_events_forksafe, + capture_items_forksafe, + span_streaming, +): + if span_streaming: + sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()], _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}) + items = capture_items_forksafe("span") + else: + sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()]) + events = capture_events_forksafe() + + server, channel = _set_up() + + # Use the provided channel + stub = gRPCTestServiceStub(channel) + + if span_streaming: + with sentry_sdk.traces.start_span(name="custom parent"): + else: + with start_transaction(name="custom_transaction"): + stub.TestServe(gRPCTestMessage(text="test")) + + _tear_down(server=server) + + if span_streaming: + sentry_sdk.flush() + items.write_file.close() + items = items.read_event() + + spans = [item["payload"] for item in items if item["type"] == "span"] + + assert ( + spans[1]["attributes"]["sentry.origin"] == "auto.grpc.grpc" + ) + assert ( + spans[0]["attributes"]["sentry.origin"] + == "auto.grpc.grpc.TestService" + ) # manually created in TestService, not the instrumentation + else: + events.write_file.close() + + transaction_from_integration = events.read_event() + custom_transaction = events.read_event() + + assert ( + transaction_from_integration["contexts"]["trace"]["origin"] == "auto.grpc.grpc" + ) + assert ( + transaction_from_integration["spans"][0]["origin"] + == "auto.grpc.grpc.TestService" + ) # manually created in TestService, not the instrumentation + + if span_streaming: + assert spans[3]["attributes"]["sentry.origin"] == "manual" + assert spans[2]["attributes"]["sentry.origin"] == "auto.grpc.grpc" + else: + assert custom_transaction["contexts"]["trace"]["origin"] == "manual" + assert custom_transaction["spans"][0]["origin"] == "auto.grpc.grpc" + + +class TestService(gRPCTestServiceServicer): + events = [] + + @staticmethod + def TestServe(request, context): # noqa: N802 + with sentry_sdk.traces.start_span( + name="test", + attributes = { + "sentry.op": "test", + "sentry.origin": "auto.grpc.grpc.TestService", + } + ): + pass + + return gRPCTestMessage(text=request.text) + + @staticmethod + def TestUnaryStream(request, context): # noqa: N802 + for _ in range(3): + yield gRPCTestMessage(text=request.text) + + @staticmethod + def TestStreamStream(request, context): # noqa: N802 + for r in request: + yield r + + @staticmethod + def TestStreamUnary(request, context): # noqa: N802 + requests = [r for r in request] + return requests.pop() diff --git a/tests/integrations/grpc/test_grpc_aio.py b/tests/integrations/grpc/test_grpc_aio.py index 96e9a4dba8..eea5916005 100644 --- a/tests/integrations/grpc/test_grpc_aio.py +++ b/tests/integrations/grpc/test_grpc_aio.py @@ -3,9 +3,11 @@ import grpc import pytest import pytest_asyncio -import sentry_sdk +from unittest import mock +import sentry_sdk from sentry_sdk import start_span, start_transaction +from sentry_sdk.tracing_utils import has_span_streaming_enabled from sentry_sdk.consts import OP from sentry_sdk.integrations.grpc import GRPCIntegration from tests.conftest import ApproxDict @@ -16,6 +18,8 @@ gRPCTestServiceStub, ) +from typing import Optional + @pytest_asyncio.fixture(scope="function") async def grpc_server_and_channel(sentry_init): @@ -23,133 +27,253 @@ async def grpc_server_and_channel(sentry_init): Creates an async gRPC server and a channel connected to it. Returns both for use in tests, and cleans up afterward. """ - sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()]) + channel: "Optional[grpc.aio.Channel]" = None + server: "Optional[grpc.aio.Server]" = None - # Create server - server = grpc.aio.server() + async def inner(span_streaming: bool): + nonlocal server, channel + + sentry_init( + traces_sample_rate=1.0, + integrations=[GRPCIntegration()], + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, + ) - # Let gRPC choose a free port instead of hardcoding it - port = server.add_insecure_port("[::]:0") + # Create server + server = grpc.aio.server() - # Add service implementation - add_gRPCTestServiceServicer_to_server(TestService, server) + # Let gRPC choose a free port instead of hardcoding it + port = server.add_insecure_port("[::]:0") - # Start the server - await asyncio.create_task(server.start()) + # Add service implementation + add_gRPCTestServiceServicer_to_server(TestService, server) + + # Start the server + await asyncio.create_task(server.start()) - # Create channel connected to our server - channel = grpc.aio.insecure_channel(f"localhost:{port}") # noqa: E231 + # Create channel connected to our server + channel = grpc.aio.insecure_channel(f"localhost:{port}") # noqa: E231 + + return server, channel try: - yield server, channel + yield inner finally: - # Clean up resources - await channel.close() - await server.stop(None) + if channel is not None: + await channel.close() + + if server is not None: + await server.stop(None) @pytest.mark.asyncio -async def test_noop_for_unimplemented_method(sentry_init, capture_events): - sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()]) +@pytest.mark.parametrize("span_streaming", [True, False]) +async def test_noop_for_unimplemented_method( + sentry_init, + capture_events, + capture_items, + span_streaming, +): + sentry_init( + traces_sample_rate=1.0, + integrations=[GRPCIntegration()], + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, + ) # Create empty server with no services server = grpc.aio.server() port = server.add_insecure_port("[::]:0") # Let gRPC choose a free port await asyncio.create_task(server.start()) - events = capture_events() - - try: - async with grpc.aio.insecure_channel( - f"localhost:{port}" # noqa: E231 - ) as channel: - stub = gRPCTestServiceStub(channel) - with pytest.raises(grpc.RpcError) as exc: - await stub.TestServe(gRPCTestMessage(text="test")) - assert exc.value.details() == "Method not found!" - finally: - await server.stop(None) - - assert not events + if span_streaming: + items = capture_items("event", "transaction", "span") + + try: + async with grpc.aio.insecure_channel( + f"localhost:{port}" # noqa: E231 + ) as channel: + stub = gRPCTestServiceStub(channel) + with pytest.raises(grpc.RpcError) as exc: + await stub.TestServe(gRPCTestMessage(text="test")) + assert exc.value.details() == "Method not found!" + finally: + await server.stop(None) + + sentry_sdk.flush() + spans = [item.payload for item in items] + assert len(spans) == 1 + else: + events = capture_events() + + try: + async with grpc.aio.insecure_channel( + f"localhost:{port}" # noqa: E231 + ) as channel: + stub = gRPCTestServiceStub(channel) + with pytest.raises(grpc.RpcError) as exc: + await stub.TestServe(gRPCTestMessage(text="test")) + assert exc.value.details() == "Method not found!" + finally: + await server.stop(None) + + assert not events @pytest.mark.asyncio -async def test_grpc_server_starts_transaction(grpc_server_and_channel, capture_events): - _, channel = grpc_server_and_channel - events = capture_events() +@pytest.mark.parametrize("span_streaming", [True, False]) +async def test_grpc_server_starts_transaction( + grpc_server_and_channel, + capture_events, + capture_items, + span_streaming, +): + _, channel = await grpc_server_and_channel(span_streaming) # Use the provided channel stub = gRPCTestServiceStub(channel) - await stub.TestServe(gRPCTestMessage(text="test")) - (event,) = events - span = event["spans"][0] + if span_streaming: + items = capture_items("span") - assert event["type"] == "transaction" - assert event["transaction_info"] == { - "source": "custom", - } - assert event["contexts"]["trace"]["op"] == OP.GRPC_SERVER - assert span["op"] == "test" + await stub.TestServe(gRPCTestMessage(text="test")) + + sentry_sdk.flush() + spans = [item.payload for item in items if item.type == "span"] + span = spans[0] + + assert spans[1]["attributes"]["sentry.span.source"] == "custom" + assert spans[1]["attributes"]["sentry.op"] == OP.GRPC_SERVER + assert span["attributes"]["sentry.op"] == "test" + else: + events = capture_events() + + await stub.TestServe(gRPCTestMessage(text="test")) + + (event,) = events + span = event["spans"][0] + + assert event["type"] == "transaction" + assert event["transaction_info"] == { + "source": "custom", + } + assert event["contexts"]["trace"]["op"] == OP.GRPC_SERVER + assert span["op"] == "test" @pytest.mark.asyncio +@pytest.mark.parametrize("span_streaming", [True, False]) async def test_grpc_server_continues_transaction( - grpc_server_and_channel, capture_events + grpc_server_and_channel, + capture_events, + capture_items, + span_streaming, ): - _, channel = grpc_server_and_channel - events = capture_events() + _, channel = await grpc_server_and_channel(span_streaming) # Use the provided channel stub = gRPCTestServiceStub(channel) - with sentry_sdk.start_transaction() as transaction: - metadata = ( - ( - "baggage", - "sentry-trace_id={trace_id},sentry-environment=test," - "sentry-transaction=test-transaction,sentry-sample_rate=1.0".format( - trace_id=transaction.trace_id + if span_streaming: + items = capture_items("span") + + with sentry_sdk.traces.start_span(name="custom parent") as segment_span: + metadata = ( + ( + "baggage", + "sentry-trace_id={trace_id},sentry-environment=test," + "sentry-transaction=test-transaction,sentry-sample_rate=1.0".format( + trace_id=segment_span.trace_id + ), ), - ), - ( - "sentry-trace", - "{trace_id}-{parent_span_id}-{sampled}".format( - trace_id=transaction.trace_id, - parent_span_id=transaction.span_id, - sampled=1, + ( + "sentry-trace", + "{trace_id}-{parent_span_id}-{sampled}".format( + trace_id=segment_span.trace_id, + parent_span_id=segment_span.span_id, + sampled=1, + ), ), - ), - ) + ) + + await stub.TestServe(gRPCTestMessage(text="test"), metadata=metadata) + + sentry_sdk.flush() + spans = [item.payload for item in items if item.type == "span"] + span = spans[0] + + assert spans[1]["attributes"]["sentry.span.source"] == "custom" + assert spans[1]["attributes"]["sentry.op"] == OP.GRPC_SERVER + assert spans[1]["trace_id"] == segment_span.trace_id + assert span["attributes"]["sentry.op"] == "test" + else: + events = capture_events() + + with sentry_sdk.start_transaction() as transaction: + metadata = ( + ( + "baggage", + "sentry-trace_id={trace_id},sentry-environment=test," + "sentry-transaction=test-transaction,sentry-sample_rate=1.0".format( + trace_id=transaction.trace_id + ), + ), + ( + "sentry-trace", + "{trace_id}-{parent_span_id}-{sampled}".format( + trace_id=transaction.trace_id, + parent_span_id=transaction.span_id, + sampled=1, + ), + ), + ) - await stub.TestServe(gRPCTestMessage(text="test"), metadata=metadata) + await stub.TestServe(gRPCTestMessage(text="test"), metadata=metadata) - (event, _) = events - span = event["spans"][0] + (event, _) = events + span = event["spans"][0] - assert event["type"] == "transaction" - assert event["transaction_info"] == { - "source": "custom", - } - assert event["contexts"]["trace"]["op"] == OP.GRPC_SERVER - assert event["contexts"]["trace"]["trace_id"] == transaction.trace_id - assert span["op"] == "test" + assert event["type"] == "transaction" + assert event["transaction_info"] == { + "source": "custom", + } + assert event["contexts"]["trace"]["op"] == OP.GRPC_SERVER + assert event["contexts"]["trace"]["trace_id"] == transaction.trace_id + assert span["op"] == "test" @pytest.mark.asyncio -async def test_grpc_server_exception(grpc_server_and_channel, capture_events): - _, channel = grpc_server_and_channel - events = capture_events() +@pytest.mark.parametrize("span_streaming", [True, False]) +async def test_grpc_server_exception( + grpc_server_and_channel, + capture_events, + capture_items, + span_streaming, +): + _, channel = await grpc_server_and_channel(span_streaming) # Use the provided channel stub = gRPCTestServiceStub(channel) - try: - await stub.TestServe(gRPCTestMessage(text="exception")) - raise AssertionError() - except Exception: - pass - (event, _) = events + if span_streaming: + items = capture_items("TestService.TestException", "event", "grpc") + + try: + await stub.TestServe(gRPCTestMessage(text="exception")) + raise AssertionError() + except Exception: + pass + + (event,) = (item.payload for item in items if item.type == "event") + else: + events = capture_events() + + try: + await stub.TestServe(gRPCTestMessage(text="exception")) + raise AssertionError() + except Exception: + pass + + (event, _) = events assert event["exception"]["values"][0]["type"] == "TestService.TestException" assert event["exception"]["values"][0]["value"] == "test" @@ -159,7 +283,7 @@ async def test_grpc_server_exception(grpc_server_and_channel, capture_events): @pytest.mark.asyncio async def test_grpc_server_abort(grpc_server_and_channel, capture_events): - _, channel = grpc_server_and_channel + _, channel = await grpc_server_and_channel(False) events = capture_events() # Use the provided channel @@ -177,66 +301,152 @@ async def test_grpc_server_abort(grpc_server_and_channel, capture_events): @pytest.mark.asyncio +@pytest.mark.parametrize("span_streaming", [True, False]) async def test_grpc_client_starts_span( - grpc_server_and_channel, capture_events_forksafe + grpc_server_and_channel, + capture_events_forksafe, + capture_items_forksafe, + span_streaming, ): - _, channel = grpc_server_and_channel - events = capture_events_forksafe() + _, channel = await grpc_server_and_channel(span_streaming) # Use the provided channel stub = gRPCTestServiceStub(channel) - with start_transaction(): - await stub.TestServe(gRPCTestMessage(text="test")) - events.write_file.close() - events.read_event() - local_transaction = events.read_event() - span = local_transaction["spans"][0] + if span_streaming: + items = capture_items_forksafe("span") - assert len(local_transaction["spans"]) == 1 - assert span["op"] == OP.GRPC_CLIENT - assert ( - span["description"] - == "unary unary call to /grpc_test_server.gRPCTestService/TestServe" - ) - assert span["data"] == ApproxDict( - { - "type": "unary unary", - "method": "/grpc_test_server.gRPCTestService/TestServe", - "code": "OK", - } - ) + with sentry_sdk.traces.start_span(name="custom parent") as span: + await stub.TestServe(gRPCTestMessage(text="test")) + + sentry_sdk.flush() + items.write_file.close() + items = items.read_event() + spans = [item["payload"] for item in items if item["type"] == "span"] + span = spans[2] + + assert len(spans) == 4 + assert span["attributes"]["sentry.op"] == OP.GRPC_CLIENT + assert ( + span["name"] + == "unary unary call to /grpc_test_server.gRPCTestService/TestServe" + ) + assert span["attributes"] == ApproxDict( + { + "type": "unary unary", + "rpc.method": "/grpc_test_server.gRPCTestService/TestServe", + "sentry.environment": mock.ANY, + "sentry.op": "grpc.client", + "sentry.origin": "auto.grpc.grpc", + "sentry.release": mock.ANY, + "sentry.sdk.name": "sentry.python", + "sentry.sdk.version": mock.ANY, + "sentry.segment.id": mock.ANY, + "sentry.segment.name": "custom parent", + "server.address": mock.ANY, + "thread.id": mock.ANY, + "thread.name": mock.ANY, + "code": "OK", + } + ) + else: + events = capture_events_forksafe() + + with start_transaction(): + await stub.TestServe(gRPCTestMessage(text="test")) + + events.write_file.close() + events.read_event() + local_transaction = events.read_event() + span = local_transaction["spans"][0] + + assert len(local_transaction["spans"]) == 1 + assert span["op"] == OP.GRPC_CLIENT + assert ( + span["description"] + == "unary unary call to /grpc_test_server.gRPCTestService/TestServe" + ) + assert span["data"] == ApproxDict( + { + "type": "unary unary", + "method": "/grpc_test_server.gRPCTestService/TestServe", + "code": "OK", + } + ) @pytest.mark.asyncio +@pytest.mark.parametrize("span_streaming", [True, False]) async def test_grpc_client_unary_stream_starts_span( - grpc_server_and_channel, capture_events_forksafe + grpc_server_and_channel, + capture_events_forksafe, + capture_items_forksafe, + span_streaming, ): - _, channel = grpc_server_and_channel - events = capture_events_forksafe() + _, channel = await grpc_server_and_channel(span_streaming) # Use the provided channel stub = gRPCTestServiceStub(channel) - with start_transaction(): - response = stub.TestUnaryStream(gRPCTestMessage(text="test")) - [_ async for _ in response] - - events.write_file.close() - local_transaction = events.read_event() - span = local_transaction["spans"][0] - - assert len(local_transaction["spans"]) == 1 - assert span["op"] == OP.GRPC_CLIENT - assert ( - span["description"] - == "unary stream call to /grpc_test_server.gRPCTestService/TestUnaryStream" - ) - assert span["data"] == ApproxDict( - { - "type": "unary stream", - "method": "/grpc_test_server.gRPCTestService/TestUnaryStream", - } - ) + + if span_streaming: + items = capture_items_forksafe("span") + + with sentry_sdk.traces.start_span(name="custom parent"): + response = stub.TestUnaryStream(gRPCTestMessage(text="test")) + [_ async for _ in response] + + sentry_sdk.flush() + items.write_file.close() + items = items.read_event() + spans = [item["payload"] for item in items if item["type"] == "span"] + span = spans[0] + + assert len(spans) == 2 + assert span["attributes"]["sentry.op"] == OP.GRPC_CLIENT + assert ( + span["name"] + == "unary stream call to /grpc_test_server.gRPCTestService/TestUnaryStream" + ) + assert span["attributes"] == ApproxDict( + { + "type": "unary stream", + "rpc.method": "/grpc_test_server.gRPCTestService/TestUnaryStream", + "sentry.environment": mock.ANY, + "sentry.op": "grpc.client", + "sentry.origin": "auto.grpc.grpc", + "sentry.release": mock.ANY, + "sentry.sdk.name": "sentry.python", + "sentry.sdk.version": mock.ANY, + "sentry.segment.id": mock.ANY, + "sentry.segment.name": "custom parent", + "server.address": mock.ANY, + "thread.id": mock.ANY, + "thread.name": mock.ANY, + } + ) + else: + events = capture_events_forksafe() + + with start_transaction(): + response = stub.TestUnaryStream(gRPCTestMessage(text="test")) + [_ async for _ in response] + + events.write_file.close() + local_transaction = events.read_event() + span = local_transaction["spans"][0] + + assert len(local_transaction["spans"]) == 1 + assert span["op"] == OP.GRPC_CLIENT + assert ( + span["description"] + == "unary stream call to /grpc_test_server.gRPCTestService/TestUnaryStream" + ) + assert span["data"] == ApproxDict( + { + "type": "unary stream", + "method": "/grpc_test_server.gRPCTestService/TestUnaryStream", + } + ) @pytest.mark.asyncio @@ -245,7 +455,7 @@ async def test_stream_stream(grpc_server_and_channel): Test to verify stream-stream works. Tracing not supported for it yet. """ - _, channel = grpc_server_and_channel + _, channel = await grpc_server_and_channel(False) # Use the provided channel stub = gRPCTestServiceStub(channel) @@ -260,7 +470,7 @@ async def test_stream_unary(grpc_server_and_channel): Test to verify stream-stream works. Tracing not supported for it yet. """ - _, channel = grpc_server_and_channel + _, channel = await grpc_server_and_channel(False) # Use the provided channel stub = gRPCTestServiceStub(channel) @@ -269,30 +479,59 @@ async def test_stream_unary(grpc_server_and_channel): @pytest.mark.asyncio -async def test_span_origin(grpc_server_and_channel, capture_events_forksafe): - _, channel = grpc_server_and_channel - events = capture_events_forksafe() +@pytest.mark.parametrize("span_streaming", [True, False]) +async def test_span_origin( + grpc_server_and_channel, + capture_events_forksafe, + capture_items_forksafe, + span_streaming, +): + _, channel = await grpc_server_and_channel(span_streaming) # Use the provided channel stub = gRPCTestServiceStub(channel) - with start_transaction(name="custom_transaction"): - await stub.TestServe(gRPCTestMessage(text="test")) - events.write_file.close() + if span_streaming: + items = capture_items_forksafe("span") - transaction_from_integration = events.read_event() - custom_transaction = events.read_event() + with sentry_sdk.traces.start_span(name="custom parent"): + await stub.TestServe(gRPCTestMessage(text="test")) - assert ( - transaction_from_integration["contexts"]["trace"]["origin"] == "auto.grpc.grpc" - ) - assert ( - transaction_from_integration["spans"][0]["origin"] - == "auto.grpc.grpc.TestService.aio" - ) # manually created in TestService, not the instrumentation + sentry_sdk.flush() + items.write_file.close() + items = items.read_event() - assert custom_transaction["contexts"]["trace"]["origin"] == "manual" - assert custom_transaction["spans"][0]["origin"] == "auto.grpc.grpc" + spans = [item["payload"] for item in items if item["type"] == "span"] + + assert spans[1]["attributes"]["sentry.origin"] == "auto.grpc.grpc" + assert ( + spans[0]["attributes"]["sentry.origin"] == "auto.grpc.grpc.TestService.aio" + ) # manually created in TestService, not the instrumentation + + assert spans[3]["attributes"]["sentry.origin"] == "manual" + assert spans[2]["attributes"]["sentry.origin"] == "auto.grpc.grpc" + else: + events = capture_events_forksafe() + + with start_transaction(name="custom_transaction"): + await stub.TestServe(gRPCTestMessage(text="test")) + + events.write_file.close() + + transaction_from_integration = events.read_event() + custom_transaction = events.read_event() + + assert ( + transaction_from_integration["contexts"]["trace"]["origin"] + == "auto.grpc.grpc" + ) + assert ( + transaction_from_integration["spans"][0]["origin"] + == "auto.grpc.grpc.TestService.aio" + ) # manually created in TestService, not the instrumentation + + assert custom_transaction["contexts"]["trace"]["origin"] == "manual" + assert custom_transaction["spans"][0]["origin"] == "auto.grpc.grpc" class TestService(gRPCTestServiceServicer): @@ -304,12 +543,25 @@ def __init__(self): @classmethod async def TestServe(cls, request, context): # noqa: N802 - with start_span( - op="test", - name="test", - origin="auto.grpc.grpc.TestService.aio", - ): - pass + client = sentry_sdk.get_client() + span_streaming = has_span_streaming_enabled(client.options) + + if span_streaming: + with sentry_sdk.traces.start_span( + name="test", + attributes={ + "sentry.op": "test", + "sentry.origin": "auto.grpc.grpc.TestService.aio", + }, + ): + pass + else: + with start_span( + op="test", + name="test", + origin="auto.grpc.grpc.TestService.aio", + ): + pass if request.text == "exception": raise cls.TestException() diff --git a/tests/integrations/grpc/test_grpc_aio.py.err b/tests/integrations/grpc/test_grpc_aio.py.err new file mode 100644 index 0000000000..a09f3126e6 --- /dev/null +++ b/tests/integrations/grpc/test_grpc_aio.py.err @@ -0,0 +1,557 @@ +import asyncio + +import grpc +import pytest +import pytest_asyncio +from unittest import mock + +import sentry_sdk +from sentry_sdk import start_span, start_transaction +from sentry_sdk.consts import OP +from sentry_sdk.integrations.grpc import GRPCIntegration +from tests.conftest import ApproxDict +from tests.integrations.grpc.grpc_test_service_pb2 import gRPCTestMessage +from tests.integrations.grpc.grpc_test_service_pb2_grpc import ( + add_gRPCTestServiceServicer_to_server, + gRPCTestServiceServicer, + gRPCTestServiceStub, +) + +from typing import Optional + + +@pytest_asyncio.fixture(scope="function") +async def grpc_server_and_channel(sentry_init): + """ + Creates an async gRPC server and a channel connected to it. + Returns both for use in tests, and cleans up afterward. + """ + channel: "Optional[grpc.aio.Channel]" = None + server: "Optional[grpc.aio.Server]" = None + async def inner(span_streaming: bool): + nonlocal server, channel + + sentry_init( + traces_sample_rate=1.0, + integrations=[GRPCIntegration()], + _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, + ) + + # Create server + server = grpc.aio.server() + + # Let gRPC choose a free port instead of hardcoding it + port = server.add_insecure_port("[::]:0") + + # Add service implementation + add_gRPCTestServiceServicer_to_server(TestService, server) + + # Start the server + await asyncio.create_task(server.start()) + + # Create channel connected to our server + channel = grpc.aio.insecure_channel(f"localhost:{port}") # noqa: E231 + + return server, channel + + try: + yield inner + finally: + if channel is not None: + await channel.close() + + if server is not None: + await server.stop(None) + + +@pytest.mark.asyncio +@pytest.mark.parametrize("span_streaming", [True, False]) +async def test_noop_for_unimplemented_method( + sentry_init, + capture_events, + capture_items, + span_streaming, +): + if span_streaming: + sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()], _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}) + else: + sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()]) + + # Create empty server with no services + server = grpc.aio.server() + port = server.add_insecure_port("[::]:0") # Let gRPC choose a free port + await asyncio.create_task(server.start()) + + if span_streaming: + items = capture_items("event", "transaction", "span") + else: + events = capture_events() + + try: + async with grpc.aio.insecure_channel( + f"localhost:{port}" # noqa: E231 + ) as channel: + stub = gRPCTestServiceStub(channel) + with pytest.raises(grpc.RpcError) as exc: + await stub.TestServe(gRPCTestMessage(text="test")) + assert exc.value.details() == "Method not found!" + finally: + await server.stop(None) + + if span_streaming: + sentry_sdk.flush() + spans = [item.payload for item in items] + assert len(spans) == 1 + else: + assert not events + + +@pytest.mark.asyncio +@pytest.mark.parametrize("span_streaming", [True, False]) +async def test_grpc_server_starts_transaction( + grpc_server_and_channel, + capture_events, + capture_items, + span_streaming, +): + if span_streaming: + _, channel = await grpc_server_and_channel(span_streaming) + items = capture_items("span") + else: + _, channel = grpc_server_and_channel + events = capture_events() + + # Use the provided channel + stub = gRPCTestServiceStub(channel) + await stub.TestServe(gRPCTestMessage(text="test")) + + if span_streaming: + sentry_sdk.flush() + spans = [item.payload for item in items if item.type == "span"] + span = spans[0] + + assert spans[1]["attributes"]["sentry.span.source"] == "custom" + assert spans[1]["attributes"]["sentry.op"] == OP.GRPC_SERVER + assert span["attributes"]["sentry.op"] == "test" + else: + (event,) = events + span = event["spans"][0] + + assert event["type"] == "transaction" + assert event["transaction_info"] == { + "source": "custom", + } + assert event["contexts"]["trace"]["op"] == OP.GRPC_SERVER + assert span["op"] == "test" + + +@pytest.mark.asyncio +@pytest.mark.parametrize("span_streaming", [True, False]) +async def test_grpc_server_continues_transaction( + grpc_server_and_channel, + capture_events, + capture_items, + span_streaming, +): + if span_streaming: + _, channel = await grpc_server_and_channel(span_streaming) + items = capture_items("span") + else: + _, channel = grpc_server_and_channel + events = capture_events() + + # Use the provided channel + stub = gRPCTestServiceStub(channel) + + if span_streaming: + with sentry_sdk.traces.start_span(name="custom parent") as segment_span: + metadata = ( + ( + "baggage", + "sentry-trace_id={trace_id},sentry-environment=test," + "sentry-transaction=test-transaction,sentry-sample_rate=1.0".format( + trace_id=segment_span.trace_id + ), + ), + ( + "sentry-trace", + "{trace_id}-{parent_span_id}-{sampled}".format( + trace_id=segment_span.trace_id, + parent_span_id=segment_span.span_id, + sampled=1, + ), + ), + ) + else: + with sentry_sdk.start_transaction() as transaction: + metadata = ( + ( + "baggage", + "sentry-trace_id={trace_id},sentry-environment=test," + "sentry-transaction=test-transaction,sentry-sample_rate=1.0".format( + trace_id=transaction.trace_id + ), + ), + ( + "sentry-trace", + "{trace_id}-{parent_span_id}-{sampled}".format( + trace_id=transaction.trace_id, + parent_span_id=transaction.span_id, + sampled=1, + ), + ), + ) + + await stub.TestServe(gRPCTestMessage(text="test"), metadata=metadata) + + if span_streaming: + sentry_sdk.flush() + spans = [item.payload for item in items if item.type == "span"] + span = spans[0] + + assert spans[1]["attributes"]["sentry.span.source"] == "custom" + assert spans[1]["attributes"]["sentry.op"] == OP.GRPC_SERVER + assert spans[1]["trace_id"] == segment_span.trace_id + assert span["attributes"]["sentry.op"] == "test" + else: + (event, _) = events + span = event["spans"][0] + + assert event["type"] == "transaction" + assert event["transaction_info"] == { + "source": "custom", + } + assert event["contexts"]["trace"]["op"] == OP.GRPC_SERVER + assert event["contexts"]["trace"]["trace_id"] == transaction.trace_id + assert span["op"] == "test" + + +@pytest.mark.asyncio +@pytest.mark.parametrize("span_streaming", [True, False]) +async def test_grpc_server_exception( + grpc_server_and_channel, + capture_events, + capture_items, + span_streaming, +): + if span_streaming: + _, channel = await grpc_server_and_channel(span_streaming) + items = capture_items("TestService.TestException", "event", "grpc") + else: + _, channel = grpc_server_and_channel + events = capture_events() + + # Use the provided channel + stub = gRPCTestServiceStub(channel) + try: + await stub.TestServe(gRPCTestMessage(text="exception")) + raise AssertionError() + except Exception: + pass + + if span_streaming: + (event, ) = (item.payload for item in items if item.type == "event") + else: + (event, _) = events + + assert event["exception"]["values"][0]["type"] == "TestService.TestException" + assert event["exception"]["values"][0]["value"] == "test" + assert event["exception"]["values"][0]["mechanism"]["handled"] is False + assert event["exception"]["values"][0]["mechanism"]["type"] == "grpc" + + +@pytest.mark.asyncio +async def test_grpc_server_abort(grpc_server_and_channel, capture_events): + _, channel = grpc_server_and_channel + events = capture_events() + + # Use the provided channel + stub = gRPCTestServiceStub(channel) + try: + await stub.TestServe(gRPCTestMessage(text="abort")) + raise AssertionError() + except Exception: + pass + + # Add a small delay to allow events to be collected + await asyncio.sleep(0.1) + + assert len(events) == 1 + + +@pytest.mark.asyncio +@pytest.mark.parametrize("span_streaming", [True, False]) +async def test_grpc_client_starts_span( + grpc_server_and_channel, + capture_events_forksafe, + capture_items_forksafe, + span_streaming, +): + if span_streaming: + _, channel = await grpc_server_and_channel(span_streaming) + items = capture_items_forksafe("span") + else: + _, channel = grpc_server_and_channel + events = capture_events_forksafe() + + # Use the provided channel + stub = gRPCTestServiceStub(channel) + if span_streaming: + with sentry_sdk.traces.start_span(name="custom parent") as span: + await stub.TestServe(gRPCTestMessage(text="test")) + + sentry_sdk.flush() + items.write_file.close() + items = items.read_event() + spans = [item["payload"] for item in items if item["type"] == "span"] + span = spans[2] + + assert len(spans) == 4 + assert span["attributes"]["sentry.op"] == OP.GRPC_CLIENT + assert ( + span["name"] + == "unary unary call to /grpc_test_server.gRPCTestService/TestServe" + ) + assert span["attributes"] == ApproxDict( + { + "type": "unary unary", + "rpc.method": "/grpc_test_server.gRPCTestService/TestServe", + "sentry.environment": mock.ANY, + "sentry.op": "grpc.client", + "sentry.origin": "auto.grpc.grpc", + "sentry.release": mock.ANY, + "sentry.sdk.name": "sentry.python", + "sentry.sdk.version": mock.ANY, + "sentry.segment.id": mock.ANY, + "sentry.segment.name": "custom parent", + "server.address": mock.ANY, + "thread.id": mock.ANY, + "thread.name": mock.ANY, + "code": "OK", + } + ) + else: + with start_transaction(): + await stub.TestServe(gRPCTestMessage(text="test")) + + events.write_file.close() + events.read_event() + local_transaction = events.read_event() + span = local_transaction["spans"][0] + + assert len(local_transaction["spans"]) == 1 + assert span["op"] == OP.GRPC_CLIENT + assert ( + span["description"] + == "unary unary call to /grpc_test_server.gRPCTestService/TestServe" + ) + assert span["data"] == ApproxDict( + { + "type": "unary unary", + "method": "/grpc_test_server.gRPCTestService/TestServe", + "code": "OK", + } + ) + + +@pytest.mark.asyncio +@pytest.mark.parametrize("span_streaming", [True, False]) +async def test_grpc_client_unary_stream_starts_span( + grpc_server_and_channel, + capture_events_forksafe, + capture_items_forksafe, + span_streaming, +): + if span_streaming: + _, channel = await grpc_server_and_channel(span_streaming) + items = capture_items_forksafe("span") + else: + _, channel = grpc_server_and_channel + events = capture_events_forksafe() + + # Use the provided channel + stub = gRPCTestServiceStub(channel) + if span_streaming: + with sentry_sdk.traces.start_span(name="custom parent"): + else: + with start_transaction(): + response = stub.TestUnaryStream(gRPCTestMessage(text="test")) + [_ async for _ in response] + + if span_streaming: + sentry_sdk.flush() + items.write_file.close() + items = items.read_event() + spans = [item["payload"] for item in items if item["type"] == "span"] + span = spans[0] + + assert len(spans) == 2 + assert span["attributes"]["sentry.op"] == OP.GRPC_CLIENT + assert ( + span["name"] + == "unary stream call to /grpc_test_server.gRPCTestService/TestUnaryStream" + ) + assert span["attributes"] == ApproxDict( + { + "type": "unary stream", + "rpc.method": "/grpc_test_server.gRPCTestService/TestUnaryStream", + "sentry.environment": mock.ANY, + "sentry.op": "grpc.client", + "sentry.origin": "auto.grpc.grpc", + "sentry.release": mock.ANY, + "sentry.sdk.name": "sentry.python", + "sentry.sdk.version": mock.ANY, + "sentry.segment.id": mock.ANY, + "sentry.segment.name": "custom parent", + "server.address": mock.ANY, + "thread.id": mock.ANY, + "thread.name": mock.ANY, + } + ) + else: + events.write_file.close() + local_transaction = events.read_event() + span = local_transaction["spans"][0] + + assert len(local_transaction["spans"]) == 1 + assert span["op"] == OP.GRPC_CLIENT + assert ( + span["description"] + == "unary stream call to /grpc_test_server.gRPCTestService/TestUnaryStream" + ) + assert span["data"] == ApproxDict( + { + "type": "unary stream", + "method": "/grpc_test_server.gRPCTestService/TestUnaryStream", + } + ) + + +@pytest.mark.asyncio +async def test_stream_stream(grpc_server_and_channel): + """ + Test to verify stream-stream works. + Tracing not supported for it yet. + """ + _, channel = grpc_server_and_channel + + # Use the provided channel + stub = gRPCTestServiceStub(channel) + response = stub.TestStreamStream((gRPCTestMessage(text="test"),)) + async for r in response: + assert r.text == "test" + + +@pytest.mark.asyncio +async def test_stream_unary(grpc_server_and_channel): + """ + Test to verify stream-stream works. + Tracing not supported for it yet. + """ + _, channel = grpc_server_and_channel + + # Use the provided channel + stub = gRPCTestServiceStub(channel) + response = await stub.TestStreamUnary((gRPCTestMessage(text="test"),)) + assert response.text == "test" + + +@pytest.mark.asyncio +@pytest.mark.parametrize("span_streaming", [True, False]) +async def test_span_origin( + grpc_server_and_channel, + capture_events_forksafe, + capture_items_forksafe, + span_streaming, +): + if span_streaming: + _, channel = await grpc_server_and_channel(span_streaming) + items = capture_items_forksafe("span") + else: + _, channel = grpc_server_and_channel + events = capture_events_forksafe() + + # Use the provided channel + stub = gRPCTestServiceStub(channel) + if span_streaming: + with sentry_sdk.traces.start_span(name="custom parent"): + await stub.TestServe(gRPCTestMessage(text="test")) + + sentry_sdk.flush() + items.write_file.close() + items = items.read_event() + + spans = [item["payload"] for item in items if item["type"] == "span"] + + assert ( + spans[1]["attributes"]["sentry.origin"] == "auto.grpc.grpc" + ) + assert ( + spans[0]["attributes"]["sentry.origin"] + == "auto.grpc.grpc.TestService.aio" + ) # manually created in TestService, not the instrumentation + else: + with start_transaction(name="custom_transaction"): + await stub.TestServe(gRPCTestMessage(text="test")) + + events.write_file.close() + + transaction_from_integration = events.read_event() + custom_transaction = events.read_event() + + assert ( + transaction_from_integration["contexts"]["trace"]["origin"] == "auto.grpc.grpc" + ) + assert ( + transaction_from_integration["spans"][0]["origin"] + == "auto.grpc.grpc.TestService.aio" + ) # manually created in TestService, not the instrumentation + + if span_streaming: + assert spans[3]["attributes"]["sentry.origin"] == "manual" + assert spans[2]["attributes"]["sentry.origin"] == "auto.grpc.grpc" + else: + assert custom_transaction["contexts"]["trace"]["origin"] == "manual" + assert custom_transaction["spans"][0]["origin"] == "auto.grpc.grpc" + + +class TestService(gRPCTestServiceServicer): + class TestException(Exception): + __test__ = False + + def __init__(self): + super().__init__("test") + + @classmethod + async def TestServe(cls, request, context): # noqa: N802 + with sentry_sdk.traces.start_span( + name="test", + attributes = { + "sentry.op": "test", + "sentry.origin": "auto.grpc.grpc.TestService.aio", + }, + ): + pass + + if request.text == "exception": + raise cls.TestException() + + if request.text == "abort": + await context.abort(grpc.StatusCode.ABORTED, "Aborted!") + + return gRPCTestMessage(text=request.text) + + @classmethod + async def TestUnaryStream(cls, request, context): # noqa: N802 + for _ in range(3): + yield gRPCTestMessage(text=request.text) + + @classmethod + async def TestStreamStream(cls, request, context): # noqa: N802 + async for r in request: + yield r + + @classmethod + async def TestStreamUnary(cls, request, context): # noqa: N802 + requests = [r async for r in request] + return requests.pop() From 8f48c88040c48315022947fb9f6585ff70dd037a Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Tue, 5 May 2026 14:06:31 +0200 Subject: [PATCH 2/7] remove files --- tests/integrations/grpc/test_grpc.py.err | 736 ------------------- tests/integrations/grpc/test_grpc_aio.py.err | 557 -------------- 2 files changed, 1293 deletions(-) delete mode 100644 tests/integrations/grpc/test_grpc.py.err delete mode 100644 tests/integrations/grpc/test_grpc_aio.py.err diff --git a/tests/integrations/grpc/test_grpc.py.err b/tests/integrations/grpc/test_grpc.py.err deleted file mode 100644 index 99d3c5cf3d..0000000000 --- a/tests/integrations/grpc/test_grpc.py.err +++ /dev/null @@ -1,736 +0,0 @@ -import grpc -import pytest - -from concurrent import futures -from typing import List, Optional, Tuple - -from unittest import mock -from unittest.mock import Mock - -import sentry_sdk -from sentry_sdk import start_span, start_transaction -from sentry_sdk.consts import OP -from sentry_sdk.integrations.grpc import GRPCIntegration -from sentry_sdk.integrations.grpc.client import ClientInterceptor -from tests.conftest import ApproxDict -from tests.integrations.grpc.grpc_test_service_pb2 import gRPCTestMessage -from tests.integrations.grpc.grpc_test_service_pb2_grpc import ( - add_gRPCTestServiceServicer_to_server, - gRPCTestServiceServicer, - gRPCTestServiceStub, -) - - -# Set up in-memory channel instead of network-based -def _set_up( - interceptors: Optional[List[grpc.ServerInterceptor]] = None, -) -> Tuple[grpc.Server, grpc.Channel]: - """ - Sets up a gRPC server and returns both the server and a channel connected to it. - This eliminates network dependencies and makes tests more reliable. - """ - # Create server with thread pool - server = grpc.server( - futures.ThreadPoolExecutor(max_workers=2), - interceptors=interceptors, - ) - - # Add our test service to the server - servicer = TestService() - add_gRPCTestServiceServicer_to_server(servicer, server) - - # Use dynamic port allocation instead of hardcoded port - port = server.add_insecure_port("[::]:0") # Let gRPC choose an available port - server.start() - - # Create channel connected to our server - channel = grpc.insecure_channel(f"localhost:{port}") # noqa: E231 - - return server, channel - - -def _tear_down(server: grpc.Server): - server.stop(grace=None) # Immediate shutdown - - -@pytest.mark.parametrize("span_streaming", [True, False]) -def test_grpc_server_starts_transaction( - sentry_init, - capture_events_forksafe, - capture_items_forksafe, - span_streaming, -): - if span_streaming: - sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()], _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}) - items = capture_items_forksafe("span") - else: - sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()]) - events = capture_events_forksafe() - - server, channel = _set_up() - - # Use the provided channel - stub = gRPCTestServiceStub(channel) - stub.TestServe(gRPCTestMessage(text="test")) - - _tear_down(server=server) - - if span_streaming: - sentry_sdk.flush() - items.write_file.close() - items = items.read_event() - spans = [item["payload"] for item in items if item["type"] == "span"] - span = spans[0] - - assert spans[1]["attributes"]["sentry.span.source"] == "custom" - assert spans[1]["attributes"]["sentry.op"] == OP.GRPC_SERVER - assert span["attributes"]["sentry.op"] == "test" - else: - events.write_file.close() - event = events.read_event() - span = event["spans"][0] - - assert event["type"] == "transaction" - assert event["transaction_info"] == { - "source": "custom", - } - assert event["contexts"]["trace"]["op"] == OP.GRPC_SERVER - assert span["op"] == "test" - - -@pytest.mark.parametrize("span_streaming", [True, False]) -def test_grpc_server_other_interceptors( - sentry_init, - capture_events_forksafe, - capture_items_forksafe, - span_streaming, -): - """Ensure compatibility with additional server interceptors.""" - if span_streaming: - sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()], _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}) - items = capture_items_forksafe("span") - else: - sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()]) - events = capture_events_forksafe() - mock_intercept = lambda continuation, handler_call_details: continuation( - handler_call_details - ) - mock_interceptor = Mock() - mock_interceptor.intercept_service.side_effect = mock_intercept - - server, channel = _set_up(interceptors=[mock_interceptor]) - - # Use the provided channel - stub = gRPCTestServiceStub(channel) - stub.TestServe(gRPCTestMessage(text="test")) - - _tear_down(server=server) - - mock_interceptor.intercept_service.assert_called_once() - - if span_streaming: - sentry_sdk.flush() - items.write_file.close() - items = items.read_event() - spans = [item["payload"] for item in items if item["type"] == "span"] - span = spans[0] - - assert spans[1]["attributes"]["sentry.span.source"] == "custom" - assert spans[1]["attributes"]["sentry.op"] == OP.GRPC_SERVER - assert span["attributes"]["sentry.op"] == "test" - else: - events.write_file.close() - event = events.read_event() - span = event["spans"][0] - - assert event["type"] == "transaction" - assert event["transaction_info"] == { - "source": "custom", - } - assert event["contexts"]["trace"]["op"] == OP.GRPC_SERVER - assert span["op"] == "test" - - -@pytest.mark.forked -@pytest.mark.parametrize("span_streaming", [True, False]) -def test_grpc_server_continues_transaction( - sentry_init, - capture_events_forksafe, - capture_items_forksafe, - span_streaming, -): - if span_streaming: - sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()], _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}) - items = capture_items_forksafe("span") - else: - sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()]) - events = capture_events_forksafe() - - server, channel = _set_up() - - # Use the provided channel - stub = gRPCTestServiceStub(channel) - - if span_streaming: - with sentry_sdk.traces.start_span(name="custom parent") as segment_span: - metadata = ( - ( - "baggage", - "sentry-trace_id={trace_id},sentry-environment=test," - "sentry-transaction=test-transaction,sentry-sample_rate=1.0".format( - trace_id=segment_span.trace_id - ), - ), - ( - "sentry-trace", - "{trace_id}-{parent_span_id}-{sampled}".format( - trace_id=segment_span.trace_id, - parent_span_id=segment_span.span_id, - sampled=1, - ), - ), - ) - else: - with start_transaction() as transaction: - metadata = ( - ( - "baggage", - "sentry-trace_id={trace_id},sentry-environment=test," - "sentry-transaction=test-transaction,sentry-sample_rate=1.0".format( - trace_id=transaction.trace_id - ), - ), - ( - "sentry-trace", - "{trace_id}-{parent_span_id}-{sampled}".format( - trace_id=transaction.trace_id, - parent_span_id=transaction.span_id, - sampled=1, - ), - ), - ) - stub.TestServe(gRPCTestMessage(text="test"), metadata=metadata) - - _tear_down(server=server) - - if span_streaming: - sentry_sdk.flush() - items.write_file.close() - items = items.read_event() - spans = [item["payload"] for item in items if item["type"] == "span"] - span = spans[0] - - assert spans[1]["attributes"]["sentry.span.source"] == "custom" - assert spans[1]["attributes"]["sentry.op"] == OP.GRPC_SERVER - assert spans[1]["trace_id"] == segment_span.trace_id - assert span["attributes"]["sentry.op"] == "test" - else: - events.write_file.close() - event = events.read_event() - span = event["spans"][0] - - assert event["type"] == "transaction" - assert event["transaction_info"] == { - "source": "custom", - } - assert event["contexts"]["trace"]["op"] == OP.GRPC_SERVER - assert event["contexts"]["trace"]["trace_id"] == transaction.trace_id - assert span["op"] == "test" - - -@pytest.mark.forked -@pytest.mark.parametrize("span_streaming", [True, False]) -def test_grpc_client_starts_span( - sentry_init, - capture_events_forksafe, - capture_items_forksafe, - span_streaming, -): - if span_streaming: - sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()], _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}) - items = capture_items_forksafe("span") - else: - sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()]) - events = capture_events_forksafe() - - server, channel = _set_up() - - # Use the provided channel - stub = gRPCTestServiceStub(channel) - - if span_streaming: - with sentry_sdk.traces.start_span(name="custom parent"): - else: - with start_transaction(): - stub.TestServe(gRPCTestMessage(text="test")) - - _tear_down(server=server) - - if span_streaming: - sentry_sdk.flush() - items.write_file.close() - items = items.read_event() - spans = [item["payload"] for item in items if item["type"] == "span"] - span = spans[2] - - assert len(spans) == 4 - assert span["attributes"]["sentry.op"] == OP.GRPC_CLIENT - assert ( - span["name"] - == "unary unary call to /grpc_test_server.gRPCTestService/TestServe" - ) - assert span["attributes"] == ApproxDict( - { - "type": "unary unary", - "method": "/grpc_test_server.gRPCTestService/TestServe", - "sentry.environment": mock.ANY, - "sentry.op": "grpc.client", - "sentry.origin": "auto.grpc.grpc", - "sentry.release": mock.ANY, - "sentry.sdk.name": "sentry.python", - "sentry.sdk.version": mock.ANY, - "sentry.segment.id": mock.ANY, - "sentry.segment.name": "custom parent", - "server.address": mock.ANY, - "thread.id": mock.ANY, - "thread.name": mock.ANY, - "code": "OK", - } - ) - else: - events.write_file.close() - events.read_event() - local_transaction = events.read_event() - span = local_transaction["spans"][0] - - assert len(local_transaction["spans"]) == 1 - assert span["op"] == OP.GRPC_CLIENT - assert ( - span["description"] - == "unary unary call to /grpc_test_server.gRPCTestService/TestServe" - ) - assert span["data"] == ApproxDict( - { - "type": "unary unary", - "method": "/grpc_test_server.gRPCTestService/TestServe", - "code": "OK", - } - ) - - -@pytest.mark.forked -@pytest.mark.parametrize("span_streaming", [True, False]) -def test_grpc_client_unary_stream_starts_span( - sentry_init, - capture_events_forksafe, - capture_items_forksafe, - span_streaming, -): - if span_streaming: - sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()], _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}) - items = capture_items_forksafe("span") - else: - sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()]) - events = capture_events_forksafe() - - server, channel = _set_up() - - # Use the provided channel - stub = gRPCTestServiceStub(channel) - - if span_streaming: - with sentry_sdk.traces.start_span(name="custom parent"): - else: - with start_transaction(): - [el for el in stub.TestUnaryStream(gRPCTestMessage(text="test"))] - - _tear_down(server=server) - - if span_streaming: - sentry_sdk.flush() - items.write_file.close() - items = items.read_event() - spans = [item["payload"] for item in items if item["type"] == "span"] - span = spans[0] - - assert len(spans) == 2 - assert span["attributes"]["sentry.op"] == OP.GRPC_CLIENT - assert ( - span["name"] - == "unary stream call to /grpc_test_server.gRPCTestService/TestUnaryStream" - ) - assert span["attributes"] == ApproxDict( - { - "type": "unary stream", - "method": "/grpc_test_server.gRPCTestService/TestUnaryStream", - "sentry.environment": mock.ANY, - "sentry.op": "grpc.client", - "sentry.origin": "auto.grpc.grpc", - "sentry.release": mock.ANY, - "sentry.sdk.name": "sentry.python", - "sentry.sdk.version": mock.ANY, - "sentry.segment.id": mock.ANY, - "sentry.segment.name": "custom parent", - "server.address": mock.ANY, - "thread.id": mock.ANY, - "thread.name": mock.ANY, - } - ) - else: - events.write_file.close() - local_transaction = events.read_event() - span = local_transaction["spans"][0] - - assert len(local_transaction["spans"]) == 1 - assert span["op"] == OP.GRPC_CLIENT - assert ( - span["description"] - == "unary stream call to /grpc_test_server.gRPCTestService/TestUnaryStream" - ) - assert span["data"] == ApproxDict( - { - "type": "unary stream", - "method": "/grpc_test_server.gRPCTestService/TestUnaryStream", - } - ) - - -# using unittest.mock.Mock not possible because grpc verifies -# that the interceptor is of the correct type -class MockClientInterceptor(grpc.UnaryUnaryClientInterceptor): - call_counter = 0 - - def intercept_unary_unary(self, continuation, client_call_details, request): - self.__class__.call_counter += 1 - return continuation(client_call_details, request) - - -@pytest.mark.forked -@pytest.mark.parametrize("span_streaming", [True, False]) -def test_grpc_client_other_interceptor( - sentry_init, - capture_events_forksafe, - capture_items_forksafe, - span_streaming, -): - """Ensure compatibility with additional client interceptors.""" - if span_streaming: - sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()], _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}) - items = capture_items_forksafe("span") - else: - sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()]) - events = capture_events_forksafe() - - server, channel = _set_up() - - # Intercept the channel - channel = grpc.intercept_channel(channel, MockClientInterceptor()) - stub = gRPCTestServiceStub(channel) - - if span_streaming: - with sentry_sdk.traces.start_span(name="custom parent"): - else: - with start_transaction(): - stub.TestServe(gRPCTestMessage(text="test")) - - _tear_down(server=server) - - assert MockClientInterceptor.call_counter == 1 - - if span_streaming: - sentry_sdk.flush() - items.write_file.close() - items = items.read_event() - spans = [item["payload"] for item in items if item["type"] == "span"] - span = spans[2] - - assert len(spans) == 4 - assert span["attributes"]["sentry.op"] == OP.GRPC_CLIENT - assert ( - span["name"] - == "unary unary call to /grpc_test_server.gRPCTestService/TestServe" - ) - assert span["attributes"] == ApproxDict( - { - "type": "unary unary", - "method": "/grpc_test_server.gRPCTestService/TestServe", - "sentry.environment": mock.ANY, - "sentry.op": "grpc.client", - "sentry.origin": "auto.grpc.grpc", - "sentry.release": mock.ANY, - "sentry.sdk.name": "sentry.python", - "sentry.sdk.version": mock.ANY, - "sentry.segment.id": mock.ANY, - "sentry.segment.name": "custom parent", - "server.address": mock.ANY, - "thread.id": mock.ANY, - "thread.name": mock.ANY, - "code": "OK", - } - ) - else: - events.write_file.close() - events.read_event() - local_transaction = events.read_event() - span = local_transaction["spans"][0] - - assert len(local_transaction["spans"]) == 1 - assert span["op"] == OP.GRPC_CLIENT - assert ( - span["description"] - == "unary unary call to /grpc_test_server.gRPCTestService/TestServe" - ) - assert span["data"] == ApproxDict( - { - "type": "unary unary", - "method": "/grpc_test_server.gRPCTestService/TestServe", - "code": "OK", - } - ) - - -@pytest.mark.forked -@pytest.mark.parametrize("span_streaming", [True, False]) -def test_prevent_dual_client_interceptor( - sentry_init, - capture_events_forksafe, - capture_items_forksafe, - span_streaming, -): - if span_streaming: - sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()], _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}) - items = capture_items_forksafe("span") - else: - sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()]) - events = capture_events_forksafe() - - server, channel = _set_up() - - # Intercept the channel - channel = grpc.intercept_channel(channel, ClientInterceptor()) - stub = gRPCTestServiceStub(channel) - - if span_streaming: - with sentry_sdk.traces.start_span(name="custom parent"): - else: - with start_transaction(): - stub.TestServe(gRPCTestMessage(text="test")) - - _tear_down(server=server) - - if span_streaming: - sentry_sdk.flush() - items.write_file.close() - items = items.read_event() - spans = [item["payload"] for item in items if item["type"] == "span"] - span = spans[2] - - assert len(spans) == 4 - assert span["attributes"]["sentry.op"] == OP.GRPC_CLIENT - assert ( - span["name"] - == "unary unary call to /grpc_test_server.gRPCTestService/TestServe" - ) - assert span["attributes"] == ApproxDict( - { - "type": "unary unary", - "method": "/grpc_test_server.gRPCTestService/TestServe", - "code": "OK", - } - ) - else: - events.write_file.close() - events.read_event() - local_transaction = events.read_event() - span = local_transaction["spans"][0] - - assert len(local_transaction["spans"]) == 1 - assert span["op"] == OP.GRPC_CLIENT - assert ( - span["description"] - == "unary unary call to /grpc_test_server.gRPCTestService/TestServe" - ) - assert span["data"] == ApproxDict( - { - "type": "unary unary", - "method": "/grpc_test_server.gRPCTestService/TestServe", - "code": "OK", - } - ) - - -@pytest.mark.forked -@pytest.mark.parametrize("span_streaming", [True, False]) -def test_grpc_client_and_servers_interceptors_integration( - sentry_init, - capture_events_forksafe, - capture_items_forksafe, - span_streaming, -): - if span_streaming: - sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()], _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}) - items = capture_items_forksafe("span") - else: - sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()]) - events = capture_events_forksafe() - - server, channel = _set_up() - - # Use the provided channel - stub = gRPCTestServiceStub(channel) - - if span_streaming: - with sentry_sdk.traces.start_span(name="custom parent"): - else: - with start_transaction(): - stub.TestServe(gRPCTestMessage(text="test")) - - _tear_down(server=server) - - if span_streaming: - sentry_sdk.flush() - items.write_file.close() - items = items.read_event() - - spans = [item["payload"] for item in items if item["type"] == "span"] - - assert ( - spans[0]["trace_id"] - == spans[1]["trace_id"] - ) - else: - events.write_file.close() - server_transaction = events.read_event() - local_transaction = events.read_event() - - assert ( - server_transaction["contexts"]["trace"]["trace_id"] - == local_transaction["contexts"]["trace"]["trace_id"] - ) - - -@pytest.mark.forked -def test_stream_stream(sentry_init): - sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()]) - server, channel = _set_up() - - # Use the provided channel - stub = gRPCTestServiceStub(channel) - response_iterator = stub.TestStreamStream(iter((gRPCTestMessage(text="test"),))) - for response in response_iterator: - assert response.text == "test" - - _tear_down(server=server) - - -@pytest.mark.forked -def test_stream_unary(sentry_init): - """ - Test to verify stream-stream works. - Tracing not supported for it yet. - """ - sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()]) - server, channel = _set_up() - - # Use the provided channel - stub = gRPCTestServiceStub(channel) - response = stub.TestStreamUnary(iter((gRPCTestMessage(text="test"),))) - assert response.text == "test" - - _tear_down(server=server) - - -@pytest.mark.forked -@pytest.mark.parametrize("span_streaming", [True, False]) -def test_span_origin( - sentry_init, - capture_events_forksafe, - capture_items_forksafe, - span_streaming, -): - if span_streaming: - sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()], _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}) - items = capture_items_forksafe("span") - else: - sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()]) - events = capture_events_forksafe() - - server, channel = _set_up() - - # Use the provided channel - stub = gRPCTestServiceStub(channel) - - if span_streaming: - with sentry_sdk.traces.start_span(name="custom parent"): - else: - with start_transaction(name="custom_transaction"): - stub.TestServe(gRPCTestMessage(text="test")) - - _tear_down(server=server) - - if span_streaming: - sentry_sdk.flush() - items.write_file.close() - items = items.read_event() - - spans = [item["payload"] for item in items if item["type"] == "span"] - - assert ( - spans[1]["attributes"]["sentry.origin"] == "auto.grpc.grpc" - ) - assert ( - spans[0]["attributes"]["sentry.origin"] - == "auto.grpc.grpc.TestService" - ) # manually created in TestService, not the instrumentation - else: - events.write_file.close() - - transaction_from_integration = events.read_event() - custom_transaction = events.read_event() - - assert ( - transaction_from_integration["contexts"]["trace"]["origin"] == "auto.grpc.grpc" - ) - assert ( - transaction_from_integration["spans"][0]["origin"] - == "auto.grpc.grpc.TestService" - ) # manually created in TestService, not the instrumentation - - if span_streaming: - assert spans[3]["attributes"]["sentry.origin"] == "manual" - assert spans[2]["attributes"]["sentry.origin"] == "auto.grpc.grpc" - else: - assert custom_transaction["contexts"]["trace"]["origin"] == "manual" - assert custom_transaction["spans"][0]["origin"] == "auto.grpc.grpc" - - -class TestService(gRPCTestServiceServicer): - events = [] - - @staticmethod - def TestServe(request, context): # noqa: N802 - with sentry_sdk.traces.start_span( - name="test", - attributes = { - "sentry.op": "test", - "sentry.origin": "auto.grpc.grpc.TestService", - } - ): - pass - - return gRPCTestMessage(text=request.text) - - @staticmethod - def TestUnaryStream(request, context): # noqa: N802 - for _ in range(3): - yield gRPCTestMessage(text=request.text) - - @staticmethod - def TestStreamStream(request, context): # noqa: N802 - for r in request: - yield r - - @staticmethod - def TestStreamUnary(request, context): # noqa: N802 - requests = [r for r in request] - return requests.pop() diff --git a/tests/integrations/grpc/test_grpc_aio.py.err b/tests/integrations/grpc/test_grpc_aio.py.err deleted file mode 100644 index a09f3126e6..0000000000 --- a/tests/integrations/grpc/test_grpc_aio.py.err +++ /dev/null @@ -1,557 +0,0 @@ -import asyncio - -import grpc -import pytest -import pytest_asyncio -from unittest import mock - -import sentry_sdk -from sentry_sdk import start_span, start_transaction -from sentry_sdk.consts import OP -from sentry_sdk.integrations.grpc import GRPCIntegration -from tests.conftest import ApproxDict -from tests.integrations.grpc.grpc_test_service_pb2 import gRPCTestMessage -from tests.integrations.grpc.grpc_test_service_pb2_grpc import ( - add_gRPCTestServiceServicer_to_server, - gRPCTestServiceServicer, - gRPCTestServiceStub, -) - -from typing import Optional - - -@pytest_asyncio.fixture(scope="function") -async def grpc_server_and_channel(sentry_init): - """ - Creates an async gRPC server and a channel connected to it. - Returns both for use in tests, and cleans up afterward. - """ - channel: "Optional[grpc.aio.Channel]" = None - server: "Optional[grpc.aio.Server]" = None - async def inner(span_streaming: bool): - nonlocal server, channel - - sentry_init( - traces_sample_rate=1.0, - integrations=[GRPCIntegration()], - _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}, - ) - - # Create server - server = grpc.aio.server() - - # Let gRPC choose a free port instead of hardcoding it - port = server.add_insecure_port("[::]:0") - - # Add service implementation - add_gRPCTestServiceServicer_to_server(TestService, server) - - # Start the server - await asyncio.create_task(server.start()) - - # Create channel connected to our server - channel = grpc.aio.insecure_channel(f"localhost:{port}") # noqa: E231 - - return server, channel - - try: - yield inner - finally: - if channel is not None: - await channel.close() - - if server is not None: - await server.stop(None) - - -@pytest.mark.asyncio -@pytest.mark.parametrize("span_streaming", [True, False]) -async def test_noop_for_unimplemented_method( - sentry_init, - capture_events, - capture_items, - span_streaming, -): - if span_streaming: - sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()], _experiments={"trace_lifecycle": "stream" if span_streaming else "static"}) - else: - sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()]) - - # Create empty server with no services - server = grpc.aio.server() - port = server.add_insecure_port("[::]:0") # Let gRPC choose a free port - await asyncio.create_task(server.start()) - - if span_streaming: - items = capture_items("event", "transaction", "span") - else: - events = capture_events() - - try: - async with grpc.aio.insecure_channel( - f"localhost:{port}" # noqa: E231 - ) as channel: - stub = gRPCTestServiceStub(channel) - with pytest.raises(grpc.RpcError) as exc: - await stub.TestServe(gRPCTestMessage(text="test")) - assert exc.value.details() == "Method not found!" - finally: - await server.stop(None) - - if span_streaming: - sentry_sdk.flush() - spans = [item.payload for item in items] - assert len(spans) == 1 - else: - assert not events - - -@pytest.mark.asyncio -@pytest.mark.parametrize("span_streaming", [True, False]) -async def test_grpc_server_starts_transaction( - grpc_server_and_channel, - capture_events, - capture_items, - span_streaming, -): - if span_streaming: - _, channel = await grpc_server_and_channel(span_streaming) - items = capture_items("span") - else: - _, channel = grpc_server_and_channel - events = capture_events() - - # Use the provided channel - stub = gRPCTestServiceStub(channel) - await stub.TestServe(gRPCTestMessage(text="test")) - - if span_streaming: - sentry_sdk.flush() - spans = [item.payload for item in items if item.type == "span"] - span = spans[0] - - assert spans[1]["attributes"]["sentry.span.source"] == "custom" - assert spans[1]["attributes"]["sentry.op"] == OP.GRPC_SERVER - assert span["attributes"]["sentry.op"] == "test" - else: - (event,) = events - span = event["spans"][0] - - assert event["type"] == "transaction" - assert event["transaction_info"] == { - "source": "custom", - } - assert event["contexts"]["trace"]["op"] == OP.GRPC_SERVER - assert span["op"] == "test" - - -@pytest.mark.asyncio -@pytest.mark.parametrize("span_streaming", [True, False]) -async def test_grpc_server_continues_transaction( - grpc_server_and_channel, - capture_events, - capture_items, - span_streaming, -): - if span_streaming: - _, channel = await grpc_server_and_channel(span_streaming) - items = capture_items("span") - else: - _, channel = grpc_server_and_channel - events = capture_events() - - # Use the provided channel - stub = gRPCTestServiceStub(channel) - - if span_streaming: - with sentry_sdk.traces.start_span(name="custom parent") as segment_span: - metadata = ( - ( - "baggage", - "sentry-trace_id={trace_id},sentry-environment=test," - "sentry-transaction=test-transaction,sentry-sample_rate=1.0".format( - trace_id=segment_span.trace_id - ), - ), - ( - "sentry-trace", - "{trace_id}-{parent_span_id}-{sampled}".format( - trace_id=segment_span.trace_id, - parent_span_id=segment_span.span_id, - sampled=1, - ), - ), - ) - else: - with sentry_sdk.start_transaction() as transaction: - metadata = ( - ( - "baggage", - "sentry-trace_id={trace_id},sentry-environment=test," - "sentry-transaction=test-transaction,sentry-sample_rate=1.0".format( - trace_id=transaction.trace_id - ), - ), - ( - "sentry-trace", - "{trace_id}-{parent_span_id}-{sampled}".format( - trace_id=transaction.trace_id, - parent_span_id=transaction.span_id, - sampled=1, - ), - ), - ) - - await stub.TestServe(gRPCTestMessage(text="test"), metadata=metadata) - - if span_streaming: - sentry_sdk.flush() - spans = [item.payload for item in items if item.type == "span"] - span = spans[0] - - assert spans[1]["attributes"]["sentry.span.source"] == "custom" - assert spans[1]["attributes"]["sentry.op"] == OP.GRPC_SERVER - assert spans[1]["trace_id"] == segment_span.trace_id - assert span["attributes"]["sentry.op"] == "test" - else: - (event, _) = events - span = event["spans"][0] - - assert event["type"] == "transaction" - assert event["transaction_info"] == { - "source": "custom", - } - assert event["contexts"]["trace"]["op"] == OP.GRPC_SERVER - assert event["contexts"]["trace"]["trace_id"] == transaction.trace_id - assert span["op"] == "test" - - -@pytest.mark.asyncio -@pytest.mark.parametrize("span_streaming", [True, False]) -async def test_grpc_server_exception( - grpc_server_and_channel, - capture_events, - capture_items, - span_streaming, -): - if span_streaming: - _, channel = await grpc_server_and_channel(span_streaming) - items = capture_items("TestService.TestException", "event", "grpc") - else: - _, channel = grpc_server_and_channel - events = capture_events() - - # Use the provided channel - stub = gRPCTestServiceStub(channel) - try: - await stub.TestServe(gRPCTestMessage(text="exception")) - raise AssertionError() - except Exception: - pass - - if span_streaming: - (event, ) = (item.payload for item in items if item.type == "event") - else: - (event, _) = events - - assert event["exception"]["values"][0]["type"] == "TestService.TestException" - assert event["exception"]["values"][0]["value"] == "test" - assert event["exception"]["values"][0]["mechanism"]["handled"] is False - assert event["exception"]["values"][0]["mechanism"]["type"] == "grpc" - - -@pytest.mark.asyncio -async def test_grpc_server_abort(grpc_server_and_channel, capture_events): - _, channel = grpc_server_and_channel - events = capture_events() - - # Use the provided channel - stub = gRPCTestServiceStub(channel) - try: - await stub.TestServe(gRPCTestMessage(text="abort")) - raise AssertionError() - except Exception: - pass - - # Add a small delay to allow events to be collected - await asyncio.sleep(0.1) - - assert len(events) == 1 - - -@pytest.mark.asyncio -@pytest.mark.parametrize("span_streaming", [True, False]) -async def test_grpc_client_starts_span( - grpc_server_and_channel, - capture_events_forksafe, - capture_items_forksafe, - span_streaming, -): - if span_streaming: - _, channel = await grpc_server_and_channel(span_streaming) - items = capture_items_forksafe("span") - else: - _, channel = grpc_server_and_channel - events = capture_events_forksafe() - - # Use the provided channel - stub = gRPCTestServiceStub(channel) - if span_streaming: - with sentry_sdk.traces.start_span(name="custom parent") as span: - await stub.TestServe(gRPCTestMessage(text="test")) - - sentry_sdk.flush() - items.write_file.close() - items = items.read_event() - spans = [item["payload"] for item in items if item["type"] == "span"] - span = spans[2] - - assert len(spans) == 4 - assert span["attributes"]["sentry.op"] == OP.GRPC_CLIENT - assert ( - span["name"] - == "unary unary call to /grpc_test_server.gRPCTestService/TestServe" - ) - assert span["attributes"] == ApproxDict( - { - "type": "unary unary", - "rpc.method": "/grpc_test_server.gRPCTestService/TestServe", - "sentry.environment": mock.ANY, - "sentry.op": "grpc.client", - "sentry.origin": "auto.grpc.grpc", - "sentry.release": mock.ANY, - "sentry.sdk.name": "sentry.python", - "sentry.sdk.version": mock.ANY, - "sentry.segment.id": mock.ANY, - "sentry.segment.name": "custom parent", - "server.address": mock.ANY, - "thread.id": mock.ANY, - "thread.name": mock.ANY, - "code": "OK", - } - ) - else: - with start_transaction(): - await stub.TestServe(gRPCTestMessage(text="test")) - - events.write_file.close() - events.read_event() - local_transaction = events.read_event() - span = local_transaction["spans"][0] - - assert len(local_transaction["spans"]) == 1 - assert span["op"] == OP.GRPC_CLIENT - assert ( - span["description"] - == "unary unary call to /grpc_test_server.gRPCTestService/TestServe" - ) - assert span["data"] == ApproxDict( - { - "type": "unary unary", - "method": "/grpc_test_server.gRPCTestService/TestServe", - "code": "OK", - } - ) - - -@pytest.mark.asyncio -@pytest.mark.parametrize("span_streaming", [True, False]) -async def test_grpc_client_unary_stream_starts_span( - grpc_server_and_channel, - capture_events_forksafe, - capture_items_forksafe, - span_streaming, -): - if span_streaming: - _, channel = await grpc_server_and_channel(span_streaming) - items = capture_items_forksafe("span") - else: - _, channel = grpc_server_and_channel - events = capture_events_forksafe() - - # Use the provided channel - stub = gRPCTestServiceStub(channel) - if span_streaming: - with sentry_sdk.traces.start_span(name="custom parent"): - else: - with start_transaction(): - response = stub.TestUnaryStream(gRPCTestMessage(text="test")) - [_ async for _ in response] - - if span_streaming: - sentry_sdk.flush() - items.write_file.close() - items = items.read_event() - spans = [item["payload"] for item in items if item["type"] == "span"] - span = spans[0] - - assert len(spans) == 2 - assert span["attributes"]["sentry.op"] == OP.GRPC_CLIENT - assert ( - span["name"] - == "unary stream call to /grpc_test_server.gRPCTestService/TestUnaryStream" - ) - assert span["attributes"] == ApproxDict( - { - "type": "unary stream", - "rpc.method": "/grpc_test_server.gRPCTestService/TestUnaryStream", - "sentry.environment": mock.ANY, - "sentry.op": "grpc.client", - "sentry.origin": "auto.grpc.grpc", - "sentry.release": mock.ANY, - "sentry.sdk.name": "sentry.python", - "sentry.sdk.version": mock.ANY, - "sentry.segment.id": mock.ANY, - "sentry.segment.name": "custom parent", - "server.address": mock.ANY, - "thread.id": mock.ANY, - "thread.name": mock.ANY, - } - ) - else: - events.write_file.close() - local_transaction = events.read_event() - span = local_transaction["spans"][0] - - assert len(local_transaction["spans"]) == 1 - assert span["op"] == OP.GRPC_CLIENT - assert ( - span["description"] - == "unary stream call to /grpc_test_server.gRPCTestService/TestUnaryStream" - ) - assert span["data"] == ApproxDict( - { - "type": "unary stream", - "method": "/grpc_test_server.gRPCTestService/TestUnaryStream", - } - ) - - -@pytest.mark.asyncio -async def test_stream_stream(grpc_server_and_channel): - """ - Test to verify stream-stream works. - Tracing not supported for it yet. - """ - _, channel = grpc_server_and_channel - - # Use the provided channel - stub = gRPCTestServiceStub(channel) - response = stub.TestStreamStream((gRPCTestMessage(text="test"),)) - async for r in response: - assert r.text == "test" - - -@pytest.mark.asyncio -async def test_stream_unary(grpc_server_and_channel): - """ - Test to verify stream-stream works. - Tracing not supported for it yet. - """ - _, channel = grpc_server_and_channel - - # Use the provided channel - stub = gRPCTestServiceStub(channel) - response = await stub.TestStreamUnary((gRPCTestMessage(text="test"),)) - assert response.text == "test" - - -@pytest.mark.asyncio -@pytest.mark.parametrize("span_streaming", [True, False]) -async def test_span_origin( - grpc_server_and_channel, - capture_events_forksafe, - capture_items_forksafe, - span_streaming, -): - if span_streaming: - _, channel = await grpc_server_and_channel(span_streaming) - items = capture_items_forksafe("span") - else: - _, channel = grpc_server_and_channel - events = capture_events_forksafe() - - # Use the provided channel - stub = gRPCTestServiceStub(channel) - if span_streaming: - with sentry_sdk.traces.start_span(name="custom parent"): - await stub.TestServe(gRPCTestMessage(text="test")) - - sentry_sdk.flush() - items.write_file.close() - items = items.read_event() - - spans = [item["payload"] for item in items if item["type"] == "span"] - - assert ( - spans[1]["attributes"]["sentry.origin"] == "auto.grpc.grpc" - ) - assert ( - spans[0]["attributes"]["sentry.origin"] - == "auto.grpc.grpc.TestService.aio" - ) # manually created in TestService, not the instrumentation - else: - with start_transaction(name="custom_transaction"): - await stub.TestServe(gRPCTestMessage(text="test")) - - events.write_file.close() - - transaction_from_integration = events.read_event() - custom_transaction = events.read_event() - - assert ( - transaction_from_integration["contexts"]["trace"]["origin"] == "auto.grpc.grpc" - ) - assert ( - transaction_from_integration["spans"][0]["origin"] - == "auto.grpc.grpc.TestService.aio" - ) # manually created in TestService, not the instrumentation - - if span_streaming: - assert spans[3]["attributes"]["sentry.origin"] == "manual" - assert spans[2]["attributes"]["sentry.origin"] == "auto.grpc.grpc" - else: - assert custom_transaction["contexts"]["trace"]["origin"] == "manual" - assert custom_transaction["spans"][0]["origin"] == "auto.grpc.grpc" - - -class TestService(gRPCTestServiceServicer): - class TestException(Exception): - __test__ = False - - def __init__(self): - super().__init__("test") - - @classmethod - async def TestServe(cls, request, context): # noqa: N802 - with sentry_sdk.traces.start_span( - name="test", - attributes = { - "sentry.op": "test", - "sentry.origin": "auto.grpc.grpc.TestService.aio", - }, - ): - pass - - if request.text == "exception": - raise cls.TestException() - - if request.text == "abort": - await context.abort(grpc.StatusCode.ABORTED, "Aborted!") - - return gRPCTestMessage(text=request.text) - - @classmethod - async def TestUnaryStream(cls, request, context): # noqa: N802 - for _ in range(3): - yield gRPCTestMessage(text=request.text) - - @classmethod - async def TestStreamStream(cls, request, context): # noqa: N802 - async for r in request: - yield r - - @classmethod - async def TestStreamUnary(cls, request, context): # noqa: N802 - requests = [r async for r in request] - return requests.pop() From 3c262793da99609f136f09ee246cd87ccaca57fd Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Tue, 5 May 2026 14:09:47 +0200 Subject: [PATCH 3/7] add fixture --- tests/conftest.py | 46 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/tests/conftest.py b/tests/conftest.py index 8e7a2ffec6..8b118d62b1 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -427,6 +427,52 @@ def flush(timeout=None, callback=None): return inner +@pytest.fixture +def capture_items_forksafe(monkeypatch, capture_items, request): + def inner(*types): + capture_items(*types) + + items_r, items_w = os.pipe() + items_r = os.fdopen(items_r, "rb", 0) + items_w = os.fdopen(items_w, "wb", 0) + + test_client = sentry_sdk.get_client() + old_capture_envelope = test_client.transport.capture_envelope + + telemetry = [] + + def append(envelope): + for item in envelope: + if types and item.type not in types: + continue + + if item.type in ("metric", "log", "span"): + for i in item.payload.json["items"]: + t = {k: v for k, v in i.items() if k != "attributes"} + t["attributes"] = { + k: v["value"] for k, v in i["attributes"].items() + } + telemetry.append({"type": item.type, "payload": t}) + else: + telemetry.append({"type": item.type, "payload": item.payload.json}) + + return old_capture_envelope(envelope) + + real_flush = test_client.flush + + def flush(timeout=None, callback=None): + real_flush(timeout=timeout, callback=callback) + items_w.write(json.dumps(telemetry).encode("utf-8")) + items_w.write(b"\n") + + monkeypatch.setattr(test_client.transport, "capture_envelope", append) + monkeypatch.setattr(test_client, "flush", flush) + + return EventStreamReader(items_r, items_w) + + return inner + + class EventStreamReader: def __init__(self, read_file, write_file): self.read_file = read_file From 3deb7975964453f56511b4377320b9b7490a8359 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Tue, 5 May 2026 14:15:22 +0200 Subject: [PATCH 4/7] avoid redefinition --- sentry_sdk/integrations/grpc/client.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sentry_sdk/integrations/grpc/client.py b/sentry_sdk/integrations/grpc/client.py index ef2710af5c..1e55ef7ba8 100644 --- a/sentry_sdk/integrations/grpc/client.py +++ b/sentry_sdk/integrations/grpc/client.py @@ -84,6 +84,7 @@ def intercept_unary_stream( client = sentry_sdk.get_client() span_streaming = has_span_streaming_enabled(client.options) + response: "UnaryStreamCall" if span_streaming: with sentry_sdk.traces.start_span( name="unary stream call to %s" % method, @@ -101,7 +102,7 @@ def intercept_unary_stream( ) ) - response: "UnaryStreamCall" = continuation(client_call_details, request) + response = continuation(client_call_details, request) # Setting code on unary-stream leads to execution getting stuck # span.set_data("code", response.code().name) @@ -121,7 +122,7 @@ def intercept_unary_stream( ) ) - response: "UnaryStreamCall" = continuation(client_call_details, request) + response = continuation(client_call_details, request) # Setting code on unary-stream leads to execution getting stuck # span.set_data("code", response.code().name) From 42e65e7d9dd69822d99b045be2a09ebdfe441903 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Tue, 5 May 2026 17:54:58 +0200 Subject: [PATCH 5/7] change attributes --- sentry_sdk/integrations/grpc/aio/client.py | 10 +++------- sentry_sdk/integrations/grpc/client.py | 12 ++++++------ tests/integrations/grpc/test_grpc.py | 18 +++++++----------- tests/integrations/grpc/test_grpc_aio.py | 4 +--- 4 files changed, 17 insertions(+), 27 deletions(-) diff --git a/sentry_sdk/integrations/grpc/aio/client.py b/sentry_sdk/integrations/grpc/aio/client.py index 58ce4e4430..99079314ec 100644 --- a/sentry_sdk/integrations/grpc/aio/client.py +++ b/sentry_sdk/integrations/grpc/aio/client.py @@ -1,7 +1,7 @@ from typing import Callable, Union, AsyncIterable, Any import sentry_sdk -from sentry_sdk.consts import OP +from sentry_sdk.consts import OP, SPANDATA from sentry_sdk.integrations import DidNotEnable from sentry_sdk.integrations.grpc.consts import SPAN_ORIGIN from sentry_sdk.tracing_utils import has_span_streaming_enabled @@ -61,7 +61,6 @@ async def intercept_unary_unary( "sentry.origin": SPAN_ORIGIN, }, ) as span: - span.set_attribute("type", "unary unary") span.set_attribute("rpc.method", method.decode()) client_call_details = ( @@ -72,7 +71,7 @@ async def intercept_unary_unary( response = await continuation(client_call_details, request) status_code = await response.code() - span.set_attribute("code", status_code.name) + span.set_attribute(SPANDATA.RPC_RESPONSE_STATUS_CODE, status_code.name) return response else: @@ -120,8 +119,7 @@ async def intercept_unary_stream( "sentry.origin": SPAN_ORIGIN, }, ) as span: - span.set_attribute("type", "unary stream") - span.set_attribute("rpc.method", method.decode()) + span.set_attribute(SPANDATA.RPC_METHOD, method.decode()) client_call_details = ( self._update_client_call_details_metadata_from_scope( @@ -130,8 +128,6 @@ async def intercept_unary_stream( ) response = await continuation(client_call_details, request) - # status_code = await response.code() - # span.set_data("code", status_code) return response else: diff --git a/sentry_sdk/integrations/grpc/client.py b/sentry_sdk/integrations/grpc/client.py index 1e55ef7ba8..a0683ae5fd 100644 --- a/sentry_sdk/integrations/grpc/client.py +++ b/sentry_sdk/integrations/grpc/client.py @@ -1,5 +1,5 @@ import sentry_sdk -from sentry_sdk.consts import OP +from sentry_sdk.consts import OP, SPANDATA from sentry_sdk.integrations import DidNotEnable from sentry_sdk.integrations.grpc.consts import SPAN_ORIGIN from sentry_sdk.tracing_utils import has_span_streaming_enabled @@ -41,8 +41,7 @@ def intercept_unary_unary( "sentry.origin": SPAN_ORIGIN, }, ) as span: - span.set_attribute("type", "unary unary") - span.set_attribute("method", method) + span.set_attribute(SPANDATA.RPC_METHOD, method) client_call_details = ( self._update_client_call_details_metadata_from_scope( @@ -51,7 +50,9 @@ def intercept_unary_unary( ) response = continuation(client_call_details, request) - span.set_attribute("code", response.code().name) + span.set_attribute( + SPANDATA.RPC_RESPONSE_STATUS_CODE, response.code().name + ) return response else: @@ -93,8 +94,7 @@ def intercept_unary_stream( "sentry.origin": SPAN_ORIGIN, }, ) as span: - span.set_attribute("type", "unary stream") - span.set_attribute("method", method) + span.set_attribute(SPANDATA.RPC_METHOD, method) client_call_details = ( self._update_client_call_details_metadata_from_scope( diff --git a/tests/integrations/grpc/test_grpc.py b/tests/integrations/grpc/test_grpc.py index 92a6c195e7..47b732f31c 100644 --- a/tests/integrations/grpc/test_grpc.py +++ b/tests/integrations/grpc/test_grpc.py @@ -308,8 +308,7 @@ def test_grpc_client_starts_span( ) assert span["attributes"] == ApproxDict( { - "type": "unary unary", - "method": "/grpc_test_server.gRPCTestService/TestServe", + "rpc.method": "/grpc_test_server.gRPCTestService/TestServe", "sentry.environment": mock.ANY, "sentry.op": "grpc.client", "sentry.origin": "auto.grpc.grpc", @@ -321,7 +320,7 @@ def test_grpc_client_starts_span( "server.address": mock.ANY, "thread.id": mock.ANY, "thread.name": mock.ANY, - "code": "OK", + "rpc.response.status_code": "OK", } ) else: @@ -393,8 +392,7 @@ def test_grpc_client_unary_stream_starts_span( ) assert span["attributes"] == ApproxDict( { - "type": "unary stream", - "method": "/grpc_test_server.gRPCTestService/TestUnaryStream", + "rpc.method": "/grpc_test_server.gRPCTestService/TestUnaryStream", "sentry.environment": mock.ANY, "sentry.op": "grpc.client", "sentry.origin": "auto.grpc.grpc", @@ -488,8 +486,7 @@ def test_grpc_client_other_interceptor( ) assert span["attributes"] == ApproxDict( { - "type": "unary unary", - "method": "/grpc_test_server.gRPCTestService/TestServe", + "rpc.method": "/grpc_test_server.gRPCTestService/TestServe", "sentry.environment": mock.ANY, "sentry.op": "grpc.client", "sentry.origin": "auto.grpc.grpc", @@ -501,7 +498,7 @@ def test_grpc_client_other_interceptor( "server.address": mock.ANY, "thread.id": mock.ANY, "thread.name": mock.ANY, - "code": "OK", + "rpc.response.status_code": "OK", } ) else: @@ -576,9 +573,8 @@ def test_prevent_dual_client_interceptor( ) assert span["attributes"] == ApproxDict( { - "type": "unary unary", - "method": "/grpc_test_server.gRPCTestService/TestServe", - "code": "OK", + "rpc.method": "/grpc_test_server.gRPCTestService/TestServe", + "rpc.response.status_code": "OK", } ) else: diff --git a/tests/integrations/grpc/test_grpc_aio.py b/tests/integrations/grpc/test_grpc_aio.py index eea5916005..de71dc2456 100644 --- a/tests/integrations/grpc/test_grpc_aio.py +++ b/tests/integrations/grpc/test_grpc_aio.py @@ -333,7 +333,6 @@ async def test_grpc_client_starts_span( ) assert span["attributes"] == ApproxDict( { - "type": "unary unary", "rpc.method": "/grpc_test_server.gRPCTestService/TestServe", "sentry.environment": mock.ANY, "sentry.op": "grpc.client", @@ -346,7 +345,7 @@ async def test_grpc_client_starts_span( "server.address": mock.ANY, "thread.id": mock.ANY, "thread.name": mock.ANY, - "code": "OK", + "rpc.response.status_code": "OK", } ) else: @@ -409,7 +408,6 @@ async def test_grpc_client_unary_stream_starts_span( ) assert span["attributes"] == ApproxDict( { - "type": "unary stream", "rpc.method": "/grpc_test_server.gRPCTestService/TestUnaryStream", "sentry.environment": mock.ANY, "sentry.op": "grpc.client", From 049f1de37cf8c31e09534a9dd08a22887a8216d0 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Tue, 5 May 2026 18:00:13 +0200 Subject: [PATCH 6/7] add spandata --- sentry_sdk/consts.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/sentry_sdk/consts.py b/sentry_sdk/consts.py index 6135f49720..d2b4cd89af 100644 --- a/sentry_sdk/consts.py +++ b/sentry_sdk/consts.py @@ -893,6 +893,18 @@ class SPANDATA: Example: "5249fbada8d5416482c2f6e47e337372" """ + RPC_METHOD = "rpc.method" + """ + The fully-qualified logical name of the method from the RPC interface perspective. + Example: "com.example.ExampleService/exampleMethod" + """ + + RPC_RESPONSE_STATUS_CODE = "rpc.response.status_code" + """ + Status code of the RPC returned by the RPC server or generated by the client. + Example: "DEADLINE_EXCEEDED" + """ + SERVER_ADDRESS = "server.address" """ Name of the database host. From 71c3619949260706407cdd8ea71fa74111d38da8 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Tue, 5 May 2026 18:48:36 +0200 Subject: [PATCH 7/7] use const instead of string --- sentry_sdk/integrations/grpc/aio/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sentry_sdk/integrations/grpc/aio/client.py b/sentry_sdk/integrations/grpc/aio/client.py index 99079314ec..25faaf05ed 100644 --- a/sentry_sdk/integrations/grpc/aio/client.py +++ b/sentry_sdk/integrations/grpc/aio/client.py @@ -61,7 +61,7 @@ async def intercept_unary_unary( "sentry.origin": SPAN_ORIGIN, }, ) as span: - span.set_attribute("rpc.method", method.decode()) + span.set_attribute(SPANDATA.RPC_METHOD, method.decode()) client_call_details = ( self._update_client_call_details_metadata_from_scope(