Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions 5 .changeset/wet-dragons-boil.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"trigger.dev": patch
---

Serialize metadata to prevent invalid metadata from breaking run completions
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { startSpan } from "@internal/tracing";
import {
CompleteRunAttemptResult,
ExecutionResult,
FlushedRunMetadata,
GitMeta,
StartRunAttemptResult,
TaskRunError,
Expand Down Expand Up @@ -35,6 +36,7 @@ import {
import { ReleaseConcurrencySystem } from "./releaseConcurrencySystem.js";
import { SystemResources } from "./systems.js";
import { WaitpointSystem } from "./waitpointSystem.js";
import { tryCatch } from "@trigger.dev/core/utils";

export type RunAttemptSystemOptions = {
resources: SystemResources;
Expand Down Expand Up @@ -386,15 +388,7 @@ export class RunAttemptSystem {
workerId?: string;
runnerId?: string;
}): Promise<CompleteRunAttemptResult> {
if (completion.metadata) {
this.$.eventBus.emit("runMetadataUpdated", {
time: new Date(),
run: {
id: runId,
metadata: completion.metadata,
},
});
}
await this.#notifyMetadataUpdated(runId, completion);

switch (completion.ok) {
case true: {
Expand Down Expand Up @@ -1314,4 +1308,56 @@ export class RunAttemptSystem {

return taskRun?.runtimeEnvironment;
}

async #notifyMetadataUpdated(runId: string, completion: TaskRunExecutionResult) {
if (completion.metadata) {
this.$.eventBus.emit("runMetadataUpdated", {
time: new Date(),
run: {
id: runId,
metadata: completion.metadata,
},
});

return;
}

if (completion.flushedMetadata) {
const [packetError, packet] = await tryCatch(parsePacket(completion.flushedMetadata));

if (!packet) {
return;
}

if (packetError) {
this.$.logger.error("RunEngine.completeRunAttempt(): failed to parse flushed metadata", {
runId,
flushedMetadata: completion.flushedMetadata,
error: packetError,
});

return;
}

const metadata = FlushedRunMetadata.safeParse(packet);

if (!metadata.success) {
this.$.logger.error("RunEngine.completeRunAttempt(): failed to parse flushed metadata", {
runId,
flushedMetadata: completion.flushedMetadata,
error: metadata.error,
});

return;
}

this.$.eventBus.emit("runMetadataUpdated", {
time: new Date(),
run: {
id: runId,
metadata: metadata.data,
},
});
}
}
}
12 changes: 6 additions & 6 deletions 12 packages/cli-v3/src/entryPoints/dev-run-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ const zodIpc = new ZodIpcConnection({
usage: {
durationMs: 0,
},
metadata: runMetadataManager.stopAndReturnLastFlush(),
flushedMetadata: await runMetadataManager.stopAndReturnLastFlush(),
},
});

Expand Down Expand Up @@ -382,7 +382,7 @@ const zodIpc = new ZodIpcConnection({
usage: {
durationMs: 0,
},
metadata: runMetadataManager.stopAndReturnLastFlush(),
flushedMetadata: await runMetadataManager.stopAndReturnLastFlush(),
},
});

Expand Down Expand Up @@ -447,7 +447,7 @@ const zodIpc = new ZodIpcConnection({
usage: {
durationMs: 0,
},
metadata: runMetadataManager.stopAndReturnLastFlush(),
flushedMetadata: await runMetadataManager.stopAndReturnLastFlush(),
},
});

Expand All @@ -473,7 +473,7 @@ const zodIpc = new ZodIpcConnection({
usage: {
durationMs: 0,
},
metadata: runMetadataManager.stopAndReturnLastFlush(),
flushedMetadata: await runMetadataManager.stopAndReturnLastFlush(),
},
});

Expand Down Expand Up @@ -518,7 +518,7 @@ const zodIpc = new ZodIpcConnection({
usage: {
durationMs: usageSample.cpuTime,
},
metadata: runMetadataManager.stopAndReturnLastFlush(),
flushedMetadata: await runMetadataManager.stopAndReturnLastFlush(),
},
});
}
Expand All @@ -544,7 +544,7 @@ const zodIpc = new ZodIpcConnection({
usage: {
durationMs: 0,
},
metadata: runMetadataManager.stopAndReturnLastFlush(),
flushedMetadata: await runMetadataManager.stopAndReturnLastFlush(),
},
});
}
Expand Down
12 changes: 6 additions & 6 deletions 12 packages/cli-v3/src/entryPoints/managed-run-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ const zodIpc = new ZodIpcConnection({
usage: {
durationMs: 0,
},
metadata: runMetadataManager.stopAndReturnLastFlush(),
flushedMetadata: await runMetadataManager.stopAndReturnLastFlush(),
},
});

