Skip to content

Navigation Menu

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 0fdab27

Browse filesBrowse files
committed
Allow logical decoding on standbys
Unsurprisingly, this requires wal_level = logical to be set on the primary and standby. The infrastructure added in 2666975 ensures that slots are invalidated if the primary's wal_level is lowered. Creating a slot on a standby waits for a xl_running_xact record to be processed. If the primary is idle (and thus not emitting xl_running_xact records), that can take a while. To make that faster, this commit also introduces the pg_log_standby_snapshot() function. By executing it on the primary, completion of slot creation on the standby can be accelerated. Note that logical decoding on a standby does not itself enforce that required catalog rows are not removed. The user has to use physical replication slots + hot_standby_feedback or other measures to prevent that. If catalog rows required for a slot are removed, the slot is invalidated. See 6af1793 for an overall design of logical decoding on a standby. Bumps catversion, for the addition of the pg_log_standby_snapshot() function. Author: "Drouvot, Bertrand" <bertranddrouvot.pg@gmail.com> Author: Andres Freund <andres@anarazel.de> (in an older version) Author: Amit Khandekar <amitdkhan.pg@gmail.com> (in an older version) Reviewed-by: Andres Freund <andres@anarazel.de> Reviewed-by: FabrÌzio de Royes Mello <fabriziomello@gmail.com> Reviewed-by: Amit Kapila <amit.kapila16@gmail.com> Reviewed-By: Robert Haas <robertmhaas@gmail.com>
1 parent e101dfa commit 0fdab27
Copy full SHA for 0fdab27

File tree

12 files changed

+202
-61
lines changed
Filter options

12 files changed

+202
-61
lines changed

‎doc/src/sgml/func.sgml

Copy file name to clipboardExpand all lines: doc/src/sgml/func.sgml
+15
Original file line numberDiff line numberDiff line change
@@ -27074,6 +27074,21 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
2707427074
prepared with <xref linkend="sql-prepare-transaction"/>.
2707527075
</para></entry>
2707627076
</row>
27077+
<row>
27078+
<entry role="func_table_entry"><para role="func_signature">
27079+
<indexterm>
27080+
<primary>pg_log_standby_snapshot</primary>
27081+
</indexterm>
27082+
<function>pg_log_standby_snapshot</function> ()
27083+
<returnvalue>pg_lsn</returnvalue>
27084+
</para>
27085+
<para>
27086+
Take a snapshot of running transactions and write it to WAL, without
27087+
having to wait bgwriter or checkpointer to log one. This is useful for
27088+
logical decoding on standby, as logical slot creation has to wait
27089+
until such a record is replayed on the standby.
27090+
</para></entry>
27091+
</row>
2707727092
</tbody>
2707827093
</tgroup>
2707927094
</table>

‎doc/src/sgml/logicaldecoding.sgml

Copy file name to clipboardExpand all lines: doc/src/sgml/logicaldecoding.sgml
+27
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,33 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU
316316
may consume changes from a slot at any given time.
317317
</para>
318318

319+
<para>
320+
A logical replication slot can also be created on a hot standby. To prevent
321+
<command>VACUUM</command> from removing required rows from the system
322+
catalogs, <varname>hot_standby_feedback</varname> should be set on the
323+
standby. In spite of that, if any required rows get removed, the slot gets
324+
invalidated. It's highly recommended to use a physical slot between the primary
325+
and the standby. Otherwise, hot_standby_feedback will work, but only while the
326+
connection is alive (for example a node restart would break it). Then, the
327+
primary may delete system catalog rows that could be needed by the logical
328+
decoding on the standby (as it does not know about the catalog_xmin on the
329+
standby). Existing logical slots on standby also get invalidated if wal_level
330+
on primary is reduced to less than 'logical'. This is done as soon as the
331+
standby detects such a change in the WAL stream. It means, that for walsenders
332+
that are lagging (if any), some WAL records up to the wal_level parameter change
333+
on the primary won't be decoded.
334+
</para>
335+
336+
<para>
337+
Creation of a logical slot requires information about all the currently
338+
running transactions. On the primary, this information is available
339+
directly, but on a standby, this information has to be obtained from
340+
primary. Thus, slot creation may need to wait for some activity to happen
341+
on the primary. If the primary is idle, creating a logical slot on
342+
standby may take noticeable time. This can be sped up by calling the
343+
<function>pg_log_standby_snapshot</function> on the primary.
344+
</para>
345+
319346
<caution>
320347
<para>
321348
Replication slots persist across crashes and know nothing about the state

‎src/backend/access/transam/xlog.c

Copy file name to clipboardExpand all lines: src/backend/access/transam/xlog.c
+11
Original file line numberDiff line numberDiff line change
@@ -4469,6 +4469,17 @@ LocalProcessControlFile(bool reset)
44694469
ReadControlFile();
44704470
}
44714471

4472+
/*
4473+
* Get the wal_level from the control file. For a standby, this value should be
4474+
* considered as its active wal_level, because it may be different from what
4475+
* was originally configured on standby.
4476+
*/
4477+
WalLevel
4478+
GetActiveWalLevelOnStandby(void)
4479+
{
4480+
return ControlFile->wal_level;
4481+
}
4482+
44724483
/*
44734484
* Initialization of shared memory for XLOG
44744485
*/

‎src/backend/access/transam/xlogfuncs.c

Copy file name to clipboardExpand all lines: src/backend/access/transam/xlogfuncs.c
+31
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
#include "storage/fd.h"
3232
#include "storage/ipc.h"
3333
#include "storage/smgr.h"
34+
#include "storage/standby.h"
3435
#include "utils/builtins.h"
3536
#include "utils/guc.h"
3637
#include "utils/memutils.h"
@@ -196,6 +197,36 @@ pg_switch_wal(PG_FUNCTION_ARGS)
196197
PG_RETURN_LSN(switchpoint);
197198
}
198199

200+
/*
201+
* pg_log_standby_snapshot: call LogStandbySnapshot()
202+
*
203+
* Permission checking for this function is managed through the normal
204+
* GRANT system.
205+
*/
206+
Datum
207+
pg_log_standby_snapshot(PG_FUNCTION_ARGS)
208+
{
209+
XLogRecPtr recptr;
210+
211+
if (RecoveryInProgress())
212+
ereport(ERROR,
213+
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
214+
errmsg("recovery is in progress"),
215+
errhint("pg_log_standby_snapshot() cannot be executed during recovery.")));
216+
217+
if (!XLogStandbyInfoActive())
218+
ereport(ERROR,
219+
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
220+
errmsg("pg_log_standby_snapshot() can only be used if wal_level >= replica")));
221+
222+
recptr = LogStandbySnapshot();
223+
224+
/*
225+
* As a convenience, return the WAL location of the last inserted record
226+
*/
227+
PG_RETURN_LSN(recptr);
228+
}
229+
199230
/*
200231
* pg_create_restore_point: a named point for restore
201232
*

‎src/backend/catalog/system_functions.sql

Copy file name to clipboardExpand all lines: src/backend/catalog/system_functions.sql
+2
Original file line numberDiff line numberDiff line change
@@ -644,6 +644,8 @@ REVOKE EXECUTE ON FUNCTION pg_create_restore_point(text) FROM public;
644644

645645
REVOKE EXECUTE ON FUNCTION pg_switch_wal() FROM public;
646646

647+
REVOKE EXECUTE ON FUNCTION pg_log_standby_snapshot() FROM public;
648+
647649
REVOKE EXECUTE ON FUNCTION pg_wal_replay_pause() FROM public;
648650

649651
REVOKE EXECUTE ON FUNCTION pg_wal_replay_resume() FROM public;

‎src/backend/replication/logical/decode.c

Copy file name to clipboardExpand all lines: src/backend/replication/logical/decode.c
+29-1
Original file line numberDiff line numberDiff line change
@@ -152,11 +152,39 @@ xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
152152
* can restart from there.
153153
*/
154154
break;
155+
case XLOG_PARAMETER_CHANGE:
156+
{
157+
xl_parameter_change *xlrec =
158+
(xl_parameter_change *) XLogRecGetData(buf->record);
159+
160+
/*
161+
* If wal_level on the primary is reduced to less than
162+
* logical, we want to prevent existing logical slots from
163+
* being used. Existing logical slots on the standby get
164+
* invalidated when this WAL record is replayed; and further,
165+
* slot creation fails when wal_level is not sufficient; but
166+
* all these operations are not synchronized, so a logical
167+
* slot may creep in while the wal_level is being
168+
* reduced. Hence this extra check.
169+
*/
170+
if (xlrec->wal_level < WAL_LEVEL_LOGICAL)
171+
{
172+
/*
173+
* This can occur only on a standby, as a primary would
174+
* not allow to restart after changing wal_level < logical
175+
* if there is pre-existing logical slot.
176+
*/
177+
Assert(RecoveryInProgress());
178+
ereport(ERROR,
179+
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
180+
errmsg("logical decoding on a standby requires wal_level to be at least logical on the primary")));
181+
}
182+
break;
183+
}
155184
case XLOG_NOOP:
156185
case XLOG_NEXTOID:
157186
case XLOG_SWITCH:
158187
case XLOG_BACKUP_END:
159-
case XLOG_PARAMETER_CHANGE:
160188
case XLOG_RESTORE_POINT:
161189
case XLOG_FPW_CHANGE:
162190
case XLOG_FPI_FOR_HINT:

