diff --git a/package.json b/package.json index d2a95e4..5e52b29 100644 --- a/package.json +++ b/package.json @@ -2,7 +2,7 @@ "author": "Layercode", "license": "MIT", "name": "@layercode/node-server-sdk", - "version": "1.2.0", + "version": "1.2.2", "description": "Layercode Node.js Server Side SDK", "type": "module", "main": "./dist/cjs/index.js", diff --git a/src/index.ts b/src/index.ts index 38af640..44adc86 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,2 +1,3 @@ export { streamResponse } from './streamResponse'; export { verifySignature } from './verifySignature'; +export { ttsWorkersAIStream } from './ttsWorkersAIStream'; diff --git a/src/sseParse.ts b/src/sseParse.ts new file mode 100644 index 0000000..bc631c4 --- /dev/null +++ b/src/sseParse.ts @@ -0,0 +1,111 @@ +export interface SseMessage { + id?: string; + event: string; + data: string; + retry?: number; +} + +export function messageListFromString(input: string): { + messages: SseMessage[]; + leftoverData: string | undefined; +} { + const messages: SseMessage[] = []; + let line: string = ""; + let ignoreNextNewline = false; + let data: string | undefined; + let id: string | undefined; + let event: string | undefined; + let retry: number | undefined; + let previousChar: string | undefined; + let pendingIndex = 0; + let isEndOfMessage = false; + function handleParseLine(pIndex: number) { + const result = parseLine(line); + data = result.data ?? data; + id = result.id ?? id; + event = result.event ?? event; + retry = result.retry ?? retry; + if (isEndOfMessage) { + if (typeof data === "string") { + messages.push({ + id: id, + data: data, + event: event ?? "message", + retry: retry, + }); + } + id = undefined; + data = undefined; + event = undefined; + retry = undefined; + pendingIndex = pIndex; + } + line = ""; + } + for (let i = 0; i < input.length; i++) { + const char = input[i]; + switch (char) { + case "\r": { + isEndOfMessage = previousChar === "\n" || previousChar === "\r"; + ignoreNextNewline = true; + const pIndex = input[i + 1] === "\n" ? i + 2 : i + 1; + handleParseLine(pIndex); + break; + } + case "\n": { + if (ignoreNextNewline) { + ignoreNextNewline = false; + break; + } + isEndOfMessage = previousChar === "\n"; + handleParseLine(i + 1); + break; + } + default: + line += char; + break; + } + previousChar = char; + } + return { + messages, + leftoverData: input.substring(pendingIndex), + }; +} + +export function parseLine(input: string): Partial { + if (input.startsWith("data:")) { + return { data: input.substring(5).trim() }; + } + if (input.startsWith("id:")) { + return { id: input.substring(3).trim() }; + } + if (input.startsWith("event:")) { + return { + event: input.substring(6).trim(), + }; + } + if (input.startsWith("retry:")) { + const val = Number(input.substring(6).trim()); + if (!Number.isNaN(val)) { + if (Number.isInteger(val)) { + return { retry: val }; + } else { + return { retry: Math.round(val) }; + } + } + } + return {}; +} + +export async function getBytes( + controller: AbortController, + stream: ReadableStream, + onChunk: (arr: Uint8Array) => void, +) { + const reader = stream.getReader(); + let result: ReadableStreamReadResult; + while (!controller.signal.aborted && !(result = await reader.read()).done) { + onChunk(result.value); + } +} diff --git a/src/streamResponse.ts b/src/streamResponse.ts index c4d3987..fcd84e5 100644 --- a/src/streamResponse.ts +++ b/src/streamResponse.ts @@ -5,10 +5,17 @@ * @returns Response object */ +import { ttsWorkersAIStream as ttsWorkersAIStreamUtil } from './ttsWorkersAIStream'; + export interface StreamResponseHandlerHelpers { stream: { tts: (content: string) => void; ttsTextStream: (textStream: AsyncIterable) => Promise; + /** + * Parse a Workers AI (SSE JSON) stream and speak chunks via TTS. + * Returns the concatenated text that was spoken. + */ + ttsWorkersAIStream: (llmResponseStream: ReadableStream) => Promise; data: (content: any) => void; // other?: (type: string, payload: any) => void; end: () => void; @@ -38,6 +45,8 @@ export function streamResponse(requestBody: Record, handler: Stream stream.tts(chunk); } }, + ttsWorkersAIStream: async (llmResponseStream: ReadableStream): Promise => + ttsWorkersAIStreamUtil(llmResponseStream, (text) => stream.tts(text)), data: (content: any) => sendEvent('response.data', { content }), // other: (type: string, payload: any) => sendEvent(type, payload), end: () => { diff --git a/src/ttsWorkersAIStream.ts b/src/ttsWorkersAIStream.ts new file mode 100644 index 0000000..ba81723 --- /dev/null +++ b/src/ttsWorkersAIStream.ts @@ -0,0 +1,41 @@ +import { getBytes, messageListFromString } from './sseParse'; + +/** + * Parses a Workers AI SSE JSON stream and invokes the provided TTS callback + * with each text chunk. Returns the concatenated text. + */ +export async function ttsWorkersAIStream( + llmResponseStream: ReadableStream, + tts: (content: string) => void, +): Promise { + const decoder = new TextDecoder(); + const abort = new AbortController(); + let pendingData = ''; + let combinedText = ''; + + await getBytes(abort, llmResponseStream, (arr) => { + const text = pendingData + decoder.decode(arr, { stream: true }); + const result = messageListFromString(text); + pendingData = result.leftoverData ?? ''; + for (const msg of result.messages) { + const data = (msg.data || '').trim(); + if (!data) continue; + if (data === '[DONE]') { + abort.abort('done'); + break; + } + try { + const json = JSON.parse(data); + const chunk = typeof json?.response === 'string' ? json.response : ''; + if (chunk) { + tts(chunk); + combinedText += chunk; + } + } catch { + // ignore non-JSON payloads + } + } + }); + + return combinedText; +}