[Chore](unused) remove QSorter #13769
This commit is contained in:
@ -55,7 +55,6 @@ set(RUNTIME_FILES
|
||||
collection_value.cpp
|
||||
tuple.cpp
|
||||
tuple_row.cpp
|
||||
qsorter.cpp
|
||||
fragment_mgr.cpp
|
||||
dpp_sink_internal.cpp
|
||||
load_path_mgr.cpp
|
||||
|
||||
@ -1,137 +0,0 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
#include "runtime/qsorter.h"
|
||||
|
||||
#include <algorithm>
|
||||
|
||||
#include "exprs/expr.h"
|
||||
#include "exprs/expr_context.h"
|
||||
#include "runtime/descriptors.h"
|
||||
#include "runtime/raw_value.h"
|
||||
#include "runtime/row_batch.h"
|
||||
#include "runtime/runtime_state.h"
|
||||
#include "runtime/tuple_row.h"
|
||||
#include "util/debug_util.h"
|
||||
|
||||
namespace doris {
|
||||
|
||||
class TupleRowLessThan {
|
||||
public:
|
||||
TupleRowLessThan(std::vector<ExprContext*>& lhs_expr_ctxs,
|
||||
std::vector<ExprContext*>& rhs_expr_ctxs);
|
||||
bool operator()(TupleRow* const& lhs, TupleRow* const& rhs) const;
|
||||
|
||||
private:
|
||||
std::vector<ExprContext*>& _lhs_expr_ctxs;
|
||||
std::vector<ExprContext*>& _rhs_expr_ctxs;
|
||||
};
|
||||
|
||||
TupleRowLessThan::TupleRowLessThan(std::vector<ExprContext*>& lhs_expr_ctxs,
|
||||
std::vector<ExprContext*>& rhs_expr_ctxs)
|
||||
: _lhs_expr_ctxs(lhs_expr_ctxs), _rhs_expr_ctxs(rhs_expr_ctxs) {}
|
||||
|
||||
// Return true only when lhs less than rhs
|
||||
// nullptr is the positive infinite
|
||||
bool TupleRowLessThan::operator()(TupleRow* const& lhs, TupleRow* const& rhs) const {
|
||||
for (int i = 0; i < _lhs_expr_ctxs.size(); ++i) {
|
||||
void* lhs_value = _lhs_expr_ctxs[i]->get_value(lhs);
|
||||
void* rhs_value = _rhs_expr_ctxs[i]->get_value(rhs);
|
||||
|
||||
// nullptr's always go at the end regardless of asc/desc
|
||||
if (lhs_value == nullptr && rhs_value == nullptr) {
|
||||
continue;
|
||||
}
|
||||
if (rhs_value == nullptr) {
|
||||
return false;
|
||||
}
|
||||
if (lhs_value == nullptr) {
|
||||
return true;
|
||||
}
|
||||
|
||||
int result = RawValue::compare(lhs_value, rhs_value, _lhs_expr_ctxs[i]->root()->type());
|
||||
if (result > 0) {
|
||||
return false;
|
||||
} else if (result < 0) {
|
||||
return true;
|
||||
} else {
|
||||
// Otherwise, try the next Expr
|
||||
}
|
||||
}
|
||||
|
||||
// NOTE: must return false when two value equal with each other
|
||||
return false;
|
||||
}
|
||||
|
||||
QSorter::QSorter(const RowDescriptor& row_desc, const std::vector<ExprContext*>& order_expr_ctxs,
|
||||
RuntimeState* state)
|
||||
: _row_desc(row_desc), _order_expr_ctxs(order_expr_ctxs), _tuple_pool(new MemPool()) {}
|
||||
|
||||
Status QSorter::prepare(RuntimeState* state) {
|
||||
RETURN_IF_ERROR(Expr::clone_if_not_exists(_order_expr_ctxs, state, &_lhs_expr_ctxs));
|
||||
RETURN_IF_ERROR(Expr::clone_if_not_exists(_order_expr_ctxs, state, &_rhs_expr_ctxs));
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
// Insert if either not at the limit or it's a new TopN tuple_row
|
||||
Status QSorter::insert_tuple_row(TupleRow* input_row) {
|
||||
TupleRow* insert_tuple_row =
|
||||
input_row->deep_copy(_row_desc.tuple_descriptors(), _tuple_pool.get());
|
||||
if (insert_tuple_row == nullptr) {
|
||||
return Status::InternalError("deep copy failed.");
|
||||
}
|
||||
_sorted_rows.push_back(insert_tuple_row);
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status QSorter::add_batch(RowBatch* batch) {
|
||||
for (int i = 0; i < batch->num_rows(); ++i) {
|
||||
RETURN_IF_ERROR(insert_tuple_row(batch->get_row(i)));
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
// Reverse result in priority_queue
|
||||
Status QSorter::input_done() {
|
||||
std::sort(_sorted_rows.begin(), _sorted_rows.end(),
|
||||
TupleRowLessThan(_lhs_expr_ctxs, _rhs_expr_ctxs));
|
||||
_next_iter = _sorted_rows.begin();
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status QSorter::get_next(RowBatch* batch, bool* eos) {
|
||||
while (!batch->is_full() && (_next_iter != _sorted_rows.end())) {
|
||||
int row_idx = batch->add_row();
|
||||
TupleRow* dst_row = batch->get_row(row_idx);
|
||||
TupleRow* src_row = *_next_iter;
|
||||
batch->copy_row(src_row, dst_row);
|
||||
++_next_iter;
|
||||
batch->commit_last_row();
|
||||
}
|
||||
|
||||
*eos = _next_iter == _sorted_rows.end();
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status QSorter::close(RuntimeState* state) {
|
||||
_tuple_pool.reset();
|
||||
Expr::close(_lhs_expr_ctxs, state);
|
||||
Expr::close(_rhs_expr_ctxs, state);
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
} // namespace doris
|
||||
@ -1,75 +0,0 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <vector>
|
||||
|
||||
#include "common/status.h"
|
||||
#include "runtime/sorter.h"
|
||||
|
||||
namespace doris {
|
||||
|
||||
class ExprContext;
|
||||
class RowBatch;
|
||||
class RowDescriptor;
|
||||
class RuntimeState;
|
||||
class TupleRow;
|
||||
class MemPool;
|
||||
|
||||
// This sorter use memory heap to sort data added.
|
||||
// So when data is too large, 'add_batch' will return failure
|
||||
class QSorter : public Sorter {
|
||||
public:
|
||||
QSorter(const RowDescriptor& row_desc, const std::vector<ExprContext*>& order_expr_ctxs,
|
||||
RuntimeState* state);
|
||||
|
||||
virtual ~QSorter() {}
|
||||
|
||||
virtual Status prepare(RuntimeState* state);
|
||||
|
||||
// Add data to be sorted.
|
||||
virtual Status add_batch(RowBatch* batch);
|
||||
|
||||
// call when all data be added
|
||||
virtual Status input_done();
|
||||
|
||||
// fetch data already sorted,
|
||||
// client must insure that call this function AFTER call input_done
|
||||
virtual Status get_next(RowBatch* batch, bool* eos);
|
||||
|
||||
virtual Status close(RuntimeState* state);
|
||||
// hll merge will create
|
||||
MemPool* get_mem_pool() { return _tuple_pool.get(); };
|
||||
|
||||
private:
|
||||
Status insert_tuple_row(TupleRow* input_row);
|
||||
|
||||
const RowDescriptor& _row_desc;
|
||||
const std::vector<ExprContext*>& _order_expr_ctxs;
|
||||
std::vector<ExprContext*> _lhs_expr_ctxs;
|
||||
std::vector<ExprContext*> _rhs_expr_ctxs;
|
||||
|
||||
// After computing the TopN in the priority_queue, pop them and put them in this vector
|
||||
std::vector<TupleRow*> _sorted_rows;
|
||||
std::vector<TupleRow*>::iterator _next_iter;
|
||||
|
||||
// Stores everything referenced in _priority_queue
|
||||
std::unique_ptr<MemPool> _tuple_pool;
|
||||
};
|
||||
|
||||
} // namespace doris
|
||||
@ -220,7 +220,6 @@ set(RUNTIME_TEST_FILES
|
||||
# runtime/tmp_file_mgr_test.cpp
|
||||
# runtime/disk_io_mgr_test.cpp
|
||||
# runtime/thread_resource_mgr_test.cpp
|
||||
# runtime/qsorter_test.cpp
|
||||
# runtime/buffered_block_mgr2_test.cpp
|
||||
# runtime/buffered_tuple_stream2_test.cpp
|
||||
# runtime/export_task_mgr_test.cpp
|
||||
|
||||
@ -1,279 +0,0 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
#include "runtime/qsorter.h"
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
#include "common/object_pool.h"
|
||||
#include "exprs/expr.h"
|
||||
#include "gen_cpp/Descriptors_types.h"
|
||||
#include "runtime/descriptors.h"
|
||||
#include "runtime/mem_pool.h"
|
||||
#include "runtime/row_batch.h"
|
||||
#include "runtime/runtime_state.h"
|
||||
#include "runtime/tuple.h"
|
||||
#include "runtime/tuple_row.h"
|
||||
|
||||
namespace doris {
|
||||
|
||||
class QSorterTest : public testing::Test {
|
||||
public:
|
||||
QSorterTest() {
|
||||
init_desc_tbl();
|
||||
init_row_desc();
|
||||
init_runtime_state();
|
||||
init_order_expr();
|
||||
}
|
||||
|
||||
~QSorterTest() {}
|
||||
|
||||
void init_desc_tbl();
|
||||
|
||||
void init_row_desc();
|
||||
|
||||
void init_order_expr();
|
||||
|
||||
void init_runtime_state();
|
||||
|
||||
protected:
|
||||
virtual void SetUp() {}
|
||||
|
||||
virtual void TearDown() {}
|
||||
|
||||
private:
|
||||
ObjectPool _obj_pool;
|
||||
TDescriptorTable _t_desc_tbl;
|
||||
DescriptorTbl* _desc_tbl;
|
||||
RowDescriptor* _row_desc;
|
||||
std::vector<Expr*> _order_expr;
|
||||
RuntimeState* _state;
|
||||
MemPool _tuple_pool;
|
||||
};
|
||||
|
||||
void QSorterTest::init_runtime_state() {
|
||||
_state = _obj_pool.add(new RuntimeState("2011-10-01 12:34:56"));
|
||||
_state->set_desc_tbl(_desc_tbl);
|
||||
}
|
||||
|
||||
void QSorterTest::init_order_expr() {
|
||||
SlotRef* slot = _obj_pool.add(new SlotRef(_desc_tbl->get_slot_descriptor(0)));
|
||||
slot->prepare(_state, *_row_desc);
|
||||
_order_expr.push_back(slot);
|
||||
slot = _obj_pool.add(new SlotRef(_desc_tbl->get_slot_descriptor(1)));
|
||||
slot->prepare(_state, *_row_desc);
|
||||
_order_expr.push_back(slot);
|
||||
}
|
||||
|
||||
void QSorterTest::init_row_desc() {
|
||||
std::vector<TTupleId> row_tuples;
|
||||
row_tuples.push_back(0);
|
||||
std::vector<bool> nullable_tuples;
|
||||
nullable_tuples.push_back(false);
|
||||
|
||||
_row_desc = _obj_pool.add(new RowDescriptor(*_desc_tbl, row_tuples, nullable_tuples));
|
||||
}
|
||||
|
||||
void QSorterTest::init_desc_tbl() {
|
||||
// slot desc
|
||||
std::vector<TSlotDescriptor> slot_descs;
|
||||
|
||||
// 1 byte null, 4 byte int, 4 byte int
|
||||
// slot 0
|
||||
{
|
||||
TSlotDescriptor t_slot_desc;
|
||||
t_slot_desc.__set_id(0);
|
||||
t_slot_desc.__set_parent(0);
|
||||
t_slot_desc.__set_slotType(TPrimitiveType::INT);
|
||||
t_slot_desc.__set_columnPos(0);
|
||||
t_slot_desc.__set_byteOffset(1);
|
||||
t_slot_desc.__set_nullIndicatorByte(0);
|
||||
t_slot_desc.__set_nullIndicatorBit(1);
|
||||
t_slot_desc.__set_colName("col1");
|
||||
t_slot_desc.__set_slotIdx(0);
|
||||
t_slot_desc.__set_isMaterialized(true);
|
||||
|
||||
slot_descs.push_back(t_slot_desc);
|
||||
}
|
||||
// slot 1
|
||||
{
|
||||
TSlotDescriptor t_slot_desc;
|
||||
t_slot_desc.__set_id(1);
|
||||
t_slot_desc.__set_parent(0);
|
||||
t_slot_desc.__set_slotType(TPrimitiveType::INT);
|
||||
t_slot_desc.__set_columnPos(1);
|
||||
t_slot_desc.__set_byteOffset(5);
|
||||
t_slot_desc.__set_nullIndicatorByte(0);
|
||||
t_slot_desc.__set_nullIndicatorBit(2);
|
||||
t_slot_desc.__set_colName("col2");
|
||||
t_slot_desc.__set_slotIdx(1);
|
||||
t_slot_desc.__set_isMaterialized(true);
|
||||
|
||||
slot_descs.push_back(t_slot_desc);
|
||||
}
|
||||
|
||||
_t_desc_tbl.__set_slotDescriptors(slot_descs);
|
||||
|
||||
// tuple desc
|
||||
std::vector<TTupleDescriptor> tuple_descs;
|
||||
// tuple 0
|
||||
{
|
||||
TTupleDescriptor t_tuple_desc;
|
||||
|
||||
t_tuple_desc.__set_id(0);
|
||||
t_tuple_desc.__set_byteSize(9);
|
||||
t_tuple_desc.__set_numNullBytes(1);
|
||||
t_tuple_desc.__set_tableId(0);
|
||||
|
||||
tuple_descs.push_back(t_tuple_desc);
|
||||
}
|
||||
_t_desc_tbl.__set_tupleDescriptors(tuple_descs);
|
||||
|
||||
// table
|
||||
std::vector<TTableDescriptor> table_descs;
|
||||
// table 0
|
||||
{
|
||||
TTableDescriptor t_table_desc;
|
||||
|
||||
t_table_desc.__set_id(0);
|
||||
t_table_desc.__set_tableType(TTableType::MYSQL_TABLE);
|
||||
t_table_desc.__set_numCols(2);
|
||||
t_table_desc.__set_numClusteringCols(2);
|
||||
t_table_desc.__set_tableName("test_tbl");
|
||||
t_table_desc.__set_dbName("test_db");
|
||||
|
||||
TMySQLTable mysql_table;
|
||||
t_table_desc.__set_mysqlTable(mysql_table);
|
||||
|
||||
table_descs.push_back(t_table_desc);
|
||||
}
|
||||
_t_desc_tbl.__set_tableDescriptors(table_descs);
|
||||
|
||||
DescriptorTbl::create(&_obj_pool, _t_desc_tbl, &_desc_tbl);
|
||||
}
|
||||
|
||||
TEST_F(QSorterTest, normalCase) {
|
||||
RowBatch batch(*_row_desc, 1024);
|
||||
|
||||
// 5, 100
|
||||
{
|
||||
batch.add_row();
|
||||
TupleRow* row = batch.get_row(batch.num_rows());
|
||||
Tuple* tuple = Tuple::create(9, &_tuple_pool);
|
||||
row->set_tuple(0, tuple);
|
||||
char* pos = (char*)tuple;
|
||||
memset(pos, 0, 9);
|
||||
*(int*)(pos + 1) = 5;
|
||||
*(int*)(pos + 5) = 100;
|
||||
batch.commit_last_row();
|
||||
}
|
||||
|
||||
// 1, 10
|
||||
{
|
||||
batch.add_row();
|
||||
TupleRow* row = batch.get_row(batch.num_rows());
|
||||
Tuple* tuple = Tuple::create(9, &_tuple_pool);
|
||||
row->set_tuple(0, tuple);
|
||||
char* pos = (char*)tuple;
|
||||
memset(pos, 0, 9);
|
||||
*(int*)(pos + 1) = 1;
|
||||
*(int*)(pos + 5) = 10;
|
||||
batch.commit_last_row();
|
||||
}
|
||||
|
||||
// 5, 5
|
||||
{
|
||||
batch.add_row();
|
||||
TupleRow* row = batch.get_row(batch.num_rows());
|
||||
Tuple* tuple = Tuple::create(9, &_tuple_pool);
|
||||
row->set_tuple(0, tuple);
|
||||
char* pos = (char*)tuple;
|
||||
memset(pos, 0, 9);
|
||||
*(int*)(pos + 1) = 5;
|
||||
*(int*)(pos + 5) = 5;
|
||||
batch.commit_last_row();
|
||||
}
|
||||
|
||||
// 1000, 5
|
||||
{
|
||||
batch.add_row();
|
||||
TupleRow* row = batch.get_row(batch.num_rows());
|
||||
Tuple* tuple = Tuple::create(9, &_tuple_pool);
|
||||
row->set_tuple(0, tuple);
|
||||
char* pos = (char*)tuple;
|
||||
memset(pos, 0, 9);
|
||||
*(int*)(pos + 1) = 10000;
|
||||
*(int*)(pos + 5) = 5;
|
||||
batch.commit_last_row();
|
||||
}
|
||||
|
||||
// 0, 195
|
||||
{
|
||||
batch.add_row();
|
||||
TupleRow* row = batch.get_row(batch.num_rows());
|
||||
Tuple* tuple = Tuple::create(9, &_tuple_pool);
|
||||
row->set_tuple(0, tuple);
|
||||
char* pos = (char*)tuple;
|
||||
memset(pos, 0, 9);
|
||||
*(int*)(pos + 1) = 0;
|
||||
*(int*)(pos + 5) = 195;
|
||||
batch.commit_last_row();
|
||||
}
|
||||
|
||||
QSorter sorter(*_row_desc, _order_expr);
|
||||
|
||||
EXPECT_TRUE(sorter.prepare(_state).ok());
|
||||
EXPECT_TRUE(sorter.add_batch(&batch).ok());
|
||||
EXPECT_TRUE(sorter.input_done().ok());
|
||||
|
||||
RowBatch result(*_row_desc, 1024);
|
||||
bool eos = false;
|
||||
EXPECT_TRUE(sorter.get_next(&result, &eos).ok());
|
||||
EXPECT_TRUE(eos);
|
||||
EXPECT_EQ(5, result.num_rows());
|
||||
|
||||
// 0, 195
|
||||
{
|
||||
EXPECT_EQ(0, *(int*)_order_expr[0]->get_value(result.get_row(0)));
|
||||
EXPECT_EQ(195, *(int*)_order_expr[1]->get_value(result.get_row(0)));
|
||||
}
|
||||
// 1, 10
|
||||
{
|
||||
EXPECT_EQ(1, *(int*)_order_expr[0]->get_value(result.get_row(1)));
|
||||
EXPECT_EQ(10, *(int*)_order_expr[1]->get_value(result.get_row(1)));
|
||||
}
|
||||
// 5, 5
|
||||
{
|
||||
EXPECT_EQ(5, *(int*)_order_expr[0]->get_value(result.get_row(2)));
|
||||
EXPECT_EQ(5, *(int*)_order_expr[1]->get_value(result.get_row(2)));
|
||||
}
|
||||
// 5, 100
|
||||
{
|
||||
EXPECT_EQ(5, *(int*)_order_expr[0]->get_value(result.get_row(3)));
|
||||
EXPECT_EQ(100, *(int*)_order_expr[1]->get_value(result.get_row(3)));
|
||||
}
|
||||
// 10000, 5
|
||||
{
|
||||
EXPECT_EQ(10000, *(int*)_order_expr[0]->get_value(result.get_row(4)));
|
||||
EXPECT_EQ(5, *(int*)_order_expr[1]->get_value(result.get_row(4)));
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace doris
|
||||
Reference in New Issue
Block a user