‎src/backend/replication/logical/logical.c

Copy file name to clipboardExpand all lines: src/backend/replication/logical/logical.c
+20-16
Original file line numberDiff line numberDiff line change
@@ -124,23 +124,21 @@ CheckLogicalDecodingRequirements(void)
124124
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
125125
errmsg("logical decoding requires a database connection")));
126126

127-
/* ----
128-
* TODO: We got to change that someday soon...
129-
*
130-
* There's basically three things missing to allow this:
131-
* 1) We need to be able to correctly and quickly identify the timeline a
132-
* LSN belongs to
133-
* 2) We need to force hot_standby_feedback to be enabled at all times so
134-
* the primary cannot remove rows we need.
135-
* 3) support dropping replication slots referring to a database, in
136-
* dbase_redo. There can't be any active ones due to HS recovery
137-
* conflicts, so that should be relatively easy.
138-
* ----
139-
*/
140127
if (RecoveryInProgress())
141-
ereport(ERROR,
142-
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
143-
errmsg("logical decoding cannot be used while in recovery")));
128+
{
129+
/*
130+
* This check may have race conditions, but whenever
131+
* XLOG_PARAMETER_CHANGE indicates that wal_level has changed, we
132+
* verify that there are no existing logical replication slots. And to
133+
* avoid races around creating a new slot,
134+
* CheckLogicalDecodingRequirements() is called once before creating
135+
* the slot, and once when logical decoding is initially starting up.
136+
*/
137+
if (GetActiveWalLevelOnStandby() < WAL_LEVEL_LOGICAL)
138+
ereport(ERROR,
139+
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
140+
errmsg("logical decoding on a standby requires wal_level to be at least logical on the primary")));
141+
}
144142
}
145143

146144
/*
@@ -342,6 +340,12 @@ CreateInitDecodingContext(const char *plugin,
342340
LogicalDecodingContext *ctx;
343341
MemoryContext old_context;
344342

343+
/*
344+
* On a standby, this check is also required while creating the
345+
* slot. Check the comments in the function.
346+
*/
347+
CheckLogicalDecodingRequirements();
348+
345349
/* shorter lines... */
346350
slot = MyReplicationSlot;
347351

‎src/backend/replication/slot.c

Copy file name to clipboardExpand all lines: src/backend/replication/slot.c
+30-27
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141

