25 commits
db5d298
feat(enterprise): add data drains for continuous export to S3 / webhook
waleedlatif1 May 5, 2026
2f82488
fix(data-drains): thread test signal, validate serialized config, doc…
waleedlatif1 May 5, 2026
0fd2c7f
fix(data-drains): address PR review (signal in deliver, S3 endpoint S…
waleedlatif1 May 5, 2026
216c861
fix(data-drains): address PR review (update schema, S3 date partition…
waleedlatif1 May 5, 2026
5c67e99
fix(data-drains): dispatcher must short-circuit enterprise check on s…
waleedlatif1 May 5, 2026
7940e30
fix(data-drains): isolate per-org enterprise check failures in dispat…
waleedlatif1 May 5, 2026
f9e487a
fix(secure-fetch): remove abort listener when request settles
waleedlatif1 May 5, 2026
1ece3d4
fix(data-drains): DNS-resolve S3 endpoint for SSRF defense
waleedlatif1 May 5, 2026
635907d
fix(data-drains): lazy-init S3 endpoint DNS check
waleedlatif1 May 5, 2026
5ec4537
fix(data-drains): match documented webhook retry count
waleedlatif1 May 5, 2026
feb6827
fix(data-drains): apply feature-flag and enterprise gates to read routes
waleedlatif1 May 5, 2026
7100fe0
fix(data-drains): handle concurrent delete and DB outage in failure path
waleedlatif1 May 5, 2026
e7e2954
fix(secure-fetch): route early-abort path through settledReject
waleedlatif1 May 5, 2026
13f7eff
fix(data-drains): address PR review thread fixes
waleedlatif1 May 5, 2026
4fe0d0e
fix(data-drains): guard dispatcher rollback so DB errors don't abort …
waleedlatif1 May 5, 2026
57f6571
refactor(data-drains): trim extraneous comments and defensive code
waleedlatif1 May 5, 2026
a56a100
refactor(data-drains): collapse redundant int() in runs limit query
waleedlatif1 May 5, 2026
70a7e5b
refactor(data-drains): tighten query keys and small UI cleanups
waleedlatif1 May 5, 2026
fb19077
fix(data-drains): correct cn import path
waleedlatif1 May 5, 2026
3d8a404
fix(data-drains): bugbot fixes for cursor precision, invisible failur…
waleedlatif1 May 5, 2026
2888404
fix(data-drains): align cursor predicate and ORDER BY on ms-truncated…
waleedlatif1 May 5, 2026
59410fc
fix(data-drains): expression indexes match date_trunc cursor ORDER BY
waleedlatif1 May 5, 2026
48f818d
fix(data-drains): cadence buffer + clear stale error on success
waleedlatif1 May 5, 2026
2db1755
fix(data-drains): abortable webhook backoff + S3/webhook optional for…
waleedlatif1 May 5, 2026
62c8b53
refactor(data-drains): drop dead detail key + redundant destinationTy…
waleedlatif1 May 5, 2026
167 changes: 167 additions & 0 deletions apps/docs/content/docs/en/enterprise/data-drains.mdx
@@ -0,0 +1,167 @@
---
title: Data Drains
description: Continuously export workflow logs, audit logs, and Mothership data to your own S3 bucket or HTTPS endpoint on a schedule
---

import { FAQ } from '@/components/ui/faq'

Data Drains let organization owners and admins on Enterprise plans continuously export Sim data to a destination they control — a customer-owned S3 bucket or an HTTPS webhook. A drain runs on a schedule, picks up only new rows since its last successful run, and writes them as NDJSON to the destination. Viewing drain configuration and run history is restricted to owners and admins as well, since destinations expose internal bucket names and webhook URLs.

Drains pair naturally with [Data Retention](/enterprise/data-retention): drain into long-term storage first, then let retention safely delete from Sim.

---

## Setup

Go to **Settings → Enterprise → Data Drains** in your workspace, then click **New drain**.

Each drain has four pieces:

1. A **source** — the category of data to export
2. A **destination** — where the data goes
3. A **schedule** — how often it runs
4. A **name** — unique within your organization

---

## Sources

A drain exports exactly one source. To export multiple sources, create multiple drains.

| Source | Description |
|---|---|
| **Workflow logs** | Workflow execution records (one row per execution, only after the run reaches a terminal state). |
| **Job logs** | Background job records (deployed APIs, schedules, webhooks). Only terminal-state rows are exported. |
| **Audit logs** | Organization- and workspace-scoped audit events — logins, permission changes, resource creation/deletion, drain configuration changes. |
| **Copilot chats** | Mothership chat history. |
| **Copilot runs** | Mothership run records (terminal state only). |

Each row is delivered as a single line of NDJSON. The shape of each row is part of the public schema and stable across versions; every row carries an `id` field that downstream consumers can use to dedupe.

Drains export each row exactly once based on its creation cursor. Mutable fields on **Copilot chats** (messages, title, `lastSeenAt`) are a point-in-time snapshot and won't be re-emitted if the chat is later updated. Treat the export as append-only and reconstitute current state from your own system of record if you need it.

---

## Destinations

### Amazon S3 (or any S3-compatible store)

Writes one NDJSON object per delivered chunk to your bucket.

- **Bucket** — the bucket name. Must already exist; Sim does not create buckets.
- **Region** — AWS region (e.g. `us-east-1`).
- **Prefix** *(optional)* — folder path inside the bucket. Trailing slash optional.
- **Access key ID / Secret access key** — IAM credentials with `s3:PutObject` on the bucket. The "Test connection" button verifies them with a real write probe, then deletes the probe object.
- **Endpoint** *(optional)* — for non-AWS stores like MinIO, Cloudflare R2, or GCS S3-interop. Leave blank for AWS S3.
- **Force path-style** *(optional)* — required for MinIO/Ceph; must be off for AWS S3 and R2.

Object keys are deterministic:

```
{prefix}/{source}/{drainId}/{yyyy}/{mm}/{dd}/{runId}-{seq}.ndjson
```

Objects are written with `AES256` server-side encryption.
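The key layout above can be sketched as follows. This is an illustrative helper, not Sim's actual implementation; the function name and option shape are assumptions.

```typescript
// Illustrative sketch of the deterministic object-key layout:
// {prefix}/{source}/{drainId}/{yyyy}/{mm}/{dd}/{runId}-{seq}.ndjson
function objectKey(opts: {
  prefix?: string
  source: string
  drainId: string
  runId: string
  seq: number
  date: Date
}): string {
  const { prefix, source, drainId, runId, seq, date } = opts
  const yyyy = String(date.getUTCFullYear())
  const mm = String(date.getUTCMonth() + 1).padStart(2, '0')
  const dd = String(date.getUTCDate()).padStart(2, '0')
  const base = `${source}/${drainId}/${yyyy}/${mm}/${dd}/${runId}-${seq}.ndjson`
  // Trailing slash on the prefix is optional, so normalize it away first.
  return prefix ? `${prefix.replace(/\/$/, '')}/${base}` : base
}
```

Because the key is deterministic, a replayed run overwrites the same objects rather than creating duplicates.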

### HTTPS Webhook

POSTs each chunk as NDJSON to your endpoint.

- **URL** — must be HTTPS. Sim resolves the hostname and refuses to deliver to private, loopback, or cloud-metadata IPs. The resolved IP is pinned for the duration of a run to prevent DNS rebinding.
- **Signing secret** — shared secret used for HMAC-SHA256 signing.
- **Bearer token** *(optional)* — sent as `Authorization: Bearer <token>`.
- **Signature header name** *(optional)* — defaults to `X-Sim-Signature`.

Each request includes:

```
Content-Type: application/x-ndjson
User-Agent: Sim-DataDrain/1.0
X-Sim-Timestamp: <unix-seconds>
X-Sim-Signature-Version: v1
X-Sim-Signature: t=<unix-seconds>,v1=<hex(hmac-sha256)>
X-Sim-Drain-Id: <drain id>
X-Sim-Run-Id: <run id>
X-Sim-Source: <source name>
X-Sim-Sequence: <chunk index>
X-Sim-Row-Count: <rows in this chunk>
Idempotency-Key: <runId>-<sequence>
```

The signature is computed as `HMAC-SHA256(secret, "${timestamp}.${body}")` and serialized as `t=<timestamp>,v1=<hex>`. Verify by recomputing over the same string and rejecting timestamps older than ~5 minutes — this defends against captured-request replay attacks.
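A minimal receiver-side verification sketch in Node.js, assuming only the header format described above (function names are illustrative):

```typescript
import { createHmac, timingSafeEqual } from 'node:crypto'

// Produce the header value: t=<timestamp>,v1=<hex(hmac-sha256(secret, "ts.body"))>
function sign(secret: string, timestamp: number, body: string): string {
  const mac = createHmac('sha256', secret).update(`${timestamp}.${body}`).digest('hex')
  return `t=${timestamp},v1=${mac}`
}

// Verify by recomputing the MAC and rejecting stale timestamps (~5 min window).
function verify(
  secret: string,
  header: string,
  body: string,
  nowSeconds: number,
  toleranceSeconds = 300
): boolean {
  const match = /^t=(\d+),v1=([0-9a-f]{64})$/.exec(header)
  if (!match) return false
  const timestamp = Number(match[1])
  if (Math.abs(nowSeconds - timestamp) > toleranceSeconds) return false // replay defense
  const expected = createHmac('sha256', secret).update(`${timestamp}.${body}`).digest()
  const given = Buffer.from(match[2], 'hex')
  // Constant-time comparison avoids leaking the MAC via timing.
  return expected.length === given.length && timingSafeEqual(expected, given)
}
```

Note the timestamp inside the signed string must be the one from the header, not the receiver's clock, or verification will fail on any network delay.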

Failed deliveries retry up to 3 times with exponential backoff (500ms, 1s, 2s with ±20% jitter), respecting `Retry-After` on 429/503. Non-retryable 4xx responses fail the run immediately.
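The retry policy can be illustrated as follows; this is a sketch of the documented behavior, not Sim's dispatcher code, and the types are simplified assumptions:

```typescript
interface DeliveryResponse {
  ok: boolean
  status: number
  headers: { get(name: string): string | null }
}

// Up to 3 retries at 500ms / 1s / 2s with ±20% jitter; Retry-After wins when
// present; non-retryable 4xx responses fail immediately.
async function deliverWithRetry(
  post: () => Promise<DeliveryResponse>
): Promise<DeliveryResponse> {
  const baseDelaysMs = [500, 1000, 2000]
  for (let attempt = 0; ; attempt++) {
    const res = await post()
    if (res.ok) return res
    const retryable = res.status === 429 || res.status >= 500
    if (!retryable || attempt >= baseDelaysMs.length) {
      throw new Error(`delivery failed with status ${res.status}`)
    }
    const retryAfterSec = Number(res.headers.get('retry-after') ?? '')
    const jitter = 1 + (Math.random() * 0.4 - 0.2) // ±20%
    const delayMs =
      retryAfterSec > 0 ? retryAfterSec * 1000 : baseDelaysMs[attempt] * jitter
    await new Promise((resolve) => setTimeout(resolve, delayMs))
  }
}
```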

---

## Schedule

| Cadence | Drain runs |
|---|---|
| **Hourly** | Once per hour. |
| **Daily** | Once per day. |

You can also disable a drain with the **Enabled** toggle (it stops running but is preserved), or trigger an out-of-schedule run with **Run now** on any drain row.

---

## Delivery semantics

Drains use an **opaque cursor** that advances only on full success. If a delivery fails partway through a run, the cursor is unchanged and the next run replays from the last successful position.

This is **at-least-once delivery**. Combined with the `id` field on every row and the `Idempotency-Key` header on every webhook chunk, downstream systems can dedupe deterministically.
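A hypothetical receiver-side handler showing how the two dedupe hooks fit together — chunk-level dedupe on `Idempotency-Key`, with the per-row `id` field left for row-level upserts in storage. The function and variable names are illustrative:

```typescript
// Chunks replayed by a retried run carry the same Idempotency-Key, so a
// receiver can skip them wholesale.
const seenChunks = new Set<string>()

function ingestChunk(idempotencyKey: string, ndjsonBody: string): number {
  if (seenChunks.has(idempotencyKey)) return 0 // replayed chunk: skip
  seenChunks.add(idempotencyKey)
  const rows = ndjsonBody
    .split('\n')
    .filter((line) => line.length > 0)
    .map((line) => JSON.parse(line) as { id: string })
  // ...upsert rows keyed on `id` into your system of record.
  return rows.length
}
```

A production receiver would persist the seen keys durably rather than in memory, since the set must survive restarts to stay idempotent.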

The **last 10 runs** for each drain are visible by expanding its row in the settings page, with status, row count, bytes written, destination locator (`s3://...` or webhook URL), and the error message if it failed.

---

## Security

- Destination credentials are encrypted at rest using the same key-rotation–aware encryption that protects OAuth tokens.
- Credentials are **never** returned by the Sim API after creation. Updates accept new credentials; omitting them leaves the existing encrypted blob in place.
- Webhook URLs are SSRF-validated: HTTPS-only, no private/loopback/metadata IPs, with the resolved IP pinned to defeat DNS rebinding.
- Every create, update, delete, manual run, and test-connection call is recorded in the [Audit Log](/enterprise/audit-logs).

---

<FAQ items={[
{
question: "Who can configure data drains?",
answer: "Only organization owners and admins can view, create, edit, run, or delete drains. On Sim Cloud, the organization must be on an Enterprise plan."
},
{
question: "Will drained data be duplicated if a run fails?",
answer: "The drain cursor only advances on overall success, so a failure replays the same chunks on the next run. Every row has a stable `id` field and every webhook chunk has an `Idempotency-Key` header so receivers can dedupe."
},
{
question: "Can I export multiple sources to the same destination?",
answer: "Yes — create one drain per source, all pointing at the same bucket or endpoint. S3 destinations namespace by source automatically; webhook receivers can branch on the `X-Sim-Source` header."
},
{
question: "Does deleting a drain delete the data already exported?",
answer: "No. Deletion only removes the drain's configuration and its run history from Sim. Data already written to your bucket or sent to your webhook is yours and is unaffected."
},
{
question: "What happens if my credentials stop working mid-run?",
answer: "The run fails, the drain cursor does not advance, and the failed run is recorded with the error. Once you fix the credentials with an Update or by re-creating the drain, the next run replays from where the last successful run left off."
},
{
question: "What format is the data in?",
answer: "NDJSON — newline-delimited JSON, one row per line. Each chunk is a single S3 object or a single POST body."
}
]} />

---

## Self-hosted setup

### Environment variables

```bash
DATA_DRAINS_ENABLED=true
NEXT_PUBLIC_DATA_DRAINS_ENABLED=true
```

`NEXT_PUBLIC_DATA_DRAINS_ENABLED` shows the **Settings → Enterprise → Data Drains** page in the UI. `DATA_DRAINS_ENABLED` gates the server-side mutating endpoints and the cron dispatcher — when unset on a self-hosted deployment, drain create/update/delete/run requests return `404` and the dispatcher is a no-op. Both should be set to `true` together.

Data Drains otherwise rely on the standard Trigger.dev background job infrastructure used elsewhere in Sim — no additional setup is required. The cron dispatcher runs hourly and fans out due drains as background jobs.
7 changes: 7 additions & 0 deletions apps/docs/content/docs/en/enterprise/index.mdx
@@ -59,6 +59,12 @@ Configure how long execution logs, soft-deleted resources, and Mothership data a

---

## Data Drains

Continuously export workflow logs, audit logs, and Mothership data to a customer-owned S3 bucket or HTTPS webhook on a schedule. See the [data drains guide](/docs/enterprise/data-drains).

---

<FAQ items={[
{ question: "Who can manage Enterprise features?", answer: "Workspace admins on an Enterprise-entitled workspace. Access Control, SSO, whitelabeling, audit logs, and data retention are all configured per workspace under Settings → Enterprise." },
{ question: "Which SSO providers are supported?", answer: "Sim supports SAML 2.0 and OIDC, which works with virtually any enterprise identity provider including Okta, Azure AD (Entra ID), Google Workspace, ADFS, and OneLogin." },
@@ -79,6 +85,7 @@ Self-hosted deployments enable enterprise features via environment variables ins
| `WHITELABELING_ENABLED`, `NEXT_PUBLIC_WHITELABELING_ENABLED` | Custom branding |
| `AUDIT_LOGS_ENABLED`, `NEXT_PUBLIC_AUDIT_LOGS_ENABLED` | Audit logging |
| `NEXT_PUBLIC_DATA_RETENTION_ENABLED` | Data retention configuration |
| `DATA_DRAINS_ENABLED`, `NEXT_PUBLIC_DATA_DRAINS_ENABLED` | Data drains |
| `CREDENTIAL_SETS_ENABLED`, `NEXT_PUBLIC_CREDENTIAL_SETS_ENABLED` | Polling groups for email triggers |
| `INBOX_ENABLED`, `NEXT_PUBLIC_INBOX_ENABLED` | Sim Mailer inbox |
| `DISABLE_INVITATIONS`, `NEXT_PUBLIC_DISABLE_INVITATIONS` | Disable invitations; manage membership via Admin API |
10 changes: 9 additions & 1 deletion apps/docs/content/docs/en/enterprise/meta.json
@@ -1,5 +1,13 @@
{
"title": "Enterprise",
"pages": ["index", "sso", "access-control", "whitelabeling", "audit-logs", "data-retention"],
"pages": [
"index",
"sso",
"access-control",
"whitelabeling",
"audit-logs",
"data-retention",
"data-drains"
],
"defaultOpen": false
}
30 changes: 30 additions & 0 deletions apps/sim/app/api/cron/run-data-drains/route.ts
@@ -0,0 +1,30 @@
import { createLogger } from '@sim/logger'
import { toError } from '@sim/utils/errors'
import { type NextRequest, NextResponse } from 'next/server'
import { verifyCronAuth } from '@/lib/auth/internal'
import { isBillingEnabled, isDataDrainsEnabled } from '@/lib/core/config/feature-flags'
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
import { dispatchDueDrains } from '@/lib/data-drains/dispatcher'

const logger = createLogger('CronRunDataDrains')

export const GET = withRouteHandler(async (request: NextRequest) => {
const authError = verifyCronAuth(request, 'Data drain dispatcher')
if (authError) return authError

// Self-hosted opt-in: skip dispatch entirely when the deployment hasn't
// enabled drains. Sim Cloud (billing enabled) gates per-org by enterprise
// plan inside the dispatcher's join.
if (!isBillingEnabled && !isDataDrainsEnabled) {
return NextResponse.json({ success: true, dispatched: 0, skipped: 'disabled' })
}

try {
const result = await dispatchDueDrains()
logger.info('Data drain dispatcher run complete', result)
return NextResponse.json({ success: true, ...result })
} catch (error) {
logger.error('Data drain dispatcher run failed', { error: toError(error).message })
return NextResponse.json({ error: 'Internal server error' }, { status: 500 })
}
})