Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/iceberg/avro/avro_constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@ namespace iceberg::avro {

// Avro logical type constants
constexpr std::string_view kMapLogicalType = "map";
// Name of the custom logical type attached to variant records.
constexpr std::string_view kVariantLogicalType = "variant";

// Name mapping field constants
constexpr std::string_view kElement = "element";
constexpr std::string_view kKey = "key";
constexpr std::string_view kValue = "value";
// Together with kValue, names the two fields of a variant record.
constexpr std::string_view kMetadata = "metadata";

// Avro custom attributes constants
constexpr std::string_view kIcebergFieldNameProp = "iceberg-field-name";
Expand Down
3 changes: 3 additions & 0 deletions src/iceberg/avro/avro_register.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ void RegisterLogicalTypes() {
// See https://github.com/apache/avro/pull/3326 for details.
::avro::CustomLogicalTypeRegistry::instance().registerType(
"map", [](const std::string&) { return std::make_shared<MapLogicalType>(); });
::avro::CustomLogicalTypeRegistry::instance().registerType(
"variant",
[](const std::string&) { return std::make_shared<VariantLogicalType>(); });
}

void RegisterAll() {
Expand Down
48 changes: 47 additions & 1 deletion src/iceberg/avro/avro_schema_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ ::avro::LogicalType GetMapLogicalType() {
return ::avro::LogicalType(std::make_shared<MapLogicalType>());
}

/// \brief Create an Avro logical type backed by the custom "variant" type.
/// \return A ::avro::LogicalType wrapping a newly allocated VariantLogicalType.
::avro::LogicalType GetVariantLogicalType() {
  return ::avro::LogicalType{std::make_shared<VariantLogicalType>()};
}

::avro::CustomAttributes GetAttributesWithFieldId(int32_t field_id) {
::avro::CustomAttributes attributes;
attributes.addAttribute(std::string(kFieldIdProp), std::to_string(field_id),
Expand Down Expand Up @@ -237,6 +241,22 @@ Status ToAvroNodeVisitor::Visit(const BinaryType& type, ::avro::NodePtr* node) {
return {};
}

/// \brief Convert an Iceberg variant type into an Avro record node.
///
/// The produced record carries the variant custom logical type and has two
/// bytes fields, in this order: "metadata" then "value".
///
/// \param type The variant type being visited (its contents are not read).
/// \param[out] node Receives the newly created record node.
/// \return A default-constructed Status.
Status ToAvroNodeVisitor::Visit(const VariantType& type, ::avro::NodePtr* node) {
  *node = std::make_shared<::avro::NodeRecord>();
  const ::avro::NodePtr& record = *node;

  // With a field id on the stack the record is named "r<id>"; otherwise it
  // falls back to the logical type name.
  const std::string record_name = field_ids_.empty()
                                      ? std::string(kVariantLogicalType)
                                      : std::format("r{}", field_ids_.top());
  record->setName(::avro::Name(record_name));
  record->setLogicalType(GetVariantLogicalType());

  // Both fields are plain Avro bytes; only their names differ.
  for (std::string_view field_name : {kMetadata, kValue}) {
    record->addName(std::string(field_name));
    record->addLeaf(std::make_shared<::avro::NodePrimitive>(::avro::AVRO_BYTES));
  }
  return {};
}

Status ToAvroNodeVisitor::Visit(const StructType& type, ::avro::NodePtr* node) {
*node = std::make_shared<::avro::NodeRecord>();

Expand Down Expand Up @@ -392,6 +412,10 @@ Status HasIdVisitor::Visit(const ::avro::NodePtr& node) {
}

Status HasIdVisitor::VisitRecord(const ::avro::NodePtr& node) {
if (HasVariantLogicalType(node)) {
return {};
}

static const std::string kFieldIdKey{kFieldIdProp};
total_fields_ += node->leaves();
for (size_t i = 0; i < node->leaves(); ++i) {
Expand Down Expand Up @@ -510,6 +534,13 @@ Result<int32_t> GetFieldId(const ::avro::NodePtr& node, size_t field_idx) {
return GetId(node, kFieldIdKey, field_idx);
}

/// \brief Check whether an Avro node has the exact shape of a variant record.
///
/// The node must carry the variant logical type, be a record, and contain
/// exactly two bytes fields named "metadata" and "value", in that order.
///
/// \param node The Avro node to inspect.
/// \return True if every condition above holds, false otherwise.
bool IsVariantAvroSchema(const ::avro::NodePtr& node) {
  if (!HasVariantLogicalType(node)) {
    return false;
  }
  if (node->type() != ::avro::AVRO_RECORD) {
    return false;
  }
  // Check the counts before touching nameAt()/leafAt() at indices 0 and 1.
  if (node->leaves() != 2 || node->names() != 2) {
    return false;
  }
  if (node->nameAt(0) != kMetadata || node->nameAt(1) != kValue) {
    return false;
  }
  return node->leafAt(0)->type() == ::avro::AVRO_BYTES &&
         node->leafAt(1)->type() == ::avro::AVRO_BYTES;
}

Status ValidateAvroSchemaEvolution(const Type& expected_type,
const ::avro::NodePtr& avro_node) {
switch (expected_type.type_id()) {
Expand Down Expand Up @@ -615,6 +646,11 @@ Status ValidateAvroSchemaEvolution(const Type& expected_type,
return {};
}
break;
case TypeId::kVariant:
if (IsVariantAvroSchema(avro_node)) {
return {};
}
break;
default:
break;
}
Expand Down Expand Up @@ -847,7 +883,13 @@ Result<FieldProjection> ProjectNested(const Type& expected_type,
/// \brief Check if an Avro node has a map logical type.
///
/// A node qualifies when its logical type is CUSTOM and the attached custom
/// logical type is non-null and named kMapLogicalType ("map").
///
/// \param node The Avro node to check.
/// \return True if the node has a map logical type, false otherwise.
bool HasMapLogicalType(const ::avro::NodePtr& node) {
  // Single comparison against the shared constant; the previous body ended the
  // return with the "map" literal and left a second, unreachable comparison.
  return node->logicalType().type() == ::avro::LogicalType::CUSTOM &&
         node->logicalType().customLogicalType() != nullptr &&
         node->logicalType().customLogicalType()->name() == kMapLogicalType;
}

/// \brief Check if an Avro node has a variant logical type.
///
/// \param node The Avro node to check.
/// \return True if the node's logical type is CUSTOM and its custom logical
///         type is non-null and named kVariantLogicalType; false otherwise.
bool HasVariantLogicalType(const ::avro::NodePtr& node) {
  const auto logical_type = node->logicalType();
  if (logical_type.type() != ::avro::LogicalType::CUSTOM) {
    return false;
  }
  const auto custom_type = logical_type.customLogicalType();
  return custom_type != nullptr && custom_type->name() == kVariantLogicalType;
}

Result<SchemaProjection> Project(const Schema& expected_schema,
Expand Down Expand Up @@ -1032,6 +1074,10 @@ Result<::avro::NodePtr> MakeUnionNodeWithFieldIds(const ::avro::NodePtr& origina
Result<::avro::NodePtr> MakeAvroNodeWithFieldIds(const ::avro::NodePtr& original_node,
const NameMapping& mapping,
std::vector<std::string>& names) {
if (HasVariantLogicalType(original_node)) {
return original_node;
}

switch (original_node->type()) {
case ::avro::AVRO_RECORD:
return MakeRecordNodeWithFieldIds(original_node, mapping, names);
Expand Down
10 changes: 10 additions & 0 deletions src/iceberg/avro/avro_schema_util_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ struct MapLogicalType : public ::avro::CustomLogicalType {
MapLogicalType() : ::avro::CustomLogicalType("map") {}
};

/// \brief Custom Avro logical type whose name is "variant".
struct VariantLogicalType : public ::avro::CustomLogicalType {
// Passes the logical type name to the ::avro::CustomLogicalType base.
VariantLogicalType() : ::avro::CustomLogicalType("variant") {}
};

/// \brief A visitor that converts an Iceberg type to an Avro node.
class ToAvroNodeVisitor {
public:
Expand All @@ -58,6 +62,7 @@ class ToAvroNodeVisitor {
Status Visit(const UuidType& type, ::avro::NodePtr* node);
Status Visit(const FixedType& type, ::avro::NodePtr* node);
Status Visit(const BinaryType& type, ::avro::NodePtr* node);
Status Visit(const VariantType& type, ::avro::NodePtr* node);
Status Visit(const StructType& type, ::avro::NodePtr* node);
Status Visit(const ListType& type, ::avro::NodePtr* node);
Status Visit(const MapType& type, ::avro::NodePtr* node);
Expand Down Expand Up @@ -151,6 +156,11 @@ std::string ToString(const ::avro::LogicalType::Type& logical_type);
/// \return True if the node has a map logical type, false otherwise.
bool HasMapLogicalType(const ::avro::NodePtr& node);

/// \brief Check if an Avro node has a variant logical type.
///
/// A node qualifies when its logical type is CUSTOM and the attached custom
/// logical type is non-null and named "variant".
///
/// \param node The Avro node to check.
/// \return True if the node has a variant logical type, false otherwise.
bool HasVariantLogicalType(const ::avro::NodePtr& node);

/// \brief Check if a string is a valid Avro name.
///
/// Valid Avro names must:
Expand Down
5 changes: 5 additions & 0 deletions src/iceberg/json_serde.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ constexpr std::string_view kType = "type";
constexpr std::string_view kStruct = "struct";
constexpr std::string_view kList = "list";
constexpr std::string_view kMap = "map";
// Type name of the variant type in JSON; TypeFromJson compares against it.
constexpr std::string_view kVariant = "variant";
constexpr std::string_view kElement = "element";
constexpr std::string_view kKey = "key";
constexpr std::string_view kValue = "value";
Expand Down Expand Up @@ -377,6 +378,8 @@ nlohmann::json ToJson(const Type& type) {
}
case TypeId::kUuid:
return "uuid";
case TypeId::kVariant:
return "variant";
}
std::unreachable();
}
Expand Down Expand Up @@ -502,6 +505,8 @@ Result<std::unique_ptr<Type>> TypeFromJson(const nlohmann::json& json) {
return std::make_unique<BinaryType>();
} else if (type_str == "uuid") {
return std::make_unique<UuidType>();
} else if (type_str == kVariant) {
return std::make_unique<VariantType>();
} else if (type_str.starts_with("fixed")) {
std::regex fixed_regex(R"(fixed\[\s*(\d+)\s*\])");
std::smatch match;
Expand Down
6 changes: 4 additions & 2 deletions src/iceberg/metrics_config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ Result<std::unordered_set<int32_t>> MetricsConfig::LimitFieldIds(const Schema& s
Status Visit(const Type& type) {
if (type.is_nested()) {
return VisitNested(internal::checked_cast<const NestedType&>(type));
} else if (type.is_variant()) {
return VisitVariant(internal::checked_cast<const VariantType&>(type));
} else {
return VisitPrimitive(internal::checked_cast<const PrimitiveType&>(type));
}
Expand All @@ -184,8 +186,7 @@ Result<std::unordered_set<int32_t>> MetricsConfig::LimitFieldIds(const Schema& s
if (!ShouldContinue()) {
break;
}
// TODO(zhuo.wang): variant type should also be handled here
if (field.type()->is_primitive()) {
if (field.type()->is_primitive() || field.type()->is_variant()) {
ids_.insert(field.field_id());
}
}
Expand All @@ -199,6 +200,7 @@ Result<std::unordered_set<int32_t>> MetricsConfig::LimitFieldIds(const Schema& s
}

/// \brief Visit a primitive type. No-op: returns a default-constructed Status.
Status VisitPrimitive(const PrimitiveType& type) {
  return Status{};
}
/// \brief Visit a variant type. No-op: returns a default-constructed Status.
Status VisitVariant(const VariantType& type) {
  return Status{};
}

/// \brief Return a copy of the field ids stored in ids_.
std::unordered_set<int32_t> Finish() const { return ids_; }

Expand Down
2 changes: 2 additions & 0 deletions src/iceberg/parquet/parquet_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ class ParquetReader::Impl {
read_schema_ = options.projection;

// Prepare reader properties
ICEBERG_RETURN_UNEXPECTED(EnsureParquetVariantExtensionRegistered());
::parquet::ReaderProperties reader_properties(pool_);
::parquet::ArrowReaderProperties arrow_reader_properties;
arrow_reader_properties.set_batch_size(
Expand Down Expand Up @@ -212,6 +213,7 @@ class ParquetReader::Impl {
// Build the output Arrow schema
ArrowSchema arrow_schema;
ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*read_schema_, &arrow_schema));
ICEBERG_RETURN_UNEXPECTED(EnsureParquetVariantExtensionRegistered());
ICEBERG_ARROW_ASSIGN_OR_RETURN(context_->output_arrow_schema_,
::arrow::ImportSchema(&arrow_schema));

Expand Down
5 changes: 5 additions & 0 deletions src/iceberg/parquet/parquet_register.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,14 @@

#include "iceberg/parquet/parquet_register.h"

#include <tuple>

#include "iceberg/parquet/parquet_schema_util_internal.h"

namespace iceberg::parquet {

/// \brief Register the Parquet variant extension, then the reader and writer.
void RegisterAll() {
// The result is deliberately discarded: this function returns void and has no
// way to report a failure to its caller.
std::ignore = EnsureParquetVariantExtensionRegistered();
RegisterReader();
RegisterWriter();
}
Expand Down
Loading
Loading