Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions 5 .changeset/real-rats-drop.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@trigger.dev/sdk": patch
---

Add onCancel lifecycle hook
1 change: 1 addition & 0 deletions 1 apps/webapp/app/components/runs/v3/RunIcon.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ export function RunIcon({ name, className, spanName }: TaskIconProps) {
case "task-hook-onResume":
case "task-hook-onComplete":
case "task-hook-cleanup":
case "task-hook-onCancel":
Comment thread
ericallam marked this conversation as resolved.
return <FunctionIcon className={cn(className, "text-text-dimmed")} />;
case "task-hook-onFailure":
case "task-hook-catchError":
Expand Down
23 changes: 0 additions & 23 deletions 23 apps/webapp/app/v3/services/cancelTaskRun.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,29 +47,6 @@ export class CancelTaskRunService extends BaseService {
tx: this._prisma,
});

const inProgressEvents = await eventRepository.queryIncompleteEvents(
getTaskEventStoreTableForRun(taskRun),
{
runId: taskRun.friendlyId,
},
taskRun.createdAt,
taskRun.completedAt ?? undefined
);

logger.debug("Cancelling in-progress events", {
inProgressEvents: inProgressEvents.map((event) => event.id),
});

await Promise.all(
inProgressEvents.map((event) => {
return eventRepository.cancelEvent(
event,
options?.cancelledAt ?? new Date(),
options?.reason ?? "Run cancelled"
);
})
);

