Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 210df93

Browse files
committed
Optimize source code and add tests.
1 parent 079a064 commit 210df93
Copy full SHA for 210df93

File tree

13 files changed

+2404
-27
lines changed
Filter options

13 files changed

+2404
-27
lines changed

‎be/src/vec/exec/format/parquet/parquet_common.cpp

Copy file name to clipboardExpand all lines: be/src/vec/exec/format/parquet/parquet_common.cpp
+8-5Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -78,11 +78,11 @@ bool FilterMap::can_filter_all(size_t remaining_num_values, size_t filter_map_in
7878
Status FilterMap::generate_nested_filter_map(const std::vector<level_t>& rep_levels,
7979
std::vector<uint8_t>& nested_filter_map_data,
8080
std::unique_ptr<FilterMap>* nested_filter_map,
81-
size_t* current_row_ptr, bool is_cross_page,
82-
size_t start_index) const {
81+
size_t* current_row_ptr, size_t start_index) const {
8382
if (!has_filter() || filter_all()) {
84-
*nested_filter_map = std::make_unique<FilterMap>();
85-
return Status::OK();
83+
return Status::InternalError(fmt::format(
84+
"FilterMap::generate_nested_filter_map failed: has_filter={}, filter_all={}",
85+
has_filter(), filter_all()));
8686
}
8787

8888
if (rep_levels.empty()) {
@@ -91,10 +91,13 @@ Status FilterMap::generate_nested_filter_map(const std::vector<level_t>& rep_lev
9191

9292
nested_filter_map_data.resize(rep_levels.size());
9393

94+
fprintf(stderr, "nested_filter_map_data.size() in generate_nested_filter_map: %ld\n",
95+
nested_filter_map_data.size());
96+
9497
size_t current_row = current_row_ptr ? *current_row_ptr : 0;
9598

9699
for (size_t i = start_index; i < rep_levels.size(); i++) {
97-
if (!is_cross_page && i > start_index && rep_levels[i] == 0) {
100+
if (i != start_index && rep_levels[i] == 0) {
98101
current_row++;
99102
if (current_row >= _filter_map_size) {
100103
return Status::InvalidArgument(

‎be/src/vec/exec/format/parquet/parquet_common.h

Copy file name to clipboardExpand all lines: be/src/vec/exec/format/parquet/parquet_common.h
+1-3Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,9 +77,7 @@ class FilterMap {
7777
Status generate_nested_filter_map(const std::vector<level_t>& rep_levels,
7878
std::vector<uint8_t>& nested_filter_map_data,
7979
std::unique_ptr<FilterMap>* nested_filter_map,
80-
size_t* current_row_ptr, // 当前处理到哪一行
81-
bool is_cross_page, // 是否是跨页的情况
82-
size_t start_index = 0) const; // rep_levels的起始处理位置
80+
size_t* current_row_ptr, size_t start_index = 0) const;
8381

8482
const uint8_t* filter_map_data() const { return _filter_map_data; }
8583
size_t filter_map_size() const { return _filter_map_size; }

‎be/src/vec/exec/format/parquet/vparquet_column_reader.cpp

Copy file name to clipboardExpand all lines: be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
+25-16Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -323,9 +323,6 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType
323323
size_t* read_rows, bool* eof, bool is_dict_filter,
324324
bool align_rows) {
325325
std::unique_ptr<FilterMap> nested_filter_map;
326-
std::unique_ptr<std::vector<uint8_t>> nested_filter_map_data;
327-
328-
size_t current_row;
329326

330327
FilterMap* current_filter_map = &filter_map;
331328
size_t origin_size = 0;
@@ -337,17 +334,22 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType
337334
} else {
338335
_rep_levels.resize(0);
339336
_def_levels.resize(0);
337+
if (_nested_filter_map_data) {
338+
_nested_filter_map_data->resize(0);
339+
}
340340
}
341341
size_t parsed_rows = 0;
342342
size_t remaining_values = _chunk_reader->remaining_num_values();
343343
bool has_rep_level = _chunk_reader->max_rep_level() > 0;
344344
bool has_def_level = _chunk_reader->max_def_level() > 0;
345345

346+
// Handle repetition levels (indicates nesting structure)
346347
if (has_rep_level) {
347348
LevelDecoder& rep_decoder = _chunk_reader->rep_level_decoder();
349+
// Read repetition levels until batch is full or no more values
348350
while (parsed_rows <= batch_size && remaining_values > 0) {
349351
level_t rep_level = rep_decoder.get_next();
350-
if (rep_level == 0) {
352+
if (rep_level == 0) { // rep_level 0 indicates start of new row
351353
if (parsed_rows == batch_size) {
352354
rep_decoder.rewind_one();
353355
break;
@@ -358,13 +360,15 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType
358360
remaining_values--;
359361
}
360362

361-
if (filter_map.has_filter()) {
362-
nested_filter_map_data = std::make_unique<std::vector<uint8_t>>();
363-
nested_filter_map_data->resize(_rep_levels.size());
364-
current_row = _orig_filter_map_index;
363+
// Generate nested filter map
364+
if (filter_map.has_filter() && (!filter_map.filter_all())) {
365+
if (_nested_filter_map_data == nullptr) {
366+
_nested_filter_map_data.reset(new std::vector<uint8_t>());
367+
}
365368
RETURN_IF_ERROR(filter_map.generate_nested_filter_map(
366-
_rep_levels, *nested_filter_map_data, &nested_filter_map, &current_row, false,
367-
0));
369+
_rep_levels, *_nested_filter_map_data, &nested_filter_map,
370+
&_orig_filter_map_index, origin_size));
371+
// Update current_filter_map to nested_filter_map
368372
current_filter_map = nested_filter_map.get();
369373
}
370374
} else if (!align_rows) {
@@ -374,15 +378,16 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType
374378
_rep_levels.resize(parsed_rows, 0);
375379
}
376380

381+
// Process definition levels (indicates null values)
377382
size_t parsed_values = _chunk_reader->remaining_num_values() - remaining_values;
378-
379383
_def_levels.resize(origin_size + parsed_values);
380384
if (has_def_level) {
381385
_chunk_reader->def_level_decoder().get_levels(&_def_levels[origin_size], parsed_values);
382386
} else {
383387
std::fill(_def_levels.begin() + origin_size, _def_levels.end(), 0);
384388
}
385389

390+
// Handle nullable columns
386391
MutableColumnPtr data_column;
387392
std::vector<uint16_t> null_map;
388393
NullMap* map_data_column = nullptr;
@@ -399,6 +404,7 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType
399404
data_column = doris_column->assume_mutable();
400405
}
401406

407+
// Process definition levels to build null map
402408
size_t has_read = origin_size;
403409
size_t ancestor_nulls = 0;
404410
size_t null_size = 0;
@@ -445,7 +451,9 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType
445451

446452
size_t num_values = parsed_values - ancestor_nulls;
447453

454+
// Handle filtered values
448455
if (current_filter_map->filter_all()) {
456+
// Skip all values if everything is filtered
449457
if (null_size > 0) {
450458
RETURN_IF_ERROR(_chunk_reader->skip_values(null_size, false));
451459
}
@@ -470,11 +478,10 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType
470478
RETURN_IF_ERROR(_chunk_reader->skip_values(ancestor_nulls, false));
471479
}
472480
}
473-
474-
if (!align_rows) {
475-
*read_rows = parsed_rows;
476-
}
481+
*read_rows += parsed_rows;
477482
_filter_map_index += parsed_values;
483+
484+
// Handle cross-page reading
478485
if (_chunk_reader->remaining_num_values() == 0) {
479486
if (_chunk_reader->has_next_page()) {
480487
RETURN_IF_ERROR(_chunk_reader->next_page());
@@ -486,6 +493,7 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType
486493
}
487494
}
488495

496+
// Apply filtering to repetition and definition levels
489497
if (current_filter_map->has_filter()) {
490498
if (current_filter_map->filter_all()) {
491499
_rep_levels.resize(0);
@@ -510,7 +518,8 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType
510518
}
511519
}
512520

513-
_orig_filter_map_index = current_row + 1;
521+
// Prepare for next row
522+
++_orig_filter_map_index;
514523

515524
if (_rep_levels.size() > 0) {
516525
// make sure the rows of complex type are aligned correctly,

0 commit comments

Comments
0 (0)
Morty Proxy: This is a proxified and sanitized view of the page; visit the original site.