Commit 701453e

Add rate-limit visibility feature
1 parent e48716f commit 701453e

File tree

3 files changed: +1235 −4 lines changed

readme.md

84 additions & 0 deletions
@@ -241,6 +241,36 @@ await queue.onPendingZero();
// All running tasks have finished, though the queue may still have items
```

#### .onRateLimit()

Returns a promise that settles when the queue becomes rate-limited due to `intervalCap`. If the queue is already rate-limited, the promise resolves immediately.

Useful for implementing backpressure to prevent memory issues when producers are faster than consumers.

```js
const queue = new PQueue({intervalCap: 5, interval: 1000});

// Add many tasks
for (let index = 0; index < 10; index++) {
	queue.add(() => someTask());
}

await queue.onRateLimit();
console.log('Queue is now rate-limited - time for maintenance tasks');
```

#### .onRateLimitCleared()

Returns a promise that settles when the queue is no longer rate-limited. If the queue is not currently rate-limited, the promise resolves immediately.

```js
const queue = new PQueue({intervalCap: 5, interval: 1000});

// Wait for rate limiting to be cleared
await queue.onRateLimitCleared();
console.log('Rate limit cleared - can add more tasks');
```
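Because `.onRateLimitCleared()` resolves immediately when the queue is not rate-limited, the two methods also work as a simple backpressure gate in a producer loop. A minimal sketch (`processItem` is a hypothetical task function, not part of p-queue):

```js
const queue = new PQueue({intervalCap: 5, interval: 1000});

for (let index = 0; index < 100; index++) {
	// Resolves immediately while the queue is under the interval cap,
	// otherwise waits until the rate limit clears
	await queue.onRateLimitCleared();

	queue.add(() => processItem(index)); // `processItem` is a placeholder for your own work
}
```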
#### .onSizeLessThan(limit)

Returns a promise that settles when the queue size is less than the given limit: `queue.size < limit`.
@@ -329,6 +359,10 @@ Number of running items (no longer in the queue).
Whether the queue is currently paused.

#### .isRateLimited

Whether the queue is currently rate-limited due to `intervalCap`. Returns `true` when the number of tasks executed in the current interval has reached the `intervalCap` and there are still tasks waiting to be processed.
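A small sketch of reading the flag (reusing the hypothetical `someTask` from the examples above); it pairs naturally with `.onRateLimit()`:

```js
const queue = new PQueue({intervalCap: 2, interval: 1000});

queue.add(() => someTask());
queue.add(() => someTask());
queue.add(() => someTask()); // Third task has to wait for the next interval

await queue.onRateLimit();
console.log(queue.isRateLimited);
//=> true
```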
## Events

#### active
@@ -465,6 +499,56 @@ await queue.add(() => delay(600));
//=> 'Task is completed. Size: 0 Pending: 0'
```

#### rateLimit

Emitted when the queue becomes rate-limited due to `intervalCap`. This happens when the maximum number of tasks allowed per interval has been reached.

Useful for implementing backpressure to prevent memory issues when producers are faster than consumers.

```js
import delay from 'delay';
import PQueue from 'p-queue';

const queue = new PQueue({
	intervalCap: 2,
	interval: 1000
});

queue.on('rateLimit', () => {
	console.log('Queue is rate-limited - processing backlog or maintenance tasks');
});

// Add 3 tasks - third one triggers rate limiting
queue.add(() => delay(100));
queue.add(() => delay(100));
queue.add(() => delay(100));
```

#### rateLimitCleared

Emitted when the queue is no longer rate-limited: either because the interval reset and new tasks can start, or because the backlog was drained.

```js
import delay from 'delay';
import PQueue from 'p-queue';

const queue = new PQueue({
	intervalCap: 1,
	interval: 1000
});

queue.on('rateLimit', () => {
	console.log('Rate limited - waiting for interval to reset');
});

queue.on('rateLimitCleared', () => {
	console.log('Rate limit cleared - can process more tasks');
});

queue.add(() => delay(100));
queue.add(() => delay(100)); // This triggers rate limiting
```
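The event pair can also drive backpressure directly by pausing and resuming whatever feeds the queue. A rough sketch (the `producer` object with `pause()`/`resume()` is hypothetical, for example a Node.js readable stream):

```js
queue.on('rateLimit', () => {
	// Stop pulling new work while the interval budget is exhausted
	producer.pause();
});

queue.on('rateLimitCleared', () => {
	// Safe to feed the queue again
	producer.resume();
});
```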
## Advanced example

A more advanced example to help you understand the flow.

source/index.ts

98 additions & 4 deletions
@@ -8,7 +8,7 @@ type Task<TaskResultType> =
	| ((options: TaskOptions) => PromiseLike<TaskResultType>)
	| ((options: TaskOptions) => TaskResultType);

-type EventName = 'active' | 'idle' | 'empty' | 'add' | 'next' | 'completed' | 'error' | 'pendingZero';
+type EventName = 'active' | 'idle' | 'empty' | 'add' | 'next' | 'completed' | 'error' | 'pendingZero' | 'rateLimit' | 'rateLimitCleared';

/**
Promise queue with concurrency control.
@@ -22,6 +22,9 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
	readonly #intervalCap: number;

+	#rateLimitedInInterval = false;
+	#rateLimitFlushScheduled = false;
+
	readonly #interval: number;

	#intervalEnd = 0;
@@ -84,8 +87,15 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
		this.#queue = new options.queueClass!();
		this.#queueClass = options.queueClass!;
		this.concurrency = options.concurrency!;
+
+		if (options.timeout !== undefined && !(Number.isFinite(options.timeout) && options.timeout > 0)) {
+			throw new TypeError(`Expected \`timeout\` to be a positive finite number, got \`${options.timeout}\` (${typeof options.timeout})`);
+		}
+
		this.timeout = options.timeout;
		this.#isPaused = options.autoStart === false;
+
+		this.#setupRateLimitTracking();
	}

	get #doesIntervalAllowAnother(): boolean {
@@ -108,7 +118,7 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
	}

	#onResumeInterval(): void {
-		this.#onInterval();
+		this.#onInterval(); // Already schedules update
		this.#initializeIntervalIfNeeded();
		this.#timeoutId = undefined;
	}
@@ -183,6 +193,8 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
			return false;
		}

+		let taskStarted = false;
+
		if (!this.#isPaused) {
			const canInitializeInterval = !this.#isIntervalPaused;
			if (this.#doesIntervalAllowAnother && this.#doesConcurrentAllowAnother) {
@@ -191,6 +203,7 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
				// Increment interval count immediately to prevent race conditions
				if (!this.#isIntervalIgnored) {
					this.#intervalCount++;
+					this.#scheduleRateLimitUpdate();
				}

				this.emit('active');
@@ -201,11 +214,11 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
					this.#initializeIntervalIfNeeded();
				}

-				return true;
+				taskStarted = true;
			}
		}

-		return false;
+		return taskStarted;
	}

	#initializeIntervalIfNeeded(): void {
@@ -229,7 +242,9 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
		}

		this.#intervalCount = this.#carryoverConcurrencyCount ? this.#pending : 0;
+
		this.#processQueue();
+		this.#scheduleRateLimitUpdate();
	}

	/**
@@ -299,6 +314,10 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
	Here, the promise function with `id: '🦀'` executes last.
	*/
	setPriority(id: string, priority: number) {
+		if (typeof priority !== 'number' || !Number.isFinite(priority)) {
+			throw new TypeError(`Expected \`priority\` to be a finite number, got \`${priority}\` (${typeof priority})`);
+		}
+
		this.#queue.setPriority(id, priority);
	}
@@ -405,6 +424,8 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
	*/
	clear(): void {
		this.#queue = new this.#queueClass();
+		// Force synchronous update since clear() should have immediate effect
+		this.#updateRateLimitState();
	}

	/**
@@ -464,6 +485,28 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
		await this.#onEvent('pendingZero');
	}

+	/**
+	@returns A promise that settles when the queue becomes rate-limited due to intervalCap.
+	*/
+	async onRateLimit(): Promise<void> {
+		if (this.isRateLimited) {
+			return;
+		}
+
+		await this.#onEvent('rateLimit');
+	}
+
+	/**
+	@returns A promise that settles when the queue is no longer rate-limited.
+	*/
+	async onRateLimitCleared(): Promise<void> {
+		if (!this.isRateLimited) {
+			return;
+		}
+
+		await this.#onEvent('rateLimitCleared');
+	}
+
	async #onEvent(event: EventName, filter?: () => boolean): Promise<void> {
		return new Promise(resolve => {
			const listener = () => {
@@ -509,6 +552,57 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
	get isPaused(): boolean {
		return this.#isPaused;
	}
+
+	#setupRateLimitTracking(): void {
+		// Only schedule updates when rate limiting is enabled
+		if (this.#isIntervalIgnored) {
+			return;
+		}
+
+		// Wire up to lifecycle events that affect rate limit state
+		// Only 'add' and 'next' can actually change rate limit state
+		this.on('add', () => {
+			if (this.#queue.size > 0) {
+				this.#scheduleRateLimitUpdate();
+			}
+		});
+
+		this.on('next', () => {
+			this.#scheduleRateLimitUpdate();
+		});
+	}
+
+	#scheduleRateLimitUpdate(): void {
+		// Skip if rate limiting is not enabled or already scheduled
+		if (this.#isIntervalIgnored || this.#rateLimitFlushScheduled) {
+			return;
+		}
+
+		this.#rateLimitFlushScheduled = true;
+		queueMicrotask(() => {
+			this.#rateLimitFlushScheduled = false;
+			this.#updateRateLimitState();
+		});
+	}
+
+	#updateRateLimitState(): void {
+		const previous = this.#rateLimitedInInterval;
+		const shouldBeRateLimited = !this.#isIntervalIgnored
+			&& this.#intervalCount >= this.#intervalCap
+			&& this.#queue.size > 0;
+
+		if (shouldBeRateLimited !== previous) {
+			this.#rateLimitedInInterval = shouldBeRateLimited;
+			this.emit(shouldBeRateLimited ? 'rateLimit' : 'rateLimitCleared');
+		}
+	}
+
+	/**
+	Whether the queue is currently rate-limited due to intervalCap.
+	*/
+	get isRateLimited(): boolean {
+		return this.#rateLimitedInInterval;
+	}
}

export type {Queue} from './queue.js';
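Taken together, the new API could be exercised along these lines (a rough usage sketch, not part of the commit; timings are illustrative):

```js
import delay from 'delay';
import PQueue from 'p-queue';

const queue = new PQueue({intervalCap: 1, interval: 500});

queue.on('rateLimit', () => {
	console.log('rate-limited:', queue.isRateLimited);
	//=> rate-limited: true
});

queue.on('rateLimitCleared', () => {
	console.log('rate-limited:', queue.isRateLimited);
	//=> rate-limited: false
});

queue.add(() => delay(100));
queue.add(() => delay(100)); // Exceeds intervalCap, so 'rateLimit' fires

// Resolves once the interval resets and the backlog starts draining
await queue.onRateLimitCleared();
```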

0 commit comments
