@@ -2071,6 +2071,69 @@ ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
 	hjstate->hj_CurTuple = NULL;
 }
 
+/*
+ * Decide if this process is allowed to run the unmatched scan.  If so, the
+ * batch barrier is advanced to PHJ_BATCH_SCAN and true is returned.
+ * Otherwise the batch is detached and false is returned.
+ */
+bool
+ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate)
+{
+	HashJoinTable hashtable = hjstate->hj_HashTable;
+	int			curbatch = hashtable->curbatch;
+	ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared;
+
+	Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE);
+
+	/*
+	 * It would not be deadlock-free to wait on the batch barrier, because it
+	 * is in PHJ_BATCH_PROBE phase, and thus processes attached to it have
+	 * already emitted tuples.  Therefore, we'll hold a wait-free election:
+	 * only one process can continue to the next phase, and all others detach
+	 * from this batch.  They can still do any work on other batches, if there
+	 * are any.
+	 */
+	if (!BarrierArriveAndDetachExceptLast(&batch->batch_barrier))
+	{
+		/* This process considers the batch to be done. */
+		hashtable->batches[hashtable->curbatch].done = true;
+
+		/* Make sure any temporary files are closed. */
+		sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples);
+		sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);
+
+		/*
+		 * Track the largest batch we've seen, which would normally happen in
+		 * ExecHashTableDetachBatch().
+		 */
+		hashtable->spacePeak =
+			Max(hashtable->spacePeak,
+				batch->size + sizeof(dsa_pointer_atomic) * hashtable->nbuckets);
+		hashtable->curbatch = -1;
+		return false;
+	}
+
+	/* Now we are alone with this batch. */
+	Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_SCAN);
+	Assert(BarrierParticipants(&batch->batch_barrier) == 1);
+
+	/*
+	 * Has another process decided to give up early and command all processes
+	 * to skip the unmatched scan?
+	 */
+	if (batch->skip_unmatched)
+	{
+		hashtable->batches[hashtable->curbatch].done = true;
+		ExecHashTableDetachBatch(hashtable);
+		return false;
+	}
+
+	/* Now prepare the process local state, just as for non-parallel join. */
+	ExecPrepHashTableForUnmatched(hjstate);
+
+	return true;
+}
+
 /*
  * ExecScanHashTableForUnmatched
  *		scan the hash table for unmatched inner tuples
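
A note on the primitive behind this wait-free election: BarrierArriveAndDetachExceptLast() detaches every arriving process except the last, returning false, while the last attached process advances the barrier one phase, stays attached, and returns true, so exactly one backend proceeds to PHJ_BATCH_SCAN and nobody ever blocks. Below is a minimal standalone sketch of that semantics using C11 atomics; MiniBarrier and mini_arrive_and_detach_except_last are invented names, and the real primitive in barrier.c does the equivalent bookkeeping under the barrier's spinlock rather than with a CAS loop.

/*
 * Sketch only, not PostgreSQL code: a model of the wait-free election
 * performed by BarrierArriveAndDetachExceptLast().  Every caller except
 * the last detaches and loses; the last remaining participant advances
 * the phase and stays attached.
 */
#include <stdatomic.h>
#include <stdbool.h>

typedef struct MiniBarrier
{
	atomic_int	participants;	/* processes currently attached */
	atomic_int	phase;			/* e.g. PROBE -> SCAN */
} MiniBarrier;

static bool
mini_arrive_and_detach_except_last(MiniBarrier *b)
{
	int			n = atomic_load(&b->participants);

	/* While others remain attached, try to detach and lose the election. */
	while (n > 1)
	{
		if (atomic_compare_exchange_weak(&b->participants, &n, n - 1))
			return false;		/* detached without ever waiting */
	}

	/* We are the last attached participant: advance phase, stay attached. */
	atomic_fetch_add(&b->phase, 1);
	return true;
}

The losers simply return to the executor and can pick up other batches; only the winner carries on into the unmatched scan.
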
@@ -2145,6 +2208,72 @@ ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
 	return false;
 }
 
+/*
+ * ExecParallelScanHashTableForUnmatched
+ *		scan the hash table for unmatched inner tuples, in parallel join
+ *
+ * On success, the inner tuple is stored into hjstate->hj_CurTuple and
+ * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot
+ * for the latter.
+ */
+bool
+ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate,
+									  ExprContext *econtext)
+{
+	HashJoinTable hashtable = hjstate->hj_HashTable;
+	HashJoinTuple hashTuple = hjstate->hj_CurTuple;
+
+	for (;;)
+	{
+		/*
+		 * hj_CurTuple is the address of the tuple last returned from the
+		 * current bucket, or NULL if it's time to start scanning a new
+		 * bucket.
+		 */
+		if (hashTuple != NULL)
+			hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
+		else if (hjstate->hj_CurBucketNo < hashtable->nbuckets)
+			hashTuple = ExecParallelHashFirstTuple(hashtable,
+												   hjstate->hj_CurBucketNo++);
+		else
+			break;				/* finished all buckets */
+
+		while (hashTuple != NULL)
+		{
+			if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
+			{
+				TupleTableSlot *inntuple;
+
+				/* insert hashtable's tuple into exec slot */
+				inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
+												 hjstate->hj_HashTupleSlot,
+												 false);	/* do not pfree */
+				econtext->ecxt_innertuple = inntuple;
+
+				/*
+				 * Reset temp memory each time; although this function doesn't
+				 * do any qual eval, the caller will, so let's keep it
+				 * parallel to ExecScanHashBucket.
+				 */
+				ResetExprContext(econtext);
+
+				hjstate->hj_CurTuple = hashTuple;
+				return true;
+			}
+
+			hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
+		}
+
+		/* allow this loop to be cancellable */
+		CHECK_FOR_INTERRUPTS();
+	}
+
+	/*
+	 * no more unmatched tuples
+	 */
+	return false;
+}
+
 /*
  * ExecHashTableReset
  *
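
The function above mirrors the serial ExecScanHashTableForUnmatched(): resume within the current bucket chain if there is one, otherwise claim the next bucket, and return only tuples whose match bit was never set during probing. Here is a self-contained toy model of that control flow; ToyTuple, ToyHash and toy_scan_for_unmatched are invented stand-ins, whereas the real code reads the bit via HeapTupleHeaderHasMatch() on the minimal tuple and hands results back through a slot.

/*
 * Toy model only, not PostgreSQL code: walk every bucket and return each
 * tuple whose "matched" flag was never set by the probe phase.  The cursor
 * fields play the roles of hj_CurBucketNo and hj_CurTuple.
 */
#include <stdbool.h>
#include <stddef.h>

typedef struct ToyTuple
{
	int			key;
	bool		matched;		/* set during the probe phase */
	struct ToyTuple *next;		/* bucket chain link */
} ToyTuple;

typedef struct ToyHash
{
	ToyTuple  **buckets;
	int			nbuckets;
	int			curbucket;		/* like hj_CurBucketNo */
	ToyTuple   *curtuple;		/* like hj_CurTuple */
} ToyHash;

/* Return the next unmatched tuple, or NULL once all buckets are done. */
static ToyTuple *
toy_scan_for_unmatched(ToyHash *ht)
{
	for (;;)
	{
		/* Resume mid-chain, claim the next bucket, or finish. */
		if (ht->curtuple != NULL)
			ht->curtuple = ht->curtuple->next;
		else if (ht->curbucket < ht->nbuckets)
			ht->curtuple = ht->buckets[ht->curbucket++];
		else
			return NULL;

		while (ht->curtuple != NULL)
		{
			if (!ht->curtuple->matched)
				return ht->curtuple;	/* caller emits, then calls again */
			ht->curtuple = ht->curtuple->next;
		}
	}
}

Keeping the cursor in caller-visible state (hjstate in the real code) is what lets the executor pull one unmatched tuple at a time across calls.
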
@@ -3088,6 +3217,7 @@ ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable)
 		accessor->shared = shared;
 		accessor->preallocated = 0;
 		accessor->done = false;
+		accessor->outer_eof = false;
 		accessor->inner_tuples =
 			sts_attach(ParallelHashJoinBatchInner(shared),
 					   ParallelWorkerNumber + 1,
@@ -3133,18 +3263,53 @@ ExecHashTableDetachBatch(HashJoinTable hashtable)
 	{
 		int			curbatch = hashtable->curbatch;
 		ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared;
+		bool		attached = true;
 
 		/* Make sure any temporary files are closed. */
 		sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples);
 		sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);
 
-		/* Detach from the batch we were last working on. */
-		if (BarrierArriveAndDetach(&batch->batch_barrier))
+		/* After attaching we always get at least to PHJ_BATCH_PROBE. */
+		Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE ||
+			   BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_SCAN);
+
+		/*
+		 * If we're abandoning the PHJ_BATCH_PROBE phase early without having
+		 * reached the end of it, it means the plan doesn't want any more
+		 * tuples, and it is happy to abandon any tuples buffered in this
+		 * process's subplans.  For correctness, we can't allow any process to
+		 * execute the PHJ_BATCH_SCAN phase, because we will never have the
+		 * complete set of match bits.  Therefore we skip emitting unmatched
+		 * tuples in all backends (if this is a full/right join), as if those
+		 * tuples were all due to be emitted by this process and it has
+		 * abandoned them too.
+		 */
+		if (BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE &&
+			!hashtable->batches[curbatch].outer_eof)
+		{
+			/*
+			 * This flag may be written to by multiple backends during
+			 * PHJ_BATCH_PROBE phase, but will only be read in PHJ_BATCH_SCAN
+			 * phase so requires no extra locking.
+			 */
+			batch->skip_unmatched = true;
+		}
+
+		/*
+		 * Even if we aren't doing a full/right outer join, we'll step through
+		 * the PHJ_BATCH_SCAN phase just to maintain the invariant that
+		 * freeing happens in PHJ_BATCH_FREE, but that'll be wait-free.
+		 */
+		if (BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE)
+			attached = BarrierArriveAndDetachExceptLast(&batch->batch_barrier);
+		if (attached && BarrierArriveAndDetach(&batch->batch_barrier))
 		{
 			/*
-			 * Technically we shouldn't access the barrier because we're no
-			 * longer attached, but since there is no way it's moving after
-			 * this point it seems safe to make the following assertion.
+			 * We are no longer attached to the batch barrier, but we're the
+			 * process that was chosen to free resources and it's safe to
+			 * assert the current phase.  The ParallelHashJoinBatch can't go
+			 * away underneath us while we are attached to the build barrier,
+			 * making this access safe.
 			 */
 			Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_FREE);
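
Taken together, the detach path now implements a small protocol: a process abandoning PHJ_BATCH_PROBE before exhausting its outer scan first poisons the batch with skip_unmatched (no single backend will ever hold the complete set of match bits), then steps through PHJ_BATCH_SCAN wait-free so that exactly one process still reaches PHJ_BATCH_FREE to free shared memory. A sequential toy model of that decision follows; ToyBatch and the toy_* helpers are invented for illustration, and the real code gets its concurrency guarantees from the Barrier primitives rather than plain fields.

/*
 * Sequential sketch, not PostgreSQL code: the decision structure of
 * ExecHashTableDetachBatch() in this diff, with concurrency elided.
 */
#include <stdbool.h>

typedef enum { PROBE, SCAN, FREE } ToyPhase;

typedef struct ToyBatch
{
	ToyPhase	phase;
	int			attached;		/* participants still on the barrier */
	bool		skip_unmatched;
} ToyBatch;

/* Like BarrierArriveAndDetachExceptLast(): last one advances and stays. */
static bool
toy_detach_except_last(ToyBatch *b)
{
	if (b->attached > 1)
	{
		b->attached--;
		return false;
	}
	b->phase = SCAN;
	return true;
}

/* Like BarrierArriveAndDetach(): last one out advances SCAN -> FREE. */
static bool
toy_arrive_and_detach(ToyBatch *b)
{
	if (--b->attached > 0)
		return false;
	b->phase = FREE;
	return true;
}

/* Returns true if this process was elected to free the batch. */
static bool
toy_detach_batch(ToyBatch *b, bool outer_eof)
{
	bool		attached = true;

	/* Abandoning PROBE early?  Then nobody may run the unmatched scan. */
	if (b->phase == PROBE && !outer_eof)
		b->skip_unmatched = true;

	/* Step through SCAN wait-free, then detach for real. */
	if (b->phase == PROBE)
		attached = toy_detach_except_last(b);
	return attached && toy_arrive_and_detach(b);
}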