Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings
6 changes: 6 additions & 0 deletions 6 .server-changes/task-metadata-cache.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: improvement
---

Cache task defaults in Redis so the trigger API skips per-request database lookups, restoring the fast trigger path when callers pass queue and TTL options.
24 changes: 24 additions & 0 deletions 24 apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,30 @@ const EnvironmentSchema = z
CACHE_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),
CACHE_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),

TASK_META_CACHE_REDIS_HOST: z
.string()
.optional()
.transform((v) => v ?? process.env.REDIS_HOST),
TASK_META_CACHE_REDIS_PORT: z.coerce
.number()
.optional()
.transform(
(v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined)
),
TASK_META_CACHE_REDIS_USERNAME: z
.string()
.optional()
.transform((v) => v ?? process.env.REDIS_USERNAME),
TASK_META_CACHE_REDIS_PASSWORD: z
.string()
.optional()
.transform((v) => v ?? process.env.REDIS_PASSWORD),
TASK_META_CACHE_REDIS_TLS_DISABLED: z
.string()
.default(process.env.REDIS_TLS_DISABLED ?? "false"),
TASK_META_CACHE_CURRENT_ENV_TTL_SECONDS: z.coerce.number().default(86400),
TASK_META_CACHE_BY_WORKER_TTL_SECONDS: z.coerce.number().default(2592000),

REALTIME_STREAMS_REDIS_HOST: z
.string()
.optional()
Expand Down
238 changes: 148 additions & 90 deletions 238 apps/webapp/app/runEngine/concerns/queues.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import { tryCatch } from "@trigger.dev/core/v3";
import { ServiceValidationError } from "~/v3/services/common.server";
import { createCache, createLRUMemoryStore, DefaultStatefulContext, Namespace } from "@internal/cache";
import { singleton } from "~/utils/singleton";
import type { TaskMetadataCache, TaskMetadataEntry } from "~/services/taskMetadataCache.server";
import { taskMetadataCacheInstance } from "~/services/taskMetadataCacheInstance.server";

