Commit 3741f2a

Committed by Amit Kapila
Fix the review comments and a bug in the slot sync code.

Ensure that when updating the catalog_xmin of the synced slots, it is first written to disk before changing the in-memory value (effective_catalog_xmin). This is to prevent a scenario where the in-memory value change triggers a vacuum to remove catalog tuples before the catalog_xmin is written to disk. In the event of a crash before the catalog_xmin is persisted, we would not know that some required catalog tuples have been removed and the synced slot would be invalidated.

Change the sanity check to ensure that remote_slot's confirmed_flush LSN can't precede the local/synced slot during slot sync. Note that the restart_lsn of the synced/local slot can be ahead of remote_slot. This can happen when slot advancing machinery finds a running xacts record after reaching the consistent state at a later point than the primary where it serializes the snapshot and updates the restart_lsn.

Make the check to sync slots robust by allowing to sync only when the confirmed_lsn, restart_lsn, or catalog_xmin of the remote slot is ahead of the synced/local slot.

Reported-by: Amit Kapila and Shveta Malik
Author: Hou Zhijie, Shveta Malik
Reviewed-by: Amit Kapila, Bertrand Drouvot
Discussion: https://postgr.es/m/OS0PR01MB57162B67D3CB01B2756FBA6D94062@OS0PR01MB5716.jpnprd01.prod.outlook.com
Discussion: https://postgr.es/m/CAJpy0uCSS5zmdyUXhvw41HSdTbRqX1hbYqkOfHNj7qQ+2zn0AQ@mail.gmail.com
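The ordering described in the commit message (persist the new catalog_xmin first, publish the in-memory value second) can be illustrated with a small standalone sketch. This is not PostgreSQL code: `DemoSlot`, `demo_save`, and `demo_advance_xmin` are hypothetical names, the "disk" is just a struct field, and plain integer comparison ignores transaction ID wraparound.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for a replication slot; not PostgreSQL's struct. */
typedef struct DemoSlot
{
    uint32_t persisted_xmin;   /* value last written to "disk" */
    uint32_t data_xmin;        /* staged value, analogous to data.catalog_xmin */
    uint32_t effective_xmin;   /* value a vacuum-like process would honor */
} DemoSlot;

/* Pretend to flush the slot to stable storage; returns false on failure. */
static bool
demo_save(DemoSlot *slot, bool simulate_crash)
{
    if (simulate_crash)
        return false;          /* nothing reached disk */
    slot->persisted_xmin = slot->data_xmin;
    return true;
}

/*
 * Advance xmin in three steps: stage, persist, publish.
 * If persisting fails, the published value is left untouched, so it can
 * never run ahead of what is known to be on disk.
 */
static bool
demo_advance_xmin(DemoSlot *slot, uint32_t new_xmin, bool simulate_crash)
{
    slot->data_xmin = new_xmin;                /* stage */
    if (!demo_save(slot, simulate_crash))      /* persist */
        return false;
    slot->effective_xmin = new_xmin;           /* publish */
    assert(slot->effective_xmin <= slot->persisted_xmin);
    return true;
}
```

With this ordering, a failure between staging and persisting leaves `effective_xmin` at its old value, so nothing that depends on it can discard data the on-disk state still claims to need.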
1 parent 3af7040 commit 3741f2a

1 file changed: src/backend/replication/logical/slotsync.c (119 additions & 44 deletions)

@@ -162,22 +162,78 @@ static void update_synced_slots_inactive_since(void);
  * *found_consistent_snapshot will be true iff the remote slot's LSN or xmin is
  * modified, and decoding from the corresponding LSN's can reach a
  * consistent snapshot.
+ *
+ * *remote_slot_precedes will be true if the remote slot's LSN or xmin
+ * precedes locally reserved position.
  */
 static bool
 update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
-                         bool *found_consistent_snapshot)
+                         bool *found_consistent_snapshot,
+                         bool *remote_slot_precedes)
 {
     ReplicationSlot *slot = MyReplicationSlot;
-    bool slot_updated = false;
+    bool updated_xmin_or_lsn = false;
+    bool updated_config = false;
 
     Assert(slot->data.invalidated == RS_INVAL_NONE);
 
     if (found_consistent_snapshot)
         *found_consistent_snapshot = false;
 
-    if (remote_slot->confirmed_lsn != slot->data.confirmed_flush ||
-        remote_slot->restart_lsn != slot->data.restart_lsn ||
-        remote_slot->catalog_xmin != slot->data.catalog_xmin)
+    if (remote_slot_precedes)
+        *remote_slot_precedes = false;
+
+    /*
+     * Don't overwrite if we already have a newer catalog_xmin and
+     * restart_lsn.
+     */
+    if (remote_slot->restart_lsn < slot->data.restart_lsn ||
+        TransactionIdPrecedes(remote_slot->catalog_xmin,
+                              slot->data.catalog_xmin))
+    {
+        /*
+         * This can happen in following situations:
+         *
+         * If the slot is temporary, it means either the initial WAL location
+         * reserved for the local slot is ahead of the remote slot's
+         * restart_lsn or the initial xmin_horizon computed for the local slot
+         * is ahead of the remote slot.
+         *
+         * If the slot is persistent, restart_lsn of the synced slot could
+         * still be ahead of the remote slot. Since we use slot advance
+         * functionality to keep snapbuild/slot updated, it is possible that
+         * the restart_lsn is advanced to a later position than it has on the
+         * primary. This can happen when slot advancing machinery finds
+         * running xacts record after reaching the consistent state at a later
+         * point than the primary where it serializes the snapshot and updates
+         * the restart_lsn.
+         *
+         * We LOG the message if the slot is temporary as it can help the user
+         * to understand why the slot is not sync-ready. In the case of a
+         * persistent slot, it would be a more common case and won't directly
+         * impact the users, so we used DEBUG1 level to log the message.
+         */
+        ereport(slot->data.persistency == RS_TEMPORARY ? LOG : DEBUG1,
+                errmsg("could not sync slot \"%s\" as remote slot precedes local slot",
+                       remote_slot->name),
+                errdetail("Remote slot has LSN %X/%X and catalog xmin %u, but local slot has LSN %X/%X and catalog xmin %u.",
+                          LSN_FORMAT_ARGS(remote_slot->restart_lsn),
+                          remote_slot->catalog_xmin,
+                          LSN_FORMAT_ARGS(slot->data.restart_lsn),
+                          slot->data.catalog_xmin));
+
+        if (remote_slot_precedes)
+            *remote_slot_precedes = true;
+    }
+
+    /*
+     * Attempt to sync LSNs and xmins only if remote slot is ahead of local
+     * slot.
+     */
+    else if (remote_slot->confirmed_lsn > slot->data.confirmed_flush ||
+             remote_slot->restart_lsn > slot->data.restart_lsn ||
+             TransactionIdFollows(remote_slot->catalog_xmin,
+                                  slot->data.catalog_xmin))
     {
         /*
          * We can't directly copy the remote slot's LSN or xmin unless there
@@ -198,7 +254,6 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
             slot->data.restart_lsn = remote_slot->restart_lsn;
             slot->data.confirmed_flush = remote_slot->confirmed_lsn;
             slot->data.catalog_xmin = remote_slot->catalog_xmin;
-            slot->effective_catalog_xmin = remote_slot->catalog_xmin;
             SpinLockRelease(&slot->mutex);
 
             if (found_consistent_snapshot)
@@ -208,12 +263,18 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
         {
             LogicalSlotAdvanceAndCheckSnapState(remote_slot->confirmed_lsn,
                                                 found_consistent_snapshot);
-        }
 
-        ReplicationSlotsComputeRequiredXmin(false);
-        ReplicationSlotsComputeRequiredLSN();
+            /* Sanity check */
+            if (slot->data.confirmed_flush != remote_slot->confirmed_lsn)
+                ereport(ERROR,
+                        errmsg_internal("synchronized confirmed_flush for slot \"%s\" differs from remote slot",
+                                        remote_slot->name),
+                        errdetail_internal("Remote slot has LSN %X/%X but local slot has LSN %X/%X.",
+                                           LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
+                                           LSN_FORMAT_ARGS(slot->data.confirmed_flush)));
+        }
 
-        slot_updated = true;
+        updated_xmin_or_lsn = true;
     }
 
     if (remote_dbid != slot->data.database ||
@@ -233,10 +294,37 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
         slot->data.failover = remote_slot->failover;
         SpinLockRelease(&slot->mutex);
 
-        slot_updated = true;
+        updated_config = true;
     }
 
-    return slot_updated;
+    /*
+     * We have to write the changed xmin to disk *before* we change the
+     * in-memory value, otherwise after a crash we wouldn't know that some
+     * catalog tuples might have been removed already.
+     */
+    if (updated_config || updated_xmin_or_lsn)
+    {
+        ReplicationSlotMarkDirty();
+        ReplicationSlotSave();
+    }
+
+    /*
+     * Now the new xmin is safely on disk, we can let the global value
+     * advance. We do not take ProcArrayLock or similar since we only advance
+     * xmin here and there's not much harm done by a concurrent computation
+     * missing that.
+     */
+    if (updated_xmin_or_lsn)
+    {
+        SpinLockAcquire(&slot->mutex);
+        slot->effective_catalog_xmin = remote_slot->catalog_xmin;
+        SpinLockRelease(&slot->mutex);
+
+        ReplicationSlotsComputeRequiredXmin(false);
+        ReplicationSlotsComputeRequiredLSN();
+    }
+
+    return updated_config || updated_xmin_or_lsn;
 }
 
 /*
@@ -460,14 +548,17 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 {
     ReplicationSlot *slot = MyReplicationSlot;
     bool found_consistent_snapshot = false;
+    bool remote_slot_precedes = false;
+
+    (void) update_local_synced_slot(remote_slot, remote_dbid,
+                                    &found_consistent_snapshot,
+                                    &remote_slot_precedes);
 
     /*
      * Check if the primary server has caught up. Refer to the comment atop
      * the file for details on this check.
      */
-    if (remote_slot->restart_lsn < slot->data.restart_lsn ||
-        TransactionIdPrecedes(remote_slot->catalog_xmin,
-                              slot->data.catalog_xmin))
+    if (remote_slot_precedes)
     {
         /*
          * The remote slot didn't catch up to locally reserved position.
@@ -476,23 +567,10 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
          * current location when recreating the slot in the next cycle. It may
          * take more time to create such a slot. Therefore, we keep this slot
          * and attempt the synchronization in the next cycle.
-         *
-         * XXX should this be changed to elog(DEBUG1) perhaps?
          */
-        ereport(LOG,
-                errmsg("could not sync slot \"%s\" as remote slot precedes local slot",
-                       remote_slot->name),
-                errdetail("Remote slot has LSN %X/%X and catalog xmin %u, but local slot has LSN %X/%X and catalog xmin %u.",
-                          LSN_FORMAT_ARGS(remote_slot->restart_lsn),
-                          remote_slot->catalog_xmin,
-                          LSN_FORMAT_ARGS(slot->data.restart_lsn),
-                          slot->data.catalog_xmin));
         return false;
     }
 
-    (void) update_local_synced_slot(remote_slot, remote_dbid,
-                                    &found_consistent_snapshot);
-
     /*
      * Don't persist the slot if it cannot reach the consistent point from the
      * restart_lsn. See comments atop this file.
@@ -633,23 +711,20 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
             /*
              * Sanity check: As long as the invalidations are handled
              * appropriately as above, this should never happen.
+             *
+             * We don't need to check restart_lsn here. See the comments in
+             * update_local_synced_slot() for details.
              */
-            if (remote_slot->restart_lsn < slot->data.restart_lsn)
-                elog(ERROR,
-                     "cannot synchronize local slot \"%s\" LSN(%X/%X)"
-                     " to remote slot's LSN(%X/%X) as synchronization"
-                     " would move it backwards", remote_slot->name,
-                     LSN_FORMAT_ARGS(slot->data.restart_lsn),
-                     LSN_FORMAT_ARGS(remote_slot->restart_lsn));
-
-            /* Make sure the slot changes persist across server restart */
-            if (update_local_synced_slot(remote_slot, remote_dbid, NULL))
-            {
-                ReplicationSlotMarkDirty();
-                ReplicationSlotSave();
-
-                slot_updated = true;
-            }
+            if (remote_slot->confirmed_lsn < slot->data.confirmed_flush)
+                ereport(ERROR,
+                        errmsg_internal("cannot synchronize local slot \"%s\"",
+                                        remote_slot->name),
+                        errdetail_internal("Local slot's start streaming location LSN(%X/%X) is ahead of remote slot's LSN(%X/%X).",
+                                           LSN_FORMAT_ARGS(slot->data.confirmed_flush),
+                                           LSN_FORMAT_ARGS(remote_slot->confirmed_lsn)));
+
+            slot_updated = update_local_synced_slot(remote_slot, remote_dbid,
+                                                    NULL, NULL);
         }
     }
     /* Otherwise create the slot first. */
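
The three-way decision that update_local_synced_slot() now makes (remote precedes local, remote is ahead, or nothing to sync) can be sketched as a standalone function. This is only an illustration under stated assumptions: `DemoSlotPos`, `DemoSyncAction`, `demo_xid_precedes`, and `demo_classify` are hypothetical names, and the transaction ID comparison is a simplified modulo-2^32 check that ignores the special (non-normal) transaction IDs the real TransactionIdPrecedes() handles.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t DemoLsn;   /* WAL positions compare as plain integers */
typedef uint32_t DemoXid;   /* transaction IDs wrap around at 2^32 */

/* Hypothetical bundle of the three positions the diff compares. */
typedef struct DemoSlotPos
{
    DemoLsn confirmed_lsn;
    DemoLsn restart_lsn;
    DemoXid catalog_xmin;
} DemoSlotPos;

typedef enum DemoSyncAction
{
    DEMO_REMOTE_PRECEDES,   /* skip: local slot is already further along */
    DEMO_REMOTE_AHEAD,      /* sync: at least one remote position is newer */
    DEMO_NO_CHANGE          /* positions match, nothing to copy */
} DemoSyncAction;

/*
 * Simplified wraparound-aware "a is older than b". The cast of an
 * out-of-range unsigned value to int32_t is implementation-defined in C,
 * but yields the two's-complement result on mainstream compilers.
 */
static bool
demo_xid_precedes(DemoXid a, DemoXid b)
{
    return (int32_t) (a - b) < 0;
}

static DemoSyncAction
demo_classify(const DemoSlotPos *remote, const DemoSlotPos *local)
{
    /* Same order as the diff: the "precedes" test is evaluated first. */
    if (remote->restart_lsn < local->restart_lsn ||
        demo_xid_precedes(remote->catalog_xmin, local->catalog_xmin))
        return DEMO_REMOTE_PRECEDES;

    if (remote->confirmed_lsn > local->confirmed_lsn ||
        remote->restart_lsn > local->restart_lsn ||
        demo_xid_precedes(local->catalog_xmin, remote->catalog_xmin))
        return DEMO_REMOTE_AHEAD;

    return DEMO_NO_CHANGE;
}
```

Because the "precedes" test comes first, a remote slot with a newer confirmed_lsn but an older restart_lsn is still classified as preceding, which matches the if / else if structure in the diff.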
