|
40 | 40 | $node_publisher->safe_psql('postgres',
|
41 | 41 | "CREATE PUBLICATION tpub FOR ALL TABLES");
|
42 | 42 |
|
| 43 | +# ------------------------------------------------------ |
| 44 | +# Ensure binary mode also executes COPY in binary format |
| 45 | +# ------------------------------------------------------ |
| 46 | + |
| 47 | +# Insert some content before creating a subscription |
| 48 | +$node_publisher->safe_psql( |
| 49 | + 'postgres', qq( |
| 50 | + INSERT INTO public.test_numerical (a, b, c, d) VALUES |
| 51 | + (1, 1.2, 1.3, 10), |
| 52 | + (2, 2.2, 2.3, 20); |
| 53 | + INSERT INTO public.test_arrays (a, b, c) VALUES |
| 54 | + ('{1,2,3}', '{1.1, 1.2, 1.3}', '{"one", "two", "three"}'), |
| 55 | + ('{3,1,2}', '{1.3, 1.1, 1.2}', '{"three", "one", "two"}'); |
| 56 | + )); |
| 57 | + |
43 | 58 | my $publisher_connstring = $node_publisher->connstr . ' dbname=postgres';
|
44 | 59 | $node_subscriber->safe_psql('postgres',
|
45 | 60 | "CREATE SUBSCRIPTION tsub CONNECTION '$publisher_connstring' "
|
46 | 61 | . "PUBLICATION tpub WITH (slot_name = tpub_slot, binary = true)");
|
47 | 62 |
|
| 63 | +# Ensure the COPY command is executed in binary format on the publisher |
| 64 | +$node_publisher->wait_for_log( |
| 65 | + qr/LOG: ( [A-Z0-9]+:)? statement: COPY (.+)? TO STDOUT WITH \(FORMAT binary\)/ |
| 66 | +); |
| 67 | + |
48 | 68 | # Ensure nodes are in sync with each other
|
49 | 69 | $node_subscriber->wait_for_subscription_sync($node_publisher, 'tsub');
|
50 | 70 |
|
| 71 | +my $sync_check = qq( |
| 72 | + SELECT a, b, c, d FROM test_numerical ORDER BY a; |
| 73 | + SELECT a, b, c FROM test_arrays ORDER BY a; |
| 74 | +); |
| 75 | + |
| 76 | +# Check the synced data on the subscriber |
| 77 | +my $result = $node_subscriber->safe_psql('postgres', $sync_check); |
| 78 | + |
| 79 | +is( $result, '1|1.2|1.3|10 |
| 80 | +2|2.2|2.3|20 |
| 81 | +{1,2,3}|{1.1,1.2,1.3}|{one,two,three} |
| 82 | +{3,1,2}|{1.3,1.1,1.2}|{three,one,two}', 'check synced data on subscriber'); |
| 83 | + |
| 84 | +# ---------------------------------- |
| 85 | +# Ensure apply works in binary mode |
| 86 | +# ---------------------------------- |
| 87 | + |
51 | 88 | # Insert some content and make sure it's replicated across
|
52 | 89 | $node_publisher->safe_psql(
|
53 | 90 | 'postgres', qq(
|
54 | 91 | INSERT INTO public.test_arrays (a, b, c) VALUES
|
55 |
| - ('{1,2,3}', '{1.1, 1.2, 1.3}', '{"one", "two", "three"}'), |
56 |
| - ('{3,1,2}', '{1.3, 1.1, 1.2}', '{"three", "one", "two"}'); |
| 92 | + ('{2,1,3}', '{1.2, 1.1, 1.3}', '{"two", "one", "three"}'), |
| 93 | + ('{1,3,2}', '{1.1, 1.3, 1.2}', '{"one", "three", "two"}'); |
57 | 94 |
|
58 | 95 | INSERT INTO public.test_numerical (a, b, c, d) VALUES
|
59 |
| - (1, 1.2, 1.3, 10), |
60 |
| - (2, 2.2, 2.3, 20), |
61 |
| - (3, 3.2, 3.3, 30); |
| 96 | + (3, 3.2, 3.3, 30), |
| 97 | + (4, 4.2, 4.3, 40); |
62 | 98 | ));
|
63 | 99 |
|
64 | 100 | $node_publisher->wait_for_catchup('tsub');
|
65 | 101 |
|
66 |
| -my $result = $node_subscriber->safe_psql('postgres', |
| 102 | +$result = $node_subscriber->safe_psql('postgres', |
67 | 103 | "SELECT a, b, c, d FROM test_numerical ORDER BY a");
|
68 | 104 |
|
69 | 105 | is( $result, '1|1.2|1.3|10
|
70 | 106 | 2|2.2|2.3|20
|
71 |
| -3|3.2|3.3|30', 'check replicated data on subscriber'); |
| 107 | +3|3.2|3.3|30 |
| 108 | +4|4.2|4.3|40', 'check replicated data on subscriber'); |
72 | 109 |
|
73 | 110 | # Test updates as well
|
74 | 111 | $node_publisher->safe_psql(
|
|
83 | 120 | "SELECT a, b, c FROM test_arrays ORDER BY a");
|
84 | 121 |
|
85 | 122 | is( $result, '{1,2,3}|{42,1.2,1.3}|
|
| 123 | +{1,3,2}|{42,1.3,1.2}| |
| 124 | +{2,1,3}|{42,1.1,1.3}| |
86 | 125 | {3,1,2}|{42,1.1,1.2}|', 'check updated replicated data on subscriber');
|
87 | 126 |
|
88 | 127 | $result = $node_subscriber->safe_psql('postgres',
|
89 | 128 | "SELECT a, b, c, d FROM test_numerical ORDER BY a");
|
90 | 129 |
|
91 | 130 | is( $result, '1|42||10
|
92 | 131 | 2|42||20
|
93 |
| -3|42||30', 'check updated replicated data on subscriber'); |
| 132 | +3|42||30 |
| 133 | +4|42||40', 'check updated replicated data on subscriber'); |
| 134 | + |
| 135 | +# ------------------------------------------------------------------------------ |
| 136 | +# Use ALTER SUBSCRIPTION to change to text format and then back to binary format |
| 137 | +# ------------------------------------------------------------------------------ |
94 | 138 |
|
95 | 139 | # Test to reset back to text formatting, and then to binary again
|
96 | 140 | $node_subscriber->safe_psql('postgres',
|
|
99 | 143 | $node_publisher->safe_psql(
|
100 | 144 | 'postgres', qq(
|
101 | 145 | INSERT INTO public.test_numerical (a, b, c, d) VALUES
|
102 |
| - (4, 4.2, 4.3, 40); |
| 146 | + (5, 5.2, 5.3, 50); |
103 | 147 | ));
|
104 | 148 |
|
105 | 149 | $node_publisher->wait_for_catchup('tsub');
|
|
110 | 154 | is( $result, '1|42||10
|
111 | 155 | 2|42||20
|
112 | 156 | 3|42||30
|
113 |
| -4|4.2|4.3|40', 'check replicated data on subscriber'); |
| 157 | +4|42||40 |
| 158 | +5|5.2|5.3|50', 'check replicated data on subscriber'); |
114 | 159 |
|
115 | 160 | $node_subscriber->safe_psql('postgres',
|
116 | 161 | "ALTER SUBSCRIPTION tsub SET (binary = true);");
|
|
127 | 172 | "SELECT a, b, c FROM test_arrays ORDER BY a");
|
128 | 173 |
|
129 | 174 | is( $result, '{1,2,3}|{42,1.2,1.3}|
|
| 175 | +{1,3,2}|{42,1.3,1.2}| |
| 176 | +{2,1,3}|{42,1.1,1.3}| |
130 | 177 | {2,3,1}|{1.2,1.3,1.1}|{two,three,one}
|
131 | 178 | {3,1,2}|{42,1.1,1.2}|', 'check replicated data on subscriber');
|
132 | 179 |
|
| 180 | +# --------------------------------------------------------------- |
| 181 | +# Test binary replication without and with send/receive functions |
| 182 | +# --------------------------------------------------------------- |
| 183 | + |
| 184 | +# Create a custom type without send/rcv functions |
| 185 | +$ddl = qq( |
| 186 | + CREATE TYPE myvarchar; |
| 187 | + CREATE FUNCTION myvarcharin(cstring, oid, integer) RETURNS myvarchar |
| 188 | + LANGUAGE internal IMMUTABLE PARALLEL SAFE STRICT AS 'varcharin'; |
| 189 | + CREATE FUNCTION myvarcharout(myvarchar) RETURNS cstring |
| 190 | + LANGUAGE internal IMMUTABLE PARALLEL SAFE STRICT AS 'varcharout'; |
| 191 | + CREATE TYPE myvarchar ( |
| 192 | + input = myvarcharin, |
| 193 | + output = myvarcharout); |
| 194 | + CREATE TABLE public.test_myvarchar ( |
| 195 | + a myvarchar |
| 196 | + );); |
| 197 | + |
| 198 | +$node_publisher->safe_psql('postgres', $ddl); |
| 199 | +$node_subscriber->safe_psql('postgres', $ddl); |
| 200 | + |
| 201 | +# Insert some initial data |
| 202 | +$node_publisher->safe_psql( |
| 203 | + 'postgres', qq( |
| 204 | + INSERT INTO public.test_myvarchar (a) VALUES |
| 205 | + ('a'); |
| 206 | + )); |
| 207 | + |
| 208 | +# Check the subscriber log from now on. |
| 209 | +my $offset = -s $node_subscriber->logfile; |
| 210 | + |
| 211 | +# Refresh the publication to trigger the tablesync |
| 212 | +$node_subscriber->safe_psql('postgres', |
| 213 | + "ALTER SUBSCRIPTION tsub REFRESH PUBLICATION"); |
| 214 | + |
| 215 | +# It should fail |
| 216 | +$node_subscriber->wait_for_log( |
| 217 | + qr/ERROR: ( [A-Z0-9]+:)? no binary input function available for type/, |
| 218 | + $offset); |
| 219 | + |
| 220 | +# Create and set send/rcv functions for the custom type |
| 221 | +$ddl = qq( |
| 222 | + CREATE FUNCTION myvarcharsend(myvarchar) RETURNS bytea |
| 223 | + LANGUAGE internal STABLE PARALLEL SAFE STRICT AS 'varcharsend'; |
| 224 | + CREATE FUNCTION myvarcharrecv(internal, oid, integer) RETURNS myvarchar |
| 225 | + LANGUAGE internal STABLE PARALLEL SAFE STRICT AS 'varcharrecv'; |
| 226 | + ALTER TYPE myvarchar SET ( |
| 227 | + send = myvarcharsend, |
| 228 | + receive = myvarcharrecv |
| 229 | + );); |
| 230 | + |
| 231 | +$node_publisher->safe_psql('postgres', $ddl); |
| 232 | +$node_subscriber->safe_psql('postgres', $ddl); |
| 233 | + |
| 234 | +# Now tablesync should succeed |
| 235 | +$node_subscriber->wait_for_subscription_sync($node_publisher, 'tsub'); |
| 236 | + |
| 237 | +# Check the synced data on the subscriber |
| 238 | +$result = |
| 239 | + $node_subscriber->safe_psql('postgres', 'SELECT a FROM test_myvarchar;'); |
| 240 | + |
| 241 | +is($result, 'a', 'check synced data on subscriber with custom type'); |
| 242 | + |
| 243 | +# ----------------------------------------------------- |
| 244 | +# Test mismatched column types with/without binary mode |
| 245 | +# ----------------------------------------------------- |
| 246 | + |
| 247 | +# Test syncing tables with mismatching column types |
| 248 | +$node_publisher->safe_psql( |
| 249 | + 'postgres', qq( |
| 250 | + CREATE TABLE public.test_mismatching_types ( |
| 251 | + a bigint PRIMARY KEY |
| 252 | + ); |
| 253 | + INSERT INTO public.test_mismatching_types (a) |
| 254 | + VALUES (1), (2); |
| 255 | + )); |
| 256 | + |
| 257 | +# Check the subscriber log from now on. |
| 258 | +$offset = -s $node_subscriber->logfile; |
| 259 | + |
| 260 | +$node_subscriber->safe_psql( |
| 261 | + 'postgres', qq( |
| 262 | + CREATE TABLE public.test_mismatching_types ( |
| 263 | + a int PRIMARY KEY |
| 264 | + ); |
| 265 | + ALTER SUBSCRIPTION tsub REFRESH PUBLICATION; |
| 266 | + )); |
| 267 | + |
| 268 | +# Cannot sync due to type mismatch |
| 269 | +$node_subscriber->wait_for_log( |
| 270 | + qr/ERROR: ( [A-Z0-9]+:)? incorrect binary data format/, $offset); |
| 271 | + |
| 272 | +# Check the publisher log from now on. |
| 273 | +$offset = -s $node_publisher->logfile; |
| 274 | + |
| 275 | +# Setting binary to false should allow syncing |
| 276 | +$node_subscriber->safe_psql( |
| 277 | + 'postgres', qq( |
| 278 | + ALTER SUBSCRIPTION tsub SET (binary = false);)); |
| 279 | + |
| 280 | +# Ensure the COPY command is executed in text format on the publisher |
| 281 | +$node_publisher->wait_for_log( |
| 282 | + qr/LOG: ( [A-Z0-9]+:)? statement: COPY (.+)? TO STDOUT\n/, $offset); |
| 283 | + |
| 284 | +$node_subscriber->wait_for_subscription_sync($node_publisher, 'tsub'); |
| 285 | + |
| 286 | +# Check the synced data on the subscriber |
| 287 | +$result = $node_subscriber->safe_psql('postgres', |
| 288 | + 'SELECT a FROM test_mismatching_types ORDER BY a;'); |
| 289 | + |
| 290 | +is( $result, '1 |
| 291 | +2', 'check synced data on subscriber with binary = false'); |
| 292 | + |
133 | 293 | $node_subscriber->stop('fast');
|
134 | 294 | $node_publisher->stop('fast');
|
135 | 295 |
|
|
0 commit comments