Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion 2 apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -891,7 +891,7 @@ const EnvironmentSchema = z.object({
RUN_REPLICATION_ENABLED: z.string().default("0"),
RUN_REPLICATION_SLOT_NAME: z.string().default("task_runs_to_clickhouse_v1"),
RUN_REPLICATION_PUBLICATION_NAME: z.string().default("task_runs_to_clickhouse_v1_publication"),
RUN_REPLICATION_MAX_FLUSH_CONCURRENCY: z.coerce.number().int().default(100),
RUN_REPLICATION_MAX_FLUSH_CONCURRENCY: z.coerce.number().int().default(2),
RUN_REPLICATION_FLUSH_INTERVAL_MS: z.coerce.number().int().default(1000),
RUN_REPLICATION_FLUSH_BATCH_SIZE: z.coerce.number().int().default(100),
RUN_REPLICATION_LEADER_LOCK_TIMEOUT_MS: z.coerce.number().int().default(30_000),
Expand Down
2 changes: 1 addition & 1 deletion 2 apps/webapp/app/presenters/RunFilters.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import {
TaskRunListSearchFilters,
} from "~/components/runs/v3/RunFilters";
import { getRootOnlyFilterPreference } from "~/services/preferences/uiPreferences.server";
import { type ParsedRunFilters } from "~/services/runsRepository.server";
import { type ParsedRunFilters } from "~/services/runsRepository/runsRepository.server";

type FiltersFromRequest = ParsedRunFilters & Required<Pick<ParsedRunFilters, "rootOnly">>;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { getUsername } from "~/utils/username";
import { BasePresenter } from "./basePresenter.server";
import { type BulkActionMode } from "~/components/BulkActionFilterSummary";
import { parseRunListInputOptions } from "~/services/runsRepository.server";
import { parseRunListInputOptions } from "~/services/runsRepository/runsRepository.server";
import { TaskRunListSearchFilters } from "~/components/runs/v3/RunFilters";

type BulkActionOptions = {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { type PrismaClient } from "@trigger.dev/database";
import { CreateBulkActionSearchParams } from "~/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.bulkaction";
import { clickhouseClient } from "~/services/clickhouseInstance.server";
import { RunsRepository } from "~/services/runsRepository.server";
import { RunsRepository } from "~/services/runsRepository/runsRepository.server";
import { getRunFiltersFromRequest } from "../RunFilters.server";
import { BasePresenter } from "./basePresenter.server";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { type Direction } from "~/components/ListPagination";
import { timeFilters } from "~/components/runs/v3/SharedFilters";
import { findDisplayableEnvironment } from "~/models/runtimeEnvironment.server";
import { getAllTaskIdentifiers } from "~/models/task.server";
import { RunsRepository } from "~/services/runsRepository.server";
import { RunsRepository } from "~/services/runsRepository/runsRepository.server";
import { machinePresetFromRun } from "~/v3/machinePresets.server";
import { ServiceValidationError } from "~/v3/services/baseService.server";
import { isCancellableRunStatus, isFinalRunStatus, isPendingRunStatus } from "~/v3/taskStatus";
Expand Down
71 changes: 71 additions & 0 deletions 71 apps/webapp/app/routes/admin.api.v1.feature-flags.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import { ActionFunctionArgs, json } from "@remix-run/server-runtime";
import { prisma } from "~/db.server";
import { authenticateApiRequestWithPersonalAccessToken } from "~/services/personalAccessToken.server";
import { getRunsReplicationGlobal } from "~/services/runsReplicationGlobal.server";
import { runsReplicationInstance } from "~/services/runsReplicationInstance.server";
import {
makeSetFlags,
setFlags,
FeatureFlagCatalogSchema,
validateAllFeatureFlags,
validatePartialFeatureFlags,
makeSetMultipleFlags,
} from "~/v3/featureFlags.server";
import { z } from "zod";

export async function action({ request }: ActionFunctionArgs) {
// Next authenticate the request
const authenticationResult = await authenticateApiRequestWithPersonalAccessToken(request);

if (!authenticationResult) {
return json({ error: "Invalid or Missing API key" }, { status: 401 });
}

const user = await prisma.user.findUnique({
where: {
id: authenticationResult.userId,
},
});

if (!user) {
return json({ error: "Invalid or Missing API key" }, { status: 401 });
}

if (!user.admin) {
return json({ error: "You must be an admin to perform this action" }, { status: 403 });
}

try {
// Parse the request body
const body = await request.json();

// Validate the input using the partial schema
const validationResult = validatePartialFeatureFlags(body as Record<string, unknown>);
if (!validationResult.success) {
return json(
{
error: "Invalid feature flags data",
details: validationResult.error.issues,
},
{ status: 400 }
);
}

const featureFlags = validationResult.data;
const setMultipleFlags = makeSetMultipleFlags(prisma);
const updatedFlags = await setMultipleFlags(featureFlags);

return json({
success: true,
updatedFlags,
message: `Updated ${updatedFlags.length} feature flag(s)`,
});
} catch (error) {
return json(
{
error: error instanceof Error ? error.message : String(error),
},
{ status: 400 }
);
}
}
Original file line number Diff line number Diff line change
@@ -1,79 +1,26 @@
import { type ClickHouse, type ClickhouseQueryBuilder } from "@internal/clickhouse";
import { type Tracer } from "@internal/tracing";
import { type Logger, type LogLevel } from "@trigger.dev/core/logger";
import { MachinePresetName } from "@trigger.dev/core/v3";
import { BulkActionId, RunId } from "@trigger.dev/core/v3/isomorphic";
import { TaskRunStatus } from "@trigger.dev/database";
import parseDuration from "parse-duration";
import { z } from "zod";
import { timeFilters } from "~/components/runs/v3/SharedFilters";
import { type PrismaClient } from "~/db.server";

export type RunsRepositoryOptions = {
clickhouse: ClickHouse;
prisma: PrismaClient;
logger?: Logger;
logLevel?: LogLevel;
tracer?: Tracer;
};

const RunStatus = z.enum(Object.values(TaskRunStatus) as [TaskRunStatus, ...TaskRunStatus[]]);

const RunListInputOptionsSchema = z.object({
organizationId: z.string(),
projectId: z.string(),
environmentId: z.string(),
//filters
tasks: z.array(z.string()).optional(),
versions: z.array(z.string()).optional(),
statuses: z.array(RunStatus).optional(),
tags: z.array(z.string()).optional(),
scheduleId: z.string().optional(),
period: z.string().optional(),
from: z.number().optional(),
to: z.number().optional(),
isTest: z.boolean().optional(),
rootOnly: z.boolean().optional(),
batchId: z.string().optional(),
runId: z.array(z.string()).optional(),
bulkId: z.string().optional(),
queues: z.array(z.string()).optional(),
machines: MachinePresetName.array().optional(),
});

export type RunListInputOptions = z.infer<typeof RunListInputOptionsSchema>;
export type RunListInputFilters = Omit<
RunListInputOptions,
"organizationId" | "projectId" | "environmentId"
>;

export type ParsedRunFilters = RunListInputFilters & {
cursor?: string;
direction?: "forward" | "backward";
};

type FilterRunsOptions = Omit<RunListInputOptions, "period"> & {
period: number | undefined;
};

type Pagination = {
page: {
size: number;
cursor?: string;
direction?: "forward" | "backward";
};
};

export type ListRunsOptions = RunListInputOptions & Pagination;

export class RunsRepository {
import { type ClickhouseQueryBuilder } from "@internal/clickhouse";
import { RunId } from "@trigger.dev/core/v3/isomorphic";
import {
type FilterRunsOptions,
type IRunsRepository,
type ListRunsOptions,
type RunListInputOptions,
type RunsRepositoryOptions,
convertRunListInputOptionsToFilterRunsOptions,
} from "./runsRepository.server";

export class ClickHouseRunsRepository implements IRunsRepository {
constructor(private readonly options: RunsRepositoryOptions) {}

get name() {
return "clickhouse";
}

async listRunIds(options: ListRunsOptions) {
const queryBuilder = this.options.clickhouse.taskRuns.queryBuilder();
applyRunFiltersToQueryBuilder(
queryBuilder,
await this.#convertRunListInputOptionsToFilterRunsOptions(options)
await convertRunListInputOptionsToFilterRunsOptions(options, this.options.prisma)
);

if (options.page.cursor) {
Expand Down Expand Up @@ -200,7 +147,7 @@ export class RunsRepository {
const queryBuilder = this.options.clickhouse.taskRuns.countQueryBuilder();
applyRunFiltersToQueryBuilder(
queryBuilder,
await this.#convertRunListInputOptionsToFilterRunsOptions(options)
await convertRunListInputOptionsToFilterRunsOptions(options, this.options.prisma)
);

const [queryError, result] = await queryBuilder.execute();
Expand All @@ -215,73 +162,6 @@ export class RunsRepository {

return result[0].count;
}

async #convertRunListInputOptionsToFilterRunsOptions(
options: RunListInputOptions
): Promise<FilterRunsOptions> {
const convertedOptions: FilterRunsOptions = {
...options,
period: undefined,
};

// Convert time period to ms
const time = timeFilters({
period: options.period,
from: options.from,
to: options.to,
});
convertedOptions.period = time.period ? parseDuration(time.period) ?? undefined : undefined;

// batch friendlyId to id
if (options.batchId && options.batchId.startsWith("batch_")) {
const batch = await this.options.prisma.batchTaskRun.findFirst({
select: {
id: true,
},
where: {
friendlyId: options.batchId,
runtimeEnvironmentId: options.environmentId,
},
});

if (batch) {
convertedOptions.batchId = batch.id;
}
}

// scheduleId can be a friendlyId
if (options.scheduleId && options.scheduleId.startsWith("sched_")) {
const schedule = await this.options.prisma.taskSchedule.findFirst({
select: {
id: true,
},
where: {
friendlyId: options.scheduleId,
projectId: options.projectId,
},
});

if (schedule) {
convertedOptions.scheduleId = schedule?.id;
}
}

if (options.bulkId && options.bulkId.startsWith("bulk_")) {
convertedOptions.bulkId = BulkActionId.toId(options.bulkId);
}

if (options.runId) {
//convert to friendlyId
convertedOptions.runId = options.runId.map((r) => RunId.toFriendlyId(r));
}

// Show all runs if we are filtering by batchId or runId
if (options.batchId || options.runId?.length || options.scheduleId || options.tasks?.length) {
convertedOptions.rootOnly = false;
}

return convertedOptions;
}
}

function applyRunFiltersToQueryBuilder<T>(
Expand Down Expand Up @@ -373,7 +253,3 @@ function applyRunFiltersToQueryBuilder<T>(
});
}
}

export function parseRunListInputOptions(data: any): RunListInputOptions {
return RunListInputOptionsSchema.parse(data);
}
Loading
Loading
Morty Proxy This is a proxified and sanitized view of the page, visit original site.