From f223c57852fdcd4a1f8ca8f1e652d24156902eb0 Mon Sep 17 00:00:00 2001
From: Marshall Thompson
Date: Wed, 28 Jan 2026 20:53:28 -0700
Subject: [PATCH 1/3] fix(client): Buffer SSE stream chunks before parsing

The handleEventStream method was parsing each chunk independently, but
TCP chunks don't align with SSE event boundaries. This caused JSON
parse errors when events were split across chunks. For example:

- Chunk 1: data: {"response":"Hel
- Chunk 2: lo"}\n\ndata: {"response":" world"}\n\n

The fix buffers incoming data and only parses complete events (those
ending with the SSE delimiter \n\n).
---
 packages/feathers/src/client/fetch.ts | 15 +++++++++++----
 1 file changed, 11 insertions(+), 4 deletions(-)

diff --git a/packages/feathers/src/client/fetch.ts b/packages/feathers/src/client/fetch.ts
index bf3ec70d5..dc6cd1bf6 100644
--- a/packages/feathers/src/client/fetch.ts
+++ b/packages/feathers/src/client/fetch.ts
@@ -130,6 +130,7 @@ export class FetchClient
   async *handleEventStream(res: Response) {
     const reader = res.body.getReader()
     const decoder = new TextDecoder()
+    let buffer = ''
 
     while (true) {
       const { value, done } = await reader.read()
@@ -139,11 +140,17 @@ export class FetchClient
       }
 
       if (value) {
-        const text = decoder.decode(value)
-        const eventChunks = text.split('\n\n').filter(Boolean)
+        buffer += decoder.decode(value, { stream: true })
 
-        for (const chunk of eventChunks) {
-          const lines = chunk.split('\n')
+        // SSE events are separated by \n\n
+        const events = buffer.split('\n\n')
+        // Keep the last potentially incomplete event in the buffer
+        buffer = events.pop() || ''
+
+        for (const event of events) {
+          if (!event.trim()) continue
+
+          const lines = event.split('\n')
           const dataLine = lines.find((line) => line.startsWith('data: '))
 
           if (dataLine) {

From 57e10edec0ac5c0fb0c040dfc514a6eeaa1be418 Mon Sep 17 00:00:00 2001
From: Marshall Thompson
Date: Wed, 28 Jan 2026 20:55:17 -0700
Subject: [PATCH 2/3] test(client): Add tests for SSE stream chunk buffering

Tests verify handleEventStream correctly handles:

- Events split across TCP chunks
- Multiple events in a single chunk
- Events split at the \n\n delimiter boundary
- Multi-byte UTF-8 characters split across chunks
---
 packages/feathers/src/client/fetch.test.ts | 136 ++++++++++++++++++++-
 1 file changed, 134 insertions(+), 2 deletions(-)

diff --git a/packages/feathers/src/client/fetch.test.ts b/packages/feathers/src/client/fetch.test.ts
index 97f674318..8d6e81a30 100644
--- a/packages/feathers/src/client/fetch.test.ts
+++ b/packages/feathers/src/client/fetch.test.ts
@@ -1,10 +1,10 @@
-import { beforeAll, describe, it, expect } from 'vitest'
+import { beforeAll, describe, it, expect, vi } from 'vitest'
 
 import { feathers } from '../index.js'
 import { clientTests } from '../../fixtures/client.js'
 import { NotAcceptable, NotFound, MethodNotAllowed, BadRequest } from '../errors.js'
 import { getApp, createTestServer, TestServiceTypes, verify } from '../../fixtures/index.js'
-import { fetchClient } from './index.js'
+import { fetchClient, FetchClient } from './index.js'
 
 describe('fetch REST connector', function () {
   const port = 8888
@@ -211,3 +211,135 @@ describe('fetch REST connector', function () {
 
   clientTests(app, 'todos')
 })
+
+describe('FetchClient.handleEventStream', () => {
+  /**
+   * Creates a mock Response with a ReadableStream that emits chunks
+   * simulating TCP fragmentation of SSE data
+   */
+  function createChunkedSSEResponse(chunks: string[]): Response {
+    const encoder = new TextEncoder()
+    let chunkIndex = 0
+
+    const stream = new ReadableStream({
+      pull(controller) {
+        if (chunkIndex < chunks.length) {
+          controller.enqueue(encoder.encode(chunks[chunkIndex]))
+          chunkIndex++
+        } else {
+          controller.close()
+        }
+      }
+    })
+
+    return new Response(stream, {
+      headers: { 'content-type': 'text/event-stream' }
+    })
+  }
+
+  it('handles SSE events split across chunks', async () => {
+    // Simulate TCP fragmentation where JSON is split mid-object
+    const chunks = ['data: {"message":"Hel', 'lo"}\n\ndata: {"message":" wor', 'ld"}\n\n']
+
+    const client = new FetchClient({
+      name: 'test',
+      baseUrl: 'http://localhost',
+      connection: fetch,
+      stringify: (q) => ''
+    })
+
+    const response = createChunkedSSEResponse(chunks)
+    const messages: any[] = []
+
+    for await (const data of client.handleEventStream(response)) {
+      messages.push(data)
+    }
+
+    expect(messages).toHaveLength(2)
+    expect(messages[0]).toEqual({ message: 'Hello' })
+    expect(messages[1]).toEqual({ message: ' world' })
+  })
+
+  it('handles multiple events in a single chunk', async () => {
+    const chunks = ['data: {"a":1}\n\ndata: {"b":2}\n\ndata: {"c":3}\n\n']
+
+    const client = new FetchClient({
+      name: 'test',
+      baseUrl: 'http://localhost',
+      connection: fetch,
+      stringify: (q) => ''
+    })
+
+    const response = createChunkedSSEResponse(chunks)
+    const messages: any[] = []
+
+    for await (const data of client.handleEventStream(response)) {
+      messages.push(data)
+    }
+
+    expect(messages).toHaveLength(3)
+    expect(messages[0]).toEqual({ a: 1 })
+    expect(messages[1]).toEqual({ b: 2 })
+    expect(messages[2]).toEqual({ c: 3 })
+  })
+
+  it('handles event split at delimiter boundary', async () => {
+    // Split right at the \n\n boundary
+    const chunks = ['data: {"first":true}\n', '\ndata: {"second":true}\n\n']
+
+    const client = new FetchClient({
+      name: 'test',
+      baseUrl: 'http://localhost',
+      connection: fetch,
+      stringify: (q) => ''
+    })
+
+    const response = createChunkedSSEResponse(chunks)
+    const messages: any[] = []
+
+    for await (const data of client.handleEventStream(response)) {
+      messages.push(data)
+    }
+
+    expect(messages).toHaveLength(2)
+    expect(messages[0]).toEqual({ first: true })
+    expect(messages[1]).toEqual({ second: true })
+  })
+
+  it('handles multi-byte UTF-8 characters split across chunks', async () => {
+    // UTF-8 encoding of emoji can be split across chunks
+    const fullMessage = 'data: {"emoji":"🎉"}\n\n'
+    const bytes = new TextEncoder().encode(fullMessage)
+    // Split in the middle of the emoji (which is 4 bytes in UTF-8)
+    const chunk1 = bytes.slice(0, 18) // cuts into the emoji
+    const chunk2 = bytes.slice(18)
+
+    const stream = new ReadableStream({
+      start(controller) {
+        controller.enqueue(chunk1)
+        controller.enqueue(chunk2)
+        controller.close()
+      }
+    })
+
+    const response = new Response(stream, {
+      headers: { 'content-type': 'text/event-stream' }
+    })
+
+    const client = new FetchClient({
+      name: 'test',
+      baseUrl: 'http://localhost',
+      connection: fetch,
+      stringify: (q) => ''
+    })
+
+    const messages: any[] = []
+
+    for await (const data of client.handleEventStream(response)) {
+      messages.push(data)
+    }
+
+    expect(messages).toHaveLength(1)
+    expect(messages[0]).toEqual({ emoji: '🎉' })
+  })
+})

From ccde4caf58d77e85de62e6d6f13c15b08f346317 Mon Sep 17 00:00:00 2001
From: Marshall Thompson
Date: Wed, 28 Jan 2026 20:57:45 -0700
Subject: [PATCH 3/3] docs(client): Add Streaming Responses (SSE) section

Documents how to consume streaming responses from services that return
async generators.
Includes examples for AI/LLM streaming, progress updates, and notes
about automatic SSE buffering.
---
 website/content/api/client/rest.md | 67 ++++++++++++++++++++++++++++++
 1 file changed, 67 insertions(+)

diff --git a/website/content/api/client/rest.md b/website/content/api/client/rest.md
index d2ec356ee..f717d876b 100644
--- a/website/content/api/client/rest.md
+++ b/website/content/api/client/rest.md
@@ -285,6 +285,73 @@ If no `Content-Type` header is specified, streaming requests default to `applica
 Streaming uploads are only supported with the REST/HTTP transport. Socket.io does not support streaming request bodies.
 ::
 
+### Streaming Responses (SSE)
+
+When a service returns an [async generator or async iterable](../http#async-iterators-sse), the server sends the response as Server-Sent Events (SSE). The REST client automatically detects this and returns an async iterable that you can consume with `for await...of`:
+
+```ts
+// Server - service returns an async generator
+class ChatService {
+  async *create(data: { prompt: string }) {
+    const stream = await ai.generateStream(data.prompt)
+
+    for await (const chunk of stream) {
+      yield { type: 'text', content: chunk }
+    }
+  }
+}
+
+// Client - consume the stream
+const response = app.service('chat').create({ prompt: 'Hello' })
+
+for await (const chunk of response) {
+  console.log(chunk.content) // Streams in real-time
+}
+```
+
+This is useful for:
+
+- **AI/LLM responses** - Stream tokens as they're generated
+- **Progress updates** - Report status during long-running operations
+- **Live data feeds** - Push data to clients as it becomes available
+
+```ts
+// Example: Streaming AI chat with status updates
+class AIChatService {
+  async *create(data: { messages: Message[] }, params: Params) {
+    yield { type: 'status', text: 'Thinking...' }
+
+    const stream = await llm.chat(data.messages)
+
+    for await (const token of stream) {
+      yield { type: 'text', text: token }
+    }
+
+    yield { type: 'done' }
+  }
+}
+
+// Client
+let fullResponse = ''
+
+for await (const event of app.service('ai-chat').create({ messages })) {
+  if (event.type === 'status') {
+    showStatus(event.text)
+  } else if (event.type === 'text') {
+    fullResponse += event.text
+    updateUI(fullResponse)
+  }
+}
+```
+
+::note[Automatic buffering]
+The client automatically handles SSE stream buffering, correctly parsing events even when they arrive split across network chunks. This ensures reliable streaming regardless of network conditions.
+::
+
+::warning[REST only]
+Streaming responses are only supported with the REST/HTTP transport. For real-time updates over Socket.io, use [channels and events](../channels) instead.
+::
+
 ### Custom Methods
 
 On the client, [custom service methods](../services#custom-methods) registered using the `methods` option when registering the service via `restClient.service()`: