Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/docs/content/docs/en/tools/stt.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ Transcribe audio and video files to text using leading AI providers. Supports mu
| --------- | ---- | -------- | ----------- |
| `provider` | string | Yes | STT provider \(elevenlabs\) |
| `apiKey` | string | Yes | ElevenLabs API key |
| `model` | string | No | ElevenLabs model to use \(scribe_v1, scribe_v1_experimental\) |
| `model` | string | No | ElevenLabs model to use \(scribe_v2\) |
| `audioFile` | file | No | Audio or video file to transcribe \(e.g., MP3, WAV, M4A, WEBM\) |
| `audioFileReference` | file | No | Reference to audio/video file from previous blocks |
| `audioUrl` | string | No | URL to audio or video file |
Expand Down
2 changes: 2 additions & 0 deletions apps/realtime/src/handlers/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { setupConnectionHandlers } from '@/handlers/connection'
import { setupOperationsHandlers } from '@/handlers/operations'
import { setupPresenceHandlers } from '@/handlers/presence'
import { setupSubblocksHandlers } from '@/handlers/subblocks'
import { setupTableHandlers } from '@/handlers/tables'
import { setupVariablesHandlers } from '@/handlers/variables'
import { setupWorkflowHandlers } from '@/handlers/workflow'
import type { AuthenticatedSocket } from '@/middleware/auth'
Expand All @@ -13,5 +14,6 @@ export function setupAllHandlers(socket: AuthenticatedSocket, roomManager: IRoom
setupSubblocksHandlers(socket, roomManager)
setupVariablesHandlers(socket, roomManager)
setupPresenceHandlers(socket, roomManager)
setupTableHandlers(socket, roomManager)
setupConnectionHandlers(socket, roomManager)
}
73 changes: 73 additions & 0 deletions apps/realtime/src/handlers/tables.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import { createLogger } from '@sim/logger'
import type { AuthenticatedSocket } from '@/middleware/auth'
import { verifyTableAccess } from '@/middleware/permissions'
import { type IRoomManager, tableRoomName } from '@/rooms/types'

const logger = createLogger('TableHandlers')

/**
* Wires `join-table` / `leave-table` socket events. Tables don't track presence
* or last-modified state — joining is a thin wrapper around `socket.join` so the
* Sim API → Realtime HTTP bridge can broadcast row updates back to subscribed clients.
*/
export function setupTableHandlers(socket: AuthenticatedSocket, _roomManager: IRoomManager) {
socket.on('join-table', async ({ tableId }: { tableId?: string }) => {
try {
if (!tableId || typeof tableId !== 'string') {
socket.emit('join-table-error', {
tableId: tableId ?? null,
error: 'tableId required',
code: 'INVALID_TABLE_ID',
retryable: false,
})
return
}

const userId = socket.userId
if (!userId) {
socket.emit('join-table-error', {
tableId,
error: 'Authentication required',
code: 'AUTHENTICATION_REQUIRED',
retryable: false,
})
return
}

const { hasAccess } = await verifyTableAccess(userId, tableId)
if (!hasAccess) {
socket.emit('join-table-error', {
tableId,
error: 'Access denied to table',
code: 'ACCESS_DENIED',
retryable: false,
})
return
}

const room = tableRoomName(tableId)
socket.join(room)
socket.emit('join-table-success', { tableId, socketId: socket.id })
logger.debug(`Socket ${socket.id} (user ${userId}) joined ${room}`)
} catch (error) {
logger.error(`Error joining table room:`, error)
socket.emit('join-table-error', {
tableId: null,
error: 'Failed to join table',
code: 'JOIN_TABLE_FAILED',
retryable: true,
})
}
})

socket.on('leave-table', async ({ tableId }: { tableId?: string }) => {
try {
if (!tableId || typeof tableId !== 'string') return
const room = tableRoomName(tableId)
socket.leave(room)
logger.debug(`Socket ${socket.id} left ${room}`)
} catch (error) {
logger.error(`Error leaving table room:`, error)
}
})
}
48 changes: 48 additions & 0 deletions apps/realtime/src/middleware/permissions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,51 @@ export async function verifyWorkflowAccess(
return { hasAccess: false }
}
}

