Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions 8 .changeset/tame-oranges-change.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"@trigger.dev/redis-worker": patch
"@trigger.dev/sdk": patch
"trigger.dev": patch
"@trigger.dev/core": patch
---

Adapted the CLI API client to propagate the trigger source via http headers.
6 changes: 5 additions & 1 deletion 6 apps/webapp/app/routes/api.v1.runs.$runParam.replay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { prisma } from "~/db.server";
import { authenticateApiRequest } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { ReplayTaskRunService } from "~/v3/services/replayTaskRun.server";
import { sanitizeTriggerSource } from "~/utils/triggerSource";

const ParamsSchema = z.object({
/* This is the run friendly ID */
Expand Down Expand Up @@ -41,8 +42,11 @@ export async function action({ request, params }: ActionFunctionArgs) {
return json({ error: "Run not found" }, { status: 404 });
}

const triggerSource =
sanitizeTriggerSource(request.headers.get("x-trigger-source")) ?? "api";

const service = new ReplayTaskRunService();
const newRun = await service.call(taskRun);
const newRun = await service.call(taskRun, { triggerSource });

if (!newRun) {
return json({ error: "Failed to create new run" }, { status: 400 });
Expand Down
5 changes: 5 additions & 0 deletions 5 apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import {
handleRequestIdempotency,
saveRequestIdempotency,
} from "~/utils/requestIdempotency.server";
import { sanitizeTriggerSource } from "~/utils/triggerSource";
import { ServiceValidationError } from "~/v3/services/baseService.server";
import { OutOfEntitlementError, TriggerTaskService } from "~/v3/services/triggerTask.server";

Expand All @@ -36,6 +37,7 @@ export const HeadersSchema = z.object({
"x-trigger-engine-version": RunEngineVersionSchema.nullish(),
"x-trigger-request-idempotency-key": z.string().nullish(),
"x-trigger-realtime-streams-version": z.string().nullish(),
"x-trigger-source": z.string().nullish(),
traceparent: z.string().optional(),
Comment thread
myftija marked this conversation as resolved.
tracestate: z.string().optional(),
});
Expand Down Expand Up @@ -67,6 +69,7 @@ const { action, loader } = createActionApiRoute(
"x-trigger-engine-version": engineVersion,
"x-trigger-request-idempotency-key": requestIdempotencyKey,
"x-trigger-realtime-streams-version": realtimeStreamsVersion,
"x-trigger-source": triggerSourceHeader,
} = headers;

const cachedResponse = await handleRequestIdempotency(requestIdempotencyKey, {
Expand Down Expand Up @@ -119,6 +122,8 @@ const { action, loader } = createActionApiRoute(
realtimeStreamsVersion: determineRealtimeStreamsVersion(
realtimeStreamsVersion ?? undefined
),
triggerSource: isFromWorker ? "sdk" : sanitizeTriggerSource(triggerSourceHeader) ?? "api",
triggerAction: "trigger",
},
engineVersion ?? undefined
);
Expand Down
4 changes: 4 additions & 0 deletions 4 apps/webapp/app/routes/api.v1.tasks.batch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
BatchTriggerV3Service,
} from "~/v3/services/batchTriggerV3.server";
import { OutOfEntitlementError } from "~/v3/services/triggerTask.server";
import { sanitizeTriggerSource } from "~/utils/triggerSource";
import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger";
import { determineRealtimeStreamsVersion } from "~/services/realtime/v1StreamsGlobal.server";
import { extractJwtSigningSecretKey } from "~/services/realtime/jwtAuth.server";
Expand Down Expand Up @@ -72,6 +73,7 @@ const { action, loader } = createActionApiRoute(
"x-trigger-engine-version": engineVersion,
"batch-processing-strategy": batchProcessingStrategy,
"x-trigger-realtime-streams-version": realtimeStreamsVersion,
"x-trigger-source": triggerSourceHeader,
traceparent,
tracestate,
} = headers;
Expand Down Expand Up @@ -113,6 +115,8 @@ const { action, loader } = createActionApiRoute(
realtimeStreamsVersion: determineRealtimeStreamsVersion(
realtimeStreamsVersion ?? undefined
),
triggerSource: isFromWorker ? "sdk" : sanitizeTriggerSource(triggerSourceHeader) ?? "api",
triggerAction: "trigger",
});

const $responseHeaders = await responseHeaders(
Expand Down
4 changes: 4 additions & 0 deletions 4 apps/webapp/app/routes/api.v2.tasks.batch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
import { ServiceValidationError } from "~/v3/services/baseService.server";
import { BatchProcessingStrategy } from "~/v3/services/batchTriggerV3.server";
import { OutOfEntitlementError } from "~/v3/services/triggerTask.server";
import { sanitizeTriggerSource } from "~/utils/triggerSource";
import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger";
import { determineRealtimeStreamsVersion } from "~/services/realtime/v1StreamsGlobal.server";
import { extractJwtSigningSecretKey } from "~/services/realtime/jwtAuth.server";
Expand Down Expand Up @@ -62,6 +63,7 @@ const { action, loader } = createActionApiRoute(
"batch-processing-strategy": batchProcessingStrategy,
"x-trigger-request-idempotency-key": requestIdempotencyKey,
"x-trigger-realtime-streams-version": realtimeStreamsVersion,
"x-trigger-source": triggerSourceHeader,
traceparent,
Comment thread
myftija marked this conversation as resolved.
tracestate,
} = headers;
Expand Down Expand Up @@ -127,6 +129,8 @@ const { action, loader } = createActionApiRoute(
realtimeStreamsVersion: determineRealtimeStreamsVersion(
realtimeStreamsVersion ?? undefined
),
triggerSource: isFromWorker ? "sdk" : sanitizeTriggerSource(triggerSourceHeader) ?? "api",
triggerAction: "trigger",
});

const $responseHeaders = await responseHeaders(
Expand Down
3 changes: 3 additions & 0 deletions 3 apps/webapp/app/routes/api.v3.batches.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
} from "~/utils/requestIdempotency.server";
import { ServiceValidationError } from "~/v3/services/baseService.server";
import { OutOfEntitlementError } from "~/v3/services/triggerTask.server";
import { sanitizeTriggerSource } from "~/utils/triggerSource";
import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger";
import { determineRealtimeStreamsVersion } from "~/services/realtime/v1StreamsGlobal.server";
import { extractJwtSigningSecretKey } from "~/services/realtime/jwtAuth.server";
Expand Down Expand Up @@ -65,6 +66,7 @@ const { action, loader } = createActionApiRoute(
"x-trigger-worker": isFromWorker,
"x-trigger-client": triggerClient,
"x-trigger-realtime-streams-version": realtimeStreamsVersion,
"x-trigger-source": triggerSourceHeader,
traceparent,
tracestate,
} = headers;
Expand Down Expand Up @@ -132,6 +134,7 @@ const { action, loader } = createActionApiRoute(
realtimeStreamsVersion: determineRealtimeStreamsVersion(
realtimeStreamsVersion ?? undefined
),
triggerSource: isFromWorker ? "sdk" : sanitizeTriggerSource(triggerSourceHeader) ?? "api",
});

const $responseHeaders = await responseHeaders(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ export const action: ActionFunction = async ({ request, params }) => {
ttlSeconds: submission.value.ttlSeconds,
version: submission.value.version,
prioritySeconds: submission.value.prioritySeconds,
triggerSource: "dashboard",
});

if (!newRun) {
Expand Down
4 changes: 4 additions & 0 deletions 4 apps/webapp/app/runEngine/services/batchTrigger.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ export type BatchTriggerTaskServiceOptions = {
spanParentAsLink?: boolean;
oneTimeUseToken?: string;
realtimeStreamsVersion?: "v1" | "v2";
triggerSource?: string;
triggerAction?: string;
};

/**
Expand Down Expand Up @@ -678,6 +680,8 @@ export class RunEngineBatchTriggerService extends WithRunEngine {
batchId: batch.id,
batchIndex: currentIndex,
realtimeStreamsVersion: options?.realtimeStreamsVersion,
triggerSource: options?.triggerSource ?? "api",
triggerAction: options?.triggerAction ?? "trigger",
},
"V2"
);
Expand Down
2 changes: 2 additions & 0 deletions 2 apps/webapp/app/runEngine/services/createBatch.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ export type CreateBatchServiceOptions = {
spanParentAsLink?: boolean;
oneTimeUseToken?: string;
realtimeStreamsVersion?: "v1" | "v2";
triggerSource?: string;
};

/**
Expand Down Expand Up @@ -143,6 +144,7 @@ export class CreateBatchService extends WithRunEngine {
idempotencyKey: body.idempotencyKey,
processingConcurrency: config.processingConcurrency,
planType,
triggerSource: options.triggerSource,
};

await this._engine.initializeBatch(initOptions);
Expand Down
13 changes: 13 additions & 0 deletions 13 apps/webapp/app/runEngine/services/triggerTask.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
import { Tracer } from "@opentelemetry/api";
import { tryCatch } from "@trigger.dev/core/utils";
import {
RunAnnotations,
TaskRunError,
taskRunErrorEnhancer,
taskRunErrorToString,
Expand Down Expand Up @@ -289,6 +290,17 @@ export class RunEngineTriggerTaskService {

const workerQueue = await this.queueConcern.getWorkerQueue(environment, body.options?.region);

// Build annotations for this run
const triggerSource = options.triggerSource ?? "api";
const triggerAction = options.triggerAction ?? "trigger";
const parentAnnotations = RunAnnotations.safeParse(parentRun?.annotations).data;
const annotations = {
triggerSource,
triggerAction,
rootTriggerSource: parentAnnotations?.rootTriggerSource ?? triggerSource,
rootScheduleId: parentAnnotations?.rootScheduleId || options.scheduleId || undefined,
};

try {
return await this.traceEventConcern.traceRun(
triggerRequest,
Expand Down Expand Up @@ -369,6 +381,7 @@ export class RunEngineTriggerTaskService {
planType,
realtimeStreamsVersion: options.realtimeStreamsVersion,
debounce: body.options?.debounce,
annotations,
// When debouncing with triggerAndWait, create a span for the debounced trigger
onDebounced:
body.options?.debounce && body.options?.resumeParentOnCompletion
Expand Down
9 changes: 9 additions & 0 deletions 9 apps/webapp/app/utils/triggerSource.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
const ALLOWED_TRIGGER_SOURCES = new Set(["sdk", "cli", "mcp"]);

/** Validates a client-provided trigger source header against the allowlist. */
export function sanitizeTriggerSource(value: string | null | undefined): string | undefined {
if (value && ALLOWED_TRIGGER_SOURCES.has(value)) {
return value;
}
return undefined;
}
2 changes: 2 additions & 0 deletions 2 apps/webapp/app/v3/runEngineHandlers.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -750,6 +750,8 @@ export function setupBatchQueueCallbacks() {
batchIndex: itemIndex,
realtimeStreamsVersion: meta.realtimeStreamsVersion,
planType: meta.planType,
triggerSource: meta.parentRunId ? "sdk" : meta.triggerSource ?? "api",
triggerAction: "trigger",
},
"V2"
);
Expand Down
2 changes: 2 additions & 0 deletions 2 apps/webapp/app/v3/scheduleEngine.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ function createScheduleEngine() {
scheduleInstanceId,
queueTimestamp: exactScheduleTime,
overrideCreatedAt: exactScheduleTime,
triggerSource: "schedule",
triggerAction: "trigger",
}
);

Expand Down
4 changes: 4 additions & 0 deletions 4 apps/webapp/app/v3/services/batchTriggerV3.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ export type BatchTriggerTaskServiceOptions = {
spanParentAsLink?: boolean;
oneTimeUseToken?: string;
realtimeStreamsVersion?: "v1" | "v2";
triggerSource?: string;
triggerAction?: string;
};

type RunItemData = {
Expand Down Expand Up @@ -853,6 +855,8 @@ export class BatchTriggerV3Service extends BaseService {
skipChecks: true,
runFriendlyId: task.runId,
realtimeStreamsVersion: options?.realtimeStreamsVersion,
triggerSource: options?.triggerSource ?? "api",
triggerAction: options?.triggerAction ?? "trigger",
}
);

Expand Down
1 change: 1 addition & 0 deletions 1 apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ export class BulkActionService extends BaseService {
const [error, result] = await tryCatch(
replayService.call(run, {
bulkActionId: bulkActionId,
triggerSource: "dashboard",
})
);
if (error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ export class PerformBulkActionService extends BaseService {
switch (item.group.type) {
case "REPLAY": {
const service = new ReplayTaskRunService(this._prisma);
const result = await service.call(item.sourceRun);
const result = await service.call(item.sourceRun, { triggerSource: "dashboard" });

await this._prisma.bulkActionItem.update({
where: { id: item.id },
Expand Down
3 changes: 3 additions & 0 deletions 3 apps/webapp/app/v3/services/replayTaskRun.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type OverrideOptions = {
payload?: string;
metadata?: unknown;
bulkActionId?: string;
triggerSource?: string;
} & RunOptionsData;

export class ReplayTaskRunService extends BaseService {
Expand Down Expand Up @@ -123,6 +124,8 @@ export class ReplayTaskRunService extends BaseService {
realtimeStreamsVersion: determineRealtimeStreamsVersion(
existingTaskRun.realtimeStreamsVersion
),
triggerSource: overrideOptions.triggerSource ?? "api",
triggerAction: "replay",
}
);

Expand Down
51 changes: 29 additions & 22 deletions 51 apps/webapp/app/v3/services/testTask.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,35 @@ export class TestTaskService extends BaseService {

switch (triggerSource) {
case "STANDARD": {
const result = await triggerTaskService.call(data.taskIdentifier, environment, {
payload: data.payload,
options: {
test: true,
metadata: data.metadata,
delay: data.delaySeconds ? new Date(Date.now() + data.delaySeconds * 1000) : undefined,
ttl: data.ttlSeconds,
idempotencyKey: data.idempotencyKey,
idempotencyKeyTTL: data.idempotencyKeyTTLSeconds
? `${data.idempotencyKeyTTLSeconds}s`
: undefined,
queue: data.queue ? { name: data.queue } : undefined,
concurrencyKey: data.concurrencyKey,
maxAttempts: data.maxAttempts,
maxDuration: data.maxDurationSeconds,
tags: data.tags,
machine: data.machine,
region: data.region,
lockToVersion: data.version === "latest" ? undefined : data.version,
priority: data.prioritySeconds,
const result = await triggerTaskService.call(
data.taskIdentifier,
environment,
{
payload: data.payload,
options: {
test: true,
metadata: data.metadata,
delay: data.delaySeconds
? new Date(Date.now() + data.delaySeconds * 1000)
: undefined,
ttl: data.ttlSeconds,
idempotencyKey: data.idempotencyKey,
idempotencyKeyTTL: data.idempotencyKeyTTLSeconds
? `${data.idempotencyKeyTTLSeconds}s`
: undefined,
queue: data.queue ? { name: data.queue } : undefined,
concurrencyKey: data.concurrencyKey,
maxAttempts: data.maxAttempts,
maxDuration: data.maxDurationSeconds,
tags: data.tags,
machine: data.machine,
region: data.region,
lockToVersion: data.version === "latest" ? undefined : data.version,
priority: data.prioritySeconds,
},
},
});
{ triggerSource: "dashboard", triggerAction: "test" }
);

return result?.run;
}
Expand Down Expand Up @@ -72,7 +79,7 @@ export class TestTaskService extends BaseService {
priority: data.prioritySeconds,
},
},
{ customIcon: "scheduled" }
{ customIcon: "scheduled", triggerSource: "dashboard", triggerAction: "test" }
);

return result?.run;
Expand Down
2 changes: 2 additions & 0 deletions 2 apps/webapp/app/v3/services/triggerTask.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ export type TriggerTaskServiceOptions = {
replayedFromTaskRunFriendlyId?: string;
planType?: string;
realtimeStreamsVersion?: "v1" | "v2";
triggerSource?: string;
triggerAction?: string;
};

export class OutOfEntitlementError extends Error {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- AlterTable
ALTER TABLE "public"."TaskRun" ADD COLUMN "annotations" JSONB;
3 changes: 3 additions & 0 deletions 3 internal-packages/database/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -837,6 +837,9 @@ model TaskRun {
metadataType String @default("application/json")
metadataVersion Int @default(1)

/// Structured annotations: triggerSource, triggerAction, rootTriggerSource, rootScheduleId
annotations Json?

/// Run output
output String?
outputType String @default("application/json")
Expand Down
1 change: 1 addition & 0 deletions 1 internal-packages/run-engine/src/batch-queue/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ export class BatchQueue {
realtimeStreamsVersion: options.realtimeStreamsVersion,
idempotencyKey: options.idempotencyKey,
processingConcurrency: options.processingConcurrency,
triggerSource: options.triggerSource,
};

// Store metadata in completion tracker
Expand Down
Loading
Loading
Morty Proxy This is a proxified and sanitized view of the page, visit original site.