Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions 6 .changeset/tender-cycles-melt.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"trigger.dev": patch
"@trigger.dev/core": patch
---

Fixed deploy timeout issues and improve the output of logs when deploying
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { z } from "zod";
import { authenticateApiRequest } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { ServiceValidationError } from "~/v3/services/baseService.server";
import { FinalizeDeploymentV2Service } from "~/v3/services/finalizeDeploymentV2";
import { FinalizeDeploymentV2Service } from "~/v3/services/finalizeDeploymentV2.server";

const ParamsSchema = z.object({
deploymentId: z.string(),
Expand Down
104 changes: 104 additions & 0 deletions 104 apps/webapp/app/routes/api.v3.deployments.$deploymentId.finalize.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import { ActionFunctionArgs, json } from "@remix-run/server-runtime";
import { FinalizeDeploymentRequestBody } from "@trigger.dev/core/v3";
import { z } from "zod";
import { authenticateApiRequest } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { ServiceValidationError } from "~/v3/services/baseService.server";
import { FinalizeDeploymentV2Service } from "~/v3/services/finalizeDeploymentV2.server";

const ParamsSchema = z.object({
deploymentId: z.string(),
});

export async function action({ request, params }: ActionFunctionArgs) {
// Ensure this is a POST request
if (request.method.toUpperCase() !== "POST") {
return { status: 405, body: "Method Not Allowed" };
}

const parsedParams = ParamsSchema.safeParse(params);

if (!parsedParams.success) {
return json({ error: "Invalid params" }, { status: 400 });
}

// Next authenticate the request
const authenticationResult = await authenticateApiRequest(request);

if (!authenticationResult) {
logger.info("Invalid or missing api key", { url: request.url });
return json({ error: "Invalid or Missing API key" }, { status: 401 });
}

const authenticatedEnv = authenticationResult.environment;

const { deploymentId } = parsedParams.data;

const rawBody = await request.json();
const body = FinalizeDeploymentRequestBody.safeParse(rawBody);

if (!body.success) {
return json({ error: "Invalid body", issues: body.error.issues }, { status: 400 });
}

try {
// Create a text stream chain
const stream = new TransformStream();
const encoder = new TextEncoderStream();
const writer = stream.writable.getWriter();

const service = new FinalizeDeploymentV2Service();

// Chain the streams: stream -> encoder -> response
const response = new Response(stream.readable.pipeThrough(encoder), {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
},
});

const pingInterval = setInterval(() => {
writer.write("event: ping\ndata: {}\n\n");
}, 10000); // 10 seconds

service
.call(authenticatedEnv, deploymentId, body.data, writer)
.then(async () => {
clearInterval(pingInterval);

await writer.write(`event: complete\ndata: ${JSON.stringify({ id: deploymentId })}\n\n`);
await writer.close();
})
.catch(async (error) => {
let errorMessage;

if (error instanceof ServiceValidationError) {
errorMessage = { error: error.message };
} else if (error instanceof Error) {
logger.error("Error finalizing deployment", { error: error.message });
errorMessage = { error: `Internal server error: ${error.message}` };
} else {
logger.error("Error finalizing deployment", { error: String(error) });
errorMessage = { error: "Internal server error" };
}

clearInterval(pingInterval);

await writer.write(`event: error\ndata: ${JSON.stringify(errorMessage)}\n\n`);
await writer.close();
});

return response;
} catch (error) {
if (error instanceof ServiceValidationError) {
return json({ error: error.message }, { status: 400 });
} else if (error instanceof Error) {
logger.error("Error finalizing deployment", { error: error.message });
return json({ error: `Internal server error: ${error.message}` }, { status: 500 });
} else {
logger.error("Error finalizing deployment", { error: String(error) });
return json({ error: "Internal server error" }, { status: 500 });
}
}
}
6 changes: 6 additions & 0 deletions 6 apps/webapp/app/v3/services/finalizeDeployment.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ export class FinalizeDeploymentService extends BaseService {
throw new ServiceValidationError("Worker deployment does not have a worker");
}

if (deployment.status === "DEPLOYED") {
logger.debug("Worker deployment is already deployed", { id });

return deployment;
}

if (deployment.status !== "DEPLOYING") {
logger.error("Worker deployment is not in DEPLOYING status", { id });
throw new ServiceValidationError("Worker deployment is not in DEPLOYING status");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ export class FinalizeDeploymentV2Service extends BaseService {
public async call(
authenticatedEnv: AuthenticatedEnvironment,
id: string,
body: FinalizeDeploymentRequestBody
body: FinalizeDeploymentRequestBody,
writer?: WritableStreamDefaultWriter
) {
// if it's self hosted, lets just use the v1 finalize deployment service
if (body.selfHosted) {
Expand Down Expand Up @@ -83,24 +84,27 @@ export class FinalizeDeploymentV2Service extends BaseService {
throw new ServiceValidationError("Missing depot token");
}

const pushResult = await executePushToRegistry({
depot: {
buildId: externalBuildData.data.buildId,
orgToken: env.DEPOT_TOKEN,
projectId: externalBuildData.data.projectId,
},
registry: {
host: env.DEPLOY_REGISTRY_HOST,
namespace: env.DEPLOY_REGISTRY_NAMESPACE,
username: env.DEPLOY_REGISTRY_USERNAME,
password: env.DEPLOY_REGISTRY_PASSWORD,
},
deployment: {
version: deployment.version,
environmentSlug: deployment.environment.slug,
projectExternalRef: deployment.worker.project.externalRef,
const pushResult = await executePushToRegistry(
{
depot: {
buildId: externalBuildData.data.buildId,
orgToken: env.DEPOT_TOKEN,
projectId: externalBuildData.data.projectId,
},
registry: {
host: env.DEPLOY_REGISTRY_HOST,
namespace: env.DEPLOY_REGISTRY_NAMESPACE,
username: env.DEPLOY_REGISTRY_USERNAME,
password: env.DEPLOY_REGISTRY_PASSWORD,
},
deployment: {
version: deployment.version,
environmentSlug: deployment.environment.slug,
projectExternalRef: deployment.worker.project.externalRef,
},
},
});
writer
);

if (!pushResult.ok) {
throw new ServiceValidationError(pushResult.error);
Expand Down Expand Up @@ -148,11 +152,10 @@ type ExecutePushResult =
logs: string;
};

async function executePushToRegistry({
depot,
registry,
deployment,
}: ExecutePushToRegistryOptions): Promise<ExecutePushResult> {
async function executePushToRegistry(
{ depot, registry, deployment }: ExecutePushToRegistryOptions,
writer?: WritableStreamDefaultWriter
): Promise<ExecutePushResult> {
// Step 1: We need to "login" to the digital ocean registry
const configDir = await ensureLoggedIntoDockerRegistry(registry.host, {
username: registry.username,
Expand Down Expand Up @@ -180,7 +183,7 @@ async function executePushToRegistry({
try {
const processCode = await new Promise<number | null>((res, rej) => {
// For some reason everything is output on stderr, not stdout
childProcess.stderr?.on("data", (data: Buffer) => {
childProcess.stderr?.on("data", async (data: Buffer) => {
const text = data.toString();

// Emitted data chunks can contain multiple lines. Remove empty lines.
Expand All @@ -191,6 +194,13 @@ async function executePushToRegistry({
imageTag,
deployment,
});

// Now we can write strings directly
if (writer) {
for (const line of lines) {
await writer.write(`event: log\ndata: ${JSON.stringify({ message: line })}\n\n`);
}
}
Comment on lines +198 to +203

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider error handling for stream writes.

The code writes to the stream without handling potential write errors. Stream writes can fail, especially in case of network issues or if the client disconnects.

Apply this diff to add error handling:

 if (writer) {
   for (const line of lines) {
-    await writer.write(`event: log\ndata: ${JSON.stringify({ message: line })}\n\n`);
+    try {
+      await writer.write(`event: log\ndata: ${JSON.stringify({ message: line })}\n\n`);
+    } catch (error) {
+      logger.error("Failed to write to stream", { error });
+      break;
+    }
   }
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// Now we can write strings directly
if (writer) {
for (const line of lines) {
await writer.write(`event: log\ndata: ${JSON.stringify({ message: line })}\n\n`);
}
}
// Now we can write strings directly
if (writer) {
for (const line of lines) {
try {
await writer.write(`event: log\ndata: ${JSON.stringify({ message: line })}\n\n`);
} catch (error) {
logger.error("Failed to write to stream", { error });
break;
}
}
}

});

childProcess.on("error", (e) => rej(e));
Expand Down
65 changes: 57 additions & 8 deletions 65 packages/cli-v3/src/apiClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import {
FailDeploymentResponseBody,
FinalizeDeploymentRequestBody,
} from "@trigger.dev/core/v3";
import { zodfetch, ApiError } from "@trigger.dev/core/v3/zodfetch";
import { zodfetch, ApiError, zodfetchSSE } from "@trigger.dev/core/v3/zodfetch";

export class CliApiClient {
constructor(
Expand Down Expand Up @@ -247,23 +247,72 @@ export class CliApiClient {
);
}

async finalizeDeployment(id: string, body: FinalizeDeploymentRequestBody) {
async finalizeDeployment(
id: string,
body: FinalizeDeploymentRequestBody,
onLog?: (message: string) => void
): Promise<ApiResult<FailDeploymentResponseBody>> {
if (!this.accessToken) {
throw new Error("finalizeDeployment: No access token");
}

return wrapZodFetch(
FailDeploymentResponseBody,
`${this.apiURL}/api/v2/deployments/${id}/finalize`,
{
let resolvePromise: (value: ApiResult<FailDeploymentResponseBody>) => void;
let rejectPromise: (reason: any) => void;

const promise = new Promise<ApiResult<FailDeploymentResponseBody>>((resolve, reject) => {
resolvePromise = resolve;
rejectPromise = reject;
});

const source = zodfetchSSE({
url: `${this.apiURL}/api/v3/deployments/${id}/finalize`,
request: {
method: "POST",
headers: {
Authorization: `Bearer ${this.accessToken}`,
"Content-Type": "application/json",
},
body: JSON.stringify(body),
}
);
},
messages: {
error: z.object({ error: z.string() }),
log: z.object({ message: z.string() }),
complete: FailDeploymentResponseBody,
},
});

source.onConnectionError((error) => {
rejectPromise({
success: false,
error,
});
});

source.onMessage("complete", (message) => {
resolvePromise({
success: true,
data: message,
});
});

source.onMessage("error", ({ error }) => {
rejectPromise({
success: false,
error,
});
});

if (onLog) {
source.onMessage("log", ({ message }) => {
onLog(message);
});
}

const result = await promise;

source.stop();

return result;
}

async startDeploymentIndexing(deploymentId: string, body: StartDeploymentIndexingRequestBody) {
Expand Down
39 changes: 31 additions & 8 deletions 39 packages/cli-v3/src/commands/deploy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -222,10 +222,10 @@ async function _deployCommand(dir: string, options: DeployCommandOptions) {
forcedExternals,
listener: {
onBundleStart() {
$buildSpinner.start("Building project");
$buildSpinner.start("Building trigger code");
},
onBundleComplete(result) {
$buildSpinner.stop("Successfully built project");
$buildSpinner.stop("Successfully built code");

logger.debug("Bundle result", result);
},
Expand Down Expand Up @@ -328,9 +328,9 @@ async function _deployCommand(dir: string, options: DeployCommandOptions) {
const $spinner = spinner();

if (isLinksSupported) {
$spinner.start(`Deploying version ${version} ${deploymentLink}`);
$spinner.start(`Building version ${version} ${deploymentLink}`);
} else {
$spinner.start(`Deploying version ${version}`);
$spinner.start(`Building version ${version}`);
}

const selfHostedRegistryHost = deployment.registryHost ?? options.registry;
Expand Down Expand Up @@ -359,6 +359,13 @@ async function _deployCommand(dir: string, options: DeployCommandOptions) {
compilationPath: destination.path,
buildEnvVars: buildManifest.build.env,
network: options.network,
onLog: (logMessage) => {
if (isLinksSupported) {
$spinner.message(`Building version ${version} ${deploymentLink}: ${logMessage}`);
} else {
$spinner.message(`Building version ${version}: ${logMessage}`);
}
},
});

logger.debug("Build result", buildResult);
Expand Down Expand Up @@ -426,10 +433,26 @@ async function _deployCommand(dir: string, options: DeployCommandOptions) {
}`
: `${buildResult.image}${buildResult.digest ? `@${buildResult.digest}` : ""}`;

const finalizeResponse = await projectClient.client.finalizeDeployment(deployment.id, {
imageReference,
selfHosted: options.selfHosted,
});
if (isLinksSupported) {
$spinner.message(`Deploying version ${version} ${deploymentLink}`);
} else {
$spinner.message(`Deploying version ${version}`);
}

const finalizeResponse = await projectClient.client.finalizeDeployment(
deployment.id,
{
imageReference,
selfHosted: options.selfHosted,
},
(logMessage) => {
if (isLinksSupported) {
$spinner.message(`Deploying version ${version} ${deploymentLink}: ${logMessage}`);
} else {
$spinner.message(`Deploying version ${version}: ${logMessage}`);
}
}
);

if (!finalizeResponse.success) {
await failDeploy(
Expand Down
Loading
Morty Proxy This is a proxified and sanitized view of the page, visit original site.