Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion 2 apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1150,7 +1150,7 @@ const EnvironmentSchema = z
EVENTS_CLICKHOUSE_ASYNC_INSERT_MAX_DATA_SIZE: z.coerce.number().int().default(10485760),
EVENTS_CLICKHOUSE_ASYNC_INSERT_BUSY_TIMEOUT_MS: z.coerce.number().int().default(5000),
EVENT_REPOSITORY_CLICKHOUSE_ROLLOUT_PERCENT: z.coerce.number().optional(),
EVENT_REPOSITORY_DEFAULT_STORE: z.enum(["postgres", "clickhouse"]).default("postgres"),
EVENT_REPOSITORY_DEFAULT_STORE: z.enum(["postgres", "clickhouse", "clickhouse_v2"]).default("postgres"),
EVENTS_CLICKHOUSE_MAX_TRACE_SUMMARY_VIEW_COUNT: z.coerce.number().int().default(25_000),
EVENTS_CLICKHOUSE_MAX_TRACE_DETAILED_SUMMARY_VIEW_COUNT: z.coerce.number().int().default(5_000),
EVENTS_CLICKHOUSE_MAX_LIVE_RELOADING_SETTING: z.coerce.number().int().default(2000),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import type {
TaskEventDetailsV1Result,
TaskEventSummaryV1Result,
TaskEventV1Input,
TaskEventV2Input,
} from "@internal/clickhouse";
import { Attributes, startSpan, trace, Tracer } from "@internal/tracing";
import { createJsonErrorObject } from "@trigger.dev/core/v3/errors";
Expand Down Expand Up @@ -72,6 +73,12 @@ export type ClickhouseEventRepositoryConfig = {
maximumTraceSummaryViewCount?: number;
maximumTraceDetailedSummaryViewCount?: number;
maximumLiveReloadingSetting?: number;
/**
* The version of the ClickHouse task_events table to use.
* - "v1": Uses task_events_v1 (partitioned by start_time)
* - "v2": Uses task_events_v2 (partitioned by inserted_at to avoid "too many parts" errors)
*/
version?: "v1" | "v2";
};

