@@ -95,8 +95,6 @@ bool hot_standby_feedback;
95
95
static WalReceiverConn * wrconn = NULL ;
96
96
WalReceiverFunctionsType * WalReceiverFunctions = NULL ;
97
97
98
- #define NAPTIME_PER_CYCLE 100 /* max sleep time between cycles (100ms) */
99
-
100
98
/*
101
99
* These variables are used similarly to openLogFile/SegNo,
102
100
* but for walreceiver to write the XLOG. recvFileTLI is the TimeLineID
@@ -116,6 +114,23 @@ static struct
116
114
XLogRecPtr Flush ; /* last byte + 1 flushed in the standby */
117
115
} LogstreamResult ;
118
116
117
+ /*
118
+ * Reasons to wake up and perform periodic tasks.
119
+ */
120
+ typedef enum WalRcvWakeupReason
121
+ {
122
+ WALRCV_WAKEUP_TERMINATE ,
123
+ WALRCV_WAKEUP_PING ,
124
+ WALRCV_WAKEUP_REPLY ,
125
+ WALRCV_WAKEUP_HSFEEDBACK ,
126
+ NUM_WALRCV_WAKEUPS
127
+ } WalRcvWakeupReason ;
128
+
129
+ /*
130
+ * Wake up times for periodic tasks.
131
+ */
132
+ static TimestampTz wakeup [NUM_WALRCV_WAKEUPS ];
133
+
119
134
static StringInfoData reply_message ;
120
135
static StringInfoData incoming_message ;
121
136
@@ -132,6 +147,7 @@ static void XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli);
132
147
static void XLogWalRcvSendReply (bool force , bool requestReply );
133
148
static void XLogWalRcvSendHSFeedback (bool immed );
134
149
static void ProcessWalSndrMessage (XLogRecPtr walEnd , TimestampTz sendTime );
150
+ static void WalRcvComputeNextWakeup (WalRcvWakeupReason reason , TimestampTz now );
135
151
136
152
/*
137
153
* Process any interrupts the walreceiver process may have received.
@@ -179,9 +195,7 @@ WalReceiverMain(void)
179
195
TimeLineID primaryTLI ;
180
196
bool first_stream ;
181
197
WalRcvData * walrcv = WalRcv ;
182
- TimestampTz last_recv_timestamp ;
183
- TimestampTz starttime ;
184
- bool ping_sent ;
198
+ TimestampTz now ;
185
199
char * err ;
186
200
char * sender_host = NULL ;
187
201
int sender_port = 0 ;
@@ -192,7 +206,7 @@ WalReceiverMain(void)
192
206
*/
193
207
Assert (walrcv != NULL );
194
208
195
- starttime = GetCurrentTimestamp ();
209
+ now = GetCurrentTimestamp ();
196
210
197
211
/*
198
212
* Mark walreceiver as running in shared memory.
@@ -248,7 +262,7 @@ WalReceiverMain(void)
248
262
249
263
/* Initialise to a sanish value */
250
264
walrcv -> lastMsgSendTime =
251
- walrcv -> lastMsgReceiptTime = walrcv -> latestWalEndTime = starttime ;
265
+ walrcv -> lastMsgReceiptTime = walrcv -> latestWalEndTime = now ;
252
266
253
267
/* Report the latch to use to awaken this process */
254
268
walrcv -> latch = & MyProc -> procLatch ;
@@ -414,9 +428,10 @@ WalReceiverMain(void)
414
428
initStringInfo (& reply_message );
415
429
initStringInfo (& incoming_message );
416
430
417
- /* Initialize the last recv timestamp */
418
- last_recv_timestamp = GetCurrentTimestamp ();
419
- ping_sent = false;
431
+ /* Initialize nap wakeup times. */
432
+ now = GetCurrentTimestamp ();
433
+ for (int i = 0 ; i < NUM_WALRCV_WAKEUPS ; ++ i )
434
+ WalRcvComputeNextWakeup (i , now );
420
435
421
436
/* Loop until end-of-streaming or error */
422
437
for (;;)
@@ -426,6 +441,8 @@ WalReceiverMain(void)
426
441
bool endofwal = false;
427
442
pgsocket wait_fd = PGINVALID_SOCKET ;
428
443
int rc ;
444
+ TimestampTz nextWakeup ;
445
+ int nap ;
429
446
430
447
/*
431
448
* Exit walreceiver if we're not in recovery. This should not
@@ -443,11 +460,15 @@ WalReceiverMain(void)
443
460
{
444
461
ConfigReloadPending = false;
445
462
ProcessConfigFile (PGC_SIGHUP );
463
+ now = GetCurrentTimestamp ();
464
+ for (int i = 0 ; i < NUM_WALRCV_WAKEUPS ; ++ i )
465
+ WalRcvComputeNextWakeup (i , now );
446
466
XLogWalRcvSendHSFeedback (true);
447
467
}
448
468
449
469
/* See if we can read data immediately */
450
470
len = walrcv_receive (wrconn , & buf , & wait_fd );
471
+ now = GetCurrentTimestamp ();
451
472
if (len != 0 )
452
473
{
453
474
/*
@@ -459,11 +480,12 @@ WalReceiverMain(void)
459
480
if (len > 0 )
460
481
{
461
482
/*
462
- * Something was received from primary, so reset
463
- * timeout
483
+ * Something was received from primary, so adjust
484
+ * the ping and terminate wakeup times.
464
485
*/
465
- last_recv_timestamp = GetCurrentTimestamp ();
466
- ping_sent = false;
486
+ WalRcvComputeNextWakeup (WALRCV_WAKEUP_TERMINATE ,
487
+ now );
488
+ WalRcvComputeNextWakeup (WALRCV_WAKEUP_PING , now );
467
489
XLogWalRcvProcessMsg (buf [0 ], & buf [1 ], len - 1 ,
468
490
startpointTLI );
469
491
}
@@ -480,6 +502,7 @@ WalReceiverMain(void)
480
502
break ;
481
503
}
482
504
len = walrcv_receive (wrconn , & buf , & wait_fd );
505
+ now = GetCurrentTimestamp ();
483
506
}
484
507
485
508
/* Let the primary know that we received some data. */
@@ -497,6 +520,20 @@ WalReceiverMain(void)
497
520
if (endofwal )
498
521
break ;
499
522
523
+ /* Find the soonest wakeup time, to limit our nap. */
524
+ nextWakeup = PG_INT64_MAX ;
525
+ for (int i = 0 ; i < NUM_WALRCV_WAKEUPS ; ++ i )
526
+ nextWakeup = Min (wakeup [i ], nextWakeup );
527
+
528
+ /*
529
+ * Calculate the nap time. WaitLatchOrSocket() doesn't accept
530
+ * timeouts longer than INT_MAX milliseconds, so we limit the
531
+ * result accordingly. Also, we round up to the next
532
+ * millisecond to avoid waking up too early and spinning until
533
+ * one of the wakeup times.
534
+ */
535
+ nap = (int ) Min (INT_MAX , Max (0 , (nextWakeup - now + 999 ) / 1000 ));
536
+
500
537
/*
501
538
* Ideally we would reuse a WaitEventSet object repeatedly
502
539
* here to avoid the overheads of WaitLatchOrSocket on epoll
@@ -513,8 +550,9 @@ WalReceiverMain(void)
513
550
WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
514
551
WL_TIMEOUT | WL_LATCH_SET ,
515
552
wait_fd ,
516
- NAPTIME_PER_CYCLE ,
553
+ nap ,
517
554
WAIT_EVENT_WAL_RECEIVER_MAIN );
555
+ now = GetCurrentTimestamp ();
518
556
if (rc & WL_LATCH_SET )
519
557
{
520
558
ResetLatch (MyLatch );
@@ -550,34 +588,19 @@ WalReceiverMain(void)
550
588
* Check if time since last receive from primary has
551
589
* reached the configured limit.
552
590
*/
553
- if (wal_receiver_timeout > 0 )
554
- {
555
- TimestampTz now = GetCurrentTimestamp ();
556
- TimestampTz timeout ;
557
-
558
- timeout =
559
- TimestampTzPlusMilliseconds (last_recv_timestamp ,
560
- wal_receiver_timeout );
591
+ if (now >= wakeup [WALRCV_WAKEUP_TERMINATE ])
592
+ ereport (ERROR ,
593
+ (errcode (ERRCODE_CONNECTION_FAILURE ),
594
+ errmsg ("terminating walreceiver due to timeout" )));
561
595
562
- if (now >= timeout )
563
- ereport (ERROR ,
564
- (errcode (ERRCODE_CONNECTION_FAILURE ),
565
- errmsg ("terminating walreceiver due to timeout" )));
566
-
567
- /*
568
- * We didn't receive anything new, for half of
569
- * receiver replication timeout. Ping the server.
570
- */
571
- if (!ping_sent )
572
- {
573
- timeout = TimestampTzPlusMilliseconds (last_recv_timestamp ,
574
- (wal_receiver_timeout / 2 ));
575
- if (now >= timeout )
576
- {
577
- requestReply = true;
578
- ping_sent = true;
579
- }
580
- }
596
+ /*
597
+ * We didn't receive anything new, for half of receiver
598
+ * replication timeout. Ping the server.
599
+ */
600
+ if (now >= wakeup [WALRCV_WAKEUP_PING ])
601
+ {
602
+ requestReply = true;
603
+ wakeup [WALRCV_WAKEUP_PING ] = PG_INT64_MAX ;
581
604
}
582
605
583
606
XLogWalRcvSendReply (requestReply , requestReply );
@@ -1076,7 +1099,6 @@ XLogWalRcvSendReply(bool force, bool requestReply)
1076
1099
static XLogRecPtr writePtr = 0 ;
1077
1100
static XLogRecPtr flushPtr = 0 ;
1078
1101
XLogRecPtr applyPtr ;
1079
- static TimestampTz sendTime = 0 ;
1080
1102
TimestampTz now ;
1081
1103
1082
1104
/*
@@ -1101,10 +1123,11 @@ XLogWalRcvSendReply(bool force, bool requestReply)
1101
1123
if (!force
1102
1124
&& writePtr == LogstreamResult .Write
1103
1125
&& flushPtr == LogstreamResult .Flush
1104
- && !TimestampDifferenceExceeds (sendTime , now ,
1105
- wal_receiver_status_interval * 1000 ))
1126
+ && now < wakeup [WALRCV_WAKEUP_REPLY ])
1106
1127
return ;
1107
- sendTime = now ;
1128
+
1129
+ /* Make sure we wake up when it's time to send another reply. */
1130
+ WalRcvComputeNextWakeup (WALRCV_WAKEUP_REPLY , now );
1108
1131
1109
1132
/* Construct a new message */
1110
1133
writePtr = LogstreamResult .Write ;
@@ -1149,7 +1172,6 @@ XLogWalRcvSendHSFeedback(bool immed)
1149
1172
catalog_xmin_epoch ;
1150
1173
TransactionId xmin ,
1151
1174
catalog_xmin ;
1152
- static TimestampTz sendTime = 0 ;
1153
1175
1154
1176
/* initially true so we always send at least one feedback message */
1155
1177
static bool primary_has_standby_xmin = true;
@@ -1165,16 +1187,12 @@ XLogWalRcvSendHSFeedback(bool immed)
1165
1187
/* Get current timestamp. */
1166
1188
now = GetCurrentTimestamp ();
1167
1189
1168
- if (!immed )
1169
- {
1170
- /*
1171
- * Send feedback at most once per wal_receiver_status_interval.
1172
- */
1173
- if (!TimestampDifferenceExceeds (sendTime , now ,
1174
- wal_receiver_status_interval * 1000 ))
1175
- return ;
1176
- sendTime = now ;
1177
- }
1190
+ /* Send feedback at most once per wal_receiver_status_interval. */
1191
+ if (!immed && now < wakeup [WALRCV_WAKEUP_HSFEEDBACK ])
1192
+ return ;
1193
+
1194
+ /* Make sure we wake up when it's time to send feedback again. */
1195
+ WalRcvComputeNextWakeup (WALRCV_WAKEUP_HSFEEDBACK , now );
1178
1196
1179
1197
/*
1180
1198
* If Hot Standby is not yet accepting connections there is nothing to
@@ -1285,6 +1303,45 @@ ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
1285
1303
}
1286
1304
}
1287
1305
1306
+ /*
1307
+ * Compute the next wakeup time for a given wakeup reason. Can be called to
1308
+ * initialize a wakeup time, to adjust it for the next wakeup, or to
1309
+ * reinitialize it when GUCs have changed.
1310
+ */
1311
+ static void
1312
+ WalRcvComputeNextWakeup (WalRcvWakeupReason reason , TimestampTz now )
1313
+ {
1314
+ switch (reason )
1315
+ {
1316
+ case WALRCV_WAKEUP_TERMINATE :
1317
+ if (wal_receiver_timeout <= 0 )
1318
+ wakeup [reason ] = PG_INT64_MAX ;
1319
+ else
1320
+ wakeup [reason ] = now + wal_receiver_timeout * INT64CONST (1000 );
1321
+ break ;
1322
+ case WALRCV_WAKEUP_PING :
1323
+ if (wal_receiver_timeout <= 0 )
1324
+ wakeup [reason ] = PG_INT64_MAX ;
1325
+ else
1326
+ wakeup [reason ] = now + (wal_receiver_timeout / 2 ) * INT64CONST (1000 );
1327
+ break ;
1328
+ case WALRCV_WAKEUP_HSFEEDBACK :
1329
+ if (!hot_standby_feedback || wal_receiver_status_interval <= 0 )
1330
+ wakeup [reason ] = PG_INT64_MAX ;
1331
+ else
1332
+ wakeup [reason ] = now + wal_receiver_status_interval * INT64CONST (1000000 );
1333
+ break ;
1334
+ case WALRCV_WAKEUP_REPLY :
1335
+ if (wal_receiver_status_interval <= 0 )
1336
+ wakeup [reason ] = PG_INT64_MAX ;
1337
+ else
1338
+ wakeup [reason ] = now + wal_receiver_status_interval * INT64CONST (1000000 );
1339
+ break ;
1340
+ default :
1341
+ break ;
1342
+ }
1343
+ }
1344
+
1288
1345
/*
1289
1346
* Wake up the walreceiver main loop.
1290
1347
*
0 commit comments