Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions 6 .changeset/strange-moles-provide.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"trigger.dev": patch
"@trigger.dev/core": patch
---

Auto-cancel in-flight dev runs when the CLI exits, using a detached watchdog process that survives pnpm SIGKILL
6 changes: 6 additions & 0 deletions 6 .server-changes/dev-cli-disconnect-md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: feature
---

Added `/engine/v1/dev/disconnect` endpoint to auto-cancel runs when the CLI disconnects. Maximum of 500 runs can be cancelled. Uses the bulk action system when there are more than 25 runs to cancel.
180 changes: 180 additions & 0 deletions 180 apps/webapp/app/routes/engine.v1.dev.disconnect.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
import { json } from "@remix-run/server-runtime";
import { Ratelimit } from "@upstash/ratelimit";
import { tryCatch } from "@trigger.dev/core";
import { DevDisconnectRequestBody } from "@trigger.dev/core/v3";
import { BulkActionId, RunId } from "@trigger.dev/core/v3/isomorphic";
import { BulkActionNotificationType, BulkActionType } from "@trigger.dev/database";
import { prisma } from "~/db.server";
import { logger } from "~/services/logger.server";
import { RateLimiter } from "~/services/rateLimiter.server";
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
import { CancelTaskRunService } from "~/v3/services/cancelTaskRun.server";
import { commonWorker } from "~/v3/commonWorker.server";
import pMap from "p-map";

const CANCEL_REASON = "Dev session ended (CLI exited)";

// Below this threshold, cancel runs inline with pMap.
// Above it, create a bulk action and process asynchronously.
const BULK_ACTION_THRESHOLD = 25;

// Maximum number of runs that can be cancelled in a single disconnect call.
const MAX_RUNS = 500;

// Rate limit: 5 calls per minute per environment
const disconnectRateLimiter = new RateLimiter({
keyPrefix: "dev-disconnect",
limiter: Ratelimit.fixedWindow(5, "1 m"),
logFailure: true,
});

const { action } = createActionApiRoute(
{
body: DevDisconnectRequestBody,
maxContentLength: 1024 * 256, // 256KB
method: "POST",
},
async ({ authentication, body }) => {
// Only allow dev environments — this endpoint uses finalizeRun which
// skips PENDING_CANCEL and immediately finalizes executing runs.
if (authentication.environment.type !== "DEVELOPMENT") {
return json({ error: "This endpoint is only available for dev environments" }, { status: 403 });
}

const environmentId = authentication.environment.id;

// Rate limit per environment
const rateLimitResult = await disconnectRateLimiter.limit(environmentId);
if (!rateLimitResult.success) {
return json(
{ error: "Rate limit exceeded", retryAfter: Math.ceil((rateLimitResult.reset - Date.now()) / 1000) },
{ status: 429 }
);
}

if (body.runFriendlyIds.length > MAX_RUNS) {
return json(
{ error: `A maximum of ${MAX_RUNS} runs can be cancelled per request` },
{ status: 400 }
);
}

const { runFriendlyIds } = body;

if (runFriendlyIds.length === 0) {
return json({ cancelled: 0 }, { status: 200 });
}

logger.info("Dev disconnect: cancelling runs", {
environmentId,
runCount: runFriendlyIds.length,
});

// For small numbers of runs, cancel inline
if (runFriendlyIds.length <= BULK_ACTION_THRESHOLD) {
const cancelled = await cancelRunsInline(runFriendlyIds, environmentId);
return json({ cancelled }, { status: 200 });
}

// For large numbers, create a bulk action to process asynchronously
const bulkActionId = await createBulkCancelAction(
runFriendlyIds,
authentication.environment.project.id,
environmentId
);

logger.info("Dev disconnect: created bulk action for large run set", {
environmentId,
bulkActionId,
runCount: runFriendlyIds.length,
});

return json({ cancelled: 0, bulkActionId }, { status: 200 });
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}
);
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.

async function cancelRunsInline(
runFriendlyIds: string[],
environmentId: string
): Promise<number> {
const runIds = runFriendlyIds.map((fid) => RunId.toId(fid));

const runs = await prisma.taskRun.findMany({
where: {
id: { in: runIds },
runtimeEnvironmentId: environmentId,
},
select: {
id: true,
engine: true,
friendlyId: true,
status: true,
createdAt: true,
completedAt: true,
taskEventStore: true,
},
});

let cancelled = 0;
const cancelService = new CancelTaskRunService(prisma);

await pMap(
runs,
async (run) => {
const [error, result] = await tryCatch(
cancelService.call(run, { reason: CANCEL_REASON, finalizeRun: true })
);

if (error) {
logger.error("Dev disconnect: failed to cancel run", {
runId: run.id,
error,
});
} else if (result && !result.alreadyFinished) {
cancelled++;
}
},
{ concurrency: 10 }
);

logger.info("Dev disconnect: completed inline cancellation", {
environmentId,
cancelled,
total: runFriendlyIds.length,
});

return cancelled;
}

async function createBulkCancelAction(
runFriendlyIds: string[],
projectId: string,
environmentId: string
): Promise<string> {
const { id, friendlyId } = BulkActionId.generate();

await prisma.bulkActionGroup.create({
data: {
id,
friendlyId,
projectId,
environmentId,
name: "Dev session disconnect",
type: BulkActionType.CANCEL,
params: { runId: runFriendlyIds, finalizeRun: true },
queryName: "bulk_action_v1",
totalCount: runFriendlyIds.length,
completionNotification: BulkActionNotificationType.NONE,
},
});

await commonWorker.enqueue({
id: `processBulkAction-${id}`,
job: "processBulkAction",
payload: { bulkActionId: id },
});

return friendlyId;
}

