From 980e2de1d7cac7e2568c7881785bd91d01f7e7eb Mon Sep 17 00:00:00 2001 From: zhangyue Date: Fri, 20 Oct 2023 22:33:42 +0800 Subject: [PATCH] Push the runtime filter from HashJoin down to SeqScan or AM. +----------+ AttrFilter +------+ ScanKey +------------+ | HashJoin | ------------> | Hash | ---------> | SeqScan/AM | +----------+ +------+ +------------+ If "gp_enable_runtime_filter_pushdown" is on, three steps will be run: Step 1. In ExecInitHashJoin(), try to find the mapper between the var in hashclauses and the var in SeqScan. If found we will save the mapper in AttrFilter and push them to Hash node; Step 2. We will create the range/bloom filters in AttrFilter during building hash table, and these filters will be converted to the list of ScanKey and pushed down to Seqscan when the building finishes; Step 3. If AM support SCAN_SUPPORT_RUNTIME_FILTER, these ScanKeys will be pushed down to the AM module further, otherwise will be used to filter slot in Seqscan; --- src/backend/executor/nodeHash.c | 162 +++++++++++- src/backend/executor/nodeHashjoin.c | 239 ++++++++++++++++++ src/backend/executor/nodeSeqscan.c | 94 ++++++- src/backend/utils/misc/guc_gp.c | 12 + src/include/access/skey.h | 3 + src/include/access/tableam.h | 9 +- src/include/nodes/execnodes.h | 20 ++ src/include/utils/guc.h | 6 + src/include/utils/sync_guc_name.h | 1 + .../regress/expected/gp_runtime_filter.out | 54 ++++ src/test/regress/sql/gp_runtime_filter.sql | 34 +++ 11 files changed, 627 insertions(+), 7 deletions(-) diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c index e1c86f737cc..602717fce1a 100644 --- a/src/backend/executor/nodeHash.c +++ b/src/backend/executor/nodeHash.c @@ -102,7 +102,10 @@ static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable, size_t size); static void ExecParallelHashMergeCounters(HashJoinTable hashtable); static void ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable); - +static void BuildRuntimeFilter(HashState *node, TupleTableSlot *slot); +static void PushdownRuntimeFilter(HashState *node); +static void FreeRuntimeFilter(HashState *node); +static void ResetRuntimeFilter(HashState *node); /* ---------------------------------------------------------------- * ExecHash @@ -193,7 +196,15 @@ MultiExecPrivateHash(HashState *node) { slot = ExecProcNode(outerNode); if (TupIsNull(slot)) + { + if (gp_enable_runtime_filter_pushdown && node->filters) + PushdownRuntimeFilter(node); break; + } + + if (gp_enable_runtime_filter_pushdown && node->filters) + BuildRuntimeFilter(node, slot); + /* We have to compute the hash value */ econtext->ecxt_outertuple = slot; bool hashkeys_null = false; @@ -334,7 +345,15 @@ MultiExecParallelHash(HashState *node) slot = ExecProcNode(outerNode); if (TupIsNull(slot)) + { + if (gp_enable_runtime_filter_pushdown && node->filters) + PushdownRuntimeFilter(node); break; + } + + if (gp_enable_runtime_filter_pushdown && node->filters) + BuildRuntimeFilter(node, slot); + econtext->ecxt_outertuple = slot; if (ExecHashGetHashValue(node, hashtable, econtext, hashkeys, false, hashtable->keepNulls, @@ -512,6 +531,9 @@ ExecEndHash(HashState *node) */ outerPlan = outerPlanState(node); ExecEndNode(outerPlan); + + if (gp_enable_runtime_filter_pushdown && node->filters) + FreeRuntimeFilter(node); } @@ -2520,6 +2542,9 @@ ExecReScanHash(HashState *node) */ if (node->ps.lefttree->chgParam == NULL) ExecReScan(node->ps.lefttree); + + if (gp_enable_runtime_filter_pushdown && node->filters) + ResetRuntimeFilter(node); } @@ -4126,3 +4151,138 @@ get_hash_mem(void) return (int) mem_limit; } + +/* + * Convert AttrFilter to ScanKeyData and send these runtime filters to the + * target node(seqscan). + */ +void +PushdownRuntimeFilter(HashState *node) +{ + ListCell *lc; + List *scankeys; + ScanKey sk; + AttrFilter *attr_filter; + + foreach (lc, node->filters) + { + scankeys = NIL; + + attr_filter = lfirst(lc); + if (!IsA(attr_filter->target, SeqScanState) || attr_filter->empty) + continue; + + /* bloom filter */ + sk = (ScanKey)palloc0(sizeof(ScanKeyData)); + sk->sk_flags = SK_BLOOM_FILTER; + sk->sk_attno = attr_filter->lattno; + sk->sk_subtype = INT8OID; + sk->sk_argument = PointerGetDatum(attr_filter->blm_filter); + scankeys = lappend(scankeys, sk); + + /* range filter */ + sk = (ScanKey)palloc0(sizeof(ScanKeyData)); + sk->sk_flags = 0; + sk->sk_attno = attr_filter->lattno; + sk->sk_strategy = BTGreaterEqualStrategyNumber; + sk->sk_subtype = INT8OID; + sk->sk_argument = attr_filter->min; + scankeys = lappend(scankeys, sk); + + sk = (ScanKey)palloc0(sizeof(ScanKeyData)); + sk->sk_flags = 0; + sk->sk_attno = attr_filter->lattno; + sk->sk_strategy = BTLessEqualStrategyNumber; + sk->sk_subtype = INT8OID; + sk->sk_argument = attr_filter->max; + scankeys = lappend(scankeys, sk); + + /* append new runtime filters to target node */ + SeqScanState *sss = castNode(SeqScanState, attr_filter->target); + sss->filters = list_concat(sss->filters, scankeys); + } +} + +static void +BuildRuntimeFilter(HashState *node, TupleTableSlot *slot) +{ + Datum val; + bool isnull; + ListCell *lc; + AttrFilter *attr_filter; + + foreach (lc, node->filters) + { + attr_filter = (AttrFilter *) lfirst(lc); + + val = slot_getattr(slot, attr_filter->rattno, &isnull); + if (isnull) + continue; + + attr_filter->empty = false; + + if ((int64_t)val < (int64_t)attr_filter->min) + attr_filter->min = val; + + if ((int64_t)val > (int64_t)attr_filter->max) + attr_filter->max = val; + + if (attr_filter->blm_filter) + bloom_add_element(attr_filter->blm_filter, (unsigned char *)&val, sizeof(Datum)); + } +} + +void +FreeRuntimeFilter(HashState *node) +{ + ListCell *lc; + AttrFilter *attr_filter; + + if (!node->filters) + return; + + foreach (lc, node->filters) + { + attr_filter = lfirst(lc); + if (attr_filter->blm_filter) + bloom_free(attr_filter->blm_filter); + } + + list_free_deep(node->filters); + node->filters = NIL; +} + +void +ResetRuntimeFilter(HashState *node) +{ + ListCell *lc; + AttrFilter *attr_filter; + SeqScanState *sss; + + if (!node->filters) + return; + + foreach (lc, node->filters) + { + attr_filter = lfirst(lc); + attr_filter->empty = true; + + if (IsA(attr_filter->target, SeqScanState)) + { + sss = castNode(SeqScanState, attr_filter->target); + if (sss->filters) + { + list_free_deep(sss->filters); + sss->filters = NIL; + } + } + + if (attr_filter->blm_filter) + bloom_free(attr_filter->blm_filter); + + attr_filter->blm_filter = bloom_create_aggresive(node->ps.plan->plan_rows, + work_mem, random()); + attr_filter->min = LLONG_MAX; + attr_filter->max = LLONG_MIN; + } +} diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c index 818c0b69fa6..6a3d6c95b26 100644 --- a/src/backend/executor/nodeHashjoin.c +++ b/src/backend/executor/nodeHashjoin.c @@ -119,6 +119,8 @@ #include "executor/nodeRuntimeFilter.h" #include "miscadmin.h" #include "pgstat.h" +#include "utils/guc.h" +#include "utils/fmgroids.h" #include "utils/memutils.h" #include "utils/sharedtuplestore.h" @@ -162,6 +164,16 @@ static void ReleaseHashTable(HashJoinState *node); static void SpillCurrentBatch(HashJoinState *node); static bool ExecHashJoinReloadHashTable(HashJoinState *hjstate); static void ExecEagerFreeHashJoin(HashJoinState *node); +static void CreateRuntimeFilter(HashJoinState* hjstate); +static bool IsEqualOp(Expr *expr); +static bool CheckEqualArgs(Expr *expr, AttrNumber *lattno, AttrNumber *rattno); +static PlanState *FindTargetAttr(HashJoinState *hjstate, + AttrNumber attno, + AttrNumber *lattno); +static AttrFilter *CreateAttrFilter(PlanState *target, + AttrNumber lattno, + AttrNumber rattno, + double plan_rows); extern bool Test_print_prefetch_joinqual; @@ -1000,6 +1012,11 @@ ExecInitHashJoin(HashJoin *node, EState *estate, int eflags) ExecInitRuntimeFilterFinish(rfstate, hstate->ps.plan->plan_rows); } + if (Gp_role == GP_ROLE_EXECUTE + && gp_enable_runtime_filter_pushdown + && !estate->useMppParallelMode) + CreateRuntimeFilter(hjstate); + return hjstate; } @@ -2157,3 +2174,225 @@ ExecHashJoinInitializeWorker(HashJoinState *state, ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin); } } + +/* + * Find "inner var = outer var" in hj->hashclauses and create runtime filter + * for it. + */ +void +CreateRuntimeFilter(HashJoinState* hjstate) +{ + AttrNumber lattno, rattno; + Expr *expr; + JoinType jointype; + HashJoin *hj; + HashState *hstate; + PlanState *target; + AttrFilter *attr_filter; + ListCell *lc; + + /* + * Only applicatable for inner, right and semi join, + */ + jointype = hjstate->js.jointype; + if (jointype != JOIN_INNER + && jointype != JOIN_RIGHT + && jointype != JOIN_SEMI + ) + return; + + hstate = castNode(HashState, innerPlanState(hjstate)); + hstate->filters = NIL; + + /* + * check and initialize the runtime filter for all hash conds in + * hj->hashclauses + */ + hj = castNode(HashJoin, hjstate->js.ps.plan); + foreach (lc, hj->hashclauses) + { + expr = (Expr *)lfirst(lc); + + if (!IsEqualOp(expr)) + continue; + + lattno = -1; + rattno = -1; + if (!CheckEqualArgs(expr, &lattno, &rattno)) + continue; + + if (lattno < 1 || rattno < 1) + continue; + + target = FindTargetAttr(hjstate, lattno, &lattno); + if (lattno == -1 || target == NULL || IsA(target, HashJoinState)) + continue; + Assert(IsA(target, SeqScanState)); + + attr_filter = CreateAttrFilter(target, lattno, rattno, + hstate->ps.plan->plan_rows); + if (attr_filter->blm_filter) + hstate->filters = lappend(hstate->filters, attr_filter); + else + pfree(attr_filter); + } +} + +static bool +IsEqualOp(Expr *expr) +{ + Oid funcid = InvalidOid; + + if (!IsA(expr, OpExpr) && !IsA(expr, FuncExpr)) + return false; + + if (IsA(expr, OpExpr)) + funcid = ((OpExpr *)expr)->opfuncid; + else if (IsA(expr, FuncExpr)) + funcid = ((FuncExpr *)expr)->funcid; + else + return false; + + if (funcid == F_INT2EQ || funcid == F_INT4EQ || funcid == F_INT8EQ + || funcid == F_INT24EQ || funcid == F_INT42EQ + || funcid == F_INT28EQ || funcid == F_INT82EQ + || funcid == F_INT48EQ || funcid == F_INT84EQ + ) + return true; + + return false; +} + +/* + * runtime filters which can be pushed down: + * 1. hash expr MUST BE equal op; + * 2. args MUST BE Var node; + * 3. the data type MUST BE integer; + */ +static bool +CheckEqualArgs(Expr *expr, AttrNumber *lattno, AttrNumber *rattno) +{ + Var *var; + bool match; + List *args; + ListCell *lc; + + if (lattno == NULL || rattno == NULL) + return false; + + if (!IsA(expr, OpExpr) && !IsA(expr, FuncExpr)) + return false; + + if (IsA(expr, OpExpr)) + args = ((OpExpr *)expr)->args; + else if (IsA(expr, FuncExpr)) + args = ((FuncExpr *)expr)->args; + else + return false; + + if (!args || list_length(args) != 2) + return false; + + match = false; + foreach (lc, args) + { + match = false; + + if (!IsA(lfirst(lc), Var)) + break; + + var = lfirst(lc); + if (var->varno == INNER_VAR) + *rattno = var->varattno; + else if (var->varno == OUTER_VAR) + *lattno = var->varattno; + else + break; + + match = true; + } + + return match; +} + +/* + * it's just allowed like this: + * HashJoin + * ... a series of HashJoin nodes + * HashJoin + * SeqScan <- target + */ +static PlanState * +FindTargetAttr(HashJoinState *hjstate, AttrNumber attno, AttrNumber *lattno) +{ + Var *var; + PlanState *child, *parent; + TargetEntry *te; + + parent = (PlanState *)hjstate; + child = outerPlanState(hjstate); + Assert(child); + + *lattno = -1; + while (child) + { + /* target is seqscan */ + if (IsA(child, SeqScanState)) + { + te = (TargetEntry *)list_nth(child->plan->targetlist, attno - 1); + if (!IsA(te->expr, Var)) + return NULL; + + var = castNode(Var, te->expr); + + /* system column is not allowed */ + if (var->varattno <= 0) + return NULL; + + *lattno = var->varattno; + return child; + } + + /* + * hashjoin + * result (hash filter) + * seqscan on t1, t1 is replicated table + */ + if (!IsA(child, HashJoinState) && !IsA(child, ResultState)) + return NULL; + + /* child is hashjoin or result node */ + te = (TargetEntry *)list_nth(child->plan->targetlist, attno - 1); + if (!IsA(te->expr, Var)) + return NULL; + + var = castNode(Var, te->expr); + if (var->varno == INNER_VAR) + return NULL; + + attno = var->varattno; + + /* find at child node */ + parent = child; + child = outerPlanState(parent); + } + + return NULL; +} + +static AttrFilter* +CreateAttrFilter(PlanState *target, AttrNumber lattno, AttrNumber rattno, + double plan_rows) +{ + AttrFilter *attr_filter = palloc0(sizeof(AttrFilter)); + attr_filter->empty = true; + attr_filter->target = target; + + attr_filter->lattno = lattno; + attr_filter->rattno = rattno; + attr_filter->blm_filter = bloom_create_aggresive(plan_rows, work_mem, random()); + attr_filter->min = LLONG_MAX; + attr_filter->max = LLONG_MIN; + + return attr_filter; +} diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c index d6e88e6ddf0..74510615804 100644 --- a/src/backend/executor/nodeSeqscan.c +++ b/src/backend/executor/nodeSeqscan.c @@ -44,6 +44,9 @@ static TupleTableSlot *SeqNext(SeqScanState *node); +static bool PassByBloomFilter(SeqScanState *node, TupleTableSlot *slot); +static ScanKey ScanKeyListToArray(List *keys, int *num); + /* ---------------------------------------------------------------- * Scan Support * ---------------------------------------------------------------- @@ -72,13 +75,24 @@ SeqNext(SeqScanState *node) if (scandesc == NULL) { + int nkeys = 0; + ScanKey keys = NULL; + + /* + * Just when gp_enable_runtime_filter_pushdown enabled and + * node->filter_in_seqscan is false means scankey need to be pushed to + * AM. + */ + if (gp_enable_runtime_filter_pushdown && !node->filter_in_seqscan) + keys = ScanKeyListToArray(node->filters, &nkeys); + /* * We reach here if the scan is not parallel, or if we're serially * executing a scan that was planned to be parallel. */ scandesc = table_beginscan_es(node->ss.ss_currentRelation, estate->es_snapshot, - 0, NULL, + nkeys, keys, NULL, &node->ss.ps); node->ss.ss_currentScanDesc = scandesc; @@ -87,8 +101,17 @@ SeqNext(SeqScanState *node) /* * get the next tuple from the table */ - if (table_scan_getnextslot(scandesc, direction, slot)) + while (table_scan_getnextslot(scandesc, direction, slot)) + { + if (TupIsNull(slot)) + return slot; + + if (node->filter_in_seqscan && node->filters && + !PassByBloomFilter(node, slot)) + continue; + return slot; + } return NULL; } @@ -192,6 +215,16 @@ ExecInitSeqScanForPartition(SeqScan *node, EState *estate, scanstate->ss.ps.qual = ExecInitQual(node->plan.qual, (PlanState *) scanstate); + /* + * check scan slot with bloom filters in seqscan node or not. + */ + if (gp_enable_runtime_filter_pushdown + && !estate->useMppParallelMode) + { + scanstate->filter_in_seqscan = + !(table_scan_flags(currentRelation) & SCAN_SUPPORT_RUNTIME_FILTER); + } + return scanstate; } @@ -365,3 +398,60 @@ ExecSeqScanInitializeWorker(SeqScanState *node, } node->ss.ss_currentScanDesc = scandesc; } + +/* + * Returns true if the element may be in the bloom filter. + */ +static bool +PassByBloomFilter(SeqScanState *node, TupleTableSlot *slot) +{ + ScanKey sk; + Datum val; + bool isnull; + ListCell *lc; + bloom_filter *blm_filter; + + foreach (lc, node->filters) + { + sk = lfirst(lc); + if (sk->sk_flags != SK_BLOOM_FILTER) + continue; + + val = slot_getattr(slot, sk->sk_attno, &isnull); + if (isnull) + continue; + + blm_filter = (bloom_filter *)DatumGetPointer(sk->sk_argument); + if (bloom_lacks_element(blm_filter, (unsigned char *)&val, sizeof(Datum))) + return false; + } + + return true; +} + +/* + * Convert the list of ScanKey to the array, and append an emtpy ScanKey as + * the end flag of the array. + */ +static ScanKey +ScanKeyListToArray(List *keys, int *num) +{ + ScanKey sk; + + if (list_length(keys) == 0) + return NULL; + + Assert(num); + *num = list_length(keys); + + sk = (ScanKey)palloc(sizeof(ScanKeyData) * (*num + 1)); + for (int i = 0; i < *num; ++i) + memcpy(&sk[i], list_nth(keys, i), sizeof(ScanKeyData)); + + /* + * SK_EMPYT means the end of the array of the ScanKey + */ + sk[*num].sk_flags = SK_EMPYT; + + return sk; +} diff --git a/src/backend/utils/misc/guc_gp.c b/src/backend/utils/misc/guc_gp.c index c1bac91236c..388aed9815f 100644 --- a/src/backend/utils/misc/guc_gp.c +++ b/src/backend/utils/misc/guc_gp.c @@ -445,6 +445,8 @@ bool gp_enable_global_deadlock_detector = false; bool gp_enable_predicate_pushdown; int gp_predicate_pushdown_sample_rows; +bool gp_enable_runtime_filter_pushdown; + bool enable_offload_entry_to_qe = false; bool enable_answer_query_using_materialized_views = false; @@ -3147,6 +3149,16 @@ struct config_bool ConfigureNamesBool_gp[] = NULL, NULL, NULL }, + { + {"gp_enable_runtime_filter_pushdown", PGC_USERSET, DEVELOPER_OPTIONS, + gettext_noop("Try to push the hash table of hash join to the seqscan or AM as bloom filter."), + NULL + }, + &gp_enable_runtime_filter_pushdown, + false, + NULL, NULL, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL diff --git a/src/include/access/skey.h b/src/include/access/skey.h index 92b7d09fa45..2f533c03db8 100644 --- a/src/include/access/skey.h +++ b/src/include/access/skey.h @@ -122,6 +122,9 @@ typedef ScanKeyData *ScanKey; #define SK_SEARCHNOTNULL 0x0080 /* scankey represents "col IS NOT NULL" */ #define SK_ORDER_BY 0x0100 /* scankey is for ORDER BY op */ +/* for runtime filter */ +#define SK_EMPYT 0x8000 /* scankey is empty */ +#define SK_BLOOM_FILTER 0x4000 /* scankey is for bloom filter */ /* * prototypes for functions in access/common/scankey.c diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h index f5469785c07..f873fa7a294 100644 --- a/src/include/access/tableam.h +++ b/src/include/access/tableam.h @@ -43,15 +43,16 @@ struct ValidateIndexState; /** * Flags represented the supported features of scan. - * + * * The first 8 bits are reserved for kernel expansion of some attributes, * and the remaining 24 bits are reserved for custom tableam. - * + * * If you add a new flag, make sure the flag's bit is consecutive with * the previous one. - * -*/ + * + */ #define SCAN_SUPPORT_COLUMN_ORIENTED_SCAN (1 << 0) /* support column-oriented scanning*/ +#define SCAN_SUPPORT_RUNTIME_FILTER (1 << 1) /* support runtime filter scan */ /* * Bitmask values for the flags argument to the scan_begin callback. diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 2b4f7ff23b2..6a157393001 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -1539,6 +1539,10 @@ typedef struct SeqScanState { ScanState ss; /* its first field is NodeTag */ Size pscan_len; /* size of parallel heap scan descriptor */ + + List *filters; /* the list of struct ScanKeyData */ + bool filter_in_seqscan; /* check scan slot with runtime filters in + seqscan node or in am */ } SeqScanState; /* ---------------- @@ -2868,6 +2872,20 @@ typedef struct RuntimeFilterState bloom_filter *bf; } RuntimeFilterState; +typedef struct AttrFilter +{ + bool empty; /* empty filter or not */ + PlanState *target;/* the node in where runtime filter will be used, + target will be seqscan, see FindTargetAttr(). + in nodeHashjoin.c */ + AttrNumber rattno; /* attr no in hash node */ + AttrNumber lattno; /* if target is seqscan, attr no in relation */ + + bloom_filter *blm_filter; + Datum min; + Datum max; +} AttrFilter; + /* ---------------- * HashState information * ---------------- @@ -2903,6 +2921,8 @@ typedef struct HashState struct ParallelHashJoinState *parallel_state; Barrier *sync_barrier; + + List *filters; /* the list of AttrFilter */ } HashState; /* ---------------- diff --git a/src/include/utils/guc.h b/src/include/utils/guc.h index 3eac20b1013..37565aa04ef 100644 --- a/src/include/utils/guc.h +++ b/src/include/utils/guc.h @@ -632,6 +632,12 @@ extern bool gp_log_endpoints; extern bool gp_allow_date_field_width_5digits; +/* + * Try to push the hash table of hash join node down to the scan node as + * bloom filter for performance. + */ +extern bool gp_enable_runtime_filter_pushdown; + typedef enum { INDEX_CHECK_NONE, diff --git a/src/include/utils/sync_guc_name.h b/src/include/utils/sync_guc_name.h index 72986dd0ccb..d11254ca35b 100644 --- a/src/include/utils/sync_guc_name.h +++ b/src/include/utils/sync_guc_name.h @@ -195,5 +195,6 @@ "work_mem", "gp_appendonly_insert_files", "gp_appendonly_insert_files_tuples_range", + "gp_enable_runtime_filter_pushdown", "gp_random_insert_segments", "zero_damaged_pages", diff --git a/src/test/regress/expected/gp_runtime_filter.out b/src/test/regress/expected/gp_runtime_filter.out index 16f893d465f..3394f1b533d 100644 --- a/src/test/regress/expected/gp_runtime_filter.out +++ b/src/test/regress/expected/gp_runtime_filter.out @@ -250,6 +250,60 @@ SELECT COUNT(*) FROM dim_rf 1600 (1 row) +-- Test bloom filter pushdown +DROP TABLE IF EXISTS t1; +NOTICE: table "t1" does not exist, skipping +DROP TABLE IF EXISTS t2; +NOTICE: table "t2" does not exist, skipping +CREATE TABLE t1(c1 int, c2 int, c3 int, c4 int, c5 int) with (appendonly=true, orientation=column) distributed by (c1); +CREATE TABLE t2(c1 int, c2 int, c3 int, c4 int, c5 int) with (appendonly=true, orientation=column) distributed REPLICATED; +INSERT INTO t1 VALUES (5,5,5,5,5); +INSERT INTO t2 VALUES (1,1,1,1,1), (2,2,2,2,2), (3,3,3,3,3), (4,4,4,4,4); +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t2 select * FROM t2; +INSERT INTO t2 select * FROM t2; +INSERT INTO t2 select * FROM t2; +ANALYZE; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT t1.c3 FROM t1, t2 WHERE t1.c2 = t2.c2; + QUERY PLAN +------------------------------------------------------------------------------------------- + Gather Motion 3:1 (slice1; segments: 3) (actual rows=0 loops=1) + -> Hash Join (never executed) + Hash Cond: (t1.c2 = t2.c2) + Extra Text: (seg2) Hash chain length 8.0 avg, 8 max, using 4 of 524288 buckets. + -> Seq Scan on t1 (actual rows=128 loops=1) + -> Hash (actual rows=32 loops=1) + Buckets: 524288 Batches: 1 Memory Usage: 4098kB + -> Seq Scan on t2 (actual rows=32 loops=1) + Optimizer: Postgres query optimizer +(9 rows) + +SET gp_enable_runtime_filter_pushdown TO on; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT t1.c3 FROM t1, t2 WHERE t1.c2 = t2.c2; + QUERY PLAN +------------------------------------------------------------------------------------------- + Gather Motion 3:1 (slice1; segments: 3) (actual rows=0 loops=1) + -> Hash Join (never executed) + Hash Cond: (t1.c2 = t2.c2) + Extra Text: (seg2) Hash chain length 8.0 avg, 8 max, using 4 of 524288 buckets. + -> Seq Scan on t1 (actual rows=1 loops=1) + -> Hash (actual rows=32 loops=1) + Buckets: 524288 Batches: 1 Memory Usage: 4098kB + -> Seq Scan on t2 (actual rows=32 loops=1) + Optimizer: Postgres query optimizer +(9 rows) + +RESET gp_enable_runtime_filter_pushdown; +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; -- Clean up: reset guc SET gp_enable_runtime_filter TO off; SET optimizer TO default; diff --git a/src/test/regress/sql/gp_runtime_filter.sql b/src/test/regress/sql/gp_runtime_filter.sql index 876575b78c3..110e118eee2 100644 --- a/src/test/regress/sql/gp_runtime_filter.sql +++ b/src/test/regress/sql/gp_runtime_filter.sql @@ -75,6 +75,40 @@ SELECT COUNT(*) FROM fact_rf SELECT COUNT(*) FROM dim_rf WHERE dim_rf.did IN (SELECT did FROM fact_rf) AND proj_id < 2; +-- Test bloom filter pushdown +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; +CREATE TABLE t1(c1 int, c2 int, c3 int, c4 int, c5 int) with (appendonly=true, orientation=column) distributed by (c1); +CREATE TABLE t2(c1 int, c2 int, c3 int, c4 int, c5 int) with (appendonly=true, orientation=column) distributed REPLICATED; + +INSERT INTO t1 VALUES (5,5,5,5,5); +INSERT INTO t2 VALUES (1,1,1,1,1), (2,2,2,2,2), (3,3,3,3,3), (4,4,4,4,4); + +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; + +INSERT INTO t2 select * FROM t2; +INSERT INTO t2 select * FROM t2; +INSERT INTO t2 select * FROM t2; + +ANALYZE; + +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT t1.c3 FROM t1, t2 WHERE t1.c2 = t2.c2; + +SET gp_enable_runtime_filter_pushdown TO on; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT t1.c3 FROM t1, t2 WHERE t1.c2 = t2.c2; + +RESET gp_enable_runtime_filter_pushdown; + +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; -- Clean up: reset guc SET gp_enable_runtime_filter TO off;