Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions 5 .changeset/tiny-buckets-teach.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"trigger.dev": patch
---

Fix stalled run detection
2 changes: 1 addition & 1 deletion 2 packages/cli-v3/src/entryPoints/managed-run-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,7 @@ const heartbeatInterval = parseInt(heartbeatIntervalMs ?? "30000", 10);
for await (const _ of setInterval(heartbeatInterval)) {
if (_isRunning && _execution) {
try {
await zodIpc.send("TASK_HEARTBEAT", { id: _execution.attempt.id });
await zodIpc.send("TASK_HEARTBEAT", { id: _execution.run.id });
} catch (err) {
console.error("Failed to send HEARTBEAT message", err);
}
Expand Down
2 changes: 1 addition & 1 deletion 2 packages/cli-v3/src/entryPoints/managed/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ const Env = z.object({
TRIGGER_SUPERVISOR_API_DOMAIN: z.string(),
TRIGGER_SUPERVISOR_API_PORT: z.coerce.number(),
TRIGGER_WORKER_INSTANCE_NAME: z.string(),
TRIGGER_HEARTBEAT_INTERVAL_SECONDS: z.coerce.number().default(30),
TRIGGER_HEARTBEAT_INTERVAL_SECONDS: z.coerce.number().default(20),
TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS: z.coerce.number().default(5),
TRIGGER_SUCCESS_EXIT_CODE: z.coerce.number().default(0),
TRIGGER_FAILURE_EXIT_CODE: z.coerce.number().default(1),
Expand Down
67 changes: 51 additions & 16 deletions 67 packages/cli-v3/src/entryPoints/managed/execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import { RunLogger, SendDebugLogOptions } from "./logger.js";
import { RunnerEnv } from "./env.js";
import { WorkloadHttpClient } from "@trigger.dev/core/v3/workers";
import { setTimeout as sleep } from "timers/promises";
import { RunExecutionHeartbeat } from "./heartbeat.js";
import { RunExecutionSnapshotPoller } from "./poller.js";
import { assertExhaustive, tryCatch } from "@trigger.dev/core/utils";
import { MetadataClient } from "./overrides.js";
Expand Down Expand Up @@ -63,9 +62,10 @@ export class RunExecution {
private restoreCount: number;

private taskRunProcess?: TaskRunProcess;
private runHeartbeat?: RunExecutionHeartbeat;
private snapshotPoller?: RunExecutionSnapshotPoller;

private lastHeartbeat?: Date;

constructor(opts: RunExecutionOptions) {
this.id = randomBytes(4).toString("hex");
this.workerManifest = opts.workerManifest;
Expand Down Expand Up @@ -105,11 +105,12 @@ export class RunExecution {
envVars: Record<string, string>;
isWarmStart?: boolean;
}) {
return new TaskRunProcess({
const taskRunProcess = new TaskRunProcess({
workerManifest: this.workerManifest,
env: {
...envVars,
...this.env.gatherProcessEnv(),
HEARTBEAT_INTERVAL_MS: String(this.env.TRIGGER_HEARTBEAT_INTERVAL_SECONDS * 1000),
},
serverWorker: {
id: "managed",
Expand All @@ -123,6 +124,29 @@ export class RunExecution {
},
isWarmStart,
}).initialize();

taskRunProcess.onTaskRunHeartbeat.attach(async (runId) => {
if (!this.runFriendlyId) {
this.sendDebugLog("onTaskRunHeartbeat: missing run ID", { heartbeatRunId: runId });
return;
}

if (runId !== this.runFriendlyId) {
this.sendDebugLog("onTaskRunHeartbeat: mismatched run ID", {
heartbeatRunId: runId,
expectedRunId: this.runFriendlyId,
});
return;
}

const [error] = await tryCatch(this.onHeartbeat());

if (error) {
this.sendDebugLog("onTaskRunHeartbeat: failed", { error: error.message });
}
});

return taskRunProcess;
}

/**
Expand Down Expand Up @@ -229,7 +253,6 @@ export class RunExecution {
this.currentSnapshotId = snapshot.friendlyId;

// Update services
this.runHeartbeat?.updateSnapshotId(snapshot.friendlyId);
this.snapshotPoller?.updateSnapshotId(snapshot.friendlyId);

switch (snapshot.executionStatus) {
Expand Down Expand Up @@ -450,13 +473,6 @@ export class RunExecution {
this.podScheduledAt = runOpts.podScheduledAt;

// Create and start services
this.runHeartbeat = new RunExecutionHeartbeat({
runFriendlyId: this.runFriendlyId,
snapshotFriendlyId: this.currentSnapshotId,
httpClient: this.httpClient,
logger: this.logger,
heartbeatIntervalSeconds: this.env.TRIGGER_HEARTBEAT_INTERVAL_SECONDS,
});
this.snapshotPoller = new RunExecutionSnapshotPoller({
runFriendlyId: this.runFriendlyId,
snapshotFriendlyId: this.currentSnapshotId,
Expand All @@ -466,7 +482,6 @@ export class RunExecution {
handleSnapshotChange: this.handleSnapshotChange.bind(this),
});

this.runHeartbeat.start();
this.snapshotPoller.start();

const [startError, start] = await tryCatch(
Expand Down Expand Up @@ -839,9 +854,6 @@ export class RunExecution {
this.env.override(overrides);

// Update services with new values
if (overrides.TRIGGER_HEARTBEAT_INTERVAL_SECONDS) {
this.runHeartbeat?.updateInterval(this.env.TRIGGER_HEARTBEAT_INTERVAL_SECONDS * 1000);
}
if (overrides.TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS) {
this.snapshotPoller?.updateInterval(this.env.TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS * 1000);
}
Expand All @@ -857,6 +869,28 @@ export class RunExecution {
}
}

private async onHeartbeat() {
if (!this.runFriendlyId) {
this.sendDebugLog("Heartbeat: missing run ID");
return;
}

if (!this.currentSnapshotId) {
this.sendDebugLog("Heartbeat: missing snapshot ID");
return;
}

this.sendDebugLog("Heartbeat: started");

const response = await this.httpClient.heartbeatRun(this.runFriendlyId, this.currentSnapshotId);

if (!response.success) {
this.sendDebugLog("Heartbeat: failed", { error: response.error });
}

this.lastHeartbeat = new Date();
}

sendDebugLog(
message: string,
properties?: SendDebugLogOptions["properties"],
Expand All @@ -871,6 +905,7 @@ export class RunExecution {
snapshotId: this.currentSnapshotId,
executionId: this.id,
executionRestoreCount: this.restoreCount,
lastHeartbeat: this.lastHeartbeat?.toISOString(),
},
});
}
Expand Down Expand Up @@ -917,7 +952,7 @@ export class RunExecution {
}

private stopServices() {
this.runHeartbeat?.stop();
this.snapshotPoller?.stop();
this.taskRunProcess?.onTaskRunHeartbeat.detach();
}
}
92 changes: 0 additions & 92 deletions 92 packages/cli-v3/src/entryPoints/managed/heartbeat.ts

This file was deleted.

Morty Proxy This is a proxified and sanitized view of the page, visit original site.