@@ -58,6 +58,7 @@ typedef struct ConnCacheEntry
58
58
bool have_prep_stmt ; /* have we prepared any stmts in this xact? */
59
59
bool have_error ; /* have any subxacts aborted in this xact? */
60
60
bool changing_xact_state ; /* xact state change in process */
61
+ bool parallel_commit ; /* do we commit (sub)xacts in parallel? */
61
62
bool invalidated ; /* true if reconnect is pending */
62
63
bool keep_connections ; /* setting value of keep_connections
63
64
* server option */
@@ -92,6 +93,9 @@ static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
92
93
static void disconnect_pg_server (ConnCacheEntry * entry );
93
94
static void check_conn_params (const char * * keywords , const char * * values , UserMapping * user );
94
95
static void configure_remote_session (PGconn * conn );
96
+ static void do_sql_command_begin (PGconn * conn , const char * sql );
97
+ static void do_sql_command_end (PGconn * conn , const char * sql ,
98
+ bool consume_input );
95
99
static void begin_remote_xact (ConnCacheEntry * entry );
96
100
static void pgfdw_xact_callback (XactEvent event , void * arg );
97
101
static void pgfdw_subxact_callback (SubXactEvent event ,
@@ -100,13 +104,17 @@ static void pgfdw_subxact_callback(SubXactEvent event,
100
104
void * arg );
101
105
static void pgfdw_inval_callback (Datum arg , int cacheid , uint32 hashvalue );
102
106
static void pgfdw_reject_incomplete_xact_state_change (ConnCacheEntry * entry );
107
+ static void pgfdw_reset_xact_state (ConnCacheEntry * entry , bool toplevel );
103
108
static bool pgfdw_cancel_query (PGconn * conn );
104
109
static bool pgfdw_exec_cleanup_query (PGconn * conn , const char * query ,
105
110
bool ignore_errors );
106
111
static bool pgfdw_get_cleanup_result (PGconn * conn , TimestampTz endtime ,
107
112
PGresult * * result , bool * timed_out );
108
113
static void pgfdw_abort_cleanup (ConnCacheEntry * entry , const char * sql ,
109
114
bool toplevel );
115
+ static void pgfdw_finish_pre_commit_cleanup (List * pending_entries );
116
+ static void pgfdw_finish_pre_subcommit_cleanup (List * pending_entries ,
117
+ int curlevel );
110
118
static bool UserMappingPasswordRequired (UserMapping * user );
111
119
static bool disconnect_cached_connections (Oid serverid );
112
120
@@ -316,14 +324,20 @@ make_new_connection(ConnCacheEntry *entry, UserMapping *user)
316
324
* is changed will be closed and re-made later.
317
325
*
318
326
* By default, all the connections to any foreign servers are kept open.
327
+ *
328
+ * Also determine whether to commit (sub)transactions opened on the remote
329
+ * server in parallel at (sub)transaction end.
319
330
*/
320
331
entry -> keep_connections = true;
332
+ entry -> parallel_commit = false;
321
333
foreach (lc , server -> options )
322
334
{
323
335
DefElem * def = (DefElem * ) lfirst (lc );
324
336
325
337
if (strcmp (def -> defname , "keep_connections" ) == 0 )
326
338
entry -> keep_connections = defGetBoolean (def );
339
+ else if (strcmp (def -> defname , "parallel_commit" ) == 0 )
340
+ entry -> parallel_commit = defGetBoolean (def );
327
341
}
328
342
329
343
/* Now try to make the connection */
@@ -623,10 +637,30 @@ configure_remote_session(PGconn *conn)
623
637
void
624
638
do_sql_command (PGconn * conn , const char * sql )
625
639
{
626
- PGresult * res ;
640
+ do_sql_command_begin (conn , sql );
641
+ do_sql_command_end (conn , sql , false);
642
+ }
627
643
644
+ static void
645
+ do_sql_command_begin (PGconn * conn , const char * sql )
646
+ {
628
647
if (!PQsendQuery (conn , sql ))
629
648
pgfdw_report_error (ERROR , NULL , conn , false, sql );
649
+ }
650
+
651
+ static void
652
+ do_sql_command_end (PGconn * conn , const char * sql , bool consume_input )
653
+ {
654
+ PGresult * res ;
655
+
656
+ /*
657
+ * If requested, consume whatever data is available from the socket.
658
+ * (Note that if all data is available, this allows pgfdw_get_result to
659
+ * call PQgetResult without forcing the overhead of WaitLatchOrSocket,
660
+ * which would be large compared to the overhead of PQconsumeInput.)
661
+ */
662
+ if (consume_input && !PQconsumeInput (conn ))
663
+ pgfdw_report_error (ERROR , NULL , conn , false, sql );
630
664
res = pgfdw_get_result (conn , sql );
631
665
if (PQresultStatus (res ) != PGRES_COMMAND_OK )
632
666
pgfdw_report_error (ERROR , res , conn , true, sql );
@@ -888,6 +922,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
888
922
{
889
923
HASH_SEQ_STATUS scan ;
890
924
ConnCacheEntry * entry ;
925
+ List * pending_entries = NIL ;
891
926
892
927
/* Quick exit if no connections were touched in this transaction. */
893
928
if (!xact_got_connection )
@@ -925,6 +960,12 @@ pgfdw_xact_callback(XactEvent event, void *arg)
925
960
926
961
/* Commit all remote transactions during pre-commit */
927
962
entry -> changing_xact_state = true;
963
+ if (entry -> parallel_commit )
964
+ {
965
+ do_sql_command_begin (entry -> conn , "COMMIT TRANSACTION" );
966
+ pending_entries = lappend (pending_entries , entry );
967
+ continue ;
968
+ }
928
969
do_sql_command (entry -> conn , "COMMIT TRANSACTION" );
929
970
entry -> changing_xact_state = false;
930
971
@@ -981,23 +1022,15 @@ pgfdw_xact_callback(XactEvent event, void *arg)
981
1022
}
982
1023
983
1024
/* Reset state to show we're out of a transaction */
984
- entry -> xact_depth = 0 ;
1025
+ pgfdw_reset_xact_state (entry , true);
1026
+ }
985
1027
986
- /*
987
- * If the connection isn't in a good idle state, it is marked as
988
- * invalid or keep_connections option of its server is disabled, then
989
- * discard it to recover. Next GetConnection will open a new
990
- * connection.
991
- */
992
- if (PQstatus (entry -> conn ) != CONNECTION_OK ||
993
- PQtransactionStatus (entry -> conn ) != PQTRANS_IDLE ||
994
- entry -> changing_xact_state ||
995
- entry -> invalidated ||
996
- !entry -> keep_connections )
997
- {
998
- elog (DEBUG3 , "discarding connection %p" , entry -> conn );
999
- disconnect_pg_server (entry );
1000
- }
1028
+ /* If there are any pending connections, finish cleaning them up */
1029
+ if (pending_entries )
1030
+ {
1031
+ Assert (event == XACT_EVENT_PARALLEL_PRE_COMMIT ||
1032
+ event == XACT_EVENT_PRE_COMMIT );
1033
+ pgfdw_finish_pre_commit_cleanup (pending_entries );
1001
1034
}
1002
1035
1003
1036
/*
@@ -1021,6 +1054,7 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
1021
1054
HASH_SEQ_STATUS scan ;
1022
1055
ConnCacheEntry * entry ;
1023
1056
int curlevel ;
1057
+ List * pending_entries = NIL ;
1024
1058
1025
1059
/* Nothing to do at subxact start, nor after commit. */
1026
1060
if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB ||
@@ -1063,6 +1097,12 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
1063
1097
/* Commit all remote subtransactions during pre-commit */
1064
1098
snprintf (sql , sizeof (sql ), "RELEASE SAVEPOINT s%d" , curlevel );
1065
1099
entry -> changing_xact_state = true;
1100
+ if (entry -> parallel_commit )
1101
+ {
1102
+ do_sql_command_begin (entry -> conn , sql );
1103
+ pending_entries = lappend (pending_entries , entry );
1104
+ continue ;
1105
+ }
1066
1106
do_sql_command (entry -> conn , sql );
1067
1107
entry -> changing_xact_state = false;
1068
1108
}
@@ -1076,7 +1116,14 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
1076
1116
}
1077
1117
1078
1118
/* OK, we're outta that level of subtransaction */
1079
- entry -> xact_depth -- ;
1119
+ pgfdw_reset_xact_state (entry , false);
1120
+ }
1121
+
1122
+ /* If there are any pending connections, finish cleaning them up */
1123
+ if (pending_entries )
1124
+ {
1125
+ Assert (event == SUBXACT_EVENT_PRE_COMMIT_SUB );
1126
+ pgfdw_finish_pre_subcommit_cleanup (pending_entries , curlevel );
1080
1127
}
1081
1128
}
1082
1129
@@ -1169,6 +1216,40 @@ pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
1169
1216
server -> servername )));
1170
1217
}
1171
1218
1219
+ /*
1220
+ * Reset state to show we're out of a (sub)transaction.
1221
+ */
1222
+ static void
1223
+ pgfdw_reset_xact_state (ConnCacheEntry * entry , bool toplevel )
1224
+ {
1225
+ if (toplevel )
1226
+ {
1227
+ /* Reset state to show we're out of a transaction */
1228
+ entry -> xact_depth = 0 ;
1229
+
1230
+ /*
1231
+ * If the connection isn't in a good idle state, it is marked as
1232
+ * invalid or keep_connections option of its server is disabled, then
1233
+ * discard it to recover. Next GetConnection will open a new
1234
+ * connection.
1235
+ */
1236
+ if (PQstatus (entry -> conn ) != CONNECTION_OK ||
1237
+ PQtransactionStatus (entry -> conn ) != PQTRANS_IDLE ||
1238
+ entry -> changing_xact_state ||
1239
+ entry -> invalidated ||
1240
+ !entry -> keep_connections )
1241
+ {
1242
+ elog (DEBUG3 , "discarding connection %p" , entry -> conn );
1243
+ disconnect_pg_server (entry );
1244
+ }
1245
+ }
1246
+ else
1247
+ {
1248
+ /* Reset state to show we're out of a subtransaction */
1249
+ entry -> xact_depth -- ;
1250
+ }
1251
+ }
1252
+
1172
1253
/*
1173
1254
* Cancel the currently-in-progress query (whose query text we do not have)
1174
1255
* and ignore the result. Returns true if we successfully cancel the query
@@ -1456,6 +1537,112 @@ pgfdw_abort_cleanup(ConnCacheEntry *entry, const char *sql, bool toplevel)
1456
1537
entry -> changing_xact_state = false;
1457
1538
}
1458
1539
1540
+ /*
1541
+ * Finish pre-commit cleanup of connections on each of which we've sent a
1542
+ * COMMIT command to the remote server.
1543
+ */
1544
+ static void
1545
+ pgfdw_finish_pre_commit_cleanup (List * pending_entries )
1546
+ {
1547
+ ConnCacheEntry * entry ;
1548
+ List * pending_deallocs = NIL ;
1549
+ ListCell * lc ;
1550
+
1551
+ Assert (pending_entries );
1552
+
1553
+ /*
1554
+ * Get the result of the COMMIT command for each of the pending entries
1555
+ */
1556
+ foreach (lc , pending_entries )
1557
+ {
1558
+ entry = (ConnCacheEntry * ) lfirst (lc );
1559
+
1560
+ Assert (entry -> changing_xact_state );
1561
+ /*
1562
+ * We might already have received the result on the socket, so pass
1563
+ * consume_input=true to try to consume it first
1564
+ */
1565
+ do_sql_command_end (entry -> conn , "COMMIT TRANSACTION" , true);
1566
+ entry -> changing_xact_state = false;
1567
+
1568
+ /* Do a DEALLOCATE ALL in parallel if needed */
1569
+ if (entry -> have_prep_stmt && entry -> have_error )
1570
+ {
1571
+ /* Ignore errors (see notes in pgfdw_xact_callback) */
1572
+ if (PQsendQuery (entry -> conn , "DEALLOCATE ALL" ))
1573
+ {
1574
+ pending_deallocs = lappend (pending_deallocs , entry );
1575
+ continue ;
1576
+ }
1577
+ }
1578
+ entry -> have_prep_stmt = false;
1579
+ entry -> have_error = false;
1580
+
1581
+ pgfdw_reset_xact_state (entry , true);
1582
+ }
1583
+
1584
+ /* No further work if no pending entries */
1585
+ if (!pending_deallocs )
1586
+ return ;
1587
+
1588
+ /*
1589
+ * Get the result of the DEALLOCATE command for each of the pending
1590
+ * entries
1591
+ */
1592
+ foreach (lc , pending_deallocs )
1593
+ {
1594
+ PGresult * res ;
1595
+
1596
+ entry = (ConnCacheEntry * ) lfirst (lc );
1597
+
1598
+ /* Ignore errors (see notes in pgfdw_xact_callback) */
1599
+ while ((res = PQgetResult (entry -> conn )) != NULL )
1600
+ {
1601
+ PQclear (res );
1602
+ /* Stop if the connection is lost (else we'll loop infinitely) */
1603
+ if (PQstatus (entry -> conn ) == CONNECTION_BAD )
1604
+ break ;
1605
+ }
1606
+ entry -> have_prep_stmt = false;
1607
+ entry -> have_error = false;
1608
+
1609
+ pgfdw_reset_xact_state (entry , true);
1610
+ }
1611
+ }
1612
+
1613
+ /*
1614
+ * Finish pre-subcommit cleanup of connections on each of which we've sent a
1615
+ * RELEASE command to the remote server.
1616
+ */
1617
+ static void
1618
+ pgfdw_finish_pre_subcommit_cleanup (List * pending_entries , int curlevel )
1619
+ {
1620
+ ConnCacheEntry * entry ;
1621
+ char sql [100 ];
1622
+ ListCell * lc ;
1623
+
1624
+ Assert (pending_entries );
1625
+
1626
+ /*
1627
+ * Get the result of the RELEASE command for each of the pending entries
1628
+ */
1629
+ snprintf (sql , sizeof (sql ), "RELEASE SAVEPOINT s%d" , curlevel );
1630
+ foreach (lc , pending_entries )
1631
+ {
1632
+ entry = (ConnCacheEntry * ) lfirst (lc );
1633
+
1634
+ Assert (entry -> changing_xact_state );
1635
+ /*
1636
+ * We might already have received the result on the socket, so pass
1637
+ * consume_input=true to try to consume it first
1638
+ */
1639
+ do_sql_command_end (entry -> conn , sql , true);
1640
+ entry -> changing_xact_state = false;
1641
+
1642
+ pgfdw_reset_xact_state (entry , false);
1643
+ }
1644
+ }
1645
+
1459
1646
/*
1460
1647
* List active foreign server connections.
1461
1648
*
0 commit comments