Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit a82fac0

Browse filesBrowse files
wuwenchiYour Name
authored and
Your Name
committed
[fix](iceberg) Write field_id into Parquet files and fix the map type's key being marked optional (#44470)
### What problem does this PR solve? 1. Column IDs are required to be stored as [field IDs](http://github.com/apache/parquet-format/blob/40699d05bd24181de6b1457babbee2c16dce3803/src/main/thrift/parquet.thrift#L459) on the Parquet schema; see https://iceberg.apache.org/spec/?h=field+id#parquet for the spec. Therefore, we should add field IDs when writing Parquet files. 2. For `MapType`, the key is always required, so it must not be created as optional.
1 parent 88926d2 commit a82fac0
Copy full SHA for a82fac0

File tree

9 files changed

+547
-25
lines changed
Filter options

9 files changed

+547
-25
lines changed
+134Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include "vec/exec/format/table/iceberg/arrow_schema_util.h"
19+
20+
#include <arrow/type.h>
21+
#include <arrow/util/key_value_metadata.h>
22+
23+
namespace doris {
24+
namespace iceberg {
25+
26+
// Arrow field-metadata key under which convert_to() stores the Iceberg field id
// (as a decimal string) of every converted field.
const char* ArrowSchemaUtil::PARQUET_FIELD_ID = "PARQUET:field_id";
27+
// Arrow field-metadata key that convert_to() adds only to MAP fields.
const char* ArrowSchemaUtil::ORIGINAL_TYPE = "originalType";
28+
// Value stored under ORIGINAL_TYPE for MAP fields.
const char* ArrowSchemaUtil::MAP_TYPE_VALUE = "mapType";
29+
30+
// Converts every top-level column of the Iceberg `schema` into an Arrow field
// and appends the results to `fields`, preserving column order.
//
// `timezone` is forwarded unchanged to convert_to(). Conversion stops at the
// first column that fails and that status is returned; fields converted before
// the failure stay in `fields`. `schema` is dereferenced without a null check.
Status ArrowSchemaUtil::convert(const Schema* schema, const std::string& timezone,
                                std::vector<std::shared_ptr<arrow::Field>>& fields) {
    const auto& top_level_columns = schema->columns();
    for (const auto& top_level_column : top_level_columns) {
        std::shared_ptr<arrow::Field> converted;
        RETURN_IF_ERROR(convert_to(top_level_column, &converted, timezone));
        fields.push_back(converted);
    }
    return Status::OK();
}
39+
40+
// Converts a single Iceberg field (recursively, for nested types) into an
// Arrow field.
//
// The resulting Arrow field carries:
//   - the Iceberg field name,
//   - nullability taken from field.is_optional(),
//   - metadata PARQUET_FIELD_ID -> Iceberg field id, and for MAP fields also
//     ORIGINAL_TYPE -> MAP_TYPE_VALUE.
//
// @param field        Iceberg field to convert.
// @param arrow_field  Output; assigned only when the conversion succeeds.
// @param timezone     Timezone attached to Arrow timestamp types.
// @return Status::OK() on success; Status::InternalError for unsupported
//         types (including TIME) or when a DECIMAL-tagged type is not actually
//         a DecimalType; otherwise the first error from a nested conversion.
Status ArrowSchemaUtil::convert_to(const iceberg::NestedField& field,
                                   std::shared_ptr<arrow::Field>* arrow_field,
                                   const std::string& timezone) {
    // Fetched once; every branch below inspects the same type object.
    auto* field_type = field.field_type();

    std::shared_ptr<arrow::DataType> arrow_type;
    std::unordered_map<std::string, std::string> metadata;
    metadata[PARQUET_FIELD_ID] = std::to_string(field.field_id());

    switch (field_type->type_id()) {
    case iceberg::TypeID::BOOLEAN:
        arrow_type = arrow::boolean();
        break;

    case iceberg::TypeID::INTEGER:
        arrow_type = arrow::int32();
        break;

    case iceberg::TypeID::LONG:
        arrow_type = arrow::int64();
        break;

    case iceberg::TypeID::FLOAT:
        arrow_type = arrow::float32();
        break;

    case iceberg::TypeID::DOUBLE:
        arrow_type = arrow::float64();
        break;

    case iceberg::TypeID::DATE:
        arrow_type = arrow::date32();
        break;

    case iceberg::TypeID::TIMESTAMP: {
        // Timestamps are always emitted with microsecond resolution.
        arrow_type = std::make_shared<arrow::TimestampType>(arrow::TimeUnit::MICRO, timezone);
        break;
    }

    // NOTE(review): binary-like types are mapped to utf8 here, same as STRING —
    // confirm this matches how the writer supplies data for these columns.
    case iceberg::TypeID::BINARY:
    case iceberg::TypeID::STRING:
    case iceberg::TypeID::UUID:
    case iceberg::TypeID::FIXED:
        arrow_type = arrow::utf8();
        break;

    case iceberg::TypeID::DECIMAL: {
        const auto* decimal_type = dynamic_cast<DecimalType*>(field_type);
        // dynamic_cast yields nullptr on a type mismatch; report it instead of
        // dereferencing a null pointer.
        if (decimal_type == nullptr) {
            return Status::InternalError("Field type is not a decimal type:" +
                                         field_type->to_string());
        }
        arrow_type = arrow::decimal(decimal_type->get_precision(), decimal_type->get_scale());
        break;
    }

    case iceberg::TypeID::STRUCT: {
        std::vector<std::shared_ptr<arrow::Field>> element_fields;
        StructType* st = field_type->as_struct_type();
        for (const auto& column : st->fields()) {
            std::shared_ptr<arrow::Field> element_field;
            RETURN_IF_ERROR(convert_to(column, &element_field, timezone));
            element_fields.push_back(element_field);
        }
        arrow_type = arrow::struct_(element_fields);
        break;
    }

    case iceberg::TypeID::LIST: {
        std::shared_ptr<arrow::Field> item_field;
        ListType* list_type = field_type->as_list_type();
        RETURN_IF_ERROR(convert_to(list_type->element_field(), &item_field, timezone));
        arrow_type = arrow::list(item_field);
        break;
    }

    case iceberg::TypeID::MAP: {
        std::shared_ptr<arrow::Field> key_field;
        std::shared_ptr<arrow::Field> value_field;
        MapType* map_type = field_type->as_map_type();
        RETURN_IF_ERROR(convert_to(map_type->key_field(), &key_field, timezone));
        RETURN_IF_ERROR(convert_to(map_type->value_field(), &value_field, timezone));
        // Tag the field so its map origin is recorded in the Arrow metadata.
        metadata[ORIGINAL_TYPE] = MAP_TYPE_VALUE;
        arrow_type = std::make_shared<arrow::MapType>(key_field, value_field);
        break;
    }

    // TIME is listed explicitly to show it is knowingly unsupported.
    case iceberg::TypeID::TIME:
    default:
        return Status::InternalError("Unsupported field type:" + field_type->to_string());
    }

    std::shared_ptr<arrow::KeyValueMetadata> schema_metadata =
            std::make_shared<arrow::KeyValueMetadata>(metadata);
    // The third argument of arrow::field is the nullable flag.
    *arrow_field =
            arrow::field(field.field_name(), arrow_type, field.is_optional(), schema_metadata);
    return Status::OK();
}
132+
133+
} // namespace iceberg
134+
} // namespace doris
+45Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#pragma once
19+
20+
#include <arrow/type.h>
21+
22+
#include <shared_mutex>
23+
24+
#include "vec/exec/format/table/iceberg/schema.h"
25+
26+
namespace doris {
27+
namespace iceberg {
28+
29+
class ArrowSchemaUtil {
30+
public:
31+
static Status convert(const Schema* schema, const std::string& timezone,
32+
std::vector<std::shared_ptr<arrow::Field>>& fields);
33+
34+
private:
35+
static const char* PARQUET_FIELD_ID;
36+
static const char* ORIGINAL_TYPE;
37+
static const char* MAP_TYPE_VALUE;
38+
39+
static Status convert_to(const iceberg::NestedField& field,
40+
std::shared_ptr<arrow::Field>* arrow_field,
41+
const std::string& timezone);
42+
};
43+
44+
} // namespace iceberg
45+
} // namespace doris

‎be/src/vec/exec/format/table/iceberg/types.cpp

Copy file name to clipboardExpand all lines: be/src/vec/exec/format/table/iceberg/types.cpp
+2-1Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,9 @@ namespace iceberg {
2525
std::unique_ptr<MapType> MapType::of_optional(int key_id, int value_id,
2626
std::unique_ptr<Type> key_type,
2727
std::unique_ptr<Type> value_type) {
28+
// key is always required
2829
auto key_field =
29-
std::make_unique<NestedField>(true, key_id, "key", std::move(key_type), std::nullopt);
30+
std::make_unique<NestedField>(false, key_id, "key", std::move(key_type), std::nullopt);
3031
auto value_field = std::make_unique<NestedField>(true, value_id, "value", std::move(value_type),
3132
std::nullopt);
3233
return std::unique_ptr<MapType>(new MapType(std::move(key_field), std::move(value_field)));

‎be/src/vec/exec/format/table/iceberg/types.h

Copy file name to clipboardExpand all lines: be/src/vec/exec/format/table/iceberg/types.h
+4Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,10 @@ class DecimalType : public PrimitiveType {
265265
ss << "decimal(" << precision << ", " << scale << ")";
266266
return ss.str();
267267
}
268+
269+
// Returns the `precision` member of this decimal type.
int get_precision() const { return precision; }
270+
271+
// Returns the `scale` member of this decimal type.
int get_scale() const { return scale; }
268272
};
269273

270274
class BinaryType : public PrimitiveType {

‎be/src/vec/runtime/vparquet_transformer.cpp

Copy file name to clipboardExpand all lines: be/src/vec/runtime/vparquet_transformer.cpp
+29-22Lines changed: 29 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
#include "vec/core/types.h"
6666
#include "vec/data_types/data_type_decimal.h"
6767
#include "vec/data_types/data_type_nullable.h"
68+
#include "vec/exec/format/table/iceberg/arrow_schema_util.h"
6869
#include "vec/exprs/vexpr.h"
6970
#include "vec/exprs/vexpr_context.h"
7071
#include "vec/functions/function_helpers.h"
@@ -201,21 +202,20 @@ void ParquetBuildHelper::build_version(parquet::WriterProperties::Builder& build
201202
}
202203
}
203204

204-
VParquetTransformer::VParquetTransformer(RuntimeState* state, doris::io::FileWriter* file_writer,
205-
const VExprContextSPtrs& output_vexpr_ctxs,
206-
std::vector<std::string> column_names,
207-
TParquetCompressionType::type compression_type,
208-
bool parquet_disable_dictionary,
209-
TParquetVersion::type parquet_version,
210-
bool output_object_data,
211-
const std::string* iceberg_schema_json)
205+
VParquetTransformer::VParquetTransformer(
206+
RuntimeState* state, doris::io::FileWriter* file_writer,
207+
const VExprContextSPtrs& output_vexpr_ctxs, std::vector<std::string> column_names,
208+
TParquetCompressionType::type compression_type, bool parquet_disable_dictionary,
209+
TParquetVersion::type parquet_version, bool output_object_data,
210+
const std::string* iceberg_schema_json, const iceberg::Schema* iceberg_schema)
212211
: VFileFormatTransformer(state, output_vexpr_ctxs, output_object_data),
213212
_column_names(std::move(column_names)),
214213
_parquet_schemas(nullptr),
215214
_compression_type(compression_type),
216215
_parquet_disable_dictionary(parquet_disable_dictionary),
217216
_parquet_version(parquet_version),
218-
_iceberg_schema_json(iceberg_schema_json) {
217+
_iceberg_schema_json(iceberg_schema_json),
218+
_iceberg_schema(iceberg_schema) {
219219
_outstream = std::shared_ptr<ParquetOutputStream>(new ParquetOutputStream(file_writer));
220220
}
221221

@@ -233,6 +233,7 @@ VParquetTransformer::VParquetTransformer(RuntimeState* state, doris::io::FileWri
233233
_parquet_disable_dictionary(parquet_disable_dictionary),
234234
_parquet_version(parquet_version),
235235
_iceberg_schema_json(iceberg_schema_json) {
236+
_iceberg_schema = nullptr;
236237
_outstream = std::shared_ptr<ParquetOutputStream>(new ParquetOutputStream(file_writer));
237238
}
238239

@@ -264,21 +265,27 @@ Status VParquetTransformer::_parse_properties() {
264265

265266
Status VParquetTransformer::_parse_schema() {
266267
std::vector<std::shared_ptr<arrow::Field>> fields;
267-
for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) {
268-
std::shared_ptr<arrow::DataType> type;
269-
RETURN_IF_ERROR(convert_to_arrow_type(_output_vexpr_ctxs[i]->root()->type(), &type,
270-
_state->timezone()));
271-
if (_parquet_schemas != nullptr) {
272-
std::shared_ptr<arrow::Field> field =
273-
arrow::field(_parquet_schemas->operator[](i).schema_column_name, type,
274-
_output_vexpr_ctxs[i]->root()->is_nullable());
275-
fields.emplace_back(field);
276-
} else {
277-
std::shared_ptr<arrow::Field> field = arrow::field(
278-
_column_names[i], type, _output_vexpr_ctxs[i]->root()->is_nullable());
279-
fields.emplace_back(field);
268+
if (_iceberg_schema != nullptr) {
269+
RETURN_IF_ERROR(
270+
iceberg::ArrowSchemaUtil::convert(_iceberg_schema, _state->timezone(), fields));
271+
} else {
272+
for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) {
273+
std::shared_ptr<arrow::DataType> type;
274+
RETURN_IF_ERROR(convert_to_arrow_type(_output_vexpr_ctxs[i]->root()->type(), &type,
275+
_state->timezone()));
276+
if (_parquet_schemas != nullptr) {
277+
std::shared_ptr<arrow::Field> field =
278+
arrow::field(_parquet_schemas->operator[](i).schema_column_name, type,
279+
_output_vexpr_ctxs[i]->root()->is_nullable());
280+
fields.emplace_back(field);
281+
} else {
282+
std::shared_ptr<arrow::Field> field = arrow::field(
283+
_column_names[i], type, _output_vexpr_ctxs[i]->root()->is_nullable());
284+
fields.emplace_back(field);
285+
}
280286
}
281287
}
288+
282289
if (_iceberg_schema_json != nullptr) {
283290
std::shared_ptr<arrow::KeyValueMetadata> schema_metadata =
284291
arrow::KeyValueMetadata::Make({"iceberg.schema"}, {*_iceberg_schema_json});

‎be/src/vec/runtime/vparquet_transformer.h

Copy file name to clipboardExpand all lines: be/src/vec/runtime/vparquet_transformer.h
+4-1Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#include <parquet/types.h>
2828
#include <stdint.h>
2929

30+
#include "vec/exec/format/table/iceberg/schema.h"
3031
#include "vfile_format_transformer.h"
3132

3233
namespace doris {
@@ -94,7 +95,8 @@ class VParquetTransformer final : public VFileFormatTransformer {
9495
std::vector<std::string> column_names,
9596
TParquetCompressionType::type compression_type,
9697
bool parquet_disable_dictionary, TParquetVersion::type parquet_version,
97-
bool output_object_data, const std::string* iceberg_schema_json = nullptr);
98+
bool output_object_data, const std::string* iceberg_schema_json = nullptr,
99+
const iceberg::Schema* iceberg_schema = nullptr);
98100

99101
VParquetTransformer(RuntimeState* state, doris::io::FileWriter* file_writer,
100102
const VExprContextSPtrs& output_vexpr_ctxs,
@@ -131,6 +133,7 @@ class VParquetTransformer final : public VFileFormatTransformer {
131133
const TParquetVersion::type _parquet_version;
132134
const std::string* _iceberg_schema_json;
133135
uint64_t _write_size = 0;
136+
const iceberg::Schema* _iceberg_schema;
134137
};
135138

136139
} // namespace doris::vectorized

‎be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp

Copy file name to clipboardExpand all lines: be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ Status VIcebergPartitionWriter::open(RuntimeState* state, RuntimeProfile* profil
8484
_file_format_transformer.reset(new VParquetTransformer(
8585
state, _file_writer.get(), _write_output_expr_ctxs, _write_column_names,
8686
parquet_compression_type, parquet_disable_dictionary, TParquetVersion::PARQUET_1_0,
87-
false, _iceberg_schema_json));
87+
false, _iceberg_schema_json, &_schema));
8888
return _file_format_transformer->open();
8989
}
9090
case TFileFormatType::FORMAT_ORC: {

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.