-
Notifications
You must be signed in to change notification settings - Fork 3.6k
feat(block): Allow wait block to wait up to 30 days #4331
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: staging
Are you sure you want to change the base?
Changes from all commits
0b9019d
a54dcbe
28af223
d889f32
316bc8c
3f508e4
d6ec115
d7da35b
cf233bb
f8f3758
3c8bb40
d33acf4
4f40c4c
cbfab1c
4309d06
8b57476
e3d0e74
0ac0539
3838b6e
fc07922
3a1b1a8
46ffc49
010435c
c0bc62c
387cc97
2dbc7fd
8a50f18
dcf3302
bc09865
5f56e46
ca3bbf1
bbf400f
7c619e7
64cfda5
7ca736a
6066fc1
3422f64
595c4c3
d6c1bc2
58a3ae2
489f2d3
22ccaf1
a4b5df1
6aa3fe3
ecbf5e5
0c32dd4
7678245
2aaf2b7
d445b9c
c234b01
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,161 @@ | ||
| import { db } from '@sim/db' | ||
| import { pausedExecutions } from '@sim/db/schema' | ||
| import { createLogger } from '@sim/logger' | ||
| import { toError } from '@sim/utils/errors' | ||
| import { generateShortId } from '@sim/utils/id' | ||
| import { and, eq, isNotNull, lte } from 'drizzle-orm' | ||
| import { type NextRequest, NextResponse } from 'next/server' | ||
| import { verifyCronAuth } from '@/lib/auth/internal' | ||
| import { acquireLock, releaseLock } from '@/lib/core/config/redis' | ||
| import { withRouteHandler } from '@/lib/core/utils/with-route-handler' | ||
| import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager' | ||
|
|
||
| const logger = createLogger('TimePauseResumePoll') | ||
|
|
||
| export const dynamic = 'force-dynamic' | ||
| export const maxDuration = 120 | ||
|
|
||
| const LOCK_KEY = 'time-pause-resume-poll-lock' | ||
| const LOCK_TTL_SECONDS = 120 | ||
| const POLL_BATCH_LIMIT = 200 | ||
|
|
||
| interface StoredPausePoint { | ||
| contextId?: string | ||
| resumeStatus?: string | ||
| pauseKind?: string | ||
| resumeAt?: string | ||
| } | ||
|
|
||
| export const GET = withRouteHandler(async (request: NextRequest) => { | ||
| const requestId = generateShortId() | ||
|
|
||
| const authError = verifyCronAuth(request, 'Time-pause resume poll') | ||
| if (authError) return authError | ||
|
|
||
| const lockAcquired = await acquireLock(LOCK_KEY, requestId, LOCK_TTL_SECONDS) | ||
| if (!lockAcquired) { | ||
| return NextResponse.json( | ||
| { success: true, message: 'Polling already in progress – skipped', requestId }, | ||
| { status: 202 } | ||
| ) | ||
| } | ||
|
|
||
| let claimedRows = 0 | ||
| let dispatched = 0 | ||
| const failures: { executionId: string; contextId: string; error: string }[] = [] | ||
|
|
||
| try { | ||
| const now = new Date() | ||
|
|
||
| const dueRows = await db | ||
| .select({ | ||
| id: pausedExecutions.id, | ||
| executionId: pausedExecutions.executionId, | ||
| workflowId: pausedExecutions.workflowId, | ||
| pausePoints: pausedExecutions.pausePoints, | ||
| metadata: pausedExecutions.metadata, | ||
| }) | ||
| .from(pausedExecutions) | ||
| .where( | ||
| and( | ||
| eq(pausedExecutions.status, 'paused'), | ||
| isNotNull(pausedExecutions.nextResumeAt), | ||
| lte(pausedExecutions.nextResumeAt, now) | ||
| ) | ||
| ) | ||
| .limit(POLL_BATCH_LIMIT) | ||
|
|
||
| claimedRows = dueRows.length | ||
|
|
||
| for (const row of dueRows) { | ||
| const points = (row.pausePoints ?? {}) as Record<string, StoredPausePoint> | ||
| const metadata = (row.metadata ?? {}) as Record<string, unknown> | ||
| const userId = typeof metadata.executorUserId === 'string' ? metadata.executorUserId : '' | ||
|
|
||
| const duePoints: StoredPausePoint[] = [] | ||
| let nextRemaining: Date | null = null | ||
|
|
||
| for (const point of Object.values(points)) { | ||
| if (point.pauseKind !== 'time' || !point.resumeAt) continue | ||
| if (point.resumeStatus && point.resumeStatus !== 'paused') continue | ||
|
|
||
| const resumeAt = new Date(point.resumeAt) | ||
| if (Number.isNaN(resumeAt.getTime())) continue | ||
|
|
||
| if (resumeAt <= now) { | ||
| duePoints.push(point) | ||
| } else if (!nextRemaining || resumeAt < nextRemaining) { | ||
| nextRemaining = resumeAt | ||
| } | ||
| } | ||
|
|
||
| for (const point of duePoints) { | ||
| const contextId = point.contextId | ||
| if (!contextId) continue | ||
| try { | ||
| const enqueueResult = await PauseResumeManager.enqueueOrStartResume({ | ||
| executionId: row.executionId, | ||
| contextId, | ||
| resumeInput: {}, | ||
| userId, | ||
| }) | ||
|
|
||
| if (enqueueResult.status === 'starting') { | ||
| PauseResumeManager.startResumeExecution({ | ||
| resumeEntryId: enqueueResult.resumeEntryId, | ||
| resumeExecutionId: enqueueResult.resumeExecutionId, | ||
| pausedExecution: enqueueResult.pausedExecution, | ||
| contextId: enqueueResult.contextId, | ||
| resumeInput: enqueueResult.resumeInput, | ||
| userId: enqueueResult.userId, | ||
| }).catch((error) => { | ||
| logger.error('Background time-pause resume failed', { | ||
| executionId: row.executionId, | ||
| contextId, | ||
| error: toError(error).message, | ||
| }) | ||
| }) | ||
| } | ||
| dispatched++ | ||
| } catch (error) { | ||
| const message = toError(error).message | ||
| logger.warn('Failed to dispatch time-pause resume', { | ||
| executionId: row.executionId, | ||
| contextId, | ||
| error: message, | ||
| }) | ||
| failures.push({ executionId: row.executionId, contextId, error: message }) | ||
| } | ||
| } | ||
|
|
||
| // We never auto-retry a failed dispatch: workflow blocks aren't idempotent, and an | ||
| // operator must investigate stranded rows by hand. Setting nextResumeAt to the next | ||
| // future pause (or null) drops the row out of the poll, surfacing the failure. | ||
| await db | ||
| .update(pausedExecutions) | ||
| .set({ nextResumeAt: nextRemaining }) | ||
| .where(eq(pausedExecutions.id, row.id)) | ||
|
TheodoreSpeaks marked this conversation as resolved.
|
||
| } | ||
|
|
||
| logger.info('Time-pause resume poll completed', { | ||
| requestId, | ||
| claimedRows, | ||
| dispatched, | ||
| failureCount: failures.length, | ||
|
Comment on lines
+92
to
+144
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
When Any transient failure — DB timeout, lock contention, network hiccup inside A simple fix is to re-schedule failed points by putting their for (const point of duePoints) {
const contextId = point.contextId
if (!contextId) continue
try {
// ... dispatch ...
dispatched++
} catch (error) {
const message = toError(error).message
logger.warn('Failed to dispatch time-pause resume', { ... })
failures.push({ executionId: row.executionId, contextId, error: message })
// Re-queue failed point
if (point.resumeAt) {
const retryAt = new Date(point.resumeAt)
if (!Number.isNaN(retryAt.getTime())) {
if (!nextRemaining || retryAt < nextRemaining) nextRemaining = retryAt
}
}
}
}Alternatively, schedule a short retry (e.g. |
||
| }) | ||
|
|
||
| return NextResponse.json({ | ||
| success: true, | ||
| requestId, | ||
| claimedRows, | ||
| dispatched, | ||
| failures, | ||
| }) | ||
| } catch (error) { | ||
| const message = toError(error).message | ||
| logger.error('Time-pause resume poll failed', { requestId, error: message }) | ||
| return NextResponse.json({ success: false, requestId, error: message }, { status: 500 }) | ||
| } finally { | ||
| await releaseLock(LOCK_KEY, requestId).catch(() => {}) | ||
| } | ||
| }) | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing `ORDER BY` on batch query — high-volume queues risk row starvation. Without an explicit
`ORDER BY`, PostgreSQL returns rows in an unspecified order. When the queue depth exceeds `POLL_BATCH_LIMIT = 200`, the same 200 rows may be returned on every invocation (e.g. lowest physical heap order), while later-inserted rows are perpetually skipped. Adding `.orderBy(pausedExecutions.nextResumeAt)` ensures the most-overdue entries are always processed first and that all rows are eventually drained.
ORDER BY, PostgreSQL returns rows in an unspecified order. When the queue depth exceedsPOLL_BATCH_LIMIT = 200, the same 200 rows may be returned on every invocation (e.g. lowest physical heap order), while later-inserted rows are perpetually skipped. Adding.orderBy(pausedExecutions.nextResumeAt)ensures the most-overdue entries are always processed first and that all rows are eventually drained.