Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 2ec005b

Browse filesBrowse files
author
Amit Kapila
committed
Ensure that the sync slots reach a consistent state after promotion without losing data.
Previously, we copied the LSN locations directly while syncing the slots on the standby. However, there may be running transactions at a particular restart_lsn, which means that if we start reading the WAL from that location after promotion, we won't reach a consistent snapshot state at that point. On the primary, by contrast, we would already have been in a consistent snapshot state at that restart_lsn, so we would have just serialized the existing snapshot. To avoid this problem, we now use the advance_slot functionality unless a snapshot already exists at the synced restart_lsn location. This ensures that snapbuilder/slot statuses are updated properly without generating any changes. Note that a synced slot will remain RS_TEMPORARY until decoding from the corresponding restart_lsn can reach a consistent snapshot state, after which it will be marked RS_PERSISTENT. Per buildfarm. Author: Hou Zhijie. Reviewed-by: Bertrand Drouvot, Shveta Malik, Bharath Rupireddy, Amit Kapila. Discussion: https://postgr.es/m/OS0PR01MB5716B3942AE49F3F725ACA92943B2@OS0PR01MB5716.jpnprd01.prod.outlook.com
1 parent e37662f commit 2ec005b
Copy full SHA for 2ec005b

File tree

Expand file treeCollapse file tree

7 files changed

+351
-171
lines changed
Filter options
Expand file treeCollapse file tree

7 files changed

+351
-171
lines changed

‎src/backend/replication/logical/logical.c

Copy file name to clipboardExpand all lines: src/backend/replication/logical/logical.c
+143-4Lines changed: 143 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
#include "replication/decode.h"
3737
#include "replication/logical.h"
3838
#include "replication/reorderbuffer.h"
39+
#include "replication/slotsync.h"
3940
#include "replication/snapbuild.h"
4041
#include "storage/proc.h"
4142
#include "storage/procarray.h"
@@ -516,17 +517,23 @@ CreateDecodingContext(XLogRecPtr start_lsn,
516517
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
517518
errmsg("cannot use physical replication slot for logical decoding")));
518519

519-
if (slot->data.database != MyDatabaseId)
520+
/*
521+
* We need to access the system tables during decoding to build the
522+
* logical changes unless we are in fast_forward mode where no changes are
523+
* generated.
524+
*/
525+
if (slot->data.database != MyDatabaseId && !fast_forward)
520526
ereport(ERROR,
521527
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
522528
errmsg("replication slot \"%s\" was not created in this database",
523529
NameStr(slot->data.name))));
524530

525531
/*
526-
* Do not allow consumption of a "synchronized" slot until the standby
527-
* gets promoted.
532+
* The slots being synced from the primary can't be used for decoding as
533+
* they are used after failover. However, we do allow advancing the LSNs
534+
* during the synchronization of slots. See update_local_synced_slot.
528535
*/
529-
if (RecoveryInProgress() && slot->data.synced)
536+
if (RecoveryInProgress() && slot->data.synced && !IsSyncingReplicationSlots())
530537
ereport(ERROR,
531538
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
532539
errmsg("cannot use replication slot \"%s\" for logical decoding",
@@ -2034,3 +2041,135 @@ LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal)
20342041

20352042
return has_pending_wal;
20362043
}
2044+
2045+
/*
2046+
* Helper function for advancing our logical replication slot forward.
2047+
*
2048+
* The slot's restart_lsn is used as start point for reading records, while
2049+
* confirmed_flush is used as base point for the decoding context.
2050+
*
2051+
* We cannot just do LogicalConfirmReceivedLocation to update confirmed_flush,
2052+
* because we need to digest WAL to advance restart_lsn allowing to recycle
2053+
* WAL and removal of old catalog tuples. As decoding is done in fast_forward
2054+
* mode, no changes are generated anyway.
2055+
*
2056+
* *found_consistent_snapshot will be true if the initial decoding snapshot has
2057+
* been built; Otherwise, it will be false.
2058+
*/
2059+
XLogRecPtr
2060+
LogicalSlotAdvanceAndCheckSnapState(XLogRecPtr moveto,
2061+
bool *found_consistent_snapshot)
2062+
{
2063+
LogicalDecodingContext *ctx;
2064+
ResourceOwner old_resowner = CurrentResourceOwner;
2065+
XLogRecPtr retlsn;
2066+
2067+
Assert(moveto != InvalidXLogRecPtr);
2068+
2069+
if (found_consistent_snapshot)
2070+
*found_consistent_snapshot = false;
2071+
2072+
PG_TRY();
2073+
{
2074+
/*
2075+
* Create our decoding context in fast_forward mode, passing start_lsn
2076+
* as InvalidXLogRecPtr, so that we start processing from my slot's
2077+
* confirmed_flush.
2078+
*/
2079+
ctx = CreateDecodingContext(InvalidXLogRecPtr,
2080+
NIL,
2081+
true, /* fast_forward */
2082+
XL_ROUTINE(.page_read = read_local_xlog_page,
2083+
.segment_open = wal_segment_open,
2084+
.segment_close = wal_segment_close),
2085+
NULL, NULL, NULL);
2086+
2087+
/*
2088+
* Wait for specified streaming replication standby servers (if any)
2089+
* to confirm receipt of WAL up to moveto lsn.
2090+
*/
2091+
WaitForStandbyConfirmation(moveto);
2092+
2093+
/*
2094+
* Start reading at the slot's restart_lsn, which we know to point to
2095+
* a valid record.
2096+
*/
2097+
XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);
2098+
2099+
/* invalidate non-timetravel entries */
2100+
InvalidateSystemCaches();
2101+
2102+
/* Decode records until we reach the requested target */
2103+
while (ctx->reader->EndRecPtr < moveto)
2104+
{
2105+
char *errm = NULL;
2106+
XLogRecord *record;
2107+
2108+
/*
2109+
* Read records. No changes are generated in fast_forward mode,
2110+
* but snapbuilder/slot statuses are updated properly.
2111+
*/
2112+
record = XLogReadRecord(ctx->reader, &errm);
2113+
if (errm)
2114+
elog(ERROR, "could not find record while advancing replication slot: %s",
2115+
errm);
2116+
2117+
/*
2118+
* Process the record. Storage-level changes are ignored in
2119+
* fast_forward mode, but other modules (such as snapbuilder)
2120+
* might still have critical updates to do.
2121+
*/
2122+
if (record)
2123+
LogicalDecodingProcessRecord(ctx, ctx->reader);
2124+
2125+
CHECK_FOR_INTERRUPTS();
2126+
}
2127+
2128+
if (found_consistent_snapshot && DecodingContextReady(ctx))
2129+
*found_consistent_snapshot = true;
2130+
2131+
/*
2132+
* Logical decoding could have clobbered CurrentResourceOwner during
2133+
* transaction management, so restore the executor's value. (This is
2134+
* a kluge, but it's not worth cleaning up right now.)
2135+
*/
2136+
CurrentResourceOwner = old_resowner;
2137+
2138+
if (ctx->reader->EndRecPtr != InvalidXLogRecPtr)
2139+
{
2140+
LogicalConfirmReceivedLocation(moveto);
2141+
2142+
/*
2143+
* If only the confirmed_flush LSN has changed the slot won't get
2144+
* marked as dirty by the above. Callers on the walsender
2145+
* interface are expected to keep track of their own progress and
2146+
* don't need it written out. But SQL-interface users cannot
2147+
* specify their own start positions and it's harder for them to
2148+
* keep track of their progress, so we should make more of an
2149+
* effort to save it for them.
2150+
*
2151+
* Dirty the slot so it is written out at the next checkpoint. The
2152+
* LSN position advanced to may still be lost on a crash but this
2153+
* makes the data consistent after a clean shutdown.
2154+
*/
2155+
ReplicationSlotMarkDirty();
2156+
}
2157+
2158+
retlsn = MyReplicationSlot->data.confirmed_flush;
2159+
2160+
/* free context, call shutdown callback */
2161+
FreeDecodingContext(ctx);
2162+
2163+
InvalidateSystemCaches();
2164+
}
2165+
PG_CATCH();
2166+
{
2167+
/* clear all timetravel entries */
2168+
InvalidateSystemCaches();
2169+
2170+
PG_RE_THROW();
2171+
}
2172+
PG_END_TRY();
2173+
2174+
return retlsn;
2175+
}

‎src/backend/replication/logical/slotsync.c

Copy file name to clipboardExpand all lines: src/backend/replication/logical/slotsync.c
+96-37Lines changed: 96 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,15 @@
2525
* which slot sync worker can perform the sync periodically or user can call
2626
* pg_sync_replication_slots() periodically to perform the syncs.
2727
*
28+
* If synchronized slots fail to build a consistent snapshot from the
29+
* restart_lsn before reaching confirmed_flush_lsn, they would become
30+
* unreliable after promotion due to potential data loss from changes
31+
* before reaching a consistent point. This can happen because the slots can
32+
* be synced at some random time and we may not reach the consistent point
33+
* at the same WAL location as the primary. So, we mark such slots as
34+
* RS_TEMPORARY. Once the decoding from corresponding LSNs can reach a
35+
* consistent point, they will be marked as RS_PERSISTENT.
36+
*
2837
* The slot sync worker waits for some time before the next synchronization,
2938
* with the duration varying based on whether any slots were updated during
3039
* the last cycle. Refer to the comments above wait_for_slot_activity() for
@@ -49,8 +58,9 @@
4958
#include "postmaster/fork_process.h"
5059
#include "postmaster/interrupt.h"
5160
#include "postmaster/postmaster.h"
52-
#include "replication/slot.h"
61+
#include "replication/logical.h"
5362
#include "replication/slotsync.h"
63+
#include "replication/snapbuild.h"
5464
#include "storage/ipc.h"
5565
#include "storage/lmgr.h"
5666
#include "storage/proc.h"
@@ -147,50 +157,85 @@ static void slotsync_failure_callback(int code, Datum arg);
147157
*
148158
* If no update was needed (the data of the remote slot is the same as the
149159
* local slot) return false, otherwise true.
160+
*
161+
* *found_consistent_snapshot will be true iff the remote slot's LSN or xmin is
162+
* modified, and decoding from the corresponding LSNs can reach a
163+
* consistent snapshot.
150164
*/
151165
static bool
152-
update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
166+
update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
167+
bool *found_consistent_snapshot)
153168
{
154169
ReplicationSlot *slot = MyReplicationSlot;
155-
bool xmin_changed;
156-
bool restart_lsn_changed;
157-
NameData plugin_name;
170+
bool slot_updated = false;
158171

159172
Assert(slot->data.invalidated == RS_INVAL_NONE);
160173

161-
xmin_changed = (remote_slot->catalog_xmin != slot->data.catalog_xmin);
162-
restart_lsn_changed = (remote_slot->restart_lsn != slot->data.restart_lsn);
174+
if (found_consistent_snapshot)
175+
*found_consistent_snapshot = false;
163176

164-
if (!xmin_changed &&
165-
!restart_lsn_changed &&
166-
remote_dbid == slot->data.database &&
167-
remote_slot->two_phase == slot->data.two_phase &&
168-
remote_slot->failover == slot->data.failover &&
169-
remote_slot->confirmed_lsn == slot->data.confirmed_flush &&
170-
strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) == 0)
171-
return false;
177+
if (remote_slot->confirmed_lsn != slot->data.confirmed_flush ||
178+
remote_slot->restart_lsn != slot->data.restart_lsn ||
179+
remote_slot->catalog_xmin != slot->data.catalog_xmin)
180+
{
181+
/*
182+
* We can't directly copy the remote slot's LSN or xmin unless there
183+
* exists a consistent snapshot at that point. Otherwise, after
184+
* promotion, the slots may not reach a consistent point before the
185+
* confirmed_flush_lsn, which can lead to data loss. To avoid data
186+
* loss, we let slot machinery advance the slot which ensures that
187+
* snapbuilder/slot statuses are updated properly.
188+
*/
189+
if (SnapBuildSnapshotExists(remote_slot->restart_lsn))
190+
{
191+
/*
192+
* Update the slot info directly if there is a serialized snapshot
193+
* at the restart_lsn, as the slot can quickly reach consistency
194+
* at restart_lsn by restoring the snapshot.
195+
*/
196+
SpinLockAcquire(&slot->mutex);
197+
slot->data.restart_lsn = remote_slot->restart_lsn;
198+
slot->data.confirmed_flush = remote_slot->confirmed_lsn;
199+
slot->data.catalog_xmin = remote_slot->catalog_xmin;
200+
slot->effective_catalog_xmin = remote_slot->catalog_xmin;
201+
SpinLockRelease(&slot->mutex);
172202

173-
/* Avoid expensive operations while holding a spinlock. */
174-
namestrcpy(&plugin_name, remote_slot->plugin);
175-
176-
SpinLockAcquire(&slot->mutex);
177-
slot->data.plugin = plugin_name;
178-
slot->data.database = remote_dbid;
179-
slot->data.two_phase = remote_slot->two_phase;
180-
slot->data.failover = remote_slot->failover;
181-
slot->data.restart_lsn = remote_slot->restart_lsn;
182-
slot->data.confirmed_flush = remote_slot->confirmed_lsn;
183-
slot->data.catalog_xmin = remote_slot->catalog_xmin;
184-
slot->effective_catalog_xmin = remote_slot->catalog_xmin;
185-
SpinLockRelease(&slot->mutex);
186-
187-
if (xmin_changed)
188-
ReplicationSlotsComputeRequiredXmin(false);
203+
if (found_consistent_snapshot)
204+
*found_consistent_snapshot = true;
205+
}
206+
else
207+
{
208+
LogicalSlotAdvanceAndCheckSnapState(remote_slot->confirmed_lsn,
209+
found_consistent_snapshot);
210+
}
189211

190-
if (restart_lsn_changed)
212+
ReplicationSlotsComputeRequiredXmin(false);
191213
ReplicationSlotsComputeRequiredLSN();
192214

193-
return true;
215+
slot_updated = true;
216+
}
217+
218+
if (remote_dbid != slot->data.database ||
219+
remote_slot->two_phase != slot->data.two_phase ||
220+
remote_slot->failover != slot->data.failover ||
221+
strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) != 0)
222+
{
223+
NameData plugin_name;
224+
225+
/* Avoid expensive operations while holding a spinlock. */
226+
namestrcpy(&plugin_name, remote_slot->plugin);
227+
228+
SpinLockAcquire(&slot->mutex);
229+
slot->data.plugin = plugin_name;
230+
slot->data.database = remote_dbid;
231+
slot->data.two_phase = remote_slot->two_phase;
232+
slot->data.failover = remote_slot->failover;
233+
SpinLockRelease(&slot->mutex);
234+
235+
slot_updated = true;
236+
}
237+
238+
return slot_updated;
194239
}
195240

196241
/*
@@ -413,6 +458,7 @@ static bool
413458
update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
414459
{
415460
ReplicationSlot *slot = MyReplicationSlot;
461+
bool found_consistent_snapshot = false;
416462

417463
/*
418464
* Check if the primary server has caught up. Refer to the comment atop
@@ -443,9 +489,22 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
443489
return false;
444490
}
445491

446-
/* First time slot update, the function must return true */
447-
if (!update_local_synced_slot(remote_slot, remote_dbid))
448-
elog(ERROR, "failed to update slot");
492+
(void) update_local_synced_slot(remote_slot, remote_dbid,
493+
&found_consistent_snapshot);
494+
495+
/*
496+
* Don't persist the slot if it cannot reach the consistent point from the
497+
* restart_lsn. See comments atop this file.
498+
*/
499+
if (!found_consistent_snapshot)
500+
{
501+
ereport(LOG,
502+
errmsg("could not sync slot \"%s\"", remote_slot->name),
503+
errdetail("Logical decoding cannot find consistent point from local slot's LSN %X/%X.",
504+
LSN_FORMAT_ARGS(slot->data.restart_lsn)));
505+
506+
return false;
507+
}
449508

450509
ReplicationSlotPersist();
451510

@@ -578,7 +637,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
578637
LSN_FORMAT_ARGS(remote_slot->restart_lsn));
579638

580639
/* Make sure the slot changes persist across server restart */
581-
if (update_local_synced_slot(remote_slot, remote_dbid))
640+
if (update_local_synced_slot(remote_slot, remote_dbid, NULL))
582641
{
583642
ReplicationSlotMarkDirty();
584643
ReplicationSlotSave();

‎src/backend/replication/logical/snapbuild.c

Copy file name to clipboardExpand all lines: src/backend/replication/logical/snapbuild.c
+23Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2134,3 +2134,26 @@ CheckPointSnapBuild(void)
21342134
}
21352135
FreeDir(snap_dir);
21362136
}
2137+
2138+
/*
2139+
* Check if a logical snapshot at the specified point has been serialized.
2140+
*/
2141+
bool
2142+
SnapBuildSnapshotExists(XLogRecPtr lsn)
2143+
{
2144+
char path[MAXPGPATH];
2145+
int ret;
2146+
struct stat stat_buf;
2147+
2148+
sprintf(path, "pg_logical/snapshots/%X-%X.snap",
2149+
LSN_FORMAT_ARGS(lsn));
2150+
2151+
ret = stat(path, &stat_buf);
2152+
2153+
if (ret != 0 && errno != ENOENT)
2154+
ereport(ERROR,
2155+
(errcode_for_file_access(),
2156+
errmsg("could not stat file \"%s\": %m", path)));
2157+
2158+
return ret == 0;
2159+
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.