Expand Down Expand Up @@ -381,7 +381,7 @@ const zodIpc = new ZodIpcConnection({
usage: {
durationMs: 0,
},
metadata: runMetadataManager.stopAndReturnLastFlush(),
flushedMetadata: await runMetadataManager.stopAndReturnLastFlush(),
},
});

Expand Down Expand Up @@ -444,7 +444,7 @@ const zodIpc = new ZodIpcConnection({
usage: {
durationMs: 0,
},
metadata: runMetadataManager.stopAndReturnLastFlush(),
flushedMetadata: await runMetadataManager.stopAndReturnLastFlush(),
},
});

Expand Down Expand Up @@ -472,7 +472,7 @@ const zodIpc = new ZodIpcConnection({
usage: {
durationMs: 0,
},
metadata: runMetadataManager.stopAndReturnLastFlush(),
flushedMetadata: await runMetadataManager.stopAndReturnLastFlush(),
},
});

Expand Down Expand Up @@ -517,7 +517,7 @@ const zodIpc = new ZodIpcConnection({
usage: {
durationMs: usageSample.cpuTime,
},
metadata: runMetadataManager.stopAndReturnLastFlush(),
flushedMetadata: await runMetadataManager.stopAndReturnLastFlush(),
},
});
}
Expand All @@ -544,7 +544,7 @@ const zodIpc = new ZodIpcConnection({
usage: {
durationMs: 0,
},
metadata: runMetadataManager.stopAndReturnLastFlush(),
flushedMetadata: await runMetadataManager.stopAndReturnLastFlush(),
},
});
}
Expand Down
11 changes: 8 additions & 3 deletions 11 packages/core/src/v3/runMetadata/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { MetadataStream } from "./metadataStream.js";
import { applyMetadataOperations, collapseOperations } from "./operations.js";
import { RunMetadataManager, RunMetadataUpdater } from "./types.js";
import { AsyncIterableStream } from "../streams/asyncIterableStream.js";
import { IOPacket, stringifyIO } from "../utils/ioSerialization.js";

const MAXIMUM_ACTIVE_STREAMS = 5;
const MAXIMUM_TOTAL_STREAMS = 10;
Expand Down Expand Up @@ -422,23 +423,27 @@ export class StandardMetadataManager implements RunMetadataManager {
}
}

stopAndReturnLastFlush(): FlushedRunMetadata | undefined {
async stopAndReturnLastFlush(): Promise<IOPacket> {
this.stopPeriodicFlush();
this.isFlushing = true;

if (!this.#needsFlush()) {
return;
return { dataType: "application/json" };
}

const operations = Array.from(this.queuedOperations);
const parentOperations = Array.from(this.queuedParentOperations);
const rootOperations = Array.from(this.queuedRootOperations);

return {
const data = {
operations: collapseOperations(operations),
parentOperations: collapseOperations(parentOperations),
rootOperations: collapseOperations(rootOperations),
};

const packet = await stringifyIO(data);

return packet;
}

#needsFlush(): boolean {
Expand Down
16 changes: 16 additions & 0 deletions 16 packages/core/src/v3/schemas/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,15 @@ export const TaskRunFailedExecutionResult = z.object({
usage: TaskRunExecutionUsage.optional(),
// Optional for now for backwards compatibility
taskIdentifier: z.string().optional(),
// This is deprecated, use flushedMetadata instead
metadata: FlushedRunMetadata.optional(),
// This is the new way to flush metadata
flushedMetadata: z
.object({
data: z.string().optional(),
dataType: z.string(),
})
.optional(),
});

export type TaskRunFailedExecutionResult = z.infer<typeof TaskRunFailedExecutionResult>;
Expand All @@ -389,7 +397,15 @@ export const TaskRunSuccessfulExecutionResult = z.object({
usage: TaskRunExecutionUsage.optional(),
// Optional for now for backwards compatibility
taskIdentifier: z.string().optional(),
// This is deprecated, use flushedMetadata instead
metadata: FlushedRunMetadata.optional(),
// This is the new way to flush metadata
flushedMetadata: z
.object({
data: z.string().optional(),
dataType: z.string(),
})
.optional(),
});

export type TaskRunSuccessfulExecutionResult = z.infer<typeof TaskRunSuccessfulExecutionResult>;
Expand Down
16 changes: 16 additions & 0 deletions 16 references/hello-world/src/trigger/metadata.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { metadata, task } from "@trigger.dev/sdk";

export const metadataTestTask = task({
id: "metadata-tester",
retry: {
maxAttempts: 3,
minTimeoutInMs: 500,
maxTimeoutInMs: 1000,
factor: 1.5,
},
run: async (payload: any, { ctx }) => {
metadata.set("test-key", "test-value");
metadata.append("test-keys", "test-value");
metadata.increment("test-counter", 1);
},
});
Morty Proxy This is a proxified and sanitized view of the page, visit original site.