Parallel DEDUP_SEMI and DEDUP_SEMI_REVERSE Join.
To handle semi joins in parallel, we have already enabled Parallel Semi
Join. This commit enables DEDUP_SEMI and DEDUP_SEMI_REVERSE joins in
parallel as well, to handle semi join cases where that could be a win in
MPP mode.

Both parallel-oblivious and parallel-aware paths are enabled. An example
of the parallel-aware case:

select * from foo where exists (select 1 from bar where foo.a = bar.b);
                                QUERY PLAN
---------------------------------------------------------------------------
 Gather Motion 6:1  (slice1; segments: 6)
   ->  HashAggregate
         Group Key: (RowIdExpr)
         ->  Redistribute Motion 6:6  (slice2; segments: 6)
               Hash Key: (RowIdExpr)
               Hash Module: 3
               ->  Parallel Hash Join
                     Hash Cond: (bar.b = foo.a)
                     ->  Parallel Seq Scan on bar
                     ->  Parallel Hash
                            ->  Broadcast Workers Motion 6:6  (slice3; segments: 6)
                                 ->  Parallel Seq Scan on foo
 Optimizer: Postgres query optimizer
(13 rows)

For a DEDUP_SEMI or DEDUP_SEMI_REVERSE join, each process needs a unique
RowIdExpr to identify unique rows, which is assigned a baseline when
building paths.

That is fine for a non-parallel plan, but in parallel mode there are
multiple processes on the same segment, so the RowIdExpr is no longer
unique. To handle that, add ParallelWorkerNumberOfSlice to identify the
worker id within a slice of a parallel plan.

A rowidexpr keeps its row counter in the low 48 bits of a 64-bit integer
and leaves the upper 16 bits for the segment id. In parallel mode we have
to make room for the parallel worker id within those 16 bits. This is
checked during planning, in case there are so many segments with so many
parallel workers that they no longer fit (which rarely happens).
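
For illustration, here is a minimal standalone sketch of that bit layout.
It is not the committed code: the dbid, segment count, and worker values
are made-up examples, and leftmost_one_pos32() stands in for the backend's
pg_leftmost_one_pos32(); the real executor uses GpIdentity.dbid,
getgpsegmentCount(), and the new *OfSlice globals shown in the diff below.

/*
 * Sketch only: the 64-bit rowid is laid out as
 *   [ segment dbid | parallel worker id | 48-bit row counter ]
 */
#include <stdint.h>
#include <stdio.h>

/* local stand-in for pg_leftmost_one_pos32() */
static int leftmost_one_pos32(uint32_t w) { int p = -1; while (w) { w >>= 1; p++; } return p; }

int main(void)
{
    int dbid = 3;        /* example GpIdentity.dbid of this segment */
    int segs = 6;        /* example getgpsegmentCount() */
    int nworkers = 4;    /* example TotalParallelWorkerNumberOfSlice */
    int worker_id = 2;   /* example ParallelWorkerNumberOfSlice */

    int parallel_bits = leftmost_one_pos32(nworkers) + 1;
    int seg_bits = leftmost_one_pos32(segs) + 1;

    /* Planner-side check: worker id and segment id must fit in the upper 16 bits. */
    if (parallel_bits + seg_bits > 16)
        return 1;   /* the planner would fall back (goto fail) instead */

    /* Executor init: the dbid baseline sits above the worker-id bits. */
    int64_t rowcounter = ((int64_t) dbid) << (48 + parallel_bits);

    /* Per row: bump the 48-bit counter and stamp in the worker id. */
    rowcounter++;
    rowcounter |= ((int64_t) worker_id) << 48;

    printf("rowid = 0x%016llx\n", (unsigned long long) rowcounter);
    return 0;
}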

Authored-by: Zhang Mingli [email protected]
avamingli committed Oct 1, 2024
1 parent 5f180c5 commit f32844c
Showing 13 changed files with 376 additions and 14 deletions.
8 changes: 7 additions & 1 deletion src/backend/access/transam/parallel.c
@@ -142,6 +142,8 @@ typedef struct FixedParallelState
* worker will get a different parallel worker number.
*/
int ParallelWorkerNumber = -1;
int ParallelWorkerNumberOfSlice = -1;
int TotalParallelWorkerNumberOfSlice = 0;

/* Is there a parallel message pending which we need to receive? */
volatile bool ParallelMessagePending = false;
@@ -1746,6 +1748,8 @@ void GpDestroyParallelDSMEntry()
ParallelSession->area = NULL;
}
LWLockRelease(GpParallelDSMHashLock);
ParallelWorkerNumberOfSlice = -1;
TotalParallelWorkerNumberOfSlice = 0;
}

void
@@ -1835,6 +1839,8 @@ GpInsertParallelDSMHash(PlanState *planstate)
entry->tolaunch = parallel_workers - 1;
entry->parallel_workers = parallel_workers;
entry->temp_worker_id = 0;
ParallelWorkerNumberOfSlice = 0; /* The first worker. */
Assert(TotalParallelWorkerNumberOfSlice == parallel_workers);

/* Create a DSA area that can be used by the leader and all workers. */
char *area_space = shm_toc_allocate(entry->toc, dsa_minsize);
@@ -1894,7 +1900,7 @@ GpInsertParallelDSMHash(PlanState *planstate)
.nworkers = parallel_workers,
.worker_id = entry->temp_worker_id,
};

ParallelWorkerNumberOfSlice = ctx.worker_id;
InitializeGpParallelWorkers(planstate, &ctx);
}

111 changes: 107 additions & 4 deletions src/backend/cdb/cdbpath.c
@@ -44,6 +44,7 @@
#include "cdb/cdbutil.h"
#include "cdb/cdbvars.h"

#include "port/pg_bitutils.h"

typedef struct
{
@@ -2998,6 +2999,8 @@ cdbpath_motion_for_parallel_join(PlannerInfo *root,
CdbPathLocus_MakeNull(&inner.move_to);
outer.isouter = true;
inner.isouter = false;
int outerParallel = outer.locus.parallel_workers;
int innerParallel = inner.locus.parallel_workers;

Assert(cdbpathlocus_is_valid(outer.locus));
Assert(cdbpathlocus_is_valid(inner.locus));
@@ -3091,6 +3094,9 @@ cdbpath_motion_for_parallel_join(PlannerInfo *root,
case JOIN_INNER:
break;
case JOIN_SEMI:
if (!enable_parallel_semi_join)
goto fail;
/* FALLTHROUGH */
case JOIN_ANTI:
case JOIN_LEFT:
case JOIN_LASJ_NOTIN:
@@ -3100,19 +3106,116 @@ cdbpath_motion_for_parallel_join(PlannerInfo *root,
case JOIN_UNIQUE_INNER:
case JOIN_RIGHT:
case JOIN_FULL:
case JOIN_DEDUP_SEMI:
case JOIN_DEDUP_SEMI_REVERSE:
/* Join types are not supported in parallel yet. */
goto fail;
case JOIN_DEDUP_SEMI:
if (!enable_parallel_dedup_semi_join)
goto fail;

if (!CdbPathLocus_IsPartitioned(inner.locus))
goto fail;

if (CdbPathLocus_IsPartitioned(outer.locus) ||
CdbPathLocus_IsBottleneck(outer.locus))
{
/* ok */
}
else if (CdbPathLocus_IsGeneral(outer.locus))
{
CdbPathLocus_MakeSingleQE(&outer.locus,
CdbPathLocus_NumSegments(inner.locus));
outer.path->locus = outer.locus;
}
else if (CdbPathLocus_IsSegmentGeneral(outer.locus))
{
CdbPathLocus_MakeSingleQE(&outer.locus,
CdbPathLocus_CommonSegments(inner.locus,
outer.locus));
outer.path->locus = outer.locus;
}
else if (CdbPathLocus_IsSegmentGeneralWorkers(outer.locus))
{
/* CBDB_PARALLEL_FIXME: Consider gather from SegmentGeneralWorkers. */
goto fail;
}
else
goto fail;
inner.ok_to_replicate = false;

/*
 * CBDB_PARALLEL:
 * A rowidexpr keeps its row counter in the low 48 bits of a 64-bit int.
 * In parallel mode we must check that the upper 16 bits can hold both
 * the segment id and the parallel worker id, i.e. that
 * parallel_bits + seg_bits <= 16, where segs is the number of primary
 * segments. This keeps enough room to guarantee that no duplicated
 * row ids are produced at execution time.
 */
if (outerParallel > 1)
{
int segs = getgpsegmentCount();
int parallel_bits = pg_leftmost_one_pos32(outerParallel) + 1;
int seg_bits = pg_leftmost_one_pos32(segs) + 1;
if (parallel_bits + seg_bits > 16)
goto fail;
}
outer.path = add_rowid_to_path(root, outer.path, p_rowidexpr_id);
*p_outer_path = outer.path;
break;

case JOIN_DEDUP_SEMI_REVERSE:
if (!enable_parallel_dedup_semi_reverse_join)
goto fail;
/* same as JOIN_DEDUP_SEMI, but with inner and outer reversed */
if (!CdbPathLocus_IsPartitioned(outer.locus))
goto fail;
if (CdbPathLocus_IsPartitioned(inner.locus) ||
CdbPathLocus_IsBottleneck(inner.locus))
{
/* ok */
}
else if (CdbPathLocus_IsGeneral(inner.locus))
{
CdbPathLocus_MakeSingleQE(&inner.locus,
CdbPathLocus_NumSegments(outer.locus));
inner.path->locus = inner.locus;
}
else if (CdbPathLocus_IsSegmentGeneral(inner.locus))
{
CdbPathLocus_MakeSingleQE(&inner.locus,
CdbPathLocus_CommonSegments(outer.locus,
inner.locus));
inner.path->locus = inner.locus;
}
else if (CdbPathLocus_IsSegmentGeneralWorkers(inner.locus))
{
/* CBDB_PARALLEL_FIXME: Consider gather from SegmentGeneralWorkers. */
goto fail;
}
else
goto fail;
outer.ok_to_replicate = false;
if (innerParallel > 1)
{
int segs = getgpsegmentCount();
int parallel_bits = pg_leftmost_one_pos32(innerParallel) + 1;
int seg_bits = pg_leftmost_one_pos32(segs) + 1;
if (parallel_bits + seg_bits > 16)
goto fail;
}
inner.path = add_rowid_to_path(root, inner.path, p_rowidexpr_id);
*p_inner_path = inner.path;
break;

default:
elog(ERROR, "unexpected join type %d", jointype);
}

/* Get rel sizes. */
outer.bytes = outer.path->rows * outer.path->pathtarget->width;
inner.bytes = inner.path->rows * inner.path->pathtarget->width;
int outerParallel = outer.locus.parallel_workers;
int innerParallel = inner.locus.parallel_workers;

if (join_quals_contain_outer_references ||
CdbPathLocus_IsOuterQuery(outer.locus) ||
19 changes: 18 additions & 1 deletion src/backend/executor/execExpr.c
@@ -58,6 +58,7 @@
#include "cdb/cdbvars.h"
#include "utils/pg_locale.h"

#include "port/pg_bitutils.h"

typedef struct LastAttnumInfo
{
@@ -1139,7 +1140,23 @@ ExecInitExprRec(Expr *node, ExprState *state,
* value.
*/
scratch.opcode = EEOP_ROWIDEXPR;
scratch.d.rowidexpr.rowcounter = ((int64) GpIdentity.dbid) << 48;

/*
 * CBDB_PARALLEL
 * The planner has ensured that there is enough space for the number of
 * segments and parallel workers. ParallelWorkerNumberOfSlice is not set
 * yet at this point, so use TotalParallelWorkerNumberOfSlice here and
 * reserve bit space for ParallelWorkerNumberOfSlice.
 */
if (TotalParallelWorkerNumberOfSlice > 0)
{
int parallel_bits = pg_leftmost_one_pos32(TotalParallelWorkerNumberOfSlice) + 1;
int segs = getgpsegmentCount();
int segs_bits = pg_leftmost_one_pos32(segs) + 1;
Assert(segs_bits + parallel_bits <= 16);
scratch.d.rowidexpr.rowcounter = ((int64) GpIdentity.dbid) << (48 + parallel_bits);
}
else
scratch.d.rowidexpr.rowcounter = ((int64) GpIdentity.dbid) << 48;

ExprEvalPushStep(state, &scratch);
break;
6 changes: 6 additions & 0 deletions src/backend/executor/execExprInterp.c
@@ -1603,6 +1603,12 @@ ExecInterpExpr(ExprState *state, ExprContext *econtext, bool *isnull)
EEO_CASE(EEOP_ROWIDEXPR)
{
int64 rowcounter = ++op->d.rowidexpr.rowcounter;
/*
 * CBDB_PARALLEL_FIXME
 * Take ParallelWorkerNumberOfSlice into account only once, at initialization.
 */
if (IsParallelWorkerOfSlice())
rowcounter |= (((int64) ParallelWorkerNumberOfSlice) << (48));

*op->resvalue = Int64GetDatum(rowcounter);
*op->resnull = false;
9 changes: 8 additions & 1 deletion src/backend/executor/execMain.c
@@ -1925,6 +1925,12 @@ InitPlan(QueryDesc *queryDesc, int eflags)
ExecSlice *sendSlice = &estate->es_sliceTable->slices[m->motionID];
estate->currentSliceId = sendSlice->parentIndex;
estate->useMppParallelMode = sendSlice->useMppParallelMode;
/*
 * CBDB_PARALLEL
 * Remember: parallel_workers is set to at least 1 when the gang is
 * filled, for convenience in Motion execution.
 */
TotalParallelWorkerNumberOfSlice = sendSlice->parallel_workers > 1 ? sendSlice->parallel_workers : 0;
}
/* Compute SubPlans' root plan nodes for SubPlans reachable from this plan root */
estate->locallyExecutableSubplans = getLocallyExecutableSubplans(plannedstmt, start_plan_node);
@@ -1961,9 +1967,10 @@ InitPlan(QueryDesc *queryDesc, int eflags)
bool save_useMppParallelMode = estate->useMppParallelMode;

estate->currentSliceId = estate->es_plannedstmt->subplan_sliceIds[subplan_id - 1];
/* FIXME: test whether mpp parallel style exists for subplan case */
/* CBDB_PARALLEL_FIXME: test whether mpp parallel style exists for subplan case */
estate->useMppParallelMode = false;

/* CBDB_PARALLEL_FIXME: update TotalParallelWorkerNumberOfSlice for subplan, could it be possible? */
Plan *subplan = (Plan *) lfirst(l);
subplanstate = ExecInitNode(subplan, estate, sp_eflags);

2 changes: 1 addition & 1 deletion src/backend/optimizer/path/costsize.c
@@ -6784,7 +6784,7 @@ get_parallel_divisor(Path *path)
* parallel plan.
*/
/*
* GPDB parallel: We don't have a leader like upstream.
* CBDB_PARALLEL: We don't have a leader like upstream.
* parallel_divisor is usually used to estimate rows.
* Since we don't have a leader in GP parallel style, set it the same
* as path's parallel_workers which may be 0 sometimes.
6 changes: 0 additions & 6 deletions src/backend/optimizer/path/joinpath.c
@@ -1327,8 +1327,6 @@ sort_inner_and_outer(PlannerInfo *root,
if (joinrel->consider_parallel &&
save_jointype != JOIN_UNIQUE_OUTER &&
save_jointype != JOIN_FULL &&
save_jointype != JOIN_DEDUP_SEMI &&
save_jointype != JOIN_DEDUP_SEMI_REVERSE &&
save_jointype != JOIN_RIGHT &&
outerrel->partial_pathlist != NIL &&
bms_is_empty(joinrel->lateral_relids))
@@ -1936,8 +1934,6 @@ match_unsorted_outer(PlannerInfo *root,
if (joinrel->consider_parallel &&
save_jointype != JOIN_UNIQUE_OUTER &&
save_jointype != JOIN_FULL &&
save_jointype != JOIN_DEDUP_SEMI &&
save_jointype != JOIN_DEDUP_SEMI_REVERSE &&
save_jointype != JOIN_RIGHT &&
outerrel->partial_pathlist != NIL &&
bms_is_empty(joinrel->lateral_relids))
@@ -2309,8 +2305,6 @@ hash_inner_and_outer(PlannerInfo *root,
save_jointype != JOIN_UNIQUE_OUTER &&
save_jointype != JOIN_FULL &&
save_jointype != JOIN_RIGHT &&
save_jointype != JOIN_DEDUP_SEMI &&
save_jointype != JOIN_DEDUP_SEMI_REVERSE &&
outerrel->partial_pathlist != NIL &&
bms_is_empty(joinrel->lateral_relids))
{
33 changes: 33 additions & 0 deletions src/backend/utils/misc/guc_gp.c
@@ -144,6 +144,9 @@ bool gp_appendonly_verify_write_block = false;
bool gp_appendonly_compaction = true;
int gp_appendonly_compaction_threshold = 0;
bool enable_parallel = false;
bool enable_parallel_semi_join = true;
bool enable_parallel_dedup_semi_join = true;
bool enable_parallel_dedup_semi_reverse_join = true;
int gp_appendonly_insert_files = 0;
int gp_appendonly_insert_files_tuples_range = 0;
int gp_random_insert_segments = 0;
@@ -3041,6 +3044,36 @@ struct config_bool ConfigureNamesBool_gp[] =
false,
NULL, NULL, NULL
},
{
{"enable_parallel_semi_join", PGC_USERSET, QUERY_TUNING_METHOD,
gettext_noop("allow to use of parallel semi join."),
NULL,
GUC_EXPLAIN
},
&enable_parallel_semi_join,
true,
NULL, NULL, NULL
},
{
{"enable_parallel_dedup_semi_join", PGC_USERSET, QUERY_TUNING_METHOD,
gettext_noop("allow to use of parallel dedup semi join."),
NULL,
GUC_EXPLAIN
},
&enable_parallel_dedup_semi_join,
true,
NULL, NULL, NULL
},
{
{"enable_parallel_dedup_semi_reverse_join", PGC_USERSET, QUERY_TUNING_METHOD,
gettext_noop("allow to use of parallel dedup semi reverse join."),
NULL,
GUC_EXPLAIN
},
&enable_parallel_dedup_semi_reverse_join,
true,
NULL, NULL, NULL
},
{
{"gp_internal_is_singlenode", PGC_POSTMASTER, UNGROUPED,
gettext_noop("Is in SingleNode mode (no segments). WARNING: user SHOULD NOT set this by any means."),
13 changes: 13 additions & 0 deletions src/include/access/parallel.h
@@ -63,6 +63,10 @@ extern volatile bool ParallelMessagePending;
extern PGDLLIMPORT int ParallelWorkerNumber;
extern PGDLLIMPORT bool InitializingParallelWorker;

/* CBDB_PARALLEL: Total parallel workers of a slice including myself, 0 for no parallel */
extern PGDLLIMPORT int ParallelWorkerNumberOfSlice;
extern PGDLLIMPORT int TotalParallelWorkerNumberOfSlice;

typedef struct ParallelEntryTag
{
int cid;
@@ -90,6 +94,15 @@ typedef struct GpParallelDSMEntry
Barrier build_barrier; /* synchronization for the build dsm phases */
} GpParallelDSMEntry;

/*
 * CBDB_PARALLEL
 * Postgres uses ParallelWorkerNumber to handle background workers,
 * including parallel workers under a Gather node.
 * To avoid mixing them up and hitting assertion failures, we use
 * ParallelWorkerNumberOfSlice to identify CBDB-style parallel mode.
 */
#define IsParallelWorkerOfSlice() (ParallelWorkerNumberOfSlice >= 0)

#define IsParallelWorker() (ParallelWorkerNumber >= 0)

extern ParallelContext *CreateParallelContext(const char *library_name,
3 changes: 3 additions & 0 deletions src/include/utils/guc.h
@@ -290,6 +290,9 @@ extern bool gp_appendonly_verify_block_checksums;
extern bool gp_appendonly_verify_write_block;
extern bool gp_appendonly_compaction;
extern bool enable_parallel;
extern bool enable_parallel_semi_join;
extern bool enable_parallel_dedup_semi_join;
extern bool enable_parallel_dedup_semi_reverse_join;
extern int gp_appendonly_insert_files;
extern int gp_appendonly_insert_files_tuples_range;
extern int gp_random_insert_segments;
3 changes: 3 additions & 0 deletions src/include/utils/unsync_guc_name.h
@@ -124,6 +124,9 @@
"enable_nestloop",
"enable_parallel_append",
"enable_parallel_hash",
"enable_parallel_semi_join",
"enable_parallel_dedup_semi_join",
"enable_parallel_dedup_semi_reverse_join",
"enable_partition_pruning",
"enable_partitionwise_aggregate",
"enable_partitionwise_join",