From d7a6339068db59a782182127582433103d3de080 Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Sat, 2 May 2026 18:29:21 -0700 Subject: [PATCH] Add queueing for hosted keys --- .../hosted-key-rate-limiter.test.ts | 79 +++++- .../hosted-key/hosted-key-rate-limiter.ts | 247 +++++++++++++++--- apps/sim/lib/core/telemetry.ts | 41 +++ apps/sim/tools/index.ts | 84 +++++- 4 files changed, 404 insertions(+), 47 deletions(-) diff --git a/apps/sim/lib/core/rate-limiter/hosted-key/hosted-key-rate-limiter.test.ts b/apps/sim/lib/core/rate-limiter/hosted-key/hosted-key-rate-limiter.test.ts index 50cf346222d..ace2e9569f4 100644 --- a/apps/sim/lib/core/rate-limiter/hosted-key/hosted-key-rate-limiter.test.ts +++ b/apps/sim/lib/core/rate-limiter/hosted-key/hosted-key-rate-limiter.test.ts @@ -7,6 +7,10 @@ import type { import { HostedKeyRateLimiter } from './hosted-key-rate-limiter' import type { CustomRateLimit, PerRequestRateLimit } from './types' +/** Force the queue wait to give up on the first iteration by reporting a retry time + * larger than the 5-minute MAX_QUEUE_WAIT_MS cap. */ +const RETRY_PAST_CAP_MS = 6 * 60 * 1000 + interface MockAdapter { consumeTokens: Mock getTokenStatus: Mock @@ -72,11 +76,12 @@ describe('HostedKeyRateLimiter', () => { expect(result.error).toContain('No hosted keys configured') }) - it('should rate limit billing actor when they exceed their limit', async () => { + it('should rate limit billing actor when wait exceeds the queue cap', async () => { + // resetAt past the 5-minute cap forces the wait loop to bail immediately. 
const rateLimitedResult: ConsumeResult = { allowed: false, tokensRemaining: 0, - resetAt: new Date(Date.now() + 30000), + resetAt: new Date(Date.now() + RETRY_PAST_CAP_MS), } mockAdapter.consumeTokens.mockResolvedValue(rateLimitedResult) @@ -93,6 +98,33 @@ describe('HostedKeyRateLimiter', () => { expect(result.error).toContain('Rate limit exceeded') }) + it('should wait for capacity then succeed when bucket refills within the cap', async () => { + // First call: bucket empty, refills in 100ms (well under cap). + // Second call: bucket has capacity, consumed. + const blocked: ConsumeResult = { + allowed: false, + tokensRemaining: 0, + resetAt: new Date(Date.now() + 100), + } + const allowed: ConsumeResult = { + allowed: true, + tokensRemaining: 9, + resetAt: new Date(Date.now() + 60000), + } + mockAdapter.consumeTokens.mockResolvedValueOnce(blocked).mockResolvedValueOnce(allowed) + + const result = await rateLimiter.acquireKey( + testProvider, + envKeyPrefix, + perRequestRateLimit, + 'workspace-wait' + ) + + expect(result.success).toBe(true) + expect(result.key).toBe('test-key-1') + expect(mockAdapter.consumeTokens).toHaveBeenCalledTimes(2) + }) + it('should allow billing actor within their rate limit', async () => { const allowedResult: ConsumeResult = { allowed: true, @@ -197,11 +229,11 @@ describe('HostedKeyRateLimiter', () => { ], } - it('should enforce requestsPerMinute for custom mode', async () => { + it('should enforce requestsPerMinute for custom mode when wait exceeds the cap', async () => { const rateLimitedResult: ConsumeResult = { allowed: false, tokensRemaining: 0, - resetAt: new Date(Date.now() + 30000), + resetAt: new Date(Date.now() + RETRY_PAST_CAP_MS), } mockAdapter.consumeTokens.mockResolvedValue(rateLimitedResult) @@ -246,7 +278,7 @@ describe('HostedKeyRateLimiter', () => { expect(mockAdapter.getTokenStatus).toHaveBeenCalledTimes(1) }) - it('should block request when a dimension is depleted', async () => { + it('should block request when a 
dimension wait exceeds the cap', async () => { const allowedConsume: ConsumeResult = { allowed: true, tokensRemaining: 4, @@ -258,7 +290,7 @@ describe('HostedKeyRateLimiter', () => { tokensAvailable: 0, maxTokens: 2000, lastRefillAt: new Date(), - nextRefillAt: new Date(Date.now() + 45000), + nextRefillAt: new Date(Date.now() + RETRY_PAST_CAP_MS), } mockAdapter.getTokenStatus.mockResolvedValue(depleted) @@ -274,6 +306,39 @@ describe('HostedKeyRateLimiter', () => { expect(result.error).toContain('tokens') }) + it('should wait for dimension capacity then succeed when budget refills', async () => { + const allowedConsume: ConsumeResult = { + allowed: true, + tokensRemaining: 4, + resetAt: new Date(Date.now() + 60000), + } + mockAdapter.consumeTokens.mockResolvedValue(allowedConsume) + + const depleted: TokenStatus = { + tokensAvailable: 0, + maxTokens: 2000, + lastRefillAt: new Date(), + nextRefillAt: new Date(Date.now() + 100), + } + const refilled: TokenStatus = { + tokensAvailable: 500, + maxTokens: 2000, + lastRefillAt: new Date(), + nextRefillAt: new Date(Date.now() + 60000), + } + mockAdapter.getTokenStatus.mockResolvedValueOnce(depleted).mockResolvedValueOnce(refilled) + + const result = await rateLimiter.acquireKey( + testProvider, + envKeyPrefix, + customRateLimit, + 'workspace-dim-wait' + ) + + expect(result.success).toBe(true) + expect(mockAdapter.getTokenStatus).toHaveBeenCalledTimes(2) + }) + it('should pre-check all dimensions and block on first depleted one', async () => { const multiDimensionConfig: CustomRateLimit = { mode: 'custom', @@ -309,7 +374,7 @@ describe('HostedKeyRateLimiter', () => { tokensAvailable: 0, maxTokens: 100, lastRefillAt: new Date(), - nextRefillAt: new Date(Date.now() + 30000), + nextRefillAt: new Date(Date.now() + RETRY_PAST_CAP_MS), } mockAdapter.getTokenStatus .mockResolvedValueOnce(tokensBudget) diff --git a/apps/sim/lib/core/rate-limiter/hosted-key/hosted-key-rate-limiter.ts 
b/apps/sim/lib/core/rate-limiter/hosted-key/hosted-key-rate-limiter.ts index a20cf8413f3..d07efd055f8 100644 --- a/apps/sim/lib/core/rate-limiter/hosted-key/hosted-key-rate-limiter.ts +++ b/apps/sim/lib/core/rate-limiter/hosted-key/hosted-key-rate-limiter.ts @@ -1,9 +1,13 @@ import { createLogger } from '@sim/logger' +import { sleep } from '@sim/utils/helpers' +import { generateShortId } from '@sim/utils/id' +import { acquireLock, releaseLock } from '@/lib/core/config/redis' import { createStorageAdapter, type RateLimitStorageAdapter, type TokenBucketConfig, } from '@/lib/core/rate-limiter/storage' +import { PlatformEvents } from '@/lib/core/telemetry' import { type AcquireKeyResult, type CustomRateLimit, @@ -16,6 +20,24 @@ import { const logger = createLogger('HostedKeyRateLimiter') +/** + * Maximum time a hosted-key acquisition will wait for the per-workspace bucket + * to refill before falling back to a 429. Sized comfortably under the 90-min + * Trigger.dev container ceiling so a queued call still has time to actually + * execute after acquisition. + */ +const MAX_QUEUE_WAIT_MS = 5 * 60 * 1000 + +/** + * Floor on per-iteration sleep when the bucket reports `retryAfterMs <= 0`, + * which can happen due to clock skew or sub-millisecond resets. Prevents a + * tight retry loop hammering the storage adapter. + */ +const MIN_QUEUE_RETRY_DELAY_MS = 50 + +/** TTL slack on the FIFO lock — a crashed worker can't permanently block its workspace. */ +const QUEUE_LOCK_TTL_SECONDS = Math.ceil(MAX_QUEUE_WAIT_MS / 1000) + 30 + /** * Resolves env var names for a numbered key prefix using a `{PREFIX}_COUNT` env var. * E.g. with `EXA_API_KEY_COUNT=5`, returns `['EXA_API_KEY_1', ..., 'EXA_API_KEY_5']`. @@ -179,11 +201,16 @@ export class HostedKeyRateLimiter { * Acquire an available key via round-robin selection. * * For both modes: - * 1. Per-billing-actor request rate limiting (enforced): blocks actors who exceed their request limit + * 1. 
Per-billing-actor request rate limiting (enforced): when the actor is over their + * limit, the call blocks (waits for refill) up to `MAX_QUEUE_WAIT_MS`. A best-effort Redis + * lock keyed on `{provider, billingActorId}` is attempted once; callers that do not get + * it proceed anyway, so ordering between waiters in a workspace is not guaranteed. * 2. Round-robin key selection: cycles through available keys for even distribution * * For `custom` mode additionally: - * 3. Pre-checks dimension budgets: blocks if any dimension is already depleted + * 3. Pre-checks dimension budgets: same wait-for-refill behavior if a dimension is depleted + * + * If the wait exceeds the cap, the call falls back to the existing rate-limited (429) result. * * @param envKeyPrefix - Env var prefix (e.g. 'EXA_API_KEY'). Keys resolved via `{prefix}_COUNT`. * @param billingActorId - The billing actor (typically workspace ID) to rate limit against @@ -194,56 +221,198 @@ export class HostedKeyRateLimiter { config: HostedKeyRateLimitConfig, billingActorId: string ): Promise { - if (config.requestsPerMinute) { - const rateLimitResult = await this.checkActorRateLimit(provider, billingActorId, config) - if (rateLimitResult) { - return { - success: false, - billingActorRateLimited: true, - retryAfterMs: rateLimitResult.retryAfterMs, - error: `Rate limit exceeded. Please wait ${Math.ceil(rateLimitResult.retryAfterMs / 1000)} seconds. If you're getting throttled frequently, consider adding your own API key under Settings > BYOK to avoid shared rate limits.`, + const lockKey = `hosted-queue:${provider}:${billingActorId}` + const lockValue = generateShortId() + const lockHeld = await this.acquireFifoLock(lockKey, lockValue) + + try { + if (config.requestsPerMinute) { + const rateLimitResult = await this.waitForActorCapacity(provider, billingActorId, config) + if (rateLimitResult.rateLimited) { + return { + success: false, + billingActorRateLimited: true, + retryAfterMs: rateLimitResult.retryAfterMs, + error: `Rate limit exceeded. 
Please wait ${Math.ceil(rateLimitResult.retryAfterMs / 1000)} seconds. If you're getting throttled frequently, consider adding your own API key under Settings > BYOK to avoid shared rate limits.`, + } } } - } - if (config.mode === 'custom' && config.dimensions.length > 0) { - const dimensionResult = await this.preCheckDimensions(provider, billingActorId, config) - if (dimensionResult) { + if (config.mode === 'custom' && config.dimensions.length > 0) { + const dimensionResult = await this.waitForDimensionCapacity( + provider, + billingActorId, + config + ) + if (dimensionResult.rateLimited) { + return { + success: false, + billingActorRateLimited: true, + retryAfterMs: dimensionResult.retryAfterMs, + error: `Rate limit exceeded for ${dimensionResult.dimension}. Please wait ${Math.ceil(dimensionResult.retryAfterMs / 1000)} seconds. If you're getting throttled frequently, consider adding your own API key under Settings > BYOK to avoid shared rate limits.`, + } + } + } + + const envKeys = resolveEnvKeys(envKeyPrefix) + const availableKeys = this.getAvailableKeys(envKeys) + + if (availableKeys.length === 0) { + logger.warn(`No hosted keys configured for provider ${provider}`) return { success: false, - billingActorRateLimited: true, - retryAfterMs: dimensionResult.retryAfterMs, - error: `Rate limit exceeded for ${dimensionResult.dimension}. Please wait ${Math.ceil(dimensionResult.retryAfterMs / 1000)} seconds. If you're getting throttled frequently, consider adding your own API key under Settings > BYOK to avoid shared rate limits.`, + error: `No hosted keys configured for ${provider}`, } } - } - const envKeys = resolveEnvKeys(envKeyPrefix) - const availableKeys = this.getAvailableKeys(envKeys) + const counter = this.roundRobinCounters.get(provider) ?? 
0 + const selected = availableKeys[counter % availableKeys.length] + this.roundRobinCounters.set(provider, counter + 1) + + logger.debug(`Selected hosted key for ${provider}`, { + provider, + keyIndex: selected.keyIndex, + envVarName: selected.envVarName, + }) - if (availableKeys.length === 0) { - logger.warn(`No hosted keys configured for provider ${provider}`) return { - success: false, - error: `No hosted keys configured for ${provider}`, + success: true, + key: selected.key, + keyIndex: selected.keyIndex, + envVarName: selected.envVarName, + } + } finally { + if (lockHeld) { + await this.releaseFifoLock(lockKey, lockValue) } } + } + + /** + * Acquire the per-workspace+provider FIFO lock that serializes queue waits. + * Returns true if the lock was held by this caller (or Redis is unavailable, in which + * case the lock is a no-op and we proceed without fairness). Returns false if the lock + * is already held by another caller and we should still proceed without waiting on it + * (correctness is preserved by the token bucket; we just lose fairness). + */ + private async acquireFifoLock(lockKey: string, lockValue: string): Promise { + try { + return await acquireLock(lockKey, lockValue, QUEUE_LOCK_TTL_SECONDS) + } catch (error) { + logger.warn(`Failed to acquire hosted-queue FIFO lock ${lockKey}`, { error }) + return false + } + } + + /** + * Release the per-workspace+provider FIFO lock. Best-effort; logs but does not throw. + */ + private async releaseFifoLock(lockKey: string, lockValue: string): Promise { + try { + await releaseLock(lockKey, lockValue) + } catch (error) { + logger.warn(`Failed to release hosted-queue FIFO lock ${lockKey}`, { error }) + } + } + + /** + * Wait for actor request-rate capacity. Re-checks the bucket after each refill window + * up to `MAX_QUEUE_WAIT_MS`. Returns `{ rateLimited: false }` once a token has been + * consumed (the underlying check is consume-on-success, matching the original behavior). 
+ */ + private async waitForActorCapacity( + provider: string, + billingActorId: string, + config: HostedKeyRateLimitConfig + ): Promise<{ rateLimited: false } | { rateLimited: true; retryAfterMs: number }> { + const startedAt = Date.now() + let attempts = 0 + + while (true) { + const result = await this.checkActorRateLimit(provider, billingActorId, config) + attempts++ + + if (!result) { + if (attempts > 1) { + PlatformEvents.hostedKeyQueueWaited({ + provider, + workspaceId: billingActorId, + waitedMs: Date.now() - startedAt, + attempts, + reason: 'actor_requests', + }) + } + return { rateLimited: false } + } + + const elapsed = Date.now() - startedAt + const remaining = MAX_QUEUE_WAIT_MS - elapsed + if (remaining <= 0 || result.retryAfterMs > remaining) { + PlatformEvents.hostedKeyQueueWaitExceeded({ + provider, + workspaceId: billingActorId, + waitedMs: elapsed, + reason: 'actor_requests', + }) + return { rateLimited: true, retryAfterMs: result.retryAfterMs } + } + + const sleepMs = Math.max(MIN_QUEUE_RETRY_DELAY_MS, result.retryAfterMs) + await sleep(sleepMs) + } + } + + /** + * Wait for custom-mode dimension capacity. `preCheckDimensions` is read-only — it does + * not consume — so re-running it after a sleep is safe and does not double-charge. + * Post-execution `reportUsage` performs the actual consumption. 
+ */ + private async waitForDimensionCapacity( + provider: string, + billingActorId: string, + config: CustomRateLimit + ): Promise< + { rateLimited: false } | { rateLimited: true; retryAfterMs: number; dimension: string } + > { + const startedAt = Date.now() + let attempts = 0 + + while (true) { + const result = await this.preCheckDimensions(provider, billingActorId, config) + attempts++ + + if (!result) { + if (attempts > 1) { + PlatformEvents.hostedKeyQueueWaited({ + provider, + workspaceId: billingActorId, + waitedMs: Date.now() - startedAt, + attempts, + reason: 'dimension', + }) + } + return { rateLimited: false } + } + + const elapsed = Date.now() - startedAt + const remaining = MAX_QUEUE_WAIT_MS - elapsed + if (remaining <= 0 || result.retryAfterMs > remaining) { + PlatformEvents.hostedKeyQueueWaitExceeded({ + provider, + workspaceId: billingActorId, + waitedMs: elapsed, + reason: 'dimension', + dimension: result.dimension, + }) + return { + rateLimited: true, + retryAfterMs: result.retryAfterMs, + dimension: result.dimension, + } + } - const counter = this.roundRobinCounters.get(provider) ?? 0 - const selected = availableKeys[counter % availableKeys.length] - this.roundRobinCounters.set(provider, counter + 1) - - logger.debug(`Selected hosted key for ${provider}`, { - provider, - keyIndex: selected.keyIndex, - envVarName: selected.envVarName, - }) - - return { - success: true, - key: selected.key, - keyIndex: selected.keyIndex, - envVarName: selected.envVarName, + const sleepMs = Math.max(MIN_QUEUE_RETRY_DELAY_MS, result.retryAfterMs) + await sleep(sleepMs) } } diff --git a/apps/sim/lib/core/telemetry.ts b/apps/sim/lib/core/telemetry.ts index 34af72809f1..ce97a94e8a4 100644 --- a/apps/sim/lib/core/telemetry.ts +++ b/apps/sim/lib/core/telemetry.ts @@ -1002,6 +1002,47 @@ export const PlatformEvents = { }) }, + /** + * Track a successful hosted-key acquisition that had to wait for capacity. 
+ * Fires after the actor or dimension bucket refilled enough for the call to proceed. + */ + hostedKeyQueueWaited: (attrs: { + provider: string + workspaceId: string + waitedMs: number + attempts: number + reason: 'actor_requests' | 'dimension' + dimension?: string + }) => { + trackPlatformEvent('platform.hosted_key.queue_waited', { + 'provider.id': attrs.provider, + 'workspace.id': attrs.workspaceId, + 'queue.waited_ms': attrs.waitedMs, + 'queue.attempts': attrs.attempts, + 'queue.reason': attrs.reason, + ...(attrs.dimension && { 'queue.dimension': attrs.dimension }), + }) + }, + + /** + * Track a hosted-key acquisition that exceeded the queue wait cap and fell back to a 429. + */ + hostedKeyQueueWaitExceeded: (attrs: { + provider: string + workspaceId: string + waitedMs: number + reason: 'actor_requests' | 'dimension' + dimension?: string + }) => { + trackPlatformEvent('platform.hosted_key.queue_wait_exceeded', { + 'provider.id': attrs.provider, + 'workspace.id': attrs.workspaceId, + 'queue.waited_ms': attrs.waitedMs, + 'queue.reason': attrs.reason, + ...(attrs.dimension && { 'queue.dimension': attrs.dimension }), + }) + }, + /** * Track chat deployed (workflow deployed as chat interface) */ diff --git a/apps/sim/tools/index.ts b/apps/sim/tools/index.ts index 3bf69a56101..a90ab0cee24 100644 --- a/apps/sim/tools/index.ts +++ b/apps/sim/tools/index.ts @@ -302,6 +302,48 @@ async function injectHostedKeyIfNeeded( } } +/** + * Re-acquire a hosted key after upstream-429 retries have been exhausted. Calls + * `acquireKey` (which now blocks on the per-workspace bucket) and re-injects the + * fresh key into `params`. Returns false if no key could be obtained — caller + * should re-throw the original upstream 429. + * + * Does not consult BYOK. We only enter this path from inside the hosted-key + * branch of `executeTool`, so BYOK has already been ruled out for this call. 
+ */ +async function reacquireHostedKey( + tool: ToolConfig, + params: Record, + executionContext: ExecutionContext | undefined, + requestId: string +): Promise { + if (!tool.hosting) return false + const { envKeyPrefix, apiKeyParam, byokProviderId, rateLimit } = tool.hosting + const { workspaceId } = resolveToolScope(params, executionContext) + if (!workspaceId) return false + + const provider = byokProviderId || tool.id + const acquireResult = await getHostedKeyRateLimiter().acquireKey( + provider, + envKeyPrefix, + rateLimit, + workspaceId + ) + + if (!acquireResult.success || !acquireResult.key) { + logger.warn( + `[${requestId}] Re-acquire of hosted key for ${tool.id} failed: ${acquireResult.error ?? 'unknown'}` + ) + return false + } + + params[apiKeyParam] = acquireResult.key + logger.info( + `[${requestId}] Re-acquired hosted key for ${tool.id} (${acquireResult.envVarName}) after upstream throttling` + ) + return true +} + /** * Check if an error is a rate limit (throttling) or quota exhaustion error. * Some providers (e.g. Perplexity) return 401/403 with "insufficient_quota" @@ -328,11 +370,23 @@ interface RetryContext { toolId: string envVarName: string executionContext?: ExecutionContext + /** + * Optional callback invoked after the local exponential backoff has been exhausted by + * upstream 429s. Should re-enter the per-workspace hosted-key queue (which now blocks + * on the bucket) and return a fresh execution thunk bound to the newly acquired key. + * If the callback returns null, we give up and re-throw the last error. + */ + reacquireAfterRetriesExhausted?: () => Promise<(() => Promise) | null> } /** * Execute a function with exponential backoff retry for rate limiting errors. * Only used for hosted key requests. Tracks rate limit events via telemetry. + * + * On terminal upstream 429, optionally re-enters the hosted-key queue (which waits for + * the per-workspace bucket to refill) and retries once with a freshly acquired key. 
+ * This handles the case where the upstream provider's limit is tighter than ours — we + * re-queue the call instead of surfacing the error. */ async function executeWithRetry( fn: () => Promise, @@ -340,7 +394,8 @@ async function executeWithRetry( maxRetries = 3, baseDelayMs = 1000 ): Promise { - const { requestId, toolId, envVarName, executionContext } = context + const { requestId, toolId, envVarName, executionContext, reacquireAfterRetriesExhausted } = + context let lastError: unknown for (let attempt = 0; attempt <= maxRetries; attempt++) { @@ -351,6 +406,23 @@ async function executeWithRetry( if (!isRateLimitError(error) || attempt === maxRetries) { if (isRateLimitError(error) && attempt === maxRetries) { + if (reacquireAfterRetriesExhausted) { + try { + const requeued = await reacquireAfterRetriesExhausted() + if (requeued) { + logger.warn( + `[${requestId}] Upstream retries exhausted for ${toolId} (${envVarName}); re-queued and retrying once with fresh key` + ) + return (await requeued()) as T + } + } catch (requeueError) { + logger.error( + `[${requestId}] Re-queue after exhausted upstream retries failed for ${toolId}`, + { error: toError(requeueError).message } + ) + } + } + PlatformEvents.hostedKeyUserThrottled({ toolId, reason: 'upstream_retries_exhausted', @@ -984,6 +1056,16 @@ export async function executeTool( toolId, envVarName: hostedKeyInfo.envVarName!, executionContext, + reacquireAfterRetriesExhausted: async () => { + const reacquired = await reacquireHostedKey( + tool, + contextParams, + executionContext, + requestId + ) + if (!reacquired) return null + return () => executeToolRequest(toolId, tool, contextParams) + }, }) : await executeToolRequest(toolId, tool, contextParams)