@@ -8,7 +8,7 @@ type Task<TaskResultType> =
88 | ( ( options : TaskOptions ) => PromiseLike < TaskResultType > )
99 | ( ( options : TaskOptions ) => TaskResultType ) ;
1010
11- type EventName = 'active' | 'idle' | 'empty' | 'add' | 'next' | 'completed' | 'error' | 'pendingZero' ;
11+ type EventName = 'active' | 'idle' | 'empty' | 'add' | 'next' | 'completed' | 'error' | 'pendingZero' | 'rateLimit' | 'rateLimitCleared' ;
1212
1313/**
1414Promise queue with concurrency control.
@@ -22,6 +22,9 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
2222
2323 readonly #intervalCap: number ;
2424
25+ #rateLimitedInInterval = false ;
26+ #rateLimitFlushScheduled = false ;
27+
2528 readonly #interval: number ;
2629
2730 #intervalEnd = 0 ;
@@ -84,8 +87,15 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
8487 this . #queue = new options . queueClass ! ( ) ;
8588 this . #queueClass = options . queueClass ! ;
8689 this . concurrency = options . concurrency ! ;
90+
91+ if ( options . timeout !== undefined && ! ( Number . isFinite ( options . timeout ) && options . timeout > 0 ) ) {
92+ throw new TypeError ( `Expected \`timeout\` to be a positive finite number, got \`${ options . timeout } \` (${ typeof options . timeout } )` ) ;
93+ }
94+
8795 this . timeout = options . timeout ;
8896 this . #isPaused = options . autoStart === false ;
97+
98+ this . #setupRateLimitTracking( ) ;
8999 }
90100
91101 get #doesIntervalAllowAnother( ) : boolean {
@@ -108,7 +118,7 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
108118 }
109119
110120 #onResumeInterval( ) : void {
111- this . #onInterval( ) ;
121+ this . #onInterval( ) ; // Already schedules update
112122 this . #initializeIntervalIfNeeded( ) ;
113123 this . #timeoutId = undefined ;
114124 }
@@ -183,6 +193,8 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
183193 return false ;
184194 }
185195
196+ let taskStarted = false ;
197+
186198 if ( ! this . #isPaused) {
187199 const canInitializeInterval = ! this . #isIntervalPaused;
188200 if ( this . #doesIntervalAllowAnother && this . #doesConcurrentAllowAnother) {
@@ -191,6 +203,7 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
191203 // Increment interval count immediately to prevent race conditions
192204 if ( ! this . #isIntervalIgnored) {
193205 this . #intervalCount++ ;
206+ this . #scheduleRateLimitUpdate( ) ;
194207 }
195208
196209 this . emit ( 'active' ) ;
@@ -201,11 +214,11 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
201214 this . #initializeIntervalIfNeeded( ) ;
202215 }
203216
204- return true ;
217+ taskStarted = true ;
205218 }
206219 }
207220
208- return false ;
221+ return taskStarted ;
209222 }
210223
211224 #initializeIntervalIfNeeded( ) : void {
@@ -229,7 +242,9 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
229242 }
230243
231244 this . #intervalCount = this . #carryoverConcurrencyCount ? this . #pending : 0 ;
245+
232246 this . #processQueue( ) ;
247+ this . #scheduleRateLimitUpdate( ) ;
233248 }
234249
235250 /**
@@ -299,6 +314,10 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
299314 Here, the promise function with `id: '🦀'` executes last.
300315 */
301316 setPriority ( id : string , priority : number ) {
317+ if ( typeof priority !== 'number' || ! Number . isFinite ( priority ) ) {
318+ throw new TypeError ( `Expected \`priority\` to be a finite number, got \`${ priority } \` (${ typeof priority } )` ) ;
319+ }
320+
302321 this . #queue. setPriority ( id , priority ) ;
303322 }
304323
@@ -405,6 +424,8 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
405424 */
406425 clear ( ) : void {
407426 this . #queue = new this . #queueClass( ) ;
427+ // Force synchronous update since clear() should have immediate effect
428+ this . #updateRateLimitState( ) ;
408429 }
409430
410431 /**
@@ -464,6 +485,28 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
464485 await this . #onEvent( 'pendingZero' ) ;
465486 }
466487
488+ /**
489+ @returns A promise that settles when the queue becomes rate-limited due to intervalCap.
490+ */
491+ async onRateLimit ( ) : Promise < void > {
492+ if ( this . isRateLimited ) {
493+ return ;
494+ }
495+
496+ await this . #onEvent( 'rateLimit' ) ;
497+ }
498+
499+ /**
500+ @returns A promise that settles when the queue is no longer rate-limited.
501+ */
502+ async onRateLimitCleared ( ) : Promise < void > {
503+ if ( ! this . isRateLimited ) {
504+ return ;
505+ }
506+
507+ await this . #onEvent( 'rateLimitCleared' ) ;
508+ }
509+
467510 async #onEvent( event : EventName , filter ?: ( ) => boolean ) : Promise < void > {
468511 return new Promise ( resolve => {
469512 const listener = ( ) => {
@@ -509,6 +552,57 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
509552 get isPaused ( ) : boolean {
510553 return this . #isPaused;
511554 }
555+
556+ #setupRateLimitTracking( ) : void {
557+ // Only schedule updates when rate limiting is enabled
558+ if ( this . #isIntervalIgnored) {
559+ return ;
560+ }
561+
562+ // Wire up to lifecycle events that affect rate limit state
563+ // Only 'add' and 'next' can actually change rate limit state
564+ this . on ( 'add' , ( ) => {
565+ if ( this . #queue. size > 0 ) {
566+ this . #scheduleRateLimitUpdate( ) ;
567+ }
568+ } ) ;
569+
570+ this . on ( 'next' , ( ) => {
571+ this . #scheduleRateLimitUpdate( ) ;
572+ } ) ;
573+ }
574+
575+ #scheduleRateLimitUpdate( ) : void {
576+ // Skip if rate limiting is not enabled or already scheduled
577+ if ( this . #isIntervalIgnored || this . #rateLimitFlushScheduled) {
578+ return ;
579+ }
580+
581+ this . #rateLimitFlushScheduled = true ;
582+ queueMicrotask ( ( ) => {
583+ this . #rateLimitFlushScheduled = false ;
584+ this . #updateRateLimitState( ) ;
585+ } ) ;
586+ }
587+
588+ #updateRateLimitState( ) : void {
589+ const previous = this . #rateLimitedInInterval;
590+ const shouldBeRateLimited = ! this . #isIntervalIgnored
591+ && this . #intervalCount >= this . #intervalCap
592+ && this . #queue. size > 0 ;
593+
594+ if ( shouldBeRateLimited !== previous ) {
595+ this . #rateLimitedInInterval = shouldBeRateLimited ;
596+ this . emit ( shouldBeRateLimited ? 'rateLimit' : 'rateLimitCleared' ) ;
597+ }
598+ }
599+
600+ /**
601+ Whether the queue is currently rate-limited due to intervalCap.
602+ */
603+ get isRateLimited ( ) : boolean {
604+ return this . #rateLimitedInInterval;
605+ }
512606}
513607
514608export type { Queue } from './queue.js' ;
0 commit comments