feat: extract & insert sidecar batches in replay's action iterator
#679
base: main
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #679      +/-   ##
==========================================
+ Coverage   84.09%   84.24%   +0.15%
==========================================
  Files          77       77
  Lines       17805    18348     +543
  Branches    17805    18348     +543
==========================================
+ Hits        14973    15458     +485
+ Misses       2117     2113       -4
- Partials      715      777      +62
☔ View full report in Codecov by Sentry.
b2c5001 to 00af1f9
let sidecar_files: Result<Vec<_>, _> = visitor
    .sidecars
    .iter()
    .map(|sidecar| Self::sidecar_to_filemeta(sidecar, &log_root))
I wonder if sidecar_to_filemeta could be a closure. We only use this once.
let sidecar_to_filemeta = |sidecar: &Sidecar| -> DeltaResult<FileMeta> {
    let location = log_root.join("_sidecars/")?.join(&sidecar.path)?;
    Ok(FileMeta {
        location,
        last_modified: sidecar.modification_time,
        size: sidecar.size_in_bytes as usize,
    })
};
And then map over the sidecars:
visitor
    .sidecars
    .iter()
    .map(sidecar_to_filemeta)
Give it a shot and see how it is.
Do you think it's a good idea to leave it as a separate function for unit testing purposes?
I'd say either keep the separate function (if needed for testing) or embed the logic directly in the map call? What purpose does a separately named closure serve?
(aside: not sure if cargo fmt will like my indentation choice above -- depends on whether the ( or { is more important)
}

fn process_single_checkpoint_batch(
    parquet_handler: Arc<dyn ParquetHandler>,
iirc, we want to avoid passing handlers around. Only reference to the engine. I think it's because we want to make it clear that the handler is tied to the engine and not to encourage holding an Arc ref to the handler.
cc @zachschuermann to double check.
I recall running into lifetime issues when passing the entire engine. I believe we would have to explicitly tie the iterator's lifetime to that of the engine?
You can also pass an Arc.
Basically the iterator needs to hold a reference for the entire duration it's lazily evaluating. So you want to give it a reference it can hold for a long time.
But this change extends all the way to changing the scan_data function signature to explicitly tie the engine's lifetime to the iterator.
pub fn scan_data<'a>(
    &self,
    engine: &'a dyn Engine,
) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanData>> + 'a> {
Mmm, so the basic issue is that we have delayed reading of parquet files, so at some point we want an item off the iterator, and to produce it, we need to read some parquet, so we need a handler. Previously we could do all the read calls up front and then just map off that iterator, so we didn't need an engine ref plumbed through.
I think if this is all internal, i.e., we don't want to expose any of these function signatures to engines (especially in the FFI), then cloning the Arcs is fine (it's very cheap. as a suggestion we usually put // cheap arc clone at those clone sites to make it clear).
If we do want to ever expose this, we'll need to think more, but afaict, we don't.
Thanks for the look @nicklan, just to confirm we will go ahead and clone the parquet handler
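To make the trade-off in this thread concrete, here is a minimal sketch of a lazily evaluated iterator that owns a cloned Arc of its handler, so no lifetime parameter has to be threaded through the signature. The Handler trait, LenHandler, and lazy_batches are hypothetical stand-ins for illustration and are not the kernel's actual ParquetHandler API.

```rust
use std::sync::Arc;

// Hypothetical stand-in for a handler trait; not the real kernel API.
trait Handler {
    fn read(&self, path: &str) -> Vec<u32>;
}

struct LenHandler;

impl Handler for LenHandler {
    // Pretend that "reading" a file yields one value: the length of its path.
    fn read(&self, path: &str) -> Vec<u32> {
        vec![path.len() as u32]
    }
}

// The returned iterator owns the Arc it was given, so it borrows nothing from
// the caller and needs no lifetime tying it to an engine reference.
// Nothing is read until the iterator is actually advanced.
fn lazy_batches(handler: Arc<dyn Handler>, paths: Vec<String>) -> impl Iterator<Item = u32> {
    paths.into_iter().flat_map(move |p| handler.read(&p))
}
```

A caller would pass Arc::clone(&handler) (the "cheap arc clone" mentioned above) and keep its own reference; the clone is released once the iterator is dropped.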
kernel/src/log_segment.rs
Outdated
}

// Helper function to read checkpoint files based on the file type
fn read_checkpoint_files(
I also wonder if this could be inlined where it's used instead of having an entire method in log segment that's only used once. maybe hold off on moving this till we get a second opinion from ryan, zach or nick.
Agree... the call site + function signature are more lines of code than the actual code, because of all the state that has to be passed. Not worth it.
// We read checkpoint batches with the sidecar action. This results in empty paths
// if a row is not a sidecar action. We do not want to create a sidecar action for
// these rows.
if path.is_empty() {
This looks wrong. Are we mishandling column nullability somewhere, that can cause empty strings to be returned instead of NULL?
It seems like this sort of issue has shown up a few times recently -- do we have a lurking bug somewhere? or is null handling just error-prone in general?
I encountered Sidecar actions with empty strings for path when running test_create_checkpoint_stream_reads_parquet_checkpoint_batch at first. I believe it is because of the way I am creating the dummy checkpoint batch (beginning with a json string, specifically add_batch_simple()). The fix is not obvious to me though...
For more context, when I create a dummy engine data batch from a json string (without sidecar actions) with the SyncEngine's json handler:
pub(crate) fn parse_json(
and the sidecar action is included in the output_schema, I find the above error case. When the sidecar action is not included in the output_schema to the test util, the sidecar column handles nullability correctly. Does this seem like an issue with the SyncEngine's json handler's core functionality @scovich? This isn't an area I'm deeply familiar with but I'll look into it, my investigation might take a bit longer.
Below is the json string which is converted to engine data with string_array_to_engine_data, which is finally passed to the sync engine.
r#"{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableDeletionVectors":"true","delta.columnMapping.mode":"none"},"createdTime":1677811175819}}"#,
This code has disappeared in the latest version, did you figure out a fix?
I found a fix for the test case which created batches that had this weird empty-string Sidecar path field.
However, I am concerned that there may be something wrong with the SyncEngine's json handler's functionality as it allowed me to create this malformed batch.
Sorry, how was the batch malformed (= physically invalid)?
It seemed like the test was simply passing an empty string, which is schema-compatible. Arrow has no way to know that Delta puts additional constraints on the field value?
Sorry for the confusion, the test case this issue originated from was: test_create_checkpoint_stream_reads_parquet_checkpoint_batch_without_sidecars
I've left a comment below with more context
kernel/src/log_segment.rs
Outdated
// If sidecars files exist, read the sidecar files and return the iterator of sidecar batches
// to replace the checkpoint batch in the top level iterator
Ok(Right(parquet_handler.read_parquet_files(
This is subtle -- replacing the top-level batch means all non-file actions will be lost. This is only correct if all checkpoint scans are exclusively requesting adds or exclusively requesting non-file actions. I'm pretty sure it will break our inspect-table example that visits all actions during log replay.
We would either need to keep returning the top-level actions unconditionally (safer) or inspect the read schema to see whether we need non-file actions. Simpler feels (a lot) safer to me, and seems unlikely to cause any measurable performance hit -- each checkpoint part has thousands of actions, vs. dozens in the top-level manifest.
Based on the above, we might be able to eliminate multiple left/right use sites, by careful management of type signatures. Conceptually, we would always do a map over the top level checkpoint iterator, producing the following output:
let sidecar_content = Self::process_sidecars(top_level_batch, ...); // returns Option<impl Iterator>
std::iter::once(top_level_batch).chain(sidecar_content.into_iter().flatten())
We could pass a flag into process_sidecars that short circuits it to None (pretending no sidecars were found), or we could just cheat and do a spurious map call, just to get the correct signature:
std::iter::once(top_level_batch).chain(None.into_iter().flatten())
Ugh... compiler didn't like my toy example that did the spurious map call...
note: no two closures, even if identical, have the same type
thanks for the catch @scovich. I was operating under the assumption that "all checkpoint scans are exclusively requesting adds or exclusively requesting non-file actions" would always be true. But keeping the top-level actions unconditionally feels like a much safer approach. I'll move forward with this and have noted this decision in the design doc.
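The once/chain/flatten shape proposed in this thread can be sketched in isolation as follows. Batches are modeled as plain strings, and sidecar_content and expand are hypothetical names used only for illustration; this is not the PR's actual process_sidecars implementation.

```rust
// Hypothetical stand-in for a sidecar lookup: returns Some(batches) when the
// top-level batch references sidecars, and None otherwise (or when skipped).
fn sidecar_content(batch: &str, skip_sidecar_search: bool) -> Option<Vec<String>> {
    if skip_sidecar_search || !batch.contains("sidecar") {
        return None;
    }
    Some(vec![
        "sidecar-batch-1".to_string(),
        "sidecar-batch-2".to_string(),
    ])
}

// Always yields the top-level batch first, followed by any sidecar batches.
// Flattening the Option gives one return type whether or not sidecars exist,
// so no Either (left/right) wrapper is needed.
fn expand(batch: String, skip_sidecar_search: bool) -> impl Iterator<Item = String> {
    let sidecars = sidecar_content(&batch, skip_sidecar_search);
    std::iter::once(batch).chain(sidecars.into_iter().flatten())
}
```

Because the top-level batch is emitted unconditionally, non-file actions in it are never dropped, which is the safer behavior agreed on above.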
Shape of the PR looks good. A few questions and nits (and waiting for it to exit "draft" status)
kernel/src/log_segment.rs
Outdated
fn sidecar_to_filemeta(sidecar: &Sidecar, log_root: &Url) -> DeltaResult<FileMeta> {
    // If sidecar.path is relative (does not contain "://"), require that it is
    // just a file name. This will catch cases like "test/test/example.parquet".
    if !sidecar.path.contains("://") {
Is this raw string search safe? At some point it's probably quicker to just create the Path and check whether is_relative and whether the components iterator has more than one entry?
But meanwhile -- does the Delta spec actually forbid subdirectories in the first place?
Agh I think you might be right.
Because sidecar files must always reside in the table's own _delta_log/_sidecars directory, implementations are encouraged to store only the file's name (without scheme or parent directories).
I don't think there is an explicit mention that subdirectories are forbidden, only a recommendation. I will go ahead and update the logic here
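For reference, the Path-based check suggested by the reviewer could look roughly like the sketch below. The helper name is_bare_file_name is hypothetical, and since the thread concludes that the spec only recommends bare file names, this illustrates the suggestion rather than the logic the PR ends up with.

```rust
use std::path::{Component, Path};

// Returns true when `path` is relative and consists of exactly one normal
// component, i.e. a bare file name with no parent directories.
fn is_bare_file_name(path: &str) -> bool {
    let p = Path::new(path);
    let mut components = p.components();
    p.is_relative()
        && matches!(components.next(), Some(Component::Normal(_)))
        && components.next().is_none()
}
```

Note that this treats the input purely as a filesystem path; deciding whether a string is an absolute URL is a separate question that this sketch does not address.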
kernel/src/log_segment.rs
Outdated
    skip_sidecar_search,
)?;

Ok(std::iter::once(Ok((checkpoint_batch, false))).chain(
If you really wanted to get fancy, could do:
let top_iterable = need_nonfile_actions.then(|| Ok((checkpoint_batch, false)));
Ok(sidecar_content...chain(top_iterable))
... where need_nonfile_actions comes from a schema test, similar but opposite to the skip_sidecar_search. But it's probably not worth the complexity.
I agree, I'd like to move forward with unconditionally including the checkpoint batch for simplicity's sake.
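For readers unfamiliar with the bool::then trick in the "fancy" variant, a minimal sketch follows. The function with_optional_top and its string-based batches are hypothetical; the thread decided against this approach, so it is shown only to illustrate the idea.

```rust
// Chains an optional top-level batch after the sidecar batches. `bool::then`
// turns the flag into an Option, and Option implements IntoIterator, so it
// can be passed to `chain` directly.
fn with_optional_top(
    top: String,
    sidecars: Vec<String>,
    need_nonfile_actions: bool,
) -> impl Iterator<Item = String> {
    let top_iterable = need_nonfile_actions.then(|| top);
    sidecars.into_iter().chain(top_iterable)
}
```

The cost of this variant is an extra schema-derived flag that every caller must compute correctly, which is the complexity the reviewers chose to avoid.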
541655e to 4631bae
kernel/src/log_segment/tests.rs
Outdated
mock_table
    .parquet_checkpoint(
        add_batch_simple(get_log_add_schema().clone()),
@scovich this is the batch creation I mentioned in the other comment.
Previously, when passing a schema that included the sidecar action to add_batch_simple, I found that the SidecarVisitor would find sidecar actions which had the empty path field.
(nit: changed
Oh sorry, the github bot mistakenly put the breaking-change label on this PR even though I don't think this is a breaking change. @scovich correct me if I'm wrong!
Ah, the last job does indeed seem to have passed semver checks: (we should revert the PR title change, sorry)
kernel/src/log_segment.rs
Outdated
require!(
    !need_file_actions || checkpoint_read_schema.contains(SIDECAR_NAME),
    Error::generic(
        "If the checkpoint read schema contains file actions, it must contain the sidecar column"
    )
);
Question for other reviewers: Should this be a debug_assert? LogSegment is internal. We're trying to check an invariant here. This seems like the exact use case for debug assertions.
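The two options being weighed can be contrasted in a small sketch. Both functions below are hypothetical and take plain booleans instead of a schema; the first mirrors the error-returning style of require!, the second the debug_assert style.

```rust
// Error-returning variant: the invariant is checked in every build and a
// violation is reported to the caller as an error value.
fn check_schema(need_file_actions: bool, has_sidecar_column: bool) -> Result<(), String> {
    if need_file_actions && !has_sidecar_column {
        return Err(
            "If the checkpoint read schema contains file actions, it must contain the sidecar column"
                .to_string(),
        );
    }
    Ok(())
}

// Debug-assertion variant: the check only runs when debug assertions are
// enabled (the default for unoptimized builds) and panics on violation.
fn check_schema_debug(need_file_actions: bool, has_sidecar_column: bool) {
    debug_assert!(
        !need_file_actions || has_sidecar_column,
        "read schema with file actions must contain the sidecar column"
    );
}
```

The first variant costs a branch in release builds but surfaces the problem as a recoverable error; the second is free in optimized builds but silent there if the invariant is ever broken.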
kernel/src/log_segment/tests.rs
Outdated
        vec!["sidecar2.parquet"],
        v2_checkpoint_read_schema.clone(),
    ),
    "00000000000000000001.checkpoint.0000000002.0000000002.parquet",
This string is used twice. Put it in a variable.
Also use delta_path_for_multipart_checkpoint.
👍 the func you mentioned prepends the _delta_log directory though
Yeah I was thinking something along the lines of
write_parquet_to_store(store, data, delta_path_for_version(0, "checkpoint.parquet"));
write_parquet_to_store(store, data, delta_path_for_sidecar("sidecar1"));
write_json_to_store(store, data, delta_path_for_sidecar("sidecar1"));
Moving the _delta_log stuff up to the caller.
This approach would reduce the number of test functions we maintain, but is more complicated to call. I think I like your add_sidecar_to_store methods.
kernel/src/log_segment/tests.rs
Outdated
add_checkpoint_to_store(
    &store,
    // Create a checkpoint batch with sidecar actions to verify that the sidecar actions are not read.
    sidecar_batch_with_given_paths(vec!["sidecar1.parquet"], v2_checkpoint_read_schema.clone()),
Wait doesn't this return an empty batch?
afaik sidecar_batch_with_given_paths creates a bunch of sidecar actions in json, then parses them into arrow with this output schema. But in this case, the output schema is metadata, so those sidecar actions are projected away.
Also, I'm not convinced that sidecar_batch_with_given_paths needs to take a schema. Seems that it should just have get_log_schema().project(&[SIDECAR_NAME]) as the schema in all cases. I think other columns are defaulted to NULL upon read. Correct me if I'm wrong tho
Yes it should. This tests the scenario where replay is called with a schema with non-file actions. I tested this with a checkpoint batch with sidecar actions for extra assurance that the sidecar actions were not extracted & read
I have sidecar_batch_with_given_paths take a schema to allow for easier testing in this scenario in particular, where checkpoint files can be read with all types of schemas.
discussed in-person.
The kernel checkpointing functionality reads batches with tight schemas. Asserting equality on the returned batches requires us to generate our own batches conforming to the same schemas (null columns affect equality checks). Passing the schema to this function is convenient as we want to 1. generate a dummy batch with a loose schema to test the checkpointing functionality, as well as 2. generate an expected batch with the tighter schema.
// Multi-part checkpoints can never have sidecar actions.
// We place batches with sidecar actions in multi-part checkpoints to verify we do not read the actions, as we
// should instead short-circuit and return the checkpoint batches as-is when encountering multi-part checkpoints.
let checkpoint_part_1 = "00000000000000000001.checkpoint.0000000001.0000000002.parquet";
let checkpoint_part_2 = "00000000000000000001.checkpoint.0000000002.0000000002.parquet";
hmm I'm not sure we should be testing this case. This seems to be an incorrect table, and it feels like we're testing undefined behaviour.
If I were to place batches without sidecar actions in the multi-part checkpoint, I don't know whether or not the single functionality of skipping the visiting of multi-part checkpoint batches with the SidecarVisitor is actually working, as there were never sidecar actions to be read in the first place.
This way, we know for a fact that the multi-part checkpoint batches are not being visited as the visitor would otherwise find sidecar actions, try to read the corresponding sidecar files, then error.
// TODO: Known issue here https://github.com/apache/arrow-rs/issues/7119
// Once https://github.com/delta-io/delta-kernel-rs/pull/692 is merged, remove empty path check.
// This is a workaround to avoid constructing a sidecar action with an empty path for testing.
if path.is_empty() {
Will leave this workaround temporarily instead of mucking with the test cases.
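The distinction at the heart of this thread, NULL versus an empty string in a nullable path column, can be illustrated with a small sketch. The column is modeled as Option<&str> per row and sidecar_paths is a hypothetical helper, not the SidecarVisitor's real code.

```rust
// Collects sidecar paths from a nullable column. NULL rows (None) are not
// sidecar actions and are skipped. Empty strings are schema-compatible values,
// so skipping them is only the temporary workaround discussed above.
fn sidecar_paths(rows: &[Option<&str>]) -> Vec<String> {
    rows.iter()
        .flatten() // skip NULL rows
        .filter(|path| !path.is_empty()) // workaround: skip empty-string paths
        .map(|path| String::from(*path))
        .collect()
}
```

Once batches are produced with proper NULLs, the filter line in such a helper could be dropped without changing behavior for well-formed input.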
nice, mostly looks great, just a couple small things
/// Try to convert an `EngineData` into a `RecordBatch`. Panics if not using `ArrowEngineData` from
/// the default module
fn into_record_batch(engine_data: Box<dyn EngineData>) -> RecordBatch {
This makes sense as a util. There is a bunch of test code that already does this (see to_arrow in read.rs for example). Maybe we can unify and just have everyone use this?
If you want to take that as a follow-up PR that's fine, just create an issue
I'll take this up in a follow-up PR just to not bloat this PR further. Thanks for suggestion 👍 issue tracked here: #694
What changes are proposed in this pull request?
Summary
This PR introduces foundational changes required for V2 checkpoint read support. The high-level changes required for V2 checkpoint support are:
Item 1. Allow log segments to be built with V2 checkpoint files.
Item 2. Allow log segment replay functionality to retrieve actions from sidecar files if need be.
This PR specifically adds support for Item 2.
This PR does not introduce full v2Checkpoints reader/writer support as we are missing support for Item 1, meaning log segments can never have V2 checkpoint files in the first place. That functionality will be completed in PR #685, which is stacked on top of this PR. However, the changes to log replay done here are compatible with tables using V1 checkpoints, allowing us to split the changes across two pull requests.
Changes
For each batch of EngineData from a checkpoint file:
- Use the SidecarVisitor to scan each batch for sidecar file paths embedded in sidecar actions.
- Note: the original checkpoint batch is still included in the iterator
Notes:
- If the checkpoint_read_schema does not have file actions, we do not need to scan the batch with the SidecarVisitor and can leave the batch as-is in the top-level iterator.
- (…) SidecarVisitor and can leave the batch as-is in the top-level iterator.
resolves #670
How was this change tested?
Although log segments can not yet have V2 checkpoints, we can easily mock batches that include sidecar actions that we can encounter in V2 checkpoints.
test_sidecar_to_filemeta_valid_paths
Unit tests for process_single_checkpoint_batch:
test_checkpoint_batch_with_no_sidecars_returns_none
test_checkpoint_batch_with_sidecars_returns_sidecar_batches
test_checkpoint_batch_with_sidecar_files_that_do_not_exist
Unit tests for create_checkpoint_stream:
test_create_checkpoint_stream_returns_none_if_checkpoint_parts_is_empty
test_create_checkpoint_stream_errors_when_schema_has_add_but_no_sidecar_action
test_create_checkpoint_stream_returns_checkpoint_batches_as_is_if_schema_has_no_file_actions
test_create_checkpoint_stream_returns_checkpoint_batches_if_checkpoint_is_multi_part
test_create_checkpoint_stream_reads_parquet_checkpoint_batch_without_sidecars
test_create_checkpoint_stream_reads_json_checkpoint_batch_without_sidecars
test_create_checkpoint_stream_reads_checkpoint_batch_with_sidecar