[HUDI-8920] Optimized SerDe costs of Flink write, simple bucket and non bucket cases #12796

Open · wants to merge 1 commit into master from master-serde-non-bucket
Conversation

@geserdugarov (Contributor) commented on Feb 6, 2025

Change Logs

Changes to Flink stream write into a Hudi table, corresponding to RFC #12697. This PR implements the simple bucket index and non-bucket cases. The only remaining work is to support consistent hashing and the bounded context:
(image: DataStream optimization progress - 1)

Main points:

  • HoodieFlinkRecord is introduced. It doesn't extend HoodieRecord because we need a data structure that holds Flink row data together with Hudi metadata, constructed from Flink internal data types.
  • HoodieFlinkRecord is efficient for Flink processing due to HoodieFlinkRecordTypeInfo and HoodieFlinkRecordSerializer, which implement custom serialize and deserialize methods (see the sketch after this list).
  • We don't rewrite the classes used in Flink write pipelines to the new optimized ones because we want to preserve the previous behavior. Instead, the new behavior can be enabled via the write.fast.mode configuration option, which is off by default. After proper testing we could enable it by default, then deprecate the previous behavior, and refactor all classes once it is dropped.
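
To make the idea concrete, here is a minimal sketch under stated assumptions: FlinkInternalRowSketch and its write/read methods are hypothetical illustrations, not the PR's actual classes. The point is that Hudi metadata is kept as plain fields next to the RowData payload, so a custom serializer can write the metadata directly and delegate the payload to Flink's own RowData serializer instead of falling back to Kryo.

```java
// Hypothetical sketch, not the PR's actual HoodieFlinkRecord/HoodieFlinkInternalRow.
import java.io.IOException;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.table.data.RowData;

public class FlinkInternalRowSketch {
  private final String recordKey;      // Hudi record key
  private final String partitionPath;  // Hudi partition path
  private final String fileId;         // target file group id
  private final String instantTime;    // commit instant time
  private final RowData rowData;       // row payload, already in Flink's internal format

  public FlinkInternalRowSketch(String recordKey, String partitionPath,
                                String fileId, String instantTime, RowData rowData) {
    this.recordKey = recordKey;
    this.partitionPath = partitionPath;
    this.fileId = fileId;
    this.instantTime = instantTime;
    this.rowData = rowData;
  }

  // Core of a custom SerDe: metadata fields are written as compact UTF strings,
  // and the payload is delegated to Flink's RowData serializer, avoiding Kryo.
  public void write(TypeSerializer<RowData> rowSerializer, DataOutputView out) throws IOException {
    out.writeUTF(recordKey);
    out.writeUTF(partitionPath);
    out.writeUTF(fileId);
    out.writeUTF(instantTime);
    rowSerializer.serialize(rowData, out);
  }

  public static FlinkInternalRowSketch read(TypeSerializer<RowData> rowSerializer, DataInputView in)
      throws IOException {
    return new FlinkInternalRowSketch(
        in.readUTF(), in.readUTF(), in.readUTF(), in.readUTF(),
        rowSerializer.deserialize(in));
  }
}
```

In the actual PR these roles are presumably played by HoodieFlinkRecordTypeInfo and HoodieFlinkRecordSerializer, which plug the record type into Flink's type system so that shuffles between operators use this compact format.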

Benchmark description

The lineitem table from the TPC-H benchmark was used: 60 million rows, of which 20 million are unique.

Performance estimation results

| Case | Metric | Current (with Kryo) | HoodieFlinkRecord | Optimization |
|---|---|---|---|---|
| Non bucket | Data passed, GB | 43.9 | 29.3 | 33.3% |
| Non bucket | Total time, s | 578 | 384 | 33.6% |
| Simple bucket index | Data passed, GB | 19.4 | 13.6 | 29.9% |
| Simple bucket index | Total time, s | 297 | 236 | 20.5% |

Flink operators

Non bucket case:
(image: operators - non bucket - 3 merged)

Simple bucket case:
(image: operators - simple bucket - 3 merged)

Impact

Flink write performance improvement.

Risk level (write none, low, medium, or high below)

Low

Documentation Update

After merge

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@github-actions bot added the size:XL (PR with lines of changes > 1000) label on Feb 6, 2025
@geserdugarov changed the title from "[HUDI-8946] [HUDI-8921] Optimized SerDe costs of Flink write, simple bucket and non bucket cases" to "[HUDI-8920] Optimized SerDe costs of Flink write, simple bucket and non bucket cases" on Feb 6, 2025
@geserdugarov force-pushed the master-serde-non-bucket branch 2 times, most recently from 68028fc to 9377d36, on February 6, 2025 at 16:28
```java
    .booleanType()
    .defaultValue(false)
    .withDescription("Optimized Flink write into Hudi table, which uses customized serialization/deserialization. "
        + "Note, that only SIMPLE BUCKET index is supported for now.");
```
Contributor commented:
PR's title says "simple bucket and non bucket cases"

@geserdugarov (Contributor, Author) replied on Feb 7, 2025:

Missed it. Thanks! Fixed in 5a81536.
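
As a usage note for the option quoted above, here is a minimal, hypothetical sketch of enabling the optimized path from the Flink Table API. The table schema and path are made up for illustration; 'index.type' = 'BUCKET' selects the simple bucket index, and 'write.fast.mode' is the switch introduced by this PR.

```java
// Hypothetical usage sketch; the table schema and path are illustrative only.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FastModeUsageSketch {
  public static void main(String[] args) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    tableEnv.executeSql(
        "CREATE TABLE hudi_sink ("
            + "  uuid STRING PRIMARY KEY NOT ENFORCED,"
            + "  name STRING,"
            + "  ts TIMESTAMP(3)"
            + ") WITH ("
            + "  'connector' = 'hudi',"
            + "  'path' = 'file:///tmp/hudi_sink',"  // hypothetical table path
            + "  'index.type' = 'BUCKET',"           // simple bucket index case
            + "  'write.fast.mode' = 'true'"         // opt in to the optimized SerDe path
            + ")");
  }
}
```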

@geserdugarov commented on Feb 11, 2025

@danny0405, @xiarixiaoyao, @yuzhaojing, @wombatu-kun, hi!
If you don't mind and have time, could you please review this PR, which relates to the corresponding RFC #12697?

Actually, the main part of the proposed changes has been done in this PR. The only missing parts for now are consistent hashing support (in progress) and the bounded context (will check it next).

I've also finished testing.

Error:  testScheduleSplitPlan  Time elapsed: 0.034 s  <<< ERROR!
org.apache.hudi.exception.HoodieNotSupportedException: Currently, consistent hashing is not supported with enabled 'write.fast.mode'
	at org.apache.hudi.sink.cluster.ITTestFlinkConsistentHashingClustering.prepareData(ITTestFlinkConsistentHashingClustering.java:126)
	at org.apache.hudi.sink.cluster.ITTestFlinkConsistentHashingClustering.testScheduleSplitPlan(ITTestFlinkConsistentHashingClustering.java:79)

Error:  testScheduleMergePlan  Time elapsed: 0.027 s  <<< ERROR!
org.apache.hudi.exception.HoodieNotSupportedException: Currently, consistent hashing is not supported with enabled 'write.fast.mode'
	at org.apache.hudi.sink.cluster.ITTestFlinkConsistentHashingClustering.prepareData(ITTestFlinkConsistentHashingClustering.java:126)
	at org.apache.hudi.sink.cluster.ITTestFlinkConsistentHashingClustering.testScheduleMergePlan(ITTestFlinkConsistentHashingClustering.java:104)

These errors are expected: consistent hashing is not supported yet.
All other cases passed successfully.
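
For readers wondering where these errors come from, here is a hedged sketch of the kind of guard that could produce them. The bucket-engine option key below is an assumption for illustration, not verified against the PR; only the write.fast.mode key and the error message come from this thread.

```java
// Hypothetical validation sketch; the bucket-engine option key is an assumption.
import org.apache.flink.configuration.Configuration;
import org.apache.hudi.exception.HoodieNotSupportedException;

public final class FastModeValidationSketch {
  private FastModeValidationSketch() {}

  // Reject the consistent hashing bucket engine while 'write.fast.mode' is on.
  public static void checkFastModeCompatibility(Configuration conf) {
    boolean fastMode = conf.getBoolean("write.fast.mode", false);
    // "hoodie.bucket.index.engine.type" is a hypothetical key used for illustration.
    String bucketEngine = conf.getString("hoodie.bucket.index.engine.type", "SIMPLE");
    if (fastMode && "CONSISTENT_HASHING".equals(bucketEngine)) {
      throw new HoodieNotSupportedException(
          "Currently, consistent hashing is not supported with enabled 'write.fast.mode'");
    }
  }
}
```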

… Flink write, non bucket and simple bucket index
@geserdugarov force-pushed the master-serde-non-bucket branch from 04a23b3 to cb090d8 on February 12, 2025 at 13:58
@geserdugarov commented:
I've squashed all commits into one, cb090d8, and renamed HoodieFlinkRecord to HoodieFlinkInternalRow to prevent confusion, because this class doesn't extend HoodieRecord.

@hudi-bot

CI report:

Bot commands: @hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build
