Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions 5 .changeset/tender-jobs-collect.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"trigger.dev": patch
---

TriggerApiError 4xx errors will no longer cause tasks to be retried
4 changes: 2 additions & 2 deletions 4 .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
{
"type": "node-terminal",
"request": "launch",
"name": "Debug fairDequeuingStrategy.test.ts",
"command": "pnpm run test -t FairDequeuingStrategy",
"name": "Debug triggerTask.test.ts",
"command": "pnpm run test --run ./test/engine/triggerTask.test.ts",
"envFile": "${workspaceFolder}/.env",
"cwd": "${workspaceFolder}/apps/webapp",
"sourceMaps": true
Expand Down
8 changes: 7 additions & 1 deletion 8 apps/webapp/app/components/runs/v3/SpanEvents.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,13 @@ export function SpanEventError({
time={spanEvent.time}
titleClassName="text-rose-500"
/>
{enhancedException.message && <Callout variant="error">{enhancedException.message}</Callout>}
{enhancedException.message && (
<Callout variant="error">
<pre className="text-wrap font-sans text-sm font-normal text-rose-200">
{enhancedException.message}
</pre>
</Callout>
)}
{enhancedException.link &&
(enhancedException.link.magic === "CONTACT_FORM" ? (
<Feedback
Expand Down
5 changes: 4 additions & 1 deletion 5 apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
import { TaskRun } from "@trigger.dev/database";
import { z } from "zod";
import { env } from "~/env.server";
import { EngineServiceValidationError } from "~/runEngine/concerns/errors";
import { AuthenticatedEnvironment, getOneTimeUseToken } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
Expand Down Expand Up @@ -116,7 +117,9 @@ const { action, loader } = createActionApiRoute(
);
} catch (error) {
if (error instanceof ServiceValidationError) {
return json({ error: error.message }, { status: 422 });
return json({ error: error.message }, { status: error.status ?? 422 });
} else if (error instanceof EngineServiceValidationError) {
return json({ error: error.message }, { status: error.status ?? 422 });
} else if (error instanceof OutOfEntitlementError) {
return json({ error: error.message }, { status: 422 });
} else if (error instanceof Error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -808,7 +808,13 @@ function RunError({ error }: { error: TaskRunError }) {
return (
<div className="flex flex-col gap-2 rounded-sm border border-rose-500/50 px-3 pb-3 pt-2">
<Header3 className="text-rose-500">{name}</Header3>
{enhancedError.message && <Callout variant="error">{enhancedError.message}</Callout>}
{enhancedError.message && (
<Callout variant="error">
<pre className="text-wrap font-sans text-sm font-normal text-rose-200">
{enhancedError.message}
</pre>
</Callout>
)}
{enhancedError.link &&
(enhancedError.link.magic === "CONTACT_FORM" ? (
<Feedback
Expand Down
6 changes: 6 additions & 0 deletions 6 apps/webapp/app/runEngine/concerns/errors.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
export class EngineServiceValidationError extends Error {
constructor(message: string, public status?: number) {
super(message);
this.name = "EngineServiceValidationError";
}
}
96 changes: 96 additions & 0 deletions 96 apps/webapp/app/runEngine/concerns/idempotencyKeys.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import { RunId } from "@trigger.dev/core/v3/isomorphic";
import type { PrismaClientOrTransaction, TaskRun } from "@trigger.dev/database";
import { logger } from "~/services/logger.server";
import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server";
import type { RunEngine } from "~/v3/runEngine.server";
import type { TraceEventConcern, TriggerTaskRequest } from "../types";

export type IdempotencyKeyConcernResult =
| { isCached: true; run: TaskRun }
| { isCached: false; idempotencyKey?: string; idempotencyKeyExpiresAt?: Date };

export class IdempotencyKeyConcern {
constructor(
private readonly prisma: PrismaClientOrTransaction,
private readonly engine: RunEngine,
private readonly traceEventConcern: TraceEventConcern
) {}

async handleTriggerRequest(request: TriggerTaskRequest): Promise<IdempotencyKeyConcernResult> {
const idempotencyKey = request.options?.idempotencyKey ?? request.body.options?.idempotencyKey;
const idempotencyKeyExpiresAt =
request.options?.idempotencyKeyExpiresAt ??
resolveIdempotencyKeyTTL(request.body.options?.idempotencyKeyTTL) ??
new Date(Date.now() + 24 * 60 * 60 * 1000 * 30); // 30 days

if (!idempotencyKey) {
return { isCached: false, idempotencyKey, idempotencyKeyExpiresAt };
}

const existingRun = idempotencyKey
? await this.prisma.taskRun.findFirst({
where: {
runtimeEnvironmentId: request.environment.id,
idempotencyKey,
taskIdentifier: request.taskId,
},
include: {
associatedWaitpoint: true,
},
})
: undefined;

if (existingRun) {
if (existingRun.idempotencyKeyExpiresAt && existingRun.idempotencyKeyExpiresAt < new Date()) {
logger.debug("[TriggerTaskService][call] Idempotency key has expired", {
idempotencyKey: request.options?.idempotencyKey,
run: existingRun,
});

// Update the existing run to remove the idempotency key
await this.prisma.taskRun.updateMany({
where: { id: existingRun.id, idempotencyKey },
data: { idempotencyKey: null, idempotencyKeyExpiresAt: null },
});
} else {
const associatedWaitpoint = existingRun.associatedWaitpoint;
const parentRunId = request.body.options?.parentRunId;
const resumeParentOnCompletion = request.body.options?.resumeParentOnCompletion;
//We're using `andWait` so we need to block the parent run with a waitpoint
if (associatedWaitpoint && resumeParentOnCompletion && parentRunId) {
await this.traceEventConcern.traceIdempotentRun(
request,
{
existingRun,
idempotencyKey,
incomplete: associatedWaitpoint.status === "PENDING",
isError: associatedWaitpoint.outputIsError,
},
async (event) => {
//block run with waitpoint
await this.engine.blockRunWithWaitpoint({
runId: RunId.fromFriendlyId(parentRunId),
waitpoints: associatedWaitpoint.id,
spanIdToComplete: event.spanId,
batch: request.options?.batchId
? {
id: request.options.batchId,
index: request.options.batchIndex ?? 0,
}
: undefined,
projectId: request.environment.projectId,
organizationId: request.environment.organizationId,
tx: this.prisma,
releaseConcurrency: request.body.options?.releaseConcurrency,
});
}
);
}

return { isCached: true, run: existingRun };
}
}

return { isCached: false, idempotencyKey, idempotencyKeyExpiresAt };
}
}
63 changes: 63 additions & 0 deletions 63 apps/webapp/app/runEngine/concerns/payloads.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import { IOPacket, packetRequiresOffloading, tryCatch } from "@trigger.dev/core/v3";
import { PayloadProcessor, TriggerTaskRequest } from "../types";
import { env } from "~/env.server";
import { startActiveSpan } from "~/v3/tracer.server";
import { uploadPacketToObjectStore } from "~/v3/r2.server";
import { EngineServiceValidationError } from "./errors";

export class DefaultPayloadProcessor implements PayloadProcessor {
async process(request: TriggerTaskRequest): Promise<IOPacket> {
return await startActiveSpan("handlePayloadPacket()", async (span) => {
const payload = request.body.payload;
const payloadType = request.body.options?.payloadType ?? "application/json";

const packet = this.#createPayloadPacket(payload, payloadType);

if (!packet.data) {
return packet;
}

const { needsOffloading, size } = packetRequiresOffloading(
packet,
env.TASK_PAYLOAD_OFFLOAD_THRESHOLD
);

span.setAttribute("needsOffloading", needsOffloading);
span.setAttribute("size", size);

if (!needsOffloading) {
return packet;
}
Comment thread
ericallam marked this conversation as resolved.

const filename = `${request.friendlyId}/payload.json`;

const [uploadError] = await tryCatch(
uploadPacketToObjectStore(filename, packet.data, packet.dataType, request.environment)
);

if (uploadError) {
throw new EngineServiceValidationError(
"Failed to upload large payload to object store",
500
); // This is retryable
}
Comment thread
ericallam marked this conversation as resolved.

return {
data: filename,
dataType: "application/store",
};
});
}

#createPayloadPacket(payload: any, payloadType: string): IOPacket {
if (payloadType === "application/json") {
return { data: JSON.stringify(payload), dataType: "application/json" };
}

if (typeof payload === "string") {
return { data: payload, dataType: payloadType };
}

return { dataType: payloadType };
}
}
Loading
Morty Proxy This is a proxified and sanitized view of the page, visit original site.