Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 33 additions & 3 deletions 36 apps/webapp/app/v3/commonWorker.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import { singleton } from "~/utils/singleton";
import { DeliverAlertService } from "./services/alerts/deliverAlert.server";
import { PerformDeploymentAlertsService } from "./services/alerts/performDeploymentAlerts.server";
import { PerformTaskRunAlertsService } from "./services/alerts/performTaskRunAlerts.server";
import { ExpireEnqueuedRunService } from "./services/expireEnqueuedRun.server";
import { EnqueueDelayedRunService } from "./services/enqueueDelayedRun.server";

function initializeWorker() {
const redisOptions = {
Expand Down Expand Up @@ -52,6 +54,24 @@ function initializeWorker() {
maxAttempts: 3,
},
},
"v3.expireRun": {
schema: z.object({
runId: z.string(),
}),
visibilityTimeoutMs: 60_000,
retry: {
maxAttempts: 6,
},
},
"v3.enqueueDelayedRun": {
schema: z.object({
runId: z.string(),
}),
visibilityTimeoutMs: 60_000,
retry: {
maxAttempts: 6,
},
},
},
concurrency: {
workers: env.COMMON_WORKER_CONCURRENCY_WORKERS,
Expand All @@ -65,16 +85,26 @@ function initializeWorker() {
"v3.deliverAlert": async ({ payload }) => {
const service = new DeliverAlertService();

return await service.call(payload.alertId);
await service.call(payload.alertId);
},
"v3.performDeploymentAlerts": async ({ payload }) => {
const service = new PerformDeploymentAlertsService();

return await service.call(payload.deploymentId);
await service.call(payload.deploymentId);
},
"v3.performTaskRunAlerts": async ({ payload }) => {
const service = new PerformTaskRunAlertsService();
return await service.call(payload.runId);
await service.call(payload.runId);
},
"v3.expireRun": async ({ payload }) => {
const service = new ExpireEnqueuedRunService();

await service.call(payload.runId);
},
"v3.enqueueDelayedRun": async ({ payload }) => {
const service = new EnqueueDelayedRunService();

await service.call(payload.runId);
},
},
});
Expand Down
2 changes: 1 addition & 1 deletion 2 apps/webapp/app/v3/services/createTaskRunAttempt.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ export class CreateTaskRunAttemptService extends BaseService {
});

if (taskRun.ttl) {
await ExpireEnqueuedRunService.dequeue(taskRun.id, tx);
await ExpireEnqueuedRunService.ack(taskRun.id, tx);
}
}

Expand Down
28 changes: 27 additions & 1 deletion 28 apps/webapp/app/v3/services/enqueueDelayedRun.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,34 @@ import { logger } from "~/services/logger.server";
import { marqs } from "~/v3/marqs/index.server";
import { BaseService } from "./baseService.server";
import { ExpireEnqueuedRunService } from "./expireEnqueuedRun.server";
import { commonWorker } from "../commonWorker.server";
import { workerQueue } from "~/services/worker.server";

