Commit 11c2d6f

Parallel Hash Full Join.
Full and right outer joins were not supported in the initial implementation
of Parallel Hash Join because of deadlock hazards (see discussion).
Therefore FULL JOIN inhibited parallelism, as the other join strategies
can't do that in parallel either.

Add a new PHJ phase PHJ_BATCH_SCAN that scans for unmatched tuples on the
inner side of one batch's hash table.  For now, sidestep the deadlock
problem by terminating parallelism there.  The last process to arrive at
that phase emits the unmatched tuples, while others detach and are free to
go and work on other batches, if there are any, but otherwise they finish
the join early.

That unfairness is considered acceptable for now, because it's better than
no parallelism at all.  The build and probe phases are run in parallel, and
the new scan-for-unmatched phase, while serial, is usually applied to the
smaller of the two relations and is either limited by some multiple of
work_mem, or it's too big and is partitioned into batches and then the
situation is improved by batch-level parallelism.

Author: Melanie Plageman <melanieplageman@gmail.com>
Author: Thomas Munro <thomas.munro@gmail.com>
Reviewed-by: Thomas Munro <thomas.munro@gmail.com>
Discussion: https://postgr.es/m/CA%2BhUKG%2BA6ftXPz4oe92%2Bx8Er%2BxpGZqto70-Q_ERwRaSyA%3DafNg%40mail.gmail.com
1 parent ca7b3c4 commit 11c2d6f
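At the heart of the change is the wait-free election held when workers finish probing a batch. The toy program below is illustrative only: the counter-based barrier and the Toy* names are invented, and the real BarrierArriveAndDetachExceptLast() (in src/backend/storage/ipc/barrier.c) keeps the last caller attached rather than counting down to zero. Still, it models the semantics the diffs below rely on: no caller ever blocks, all but one caller detach and are free to work on other batches, and exactly one caller learns that it was last and stays to run the serial scan for unmatched tuples.

    /* toy_election.c -- a toy model of the wait-free election; not
     * PostgreSQL code, all names invented for illustration. */
    #include <pthread.h>
    #include <stdbool.h>
    #include <stdint.h>
    #include <stdio.h>

    #define NWORKERS 4

    typedef struct ToyBarrier
    {
        pthread_mutex_t mutex;
        int             participants;
    } ToyBarrier;

    static ToyBarrier batch_barrier = {PTHREAD_MUTEX_INITIALIZER, NWORKERS};

    /* Returns true only for the last participant to arrive; never waits. */
    static bool
    toy_arrive_and_detach_except_last(ToyBarrier *barrier)
    {
        bool        last;

        pthread_mutex_lock(&barrier->mutex);
        last = (--barrier->participants == 0);
        pthread_mutex_unlock(&barrier->mutex);
        return last;
    }

    static void *
    worker(void *arg)
    {
        int         id = (int) (intptr_t) arg;

        /* ... probe phase: every worker emits matched tuples here ... */
        if (toy_arrive_and_detach_except_last(&batch_barrier))
            printf("worker %d: elected, runs the serial unmatched scan\n", id);
        else
            printf("worker %d: detached, free to work on another batch\n", id);
        return NULL;
    }

    int
    main(void)
    {
        pthread_t   threads[NWORKERS];

        for (intptr_t i = 0; i < NWORKERS; i++)
            pthread_create(&threads[i], NULL, worker, (void *) i);
        for (int i = 0; i < NWORKERS; i++)
            pthread_join(threads[i], NULL);
        return 0;
    }

Built with cc -pthread, exactly one of the four workers reports being elected on each run, though which one varies; the unfairness the commit message accepts is exactly that one worker is left holding the serial scan.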

7 files changed: +323 −48 lines

src/backend/executor/nodeHash.c (+170 −5)

@@ -2071,6 +2071,69 @@ ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
 	hjstate->hj_CurTuple = NULL;
 }
 
+/*
+ * Decide if this process is allowed to run the unmatched scan.  If so, the
+ * batch barrier is advanced to PHJ_BATCH_SCAN and true is returned.
+ * Otherwise the batch is detached and false is returned.
+ */
+bool
+ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate)
+{
+	HashJoinTable hashtable = hjstate->hj_HashTable;
+	int			curbatch = hashtable->curbatch;
+	ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared;
+
+	Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE);
+
+	/*
+	 * It would not be deadlock-free to wait on the batch barrier, because it
+	 * is in PHJ_BATCH_PROBE phase, and thus processes attached to it have
+	 * already emitted tuples.  Therefore, we'll hold a wait-free election:
+	 * only one process can continue to the next phase, and all others detach
+	 * from this batch.  They can still go and work on other batches, if
+	 * there are any.
+	 */
+	if (!BarrierArriveAndDetachExceptLast(&batch->batch_barrier))
+	{
+		/* This process considers the batch to be done. */
+		hashtable->batches[hashtable->curbatch].done = true;
+
+		/* Make sure any temporary files are closed. */
+		sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples);
+		sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);
+
+		/*
+		 * Track largest batch we've seen, which would normally happen in
+		 * ExecHashTableDetachBatch().
+		 */
+		hashtable->spacePeak =
+			Max(hashtable->spacePeak,
+				batch->size + sizeof(dsa_pointer_atomic) * hashtable->nbuckets);
+		hashtable->curbatch = -1;
+		return false;
+	}
+
+	/* Now we are alone with this batch. */
+	Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_SCAN);
+	Assert(BarrierParticipants(&batch->batch_barrier) == 1);
+
+	/*
+	 * Has another process decided to give up early and command all processes
+	 * to skip the unmatched scan?
+	 */
+	if (batch->skip_unmatched)
+	{
+		hashtable->batches[hashtable->curbatch].done = true;
+		ExecHashTableDetachBatch(hashtable);
+		return false;
+	}
+
+	/* Now prepare the process local state, just as for non-parallel join. */
+	ExecPrepHashTableForUnmatched(hjstate);
+
+	return true;
+}
+
 /*
  * ExecScanHashTableForUnmatched
  *		scan the hash table for unmatched inner tuples
@@ -2145,6 +2208,72 @@ ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
 	return false;
 }
 
+/*
+ * ExecParallelScanHashTableForUnmatched
+ *		scan the hash table for unmatched inner tuples, in parallel join
+ *
+ * On success, the inner tuple is stored into hjstate->hj_CurTuple and
+ * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot
+ * for the latter.
+ */
+bool
+ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate,
+									  ExprContext *econtext)
+{
+	HashJoinTable hashtable = hjstate->hj_HashTable;
+	HashJoinTuple hashTuple = hjstate->hj_CurTuple;
+
+	for (;;)
+	{
+		/*
+		 * hj_CurTuple is the address of the tuple last returned from the
+		 * current bucket, or NULL if it's time to start scanning a new
+		 * bucket.
+		 */
+		if (hashTuple != NULL)
+			hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
+		else if (hjstate->hj_CurBucketNo < hashtable->nbuckets)
+			hashTuple = ExecParallelHashFirstTuple(hashtable,
+												   hjstate->hj_CurBucketNo++);
+		else
+			break;				/* finished all buckets */
+
+		while (hashTuple != NULL)
+		{
+			if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
+			{
+				TupleTableSlot *inntuple;
+
+				/* insert hashtable's tuple into exec slot */
+				inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
+												 hjstate->hj_HashTupleSlot,
+												 false);	/* do not pfree */
+				econtext->ecxt_innertuple = inntuple;
+
+				/*
+				 * Reset temp memory each time; although this function doesn't
+				 * do any qual eval, the caller will, so let's keep it
+				 * parallel to ExecScanHashBucket.
+				 */
+				ResetExprContext(econtext);
+
+				hjstate->hj_CurTuple = hashTuple;
+				return true;
+			}
+
+			hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
+		}
+
+		/* allow this loop to be cancellable */
+		CHECK_FOR_INTERRUPTS();
+	}
+
+	/*
+	 * no more unmatched tuples
+	 */
+	return false;
+}
+
 /*
  * ExecHashTableReset
 *
@@ -3088,6 +3217,7 @@ ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable)
 		accessor->shared = shared;
 		accessor->preallocated = 0;
 		accessor->done = false;
+		accessor->outer_eof = false;
 		accessor->inner_tuples =
 			sts_attach(ParallelHashJoinBatchInner(shared),
 					   ParallelWorkerNumber + 1,
@@ -3133,18 +3263,53 @@ ExecHashTableDetachBatch(HashJoinTable hashtable)
 	{
 		int			curbatch = hashtable->curbatch;
 		ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared;
+		bool		attached = true;
 
 		/* Make sure any temporary files are closed. */
 		sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples);
 		sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);
 
-		/* Detach from the batch we were last working on. */
-		if (BarrierArriveAndDetach(&batch->batch_barrier))
+		/* After attaching we always get at least to PHJ_BATCH_PROBE. */
+		Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE ||
+			   BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_SCAN);
+
+		/*
+		 * If we're abandoning the PHJ_BATCH_PROBE phase early without having
+		 * reached the end of it, it means the plan doesn't want any more
+		 * tuples, and it is happy to abandon any tuples buffered in this
+		 * process's subplans.  For correctness, we can't allow any process to
+		 * execute the PHJ_BATCH_SCAN phase, because we will never have the
+		 * complete set of match bits.  Therefore we skip emitting unmatched
+		 * tuples in all backends (if this is a full/right join), as if those
+		 * tuples were all due to be emitted by this process and it has
+		 * abandoned them too.
+		 */
+		if (BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE &&
+			!hashtable->batches[curbatch].outer_eof)
+		{
+			/*
+			 * This flag may be written to by multiple backends during
+			 * PHJ_BATCH_PROBE phase, but will only be read in PHJ_BATCH_SCAN
+			 * phase so requires no extra locking.
+			 */
+			batch->skip_unmatched = true;
+		}
+
+		/*
+		 * Even if we aren't doing a full/right outer join, we'll step through
+		 * the PHJ_BATCH_SCAN phase just to maintain the invariant that
+		 * freeing happens in PHJ_BATCH_FREE, but that'll be wait-free.
+		 */
+		if (BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE)
+			attached = BarrierArriveAndDetachExceptLast(&batch->batch_barrier);
+		if (attached && BarrierArriveAndDetach(&batch->batch_barrier))
 		{
 			/*
-			 * Technically we shouldn't access the barrier because we're no
-			 * longer attached, but since there is no way it's moving after
-			 * this point it seems safe to make the following assertion.
+			 * We are no longer attached to the batch barrier, but we're the
+			 * process that was chosen to free resources and it's safe to
+			 * assert the current phase.  The ParallelHashJoinBatch can't go
+			 * away underneath us while we are attached to the build barrier,
+			 * making this access safe.
 			 */
 			Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_FREE);

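The skip_unmatched handshake added to ExecHashTableDetachBatch() above is deliberately lock-free: any number of backends may set the flag while the batch is still in the probe phase, but it is read only by the single elected scanner after the barrier has advanced, so the barrier's phase change orders all the writes before the read. A minimal sketch of the idea, with invented names rather than PostgreSQL's API:

    /* Toy model of the skip_unmatched handshake; illustrative names only. */
    #include <stdbool.h>
    #include <stdio.h>

    typedef struct ToyBatch
    {
        bool        skip_unmatched; /* written in probe, read in scan */
    } ToyBatch;

    /* Called by any worker abandoning the probe before outer EOF. */
    static void
    toy_abandon_probe(ToyBatch *batch)
    {
        /* Many writers, all writing the same value: safe without a lock,
         * because the single reader only looks after the phase change. */
        batch->skip_unmatched = true;
    }

    /* Called only by the single elected scanner, after the phase change. */
    static bool
    toy_should_scan_unmatched(const ToyBatch *batch)
    {
        return !batch->skip_unmatched;
    }

    int
    main(void)
    {
        ToyBatch    batch = {false};

        toy_abandon_probe(&batch);  /* e.g. a LIMIT stopped a worker early */
        printf("run unmatched scan? %s\n",
               toy_should_scan_unmatched(&batch) ? "yes" : "no");
        return 0;
    }

In the real code the write happens when a backend leaves the probe phase without having seen the end of the outer side (outer_eof is false), for example because a node above the join stopped demanding tuples, and the match bits are therefore incomplete.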
src/backend/executor/nodeHashjoin.c (+54 −27)

@@ -86,6 +86,7 @@
  *  PHJ_BATCH_ALLOCATE*      -- one allocates buckets
  *  PHJ_BATCH_LOAD           -- all load the hash table from disk
  *  PHJ_BATCH_PROBE          -- all probe
+ *  PHJ_BATCH_SCAN*          -- one does full/right unmatched scan
  *  PHJ_BATCH_FREE*          -- one frees memory
  *
  * Batch 0 is a special case, because it starts out in phase
@@ -103,9 +104,10 @@
  * to a barrier, unless the barrier has reached a phase that means that no
  * process will wait on it again.  We emit tuples while attached to the build
  * barrier in phase PHJ_BUILD_RUN, and to a per-batch barrier in phase
- * PHJ_BATCH_PROBE.  These are advanced to PHJ_BUILD_FREE and PHJ_BATCH_FREE
- * respectively without waiting, using BarrierArriveAndDetach().  The last to
- * detach receives a different return value so that it knows that it's safe to
+ * PHJ_BATCH_PROBE.  These are advanced to PHJ_BUILD_FREE and PHJ_BATCH_SCAN
+ * respectively without waiting, using BarrierArriveAndDetach() and
+ * BarrierArriveAndDetachExceptLast() respectively.  The last to detach
+ * receives a different return value so that it knows that it's safe to
  * clean up.  Any straggler process that attaches after that phase is reached
  * will see that it's too late to participate or access the relevant shared
  * memory objects.
@@ -393,8 +395,23 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 				if (HJ_FILL_INNER(node))
 				{
 					/* set up to scan for unmatched inner tuples */
-					ExecPrepHashTableForUnmatched(node);
-					node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+					if (parallel)
+					{
+						/*
+						 * Only one process is currently allowed to handle
+						 * each batch's unmatched tuples, in a parallel
+						 * join.
+						 */
+						if (ExecParallelPrepHashTableForUnmatched(node))
+							node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+						else
+							node->hj_JoinState = HJ_NEED_NEW_BATCH;
+					}
+					else
+					{
+						ExecPrepHashTableForUnmatched(node);
+						node->hj_JoinState = HJ_FILL_INNER_TUPLES;
+					}
 				}
 				else
 					node->hj_JoinState = HJ_NEED_NEW_BATCH;
@@ -487,25 +504,13 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 				{
 					node->hj_MatchedOuter = true;
 
-					if (parallel)
-					{
-						/*
-						 * Full/right outer joins are currently not supported
-						 * for parallel joins, so we don't need to set the
-						 * match bit.  Experiments show that it's worth
-						 * avoiding the shared memory traffic on large
-						 * systems.
-						 */
-						Assert(!HJ_FILL_INNER(node));
-					}
-					else
-					{
-						/*
-						 * This is really only needed if HJ_FILL_INNER(node),
-						 * but we'll avoid the branch and just set it always.
-						 */
+
+					/*
+					 * This is really only needed if HJ_FILL_INNER(node), but
+					 * we'll avoid the branch and just set it always.
+					 */
+					if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple)))
 						HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple));
-					}
 
 					/* In an antijoin, we never return a matched tuple */
 					if (node->js.jointype == JOIN_ANTI)
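Note the shape of the match-bit change in the hunk above: instead of skipping the bit entirely in parallel mode, every backend now sets it, but only after testing HeapTupleHeaderHasMatch() first, so an already-set bit causes no write at all. The sketch below uses a toy type rather than the real tuple-header macros; the point is that the read-before-write pattern leaves an already-set flag's cache line in shared state, avoiding the cross-backend memory traffic that the deleted comment warned about:

    /* Toy model of check-before-set on a shared match flag; not the real
     * tuple header, names invented for illustration. */
    #include <stdatomic.h>
    #include <stdbool.h>
    #include <stdio.h>

    typedef struct ToyTuple
    {
        atomic_bool matched;    /* lives in shared memory in the real thing */
    } ToyTuple;

    static void
    toy_set_match(ToyTuple *tup)
    {
        /*
         * Test first, write second: if another backend already set the flag,
         * we avoid dirtying the shared cache line with a redundant store.
         */
        if (!atomic_load_explicit(&tup->matched, memory_order_relaxed))
            atomic_store_explicit(&tup->matched, true, memory_order_relaxed);
    }

    int
    main(void)
    {
        ToyTuple    tup = {false};

        toy_set_match(&tup);    /* first match: performs the store */
        toy_set_match(&tup);    /* later matches: read-only, no store */
        printf("matched = %d\n", atomic_load(&tup.matched));
        return 0;
    }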
@@ -563,7 +568,8 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 				 * so any unmatched inner tuples in the hashtable have to be
 				 * emitted before we continue to the next batch.
 				 */
-				if (!ExecScanHashTableForUnmatched(node, econtext))
+				if (!(parallel ? ExecParallelScanHashTableForUnmatched(node, econtext)
+					  : ExecScanHashTableForUnmatched(node, econtext)))
 				{
 					/* no more unmatched tuples */
 					node->hj_JoinState = HJ_NEED_NEW_BATCH;
@@ -966,6 +972,8 @@ ExecParallelHashJoinOuterGetTuple(PlanState *outerNode,
 	}
 
 	/* End of this batch */
+	hashtable->batches[curbatch].outer_eof = true;
+
 	return NULL;
 }
 
@@ -1197,13 +1205,32 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
 					 * hash table stays alive until everyone's finished
 					 * probing it, but no participant is allowed to wait at
 					 * this barrier again (or else a deadlock could occur).
-					 * All attached participants must eventually call
-					 * BarrierArriveAndDetach() so that the final phase
-					 * PHJ_BATCH_FREE can be reached.
+					 * All attached participants must eventually detach from
+					 * the barrier and one worker must advance the phase so
+					 * that the final phase is reached.
 					 */
 					ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
 					sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples);
+
 					return true;
+
+				case PHJ_BATCH_SCAN:
+
+					/*
+					 * In principle, we could help scan for unmatched tuples,
+					 * since that phase is already underway (the thing we
+					 * can't do under current deadlock-avoidance rules is wait
+					 * for others to arrive at PHJ_BATCH_SCAN, because
+					 * PHJ_BATCH_PROBE emits tuples, but in this case we just
+					 * got here without waiting).  That is not yet done.  For
+					 * now, we just detach and go around again.  We have to
+					 * use ExecHashTableDetachBatch() because there's a small
+					 * chance we'll be the last to detach, and then we're
+					 * responsible for freeing memory.
+					 */
+					ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
+					hashtable->batches[batchno].done = true;
+					ExecHashTableDetachBatch(hashtable);
+					break;
 
 				case PHJ_BATCH_FREE:
 
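Putting the pieces together, the per-batch life cycle now ends with a serial tail. The toy enum and dispatch function below are a compressed restatement, with invented names, of the PHJ_BATCH_* phase machine described in the comments above, including the new treatment of a straggler that finds a batch already in the scan phase:

    /* Toy mirror of the per-batch phase machine; illustrative only. */
    #include <stdio.h>

    typedef enum ToyBatchPhase
    {
        TOY_BATCH_ELECT,        /* one worker is elected to allocate */
        TOY_BATCH_ALLOCATE,     /* the elected worker allocates buckets */
        TOY_BATCH_LOAD,         /* all workers load tuples from disk */
        TOY_BATCH_PROBE,        /* all probe; tuples are emitted, so no
                                 * worker may wait on the barrier again */
        TOY_BATCH_SCAN,         /* new: exactly one worker emits unmatched
                                 * inner tuples for full/right joins */
        TOY_BATCH_FREE          /* one worker frees shared memory */
    } ToyBatchPhase;

    /* What should a worker arriving at a batch in this phase do? */
    static const char *
    toy_dispatch(ToyBatchPhase phase)
    {
        switch (phase)
        {
            case TOY_BATCH_ELECT:
            case TOY_BATCH_ALLOCATE:
            case TOY_BATCH_LOAD:
                return "join in: help build/load this batch";
            case TOY_BATCH_PROBE:
                return "join in: help probe this batch";
            case TOY_BATCH_SCAN:
                return "too late: one worker owns the scan; detach, move on";
            case TOY_BATCH_FREE:
                return "too late: batch is done; try another batch";
        }
        return "unreachable";
    }

    int
    main(void)
    {
        for (ToyBatchPhase p = TOY_BATCH_ELECT; p <= TOY_BATCH_FREE; p++)
            printf("%d: %s\n", (int) p, toy_dispatch(p));
        return 0;
    }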
src/backend/optimizer/path/joinpath.c (+6 −8)

@@ -2193,15 +2193,9 @@ hash_inner_and_outer(PlannerInfo *root,
 	 * able to properly guarantee uniqueness.  Similarly, we can't handle
 	 * JOIN_FULL and JOIN_RIGHT, because they can produce false null
 	 * extended rows.  Also, the resulting path must not be parameterized.
-	 * We would be able to support JOIN_FULL and JOIN_RIGHT for Parallel
-	 * Hash, since in that case we're back to a single hash table with a
-	 * single set of match bits for each batch, but that will require
-	 * figuring out a deadlock-free way to wait for the probe to finish.
 	 */
 	if (joinrel->consider_parallel &&
 		save_jointype != JOIN_UNIQUE_OUTER &&
-		save_jointype != JOIN_FULL &&
-		save_jointype != JOIN_RIGHT &&
 		outerrel->partial_pathlist != NIL &&
 		bms_is_empty(joinrel->lateral_relids))
 	{
@@ -2235,9 +2229,13 @@
 			 * total inner path will also be parallel-safe, but if not, we'll
 			 * have to search for the cheapest safe, unparameterized inner
 			 * path.  If doing JOIN_UNIQUE_INNER, we can't use any alternative
-			 * inner path.
+			 * inner path.  If full or right join, we can't use parallelism
+			 * (building the hash table in each backend) because no one
+			 * process has all the match bits.
 			 */
-			if (cheapest_total_inner->parallel_safe)
+			if (save_jointype == JOIN_FULL || save_jointype == JOIN_RIGHT)
+				cheapest_safe_inner = NULL;
+			else if (cheapest_total_inner->parallel_safe)
 				cheapest_safe_inner = cheapest_total_inner;
 			else if (save_jointype != JOIN_UNIQUE_INNER)
 				cheapest_safe_inner =