25
25
* which slot sync worker can perform the sync periodically or user can call
26
26
* pg_sync_replication_slots() periodically to perform the syncs.
27
27
*
28
+ * If synchronized slots fail to build a consistent snapshot from the
29
+ * restart_lsn before reaching confirmed_flush_lsn, they would become
30
+ * unreliable after promotion due to potential data loss from changes
31
+ * before reaching a consistent point. This can happen because the slots can
32
+ * be synced at some random time and we may not reach the consistent point
33
+ * at the same WAL location as the primary. So, we mark such slots as
34
+ * RS_TEMPORARY. Once the decoding from the corresponding LSNs can reach a
35
+ * consistent point, they will be marked as RS_PERSISTENT.
36
+ *
28
37
* The slot sync worker waits for some time before the next synchronization,
29
38
* with the duration varying based on whether any slots were updated during
30
39
* the last cycle. Refer to the comments above wait_for_slot_activity() for
49
58
#include "postmaster/fork_process.h"
50
59
#include "postmaster/interrupt.h"
51
60
#include "postmaster/postmaster.h"
52
- #include "replication/slot .h"
61
+ #include "replication/logical .h"
53
62
#include "replication/slotsync.h"
63
+ #include "replication/snapbuild.h"
54
64
#include "storage/ipc.h"
55
65
#include "storage/lmgr.h"
56
66
#include "storage/proc.h"
@@ -147,50 +157,85 @@ static void slotsync_failure_callback(int code, Datum arg);
147
157
*
148
158
* If no update was needed (the data of the remote slot is the same as the
149
159
* local slot) return false, otherwise true.
160
+ *
161
+ * *found_consistent_snapshot will be true iff the remote slot's LSN or xmin is
162
+ * modified, and decoding from the corresponding LSNs can reach a
163
+ * consistent snapshot.
150
164
*/
151
165
static bool
152
- update_local_synced_slot (RemoteSlot * remote_slot , Oid remote_dbid )
166
+ update_local_synced_slot (RemoteSlot * remote_slot , Oid remote_dbid ,
167
+ bool * found_consistent_snapshot )
153
168
{
154
169
ReplicationSlot * slot = MyReplicationSlot ;
155
- bool xmin_changed ;
156
- bool restart_lsn_changed ;
157
- NameData plugin_name ;
170
+ bool slot_updated = false;
158
171
159
172
Assert (slot -> data .invalidated == RS_INVAL_NONE );
160
173
161
- xmin_changed = ( remote_slot -> catalog_xmin != slot -> data . catalog_xmin );
162
- restart_lsn_changed = ( remote_slot -> restart_lsn != slot -> data . restart_lsn ) ;
174
+ if ( found_consistent_snapshot )
175
+ * found_consistent_snapshot = false ;
163
176
164
- if (!xmin_changed &&
165
- !restart_lsn_changed &&
166
- remote_dbid == slot -> data .database &&
167
- remote_slot -> two_phase == slot -> data .two_phase &&
168
- remote_slot -> failover == slot -> data .failover &&
169
- remote_slot -> confirmed_lsn == slot -> data .confirmed_flush &&
170
- strcmp (remote_slot -> plugin , NameStr (slot -> data .plugin )) == 0 )
171
- return false;
177
+ if (remote_slot -> confirmed_lsn != slot -> data .confirmed_flush ||
178
+ remote_slot -> restart_lsn != slot -> data .restart_lsn ||
179
+ remote_slot -> catalog_xmin != slot -> data .catalog_xmin )
180
+ {
181
+ /*
182
+ * We can't directly copy the remote slot's LSN or xmin unless there
183
+ * exists a consistent snapshot at that point. Otherwise, after
184
+ * promotion, the slots may not reach a consistent point before the
185
+ * confirmed_flush_lsn which can lead to data loss. To avoid data
186
+ * loss, we let slot machinery advance the slot which ensures that
187
+ * snapbuilder/slot statuses are updated properly.
188
+ */
189
+ if (SnapBuildSnapshotExists (remote_slot -> restart_lsn ))
190
+ {
191
+ /*
192
+ * Update the slot info directly if there is a serialized snapshot
193
+ * at the restart_lsn, as the slot can quickly reach consistency
194
+ * at restart_lsn by restoring the snapshot.
195
+ */
196
+ SpinLockAcquire (& slot -> mutex );
197
+ slot -> data .restart_lsn = remote_slot -> restart_lsn ;
198
+ slot -> data .confirmed_flush = remote_slot -> confirmed_lsn ;
199
+ slot -> data .catalog_xmin = remote_slot -> catalog_xmin ;
200
+ slot -> effective_catalog_xmin = remote_slot -> catalog_xmin ;
201
+ SpinLockRelease (& slot -> mutex );
172
202
173
- /* Avoid expensive operations while holding a spinlock. */
174
- namestrcpy (& plugin_name , remote_slot -> plugin );
175
-
176
- SpinLockAcquire (& slot -> mutex );
177
- slot -> data .plugin = plugin_name ;
178
- slot -> data .database = remote_dbid ;
179
- slot -> data .two_phase = remote_slot -> two_phase ;
180
- slot -> data .failover = remote_slot -> failover ;
181
- slot -> data .restart_lsn = remote_slot -> restart_lsn ;
182
- slot -> data .confirmed_flush = remote_slot -> confirmed_lsn ;
183
- slot -> data .catalog_xmin = remote_slot -> catalog_xmin ;
184
- slot -> effective_catalog_xmin = remote_slot -> catalog_xmin ;
185
- SpinLockRelease (& slot -> mutex );
186
-
187
- if (xmin_changed )
188
- ReplicationSlotsComputeRequiredXmin (false);
203
+ if (found_consistent_snapshot )
204
+ * found_consistent_snapshot = true;
205
+ }
206
+ else
207
+ {
208
+ LogicalSlotAdvanceAndCheckSnapState (remote_slot -> confirmed_lsn ,
209
+ found_consistent_snapshot );
210
+ }
189
211
190
- if ( restart_lsn_changed )
212
+ ReplicationSlotsComputeRequiredXmin (false);
191
213
ReplicationSlotsComputeRequiredLSN ();
192
214
193
- return true;
215
+ slot_updated = true;
216
+ }
217
+
218
+ if (remote_dbid != slot -> data .database ||
219
+ remote_slot -> two_phase != slot -> data .two_phase ||
220
+ remote_slot -> failover != slot -> data .failover ||
221
+ strcmp (remote_slot -> plugin , NameStr (slot -> data .plugin )) != 0 )
222
+ {
223
+ NameData plugin_name ;
224
+
225
+ /* Avoid expensive operations while holding a spinlock. */
226
+ namestrcpy (& plugin_name , remote_slot -> plugin );
227
+
228
+ SpinLockAcquire (& slot -> mutex );
229
+ slot -> data .plugin = plugin_name ;
230
+ slot -> data .database = remote_dbid ;
231
+ slot -> data .two_phase = remote_slot -> two_phase ;
232
+ slot -> data .failover = remote_slot -> failover ;
233
+ SpinLockRelease (& slot -> mutex );
234
+
235
+ slot_updated = true;
236
+ }
237
+
238
+ return slot_updated ;
194
239
}
195
240
196
241
/*
@@ -413,6 +458,7 @@ static bool
413
458
update_and_persist_local_synced_slot (RemoteSlot * remote_slot , Oid remote_dbid )
414
459
{
415
460
ReplicationSlot * slot = MyReplicationSlot ;
461
+ bool found_consistent_snapshot = false;
416
462
417
463
/*
418
464
* Check if the primary server has caught up. Refer to the comment atop
@@ -443,9 +489,22 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
443
489
return false;
444
490
}
445
491
446
- /* First time slot update, the function must return true */
447
- if (!update_local_synced_slot (remote_slot , remote_dbid ))
448
- elog (ERROR , "failed to update slot" );
492
+ (void ) update_local_synced_slot (remote_slot , remote_dbid ,
493
+ & found_consistent_snapshot );
494
+
495
+ /*
496
+ * Don't persist the slot if it cannot reach the consistent point from the
497
+ * restart_lsn. See comments atop this file.
498
+ */
499
+ if (!found_consistent_snapshot )
500
+ {
501
+ ereport (LOG ,
502
+ errmsg ("could not sync slot \"%s\"" , remote_slot -> name ),
503
+ errdetail ("Logical decoding cannot find consistent point from local slot's LSN %X/%X." ,
504
+ LSN_FORMAT_ARGS (slot -> data .restart_lsn )));
505
+
506
+ return false;
507
+ }
449
508
450
509
ReplicationSlotPersist ();
451
510
@@ -578,7 +637,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
578
637
LSN_FORMAT_ARGS (remote_slot -> restart_lsn ));
579
638
580
639
/* Make sure the slot changes persist across server restart */
581
- if (update_local_synced_slot (remote_slot , remote_dbid ))
640
+ if (update_local_synced_slot (remote_slot , remote_dbid , NULL ))
582
641
{
583
642
ReplicationSlotMarkDirty ();
584
643
ReplicationSlotSave ();
0 commit comments