
feat(enterprise): add data drains for continuous export to S3 / webhook#4440

Open
waleedlatif1 wants to merge 20 commits into staging from waleedlatif1/data-drains

Conversation

@waleedlatif1
Collaborator

Summary

  • Continuously exports workflow logs, job logs, audit logs, copilot chats, and copilot runs to customer-owned S3 buckets or HTTPS webhooks on hourly or daily schedules
  • Pairs with data retention so customers can drain data into long-term storage before Sim deletes it
  • Built on two registries (DrainSource + DrainDestination) so future destinations are a single-file change
  • At-least-once delivery via opaque cursor that advances only on full success; consumers dedupe on stable row ids
  • SSRF-validated webhooks with DNS pinning, HMAC-SHA256 timestamp signatures, S3 server-side encryption, audit logging on every config and run change
  • Self-hosted gating via DATA_DRAINS_ENABLED / NEXT_PUBLIC_DATA_DRAINS_ENABLED, mirroring data retention
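
The HMAC-SHA256 timestamp signatures mentioned above can be sketched as follows. This is a minimal illustration of a Stripe-style scheme, not the PR's exact wire format: the header layout (`t=…,v1=…`), tolerance window, and function names are assumptions.

```typescript
import { createHmac, timingSafeEqual } from "node:crypto";

// Sign `${timestamp}.${body}` so an attacker can't replay an old payload
// with a fresh timestamp, or re-sign a tampered body.
function signPayload(secret: string, body: string, timestampMs: number): string {
  const mac = createHmac("sha256", secret).update(`${timestampMs}.${body}`).digest("hex");
  return `t=${timestampMs},v1=${mac}`;
}

function verifySignature(
  secret: string,
  body: string,
  header: string,
  toleranceMs = 5 * 60_000,
  now = Date.now()
): boolean {
  const parts = Object.fromEntries(
    header.split(",").map((kv) => kv.split("=") as [string, string])
  );
  const ts = Number(parts.t);
  // Reject timestamps outside the tolerance window to bound replay.
  if (!Number.isFinite(ts) || Math.abs(now - ts) > toleranceMs) return false;
  const expected = createHmac("sha256", secret).update(`${ts}.${body}`).digest();
  const given = Buffer.from(parts.v1 ?? "", "hex");
  // Constant-time compare; guard lengths since timingSafeEqual throws on mismatch.
  return given.length === expected.length && timingSafeEqual(given, expected);
}
```

Consumers verify with the shared secret before trusting the NDJSON body.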

Type of Change

  • New feature

Testing

  • 26 unit tests passing (service, dispatcher, sources, S3, webhook)
  • bun run check:api-validation passing
  • Manually tested S3 + webhook end-to-end including failure paths and cursor replay

Checklist

  • Code follows project style guidelines
  • Self-reviewed my changes
  • Tests added/updated and passing
  • No new warnings introduced
  • I confirm that I have read and agree to the terms outlined in the Contributor License Agreement (CLA)

@vercel

vercel Bot commented May 5, 2026

The latest updates on your projects:

docs: Ready (Preview), updated May 5, 2026 5:50pm (UTC)

@cursor

cursor Bot commented May 5, 2026

PR Summary

High Risk
Introduces new org-scoped export infrastructure (new tables, API routes, background jobs, and outbound network writes to S3/webhooks), which is security- and data-handling sensitive. Risk centers on authz gating, SSRF protections, and correct cursor/run-state handling under failures/retries.

Overview
Adds a new Enterprise “Data Drains” feature to continuously export workflow/job/audit/copilot data as NDJSON to either S3 or an HTTPS webhook, including a Settings UI to create/manage drains, run them manually, test destinations, and view recent run history.

Implements the backend: new data_drains/data_drain_runs schema + migration, org/admin-only API endpoints (create/list/get/update/delete, run now, test, list runs) with audit logging, a cron dispatcher (/api/cron/run-data-drains) that enqueues run-data-drain jobs, and a runner service that does chunked export with at-least-once semantics (cursor advances only on full success) and orphaned-run reaping.

Adds destination/source registries with initial implementations for S3 (deterministic keys, SSE AES256, optional endpoint SSRF validation) and webhook (DNS pinning + SSRF validation, HMAC-signed requests, retries/backoff, idempotency headers), plus new self-hosted feature flags (DATA_DRAINS_ENABLED / NEXT_PUBLIC_DATA_DRAINS_ENABLED) and Helm cron/env wiring, and extends secureFetch to support AbortSignal.

Reviewed by Cursor Bugbot for commit 57f6571.

Data drains let enterprise organizations continuously export Sim data
(workflow logs, job logs, audit logs, copilot chats, copilot runs) to
customer-controlled S3 buckets or HTTPS webhooks on hourly or daily
schedules. Pairs with data retention to satisfy long-term compliance
archives.

Built around two registries (DrainSource + DrainDestination) so adding
new sources or destinations is a single-file change. Cursor-based
at-least-once delivery; cursor advances only on full success and rows
carry stable ids so consumers can dedupe.

Includes SSRF-validated webhooks with DNS pinning, HMAC-SHA256 timestamp
signatures, S3 server-side encryption, audit logging on every config
and run change, and self-hosted env var gating that mirrors data
retention.
@greptile-apps
Contributor

greptile-apps Bot commented May 5, 2026

Greptile Summary

This PR introduces a full data-drain pipeline for continuously exporting workflow logs, job logs, audit logs, copilot chats, and copilot runs to customer-owned S3 buckets or HTTPS webhooks on hourly/daily schedules. It includes two destination backends (S3 + webhook), five source generators with composite keyset cursors, a dispatcher with conditional atomic claiming, an orphan reaper, Trigger.dev background task integration, and SSRF-safe delivery with HMAC-SHA256 signatures.

  • Dispatcher & service: At-least-once delivery via cursor advancing only on full success; cancellation signal is respected post-loop and marks the run as failed; per-org enterprise checks are wrapped in try-catch to prevent one billing-API failure from aborting the entire batch.
  • Security: Webhook destination uses validateUrlWithDNS + IP pinning (secureFetchWithPinnedIP) to defeat DNS rebinding; S3 endpoint is SSRF-validated at both the schema (sync IP-literal check) and session-open (DNS check) layers; credentials are encrypted with AES-256-GCM before storage.
  • Access control: Feature-flag and enterprise-plan gates apply to both reads and writes; owner/admin role is required for all drain operations.
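
As an illustration of the sync IP-literal layer of those SSRF checks, here is a minimal sketch. The helper name and range list are assumptions for illustration; per the review, the PR's actual protection also resolves hostnames via DNS and pins the resolved IP (secureFetchWithPinnedIP) to defeat rebinding, which this sketch does not cover.

```typescript
import { isIP } from "node:net";

// Ranges that must never be reachable from a drain destination.
const PRIVATE_V4 = [
  /^10\./,                      // RFC 1918
  /^172\.(1[6-9]|2\d|3[01])\./, // RFC 1918
  /^192\.168\./,                // RFC 1918
  /^127\./,                     // loopback
  /^169\.254\./,                // link-local, incl. cloud metadata 169.254.169.254
  /^0\./,                       // "this network"
];

// Returns true only for a public IPv4 literal (IPv6 omitted in this sketch).
function isPublicIPv4(addr: string): boolean {
  if (isIP(addr) !== 4) return false;
  return !PRIVATE_V4.some((re) => re.test(addr));
}
```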

Confidence Score: 5/5

Safe to merge; the core delivery guarantees, security hardening, and access control gates are all correct.

This is a large, well-structured feature with thorough security controls. All findings are minor: one metrics counter undercount in the dispatcher, one hard-parse failure mode that could affect list responsiveness after a schema evolution, and a missing Trigger.dev maxDuration that could impede very large first-time backfills. None of these affect data integrity, security, or correctness for the typical enterprise drain workflow.

apps/sim/lib/data-drains/dispatcher.ts (skipped counter), apps/sim/lib/data-drains/serializers.ts (hard-parse list failure), apps/sim/background/run-data-drain.ts (no maxDuration)

Important Files Changed

  • apps/sim/lib/data-drains/dispatcher.ts: Dispatch loop logic is solid with conditional claim, concurrency key, and per-org enterprise check in try-catch. Minor gap: failed-enqueue drains increment neither dispatched nor skipped, so candidates − dispatched − skipped can be non-zero without explanation.
  • apps/sim/lib/data-drains/service.ts: Correctly orchestrates drain runs: at-least-once delivery via the cursor only advancing on full success, a signal.aborted check post-loop marking cancellations as failed, and the session always closed in finally.
  • apps/sim/lib/data-drains/destinations/webhook.ts: SSRF-validated with DNS pinning, HMAC-SHA256 Stripe-style signatures, the signal threaded into fetch, exponential backoff with jitter, and Retry-After parsing.
  • apps/sim/lib/data-drains/destinations/s3.ts: Endpoint SSRF-validated at both the schema level (sync IP-literal check) and runtime (DNS check via assertEndpointIsPublic). SSE-AES256 on all writes. A lazily cached endpoint check amortizes DNS across chunks.
  • apps/sim/lib/data-drains/access.ts: Feature-flag and enterprise-plan gates now apply to both reads and writes. The role check (owner/admin) is consistent across all endpoints.
  • apps/sim/lib/data-drains/serializers.ts: Re-validates stored JSONB config via configSchema.parse() before serialization. Using .parse() (throws) rather than .safeParse() means one drain with a schema-invalid config will throw from the list endpoint for all drains in that org.
  • apps/sim/lib/data-drains/sources/cursor.ts: Composite (timestamp, id) keyset pagination correctly handles ties; returns undefined for a null cursor so Drizzle's and() skips the predicate on the first run.
  • apps/sim/background/run-data-drain.ts: Thin Trigger.dev task wrapper that correctly threads the cancellation signal into runDrain. No maxDuration configured — very large backfills that exceed the default Trigger.dev timeout will time out, be marked as failed, and restart from cursorBefore indefinitely.
  • apps/sim/lib/data-drains/encryption.ts: Thin wrapper around the shared AES-256-GCM helper. Callers are expected to run credentialsSchema.parse() after decryption, which both the service and the test route do correctly.
  • apps/sim/lib/api/contracts/data-drains.ts: API contracts correctly separate the public endpoint schema (no SSRF check) from the destination-specific runtime schema. A discriminated union for create enforces destinationType; the loose record schema for update is intentional and re-validated at the route layer.
  • packages/db/schema.ts: New dataDrains and dataDrainRuns tables look correct. rowsExported/bytesWritten have DB-level NOT NULL DEFAULT 0, locators defaults to []::jsonb, and dataDrainRuns cascade-deletes with its parent dataDrains row.
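
The composite (timestamp, id) keyset predicate described for sources/cursor.ts can be sketched as a pure function. In the PR the equivalent condition is built as SQL via Drizzle; the names here are illustrative only.

```typescript
type Cursor = { ts: number; id: string };
type Row = { ts: number; id: string };

// Row-ordering predicate equivalent to the SQL tuple comparison
// (ts, id) > (cursor.ts, cursor.id): strictly after the cursor, with
// id as the tie-breaker so rows sharing a timestamp are neither
// skipped nor re-emitted across runs.
function afterCursor(row: Row, cursor: Cursor | null): boolean {
  if (cursor === null) return true; // first run: no predicate, export everything
  return row.ts > cursor.ts || (row.ts === cursor.ts && row.id > cursor.id);
}
```

Because the cursor row itself fails the predicate, replays after a failed run re-deliver at most the unacknowledged chunk, preserving at-least-once semantics.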

Sequence Diagram

sequenceDiagram
    participant Cron as Cron (/run-data-drains)
    participant Dispatcher
    participant DB
    participant Queue as Job Queue (Trigger.dev)
    participant Task as run-data-drain task
    participant Source
    participant Destination

    Cron->>Dispatcher: dispatchDueDrains()
    Dispatcher->>DB: reapOrphanedRuns()
    Dispatcher->>DB: SELECT due drains
    loop per candidate
        Dispatcher->>DB: isOrganizationOnEnterprisePlan
        Dispatcher->>DB: UPDATE claim (conditional)
        Dispatcher->>Queue: enqueue run-data-drain
    end
    Queue->>Task: trigger with signal
    Task->>DB: SELECT drain + config
    Task->>Task: decryptCredentials
    Task->>DB: INSERT dataDrainRun status=running
    Task->>Destination: openSession()
    loop each chunk
        Task->>Source: pages() chunk[]
        Task->>Destination: deliver(body, metadata, signal)
        Destination-->>Task: locator
    end
    alt success
        Task->>DB: UPDATE dataDrains cursor + lastSuccessAt
        Task->>DB: UPDATE dataDrainRun status=success
    else cancelled / error
        Task->>DB: UPDATE dataDrains lastRunAt only
        Task->>DB: UPDATE dataDrainRun status=failed
    end
    Task->>Destination: close()

Reviews (12): Last reviewed commit: "refactor(data-drains): trim extraneous c..."

Comment thread apps/sim/lib/data-drains/destinations/webhook.ts Outdated
Comment thread apps/sim/lib/data-drains/destinations/webhook.ts Outdated
Comment thread apps/sim/lib/data-drains/serializers.ts
Comment thread apps/sim/lib/data-drains/dispatcher.ts
Comment thread apps/sim/lib/data-drains/sources/copilot-chats.ts
…ument copilot_chats cursor

- Thread AbortSignal through webhook test() and secureFetchWithPinnedIP so the route's 10s timeout actually cancels the outbound request
- Re-validate destinationConfig against the typed schema in serializeDrain so unexpected JSONB shapes surface instead of leaking
- Note in docs that drains export rows once on creation cursor; mutable copilot_chats fields are a point-in-time snapshot
@waleedlatif1
Collaborator Author

@greptile

@waleedlatif1
Collaborator Author

@cursor review

Comment thread apps/sim/lib/data-drains/destinations/webhook.ts
Comment thread apps/sim/ee/data-drains/hooks/data-drains.ts
Comment thread apps/sim/lib/data-drains/destinations/webhook.ts
Comment thread apps/sim/lib/data-drains/destinations/s3.ts
…SRF, unused hook)

- webhook deliver: pass signal to secureFetchWithPinnedIP so aborts cancel the in-flight socket instead of waiting for the 30s timeout
- S3 config: SSRF-validate the optional endpoint via validateExternalUrl so an enterprise admin cannot point the AWS SDK at internal/metadata addresses
- hooks: remove unused useDataDrain (single-drain detail hook had no consumer)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
@waleedlatif1
Collaborator Author

@greptile

@waleedlatif1
Collaborator Author

@cursor review

Comment thread apps/sim/lib/api/contracts/data-drains.ts Outdated
Comment thread apps/sim/lib/core/config/feature-flags.ts
Comment thread apps/sim/lib/data-drains/destinations/s3.ts Outdated
…, self-hosted gate)

- update body schema: drop the discriminated-union-with-.optional() that silently required destinationType for any non-undefined body. The route already validates destination payloads against the typed configSchema/credentialsSchema for the existing drain, so the contract is now a flat partial — clients can send {enabled:false} without supplying destinationType
- S3 buildKey: partition by run startedAt instead of new Date() per chunk so a single run that crosses midnight still lands under one YYYY/MM/DD prefix
- self-hosted gate: wire DATA_DRAINS_ENABLED into authorizeDrainAccess and the cron dispatcher route so the docs claim ("reserved for server-side feature gating") is actually enforced — mutating endpoints 404 and the dispatcher no-ops when unset

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
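
The S3 buildKey change above can be sketched like this. All names here (the prefix/source/runId layout, chunk padding, .ndjson suffix) are illustrative assumptions, not the PR's exact key format; the point is that the date partition derives from the run's startedAt, not from the wall clock at the moment each chunk is written.

```typescript
// Deterministic S3 object key: every chunk of one run shares the
// YYYY/MM/DD prefix of the run's start time, even if delivery of
// later chunks crosses midnight.
function buildKey(
  prefix: string,
  source: string,
  runId: string,
  chunkIndex: number,
  startedAt: Date
): string {
  const y = startedAt.getUTCFullYear();
  const m = String(startedAt.getUTCMonth() + 1).padStart(2, "0");
  const d = String(startedAt.getUTCDate()).padStart(2, "0");
  const chunk = String(chunkIndex).padStart(5, "0");
  return `${prefix}/${source}/${y}/${m}/${d}/${runId}-${chunk}.ndjson`;
}
```

Deterministic keys also make redelivery idempotent: a replayed chunk overwrites the same object instead of creating a duplicate.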
@waleedlatif1
Collaborator Author

@greptile

@waleedlatif1
Collaborator Author

@cursor review

Comment thread apps/sim/lib/data-drains/dispatcher.ts
…elf-hosted

isOrganizationOnEnterprisePlan returns false on deployments without billing
infrastructure, so the dispatcher would silently skip every drain on
self-hosted even with DATA_DRAINS_ENABLED=true. Mirror the access.ts pattern:
when isBillingEnabled is false, treat all orgs as eligible — the cron route's
DATA_DRAINS_ENABLED gate already controls global on/off.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Comment thread apps/sim/lib/data-drains/dispatcher.ts
…cher

A throw from isOrganizationOnEnterprisePlan (Stripe outage, DB timeout) for
one org used to propagate out of the for-loop and abort the whole dispatch
batch. Wrap the check in try-catch so a single bad lookup just skips that
drain — the next cron tick retries it.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
@waleedlatif1
Collaborator Author

@greptile

Comment thread apps/sim/lib/core/security/input-validation.server.ts
Comment thread apps/sim/lib/data-drains/sources/cursor.ts
Promise reject is idempotent so this wasn't a correctness bug, but
routing the already-aborted branch through settledReject keeps all
settling paths consistent and ensures cleanupAbort runs even if a
listener somehow gets registered later.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
@waleedlatif1
Collaborator Author

@cursor review

@waleedlatif1
Collaborator Author

@greptile

Comment thread apps/sim/lib/data-drains/service.ts
Comment thread apps/sim/lib/data-drains/dispatcher.ts Outdated
Comment thread apps/sim/lib/data-drains/sources/audit-logs.ts
Comment thread apps/sim/lib/data-drains/access.ts
- service: throw on cancellation after pages loop so a run aborted mid-stream
  isn't recorded as success
- audit-logs: include org-scoped rows (workspace_id IS NULL with
  metadata->>organizationId match) alongside workspace rows
- access: require owner/admin for read routes too; drain configs leak bucket
  names and webhook URLs

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
@waleedlatif1
Collaborator Author

@cursor review

@waleedlatif1
Collaborator Author

@greptile

Comment thread apps/sim/lib/data-drains/dispatcher.ts Outdated
…batch

If the rollback update threw (e.g. transient DB error), the exception
bubbled out of the for loop and silently skipped the rest of the
candidate drains for the cycle. Wrap it so the batch continues.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
@waleedlatif1
Collaborator Author

@greptile

@waleedlatif1
Collaborator Author

@cursor review


@cursor cursor Bot left a comment


✅ Bugbot reviewed your changes and found no new issues!

Comment @cursor review or bugbot run to trigger another review on this PR

Reviewed by Cursor Bugbot for commit 4fe0d0e.

Audit cleanup before merge:
- service: drop chunk-empty defensive skip (sources already handle it),
  trim WHAT-comments
- dispatcher: tighten claim-race / rollback / enterprise-cache rationale
  to a single WHY each
- access: collapse the duplicated module-top + inline comments into one
  TSDoc on the gate function
- s3: fix orphaned doc block over assertEndpointIsPublic, soften the
  forcePathStyle TSDoc to match the actual default
- webhook: drop empty close() comment
- docs: clarify that drain reads also require owner/admin, drop the
  "on the dispatcher tick" implementation detail

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
@waleedlatif1
Collaborator Author

@greptile

@waleedlatif1
Collaborator Author

@cursor review


@cursor cursor Bot left a comment


Cursor Bugbot has reviewed your changes and found 3 potential issues.



Reviewed by Cursor Bugbot for commit 57f6571.

Comment thread apps/sim/lib/data-drains/sources/cursor.ts
Comment thread apps/sim/lib/data-drains/service.ts
Comment thread packages/db/migrations/0202_panoramic_dreaming_celestial.sql Outdated
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Add runsList factory to avoid bypassing the query-key factory, drop
keepPreviousData on the near-static drains list, invalidate the drain
detail on run-now/delete, remove the orphaned detail/runs caches on
delete, add an aria-label on the row actions trigger, and use cn() for
the conditional run-status class.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
CLAUDE.md said `@/lib/utils` but the actual export lives at
`@/lib/core/utils/cn`. Build was failing with "Module not found".

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
…es, audit-log index

- cursor: compare in millisecond buckets so PG's microsecond-precision
  timestamps don't cause the cursor row to re-emit forever. The JS Date
  round-trip truncates 00:00:00.123456 to 00:00:00.123, which made
  gt(col, cursor) match the cursor row itself.
- service: insert the run row before parse/decrypt so encryption-key
  rotation or schema drift surface as a failed run in the UI instead of
  vanishing into background-job logs while lastRunAt advances.
- audit_log: add (workspace_id, created_at, id) composite index so the
  audit-logs source's tie-breaking ORDER BY is satisfied by the index
  without a heap fetch.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
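
The cursor precision bug fixed above can be demonstrated in a few lines. The values are assumed for illustration; the mechanism is exactly as the commit message describes: Postgres keeps microseconds, the JS Date round-trip keeps only milliseconds, so the stored cursor is strictly less than the row it was derived from and `gt(col, cursor)` matches that row again.

```typescript
// Bucket both sides of the comparison to whole milliseconds, as the fix
// does, so Postgres's extra microseconds can't make the cursor row
// compare strictly greater than its own round-tripped cursor value.
function toMsBucket(epochMicros: number): number {
  return Math.floor(epochMicros / 1000);
}

// PG stores ...00.123456; the cursor round-trips through Date as ...00.123
const rowMicros = 1_700_000_000_123_456; // microseconds since epoch
const cursorMs = 1_700_000_000_123;      // milliseconds since epoch

const rawGt = rowMicros / 1000 > cursorMs;           // true: cursor row re-emits forever
const bucketedGt = toMsBucket(rowMicros) > cursorMs; // false: row correctly excluded
```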