Skip to content

Commit

Permalink
ARROW-13794: [C++] Deprecate PARQUET_VERSION_2_0
Browse files Browse the repository at this point in the history
Introduce PARQUET_VERSION_2_4 and PARQUET_VERSION_2_6 which allow fine-grained selection of post-1.0 Parquet format features.

Closes apache#11031 from pitrou/ARROW-13794-parquet-version

Authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
  • Loading branch information
pitrou authored and Matthew Topol committed Sep 12, 2021
1 parent 44098c2 commit 7632f0c
Show file tree
Hide file tree
Showing 27 changed files with 272 additions and 195 deletions.
1 change: 1 addition & 0 deletions cpp/src/arrow/flight/test_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <mach-o/dyld.h>
#endif

#include <algorithm>
#include <cstdlib>
#include <sstream>

Expand Down
13 changes: 13 additions & 0 deletions cpp/src/arrow/util/macros.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,19 @@
# define ARROW_DEPRECATED_USING(...)
# endif
#endif

#ifdef __COVERITY__
# define ARROW_DEPRECATED_ENUM_VALUE(...)
#elif __cplusplus > 201103L
# define ARROW_DEPRECATED_ENUM_VALUE(...) [[deprecated(__VA_ARGS__)]]
#else
# if defined(__GNUC__) && __GNUC__ >= 6
# define ARROW_DEPRECATED_ENUM_VALUE(...) __attribute__((deprecated(__VA_ARGS__)))
# else
# define ARROW_DEPRECATED_ENUM_VALUE(...)
# endif
#endif

// clang-format on

// Macros to disable deprecation warnings
Expand Down
127 changes: 59 additions & 68 deletions cpp/src/parquet/arrow/arrow_reader_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1243,11 +1243,11 @@ TEST_F(TestUInt32ParquetIO, Parquet_2_0_Compatibility) {
ASSERT_OK(NullableArray<::arrow::UInt32Type>(LARGE_SIZE, 100, kDefaultSeed, &values));
std::shared_ptr<Table> table = MakeSimpleTable(values, true);

// Parquet 2.0 roundtrip should yield an uint32_t column again
// Parquet 2.4 roundtrip should yield an uint32_t column again
this->ResetSink();
std::shared_ptr<::parquet::WriterProperties> properties =
::parquet::WriterProperties::Builder()
.version(ParquetVersion::PARQUET_2_0)
.version(ParquetVersion::PARQUET_2_4)
->build();
ASSERT_OK_NO_THROW(
WriteTable(*table, default_memory_pool(), this->sink_, 512, properties));
Expand Down Expand Up @@ -1946,27 +1946,42 @@ TEST(TestArrowReadWrite, ParquetVersionTimestampDifferences) {
auto input_table = Table::Make(input_schema, {a_s, a_ms, a_us, a_ns});

auto parquet_version_1_properties = ::parquet::default_writer_properties();
auto parquet_version_2_properties = ::parquet::WriterProperties::Builder()
.version(ParquetVersion::PARQUET_2_0)
->build();
ARROW_SUPPRESS_DEPRECATION_WARNING
auto parquet_version_2_0_properties = ::parquet::WriterProperties::Builder()
.version(ParquetVersion::PARQUET_2_0)
->build();
ARROW_UNSUPPRESS_DEPRECATION_WARNING
auto parquet_version_2_4_properties = ::parquet::WriterProperties::Builder()
.version(ParquetVersion::PARQUET_2_4)
->build();
auto parquet_version_2_6_properties = ::parquet::WriterProperties::Builder()
.version(ParquetVersion::PARQUET_2_6)
->build();
const std::vector<std::shared_ptr<WriterProperties>> all_properties = {
parquet_version_1_properties, parquet_version_2_0_properties,
parquet_version_2_4_properties, parquet_version_2_6_properties};

{
// Using Parquet version 1.0 defaults, seconds should be coerced to milliseconds
// and nanoseconds should be coerced to microseconds
// Using Parquet version 1.0 and 2.4 defaults, seconds should be coerced to
// milliseconds and nanoseconds should be coerced to microseconds
auto expected_schema = schema({field("ts:s", t_ms), field("ts:ms", t_ms),
field("ts:us", t_us), field("ts:ns", t_us)});
auto expected_table = Table::Make(expected_schema, {a_ms, a_ms, a_us, a_us});
ASSERT_NO_FATAL_FAILURE(CheckConfiguredRoundtrip(input_table, expected_table,
parquet_version_1_properties));
ASSERT_NO_FATAL_FAILURE(CheckConfiguredRoundtrip(input_table, expected_table,
parquet_version_2_4_properties));
}
{
// Using Parquet version 2.0 defaults, seconds should be coerced to milliseconds
// and nanoseconds should be retained
// Using Parquet version 2.0 and 2.6 defaults, seconds should be coerced to
// milliseconds and nanoseconds should be retained
auto expected_schema = schema({field("ts:s", t_ms), field("ts:ms", t_ms),
field("ts:us", t_us), field("ts:ns", t_ns)});
auto expected_table = Table::Make(expected_schema, {a_ms, a_ms, a_us, a_ns});
ASSERT_NO_FATAL_FAILURE(CheckConfiguredRoundtrip(input_table, expected_table,
parquet_version_2_properties));
parquet_version_2_0_properties));
ASSERT_NO_FATAL_FAILURE(CheckConfiguredRoundtrip(input_table, expected_table,
parquet_version_2_6_properties));
}

