Skip to content

Navigation Menu

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 04e706d

Browse filesBrowse files
author
Etsuro Fujita
committed
postgres_fdw: Add support for parallel commit.
postgres_fdw commits remote (sub)transactions opened on remote server(s) in a local (sub)transaction one by one when the local (sub)transaction commits. This patch allows it to commit the remote (sub)transactions in parallel to improve performance. This is enabled by the server option "parallel_commit". The default is false. Etsuro Fujita, reviewed by Fujii Masao and David Zhang. Discussion: http://postgr.es/m/CAPmGK17dAZCXvwnfpr1eTfknTGdt%3DhYTV9405Gt5SqPOX8K84w%40mail.gmail.com
1 parent cfb4e20 commit 04e706d
Copy full SHA for 04e706d

File tree

5 files changed

+376
-19
lines changed
Filter options

5 files changed

+376
-19
lines changed

‎contrib/postgres_fdw/connection.c

Copy file name to clipboardExpand all lines: contrib/postgres_fdw/connection.c
+205-18
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ typedef struct ConnCacheEntry
5858
bool have_prep_stmt; /* have we prepared any stmts in this xact? */
5959
bool have_error; /* have any subxacts aborted in this xact? */
6060
bool changing_xact_state; /* xact state change in process */
61+
bool parallel_commit; /* do we commit (sub)xacts in parallel? */
6162
bool invalidated; /* true if reconnect is pending */
6263
bool keep_connections; /* setting value of keep_connections
6364
* server option */
@@ -92,6 +93,9 @@ static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
9293
static void disconnect_pg_server(ConnCacheEntry *entry);
9394
static void check_conn_params(const char **keywords, const char **values, UserMapping *user);
9495
static void configure_remote_session(PGconn *conn);
96+
static void do_sql_command_begin(PGconn *conn, const char *sql);
97+
static void do_sql_command_end(PGconn *conn, const char *sql,
98+
bool consume_input);
9599
static void begin_remote_xact(ConnCacheEntry *entry);
96100
static void pgfdw_xact_callback(XactEvent event, void *arg);
97101
static void pgfdw_subxact_callback(SubXactEvent event,
@@ -100,13 +104,17 @@ static void pgfdw_subxact_callback(SubXactEvent event,
100104
void *arg);
101105
static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue);
102106
static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry);
107+
static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel);
103108
static bool pgfdw_cancel_query(PGconn *conn);
104109
static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
105110
bool ignore_errors);
106111
static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
107112
PGresult **result, bool *timed_out);
108113
static void pgfdw_abort_cleanup(ConnCacheEntry *entry, const char *sql,
109114
bool toplevel);
115+
static void pgfdw_finish_pre_commit_cleanup(List *pending_entries);
116+
static void pgfdw_finish_pre_subcommit_cleanup(List *pending_entries,
117+
int curlevel);
110118
static bool UserMappingPasswordRequired(UserMapping *user);
111119
static bool disconnect_cached_connections(Oid serverid);
112120

@@ -316,14 +324,20 @@ make_new_connection(ConnCacheEntry *entry, UserMapping *user)
316324
* is changed will be closed and re-made later.
317325
*
318326
* By default, all the connections to any foreign servers are kept open.
327+
*
328+
* Also determine whether to commit (sub)transactions opened on the remote
329+
* server in parallel at (sub)transaction end.
319330
*/
320331
entry->keep_connections = true;
332+
entry->parallel_commit = false;
321333
foreach(lc, server->options)
322334
{
323335
DefElem *def = (DefElem *) lfirst(lc);
324336

325337
if (strcmp(def->defname, "keep_connections") == 0)
326338
entry->keep_connections = defGetBoolean(def);
339+
else if (strcmp(def->defname, "parallel_commit") == 0)
340+
entry->parallel_commit = defGetBoolean(def);
327341
}
328342

329343
/* Now try to make the connection */
@@ -623,10 +637,30 @@ configure_remote_session(PGconn *conn)
623637
void
624638
do_sql_command(PGconn *conn, const char *sql)
625639
{
626-
PGresult *res;
640+
do_sql_command_begin(conn, sql);
641+
do_sql_command_end(conn, sql, false);
642+
}
627643

