Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions 6 .changeset/small-dancers-smell.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@trigger.dev/sdk": patch
"@trigger.dev/core": patch
---

Improve metadata flushing efficiency by collapsing operations
2 changes: 1 addition & 1 deletion 2 apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ const { action } = createActionApiRoute(
{
params: ParamsSchema,
body: UpdateMetadataRequestBody,
maxContentLength: 1024 * 1024, // 1MB
maxContentLength: 1024 * 1024 * 2, // 2MB
method: "PUT",
},
async ({ authentication, body, params }) => {
Expand Down
1 change: 1 addition & 0 deletions 1 apps/webapp/app/services/metadata/updateMetadata.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ export class UpdateMetadataService extends BaseService {
`[UpdateMetadataService][updateRunMetadataWithOperations] Updated metadata for run ${runId}`,
{
metadata: applyResults.newMetadata,
operations: operations,
}
);
}
Expand Down
30 changes: 21 additions & 9 deletions 30 packages/core/src/v3/runMetadata/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { ApiClient } from "../apiClient/index.js";
import { FlushedRunMetadata, RunMetadataChangeOperation } from "../schemas/common.js";
import { ApiRequestOptions } from "../zodfetch.js";
import { MetadataStream } from "./metadataStream.js";
import { applyMetadataOperations } from "./operations.js";
import { applyMetadataOperations, collapseOperations } from "./operations.js";
import { RunMetadataManager, RunMetadataUpdater } from "./types.js";
import { AsyncIterableStream } from "../streams/asyncIterableStream.js";

Expand Down Expand Up @@ -33,7 +33,7 @@ export class StandardMetadataManager implements RunMetadataManager {
get parent(): RunMetadataUpdater {
// Store a reference to 'this' to ensure proper context
const self = this;

// Create the updater object and store it in a local variable
const parentUpdater: RunMetadataUpdater = {
set: (key, value) => {
Expand Down Expand Up @@ -66,14 +66,14 @@ export class StandardMetadataManager implements RunMetadataManager {
},
stream: (key, value, signal) => self.doStream(key, value, "parent", parentUpdater, signal),
};

return parentUpdater;
}

get root(): RunMetadataUpdater {
// Store a reference to 'this' to ensure proper context
const self = this;

// Create the updater object and store it in a local variable
const rootUpdater: RunMetadataUpdater = {
set: (key, value) => {
Expand Down Expand Up @@ -106,7 +106,7 @@ export class StandardMetadataManager implements RunMetadataManager {
},
stream: (key, value, signal) => self.doStream(key, value, "root", rootUpdater, signal),
};

return rootUpdater;
}

Expand Down Expand Up @@ -353,9 +353,17 @@ export class StandardMetadataManager implements RunMetadataManager {
this.queuedRootOperations.clear();

try {
const collapsedOperations = collapseOperations(operations);
const collapsedParentOperations = collapseOperations(parentOperations);
const collapsedRootOperations = collapseOperations(rootOperations);

const response = await this.apiClient.updateRunMetadata(
this.runId,
{ operations, parentOperations, rootOperations },
{
operations: collapsedOperations,
parentOperations: collapsedParentOperations,
rootOperations: collapsedRootOperations,
},
requestOptions
);

Expand Down Expand Up @@ -406,10 +414,14 @@ export class StandardMetadataManager implements RunMetadataManager {
return;
}

const operations = Array.from(this.queuedOperations);
const parentOperations = Array.from(this.queuedParentOperations);
const rootOperations = Array.from(this.queuedRootOperations);

return {
operations: Array.from(this.queuedOperations),
parentOperations: Array.from(this.queuedParentOperations),
rootOperations: Array.from(this.queuedRootOperations),
operations: collapseOperations(operations),
parentOperations: collapseOperations(parentOperations),
rootOperations: collapseOperations(rootOperations),
};
}

Expand Down
107 changes: 107 additions & 0 deletions 107 packages/core/src/v3/runMetadata/operations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,3 +128,110 @@ export function applyMetadataOperations(

return { newMetadata, unappliedOperations };
}

/**
* Collapses metadata operations to reduce payload size and avoid 413 "Request Entity Too Large" errors.
*
* When there are many operations queued up (e.g., 10k increment operations), sending them all
* individually can result in request payloads exceeding the server's 1MB limit. This function
* intelligently combines operations where possible to reduce the payload size:
*
* - **Increment operations**: Multiple increments on the same key are summed into a single increment
* - Example: increment("counter", 1) + increment("counter", 2) → increment("counter", 3)
*
* - **Set operations**: Multiple sets on the same key keep only the last one (since later sets override earlier ones)
* - Example: set("status", "processing") + set("status", "done") → set("status", "done")
*
* - **Delete operations**: Multiple deletes on the same key keep only one (duplicates are redundant)
* - Example: del("temp") + del("temp") → del("temp")
*
* - **Append, remove, and update operations**: Preserved as-is to maintain correctness since order matters
*
* @param operations Array of metadata change operations to collapse
* @returns Collapsed array with fewer operations that produce the same final result
*
* @example
* ```typescript
* const operations = [
* { type: "increment", key: "counter", value: 1 },
* { type: "increment", key: "counter", value: 2 },
* { type: "set", key: "status", value: "processing" },
* { type: "set", key: "status", value: "done" }
* ];
*
* const collapsed = collapseOperations(operations);
* // Result: [
* // { type: "increment", key: "counter", value: 3 },
* // { type: "set", key: "status", value: "done" }
* // ]
* ```
*/
export function collapseOperations(
operations: RunMetadataChangeOperation[]
): RunMetadataChangeOperation[] {
if (operations.length === 0) {
return operations;
}

// Maps to track collapsible operations
const incrementsByKey = new Map<string, number>();
const setsByKey = new Map<string, RunMetadataChangeOperation>();
const deletesByKey = new Set<string>();
const preservedOperations: RunMetadataChangeOperation[] = [];

// Process operations in order
for (const operation of operations) {
switch (operation.type) {
case "increment": {
const currentIncrement = incrementsByKey.get(operation.key) || 0;
incrementsByKey.set(operation.key, currentIncrement + operation.value);
Comment thread
coderabbitai[bot] marked this conversation as resolved.
break;
}
case "set": {
// Keep only the last set operation for each key
setsByKey.set(operation.key, operation);
break;
}
case "delete": {
// Keep only one delete operation per key
deletesByKey.add(operation.key);
break;
}
case "append":
case "remove":
case "update": {
// Preserve these operations as-is to maintain correctness
preservedOperations.push(operation);
break;
}
default: {
// Handle any future operation types by preserving them
preservedOperations.push(operation);
break;
}
}
}

// Build the collapsed operations array
const collapsedOperations: RunMetadataChangeOperation[] = [];

// Add collapsed increment operations
for (const [key, value] of incrementsByKey) {
collapsedOperations.push({ type: "increment", key, value });
}

// Add collapsed set operations
for (const operation of setsByKey.values()) {
collapsedOperations.push(operation);
}

// Add collapsed delete operations
for (const key of deletesByKey) {
collapsedOperations.push({ type: "delete", key });
}

// Add preserved operations
collapsedOperations.push(...preservedOperations);

return collapsedOperations;
}
Loading
Morty Proxy This is a proxified and sanitized view of the page, visit original site.