auto arrow_coerce_to_seconds_properties =
Expand All @@ -1977,85 +1992,61 @@ TEST(TestArrowReadWrite, ParquetVersionTimestampDifferences) {
ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::MICRO)->build();
auto arrow_coerce_to_nanos_properties =
ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::NANO)->build();
{
// Neither Parquet version 1.0 nor 2.0 allow coercing to seconds
std::shared_ptr<Table> actual_table;
ASSERT_RAISES(
NotImplemented,
WriteTable(*input_table, ::arrow::default_memory_pool(), CreateOutputStream(),
input_table->num_rows(), parquet_version_1_properties,
arrow_coerce_to_seconds_properties));
ASSERT_RAISES(
NotImplemented,
WriteTable(*input_table, ::arrow::default_memory_pool(), CreateOutputStream(),
input_table->num_rows(), parquet_version_2_properties,
arrow_coerce_to_seconds_properties));
}
{
// Using Parquet version 1.0, coercing to milliseconds or microseconds is allowed
auto expected_schema = schema({field("ts:s", t_ms), field("ts:ms", t_ms),
field("ts:us", t_ms), field("ts:ns", t_ms)});
auto expected_table = Table::Make(expected_schema, {a_ms, a_ms, a_ms, a_ms});
ASSERT_NO_FATAL_FAILURE(CheckConfiguredRoundtrip(input_table, expected_table,
parquet_version_1_properties,
arrow_coerce_to_millis_properties));

expected_schema = schema({field("ts:s", t_us), field("ts:ms", t_us),
field("ts:us", t_us), field("ts:ns", t_us)});
expected_table = Table::Make(expected_schema, {a_us, a_us, a_us, a_us});
ASSERT_NO_FATAL_FAILURE(CheckConfiguredRoundtrip(input_table, expected_table,
parquet_version_1_properties,
arrow_coerce_to_micros_properties));
}
{
// Using Parquet version 2.0, coercing to milliseconds or microseconds is allowed
for (const auto& properties : all_properties) {
// Using all Parquet versions, coercing to milliseconds or microseconds is allowed
ARROW_SCOPED_TRACE("format = ", ParquetVersionToString(properties->version()));
auto expected_schema = schema({field("ts:s", t_ms), field("ts:ms", t_ms),
field("ts:us", t_ms), field("ts:ns", t_ms)});
auto expected_table = Table::Make(expected_schema, {a_ms, a_ms, a_ms, a_ms});
ASSERT_NO_FATAL_FAILURE(CheckConfiguredRoundtrip(input_table, expected_table,
parquet_version_2_properties,
arrow_coerce_to_millis_properties));
ASSERT_NO_FATAL_FAILURE(CheckConfiguredRoundtrip(
input_table, expected_table, properties, arrow_coerce_to_millis_properties));

expected_schema = schema({field("ts:s", t_us), field("ts:ms", t_us),
field("ts:us", t_us), field("ts:ns", t_us)});
expected_table = Table::Make(expected_schema, {a_us, a_us, a_us, a_us});
ASSERT_NO_FATAL_FAILURE(CheckConfiguredRoundtrip(input_table, expected_table,
parquet_version_2_properties,
arrow_coerce_to_micros_properties));
ASSERT_NO_FATAL_FAILURE(CheckConfiguredRoundtrip(
input_table, expected_table, properties, arrow_coerce_to_micros_properties));

// Neither Parquet version allows coercing to seconds
std::shared_ptr<Table> actual_table;
ASSERT_RAISES(NotImplemented,
WriteTable(*input_table, ::arrow::default_memory_pool(),
CreateOutputStream(), input_table->num_rows(), properties,
arrow_coerce_to_seconds_properties));
}
{
// Using Parquet version 1.0, coercing to (int64) nanoseconds is not allowed
// Using Parquet versions 1.0 and 2.4, coercing to (int64) nanoseconds is not allowed
for (const auto& properties :
{parquet_version_1_properties, parquet_version_2_4_properties}) {
ARROW_SCOPED_TRACE("format = ", ParquetVersionToString(properties->version()));
std::shared_ptr<Table> actual_table;
ASSERT_RAISES(
NotImplemented,
WriteTable(*input_table, ::arrow::default_memory_pool(), CreateOutputStream(),
input_table->num_rows(), parquet_version_1_properties,
arrow_coerce_to_nanos_properties));
ASSERT_RAISES(NotImplemented,
WriteTable(*input_table, ::arrow::default_memory_pool(),
CreateOutputStream(), input_table->num_rows(), properties,
arrow_coerce_to_nanos_properties));
}
{
// Using Parquet version 2.0, coercing to (int64) nanoseconds is allowed
// Using Parquet versions "2.0" and 2.6, coercing to (int64) nanoseconds is allowed
for (const auto& properties :
{parquet_version_2_0_properties, parquet_version_2_6_properties}) {
ARROW_SCOPED_TRACE("format = ", ParquetVersionToString(properties->version()));
auto expected_schema = schema({field("ts:s", t_ns), field("ts:ms", t_ns),
field("ts:us", t_ns), field("ts:ns", t_ns)});
auto expected_table = Table::Make(expected_schema, {a_ns, a_ns, a_ns, a_ns});
ASSERT_NO_FATAL_FAILURE(CheckConfiguredRoundtrip(input_table, expected_table,
parquet_version_2_properties,
arrow_coerce_to_nanos_properties));
ASSERT_NO_FATAL_FAILURE(CheckConfiguredRoundtrip(
input_table, expected_table, properties, arrow_coerce_to_nanos_properties));
}

// Using all Parquet versions, coercing to nanoseconds is allowed if Int96
// storage is used
auto arrow_enable_int96_properties =
ArrowWriterProperties::Builder().enable_deprecated_int96_timestamps()->build();
{
// For either Parquet version, coercing to nanoseconds is allowed if Int96
// storage is used
for (const auto& properties : all_properties) {
ARROW_SCOPED_TRACE("format = ", ParquetVersionToString(properties->version()));
auto expected_schema = schema({field("ts:s", t_ns), field("ts:ms", t_ns),
field("ts:us", t_ns), field("ts:ns", t_ns)});
auto expected_table = Table::Make(expected_schema, {a_ns, a_ns, a_ns, a_ns});
ASSERT_NO_FATAL_FAILURE(CheckConfiguredRoundtrip(input_table, expected_table,
parquet_version_1_properties,
arrow_enable_int96_properties));
ASSERT_NO_FATAL_FAILURE(CheckConfiguredRoundtrip(input_table, expected_table,
parquet_version_2_properties,
arrow_enable_int96_properties));
ASSERT_NO_FATAL_FAILURE(CheckConfiguredRoundtrip(
input_table, expected_table, properties, arrow_enable_int96_properties));
}
}

Expand Down
24 changes: 15 additions & 9 deletions cpp/src/parquet/arrow/schema.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

#include "parquet/arrow/schema_internal.h"
#include "parquet/exception.h"
#include "parquet/metadata.h"
#include "parquet/properties.h"
#include "parquet/types.h"

Expand Down Expand Up @@ -169,6 +170,7 @@ static Status GetTimestampMetadata(const ::arrow::TimestampType& type,
const bool coerce = arrow_properties.coerce_timestamps_enabled();
const auto target_unit =
coerce ? arrow_properties.coerce_timestamps_unit() : type.unit();
const auto version = properties.version();

// The user is explicitly asking for Impala int96 encoding, there is no
// logical type.
Expand All @@ -183,16 +185,18 @@ static Status GetTimestampMetadata(const ::arrow::TimestampType& type,
// The user is explicitly asking for timestamp data to be converted to the
// specified units (target_unit).
if (coerce) {
if (properties.version() == ::parquet::ParquetVersion::PARQUET_1_0) {
if (version == ::parquet::ParquetVersion::PARQUET_1_0 ||
version == ::parquet::ParquetVersion::PARQUET_2_4) {
switch (target_unit) {
case ::arrow::TimeUnit::MILLI:
case ::arrow::TimeUnit::MICRO:
break;
case ::arrow::TimeUnit::NANO:
case ::arrow::TimeUnit::SECOND:
return Status::NotImplemented(
"For Parquet version 1.0 files, can only coerce Arrow timestamps to "
"milliseconds or microseconds");
return Status::NotImplemented("For Parquet version ",
::parquet::ParquetVersionToString(version),
", can only coerce Arrow timestamps to "
"milliseconds or microseconds");
}
} else {
switch (target_unit) {
Expand All @@ -201,19 +205,21 @@ static Status GetTimestampMetadata(const ::arrow::TimestampType& type,
case ::arrow::TimeUnit::NANO:
break;
case ::arrow::TimeUnit::SECOND:
return Status::NotImplemented(
"For Parquet files, can only coerce Arrow timestamps to milliseconds, "
"microseconds, or nanoseconds");
return Status::NotImplemented("For Parquet version ",
::parquet::ParquetVersionToString(version),
", can only coerce Arrow timestamps to "
"milliseconds, microseconds, or nanoseconds");
}
}
return Status::OK();
}

// The user implicitly wants timestamp data to retain its original time units,
// however the ConvertedType field used to indicate logical types for Parquet
// version 1.0 fields does not allow for nanosecond time units and so nanoseconds
// version <= 2.4 fields does not allow for nanosecond time units and so nanoseconds
// must be coerced to microseconds.
if (properties.version() == ::parquet::ParquetVersion::PARQUET_1_0 &&
if ((version == ::parquet::ParquetVersion::PARQUET_1_0 ||
version == ::parquet::ParquetVersion::PARQUET_2_4) &&
type.unit() == ::arrow::TimeUnit::NANO) {
*logical_type =
TimestampLogicalTypeFromArrowTimestamp(type, ::arrow::TimeUnit::MICRO);
Expand Down
7 changes: 5 additions & 2 deletions cpp/src/parquet/column_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1805,6 +1805,8 @@ Status WriteTimestamps(const ::arrow::Array& values, int64_t num_levels,
maybe_parent_nulls);
};

const ParquetVersion::type version = writer->properties()->version();

if (ctx->properties->coerce_timestamps_enabled()) {
// User explicitly requested coercion to specific unit
if (source_type.unit() == ctx->properties->coerce_timestamps_unit()) {
Expand All @@ -1814,9 +1816,10 @@ Status WriteTimestamps(const ::arrow::Array& values, int64_t num_levels,
} else {
return WriteCoerce(ctx->properties);
}
} else if (writer->properties()->version() == ParquetVersion::PARQUET_1_0 &&
} else if ((version == ParquetVersion::PARQUET_1_0 ||
version == ParquetVersion::PARQUET_2_4) &&
source_type.unit() == ::arrow::TimeUnit::NANO) {
// Absent superseding user instructions, when writing Parquet version 1.0 files,
// Absent superseding user instructions, when writing Parquet version <= 2.4 files,
// timestamps in nanoseconds are coerced to microseconds
std::shared_ptr<ArrowWriterProperties> properties =
(ArrowWriterProperties::Builder())
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/parquet/column_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,7 @@ TYPED_TEST(TestPrimitiveWriter, DictionaryFallbackVersion1_0) {
}

TYPED_TEST(TestPrimitiveWriter, DictionaryFallbackVersion2_0) {
this->TestDictionaryFallbackEncoding(ParquetVersion::PARQUET_2_0);
this->TestDictionaryFallbackEncoding(ParquetVersion::PARQUET_2_4);
}

TEST(TestWriter, NullValuesBuffer) {
Expand Down
14 changes: 9 additions & 5 deletions cpp/src/parquet/metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,14 @@ std::string ParquetVersionToString(ParquetVersion::type ver) {
switch (ver) {
case ParquetVersion::PARQUET_1_0:
return "1.0";
ARROW_SUPPRESS_DEPRECATION_WARNING
case ParquetVersion::PARQUET_2_0:
return "2.0";
return "pseudo-2.0";
ARROW_UNSUPPRESS_DEPRECATION_WARNING
case ParquetVersion::PARQUET_2_4:
return "2.4";
case ParquetVersion::PARQUET_2_6:
return "2.6";
}

// This should be unreachable
Expand Down Expand Up @@ -815,7 +821,7 @@ ParquetVersion::type FileMetaData::version() const {
case 1:
return ParquetVersion::PARQUET_1_0;
case 2:
return ParquetVersion::PARQUET_2_0;
return ParquetVersion::PARQUET_2_LATEST;
default:
// Improperly set version, assuming Parquet 1.0
break;
Expand Down Expand Up @@ -1670,10 +1676,8 @@ class FileMetaDataBuilder::FileMetaDataBuilderImpl {
case ParquetVersion::PARQUET_1_0:
file_version = 1;
break;
case ParquetVersion::PARQUET_2_0:
file_version = 2;
break;
default:
file_version = 2;
break;
}
metadata_->__set_version(file_version);
Expand Down
7 changes: 6 additions & 1 deletion cpp/src/parquet/metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,12 @@ class PARQUET_EXPORT FileMetaData {
/// \throws ParquetException if the index is out of bound.
std::unique_ptr<RowGroupMetaData> RowGroup(int index) const;

/// \brief Return the version of the file.
/// \brief Return the "version" of the file
///
/// WARNING: The value returned by this method is unreliable as 1) the Parquet
/// file metadata stores the version as a single integer and 2) some producers
/// are known to always write a hardcoded value. Therefore, you cannot use
/// this value to know which features are used in the file.
ParquetVersion::type version() const;

/// \brief Return the application's user-agent string of the writer.
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/parquet/metadata_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ TEST(Metadata, TestBuildAccess) {
WriterProperties::Builder prop_builder;

std::shared_ptr<WriterProperties> props =
prop_builder.version(ParquetVersion::PARQUET_2_0)->build();
prop_builder.version(ParquetVersion::PARQUET_2_6)->build();

fields.push_back(parquet::schema::Int32("int_col", Repetition::REQUIRED));
fields.push_back(parquet::schema::Float("float_col", Repetition::REQUIRED));
Expand Down Expand Up @@ -126,7 +126,7 @@ TEST(Metadata, TestBuildAccess) {
ASSERT_EQ(nrows, f_accessors[loop_index]->num_rows());
ASSERT_LE(0, static_cast<int>(f_accessors[loop_index]->size()));
ASSERT_EQ(2, f_accessors[loop_index]->num_row_groups());
ASSERT_EQ(ParquetVersion::PARQUET_2_0, f_accessors[loop_index]->version());
ASSERT_EQ(ParquetVersion::PARQUET_2_6, f_accessors[loop_index]->version());
ASSERT_EQ(DEFAULT_CREATED_BY, f_accessors[loop_index]->created_by());
ASSERT_EQ(3, f_accessors[loop_index]->num_schema_elements());

Expand Down Expand Up @@ -213,7 +213,7 @@ TEST(Metadata, TestBuildAccess) {
ASSERT_EQ(4, f_accessor->num_row_groups());
ASSERT_EQ(nrows * 2, f_accessor->num_rows());
ASSERT_LE(0, static_cast<int>(f_accessor->size()));
ASSERT_EQ(ParquetVersion::PARQUET_2_0, f_accessor->version());
ASSERT_EQ(ParquetVersion::PARQUET_2_6, f_accessor->version());
ASSERT_EQ(DEFAULT_CREATED_BY, f_accessor->created_by());
ASSERT_EQ(3, f_accessor->num_schema_elements());

Expand Down
Loading

0 comments on commit 7632f0c

Please sign in to comment.