644+
static void
645+
do_sql_command_begin(PGconn *conn, const char *sql)
646+
{
628647
if (!PQsendQuery(conn, sql))
629648
pgfdw_report_error(ERROR, NULL, conn, false, sql);
649+
}
650+
651+
static void
652+
do_sql_command_end(PGconn *conn, const char *sql, bool consume_input)
653+
{
654+
PGresult *res;
655+
656+
/*
657+
* If requested, consume whatever data is available from the socket.
658+
* (Note that if all data is available, this allows pgfdw_get_result to
659+
* call PQgetResult without forcing the overhead of WaitLatchOrSocket,
660+
* which would be large compared to the overhead of PQconsumeInput.)
661+
*/
662+
if (consume_input && !PQconsumeInput(conn))
663+
pgfdw_report_error(ERROR, NULL, conn, false, sql);
630664
res = pgfdw_get_result(conn, sql);
631665
if (PQresultStatus(res) != PGRES_COMMAND_OK)
632666
pgfdw_report_error(ERROR, res, conn, true, sql);
@@ -888,6 +922,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
888922
{
889923
HASH_SEQ_STATUS scan;
890924
ConnCacheEntry *entry;
925+
List *pending_entries = NIL;
891926

892927
/* Quick exit if no connections were touched in this transaction. */
893928
if (!xact_got_connection)
@@ -925,6 +960,12 @@ pgfdw_xact_callback(XactEvent event, void *arg)
925960

926961
/* Commit all remote transactions during pre-commit */
927962
entry->changing_xact_state = true;
963+
if (entry->parallel_commit)
964+
{
965+
do_sql_command_begin(entry->conn, "COMMIT TRANSACTION");
966+
pending_entries = lappend(pending_entries, entry);
967+
continue;
968+
}
928969
do_sql_command(entry->conn, "COMMIT TRANSACTION");
929970
entry->changing_xact_state = false;
930971

@@ -981,23 +1022,15 @@ pgfdw_xact_callback(XactEvent event, void *arg)
9811022
}
9821023

9831024
/* Reset state to show we're out of a transaction */
984-
entry->xact_depth = 0;
1025+
pgfdw_reset_xact_state(entry, true);
1026+
}
9851027

986-
/*
987-
* If the connection isn't in a good idle state, it is marked as
988-
* invalid or keep_connections option of its server is disabled, then
989-
* discard it to recover. Next GetConnection will open a new
990-
* connection.
991-
*/
992-
if (PQstatus(entry->conn) != CONNECTION_OK ||
993-
PQtransactionStatus(entry->conn) != PQTRANS_IDLE ||
994-
entry->changing_xact_state ||
995-
entry->invalidated ||
996-
!entry->keep_connections)
997-
{
998-
elog(DEBUG3, "discarding connection %p", entry->conn);
999-
disconnect_pg_server(entry);
1000-
}
1028+
/* If there are any pending connections, finish cleaning them up */
1029+
if (pending_entries)
1030+
{
1031+
Assert(event == XACT_EVENT_PARALLEL_PRE_COMMIT ||
1032+
event == XACT_EVENT_PRE_COMMIT);
1033+
pgfdw_finish_pre_commit_cleanup(pending_entries);
10011034
}
10021035

10031036
/*
@@ -1021,6 +1054,7 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
10211054
HASH_SEQ_STATUS scan;
10221055
ConnCacheEntry *entry;
10231056
int curlevel;
1057+
List *pending_entries = NIL;
10241058

10251059
/* Nothing to do at subxact start, nor after commit. */
10261060
if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB ||
@@ -1063,6 +1097,12 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
10631097
/* Commit all remote subtransactions during pre-commit */
10641098
snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
10651099
entry->changing_xact_state = true;
1100+
if (entry->parallel_commit)
1101+
{
1102+
do_sql_command_begin(entry->conn, sql);
1103+
pending_entries = lappend(pending_entries, entry);
1104+
continue;
1105+
}
10661106
do_sql_command(entry->conn, sql);
10671107
entry->changing_xact_state = false;
10681108
}
@@ -1076,7 +1116,14 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
10761116
}
10771117

10781118
/* OK, we're outta that level of subtransaction */
1079-
entry->xact_depth--;
1119+
pgfdw_reset_xact_state(entry, false);
1120+
}
1121+
1122+
/* If there are any pending connections, finish cleaning them up */
1123+
if (pending_entries)
1124+
{
1125+
Assert(event == SUBXACT_EVENT_PRE_COMMIT_SUB);
1126+
pgfdw_finish_pre_subcommit_cleanup(pending_entries, curlevel);
10801127
}
10811128
}
10821129

@@ -1169,6 +1216,40 @@ pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
11691216
server->servername)));
11701217
}
11711218