4242
#include "access/transam.h"
4343
#include "access/xlog_internal.h"
44+
#include "access/xlogrecovery.h"
4445
#include "common/file_utils.h"
4546
#include "common/string.h"
4647
#include "miscadmin.h"
@@ -1192,37 +1193,28 @@ ReplicationSlotReserveWal(void)
11921193
/*
11931194
* For logical slots log a standby snapshot and start logical decoding
11941195
* at exactly that position. That allows the slot to start up more
1195-
* quickly.
1196+
* quickly. But on a standby we cannot do WAL writes, so just use the
1197+
* replay pointer; effectively, an attempt to create a logical slot on
1198+
* standby will cause it to wait for an xl_running_xact record to be
1199+
* logged independently on the primary, so that a snapshot can be
1200+
* built using the record.
11961201
*
1197-
* That's not needed (or indeed helpful) for physical slots as they'll
1198-
* start replay at the last logged checkpoint anyway. Instead return
1199-
* the location of the last redo LSN. While that slightly increases
1200-
* the chance that we have to retry, it's where a base backup has to
1201-
* start replay at.
1202+
* None of this is needed (or indeed helpful) for physical slots as
1203+
* they'll start replay at the last logged checkpoint anyway. Instead
1204+
* return the location of the last redo LSN. While that slightly
1205+
* increases the chance that we have to retry, it's where a base
1206+
* backup has to start replay at.
12021207
*/
1203-
if (!RecoveryInProgress() && SlotIsLogical(slot))
1204-
{
1205-
XLogRecPtr flushptr;
1206-
1207-
/* start at current insert position */
1208+
if (SlotIsPhysical(slot))
1209+
restart_lsn = GetRedoRecPtr();
1210+
else if (RecoveryInProgress())
1211+
restart_lsn = GetXLogReplayRecPtr(NULL);
1212+
else
12081213
restart_lsn = GetXLogInsertRecPtr();
1209-
SpinLockAcquire(&slot->mutex);
1210-
slot->data.restart_lsn = restart_lsn;
1211-
SpinLockRelease(&slot->mutex);
1212-
1213-
/* make sure we have enough information to start */
1214-
flushptr = LogStandbySnapshot();
12151214

1216-
/* and make sure it's fsynced to disk */
1217-
XLogFlush(flushptr);
1218-
}
1219-
else
1220-
{
1221-
restart_lsn = GetRedoRecPtr();
1222-
SpinLockAcquire(&slot->mutex);
1223-
slot->data.restart_lsn = restart_lsn;
1224-
SpinLockRelease(&slot->mutex);
1225-
}
1215+
SpinLockAcquire(&slot->mutex);
1216+
slot->data.restart_lsn = restart_lsn;
1217+
SpinLockRelease(&slot->mutex);
12261218

12271219
/* prevent WAL removal as fast as possible */
12281220
ReplicationSlotsComputeRequiredLSN();
@@ -1238,6 +1230,17 @@ ReplicationSlotReserveWal(void)
12381230
if (XLogGetLastRemovedSegno() < segno)
12391231
break;
12401232
}
1233+
1234+
if (!RecoveryInProgress() && SlotIsLogical(slot))
1235+
{
1236+
XLogRecPtr flushptr;
1237+
1238+
/* make sure we have enough information to start */
1239+
flushptr = LogStandbySnapshot();
1240+
1241+
/* and make sure it's fsynced to disk */
1242+
XLogFlush(flushptr);
1243+
}
12411244
}
12421245

12431246
/*

‎src/backend/replication/walsender.c

Copy file name to clipboardExpand all lines: src/backend/replication/walsender.c
+32-16
Original file line numberDiff line numberDiff line change
@@ -906,23 +906,34 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
906906
int count;
907907
WALReadError errinfo;
908908
XLogSegNo segno;
909-
TimeLineID currTLI = GetWALInsertionTimeLine();
909+
TimeLineID currTLI;
910+
911+
/*
912+
* Make sure we have enough WAL available before retrieving the current
913+
* timeline. This is needed to determine am_cascading_walsender accurately
914+
* which is needed to determine the current timeline.
915+
*/
916+
flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
910917