return {
id: result.run.id,
};
Expand Down
40 changes: 32 additions & 8 deletions 40 packages/cli-v3/src/entryPoints/dev-run-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import {
TaskRunExecution,
timeout,
TriggerConfig,
UsageMeasurement,
waitUntil,
WorkerManifest,
WorkerToExecutorMessageCatalog,
Expand Down Expand Up @@ -232,7 +233,10 @@ async function bootstrap() {

let _execution: TaskRunExecution | undefined;
let _isRunning = false;
let _isCancelled = false;
let _tracingSDK: TracingSDK | undefined;
let _executionMeasurement: UsageMeasurement | undefined;
const cancelController = new AbortController();

Comment thread
ericallam marked this conversation as resolved.
const zodIpc = new ZodIpcConnection({
listenSchema: WorkerToExecutorMessageCatalog,
Expand Down Expand Up @@ -403,18 +407,17 @@ const zodIpc = new ZodIpcConnection({
getNumberEnvVar("TRIGGER_RUN_METADATA_FLUSH_INTERVAL", 1000)
);

const measurement = usage.start();
_executionMeasurement = usage.start();

// This lives outside of the executor because this will eventually be moved to the controller level
const signal = execution.run.maxDuration
? timeout.abortAfterTimeout(execution.run.maxDuration)
: undefined;
const timeoutController = timeout.abortAfterTimeout(execution.run.maxDuration);

const signal = AbortSignal.any([cancelController.signal, timeoutController.signal]);

const { result } = await executor.execute(execution, metadata, traceContext, signal);

const usageSample = usage.stop(measurement);
if (_isRunning && !_isCancelled) {
const usageSample = usage.stop(_executionMeasurement);

if (_isRunning) {
return sender.send("TASK_RUN_COMPLETED", {
execution,
result: {
Expand Down Expand Up @@ -458,7 +461,16 @@ const zodIpc = new ZodIpcConnection({
WAIT_COMPLETED_NOTIFICATION: async () => {
await managedWorkerRuntime.completeWaitpoints([]);
},
FLUSH: async ({ timeoutInMs }, sender) => {
CANCEL: async ({ timeoutInMs }) => {
_isCancelled = true;
cancelController.abort("run cancelled");
await callCancelHooks(timeoutInMs);
if (_executionMeasurement) {
usage.stop(_executionMeasurement);
}
await flushAll(timeoutInMs);
},
FLUSH: async ({ timeoutInMs }) => {
await flushAll(timeoutInMs);
},
WAITPOINT_CREATED: async ({ wait, waitpoint }) => {
Expand All @@ -470,6 +482,18 @@ const zodIpc = new ZodIpcConnection({
},
});

async function callCancelHooks(timeoutInMs: number = 10_000) {
const now = performance.now();

try {
await Promise.race([lifecycleHooks.callOnCancelHookListeners(), setTimeout(timeoutInMs)]);
Comment thread
ericallam marked this conversation as resolved.
} finally {
const duration = performance.now() - now;

log(`Called cancel hooks in ${duration}ms`);
}
}

async function flushAll(timeoutInMs: number = 10_000) {
const now = performance.now();

Expand Down
38 changes: 31 additions & 7 deletions 38 packages/cli-v3/src/entryPoints/managed-run-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import {
TaskRunExecution,
timeout,
TriggerConfig,
UsageMeasurement,
waitUntil,
WorkerManifest,
WorkerToExecutorMessageCatalog,
Expand Down Expand Up @@ -229,7 +230,10 @@ async function bootstrap() {

let _execution: TaskRunExecution | undefined;
let _isRunning = false;
let _isCancelled = false;
let _tracingSDK: TracingSDK | undefined;
let _executionMeasurement: UsageMeasurement | undefined;
const cancelController = new AbortController();

Comment thread
ericallam marked this conversation as resolved.
const zodIpc = new ZodIpcConnection({
listenSchema: WorkerToExecutorMessageCatalog,
Expand Down Expand Up @@ -398,18 +402,17 @@ const zodIpc = new ZodIpcConnection({
getNumberEnvVar("TRIGGER_RUN_METADATA_FLUSH_INTERVAL", 1000)
);

const measurement = usage.start();
_executionMeasurement = usage.start();

// This lives outside of the executor because this will eventually be moved to the controller level
const signal = execution.run.maxDuration
? timeout.abortAfterTimeout(execution.run.maxDuration)
: undefined;
const timeoutController = timeout.abortAfterTimeout(execution.run.maxDuration);

const signal = AbortSignal.any([cancelController.signal, timeoutController.signal]);

const { result } = await executor.execute(execution, metadata, traceContext, signal);

const usageSample = usage.stop(measurement);
if (_isRunning && !_isCancelled) {
const usageSample = usage.stop(_executionMeasurement);

if (_isRunning) {
return sender.send("TASK_RUN_COMPLETED", {
execution,
result: {
Expand Down Expand Up @@ -454,6 +457,15 @@ const zodIpc = new ZodIpcConnection({
FLUSH: async ({ timeoutInMs }, sender) => {
await flushAll(timeoutInMs);
},
CANCEL: async ({ timeoutInMs }, sender) => {
_isCancelled = true;
cancelController.abort("run cancelled");
await callCancelHooks(timeoutInMs);
if (_executionMeasurement) {
usage.stop(_executionMeasurement);
}
await flushAll(timeoutInMs);
},
WAITPOINT_CREATED: async ({ wait, waitpoint }) => {
managedWorkerRuntime.associateWaitWithWaitpoint(wait.id, waitpoint.id);
},
Expand All @@ -463,6 +475,18 @@ const zodIpc = new ZodIpcConnection({
},
});

async function callCancelHooks(timeoutInMs: number = 10_000) {
const now = performance.now();

try {
await Promise.race([lifecycleHooks.callOnCancelHookListeners(), setTimeout(timeoutInMs)]);
} finally {
const duration = performance.now() - now;

console.log(`Called cancel hooks in ${duration}ms`);
}
}

async function flushAll(timeoutInMs: number = 10_000) {
const now = performance.now();

Expand Down
15 changes: 13 additions & 2 deletions 15 packages/cli-v3/src/executions/taskRunProcess.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,9 @@ export class TaskRunProcess {
this._isBeingCancelled = true;

try {
await this.#flush();
await this.#cancel();
} catch (err) {
console.error("Error flushing task run process", { err });
console.error("Error cancelling task run process", { err });
}

await this.kill();
Expand All @@ -120,6 +120,10 @@ export class TaskRunProcess {
async cleanup(kill = true) {
this._isPreparedForNextRun = false;

if (this._isBeingCancelled) {
return;
}

try {
await this.#flush();
} catch (err) {
Expand Down Expand Up @@ -224,10 +228,17 @@ export class TaskRunProcess {
await this._ipc?.sendWithAck("FLUSH", { timeoutInMs }, timeoutInMs + 1_000);
}

async #cancel(timeoutInMs: number = 30_000) {
logger.debug("sending cancel message to task run process", { pid: this.pid, timeoutInMs });

await this._ipc?.sendWithAck("CANCEL", { timeoutInMs }, timeoutInMs + 1_000);
}

async execute(
params: TaskRunProcessExecuteParams,
isWarmStart?: boolean
): Promise<TaskRunExecutionResult> {
this._isBeingCancelled = false;
Comment thread
ericallam marked this conversation as resolved.
this._isPreparedForNextRun = false;
this._isPreparedForNextAttempt = false;

Expand Down
22 changes: 22 additions & 0 deletions 22 packages/core/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,25 @@ export async function tryCatch<T, E = Error>(
return [error as E, null];
}
}

export type Deferred<T> = {
promise: Promise<T>;
resolve: (value: T) => void;
reject: (reason?: any) => void;
};

export function promiseWithResolvers<T>(): Deferred<T> {
let resolve!: (value: T) => void;
let reject!: (reason?: any) => void;

const promise = new Promise<T>((_resolve, _reject) => {
resolve = _resolve;
reject = _reject;
});

return {
promise,
resolve,
reject,
};
}
3 changes: 3 additions & 0 deletions 3 packages/core/src/v3/lifecycle-hooks-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,7 @@ export type {
AnyOnCleanupHookFunction,
TaskCleanupHookParams,
TaskWait,
TaskCancelHookParams,
OnCancelHookFunction,
AnyOnCancelHookFunction,
} from "./lifecycleHooks/types.js";
28 changes: 28 additions & 0 deletions 28 packages/core/src/v3/lifecycleHooks/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
AnyOnStartHookFunction,
AnyOnSuccessHookFunction,
AnyOnWaitHookFunction,
AnyOnCancelHookFunction,
RegisteredHookFunction,
RegisterHookFunctionParams,
TaskWait,
Expand Down Expand Up @@ -260,6 +261,33 @@ export class LifecycleHooksAPI {
this.#getManager().registerOnResumeHookListener(listener);
}

public registerGlobalCancelHook(hook: RegisterHookFunctionParams<AnyOnCancelHookFunction>): void {
this.#getManager().registerGlobalCancelHook(hook);
}

public registerTaskCancelHook(
taskId: string,
hook: RegisterHookFunctionParams<AnyOnCancelHookFunction>
): void {
this.#getManager().registerTaskCancelHook(taskId, hook);
}

public getTaskCancelHook(taskId: string): AnyOnCancelHookFunction | undefined {
return this.#getManager().getTaskCancelHook(taskId);
}

public getGlobalCancelHooks(): RegisteredHookFunction<AnyOnCancelHookFunction>[] {
return this.#getManager().getGlobalCancelHooks();
}

public callOnCancelHookListeners(): Promise<void> {
return this.#getManager().callOnCancelHookListeners();
}

public registerOnCancelHookListener(listener: () => Promise<void>): void {
this.#getManager().registerOnCancelHookListener(listener);
}

#getManager(): LifecycleHooksManager {
return getGlobal(API_NAME) ?? NOOP_LIFECYCLE_HOOKS_MANAGER;
}
Expand Down
Loading
Morty Proxy This is a proxified and sanitized view of the page, visit original site.