From e4e04a9bd5894167cfaa0f7d250bac0c17b0fe77 Mon Sep 17 00:00:00 2001 From: zhangyue Date: Fri, 20 Oct 2023 22:33:42 +0800 Subject: [PATCH 01/12] Push the runtime filter from HashJoin down to SeqScan or AM. +----------+ AttrFilter +------+ ScanKey +------------+ | HashJoin | ------------> | Hash | ---------> | SeqScan/AM | +----------+ +------+ +------------+ If "gp_enable_runtime_filter_pushdown" is on, three steps will be run: Step 1. In ExecInitHashJoin(), try to find the mapper between the var in hashclauses and the var in SeqScan. If found we will save the mapper in AttrFilter and push them to Hash node; Step 2. We will create the range/bloom filters in AttrFilter during building hash table, and these filters will be converted to the list of ScanKey and pushed down to Seqscan when the building finishes; Step 3. If AM support SCAN_SUPPORT_RUNTIME_FILTER, these ScanKeys will be pushed down to the AM module further, otherwise will be used to filter slot in Seqscan; --- src/backend/executor/nodeHash.c | 162 +++++++++++- src/backend/executor/nodeHashjoin.c | 239 ++++++++++++++++++ src/backend/executor/nodeSeqscan.c | 94 ++++++- src/backend/utils/misc/guc_gp.c | 12 + src/include/access/skey.h | 3 + src/include/access/tableam.h | 9 +- src/include/nodes/execnodes.h | 20 ++ src/include/utils/guc.h | 6 + src/include/utils/sync_guc_name.h | 1 + .../regress/expected/gp_runtime_filter.out | 54 ++++ src/test/regress/sql/gp_runtime_filter.sql | 34 +++ 11 files changed, 627 insertions(+), 7 deletions(-) diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c index e1c86f737cc..602717fce1a 100644 --- a/src/backend/executor/nodeHash.c +++ b/src/backend/executor/nodeHash.c @@ -102,7 +102,10 @@ static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable, size_t size); static void ExecParallelHashMergeCounters(HashJoinTable hashtable); static void ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable); - +static void BuildRuntimeFilter(HashState *node, TupleTableSlot *slot); +static void PushdownRuntimeFilter(HashState *node); +static void FreeRuntimeFilter(HashState *node); +static void ResetRuntimeFilter(HashState *node); /* ---------------------------------------------------------------- * ExecHash @@ -193,7 +196,15 @@ MultiExecPrivateHash(HashState *node) { slot = ExecProcNode(outerNode); if (TupIsNull(slot)) + { + if (gp_enable_runtime_filter_pushdown && node->filters) + PushdownRuntimeFilter(node); break; + } + + if (gp_enable_runtime_filter_pushdown && node->filters) + BuildRuntimeFilter(node, slot); + /* We have to compute the hash value */ econtext->ecxt_outertuple = slot; bool hashkeys_null = false; @@ -334,7 +345,15 @@ MultiExecParallelHash(HashState *node) slot = ExecProcNode(outerNode); if (TupIsNull(slot)) + { + if (gp_enable_runtime_filter_pushdown && node->filters) + PushdownRuntimeFilter(node); break; + } + + if (gp_enable_runtime_filter_pushdown && node->filters) + BuildRuntimeFilter(node, slot); + econtext->ecxt_outertuple = slot; if (ExecHashGetHashValue(node, hashtable, econtext, hashkeys, false, hashtable->keepNulls, @@ -512,6 +531,9 @@ ExecEndHash(HashState *node) */ outerPlan = outerPlanState(node); ExecEndNode(outerPlan); + + if (gp_enable_runtime_filter_pushdown && node->filters) + FreeRuntimeFilter(node); } @@ -2520,6 +2542,9 @@ ExecReScanHash(HashState *node) */ if (node->ps.lefttree->chgParam == NULL) ExecReScan(node->ps.lefttree); + + if (gp_enable_runtime_filter_pushdown && node->filters) + ResetRuntimeFilter(node); } @@ -4126,3 +4151,138 
@@ get_hash_mem(void) return (int) mem_limit; } + +/* + * Convert AttrFilter to ScanKeyData and send these runtime filters to the + * target node(seqscan). + */ +void +PushdownRuntimeFilter(HashState *node) +{ + ListCell *lc; + List *scankeys; + ScanKey sk; + AttrFilter *attr_filter; + + foreach (lc, node->filters) + { + scankeys = NIL; + + attr_filter = lfirst(lc); + if (!IsA(attr_filter->target, SeqScanState) || attr_filter->empty) + continue; + + /* bloom filter */ + sk = (ScanKey)palloc0(sizeof(ScanKeyData)); + sk->sk_flags = SK_BLOOM_FILTER; + sk->sk_attno = attr_filter->lattno; + sk->sk_subtype = INT8OID; + sk->sk_argument = PointerGetDatum(attr_filter->blm_filter); + scankeys = lappend(scankeys, sk); + + /* range filter */ + sk = (ScanKey)palloc0(sizeof(ScanKeyData)); + sk->sk_flags = 0; + sk->sk_attno = attr_filter->lattno; + sk->sk_strategy = BTGreaterEqualStrategyNumber; + sk->sk_subtype = INT8OID; + sk->sk_argument = attr_filter->min; + scankeys = lappend(scankeys, sk); + + sk = (ScanKey)palloc0(sizeof(ScanKeyData)); + sk->sk_flags = 0; + sk->sk_attno = attr_filter->lattno; + sk->sk_strategy = BTLessEqualStrategyNumber; + sk->sk_subtype = INT8OID; + sk->sk_argument = attr_filter->max; + scankeys = lappend(scankeys, sk); + + /* append new runtime filters to target node */ + SeqScanState *sss = castNode(SeqScanState, attr_filter->target); + sss->filters = list_concat(sss->filters, scankeys); + } +} + +static void +BuildRuntimeFilter(HashState *node, TupleTableSlot *slot) +{ + Datum val; + bool isnull; + ListCell *lc; + AttrFilter *attr_filter; + + foreach (lc, node->filters) + { + attr_filter = (AttrFilter *) lfirst(lc); + + val = slot_getattr(slot, attr_filter->rattno, &isnull); + if (isnull) + continue; + + attr_filter->empty = false; + + if ((int64_t)val < (int64_t)attr_filter->min) + attr_filter->min = val; + + if ((int64_t)val > (int64_t)attr_filter->max) + attr_filter->max = val; + + if (attr_filter->blm_filter) + bloom_add_element(attr_filter->blm_filter, (unsigned char *)&val, sizeof(Datum)); + } +} + +void +FreeRuntimeFilter(HashState *node) +{ + ListCell *lc; + AttrFilter *attr_filter; + + if (!node->filters) + return; + + foreach (lc, node->filters) + { + attr_filter = lfirst(lc); + if (attr_filter->blm_filter) + bloom_free(attr_filter->blm_filter); + } + + list_free_deep(node->filters); + node->filters = NIL; +} + +void +ResetRuntimeFilter(HashState *node) +{ + ListCell *lc; + AttrFilter *attr_filter; + SeqScanState *sss; + + if (!node->filters) + return; + + foreach (lc, node->filters) + { + attr_filter = lfirst(lc); + attr_filter->empty = true; + + if (IsA(attr_filter->target, SeqScanState)) + { + sss = castNode(SeqScanState, attr_filter->target); + if (sss->filters) + { + list_free_deep(sss->filters); + sss->filters = NIL; + } + } + + if (attr_filter->blm_filter) + bloom_free(attr_filter->blm_filter); + + attr_filter->blm_filter = bloom_create_aggresive(node->ps.plan->plan_rows, + work_mem, random()); + attr_filter->min = LLONG_MAX; + attr_filter->max = LLONG_MIN; + } +} diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c index 818c0b69fa6..6a3d6c95b26 100644 --- a/src/backend/executor/nodeHashjoin.c +++ b/src/backend/executor/nodeHashjoin.c @@ -119,6 +119,8 @@ #include "executor/nodeRuntimeFilter.h" #include "miscadmin.h" #include "pgstat.h" +#include "utils/guc.h" +#include "utils/fmgroids.h" #include "utils/memutils.h" #include "utils/sharedtuplestore.h" @@ -162,6 +164,16 @@ static void ReleaseHashTable(HashJoinState *node); 
static void SpillCurrentBatch(HashJoinState *node); static bool ExecHashJoinReloadHashTable(HashJoinState *hjstate); static void ExecEagerFreeHashJoin(HashJoinState *node); +static void CreateRuntimeFilter(HashJoinState* hjstate); +static bool IsEqualOp(Expr *expr); +static bool CheckEqualArgs(Expr *expr, AttrNumber *lattno, AttrNumber *rattno); +static PlanState *FindTargetAttr(HashJoinState *hjstate, + AttrNumber attno, + AttrNumber *lattno); +static AttrFilter *CreateAttrFilter(PlanState *target, + AttrNumber lattno, + AttrNumber rattno, + double plan_rows); extern bool Test_print_prefetch_joinqual; @@ -1000,6 +1012,11 @@ ExecInitHashJoin(HashJoin *node, EState *estate, int eflags) ExecInitRuntimeFilterFinish(rfstate, hstate->ps.plan->plan_rows); } + if (Gp_role == GP_ROLE_EXECUTE + && gp_enable_runtime_filter_pushdown + && !estate->useMppParallelMode) + CreateRuntimeFilter(hjstate); + return hjstate; } @@ -2157,3 +2174,225 @@ ExecHashJoinInitializeWorker(HashJoinState *state, ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin); } } + +/* + * Find "inner var = outer var" in hj->hashclauses and create runtime filter + * for it. + */ +void +CreateRuntimeFilter(HashJoinState* hjstate) +{ + AttrNumber lattno, rattno; + Expr *expr; + JoinType jointype; + HashJoin *hj; + HashState *hstate; + PlanState *target; + AttrFilter *attr_filter; + ListCell *lc; + + /* + * Only applicatable for inner, right and semi join, + */ + jointype = hjstate->js.jointype; + if (jointype != JOIN_INNER + && jointype != JOIN_RIGHT + && jointype != JOIN_SEMI + ) + return; + + hstate = castNode(HashState, innerPlanState(hjstate)); + hstate->filters = NIL; + + /* + * check and initialize the runtime filter for all hash conds in + * hj->hashclauses + */ + hj = castNode(HashJoin, hjstate->js.ps.plan); + foreach (lc, hj->hashclauses) + { + expr = (Expr *)lfirst(lc); + + if (!IsEqualOp(expr)) + continue; + + lattno = -1; + rattno = -1; + if (!CheckEqualArgs(expr, &lattno, &rattno)) + continue; + + if (lattno < 1 || rattno < 1) + continue; + + target = FindTargetAttr(hjstate, lattno, &lattno); + if (lattno == -1 || target == NULL || IsA(target, HashJoinState)) + continue; + Assert(IsA(target, SeqScanState)); + + attr_filter = CreateAttrFilter(target, lattno, rattno, + hstate->ps.plan->plan_rows); + if (attr_filter->blm_filter) + hstate->filters = lappend(hstate->filters, attr_filter); + else + pfree(attr_filter); + } +} + +static bool +IsEqualOp(Expr *expr) +{ + Oid funcid = InvalidOid; + + if (!IsA(expr, OpExpr) && !IsA(expr, FuncExpr)) + return false; + + if (IsA(expr, OpExpr)) + funcid = ((OpExpr *)expr)->opfuncid; + else if (IsA(expr, FuncExpr)) + funcid = ((FuncExpr *)expr)->funcid; + else + return false; + + if (funcid == F_INT2EQ || funcid == F_INT4EQ || funcid == F_INT8EQ + || funcid == F_INT24EQ || funcid == F_INT42EQ + || funcid == F_INT28EQ || funcid == F_INT82EQ + || funcid == F_INT48EQ || funcid == F_INT84EQ + ) + return true; + + return false; +} + +/* + * runtime filters which can be pushed down: + * 1. hash expr MUST BE equal op; + * 2. args MUST BE Var node; + * 3. 
the data type MUST BE integer; + */ +static bool +CheckEqualArgs(Expr *expr, AttrNumber *lattno, AttrNumber *rattno) +{ + Var *var; + bool match; + List *args; + ListCell *lc; + + if (lattno == NULL || rattno == NULL) + return false; + + if (!IsA(expr, OpExpr) && !IsA(expr, FuncExpr)) + return false; + + if (IsA(expr, OpExpr)) + args = ((OpExpr *)expr)->args; + else if (IsA(expr, FuncExpr)) + args = ((FuncExpr *)expr)->args; + else + return false; + + if (!args || list_length(args) != 2) + return false; + + match = false; + foreach (lc, args) + { + match = false; + + if (!IsA(lfirst(lc), Var)) + break; + + var = lfirst(lc); + if (var->varno == INNER_VAR) + *rattno = var->varattno; + else if (var->varno == OUTER_VAR) + *lattno = var->varattno; + else + break; + + match = true; + } + + return match; +} + +/* + * it's just allowed like this: + * HashJoin + * ... a series of HashJoin nodes + * HashJoin + * SeqScan <- target + */ +static PlanState * +FindTargetAttr(HashJoinState *hjstate, AttrNumber attno, AttrNumber *lattno) +{ + Var *var; + PlanState *child, *parent; + TargetEntry *te; + + parent = (PlanState *)hjstate; + child = outerPlanState(hjstate); + Assert(child); + + *lattno = -1; + while (child) + { + /* target is seqscan */ + if (IsA(child, SeqScanState)) + { + te = (TargetEntry *)list_nth(child->plan->targetlist, attno - 1); + if (!IsA(te->expr, Var)) + return NULL; + + var = castNode(Var, te->expr); + + /* system column is not allowed */ + if (var->varattno <= 0) + return NULL; + + *lattno = var->varattno; + return child; + } + + /* + * hashjoin + * result (hash filter) + * seqscan on t1, t1 is replicated table + */ + if (!IsA(child, HashJoinState) && !IsA(child, ResultState)) + return NULL; + + /* child is hashjoin or result node */ + te = (TargetEntry *)list_nth(child->plan->targetlist, attno - 1); + if (!IsA(te->expr, Var)) + return NULL; + + var = castNode(Var, te->expr); + if (var->varno == INNER_VAR) + return NULL; + + attno = var->varattno; + + /* find at child node */ + parent = child; + child = outerPlanState(parent); + } + + return NULL; +} + +static AttrFilter* +CreateAttrFilter(PlanState *target, AttrNumber lattno, AttrNumber rattno, + double plan_rows) +{ + AttrFilter *attr_filter = palloc0(sizeof(AttrFilter)); + attr_filter->empty = true; + attr_filter->target = target; + + attr_filter->lattno = lattno; + attr_filter->rattno = rattno; + attr_filter->blm_filter = bloom_create_aggresive(plan_rows, work_mem, random()); + attr_filter->min = LLONG_MAX; + attr_filter->max = LLONG_MIN; + + return attr_filter; +} diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c index d6e88e6ddf0..74510615804 100644 --- a/src/backend/executor/nodeSeqscan.c +++ b/src/backend/executor/nodeSeqscan.c @@ -44,6 +44,9 @@ static TupleTableSlot *SeqNext(SeqScanState *node); +static bool PassByBloomFilter(SeqScanState *node, TupleTableSlot *slot); +static ScanKey ScanKeyListToArray(List *keys, int *num); + /* ---------------------------------------------------------------- * Scan Support * ---------------------------------------------------------------- @@ -72,13 +75,24 @@ SeqNext(SeqScanState *node) if (scandesc == NULL) { + int nkeys = 0; + ScanKey keys = NULL; + + /* + * Just when gp_enable_runtime_filter_pushdown enabled and + * node->filter_in_seqscan is false means scankey need to be pushed to + * AM. 
+ */ + if (gp_enable_runtime_filter_pushdown && !node->filter_in_seqscan) + keys = ScanKeyListToArray(node->filters, &nkeys); + /* * We reach here if the scan is not parallel, or if we're serially * executing a scan that was planned to be parallel. */ scandesc = table_beginscan_es(node->ss.ss_currentRelation, estate->es_snapshot, - 0, NULL, + nkeys, keys, NULL, &node->ss.ps); node->ss.ss_currentScanDesc = scandesc; @@ -87,8 +101,17 @@ SeqNext(SeqScanState *node) /* * get the next tuple from the table */ - if (table_scan_getnextslot(scandesc, direction, slot)) + while (table_scan_getnextslot(scandesc, direction, slot)) + { + if (TupIsNull(slot)) + return slot; + + if (node->filter_in_seqscan && node->filters && + !PassByBloomFilter(node, slot)) + continue; + return slot; + } return NULL; } @@ -192,6 +215,16 @@ ExecInitSeqScanForPartition(SeqScan *node, EState *estate, scanstate->ss.ps.qual = ExecInitQual(node->plan.qual, (PlanState *) scanstate); + /* + * check scan slot with bloom filters in seqscan node or not. + */ + if (gp_enable_runtime_filter_pushdown + && !estate->useMppParallelMode) + { + scanstate->filter_in_seqscan = + !(table_scan_flags(currentRelation) & SCAN_SUPPORT_RUNTIME_FILTER); + } + return scanstate; } @@ -365,3 +398,60 @@ ExecSeqScanInitializeWorker(SeqScanState *node, } node->ss.ss_currentScanDesc = scandesc; } + +/* + * Returns true if the element may be in the bloom filter. + */ +static bool +PassByBloomFilter(SeqScanState *node, TupleTableSlot *slot) +{ + ScanKey sk; + Datum val; + bool isnull; + ListCell *lc; + bloom_filter *blm_filter; + + foreach (lc, node->filters) + { + sk = lfirst(lc); + if (sk->sk_flags != SK_BLOOM_FILTER) + continue; + + val = slot_getattr(slot, sk->sk_attno, &isnull); + if (isnull) + continue; + + blm_filter = (bloom_filter *)DatumGetPointer(sk->sk_argument); + if (bloom_lacks_element(blm_filter, (unsigned char *)&val, sizeof(Datum))) + return false; + } + + return true; +} + +/* + * Convert the list of ScanKey to the array, and append an emtpy ScanKey as + * the end flag of the array. 
+ */ +static ScanKey +ScanKeyListToArray(List *keys, int *num) +{ + ScanKey sk; + + if (list_length(keys) == 0) + return NULL; + + Assert(num); + *num = list_length(keys); + + sk = (ScanKey)palloc(sizeof(ScanKeyData) * (*num + 1)); + for (int i = 0; i < *num; ++i) + memcpy(&sk[i], list_nth(keys, i), sizeof(ScanKeyData)); + + /* + * SK_EMPYT means the end of the array of the ScanKey + */ + sk[*num].sk_flags = SK_EMPYT; + + return sk; +} diff --git a/src/backend/utils/misc/guc_gp.c b/src/backend/utils/misc/guc_gp.c index 1995d91e757..5dd24aed045 100644 --- a/src/backend/utils/misc/guc_gp.c +++ b/src/backend/utils/misc/guc_gp.c @@ -453,6 +453,8 @@ bool gp_enable_global_deadlock_detector = false; bool gp_enable_predicate_pushdown; int gp_predicate_pushdown_sample_rows; +bool gp_enable_runtime_filter_pushdown; + bool enable_offload_entry_to_qe = false; bool enable_answer_query_using_materialized_views = false; bool aqumv_allow_foreign_table = false; @@ -3203,6 +3205,16 @@ struct config_bool ConfigureNamesBool_gp[] = NULL, NULL, NULL }, + { + {"gp_enable_runtime_filter_pushdown", PGC_USERSET, DEVELOPER_OPTIONS, + gettext_noop("Try to push the hash table of hash join to the seqscan or AM as bloom filter."), + NULL + }, + &gp_enable_runtime_filter_pushdown, + false, + NULL, NULL, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL diff --git a/src/include/access/skey.h b/src/include/access/skey.h index 92b7d09fa45..2f533c03db8 100644 --- a/src/include/access/skey.h +++ b/src/include/access/skey.h @@ -122,6 +122,9 @@ typedef ScanKeyData *ScanKey; #define SK_SEARCHNOTNULL 0x0080 /* scankey represents "col IS NOT NULL" */ #define SK_ORDER_BY 0x0100 /* scankey is for ORDER BY op */ +/* for runtime filter */ +#define SK_EMPYT 0x8000 /* scankey is empty */ +#define SK_BLOOM_FILTER 0x4000 /* scankey is for bloom filter */ /* * prototypes for functions in access/common/scankey.c diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h index 5c166aa94ab..ea1bf508687 100644 --- a/src/include/access/tableam.h +++ b/src/include/access/tableam.h @@ -43,15 +43,16 @@ struct ValidateIndexState; /** * Flags represented the supported features of scan. - * + * * The first 8 bits are reserved for kernel expansion of some attributes, * and the remaining 24 bits are reserved for custom tableam. - * + * * If you add a new flag, make sure the flag's bit is consecutive with * the previous one. - * -*/ + * + */ #define SCAN_SUPPORT_COLUMN_ORIENTED_SCAN (1 << 0) /* support column-oriented scanning*/ +#define SCAN_SUPPORT_RUNTIME_FILTER (1 << 1) /* support runtime filter scan */ /* * Bitmask values for the flags argument to the scan_begin callback. diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 32ae92f64a3..8d79aaa3eed 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -1544,6 +1544,10 @@ typedef struct SeqScanState { ScanState ss; /* its first field is NodeTag */ Size pscan_len; /* size of parallel heap scan descriptor */ + + List *filters; /* the list of struct ScanKeyData */ + bool filter_in_seqscan; /* check scan slot with runtime filters in + seqscan node or in am */ } SeqScanState; /* ---------------- @@ -3061,6 +3065,20 @@ typedef struct RuntimeFilterState bloom_filter *bf; } RuntimeFilterState; +typedef struct AttrFilter +{ + bool empty; /* empty filter or not */ + PlanState *target;/* the node in where runtime filter will be used, + target will be seqscan, see FindTargetAttr(). 
+ in nodeHashjoin.c */ + AttrNumber rattno; /* attr no in hash node */ + AttrNumber lattno; /* if target is seqscan, attr no in relation */ + + bloom_filter *blm_filter; + Datum min; + Datum max; +} AttrFilter; + /* ---------------- * HashState information * ---------------- @@ -3096,6 +3114,8 @@ typedef struct HashState struct ParallelHashJoinState *parallel_state; Barrier *sync_barrier; + + List *filters; /* the list of AttrFilter */ } HashState; /* ---------------- diff --git a/src/include/utils/guc.h b/src/include/utils/guc.h index f9045dc8a22..02dce376898 100644 --- a/src/include/utils/guc.h +++ b/src/include/utils/guc.h @@ -639,6 +639,12 @@ extern bool gp_log_endpoints; extern bool gp_allow_date_field_width_5digits; +/* + * Try to push the hash table of hash join node down to the scan node as + * bloom filter for performance. + */ +extern bool gp_enable_runtime_filter_pushdown; + typedef enum { INDEX_CHECK_NONE, diff --git a/src/include/utils/sync_guc_name.h b/src/include/utils/sync_guc_name.h index 57ac3d89377..a20ed986dad 100644 --- a/src/include/utils/sync_guc_name.h +++ b/src/include/utils/sync_guc_name.h @@ -196,5 +196,6 @@ "work_mem", "gp_appendonly_insert_files", "gp_appendonly_insert_files_tuples_range", + "gp_enable_runtime_filter_pushdown", "gp_random_insert_segments", "zero_damaged_pages", diff --git a/src/test/regress/expected/gp_runtime_filter.out b/src/test/regress/expected/gp_runtime_filter.out index 16f893d465f..3394f1b533d 100644 --- a/src/test/regress/expected/gp_runtime_filter.out +++ b/src/test/regress/expected/gp_runtime_filter.out @@ -250,6 +250,60 @@ SELECT COUNT(*) FROM dim_rf 1600 (1 row) +-- Test bloom filter pushdown +DROP TABLE IF EXISTS t1; +NOTICE: table "t1" does not exist, skipping +DROP TABLE IF EXISTS t2; +NOTICE: table "t2" does not exist, skipping +CREATE TABLE t1(c1 int, c2 int, c3 int, c4 int, c5 int) with (appendonly=true, orientation=column) distributed by (c1); +CREATE TABLE t2(c1 int, c2 int, c3 int, c4 int, c5 int) with (appendonly=true, orientation=column) distributed REPLICATED; +INSERT INTO t1 VALUES (5,5,5,5,5); +INSERT INTO t2 VALUES (1,1,1,1,1), (2,2,2,2,2), (3,3,3,3,3), (4,4,4,4,4); +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t2 select * FROM t2; +INSERT INTO t2 select * FROM t2; +INSERT INTO t2 select * FROM t2; +ANALYZE; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT t1.c3 FROM t1, t2 WHERE t1.c2 = t2.c2; + QUERY PLAN +------------------------------------------------------------------------------------------- + Gather Motion 3:1 (slice1; segments: 3) (actual rows=0 loops=1) + -> Hash Join (never executed) + Hash Cond: (t1.c2 = t2.c2) + Extra Text: (seg2) Hash chain length 8.0 avg, 8 max, using 4 of 524288 buckets. 
+ -> Seq Scan on t1 (actual rows=128 loops=1) + -> Hash (actual rows=32 loops=1) + Buckets: 524288 Batches: 1 Memory Usage: 4098kB + -> Seq Scan on t2 (actual rows=32 loops=1) + Optimizer: Postgres query optimizer +(9 rows) + +SET gp_enable_runtime_filter_pushdown TO on; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT t1.c3 FROM t1, t2 WHERE t1.c2 = t2.c2; + QUERY PLAN +------------------------------------------------------------------------------------------- + Gather Motion 3:1 (slice1; segments: 3) (actual rows=0 loops=1) + -> Hash Join (never executed) + Hash Cond: (t1.c2 = t2.c2) + Extra Text: (seg2) Hash chain length 8.0 avg, 8 max, using 4 of 524288 buckets. + -> Seq Scan on t1 (actual rows=1 loops=1) + -> Hash (actual rows=32 loops=1) + Buckets: 524288 Batches: 1 Memory Usage: 4098kB + -> Seq Scan on t2 (actual rows=32 loops=1) + Optimizer: Postgres query optimizer +(9 rows) + +RESET gp_enable_runtime_filter_pushdown; +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; -- Clean up: reset guc SET gp_enable_runtime_filter TO off; SET optimizer TO default; diff --git a/src/test/regress/sql/gp_runtime_filter.sql b/src/test/regress/sql/gp_runtime_filter.sql index 876575b78c3..110e118eee2 100644 --- a/src/test/regress/sql/gp_runtime_filter.sql +++ b/src/test/regress/sql/gp_runtime_filter.sql @@ -75,6 +75,40 @@ SELECT COUNT(*) FROM fact_rf SELECT COUNT(*) FROM dim_rf WHERE dim_rf.did IN (SELECT did FROM fact_rf) AND proj_id < 2; +-- Test bloom filter pushdown +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; +CREATE TABLE t1(c1 int, c2 int, c3 int, c4 int, c5 int) with (appendonly=true, orientation=column) distributed by (c1); +CREATE TABLE t2(c1 int, c2 int, c3 int, c4 int, c5 int) with (appendonly=true, orientation=column) distributed REPLICATED; + +INSERT INTO t1 VALUES (5,5,5,5,5); +INSERT INTO t2 VALUES (1,1,1,1,1), (2,2,2,2,2), (3,3,3,3,3), (4,4,4,4,4); + +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; + +INSERT INTO t2 select * FROM t2; +INSERT INTO t2 select * FROM t2; +INSERT INTO t2 select * FROM t2; + +ANALYZE; + +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT t1.c3 FROM t1, t2 WHERE t1.c2 = t2.c2; + +SET gp_enable_runtime_filter_pushdown TO on; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT t1.c3 FROM t1, t2 WHERE t1.c2 = t2.c2; + +RESET gp_enable_runtime_filter_pushdown; + +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; -- Clean up: reset guc SET gp_enable_runtime_filter TO off; From a7c899136d5ce7732f622067738fa37899a95136 Mon Sep 17 00:00:00 2001 From: zhangyue Date: Tue, 26 Nov 2024 12:37:45 +0800 Subject: [PATCH 02/12] Fix with code review. 
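This review pass keeps the behavior of PATCH 01 and only tightens the code:
ResetRuntimeFilter() now seeds the range filter with LONG_MAX/LONG_MIN and adds
StaticAssertDecl() checks that the sentinels are exactly Datum-wide;
FindTargetAttr() is renamed to FindTargetNode() to reflect that it returns a
plan node; CheckEqualArgs() drops the duplicated node-type check and inspects
the two arguments directly instead of looping; and the redundant TupIsNull()
test inside SeqNext()'s fetch loop is removed, since table_scan_getnextslot()
returning true already implies a valid tuple in the slot.

The sketch below is editorial illustration only, not part of the patch. It
shows the invariant the new StaticAssertDecl() guards: the min/max sentinels
are stored in a Datum but compared as signed 64-bit values in
BuildRuntimeFilter(), which assumes an LP64 build where long and Datum are both
8 bytes. The local "Datum" typedef is a stand-in for the real one.

    #include <limits.h>
    #include <stdint.h>
    #include <stdio.h>

    typedef uintptr_t Datum;            /* stand-in for the real Datum */

    int
    main(void)
    {
        /* Mirrors the new StaticAssertDecl(): the sentinel constants must
         * have the same width as Datum, or the signed comparisons below
         * would start from a truncated range. */
        _Static_assert(sizeof(LONG_MAX) == sizeof(Datum),
                       "LONG_MAX must be Datum-wide");

        Datum min = LONG_MAX;           /* empty filter: nothing seen yet */
        Datum max = (Datum) LONG_MIN;
        Datum val = (Datum) 42;         /* one build-side value */

        /* Same update rule as BuildRuntimeFilter() */
        if ((int64_t) val < (int64_t) min)
            min = val;
        if ((int64_t) val > (int64_t) max)
            max = val;

        printf("min=%ld max=%ld\n", (long) min, (long) max);
        return 0;
    }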
--- src/backend/executor/nodeHash.c | 7 +++-- src/backend/executor/nodeHashjoin.c | 48 ++++++++++++++--------------- src/backend/executor/nodeSeqscan.c | 3 -- 3 files changed, 29 insertions(+), 29 deletions(-) diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c index 602717fce1a..bf42ba015fa 100644 --- a/src/backend/executor/nodeHash.c +++ b/src/backend/executor/nodeHash.c @@ -4282,7 +4282,10 @@ ResetRuntimeFilter(HashState *node) attr_filter->blm_filter = bloom_create_aggresive(node->ps.plan->plan_rows, work_mem, random()); - attr_filter->min = LLONG_MAX; - attr_filter->max = LLONG_MIN; + + StaticAssertDecl(sizeof(LONG_MAX) == sizeof(Datum), "sizeof(LONG_MAX) should be equal to sizeof(Datum)"); + StaticAssertDecl(sizeof(LONG_MIN) == sizeof(Datum), "sizeof(LONG_MIN) should be equal to sizeof(Datum)"); + attr_filter->min = LONG_MAX; + attr_filter->max = LONG_MIN; } } diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c index 6a3d6c95b26..e9803268540 100644 --- a/src/backend/executor/nodeHashjoin.c +++ b/src/backend/executor/nodeHashjoin.c @@ -167,7 +167,7 @@ static void ExecEagerFreeHashJoin(HashJoinState *node); static void CreateRuntimeFilter(HashJoinState* hjstate); static bool IsEqualOp(Expr *expr); static bool CheckEqualArgs(Expr *expr, AttrNumber *lattno, AttrNumber *rattno); -static PlanState *FindTargetAttr(HashJoinState *hjstate, +static PlanState *FindTargetNode(HashJoinState *hjstate, AttrNumber attno, AttrNumber *lattno); static AttrFilter *CreateAttrFilter(PlanState *target, @@ -2224,7 +2224,7 @@ CreateRuntimeFilter(HashJoinState* hjstate) if (lattno < 1 || rattno < 1) continue; - target = FindTargetAttr(hjstate, lattno, &lattno); + target = FindTargetNode(hjstate, lattno, &lattno); if (lattno == -1 || target == NULL || IsA(target, HashJoinState)) continue; Assert(IsA(target, SeqScanState)); @@ -2273,16 +2273,11 @@ static bool CheckEqualArgs(Expr *expr, AttrNumber *lattno, AttrNumber *rattno) { Var *var; - bool match; List *args; - ListCell *lc; if (lattno == NULL || rattno == NULL) return false; - if (!IsA(expr, OpExpr) && !IsA(expr, FuncExpr)) - return false; - if (IsA(expr, OpExpr)) args = ((OpExpr *)expr)->args; else if (IsA(expr, FuncExpr)) @@ -2293,26 +2288,31 @@ CheckEqualArgs(Expr *expr, AttrNumber *lattno, AttrNumber *rattno) if (!args || list_length(args) != 2) return false; - match = false; - foreach (lc, args) - { - match = false; + /* check the first arg */ + if (!IsA(linitial(args), Var)) + return false; - if (!IsA(lfirst(lc), Var)) - break; + var = linitial(args); + if (var->varno == INNER_VAR) + *rattno = var->varattno; + else if (var->varno == OUTER_VAR) + *lattno = var->varattno; + else + return false; - var = lfirst(lc); - if (var->varno == INNER_VAR) - *rattno = var->varattno; - else if (var->varno == OUTER_VAR) - *lattno = var->varattno; - else - break; + /* check the second arg */ + if (!IsA(lsecond(args), Var)) + return false; - match = true; - } + var = lsecond(args); + if (var->varno == INNER_VAR) + *rattno = var->varattno; + else if (var->varno == OUTER_VAR) + *lattno = var->varattno; + else + return false; - return match; + return true; } /* @@ -2323,7 +2323,7 @@ CheckEqualArgs(Expr *expr, AttrNumber *lattno, AttrNumber *rattno) * SeqScan <- target */ static PlanState * -FindTargetAttr(HashJoinState *hjstate, AttrNumber attno, AttrNumber *lattno) +FindTargetNode(HashJoinState *hjstate, AttrNumber attno, AttrNumber *lattno) { Var *var; PlanState *child, *parent; diff --git 
a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c index 74510615804..f5db69f4fd3 100644 --- a/src/backend/executor/nodeSeqscan.c +++ b/src/backend/executor/nodeSeqscan.c @@ -103,9 +103,6 @@ SeqNext(SeqScanState *node) */ while (table_scan_getnextslot(scandesc, direction, slot)) { - if (TupIsNull(slot)) - return slot; - if (node->filter_in_seqscan && node->filters && !PassByBloomFilter(node, slot)) continue; From b7c50a6ed6d9926cdc329a1ef84faca0c449fd74 Mon Sep 17 00:00:00 2001 From: zhangyue Date: Sat, 30 Nov 2024 14:34:32 +0800 Subject: [PATCH 03/12] Support partition table. --- src/backend/executor/nodeHash.c | 5 +- src/backend/executor/nodeHashjoin.c | 115 ++++++++++++++---- .../regress/expected/gp_runtime_filter.out | 58 ++++++++- src/test/regress/sql/gp_runtime_filter.sql | 26 ++++ 4 files changed, 173 insertions(+), 31 deletions(-) diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c index bf42ba015fa..ddc4b8529c8 100644 --- a/src/backend/executor/nodeHash.c +++ b/src/backend/executor/nodeHash.c @@ -4280,8 +4280,9 @@ ResetRuntimeFilter(HashState *node) if (attr_filter->blm_filter) bloom_free(attr_filter->blm_filter); - attr_filter->blm_filter = bloom_create_aggresive(node->ps.plan->plan_rows, - work_mem, random()); + attr_filter->blm_filter = bloom_create_aggresive(node->ps.plan->plan_rows, + work_mem, + random()); StaticAssertDecl(sizeof(LONG_MAX) == sizeof(Datum), "sizeof(LONG_MAX) should be equal to sizeof(Datum)"); StaticAssertDecl(sizeof(LONG_MIN) == sizeof(Datum), "sizeof(LONG_MIN) should be equal to sizeof(Datum)"); diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c index e9803268540..6c8ae45d7b5 100644 --- a/src/backend/executor/nodeHashjoin.c +++ b/src/backend/executor/nodeHashjoin.c @@ -167,9 +167,12 @@ static void ExecEagerFreeHashJoin(HashJoinState *node); static void CreateRuntimeFilter(HashJoinState* hjstate); static bool IsEqualOp(Expr *expr); static bool CheckEqualArgs(Expr *expr, AttrNumber *lattno, AttrNumber *rattno); -static PlanState *FindTargetNode(HashJoinState *hjstate, - AttrNumber attno, - AttrNumber *lattno); +static bool CheckTargetNode(PlanState *node, + AttrNumber attno, + AttrNumber *lattno); +static List *FindTargetNodes(HashJoinState *hjstate, + AttrNumber attno, + AttrNumber *lattno); static AttrFilter *CreateAttrFilter(PlanState *target, AttrNumber lattno, AttrNumber rattno, @@ -2187,9 +2190,9 @@ CreateRuntimeFilter(HashJoinState* hjstate) JoinType jointype; HashJoin *hj; HashState *hstate; - PlanState *target; AttrFilter *attr_filter; ListCell *lc; + List *targets; /* * Only applicatable for inner, right and semi join, @@ -2224,17 +2227,22 @@ CreateRuntimeFilter(HashJoinState* hjstate) if (lattno < 1 || rattno < 1) continue; - target = FindTargetNode(hjstate, lattno, &lattno); - if (lattno == -1 || target == NULL || IsA(target, HashJoinState)) + targets = FindTargetNodes(hjstate, lattno, &lattno); + if (lattno == -1 || targets == NULL) continue; - Assert(IsA(target, SeqScanState)); - attr_filter = CreateAttrFilter(target, lattno, rattno, - hstate->ps.plan->plan_rows); - if (attr_filter->blm_filter) - hstate->filters = lappend(hstate->filters, attr_filter); - else - pfree(attr_filter); + foreach(lc, targets) + { + PlanState *target = lfirst(lc); + Assert(IsA(target, SeqScanState)); + + attr_filter = CreateAttrFilter(target, lattno, rattno, + hstate->ps.plan->plan_rows); + if (attr_filter->blm_filter) + hstate->filters = lappend(hstate->filters, attr_filter); + else 
+ pfree(attr_filter); + } } } @@ -2315,6 +2323,30 @@ CheckEqualArgs(Expr *expr, AttrNumber *lattno, AttrNumber *rattno) return true; } +static bool +CheckTargetNode(PlanState *node, AttrNumber attno, AttrNumber *lattno) +{ + Var *var; + TargetEntry *te; + + if (!IsA(node, SeqScanState)) + return false; + + te = (TargetEntry *)list_nth(node->plan->targetlist, attno - 1); + if (!IsA(te->expr, Var)) + return false; + + var = castNode(Var, te->expr); + + /* system column is not allowed */ + if (var->varattno <= 0) + return false; + + *lattno = var->varattno; + + return true; +} + /* * it's just allowed like this: * HashJoin @@ -2322,46 +2354,75 @@ CheckEqualArgs(Expr *expr, AttrNumber *lattno, AttrNumber *rattno) * HashJoin * SeqScan <- target */ -static PlanState * -FindTargetNode(HashJoinState *hjstate, AttrNumber attno, AttrNumber *lattno) +static List * +FindTargetNodes(HashJoinState *hjstate, AttrNumber attno, AttrNumber *lattno) { Var *var; PlanState *child, *parent; TargetEntry *te; + List *targetNodes; parent = (PlanState *)hjstate; child = outerPlanState(hjstate); Assert(child); *lattno = -1; - while (child) + targetNodes = NIL; + while (true) { /* target is seqscan */ - if (IsA(child, SeqScanState)) + if ((IsA(parent, HashJoinState) || IsA(parent, ResultState)) && IsA(child, SeqScanState)) { - te = (TargetEntry *)list_nth(child->plan->targetlist, attno - 1); - if (!IsA(te->expr, Var)) + /* + * hashjoin + * seqscan + * or + * hashjoin + * result + * seqscan + */ + if (!CheckTargetNode(child, attno, lattno)) return NULL; - var = castNode(Var, te->expr); + targetNodes = lappend(targetNodes, child); + return targetNodes; + } + else if (IsA(parent, AppendState) && child == NULL) + { + /* + * append + * seqscan on t1_prt_1 + * seqscan on t1_prt_2 + * ... + */ + AppendState *as = castNode(AppendState, parent); + for (int i = 0; i < as->as_nplans; i++) + { + child = as->appendplans[i]; + if (!CheckTargetNode(child, attno, lattno)) + return NULL; - /* system column is not allowed */ - if (var->varattno <= 0) - return NULL; + targetNodes = lappend(targetNodes, child); + } - *lattno = var->varattno; - return child; + return targetNodes; } /* * hashjoin * result (hash filter) * seqscan on t1, t1 is replicated table + * or + * hashjoin + * append + * seqscan on t1_prt_1 + * seqscan on t1_prt_2 + * ... */ - if (!IsA(child, HashJoinState) && !IsA(child, ResultState)) + if (!IsA(child, HashJoinState) && !IsA(child, ResultState) && !IsA(child, AppendState)) return NULL; - /* child is hashjoin or result node */ + /* child is hashjoin, result or append node */ te = (TargetEntry *)list_nth(child->plan->targetlist, attno - 1); if (!IsA(te->expr, Var)) return NULL; diff --git a/src/test/regress/expected/gp_runtime_filter.out b/src/test/regress/expected/gp_runtime_filter.out index 3394f1b533d..d7f7bbf0c4f 100644 --- a/src/test/regress/expected/gp_runtime_filter.out +++ b/src/test/regress/expected/gp_runtime_filter.out @@ -251,6 +251,7 @@ SELECT COUNT(*) FROM dim_rf (1 row) -- Test bloom filter pushdown +-- case 1: join on distribution table and replicated table. 
DROP TABLE IF EXISTS t1; NOTICE: table "t1" does not exist, skipping DROP TABLE IF EXISTS t2; @@ -275,7 +276,7 @@ SELECT t1.c3 FROM t1, t2 WHERE t1.c2 = t2.c2; QUERY PLAN ------------------------------------------------------------------------------------------- Gather Motion 3:1 (slice1; segments: 3) (actual rows=0 loops=1) - -> Hash Join (never executed) + -> Hash Join (actual rows=0 loops=1) Hash Cond: (t1.c2 = t2.c2) Extra Text: (seg2) Hash chain length 8.0 avg, 8 max, using 4 of 524288 buckets. -> Seq Scan on t1 (actual rows=128 loops=1) @@ -291,7 +292,7 @@ SELECT t1.c3 FROM t1, t2 WHERE t1.c2 = t2.c2; QUERY PLAN ------------------------------------------------------------------------------------------- Gather Motion 3:1 (slice1; segments: 3) (actual rows=0 loops=1) - -> Hash Join (never executed) + -> Hash Join (actual rows=0 loops=1) Hash Cond: (t1.c2 = t2.c2) Extra Text: (seg2) Hash chain length 8.0 avg, 8 max, using 4 of 524288 buckets. -> Seq Scan on t1 (actual rows=1 loops=1) @@ -301,6 +302,59 @@ SELECT t1.c3 FROM t1, t2 WHERE t1.c2 = t2.c2; Optimizer: Postgres query optimizer (9 rows) +RESET gp_enable_runtime_filter_pushdown; +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; +-- case 2: join on partition table and replicated table. +CREATE TABLE t1 (c1 INT, c2 INT) DISTRIBUTED BY (c1) PARTITION BY RANGE (c2) (START (1) END (100) EVERY (50)); +CREATE TABLE t2 (c1 INT, c2 INT) DISTRIBUTED REPLICATED; +INSERT INTO t1 SELECT generate_series(1, 99), generate_series(1, 99); +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t2 SELECT generate_series(1, 5), generate_series(1, 5); +INSERT INTO t2 SELECT generate_series(51, 51), generate_series(51, 51); +ANALYZE; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT * FROM t1, t2 WHERE t1.c2 = t2.c2; + QUERY PLAN +------------------------------------------------------------------------------------------- + Gather Motion 3:1 (slice1; segments: 3) (actual rows=96 loops=1) + -> Hash Join (actual rows=64 loops=1) + Hash Cond: (t1.c2 = t2.c2) + Extra Text: (seg0) Hash chain length 1.0 avg, 1 max, using 6 of 524288 buckets. + -> Append (actual rows=608 loops=1) + Partition Selectors: $0 + -> Seq Scan on t1_1_prt_1 t1_1 (actual rows=288 loops=1) + -> Seq Scan on t1_1_prt_2 t1_2 (actual rows=320 loops=1) + -> Hash (actual rows=6 loops=1) + Buckets: 524288 Batches: 1 Memory Usage: 4097kB + -> Partition Selector (selector id: $0) (actual rows=6 loops=1) + -> Seq Scan on t2 (actual rows=6 loops=1) + Optimizer: Postgres query optimizer +(13 rows) + +SET gp_enable_runtime_filter_pushdown TO on; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT * FROM t1, t2 WHERE t1.c2 = t2.c2; + QUERY PLAN +------------------------------------------------------------------------------------------- + Gather Motion 3:1 (slice1; segments: 3) (actual rows=96 loops=1) + -> Hash Join (actual rows=64 loops=1) + Hash Cond: (t1.c2 = t2.c2) + Extra Text: (seg0) Hash chain length 1.0 avg, 1 max, using 6 of 524288 buckets. 
+ -> Append (actual rows=64 loops=1) + Partition Selectors: $0 + -> Seq Scan on t1_1_prt_1 t1_1 (actual rows=48 loops=1) + -> Seq Scan on t1_1_prt_2 t1_2 (actual rows=16 loops=1) + -> Hash (actual rows=6 loops=1) + Buckets: 524288 Batches: 1 Memory Usage: 4097kB + -> Partition Selector (selector id: $0) (actual rows=6 loops=1) + -> Seq Scan on t2 (actual rows=6 loops=1) + Optimizer: Postgres query optimizer +(13 rows) + RESET gp_enable_runtime_filter_pushdown; DROP TABLE IF EXISTS t1; DROP TABLE IF EXISTS t2; diff --git a/src/test/regress/sql/gp_runtime_filter.sql b/src/test/regress/sql/gp_runtime_filter.sql index 110e118eee2..83be20467a9 100644 --- a/src/test/regress/sql/gp_runtime_filter.sql +++ b/src/test/regress/sql/gp_runtime_filter.sql @@ -76,6 +76,7 @@ SELECT COUNT(*) FROM dim_rf WHERE dim_rf.did IN (SELECT did FROM fact_rf) AND proj_id < 2; -- Test bloom filter pushdown +-- case 1: join on distribution table and replicated table. DROP TABLE IF EXISTS t1; DROP TABLE IF EXISTS t2; CREATE TABLE t1(c1 int, c2 int, c3 int, c4 int, c5 int) with (appendonly=true, orientation=column) distributed by (c1); @@ -110,6 +111,31 @@ RESET gp_enable_runtime_filter_pushdown; DROP TABLE IF EXISTS t1; DROP TABLE IF EXISTS t2; +-- case 2: join on partition table and replicated table. +CREATE TABLE t1 (c1 INT, c2 INT) DISTRIBUTED BY (c1) PARTITION BY RANGE (c2) (START (1) END (100) EVERY (50)); +CREATE TABLE t2 (c1 INT, c2 INT) DISTRIBUTED REPLICATED; +INSERT INTO t1 SELECT generate_series(1, 99), generate_series(1, 99); +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t2 SELECT generate_series(1, 5), generate_series(1, 5); +INSERT INTO t2 SELECT generate_series(51, 51), generate_series(51, 51); +ANALYZE; + +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT * FROM t1, t2 WHERE t1.c2 = t2.c2; + +SET gp_enable_runtime_filter_pushdown TO on; + +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT * FROM t1, t2 WHERE t1.c2 = t2.c2; + +RESET gp_enable_runtime_filter_pushdown; + +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; + -- Clean up: reset guc SET gp_enable_runtime_filter TO off; SET optimizer TO default; From 1d9c467d914dbf77d5880a39726257e1ab158ccd Mon Sep 17 00:00:00 2001 From: zhangyue Date: Sat, 30 Nov 2024 16:19:41 +0800 Subject: [PATCH 04/12] Add debug info in EXPLAIN ANALYZE. 
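The new counter is carried in Instrumentation->nfilteredPRF, shipped from the
segments through explain_gp.c alongside the existing nfiltered1/nfiltered2
counters, and printed for Seq Scan nodes as "Rows Removed by Pushdown Runtime
Filter" (zero counts are suppressed in text mode). For reference, the snippet
below is abridged from the updated expected output in this patch:

    SET gp_enable_runtime_filter_pushdown TO on;
    EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
    SELECT t1.c3 FROM t1, t2 WHERE t1.c2 = t2.c2;
    ...
             ->  Seq Scan on t1 (actual rows=1 loops=1)
                   Rows Removed by Pushdown Runtime Filter: 127
    ...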
--- src/backend/commands/explain.c | 28 +++++++++++++++++++ src/backend/commands/explain_gp.c | 3 ++ src/backend/executor/nodeSeqscan.c | 3 ++ src/include/executor/instrument.h | 1 + src/include/nodes/execnodes.h | 5 ++++ .../regress/expected/gp_runtime_filter.out | 7 +++-- 6 files changed, 45 insertions(+), 2 deletions(-) diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index dc5840cc79c..37982b3b896 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -137,6 +137,9 @@ static void show_incremental_sort_info(IncrementalSortState *incrsortstate, static void show_hash_info(HashState *hashstate, ExplainState *es); static void show_runtime_filter_info(RuntimeFilterState *rfstate, ExplainState *es); +static void show_pushdown_runtime_filter_info(const char *qlabel, + PlanState *planstate, + ExplainState *es); static void show_memoize_info(MemoizeState *mstate, List *ancestors, ExplainState *es); static void show_hashagg_info(AggState *hashstate, ExplainState *es); @@ -2415,6 +2418,9 @@ ExplainNode(PlanState *planstate, List *ancestors, /* fall through to print additional fields the same as SeqScan */ /* FALLTHROUGH */ case T_SeqScan: + show_pushdown_runtime_filter_info("Rows Removed by Pushdown Runtime Filter", + planstate, es); + /* FALLTHROUGH */ case T_DynamicSeqScan: case T_ValuesScan: case T_CteScan: @@ -4285,6 +4291,28 @@ show_instrumentation_count(const char *qlabel, int which, } } +/* + * If it's EXPLAIN ANALYZE, show instrumentation information with pushdown runtime filter + */ +static void +show_pushdown_runtime_filter_info(const char *qlabel, PlanState *planstate, ExplainState *es) +{ + double nfiltered; + + Assert(IsA(planstate, SeqScanState)); + + if (!es->analyze || !planstate->instrument) + return; + + nfiltered = planstate->instrument->nfilteredPRF; + + /* In text mode, suppress zero counts; they're not interesting enough */ + if (nfiltered > 0 || es->format != EXPLAIN_FORMAT_TEXT) + { + ExplainPropertyFloat(qlabel, NULL, nfiltered, 0, es); + } +} + /* * Show extra information for a ForeignScan node. 
*/ diff --git a/src/backend/commands/explain_gp.c b/src/backend/commands/explain_gp.c index 35ca1c9787f..ae02134beb8 100644 --- a/src/backend/commands/explain_gp.c +++ b/src/backend/commands/explain_gp.c @@ -57,6 +57,7 @@ typedef struct CdbExplain_StatInst double nloops; /* # of run cycles for this node */ double nfiltered1; double nfiltered2; + double nfilteredPRF; double execmemused; /* executor memory used (bytes) */ double workmemused; /* work_mem actually used (bytes) */ double workmemwanted; /* work_mem to avoid workfile i/o (bytes) */ @@ -886,6 +887,7 @@ cdbexplain_collectStatsFromNode(PlanState *planstate, CdbExplain_SendStatCtx *ct si->nloops = instr->nloops; si->nfiltered1 = instr->nfiltered1; si->nfiltered2 = instr->nfiltered2; + si->nfilteredPRF = instr->nfilteredPRF; si->workmemused = instr->workmemused; si->workmemwanted = instr->workmemwanted; si->workfileCreated = instr->workfileCreated; @@ -1189,6 +1191,7 @@ cdbexplain_depositStatsToNode(PlanState *planstate, CdbExplain_RecvStatCtx *ctx) instr->nloops = nodeAcc->nsimax->nloops; instr->nfiltered1 = nodeAcc->nsimax->nfiltered1; instr->nfiltered2 = nodeAcc->nsimax->nfiltered2; + instr->nfilteredPRF = nodeAcc->nsimax->nfilteredPRF; instr->execmemused = nodeAcc->nsimax->execmemused; instr->workmemused = nodeAcc->nsimax->workmemused; instr->workmemwanted = nodeAcc->nsimax->workmemwanted; diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c index f5db69f4fd3..fd4730a601b 100644 --- a/src/backend/executor/nodeSeqscan.c +++ b/src/backend/executor/nodeSeqscan.c @@ -420,7 +420,10 @@ PassByBloomFilter(SeqScanState *node, TupleTableSlot *slot) blm_filter = (bloom_filter *)DatumGetPointer(sk->sk_argument); if (bloom_lacks_element(blm_filter, (unsigned char *)&val, sizeof(Datum))) + { + InstrCountFilteredPRF(node, 1); return false; + } } return true; diff --git a/src/include/executor/instrument.h b/src/include/executor/instrument.h index 78a81f709f6..c7044c853c3 100644 --- a/src/include/executor/instrument.h +++ b/src/include/executor/instrument.h @@ -94,6 +94,7 @@ typedef struct Instrumentation uint64 nloops; /* # of run cycles for this node */ double nfiltered1; /* # tuples removed by scanqual or joinqual */ double nfiltered2; /* # tuples removed by "other" quals */ + double nfilteredPRF; /* # tuples removed by pushdown runtime filter */ BufferUsage bufusage; /* Total buffer usage */ WalUsage walusage; /* total WAL usage */ diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 8d79aaa3eed..4cd1b237f01 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -1186,6 +1186,11 @@ extern uint64 PlanStateOperatorMemKB(const PlanState *ps); if (((PlanState *)(node))->instrument) \ ((PlanState *)(node))->instrument->nfiltered2 += (delta); \ } while(0) +#define InstrCountFilteredPRF(node, delta) \ + do { \ + if (((PlanState *)(node))->instrument) \ + ((PlanState *)(node))->instrument->nfilteredPRF += (delta); \ + } while(0) /* * EPQState is state for executing an EvalPlanQual recheck on a candidate diff --git a/src/test/regress/expected/gp_runtime_filter.out b/src/test/regress/expected/gp_runtime_filter.out index d7f7bbf0c4f..0c3d8b47b07 100644 --- a/src/test/regress/expected/gp_runtime_filter.out +++ b/src/test/regress/expected/gp_runtime_filter.out @@ -296,11 +296,12 @@ SELECT t1.c3 FROM t1, t2 WHERE t1.c2 = t2.c2; Hash Cond: (t1.c2 = t2.c2) Extra Text: (seg2) Hash chain length 8.0 avg, 8 max, using 4 of 524288 buckets. 
-> Seq Scan on t1 (actual rows=1 loops=1) + Rows Removed by Pushdown Runtime Filter: 127 -> Hash (actual rows=32 loops=1) Buckets: 524288 Batches: 1 Memory Usage: 4098kB -> Seq Scan on t2 (actual rows=32 loops=1) Optimizer: Postgres query optimizer -(9 rows) +(10 rows) RESET gp_enable_runtime_filter_pushdown; DROP TABLE IF EXISTS t1; @@ -347,13 +348,15 @@ SELECT * FROM t1, t2 WHERE t1.c2 = t2.c2; -> Append (actual rows=64 loops=1) Partition Selectors: $0 -> Seq Scan on t1_1_prt_1 t1_1 (actual rows=48 loops=1) + Rows Removed by Pushdown Runtime Filter: 240 -> Seq Scan on t1_1_prt_2 t1_2 (actual rows=16 loops=1) + Rows Removed by Pushdown Runtime Filter: 304 -> Hash (actual rows=6 loops=1) Buckets: 524288 Batches: 1 Memory Usage: 4097kB -> Partition Selector (selector id: $0) (actual rows=6 loops=1) -> Seq Scan on t2 (actual rows=6 loops=1) Optimizer: Postgres query optimizer -(13 rows) +(15 rows) RESET gp_enable_runtime_filter_pushdown; DROP TABLE IF EXISTS t1; From 669225849c3b33da3dfb5b9f3d62c513f8388953 Mon Sep 17 00:00:00 2001 From: zhangyue Date: Thu, 5 Dec 2024 20:30:36 +0800 Subject: [PATCH 05/12] Fix bug with explain and Add more message about join type. --- src/backend/commands/explain.c | 25 ++++----- src/backend/commands/explain_gp.c | 3 ++ src/backend/executor/nodeHashjoin.c | 16 ++++-- src/backend/executor/nodeSeqscan.c | 8 +++ src/include/executor/instrument.h | 1 + .../regress/expected/gp_runtime_filter.out | 53 +++++++++++++++++++ src/test/regress/sql/gp_runtime_filter.sql | 28 ++++++++++ 7 files changed, 115 insertions(+), 19 deletions(-) diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index 37982b3b896..097ec412f4a 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -2418,8 +2418,9 @@ ExplainNode(PlanState *planstate, List *ancestors, /* fall through to print additional fields the same as SeqScan */ /* FALLTHROUGH */ case T_SeqScan: - show_pushdown_runtime_filter_info("Rows Removed by Pushdown Runtime Filter", - planstate, es); + if (gp_enable_runtime_filter_pushdown && IsA(planstate, SeqScanState)) + show_pushdown_runtime_filter_info("Rows Removed by Pushdown Runtime Filter", + planstate, es); /* FALLTHROUGH */ case T_DynamicSeqScan: case T_ValuesScan: @@ -4292,25 +4293,21 @@ show_instrumentation_count(const char *qlabel, int which, } /* - * If it's EXPLAIN ANALYZE, show instrumentation information with pushdown runtime filter + * If it's EXPLAIN ANALYZE, show instrumentation information with pushdown + * runtime filter. 
*/ static void -show_pushdown_runtime_filter_info(const char *qlabel, PlanState *planstate, ExplainState *es) +show_pushdown_runtime_filter_info(const char *qlabel, + PlanState *planstate, + ExplainState *es) { - double nfiltered; - - Assert(IsA(planstate, SeqScanState)); + Assert(gp_enable_runtime_filter_pushdown && IsA(planstate, SeqScanState)); if (!es->analyze || !planstate->instrument) return; - nfiltered = planstate->instrument->nfilteredPRF; - - /* In text mode, suppress zero counts; they're not interesting enough */ - if (nfiltered > 0 || es->format != EXPLAIN_FORMAT_TEXT) - { - ExplainPropertyFloat(qlabel, NULL, nfiltered, 0, es); - } + if (planstate->instrument->prf_work) + ExplainPropertyFloat(qlabel, NULL, planstate->instrument->nfilteredPRF, 0, es); } /* diff --git a/src/backend/commands/explain_gp.c b/src/backend/commands/explain_gp.c index ae02134beb8..537037f0e4c 100644 --- a/src/backend/commands/explain_gp.c +++ b/src/backend/commands/explain_gp.c @@ -57,6 +57,7 @@ typedef struct CdbExplain_StatInst double nloops; /* # of run cycles for this node */ double nfiltered1; double nfiltered2; + bool prf_work; double nfilteredPRF; double execmemused; /* executor memory used (bytes) */ double workmemused; /* work_mem actually used (bytes) */ @@ -887,6 +888,7 @@ cdbexplain_collectStatsFromNode(PlanState *planstate, CdbExplain_SendStatCtx *ct si->nloops = instr->nloops; si->nfiltered1 = instr->nfiltered1; si->nfiltered2 = instr->nfiltered2; + si->prf_work = instr->prf_work; si->nfilteredPRF = instr->nfilteredPRF; si->workmemused = instr->workmemused; si->workmemwanted = instr->workmemwanted; @@ -1191,6 +1193,7 @@ cdbexplain_depositStatsToNode(PlanState *planstate, CdbExplain_RecvStatCtx *ctx) instr->nloops = nodeAcc->nsimax->nloops; instr->nfiltered1 = nodeAcc->nsimax->nfiltered1; instr->nfiltered2 = nodeAcc->nsimax->nfiltered2; + instr->prf_work = nodeAcc->nsimax->prf_work; instr->nfilteredPRF = nodeAcc->nsimax->nfilteredPRF; instr->execmemused = nodeAcc->nsimax->execmemused; instr->workmemused = nodeAcc->nsimax->workmemused; diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c index 6c8ae45d7b5..8234b3207a3 100644 --- a/src/backend/executor/nodeHashjoin.c +++ b/src/backend/executor/nodeHashjoin.c @@ -2195,13 +2195,19 @@ CreateRuntimeFilter(HashJoinState* hjstate) List *targets; /* - * Only applicatable for inner, right and semi join, + * A build-side Bloom filter tells us if a row is definitely not in the build + * side. This allows us to early-eliminate rows or early-accept rows depending + * on the type of join. + * Left Outer Join and Full Outer Join output all rows, so a build-side Bloom + * filter would only allow us to early-output. Left Antijoin outputs only if + * there is no match, so again early output. We don't implement early output + * for now. + * So it's only applicatable for inner, right and semi join. 
*/ jointype = hjstate->js.jointype; - if (jointype != JOIN_INNER - && jointype != JOIN_RIGHT - && jointype != JOIN_SEMI - ) + if (jointype != JOIN_INNER && + jointype != JOIN_RIGHT && + jointype != JOIN_SEMI) return; hstate = castNode(HashState, innerPlanState(hjstate)); diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c index fd4730a601b..f5b71191667 100644 --- a/src/backend/executor/nodeSeqscan.c +++ b/src/backend/executor/nodeSeqscan.c @@ -408,6 +408,14 @@ PassByBloomFilter(SeqScanState *node, TupleTableSlot *slot) ListCell *lc; bloom_filter *blm_filter; + /* + * Mark that the pushdown runtime filter is actually taking effect. + */ + if (node->ss.ps.instrument && + !node->ss.ps.instrument->prf_work && + list_length(node->filters)) + node->ss.ps.instrument->prf_work = true; + foreach (lc, node->filters) { sk = lfirst(lc); diff --git a/src/include/executor/instrument.h b/src/include/executor/instrument.h index c7044c853c3..e3ade84bbc5 100644 --- a/src/include/executor/instrument.h +++ b/src/include/executor/instrument.h @@ -78,6 +78,7 @@ typedef struct Instrumentation bool need_bufusage; /* true if we need buffer usage data */ bool need_walusage; /* true if we need WAL usage data */ bool async_mode; /* true if node is in async mode */ + bool prf_work; /* true if pushdown runtime filters really work */ /* Info about current plan cycle: */ bool running; /* true if we've completed first tuple */ instr_time starttime; /* Start time of current iteration of node */ diff --git a/src/test/regress/expected/gp_runtime_filter.out b/src/test/regress/expected/gp_runtime_filter.out index 0c3d8b47b07..d8f2e9e43f1 100644 --- a/src/test/regress/expected/gp_runtime_filter.out +++ b/src/test/regress/expected/gp_runtime_filter.out @@ -358,6 +358,59 @@ SELECT * FROM t1, t2 WHERE t1.c2 = t2.c2; Optimizer: Postgres query optimizer (15 rows) +RESET gp_enable_runtime_filter_pushdown; +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; +-- case 3: bug fix with explain +DROP TABLE IF EXISTS test_tablesample; +NOTICE: table "test_tablesample" does not exist, skipping +CREATE TABLE test_tablesample (dist int, id int, name text) WITH (fillfactor=10) DISTRIBUTED BY (dist); +INSERT INTO test_tablesample SELECT 0, i, repeat(i::text, 875) FROM generate_series(0, 9) s(i) ORDER BY i; +INSERT INTO test_tablesample SELECT 3, i, repeat(i::text, 875) FROM generate_series(10, 19) s(i) ORDER BY i; +INSERT INTO test_tablesample SELECT 5, i, repeat(i::text, 875) FROM generate_series(20, 29) s(i) ORDER BY i; +SET gp_enable_runtime_filter_pushdown TO on; +EXPLAIN (COSTS OFF) SELECT id FROM test_tablesample TABLESAMPLE SYSTEM (50) REPEATABLE (2); + QUERY PLAN +-------------------------------------------------------------------------- + Gather Motion 3:1 (slice1; segments: 3) + -> Sample Scan on test_tablesample + Sampling: system ('50'::real) REPEATABLE ('2'::double precision) + Optimizer: Postgres query optimizer +(4 rows) + +RESET gp_enable_runtime_filter_pushdown; +DROP TABLE IF EXISTS test_tablesample; +-- case 4: show debug info only when gp_enable_runtime_filter_pushdown is on +DROP TABLE IF EXISTS t1; +NOTICE: table "t1" does not exist, skipping +DROP TABLE IF EXISTS t2; +NOTICE: table "t2" does not exist, skipping +CREATE TABLE t1(c1 int, c2 int); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'c1' as the Cloudberry Database data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. 
Make sure column(s) chosen are the optimal data distribution key to minimize skew. +CREATE TABLE t2(c1 int, c2 int); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'c1' as the Cloudberry Database data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. +INSERT INTO t1 SELECT GENERATE_SERIES(1, 1000), GENERATE_SERIES(1, 1000); +INSERT INTO t2 SELECT * FROM t1; +SET gp_enable_runtime_filter_pushdown TO on; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) SELECT count(t1.c2) FROM t1, t2 WHERE t1.c1 = t2.c1; + QUERY PLAN +--------------------------------------------------------------------------------------------------------- + Finalize Aggregate (actual rows=1 loops=1) + -> Gather Motion 3:1 (slice1; segments: 3) (actual rows=3 loops=1) + -> Partial Aggregate (actual rows=1 loops=1) + -> Hash Join (actual rows=340 loops=1) + Hash Cond: (t1.c1 = t2.c1) + Extra Text: (seg2) Hash chain length 1.0 avg, 1 max, using 340 of 524288 buckets. + -> Seq Scan on t1 (actual rows=340 loops=1) + Rows Removed by Pushdown Runtime Filter: 0 + -> Hash (actual rows=340 loops=1) + Buckets: 524288 Batches: 1 Memory Usage: 4108kB + -> Seq Scan on t2 (actual rows=340 loops=1) + Optimizer: Postgres query optimizer +(12 rows) + RESET gp_enable_runtime_filter_pushdown; DROP TABLE IF EXISTS t1; DROP TABLE IF EXISTS t2; diff --git a/src/test/regress/sql/gp_runtime_filter.sql b/src/test/regress/sql/gp_runtime_filter.sql index 83be20467a9..10262079901 100644 --- a/src/test/regress/sql/gp_runtime_filter.sql +++ b/src/test/regress/sql/gp_runtime_filter.sql @@ -136,6 +136,34 @@ RESET gp_enable_runtime_filter_pushdown; DROP TABLE IF EXISTS t1; DROP TABLE IF EXISTS t2; +-- case 3: bug fix with explain +DROP TABLE IF EXISTS test_tablesample; +CREATE TABLE test_tablesample (dist int, id int, name text) WITH (fillfactor=10) DISTRIBUTED BY (dist); +INSERT INTO test_tablesample SELECT 0, i, repeat(i::text, 875) FROM generate_series(0, 9) s(i) ORDER BY i; +INSERT INTO test_tablesample SELECT 3, i, repeat(i::text, 875) FROM generate_series(10, 19) s(i) ORDER BY i; +INSERT INTO test_tablesample SELECT 5, i, repeat(i::text, 875) FROM generate_series(20, 29) s(i) ORDER BY i; + +SET gp_enable_runtime_filter_pushdown TO on; +EXPLAIN (COSTS OFF) SELECT id FROM test_tablesample TABLESAMPLE SYSTEM (50) REPEATABLE (2); +RESET gp_enable_runtime_filter_pushdown; + +DROP TABLE IF EXISTS test_tablesample; + +-- case 4: show debug info only when gp_enable_runtime_filter_pushdown is on +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; +CREATE TABLE t1(c1 int, c2 int); +CREATE TABLE t2(c1 int, c2 int); +INSERT INTO t1 SELECT GENERATE_SERIES(1, 1000), GENERATE_SERIES(1, 1000); +INSERT INTO t2 SELECT * FROM t1; + +SET gp_enable_runtime_filter_pushdown TO on; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) SELECT count(t1.c2) FROM t1, t2 WHERE t1.c1 = t2.c1; +RESET gp_enable_runtime_filter_pushdown; + +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; + -- Clean up: reset guc SET gp_enable_runtime_filter TO off; SET optimizer TO default; From 0e4099bcf399e4f00e69842cfed7a241de6d9fd4 Mon Sep 17 00:00:00 2001 From: zhangyue Date: Tue, 10 Dec 2024 16:37:30 +0800 Subject: [PATCH 06/12] Remove code in MultiExecParallelHash(). 
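This drops the BuildRuntimeFilter()/PushdownRuntimeFilter() calls from the
parallel hash build loop. The filter list is only ever created behind the gate
added in PATCH 01 in ExecInitHashJoin():

    if (Gp_role == GP_ROLE_EXECUTE
        && gp_enable_runtime_filter_pushdown
        && !estate->useMppParallelMode)
        CreateRuntimeFilter(hjstate);

so node->filters is expected to be empty whenever MultiExecParallelHash() runs,
and the removed branches never did any work on that path.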
--- src/backend/executor/nodeHash.c | 7 ------- 1 file changed, 7 deletions(-) diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c index ddc4b8529c8..94fdeb0b0d1 100644 --- a/src/backend/executor/nodeHash.c +++ b/src/backend/executor/nodeHash.c @@ -345,14 +345,7 @@ MultiExecParallelHash(HashState *node) slot = ExecProcNode(outerNode); if (TupIsNull(slot)) - { - if (gp_enable_runtime_filter_pushdown && node->filters) - PushdownRuntimeFilter(node); break; - } - - if (gp_enable_runtime_filter_pushdown && node->filters) - BuildRuntimeFilter(node, slot); econtext->ecxt_outertuple = slot; if (ExecHashGetHashValue(node, hashtable, econtext, hashkeys, From 590bbace97240bff08f57597e391db6ce9c15dcc Mon Sep 17 00:00:00 2001 From: zhangyue Date: Tue, 10 Dec 2024 22:38:49 +0800 Subject: [PATCH 07/12] Add test case with Hashjoin+result+seqscan. --- .../regress/expected/gp_runtime_filter.out | 53 +++++++++++++++++++ src/test/regress/sql/gp_runtime_filter.sql | 25 +++++++++ 2 files changed, 78 insertions(+) diff --git a/src/test/regress/expected/gp_runtime_filter.out b/src/test/regress/expected/gp_runtime_filter.out index d8f2e9e43f1..c87a5d81aca 100644 --- a/src/test/regress/expected/gp_runtime_filter.out +++ b/src/test/regress/expected/gp_runtime_filter.out @@ -411,6 +411,59 @@ EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) SELECT count(t1.c2) FROM t Optimizer: Postgres query optimizer (12 rows) +RESET gp_enable_runtime_filter_pushdown; +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; +-- case 5: hashjoin + result + seqsacn +DROP TABLE IF EXISTS t1; +NOTICE: table "t1" does not exist, skipping +DROP TABLE IF EXISTS t2; +NOTICE: table "t2" does not exist, skipping +CREATE TABLE t1(c1 int, c2 int, c3 char(50), c4 char(50), c5 char(50)) DISTRIBUTED REPLICATED; +CREATE TABLE t2(c1 int, c2 int, c3 char(50), c4 char(50), c5 char(50)); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'c1' as the Cloudberry Database data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. +INSERT INTO t1 VALUES (5,5,5,5,5), (3,3,3,3,3), (4,4,4,4,4); +INSERT INTO t2 VALUES (1,1,1,1,1), (2,2,2,2,2), (3,3,3,3,3), (4,4,4,4,4); +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t2 select * FROM t2; +ANALYZE; +SET optimizer TO on; +SET gp_enable_runtime_filter_pushdown TO off; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) SELECT t1.c3 FROM t1, t2 WHERE t1.c1 = t2.c1; + QUERY PLAN +------------------------------------------------------------------------------------------- + Gather Motion 3:1 (slice1; segments: 3) (actual rows=32 loops=1) + -> Hash Join (actual rows=32 loops=1) + Hash Cond: (t1.c1 = t2.c1) + Extra Text: (seg0) Hash chain length 2.0 avg, 2 max, using 3 of 524288 buckets. 
+ -> Result (actual rows=16 loops=1) + -> Seq Scan on t1 (actual rows=24 loops=1) + -> Hash (actual rows=6 loops=1) + Buckets: 524288 Batches: 1 Memory Usage: 4097kB + -> Seq Scan on t2 (actual rows=6 loops=1) + Optimizer: Pivotal Optimizer (GPORCA) +(10 rows) + +SET gp_enable_runtime_filter_pushdown TO on; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) SELECT t1.c3 FROM t1, t2 WHERE t1.c1 = t2.c1; + QUERY PLAN +------------------------------------------------------------------------------------------- + Gather Motion 3:1 (slice1; segments: 3) (actual rows=32 loops=1) + -> Hash Join (actual rows=32 loops=1) + Hash Cond: (t1.c1 = t2.c1) + Extra Text: (seg0) Hash chain length 2.0 avg, 2 max, using 3 of 524288 buckets. + -> Result (actual rows=16 loops=1) + -> Seq Scan on t1 (actual rows=16 loops=1) + Rows Removed by Pushdown Runtime Filter: 8 + -> Hash (actual rows=6 loops=1) + Buckets: 524288 Batches: 1 Memory Usage: 4097kB + -> Seq Scan on t2 (actual rows=6 loops=1) + Optimizer: Pivotal Optimizer (GPORCA) +(11 rows) + RESET gp_enable_runtime_filter_pushdown; DROP TABLE IF EXISTS t1; DROP TABLE IF EXISTS t2; diff --git a/src/test/regress/sql/gp_runtime_filter.sql b/src/test/regress/sql/gp_runtime_filter.sql index 10262079901..8eab4819d25 100644 --- a/src/test/regress/sql/gp_runtime_filter.sql +++ b/src/test/regress/sql/gp_runtime_filter.sql @@ -164,6 +164,31 @@ RESET gp_enable_runtime_filter_pushdown; DROP TABLE IF EXISTS t1; DROP TABLE IF EXISTS t2; +-- case 5: hashjoin + result + seqsacn +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; +CREATE TABLE t1(c1 int, c2 int, c3 char(50), c4 char(50), c5 char(50)) DISTRIBUTED REPLICATED; +CREATE TABLE t2(c1 int, c2 int, c3 char(50), c4 char(50), c5 char(50)); +INSERT INTO t1 VALUES (5,5,5,5,5), (3,3,3,3,3), (4,4,4,4,4); +INSERT INTO t2 VALUES (1,1,1,1,1), (2,2,2,2,2), (3,3,3,3,3), (4,4,4,4,4); +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t2 select * FROM t2; +ANALYZE; + +SET optimizer TO on; +SET gp_enable_runtime_filter_pushdown TO off; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) SELECT t1.c3 FROM t1, t2 WHERE t1.c1 = t2.c1; + +SET gp_enable_runtime_filter_pushdown TO on; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) SELECT t1.c3 FROM t1, t2 WHERE t1.c1 = t2.c1; + +RESET gp_enable_runtime_filter_pushdown; + +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; + -- Clean up: reset guc SET gp_enable_runtime_filter TO off; SET optimizer TO default; From bcccfb46a524b0bc11155ea89b41b8d6ae590c99 Mon Sep 17 00:00:00 2001 From: zhangyue Date: Tue, 10 Dec 2024 23:12:43 +0800 Subject: [PATCH 08/12] Add test case with hashjoin+hashjoin+seqscan. 
--- .../regress/expected/gp_runtime_filter.out | 69 +++++++++++++++++++ src/test/regress/sql/gp_runtime_filter.sql | 32 +++++++++ 2 files changed, 101 insertions(+) diff --git a/src/test/regress/expected/gp_runtime_filter.out b/src/test/regress/expected/gp_runtime_filter.out index c87a5d81aca..6ae6a189b5d 100644 --- a/src/test/regress/expected/gp_runtime_filter.out +++ b/src/test/regress/expected/gp_runtime_filter.out @@ -467,6 +467,75 @@ EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) SELECT t1.c3 FROM t1, t2 W RESET gp_enable_runtime_filter_pushdown; DROP TABLE IF EXISTS t1; DROP TABLE IF EXISTS t2; +-- case 6: hashjoin + hashjoin + seqscan +DROP TABLE IF EXISTS t1; +NOTICE: table "t1" does not exist, skipping +DROP TABLE IF EXISTS t2; +NOTICE: table "t2" does not exist, skipping +DROP TABLE IF EXISTS t3; +NOTICE: table "t3" does not exist, skipping +CREATE TABLE t1(c1 int, c2 int, c3 char(50), c4 char(50), c5 char(50)) DISTRIBUTED REPLICATED; +CREATE TABLE t2(c1 int, c2 int, c3 char(50), c4 char(50), c5 char(50)) DISTRIBUTED REPLICATED; +CREATE TABLE t3(c1 int, c2 int, c3 char(50), c4 char(50), c5 char(50)) DISTRIBUTED REPLICATED; +INSERT INTO t1 VALUES (1,1,1,1,1), (2,2,2,2,2), (5,5,5,5,5); +INSERT INTO t2 VALUES (1,1,1,1,1), (2,2,2,2,2), (3,3,3,3,3), (4,4,4,4,4); +INSERT INTO t3 VALUES (1,1,1,1,1), (2,2,2,2,2), (3,3,3,3,3), (4,4,4,4,4); +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t2 select * FROM t2; +INSERT INTO t2 select * FROM t2; +INSERT INTO t3 select * FROM t3; +ANALYZE; +SET optimizer TO off; +SET gp_enable_runtime_filter_pushdown TO off; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) SELECT * FROM t1, t2, t3 WHERE t1.c1 = t2.c1 AND t1.c2 = t3.c2; + QUERY PLAN +--------------------------------------------------------------------------------------- + Gather Motion 1:1 (slice1; segments: 1) (actual rows=256 loops=1) + -> Hash Join (actual rows=256 loops=1) + Hash Cond: (t1.c1 = t2.c1) + Extra Text: Hash chain length 4.0 avg, 4 max, using 4 of 32768 buckets. + -> Hash Join (actual rows=64 loops=1) + Hash Cond: (t1.c2 = t3.c2) + Extra Text: Hash chain length 2.0 avg, 2 max, using 4 of 32768 buckets. + -> Seq Scan on t1 (actual rows=48 loops=1) + -> Hash (actual rows=8 loops=1) + Buckets: 32768 Batches: 1 Memory Usage: 258kB + -> Seq Scan on t3 (actual rows=8 loops=1) + -> Hash (actual rows=16 loops=1) + Buckets: 32768 Batches: 1 Memory Usage: 260kB + -> Seq Scan on t2 (actual rows=16 loops=1) + Optimizer: Postgres query optimizer +(15 rows) + +SET gp_enable_runtime_filter_pushdown TO on; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) SELECT * FROM t1, t2, t3 WHERE t1.c1 = t2.c1 AND t1.c2 = t3.c2; + QUERY PLAN +--------------------------------------------------------------------------------------- + Gather Motion 1:1 (slice1; segments: 1) (actual rows=256 loops=1) + -> Hash Join (actual rows=256 loops=1) + Hash Cond: (t1.c1 = t2.c1) + Extra Text: Hash chain length 4.0 avg, 4 max, using 4 of 32768 buckets. + -> Hash Join (actual rows=64 loops=1) + Hash Cond: (t1.c2 = t3.c2) + Extra Text: Hash chain length 2.0 avg, 2 max, using 4 of 32768 buckets. 
+ -> Seq Scan on t1 (actual rows=32 loops=1) + Rows Removed by Pushdown Runtime Filter: 16 + -> Hash (actual rows=8 loops=1) + Buckets: 32768 Batches: 1 Memory Usage: 258kB + -> Seq Scan on t3 (actual rows=8 loops=1) + -> Hash (actual rows=16 loops=1) + Buckets: 32768 Batches: 1 Memory Usage: 260kB + -> Seq Scan on t2 (actual rows=16 loops=1) + Optimizer: Postgres query optimizer +(16 rows) + +RESET gp_enable_runtime_filter_pushdown; +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; +DROP TABLE IF EXISTS t3; -- Clean up: reset guc SET gp_enable_runtime_filter TO off; SET optimizer TO default; diff --git a/src/test/regress/sql/gp_runtime_filter.sql b/src/test/regress/sql/gp_runtime_filter.sql index 8eab4819d25..f4cc770881b 100644 --- a/src/test/regress/sql/gp_runtime_filter.sql +++ b/src/test/regress/sql/gp_runtime_filter.sql @@ -189,6 +189,38 @@ RESET gp_enable_runtime_filter_pushdown; DROP TABLE IF EXISTS t1; DROP TABLE IF EXISTS t2; +-- case 6: hashjoin + hashjoin + seqscan +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; +DROP TABLE IF EXISTS t3; +CREATE TABLE t1(c1 int, c2 int, c3 char(50), c4 char(50), c5 char(50)) DISTRIBUTED REPLICATED; +CREATE TABLE t2(c1 int, c2 int, c3 char(50), c4 char(50), c5 char(50)) DISTRIBUTED REPLICATED; +CREATE TABLE t3(c1 int, c2 int, c3 char(50), c4 char(50), c5 char(50)) DISTRIBUTED REPLICATED; +INSERT INTO t1 VALUES (1,1,1,1,1), (2,2,2,2,2), (5,5,5,5,5); +INSERT INTO t2 VALUES (1,1,1,1,1), (2,2,2,2,2), (3,3,3,3,3), (4,4,4,4,4); +INSERT INTO t3 VALUES (1,1,1,1,1), (2,2,2,2,2), (3,3,3,3,3), (4,4,4,4,4); +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t2 select * FROM t2; +INSERT INTO t2 select * FROM t2; +INSERT INTO t3 select * FROM t3; +ANALYZE; + +SET optimizer TO off; +SET gp_enable_runtime_filter_pushdown TO off; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) SELECT * FROM t1, t2, t3 WHERE t1.c1 = t2.c1 AND t1.c2 = t3.c2; + +SET gp_enable_runtime_filter_pushdown TO on; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) SELECT * FROM t1, t2, t3 WHERE t1.c1 = t2.c1 AND t1.c2 = t3.c2; + +RESET gp_enable_runtime_filter_pushdown; + +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; +DROP TABLE IF EXISTS t3; + -- Clean up: reset guc SET gp_enable_runtime_filter TO off; SET optimizer TO default; From 2dd81f87e5371890d84f77c1838c24caf5c310a8 Mon Sep 17 00:00:00 2001 From: zhangyue Date: Sun, 15 Dec 2024 20:02:26 +0800 Subject: [PATCH 09/12] Refactor SeqNext() for more efficient and readable. 
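A condensed sketch of the restructuring, in case it helps when reading the diff below: the test "does this scan node apply the pushed-down filters itself?" is evaluated once, rather than once per fetched tuple. The types and helpers here are simplified stand-ins for TableScanDesc, table_scan_getnextslot() and PassByBloomFilter(), not the real executor entry points.

    #include <stdbool.h>
    #include <stddef.h>
    #include <stdio.h>

    /* Minimal stand-ins so the sketch is self-contained: a "scan" over an int
     * array, and a "filter" that keeps even values. */
    typedef struct { const int *vals; int pos, len; } ScanSketch;

    static const int *
    scan_getnext(ScanSketch *scan)
    {
        return (scan->pos < scan->len) ? &scan->vals[scan->pos++] : NULL;
    }

    static bool
    passes_filters(const int *val)
    {
        return (*val % 2) == 0;
    }

    /* Shape of the refactored fetch: decide once whether this scan node
     * filters tuples itself, then run the matching fetch loop. */
    static const int *
    seq_next_sketch(ScanSketch *scan, bool filter_in_seqscan, bool have_filters)
    {
        if (filter_in_seqscan && have_filters)
        {
            const int *val;

            while ((val = scan_getnext(scan)) != NULL)
            {
                if (!passes_filters(val))
                    continue;       /* filtered out, fetch the next tuple */
                return val;
            }
            return NULL;
        }

        /* no filtering in the scan node: plain one-tuple fetch */
        return scan_getnext(scan);
    }

    int
    main(void)
    {
        int data[] = {1, 2, 3, 4};
        ScanSketch scan = {data, 0, 4};
        const int *v;

        while ((v = seq_next_sketch(&scan, true, true)) != NULL)
            printf("kept %d\n", *v);    /* prints 2 and 4 */
        return 0;
    }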
--- src/backend/executor/nodeSeqscan.c | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c index f5b71191667..2beaa15c948 100644 --- a/src/backend/executor/nodeSeqscan.c +++ b/src/backend/executor/nodeSeqscan.c @@ -101,14 +101,22 @@ SeqNext(SeqScanState *node) /* * get the next tuple from the table */ - while (table_scan_getnextslot(scandesc, direction, slot)) + if (node->filter_in_seqscan && node->filters) { - if (node->filter_in_seqscan && node->filters && - !PassByBloomFilter(node, slot)) - continue; + while (table_scan_getnextslot(scandesc, direction, slot)) + { + if (!PassByBloomFilter(node, slot)) + continue; - return slot; + return slot; + } } + else + { + if (table_scan_getnextslot(scandesc, direction, slot)) + return slot; + } + return NULL; } From d5ede1de3ed3789ca52301e46835e7a987d96656 Mon Sep 17 00:00:00 2001 From: zhangyue Date: Sun, 15 Dec 2024 22:40:59 +0800 Subject: [PATCH 10/12] Add StaticAssert to check the size of LONG_MIN, LONG_MAX and Datum. --- src/backend/executor/nodeHashjoin.c | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c index 8234b3207a3..5119b9b1a93 100644 --- a/src/backend/executor/nodeHashjoin.c +++ b/src/backend/executor/nodeHashjoin.c @@ -2457,9 +2457,13 @@ CreateAttrFilter(PlanState *target, AttrNumber lattno, AttrNumber rattno, attr_filter->lattno = lattno; attr_filter->rattno = rattno; - attr_filter->blm_filter = bloom_create_aggresive(plan_rows, work_mem, random()); - attr_filter->min = LLONG_MAX; - attr_filter->max = LLONG_MIN; + + attr_filter->blm_filter = bloom_create_aggresive(plan_rows, work_mem, random()); + + StaticAssertDecl(sizeof(LONG_MAX) == sizeof(Datum), "sizeof(LONG_MAX) should be equal to sizeof(Datum)"); + StaticAssertDecl(sizeof(LONG_MIN) == sizeof(Datum), "sizeof(LONG_MIN) should be equal to sizeof(Datum)"); + attr_filter->min = LONG_MAX; + attr_filter->max = LONG_MIN; return attr_filter; } From c5a6fd2e1a63fc9bd1786309cab4861886d0d581 Mon Sep 17 00:00:00 2001 From: zhangyue Date: Sun, 26 Jan 2025 14:19:45 +0800 Subject: [PATCH 11/12] Fix bug with rescan. --- src/backend/executor/nodeSeqscan.c | 25 +++++++++++++++++++++---- src/include/nodes/execnodes.h | 2 ++ 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c index 2beaa15c948..808fcb5835c 100644 --- a/src/backend/executor/nodeSeqscan.c +++ b/src/backend/executor/nodeSeqscan.c @@ -75,7 +75,6 @@ SeqNext(SeqScanState *node) if (scandesc == NULL) { - int nkeys = 0; ScanKey keys = NULL; /* @@ -84,7 +83,7 @@ SeqNext(SeqScanState *node) * AM. */ if (gp_enable_runtime_filter_pushdown && !node->filter_in_seqscan) - keys = ScanKeyListToArray(node->filters, &nkeys); + keys = ScanKeyListToArray(node->filters, &node->num_scan_keys); /* * We reach here if the scan is not parallel, or if we're serially @@ -92,7 +91,7 @@ SeqNext(SeqScanState *node) */ scandesc = table_beginscan_es(node->ss.ss_currentRelation, estate->es_snapshot, - nkeys, keys, + node->num_scan_keys, keys, NULL, &node->ss.ps); node->ss.ss_currentScanDesc = scandesc; @@ -283,12 +282,27 @@ void ExecReScanSeqScan(SeqScanState *node) { TableScanDesc scan; + ScanKey keys; scan = node->ss.ss_currentScanDesc; + /* + * Clear all the pushdown scan keys. 
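+	 * (The array built below is filled with keys whose sk_flags are set to
+	 * SK_EMPYT and handed to table_rescan(), so runtime-filter keys pushed
+	 * down by an earlier scan do not survive the rescan.)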
+ */ + keys = NULL; + if (node->num_scan_keys) + { + keys = (ScanKey)palloc(sizeof(ScanKeyData) * node->num_scan_keys); + for (int i = 0; i < node->num_scan_keys; ++i) + keys[i].sk_flags = SK_EMPYT; + } + if (scan != NULL) table_rescan(scan, /* scan desc */ - NULL); /* new scan keys */ + keys); /* new scan keys */ + + if (keys) + pfree(keys); ExecScanReScan((ScanState *) node); } @@ -455,7 +469,10 @@ ScanKeyListToArray(List *keys, int *num) ScanKey sk; if (list_length(keys) == 0) + { + *num = 0; return NULL; + } Assert(num); *num = list_length(keys); diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 4cd1b237f01..782c9f7ed4a 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -1553,6 +1553,8 @@ typedef struct SeqScanState List *filters; /* the list of struct ScanKeyData */ bool filter_in_seqscan; /* check scan slot with runtime filters in seqscan node or in am */ + int num_scan_keys; /* valid if filter_in_seqscan is false, + number of pushdown scan keys */ } SeqScanState; /* ---------------- From e607900d9d143ff66648c721bf44db8026a64de4 Mon Sep 17 00:00:00 2001 From: zhangyue Date: Sun, 26 Jan 2025 21:48:16 +0800 Subject: [PATCH 12/12] Fix test case. --- .../regress/expected/gp_runtime_filter.out | 24 ++++++++++++------- src/test/regress/sql/gp_runtime_filter.sql | 22 +++++++++++------ 2 files changed, 30 insertions(+), 16 deletions(-) diff --git a/src/test/regress/expected/gp_runtime_filter.out b/src/test/regress/expected/gp_runtime_filter.out index 6ae6a189b5d..ca5ddefc7c3 100644 --- a/src/test/regress/expected/gp_runtime_filter.out +++ b/src/test/regress/expected/gp_runtime_filter.out @@ -1,3 +1,7 @@ +-- start_matchignore +-- m/^.*Extra Text:.*/ +-- m/^.*Buckets:.*/ +-- end_matchignore -- Disable ORCA SET optimizer TO off; -- Test Suit 1: runtime filter main case @@ -251,6 +255,7 @@ SELECT COUNT(*) FROM dim_rf (1 row) -- Test bloom filter pushdown +SET enable_parallel TO off; -- case 1: join on distribution table and replicated table. 
DROP TABLE IF EXISTS t1; NOTICE: table "t1" does not exist, skipping @@ -362,24 +367,24 @@ RESET gp_enable_runtime_filter_pushdown; DROP TABLE IF EXISTS t1; DROP TABLE IF EXISTS t2; -- case 3: bug fix with explain -DROP TABLE IF EXISTS test_tablesample; -NOTICE: table "test_tablesample" does not exist, skipping -CREATE TABLE test_tablesample (dist int, id int, name text) WITH (fillfactor=10) DISTRIBUTED BY (dist); -INSERT INTO test_tablesample SELECT 0, i, repeat(i::text, 875) FROM generate_series(0, 9) s(i) ORDER BY i; -INSERT INTO test_tablesample SELECT 3, i, repeat(i::text, 875) FROM generate_series(10, 19) s(i) ORDER BY i; -INSERT INTO test_tablesample SELECT 5, i, repeat(i::text, 875) FROM generate_series(20, 29) s(i) ORDER BY i; +DROP TABLE IF EXISTS test_tablesample1; +NOTICE: table "test_tablesample1" does not exist, skipping +CREATE TABLE test_tablesample1 (dist int, id int, name text) WITH (fillfactor=10) DISTRIBUTED BY (dist); +INSERT INTO test_tablesample1 SELECT 0, i, repeat(i::text, 875) FROM generate_series(0, 9) s(i) ORDER BY i; +INSERT INTO test_tablesample1 SELECT 3, i, repeat(i::text, 875) FROM generate_series(10, 19) s(i) ORDER BY i; +INSERT INTO test_tablesample1 SELECT 5, i, repeat(i::text, 875) FROM generate_series(20, 29) s(i) ORDER BY i; SET gp_enable_runtime_filter_pushdown TO on; -EXPLAIN (COSTS OFF) SELECT id FROM test_tablesample TABLESAMPLE SYSTEM (50) REPEATABLE (2); +EXPLAIN (COSTS OFF) SELECT id FROM test_tablesample1 TABLESAMPLE SYSTEM (50) REPEATABLE (2); QUERY PLAN -------------------------------------------------------------------------- Gather Motion 3:1 (slice1; segments: 3) - -> Sample Scan on test_tablesample + -> Sample Scan on test_tablesample1 Sampling: system ('50'::real) REPEATABLE ('2'::double precision) Optimizer: Postgres query optimizer (4 rows) RESET gp_enable_runtime_filter_pushdown; -DROP TABLE IF EXISTS test_tablesample; +DROP TABLE IF EXISTS test_tablesample1; -- case 4: show debug info only when gp_enable_runtime_filter_pushdown is on DROP TABLE IF EXISTS t1; NOTICE: table "t1" does not exist, skipping @@ -536,6 +541,7 @@ RESET gp_enable_runtime_filter_pushdown; DROP TABLE IF EXISTS t1; DROP TABLE IF EXISTS t2; DROP TABLE IF EXISTS t3; +RESET enable_parallel; -- Clean up: reset guc SET gp_enable_runtime_filter TO off; SET optimizer TO default; diff --git a/src/test/regress/sql/gp_runtime_filter.sql b/src/test/regress/sql/gp_runtime_filter.sql index f4cc770881b..fc1fe487745 100644 --- a/src/test/regress/sql/gp_runtime_filter.sql +++ b/src/test/regress/sql/gp_runtime_filter.sql @@ -1,3 +1,7 @@ +-- start_matchignore +-- m/^.*Extra Text:.*/ +-- m/^.*Buckets:.*/ +-- end_matchignore -- Disable ORCA SET optimizer TO off; @@ -76,6 +80,8 @@ SELECT COUNT(*) FROM dim_rf WHERE dim_rf.did IN (SELECT did FROM fact_rf) AND proj_id < 2; -- Test bloom filter pushdown +SET enable_parallel TO off; + -- case 1: join on distribution table and replicated table. 
DROP TABLE IF EXISTS t1; DROP TABLE IF EXISTS t2; @@ -137,17 +143,17 @@ DROP TABLE IF EXISTS t1; DROP TABLE IF EXISTS t2; -- case 3: bug fix with explain -DROP TABLE IF EXISTS test_tablesample; -CREATE TABLE test_tablesample (dist int, id int, name text) WITH (fillfactor=10) DISTRIBUTED BY (dist); -INSERT INTO test_tablesample SELECT 0, i, repeat(i::text, 875) FROM generate_series(0, 9) s(i) ORDER BY i; -INSERT INTO test_tablesample SELECT 3, i, repeat(i::text, 875) FROM generate_series(10, 19) s(i) ORDER BY i; -INSERT INTO test_tablesample SELECT 5, i, repeat(i::text, 875) FROM generate_series(20, 29) s(i) ORDER BY i; +DROP TABLE IF EXISTS test_tablesample1; +CREATE TABLE test_tablesample1 (dist int, id int, name text) WITH (fillfactor=10) DISTRIBUTED BY (dist); +INSERT INTO test_tablesample1 SELECT 0, i, repeat(i::text, 875) FROM generate_series(0, 9) s(i) ORDER BY i; +INSERT INTO test_tablesample1 SELECT 3, i, repeat(i::text, 875) FROM generate_series(10, 19) s(i) ORDER BY i; +INSERT INTO test_tablesample1 SELECT 5, i, repeat(i::text, 875) FROM generate_series(20, 29) s(i) ORDER BY i; SET gp_enable_runtime_filter_pushdown TO on; -EXPLAIN (COSTS OFF) SELECT id FROM test_tablesample TABLESAMPLE SYSTEM (50) REPEATABLE (2); +EXPLAIN (COSTS OFF) SELECT id FROM test_tablesample1 TABLESAMPLE SYSTEM (50) REPEATABLE (2); RESET gp_enable_runtime_filter_pushdown; -DROP TABLE IF EXISTS test_tablesample; +DROP TABLE IF EXISTS test_tablesample1; -- case 4: show debug info only when gp_enable_runtime_filter_pushdown is on DROP TABLE IF EXISTS t1; @@ -221,6 +227,8 @@ DROP TABLE IF EXISTS t1; DROP TABLE IF EXISTS t2; DROP TABLE IF EXISTS t3; +RESET enable_parallel; + -- Clean up: reset guc SET gp_enable_runtime_filter TO off; SET optimizer TO default;