export class EnqueueDelayedRunService extends BaseService {
public static async enqueue(runId: string, runAt?: Date) {
await commonWorker.enqueue({
job: "v3.enqueueDelayedRun",
payload: { runId },
availableAt: runAt,
id: `v3.enqueueDelayed:${runId}`,
});
}

public static async reschedule(runId: string, runAt?: Date) {
// We have to do this for now because it's possible that the workerQueue
// was used when the run was first delayed, and EnqueueDelayedRunService.reschedule
// is called from RescheduleTaskRunService, which allows the runAt to be changed
// so if we don't dequeue the old job, we might end up with multiple jobs
await workerQueue.dequeue(`v3.enqueueDelayedRun.${runId}`);

await commonWorker.enqueue({
job: "v3.enqueueDelayedRun",
payload: { runId },
availableAt: runAt,
id: `v3.enqueueDelayed:${runId}`,
});
}

public async call(runId: string) {
const run = await this._prisma.taskRun.findFirst({
where: {
Expand Down Expand Up @@ -52,7 +78,7 @@ export class EnqueueDelayedRunService extends BaseService {
const expireAt = parseNaturalLanguageDuration(run.ttl);

if (expireAt) {
await ExpireEnqueuedRunService.enqueue(run.id, expireAt, tx);
await ExpireEnqueuedRunService.enqueue(run.id, expireAt);
}
}
});
Expand Down
25 changes: 14 additions & 11 deletions 25 apps/webapp/app/v3/services/expireEnqueuedRun.server.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,24 @@
import { PrismaClientOrTransaction } from "~/db.server";
import { logger } from "~/services/logger.server";
import { BaseService } from "./baseService.server";
import { commonWorker } from "../commonWorker.server";
import { eventRepository } from "../eventRepository.server";
import { BaseService } from "./baseService.server";
import { FinalizeTaskRunService } from "./finalizeTaskRun.server";
import { workerQueue } from "~/services/worker.server";
import { PrismaClientOrTransaction } from "~/db.server";

export class ExpireEnqueuedRunService extends BaseService {
public static async dequeue(runId: string, tx?: PrismaClientOrTransaction) {
return await workerQueue.dequeue(`v3.expireRun:${runId}`, { tx });
public static async ack(runId: string, tx?: PrismaClientOrTransaction) {
// We don't "dequeue" from the workerQueue here because it would be redundant and if this service
// is called for a run that has already started, nothing happens
await commonWorker.ack(`v3.expireRun:${runId}`);
}

public static async enqueue(runId: string, runAt?: Date, tx?: PrismaClientOrTransaction) {
return await workerQueue.enqueue(
"v3.expireRun",
{ runId },
{ runAt, jobKey: `v3.expireRun:${runId}`, tx }
);
public static async enqueue(runId: string, runAt?: Date) {
return await commonWorker.enqueue({
job: "v3.expireRun",
payload: { runId },
availableAt: runAt,
id: `v3.expireRun:${runId}`,
});
}

public async call(runId: string) {
Expand Down
2 changes: 1 addition & 1 deletion 2 apps/webapp/app/v3/services/finalizeTaskRun.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ export class FinalizeTaskRunService extends BaseService {
});

if (run.ttl) {
await ExpireEnqueuedRunService.dequeue(run.id);
await ExpireEnqueuedRunService.ack(run.id);
}

if (attemptStatus || error) {
Expand Down
31 changes: 12 additions & 19 deletions 31 apps/webapp/app/v3/services/rescheduleTaskRun.server.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import { RescheduleRunRequestBody } from "@trigger.dev/core/v3";
import { TaskRun } from "@trigger.dev/database";
import { BaseService, ServiceValidationError } from "./baseService.server";
import { RescheduleRunRequestBody } from "@trigger.dev/core/v3";
import { EnqueueDelayedRunService } from "./enqueueDelayedRun.server";
import { parseDelay } from "./triggerTask.server";
import { $transaction } from "~/db.server";
import { workerQueue } from "~/services/worker.server";

export class RescheduleTaskRunService extends BaseService {
public async call(taskRun: TaskRun, body: RescheduleRunRequestBody) {
Expand All @@ -17,23 +16,17 @@ export class RescheduleTaskRunService extends BaseService {
throw new ServiceValidationError(`Invalid delay: ${body.delay}`);
}

return await $transaction(this._prisma, "reschedule run", async (tx) => {
const updatedRun = await tx.taskRun.update({
where: {
id: taskRun.id,
},
data: {
delayUntil: delay,
},
});
const updatedRun = await this._prisma.taskRun.update({
where: {
id: taskRun.id,
},
data: {
delayUntil: delay,
},
});

await workerQueue.enqueue(
"v3.enqueueDelayedRun",
{ runId: taskRun.id },
{ tx, runAt: delay, jobKey: `v3.enqueueDelayedRun.${taskRun.id}` }
);
await EnqueueDelayedRunService.reschedule(taskRun.id, delay);

return updatedRun;
});
return updatedRun;
Comment thread
ericallam marked this conversation as resolved.
}
}
9 changes: 3 additions & 6 deletions 9 apps/webapp/app/v3/services/triggerTask.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import { clampMaxDuration } from "../utils/maxDuration";
import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server";
import { Prisma, TaskRun } from "@trigger.dev/database";
import { sanitizeQueueName } from "~/models/taskQueue.server";
import { EnqueueDelayedRunService } from "./enqueueDelayedRun.server";

export type TriggerTaskServiceOptions = {
idempotencyKey?: string;
Expand Down Expand Up @@ -515,18 +516,14 @@ export class TriggerTaskService extends BaseService {
}

if (taskRun.delayUntil) {
await workerQueue.enqueue(
"v3.enqueueDelayedRun",
{ runId: taskRun.id },
{ tx, runAt: delayUntil, jobKey: `v3.enqueueDelayedRun.${taskRun.id}` }
);
await EnqueueDelayedRunService.enqueue(taskRun.id, taskRun.delayUntil);
}

if (!taskRun.delayUntil && taskRun.ttl) {
const expireAt = parseNaturalLanguageDuration(taskRun.ttl);

if (expireAt) {
await ExpireEnqueuedRunService.enqueue(taskRun.id, expireAt, tx);
await ExpireEnqueuedRunService.enqueue(taskRun.id, expireAt);
}
}

Expand Down
Morty Proxy This is a proxified and sanitized view of the page, visit original site.