Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 4f29946

Browse filesBrowse files
committed
worker_spi: Expand set of options to start workers
A couple of new options are added to this module to provide more control on the ways bgworkers are started: - A new GUC called worker_spi.role to control which role to use by default when starting a worker. - worker_spi_launch() gains three arguments: a role OID, a database OID and flags (currently only BGWORKER_BYPASS_ALLOWCONN). By default, the role OID and the database OID are InvalidOid, in which case the worker would use the related GUCs. Workers loaded by shared_preload_libraries use the default values provided by the GUCs, with flags at 0. The options are given to the main bgworker routine through bgw_extra. A test case is tweaked to start two dynamic workers with databases and roles defined by the caller of worker_spi_launch(). These additions will have the advantage of expanding the tests for bgworkers, for at least two cases: - BGWORKER_BYPASS_ALLOWCONN has no coverage in the core tree. - A new bgworker flag is under discussion, and this eases the integration of new tests. Reviewed-by: Bertrand Drouvot Discussion: https://postgr.es/m/bcc36259-7850-4882-97ef-d6b905d2fc51@gmail.com
1 parent c789f0f commit 4f29946
Copy full SHA for 4f29946

File tree

3 files changed

+114
-11
lines changed
Filter options

3 files changed

+114
-11
lines changed

‎src/test/modules/worker_spi/t/001_worker_spi.pl

Copy file name to clipboardExpand all lines: src/test/modules/worker_spi/t/001_worker_spi.pl
+20-9Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020
# This consists in making sure that a table name "counted" is created
2121
# on a new schema whose name includes the index defined in input argument
2222
# of worker_spi_launch().
23-
# By default, dynamic bgworkers connect to the "postgres" database.
23+
# By default, dynamic bgworkers connect to the "postgres" database with
24+
# an undefined role, falling back to the GUC defaults (or InvalidOid for
25+
# worker_spi_launch).
2426
my $result =
2527
$node->safe_psql('postgres', 'SELECT worker_spi_launch(4) IS NOT NULL;');
2628
is($result, 't', "dynamic bgworker launched");
@@ -44,8 +46,7 @@
4446
'postgres',
4547
qq[SELECT wait_event FROM pg_stat_activity WHERE backend_type ~ 'worker_spi';],
4648
qq[WorkerSpiMain]);
47-
is($result, 1,
48-
'dynamic bgworker has reported "WorkerSpiMain" as wait event');
49+
is($result, 1, 'dynamic bgworker has reported "WorkerSpiMain" as wait event');
4950

5051
# Check the wait event used by the dynamic bgworker appears in pg_wait_events
5152
$result = $node->safe_psql('postgres',
@@ -58,6 +59,7 @@
5859
# Create the database first so as the workers can connect to it when
5960
# the library is loaded.
6061
$node->safe_psql('postgres', q(CREATE DATABASE mydb;));
62+
$node->safe_psql('postgres', q(CREATE ROLE myrole SUPERUSER LOGIN;));
6163
$node->safe_psql('mydb', 'CREATE EXTENSION worker_spi;');
6264

6365
# Now load the module as a shared library.
@@ -80,16 +82,25 @@
8082

8183
# Ask worker_spi to launch dynamic bgworkers with the library loaded, then
8284
# check their existence. Use IDs that do not overlap with the schemas created
83-
# by the previous workers.
84-
my $worker1_pid = $node->safe_psql('mydb', 'SELECT worker_spi_launch(10);');
85-
my $worker2_pid = $node->safe_psql('mydb', 'SELECT worker_spi_launch(11);');
85+
# by the previous workers. These ones use a new role, on different databases.
86+
my $myrole_id = $node->safe_psql('mydb',
87+
"SELECT oid FROM pg_roles where rolname = 'myrole';");
88+
my $mydb_id = $node->safe_psql('mydb',
89+
"SELECT oid FROM pg_database where datname = 'mydb';");
90+
my $postgresdb_id = $node->safe_psql('mydb',
91+
"SELECT oid FROM pg_database where datname = 'postgres';");
92+
my $worker1_pid = $node->safe_psql('mydb',
93+
"SELECT worker_spi_launch(10, $mydb_id, $myrole_id);");
94+
my $worker2_pid = $node->safe_psql('mydb',
95+
"SELECT worker_spi_launch(11, $postgresdb_id, $myrole_id);");
8696

8797
ok( $node->poll_query_until(
8898
'mydb',
89-
qq[SELECT datname, count(datname), wait_event FROM pg_stat_activity
99+
qq[SELECT datname, usename, wait_event FROM pg_stat_activity
90100
WHERE backend_type = 'worker_spi dynamic' AND
91-
pid IN ($worker1_pid, $worker2_pid) GROUP BY datname, wait_event;],
92-
'mydb|2|WorkerSpiMain'),
101+
pid IN ($worker1_pid, $worker2_pid) ORDER BY datname;],
102+
qq[mydb|myrole|WorkerSpiMain
103+
postgres|myrole|WorkerSpiMain]),
93104
'dynamic bgworkers all launched'
94105
) or die "Timed out while waiting for dynamic bgworkers to be launched";
95106