/**
* Verify a user has read access to a table by virtue of workspace permission.
* Mirrors `verifyWorkflowAccess` for the table-room socket join check.
*/
export async function verifyTableAccess(
userId: string,
tableId: string
): Promise<{ hasAccess: boolean; workspaceId?: string }> {
try {
const { userTableDefinitions, permissions } = await import('@sim/db')
const tableData = await db
.select({ workspaceId: userTableDefinitions.workspaceId })
.from(userTableDefinitions)
.where(and(eq(userTableDefinitions.id, tableId), isNull(userTableDefinitions.archivedAt)))
.limit(1)

if (!tableData.length) {
logger.warn(`Table ${tableId} not found`)
return { hasAccess: false }
}
const { workspaceId } = tableData[0]
if (!workspaceId) return { hasAccess: false }

const [permissionRow] = await db
.select({ permissionType: permissions.permissionType })
.from(permissions)
.where(
and(
eq(permissions.userId, userId),
eq(permissions.entityType, 'workspace'),
eq(permissions.entityId, workspaceId)
)
)
.limit(1)

if (!permissionRow?.permissionType) {
logger.warn(
`User ${userId} has no permission for workspace ${workspaceId} (table ${tableId})`
)
return { hasAccess: false }
}
return { hasAccess: true, workspaceId }
} catch (error) {
logger.error(`Error verifying table access for user ${userId}, table ${tableId}:`, error)
return { hasAccess: false }
}
}
28 changes: 27 additions & 1 deletion apps/realtime/src/rooms/memory-manager.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
import { createLogger } from '@sim/logger'
import type { Server } from 'socket.io'
import type { IRoomManager, UserPresence, UserSession, WorkflowRoom } from '@/rooms/types'
import {
type IRoomManager,
type TableRowUpdatedPayload,
tableRoomName,
type UserPresence,
type UserSession,
type WorkflowRoom,
} from '@/rooms/types'

const logger = createLogger('MemoryRoomManager')

Expand Down Expand Up @@ -255,4 +262,23 @@ export class MemoryRoomManager implements IRoomManager {

logger.info(`Notified ${room.users.size} users about workflow deployment change: ${workflowId}`)
}

emitToTable<T = unknown>(tableId: string, event: string, payload: T): void {
this._io.to(tableRoomName(tableId)).emit(event, payload)
}

async handleTableRowUpdated(tableId: string, payload: TableRowUpdatedPayload): Promise<void> {
this.emitToTable(tableId, 'table-row-updated', { tableId, ...payload })
}

async handleTableRowDeleted(tableId: string, rowId: string): Promise<void> {
this.emitToTable(tableId, 'table-row-deleted', { tableId, rowId })
}

async handleTableDeleted(tableId: string): Promise<void> {
logger.info(`Handling table deletion notification for ${tableId}`)
this.emitToTable(tableId, 'table-deleted', { tableId, timestamp: Date.now() })
// Eject sockets so they don't hold a stale room. Cross-pod safe via socket.io.
await this._io.in(tableRoomName(tableId)).socketsLeave(tableRoomName(tableId))
}
}
27 changes: 26 additions & 1 deletion apps/realtime/src/rooms/redis-manager.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
import { createLogger } from '@sim/logger'
import { createClient, type RedisClientType } from 'redis'
import type { Server } from 'socket.io'
import type { IRoomManager, UserPresence, UserSession } from '@/rooms/types'
import {
type IRoomManager,
type TableRowUpdatedPayload,
tableRoomName,
type UserPresence,
type UserSession,
} from '@/rooms/types'

const logger = createLogger('RedisRoomManager')

Expand Down Expand Up @@ -457,4 +463,23 @@ export class RedisRoomManager implements IRoomManager {
const userCount = await this.getUniqueUserCount(workflowId)
logger.info(`Notified ${userCount} users about workflow deployment change: ${workflowId}`)
}

emitToTable<T = unknown>(tableId: string, event: string, payload: T): void {
this._io.to(tableRoomName(tableId)).emit(event, payload)
}

