Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions 5 .changeset/add-is-replay-context.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@trigger.dev/core": patch
---

Add `isReplay` boolean to the run context (`ctx.run.isReplay`), derived from the existing `replayedFromTaskRunFriendlyId` database field. Defaults to `false` for backwards compatibility.
15 changes: 4 additions & 11 deletions 15 apps/webapp/app/presenters/v3/SpanPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,7 @@ export type PromptSpanData = {
config?: string;
};

function extractPromptSpanData(
properties: Record<string, unknown>
): PromptSpanData | undefined {
function extractPromptSpanData(properties: Record<string, unknown>): PromptSpanData | undefined {
// Properties come as an unflattened nested object from ClickHouse,
// e.g. { prompt: { slug: "...", version: 3, ... } }
const prompt = properties.prompt;
Expand Down Expand Up @@ -592,10 +590,7 @@ export class SpanPresenter extends BasePresenter {
triggeredRuns,
aiData:
span.properties && typeof span.properties === "object"
? extractAISpanData(
span.properties as Record<string, unknown>,
span.duration / 1_000_000
)
? extractAISpanData(span.properties as Record<string, unknown>, span.duration / 1_000_000)
: undefined,
};

Expand Down Expand Up @@ -739,10 +734,7 @@ export class SpanPresenter extends BasePresenter {
"ai.streamObject",
];

if (
typeof span.message === "string" &&
AI_SUMMARY_MESSAGES.includes(span.message)
) {
if (typeof span.message === "string" && AI_SUMMARY_MESSAGES.includes(span.message)) {
const aiSummaryData = extractAISummarySpanData(
span.properties as Record<string, unknown>,
span.duration / 1_000_000
Expand Down Expand Up @@ -899,6 +891,7 @@ export class SpanPresenter extends BasePresenter {
createdAt: run.createdAt,
tags: run.runTags,
isTest: run.isTest,
isReplay: !!run.replayedFromTaskRunFriendlyId,
idempotencyKey: getUserProvidedIdempotencyKey(run) ?? undefined,
startedAt: run.startedAt ?? run.createdAt,
durationMs: run.usageDurationMs,
Expand Down
1 change: 1 addition & 0 deletions 1 apps/webapp/app/v3/marqs/devQueueConsumer.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,7 @@ export class DevQueueConsumer {
runId: lockedTaskRun.friendlyId,
messageId: lockedTaskRun.id,
isTest: lockedTaskRun.isTest,
isReplay: !!lockedTaskRun.replayedFromTaskRunFriendlyId,
metrics: [
{
name: "start",
Expand Down
4 changes: 4 additions & 0 deletions 4 apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1640,6 +1640,7 @@ export const AttemptForExecutionGetPayload = {
createdAt: true,
startedAt: true,
isTest: true,
replayedFromTaskRunFriendlyId: true,
metadata: true,
metadataType: true,
idempotencyKey: true,
Expand Down Expand Up @@ -1726,6 +1727,7 @@ class SharedQueueTasks {
startedAt: taskRun.startedAt ?? taskRun.createdAt,
tags: taskRun.runTags ?? [],
isTest: taskRun.isTest,
isReplay: !!taskRun.replayedFromTaskRunFriendlyId,
idempotencyKey: taskRun.idempotencyKey ?? undefined,
durationMs: taskRun.usageDurationMs,
costInCents: taskRun.costInCents,
Expand Down Expand Up @@ -2045,6 +2047,7 @@ class SharedQueueTasks {
traceContext: true,
friendlyId: true,
isTest: true,
replayedFromTaskRunFriendlyId: true,
lockedBy: {
select: {
machineConfig: true,
Expand Down Expand Up @@ -2090,6 +2093,7 @@ class SharedQueueTasks {
runId: run.friendlyId,
messageId: run.id,
isTest: run.isTest,
isReplay: !!run.replayedFromTaskRunFriendlyId,
attemptCount,
metrics: [],
} satisfies TaskRunExecutionLazyAttemptPayload;
Expand Down
1 change: 1 addition & 0 deletions 1 apps/webapp/app/v3/services/createTaskRunAttempt.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ export class CreateTaskRunAttemptService extends BaseService {
createdAt: taskRun.createdAt,
tags: taskRun.runTags ?? [],
isTest: taskRun.isTest,
isReplay: !!taskRun.replayedFromTaskRunFriendlyId,
idempotencyKey: taskRun.idempotencyKey ?? undefined,
startedAt: taskRun.startedAt ?? taskRun.createdAt,
durationMs: taskRun.usageDurationMs,
Expand Down
3 changes: 3 additions & 0 deletions 3 docs/context.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ export const parentTask = task({
<ResponseField name="isTest" type="boolean">
Whether this is a [test run](/run-tests).
</ResponseField>
<ResponseField name="isReplay" type="boolean">
Whether this run is a [replay](/replaying) of a previous run.
</ResponseField>
<ResponseField name="createdAt" type="date">
The creation time of the task run.
</ResponseField>
Expand Down
15 changes: 15 additions & 0 deletions 15 docs/replaying.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,21 @@ description: "A replay is a copy of a run with the same payload but against the
</Tab>
</Tabs>

### Detecting replays in your task

You can check if a run is a replay using the [context](/context) object:

```ts
export const myTask = task({
id: "my-task",
run: async (payload, { ctx }) => {
if (ctx.run.isReplay) {
// This run is a replay of a previous run
}
},
});
```

### Replaying using the SDK

You can replay a run using the SDK:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,7 @@ export class DequeueSystem {
id: lockedTaskRun.id,
friendlyId: lockedTaskRun.friendlyId,
isTest: lockedTaskRun.isTest,
isReplay: !!lockedTaskRun.replayedFromTaskRunFriendlyId,
machine: machinePreset,
attemptNumber: nextAttemptNumber,
// Keeping this for backwards compatibility, but really this should be called workerQueue
Expand Down
88 changes: 46 additions & 42 deletions 88 internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ export class RunAttemptSystem {
machinePreset: true,
runTags: true,
isTest: true,
replayedFromTaskRunFriendlyId: true,
idempotencyKey: true,
idempotencyKeyOptions: true,
startedAt: true,
Expand Down Expand Up @@ -232,9 +233,9 @@ export class RunAttemptSystem {
run.lockedById
? this.#resolveTaskRunExecutionTask(run.lockedById)
: Promise.resolve({
id: run.taskIdentifier,
filePath: "unknown",
}),
id: run.taskIdentifier,
filePath: "unknown",
}),
this.#resolveTaskRunExecutionQueue({
lockedQueueId: run.lockedQueueId ?? undefined,
queueName: run.queue,
Expand All @@ -245,13 +246,13 @@ export class RunAttemptSystem {
run.lockedById
? this.#resolveTaskRunExecutionMachinePreset(run.lockedById, run.machinePreset)
: Promise.resolve(
getMachinePreset({
defaultMachine: this.options.machines.defaultMachine,
machines: this.options.machines.machines,
config: undefined,
run,
})
),
getMachinePreset({
defaultMachine: this.options.machines.defaultMachine,
machines: this.options.machines.machines,
config: undefined,
run,
})
),
run.lockedById
? this.#resolveTaskRunExecutionDeployment(run.lockedById)
: Promise.resolve(undefined),
Expand All @@ -262,6 +263,7 @@ export class RunAttemptSystem {
id: run.friendlyId,
tags: run.runTags,
isTest: run.isTest,
isReplay: !!run.replayedFromTaskRunFriendlyId,
createdAt: run.createdAt,
startedAt: run.startedAt ?? run.createdAt,
idempotencyKey: getUserProvidedIdempotencyKey(run) ?? undefined,
Expand Down Expand Up @@ -426,6 +428,7 @@ export class RunAttemptSystem {
payloadType: true,
runTags: true,
isTest: true,
replayedFromTaskRunFriendlyId: true,
idempotencyKey: true,
idempotencyKeyOptions: true,
startedAt: true,
Expand Down Expand Up @@ -459,8 +462,9 @@ export class RunAttemptSystem {
run,
snapshot: {
executionStatus: "EXECUTING",
description: `Attempt created, starting execution${isWarmStart ? " (warm start)" : ""
}`,
description: `Attempt created, starting execution${
isWarmStart ? " (warm start)" : ""
}`,
},
previousSnapshotId: latestSnapshot.id,
environmentId: latestSnapshot.environmentId,
Expand Down Expand Up @@ -574,6 +578,7 @@ export class RunAttemptSystem {
createdAt: updatedRun.createdAt,
tags: updatedRun.runTags,
isTest: updatedRun.isTest,
isReplay: !!updatedRun.replayedFromTaskRunFriendlyId,
idempotencyKey: getUserProvidedIdempotencyKey(updatedRun) ?? undefined,
idempotencyKeyScope: extractIdempotencyKeyScope(updatedRun),
startedAt: updatedRun.startedAt ?? updatedRun.createdAt,
Expand Down Expand Up @@ -618,8 +623,8 @@ export class RunAttemptSystem {
deployment,
batch: updatedRun.batchId
? {
id: BatchId.toFriendlyId(updatedRun.batchId),
}
id: BatchId.toFriendlyId(updatedRun.batchId),
}
: undefined,
};

Expand Down Expand Up @@ -1387,8 +1392,8 @@ export class RunAttemptSystem {
error,
bulkActionGroupIds: bulkActionId
? {
push: bulkActionId,
}
push: bulkActionId,
}
: undefined,
...(usageUpdate && {
usageDurationMs: usageUpdate.usageDurationMs,
Expand Down Expand Up @@ -1876,26 +1881,26 @@ export class RunAttemptSystem {
const result = await this.cache.queues.swr(cacheKey, async () => {
const queue = params.lockedQueueId
? await this.$.readOnlyPrisma.taskQueue.findFirst({
where: {
id: params.lockedQueueId,
},
select: {
id: true,
friendlyId: true,
name: true,
},
})
where: {
id: params.lockedQueueId,
},
select: {
id: true,
friendlyId: true,
name: true,
},
})
: await this.$.readOnlyPrisma.taskQueue.findFirst({
where: {
runtimeEnvironmentId: params.runtimeEnvironmentId,
name: params.queueName,
},
select: {
id: true,
friendlyId: true,
name: true,
},
});
where: {
runtimeEnvironmentId: params.runtimeEnvironmentId,
name: params.queueName,
},
select: {
id: true,
friendlyId: true,
name: true,
},
});

if (!queue) {
// Return synthetic queue so run/span view still loads (e.g. createFailedTaskRun with fallback queue)
Expand Down Expand Up @@ -2068,13 +2073,13 @@ export class RunAttemptSystem {
if (environmentType !== "DEVELOPMENT") {
const machinePreset = machinePresetName
? machinePresetFromName(
this.options.machines.machines,
machinePresetName as MachinePresetName
)
this.options.machines.machines,
machinePresetName as MachinePresetName
)
: machinePresetFromName(
this.options.machines.machines,
this.options.machines.defaultMachine
);
this.options.machines.machines,
this.options.machines.defaultMachine
);

costInCents = currentCostInCents + attemptDurationMs * machinePreset.centsPerMs;
}
Expand All @@ -2084,7 +2089,6 @@ export class RunAttemptSystem {
costInCents,
};
}

}

export function safeParseGitMeta(git: unknown): GitMeta | undefined {
Expand Down
14 changes: 8 additions & 6 deletions 14 packages/core/src/v3/schemas/common.ts
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ export const TaskRun = z.object({
payloadType: z.string(),
tags: z.array(z.string()),
isTest: z.boolean().default(false),
isReplay: z.boolean().default(false),
createdAt: z.coerce.date(),
startedAt: z.coerce.date().default(() => new Date()),
/** The user-provided idempotency key (not the hash) */
Expand Down Expand Up @@ -378,6 +379,7 @@ export const V3TaskRun = z.object({
payloadType: z.string(),
tags: z.array(z.string()),
isTest: z.boolean().default(false),
isReplay: z.boolean().default(false),
createdAt: z.coerce.date(),
startedAt: z.coerce.date().default(() => new Date()),
/** The user-provided idempotency key (not the hash) */
Expand Down Expand Up @@ -538,13 +540,13 @@ export type WaitpointTokenResult = z.infer<typeof WaitpointTokenResult>;

export type WaitpointTokenTypedResult<T> =
| {
ok: true;
output: T;
}
ok: true;
output: T;
}
| {
ok: false;
error: Error;
};
ok: false;
error: Error;
};

export const SerializedError = z.object({
message: z.string(),
Expand Down
1 change: 1 addition & 0 deletions 1 packages/core/src/v3/schemas/runEngine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ export const DequeuedMessage = z.object({
id: z.string(),
friendlyId: z.string(),
isTest: z.boolean(),
isReplay: z.boolean().default(false),
Comment thread
nicktrn marked this conversation as resolved.
machine: MachinePreset,
attemptNumber: z.number(),
masterQueue: z.string(),
Expand Down
1 change: 1 addition & 0 deletions 1 packages/core/src/v3/schemas/schemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ export const TaskRunExecutionLazyAttemptPayload = z.object({
attemptCount: z.number().optional(),
messageId: z.string(),
isTest: z.boolean(),
isReplay: z.boolean().default(false),
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.
traceContext: z.record(z.unknown()),
environment: z.record(z.string()).optional(),
metrics: TaskRunExecutionMetrics.optional(),
Expand Down
1 change: 1 addition & 0 deletions 1 packages/core/src/v3/semanticInternalAttributes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export const SemanticInternalAttributes = {
ATTEMPT_NUMBER: "ctx.attempt.number",
RUN_ID: "ctx.run.id",
RUN_IS_TEST: "ctx.run.isTest",
RUN_IS_REPLAY: "ctx.run.isReplay",
ORIGINAL_RUN_ID: "$original_run_id",
BATCH_ID: "ctx.batch.id",
TASK_SLUG: "ctx.task.id",
Expand Down
1 change: 1 addition & 0 deletions 1 packages/core/src/v3/taskContext/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ export class TaskContextAPI {
[SemanticInternalAttributes.QUEUE_ID]: this.ctx.queue.id,
[SemanticInternalAttributes.RUN_ID]: this.ctx.run.id,
[SemanticInternalAttributes.RUN_IS_TEST]: this.ctx.run.isTest,
[SemanticInternalAttributes.RUN_IS_REPLAY]: this.ctx.run.isReplay,
[SemanticInternalAttributes.BATCH_ID]: this.ctx.batch?.id,
[SemanticInternalAttributes.IDEMPOTENCY_KEY]: this.ctx.run.idempotencyKey,
};
Expand Down
Loading
Morty Proxy This is a proxified and sanitized view of the page, visit original site.