1219+
/*
1220+
* Reset state to show we're out of a (sub)transaction.
1221+
*/
1222+
static void
1223+
pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel)
1224+
{
1225+
if (toplevel)
1226+
{
1227+
/* Reset state to show we're out of a transaction */
1228+
entry->xact_depth = 0;
1229+
1230+
/*
1231+
* If the connection isn't in a good idle state, it is marked as
1232+
* invalid or keep_connections option of its server is disabled, then
1233+
* discard it to recover. Next GetConnection will open a new
1234+
* connection.
1235+
*/
1236+
if (PQstatus(entry->conn) != CONNECTION_OK ||
1237+
PQtransactionStatus(entry->conn) != PQTRANS_IDLE ||
1238+
entry->changing_xact_state ||
1239+
entry->invalidated ||
1240+
!entry->keep_connections)
1241+
{
1242+
elog(DEBUG3, "discarding connection %p", entry->conn);
1243+
disconnect_pg_server(entry);
1244+
}
1245+
}
1246+
else
1247+
{
1248+
/* Reset state to show we're out of a subtransaction */
1249+
entry->xact_depth--;
1250+
}
1251+
}
1252+
11721253
/*
11731254
* Cancel the currently-in-progress query (whose query text we do not have)
11741255
* and ignore the result. Returns true if we successfully cancel the query
@@ -1456,6 +1537,112 @@ pgfdw_abort_cleanup(ConnCacheEntry *entry, const char *sql, bool toplevel)
14561537
entry->changing_xact_state = false;
14571538
}
14581539

1540+
/*
1541+
* Finish pre-commit cleanup of connections on each of which we've sent a
1542+
* COMMIT command to the remote server.
1543+
*/
1544+
static void
1545+
pgfdw_finish_pre_commit_cleanup(List *pending_entries)
1546+
{
1547+
ConnCacheEntry *entry;
1548+
List *pending_deallocs = NIL;
1549+
ListCell *lc;
1550+
1551+
Assert(pending_entries);
1552+
1553+
/*
1554+
* Get the result of the COMMIT command for each of the pending entries
1555+
*/
1556+
foreach(lc, pending_entries)
1557+
{
1558+
entry = (ConnCacheEntry *) lfirst(lc);
1559+
1560+
Assert(entry->changing_xact_state);
1561+
/*
1562+
* We might already have received the result on the socket, so pass
1563+
* consume_input=true to try to consume it first
1564+
*/
1565+
do_sql_command_end(entry->conn, "COMMIT TRANSACTION", true);
1566+
entry->changing_xact_state = false;
1567+
1568+
/* Do a DEALLOCATE ALL in parallel if needed */
1569+
if (entry->have_prep_stmt && entry->have_error)
1570+
{
1571+
/* Ignore errors (see notes in pgfdw_xact_callback) */
1572+
if (PQsendQuery(entry->conn, "DEALLOCATE ALL"))
1573+
{
1574+
pending_deallocs = lappend(pending_deallocs, entry);
1575+
continue;
1576+
}
1577+
}
1578+
entry->have_prep_stmt = false;
1579+
entry->have_error = false;
1580+
1581+
pgfdw_reset_xact_state(entry, true);
1582+
}
1583+
1584+
/* No further work if no pending entries */
1585+
if (!pending_deallocs)
1586+
return;
1587+
1588+
/*
1589+
* Get the result of the DEALLOCATE command for each of the pending
1590+
* entries
1591+
*/
1592+
foreach(lc, pending_deallocs)
1593+
{
1594+
PGresult *res;
1595+
1596+
entry = (ConnCacheEntry *) lfirst(lc);
1597+
1598+
/* Ignore errors (see notes in pgfdw_xact_callback) */
1599+
while ((res = PQgetResult(entry->conn)) != NULL)
1600+
{
1601+
PQclear(res);
1602+
/* Stop if the connection is lost (else we'll loop infinitely) */
1603+
if (PQstatus(entry->conn) == CONNECTION_BAD)
1604+
break;
1605+
}
1606+
entry->have_prep_stmt = false;
1607+
entry->have_error = false;
1608+
1609+
pgfdw_reset_xact_state(entry, true);
1610+
}
1611+
}
1612+
1613+
/*
1614+
* Finish pre-subcommit cleanup of connections on each of which we've sent a
1615+
* RELEASE command to the remote server.
1616+
*/
1617+
static void
1618+
pgfdw_finish_pre_subcommit_cleanup(List *pending_entries, int curlevel)
1619+
{
1620+
ConnCacheEntry *entry;
1621+
char sql[100];
1622+
ListCell *lc;
1623+
1624+
Assert(pending_entries);
1625+
1626+
/*
1627+
* Get the result of the RELEASE command for each of the pending entries
1628+
*/
1629+
snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
1630+
foreach(lc, pending_entries)
1631+
{
1632+
entry = (ConnCacheEntry *) lfirst(lc);
1633+
1634+
Assert(entry->changing_xact_state);
1635+
/*
1636+
* We might already have received the result on the socket, so pass
1637+
* consume_input=true to try to consume it first
1638+
*/
1639+
do_sql_command_end(entry->conn, sql, true);
1640+
entry->changing_xact_state = false;
1641+
1642+
pgfdw_reset_xact_state(entry, false);
1643+
}
1644+
}
1645+
14591646
/*
14601647
* List active foreign server connections.
14611648
*

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.