Skip to content

Add worker resource and queue metrics to OTel instrumentation#613

Open
Mohammed0tarek wants to merge 3 commits intotaskiq-python:masterfrom
Mohammed0tarek:observability/extra_open_telemetry_metrics
Open

Add worker resource and queue metrics to OTel instrumentation#613
Mohammed0tarek wants to merge 3 commits intotaskiq-python:masterfrom
Mohammed0tarek:observability/extra_open_telemetry_metrics

Conversation

@Mohammed0tarek
Copy link
Copy Markdown

What this adds

This extends the existing OpenTelemetryMiddleware with four new metrics that give
visibility into what a worker process is doing at runtime, not just what tasks it has
finished.

Worker resource utilization (observable gauges, worker process only):

  • worker_cpu_utilization — CPU usage percentage of the worker process
  • worker_memory_utilization — RSS memory in bytes

Both gauges are gaurded by broker.is_worker_process so they stay silent in
client/producer processes.

Worker queue depth (UpDownCounters):

  • worker_active_tasks — number of tasks currently executing, incremented in
    pre_execute and decremented in post_execute. Carries a task_name attribute so
    you can see per-task concurrency.
  • worker_prefetched_tasks — number of tasks sitting in the internal prefetch queue
    (fetched from the broker, waiting for a worker slot). Driven by two new hooks,
    on_prefetch_queue_add and on_prefetch_queue_remove, called from the Receiver at
    the right points — after a real message is enqueued, and after it is dequeued but
    only once the QUEUE_DONE sentinel is handled so it never fires on shutdown signals.

Why UpDownCounter for the queue metrics

I didn't want to mess with the queue definition (they are local variables) so gauges where not the right thing
since they require polling and they are not an attribute in the receiver class.

Tests

Added tests for all four metrics to tests/opentelemetry/test_metrics.py:

  • test_active_tasks_counter — verifies net-zero value after task completion and
    task_name attribute correctness
  • test_prefetch_queue_counter — calls the hooks directly (the Receiver is not
    exercised by InMemoryBroker.kiq()) and asserts the counter value
  • test_worker_resource_metrics_when_worker_process — sets is_worker_process = True and
    triggers an OTel collection cycle to verify the gauges yield observations.
    If feel like this is a slight workaround but I hope it is fine.
  • test_metrics_exist updated to include worker_active_tasks

Add counters and histograms to OpenTelemetryMiddleware:
- tasks_sent: producer-side counter per task name
- task_success / task_errors: consumer-side counters with retry_error attribute
- task_execution_time: histogram using result.execution_time
- task_wait_time: histogram measuring queue time from send to receive via UTC timestamps in labels

Add tests covering all instruments, retry_error attribute paths, and queue time correctness.
- Add worker_active_tasks UpDownCounter driven by pre/post_execute hooks
- Add worker_prefetched_tasks UpDownCounter via on_prefetch_queue_add/remove hooks in receiver
- Add worker_cpu_utilization and worker_memory_utilization observable gauges (worker process only)
- Add tests for all new metrics
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant