Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 134 additions & 2 deletions 136 packages/feathers/src/client/fetch.test.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { beforeAll, describe, it, expect } from 'vitest'
import { beforeAll, describe, it, expect, vi } from 'vitest'

Check warning on line 1 in packages/feathers/src/client/fetch.test.ts

View workflow job for this annotation

GitHub Actions / build (20.x)

'vi' is defined but never used. Allowed unused vars must match /^_/u

Check warning on line 1 in packages/feathers/src/client/fetch.test.ts

View workflow job for this annotation

GitHub Actions / build (24.x)

'vi' is defined but never used. Allowed unused vars must match /^_/u

Check warning on line 1 in packages/feathers/src/client/fetch.test.ts

View workflow job for this annotation

GitHub Actions / build (24.x)

'vi' is defined but never used. Allowed unused vars must match /^_/u

Check warning on line 1 in packages/feathers/src/client/fetch.test.ts

View workflow job for this annotation

GitHub Actions / build (20.x)

'vi' is defined but never used. Allowed unused vars must match /^_/u
import { feathers } from '../index.js'
import { clientTests } from '../../fixtures/client.js'
import { NotAcceptable, NotFound, MethodNotAllowed, BadRequest } from '../errors.js'

import { getApp, createTestServer, TestServiceTypes, verify } from '../../fixtures/index.js'
import { fetchClient } from './index.js'
import { fetchClient, FetchClient } from './index.js'

describe('fetch REST connector', function () {
const port = 8888
Expand Down Expand Up @@ -211,3 +211,135 @@

clientTests(app, 'todos')
})

describe('FetchClient.handleEventStream', () => {
/**
* Creates a mock Response with a ReadableStream that emits chunks
* simulating TCP fragmentation of SSE data
*/
function createChunkedSSEResponse(chunks: string[]): Response {
const encoder = new TextEncoder()
let chunkIndex = 0

const stream = new ReadableStream({
pull(controller) {
if (chunkIndex < chunks.length) {
controller.enqueue(encoder.encode(chunks[chunkIndex]))
chunkIndex++
} else {
controller.close()
}
}
})

return new Response(stream, {
headers: { 'content-type': 'text/event-stream' }
})
}

it('handles SSE events split across chunks', async () => {
// Simulate TCP fragmentation where JSON is split mid-object
const chunks = ['data: {"message":"Hel', 'lo"}\n\ndata: {"message":" wor', 'ld"}\n\n']

const client = new FetchClient({
name: 'test',
baseUrl: 'http://localhost',
connection: fetch,
stringify: (q) => ''

Check warning on line 248 in packages/feathers/src/client/fetch.test.ts

View workflow job for this annotation

GitHub Actions / build (20.x)

'q' is defined but never used. Allowed unused args must match /^_/u

Check warning on line 248 in packages/feathers/src/client/fetch.test.ts

View workflow job for this annotation

GitHub Actions / build (24.x)

'q' is defined but never used. Allowed unused args must match /^_/u

Check warning on line 248 in packages/feathers/src/client/fetch.test.ts

View workflow job for this annotation

GitHub Actions / build (24.x)

'q' is defined but never used. Allowed unused args must match /^_/u

Check warning on line 248 in packages/feathers/src/client/fetch.test.ts

View workflow job for this annotation

GitHub Actions / build (20.x)

'q' is defined but never used. Allowed unused args must match /^_/u
})

const response = createChunkedSSEResponse(chunks)
const messages: any[] = []

for await (const data of client.handleEventStream(response)) {
messages.push(data)
}

expect(messages).toHaveLength(2)
expect(messages[0]).toEqual({ message: 'Hello' })
expect(messages[1]).toEqual({ message: ' world' })
})

it('handles multiple events in a single chunk', async () => {
const chunks = ['data: {"a":1}\n\ndata: {"b":2}\n\ndata: {"c":3}\n\n']

const client = new FetchClient({
name: 'test',
baseUrl: 'http://localhost',
connection: fetch,
stringify: (q) => ''

Check warning on line 270 in packages/feathers/src/client/fetch.test.ts

View workflow job for this annotation

GitHub Actions / build (20.x)

'q' is defined but never used. Allowed unused args must match /^_/u

Check warning on line 270 in packages/feathers/src/client/fetch.test.ts

View workflow job for this annotation

GitHub Actions / build (24.x)

'q' is defined but never used. Allowed unused args must match /^_/u

Check warning on line 270 in packages/feathers/src/client/fetch.test.ts

View workflow job for this annotation

GitHub Actions / build (24.x)

'q' is defined but never used. Allowed unused args must match /^_/u

Check warning on line 270 in packages/feathers/src/client/fetch.test.ts

View workflow job for this annotation

GitHub Actions / build (20.x)

'q' is defined but never used. Allowed unused args must match /^_/u
})

const response = createChunkedSSEResponse(chunks)
const messages: any[] = []

for await (const data of client.handleEventStream(response)) {
messages.push(data)
}

expect(messages).toHaveLength(3)
expect(messages[0]).toEqual({ a: 1 })
expect(messages[1]).toEqual({ b: 2 })
expect(messages[2]).toEqual({ c: 3 })
})

it('handles event split at delimiter boundary', async () => {
// Split right at the \n\n boundary
const chunks = ['data: {"first":true}\n', '\ndata: {"second":true}\n\n']

const client = new FetchClient({
name: 'test',
baseUrl: 'http://localhost',
connection: fetch,
stringify: (q) => ''

Check warning on line 294 in packages/feathers/src/client/fetch.test.ts

View workflow job for this annotation

GitHub Actions / build (20.x)

'q' is defined but never used. Allowed unused args must match /^_/u

Check warning on line 294 in packages/feathers/src/client/fetch.test.ts

View workflow job for this annotation

GitHub Actions / build (24.x)

'q' is defined but never used. Allowed unused args must match /^_/u

Check warning on line 294 in packages/feathers/src/client/fetch.test.ts

View workflow job for this annotation

GitHub Actions / build (24.x)

'q' is defined but never used. Allowed unused args must match /^_/u

Check warning on line 294 in packages/feathers/src/client/fetch.test.ts

View workflow job for this annotation

GitHub Actions / build (20.x)

'q' is defined but never used. Allowed unused args must match /^_/u
})

const response = createChunkedSSEResponse(chunks)
const messages: any[] = []

for await (const data of client.handleEventStream(response)) {
messages.push(data)
}

expect(messages).toHaveLength(2)
expect(messages[0]).toEqual({ first: true })
expect(messages[1]).toEqual({ second: true })
})

it('handles multi-byte UTF-8 characters split across chunks', async () => {
// UTF-8 encoding of emoji can be split across chunks
const fullMessage = 'data: {"emoji":"🎉"}\n\n'
const bytes = new TextEncoder().encode(fullMessage)
// Split in the middle of the emoji (which is 4 bytes in UTF-8)
const chunk1 = bytes.slice(0, 18) // cuts into the emoji
const chunk2 = bytes.slice(18)

const stream = new ReadableStream({
start(controller) {
controller.enqueue(chunk1)
controller.enqueue(chunk2)
controller.close()
}
})

const response = new Response(stream, {
headers: { 'content-type': 'text/event-stream' }
})

const client = new FetchClient({
name: 'test',
baseUrl: 'http://localhost',
connection: fetch,
stringify: (q) => ''

Check warning on line 333 in packages/feathers/src/client/fetch.test.ts

View workflow job for this annotation

GitHub Actions / build (20.x)

'q' is defined but never used. Allowed unused args must match /^_/u

Check warning on line 333 in packages/feathers/src/client/fetch.test.ts

View workflow job for this annotation

GitHub Actions / build (24.x)

'q' is defined but never used. Allowed unused args must match /^_/u

Check warning on line 333 in packages/feathers/src/client/fetch.test.ts

View workflow job for this annotation

GitHub Actions / build (24.x)

'q' is defined but never used. Allowed unused args must match /^_/u

Check warning on line 333 in packages/feathers/src/client/fetch.test.ts

View workflow job for this annotation

GitHub Actions / build (20.x)

'q' is defined but never used. Allowed unused args must match /^_/u
})

const messages: any[] = []

for await (const data of client.handleEventStream(response)) {
messages.push(data)
}

expect(messages).toHaveLength(1)
expect(messages[0]).toEqual({ emoji: '🎉' })
})
})
15 changes: 11 additions & 4 deletions 15 packages/feathers/src/client/fetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ export class FetchClient<T = any, D = Partial<T>, P extends Params = FetchClient
async *handleEventStream(res: Response) {
const reader = res.body.getReader()
const decoder = new TextDecoder()
let buffer = ''

while (true) {
const { value, done } = await reader.read()
Expand All @@ -139,11 +140,17 @@ export class FetchClient<T = any, D = Partial<T>, P extends Params = FetchClient
}

if (value) {
const text = decoder.decode(value)
const eventChunks = text.split('\n\n').filter(Boolean)
buffer += decoder.decode(value, { stream: true })

for (const chunk of eventChunks) {
const lines = chunk.split('\n')
// SSE events are separated by \n\n
const events = buffer.split('\n\n')
// Keep the last potentially incomplete event in the buffer
buffer = events.pop() || ''

for (const event of events) {
if (!event.trim()) continue

const lines = event.split('\n')
const dataLine = lines.find((line) => line.startsWith('data: '))

if (dataLine) {
Expand Down
67 changes: 67 additions & 0 deletions 67 website/content/api/client/rest.md
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,73 @@ If no `Content-Type` header is specified, streaming requests default to `applica
Streaming uploads are only supported with the REST/HTTP transport. Socket.io does not support streaming request bodies.
::

### Streaming Responses (SSE)

When a service returns an [async generator or async iterable](../http#async-iterators-sse), the server sends the response as Server-Sent Events (SSE). The REST client automatically detects this and returns an async iterable that you can consume with `for await...of`:

```ts
// Server - service returns an async generator
class ChatService {
async *create(data: { prompt: string }) {
const stream = await ai.generateStream(data.prompt)

for await (const chunk of stream) {
yield { type: 'text', content: chunk }
}
}
}

// Client - consume the stream
const response = app.service('chat').create({ prompt: 'Hello' })

for await (const chunk of response) {
console.log(chunk.content) // Streams in real-time
}
```

This is useful for:

- **AI/LLM responses** - Stream tokens as they're generated
- **Progress updates** - Report status during long-running operations
- **Live data feeds** - Push data to clients as it becomes available

```ts
// Example: Streaming AI chat with status updates
class AIChatService {
async *create(data: { messages: Message[] }, params: Params) {
yield { type: 'status', text: 'Thinking...' }

const stream = await llm.chat(data.messages)

for await (const token of stream) {
yield { type: 'text', text: token }
}

yield { type: 'done' }
}
}

// Client
let fullResponse = ''

for await (const event of app.service('ai-chat').create({ messages })) {
if (event.type === 'status') {
showStatus(event.text)
} else if (event.type === 'text') {
fullResponse += event.text
updateUI(fullResponse)
}
}
```

::note[Automatic buffering]
The client automatically handles SSE stream buffering, correctly parsing events even when they arrive split across network chunks. This ensures reliable streaming regardless of network conditions.
::

::warning[REST only]
Streaming responses are only supported with the REST/HTTP transport. For real-time updates over Socket.io, use [channels and events](../channels) instead.
::

### Custom Methods

On the client, [custom service methods](../services#custom-methods) registered using the `methods` option when registering the service via `restClient.service()`:
Expand Down
Morty Proxy This is a proxified and sanitized view of the page, visit original site.