Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 418611c

Browse filesBrowse files
committed
Generalize parallel slot result handling.
Instead of having a hard-coded behavior that we ignore missing tables and report all other errors, let the caller decide what to do by setting a callback. Mark Dilger, reviewed and somewhat revised by me. The larger patch series of which this is a part has also had review from Peter Geoghegan, Andres Freund, Álvaro Herrera, Michael Paquier, and Amul Sul, but I don't know whether any of them have reviewed this bit specifically. Discussion: http://postgr.es/m/12ED3DA8-25F0-4B68-937D-D907CFBF08E7@enterprisedb.com Discussion: http://postgr.es/m/5F743835-3399-419C-8324-2D424237E999@enterprisedb.com Discussion: http://postgr.es/m/70655DF3-33CE-4527-9A4D-DDEB582B6BA0@enterprisedb.com
1 parent e955bd4 commit 418611c
Copy full SHA for 418611c

File tree

Expand file treeCollapse file tree

4 files changed

+94
-28
lines changed
Filter options
Expand file treeCollapse file tree

4 files changed

+94
-28
lines changed

‎src/bin/scripts/reindexdb.c

Copy file name to clipboardExpand all lines: src/bin/scripts/reindexdb.c
+1Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -466,6 +466,7 @@ reindex_one_database(const ConnParams *cparams, ReindexType type,
466466
goto finish;
467467
}
468468

469+
ParallelSlotSetHandler(free_slot, TableCommandResultHandler, NULL);
469470
run_reindex_command(free_slot->connection, process_type, objname,
470471
echo, verbose, concurrently, true);
471472

‎src/bin/scripts/vacuumdb.c