// LRU cache for environment queue sizes to reduce Redis calls
const queueSizeCache = singleton("queueSizeCache", () => {
Expand Down Expand Up @@ -63,13 +65,16 @@ function extractQueueName(queue: { name?: unknown } | undefined): string | undef

export class DefaultQueueManager implements QueueManager {
private readonly replicaPrisma: PrismaClientOrTransaction;
private readonly taskMetaCache: TaskMetadataCache;

/**
* Wires up the queue manager's collaborators.
*
* @param prisma - Primary database client; also the fallback for `replicaPrisma`.
* @param engine - Run engine instance, stored as a private readonly member.
* @param replicaPrisma - Optional client stored on `this.replicaPrisma`; when omitted, `prisma` is used instead.
* @param taskMetaCache - Task metadata cache; defaults to the shared `taskMetadataCacheInstance` when not supplied.
*/
constructor(
private readonly prisma: PrismaClientOrTransaction,
private readonly engine: RunEngine,
// NOTE(review): the `replicaPrisma` parameter is listed twice below (without and
// with a trailing comma). This looks like the removed and added sides of a diff
// rendered together — confirm which line is live before compiling.
replicaPrisma?: PrismaClientOrTransaction
replicaPrisma?: PrismaClientOrTransaction,
taskMetaCache: TaskMetadataCache = taskMetadataCacheInstance
) {
// No dedicated replica supplied: replica-side queries go through `prisma`.
this.replicaPrisma = replicaPrisma ?? prisma;
this.taskMetaCache = taskMetaCache;
}

async resolveQueueProperties(
Expand All @@ -87,7 +92,10 @@ export class DefaultQueueManager implements QueueManager {
const specifiedQueueName = extractQueueName(request.body.options?.queue);

if (specifiedQueueName) {
// A specific queue name is provided, validate it exists for the locked worker
// A specific queue name is provided, validate it exists for the locked worker.
// Pre-existing query — not cached because TaskQueue rows can be added or
// removed independently of BackgroundWorkerTask, and a stale "queue exists"
// claim would silently route to the wrong queue.
const specifiedQueue = await this.prisma.taskQueue.findFirst({
where: {
name: specifiedQueueName,
Expand All @@ -107,49 +115,45 @@ export class DefaultQueueManager implements QueueManager {
queueName = specifiedQueue.name;
lockedQueueId = specifiedQueue.id;

// Always fetch the task so we can resolve `triggerSource` (which
// becomes `taskKind` on annotations and replicates to ClickHouse).
// Without this, AGENT/SCHEDULED runs triggered with
// `lockToVersion` + a queue override would be annotated as
// STANDARD and disappear from the run-list "Source" filter.
// `ttl` is read from the same row but only used when the caller
// didn't specify a per-trigger TTL.
const lockedTask = await this.replicaPrisma.backgroundWorkerTask.findFirst({
where: {
workerId: lockedBackgroundWorker.id,
runtimeEnvironmentId: request.environment.id,
slug: request.taskId,
},
select: { ttl: true, triggerSource: true },
});
// Pull `triggerSource` (for `taskKind` annotation) and `ttl` from cache.
// On cache hit this is 0 PG queries; on miss the helper falls back to
// a BackgroundWorkerTask lookup and back-fills the cache.
//
// If the task slug isn't on this locked worker version, we tolerate
// the missing row and fall through with `taskKind = undefined`
// (coalesced to "STANDARD" downstream) and `taskTtl = undefined`.
// This matches main's pre-PR behavior — the no-override branch below
// still throws because there's no queue to route to in that case,
// but here the caller already named the queue.
const lockedMeta = await this.resolveLockedTaskMetadata(
lockedBackgroundWorker.id,
request.environment.id,
request.taskId
);

if (request.body.options?.ttl === undefined) {
taskTtl = lockedTask?.ttl;
taskTtl = lockedMeta?.ttl ?? undefined;
}
taskKind = lockedTask?.triggerSource;
taskKind = lockedMeta?.triggerSource;
} else {
// No queue override - fetch task with queue to get both default queue and TTL
const lockedTask = await this.replicaPrisma.backgroundWorkerTask.findFirst({
where: {
workerId: lockedBackgroundWorker.id,
runtimeEnvironmentId: request.environment.id,
slug: request.taskId,
},
include: {
queue: true,
},
});
// No queue override - resolve default queue + TTL + triggerSource via cache,
// falling back to a single BackgroundWorkerTask lookup on miss.
const lockedMeta = await this.resolveLockedTaskMetadata(
lockedBackgroundWorker.id,
request.environment.id,
request.taskId
);

if (!lockedTask) {
if (!lockedMeta) {
throw new ServiceValidationError(
`Task '${request.taskId}' not found on locked version '${lockedBackgroundWorker.version ?? "<unknown>"
}'.`
);
}

taskTtl = lockedTask.ttl;
taskTtl = lockedMeta.ttl;

if (!lockedTask.queue) {
if (!lockedMeta.queueName) {
// This case should ideally be prevented by earlier checks or schema constraints,
// but handle it defensively.
logger.error("Task found on locked version, but has no associated queue record", {
Expand All @@ -164,9 +168,9 @@ export class DefaultQueueManager implements QueueManager {
}

// Use the task's default queue name
queueName = lockedTask.queue.name;
lockedQueueId = lockedTask.queue.id;
taskKind = lockedTask.triggerSource;
queueName = lockedMeta.queueName;
lockedQueueId = lockedMeta.queueId ?? undefined;
taskKind = lockedMeta.triggerSource;
}
} else {
// Task is not locked to a specific version, use regular logic
Expand Down Expand Up @@ -213,76 +217,130 @@ export class DefaultQueueManager implements QueueManager {

const defaultQueueName = `task/${taskId}`;

// Even when the caller provides both a queue override and a
// per-trigger TTL, we still need to fetch the task so `triggerSource`
// (which becomes `taskKind` on annotations and replicates to
// ClickHouse) is populated. Without it, AGENT/SCHEDULED runs hitting
// this path get stamped as STANDARD and disappear from the
// dashboard's `Source` filter. Mirrors the locked-worker fix above
// — `taskTtl` is harmless in the returned value because the call
// site coalesces `body.options.ttl ?? taskTtl`.

// Find the current worker for the environment. Replica is fine here —
// the adjacent `backgroundWorkerTask` lookups below already use
// `replicaPrisma` (replica lag for "just deployed" is bounded the same
// way for both queries; reading the worker from the writer and the
// task from the replica would only widen the inconsistency window).
const worker = await findCurrentWorkerFromEnvironment(environment, this.replicaPrisma);
// Resolve the current worker's task metadata via cache (HGET on warm path,
// BackgroundWorkerTask findFirst + cache back-fill on miss). When this hits,
// both the queue-override + TTL caller and the default-queue caller satisfy
// their full result without any database query.
const meta = await this.resolveCurrentTaskMetadata(environment, taskId);

if (overriddenQueueName) {
// Caller already named the queue. We only need triggerSource (for taskKind)
// and ttl (for the call site to coalesce against body.options.ttl).
return {
queueName: overriddenQueueName,
taskTtl: meta?.ttl ?? undefined,
taskKind: meta?.triggerSource,
};
}

if (!worker) {
logger.debug("Failed to get queue name: No worker found", {
if (!meta) {
logger.debug("Failed to get queue name: No worker or task found", {
taskId,
environmentId: environment.id,
});

return { queueName: overriddenQueueName ?? defaultQueueName, taskTtl: undefined };
return { queueName: defaultQueueName, taskTtl: undefined };
}

// When queue is overridden, we only need TTL from the task (no queue join needed)
if (overriddenQueueName) {
const task = await this.replicaPrisma.backgroundWorkerTask.findFirst({
where: {
workerId: worker.id,
runtimeEnvironmentId: environment.id,
slug: taskId,
},
select: { ttl: true, triggerSource: true },
if (!meta.queueName) {
logger.debug("Failed to get queue name: No queue found", {
taskId,
environmentId: environment.id,
});

return { queueName: overriddenQueueName, taskTtl: task?.ttl, taskKind: task?.triggerSource };
return { queueName: defaultQueueName, taskTtl: meta.ttl, taskKind: meta.triggerSource };
}

const task = await this.replicaPrisma.backgroundWorkerTask.findFirst({
where: {
workerId: worker.id,
runtimeEnvironmentId: environment.id,
slug: taskId,
},
include: {
queue: true,
return { queueName: meta.queueName, taskTtl: meta.ttl, taskKind: meta.triggerSource };
}

/**
* Resolve task metadata for a locked-version trigger.
*
* Lookup order:
* 1. `taskMetaCache.getByWorker(workerId, slug)` — described by the original
*    author as the `task-meta:by-worker:{workerId}` Redis hash. Any truthy
*    result is returned unchanged.
* 2. On a miss, a single `backgroundWorkerTask.findFirst` through
*    `this.replicaPrisma`, selecting only `ttl`, `triggerSource` and the
*    related queue's `id` / `name`.
*
* A row found in step 2 is converted to a `TaskMetadataEntry` and written
* back with `setByWorker`; that write is not awaited.
*
* @param workerId - ID of the locked background worker; used as the cache key and as the `workerId` filter.
* @param environmentId - Matched against `runtimeEnvironmentId` in the fallback query.
* @param slug - Task slug; used as the cache field and as the `slug` filter.
* @returns The cached or freshly built entry, or `null` when no
* BackgroundWorkerTask row exists. A row without a related queue yields
* `queueId: null` and `queueName: ""`.
*/
private async resolveLockedTaskMetadata(
workerId: string,
environmentId: string,
slug: string
): Promise<TaskMetadataEntry | null> {
// Warm path: a truthy cache result short-circuits the database entirely.
const cached = await this.taskMetaCache.getByWorker(workerId, slug);
if (cached) return cached;

// Cold path: fetch only the columns needed to build a TaskMetadataEntry.
const row = await this.replicaPrisma.backgroundWorkerTask.findFirst({
where: { workerId, runtimeEnvironmentId: environmentId, slug },
select: {
ttl: true,
triggerSource: true,
queue: { select: { id: true, name: true } },
},
});

// NOTE(review): the `if (!task)` / `if (!task.queue)` fragments in this method
// reference `task`, `taskId`, `environment` and `defaultQueueName`, none of
// which are declared here. They look like the removed side of the diff
// rendered inline — confirm before treating them as live code.
if (!task) {
console.log("Failed to get queue name: No task found", {
taskId,
environmentId: environment.id,
});
// No matching task on this worker: callers decide how to handle `null`.
if (!row) return null;

return { queueName: defaultQueueName, taskTtl: undefined };
}
// A missing queue relation is encoded as `queueId: null` / `queueName: ""`.
const entry: TaskMetadataEntry = {
slug,
ttl: row.ttl,
triggerSource: row.triggerSource,
queueId: row.queue?.id ?? null,
queueName: row.queue?.name ?? "",
};

if (!task.queue) {
console.log("Failed to get queue name: No queue found", {
taskId,
environmentId: environment.id,
queueConfig: task.queueConfig,
});
// Fire-and-forget back-fill — `setByWorker` upserts the single field and
// refreshes the hash TTL. Errors are logged inside the cache and swallowed.
// NOTE(review): the TTL refresh and error swallowing are properties of the
// cache implementation, which is not visible here — confirm, since `void`
// leaves a rejected promise unhandled otherwise.
void this.taskMetaCache.setByWorker(workerId, entry);

return { queueName: defaultQueueName, taskTtl: task.ttl, taskKind: task.triggerSource };
}
return entry;
}

/**
* Resolve task metadata for a non-locked trigger.
*
* Lookup order:
* 1. `taskMetaCache.getCurrent(environment.id, slug)` — described by the
*    original author as the `task-meta:env:{envId}` Redis hash. Any truthy
*    result is returned unchanged.
* 2. On a miss, `findCurrentWorkerFromEnvironment` followed by a single
*    `backgroundWorkerTask.findFirst`, both through `this.replicaPrisma`.
*
* A row found in step 2 is converted to a `TaskMetadataEntry` and written
* back with `setByCurrentWorker(environment.id, worker.id, entry)`; that
* write is not awaited.
*
* @param environment - Authenticated environment; its `id` is the cache key and the `runtimeEnvironmentId` filter.
* @param slug - Task slug; used as the cache field and as the `slug` filter.
* @returns The cached or freshly built entry, or `null` when no current
* worker or no matching task row can be resolved. A row without a related
* queue yields `queueId: null` and `queueName: ""`.
*/
private async resolveCurrentTaskMetadata(
environment: AuthenticatedEnvironment,
slug: string
): Promise<TaskMetadataEntry | null> {
// Warm path: a truthy cache result skips both database queries.
const cached = await this.taskMetaCache.getCurrent(environment.id, slug);
if (cached) return cached;

// Cold cache: discover the current worker for the env. Replica is fine —
// the adjacent BackgroundWorkerTask lookup below uses `replicaPrisma` too
// (replica lag for "just deployed" is bounded the same way for both
// queries; reading from the writer here would only widen the window).
const worker = await findCurrentWorkerFromEnvironment(environment, this.replicaPrisma);
if (!worker) return null;

// Fetch only the columns needed to build a TaskMetadataEntry.
const row = await this.replicaPrisma.backgroundWorkerTask.findFirst({
where: { workerId: worker.id, runtimeEnvironmentId: environment.id, slug },
select: {
ttl: true,
triggerSource: true,
queue: { select: { id: true, name: true } },
},
});

if (!row) return null;

// A missing queue relation is encoded as `queueId: null` / `queueName: ""`.
const entry: TaskMetadataEntry = {
slug,
ttl: row.ttl,
triggerSource: row.triggerSource,
queueId: row.queue?.id ?? null,
queueName: row.queue?.name ?? "",
};

// Fire-and-forget back-fill — atomically upserts the slug into both
// keyspaces so a subsequent locked-or-not trigger hits the cache. The
// env-keyspace TTL is preserved (promotion owns it); the by-worker TTL
// is refreshed (sliding window keeps active workers warm).
// NOTE(review): atomicity and the TTL handling are properties of the cache
// implementation, which is not visible here — confirm against
// `setByCurrentWorker`.
void this.taskMetaCache.setByCurrentWorker(environment.id, worker.id, entry);

// NOTE(review): the next `return` references `task` and `defaultQueueName`,
// neither of which is declared in this method, and it would make
// `return entry` unreachable. It looks like the removed side of the diff
// rendered inline — confirm before treating it as live code.
return { queueName: task.queue.name ?? defaultQueueName, taskTtl: task.ttl, taskKind: task.triggerSource };
return entry;
}

async validateQueueLimits(
Expand Down
Loading
Morty Proxy This is a proxified and sanitized view of the page, visit original site.