Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
d5b8cb8
refactor(run-engine): split completeWaitpoint into tx-able mutation a…
d-cs Jun 10, 2026
67f70bc
refactor(run-engine): carry output through CompletedWaitpointMutation…
d-cs Jun 10, 2026
d3bfa0c
test(run-engine): add failing tests for atomic single-commit run comp…
d-cs Jun 10, 2026
a8bd676
test(run-engine): make atomic-completion assertions race-free (xmin e…
d-cs Jun 10, 2026
9372ab6
feat(run-engine): complete run and waitpoint in a single transaction
d-cs Jun 10, 2026
5694397
feat(run-engine): unblock continued runs in a single transaction
d-cs Jun 10, 2026
b4ce360
fix(run-engine): keep snapshot side effects out of the unblock transa…
d-cs Jun 10, 2026
eee5d58
fix(run-engine): restore pre-split snapshot return and event payload …
d-cs Jun 10, 2026
9e1d3a6
fix(run-engine): widen unblock transaction limits for batch-scale wai…
d-cs Jun 10, 2026
9cffdce
chore: add server-changes entry for transactional run completion
d-cs Jun 10, 2026
36c6a3f
fix(run-engine): log snapshot ids instead of full objects on the unbl…
d-cs Jun 10, 2026
f9974dd
Revert "fix(run-engine): log snapshot ids instead of full objects on …
d-cs Jun 10, 2026
3a89556
docs(run-engine): explain why the SUSPENDED unblock path stays non-tr…
d-cs Jun 10, 2026
485147d
chore: drop implementation plan doc from the branch
d-cs Jun 10, 2026
92277a5
docs(run-engine): trim narration and history references from comments
d-cs Jun 10, 2026
fc20dc2
fix(run-engine): move snapshot side effects and ttl ack out of the st…
d-cs Jun 10, 2026
06abd0a
Merge branch 'main' into run-engine-transactional-completion
d-cs Jun 10, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix(run-engine): keep snapshot side effects out of the unblock transaction

Split createExecutionSnapshot into createExecutionSnapshotMutation (pure
Postgres, safe inside a tx) and scheduleSnapshotSideEffects (Redis heartbeat
enqueue + eventBus.emit, must run after commit).

In the EXECUTING_WITH_WAITPOINTS branch of continueRunIfUnblocked, the
$transaction now calls only createExecutionSnapshotMutation;
scheduleSnapshotSideEffects is called after the transaction commits, so the
heartbeat job and the emitted event always reference a durable row.
createExecutionSnapshot is reimplemented as the mutation followed by the
side effects, leaving every other call site unchanged.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
  • Loading branch information
d-cs and claude committed Jun 10, 2026
commit b4ce360eb291007f4bd392dd08303456eb76fb04
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,12 @@ export class ExecutionSnapshotSystem {
this.heartbeatTimeouts = options.heartbeatTimeouts;
}

public async createExecutionSnapshot(
/**
* Pure Postgres mutation — safe inside a transaction. Inserts the snapshot row and
* returns the enhanced result. Direct callers MUST call `scheduleSnapshotSideEffects()`
* with the result after the surrounding transaction commits.
*/
public async createExecutionSnapshotMutation(
prisma: PrismaClientOrTransaction,
{
run,
Expand Down Expand Up @@ -399,34 +404,94 @@ export class ExecutionSnapshotSystem {
},
});

return {
...newSnapshot,
friendlyId: SnapshotId.toFriendlyId(newSnapshot.id),
runFriendlyId: RunId.toFriendlyId(newSnapshot.runId),
completedWaitpoints,
};
}

/**
* Post-commit side effects for a newly created snapshot: arms the heartbeat job (if
* the execution status requires one) and emits the `executionSnapshotCreated` event.
* Never call this inside a transaction — these side effects must only run against a
* durable (committed) snapshot row.
*/
public async scheduleSnapshotSideEffects({
snapshot,
runId,
error,
completedWaitpoints,
}: {
snapshot: Awaited<ReturnType<ExecutionSnapshotSystem["createExecutionSnapshotMutation"]>>;
runId: string;
error?: string;
completedWaitpoints?: { id: string; index?: number }[];
}) {
if (!error) {
//set heartbeat (if relevant)
const intervalMs = this.#getHeartbeatIntervalMs(newSnapshot.executionStatus);
const intervalMs = this.#getHeartbeatIntervalMs(snapshot.executionStatus);
if (intervalMs !== null) {
await this.$.worker.enqueue({
id: `heartbeatSnapshot.${run.id}`,
id: `heartbeatSnapshot.${runId}`,
job: "heartbeatSnapshot",
payload: { snapshotId: newSnapshot.id, runId: run.id },
payload: { snapshotId: snapshot.id, runId },
availableAt: new Date(Date.now() + intervalMs),
});
}
}

this.$.eventBus.emit("executionSnapshotCreated", {
time: newSnapshot.createdAt,
time: snapshot.createdAt,
run: {
id: newSnapshot.runId,
id: snapshot.runId,
},
snapshot: {
...newSnapshot,
...snapshot,
completedWaitpointIds: completedWaitpoints?.map((w) => w.id) ?? [],
},
});
}

public async createExecutionSnapshot(
prisma: PrismaClientOrTransaction,
args: {
run: { id: string; status: TaskRunStatus; attemptNumber?: number | null };
snapshot: {
executionStatus: TaskRunExecutionStatus;
description: string;
metadata?: Prisma.JsonValue;
};
previousSnapshotId?: string;
batchId?: string;
environmentId: string;
environmentType: RuntimeEnvironmentType;
projectId: string;
organizationId: string;
checkpointId?: string;
workerId?: string;
runnerId?: string;
completedWaitpoints?: {
id: string;
index?: number;
}[];
error?: string;
}
) {
const newSnapshot = await this.createExecutionSnapshotMutation(prisma, args);

await this.scheduleSnapshotSideEffects({
snapshot: newSnapshot,
runId: args.run.id,
error: args.error,
completedWaitpoints: args.completedWaitpoints,
});

return {
...newSnapshot,
friendlyId: SnapshotId.toFriendlyId(newSnapshot.id),
runFriendlyId: RunId.toFriendlyId(newSnapshot.runId),
friendlyId: newSnapshot.friendlyId,
runFriendlyId: newSnapshot.runFriendlyId,
};
}

Expand Down
26 changes: 17 additions & 9 deletions internal-packages/run-engine/src/engine/systems/waitpointSystem.ts
Original file line number Diff line number Diff line change
Expand Up @@ -836,12 +836,16 @@ export class WaitpointSystem {
};
}
case "EXECUTING_WITH_WAITPOINTS": {
const completedWaitpointArgs = blockingWaitpoints.map((b) => ({
id: b.waitpoint.id,
index: b.batchIndex ?? undefined,
}));

const newSnapshot = await $transaction(
this.$.prisma,
async (tx) => {
const createdSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot(
tx,
{
const createdSnapshot =
await this.executionSnapshotSystem.createExecutionSnapshotMutation(tx, {
run: {
id: runId,
status: snapshot.runStatus,
Expand All @@ -857,12 +861,8 @@ export class WaitpointSystem {
projectId: snapshot.projectId,
organizationId: snapshot.organizationId,
batchId: snapshot.batchId ?? undefined,
completedWaitpoints: blockingWaitpoints.map((b) => ({
id: b.waitpoint.id,
index: b.batchIndex ?? undefined,
})),
}
);
completedWaitpoints: completedWaitpointArgs,
});

// Remove the blocking waitpoints in the same transaction, so the
// new snapshot and the unblock are atomic.
Expand Down Expand Up @@ -891,6 +891,14 @@ export class WaitpointSystem {
throw new Error(`continueRunIfUnblocked: failed to unblock run: ${runId}`);
}

// Schedule side effects (heartbeat + eventBus) AFTER the transaction has
// committed, so they always reference a durable snapshot row.
await this.executionSnapshotSystem.scheduleSnapshotSideEffects({
snapshot: newSnapshot,
runId,
completedWaitpoints: completedWaitpointArgs,
});

this.$.logger.debug(
`continueRunIfUnblocked: run was still executing, sending notification`,
{
Expand Down