diff --git a/docs/api/client/rest.md b/docs/api/client/rest.md index a6e188d4e3..a61d785484 100644 --- a/docs/api/client/rest.md +++ b/docs/api/client/rest.md @@ -22,7 +22,7 @@ The following chapter describes the use of npm install @feathersjs/rest-client --save ``` -`@feathersjs/rest-client` allows to connect to a service exposed through a REST HTTP transport (e.g. with [Koa](../koa.md#rest) or [Express](../express.md#rest)) using [fetch](https://developer.mozilla.org/en-US/docs/Web/API/Fetch_API), [Superagent](https://github.com/ladjs/superagent) or [Axios](https://github.com/mzabriskie/axios). +`@feathersjs/rest-client` allows to connect to a service exposed through a REST HTTP transport (e.g. with [Koa](../koa.md#rest) or [Express](../express.md#rest)) using [fetch](https://developer.mozilla.org/en-US/docs/Web/API/Fetch_API), [Superagent](https://github.com/ladjs/superagent) or [Axios](https://github.com/mzabriskie/axios).
@@ -226,6 +226,91 @@ File uploads use the native `Request.formData()` API which buffers the entire re
+### Streaming Uploads + +The REST client supports streaming data to services using `ReadableStream`. This is useful for large file uploads, real-time data ingestion, or piping data directly to storage without buffering. + +```ts +// Stream a file to a service +const file = fileInput.files[0] +const stream = file.stream() + +const result = await app.service('uploads').create(stream, { + headers: { + 'Content-Type': file.type, + 'X-Filename': file.name + } +}) +``` + +On the server, the service receives the `ReadableStream` directly: + +```ts +class UploadService { + async create(stream: ReadableStream, params: Params) { + const filename = params.headers['x-filename'] + const contentType = params.headers['content-type'] + + // Pipe directly to storage - no buffering + await storage.upload(filename, stream, { contentType }) + + return { filename, uploaded: true } + } +} +``` + +The stream can be piped directly to cloud storage (S3, R2, etc.) without loading the entire file into memory: + +```ts +async create(stream: ReadableStream, params: Params) { + // Stream directly to R2/S3 + await env.MY_BUCKET.put(params.headers['x-filename'], stream) + return { success: true } +} +``` + +For more complex metadata, you can stringify an object into a header: + +```ts +// Client +const file = fileInput.files[0] + +await app.service('csv-import').create(file.stream(), { + headers: { + 'Content-Type': 'text/csv', + 'X-Import-Options': JSON.stringify({ + filename: file.name, + tableName: 'products', + skipHeader: true + }) + } +}) + +// Server +async create(stream: ReadableStream, params: Params) { + const options = JSON.parse(params.headers['x-import-options']) + // options.filename, options.tableName, options.skipHeader +} +``` + +
+ +HTTP headers are typically limited to 8KB total. Keep metadata small - use headers for filenames, options, and IDs, not large data payloads. + +
+ +
+ +If no `Content-Type` header is specified, streaming requests default to `application/octet-stream`. Any content type not recognized as JSON, form-urlencoded, or multipart will be streamed through to the service. + +
+ +
+ +Streaming uploads are only supported with the REST/HTTP transport. Socket.io does not support streaming request bodies. + +
+ ### Custom Methods On the client, [custom service methods](../services.md#custom-methods) registered using the `methods` option when registering the service via `restClient.service()`: diff --git a/docs/api/hooks.md b/docs/api/hooks.md index 0709f96cf7..45c7dee43f 100644 --- a/docs/api/hooks.md +++ b/docs/api/hooks.md @@ -199,6 +199,74 @@ If you want to inspect the hook context, e.g. via `console.log`, the object retu +#### Working with Streams + +When using [streaming uploads](./client/rest.md#streaming-uploads), `context.data` will be a `ReadableStream`. Since streams can only be consumed once, around hooks are the recommended way to work with streaming data. Here are common patterns: + +**Passing streams through unchanged:** + +If you only need to validate metadata or check permissions, you can let the stream pass through to the service: + +```ts +app.service('uploads').hooks({ + around: { + create: [ + async (context: HookContext, next: NextFunction) => { + // Validate using headers - don't consume the stream + const contentType = context.params.headers?.['content-type'] + if (!contentType?.startsWith('image/')) { + throw new BadRequest('Only images are allowed') + } + + // Stream passes through unchanged + await next() + } + ] + } +}) +``` + +**Wrapping streams with transforms:** + +You can wrap the incoming stream with a transform stream for processing: + +```ts +import { TransformStream } from 'node:stream/web' + +app.service('uploads').hooks({ + around: { + create: [ + async (context: HookContext, next: NextFunction) => { + const originalStream = context.data as ReadableStream + + // Create a transform that tracks bytes + let totalBytes = 0 + const countingTransform = new TransformStream({ + transform(chunk, controller) { + totalBytes += chunk.length + controller.enqueue(chunk) + } + }) + + // Replace with transformed stream + context.data = originalStream.pipeThrough(countingTransform) + + await next() + + // After service completes, totalBytes is available + context.result.size = totalBytes + } + ] + } +}) +``` + +
+ +Streams can only be consumed once. If you need to read the stream content in a hook (e.g., for validation), you must either buffer the entire stream or use a tee/transform approach. For large files, prefer validating metadata from headers rather than consuming the stream. + +
+ ### `context.error` `context.error` is a **writeable** property with the error object that was thrown in a failed method call. It can be modified to change the error that is returned at the end. diff --git a/docs/api/http.md b/docs/api/http.md new file mode 100644 index 0000000000..12b5375420 --- /dev/null +++ b/docs/api/http.md @@ -0,0 +1,370 @@ +--- +outline: deep +--- + +# HTTP + +The `feathers/http` module provides a Web Standard HTTP handler that works across all JavaScript runtimes: Node.js, Deno, Bun, and Cloudflare Workers. + +## createHandler + +`createHandler(app, middleware?)` creates a Web Standard request handler that processes HTTP requests for your Feathers application. + +```ts +import { feathers } from 'feathers' +import { createHandler } from 'feathers/http' + +const app = feathers() + +app.use('messages', { + async find() { + return [{ id: 1, text: 'Hello world' }] + } +}) + +const handler = createHandler(app) +``` + +The handler has the signature `(request: Request) => Promise` which is the Web Standard used by Deno, Bun, and Cloudflare Workers. + +### Options + +- `app` - The Feathers application +- `middleware` - Optional array of middleware. Defaults to `[errorHandler(), queryParser(), bodyParser()]` + +## Runtime Usage + +### Deno + +```ts +import { feathers } from 'feathers' +import { createHandler } from 'feathers/http' + +const app = feathers() +// ... configure your app + +const handler = createHandler(app) + +Deno.serve({ port: 3030 }, handler) +``` + +### Bun + +```ts +import { feathers } from 'feathers' +import { createHandler } from 'feathers/http' + +const app = feathers() +// ... configure your app + +const handler = createHandler(app) + +Bun.serve({ + port: 3030, + fetch: handler +}) +``` + +### Cloudflare Workers + +```ts +import { feathers } from 'feathers' +import { createHandler } from 'feathers/http' + +const app = feathers() +// ... configure your app + +const handler = createHandler(app) + +export default { + fetch: handler +} +``` + +### Node.js + +Node.js does not have a built-in Web Standard HTTP server, so an adapter is required. The `toNodeHandler` function converts the Web Standard handler to work with Node's `http.createServer`. + +```ts +import { createServer } from 'node:http' +import { feathers } from 'feathers' +import { createHandler } from 'feathers/http' +import { toNodeHandler } from 'feathers/http/node' + +const app = feathers() +// ... configure your app + +const handler = createHandler(app) +const server = createServer(toNodeHandler(handler)) + +server.listen(3030, () => { + console.log('Server running on http://localhost:3030') +}) + +// Call app.setup to initialize all services +await app.setup(server) +``` + +## toNodeHandler + +`toNodeHandler(handler)` converts a Web Standard `(Request) => Promise` handler to Node's `(IncomingMessage, ServerResponse) => void` signature. + +```ts +import { toNodeHandler } from 'feathers/http/node' + +const nodeHandler = toNodeHandler(handler) +``` + +This adapter: + +- Converts Node's `IncomingMessage` to a Web Standard `Request` +- Buffers JSON, form-urlencoded, and multipart requests for proper parsing +- Streams all other content types directly +- Writes the Web Standard `Response` back to Node's `ServerResponse` + +## Middleware + +The HTTP handler uses a middleware chain for request processing. The default middleware handles common tasks like error handling, query parsing, and body parsing. + +### errorHandler + +Catches errors and returns them as properly formatted JSON responses with appropriate status codes. + +```ts +import { errorHandler } from 'feathers/http' +``` + +### queryParser + +Parses URL query parameters using [qs](https://www.npmjs.com/package/qs) and adds them to `params.query`. + +```ts +import { queryParser } from 'feathers/http' + +// With custom parser +import qs from 'qs' +queryParser((query) => qs.parse(query, { arrayLimit: 200 })) +``` + +### bodyParser + +Parses request bodies based on content type: + +| Content-Type | Parsing Method | +| ----------------------------------- | ------------------------------------ | +| `application/json` | `request.json()` | +| `application/x-www-form-urlencoded` | `request.text()` → `URLSearchParams` | +| `multipart/form-data` | `request.formData()` | +| Everything else | Streams `request.body` directly | + +```ts +import { bodyParser } from 'feathers/http' +``` + +### Custom Middleware + +You can provide custom middleware to the handler: + +```ts +import { createHandler, errorHandler, queryParser, bodyParser } from 'feathers/http' + +const customLogger = async (context, next) => { + console.log(`${context.request.method} ${context.request.url}`) + await next() +} + +const handler = createHandler(app, [errorHandler(), customLogger, queryParser(), bodyParser()]) +``` + +## params + +### params.query + +Contains the URL query parameters parsed by the `queryParser` middleware. + +```ts +// GET /messages?status=read&limit=10 +// params.query = { status: 'read', limit: '10' } +``` + +### params.provider + +For any service method call made through HTTP, `params.provider` will be set to `'rest'`. + +### params.headers + +Contains the request headers as a plain object. + +```ts +// params.headers = { 'content-type': 'application/json', ... } +``` + +### params.route + +Route placeholders in a service URL will be added to `params.route`. + +```ts +app.use('users/:userId/messages', messageService) + +// GET /users/123/messages +// params.route = { userId: '123' } +``` + +### params.request + +The original Web Standard `Request` object is available as `params.request`. + +## Content Types + +### JSON + +Standard JSON requests and responses: + +```ts +// Request +POST /messages +Content-Type: application/json + +{ "text": "Hello world" } + +// Service receives +data = { text: 'Hello world' } +``` + +### Form Data + +URL-encoded and multipart form data are automatically parsed: + +```ts +// Request +POST /messages +Content-Type: application/x-www-form-urlencoded + +text=Hello+world&status=sent + +// Service receives +data = { text: 'Hello world', status: 'sent' } +``` + +### File Uploads + +Multipart file uploads use the Web Standard `FormData` API: + +```ts +// Request +POST /uploads +Content-Type: multipart/form-data + +// Service receives +data = { + file: File, // Web Standard File object + description: 'string' +} +``` + +Multiple files with the same field name become an array: + +```ts +data = { + files: [File, File, File] +} +``` + +### Streaming + +Non-buffered content types are streamed directly to the service: + +```ts +class UploadService { + async create(stream: ReadableStream, params: Params) { + // Stream data directly to storage + const reader = stream.getReader() + + while (true) { + const { done, value } = await reader.read() + if (done) break + // Process chunks + } + + return { uploaded: true } + } +} +``` + +## Returning Responses + +Services can return a Web Standard `Response` directly for full control: + +```ts +class DownloadService { + async get(id: string) { + const file = await storage.get(id) + + return new Response(file.stream, { + headers: { + 'Content-Type': file.contentType, + 'Content-Disposition': `attachment; filename="${file.name}"` + } + }) + } +} +``` + +## Async Iterators (SSE) + +Services can return async iterators for Server-Sent Events: + +```ts +class StreamService { + async find() { + return (async function* () { + for (let i = 0; i < 10; i++) { + yield { count: i } + await new Promise((resolve) => setTimeout(resolve, 1000)) + } + })() + } +} +``` + +The response will be sent as `text/event-stream`: + +``` +data: {"count":0} + +data: {"count":1} + +data: {"count":2} +... +``` + +## CORS + +The handler automatically sets CORS headers based on the request's `Origin` header: + +``` +Access-Control-Allow-Origin: +``` + +For preflight `OPTIONS` requests, the handler returns: + +``` +Access-Control-Allow-Origin: +Access-Control-Allow-Methods: GET, POST, PUT, PATCH, DELETE, OPTIONS +Access-Control-Allow-Headers: accept, accept-language, content-language, content-type, range, authorization, x-service-method +Access-Control-Allow-Credentials: true +``` + +## Custom Methods + +[Custom service methods](./services.md#custom-methods) can be called via HTTP by setting the `X-Service-Method` header: + +``` +POST /messages +X-Service-Method: myCustomMethod +Content-Type: application/json + +{ "data": "value" } +``` + +This will call `messages.myCustomMethod({ data: 'value' }, params)`. diff --git a/docs/api/index.md b/docs/api/index.md index 71a56bae7c..c18c197b13 100644 --- a/docs/api/index.md +++ b/docs/api/index.md @@ -20,11 +20,12 @@ Feathers core functionality that works on the client and the server Expose a Feathers application as an API server -- [Configuration](./configuration.md) - A node-config wrapper to initialize configuration of a server side application. +- [HTTP](./http.md) - Web Standard HTTP handler for Deno, Bun, Cloudflare Workers, and Node.js - [Koa](./koa.md) - Feathers KoaJS framework bindings, REST API provider and error middleware. - [Express](./express.md) - Feathers Express framework bindings, REST API provider and error middleware. - [Socket.io](./socketio.md) - The Socket.io real-time transport provider - [Channels](./channels.md) - Channels are used to send real-time events to clients +- [Configuration](./configuration.md) - A node-config wrapper to initialize configuration of a server side application. ## Authentication diff --git a/packages/feathers/fixtures/index.ts b/packages/feathers/fixtures/index.ts index 7e4d63f46f..7a5c6cb0ac 100644 --- a/packages/feathers/fixtures/index.ts +++ b/packages/feathers/fixtures/index.ts @@ -1,94 +1,41 @@ -import { createServer, IncomingMessage, ServerResponse } from 'node:http' +import { createServer } from 'node:http' import { TestService } from './fixture.js' import { feathers, Application, Params } from '../src/index.js' import { createHandler, SseService } from '../src/http/index.js' +import { toNodeHandler } from '../src/http/node.js' export * from './client.js' export * from './rest.js' export * from './fixture.js' -/** - * Creates a native Node.js HTTP adapter that properly converts - * IncomingMessage to a standard Request object. - * This avoids bugs in @whatwg-node/server with FormData handling. - */ -function createNativeAdapter(handler: (request: Request) => Promise) { - return async (req: IncomingMessage, res: ServerResponse) => { - // Collect body chunks - const chunks: Buffer[] = [] - for await (const chunk of req) { - chunks.push(chunk as Buffer) - } - const body = Buffer.concat(chunks) - - // Build headers object - const headers = new Headers() - for (const [key, value] of Object.entries(req.headers)) { - if (value) { - if (Array.isArray(value)) { - value.forEach((v) => headers.append(key, v)) - } else { - headers.set(key, value) - } - } - } - - // Create the Request object - const url = `http://${req.headers.host || 'localhost'}${req.url}` - const request = new Request(url, { - method: req.method, - headers, - body: body.length > 0 ? body : undefined, - // @ts-expect-error duplex is required for streaming bodies in Node - duplex: 'half' - }) - - // Call the handler and get the Response - const response = await handler(request) - - // Write the response - res.statusCode = response.status - - response.headers.forEach((value, key) => { - res.setHeader(key, value) - }) - - if (response.body) { - const reader = response.body.getReader() - while (true) { - const { done, value } = await reader.read() - if (done) break - res.write(value) - } - } - - res.end() - } -} +export type UploadInput = FormData | UploadResult -export type UploadData = { +export type UploadResult = { + id?: number | string + status?: string + provider?: string file?: File | File[] files?: File | File[] description?: string name?: string tags?: string | string[] - [key: string]: File | File[] | string | string[] | undefined + [key: string]: File | File[] | string | string[] | number | undefined } export class UploadService { - async create(data: UploadData, params: Params) { + async create(data: UploadInput, params: Params): Promise { return { - ...data, + ...(data as UploadResult), id: 1, status: 'uploaded', provider: params.provider } } - async patch(id: number | string, data: UploadData, params: Params) { + async patch(id: number | string, data: UploadInput, params: Params): Promise { return { - ...data, + ...(data as UploadResult), id, status: 'patched', provider: params.provider @@ -96,6 +43,37 @@ export class UploadService { } } +export class StreamingService { + async create(data: ReadableStream, params: Params) { + // Consume the stream and collect the data + const chunks: Uint8Array[] = [] + const reader = data.getReader() + + while (true) { + const { done, value } = await reader.read() + if (done) break + chunks.push(value) + } + + const totalLength = chunks.reduce((sum, chunk) => sum + chunk.length, 0) + const combined = new Uint8Array(totalLength) + let offset = 0 + for (const chunk of chunks) { + combined.set(chunk, offset) + offset += chunk.length + } + + const text = new TextDecoder().decode(combined) + + return { + received: text, + size: totalLength, + contentType: params.headers?.['content-type'] || 'unknown', + provider: params.provider + } + } +} + export class ResponseTestService { async find() { return new Response('Plain text', { @@ -132,6 +110,7 @@ export class ResponseTestService { export type TestServiceTypes = { todos: TestService uploads: UploadService + streaming: StreamingService test: ResponseTestService sse: SseService } @@ -147,6 +126,9 @@ export function getApp(): TestApplication { app.use('uploads', new UploadService(), { methods: ['create', 'patch'] }) + app.use('streaming', new StreamingService(), { + methods: ['create'] + }) app.use('test', new ResponseTestService()) app.use('sse', new SseService()) @@ -155,8 +137,7 @@ export function getApp(): TestApplication { export async function createTestServer(port: number, app: TestApplication) { const handler = createHandler(app) - // Use native Node.js adapter for proper FormData handling - const nodeServer = createServer(createNativeAdapter(handler)) + const nodeServer = createServer(toNodeHandler(handler)) await new Promise((resolve) => { nodeServer.listen(port, () => resolve()) diff --git a/packages/feathers/package.json b/packages/feathers/package.json index 32572c0883..18912c2607 100644 --- a/packages/feathers/package.json +++ b/packages/feathers/package.json @@ -24,7 +24,8 @@ "./commons": "./lib/commons.js", "./errors": "./lib/errors.js", "./client": "./lib/client/index.js", - "./http": "./lib/http/index.js" + "./http": "./lib/http/index.js", + "./http/node": "./lib/http/node.js" }, "typesVersions": { "*": { @@ -42,6 +43,9 @@ ], "http": [ "./lib/http/index.d.ts" + ], + "http/node": [ + "./lib/http/node.d.ts" ] } }, diff --git a/packages/feathers/src/client/fetch.test.ts b/packages/feathers/src/client/fetch.test.ts index b0641c3393..09a647332a 100644 --- a/packages/feathers/src/client/fetch.test.ts +++ b/packages/feathers/src/client/fetch.test.ts @@ -136,7 +136,7 @@ describe('fetch REST connector', function () { formData.append('description', 'FormData test') formData.append('name', 'test-file') - const result = await app.service('uploads').create(formData) + const result = await app.service('uploads').create(formData, {}) // Single FormData fields are unwrapped on the server expect(result.description).toBe('FormData test') @@ -151,7 +151,7 @@ describe('fetch REST connector', function () { formData.append('tags', 'two') formData.append('description', 'Multi-value test') - const result = await app.service('uploads').create(formData) + const result = await app.service('uploads').create(formData, {}) // Multiple values become array, single values unwrapped expect(result.tags).toEqual(['one', 'two']) @@ -162,12 +162,47 @@ describe('fetch REST connector', function () { const formData = new FormData() formData.append('description', 'Patched with FormData') - const result = await app.service('uploads').patch(42, formData) + const result = await app.service('uploads').patch(42, formData, {}) expect(result.description).toBe('Patched with FormData') expect(result.id).toBe('42') // ID comes from URL path, returned as string expect(result.status).toBe('patched') }) + it('supports streaming request body with ReadableStream', async () => { + const data = 'Streamed from client!' + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(new TextEncoder().encode(data)) + controller.close() + } + }) + + const result = await app.service('streaming').create(stream as any, { + headers: { + 'Content-Type': 'text/plain' + } + }) + + expect(result.received).toBe(data) + expect(result.size).toBe(data.length) + expect(result.contentType).toBe('text/plain') + }) + + it('defaults to application/octet-stream for streams without Content-Type', async () => { + const data = 'Binary-ish data' + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(new TextEncoder().encode(data)) + controller.close() + } + }) + + const result = await app.service('streaming').create(stream, {}) + + expect(result.received).toBe(data) + expect(result.contentType).toBe('application/octet-stream') + }) + clientTests(app, 'todos') }) diff --git a/packages/feathers/src/client/fetch.ts b/packages/feathers/src/client/fetch.ts index 81d6a2fa2e..bda3af303b 100644 --- a/packages/feathers/src/client/fetch.ts +++ b/packages/feathers/src/client/fetch.ts @@ -76,8 +76,19 @@ export class FetchClient, P extends Params = FetchClient } if (options.body) { + // Pass through FormData directly (browser sets Content-Type with boundary) if (options.body instanceof FormData) { fetchOptions.body = options.body + } else if (options.body instanceof ReadableStream) { + // Pass through ReadableStream directly for streaming uploads + fetchOptions.body = options.body + // @ts-expect-error duplex is required for streaming bodies + fetchOptions.duplex = 'half' + // Default to application/octet-stream if no Content-Type specified + fetchOptions.headers = { + 'Content-Type': 'application/octet-stream', + ...fetchOptions.headers + } } else { fetchOptions.body = JSON.stringify(options.body) fetchOptions.headers = { diff --git a/packages/feathers/src/http/index.test.ts b/packages/feathers/src/http/index.test.ts index f8ea908cc1..2d0e1e02a1 100644 --- a/packages/feathers/src/http/index.test.ts +++ b/packages/feathers/src/http/index.test.ts @@ -226,5 +226,92 @@ describe('http test', () => { }) }) + describe('streaming request body', () => { + it('streams text data to a service', async () => { + const data = 'Hello, this is streamed text data!' + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(new TextEncoder().encode(data)) + controller.close() + } + }) + + const res = await fetch(`http://localhost:${TEST_PORT}/streaming`, { + method: 'POST', + headers: { + 'Content-Type': 'text/plain' + }, + body: stream, + // @ts-expect-error duplex required for streaming + duplex: 'half' + }) + + expect(res.status).toBe(201) + + const result = await res.json() + expect(result.received).toBe(data) + expect(result.size).toBe(data.length) + expect(result.contentType).toBe('text/plain') + }) + + it('streams binary data to a service', async () => { + const bytes = new Uint8Array([0x48, 0x65, 0x6c, 0x6c, 0x6f]) // "Hello" + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(bytes) + controller.close() + } + }) + + const res = await fetch(`http://localhost:${TEST_PORT}/streaming`, { + method: 'POST', + headers: { + 'Content-Type': 'application/octet-stream' + }, + body: stream, + // @ts-expect-error duplex required for streaming + duplex: 'half' + }) + + expect(res.status).toBe(201) + + const result = await res.json() + expect(result.received).toBe('Hello') + expect(result.size).toBe(5) + }) + + it('streams chunked data to a service', async () => { + const chunks = ['chunk1', 'chunk2', 'chunk3'] + let chunkIndex = 0 + + const stream = new ReadableStream({ + pull(controller) { + if (chunkIndex < chunks.length) { + controller.enqueue(new TextEncoder().encode(chunks[chunkIndex])) + chunkIndex++ + } else { + controller.close() + } + } + }) + + const res = await fetch(`http://localhost:${TEST_PORT}/streaming`, { + method: 'POST', + headers: { + 'Content-Type': 'text/plain' + }, + body: stream, + // @ts-expect-error duplex required for streaming + duplex: 'half' + }) + + expect(res.status).toBe(201) + + const result = await res.json() + expect(result.received).toBe('chunk1chunk2chunk3') + expect(result.size).toBe(18) + }) + }) + restTests('http', 'todos', TEST_PORT) }) diff --git a/packages/feathers/src/http/middleware.ts b/packages/feathers/src/http/middleware.ts index 1c32bb8d1b..908fd2a586 100644 --- a/packages/feathers/src/http/middleware.ts +++ b/packages/feathers/src/http/middleware.ts @@ -37,19 +37,21 @@ export function bodyParser() { const contentType = context.request.headers.get('content-type') if (BODY_METHODS.includes(context.request.method)) { - const request = context.request.clone() - try { if (contentType?.includes('application/json')) { + const request = context.request.clone() context.data = await request.json() } else if (contentType?.includes('application/x-www-form-urlencoded')) { + const request = context.request.clone() context.data = Object.fromEntries(new URLSearchParams(await request.text())) } else if (contentType?.includes('multipart/form-data')) { + const request = context.request.clone() context.data = formDataToObject(await request.formData()) } else { - throw new Error('Invalid content type') + // Stream all other content types directly to the service + context.data = context.request.body as any } - } catch (error) { + } catch (_error) { throw new BadRequest('Invalid request body') } } diff --git a/packages/feathers/src/http/node.ts b/packages/feathers/src/http/node.ts new file mode 100644 index 0000000000..3530af4ea5 --- /dev/null +++ b/packages/feathers/src/http/node.ts @@ -0,0 +1,121 @@ +import type { IncomingMessage, ServerResponse } from 'node:http' + +/** + * Content types that require buffering (structured data formats). + * These cannot be streamed because they need to be fully parsed. + */ +const BUFFERED_CONTENT_TYPES = [ + 'multipart/form-data', + 'application/x-www-form-urlencoded', + 'application/json' +] + +/** + * Converts a Node.js IncomingMessage to a Web Standard Request. + */ +async function toRequest(req: IncomingMessage): Promise { + const headers = new Headers() + + for (const [key, value] of Object.entries(req.headers)) { + if (value) { + if (Array.isArray(value)) { + value.forEach((v) => headers.append(key, v)) + } else { + headers.set(key, value) + } + } + } + + const url = `http://${req.headers.host || 'localhost'}${req.url}` + const method = req.method || 'GET' + const contentType = req.headers['content-type'] || '' + const hasBody = ['POST', 'PUT', 'PATCH'].includes(method) + + if (!hasBody) { + return new Request(url, { method, headers }) + } + + const needsBuffering = BUFFERED_CONTENT_TYPES.some((type) => contentType.includes(type)) + + if (needsBuffering) { + const chunks: Uint8Array[] = [] + + for await (const chunk of req) { + chunks.push(chunk instanceof Uint8Array ? chunk : new Uint8Array(chunk)) + } + + const totalLength = chunks.reduce((sum, chunk) => sum + chunk.length, 0) + const body = new Uint8Array(totalLength) + let offset = 0 + + for (const chunk of chunks) { + body.set(chunk, offset) + offset += chunk.length + } + + return new Request(url, { + method, + headers, + body: body.length > 0 ? body : undefined + }) + } + + // Stream non-buffered content types + return new Request(url, { + method, + headers, + body: req as unknown as ReadableStream, + duplex: 'half' + } as RequestInit) +} + +/** + * Writes a Web Standard Response to a Node.js ServerResponse. + */ +async function writeResponse(response: Response, res: ServerResponse): Promise { + res.statusCode = response.status + + response.headers.forEach((value, key) => { + res.setHeader(key, value) + }) + + if (response.body) { + const reader = response.body.getReader() + + try { + while (true) { + const { done, value } = await reader.read() + if (done) break + res.write(value) + } + } finally { + reader.releaseLock() + } + } + + res.end() +} + +/** + * Creates a Node.js HTTP request handler from a Web Standard handler. + * + * @example + * ```typescript + * import { createServer } from 'node:http' + * import { createHandler } from '@feathersjs/feathers/http' + * import { toNodeHandler } from '@feathersjs/feathers/http/node' + * + * const app = feathers() + * const handler = createHandler(app) + * const server = createServer(toNodeHandler(handler)) + * + * server.listen(3000) + * ``` + */ +export function toNodeHandler(handler: (request: Request) => Promise) { + return async (req: IncomingMessage, res: ServerResponse): Promise => { + const request = await toRequest(req) + const response = await handler(request) + await writeResponse(response, res) + } +}