StarRocks Segment源码阅读笔记--SegmentIterator创建

发布时间 2023-07-13 15:24:21作者: 飞舞的小蛇

StarRocks中要读取Segment中的数据,需要先创建SegmentIterator

StatusOr<ChunkIteratorPtr> Segment::_new_iterator(const Schema& schema, const SegmentReadOptions& read_options) {
    DCHECK(read_options.stats != nullptr);
    // trying to prune the current segment by segment-level zone map
    //这里会根据读操作中包含的谓词并结合segment级别的zone map信息来对segemnt进行过滤
    for (const auto& pair : read_options.predicates_for_zone_map) {
        ColumnId column_id = pair.first;
        if (_column_readers[column_id] == nullptr || !_column_readers[column_id]->has_zone_map()) {
            continue;
        }//将过滤条件传给column reader,通过segment_zone_map_filter过滤
        if (!_column_readers[column_id]->segment_zone_map_filter(pair.second)) {
            read_options.stats->segment_stats_filtered += _column_readers[column_id]->num_rows();
            return Status::EndOfFile(strings::Substitute("End of file $0, empty iterator", _fname));
        }
    }
    return new_segment_iterator(shared_from_this(), schema, read_options);
}
segment_zone_map_filter过滤的逻辑如下所示
bool ColumnReader::segment_zone_map_filter(const std::vector<const ColumnPredicate*>& predicates) const {
    if (_segment_zone_map == nullptr) {
        return true;
    }
    ZoneMapDetail detail;
    /*通过_parse_zone_map将ZoneMapPB结构转成ZoneMapDetail结构,包含
    bool _has_null;
    Datum _null_value;
    Datum _min_value;
    Datum _max_value;
    size_t _num_rows;
    */
    _parse_zone_map(*_segment_zone_map, &detail);
    auto filter = [&](const ColumnPredicate* pred) { return pred->zone_map_filter(detail); };
    return std::all_of(predicates.begin(), predicates.end(), filter);
}

上面的zone_map_filter,不同的谓词会调用自己类中不同的zone_map_filter函数来根据detail进行过滤,最后根据所有谓词过滤的结果决定是否要读取这个segment的数据,

返回false表示不需要读取,即该segment中的数据都是不满足条件的,返回true表示需要读取,即该segment中有满足谓词条件的数据。

如果该segment的数据不需要读取,根据下面的代码,直接返回了

return Status::EndOfFile(strings::Substitute("End of file $0, empty iterator", _fname));

如果该segemnt的数据需要读取,则调用new_segment_iterator

ChunkIteratorPtr new_segment_iterator(const std::shared_ptr<Segment>& segment, const Schema& schema,
                                      const SegmentReadOptions& options) {
    if (options.predicates.empty() || options.predicates.size() >= schema.num_fields()) {
    //如果谓词为空或者谓词的数量超过表中列的数量,直接创建SegmentIterator
        return std::make_shared<SegmentIterator>(segment, schema, options);
    } else {
    //否则,先对表中的列进行排序,把经过谓词过滤的列放到前面,没有经过谓词过滤的列放在后面,便于更容易处理晚期物化。
        Schema ordered_schema = reorder_schema(schema, options.predicates);
        auto seg_iter = std::make_shared<SegmentIterator>(segment, ordered_schema, options);
        return new_projection_iterator(schema, seg_iter);
    }
}
reorder_schema负责将经过谓词过滤的列放在前面,代码实现如下:
inline Schema reorder_schema(const Schema& input, const std::unordered_map<ColumnId, PredicateList>& predicates) {
    const std::vector<FieldPtr>& fields = input.fields();

    Schema output;
    output.reserve(fields.size());
    for (const auto& field : fields) {
        if (predicates.count(field->id())) {//谓词中包含的列
            output.append(field);
        }
    }
    for (const auto& field : fields) {
        if (!predicates.count(field->id())) {//谓词中不包含的列
            output.append(field);
        }
    }
    return output;
}

对于经过reorder的schema和没有经过reorder的schema,返回的对象类型不同

经过reorder的返回的是ProjectionIterator

未经过reorder的返回的是SegmentIterator

上面这两个类都是ChunkIterator的子类,区别在于ProjectionIterator从SegmentIterator中获取chunk,并选择排序后的schema中包含的列,schema中指定的列顺序可能和SegmentIterator中指定的列顺序不一致,因为经过了reorder,比如SegmentIterator返回的列顺序是c1、c2、c3,ProjectionIterator可能返回的列顺序是c3、c1。

至此SegmentIterator的创建过程完毕。