‎src/test/modules/worker_spi/worker_spi--1.0.sql

Copy file name to clipboardExpand all lines: src/test/modules/worker_spi/worker_spi--1.0.sql
+5-1Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,11 @@
33
-- complain if script is sourced in psql, rather than via CREATE EXTENSION
44
\echo Use "CREATE EXTENSION worker_spi" to load this file. \quit
55

6-
CREATE FUNCTION worker_spi_launch(pg_catalog.int4)
6+
-- In the default case, dboid and roleoid fall back to their respective GUCs.
7+
CREATE FUNCTION worker_spi_launch(index int4,
8+
dboid oid DEFAULT 0,
9+
roleoid oid DEFAULT 0,
10+
flags text[] DEFAULT '{}')
711
RETURNS pg_catalog.int4 STRICT
812
AS 'MODULE_PATHNAME'
913
LANGUAGE C;

‎src/test/modules/worker_spi/worker_spi.c

Copy file name to clipboardExpand all lines: src/test/modules/worker_spi/worker_spi.c
+89-1Lines changed: 89 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,12 @@
3434

3535
/* these headers are used by this particular worker's code */
3636
#include "access/xact.h"
37+
#include "commands/dbcommands.h"
3738
#include "executor/spi.h"
3839
#include "fmgr.h"
3940
#include "lib/stringinfo.h"
4041
#include "pgstat.h"
42+
#include "utils/acl.h"
4143
#include "utils/builtins.h"
4244
#include "utils/snapmgr.h"
4345
#include "tcop/utility.h"
@@ -52,6 +54,7 @@ PGDLLEXPORT void worker_spi_main(Datum main_arg) pg_attribute_noreturn();
5254
static int worker_spi_naptime = 10;
5355
static int worker_spi_total_workers = 2;
5456
static char *worker_spi_database = NULL;
57+
static char *worker_spi_role = NULL;
5558

5659
/* value cached, fetched from shared memory */
5760
static uint32 worker_spi_wait_event_main = 0;
@@ -138,12 +141,24 @@ worker_spi_main(Datum main_arg)
138141
worktable *table;
139142
StringInfoData buf;
140143
char name[20];
144+
Oid dboid;
145+
Oid roleoid;
146+
char *p;
147+
bits32 flags = 0;
141148

142149
table = palloc(sizeof(worktable));
143150
sprintf(name, "schema%d", index);
144151
table->schema = pstrdup(name);
145152
table->name = pstrdup("counted");
146153

154+
/* fetch database and role OIDs, these are set for a dynamic worker */
155+
p = MyBgworkerEntry->bgw_extra;
156+
memcpy(&dboid, p, sizeof(Oid));
157+
p += sizeof(Oid);
158+
memcpy(&roleoid, p, sizeof(Oid));
159+
p += sizeof(Oid);
160+
memcpy(&flags, p, sizeof(bits32));
161+
147162
/* Establish signal handlers before unblocking signals. */
148163
pqsignal(SIGHUP, SignalHandlerForConfigReload);
149164
pqsignal(SIGTERM, die);
@@ -152,7 +167,11 @@ worker_spi_main(Datum main_arg)
152167
BackgroundWorkerUnblockSignals();
153168

154169
/* Connect to our database */
155-
BackgroundWorkerInitializeConnection(worker_spi_database, NULL, 0);
170+
if (OidIsValid(dboid))
171+
BackgroundWorkerInitializeConnectionByOid(dboid, roleoid, flags);
172+
else
173+
BackgroundWorkerInitializeConnection(worker_spi_database,
174+
worker_spi_role, flags);
156175

