Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions 6 apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ const EnvironmentSchema = z.object({
SHARED_QUEUE_CONSUMER_POOL_SIZE: z.coerce.number().int().default(10),
SHARED_QUEUE_CONSUMER_INTERVAL_MS: z.coerce.number().int().default(100),
SHARED_QUEUE_CONSUMER_NEXT_TICK_INTERVAL_MS: z.coerce.number().int().default(100),
SHARED_QUEUE_CONSUMER_EMIT_RESUME_DEPENDENCY_TIMEOUT_MS: z.coerce.number().int().default(1000),
SHARED_QUEUE_CONSUMER_RESOLVE_PAYLOADS_BATCH_SIZE: z.coerce.number().int().default(25),

// Development OTEL environment variables
DEV_OTEL_EXPORTER_OTLP_ENDPOINT: z.string().optional(),
Expand Down Expand Up @@ -219,6 +221,10 @@ const EnvironmentSchema = z.object({
.number()
.int()
.default(60 * 1000 * 15),
MARQS_SHARED_QUEUE_SELECTION_COUNT: z.coerce.number().int().default(36),
MARQS_DEV_QUEUE_SELECTION_COUNT: z.coerce.number().int().default(12),
MARQS_MAXIMUM_NACK_COUNT: z.coerce.number().int().default(64),

PROD_TASK_HEARTBEAT_INTERVAL_MS: z.coerce.number().int().optional(),

VERBOSE_GRAPHILE_LOGGING: z.string().default("false"),
Expand Down
63 changes: 63 additions & 0 deletions 63 apps/webapp/app/models/taskQueue.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import { QueueOptions } from "@trigger.dev/core/v3/schemas";
import { TaskQueue } from "@trigger.dev/database";
import { prisma } from "~/db.server";

export async function findQueueInEnvironment(
queueName: string,
environmentId: string,
backgroundWorkerTaskId?: string,
backgroundTask?: { queueConfig?: unknown }
): Promise<TaskQueue | undefined> {
const sanitizedQueueName = sanitizeQueueName(queueName);

const queue = await prisma.taskQueue.findFirst({
where: {
runtimeEnvironmentId: environmentId,
name: sanitizedQueueName,
},
});

if (queue) {
return queue;
}

const task = backgroundTask
? backgroundTask
: backgroundWorkerTaskId
? await prisma.backgroundWorkerTask.findFirst({
where: {
id: backgroundWorkerTaskId,
},
})
: undefined;

if (!task) {
return;
}

const queueConfig = QueueOptions.safeParse(task.queueConfig);

if (queueConfig.success) {
const taskQueueName = queueConfig.data.name
? sanitizeQueueName(queueConfig.data.name)
: undefined;

if (taskQueueName && taskQueueName !== sanitizedQueueName) {
const queue = await prisma.taskQueue.findFirst({
where: {
runtimeEnvironmentId: environmentId,
name: taskQueueName,
},
});

if (queue) {
return queue;
}
}
}
}

// Only allow alphanumeric characters, underscores, hyphens, and slashes (and only the first 128 characters)
export function sanitizeQueueName(queueName: string) {
return queueName.replace(/[^a-zA-Z0-9_\-\/]/g, "").substring(0, 128);
}
Comment thread
ericallam marked this conversation as resolved.
31 changes: 0 additions & 31 deletions 31 apps/webapp/app/routes/admin.api.v1.marqs.ts

This file was deleted.

3 changes: 2 additions & 1 deletion 3 apps/webapp/app/services/apiAuth.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
isPersonalAccessToken,
} from "./personalAccessToken.server";
import { isPublicJWT, validatePublicJwtKey } from "./realtime/jwtAuth.server";
import { RuntimeEnvironmentForEnvRepo } from "~/v3/environmentVariables/environmentVariablesRepository.server";

const ClaimsSchema = z.object({
scopes: z.array(z.string()).optional(),
Expand Down Expand Up @@ -410,7 +411,7 @@ const JWT_ALGORITHM = "HS256";
const DEFAULT_JWT_EXPIRATION_IN_MS = 1000 * 60 * 60; // 1 hour

export async function generateJWTTokenForEnvironment(
environment: RuntimeEnvironment,
environment: RuntimeEnvironmentForEnvRepo,
payload: Record<string, string>
) {
const jwt = await new SignJWT({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -654,9 +654,26 @@ export class EnvironmentVariablesRepository implements Repository {
}
}

export const RuntimeEnvironmentForEnvRepoPayload = {
select: {
id: true,
slug: true,
type: true,
projectId: true,
apiKey: true,
organizationId: true,
},
} as const;

export type RuntimeEnvironmentForEnvRepo = Prisma.RuntimeEnvironmentGetPayload<
typeof RuntimeEnvironmentForEnvRepoPayload
>;

export const environmentVariablesRepository = new EnvironmentVariablesRepository();

export async function resolveVariablesForEnvironment(runtimeEnvironment: RuntimeEnvironment) {
export async function resolveVariablesForEnvironment(
runtimeEnvironment: RuntimeEnvironmentForEnvRepo
) {
const projectSecrets = await environmentVariablesRepository.getEnvironmentVariables(
runtimeEnvironment.projectId,
runtimeEnvironment.id
Expand All @@ -672,7 +689,9 @@ export async function resolveVariablesForEnvironment(runtimeEnvironment: Runtime
return [...overridableTriggerVariables, ...projectSecrets, ...builtInVariables];
}

async function resolveOverridableTriggerVariables(runtimeEnvironment: RuntimeEnvironment) {
async function resolveOverridableTriggerVariables(
runtimeEnvironment: RuntimeEnvironmentForEnvRepo
) {
let result: Array<EnvironmentVariable> = [
{
key: "TRIGGER_REALTIME_STREAM_VERSION",
Expand All @@ -683,7 +702,7 @@ async function resolveOverridableTriggerVariables(runtimeEnvironment: RuntimeEnv
return result;
}

async function resolveBuiltInDevVariables(runtimeEnvironment: RuntimeEnvironment) {
async function resolveBuiltInDevVariables(runtimeEnvironment: RuntimeEnvironmentForEnvRepo) {
let result: Array<EnvironmentVariable> = [
{
key: "OTEL_EXPORTER_OTLP_ENDPOINT",
Expand Down Expand Up @@ -745,7 +764,7 @@ async function resolveBuiltInDevVariables(runtimeEnvironment: RuntimeEnvironment
return [...result, ...commonVariables];
}

async function resolveBuiltInProdVariables(runtimeEnvironment: RuntimeEnvironment) {
async function resolveBuiltInProdVariables(runtimeEnvironment: RuntimeEnvironmentForEnvRepo) {
let result: Array<EnvironmentVariable> = [
{
key: "TRIGGER_SECRET_KEY",
Expand Down Expand Up @@ -838,7 +857,7 @@ async function resolveBuiltInProdVariables(runtimeEnvironment: RuntimeEnvironmen
}

async function resolveCommonBuiltInVariables(
runtimeEnvironment: RuntimeEnvironment
runtimeEnvironment: RuntimeEnvironmentForEnvRepo
): Promise<Array<EnvironmentVariable>> {
return [];
Comment thread
ericallam marked this conversation as resolved.
}
25 changes: 10 additions & 15 deletions 25 apps/webapp/app/v3/marqs/devQueueConsumer.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,15 @@ import { prisma } from "~/db.server";
import { createNewSession, disconnectSession } from "~/models/runtimeEnvironment.server";
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { marqs, sanitizeQueueName } from "~/v3/marqs/index.server";
import { marqs } from "~/v3/marqs/index.server";
import { resolveVariablesForEnvironment } from "../environmentVariables/environmentVariablesRepository.server";
import { FailedTaskRunService } from "../failedTaskRun.server";
import { CancelDevSessionRunsService } from "../services/cancelDevSessionRuns.server";
import { CompleteAttemptService } from "../services/completeAttempt.server";
import {
SEMINTATTRS_FORCE_RECORDING,
attributesFromAuthenticatedEnv,
tracer,
} from "../tracer.server";
import { DevSubscriber, devPubSub } from "./devPubSub.server";
import { attributesFromAuthenticatedEnv, tracer } from "../tracer.server";
import { getMaxDuration } from "../utils/maxDuration";
import { DevSubscriber, devPubSub } from "./devPubSub.server";
import { findQueueInEnvironment, sanitizeQueueName } from "~/models/taskQueue.server";

const MessageBody = z.discriminatedUnion("type", [
z.object({
Expand Down Expand Up @@ -436,14 +433,12 @@ export class DevQueueConsumer {
return;
}

const queue = await prisma.taskQueue.findUnique({
where: {
runtimeEnvironmentId_name: {
runtimeEnvironmentId: this.env.id,
name: sanitizeQueueName(lockedTaskRun.queue),
},
},
});
const queue = await findQueueInEnvironment(
lockedTaskRun.queue,
this.env.id,
backgroundTask.id,
backgroundTask
);

if (!queue) {
logger.debug("[DevQueueConsumer] Failed to find queue", {
Expand Down
Loading
Morty Proxy This is a proxified and sanitized view of the page, visit original site.