* until we reach either a notification from an uncommitted transaction or
* the head pointer's position.
*
- * 6. To avoid SLRU wraparound and limit disk space consumption, the tail
- * pointer needs to be advanced so that old pages can be truncated.
- * This is relatively expensive (notably, it requires an exclusive lock),
- * so we don't want to do it often. We make sending backends do this work
- * if they advanced the queue head into a new page, but only once every
- * QUEUE_CLEANUP_DELAY pages.
+ * 6. To limit disk space consumption, the tail pointer needs to be advanced
+ * so that old pages can be truncated. This is relatively expensive
+ * (notably, it requires an exclusive lock), so we don't want to do it
+ * often. We make sending backends do this work if they advanced the queue
+ * head into a new page, but only once every QUEUE_CLEANUP_DELAY pages.
*
* An application that listens on the same channel it notifies will get
* NOTIFY messages for its own NOTIFYs. These can be ignored, if not useful,
@@ -120,7 +119,7 @@
* The amount of shared memory used for notify management (NUM_NOTIFY_BUFFERS)
* can be varied without affecting anything but performance. The maximum
* amount of notification data that can be queued at one time is determined
- * by slru.c's wraparound limit; see QUEUE_MAX_PAGE below.
+ * by the max_notify_queue_pages GUC.
*-------------------------------------------------------------------------
*/
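
For reference, the default max_notify_queue_pages (1048576, declared further down
in this diff) keeps the old ceiling: with the default BLCKSZ of 8192 bytes it
allows the same 8 GB of queued-and-not-read data that QUEUE_MAX_PAGE/2 used to.
A quick sanity check, written here for illustration only (not part of the patch):

    /* 1048576 pages * 8192 bytes/page = 8 GB */
    StaticAssertDecl((int64) 1048576 * 8192 == INT64CONST(8589934592),
                     "default notify queue still caps at 8 GB");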
@@ -312,23 +311,8 @@ static SlruCtlData NotifyCtlData;

#define NotifyCtl (&NotifyCtlData)
#define QUEUE_PAGESIZE BLCKSZ
- #define QUEUE_FULL_WARN_INTERVAL 5000 /* warn at most once every 5s */

- /*
- * Use segments 0000 through FFFF. Each contains SLRU_PAGES_PER_SEGMENT pages
- * which gives us the pages from 0 to SLRU_PAGES_PER_SEGMENT * 0x10000 - 1.
- * We could use as many segments as SlruScanDirectory() allows, but this gives
- * us so much space already that it doesn't seem worth the trouble.
- *
- * The most data we can have in the queue at a time is QUEUE_MAX_PAGE/2
- * pages, because more than that would confuse slru.c into thinking there
- * was a wraparound condition. With the default BLCKSZ this means there
- * can be up to 8GB of queued-and-not-read data.
- *
- * Note: it's possible to redefine QUEUE_MAX_PAGE with a smaller multiple of
- * SLRU_PAGES_PER_SEGMENT, for easier testing of queue-full behaviour.
- */
- #define QUEUE_MAX_PAGE (SLRU_PAGES_PER_SEGMENT * 0x10000 - 1)
+ #define QUEUE_FULL_WARN_INTERVAL 5000 /* warn at most once every 5s */

/*
* listenChannels identifies the channels we are actually listening to
@@ -439,12 +423,15 @@ static bool amRegisteredListener = false;
/* have we advanced to a page that's a multiple of QUEUE_CLEANUP_DELAY? */
static bool tryAdvanceTail = false;

- /* GUC parameter */
+ /* GUC parameters */
bool Trace_notify = false;

+ /* For 8 KB pages this gives 8 GB of disk space */
+ int max_notify_queue_pages = 1048576;
+
/* local function prototypes */
- static int64 asyncQueuePageDiff(int64 p, int64 q);
- static bool asyncQueuePagePrecedes(int64 p, int64 q);
+ static inline int64 asyncQueuePageDiff(int64 p, int64 q);
+ static inline bool asyncQueuePagePrecedes(int64 p, int64 q);
static void queue_listen(ListenActionKind action, const char *channel);
static void Async_UnlistenOnExit(int code, Datum arg);
static void Exec_ListenPreCommit(void);
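
The new variable would also need to be registered as a server GUC. A sketch of
what the guc_tables.c entry could look like follows; only the name and default
come from this diff, while the group, context, description wording, and the
minimum of 64 are assumptions:

    {
        {"max_notify_queue_pages", PGC_POSTMASTER, RESOURCES_DISK,
            gettext_noop("Sets the maximum number of allocated pages for NOTIFY / LISTEN queue."),
            NULL
        },
        &max_notify_queue_pages,
        1048576, 64, INT_MAX,
        NULL, NULL, NULL
    },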
@@ -474,39 +461,23 @@ static int notification_match(const void *key1, const void *key2, Size keysize);
static void ClearPendingActionsAndNotifies(void);

/*
- * Compute the difference between two queue page numbers (i.e., p - q),
- * accounting for wraparound.
+ * Compute the difference between two queue page numbers.
+ * Previously this function accounted for wraparound.
*/
- static int64
+ static inline int64
asyncQueuePageDiff(int64 p, int64 q)
{
- int64 diff;
-
- /*
- * We have to compare modulo (QUEUE_MAX_PAGE+1)/2. Both inputs should be
- * in the range 0..QUEUE_MAX_PAGE.
- */
- Assert(p >= 0 && p <= QUEUE_MAX_PAGE);
- Assert(q >= 0 && q <= QUEUE_MAX_PAGE);
-
- diff = p - q;
- if (diff >= ((QUEUE_MAX_PAGE + 1) / 2))
- diff -= QUEUE_MAX_PAGE + 1;
- else if (diff < -((QUEUE_MAX_PAGE + 1) / 2))
- diff += QUEUE_MAX_PAGE + 1;
- return diff;
+ return p - q;
}

/*
- * Is p < q, accounting for wraparound?
- *
- * Since asyncQueueIsFull() blocks creation of a page that could precede any
- * extant page, we need not assess entries within a page.
+ * Determines whether p precedes q.
+ * Previously this function accounted for wraparound.
*/
- static bool
+ static inline bool
asyncQueuePagePrecedes(int64 p, int64 q)
{
- return asyncQueuePageDiff(p, q) < 0;
+ return p < q;
}
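
With 64-bit page numbers the queue position is effectively monotonic: even at
one page per microsecond it would take roughly 292,000 years to overflow int64,
so plain integer comparison suffices. A hypothetical spot check of the
simplified helpers:

    Assert(asyncQueuePageDiff(5, 3) == 2);
    Assert(asyncQueuePagePrecedes(3, 5));
    Assert(!asyncQueuePagePrecedes(5, 5));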

/*
@@ -566,12 +537,13 @@ AsyncShmemInit(void)
}

/*
- * Set up SLRU management of the pg_notify data.
+ * Set up SLRU management of the pg_notify data. Note that long segment
+ * names are used in order to avoid wraparound.
*/
NotifyCtl->PagePrecedes = asyncQueuePagePrecedes;
SimpleLruInit(NotifyCtl, "Notify", NUM_NOTIFY_BUFFERS, 0,
NotifySLRULock, "pg_notify", LWTRANCHE_NOTIFY_BUFFER,
- SYNC_HANDLER_NONE, false);
+ SYNC_HANDLER_NONE, true);

if (!found)
{
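
The new trailing "true" argument is the long_segment_names flag that the 64-bit
SLRU work adds to SimpleLruInit(). With it, pg_notify segment files get wide
page-derived names instead of the old four-hex-digit ones, so names never
collide even though pages are no longer recycled. Roughly, paraphrasing the
companion slru.c change (the 15-digit width is recalled, not taken from this
diff):

    /* paraphrased from slru.c's SlruFileName() */
    if (ctl->long_segment_names)
        snprintf(path, MAXPGPATH, "%s/%015llX", ctl->Dir, (long long) segno);
    else
        snprintf(path, MAXPGPATH, "%s/%04X", ctl->Dir, (unsigned int) segno);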
@@ -1305,27 +1277,11 @@ asyncQueueUnregister(void)
static bool
asyncQueueIsFull(void)
{
- int nexthead;
- int boundary;
+ int headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
+ int tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
+ int occupied = headPage - tailPage;

- /*
- * The queue is full if creating a new head page would create a page that
- * logically precedes the current global tail pointer, ie, the head
- * pointer would wrap around compared to the tail. We cannot create such
- * a head page for fear of confusing slru.c. For safety we round the tail
- * pointer back to a segment boundary (truncation logic in
- * asyncQueueAdvanceTail does not do this, so doing it here is optional).
- *
- * Note that this test is *not* dependent on how much space there is on
- * the current head page. This is necessary because asyncQueueAddEntries
- * might try to create the next head page in any case.
- */
- nexthead = QUEUE_POS_PAGE(QUEUE_HEAD) + 1;
- if (nexthead > QUEUE_MAX_PAGE)
- nexthead = 0; /* wrap around */
- boundary = QUEUE_STOP_PAGE;
- boundary -= boundary % SLRU_PAGES_PER_SEGMENT;
- return asyncQueuePagePrecedes(nexthead, boundary);
+ return occupied >= max_notify_queue_pages;
}
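
The fullness test thus reduces to a page-count comparison against the GUC. For
instance, with the default limit, a head on page 1048600 and a tail still on
page 24 give occupied = 1048576, so the queue reports full until
asyncQueueAdvanceTail() catches up. Illustrative values only:

    /* hypothetical positions, default max_notify_queue_pages */
    int headPage = 1048600;
    int tailPage = 24;

    Assert(headPage - tailPage >= 1048576);  /* asyncQueueIsFull() => true */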

/*
@@ -1355,8 +1311,6 @@ asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
if (offset + QUEUEALIGN(AsyncQueueEntryEmptySize) > QUEUE_PAGESIZE)
{
pageno++;
- if (pageno > QUEUE_MAX_PAGE)
- pageno = 0; /* wrap around */
offset = 0;
pageJump = true;
}
@@ -1433,9 +1387,6 @@ asyncQueueAddEntries(ListCell *nextNotify)
* If this is the first write since the postmaster started, we need to
* initialize the first page of the async SLRU. Otherwise, the current
* page should be initialized already, so just fetch it.
- *
- * (We could also take the first path when the SLRU position has just
- * wrapped around, but re-zeroing the page is harmless in that case.)
*/
pageno = QUEUE_POS_PAGE(queue_head);
if (QUEUE_POS_IS_ZERO(queue_head))
@@ -1548,20 +1499,12 @@ asyncQueueUsage(void)
{
int headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
int tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
- int occupied;
-
- occupied = headPage - tailPage;
+ int occupied = headPage - tailPage;

if (occupied == 0)
return (double) 0; /* fast exit for common case */

- if (occupied < 0)
- {
- /* head has wrapped around, tail not yet */
- occupied += QUEUE_MAX_PAGE + 1;
- }
-
- return (double) occupied / (double) ((QUEUE_MAX_PAGE + 1) / 2);
+ return (double) occupied / (double) max_notify_queue_pages;
}
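
This is the fraction that reaches users through pg_notification_queue_usage();
it now measures against the configured limit rather than the old wraparound
horizon. A spot check with the default limit (illustrative, not from the
patch; the division is by powers of two, so the double result is exact):

    int64 occupied = 262144;  /* hypothetical head/tail distance in pages */

    Assert((double) occupied / (double) 1048576 == 0.25);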

/*
@@ -2209,11 +2152,6 @@ asyncQueueAdvanceTail(void)
*/
SimpleLruTruncate(NotifyCtl, newtailpage);

- /*
- * Update QUEUE_STOP_PAGE. This changes asyncQueueIsFull()'s verdict
- * for the segment immediately prior to the old tail, allowing fresh
- * data into that segment.
- */
LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
QUEUE_STOP_PAGE = newtailpage;
LWLockRelease(NotifyQueueLock);
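
One practical consequence: the removed header comment suggested recompiling
with a smaller QUEUE_MAX_PAGE to test queue-full behaviour. With the limit now
a GUC, the same test can presumably be done without recompiling, e.g. by
starting the server with "postgres -c max_notify_queue_pages=64" (assuming the
GUC's minimum permits a value that small).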