Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 4867f8a

Browse filesBrowse files
author
Amit Kapila
committed
Drop pre-existing subscriptions from the converted subscriber.
We don't need the pre-existing subscriptions on the newly formed subscriber by using pg_createsubscriber. The apply workers corresponding to these subscriptions can connect to other publisher nodes and either get some unwarranted data or can lead to ERRORs in connecting to such nodes. Author: Kuroda Hayato Reviewed-by: Amit Kapila, Shlok Kyal, Vignesh C Backpatch-through: 17 Discussion: https://postgr.es/m/OSBPR01MB25526A30A1FBF863ACCDDA3AF5C92@OSBPR01MB2552.jpnprd01.prod.outlook.com
1 parent 8f8bcb8 commit 4867f8a
Copy full SHA for 4867f8a

File tree

Expand file treeCollapse file tree

2 files changed

+120
-5
lines changed
Filter options
Expand file treeCollapse file tree

2 files changed

+120
-5
lines changed

‎src/bin/pg_basebackup/pg_createsubscriber.c

Copy file name to clipboardExpand all lines: src/bin/pg_basebackup/pg_createsubscriber.c
+106-5Lines changed: 106 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,8 @@ static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
9292
const char *slot_name);
9393
static void pg_ctl_status(const char *pg_ctl_cmd, int rc);
9494
static void start_standby_server(const struct CreateSubscriberOptions *opt,
95-
bool restricted_access);
95+
bool restricted_access,
96+
bool restrict_logical_worker);
9697
static void stop_standby_server(const char *datadir);
9798
static void wait_for_end_recovery(const char *conninfo,
9899
const struct CreateSubscriberOptions *opt);
@@ -102,6 +103,10 @@ static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinf
102103
static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo,
103104
const char *lsn);
104105
static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
106+
static void check_and_drop_existing_subscriptions(PGconn *conn,
107+
const struct LogicalRepInfo *dbinfo);
108+
static void drop_existing_subscriptions(PGconn *conn, const char *subname,
109+
const char *dbname);
105110

106111
#define USEC_PER_SEC 1000000
107112
#define WAIT_INTERVAL 1 /* 1 second */
@@ -1025,6 +1030,87 @@ check_subscriber(const struct LogicalRepInfo *dbinfo)
10251030
exit(1);
10261031
}
10271032

1033+
/*
1034+
* Drop a specified subscription. This is to avoid duplicate subscriptions on
1035+
* the primary (publisher node) and the newly created subscriber. We
1036+
* shouldn't drop the associated slot as that would be used by the publisher
1037+
* node.
1038+
*/
1039+
static void
1040+
drop_existing_subscriptions(PGconn *conn, const char *subname, const char *dbname)
1041+
{
1042+
PQExpBuffer query = createPQExpBuffer();
1043+
PGresult *res;
1044+
1045+
Assert(conn != NULL);
1046+
1047+
/*
1048+
* Construct a query string. These commands are allowed to be executed
1049+
* within a transaction.
1050+
*/
1051+
appendPQExpBuffer(query, "ALTER SUBSCRIPTION %s DISABLE;",
1052+
subname);
1053+
appendPQExpBuffer(query, " ALTER SUBSCRIPTION %s SET (slot_name = NONE);",
1054+
subname);
1055+
appendPQExpBuffer(query, " DROP SUBSCRIPTION %s;", subname);
1056+
1057+
pg_log_info("dropping subscription \"%s\" on database \"%s\"",
1058+
subname, dbname);
1059+
1060+
if (!dry_run)
1061+
{
1062+
res = PQexec(conn, query->data);
1063+
1064+
if (PQresultStatus(res) != PGRES_COMMAND_OK)
1065+
{
1066+
pg_log_error("could not drop a subscription \"%s\" settings: %s",
1067+
subname, PQresultErrorMessage(res));
1068+
disconnect_database(conn, true);
1069+
}
1070+
1071+
PQclear(res);
1072+
}
1073+
1074+
destroyPQExpBuffer(query);
1075+
}
1076+
1077+
/*
1078+
* Retrieve and drop the pre-existing subscriptions.
1079+
*/
1080+
static void
1081+
check_and_drop_existing_subscriptions(PGconn *conn,
1082+
const struct LogicalRepInfo *dbinfo)
1083+
{
1084+
PQExpBuffer query = createPQExpBuffer();
1085+
char *dbname;
1086+
PGresult *res;
1087+
1088+
Assert(conn != NULL);
1089+
1090+
dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
1091+
1092+
appendPQExpBuffer(query,
1093+
"SELECT s.subname FROM pg_catalog.pg_subscription s "
1094+
"INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
1095+
"WHERE d.datname = %s",
1096+
dbname);
1097+
res = PQexec(conn, query->data);
1098+
1099+
if (PQresultStatus(res) != PGRES_TUPLES_OK)
1100+
{
1101+
pg_log_error("could not obtain pre-existing subscriptions: %s",
1102+
PQresultErrorMessage(res));
1103+
disconnect_database(conn, true);
1104+
}
1105+
1106+
for (int i = 0; i < PQntuples(res); i++)
1107+
drop_existing_subscriptions(conn, PQgetvalue(res, i, 0),
1108+
dbinfo->dbname);
1109+
1110+
PQclear(res);
1111+
destroyPQExpBuffer(query);
1112+
}
1113+
10281114
/*
10291115
* Create the subscriptions, adjust the initial location for logical
10301116
* replication and enable the subscriptions. That's the last step for logical
@@ -1040,6 +1126,14 @@ setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
10401126
/* Connect to subscriber. */
10411127
conn = connect_database(dbinfo[i].subconninfo, true);
10421128

1129+
/*
1130+
* We don't need the pre-existing subscriptions on the newly formed
1131+
* subscriber. They can connect to other publisher nodes and either
1132+
* get some unwarranted data or can lead to ERRORs in connecting to
1133+
* such nodes.
1134+
*/
1135+
check_and_drop_existing_subscriptions(conn, &dbinfo[i]);
1136+
10431137
/*
10441138
* Since the publication was created before the consistent LSN, it is
10451139
* available on the subscriber when the physical replica is promoted.
@@ -1314,7 +1408,8 @@ pg_ctl_status(const char *pg_ctl_cmd, int rc)
13141408
}
13151409

13161410
static void
1317-
start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access)
1411+
start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access,
1412+
bool restrict_logical_worker)
13181413
{
13191414
PQExpBuffer pg_ctl_cmd = createPQExpBuffer();
13201415
int rc;
@@ -1343,6 +1438,11 @@ start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_
13431438
if (opt->config_file != NULL)
13441439
appendPQExpBuffer(pg_ctl_cmd, " -o \"-c config_file=%s\"",
13451440
opt->config_file);
1441+
1442+
/* Suppress to start logical replication if requested */
1443+
if (restrict_logical_worker)
1444+
appendPQExpBuffer(pg_ctl_cmd, " -o \"-c max_logical_replication_workers=0\"");
1445+
13461446
pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd->data);
13471447
rc = system(pg_ctl_cmd->data);
13481448
pg_ctl_status(pg_ctl_cmd->data, rc);
@@ -2067,7 +2167,7 @@ main(int argc, char **argv)
20672167
* transformation steps.
20682168
*/
20692169
pg_log_info("starting the standby with command-line options");
2070-
start_standby_server(&opt, true);
2170+
start_standby_server(&opt, true, false);
20712171

20722172
/* Check if the standby server is ready for logical replication */
20732173
check_subscriber(dbinfo);
@@ -2098,10 +2198,11 @@ main(int argc, char **argv)
20982198

20992199
/*
21002200
* Start subscriber so the recovery parameters will take effect. Wait
2101-
* until accepting connections.
2201+
* until accepting connections. We don't want to start logical replication
2202+
* during setup.
21022203
*/
21032204
pg_log_info("starting the subscriber");
2104-
start_standby_server(&opt, true);
2205+
start_standby_server(&opt, true, true);
21052206

21062207
/* Waiting the subscriber to be promoted */
21072208
wait_for_end_recovery(dbinfo[0].subconninfo, &opt);

‎src/bin/pg_basebackup/t/040_pg_createsubscriber.pl

Copy file name to clipboardExpand all lines: src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
+14Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,13 @@ sub generate_db
298298
"SELECT slot_name FROM pg_replication_slots WHERE slot_name = '$fslotname' AND synced AND NOT temporary"
299299
);
300300
is($result, 'failover_slot', 'failover slot is synced');
301+
302+
# Create subscription to test its removal
303+
my $dummy_sub = 'regress_sub_dummy';
304+
$node_p->safe_psql($db1,
305+
"CREATE SUBSCRIPTION $dummy_sub CONNECTION 'dbname=dummy' PUBLICATION pub_dummy WITH (connect=false)"
306+
);
307+
$node_p->wait_for_replay_catchup($node_s);
301308
$node_s->stop;
302309

303310
# dry run mode on node S
@@ -372,6 +379,13 @@ sub generate_db
372379
# Start subscriber
373380
$node_s->start;
374381

382+
# Confirm the pre-existing subscription has been removed
383+
$result = $node_s->safe_psql(
384+
'postgres', qq(
385+
SELECT count(*) FROM pg_subscription WHERE subname = '$dummy_sub'
386+
));
387+
is($result, qq(0), 'pre-existing subscription was dropped');
388+
375389
# Get subscription names
376390
$result = $node_s->safe_psql(
377391
'postgres', qq(

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.