Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable conditional writes in connector #814

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from

Conversation

subhashiyer9
Copy link
Member

@subhashiyer9 subhashiyer9 commented Jan 23, 2025

Problem

Solution

  • Adds a config to enable conditional writes, the default being true
  • The conditional write will be enabled only when scheduled interval is set and store kafka key/headers config is disabled to retain the existing delivery semantics
  • On a conditional write exception, a FileExistsException is rethrown , which being a retriable exception will reset the offset and poll again accordingly
  • In case of FileExistsException, connector will scan the s3 bucket for next available file which is not present in S3 by making consecutive get object requests. If the get api fails with 403 error, it will simply increment to next offset.
  • To enable determining the next available file, the equivalent target file name is cached for initial few records, had the record been the first record in the file. The offset will be reset to the one whose equivalent file does not exist in S3
  • Use LinkedHashMap to store commitFiles instead of existing HashMap to ensure files with smaller start offsets (i.e. the ones which were added to the map first) are written to S3 first. This is required to ensure we safely increment to next offset in case of conflict
Does this solution apply anywhere else?
  • yes
  • no
If yes, where?

Test Strategy

Testing done:
  • Unit tests
  • Integration tests
  • System tests
  • Manual tests
  • Performance tests - TODO
  • Chaos tests - TODO

Release Plan

Conditional write is enabled for only connectors running with at least once semantics using scheduled rotation.
The behaviour can be controlled using a configuration property.
When an upload fails due to file being present in s3, the connector scans for the next available file name in s3 which the connector can commit to and resets the offset to same
@confluent-cla-assistant
Copy link

🎉 All Contributor License Agreements have been signed. Ready to merge.
Please push an empty commit if you would like to re-run the checks to verify CLA status for all contributors.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant