[Java API] Rough edges when recreating a DataFile that is partitioned by month or hour #11900

Open
1 of 3 tasks
ahmedabu98 opened this issue Jan 1, 2025 · 2 comments
Labels
bug Something isn't working

@ahmedabu98

Apache Iceberg version

1.7.1 (latest release)

Query engine

None

Please describe the bug 🐞

Part of our workflow in Apache Beam's Iceberg connector requires recreating DataFiles, but this process isn't smooth when the file is partitioned by month or hour. See the following reproducible code:

org.apache.iceberg.Schema schema =
    new org.apache.iceberg.Schema(
        Types.NestedField.required(1, "month", Types.TimestampType.withoutZone()),
        Types.NestedField.required(2, "hour", Types.TimestampType.withoutZone()));
PartitionSpec spec = PartitionSpec.builderFor(schema).month("month").hour("hour").build();
Table table = catalog.createTable(TableIdentifier.parse("db.table"), schema, spec);

LocalDateTime val = LocalDateTime.parse("2024-10-08T13:18:20.053");
Record rec = GenericRecord.create(schema).copy(
        ImmutableMap.of(
                "month", val,
                "hour", val));
Record partitionableRec = getPartitionableRecord(rec, spec, schema);
PartitionKey pk = new PartitionKey(spec, schema);
pk.partition(partitionableRec);
DataWriter<Record> writer =
    Parquet.writeData(
            table
                .io()
                .newOutputFile(table.locationProvider().newDataLocation(spec, pk, "test_file")))
        .createWriterFunc(GenericParquetWriter::buildWriter)
        .schema(table.schema())
        .withSpec(table.spec())
        .withPartition(pk)
        .overwrite()
        .build();
writer.write(rec);
writer.close();
DataFile file = writer.toDataFile();

// recreate data file using the original file
DataFiles.builder(spec)
    .withPath(file.path().toString())
    .withFormat(file.format())
    .withPartition(file.partition())
    .withFileSizeInBytes(file.fileSizeInBytes())
    .withRecordCount(file.recordCount())
    .withPartitionPath(spec.partitionToPath(file.partition()))
    .build();

The last bit fails with the following error:

java.lang.NumberFormatException: For input string: "2024-10"
	at java.base/java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
	at java.base/java.lang.Integer.parseInt(Integer.java:652)
	at java.base/java.lang.Integer.valueOf(Integer.java:983)
	at org.apache.iceberg.types.Conversions.fromPartitionString(Conversions.java:51)
	at org.apache.iceberg.DataFiles.fillFromPath(DataFiles.java:86)
	at org.apache.iceberg.DataFiles$Builder.withPartitionPath(DataFiles.java:266)

I would expect that the result of spec.partitionToPath(file.partition()) could be naturally used when recreating the DataFile, but the logic here doesn't seem to be robust enough.

We've been able to use this workaround, replicated below:

Workaround
// EPOCH and HOUR_FORMATTER are constants defined elsewhere in the class (not shown here).
static String getPartitionDataPath(
    String partitionPath, Map<String, PartitionField> partitionFieldMap) {
  if (partitionPath.isEmpty() || partitionFieldMap.isEmpty()) {
    return partitionPath;
  }
  List<String> resolved = new ArrayList<>();
  for (String partition : Splitter.on('/').splitToList(partitionPath)) {
    List<String> nameAndValue = Splitter.on('=').splitToList(partition);
    String name = nameAndValue.get(0);
    String value = nameAndValue.get(1);
    String transformName =
        Preconditions.checkArgumentNotNull(partitionFieldMap.get(name)).transform().toString();
    if (Transforms.month().toString().equals(transformName)) {
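      // Iceberg month partition values count months since 1970-01, not the month of the year.
      long month = ChronoUnit.MONTHS.between(YearMonth.of(1970, 1), YearMonth.parse(value));
      value = String.valueOf(month);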
    } else if (Transforms.hour().toString().equals(transformName)) {
      long hour = ChronoUnit.HOURS.between(EPOCH, LocalDateTime.parse(value, HOUR_FORMATTER));
      value = String.valueOf(hour);
    }
    resolved.add(name + "=" + value);
  }
  return String.join("/", resolved);
}
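
For reference, the Iceberg table spec defines month and hour partition values as counts from the 1970-01-01 epoch, whereas the path shows them in a human-readable form, which is why the integer parse in the stack trace above rejects "2024-10". Below is a minimal sketch of the string-to-ordinal conversion using only java.time. The class and method names are hypothetical, and the hour layout `yyyy-MM-dd-HH` is an assumption based on the path this example produces.

```java
import java.time.LocalDate;
import java.time.YearMonth;
import java.time.temporal.ChronoUnit;

// Hypothetical helper, not part of Iceberg or Beam.
public class PartitionOrdinals {
  private static final YearMonth EPOCH_MONTH = YearMonth.of(1970, 1);
  private static final LocalDate EPOCH_DAY = LocalDate.of(1970, 1, 1);

  // "2024-10" -> number of months since 1970-01
  public static long monthOrdinal(String humanMonth) {
    return ChronoUnit.MONTHS.between(EPOCH_MONTH, YearMonth.parse(humanMonth));
  }

  // "2024-10-08-13" -> number of hours since 1970-01-01T00:00
  // Assumes the value is an ISO date followed by "-HH".
  public static long hourOrdinal(String humanHour) {
    int lastDash = humanHour.lastIndexOf('-');
    LocalDate day = LocalDate.parse(humanHour.substring(0, lastDash));
    int hourOfDay = Integer.parseInt(humanHour.substring(lastDash + 1));
    return ChronoUnit.DAYS.between(EPOCH_DAY, day) * 24 + hourOfDay;
  }

  public static void main(String[] args) {
    System.out.println(monthOrdinal("2024-10"));       // 657
    System.out.println(hourOrdinal("2024-10-08-13"));  // 480109
  }
}
```

For the timestamp in the repro (2024-10-08T13:18:20.053), these are the integers that `withPartitionPath` would need to see in place of "2024-10" and "2024-10-08-13".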

But I would expect the Iceberg API to take care of this by itself.

Willingness to contribute

  • I can contribute a fix for this bug independently
  • I would be willing to contribute a fix for this bug with guidance from the Iceberg community
  • I cannot contribute a fix for this bug at this time
@ahmedabu98 ahmedabu98 added the bug Something isn't working label Jan 1, 2025
@RussellSpitzer
Member

I think we should probably move to deprecate .withPartitionPath and .withPartitionValues, since they basically assume you are importing from a Hive table and only using identity transformations. In this case you should be doing

   .withPartition(file.partition())

Rather than converting to a string and then back again.

@ahmedabu98
Author

We convert it to a String because with Beam's distributed data processing framework, it's best practice to deconstruct custom Java objects to built-in types when passing data over the wire (as opposed to direct byte serialization that can change from one version to the next).

For this scenario, we are deconstructing the DataFile to built-in Java types, then reconstructing it later on. file.partition() returns a generic StructLike, which is hard to reason about, so we manually build a partition path.

Is there a better way to go about it? Maybe some Iceberg utils that we're not aware of?