async handleTableRowUpdated(tableId: string, payload: TableRowUpdatedPayload): Promise<void> {
this.emitToTable(tableId, 'table-row-updated', { tableId, ...payload })
}

async handleTableRowDeleted(tableId: string, rowId: string): Promise<void> {
this.emitToTable(tableId, 'table-row-deleted', { tableId, rowId })
}

async handleTableDeleted(tableId: string): Promise<void> {
logger.info(`Handling table deletion notification for ${tableId}`)
this.emitToTable(tableId, 'table-deleted', { tableId, timestamp: Date.now() })
// Eject sockets across all pods via socket.io's Redis adapter.
await this._io.in(tableRoomName(tableId)).socketsLeave(tableRoomName(tableId))
}
}
41 changes: 41 additions & 0 deletions apps/realtime/src/rooms/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,4 +143,45 @@ export interface IRoomManager {
* Handle workflow deployment change - notify users to refresh deployment state
*/
handleWorkflowDeployed(workflowId: string): Promise<void>

/**
* Emit an event to all clients in a table room (`table:${tableId}`).
* Tables don't track presence/last-modified state — just pub/sub.
*/
emitToTable<T = unknown>(tableId: string, event: string, payload: T): void

/**
* Notify all clients in a table room of a row write (insert/update/cell-state-change).
* Sim API calls this via the `/api/table-row-updated` HTTP bridge after every successful
* row commit; the client merges the delta into its React Query cache.
*/
handleTableRowUpdated(tableId: string, payload: TableRowUpdatedPayload): Promise<void>

/**
* Notify all clients in a table room that a row has been deleted.
*/
handleTableRowDeleted(tableId: string, rowId: string): Promise<void>

/**
* Notify all clients in a table room that the table has been deleted; eject sockets.
*/
handleTableDeleted(tableId: string): Promise<void>
}

/**
* Payload broadcast on `table-row-updated`. Mirrors the shape of `TableRow.data` so
* the client can merge directly into its React Query rows cache. `position` and
* `updatedAt` are included for cache reconciliation; `data` is the full row data
* (not a per-cell delta) — see plan Notes.
*/
export interface TableRowUpdatedPayload {
rowId: string
data: Record<string, unknown>
/** Per-workflow-group execution state. Keyed by `WorkflowGroup.id`. */
executions?: Record<string, unknown>
position: number
updatedAt: string | number
}

/** Socket.IO room name for a table. Namespaced from workflow rooms. */
export const tableRoomName = (tableId: string): string => `table:${tableId}`
46 changes: 46 additions & 0 deletions apps/realtime/src/routes/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,52 @@ export function createHttpHandler(roomManager: IRoomManager, logger: Logger) {
return
}

// Handle table row write notifications from the Sim API
if (req.method === 'POST' && req.url === '/api/table-row-updated') {
try {
const body = await readRequestBody(req)
const { tableId, rowId, data, executions, position, updatedAt } = JSON.parse(body)
await roomManager.handleTableRowUpdated(tableId, {
rowId,
data,
executions,
position,
updatedAt,
})
sendSuccess(res)
} catch (error) {
logger.error('Error handling table row update notification:', error)
sendError(res, 'Failed to process table row update')
}
return
}

if (req.method === 'POST' && req.url === '/api/table-row-deleted') {
try {
const body = await readRequestBody(req)
const { tableId, rowId } = JSON.parse(body)
await roomManager.handleTableRowDeleted(tableId, rowId)
sendSuccess(res)
} catch (error) {
logger.error('Error handling table row deletion notification:', error)
sendError(res, 'Failed to process table row deletion')
}
return
}

if (req.method === 'POST' && req.url === '/api/table-deleted') {
try {
const body = await readRequestBody(req)
const { tableId } = JSON.parse(body)
await roomManager.handleTableDeleted(tableId)
sendSuccess(res)
} catch (error) {
logger.error('Error handling table deletion notification:', error)
sendError(res, 'Failed to process table deletion')
}
return
}

res.writeHead(404, { 'Content-Type': 'application/json' })
res.end(JSON.stringify({ error: 'Not found' }))
}
Expand Down
Loading
Loading