Copy file name to clipboardExpand all lines: src/bin/scripts/vacuumdb.c
+1Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -713,6 +713,7 @@ vacuum_one_database(const ConnParams *cparams,
713713
* Execute the vacuum. All errors are handled in processQueryResult
714714
* through ParallelSlotsGetIdle.
715715
*/
716+
ParallelSlotSetHandler(free_slot, TableCommandResultHandler, NULL);
716717
run_vacuum_command(free_slot->connection, sql.data,
717718
echo, tabname);
718719

‎src/fe_utils/parallel_slot.c

Copy file name to clipboardExpand all lines: src/fe_utils/parallel_slot.c
+63-28Lines changed: 63 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -30,42 +30,32 @@
3030

3131
static void init_slot(ParallelSlot *slot, PGconn *conn);
3232
static int select_loop(int maxFd, fd_set *workerset);
33-
static bool processQueryResult(PGconn *conn, PGresult *result);
33+
static bool processQueryResult(ParallelSlot *slot, PGresult *result);
3434

3535
static void
3636
init_slot(ParallelSlot *slot, PGconn *conn)
3737
{
3838
slot->connection = conn;
3939
/* Initially assume connection is idle */
4040
slot->isFree = true;
41+
ParallelSlotClearHandler(slot);
4142
}
4243

4344
/*
44-
* Process (and delete) a query result. Returns true if there's no error,
45-
* false otherwise -- but errors about trying to work on a missing relation
46-
* are reported and subsequently ignored.
45+
* Process (and delete) a query result. Returns true if there's no problem,
46+
* false otherwise. It's up to the handler to decide what cosntitutes a
47+
* problem.
4748
*/
4849
static bool
49-
processQueryResult(PGconn *conn, PGresult *result)
50+
processQueryResult(ParallelSlot *slot, PGresult *result)
5051
{
51-
/*
52-
* If it's an error, report it. Errors about a missing table are harmless
53-
* so we continue processing; but die for other errors.
54-
*/
55-
if (PQresultStatus(result) != PGRES_COMMAND_OK)
56-
{
57-
char *sqlState = PQresultErrorField(result, PG_DIAG_SQLSTATE);
52+
Assert(slot->handler != NULL);
5853

59-
pg_log_error("processing of database \"%s\" failed: %s",
60-
PQdb(conn), PQerrorMessage(conn));
61-
62-
if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0)
63-
{
64-
PQclear(result);
65-
return false;
66-
}
67-
}
54+
/* On failure, the handler should return NULL after freeing the result */
55+
if (!slot->handler(result, slot->connection, slot->handler_context))
56+
return false;
6857

58+
/* Ok, we have to free it ourself */
6959
PQclear(result);
7060
return true;
7161
}
@@ -76,15 +66,15 @@ processQueryResult(PGconn *conn, PGresult *result)
7666
* Note that this will block if the connection is busy.
7767
*/
7868
static bool
79-
consumeQueryResult(PGconn *conn)
69+
consumeQueryResult(ParallelSlot *slot)
8070
{
8171
bool ok = true;
8272
PGresult *result;
8373

84-
SetCancelConn(conn);
85-
while ((result = PQgetResult(conn)) != NULL)
74+
SetCancelConn(slot->connection);
75+
while ((result = PQgetResult(slot->connection)) != NULL)
8676
{
87-
if (!processQueryResult(conn, result))
77+
if (!processQueryResult(slot, result))
8878
ok = false;
8979
}
9080
ResetCancelConn();
@@ -227,14 +217,15 @@ ParallelSlotsGetIdle(ParallelSlot *slots, int numslots)
227217

228218
if (result != NULL)
229219
{
230-
/* Check and discard the command result */
231-
if (!processQueryResult(slots[i].connection, result))
220+
/* Handle and discard the command result */
221+
if (!processQueryResult(slots + i, result))
232222
return NULL;
233223
}
234224
else
235225
{
236226
/* This connection has become idle */
237227
slots[i].isFree = true;
228+
ParallelSlotClearHandler(slots + i);
238229
if (firstFree < 0)
239230
firstFree = i;
240231
break;
@@ -329,8 +320,52 @@ ParallelSlotsWaitCompletion(ParallelSlot *slots, int numslots)
329320

330321
for (i = 0; i < numslots; i++)
331322
{
332-
if (!consumeQueryResult((slots + i)->connection))
323+
if (!consumeQueryResult(slots + i))
324+
return false;
325+
}
326+
327+
return true;
328+
}
329+
330+
/*
331+
* TableCommandResultHandler
332+
*
333+
* ParallelSlotResultHandler for results of commands (not queries) against
334+
* tables.
335+
*
336+
* Requires that the result status is either PGRES_COMMAND_OK or an error about
337+
* a missing table. This is useful for utilities that compile a list of tables
338+
* to process and then run commands (vacuum, reindex, or whatever) against
339+
* those tables, as there is a race condition between the time the list is
340+
* compiled and the time the command attempts to open the table.
341+
*
342+
* For missing tables, logs an error but allows processing to continue.
343+
*
344+
* For all other errors, logs an error and terminates further processing.
345+
*
346+
* res: PGresult from the query executed on the slot's connection
347+
* conn: connection belonging to the slot
348+
* context: unused
349+
*/
350+
bool
351+
TableCommandResultHandler(PGresult *res, PGconn *conn, void *context)
352+
{
353+
/*
354+
* If it's an error, report it. Errors about a missing table are harmless
355+
* so we continue processing; but die for other errors.
356+
*/
357+
if (PQresultStatus(res) != PGRES_COMMAND_OK)
358+
{
359+
char *sqlState = PQresultErrorField(res, PG_DIAG_SQLSTATE);
360+
361+
pg_log_error("processing of database \"%s\" failed: %s",
362+
PQdb(conn), PQerrorMessage(conn));
363+
364+
if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0)
365+
{
366+
PQclear(res);
333367
return false;
368+
}
334369
}
335370

336371
return true;

‎src/include/fe_utils/parallel_slot.h

Copy file name to clipboardExpand all lines: src/include/fe_utils/parallel_slot.h
+29Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,39 @@
1515
#include "fe_utils/connect_utils.h"
1616
#include "libpq-fe.h"
1717

18+
typedef bool (*ParallelSlotResultHandler) (PGresult *res, PGconn *conn,
19+
void *context);
20+
1821
typedef struct ParallelSlot
1922
{
2023
PGconn *connection; /* One connection */
2124
bool isFree; /* Is it known to be idle? */
25+
26+
/*
27+
* Prior to issuing a command or query on 'connection', a handler callback
28+
* function may optionally be registered to be invoked to process the
29+
* results, and context information may optionally be registered for use
30+
* by the handler. If unset, these fields should be NULL.
31+
*/
32+
ParallelSlotResultHandler handler;
33+
void *handler_context;
2234
} ParallelSlot;
2335

36+
static inline void
37+
ParallelSlotSetHandler(ParallelSlot *slot, ParallelSlotResultHandler handler,
38+
void *context)
39+
{
40+
slot->handler = handler;
41+
slot->handler_context = context;
42+
}
43+
44+
static inline void
45+
ParallelSlotClearHandler(ParallelSlot *slot)
46+
{
47+
slot->handler = NULL;
48+
slot->handler_context = NULL;
49+
}
50+
2451
extern ParallelSlot *ParallelSlotsGetIdle(ParallelSlot *slots, int numslots);
2552

2653
extern ParallelSlot *ParallelSlotsSetup(const ConnParams *cparams,
@@ -31,5 +58,7 @@ extern void ParallelSlotsTerminate(ParallelSlot *slots, int numslots);
3158

3259
extern bool ParallelSlotsWaitCompletion(ParallelSlot *slots, int numslots);
3360

61+
extern bool TableCommandResultHandler(PGresult *res, PGconn *conn,
62+
void *context);
3463

3564
#endif /* PARALLEL_SLOT_H */

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.