/**
Expand All @@ -81,13 +88,15 @@ export type ClickhouseEventRepositoryConfig = {
export class ClickhouseEventRepository implements IEventRepository {
private _clickhouse: ClickHouse;
private _config: ClickhouseEventRepositoryConfig;
private readonly _flushScheduler: DynamicFlushScheduler<TaskEventV1Input>;
private readonly _flushScheduler: DynamicFlushScheduler<TaskEventV1Input | TaskEventV2Input>;
private _tracer: Tracer;
private _version: "v1" | "v2";

constructor(config: ClickhouseEventRepositoryConfig) {
this._clickhouse = config.clickhouse;
this._config = config;
this._tracer = config.tracer ?? trace.getTracer("clickhouseEventRepo", "0.0.1");
this._version = config.version ?? "v1";

this._flushScheduler = new DynamicFlushScheduler({
batchSize: config.batchSize ?? 1000,
Expand All @@ -99,31 +108,42 @@ export class ClickhouseEventRepository implements IEventRepository {
memoryPressureThreshold: 10000,
loadSheddingThreshold: 10000,
loadSheddingEnabled: false,
isDroppableEvent: (event: TaskEventV1Input) => {
isDroppableEvent: (event: TaskEventV1Input | TaskEventV2Input) => {
// Only drop LOG events during load shedding
return event.kind === "DEBUG_EVENT";
},
});
}

get version() {
return this._version;
}

get maximumLiveReloadingSetting() {
return this._config.maximumLiveReloadingSetting ?? 1000;
}

async #flushBatch(flushId: string, events: TaskEventV1Input[]) {
async #flushBatch(flushId: string, events: (TaskEventV1Input | TaskEventV2Input)[]) {
await startSpan(this._tracer, "flushBatch", async (span) => {
span.setAttribute("flush_id", flushId);
span.setAttribute("event_count", events.length);
span.setAttribute("version", this._version);

const firstEvent = events[0];

if (firstEvent) {
logger.debug("ClickhouseEventRepository.flushBatch first event", {
event: firstEvent,
version: this._version,
});
}

const [insertError, insertResult] = await this._clickhouse.taskEvents.insert(events, {
const insertFn =
this._version === "v2"
? this._clickhouse.taskEventsV2.insert
: this._clickhouse.taskEvents.insert;

const [insertError, insertResult] = await insertFn(events, {
params: {
clickhouse_settings: this.#getClickhouseInsertSettings(),
},
Expand All @@ -136,6 +156,7 @@ export class ClickhouseEventRepository implements IEventRepository {
logger.info("ClickhouseEventRepository.flushBatch Inserted batch into clickhouse", {
events: events.length,
insertResult,
version: this._version,
});

this.#publishToRedis(events);
Expand All @@ -155,7 +176,7 @@ export class ClickhouseEventRepository implements IEventRepository {
}
}

async #publishToRedis(events: TaskEventV1Input[]) {
async #publishToRedis(events: (TaskEventV1Input | TaskEventV2Input)[]) {
if (events.length === 0) return;
await tracePubSub.publish(events.map((e) => e.trace_id));
}
Expand Down Expand Up @@ -960,7 +981,10 @@ export class ClickhouseEventRepository implements IEventRepository {
): Promise<TraceSummary | undefined> {
const startCreatedAtWithBuffer = new Date(startCreatedAt.getTime() - 1000);

const queryBuilder = this._clickhouse.taskEvents.traceSummaryQueryBuilder();
const queryBuilder =
this._version === "v2"
? this._clickhouse.taskEventsV2.traceSummaryQueryBuilder()
: this._clickhouse.taskEvents.traceSummaryQueryBuilder();

queryBuilder.where("environment_id = {environmentId: String}", { environmentId });
queryBuilder.where("trace_id = {traceId: String}", { traceId });
Expand All @@ -974,6 +998,14 @@ export class ClickhouseEventRepository implements IEventRepository {
});
}

// For v2, add inserted_at filtering for partition pruning
if (this._version === "v2") {
queryBuilder.where("inserted_at >= {insertedAtStart: DateTime64(3)}", {
insertedAtStart: convertDateToClickhouseDateTime(startCreatedAtWithBuffer),
});
// No upper bound on inserted_at - we want all events inserted up to now
}

if (options?.includeDebugLogs === false) {
queryBuilder.where("kind != {kind: String}", { kind: "DEBUG_EVENT" });
}
Expand Down Expand Up @@ -1058,7 +1090,10 @@ export class ClickhouseEventRepository implements IEventRepository {
): Promise<SpanDetail | undefined> {
const startCreatedAtWithBuffer = new Date(startCreatedAt.getTime() - 1000);

const queryBuilder = this._clickhouse.taskEvents.spanDetailsQueryBuilder();
const queryBuilder =
this._version === "v2"
? this._clickhouse.taskEventsV2.spanDetailsQueryBuilder()
: this._clickhouse.taskEvents.spanDetailsQueryBuilder();

queryBuilder.where("environment_id = {environmentId: String}", { environmentId });
queryBuilder.where("trace_id = {traceId: String}", { traceId });
Expand All @@ -1073,6 +1108,13 @@ export class ClickhouseEventRepository implements IEventRepository {
});
}

// For v2, add inserted_at filtering for partition pruning
if (this._version === "v2") {
queryBuilder.where("inserted_at >= {insertedAtStart: DateTime64(3)}", {
insertedAtStart: convertDateToClickhouseDateTime(startCreatedAtWithBuffer),
});
}

queryBuilder.orderBy("start_time ASC");

const [queryError, records] = await queryBuilder.execute();
Expand Down Expand Up @@ -1477,7 +1519,10 @@ export class ClickhouseEventRepository implements IEventRepository {
): Promise<TraceDetailedSummary | undefined> {
const startCreatedAtWithBuffer = new Date(startCreatedAt.getTime() - 1000);

const queryBuilder = this._clickhouse.taskEvents.traceDetailedSummaryQueryBuilder();
const queryBuilder =
this._version === "v2"
? this._clickhouse.taskEventsV2.traceDetailedSummaryQueryBuilder()
: this._clickhouse.taskEvents.traceDetailedSummaryQueryBuilder();

queryBuilder.where("environment_id = {environmentId: String}", { environmentId });
queryBuilder.where("trace_id = {traceId: String}", { traceId });
Expand All @@ -1491,6 +1536,13 @@ export class ClickhouseEventRepository implements IEventRepository {
});
}

// For v2, add inserted_at filtering for partition pruning
if (this._version === "v2") {
queryBuilder.where("inserted_at >= {insertedAtStart: DateTime64(3)}", {
insertedAtStart: convertDateToClickhouseDateTime(startCreatedAtWithBuffer),
});
}

if (options?.includeDebugLogs === false) {
queryBuilder.where("kind != {kind: String}", { kind: "DEBUG_EVENT" });
}
Expand Down Expand Up @@ -1675,7 +1727,10 @@ export class ClickhouseEventRepository implements IEventRepository {
): Promise<RunPreparedEvent[]> {
const startCreatedAtWithBuffer = new Date(startCreatedAt.getTime() - 1000);

const queryBuilder = this._clickhouse.taskEvents.traceSummaryQueryBuilder();
const queryBuilder =
this._version === "v2"
? this._clickhouse.taskEventsV2.traceSummaryQueryBuilder()
: this._clickhouse.taskEvents.traceSummaryQueryBuilder();

queryBuilder.where("environment_id = {environmentId: String}", { environmentId });
queryBuilder.where("trace_id = {traceId: String}", { traceId });
Expand All @@ -1690,6 +1745,13 @@ export class ClickhouseEventRepository implements IEventRepository {
});
}

// For v2, add inserted_at filtering for partition pruning
if (this._version === "v2") {
queryBuilder.where("inserted_at >= {insertedAtStart: DateTime64(3)}", {
insertedAtStart: convertDateToClickhouseDateTime(startCreatedAtWithBuffer),
});
}

queryBuilder.where("kind != {kind: String}", { kind: "DEBUG_EVENT" });
queryBuilder.orderBy("start_time ASC");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,20 @@ export const clickhouseEventRepository = singleton(
initializeClickhouseRepository
);

function initializeClickhouseRepository() {
export const clickhouseEventRepositoryV2 = singleton(
"clickhouseEventRepositoryV2",
initializeClickhouseRepositoryV2
);

function getClickhouseClient() {
if (!env.EVENTS_CLICKHOUSE_URL) {
throw new Error("EVENTS_CLICKHOUSE_URL is not set");
}

const url = new URL(env.EVENTS_CLICKHOUSE_URL);
url.searchParams.delete("secure");

const safeUrl = new URL(url.toString());
safeUrl.password = "redacted";

console.log("🗃️ Initializing Clickhouse event repository", { url: safeUrl.toString() });

const clickhouse = new ClickHouse({
return new ClickHouse({
url: url.toString(),
name: "task-events",
keepAlive: {
Expand All @@ -34,6 +34,55 @@ function initializeClickhouseRepository() {
},
maxOpenConnections: env.EVENTS_CLICKHOUSE_MAX_OPEN_CONNECTIONS,
});
}

function initializeClickhouseRepository() {
if (!env.EVENTS_CLICKHOUSE_URL) {
throw new Error("EVENTS_CLICKHOUSE_URL is not set");
}

const url = new URL(env.EVENTS_CLICKHOUSE_URL);
url.searchParams.delete("secure");

const safeUrl = new URL(url.toString());
safeUrl.password = "redacted";

console.log("🗃️ Initializing Clickhouse event repository (v1)", { url: safeUrl.toString() });

const clickhouse = getClickhouseClient();

const repository = new ClickhouseEventRepository({
clickhouse: clickhouse,
batchSize: env.EVENTS_CLICKHOUSE_BATCH_SIZE,
flushInterval: env.EVENTS_CLICKHOUSE_FLUSH_INTERVAL_MS,
maximumTraceSummaryViewCount: env.EVENTS_CLICKHOUSE_MAX_TRACE_SUMMARY_VIEW_COUNT,
maximumTraceDetailedSummaryViewCount:
env.EVENTS_CLICKHOUSE_MAX_TRACE_DETAILED_SUMMARY_VIEW_COUNT,
maximumLiveReloadingSetting: env.EVENTS_CLICKHOUSE_MAX_LIVE_RELOADING_SETTING,
insertStrategy: env.EVENTS_CLICKHOUSE_INSERT_STRATEGY,
waitForAsyncInsert: env.EVENTS_CLICKHOUSE_WAIT_FOR_ASYNC_INSERT === "1",
asyncInsertMaxDataSize: env.EVENTS_CLICKHOUSE_ASYNC_INSERT_MAX_DATA_SIZE,
asyncInsertBusyTimeoutMs: env.EVENTS_CLICKHOUSE_ASYNC_INSERT_BUSY_TIMEOUT_MS,
version: "v1",
});

return repository;
}

function initializeClickhouseRepositoryV2() {
if (!env.EVENTS_CLICKHOUSE_URL) {
throw new Error("EVENTS_CLICKHOUSE_URL is not set");
}

const url = new URL(env.EVENTS_CLICKHOUSE_URL);
url.searchParams.delete("secure");

const safeUrl = new URL(url.toString());
safeUrl.password = "redacted";

console.log("🗃️ Initializing Clickhouse event repository (v2)", { url: safeUrl.toString() });

const clickhouse = getClickhouseClient();

const repository = new ClickhouseEventRepository({
clickhouse: clickhouse,
Expand All @@ -47,6 +96,7 @@ function initializeClickhouseRepository() {
waitForAsyncInsert: env.EVENTS_CLICKHOUSE_WAIT_FOR_ASYNC_INSERT === "1",
asyncInsertMaxDataSize: env.EVENTS_CLICKHOUSE_ASYNC_INSERT_MAX_DATA_SIZE,
asyncInsertBusyTimeoutMs: env.EVENTS_CLICKHOUSE_ASYNC_INSERT_BUSY_TIMEOUT_MS,
version: "v2",
});

return repository;
Expand Down
Loading
Morty Proxy This is a proxified and sanitized view of the page, visit original site.