Skip to content

Navigation Menu

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit a82a456

Browse filesBrowse files
committed
opt source code and add tests.
1 parent 079a064 commit a82a456
Copy full SHA for a82a456

File tree

14 files changed

+2403
-28
lines changed
Filter options

14 files changed

+2403
-28
lines changed

‎be/src/vec/exec/format/parquet/parquet_common.cpp

Copy file name to clipboardExpand all lines: be/src/vec/exec/format/parquet/parquet_common.cpp
+5-5
Original file line numberDiff line numberDiff line change
@@ -78,11 +78,11 @@ bool FilterMap::can_filter_all(size_t remaining_num_values, size_t filter_map_in
7878
Status FilterMap::generate_nested_filter_map(const std::vector<level_t>& rep_levels,
7979
std::vector<uint8_t>& nested_filter_map_data,
8080
std::unique_ptr<FilterMap>* nested_filter_map,
81-
size_t* current_row_ptr, bool is_cross_page,
82-
size_t start_index) const {
81+
size_t* current_row_ptr, size_t start_index) const {
8382
if (!has_filter() || filter_all()) {
84-
*nested_filter_map = std::make_unique<FilterMap>();
85-
return Status::OK();
83+
return Status::InternalError(fmt::format(
84+
"FilterMap::generate_nested_filter_map failed: has_filter={}, filter_all={}",
85+
has_filter(), filter_all()));
8686
}
8787

8888
if (rep_levels.empty()) {
@@ -94,7 +94,7 @@ Status FilterMap::generate_nested_filter_map(const std::vector<level_t>& rep_lev
9494
size_t current_row = current_row_ptr ? *current_row_ptr : 0;
9595

9696
for (size_t i = start_index; i < rep_levels.size(); i++) {
97-
if (!is_cross_page && i > start_index && rep_levels[i] == 0) {
97+
if (i != start_index && rep_levels[i] == 0) {
9898
current_row++;
9999
if (current_row >= _filter_map_size) {
100100
return Status::InvalidArgument(

‎be/src/vec/exec/format/parquet/parquet_common.h

Copy file name to clipboardExpand all lines: be/src/vec/exec/format/parquet/parquet_common.h
+1-3
Original file line numberDiff line numberDiff line change
@@ -77,9 +77,7 @@ class FilterMap {
7777
Status generate_nested_filter_map(const std::vector<level_t>& rep_levels,
7878
std::vector<uint8_t>& nested_filter_map_data,
7979
std::unique_ptr<FilterMap>* nested_filter_map,
80-
size_t* current_row_ptr, // 当前处理到哪一行
81-
bool is_cross_page, // 是否是跨页的情况
82-
size_t start_index = 0) const; // rep_levels的起始处理位置
80+
size_t* current_row_ptr, size_t start_index = 0) const;
8381

8482
const uint8_t* filter_map_data() const { return _filter_map_data; }
8583
size_t filter_map_size() const { return _filter_map_size; }

‎be/src/vec/exec/format/parquet/vparquet_column_reader.cpp

Copy file name to clipboardExpand all lines: be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
+26-17
Original file line numberDiff line numberDiff line change
@@ -323,9 +323,6 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType
323323
size_t* read_rows, bool* eof, bool is_dict_filter,
324324
bool align_rows) {
325325
std::unique_ptr<FilterMap> nested_filter_map;
326-
std::unique_ptr<std::vector<uint8_t>> nested_filter_map_data;
327-
328-
size_t current_row;
329326

330327
FilterMap* current_filter_map = &filter_map;
331328
size_t origin_size = 0;
@@ -337,17 +334,22 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType
337334
} else {
338335
_rep_levels.resize(0);
339336
_def_levels.resize(0);
337+
if (_nested_filter_map_data) {
338+
_nested_filter_map_data->resize(0);
339+
}
340340
}
341341
size_t parsed_rows = 0;
342342
size_t remaining_values = _chunk_reader->remaining_num_values();
343343
bool has_rep_level = _chunk_reader->max_rep_level() > 0;
344344
bool has_def_level = _chunk_reader->max_def_level() > 0;
345345

346+
// Handle repetition levels (indicates nesting structure)
346347
if (has_rep_level) {
347348
LevelDecoder& rep_decoder = _chunk_reader->rep_level_decoder();
349+
// Read repetition levels until batch is full or no more values
348350
while (parsed_rows <= batch_size && remaining_values > 0) {
349351
level_t rep_level = rep_decoder.get_next();
350-
if (rep_level == 0) {
352+
if (rep_level == 0) { // rep_level 0 indicates start of new row
351353
if (parsed_rows == batch_size) {
352354
rep_decoder.rewind_one();
353355
break;
@@ -358,13 +360,15 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType
358360
remaining_values--;
359361
}
360362

361-
if (filter_map.has_filter()) {
362-
nested_filter_map_data = std::make_unique<std::vector<uint8_t>>();
363-
nested_filter_map_data->resize(_rep_levels.size());
364-
current_row = _orig_filter_map_index;
363+
// Generate nested filter map
364+
if (filter_map.has_filter() && (!filter_map.filter_all())) {
365+
if (_nested_filter_map_data == nullptr) {
366+
_nested_filter_map_data.reset(new std::vector<uint8_t>());
367+
}
365368
RETURN_IF_ERROR(filter_map.generate_nested_filter_map(
366-
_rep_levels, *nested_filter_map_data, &nested_filter_map, &current_row, false,
367-
0));
369+
_rep_levels, *_nested_filter_map_data, &nested_filter_map,
370+
&_orig_filter_map_index, origin_size));
371+
// Update current_filter_map to nested_filter_map
368372
current_filter_map = nested_filter_map.get();
369373
}
370374
} else if (!align_rows) {
@@ -374,15 +378,16 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType
374378
_rep_levels.resize(parsed_rows, 0);
375379
}
376380

381+
// Process definition levels (indicates null values)
377382
size_t parsed_values = _chunk_reader->remaining_num_values() - remaining_values;
378-
379383
_def_levels.resize(origin_size + parsed_values);
380384
if (has_def_level) {
381385
_chunk_reader->def_level_decoder().get_levels(&_def_levels[origin_size], parsed_values);
382386
} else {
383387
std::fill(_def_levels.begin() + origin_size, _def_levels.end(), 0);
384388
}
385389

390+
// Handle nullable columns
386391
MutableColumnPtr data_column;
387392
std::vector<uint16_t> null_map;
388393
NullMap* map_data_column = nullptr;
@@ -399,6 +404,7 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType
399404
data_column = doris_column->assume_mutable();
400405
}
401406

407+
// Process definition levels to build null map
402408
size_t has_read = origin_size;
403409
size_t ancestor_nulls = 0;
404410
size_t null_size = 0;
@@ -445,7 +451,9 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType
445451

446452
size_t num_values = parsed_values - ancestor_nulls;
447453

454+
// Handle filtered values
448455
if (current_filter_map->filter_all()) {
456+
// Skip all values if everything is filtered
449457
if (null_size > 0) {
450458
RETURN_IF_ERROR(_chunk_reader->skip_values(null_size, false));
451459
}
@@ -461,7 +469,7 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType
461469
SCOPED_RAW_TIMER(&_decode_null_map_time);
462470
RETURN_IF_ERROR(
463471
select_vector.init(null_map, num_values, map_data_column, current_filter_map,
464-
nested_filter_map_data ? origin_size : _filter_map_index));
472+
_nested_filter_map_data ? origin_size : _filter_map_index));
465473
}
466474

467475
RETURN_IF_ERROR(
@@ -470,11 +478,10 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType
470478
RETURN_IF_ERROR(_chunk_reader->skip_values(ancestor_nulls, false));
471479
}
472480
}
473-
474-
if (!align_rows) {
475-
*read_rows = parsed_rows;
476-
}
481+
*read_rows += parsed_rows;
477482
_filter_map_index += parsed_values;
483+
484+
// Handle cross-page reading
478485
if (_chunk_reader->remaining_num_values() == 0) {
479486
if (_chunk_reader->has_next_page()) {
480487
RETURN_IF_ERROR(_chunk_reader->next_page());
@@ -486,6 +493,7 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType
486493
}
487494
}
488495

496+
// Apply filtering to repetition and definition levels
489497
if (current_filter_map->has_filter()) {
490498
if (current_filter_map->filter_all()) {
491499
_rep_levels.resize(0);
@@ -510,7 +518,8 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType
510518
}
511519
}
512520

513-
_orig_filter_map_index = current_row + 1;
521+
// Prepare for next row
522+
++_orig_filter_map_index;
514523

515524
if (_rep_levels.size() > 0) {
516525
// make sure the rows of complex type are aligned correctly,

‎be/src/vec/exec/format/parquet/vparquet_column_reader.h

Copy file name to clipboardExpand all lines: be/src/vec/exec/format/parquet/vparquet_column_reader.h
+1
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,7 @@ class ScalarColumnReader : public ParquetColumnReader {
201201
std::vector<level_t> _rep_levels;
202202
std::vector<level_t> _def_levels;
203203
std::unique_ptr<parquet::PhysicalToLogicalConverter> _converter = nullptr;
204+
std::unique_ptr<std::vector<uint8_t>> _nested_filter_map_data = nullptr;
204205
size_t _orig_filter_map_index = 0;
205206

206207
Status _skip_values(size_t num_values);

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.