-
Notifications
You must be signed in to change notification settings - Fork 7
release: 0.11.0 #343
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We'll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
release: 0.11.0 #343
Changes from all commits
76e0299
374dd9a
ce99568
781fa60
d807d4a
1a823ac
2f4f8ff
09ccff7
481fc64
751e6e2
7d52c8b
dc26b20
d3d2fd3
604484a
fb2b652
f9e100d
d2c3873
b4ac845
1f81349
c020912
8c93c46
d458944
50e7c83
6627005
a0ff612
3ef6804
a3d0650
6ba71ee
4d15e72
34d770c
4218e01
3b8adfd
a79813a
8d80f94
b8535df
d47d89a
6b3f278
2a0e1a4
888918e
e3f1889
130d61a
d750e09
168b21f
406192a
082a01a
ad0faaf
7c918db
01492fe
c6f56f7
0d318ad
eda66f5
91db92c
6b4dd25
c1e7675
b09749b
8921cc6
bf74fd7
5aa039f
5a8ec93
65af241
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,3 +1,3 @@ | ||
| { | ||
| ".": "0.10.4" | ||
| ".": "0.11.0" | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,4 +1,4 @@ | ||
| configured_endpoints: 45 | ||
| openapi_spec_url: https://storage.googleapis.com/stainless-sdk-openapi-specs/sgp/agentex-sdk-c108a179582f0e0c6d479ea4b3bc6310a83693987073967c2b6203df23718eb2.yml | ||
| openapi_spec_hash: 53b8e5866709af71bef94816b8ede38b | ||
| openapi_spec_url: https://storage.googleapis.com/stainless-sdk-openapi-specs/sgp/agentex-sdk-307ea66bdd28f83ddc0c526365cfe06f4c1bb2fd421d19f6ebb7f687d06f9ee6.yml | ||
| openapi_spec_hash: 5bbd18a405a11e8497d38a5a88b98018 | ||
| config_hash: fb079ef7936611b032568661b8165f19 |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,4 +1,4 @@ | ||
| # File generated from our OpenAPI spec by Stainless. See CONTRIBUTING.md for details. | ||
|
|
||
| __title__ = "agentex" | ||
| __version__ = "0.10.4" # x-release-please-version | ||
| __version__ = "0.11.0" # x-release-please-version |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,3 +1,5 @@ | ||
| from __future__ import annotations | ||
|
|
||
| from typing import override | ||
|
|
||
| import scale_gp_beta.lib.tracing as tracing | ||
|
|
@@ -125,48 +127,64 @@ def _add_source_to_span(self, span: Span) -> None: | |
|
|
||
| @override | ||
| async def on_span_start(self, span: Span) -> None: | ||
| self._add_source_to_span(span) | ||
| sgp_span = create_span( | ||
| name=span.name, | ||
| span_type=_get_span_type(span), | ||
| span_id=span.id, | ||
| parent_id=span.parent_id, | ||
| trace_id=span.trace_id, | ||
| input=span.input, | ||
| output=span.output, | ||
| metadata=span.data, | ||
| ) | ||
| sgp_span.start_time = span.start_time.isoformat() # type: ignore[union-attr] | ||
| await self.on_spans_start([span]) | ||
|
|
||
| @override | ||
| async def on_span_end(self, span: Span) -> None: | ||
| await self.on_spans_end([span]) | ||
|
|
||
| @override | ||
| async def on_spans_start(self, spans: list[Span]) -> None: | ||
| if not spans: | ||
| return | ||
|
|
||
| sgp_spans: list[SGPSpan] = [] | ||
| for span in spans: | ||
| self._add_source_to_span(span) | ||
| sgp_span = create_span( | ||
| name=span.name, | ||
| span_type=_get_span_type(span), | ||
| span_id=span.id, | ||
| parent_id=span.parent_id, | ||
| trace_id=span.trace_id, | ||
| input=span.input, | ||
| output=span.output, | ||
| metadata=span.data, | ||
| ) | ||
| sgp_span.start_time = span.start_time.isoformat() # type: ignore[union-attr] | ||
| self._spans[span.id] = sgp_span | ||
| sgp_spans.append(sgp_span) | ||
|
|
||
| if self.disabled: | ||
| logger.warning("SGP is disabled, skipping span upsert") | ||
| return | ||
| # TODO(AGX1-198): Batch multiple spans into a single upsert_batch call | ||
| # instead of one span per HTTP request. | ||
| # https://linear.app/scale-epd/issue/AGX1-198/actually-use-sgp-batching-for-spans | ||
| await self.sgp_async_client.spans.upsert_batch( # type: ignore[union-attr] | ||
| items=[sgp_span.to_request_params()] | ||
| items=[s.to_request_params() for s in sgp_spans] | ||
| ) | ||
|
Comment on lines
+154
to
163
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Prompt To Fix With AI
This is a comment left during a code review.
Path: src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py
Line: 154-163
Comment:
**`shutdown()` crashes with `AttributeError` when `disabled=True` and spans are in-flight**
`on_spans_start` now populates `self._spans` (line 155) **before** the `if self.disabled: return` guard (line 158). If any spans are started but not yet ended when `shutdown()` is called in disabled mode, it reaches `self.sgp_async_client.spans.upsert_batch(...)` where `self.sgp_async_client` is `None`, triggering an `AttributeError`. Before this PR the disabled path returned before populating `_spans`, so `_spans` was always empty at shutdown time and this was never triggered in practice. The fix is to either move the `self._spans[span.id] = sgp_span` assignment after the `if self.disabled` guard, or add an early `if self.disabled: return` check at the top of `shutdown()` (mirroring how `on_spans_end` handles it at line 184).
How can I resolve this? If you propose a fix, please make it concise. |
||
|
|
||
| self._spans[span.id] = sgp_span | ||
|
|
||
| @override | ||
| async def on_span_end(self, span: Span) -> None: | ||
| sgp_span = self._spans.pop(span.id, None) | ||
| if sgp_span is None: | ||
| logger.warning(f"Span {span.id} not found in stored spans, skipping span end") | ||
| async def on_spans_end(self, spans: list[Span]) -> None: | ||
| if not spans: | ||
| return | ||
|
|
||
| self._add_source_to_span(span) | ||
| sgp_span.input = span.input # type: ignore[assignment] | ||
| sgp_span.output = span.output # type: ignore[assignment] | ||
| sgp_span.metadata = span.data # type: ignore[assignment] | ||
| sgp_span.end_time = span.end_time.isoformat() # type: ignore[union-attr] | ||
|
|
||
| if self.disabled: | ||
| to_upsert: list[SGPSpan] = [] | ||
| for span in spans: | ||
| sgp_span = self._spans.pop(span.id, None) | ||
| if sgp_span is None: | ||
| logger.warning(f"Span {span.id} not found in stored spans, skipping span end") | ||
| continue | ||
|
|
||
| self._add_source_to_span(span) | ||
| sgp_span.input = span.input # type: ignore[assignment] | ||
| sgp_span.output = span.output # type: ignore[assignment] | ||
| sgp_span.metadata = span.data # type: ignore[assignment] | ||
| sgp_span.end_time = span.end_time.isoformat() # type: ignore[union-attr] | ||
| to_upsert.append(sgp_span) | ||
|
|
||
| if self.disabled or not to_upsert: | ||
| return | ||
| await self.sgp_async_client.spans.upsert_batch( # type: ignore[union-attr] | ||
| items=[sgp_span.to_request_params()] | ||
| items=[s.to_request_params() for s in to_upsert] | ||
| ) | ||
|
|
||
| @override | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -95,29 +95,40 @@ async def _drain_loop(self) -> None: | |
|
|
||
| @staticmethod | ||
| async def _process_items(items: list[_SpanQueueItem]) -> None: | ||
| """Process a list of span events concurrently.""" | ||
| """Dispatch a batch of same-event-type items to each processor in one call. | ||
|
|
||
| async def _handle(item: _SpanQueueItem) -> None: | ||
| Groups spans by processor so each processor sees its full slice of the | ||
| drain batch at once. Processors that override the batched methods can | ||
| then send a single HTTP request per drain cycle instead of N. | ||
| """ | ||
| if not items: | ||
| return | ||
|
|
||
| event_type = items[0].event_type | ||
| assert all(i.event_type == event_type for i in items), ( | ||
| "_process_items requires all items to share the same event_type; " | ||
| "callers must split START and END batches before dispatching." | ||
| ) | ||
|
Comment on lines
+107
to
+111
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The code comment correctly identifies this as a potential "silent data-corruption bug," but using `assert` means the guard is silently stripped when Python runs with the `-O` flag.

Prompt To Fix With AI
This is a comment left during a code review.
Path: src/agentex/lib/core/tracing/span_queue.py
Line: 107-111
Comment:
**`assert` in production guard defeats data-corruption protection**
The code comment correctly identifies this as a potential "silent data-corruption bug," but using `assert` for the guard means it is silently stripped when Python runs with the `-O` (optimize) flag. If a caller ever passes a mixed-event-type list, START and END spans would be fed to the wrong batched method with no warning. Use an explicit `if/raise` instead.
How can I resolve this? If you propose a fix, please make it concise. |
||
| by_processor: dict[AsyncTracingProcessor, list[Span]] = {} | ||
| for item in items: | ||
| for p in item.processors: | ||
| by_processor.setdefault(p, []).append(item.span) | ||
|
|
||
| async def _handle(p: AsyncTracingProcessor, spans: list[Span]) -> None: | ||
| try: | ||
| if item.event_type == SpanEventType.START: | ||
| coros = [p.on_span_start(item.span) for p in item.processors] | ||
| if event_type == SpanEventType.START: | ||
| await p.on_spans_start(spans) | ||
| else: | ||
| coros = [p.on_span_end(item.span) for p in item.processors] | ||
| results = await asyncio.gather(*coros, return_exceptions=True) | ||
| for result in results: | ||
| if isinstance(result, Exception): | ||
| logger.error( | ||
| "Tracing processor error during %s for span %s", | ||
| item.event_type.value, | ||
| item.span.id, | ||
| exc_info=result, | ||
| ) | ||
| await p.on_spans_end(spans) | ||
| except Exception: | ||
| logger.exception( | ||
| "Unexpected error in span queue for span %s", item.span.id | ||
| "Tracing processor %s failed handling %d spans during %s", | ||
| type(p).__name__, | ||
| len(spans), | ||
| event_type.value, | ||
| ) | ||
|
|
||
| await asyncio.gather(*[_handle(item) for item in items]) | ||
| await asyncio.gather(*[_handle(p, spans) for p, spans in by_processor.items()]) | ||
|
|
||
| # ------------------------------------------------------------------ | ||
| # Shutdown | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
**`_spans` populated before upsert — stale entries on HTTP failure**

Spans are added to `self._spans` before the `upsert_batch` HTTP call (lines 155–156). If the batch upsert throws (network error, server 5xx), the exception is caught upstream by the queue's `_handle`, but `_spans` already holds entries for spans whose start event was never delivered to SGP. A subsequent `on_spans_end` will find those spans, update them, and send end-only upserts — orphaned end events with no matching start on the server.

The old single-span code registered the span in `_spans` only after a successful upsert, so failures were cleanly skipped on the end path. Consider populating `_spans` only after confirming the batch call succeeded, or rolling back entries on exception.

Prompt To Fix With AI