157176
elog(LOG, "%s initialized with %s.%s",
158177
MyBgworkerEntry->bgw_name, table->schema, table->name);
@@ -316,6 +335,15 @@ _PG_init(void)
316335
0,
317336
NULL, NULL, NULL);
318337

338+
DefineCustomStringVariable("worker_spi.role",
339+
"Role to connect with.",
340+
NULL,
341+
&worker_spi_role,
342+
NULL,
343+
PGC_SIGHUP,
344+
0,
345+
NULL, NULL, NULL);
346+
319347
if (!process_shared_preload_libraries_in_progress)
320348
return;
321349

@@ -346,6 +374,10 @@ _PG_init(void)
346374

347375
/*
348376
* Now fill in worker-specific data, and do the actual registrations.
377+
*
378+
* bgw_extra can optionally include a dabatase OID, a role OID and a set
379+
* of flags. This is left empty here to fallback to the related GUCs at
380+
* startup (0 for the bgworker flags).
349381
*/
350382
for (int i = 1; i <= worker_spi_total_workers; i++)
351383
{
@@ -364,10 +396,18 @@ Datum
364396
worker_spi_launch(PG_FUNCTION_ARGS)
365397
{
366398
int32 i = PG_GETARG_INT32(0);
399+
Oid dboid = PG_GETARG_OID(1);
400+
Oid roleoid = PG_GETARG_OID(2);
367401
BackgroundWorker worker;
368402
BackgroundWorkerHandle *handle;
369403
BgwHandleStatus status;
370404
pid_t pid;
405+
char *p;
406+
bits32 flags = 0;
407+
ArrayType *arr = PG_GETARG_ARRAYTYPE_P(3);
408+
Size ndim;
409+
int nelems;
410+
Datum *datum_flags;
371411

372412
memset(&worker, 0, sizeof(worker));
373413
worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
@@ -382,6 +422,54 @@ worker_spi_launch(PG_FUNCTION_ARGS)
382422
/* set bgw_notify_pid so that we can use WaitForBackgroundWorkerStartup */
383423
worker.bgw_notify_pid = MyProcPid;
384424

425+
/* extract flags, if any */
426+
ndim = ARR_NDIM(arr);
427+
if (ndim > 1)
428+
ereport(ERROR,
429+
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
430+
errmsg("flags array must be one-dimensional")));
431+
432+
if (array_contains_nulls(arr))
433+
ereport(ERROR,
434+
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
435+
errmsg("flags array must not contain nulls")));
436+
437+
Assert(ARR_ELEMTYPE(arr) == TEXTOID);
438+
deconstruct_array_builtin(arr, TEXTOID, &datum_flags, NULL, &nelems);
439+
440+
for (i = 0; i < nelems; i++)
441+
{
442+
char *optname = TextDatumGetCString(datum_flags[i]);
443+
444+
if (strcmp(optname, "ALLOWCONN") == 0)
445+
flags |= BGWORKER_BYPASS_ALLOWCONN;
446+
else
447+
ereport(ERROR,
448+
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
449+
errmsg("incorrect flag value found in array")));
450+
}
451+
452+
/*
453+
* Register database and role to use for the worker started in bgw_extra.
454+
* If none have been provided, this will fall back to the GUCs at startup.
455+
*/
456+
if (!OidIsValid(dboid))
457+
dboid = get_database_oid(worker_spi_database, false);
458+
459+
/*
460+
* worker_spi_role is NULL by default, so this gives to worker_spi_main()
461+
* an invalid OID in this case.
462+
*/
463+
if (!OidIsValid(roleoid) && worker_spi_role)
464+
roleoid = get_role_oid(worker_spi_role, false);
465+
466+
p = worker.bgw_extra;
467+
memcpy(p, &dboid, sizeof(Oid));
468+
p += sizeof(Oid);
469+
memcpy(p, &roleoid, sizeof(Oid));
470+
p += sizeof(Oid);
471+
memcpy(p, &flags, sizeof(bits32));
472+
385473
if (!RegisterDynamicBackgroundWorker(&worker, &handle))
386474
PG_RETURN_NULL();
387475

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.