From b0352a53494326bbd1ffda1647c4a03149fe0d0f Mon Sep 17 00:00:00 2001 From: Junwang Zhao Date: Mon, 25 May 2026 08:38:13 +0800 Subject: [PATCH] feat: iceberg v3 variant non-shredding --- src/iceberg/avro/avro_constants.h | 2 + src/iceberg/avro/avro_register.cc | 3 + src/iceberg/avro/avro_schema_util.cc | 48 +++- src/iceberg/avro/avro_schema_util_internal.h | 10 + src/iceberg/json_serde.cc | 5 + src/iceberg/metrics_config.cc | 6 +- src/iceberg/parquet/parquet_reader.cc | 2 + src/iceberg/parquet/parquet_register.cc | 5 + src/iceberg/parquet/parquet_schema_util.cc | 126 ++++++++- .../parquet/parquet_schema_util_internal.h | 3 + src/iceberg/parquet/parquet_writer.cc | 2 + src/iceberg/schema_internal.cc | 55 ++++ src/iceberg/table_metadata.h | 1 + src/iceberg/test/arrow_test.cc | 61 +++++ src/iceberg/test/avro_schema_test.cc | 87 ++++++ src/iceberg/test/parquet_schema_test.cc | 250 +++++++++++++++++- src/iceberg/test/schema_json_test.cc | 1 + src/iceberg/test/schema_test.cc | 70 +++-- src/iceberg/test/type_test.cc | 13 + src/iceberg/type.cc | 9 +- src/iceberg/type.h | 32 ++- src/iceberg/type_fwd.h | 3 +- src/iceberg/update/update_schema.cc | 6 + src/iceberg/util/struct_like_set.cc | 2 + src/iceberg/util/type_util.cc | 34 ++- src/iceberg/util/type_util.h | 5 + src/iceberg/util/visitor_generate.h | 4 + 27 files changed, 795 insertions(+), 50 deletions(-) diff --git a/src/iceberg/avro/avro_constants.h b/src/iceberg/avro/avro_constants.h index 6fdfdc5ed..488b986a5 100644 --- a/src/iceberg/avro/avro_constants.h +++ b/src/iceberg/avro/avro_constants.h @@ -25,11 +25,13 @@ namespace iceberg::avro { // Avro logical type constants constexpr std::string_view kMapLogicalType = "map"; +constexpr std::string_view kVariantLogicalType = "variant"; // Name mapping field constants constexpr std::string_view kElement = "element"; constexpr std::string_view kKey = "key"; constexpr std::string_view kValue = "value"; +constexpr std::string_view kMetadata = "metadata"; // Avro custom attributes constants constexpr std::string_view kIcebergFieldNameProp = "iceberg-field-name"; diff --git a/src/iceberg/avro/avro_register.cc b/src/iceberg/avro/avro_register.cc index 07efac381..810a73c2a 100644 --- a/src/iceberg/avro/avro_register.cc +++ b/src/iceberg/avro/avro_register.cc @@ -28,6 +28,9 @@ void RegisterLogicalTypes() { // See https://github.com/apache/avro/pull/3326 for details. ::avro::CustomLogicalTypeRegistry::instance().registerType( "map", [](const std::string&) { return std::make_shared(); }); + ::avro::CustomLogicalTypeRegistry::instance().registerType( + "variant", + [](const std::string&) { return std::make_shared(); }); } void RegisterAll() { diff --git a/src/iceberg/avro/avro_schema_util.cc b/src/iceberg/avro/avro_schema_util.cc index 3d61d283f..59717bee4 100644 --- a/src/iceberg/avro/avro_schema_util.cc +++ b/src/iceberg/avro/avro_schema_util.cc @@ -48,6 +48,10 @@ ::avro::LogicalType GetMapLogicalType() { return ::avro::LogicalType(std::make_shared()); } +::avro::LogicalType GetVariantLogicalType() { + return ::avro::LogicalType(std::make_shared()); +} + ::avro::CustomAttributes GetAttributesWithFieldId(int32_t field_id) { ::avro::CustomAttributes attributes; attributes.addAttribute(std::string(kFieldIdProp), std::to_string(field_id), @@ -237,6 +241,22 @@ Status ToAvroNodeVisitor::Visit(const BinaryType& type, ::avro::NodePtr* node) { return {}; } +Status ToAvroNodeVisitor::Visit(const VariantType& type, ::avro::NodePtr* node) { + *node = std::make_shared<::avro::NodeRecord>(); + if (field_ids_.empty()) { + (*node)->setName(::avro::Name(std::string(kVariantLogicalType))); + } else { + (*node)->setName(::avro::Name(std::format("r{}", field_ids_.top()))); + } + (*node)->setLogicalType(GetVariantLogicalType()); + + (*node)->addName(std::string(kMetadata)); + (*node)->addLeaf(std::make_shared<::avro::NodePrimitive>(::avro::AVRO_BYTES)); + (*node)->addName(std::string(kValue)); + (*node)->addLeaf(std::make_shared<::avro::NodePrimitive>(::avro::AVRO_BYTES)); + return {}; +} + Status ToAvroNodeVisitor::Visit(const StructType& type, ::avro::NodePtr* node) { *node = std::make_shared<::avro::NodeRecord>(); @@ -392,6 +412,10 @@ Status HasIdVisitor::Visit(const ::avro::NodePtr& node) { } Status HasIdVisitor::VisitRecord(const ::avro::NodePtr& node) { + if (HasVariantLogicalType(node)) { + return {}; + } + static const std::string kFieldIdKey{kFieldIdProp}; total_fields_ += node->leaves(); for (size_t i = 0; i < node->leaves(); ++i) { @@ -510,6 +534,13 @@ Result GetFieldId(const ::avro::NodePtr& node, size_t field_idx) { return GetId(node, kFieldIdKey, field_idx); } +bool IsVariantAvroSchema(const ::avro::NodePtr& node) { + return HasVariantLogicalType(node) && node->type() == ::avro::AVRO_RECORD && + node->leaves() == 2 && node->names() == 2 && node->nameAt(0) == kMetadata && + node->nameAt(1) == kValue && node->leafAt(0)->type() == ::avro::AVRO_BYTES && + node->leafAt(1)->type() == ::avro::AVRO_BYTES; +} + Status ValidateAvroSchemaEvolution(const Type& expected_type, const ::avro::NodePtr& avro_node) { switch (expected_type.type_id()) { @@ -615,6 +646,11 @@ Status ValidateAvroSchemaEvolution(const Type& expected_type, return {}; } break; + case TypeId::kVariant: + if (IsVariantAvroSchema(avro_node)) { + return {}; + } + break; default: break; } @@ -847,7 +883,13 @@ Result ProjectNested(const Type& expected_type, bool HasMapLogicalType(const ::avro::NodePtr& node) { return node->logicalType().type() == ::avro::LogicalType::CUSTOM && node->logicalType().customLogicalType() != nullptr && - node->logicalType().customLogicalType()->name() == "map"; + node->logicalType().customLogicalType()->name() == kMapLogicalType; +} + +bool HasVariantLogicalType(const ::avro::NodePtr& node) { + return node->logicalType().type() == ::avro::LogicalType::CUSTOM && + node->logicalType().customLogicalType() != nullptr && + node->logicalType().customLogicalType()->name() == kVariantLogicalType; } Result Project(const Schema& expected_schema, @@ -1032,6 +1074,10 @@ Result<::avro::NodePtr> MakeUnionNodeWithFieldIds(const ::avro::NodePtr& origina Result<::avro::NodePtr> MakeAvroNodeWithFieldIds(const ::avro::NodePtr& original_node, const NameMapping& mapping, std::vector& names) { + if (HasVariantLogicalType(original_node)) { + return original_node; + } + switch (original_node->type()) { case ::avro::AVRO_RECORD: return MakeRecordNodeWithFieldIds(original_node, mapping, names); diff --git a/src/iceberg/avro/avro_schema_util_internal.h b/src/iceberg/avro/avro_schema_util_internal.h index e3b7a7ffd..7e9a2415d 100644 --- a/src/iceberg/avro/avro_schema_util_internal.h +++ b/src/iceberg/avro/avro_schema_util_internal.h @@ -39,6 +39,10 @@ struct MapLogicalType : public ::avro::CustomLogicalType { MapLogicalType() : ::avro::CustomLogicalType("map") {} }; +struct VariantLogicalType : public ::avro::CustomLogicalType { + VariantLogicalType() : ::avro::CustomLogicalType("variant") {} +}; + /// \brief A visitor that converts an Iceberg type to an Avro node. class ToAvroNodeVisitor { public: @@ -58,6 +62,7 @@ class ToAvroNodeVisitor { Status Visit(const UuidType& type, ::avro::NodePtr* node); Status Visit(const FixedType& type, ::avro::NodePtr* node); Status Visit(const BinaryType& type, ::avro::NodePtr* node); + Status Visit(const VariantType& type, ::avro::NodePtr* node); Status Visit(const StructType& type, ::avro::NodePtr* node); Status Visit(const ListType& type, ::avro::NodePtr* node); Status Visit(const MapType& type, ::avro::NodePtr* node); @@ -151,6 +156,11 @@ std::string ToString(const ::avro::LogicalType::Type& logical_type); /// \return True if the node has a map logical type, false otherwise. bool HasMapLogicalType(const ::avro::NodePtr& node); +/// \brief Check if an Avro node has a variant logical type. +/// \param node The Avro node to check. +/// \return True if the node has a variant logical type, false otherwise. +bool HasVariantLogicalType(const ::avro::NodePtr& node); + /// \brief Check if a string is a valid Avro name. /// /// Valid Avro names must: diff --git a/src/iceberg/json_serde.cc b/src/iceberg/json_serde.cc index 3944e510c..652840e2a 100644 --- a/src/iceberg/json_serde.cc +++ b/src/iceberg/json_serde.cc @@ -73,6 +73,7 @@ constexpr std::string_view kType = "type"; constexpr std::string_view kStruct = "struct"; constexpr std::string_view kList = "list"; constexpr std::string_view kMap = "map"; +constexpr std::string_view kVariant = "variant"; constexpr std::string_view kElement = "element"; constexpr std::string_view kKey = "key"; constexpr std::string_view kValue = "value"; @@ -377,6 +378,8 @@ nlohmann::json ToJson(const Type& type) { } case TypeId::kUuid: return "uuid"; + case TypeId::kVariant: + return "variant"; } std::unreachable(); } @@ -502,6 +505,8 @@ Result> TypeFromJson(const nlohmann::json& json) { return std::make_unique(); } else if (type_str == "uuid") { return std::make_unique(); + } else if (type_str == kVariant) { + return std::make_unique(); } else if (type_str.starts_with("fixed")) { std::regex fixed_regex(R"(fixed\[\s*(\d+)\s*\])"); std::smatch match; diff --git a/src/iceberg/metrics_config.cc b/src/iceberg/metrics_config.cc index e378640e0..45c1259e7 100644 --- a/src/iceberg/metrics_config.cc +++ b/src/iceberg/metrics_config.cc @@ -174,6 +174,8 @@ Result> MetricsConfig::LimitFieldIds(const Schema& s Status Visit(const Type& type) { if (type.is_nested()) { return VisitNested(internal::checked_cast(type)); + } else if (type.is_variant()) { + return VisitVariant(internal::checked_cast(type)); } else { return VisitPrimitive(internal::checked_cast(type)); } @@ -184,8 +186,7 @@ Result> MetricsConfig::LimitFieldIds(const Schema& s if (!ShouldContinue()) { break; } - // TODO(zhuo.wang): variant type should also be handled here - if (field.type()->is_primitive()) { + if (field.type()->is_primitive() || field.type()->is_variant()) { ids_.insert(field.field_id()); } } @@ -199,6 +200,7 @@ Result> MetricsConfig::LimitFieldIds(const Schema& s } Status VisitPrimitive(const PrimitiveType& type) { return {}; } + Status VisitVariant(const VariantType& type) { return {}; } std::unordered_set Finish() const { return ids_; } diff --git a/src/iceberg/parquet/parquet_reader.cc b/src/iceberg/parquet/parquet_reader.cc index 775644a94..924ea6db9 100644 --- a/src/iceberg/parquet/parquet_reader.cc +++ b/src/iceberg/parquet/parquet_reader.cc @@ -113,6 +113,7 @@ class ParquetReader::Impl { read_schema_ = options.projection; // Prepare reader properties + ICEBERG_RETURN_UNEXPECTED(EnsureParquetVariantExtensionRegistered()); ::parquet::ReaderProperties reader_properties(pool_); ::parquet::ArrowReaderProperties arrow_reader_properties; arrow_reader_properties.set_batch_size( @@ -212,6 +213,7 @@ class ParquetReader::Impl { // Build the output Arrow schema ArrowSchema arrow_schema; ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*read_schema_, &arrow_schema)); + ICEBERG_RETURN_UNEXPECTED(EnsureParquetVariantExtensionRegistered()); ICEBERG_ARROW_ASSIGN_OR_RETURN(context_->output_arrow_schema_, ::arrow::ImportSchema(&arrow_schema)); diff --git a/src/iceberg/parquet/parquet_register.cc b/src/iceberg/parquet/parquet_register.cc index d79e158c1..5e6623bb1 100644 --- a/src/iceberg/parquet/parquet_register.cc +++ b/src/iceberg/parquet/parquet_register.cc @@ -19,9 +19,14 @@ #include "iceberg/parquet/parquet_register.h" +#include + +#include "iceberg/parquet/parquet_schema_util_internal.h" + namespace iceberg::parquet { void RegisterAll() { + std::ignore = EnsureParquetVariantExtensionRegistered(); RegisterReader(); RegisterWriter(); } diff --git a/src/iceberg/parquet/parquet_schema_util.cc b/src/iceberg/parquet/parquet_schema_util.cc index 849bbd1f8..927cf8e9b 100644 --- a/src/iceberg/parquet/parquet_schema_util.cc +++ b/src/iceberg/parquet/parquet_schema_util.cc @@ -17,6 +17,7 @@ * under the License. */ +#include #include #include #include @@ -24,6 +25,7 @@ #include #include +#include "iceberg/arrow/arrow_status_internal.h" #include "iceberg/constants.h" #include "iceberg/metadata_columns.h" #include "iceberg/parquet/parquet_schema_util_internal.h" @@ -38,6 +40,10 @@ namespace iceberg::parquet { namespace { +constexpr std::string_view kArrowVariantExtensionName = "arrow.parquet.variant"; +constexpr std::string_view kVariantMetadataFieldName = "metadata"; +constexpr std::string_view kVariantValueFieldName = "value"; + std::optional FieldIdFromMetadata( const std::shared_ptr& metadata) { if (!metadata) { @@ -186,6 +192,15 @@ Status ValidateParquetSchemaEvolution( } } break; + case TypeId::kVariant: + if (arrow_type->id() == ::arrow::Type::EXTENSION) { + const auto& extension_type = + internal::checked_cast(*arrow_type); + if (extension_type.extension_name() == kArrowVariantExtensionName) { + return {}; + } + } + break; case TypeId::kStruct: if (arrow_type->id() == ::arrow::Type::STRUCT) { return {}; @@ -214,6 +229,64 @@ Result ProjectNested( const Type& nested_type, const std::vector<::parquet::arrow::SchemaField>& parquet_fields); +Result ProjectVariant( + const ::parquet::arrow::SchemaField& parquet_field) { + if (parquet_field.children.size() != 2) { + return NotSupported( + "Reading shredded Variant columns is not supported yet; expected " + "metadata and value fields only"); + } + + std::optional metadata_index; + std::optional value_index; + for (size_t i = 0; i < parquet_field.children.size(); ++i) { + const auto& child = parquet_field.children[i]; + const auto name = std::string_view(child.field->name()); + if (name == kVariantMetadataFieldName) { + if (metadata_index.has_value()) { + return InvalidSchema("Variant Parquet field contains duplicate metadata field"); + } + metadata_index = i; + } else if (name == kVariantValueFieldName) { + if (value_index.has_value()) { + return InvalidSchema("Variant Parquet field contains duplicate value field"); + } + value_index = i; + } + } + + if (!metadata_index.has_value() || !value_index.has_value()) { + return InvalidSchema("Variant Parquet field must contain metadata and value"); + } + + const auto& metadata_field = parquet_field.children[metadata_index.value()]; + const auto& value_field = parquet_field.children[value_index.value()]; + if (metadata_field.field->type()->id() != ::arrow::Type::BINARY || + value_field.field->type()->id() != ::arrow::Type::BINARY) { + return InvalidSchema("Variant metadata and value fields must be binary"); + } + if (metadata_field.field->nullable() || value_field.field->nullable()) { + return InvalidSchema("Variant metadata and value fields must be required"); + } + + FieldProjection metadata_projection; + metadata_projection.kind = FieldProjection::Kind::kProjected; + metadata_projection.from = metadata_index.value(); + metadata_projection.attributes = + std::make_shared(metadata_field.column_index); + + FieldProjection value_projection; + value_projection.kind = FieldProjection::Kind::kProjected; + value_projection.from = value_index.value(); + value_projection.attributes = + std::make_shared(value_field.column_index); + + FieldProjection result; + result.children.emplace_back(std::move(metadata_projection)); + result.children.emplace_back(std::move(value_projection)); + return result; +} + Result ProjectStruct( const StructType& struct_type, const std::vector<::parquet::arrow::SchemaField>& parquet_fields) { @@ -248,14 +321,18 @@ Result ProjectStruct( if (auto iter = field_context_map.find(field_id); iter != field_context_map.cend()) { const auto& parquet_field = iter->second.parquet_field; - ICEBERG_RETURN_UNEXPECTED( - ValidateParquetSchemaEvolution(*field.type(), parquet_field)); - if (field.type()->is_nested()) { - ICEBERG_ASSIGN_OR_RAISE(child_projection, - ProjectNested(*field.type(), parquet_field.children)); + if (field.type()->is_variant()) { + ICEBERG_ASSIGN_OR_RAISE(child_projection, ProjectVariant(parquet_field)); } else { - child_projection.attributes = - std::make_shared(parquet_field.column_index); + ICEBERG_RETURN_UNEXPECTED( + ValidateParquetSchemaEvolution(*field.type(), parquet_field)); + if (field.type()->is_nested()) { + ICEBERG_ASSIGN_OR_RAISE(child_projection, + ProjectNested(*field.type(), parquet_field.children)); + } else { + child_projection.attributes = + std::make_shared(parquet_field.column_index); + } } child_projection.from = iter->second.local_index; child_projection.kind = FieldProjection::Kind::kProjected; @@ -294,14 +371,17 @@ Result ProjectList( element_field.field_id(), element_field_id.value()); } - ICEBERG_RETURN_UNEXPECTED( - ValidateParquetSchemaEvolution(*element_field.type(), parquet_field)); - FieldProjection element_projection; - if (element_field.type()->is_nested()) { + if (element_field.type()->is_variant()) { + ICEBERG_ASSIGN_OR_RAISE(element_projection, ProjectVariant(parquet_field)); + } else if (element_field.type()->is_nested()) { + ICEBERG_RETURN_UNEXPECTED( + ValidateParquetSchemaEvolution(*element_field.type(), parquet_field)); ICEBERG_ASSIGN_OR_RAISE(element_projection, ProjectNested(*element_field.type(), parquet_field.children)); } else { + ICEBERG_RETURN_UNEXPECTED( + ValidateParquetSchemaEvolution(*element_field.type(), parquet_field)); element_projection.attributes = std::make_shared(parquet_field.column_index); } @@ -349,12 +429,16 @@ Result ProjectMap( FieldProjection sub_projection; const auto& sub_node = parquet_fields[i]; const auto& sub_field = map_type.fields()[i]; - ICEBERG_RETURN_UNEXPECTED( - ValidateParquetSchemaEvolution(*sub_field.type(), sub_node)); - if (sub_field.type()->is_nested()) { + if (sub_field.type()->is_variant()) { + ICEBERG_ASSIGN_OR_RAISE(sub_projection, ProjectVariant(sub_node)); + } else if (sub_field.type()->is_nested()) { + ICEBERG_RETURN_UNEXPECTED( + ValidateParquetSchemaEvolution(*sub_field.type(), sub_node)); ICEBERG_ASSIGN_OR_RAISE(sub_projection, ProjectNested(*sub_field.type(), sub_node.children)); } else { + ICEBERG_RETURN_UNEXPECTED( + ValidateParquetSchemaEvolution(*sub_field.type(), sub_node)); sub_projection.attributes = std::make_shared(sub_node.column_index); } @@ -410,6 +494,20 @@ void CollectColumnIds(const FieldProjection& field_projection, } // namespace +Status EnsureParquetVariantExtensionRegistered() { + if (::arrow::GetExtensionType(std::string(kArrowVariantExtensionName)) != nullptr) { + return {}; + } + + auto metadata = ::arrow::field("metadata", ::arrow::binary(), /*nullable=*/false); + auto value = ::arrow::field("value", ::arrow::binary(), /*nullable=*/false); + auto storage_type = ::arrow::struct_({metadata, value}); + ICEBERG_ARROW_RETURN_NOT_OK(::arrow::RegisterExtensionType( + internal::checked_pointer_cast<::arrow::ExtensionType>( + ::arrow::extension::variant(storage_type)))); + return {}; +} + Result Project(const Schema& expected_schema, const ::parquet::arrow::SchemaManifest& parquet_schema) { ICEBERG_ASSIGN_OR_RAISE(auto field_projection, diff --git a/src/iceberg/parquet/parquet_schema_util_internal.h b/src/iceberg/parquet/parquet_schema_util_internal.h index 8e06b0bcf..64ac56247 100644 --- a/src/iceberg/parquet/parquet_schema_util_internal.h +++ b/src/iceberg/parquet/parquet_schema_util_internal.h @@ -62,4 +62,7 @@ std::vector SelectedColumnIndices(const SchemaProjection& projection); /// \return True if the Parquet schema has field IDs, false otherwise. bool HasFieldIds(const ::parquet::schema::NodePtr& root_node); +/// \brief Register Arrow's Parquet Variant extension type if available. +Status EnsureParquetVariantExtensionRegistered(); + } // namespace iceberg::parquet diff --git a/src/iceberg/parquet/parquet_writer.cc b/src/iceberg/parquet/parquet_writer.cc index c70d3310c..2ac427d57 100644 --- a/src/iceberg/parquet/parquet_writer.cc +++ b/src/iceberg/parquet/parquet_writer.cc @@ -33,6 +33,7 @@ #include "iceberg/arrow/arrow_io_internal.h" #include "iceberg/arrow/arrow_status_internal.h" +#include "iceberg/parquet/parquet_schema_util_internal.h" #include "iceberg/schema_internal.h" #include "iceberg/util/macros.h" @@ -99,6 +100,7 @@ class ParquetWriter::Impl { ArrowSchema c_schema; ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*options.schema, &c_schema)); + ICEBERG_RETURN_UNEXPECTED(EnsureParquetVariantExtensionRegistered()); ICEBERG_ARROW_ASSIGN_OR_RETURN(arrow_schema_, ::arrow::ImportSchema(&c_schema)); std::shared_ptr<::parquet::SchemaDescriptor> schema_descriptor; diff --git a/src/iceberg/schema_internal.cc b/src/iceberg/schema_internal.cc index bdd5b859f..eecf96164 100644 --- a/src/iceberg/schema_internal.cc +++ b/src/iceberg/schema_internal.cc @@ -37,6 +37,7 @@ namespace { constexpr const char* kArrowExtensionName = "ARROW:extension:name"; constexpr const char* kArrowExtensionMetadata = "ARROW:extension:metadata"; constexpr const char* kArrowUuidExtensionName = "arrow.uuid"; +constexpr const char* kArrowVariantExtensionName = "arrow.parquet.variant"; constexpr int32_t kUnknownFieldId = -1; // Convert an Iceberg type to Arrow schema. Return value is Nanoarrow error code. @@ -150,6 +151,20 @@ ArrowErrorCode ToArrowSchema(const Type& type, bool optional, std::string_view n ArrowMetadataBuilderAppend(&metadata_buffer, ArrowCharView(kArrowExtensionName), ArrowCharView(kArrowUuidExtensionName))); } break; + case TypeId::kVariant: { + NANOARROW_RETURN_NOT_OK(ArrowSchemaSetTypeStruct(schema, 2)); + NANOARROW_RETURN_NOT_OK(ToArrowSchema(*binary(), /*optional=*/false, "metadata", + /*field_id=*/std::nullopt, + schema->children[0])); + NANOARROW_RETURN_NOT_OK(ToArrowSchema(*binary(), /*optional=*/false, "value", + /*field_id=*/std::nullopt, + schema->children[1])); + NANOARROW_RETURN_NOT_OK( + ArrowMetadataBuilderAppend(&metadata_buffer, ArrowCharView(kArrowExtensionName), + ArrowCharView(kArrowVariantExtensionName))); + NANOARROW_RETURN_NOT_OK(ArrowMetadataBuilderAppend( + &metadata_buffer, ArrowCharView(kArrowExtensionMetadata), ArrowCharView(""))); + } break; } if (!name.empty()) { @@ -231,6 +246,46 @@ Result> FromArrowSchema(const ArrowSchema& schema) { arrow_error.message); } + auto extension_name = std::string_view(schema_view.extension_name.data, + schema_view.extension_name.size_bytes); + if (extension_name == kArrowVariantExtensionName || + extension_name == "parquet.variant") { + if (schema_view.type != NANOARROW_TYPE_STRUCT || schema.n_children != 2) { + return InvalidSchema("Variant Arrow storage must be a struct with 2 fields"); + } + for (int i = 0; i < schema.n_children; ++i) { + ArrowError child_error; + ArrowErrorInit(&child_error); + ArrowSchemaView child_view; + if (auto error_code = + ArrowSchemaViewInit(&child_view, schema.children[i], &child_error); + error_code != NANOARROW_OK) { + return InvalidSchema( + "Failed to read Variant child Arrow schema, code: {}, " + "message: {}", + error_code, child_error.message); + } + if (child_view.type != NANOARROW_TYPE_BINARY && + child_view.type != NANOARROW_TYPE_LARGE_BINARY) { + return InvalidSchema("Variant Arrow child '{}' must be binary", + schema.children[i]->name); + } + if ((schema.children[i]->flags & ARROW_FLAG_NULLABLE) != 0) { + return InvalidSchema("Variant Arrow child '{}' must be non-nullable", + schema.children[i]->name); + } + } + std::string_view child0_name = schema.children[0]->name; + std::string_view child1_name = schema.children[1]->name; + bool has_metadata_and_value = (child0_name == "metadata" && child1_name == "value") || + (child0_name == "value" && child1_name == "metadata"); + if (!has_metadata_and_value) { + return InvalidSchema( + "Variant Arrow storage must contain metadata and value fields"); + } + return iceberg::variant(); + } + switch (schema_view.type) { case NANOARROW_TYPE_STRUCT: { std::vector fields; diff --git a/src/iceberg/table_metadata.h b/src/iceberg/table_metadata.h index 2f0c7e181..4aec32b20 100644 --- a/src/iceberg/table_metadata.h +++ b/src/iceberg/table_metadata.h @@ -80,6 +80,7 @@ struct ICEBERG_EXPORT TableMetadata { static inline const std::unordered_map kMinFormatVersions = { {TypeId::kTimestampNs, 3}, {TypeId::kTimestampTzNs, 3}, + {TypeId::kVariant, 3}, }; /// An integer version number for the format diff --git a/src/iceberg/test/arrow_test.cc b/src/iceberg/test/arrow_test.cc index dcfdb6b56..c41cc81c7 100644 --- a/src/iceberg/test/arrow_test.cc +++ b/src/iceberg/test/arrow_test.cc @@ -22,7 +22,9 @@ #include #include +#include #include +#include #include #include #include @@ -32,9 +34,30 @@ #include "iceberg/schema.h" #include "iceberg/schema_internal.h" #include "iceberg/test/matchers.h" +#include "iceberg/util/checked_cast.h" namespace iceberg { +namespace { + +std::shared_ptr<::arrow::DataType> ArrowVariantType() { + return ::arrow::extension::variant(::arrow::struct_({ + ::arrow::field("metadata", ::arrow::binary(), /*nullable=*/false), + ::arrow::field("value", ::arrow::binary(), /*nullable=*/false), + })); +} + +void EnsureArrowVariantExtensionRegistered() { + if (::arrow::GetExtensionType("arrow.parquet.variant") != nullptr) { + return; + } + ASSERT_TRUE(::arrow::RegisterExtensionType( + std::static_pointer_cast<::arrow::ExtensionType>(ArrowVariantType())) + .ok()); +} + +} // namespace + struct ToArrowSchemaParam { std::shared_ptr iceberg_type; bool optional = true; @@ -109,6 +132,42 @@ INSTANTIATE_TEST_SUITE_P( ToArrowSchemaParam{.iceberg_type = iceberg::fixed(20), .arrow_type = ::arrow::fixed_size_binary(20)})); +TEST(ToArrowSchemaTest, VariantType) { + EnsureArrowVariantExtensionRegistered(); + + Schema schema({SchemaField::MakeOptional(3, "payload", iceberg::variant())}, + /*schema_id=*/0); + ArrowSchema arrow_schema; + ASSERT_THAT(ToArrowSchema(schema, &arrow_schema), IsOk()); + + auto imported_schema = ::arrow::ImportSchema(&arrow_schema).ValueOrDie(); + ASSERT_EQ(imported_schema->num_fields(), 1); + + auto field = imported_schema->field(0); + ASSERT_EQ(field->name(), "payload"); + ASSERT_TRUE(field->nullable()); + ASSERT_EQ(field->type()->id(), ::arrow::Type::EXTENSION); + const auto& extension_type = + internal::checked_cast(*field->type()); + ASSERT_EQ(extension_type.extension_name(), "arrow.parquet.variant"); + ASSERT_TRUE(field->metadata()->Contains(kParquetFieldIdKey)); + ASSERT_EQ(field->metadata()->Get(kParquetFieldIdKey), "3"); + + const auto& storage_type = + internal::checked_cast(*extension_type.storage_type()); + ASSERT_EQ(storage_type.num_fields(), 2); + ASSERT_EQ(storage_type.field(0)->name(), "metadata"); + ASSERT_EQ(storage_type.field(0)->type()->id(), ::arrow::Type::BINARY); + ASSERT_FALSE(storage_type.field(0)->nullable()); + ASSERT_TRUE(storage_type.field(0)->metadata() == nullptr || + !storage_type.field(0)->metadata()->Contains(kParquetFieldIdKey)); + ASSERT_EQ(storage_type.field(1)->name(), "value"); + ASSERT_EQ(storage_type.field(1)->type()->id(), ::arrow::Type::BINARY); + ASSERT_FALSE(storage_type.field(1)->nullable()); + ASSERT_TRUE(storage_type.field(1)->metadata() == nullptr || + !storage_type.field(1)->metadata()->Contains(kParquetFieldIdKey)); +} + namespace { void CheckArrowField(const ::arrow::Field& field, ::arrow::Type::type type_id, @@ -306,6 +365,8 @@ INSTANTIATE_TEST_SUITE_P( .iceberg_type = iceberg::binary()}, FromArrowSchemaParam{.arrow_type = ::arrow::extension::uuid(), .iceberg_type = iceberg::uuid()}, + FromArrowSchemaParam{.arrow_type = ArrowVariantType(), + .iceberg_type = iceberg::variant()}, FromArrowSchemaParam{.arrow_type = ::arrow::fixed_size_binary(20), .iceberg_type = iceberg::fixed(20)})); diff --git a/src/iceberg/test/avro_schema_test.cc b/src/iceberg/test/avro_schema_test.cc index dc2cb0a51..1e88174f0 100644 --- a/src/iceberg/test/avro_schema_test.cc +++ b/src/iceberg/test/avro_schema_test.cc @@ -17,6 +17,7 @@ * under the License. */ +#include #include #include @@ -250,6 +251,64 @@ TEST(ToAvroNodeVisitorTest, BinaryType) { EXPECT_EQ(node->type(), ::avro::AVRO_BYTES); } +TEST(ToAvroNodeVisitorTest, VariantType) { + ::avro::NodePtr node; + EXPECT_THAT(ToAvroNodeVisitor{}.Visit(VariantType{}, &node), IsOk()); + EXPECT_EQ(node->type(), ::avro::AVRO_RECORD); + EXPECT_EQ(node->name().fullname(), "variant"); + ASSERT_NO_FATAL_FAILURE(CheckCustomLogicalType(node, "variant")); + + ASSERT_EQ(node->names(), 2); + EXPECT_EQ(node->nameAt(0), "metadata"); + EXPECT_EQ(node->nameAt(1), "value"); + ASSERT_EQ(node->leaves(), 2); + EXPECT_EQ(node->leafAt(0)->type(), ::avro::AVRO_BYTES); + EXPECT_EQ(node->leafAt(1)->type(), ::avro::AVRO_BYTES); + EXPECT_EQ(node->customAttributes(), 0); +} + +TEST(ToAvroNodeVisitorTest, VariantFieldType) { + ::avro::NodePtr node; + EXPECT_THAT( + ToAvroNodeVisitor{}.Visit( + SchemaField::MakeOptional(/*field_id=*/7, "payload", iceberg::variant()), + &node), + IsOk()); + + ASSERT_EQ(node->type(), ::avro::AVRO_UNION); + ASSERT_EQ(node->leaves(), 2); + EXPECT_EQ(node->leafAt(0)->type(), ::avro::AVRO_NULL); + auto variant_node = node->leafAt(1); + ASSERT_EQ(variant_node->type(), ::avro::AVRO_RECORD); + EXPECT_EQ(variant_node->name().fullname(), "r7"); + ASSERT_NO_FATAL_FAILURE(CheckCustomLogicalType(variant_node, "variant")); +} + +TEST(ToAvroNodeVisitorTest, VariantSchemaConversionUsesFieldIdsForRecordNames) { + Schema schema({ + SchemaField::MakeRequired(/*field_id=*/1, "variantCol1", iceberg::variant()), + SchemaField::MakeRequired(/*field_id=*/2, "variantCol2", iceberg::variant()), + }); + + ::avro::NodePtr node; + EXPECT_THAT(ToAvroNodeVisitor{}.Visit(schema, &node), IsOk()); + ASSERT_EQ(node->type(), ::avro::AVRO_RECORD); + ASSERT_EQ(node->leaves(), 2); + + for (int id = 1; id <= 2; ++id) { + auto variant_node = node->leafAt(id - 1); + ASSERT_EQ(variant_node->type(), ::avro::AVRO_RECORD); + EXPECT_EQ(variant_node->name().fullname(), std::format("r{}", id)); + ASSERT_NO_FATAL_FAILURE(CheckCustomLogicalType(variant_node, "variant")); + ASSERT_EQ(variant_node->names(), 2); + EXPECT_EQ(variant_node->nameAt(0), "metadata"); + EXPECT_EQ(variant_node->nameAt(1), "value"); + ASSERT_EQ(variant_node->leaves(), 2); + EXPECT_EQ(variant_node->leafAt(0)->type(), ::avro::AVRO_BYTES); + EXPECT_EQ(variant_node->leafAt(1)->type(), ::avro::AVRO_BYTES); + } +} + TEST(ToAvroNodeVisitorTest, StructType) { StructType struct_type{{SchemaField{/*field_id=*/1, "bool_field", iceberg::boolean(), /*optional=*/false}, @@ -666,6 +725,18 @@ TEST(HasIdVisitorTest, ArrayBackedMapWithPartialIds) { EXPECT_FALSE(visitor.AllHaveIds()); } +TEST(HasIdVisitorTest, VariantField) { + ::avro::NodePtr node; + Schema schema( + {SchemaField::MakeRequired(/*field_id=*/1, "payload", iceberg::variant())}); + EXPECT_THAT(ToAvroNodeVisitor{}.Visit(schema, &node), IsOk()); + + HasIdVisitor visitor; + EXPECT_THAT(visitor.Visit(node), IsOk()); + EXPECT_FALSE(visitor.HasNoIds()); + EXPECT_TRUE(visitor.AllHaveIds()); +} + TEST(AvroSchemaProjectionTest, ProjectIdenticalSchemas) { // Create an iceberg schema Schema expected_schema({ @@ -847,6 +918,22 @@ TEST(AvroSchemaProjectionTest, ProjectMetadataColumn) { ASSERT_EQ(projection.fields[1].kind, FieldProjection::Kind::kMetadata); } +TEST(AvroSchemaProjectionTest, ProjectVariantField) { + Schema expected_schema( + {SchemaField::MakeRequired(/*field_id=*/1, "payload", iceberg::variant())}); + + ::avro::NodePtr avro_node; + ASSERT_THAT(ToAvroNodeVisitor{}.Visit(expected_schema, &avro_node), IsOk()); + + auto projection_result = Project(expected_schema, avro_node, /*prune_source=*/false); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 1); + ASSERT_EQ(projection.fields[0].kind, FieldProjection::Kind::kProjected); + ASSERT_EQ(std::get<1>(projection.fields[0].from), 0); +} + TEST(AvroSchemaProjectionTest, ProjectSchemaEvolutionIntToLong) { // Create iceberg schema expecting a long Schema expected_schema({ diff --git a/src/iceberg/test/parquet_schema_test.cc b/src/iceberg/test/parquet_schema_test.cc index a9da3f9f7..b1bd30aea 100644 --- a/src/iceberg/test/parquet_schema_test.cc +++ b/src/iceberg/test/parquet_schema_test.cc @@ -17,15 +17,18 @@ * under the License. */ +#include #include #include #include +#include #include #include #include "iceberg/metadata_columns.h" #include "iceberg/parquet/parquet_schema_util_internal.h" #include "iceberg/schema.h" +#include "iceberg/schema_internal.h" #include "iceberg/test/matchers.h" #include "iceberg/type.h" @@ -59,6 +62,14 @@ ::parquet::schema::NodePtr MakeStringNode(const std::string& name, int field_id /*primitive_length=*/-1, field_id); } +::parquet::schema::NodePtr MakeBinaryNode(const std::string& name, int field_id = -1, + bool optional = true) { + return ::parquet::schema::PrimitiveNode::Make( + name, optional ? ::parquet::Repetition::OPTIONAL : ::parquet::Repetition::REQUIRED, + ::parquet::LogicalType::None(), ::parquet::Type::BYTE_ARRAY, + /*primitive_length=*/-1, field_id); +} + ::parquet::schema::NodePtr MakeDoubleNode(const std::string& name, int field_id = -1, bool optional = true) { return ::parquet::schema::PrimitiveNode::Make( @@ -104,14 +115,26 @@ ::parquet::schema::NodePtr MakeMapNode(const std::string& name, {key_value_group}, ::parquet::LogicalType::Map(), field_id); } +::parquet::schema::NodePtr MakeVariantNode(const std::string& name, int field_id = -1, + bool optional = true) { + return ::parquet::schema::GroupNode::Make( + name, optional ? ::parquet::Repetition::OPTIONAL : ::parquet::Repetition::REQUIRED, + { + MakeBinaryNode("metadata", /*field_id=*/-1, /*optional=*/false), + MakeBinaryNode("value", /*field_id=*/-1, /*optional=*/false), + }, + ::parquet::LogicalType::Variant(), field_id); +} + // Helper to create SchemaManifest from Parquet schema ::parquet::arrow::SchemaManifest MakeSchemaManifest( - const ::parquet::schema::NodePtr& parquet_schema) { + const ::parquet::schema::NodePtr& parquet_schema, + bool arrow_extensions_enabled = true) { auto parquet_schema_descriptor = std::make_shared<::parquet::SchemaDescriptor>(); parquet_schema_descriptor->Init(parquet_schema); auto properties = ::parquet::default_arrow_reader_properties(); - properties.set_arrow_extensions_enabled(true); + properties.set_arrow_extensions_enabled(arrow_extensions_enabled); ::parquet::arrow::SchemaManifest manifest; auto status = ::parquet::arrow::SchemaManifest::Make(parquet_schema_descriptor.get(), @@ -138,6 +161,49 @@ TEST(HasFieldIdsTest, PrimitiveNode) { EXPECT_FALSE(HasFieldIds(MakeInt32Node("test_field", /*field_id=*/-1))); } +TEST(ParquetSchemaConversionTest, VariantType) { + ASSERT_THAT(EnsureParquetVariantExtensionRegistered(), IsOk()); + + Schema schema( + {SchemaField::MakeOptional(/*field_id=*/7, "payload", iceberg::variant())}); + ArrowSchema c_schema; + ASSERT_THAT(ToArrowSchema(schema, &c_schema), IsOk()); + auto arrow_schema = ::arrow::ImportSchema(&c_schema).ValueOrDie(); + + std::shared_ptr<::parquet::SchemaDescriptor> schema_descriptor; + auto writer_properties = ::parquet::default_writer_properties(); + auto arrow_writer_properties = ::parquet::default_arrow_writer_properties(); + ASSERT_TRUE(::parquet::arrow::ToParquetSchema(arrow_schema.get(), *writer_properties, + *arrow_writer_properties, + &schema_descriptor) + .ok()); + + auto root = std::static_pointer_cast( + schema_descriptor->schema_root()); + ASSERT_EQ(root->field_count(), 1); + auto variant_node = + std::static_pointer_cast(root->field(0)); + ASSERT_EQ(variant_node->name(), "payload"); + ASSERT_EQ(variant_node->field_id(), 7); + ASSERT_EQ(variant_node->repetition(), ::parquet::Repetition::OPTIONAL); + ASSERT_TRUE(variant_node->logical_type()->is_variant()); + ASSERT_EQ(variant_node->field_count(), 2); + + auto metadata_node = std::static_pointer_cast( + variant_node->field(0)); + ASSERT_EQ(metadata_node->name(), "metadata"); + ASSERT_EQ(metadata_node->repetition(), ::parquet::Repetition::REQUIRED); + ASSERT_EQ(metadata_node->physical_type(), ::parquet::Type::BYTE_ARRAY); + ASSERT_EQ(metadata_node->field_id(), -1); + + auto value_node = std::static_pointer_cast( + variant_node->field(1)); + ASSERT_EQ(value_node->name(), "value"); + ASSERT_EQ(value_node->repetition(), ::parquet::Repetition::REQUIRED); + ASSERT_EQ(value_node->physical_type(), ::parquet::Type::BYTE_ARRAY); + ASSERT_EQ(value_node->field_id(), -1); +} + // NOLINTBEGIN(clang-analyzer-cplusplus.NewDeleteLeaks) TEST(HasFieldIdsTest, GroupNode) { EXPECT_FALSE( @@ -420,6 +486,186 @@ TEST(ParquetSchemaProjectionTest, ProjectMapType) { ASSERT_EQ(SelectedColumnIndices(projection), std::vector({0, 1})); } +TEST(ParquetSchemaProjectionTest, ProjectVariantType) { + ASSERT_THAT(EnsureParquetVariantExtensionRegistered(), IsOk()); + + Schema expected_schema({ + SchemaField::MakeOptional(/*field_id=*/1, "payload", iceberg::variant()), + }); + + auto parquet_schema = + MakeGroupNode("iceberg_schema", { + MakeVariantNode("payload", /*field_id=*/1), + }); + + auto schema_manifest = MakeSchemaManifest(parquet_schema); + auto projection_result = Project(expected_schema, schema_manifest); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 1); + ASSERT_PROJECTED_FIELD(projection.fields[0], 0); + + ASSERT_EQ(projection.fields[0].children.size(), 2); + ASSERT_PROJECTED_FIELD(projection.fields[0].children[0], 0); + ASSERT_PROJECTED_FIELD(projection.fields[0].children[1], 1); + + ASSERT_EQ(SelectedColumnIndices(projection), std::vector({0, 1})); +} + +TEST(ParquetSchemaProjectionTest, ProjectPrunesUnselectedVariantFields) { + ASSERT_THAT(EnsureParquetVariantExtensionRegistered(), IsOk()); + + Schema expected_schema({ + SchemaField::MakeRequired(/*field_id=*/1, "id", iceberg::int32()), + SchemaField::MakeRequired(/*field_id=*/2, "variant_1", iceberg::variant()), + }); + + auto parquet_schema = + MakeGroupNode("iceberg_schema", { + MakeInt32Node("id", /*field_id=*/1), + MakeVariantNode("variant_1", /*field_id=*/2), + MakeVariantNode("variant_2", /*field_id=*/3), + }); + + auto schema_manifest = MakeSchemaManifest(parquet_schema); + auto projection_result = Project(expected_schema, schema_manifest); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 2); + ASSERT_PROJECTED_FIELD(projection.fields[0], 0); + ASSERT_PROJECTED_FIELD(projection.fields[1], 1); + + ASSERT_EQ(projection.fields[1].children.size(), 2); + ASSERT_PROJECTED_FIELD(projection.fields[1].children[0], 0); + ASSERT_PROJECTED_FIELD(projection.fields[1].children[1], 1); + + ASSERT_EQ(SelectedColumnIndices(projection), std::vector({0, 1, 2})); +} + +TEST(ParquetSchemaProjectionTest, ProjectListOfVariant) { + ASSERT_THAT(EnsureParquetVariantExtensionRegistered(), IsOk()); + + Schema expected_schema({ + SchemaField::MakeOptional( + /*field_id=*/1, "payloads", + std::make_shared(SchemaField::MakeOptional( + /*field_id=*/101, "element", iceberg::variant()))), + }); + + auto parquet_schema = MakeGroupNode( + "iceberg_schema", + { + MakeListNode("payloads", MakeVariantNode("element", /*field_id=*/101), + /*field_id=*/1), + }); + + auto schema_manifest = MakeSchemaManifest(parquet_schema); + auto projection_result = Project(expected_schema, schema_manifest); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 1); + ASSERT_PROJECTED_FIELD(projection.fields[0], 0); + + ASSERT_EQ(projection.fields[0].children.size(), 1); + const auto& element_projection = projection.fields[0].children[0]; + ASSERT_PROJECTED_FIELD(element_projection, 0); + ASSERT_EQ(element_projection.children.size(), 2); + ASSERT_PROJECTED_FIELD(element_projection.children[0], 0); + ASSERT_PROJECTED_FIELD(element_projection.children[1], 1); + + ASSERT_EQ(SelectedColumnIndices(projection), std::vector({0, 1})); +} + +TEST(ParquetSchemaProjectionTest, ProjectMapWithVariantValue) { + ASSERT_THAT(EnsureParquetVariantExtensionRegistered(), IsOk()); + + Schema expected_schema({ + SchemaField::MakeOptional( + /*field_id=*/1, "payloads", + std::make_shared( + SchemaField::MakeRequired(/*field_id=*/101, "key", iceberg::string()), + SchemaField::MakeOptional(/*field_id=*/102, "value", iceberg::variant()))), + }); + + auto parquet_schema = MakeGroupNode( + "iceberg_schema", + { + MakeMapNode("payloads", + MakeStringNode("key", /*field_id=*/101, /*optional=*/false), + MakeVariantNode("value", /*field_id=*/102), /*field_id=*/1), + }); + + auto schema_manifest = MakeSchemaManifest(parquet_schema); + auto projection_result = Project(expected_schema, schema_manifest); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 1); + ASSERT_PROJECTED_FIELD(projection.fields[0], 0); + + ASSERT_EQ(projection.fields[0].children.size(), 2); + ASSERT_PROJECTED_FIELD(projection.fields[0].children[0], 0); + ASSERT_PROJECTED_FIELD(projection.fields[0].children[1], 1); + + const auto& value_projection = projection.fields[0].children[1]; + ASSERT_EQ(value_projection.children.size(), 2); + ASSERT_PROJECTED_FIELD(value_projection.children[0], 0); + ASSERT_PROJECTED_FIELD(value_projection.children[1], 1); + + ASSERT_EQ(SelectedColumnIndices(projection), std::vector({0, 1, 2})); +} + +TEST(ParquetSchemaProjectionTest, ProjectShreddedVariantNotSupported) { + ASSERT_THAT(EnsureParquetVariantExtensionRegistered(), IsOk()); + + Schema expected_schema({ + SchemaField::MakeOptional(/*field_id=*/1, "payload", iceberg::variant()), + }); + + auto shredded_variant = ::parquet::schema::GroupNode::Make( + "payload", ::parquet::Repetition::OPTIONAL, + { + MakeBinaryNode("metadata", /*field_id=*/-1, /*optional=*/false), + MakeBinaryNode("value", /*field_id=*/-1, /*optional=*/false), + MakeStringNode("typed_value", /*field_id=*/-1), + }, + ::parquet::LogicalType::Variant(), /*field_id=*/1); + auto parquet_schema = MakeGroupNode("iceberg_schema", {shredded_variant}); + + auto schema_manifest = MakeSchemaManifest(parquet_schema, + /*arrow_extensions_enabled=*/false); + auto projection_result = Project(expected_schema, schema_manifest); + ASSERT_THAT(projection_result, IsError(ErrorKind::kNotSupported)); + ASSERT_THAT(projection_result, HasErrorMessage("Reading shredded Variant columns")); +} + +TEST(ParquetSchemaProjectionTest, ProjectVariantRequiresRequiredBinaryChildren) { + ASSERT_THAT(EnsureParquetVariantExtensionRegistered(), IsOk()); + + Schema expected_schema({ + SchemaField::MakeOptional(/*field_id=*/1, "payload", iceberg::variant()), + }); + + auto variant_with_optional_value = ::parquet::schema::GroupNode::Make( + "payload", ::parquet::Repetition::OPTIONAL, + { + MakeBinaryNode("metadata", /*field_id=*/-1, /*optional=*/false), + MakeBinaryNode("value", /*field_id=*/-1, /*optional=*/true), + }, + ::parquet::LogicalType::Variant(), /*field_id=*/1); + auto parquet_schema = MakeGroupNode("iceberg_schema", {variant_with_optional_value}); + + auto schema_manifest = MakeSchemaManifest(parquet_schema, + /*arrow_extensions_enabled=*/false); + auto projection_result = Project(expected_schema, schema_manifest); + ASSERT_THAT(projection_result, IsError(ErrorKind::kInvalidSchema)); + ASSERT_THAT(projection_result, + HasErrorMessage("metadata and value fields must be required")); +} + TEST(ParquetSchemaProjectionTest, ProjectListOfStruct) { Schema expected_schema({ SchemaField::MakeOptional( diff --git a/src/iceberg/test/schema_json_test.cc b/src/iceberg/test/schema_json_test.cc index c9532eeb6..b0f7a1639 100644 --- a/src/iceberg/test/schema_json_test.cc +++ b/src/iceberg/test/schema_json_test.cc @@ -73,6 +73,7 @@ INSTANTIATE_TEST_SUITE_P( .type = std::make_shared()}, SchemaJsonParam{.json = "\"timestamp_ns\"", .type = iceberg::timestamp_ns()}, SchemaJsonParam{.json = "\"timestamptz_ns\"", .type = iceberg::timestamptz_ns()}, + SchemaJsonParam{.json = "\"variant\"", .type = iceberg::variant()}, SchemaJsonParam{ .json = R"({"element":"string","element-id":3,"element-required":true,"type":"list"})", diff --git a/src/iceberg/test/schema_test.cc b/src/iceberg/test/schema_test.cc index 838b57600..b6500112a 100644 --- a/src/iceberg/test/schema_test.cc +++ b/src/iceberg/test/schema_test.cc @@ -102,6 +102,8 @@ TEST(SchemaTest, ValidateRejectsV3TypesBeforeFormatV3) { {iceberg::SchemaField(1, "timestamp_ns", iceberg::timestamp_ns(), false)}); iceberg::Schema timestamptz_ns_schema( {iceberg::SchemaField(1, "timestamptz_ns", iceberg::timestamptz_ns(), false)}); + iceberg::Schema variant_schema( + {iceberg::SchemaField(1, "payload", iceberg::variant(), false)}); auto status = timestamp_ns_schema.Validate(2); ASSERT_THAT(status, iceberg::IsError(iceberg::ErrorKind::kInvalidSchema)); @@ -115,12 +117,20 @@ TEST(SchemaTest, ValidateRejectsV3TypesBeforeFormatV3) { "Invalid type for timestamptz_ns: timestamptz_ns is not " "supported until v3")); + status = variant_schema.Validate(2); + ASSERT_THAT(status, iceberg::IsError(iceberg::ErrorKind::kInvalidSchema)); + EXPECT_THAT(status, iceberg::HasErrorMessage( + "Invalid type for payload: variant is not supported until v3")); + EXPECT_THAT( timestamp_ns_schema.Validate(iceberg::TableMetadata::kSupportedTableFormatVersion), iceberg::IsOk()); EXPECT_THAT(timestamptz_ns_schema.Validate( iceberg::TableMetadata::kSupportedTableFormatVersion), iceberg::IsOk()); + EXPECT_THAT( + variant_schema.Validate(iceberg::TableMetadata::kSupportedTableFormatVersion), + iceberg::IsOk()); } TEST(SchemaTest, IdentifierFields) { @@ -900,30 +910,44 @@ TEST_P(ProjectParamTest, ProjectFields) { INSTANTIATE_TEST_SUITE_P( ProjectTestCases, ProjectParamTest, - ::testing::Values(ProjectTestParam{.test_name = "ProjectAllFields", - .create_schema = []() { return BasicSchema(); }, - .selected_ids = {1, 2, 3, 4}, - .expected_schema = []() { return BasicSchema(); }, - .should_succeed = true}, - - ProjectTestParam{ - .test_name = "ProjectSingleField", - .create_schema = []() { return BasicSchema(); }, - .selected_ids = {2}, - .expected_schema = []() { return MakeSchema(Name()); }, - .should_succeed = true}, + ::testing::Values( + ProjectTestParam{.test_name = "ProjectAllFields", + .create_schema = []() { return BasicSchema(); }, + .selected_ids = {1, 2, 3, 4}, + .expected_schema = []() { return BasicSchema(); }, + .should_succeed = true}, + + ProjectTestParam{.test_name = "ProjectSingleField", + .create_schema = []() { return BasicSchema(); }, + .selected_ids = {2}, + .expected_schema = []() { return MakeSchema(Name()); }, + .should_succeed = true}, + + ProjectTestParam{.test_name = "ProjectNonExistentFieldId", + .create_schema = []() { return BasicSchema(); }, + .selected_ids = {999}, + .expected_schema = []() { return MakeSchema(); }, + .should_succeed = true}, + + ProjectTestParam{.test_name = "ProjectEmptySelection", + .create_schema = []() { return BasicSchema(); }, + .selected_ids = {}, + .expected_schema = []() { return MakeSchema(); }, + .should_succeed = true}, - ProjectTestParam{.test_name = "ProjectNonExistentFieldId", - .create_schema = []() { return BasicSchema(); }, - .selected_ids = {999}, - .expected_schema = []() { return MakeSchema(); }, - .should_succeed = true}, - - ProjectTestParam{.test_name = "ProjectEmptySelection", - .create_schema = []() { return BasicSchema(); }, - .selected_ids = {}, - .expected_schema = []() { return MakeSchema(); }, - .should_succeed = true})); + ProjectTestParam{.test_name = "ProjectVariantField", + .create_schema = + []() { + return MakeSchema(iceberg::SchemaField::MakeOptional( + 10, "payload", iceberg::variant())); + }, + .selected_ids = {10}, + .expected_schema = + []() { + return MakeSchema(iceberg::SchemaField::MakeOptional( + 10, "payload", iceberg::variant())); + }, + .should_succeed = true})); INSTANTIATE_TEST_SUITE_P(ProjectNestedTestCases, ProjectParamTest, ::testing::Values(ProjectTestParam{ diff --git a/src/iceberg/test/type_test.cc b/src/iceberg/test/type_test.cc index e68843be4..430804036 100644 --- a/src/iceberg/test/type_test.cc +++ b/src/iceberg/test/type_test.cc @@ -54,6 +54,7 @@ TEST_P(TypeTest, TypeId) { TEST_P(TypeTest, IsPrimitive) { const auto& test_case = GetParam(); + ASSERT_FALSE(test_case.type->is_variant()); if (test_case.primitive) { ASSERT_TRUE(test_case.type->is_primitive()); ASSERT_FALSE(test_case.type->is_nested()); @@ -66,6 +67,7 @@ TEST_P(TypeTest, IsPrimitive) { TEST_P(TypeTest, IsNested) { const auto& test_case = GetParam(); + ASSERT_FALSE(test_case.type->is_variant()); if (!test_case.primitive) { ASSERT_FALSE(test_case.type->is_primitive()); ASSERT_TRUE(test_case.type->is_nested()); @@ -273,6 +275,7 @@ TEST(TypeTest, Equality) { for (const auto& test_case : kNestedTypes) { alltypes.push_back(test_case.type); } + alltypes.push_back(iceberg::variant()); for (size_t i = 0; i < alltypes.size(); i++) { for (size_t j = 0; j < alltypes.size(); j++) { @@ -287,6 +290,16 @@ TEST(TypeTest, Equality) { } } +TEST(TypeTest, Variant) { + ASSERT_EQ(iceberg::TypeId::kVariant, iceberg::variant()->type_id()); + ASSERT_FALSE(iceberg::variant()->is_primitive()); + ASSERT_FALSE(iceberg::variant()->is_nested()); + ASSERT_TRUE(iceberg::variant()->is_variant()); + ASSERT_EQ("variant", iceberg::variant()->ToString()); + ASSERT_EQ(*iceberg::variant(), *iceberg::variant()); + ASSERT_EQ("variant", std::format("{}", *iceberg::variant())); +} + TEST(TypeTest, Decimal) { { iceberg::DecimalType decimal(38, 2); diff --git a/src/iceberg/type.cc b/src/iceberg/type.cc index b5bee37e2..754e7aee0 100644 --- a/src/iceberg/type.cc +++ b/src/iceberg/type.cc @@ -369,8 +369,12 @@ TypeId BinaryType::type_id() const { return kTypeId; } std::string BinaryType::ToString() const { return "binary"; } bool BinaryType::Equals(const Type& other) const { return other.type_id() == kTypeId; } +TypeId VariantType::type_id() const { return kTypeId; } +std::string VariantType::ToString() const { return "variant"; } +bool VariantType::Equals(const Type& other) const { return other.type_id() == kTypeId; } + // ---------------------------------------------------------------------- -// Factory functions for creating primitive data types +// Factory functions for creating data types #define TYPE_FACTORY(NAME, KLASS) \ const std::shared_ptr& NAME() { \ @@ -392,6 +396,7 @@ TYPE_FACTORY(timestamptz_ns, TimestampTzNsType) TYPE_FACTORY(binary, BinaryType) TYPE_FACTORY(string, StringType) TYPE_FACTORY(uuid, UuidType) +TYPE_FACTORY(variant, VariantType) #undef TYPE_FACTORY @@ -455,6 +460,8 @@ std::string_view ToString(TypeId id) { return "fixed"; case TypeId::kBinary: return "binary"; + case TypeId::kVariant: + return "variant"; } std::unreachable(); diff --git a/src/iceberg/type.h b/src/iceberg/type.h index 53237cdb5..29e0ebbaf 100644 --- a/src/iceberg/type.h +++ b/src/iceberg/type.h @@ -54,6 +54,9 @@ class ICEBERG_EXPORT Type : public iceberg::util::Formattable { /// \brief Is this a nested type (may have child fields)? [[nodiscard]] virtual bool is_nested() const = 0; + /// \brief Is this a variant type? + [[nodiscard]] virtual bool is_variant() const { return false; } + /// \brief Compare two types for equality. friend bool operator==(const Type& lhs, const Type& rhs) { return lhs.Equals(rhs); } @@ -208,6 +211,28 @@ class ICEBERG_EXPORT MapType : public NestedType { /// @} +/// \brief Iceberg v3 semi-structured Variant type. +/// +/// Variant is a leaf type, but the Iceberg spec does not classify it as a +/// primitive type. +class ICEBERG_EXPORT VariantType : public Type { + public: + constexpr static const TypeId kTypeId = TypeId::kVariant; + + VariantType() = default; + ~VariantType() override = default; + + bool is_primitive() const override { return false; } + bool is_nested() const override { return false; } + bool is_variant() const override { return true; } + + TypeId type_id() const override; + std::string ToString() const override; + + protected: + bool Equals(const Type& other) const override; +}; + /// \defgroup type-primitive Primitive Types /// Primitive types do not have nested fields. /// @{ @@ -505,9 +530,9 @@ class ICEBERG_EXPORT UuidType : public PrimitiveType { /// @} -/// \defgroup type-factories Factory functions for creating primitive data types +/// \defgroup type-factories Factory functions for creating data types /// -/// Factory functions for creating primitive data types +/// Factory functions for creating data types. /// @{ /// \brief Return a BooleanType instance. @@ -539,6 +564,9 @@ ICEBERG_EXPORT const std::shared_ptr& string(); /// \brief Return a UuidType instance. ICEBERG_EXPORT const std::shared_ptr& uuid(); +/// \brief Return a VariantType instance. +ICEBERG_EXPORT const std::shared_ptr& variant(); + /// \brief Create a DecimalType with the given precision and scale. /// \param precision The number of decimal digits (max 38). /// \param scale The number of decimal digits after the decimal point. diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index 144a9e33a..d2a33ceeb 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -31,7 +31,6 @@ namespace iceberg { /// This is not a complete data type by itself because some types are nested /// and/or parameterized. /// -/// Iceberg V3 types are not currently supported. enum class TypeId { kStruct, kList, @@ -52,6 +51,7 @@ enum class TypeId { kUuid, kFixed, kBinary, + kVariant, }; /// \brief The time unit. In Iceberg V3 nanoseconds are also supported. @@ -84,6 +84,7 @@ class TimestampNsType; class TimestampTzNsType; class Type; class UuidType; +class VariantType; /// \brief Data values. class Decimal; diff --git a/src/iceberg/update/update_schema.cc b/src/iceberg/update/update_schema.cc index 1f35781fa..6ca2b1bb3 100644 --- a/src/iceberg/update/update_schema.cc +++ b/src/iceberg/update/update_schema.cc @@ -181,6 +181,12 @@ class ApplyChangesVisitor { return base_type; } + Result> VisitVariant(const VariantType& variant_type, + const std::shared_ptr& base_type, + int32_t parent_id) { + return base_type; + } + private: Result> ProcessField( const SchemaField& field, const std::shared_ptr& field_type_result) { diff --git a/src/iceberg/util/struct_like_set.cc b/src/iceberg/util/struct_like_set.cc index 433cfa681..b93a3b15c 100644 --- a/src/iceberg/util/struct_like_set.cc +++ b/src/iceberg/util/struct_like_set.cc @@ -316,6 +316,8 @@ Status ValidateScalarAgainstType(const Scalar& scalar, const Type& type) { value.size()); return {}; } + case TypeId::kVariant: + return NotSupported("StructLike variant values are not supported"); case TypeId::kStruct: { ICEBERG_PRECHECK(std::holds_alternative>(scalar), "Expected struct but got {}", ScalarTypeName(scalar)); diff --git a/src/iceberg/util/type_util.cc b/src/iceberg/util/type_util.cc index c6b9bb3ed..81c525524 100644 --- a/src/iceberg/util/type_util.cc +++ b/src/iceberg/util/type_util.cc @@ -37,6 +37,8 @@ IdToFieldVisitor::IdToFieldVisitor( Status IdToFieldVisitor::Visit(const PrimitiveType& type) { return {}; } +Status IdToFieldVisitor::Visit(const VariantType& type) { return {}; } + Status IdToFieldVisitor::Visit(const NestedType& type) { const auto& nested = internal::checked_cast(type); const auto& fields = nested.fields(); @@ -128,6 +130,11 @@ Status NameToIdVisitor::Visit(const PrimitiveType& type, const std::string& path return {}; } +Status NameToIdVisitor::Visit(const VariantType& type, const std::string& path, + const std::string& short_path) { + return {}; +} + std::string NameToIdVisitor::BuildPath(std::string_view prefix, std::string_view field_name, bool case_sensitive) { std::string quoted_name; @@ -168,6 +175,20 @@ Status PositionPathVisitor::Visit(const PrimitiveType& type) { return {}; } +Status PositionPathVisitor::Visit(const VariantType& type) { + if (current_field_id_ == kUnassignedFieldId) { + return InvalidSchema("Current field id is not assigned, type: {}", type.ToString()); + } + + if (auto ret = position_path_.try_emplace(current_field_id_, current_path_); + !ret.second) { + return InvalidSchema("Duplicate field id found: {}, prev path: {}, curr path: {}", + current_field_id_, ret.first->second, current_path_); + } + + return {}; +} + Status PositionPathVisitor::Visit(const StructType& type) { for (size_t i = 0; i < type.fields().size(); ++i) { const auto& field = type.fields()[i]; @@ -208,8 +229,10 @@ Result> PruneColumnVisitor::Visit( Result> PruneColumnVisitor::Visit(const SchemaField& field) const { if (selected_ids_.contains(field.field_id())) { - return (select_full_types_ || field.type()->is_primitive()) ? field.type() - : Visit(field.type()); + return (select_full_types_ || field.type()->is_primitive() || + field.type()->is_variant()) + ? field.type() + : Visit(field.type()); } return Visit(field.type()); } @@ -278,6 +301,8 @@ GetProjectedIdsVisitor::GetProjectedIdsVisitor(bool include_struct_ids) Status GetProjectedIdsVisitor::Visit(const Type& type) { if (type.is_nested()) { return VisitNested(internal::checked_cast(type)); + } else if (type.is_variant()) { + return VisitVariant(internal::checked_cast(type)); } else { return VisitPrimitive(internal::checked_cast(type)); } @@ -288,9 +313,8 @@ Status GetProjectedIdsVisitor::VisitNested(const NestedType& type) { ICEBERG_RETURN_UNEXPECTED(Visit(*field.type())); } for (auto& field : type.fields()) { - // TODO(zhuo.wang) or is_variant if ((include_struct_ids_ && field.type()->type_id() == TypeId::kStruct) || - field.type()->is_primitive()) { + field.type()->is_primitive() || field.type()->is_variant()) { ids_.insert(field.field_id()); } } @@ -299,6 +323,8 @@ Status GetProjectedIdsVisitor::VisitNested(const NestedType& type) { Status GetProjectedIdsVisitor::VisitPrimitive(const PrimitiveType& type) { return {}; } +Status GetProjectedIdsVisitor::VisitVariant(const VariantType& type) { return {}; } + std::unordered_set GetProjectedIdsVisitor::Finish() const { return ids_; } Result> GetProjectedIdsVisitor::GetProjectedIds( diff --git a/src/iceberg/util/type_util.h b/src/iceberg/util/type_util.h index ceb5e62ec..7d0998c06 100644 --- a/src/iceberg/util/type_util.h +++ b/src/iceberg/util/type_util.h @@ -45,6 +45,7 @@ class IdToFieldVisitor { std::unordered_map>& id_to_field); Status Visit(const PrimitiveType& type); + Status Visit(const VariantType& type); Status Visit(const NestedType& type); private: @@ -67,6 +68,8 @@ class NameToIdVisitor { const std::string& short_path); Status Visit(const PrimitiveType& type, const std::string& path, const std::string& short_path); + Status Visit(const VariantType& type, const std::string& path, + const std::string& short_path); void Finish(); private: @@ -85,6 +88,7 @@ class NameToIdVisitor { class PositionPathVisitor { public: Status Visit(const PrimitiveType& type); + Status Visit(const VariantType& type); Status Visit(const StructType& type); Status Visit(const ListType& type); Status Visit(const MapType& type); @@ -130,6 +134,7 @@ class GetProjectedIdsVisitor { Status Visit(const Type& type); Status VisitNested(const NestedType& type); Status VisitPrimitive(const PrimitiveType& type); + Status VisitVariant(const VariantType& type); std::unordered_set Finish() const; static Result> GetProjectedIds( diff --git a/src/iceberg/util/visitor_generate.h b/src/iceberg/util/visitor_generate.h index 7a3648546..e8ff3a105 100644 --- a/src/iceberg/util/visitor_generate.h +++ b/src/iceberg/util/visitor_generate.h @@ -38,6 +38,7 @@ namespace iceberg { ACTION(Uuid); \ ACTION(Fixed); \ ACTION(Binary); \ + ACTION(Variant); \ ACTION(Struct); \ ACTION(List); \ ACTION(Map); @@ -48,6 +49,7 @@ namespace iceberg { /// - Struct types -> calls ACTION with Struct /// - List types -> calls ACTION with List /// - Map types -> calls ACTION with Map +/// - Variant type -> calls ACTION with Variant /// - All primitive types (default) -> calls ACTION with Primitive #define ICEBERG_TYPE_SWITCH_WITH_PRIMITIVE_DEFAULT(ACTION) \ case ::iceberg::TypeId::kStruct: \ @@ -56,6 +58,8 @@ namespace iceberg { ACTION(List) \ case ::iceberg::TypeId::kMap: \ ACTION(Map) \ + case ::iceberg::TypeId::kVariant: \ + ACTION(Variant) \ default: \ ACTION(Primitive)