911918
/*
912-
* Since logical decoding is only permitted on a primary server, we know
913-
* that the current timeline ID can't be changing any more. If we did this
914-
* on a standby, we'd have to worry about the values we compute here
915-
* becoming invalid due to a promotion or timeline change.
919+
* Since logical decoding is also permitted on a standby server, we need
920+
* to check if the server is in recovery to decide how to get the current
921+
* timeline ID (so that it also cover the promotion or timeline change
922+
* cases).
916923
*/
924+
am_cascading_walsender = RecoveryInProgress();
925+
926+
if (am_cascading_walsender)
927+
GetXLogReplayRecPtr(&currTLI);
928+
else
929+
currTLI = GetWALInsertionTimeLine();
930+
917931
XLogReadDetermineTimeline(state, targetPagePtr, reqLen, currTLI);
918932
sendTimeLineIsHistoric = (state->currTLI != currTLI);
919933
sendTimeLine = state->currTLI;
920934
sendTimeLineValidUpto = state->currTLIValidUntil;
921935
sendTimeLineNextTLI = state->nextTLI;
922936

923-
/* make sure we have enough WAL available */
924-
flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
925-
926937
/* fail if not (implies we are going to shut down) */
927938
if (flushptr < targetPagePtr + reqLen)
928939
return -1;
@@ -937,9 +948,9 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
937948
cur_page,
938949
targetPagePtr,
939950
XLOG_BLCKSZ,
940-
state->seg.ws_tli, /* Pass the current TLI because only
941-
* WalSndSegmentOpen controls whether new
942-
* TLI is needed. */
951+
currTLI, /* Pass the current TLI because only
952+
* WalSndSegmentOpen controls whether new TLI
953+
* is needed. */
943954
&errinfo))
944955
WALReadRaiseError(&errinfo);
945956

@@ -3076,10 +3087,14 @@ XLogSendLogical(void)
30763087
* If first time through in this session, initialize flushPtr. Otherwise,
30773088
* we only need to update flushPtr if EndRecPtr is past it.
30783089
*/
3079-
if (flushPtr == InvalidXLogRecPtr)
3080-
flushPtr = GetFlushRecPtr(NULL);
3081-
else if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
3082-
flushPtr = GetFlushRecPtr(NULL);
3090+
if (flushPtr == InvalidXLogRecPtr ||
3091+
logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
3092+
{
3093+
if (am_cascading_walsender)
3094+
flushPtr = GetStandbyFlushRecPtr(NULL);
3095+
else
3096+
flushPtr = GetFlushRecPtr(NULL);
3097+
}
30833098

30843099
/* If EndRecPtr is still past our flushPtr, it means we caught up. */
30853100
if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
@@ -3170,7 +3185,8 @@ GetStandbyFlushRecPtr(TimeLineID *tli)
31703185
receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
31713186
replayPtr = GetXLogReplayRecPtr(&replayTLI);
31723187

3173-
*tli = replayTLI;
3188+
if (tli)
3189+
*tli = replayTLI;
31743190

31753191
result = replayPtr;
31763192
if (receiveTLI == replayTLI && receivePtr > replayPtr)

‎src/include/access/xlog.h

Copy file name to clipboardExpand all lines: src/include/access/xlog.h
+1
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,7 @@ extern void XLOGShmemInit(void);
230230
extern void BootStrapXLOG(void);
231231
extern void InitializeWalConsistencyChecking(void);
232232
extern void LocalProcessControlFile(bool reset);
233+
extern WalLevel GetActiveWalLevelOnStandby(void);
233234
extern void StartupXLOG(void);
234235
extern void ShutdownXLOG(int code, Datum arg);
235236
extern void CreateCheckPoint(int flags);

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.