export { action };
5 changes: 4 additions & 1 deletion 5 apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,13 @@ export class BulkActionService extends BaseService {
}

// 2. Parse the params
const rawParams = group.params && typeof group.params === "object" ? group.params : {};
const finalizeRun = "finalizeRun" in rawParams && (rawParams as any).finalizeRun === true;
const filters = parseRunListInputOptions({
organizationId: group.project.organizationId,
projectId: group.projectId,
environmentId: group.environmentId,
...(group.params && typeof group.params === "object" ? group.params : {}),
...rawParams,
});

const runsRepository = new RunsRepository({
Expand Down Expand Up @@ -199,6 +201,7 @@ export class BulkActionService extends BaseService {
cancelService.call(run, {
reason: `Bulk action ${group.friendlyId} cancelled run`,
bulkActionId: bulkActionId,
finalizeRun,
})
);
if (error) {
Expand Down
3 changes: 3 additions & 0 deletions 3 apps/webapp/app/v3/services/cancelTaskRun.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ export type CancelTaskRunServiceOptions = {
cancelAttempts?: boolean;
cancelledAt?: Date;
bulkActionId?: string;
/** Skip PENDING_CANCEL and finalize immediately (use when the worker is known to be dead). */
finalizeRun?: boolean;
};

type CancelTaskRunServiceResult = {
Expand Down Expand Up @@ -57,6 +59,7 @@ export class CancelTaskRunService extends BaseService {
runId: taskRun.id,
completedAt: options?.cancelledAt,
reason: options?.reason,
finalizeRun: options?.finalizeRun,
bulkActionId: options?.bulkActionId,
tx: this._prisma,
});
Expand Down
52 changes: 28 additions & 24 deletions 52 internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1436,35 +1436,39 @@ export class RunAttemptSystem {
});

//if executing, we need to message the worker to cancel the run and put it into `PENDING_CANCEL` status
//unless finalizeRun is true (worker is known to be dead), in which case skip straight to FINISHED
if (
isExecuting(latestSnapshot.executionStatus) ||
isPendingExecuting(latestSnapshot.executionStatus)
) {
const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot(prisma, {
run,
snapshot: {
executionStatus: "PENDING_CANCEL",
description: "Run was cancelled",
},
previousSnapshotId: latestSnapshot.id,
environmentId: latestSnapshot.environmentId,
environmentType: latestSnapshot.environmentType,
projectId: latestSnapshot.projectId,
organizationId: latestSnapshot.organizationId,
workerId,
runnerId,
});
if (!finalizeRun) {
const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot(prisma, {
run,
snapshot: {
executionStatus: "PENDING_CANCEL",
description: "Run was cancelled",
},
previousSnapshotId: latestSnapshot.id,
environmentId: latestSnapshot.environmentId,
environmentType: latestSnapshot.environmentType,
projectId: latestSnapshot.projectId,
organizationId: latestSnapshot.organizationId,
workerId,
runnerId,
});

//the worker needs to be notified so it can kill the run and complete the attempt
await sendNotificationToWorker({
runId,
snapshot: newSnapshot,
eventBus: this.$.eventBus,
});
return {
alreadyFinished: false,
...executionResultFromSnapshot(newSnapshot),
};
//the worker needs to be notified so it can kill the run and complete the attempt
await sendNotificationToWorker({
runId,
snapshot: newSnapshot,
eventBus: this.$.eventBus,
});
return {
alreadyFinished: false,
...executionResultFromSnapshot(newSnapshot),
};
}
// finalizeRun is true — fall through to finish the run immediately
Comment thread
ericallam marked this conversation as resolved.
}

//not executing, so we will actually finish the run
Expand Down
20 changes: 20 additions & 0 deletions 20 packages/cli-v3/src/apiClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import {
DevConfigResponseBody,
DevDequeueRequestBody,
DevDequeueResponseBody,
DevDisconnectRequestBody,
DevDisconnectResponseBody,
EnvironmentVariableResponseBody,
FailDeploymentRequestBody,
FailDeploymentResponseBody,
Expand Down Expand Up @@ -557,6 +559,7 @@ export class CliApiClient {
heartbeatRun: this.devHeartbeatRun.bind(this),
startRunAttempt: this.devStartRunAttempt.bind(this),
completeRunAttempt: this.devCompleteRunAttempt.bind(this),
disconnect: this.devDisconnect.bind(this),
setEngineURL: this.setEngineURL.bind(this),
} as const;
}
Expand Down Expand Up @@ -681,6 +684,23 @@ export class CliApiClient {
return eventSource;
}

private async devDisconnect(
body: DevDisconnectRequestBody
): Promise<ApiResult<DevDisconnectResponseBody>> {
if (!this.accessToken) {
throw new Error("devDisconnect: No access token");
}

return wrapZodFetch(DevDisconnectResponseBody, `${this.engineURL}/engine/v1/dev/disconnect`, {
method: "POST",
headers: {
Authorization: `Bearer ${this.accessToken}`,
Accept: "application/json",
},
body: JSON.stringify(body),
});
}

private async devDequeue(
body: DevDequeueRequestBody
): Promise<ApiResult<DevDequeueResponseBody>> {
Expand Down
Loading
Morty Proxy This is a proxified and sanitized view of the page, visit original site.