Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions 1 internal-packages/run-engine/src/engine/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ export class RunEngine {
executionSnapshotSystem: this.executionSnapshotSystem,
batchSystem: this.batchSystem,
waitpointSystem: this.waitpointSystem,
delayedRunSystem: this.delayedRunSystem,
machines: this.options.machines,
});

Expand Down
5 changes: 5 additions & 0 deletions 5 internal-packages/run-engine/src/engine/statuses.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ export function isFinishedOrPendingFinished(status: TaskRunExecutionStatus): boo
return finishedStatuses.includes(status);
}

export function isInitialState(status: TaskRunExecutionStatus): boolean {
const startedStatuses: TaskRunExecutionStatus[] = ["RUN_CREATED"];
return startedStatuses.includes(status);
}

export function isFinalRunStatus(status: TaskRunStatus): boolean {
const finalStatuses: TaskRunStatus[] = [
"CANCELED",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,4 +131,8 @@ export class DelayedRunSystem {
availableAt: delayUntil,
});
}

async preventDelayedRunFromBeingEnqueued({ runId }: { runId: string }) {
await this.$.worker.ack(`enqueueDelayedRun:${runId}`);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import { runStatusFromError, ServiceValidationError } from "../errors.js";
import { sendNotificationToWorker } from "../eventBus.js";
import { getMachinePreset } from "../machinePresets.js";
import { retryOutcomeFromCompletion } from "../retrying.js";
import { isExecuting } from "../statuses.js";
import { isExecuting, isInitialState } from "../statuses.js";
import { RunEngineOptions } from "../types.js";
import { BatchSystem } from "./batchSystem.js";
import {
Expand All @@ -32,12 +32,14 @@ import {
} from "./executionSnapshotSystem.js";
import { SystemResources } from "./systems.js";
import { WaitpointSystem } from "./waitpointSystem.js";
import { DelayedRunSystem } from "./delayedRunSystem.js";

export type RunAttemptSystemOptions = {
resources: SystemResources;
executionSnapshotSystem: ExecutionSnapshotSystem;
batchSystem: BatchSystem;
waitpointSystem: WaitpointSystem;
delayedRunSystem: DelayedRunSystem;
retryWarmStartThresholdMs?: number;
machines: RunEngineOptions["machines"];
};
Expand All @@ -47,12 +49,14 @@ export class RunAttemptSystem {
private readonly executionSnapshotSystem: ExecutionSnapshotSystem;
private readonly batchSystem: BatchSystem;
private readonly waitpointSystem: WaitpointSystem;
private readonly delayedRunSystem: DelayedRunSystem;

constructor(private readonly options: RunAttemptSystemOptions) {
this.$ = options.resources;
this.executionSnapshotSystem = options.executionSnapshotSystem;
this.batchSystem = options.batchSystem;
this.waitpointSystem = options.waitpointSystem;
this.delayedRunSystem = options.delayedRunSystem;
}

public async startRunAttempt({
Expand Down Expand Up @@ -968,6 +972,7 @@ export class RunAttemptSystem {
completedAt: true,
taskEventStore: true,
parentTaskRunId: true,
delayUntil: true,
runtimeEnvironment: {
select: {
organizationId: true,
Expand All @@ -986,6 +991,11 @@ export class RunAttemptSystem {
},
});

//if the run is delayed and hasn't started yet, we need to prevent it being added to the queue in future
if (isInitialState(latestSnapshot.executionStatus) && run.delayUntil) {
await this.delayedRunSystem.preventDelayedRunFromBeingEnqueued({ runId });
}

//remove it from the queue and release concurrency
await this.$.runQueue.acknowledgeMessage(run.runtimeEnvironment.organizationId, runId);

Expand Down
111 changes: 111 additions & 0 deletions 111 internal-packages/run-engine/src/engine/tests/delays.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -290,4 +290,115 @@ describe("RunEngine delays", () => {
engine.quit();
}
});

containerTest("Cancelling a delayed run", async ({ prisma, redisOptions }) => {
//create environment
const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");

const engine = new RunEngine({
prisma,
worker: {
redis: redisOptions,
workers: 1,
tasksPerWorker: 10,
pollIntervalMs: 100,
},
queue: {
redis: redisOptions,
},
runLock: {
redis: redisOptions,
},
machines: {
defaultMachine: "small-1x",
machines: {
"small-1x": {
name: "small-1x" as const,
cpu: 0.5,
memory: 0.5,
centsPerMs: 0.0001,
},
},
baseCostInCents: 0.0001,
},
tracer: trace.getTracer("test", "0.0.0"),
});

try {
const taskIdentifier = "test-task";

//create background worker
const backgroundWorker = await setupBackgroundWorker(
engine,
authenticatedEnvironment,
taskIdentifier
);

//trigger the run with a 1 second delay
const run = await engine.trigger(
{
number: 1,
friendlyId: "run_1234",
environment: authenticatedEnvironment,
taskIdentifier,
payload: "{}",
payloadType: "application/json",
context: {},
traceContext: {},
traceId: "t12345",
spanId: "s12345",
masterQueue: "main",
queue: "task/test-task",
isTest: false,
tags: [],
delayUntil: new Date(Date.now() + 1000),
},
prisma
);

//verify it's created but not queued
const executionData = await engine.getRunExecutionData({ runId: run.id });
assertNonNullable(executionData);
expect(executionData.snapshot.executionStatus).toBe("RUN_CREATED");
expect(run.status).toBe("DELAYED");

//cancel the run
await engine.cancelRun({
runId: run.id,
reason: "Cancelled by test",
});

//verify it's cancelled
const executionData2 = await engine.getRunExecutionData({ runId: run.id });
assertNonNullable(executionData2);
expect(executionData2.snapshot.executionStatus).toBe("FINISHED");
expect(executionData2.run.status).toBe("CANCELED");

//wait past the original delay time
await setTimeout(1500);

//verify the run is still cancelled
const executionData3 = await engine.getRunExecutionData({ runId: run.id });
assertNonNullable(executionData3);
expect(executionData3.snapshot.executionStatus).toBe("FINISHED");
expect(executionData3.run.status).toBe("CANCELED");

//attempt to dequeue - should get nothing
const dequeued = await engine.dequeueFromMasterQueue({
consumerId: "test_12345",
masterQueue: run.masterQueue,
maxRunCount: 10,
});

expect(dequeued.length).toBe(0);

//verify final state is still cancelled
const executionData4 = await engine.getRunExecutionData({ runId: run.id });
assertNonNullable(executionData4);
expect(executionData4.snapshot.executionStatus).toBe("FINISHED");
expect(executionData4.run.status).toBe("CANCELED");
} finally {
engine.quit();
}
});
});
Morty Proxy This is a proxified and sanitized view of the page, visit original site.