Skip to content

Navigation Menu

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 05a7be9

Browse files
committed
Suppress useless wakeups in walreceiver.
Instead of waking up 10 times per second to check for various timeout conditions, keep track of when we next have periodic work to do.

Author: Thomas Munro <thomas.munro@gmail.com>
Author: Nathan Bossart <nathandbossart@gmail.com>
Reviewed-by: Kyotaro Horiguchi <horikyota.ntt@gmail.com>
Reviewed-by: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Reviewed-by: Alvaro Herrera <alvherre@alvh.no-ip.org>
Discussion: https://postgr.es/m/CA%2BhUKGJGhX4r2LPUE3Oy9BX71Eum6PBcS8L3sJpScR9oKaTVaA%40mail.gmail.com
1 parent bd95816 commit 05a7be9
Copy full SHA for 05a7be9

File tree

2 files changed

+115
-57
lines changed
Filter options

2 files changed

+115
-57
lines changed

‎src/backend/replication/walreceiver.c

Copy file name to clipboard | Expand all lines: src/backend/replication/walreceiver.c
+114-57
Original file line number | Diff line number | Diff line change
@@ -95,8 +95,6 @@ bool hot_standby_feedback;
9595
static WalReceiverConn *wrconn = NULL;
9696
WalReceiverFunctionsType *WalReceiverFunctions = NULL;
9797

98-
#define NAPTIME_PER_CYCLE 100 /* max sleep time between cycles (100ms) */
99-
10098
/*
10199
* These variables are used similarly to openLogFile/SegNo,
102100
* but for walreceiver to write the XLOG. recvFileTLI is the TimeLineID
@@ -116,6 +114,23 @@ static struct
116114
XLogRecPtr Flush; /* last byte + 1 flushed in the standby */
117115
} LogstreamResult;
118116

117+
/*
118+
* Reasons to wake up and perform periodic tasks.
119+
*/
120+
typedef enum WalRcvWakeupReason
121+
{
122+
WALRCV_WAKEUP_TERMINATE,
123+
WALRCV_WAKEUP_PING,
124+
WALRCV_WAKEUP_REPLY,
125+
WALRCV_WAKEUP_HSFEEDBACK,
126+
NUM_WALRCV_WAKEUPS
127+
} WalRcvWakeupReason;
128+
129+
/*
130+
* Wake up times for periodic tasks.
131+
*/
132+
static TimestampTz wakeup[NUM_WALRCV_WAKEUPS];
133+
119134
static StringInfoData reply_message;
120135
static StringInfoData incoming_message;
121136

@@ -132,6 +147,7 @@ static void XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli);
132147
static void XLogWalRcvSendReply(bool force, bool requestReply);
133148
static void XLogWalRcvSendHSFeedback(bool immed);
134149
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
150+
static void WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now);
135151

136152
/*
137153
* Process any interrupts the walreceiver process may have received.
@@ -179,9 +195,7 @@ WalReceiverMain(void)
179195
TimeLineID primaryTLI;
180196
bool first_stream;
181197
WalRcvData *walrcv = WalRcv;
182-
TimestampTz last_recv_timestamp;
183-
TimestampTz starttime;
184-
bool ping_sent;
198+
TimestampTz now;
185199
char *err;
186200
char *sender_host = NULL;
187201
int sender_port = 0;
@@ -192,7 +206,7 @@ WalReceiverMain(void)
192206
*/
193207
Assert(walrcv != NULL);
194208

195-
starttime = GetCurrentTimestamp();
209+
now = GetCurrentTimestamp();
196210

197211
/*
198212
* Mark walreceiver as running in shared memory.
@@ -248,7 +262,7 @@ WalReceiverMain(void)
248262

249263
/* Initialise to a sanish value */
250264
walrcv->lastMsgSendTime =
251-
walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = starttime;
265+
walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now;
252266

253267
/* Report the latch to use to awaken this process */
254268
walrcv->latch = &MyProc->procLatch;
@@ -414,9 +428,10 @@ WalReceiverMain(void)
414428
initStringInfo(&reply_message);
415429
initStringInfo(&incoming_message);
416430

417-
/* Initialize the last recv timestamp */
418-
last_recv_timestamp = GetCurrentTimestamp();
419-
ping_sent = false;
431+
/* Initialize nap wakeup times. */
432+
now = GetCurrentTimestamp();
433+
for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
434+
WalRcvComputeNextWakeup(i, now);
420435

421436
/* Loop until end-of-streaming or error */
422437
for (;;)
@@ -426,6 +441,8 @@ WalReceiverMain(void)
426441
bool endofwal = false;
427442
pgsocket wait_fd = PGINVALID_SOCKET;
428443
int rc;
444+
TimestampTz nextWakeup;
445+
int nap;
429446

430447
/*
431448
* Exit walreceiver if we're not in recovery. This should not
@@ -443,11 +460,15 @@ WalReceiverMain(void)
443460
{
444461
ConfigReloadPending = false;
445462
ProcessConfigFile(PGC_SIGHUP);
463+
now = GetCurrentTimestamp();
464+
for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
465+
WalRcvComputeNextWakeup(i, now);
446466
XLogWalRcvSendHSFeedback(true);
447467
}
448468

449469
/* See if we can read data immediately */
450470
len = walrcv_receive(wrconn, &buf, &wait_fd);
471+
now = GetCurrentTimestamp();
451472
if (len != 0)
452473
{
453474
/*
@@ -459,11 +480,12 @@ WalReceiverMain(void)
459480
if (len > 0)
460481
{
461482
/*
462-
* Something was received from primary, so reset
463-
* timeout
483+
* Something was received from primary, so adjust
484+
* the ping and terminate wakeup times.
464485
*/
465-
last_recv_timestamp = GetCurrentTimestamp();
466-
ping_sent = false;
486+
WalRcvComputeNextWakeup(WALRCV_WAKEUP_TERMINATE,
487+
now);
488+
WalRcvComputeNextWakeup(WALRCV_WAKEUP_PING, now);
467489
XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1,
468490
startpointTLI);
469491
}
@@ -480,6 +502,7 @@ WalReceiverMain(void)
480502
break;
481503
}
482504
len = walrcv_receive(wrconn, &buf, &wait_fd);
505+
now = GetCurrentTimestamp();
483506
}
484507

485508
/* Let the primary know that we received some data. */
@@ -497,6 +520,20 @@ WalReceiverMain(void)
497520
if (endofwal)
498521
break;
499522

523+
/* Find the soonest wakeup time, to limit our nap. */
524+
nextWakeup = PG_INT64_MAX;
525+
for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
526+
nextWakeup = Min(wakeup[i], nextWakeup);
527+
528+
/*
529+
* Calculate the nap time. WaitLatchOrSocket() doesn't accept
530+
* timeouts longer than INT_MAX milliseconds, so we limit the
531+
* result accordingly. Also, we round up to the next
532+
* millisecond to avoid waking up too early and spinning until
533+
* one of the wakeup times.
534+
*/
535+
nap = (int) Min(INT_MAX, Max(0, (nextWakeup - now + 999) / 1000));
536+
500537
/*
501538
* Ideally we would reuse a WaitEventSet object repeatedly
502539
* here to avoid the overheads of WaitLatchOrSocket on epoll
@@ -513,8 +550,9 @@ WalReceiverMain(void)
513550
WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
514551
WL_TIMEOUT | WL_LATCH_SET,
515552
wait_fd,
516-
NAPTIME_PER_CYCLE,
553+
nap,
517554
WAIT_EVENT_WAL_RECEIVER_MAIN);
555+
now = GetCurrentTimestamp();
518556
if (rc & WL_LATCH_SET)
519557
{
520558
ResetLatch(MyLatch);
@@ -550,34 +588,19 @@ WalReceiverMain(void)
550588
* Check if time since last receive from primary has
551589
* reached the configured limit.
552590
*/
553-
if (wal_receiver_timeout > 0)
554-
{
555-
TimestampTz now = GetCurrentTimestamp();
556-
TimestampTz timeout;
557-
558-
timeout =
559-
TimestampTzPlusMilliseconds(last_recv_timestamp,
560-
wal_receiver_timeout);
591+
if (now >= wakeup[WALRCV_WAKEUP_TERMINATE])
592+
ereport(ERROR,
593+
(errcode(ERRCODE_CONNECTION_FAILURE),
594+
errmsg("terminating walreceiver due to timeout")));
561595

562-
if (now >= timeout)
563-
ereport(ERROR,
564-
(errcode(ERRCODE_CONNECTION_FAILURE),
565-
errmsg("terminating walreceiver due to timeout")));
566-
567-
/*
568-
* We didn't receive anything new, for half of
569-
* receiver replication timeout. Ping the server.
570-
*/
571-
if (!ping_sent)
572-
{
573-
timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
574-
(wal_receiver_timeout / 2));
575-
if (now >= timeout)
576-
{
577-
requestReply = true;
578-
ping_sent = true;
579-
}
580-
}
596+
/*
597+
* We didn't receive anything new, for half of receiver
598+
* replication timeout. Ping the server.
599+
*/
600+
if (now >= wakeup[WALRCV_WAKEUP_PING])
601+
{
602+
requestReply = true;
603+
wakeup[WALRCV_WAKEUP_PING] = PG_INT64_MAX;
581604
}
582605

583606
XLogWalRcvSendReply(requestReply, requestReply);
@@ -1076,7 +1099,6 @@ XLogWalRcvSendReply(bool force, bool requestReply)
10761099
static XLogRecPtr writePtr = 0;
10771100
static XLogRecPtr flushPtr = 0;
10781101
XLogRecPtr applyPtr;
1079-
static TimestampTz sendTime = 0;
10801102
TimestampTz now;
10811103

10821104
/*
@@ -1101,10 +1123,11 @@ XLogWalRcvSendReply(bool force, bool requestReply)
11011123
if (!force
11021124
&& writePtr == LogstreamResult.Write
11031125
&& flushPtr == LogstreamResult.Flush
1104-
&& !TimestampDifferenceExceeds(sendTime, now,
1105-
wal_receiver_status_interval * 1000))
1126+
&& now < wakeup[WALRCV_WAKEUP_REPLY])
11061127
return;
1107-
sendTime = now;
1128+
1129+
/* Make sure we wake up when it's time to send another reply. */
1130+
WalRcvComputeNextWakeup(WALRCV_WAKEUP_REPLY, now);
11081131

11091132
/* Construct a new message */
11101133
writePtr = LogstreamResult.Write;
@@ -1149,7 +1172,6 @@ XLogWalRcvSendHSFeedback(bool immed)
11491172
catalog_xmin_epoch;
11501173
TransactionId xmin,
11511174
catalog_xmin;
1152-
static TimestampTz sendTime = 0;
11531175

11541176
/* initially true so we always send at least one feedback message */
11551177
static bool primary_has_standby_xmin = true;
@@ -1165,16 +1187,12 @@ XLogWalRcvSendHSFeedback(bool immed)
11651187
/* Get current timestamp. */
11661188
now = GetCurrentTimestamp();
11671189

1168-
if (!immed)
1169-
{
1170-
/*
1171-
* Send feedback at most once per wal_receiver_status_interval.
1172-
*/
1173-
if (!TimestampDifferenceExceeds(sendTime, now,
1174-
wal_receiver_status_interval * 1000))
1175-
return;
1176-
sendTime = now;
1177-
}
1190+
/* Send feedback at most once per wal_receiver_status_interval. */
1191+
if (!immed && now < wakeup[WALRCV_WAKEUP_HSFEEDBACK])
1192+
return;
1193+
1194+
/* Make sure we wake up when it's time to send feedback again. */
1195+
WalRcvComputeNextWakeup(WALRCV_WAKEUP_HSFEEDBACK, now);
11781196

11791197
/*
11801198
* If Hot Standby is not yet accepting connections there is nothing to
@@ -1285,6 +1303,45 @@ ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
12851303
}
12861304
}
12871305

1306+
/*
1307+
* Compute the next wakeup time for a given wakeup reason. Can be called to
1308+
* initialize a wakeup time, to adjust it for the next wakeup, or to
1309+
* reinitialize it when GUCs have changed.
1310+
*/
1311+
static void
1312+
WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now)
1313+
{
1314+
switch (reason)
1315+
{
1316+
case WALRCV_WAKEUP_TERMINATE:
1317+
if (wal_receiver_timeout <= 0)
1318+
wakeup[reason] = PG_INT64_MAX;
1319+
else
1320+
wakeup[reason] = now + wal_receiver_timeout * INT64CONST(1000);
1321+
break;
1322+
case WALRCV_WAKEUP_PING:
1323+
if (wal_receiver_timeout <= 0)
1324+
wakeup[reason] = PG_INT64_MAX;
1325+
else
1326+
wakeup[reason] = now + (wal_receiver_timeout / 2) * INT64CONST(1000);
1327+
break;
1328+
case WALRCV_WAKEUP_HSFEEDBACK:
1329+
if (!hot_standby_feedback || wal_receiver_status_interval <= 0)
1330+
wakeup[reason] = PG_INT64_MAX;
1331+
else
1332+
wakeup[reason] = now + wal_receiver_status_interval * INT64CONST(1000000);
1333+
break;
1334+
case WALRCV_WAKEUP_REPLY:
1335+
if (wal_receiver_status_interval <= 0)
1336+
wakeup[reason] = PG_INT64_MAX;
1337+
else
1338+
wakeup[reason] = now + wal_receiver_status_interval * INT64CONST(1000000);
1339+
break;
1340+
default:
1341+
break;
1342+
}
1343+
}
1344+
12881345
/*
12891346
* Wake up the walreceiver main loop.
12901347
*

‎src/tools/pgindent/typedefs.list

Copy file name to clipboard | Expand all lines: src/tools/pgindent/typedefs.list
+1
Original file line number | Diff line number | Diff line change
@@ -2927,6 +2927,7 @@ WALInsertLock
29272927
WALInsertLockPadded
29282928
WALOpenSegment
29292929
WALReadError
2930+
WalRcvWakeupReason
29302931
WALSegmentCloseCB
29312932
WALSegmentContext
29322933
WALSegmentOpenCB

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.