diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index dc5840cc79c..097ec412f4a 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -137,6 +137,9 @@ static void show_incremental_sort_info(IncrementalSortState *incrsortstate, static void show_hash_info(HashState *hashstate, ExplainState *es); static void show_runtime_filter_info(RuntimeFilterState *rfstate, ExplainState *es); +static void show_pushdown_runtime_filter_info(const char *qlabel, + PlanState *planstate, + ExplainState *es); static void show_memoize_info(MemoizeState *mstate, List *ancestors, ExplainState *es); static void show_hashagg_info(AggState *hashstate, ExplainState *es); @@ -2415,6 +2418,10 @@ ExplainNode(PlanState *planstate, List *ancestors, /* fall through to print additional fields the same as SeqScan */ /* FALLTHROUGH */ case T_SeqScan: + if (gp_enable_runtime_filter_pushdown && IsA(planstate, SeqScanState)) + show_pushdown_runtime_filter_info("Rows Removed by Pushdown Runtime Filter", + planstate, es); + /* FALLTHROUGH */ case T_DynamicSeqScan: case T_ValuesScan: case T_CteScan: @@ -4285,6 +4292,24 @@ show_instrumentation_count(const char *qlabel, int which, } } +/* + * If it's EXPLAIN ANALYZE, show instrumentation information with pushdown + * runtime filter. + */ +static void +show_pushdown_runtime_filter_info(const char *qlabel, + PlanState *planstate, + ExplainState *es) +{ + Assert(gp_enable_runtime_filter_pushdown && IsA(planstate, SeqScanState)); + + if (!es->analyze || !planstate->instrument) + return; + + if (planstate->instrument->prf_work) + ExplainPropertyFloat(qlabel, NULL, planstate->instrument->nfilteredPRF, 0, es); +} + /* * Show extra information for a ForeignScan node. 
*/ diff --git a/src/backend/commands/explain_gp.c b/src/backend/commands/explain_gp.c index 35ca1c9787f..537037f0e4c 100644 --- a/src/backend/commands/explain_gp.c +++ b/src/backend/commands/explain_gp.c @@ -57,6 +57,8 @@ typedef struct CdbExplain_StatInst double nloops; /* # of run cycles for this node */ double nfiltered1; double nfiltered2; + bool prf_work; + double nfilteredPRF; double execmemused; /* executor memory used (bytes) */ double workmemused; /* work_mem actually used (bytes) */ double workmemwanted; /* work_mem to avoid workfile i/o (bytes) */ @@ -886,6 +888,8 @@ cdbexplain_collectStatsFromNode(PlanState *planstate, CdbExplain_SendStatCtx *ct si->nloops = instr->nloops; si->nfiltered1 = instr->nfiltered1; si->nfiltered2 = instr->nfiltered2; + si->prf_work = instr->prf_work; + si->nfilteredPRF = instr->nfilteredPRF; si->workmemused = instr->workmemused; si->workmemwanted = instr->workmemwanted; si->workfileCreated = instr->workfileCreated; @@ -1189,6 +1193,8 @@ cdbexplain_depositStatsToNode(PlanState *planstate, CdbExplain_RecvStatCtx *ctx) instr->nloops = nodeAcc->nsimax->nloops; instr->nfiltered1 = nodeAcc->nsimax->nfiltered1; instr->nfiltered2 = nodeAcc->nsimax->nfiltered2; + instr->prf_work = nodeAcc->nsimax->prf_work; + instr->nfilteredPRF = nodeAcc->nsimax->nfilteredPRF; instr->execmemused = nodeAcc->nsimax->execmemused; instr->workmemused = nodeAcc->nsimax->workmemused; instr->workmemwanted = nodeAcc->nsimax->workmemwanted; diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c index e1c86f737cc..94fdeb0b0d1 100644 --- a/src/backend/executor/nodeHash.c +++ b/src/backend/executor/nodeHash.c @@ -102,7 +102,10 @@ static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable, size_t size); static void ExecParallelHashMergeCounters(HashJoinTable hashtable); static void ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable); - +static void BuildRuntimeFilter(HashState *node, TupleTableSlot *slot); +static void 
PushdownRuntimeFilter(HashState *node); +static void FreeRuntimeFilter(HashState *node); +static void ResetRuntimeFilter(HashState *node); /* ---------------------------------------------------------------- * ExecHash @@ -193,7 +196,15 @@ MultiExecPrivateHash(HashState *node) { slot = ExecProcNode(outerNode); if (TupIsNull(slot)) + { + if (gp_enable_runtime_filter_pushdown && node->filters) + PushdownRuntimeFilter(node); break; + } + + if (gp_enable_runtime_filter_pushdown && node->filters) + BuildRuntimeFilter(node, slot); + /* We have to compute the hash value */ econtext->ecxt_outertuple = slot; bool hashkeys_null = false; @@ -335,6 +346,7 @@ MultiExecParallelHash(HashState *node) slot = ExecProcNode(outerNode); if (TupIsNull(slot)) break; + econtext->ecxt_outertuple = slot; if (ExecHashGetHashValue(node, hashtable, econtext, hashkeys, false, hashtable->keepNulls, @@ -512,6 +524,9 @@ ExecEndHash(HashState *node) */ outerPlan = outerPlanState(node); ExecEndNode(outerPlan); + + if (gp_enable_runtime_filter_pushdown && node->filters) + FreeRuntimeFilter(node); } @@ -2520,6 +2535,9 @@ ExecReScanHash(HashState *node) */ if (node->ps.lefttree->chgParam == NULL) ExecReScan(node->ps.lefttree); + + if (gp_enable_runtime_filter_pushdown && node->filters) + ResetRuntimeFilter(node); } @@ -4126,3 +4144,142 @@ get_hash_mem(void) return (int) mem_limit; } + +/* + * Convert AttrFilter to ScanKeyData and send these runtime filters to the + * target node(seqscan). 
+ */ +void +PushdownRuntimeFilter(HashState *node) +{ + ListCell *lc; + List *scankeys; + ScanKey sk; + AttrFilter *attr_filter; + + foreach (lc, node->filters) + { + scankeys = NIL; + + attr_filter = lfirst(lc); + if (!IsA(attr_filter->target, SeqScanState) || attr_filter->empty) + continue; + + /* bloom filter */ + sk = (ScanKey)palloc0(sizeof(ScanKeyData)); + sk->sk_flags = SK_BLOOM_FILTER; + sk->sk_attno = attr_filter->lattno; + sk->sk_subtype = INT8OID; + sk->sk_argument = PointerGetDatum(attr_filter->blm_filter); + scankeys = lappend(scankeys, sk); + + /* range filter */ + sk = (ScanKey)palloc0(sizeof(ScanKeyData)); + sk->sk_flags = 0; + sk->sk_attno = attr_filter->lattno; + sk->sk_strategy = BTGreaterEqualStrategyNumber; + sk->sk_subtype = INT8OID; + sk->sk_argument = attr_filter->min; + scankeys = lappend(scankeys, sk); + + sk = (ScanKey)palloc0(sizeof(ScanKeyData)); + sk->sk_flags = 0; + sk->sk_attno = attr_filter->lattno; + sk->sk_strategy = BTLessEqualStrategyNumber; + sk->sk_subtype = INT8OID; + sk->sk_argument = attr_filter->max; + scankeys = lappend(scankeys, sk); + + /* append new runtime filters to target node */ + SeqScanState *sss = castNode(SeqScanState, attr_filter->target); + sss->filters = list_concat(sss->filters, scankeys); + } +} + +static void +BuildRuntimeFilter(HashState *node, TupleTableSlot *slot) +{ + Datum val; + bool isnull; + ListCell *lc; + AttrFilter *attr_filter; + + foreach (lc, node->filters) + { + attr_filter = (AttrFilter *) lfirst(lc); + + val = slot_getattr(slot, attr_filter->rattno, &isnull); + if (isnull) + continue; + + attr_filter->empty = false; + + if ((int64_t)val < (int64_t)attr_filter->min) + attr_filter->min = val; + + if ((int64_t)val > (int64_t)attr_filter->max) + attr_filter->max = val; + + if (attr_filter->blm_filter) + bloom_add_element(attr_filter->blm_filter, (unsigned char *)&val, sizeof(Datum)); + } +} + +void +FreeRuntimeFilter(HashState *node) +{ + ListCell *lc; + AttrFilter *attr_filter; + + if 
(!node->filters) + return; + + foreach (lc, node->filters) + { + attr_filter = lfirst(lc); + if (attr_filter->blm_filter) + bloom_free(attr_filter->blm_filter); + } + + list_free_deep(node->filters); + node->filters = NIL; +} + +void +ResetRuntimeFilter(HashState *node) +{ + ListCell *lc; + AttrFilter *attr_filter; + SeqScanState *sss; + + if (!node->filters) + return; + + foreach (lc, node->filters) + { + attr_filter = lfirst(lc); + attr_filter->empty = true; + + if (IsA(attr_filter->target, SeqScanState)) + { + sss = castNode(SeqScanState, attr_filter->target); + if (sss->filters) + { + list_free_deep(sss->filters); + sss->filters = NIL; + } + } + + if (attr_filter->blm_filter) + bloom_free(attr_filter->blm_filter); + + attr_filter->blm_filter = bloom_create_aggresive(node->ps.plan->plan_rows, + work_mem, + random()); + + StaticAssertDecl(sizeof(LONG_MAX) == sizeof(Datum), "sizeof(LONG_MAX) should be equal to sizeof(Datum)"); + StaticAssertDecl(sizeof(LONG_MIN) == sizeof(Datum), "sizeof(LONG_MIN) should be equal to sizeof(Datum)"); + attr_filter->min = LONG_MAX; + attr_filter->max = LONG_MIN; + } +} diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c index 818c0b69fa6..5119b9b1a93 100644 --- a/src/backend/executor/nodeHashjoin.c +++ b/src/backend/executor/nodeHashjoin.c @@ -119,6 +119,8 @@ #include "executor/nodeRuntimeFilter.h" #include "miscadmin.h" #include "pgstat.h" +#include "utils/guc.h" +#include "utils/fmgroids.h" #include "utils/memutils.h" #include "utils/sharedtuplestore.h" @@ -162,6 +164,19 @@ static void ReleaseHashTable(HashJoinState *node); static void SpillCurrentBatch(HashJoinState *node); static bool ExecHashJoinReloadHashTable(HashJoinState *hjstate); static void ExecEagerFreeHashJoin(HashJoinState *node); +static void CreateRuntimeFilter(HashJoinState* hjstate); +static bool IsEqualOp(Expr *expr); +static bool CheckEqualArgs(Expr *expr, AttrNumber *lattno, AttrNumber *rattno); +static bool 
CheckTargetNode(PlanState *node, + AttrNumber attno, + AttrNumber *lattno); +static List *FindTargetNodes(HashJoinState *hjstate, + AttrNumber attno, + AttrNumber *lattno); +static AttrFilter *CreateAttrFilter(PlanState *target, + AttrNumber lattno, + AttrNumber rattno, + double plan_rows); extern bool Test_print_prefetch_joinqual; @@ -1000,6 +1015,11 @@ ExecInitHashJoin(HashJoin *node, EState *estate, int eflags) ExecInitRuntimeFilterFinish(rfstate, hstate->ps.plan->plan_rows); } + if (Gp_role == GP_ROLE_EXECUTE + && gp_enable_runtime_filter_pushdown + && !estate->useMppParallelMode) + CreateRuntimeFilter(hjstate); + return hjstate; } @@ -2157,3 +2177,293 @@ ExecHashJoinInitializeWorker(HashJoinState *state, ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin); } } + +/* + * Find "inner var = outer var" in hj->hashclauses and create runtime filter + * for it. + */ +void +CreateRuntimeFilter(HashJoinState* hjstate) +{ + AttrNumber lattno, rattno; + Expr *expr; + JoinType jointype; + HashJoin *hj; + HashState *hstate; + AttrFilter *attr_filter; + ListCell *lc; + List *targets; + + /* + * A build-side Bloom filter tells us if a row is definitely not in the build + * side. This allows us to early-eliminate rows or early-accept rows depending + * on the type of join. + * Left Outer Join and Full Outer Join output all rows, so a build-side Bloom + * filter would only allow us to early-output. Left Antijoin outputs only if + * there is no match, so again early output. We don't implement early output + * for now. + * So it's only applicable for inner, right and semi join. 
+ */ + jointype = hjstate->js.jointype; + if (jointype != JOIN_INNER && + jointype != JOIN_RIGHT && + jointype != JOIN_SEMI) + return; + + hstate = castNode(HashState, innerPlanState(hjstate)); + hstate->filters = NIL; + + /* + * check and initialize the runtime filter for all hash conds in + * hj->hashclauses + */ + hj = castNode(HashJoin, hjstate->js.ps.plan); + foreach (lc, hj->hashclauses) + { + expr = (Expr *)lfirst(lc); + + if (!IsEqualOp(expr)) + continue; + + lattno = -1; + rattno = -1; + if (!CheckEqualArgs(expr, &lattno, &rattno)) + continue; + + if (lattno < 1 || rattno < 1) + continue; + + targets = FindTargetNodes(hjstate, lattno, &lattno); + if (lattno == -1 || targets == NULL) + continue; + + foreach(lc, targets) + { + PlanState *target = lfirst(lc); + Assert(IsA(target, SeqScanState)); + + attr_filter = CreateAttrFilter(target, lattno, rattno, + hstate->ps.plan->plan_rows); + if (attr_filter->blm_filter) + hstate->filters = lappend(hstate->filters, attr_filter); + else + pfree(attr_filter); + } + } +} + +static bool +IsEqualOp(Expr *expr) +{ + Oid funcid = InvalidOid; + + if (!IsA(expr, OpExpr) && !IsA(expr, FuncExpr)) + return false; + + if (IsA(expr, OpExpr)) + funcid = ((OpExpr *)expr)->opfuncid; + else if (IsA(expr, FuncExpr)) + funcid = ((FuncExpr *)expr)->funcid; + else + return false; + + if (funcid == F_INT2EQ || funcid == F_INT4EQ || funcid == F_INT8EQ + || funcid == F_INT24EQ || funcid == F_INT42EQ + || funcid == F_INT28EQ || funcid == F_INT82EQ + || funcid == F_INT48EQ || funcid == F_INT84EQ + ) + return true; + + return false; +} + +/* + * runtime filters which can be pushed down: + * 1. hash expr MUST BE equal op; + * 2. args MUST BE Var node; + * 3. 
the data type MUST BE integer; + */ +static bool +CheckEqualArgs(Expr *expr, AttrNumber *lattno, AttrNumber *rattno) +{ + Var *var; + List *args; + + if (lattno == NULL || rattno == NULL) + return false; + + if (IsA(expr, OpExpr)) + args = ((OpExpr *)expr)->args; + else if (IsA(expr, FuncExpr)) + args = ((FuncExpr *)expr)->args; + else + return false; + + if (!args || list_length(args) != 2) + return false; + + /* check the first arg */ + if (!IsA(linitial(args), Var)) + return false; + + var = linitial(args); + if (var->varno == INNER_VAR) + *rattno = var->varattno; + else if (var->varno == OUTER_VAR) + *lattno = var->varattno; + else + return false; + + /* check the second arg */ + if (!IsA(lsecond(args), Var)) + return false; + + var = lsecond(args); + if (var->varno == INNER_VAR) + *rattno = var->varattno; + else if (var->varno == OUTER_VAR) + *lattno = var->varattno; + else + return false; + + return true; +} + +static bool +CheckTargetNode(PlanState *node, AttrNumber attno, AttrNumber *lattno) +{ + Var *var; + TargetEntry *te; + + if (!IsA(node, SeqScanState)) + return false; + + te = (TargetEntry *)list_nth(node->plan->targetlist, attno - 1); + if (!IsA(te->expr, Var)) + return false; + + var = castNode(Var, te->expr); + + /* system column is not allowed */ + if (var->varattno <= 0) + return false; + + *lattno = var->varattno; + + return true; +} + +/* + * it's just allowed like this: + * HashJoin + * ... 
a series of HashJoin nodes + * HashJoin + * SeqScan <- target + */ +static List * +FindTargetNodes(HashJoinState *hjstate, AttrNumber attno, AttrNumber *lattno) +{ + Var *var; + PlanState *child, *parent; + TargetEntry *te; + List *targetNodes; + + parent = (PlanState *)hjstate; + child = outerPlanState(hjstate); + Assert(child); + + *lattno = -1; + targetNodes = NIL; + while (true) + { + /* target is seqscan */ + if ((IsA(parent, HashJoinState) || IsA(parent, ResultState)) && IsA(child, SeqScanState)) + { + /* + * hashjoin + * seqscan + * or + * hashjoin + * result + * seqscan + */ + if (!CheckTargetNode(child, attno, lattno)) + return NULL; + + targetNodes = lappend(targetNodes, child); + return targetNodes; + } + else if (IsA(parent, AppendState) && child == NULL) + { + /* + * append + * seqscan on t1_prt_1 + * seqscan on t1_prt_2 + * ... + */ + AppendState *as = castNode(AppendState, parent); + for (int i = 0; i < as->as_nplans; i++) + { + child = as->appendplans[i]; + if (!CheckTargetNode(child, attno, lattno)) + return NULL; + + targetNodes = lappend(targetNodes, child); + } + + return targetNodes; + } + + /* + * hashjoin + * result (hash filter) + * seqscan on t1, t1 is replicated table + * or + * hashjoin + * append + * seqscan on t1_prt_1 + * seqscan on t1_prt_2 + * ... 
+ */ + if (!IsA(child, HashJoinState) && !IsA(child, ResultState) && !IsA(child, AppendState)) + return NULL; + + /* child is hashjoin, result or append node */ + te = (TargetEntry *)list_nth(child->plan->targetlist, attno - 1); + if (!IsA(te->expr, Var)) + return NULL; + + var = castNode(Var, te->expr); + if (var->varno == INNER_VAR) + return NULL; + + attno = var->varattno; + + /* find at child node */ + parent = child; + child = outerPlanState(parent); + } + + return NULL; +} + +static AttrFilter* +CreateAttrFilter(PlanState *target, AttrNumber lattno, AttrNumber rattno, + double plan_rows) +{ + AttrFilter *attr_filter = palloc0(sizeof(AttrFilter)); + attr_filter->empty = true; + attr_filter->target = target; + + attr_filter->lattno = lattno; + attr_filter->rattno = rattno; + + attr_filter->blm_filter = bloom_create_aggresive(plan_rows, work_mem, random()); + + StaticAssertDecl(sizeof(LONG_MAX) == sizeof(Datum), "sizeof(LONG_MAX) should be equal to sizeof(Datum)"); + StaticAssertDecl(sizeof(LONG_MIN) == sizeof(Datum), "sizeof(LONG_MIN) should be equal to sizeof(Datum)"); + attr_filter->min = LONG_MAX; + attr_filter->max = LONG_MIN; + + return attr_filter; +} diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c index d6e88e6ddf0..808fcb5835c 100644 --- a/src/backend/executor/nodeSeqscan.c +++ b/src/backend/executor/nodeSeqscan.c @@ -44,6 +44,9 @@ static TupleTableSlot *SeqNext(SeqScanState *node); +static bool PassByBloomFilter(SeqScanState *node, TupleTableSlot *slot); +static ScanKey ScanKeyListToArray(List *keys, int *num); + /* ---------------------------------------------------------------- * Scan Support * ---------------------------------------------------------------- @@ -72,13 +75,23 @@ SeqNext(SeqScanState *node) if (scandesc == NULL) { + ScanKey keys = NULL; + + /* + * Just when gp_enable_runtime_filter_pushdown enabled and + * node->filter_in_seqscan is false means scankey need to be pushed to + * AM. 
+ */ + if (gp_enable_runtime_filter_pushdown && !node->filter_in_seqscan) + keys = ScanKeyListToArray(node->filters, &node->num_scan_keys); + /* * We reach here if the scan is not parallel, or if we're serially * executing a scan that was planned to be parallel. */ scandesc = table_beginscan_es(node->ss.ss_currentRelation, estate->es_snapshot, - 0, NULL, + node->num_scan_keys, keys, NULL, &node->ss.ps); node->ss.ss_currentScanDesc = scandesc; @@ -87,8 +100,22 @@ SeqNext(SeqScanState *node) /* * get the next tuple from the table */ - if (table_scan_getnextslot(scandesc, direction, slot)) - return slot; + if (node->filter_in_seqscan && node->filters) + { + while (table_scan_getnextslot(scandesc, direction, slot)) + { + if (!PassByBloomFilter(node, slot)) + continue; + + return slot; + } + } + else + { + if (table_scan_getnextslot(scandesc, direction, slot)) + return slot; + } + return NULL; } @@ -192,6 +219,16 @@ ExecInitSeqScanForPartition(SeqScan *node, EState *estate, scanstate->ss.ps.qual = ExecInitQual(node->plan.qual, (PlanState *) scanstate); + /* + * check scan slot with bloom filters in seqscan node or not. + */ + if (gp_enable_runtime_filter_pushdown + && !estate->useMppParallelMode) + { + scanstate->filter_in_seqscan = + !(table_scan_flags(currentRelation) & SCAN_SUPPORT_RUNTIME_FILTER); + } + return scanstate; } @@ -245,12 +282,27 @@ void ExecReScanSeqScan(SeqScanState *node) { TableScanDesc scan; + ScanKey keys; scan = node->ss.ss_currentScanDesc; + /* + * Clear all the pushdown scan keys. 
+ */ + keys = NULL; + if (node->num_scan_keys) + { + keys = (ScanKey)palloc(sizeof(ScanKeyData) * node->num_scan_keys); + for (int i = 0; i < node->num_scan_keys; ++i) + keys[i].sk_flags = SK_EMPYT; + } + if (scan != NULL) table_rescan(scan, /* scan desc */ - NULL); /* new scan keys */ + keys); /* new scan keys */ + + if (keys) + pfree(keys); ExecScanReScan((ScanState *) node); } @@ -365,3 +417,74 @@ ExecSeqScanInitializeWorker(SeqScanState *node, } node->ss.ss_currentScanDesc = scandesc; } + +/* + * Returns true if the element may be in the bloom filter. + */ +static bool +PassByBloomFilter(SeqScanState *node, TupleTableSlot *slot) +{ + ScanKey sk; + Datum val; + bool isnull; + ListCell *lc; + bloom_filter *blm_filter; + + /* + * Mark that the pushdown runtime filter is actually taking effect. + */ + if (node->ss.ps.instrument && + !node->ss.ps.instrument->prf_work && + list_length(node->filters)) + node->ss.ps.instrument->prf_work = true; + + foreach (lc, node->filters) + { + sk = lfirst(lc); + if (sk->sk_flags != SK_BLOOM_FILTER) + continue; + + val = slot_getattr(slot, sk->sk_attno, &isnull); + if (isnull) + continue; + + blm_filter = (bloom_filter *)DatumGetPointer(sk->sk_argument); + if (bloom_lacks_element(blm_filter, (unsigned char *)&val, sizeof(Datum))) + { + InstrCountFilteredPRF(node, 1); + return false; + } + } + + return true; +} + +/* + * Convert the list of ScanKey to the array, and append an empty ScanKey as + * the end flag of the array. 
+ */ +static ScanKey +ScanKeyListToArray(List *keys, int *num) +{ + ScanKey sk; + + if (list_length(keys) == 0) + { + *num = 0; + return NULL; + } + + Assert(num); + *num = list_length(keys); + + sk = (ScanKey)palloc(sizeof(ScanKeyData) * (*num + 1)); + for (int i = 0; i < *num; ++i) + memcpy(&sk[i], list_nth(keys, i), sizeof(ScanKeyData)); + + /* + * SK_EMPYT means the end of the array of the ScanKey + */ + sk[*num].sk_flags = SK_EMPYT; + + return sk; +} diff --git a/src/backend/utils/misc/guc_gp.c b/src/backend/utils/misc/guc_gp.c index 1995d91e757..5dd24aed045 100644 --- a/src/backend/utils/misc/guc_gp.c +++ b/src/backend/utils/misc/guc_gp.c @@ -453,6 +453,8 @@ bool gp_enable_global_deadlock_detector = false; bool gp_enable_predicate_pushdown; int gp_predicate_pushdown_sample_rows; +bool gp_enable_runtime_filter_pushdown; + bool enable_offload_entry_to_qe = false; bool enable_answer_query_using_materialized_views = false; bool aqumv_allow_foreign_table = false; @@ -3203,6 +3205,16 @@ struct config_bool ConfigureNamesBool_gp[] = NULL, NULL, NULL }, + { + {"gp_enable_runtime_filter_pushdown", PGC_USERSET, DEVELOPER_OPTIONS, + gettext_noop("Try to push the hash table of hash join to the seqscan or AM as bloom filter."), + NULL + }, + &gp_enable_runtime_filter_pushdown, + false, + NULL, NULL, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL diff --git a/src/include/access/skey.h b/src/include/access/skey.h index 92b7d09fa45..2f533c03db8 100644 --- a/src/include/access/skey.h +++ b/src/include/access/skey.h @@ -122,6 +122,9 @@ typedef ScanKeyData *ScanKey; #define SK_SEARCHNOTNULL 0x0080 /* scankey represents "col IS NOT NULL" */ #define SK_ORDER_BY 0x0100 /* scankey is for ORDER BY op */ +/* for runtime filter */ +#define SK_EMPYT 0x8000 /* scankey is empty */ +#define SK_BLOOM_FILTER 0x4000 /* scankey is for bloom filter */ /* * prototypes for functions in access/common/scankey.c diff --git a/src/include/access/tableam.h 
b/src/include/access/tableam.h index 5c166aa94ab..ea1bf508687 100644 --- a/src/include/access/tableam.h +++ b/src/include/access/tableam.h @@ -43,15 +43,16 @@ struct ValidateIndexState; /** * Flags represented the supported features of scan. - * + * * The first 8 bits are reserved for kernel expansion of some attributes, * and the remaining 24 bits are reserved for custom tableam. - * + * * If you add a new flag, make sure the flag's bit is consecutive with * the previous one. - * -*/ + * + */ #define SCAN_SUPPORT_COLUMN_ORIENTED_SCAN (1 << 0) /* support column-oriented scanning*/ +#define SCAN_SUPPORT_RUNTIME_FILTER (1 << 1) /* support runtime filter scan */ /* * Bitmask values for the flags argument to the scan_begin callback. diff --git a/src/include/executor/instrument.h b/src/include/executor/instrument.h index 78a81f709f6..e3ade84bbc5 100644 --- a/src/include/executor/instrument.h +++ b/src/include/executor/instrument.h @@ -78,6 +78,7 @@ typedef struct Instrumentation bool need_bufusage; /* true if we need buffer usage data */ bool need_walusage; /* true if we need WAL usage data */ bool async_mode; /* true if node is in async mode */ + bool prf_work; /* true if pushdown runtime filters really work */ /* Info about current plan cycle: */ bool running; /* true if we've completed first tuple */ instr_time starttime; /* Start time of current iteration of node */ @@ -94,6 +95,7 @@ typedef struct Instrumentation uint64 nloops; /* # of run cycles for this node */ double nfiltered1; /* # tuples removed by scanqual or joinqual */ double nfiltered2; /* # tuples removed by "other" quals */ + double nfilteredPRF; /* # tuples removed by pushdown runtime filter */ BufferUsage bufusage; /* Total buffer usage */ WalUsage walusage; /* total WAL usage */ diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 32ae92f64a3..782c9f7ed4a 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -1186,6 +1186,11 @@ extern uint64 
PlanStateOperatorMemKB(const PlanState *ps); if (((PlanState *)(node))->instrument) \ ((PlanState *)(node))->instrument->nfiltered2 += (delta); \ } while(0) +#define InstrCountFilteredPRF(node, delta) \ + do { \ + if (((PlanState *)(node))->instrument) \ + ((PlanState *)(node))->instrument->nfilteredPRF += (delta); \ + } while(0) /* * EPQState is state for executing an EvalPlanQual recheck on a candidate @@ -1544,6 +1549,12 @@ typedef struct SeqScanState { ScanState ss; /* its first field is NodeTag */ Size pscan_len; /* size of parallel heap scan descriptor */ + + List *filters; /* the list of struct ScanKeyData */ + bool filter_in_seqscan; /* check scan slot with runtime filters in + seqscan node or in am */ + int num_scan_keys; /* valid if filter_in_seqscan is false, + number of pushdown scan keys */ } SeqScanState; /* ---------------- @@ -3061,6 +3072,20 @@ typedef struct RuntimeFilterState bloom_filter *bf; } RuntimeFilterState; +typedef struct AttrFilter +{ + bool empty; /* empty filter or not */ + PlanState *target;/* the node in where runtime filter will be used, + target will be seqscan, see FindTargetNodes() + in nodeHashjoin.c */ + AttrNumber rattno; /* attr no in hash node */ + AttrNumber lattno; /* if target is seqscan, attr no in relation */ + + bloom_filter *blm_filter; + Datum min; + Datum max; +} AttrFilter; + /* ---------------- * HashState information * ---------------- @@ -3096,6 +3121,8 @@ typedef struct HashState struct ParallelHashJoinState *parallel_state; Barrier *sync_barrier; + + List *filters; /* the list of AttrFilter */ } HashState; /* ---------------- diff --git a/src/include/utils/guc.h b/src/include/utils/guc.h index f9045dc8a22..02dce376898 100644 --- a/src/include/utils/guc.h +++ b/src/include/utils/guc.h @@ -639,6 +639,12 @@ extern bool gp_log_endpoints; extern bool gp_allow_date_field_width_5digits; +/* + * Try to push the hash table of hash join node down to the scan node as + * bloom filter for performance. 
+ */ +extern bool gp_enable_runtime_filter_pushdown; + typedef enum { INDEX_CHECK_NONE, diff --git a/src/include/utils/sync_guc_name.h b/src/include/utils/sync_guc_name.h index 57ac3d89377..a20ed986dad 100644 --- a/src/include/utils/sync_guc_name.h +++ b/src/include/utils/sync_guc_name.h @@ -196,5 +196,6 @@ "work_mem", "gp_appendonly_insert_files", "gp_appendonly_insert_files_tuples_range", + "gp_enable_runtime_filter_pushdown", "gp_random_insert_segments", "zero_damaged_pages", diff --git a/src/test/regress/expected/gp_runtime_filter.out b/src/test/regress/expected/gp_runtime_filter.out index 16f893d465f..ca5ddefc7c3 100644 --- a/src/test/regress/expected/gp_runtime_filter.out +++ b/src/test/regress/expected/gp_runtime_filter.out @@ -1,3 +1,7 @@ +-- start_matchignore +-- m/^.*Extra Text:.*/ +-- m/^.*Buckets:.*/ +-- end_matchignore -- Disable ORCA SET optimizer TO off; -- Test Suit 1: runtime filter main case @@ -250,6 +254,294 @@ SELECT COUNT(*) FROM dim_rf 1600 (1 row) +-- Test bloom filter pushdown +SET enable_parallel TO off; +-- case 1: join on distribution table and replicated table. 
+DROP TABLE IF EXISTS t1; +NOTICE: table "t1" does not exist, skipping +DROP TABLE IF EXISTS t2; +NOTICE: table "t2" does not exist, skipping +CREATE TABLE t1(c1 int, c2 int, c3 int, c4 int, c5 int) with (appendonly=true, orientation=column) distributed by (c1); +CREATE TABLE t2(c1 int, c2 int, c3 int, c4 int, c5 int) with (appendonly=true, orientation=column) distributed REPLICATED; +INSERT INTO t1 VALUES (5,5,5,5,5); +INSERT INTO t2 VALUES (1,1,1,1,1), (2,2,2,2,2), (3,3,3,3,3), (4,4,4,4,4); +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t2 select * FROM t2; +INSERT INTO t2 select * FROM t2; +INSERT INTO t2 select * FROM t2; +ANALYZE; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT t1.c3 FROM t1, t2 WHERE t1.c2 = t2.c2; + QUERY PLAN +------------------------------------------------------------------------------------------- + Gather Motion 3:1 (slice1; segments: 3) (actual rows=0 loops=1) + -> Hash Join (actual rows=0 loops=1) + Hash Cond: (t1.c2 = t2.c2) + Extra Text: (seg2) Hash chain length 8.0 avg, 8 max, using 4 of 524288 buckets. + -> Seq Scan on t1 (actual rows=128 loops=1) + -> Hash (actual rows=32 loops=1) + Buckets: 524288 Batches: 1 Memory Usage: 4098kB + -> Seq Scan on t2 (actual rows=32 loops=1) + Optimizer: Postgres query optimizer +(9 rows) + +SET gp_enable_runtime_filter_pushdown TO on; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT t1.c3 FROM t1, t2 WHERE t1.c2 = t2.c2; + QUERY PLAN +------------------------------------------------------------------------------------------- + Gather Motion 3:1 (slice1; segments: 3) (actual rows=0 loops=1) + -> Hash Join (actual rows=0 loops=1) + Hash Cond: (t1.c2 = t2.c2) + Extra Text: (seg2) Hash chain length 8.0 avg, 8 max, using 4 of 524288 buckets. 
+ -> Seq Scan on t1 (actual rows=1 loops=1) + Rows Removed by Pushdown Runtime Filter: 127 + -> Hash (actual rows=32 loops=1) + Buckets: 524288 Batches: 1 Memory Usage: 4098kB + -> Seq Scan on t2 (actual rows=32 loops=1) + Optimizer: Postgres query optimizer +(10 rows) + +RESET gp_enable_runtime_filter_pushdown; +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; +-- case 2: join on partition table and replicated table. +CREATE TABLE t1 (c1 INT, c2 INT) DISTRIBUTED BY (c1) PARTITION BY RANGE (c2) (START (1) END (100) EVERY (50)); +CREATE TABLE t2 (c1 INT, c2 INT) DISTRIBUTED REPLICATED; +INSERT INTO t1 SELECT generate_series(1, 99), generate_series(1, 99); +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t2 SELECT generate_series(1, 5), generate_series(1, 5); +INSERT INTO t2 SELECT generate_series(51, 51), generate_series(51, 51); +ANALYZE; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT * FROM t1, t2 WHERE t1.c2 = t2.c2; + QUERY PLAN +------------------------------------------------------------------------------------------- + Gather Motion 3:1 (slice1; segments: 3) (actual rows=96 loops=1) + -> Hash Join (actual rows=64 loops=1) + Hash Cond: (t1.c2 = t2.c2) + Extra Text: (seg0) Hash chain length 1.0 avg, 1 max, using 6 of 524288 buckets. 
+ -> Append (actual rows=608 loops=1) + Partition Selectors: $0 + -> Seq Scan on t1_1_prt_1 t1_1 (actual rows=288 loops=1) + -> Seq Scan on t1_1_prt_2 t1_2 (actual rows=320 loops=1) + -> Hash (actual rows=6 loops=1) + Buckets: 524288 Batches: 1 Memory Usage: 4097kB + -> Partition Selector (selector id: $0) (actual rows=6 loops=1) + -> Seq Scan on t2 (actual rows=6 loops=1) + Optimizer: Postgres query optimizer +(13 rows) + +SET gp_enable_runtime_filter_pushdown TO on; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT * FROM t1, t2 WHERE t1.c2 = t2.c2; + QUERY PLAN +------------------------------------------------------------------------------------------- + Gather Motion 3:1 (slice1; segments: 3) (actual rows=96 loops=1) + -> Hash Join (actual rows=64 loops=1) + Hash Cond: (t1.c2 = t2.c2) + Extra Text: (seg0) Hash chain length 1.0 avg, 1 max, using 6 of 524288 buckets. + -> Append (actual rows=64 loops=1) + Partition Selectors: $0 + -> Seq Scan on t1_1_prt_1 t1_1 (actual rows=48 loops=1) + Rows Removed by Pushdown Runtime Filter: 240 + -> Seq Scan on t1_1_prt_2 t1_2 (actual rows=16 loops=1) + Rows Removed by Pushdown Runtime Filter: 304 + -> Hash (actual rows=6 loops=1) + Buckets: 524288 Batches: 1 Memory Usage: 4097kB + -> Partition Selector (selector id: $0) (actual rows=6 loops=1) + -> Seq Scan on t2 (actual rows=6 loops=1) + Optimizer: Postgres query optimizer +(15 rows) + +RESET gp_enable_runtime_filter_pushdown; +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; +-- case 3: bug fix with explain +DROP TABLE IF EXISTS test_tablesample1; +NOTICE: table "test_tablesample1" does not exist, skipping +CREATE TABLE test_tablesample1 (dist int, id int, name text) WITH (fillfactor=10) DISTRIBUTED BY (dist); +INSERT INTO test_tablesample1 SELECT 0, i, repeat(i::text, 875) FROM generate_series(0, 9) s(i) ORDER BY i; +INSERT INTO test_tablesample1 SELECT 3, i, repeat(i::text, 875) FROM generate_series(10, 19) s(i) ORDER BY i; +INSERT INTO test_tablesample1 
SELECT 5, i, repeat(i::text, 875) FROM generate_series(20, 29) s(i) ORDER BY i; +SET gp_enable_runtime_filter_pushdown TO on; +EXPLAIN (COSTS OFF) SELECT id FROM test_tablesample1 TABLESAMPLE SYSTEM (50) REPEATABLE (2); + QUERY PLAN +-------------------------------------------------------------------------- + Gather Motion 3:1 (slice1; segments: 3) + -> Sample Scan on test_tablesample1 + Sampling: system ('50'::real) REPEATABLE ('2'::double precision) + Optimizer: Postgres query optimizer +(4 rows) + +RESET gp_enable_runtime_filter_pushdown; +DROP TABLE IF EXISTS test_tablesample1; +-- case 4: show debug info only when gp_enable_runtime_filter_pushdown is on +DROP TABLE IF EXISTS t1; +NOTICE: table "t1" does not exist, skipping +DROP TABLE IF EXISTS t2; +NOTICE: table "t2" does not exist, skipping +CREATE TABLE t1(c1 int, c2 int); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'c1' as the Cloudberry Database data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. +CREATE TABLE t2(c1 int, c2 int); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'c1' as the Cloudberry Database data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. 
+INSERT INTO t1 SELECT GENERATE_SERIES(1, 1000), GENERATE_SERIES(1, 1000); +INSERT INTO t2 SELECT * FROM t1; +SET gp_enable_runtime_filter_pushdown TO on; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) SELECT count(t1.c2) FROM t1, t2 WHERE t1.c1 = t2.c1; + QUERY PLAN +--------------------------------------------------------------------------------------------------------- + Finalize Aggregate (actual rows=1 loops=1) + -> Gather Motion 3:1 (slice1; segments: 3) (actual rows=3 loops=1) + -> Partial Aggregate (actual rows=1 loops=1) + -> Hash Join (actual rows=340 loops=1) + Hash Cond: (t1.c1 = t2.c1) + Extra Text: (seg2) Hash chain length 1.0 avg, 1 max, using 340 of 524288 buckets. + -> Seq Scan on t1 (actual rows=340 loops=1) + Rows Removed by Pushdown Runtime Filter: 0 + -> Hash (actual rows=340 loops=1) + Buckets: 524288 Batches: 1 Memory Usage: 4108kB + -> Seq Scan on t2 (actual rows=340 loops=1) + Optimizer: Postgres query optimizer +(12 rows) + +RESET gp_enable_runtime_filter_pushdown; +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; +-- case 5: hashjoin + result + seqsacn +DROP TABLE IF EXISTS t1; +NOTICE: table "t1" does not exist, skipping +DROP TABLE IF EXISTS t2; +NOTICE: table "t2" does not exist, skipping +CREATE TABLE t1(c1 int, c2 int, c3 char(50), c4 char(50), c5 char(50)) DISTRIBUTED REPLICATED; +CREATE TABLE t2(c1 int, c2 int, c3 char(50), c4 char(50), c5 char(50)); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'c1' as the Cloudberry Database data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. 
+INSERT INTO t1 VALUES (5,5,5,5,5), (3,3,3,3,3), (4,4,4,4,4); +INSERT INTO t2 VALUES (1,1,1,1,1), (2,2,2,2,2), (3,3,3,3,3), (4,4,4,4,4); +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t2 select * FROM t2; +ANALYZE; +SET optimizer TO on; +SET gp_enable_runtime_filter_pushdown TO off; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) SELECT t1.c3 FROM t1, t2 WHERE t1.c1 = t2.c1; + QUERY PLAN +------------------------------------------------------------------------------------------- + Gather Motion 3:1 (slice1; segments: 3) (actual rows=32 loops=1) + -> Hash Join (actual rows=32 loops=1) + Hash Cond: (t1.c1 = t2.c1) + Extra Text: (seg0) Hash chain length 2.0 avg, 2 max, using 3 of 524288 buckets. + -> Result (actual rows=16 loops=1) + -> Seq Scan on t1 (actual rows=24 loops=1) + -> Hash (actual rows=6 loops=1) + Buckets: 524288 Batches: 1 Memory Usage: 4097kB + -> Seq Scan on t2 (actual rows=6 loops=1) + Optimizer: Pivotal Optimizer (GPORCA) +(10 rows) + +SET gp_enable_runtime_filter_pushdown TO on; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) SELECT t1.c3 FROM t1, t2 WHERE t1.c1 = t2.c1; + QUERY PLAN +------------------------------------------------------------------------------------------- + Gather Motion 3:1 (slice1; segments: 3) (actual rows=32 loops=1) + -> Hash Join (actual rows=32 loops=1) + Hash Cond: (t1.c1 = t2.c1) + Extra Text: (seg0) Hash chain length 2.0 avg, 2 max, using 3 of 524288 buckets. 
+ -> Result (actual rows=16 loops=1) + -> Seq Scan on t1 (actual rows=16 loops=1) + Rows Removed by Pushdown Runtime Filter: 8 + -> Hash (actual rows=6 loops=1) + Buckets: 524288 Batches: 1 Memory Usage: 4097kB + -> Seq Scan on t2 (actual rows=6 loops=1) + Optimizer: Pivotal Optimizer (GPORCA) +(11 rows) + +RESET gp_enable_runtime_filter_pushdown; +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; +-- case 6: hashjoin + hashjoin + seqscan +DROP TABLE IF EXISTS t1; +NOTICE: table "t1" does not exist, skipping +DROP TABLE IF EXISTS t2; +NOTICE: table "t2" does not exist, skipping +DROP TABLE IF EXISTS t3; +NOTICE: table "t3" does not exist, skipping +CREATE TABLE t1(c1 int, c2 int, c3 char(50), c4 char(50), c5 char(50)) DISTRIBUTED REPLICATED; +CREATE TABLE t2(c1 int, c2 int, c3 char(50), c4 char(50), c5 char(50)) DISTRIBUTED REPLICATED; +CREATE TABLE t3(c1 int, c2 int, c3 char(50), c4 char(50), c5 char(50)) DISTRIBUTED REPLICATED; +INSERT INTO t1 VALUES (1,1,1,1,1), (2,2,2,2,2), (5,5,5,5,5); +INSERT INTO t2 VALUES (1,1,1,1,1), (2,2,2,2,2), (3,3,3,3,3), (4,4,4,4,4); +INSERT INTO t3 VALUES (1,1,1,1,1), (2,2,2,2,2), (3,3,3,3,3), (4,4,4,4,4); +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t2 select * FROM t2; +INSERT INTO t2 select * FROM t2; +INSERT INTO t3 select * FROM t3; +ANALYZE; +SET optimizer TO off; +SET gp_enable_runtime_filter_pushdown TO off; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) SELECT * FROM t1, t2, t3 WHERE t1.c1 = t2.c1 AND t1.c2 = t3.c2; + QUERY PLAN +--------------------------------------------------------------------------------------- + Gather Motion 1:1 (slice1; segments: 1) (actual rows=256 loops=1) + -> Hash Join (actual rows=256 loops=1) + Hash Cond: (t1.c1 = t2.c1) + Extra Text: Hash chain length 4.0 avg, 4 max, using 4 of 32768 buckets. 
+ -> Hash Join (actual rows=64 loops=1) + Hash Cond: (t1.c2 = t3.c2) + Extra Text: Hash chain length 2.0 avg, 2 max, using 4 of 32768 buckets. + -> Seq Scan on t1 (actual rows=48 loops=1) + -> Hash (actual rows=8 loops=1) + Buckets: 32768 Batches: 1 Memory Usage: 258kB + -> Seq Scan on t3 (actual rows=8 loops=1) + -> Hash (actual rows=16 loops=1) + Buckets: 32768 Batches: 1 Memory Usage: 260kB + -> Seq Scan on t2 (actual rows=16 loops=1) + Optimizer: Postgres query optimizer +(15 rows) + +SET gp_enable_runtime_filter_pushdown TO on; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) SELECT * FROM t1, t2, t3 WHERE t1.c1 = t2.c1 AND t1.c2 = t3.c2; + QUERY PLAN +--------------------------------------------------------------------------------------- + Gather Motion 1:1 (slice1; segments: 1) (actual rows=256 loops=1) + -> Hash Join (actual rows=256 loops=1) + Hash Cond: (t1.c1 = t2.c1) + Extra Text: Hash chain length 4.0 avg, 4 max, using 4 of 32768 buckets. + -> Hash Join (actual rows=64 loops=1) + Hash Cond: (t1.c2 = t3.c2) + Extra Text: Hash chain length 2.0 avg, 2 max, using 4 of 32768 buckets. 
+ -> Seq Scan on t1 (actual rows=32 loops=1) + Rows Removed by Pushdown Runtime Filter: 16 + -> Hash (actual rows=8 loops=1) + Buckets: 32768 Batches: 1 Memory Usage: 258kB + -> Seq Scan on t3 (actual rows=8 loops=1) + -> Hash (actual rows=16 loops=1) + Buckets: 32768 Batches: 1 Memory Usage: 260kB + -> Seq Scan on t2 (actual rows=16 loops=1) + Optimizer: Postgres query optimizer +(16 rows) + +RESET gp_enable_runtime_filter_pushdown; +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; +DROP TABLE IF EXISTS t3; +RESET enable_parallel; -- Clean up: reset guc SET gp_enable_runtime_filter TO off; SET optimizer TO default; diff --git a/src/test/regress/sql/gp_runtime_filter.sql b/src/test/regress/sql/gp_runtime_filter.sql index 876575b78c3..fc1fe487745 100644 --- a/src/test/regress/sql/gp_runtime_filter.sql +++ b/src/test/regress/sql/gp_runtime_filter.sql @@ -1,3 +1,7 @@ +-- start_matchignore +-- m/^.*Extra Text:.*/ +-- m/^.*Buckets:.*/ +-- end_matchignore -- Disable ORCA SET optimizer TO off; @@ -75,6 +79,155 @@ SELECT COUNT(*) FROM fact_rf SELECT COUNT(*) FROM dim_rf WHERE dim_rf.did IN (SELECT did FROM fact_rf) AND proj_id < 2; +-- Test bloom filter pushdown +SET enable_parallel TO off; + +-- case 1: join on distribution table and replicated table. 
+DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; +CREATE TABLE t1(c1 int, c2 int, c3 int, c4 int, c5 int) with (appendonly=true, orientation=column) distributed by (c1); +CREATE TABLE t2(c1 int, c2 int, c3 int, c4 int, c5 int) with (appendonly=true, orientation=column) distributed REPLICATED; + +INSERT INTO t1 VALUES (5,5,5,5,5); +INSERT INTO t2 VALUES (1,1,1,1,1), (2,2,2,2,2), (3,3,3,3,3), (4,4,4,4,4); + +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; + +INSERT INTO t2 select * FROM t2; +INSERT INTO t2 select * FROM t2; +INSERT INTO t2 select * FROM t2; + +ANALYZE; + +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT t1.c3 FROM t1, t2 WHERE t1.c2 = t2.c2; + +SET gp_enable_runtime_filter_pushdown TO on; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT t1.c3 FROM t1, t2 WHERE t1.c2 = t2.c2; + +RESET gp_enable_runtime_filter_pushdown; + +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; + +-- case 2: join on partition table and replicated table. 
+CREATE TABLE t1 (c1 INT, c2 INT) DISTRIBUTED BY (c1) PARTITION BY RANGE (c2) (START (1) END (100) EVERY (50)); +CREATE TABLE t2 (c1 INT, c2 INT) DISTRIBUTED REPLICATED; +INSERT INTO t1 SELECT generate_series(1, 99), generate_series(1, 99); +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t2 SELECT generate_series(1, 5), generate_series(1, 5); +INSERT INTO t2 SELECT generate_series(51, 51), generate_series(51, 51); +ANALYZE; + +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT * FROM t1, t2 WHERE t1.c2 = t2.c2; + +SET gp_enable_runtime_filter_pushdown TO on; + +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT * FROM t1, t2 WHERE t1.c2 = t2.c2; + +RESET gp_enable_runtime_filter_pushdown; + +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; + +-- case 3: bug fix with explain +DROP TABLE IF EXISTS test_tablesample1; +CREATE TABLE test_tablesample1 (dist int, id int, name text) WITH (fillfactor=10) DISTRIBUTED BY (dist); +INSERT INTO test_tablesample1 SELECT 0, i, repeat(i::text, 875) FROM generate_series(0, 9) s(i) ORDER BY i; +INSERT INTO test_tablesample1 SELECT 3, i, repeat(i::text, 875) FROM generate_series(10, 19) s(i) ORDER BY i; +INSERT INTO test_tablesample1 SELECT 5, i, repeat(i::text, 875) FROM generate_series(20, 29) s(i) ORDER BY i; + +SET gp_enable_runtime_filter_pushdown TO on; +EXPLAIN (COSTS OFF) SELECT id FROM test_tablesample1 TABLESAMPLE SYSTEM (50) REPEATABLE (2); +RESET gp_enable_runtime_filter_pushdown; + +DROP TABLE IF EXISTS test_tablesample1; + +-- case 4: show debug info only when gp_enable_runtime_filter_pushdown is on +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; +CREATE TABLE t1(c1 int, c2 int); +CREATE TABLE t2(c1 int, c2 int); +INSERT INTO t1 SELECT GENERATE_SERIES(1, 1000), GENERATE_SERIES(1, 1000); +INSERT INTO t2 SELECT * FROM t1; + +SET gp_enable_runtime_filter_pushdown TO on; +EXPLAIN (ANALYZE, COSTS 
OFF, SUMMARY OFF, TIMING OFF) SELECT count(t1.c2) FROM t1, t2 WHERE t1.c1 = t2.c1; +RESET gp_enable_runtime_filter_pushdown; + +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; + +-- case 5: hashjoin + result + seqsacn +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; +CREATE TABLE t1(c1 int, c2 int, c3 char(50), c4 char(50), c5 char(50)) DISTRIBUTED REPLICATED; +CREATE TABLE t2(c1 int, c2 int, c3 char(50), c4 char(50), c5 char(50)); +INSERT INTO t1 VALUES (5,5,5,5,5), (3,3,3,3,3), (4,4,4,4,4); +INSERT INTO t2 VALUES (1,1,1,1,1), (2,2,2,2,2), (3,3,3,3,3), (4,4,4,4,4); +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t2 select * FROM t2; +ANALYZE; + +SET optimizer TO on; +SET gp_enable_runtime_filter_pushdown TO off; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) SELECT t1.c3 FROM t1, t2 WHERE t1.c1 = t2.c1; + +SET gp_enable_runtime_filter_pushdown TO on; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) SELECT t1.c3 FROM t1, t2 WHERE t1.c1 = t2.c1; + +RESET gp_enable_runtime_filter_pushdown; + +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; + +-- case 6: hashjoin + hashjoin + seqscan +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; +DROP TABLE IF EXISTS t3; +CREATE TABLE t1(c1 int, c2 int, c3 char(50), c4 char(50), c5 char(50)) DISTRIBUTED REPLICATED; +CREATE TABLE t2(c1 int, c2 int, c3 char(50), c4 char(50), c5 char(50)) DISTRIBUTED REPLICATED; +CREATE TABLE t3(c1 int, c2 int, c3 char(50), c4 char(50), c5 char(50)) DISTRIBUTED REPLICATED; +INSERT INTO t1 VALUES (1,1,1,1,1), (2,2,2,2,2), (5,5,5,5,5); +INSERT INTO t2 VALUES (1,1,1,1,1), (2,2,2,2,2), (3,3,3,3,3), (4,4,4,4,4); +INSERT INTO t3 VALUES (1,1,1,1,1), (2,2,2,2,2), (3,3,3,3,3), (4,4,4,4,4); +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t2 select * FROM t2; +INSERT INTO t2 select * FROM t2; +INSERT INTO t3 select * FROM 
t3; +ANALYZE; + +SET optimizer TO off; +SET gp_enable_runtime_filter_pushdown TO off; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) SELECT * FROM t1, t2, t3 WHERE t1.c1 = t2.c1 AND t1.c2 = t3.c2; + +SET gp_enable_runtime_filter_pushdown TO on; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) SELECT * FROM t1, t2, t3 WHERE t1.c1 = t2.c1 AND t1.c2 = t3.c2; + +RESET gp_enable_runtime_filter_pushdown; + +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; +DROP TABLE IF EXISTS t3; + +RESET enable_parallel; -- Clean up: reset guc SET gp_enable_runtime_filter TO off;