Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings
2 changes: 2 additions & 0 deletions 2 apps/supervisor/src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const Env = z.object({
// Dequeue settings (provider mode)
TRIGGER_DEQUEUE_ENABLED: BoolEnv.default("true"),
TRIGGER_DEQUEUE_INTERVAL_MS: z.coerce.number().int().default(1000),
TRIGGER_DEQUEUE_MAX_RUN_COUNT: z.coerce.number().int().default(10),

// Optional services
TRIGGER_WARM_START_URL: z.string().optional(),
Expand All @@ -50,6 +51,7 @@ const Env = z.object({
// Kubernetes specific settings
KUBERNETES_FORCE_ENABLED: BoolEnv.default(false),
KUBERNETES_NAMESPACE: z.string().default("default"),
KUBERNETES_WORKER_NODETYPE_LABEL: z.string().default("v4-worker"),
EPHEMERAL_STORAGE_SIZE_LIMIT: z.string().default("10Gi"),
EPHEMERAL_STORAGE_SIZE_REQUEST: z.string().default("2Gi"),

Expand Down
17 changes: 11 additions & 6 deletions 17 apps/supervisor/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,10 @@ class ManagedSupervisor {
this.logger.warn("[ManagedWorker] Failed pod handler disabled");
}

this.resourceMonitor = new KubernetesResourceMonitor(createK8sApi(), "");
this.resourceMonitor = new KubernetesResourceMonitor(
createK8sApi(),
env.TRIGGER_WORKER_INSTANCE_NAME
);
this.workloadManager = new KubernetesWorkloadManager(workloadManagerOptions);
} else {
this.resourceMonitor = new DockerResourceMonitor(new Docker());
Expand All @@ -113,10 +116,11 @@ class ManagedSupervisor {
managedWorkerSecret: env.MANAGED_WORKER_SECRET,
dequeueIntervalMs: env.TRIGGER_DEQUEUE_INTERVAL_MS,
queueConsumerEnabled: env.TRIGGER_DEQUEUE_ENABLED,
maxRunCount: env.TRIGGER_DEQUEUE_MAX_RUN_COUNT,
runNotificationsEnabled: env.TRIGGER_WORKLOAD_API_ENABLED,
preDequeue: async () => {
if (this.isKubernetes) {
// TODO: Test k8s resource monitor and remove this
// Not used in k8s for now
return {};
}

Expand Down Expand Up @@ -234,10 +238,11 @@ class ManagedSupervisor {
snapshotFriendlyId: message.snapshot.friendlyId,
});

this.resourceMonitor.blockResources({
cpu: message.run.machine.cpu,
memory: message.run.machine.memory,
});
// Disabled for now
// this.resourceMonitor.blockResources({
// cpu: message.run.machine.cpu,
// memory: message.run.machine.memory,
// });
} catch (error) {
this.logger.error("[ManagedWorker] Failed to create workload", { error });
}
Expand Down
4 changes: 4 additions & 0 deletions 4 apps/supervisor/src/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,7 @@ export function getDockerHostDomain() {

return isMacOs || isWindows ? "host.docker.internal" : "localhost";
}

export function getRunnerId(runId: string) {
return `runner-${runId.replace("run_", "")}`;
}
6 changes: 3 additions & 3 deletions 6 apps/supervisor/src/workloadManager/docker.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
import { SimpleStructuredLogger } from "@trigger.dev/core/v3/utils/structuredLogger";
import { RunnerId } from "@trigger.dev/core/v3/isomorphic";
import {
type WorkloadManager,
type WorkloadManagerCreateOptions,
type WorkloadManagerOptions,
} from "./types.js";
import { x } from "tinyexec";
import { env } from "../env.js";
import { getDockerHostDomain } from "../util.js";
import { getDockerHostDomain, getRunnerId } from "../util.js";

export class DockerWorkloadManager implements WorkloadManager {
private readonly logger = new SimpleStructuredLogger("docker-workload-provider");
Expand All @@ -23,7 +22,8 @@ export class DockerWorkloadManager implements WorkloadManager {
async create(opts: WorkloadManagerCreateOptions) {
this.logger.log("[DockerWorkloadProvider] Creating container", { opts });

const runnerId = RunnerId.generate();
const runnerId = getRunnerId(opts.runFriendlyId);

const runArgs = [
"run",
"--detach",
Expand Down
6 changes: 3 additions & 3 deletions 6 apps/supervisor/src/workloadManager/kubernetes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ import {
type WorkloadManagerCreateOptions,
type WorkloadManagerOptions,
} from "./types.js";
import { RunnerId } from "@trigger.dev/core/v3/isomorphic";
import type { EnvironmentType, MachinePreset } from "@trigger.dev/core/v3";
import { env } from "../env.js";
import { type K8sApi, createK8sApi, type k8s } from "../clients/kubernetes.js";
import { getRunnerId } from "../util.js";

type ResourceQuantities = {
[K in "cpu" | "memory" | "ephemeral-storage"]?: string;
Expand All @@ -31,7 +31,7 @@ export class KubernetesWorkloadManager implements WorkloadManager {
async create(opts: WorkloadManagerCreateOptions) {
this.logger.log("[KubernetesWorkloadManager] Creating container", { opts });

const runnerId = RunnerId.generate().replace(/_/g, "-");
const runnerId = getRunnerId(opts.runFriendlyId);

try {
await this.k8s.core.createNamespacedPod({
Expand Down Expand Up @@ -217,7 +217,7 @@ export class KubernetesWorkloadManager implements WorkloadManager {
automountServiceAccountToken: false,
imagePullSecrets: this.getImagePullSecrets(),
nodeSelector: {
nodetype: "worker-re2",
nodetype: env.KUBERNETES_WORKER_NODETYPE_LABEL,
},
};
}
Expand Down
6 changes: 5 additions & 1 deletion 6 apps/webapp/app/routes/engine.v1.worker-actions.dequeue.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import { json, TypedResponse } from "@remix-run/server-runtime";
import { WorkerApiDequeueRequestBody, WorkerApiDequeueResponseBody } from "@trigger.dev/core/v3/workers";
import {
WorkerApiDequeueRequestBody,
WorkerApiDequeueResponseBody,
} from "@trigger.dev/core/v3/workers";
import { createActionWorkerApiRoute } from "~/services/routeBuilders/apiBuilder.server";

export const action = createActionWorkerApiRoute(
Expand All @@ -10,6 +13,7 @@ export const action = createActionWorkerApiRoute(
return json(
await authenticatedWorker.dequeue({
maxResources: body.maxResources,
maxRunCount: body.maxRunCount,
})
);
}
Expand Down
Loading
Morty Proxy This is a proxified and sanitized view of the page, visit original site.