[FEAT MERGE] optimizer statistics gather enhance
Co-authored-by: Larry955 <1412857955@qq.com> Co-authored-by: wangt1xiuyi <13547954130@163.com>
This commit is contained in:
@ -229,7 +229,6 @@ int ObChunkDatumStore::StoredRow::build(StoredRow *&sr,
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
int ObChunkDatumStore::Block::add_row(const common::ObIArray<ObExpr*> &exprs, ObEvalCtx &ctx,
|
||||
const int64_t row_size, uint32_t row_extend_size, StoredRow **stored_row)
|
||||
{
|
||||
@ -342,6 +341,42 @@ int ObChunkDatumStore::Block::copy_stored_row(const StoredRow &stored_row, Store
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObChunkDatumStore::Block::copy_datums(const ObDatum *datums, const int64_t cnt,
|
||||
const int64_t extra_size, StoredRow **dst_sr)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
BlockBuffer *buf = get_buffer();
|
||||
int64_t head_size = sizeof(StoredRow);
|
||||
int64_t datum_size = sizeof(ObDatum) * cnt;
|
||||
int64_t row_size = head_size + sizeof(ObDatum) * cnt + extra_size;
|
||||
if (!buf->is_inited()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(ret), K(buf), K(row_size));
|
||||
} else {
|
||||
StoredRow *sr = new (buf->head())StoredRow;
|
||||
sr->cnt_ = cnt;
|
||||
MEMCPY(sr->payload_, static_cast<const void*>(datums), datum_size);
|
||||
char* data_start = sr->payload_ + datum_size + extra_size;
|
||||
int64_t pos = 0;
|
||||
for (int64_t i = 0; i < cnt; ++i) {
|
||||
MEMCPY(data_start + pos, datums[i].ptr_, datums[i].len_);
|
||||
sr->cells()[i].ptr_ = data_start + pos;
|
||||
pos += datums[i].len_;
|
||||
row_size += datums[i].len_;
|
||||
}
|
||||
sr->row_size_ = row_size;
|
||||
if (OB_FAIL(buf->advance(row_size))) {
|
||||
LOG_WARN("fill buffer head failed", K(ret), K(buf), K(row_size));
|
||||
} else {
|
||||
rows_++;
|
||||
if (nullptr != dst_sr) {
|
||||
*dst_sr = sr;
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
//the memory of shadow stored row is not continuous,
|
||||
//so you cannot directly copy the memory of the entire stored row,
|
||||
//and you should make a deep copy of each datum in turn
|
||||
@ -1157,6 +1192,35 @@ int ObChunkDatumStore::add_row(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObChunkDatumStore::add_row(const ObDatum *datums, const int64_t cnt,
|
||||
const int64_t extra_size, StoredRow **stored_row)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (!is_inited()) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("not init", K(ret));
|
||||
} else {
|
||||
int64_t head_size = sizeof(StoredRow);
|
||||
int64_t datum_size = sizeof(ObDatum) * cnt;
|
||||
int64_t data_size = 0;
|
||||
for (int64_t i = 0; i < cnt; ++i) {
|
||||
data_size += datums[i].len_;
|
||||
}
|
||||
const int64_t row_size = head_size + datum_size + extra_size + data_size;
|
||||
if (OB_FAIL(ensure_write_blk(row_size))) {
|
||||
LOG_WARN("ensure write block failed", K(ret));
|
||||
} else if (OB_FAIL(cur_blk_->copy_datums(datums, cnt, extra_size, stored_row))) {
|
||||
LOG_WARN("add row to block failed", K(ret), K(datums), K(cnt), K(extra_size), K(row_size));
|
||||
} else {
|
||||
row_cnt_++;
|
||||
if (col_count_ < 0) {
|
||||
col_count_ = cnt;
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObChunkDatumStore::add_row(const ShadowStoredRow &sr, StoredRow **stored_row)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
@ -443,6 +443,10 @@ public:
|
||||
int add_row(const common::ObIArray<ObExpr*> &exprs, ObEvalCtx &ctx,
|
||||
const int64_t row_size, uint32_t row_extend_size, StoredRow **stored_row = nullptr);
|
||||
int copy_stored_row(const StoredRow &stored_row, StoredRow **dst_sr);
|
||||
int copy_datums(const ObDatum *datums,
|
||||
const int64_t cnt,
|
||||
const int64_t extra_size,
|
||||
StoredRow **dst_sr);
|
||||
//the memory of shadow stored row is not continuous,
|
||||
//so you cannot directly copy the memory of the entire stored row,
|
||||
//and you should make a deep copy of each datum in turn
|
||||
@ -853,6 +857,8 @@ public:
|
||||
int add_row(const common::ObIArray<ObExpr*> &exprs, ObEvalCtx *ctx,
|
||||
StoredRow **stored_row = nullptr);
|
||||
int add_row(const StoredRow &sr, StoredRow **stored_row = nullptr);
|
||||
int add_row(const ObDatum *datums, const int64_t cnt,
|
||||
const int64_t extra_size, StoredRow **stored_row);
|
||||
int add_row(const StoredRow &sr, ObEvalCtx *ctx, StoredRow **stored_row = nullptr);
|
||||
int add_row(const ShadowStoredRow &sr, StoredRow **stored_row = nullptr);
|
||||
|
||||
|
||||
@ -41,45 +41,21 @@ int ObMaterialOp::inner_open()
|
||||
|
||||
void ObMaterialOp::destroy()
|
||||
{
|
||||
sql_mem_processor_.unregister_profile_if_necessary();
|
||||
datum_store_it_.reset();
|
||||
datum_store_.reset();
|
||||
destroy_mem_context();
|
||||
material_impl_.unregister_profile_if_necessary();
|
||||
material_impl_.~ObMaterialOpImpl();
|
||||
ObOperator::destroy();
|
||||
}
|
||||
|
||||
int ObMaterialOp::process_dump()
|
||||
int ObMaterialOp::init_material_impl(int64_t tenant_id, int64_t row_count)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
bool updated = false;
|
||||
bool dumped = false;
|
||||
UNUSED(updated);
|
||||
// 对于material由于需要保序,所以dump实现方式是,会dump掉所有的,同时可以保留最后的内存数据。目前选择一定是从前往后dump
|
||||
// 还一种方式实现是,dump剩下到最大内存量的数据,以后写入数据则必须dump,但这种方式必须对内存进行伸缩处理
|
||||
if (OB_FAIL(sql_mem_processor_.update_max_available_mem_size_periodically(
|
||||
&mem_context_->get_malloc_allocator(),
|
||||
[&](int64_t cur_cnt){ return datum_store_.get_row_cnt_in_memory() > cur_cnt; },
|
||||
updated))) {
|
||||
LOG_WARN("failed to update max available memory size periodically", K(ret));
|
||||
} else if (need_dump() && GCONF.is_sql_operator_dump_enabled()
|
||||
&& OB_FAIL(sql_mem_processor_.extend_max_memory_size(
|
||||
&mem_context_->get_malloc_allocator(),
|
||||
[&](int64_t max_memory_size) {
|
||||
return sql_mem_processor_.get_data_size() > max_memory_size;
|
||||
},
|
||||
dumped, sql_mem_processor_.get_data_size()))) {
|
||||
LOG_WARN("failed to extend max memory size", K(ret));
|
||||
} else if (dumped) {
|
||||
if (OB_FAIL(datum_store_.dump(false, true))) {
|
||||
LOG_WARN("failed to dump row store", K(ret));
|
||||
} else {
|
||||
sql_mem_processor_.reset();
|
||||
sql_mem_processor_.set_number_pass(1);
|
||||
LOG_TRACE("trace material dump",
|
||||
K(sql_mem_processor_.get_data_size()),
|
||||
K(datum_store_.get_row_cnt_in_memory()),
|
||||
K(sql_mem_processor_.get_mem_bound()));
|
||||
}
|
||||
if (OB_FAIL(material_impl_.init(tenant_id, &eval_ctx_, &ctx_, &io_event_observer_))) {
|
||||
LOG_WARN("failed to init material impl", K(tenant_id));
|
||||
} else {
|
||||
material_impl_.set_input_rows(row_count);
|
||||
material_impl_.set_input_width(MY_SPEC.width_);
|
||||
material_impl_.set_operator_type(MY_SPEC.type_);
|
||||
material_impl_.set_operator_id(MY_SPEC.id_);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -87,51 +63,20 @@ int ObMaterialOp::process_dump()
|
||||
int ObMaterialOp::get_all_row_from_child(ObSQLSessionInfo &session)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
bool first_row = true;
|
||||
int64_t tenant_id = session.get_effective_tenant_id();
|
||||
if (OB_ISNULL(mem_context_)) {
|
||||
lib::ContextParam param;
|
||||
param.set_mem_attr(tenant_id, ObModIds::OB_SQL_SORT_ROW, ObCtxIds::WORK_AREA)
|
||||
.set_properties(lib::USE_TL_PAGE_OPTIONAL);
|
||||
if (OB_FAIL(CURRENT_CONTEXT->CREATE_CONTEXT(mem_context_, param))) {
|
||||
LOG_WARN("create entity failed", K(ret));
|
||||
} else if (OB_ISNULL(mem_context_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("null memory entity returned", K(ret));
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(datum_store_.init(UINT64_MAX, tenant_id, ObCtxIds::WORK_AREA))) {
|
||||
LOG_WARN("init row store failed", K(ret));
|
||||
} else {
|
||||
datum_store_.set_allocator(mem_context_->get_malloc_allocator());
|
||||
datum_store_.set_callback(&sql_mem_processor_);
|
||||
datum_store_.set_io_event_observer(&io_event_observer_);
|
||||
int64_t row_count = MY_SPEC.rows_;
|
||||
if (OB_FAIL(ObPxEstimateSizeUtil::get_px_size(
|
||||
&ctx_, MY_SPEC.px_est_size_factor_, row_count, row_count))) {
|
||||
LOG_WARN("failed to get px size", K(ret));
|
||||
} else if (OB_FAIL(init_material_impl(tenant_id, row_count))) {
|
||||
LOG_WARN("failed to init material impl");
|
||||
}
|
||||
|
||||
while (OB_SUCCESS == ret) {
|
||||
clear_evaluated_flag();
|
||||
if (OB_FAIL(child_->get_next_row())) {
|
||||
} else if (first_row) {
|
||||
int64_t row_count = MY_SPEC.rows_;
|
||||
if (OB_FAIL(ObPxEstimateSizeUtil::get_px_size(
|
||||
&ctx_, MY_SPEC.px_est_size_factor_, row_count, row_count))) {
|
||||
LOG_WARN("failed to get px size", K(ret));
|
||||
} else if (OB_FAIL(sql_mem_processor_.init(
|
||||
&mem_context_->get_malloc_allocator(),
|
||||
tenant_id,
|
||||
row_count * MY_SPEC.width_, MY_SPEC.type_, MY_SPEC.id_, &ctx_))) {
|
||||
LOG_WARN("failed to init sql memory manager processor", K(ret));
|
||||
} else {
|
||||
datum_store_.set_dir_id(sql_mem_processor_.get_dir_id());
|
||||
LOG_TRACE("trace init sql mem mgr for material", K(row_count), K(MY_SPEC.width_),
|
||||
K(profile_.get_cache_size()), K(profile_.get_expect_size()));
|
||||
}
|
||||
first_row = false;
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(process_dump())) {
|
||||
LOG_WARN("failed to process dump", K(ret));
|
||||
} else if (OB_FAIL(datum_store_.add_row(child_->get_spec().output_, &eval_ctx_))) {
|
||||
// do nothing
|
||||
} else if (OB_FAIL(material_impl_.add_row(child_->get_spec().output_))) {
|
||||
LOG_WARN("failed to add row to row store", K(ret));
|
||||
}
|
||||
}
|
||||
@ -140,10 +85,8 @@ int ObMaterialOp::get_all_row_from_child(ObSQLSessionInfo &session)
|
||||
} else {
|
||||
ret = OB_SUCCESS;
|
||||
// 最后一批数据retain到内存中
|
||||
if (OB_FAIL(datum_store_.finish_add_row(false))) {
|
||||
if (OB_FAIL(material_impl_.finish_add_row())) {
|
||||
LOG_WARN("failed to finish add row to row store", K(ret));
|
||||
} else if (OB_FAIL(datum_store_.begin(datum_store_it_))) {
|
||||
LOG_WARN("failed to begin iterator for chunk row store", K(ret));
|
||||
} else {
|
||||
is_first_ = false;
|
||||
}
|
||||
@ -154,69 +97,41 @@ int ObMaterialOp::get_all_row_from_child(ObSQLSessionInfo &session)
|
||||
int ObMaterialOp::get_all_batch_from_child(ObSQLSessionInfo &session)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
bool first_row = true;
|
||||
int64_t tenant_id = session.get_effective_tenant_id();
|
||||
if (OB_ISNULL(mem_context_)) {
|
||||
lib::ContextParam param;
|
||||
param.set_mem_attr(tenant_id, ObModIds::OB_SQL_SORT_ROW, ObCtxIds::WORK_AREA)
|
||||
.set_properties(lib::USE_TL_PAGE_OPTIONAL);
|
||||
if (OB_FAIL(CURRENT_CONTEXT->CREATE_CONTEXT(mem_context_, param))) {
|
||||
LOG_WARN("create entity failed", K(ret));
|
||||
} else if (OB_ISNULL(mem_context_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("null memory entity returned", K(ret));
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(datum_store_.init(UINT64_MAX, tenant_id, ObCtxIds::WORK_AREA))) {
|
||||
LOG_WARN("init row store failed", K(ret));
|
||||
} else {
|
||||
datum_store_.set_allocator(mem_context_->get_malloc_allocator());
|
||||
datum_store_.set_callback(&sql_mem_processor_);
|
||||
datum_store_.set_io_event_observer(&io_event_observer_);
|
||||
int64_t row_count = MY_SPEC.rows_;
|
||||
if (OB_FAIL(ObPxEstimateSizeUtil::get_px_size(
|
||||
&ctx_, MY_SPEC.px_est_size_factor_, row_count, row_count))) {
|
||||
LOG_WARN("failed to get px size", K(ret));
|
||||
} else if (OB_FAIL(init_material_impl(tenant_id, row_count))) {
|
||||
LOG_WARN("failed to init material impl");
|
||||
}
|
||||
|
||||
const ObBatchRows *input_brs = nullptr;
|
||||
bool iter_end = false;
|
||||
while (OB_SUCCESS == ret && !iter_end) {
|
||||
clear_evaluated_flag();
|
||||
if (OB_FAIL(child_->get_next_batch(MY_SPEC.max_batch_size_, input_brs))) {
|
||||
LOG_WARN("failed to get next batch", K(ret));
|
||||
} else if (first_row) {
|
||||
int64_t row_count = MY_SPEC.rows_;
|
||||
if (OB_FAIL(ObPxEstimateSizeUtil::get_px_size(
|
||||
&ctx_, MY_SPEC.px_est_size_factor_, row_count, row_count))) {
|
||||
LOG_WARN("failed to get px size", K(ret));
|
||||
} else if (OB_FAIL(sql_mem_processor_.init(
|
||||
&mem_context_->get_malloc_allocator(),
|
||||
tenant_id,
|
||||
row_count * MY_SPEC.width_, MY_SPEC.type_, MY_SPEC.id_, &ctx_))) {
|
||||
LOG_WARN("failed to init sql memory manager processor", K(ret));
|
||||
} else {
|
||||
datum_store_.set_dir_id(sql_mem_processor_.get_dir_id());
|
||||
LOG_TRACE("trace init sql mem mgr for material", K(row_count), K(MY_SPEC.width_),
|
||||
K(profile_.get_cache_size()), K(profile_.get_expect_size()));
|
||||
}
|
||||
first_row = false;
|
||||
}
|
||||
int64_t read_rows = -1;
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(process_dump())) {
|
||||
LOG_WARN("failed to process dump", K(ret));
|
||||
} else if (OB_FAIL(datum_store_.add_batch(child_->get_spec().output_, eval_ctx_,
|
||||
*input_brs->skip_, input_brs->size_,
|
||||
read_rows))) {
|
||||
} else if (OB_FAIL(material_impl_.add_batch(child_->get_spec().output_,
|
||||
*input_brs->skip_, input_brs->size_))) {
|
||||
LOG_WARN("failed to add row to row store", K(ret));
|
||||
} else {
|
||||
iter_end = input_brs->end_;
|
||||
}
|
||||
}
|
||||
|
||||
// normally child_ should not return OB_ITER_END. We add a defence check here to prevent
|
||||
// some one return OB_ITER_END by mistake.
|
||||
if (OB_UNLIKELY(OB_ITER_END == ret)) {
|
||||
brs_.size_ = 0;
|
||||
brs_.end_ = true;
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
// 最后一批数据retain到内存中
|
||||
if (OB_FAIL(datum_store_.finish_add_row(false))) {
|
||||
if (OB_FAIL(material_impl_.finish_add_row())) {
|
||||
LOG_WARN("failed to finish add row to row store", K(ret));
|
||||
} else if (OB_FAIL(datum_store_.begin(datum_store_it_))) {
|
||||
LOG_WARN("failed to begin iterator for chunk row store", K(ret));
|
||||
} else {
|
||||
is_first_ = false;
|
||||
}
|
||||
@ -227,8 +142,7 @@ int ObMaterialOp::get_all_batch_from_child(ObSQLSessionInfo &session)
|
||||
int ObMaterialOp::inner_rescan()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
datum_store_it_.reset();
|
||||
datum_store_.reset();
|
||||
material_impl_.rescan();
|
||||
// restart material op
|
||||
if (OB_FAIL(ObOperator::inner_rescan())) {
|
||||
LOG_WARN("operator rescan failed", K(ret));
|
||||
@ -243,7 +157,7 @@ int ObMaterialOp::rewind()
|
||||
if (OB_FAIL(ObOperator::inner_rescan())) {//do not cascade rescan
|
||||
LOG_WARN("failed to do inner rescan", K(ret));
|
||||
} else {
|
||||
datum_store_it_.reset();
|
||||
material_impl_.rewind();
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -251,9 +165,7 @@ int ObMaterialOp::rewind()
|
||||
int ObMaterialOp::inner_close()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
datum_store_it_.reset();
|
||||
datum_store_.reset();
|
||||
sql_mem_processor_.unregister_profile();
|
||||
material_impl_.reset();
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -271,7 +183,7 @@ int ObMaterialOp::inner_get_next_row()
|
||||
clear_evaluated_flag();
|
||||
if (is_first_ && OB_FAIL(get_all_row_from_child(*ctx_.get_my_session()))) {
|
||||
LOG_WARN("failed to get all row from child", K(child_), K(ret));
|
||||
} else if (OB_FAIL(datum_store_it_.get_next_row(child_->get_spec().output_, eval_ctx_))) {
|
||||
} else if (OB_FAIL(material_impl_.get_next_row(child_->get_spec().output_))) {
|
||||
if (OB_ITER_END != ret) {
|
||||
LOG_WARN("get row from row store failed", K(ret));
|
||||
}
|
||||
@ -300,9 +212,9 @@ int ObMaterialOp::inner_get_next_batch(int64_t max_row_cnt)
|
||||
clear_evaluated_flag();
|
||||
if (is_first_ && OB_FAIL(get_all_batch_from_child(*ctx_.get_my_session()))) {
|
||||
LOG_WARN("failed to get all batch from child", K(child_), K(ret));
|
||||
} else if (OB_FAIL(datum_store_it_.get_next_batch(
|
||||
child_->get_spec().output_, eval_ctx_,
|
||||
std::min(MY_SPEC.max_batch_size_, max_row_cnt), read_rows))) {
|
||||
} else if (OB_FAIL(material_impl_.get_next_batch(child_->get_spec().output_,
|
||||
std::min(MY_SPEC.max_batch_size_, max_row_cnt),
|
||||
read_rows))) {
|
||||
if (OB_ITER_END != ret) {
|
||||
LOG_WARN("failed to get next batch from datum store", K(ret));
|
||||
}
|
||||
|
||||
@ -16,6 +16,7 @@
|
||||
#include "sql/engine/ob_operator.h"
|
||||
#include "sql/engine/basic/ob_chunk_datum_store.h"
|
||||
#include "sql/engine/ob_sql_mem_mgr_processor.h"
|
||||
#include "sql/engine/basic/ob_material_op_impl.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -52,9 +53,9 @@ class ObMaterialOp : public ObOperator
|
||||
public:
|
||||
ObMaterialOp(ObExecContext &exec_ctx, const ObOpSpec &spec, ObOpInput *input)
|
||||
: ObOperator(exec_ctx, spec, input),
|
||||
mem_context_(nullptr), datum_store_(), datum_store_it_(),
|
||||
profile_(ObSqlWorkAreaType::HASH_WORK_AREA),
|
||||
sql_mem_processor_(profile_, op_monitor_info_), is_first_(false)
|
||||
is_first_(false),
|
||||
material_impl_(op_monitor_info_, profile_)
|
||||
{}
|
||||
|
||||
virtual int inner_open() override;
|
||||
@ -63,36 +64,24 @@ public:
|
||||
virtual int inner_get_next_batch(int64_t max_row_cnt) override;
|
||||
virtual int inner_close() override;
|
||||
virtual void destroy() override;
|
||||
int init_material_impl(int64_t tenant_id, int64_t row_count);
|
||||
|
||||
int get_material_row_count(int64_t &count) const
|
||||
{
|
||||
count = datum_store_.get_row_cnt();
|
||||
count = material_impl_.get_material_row_count();
|
||||
return common::OB_SUCCESS;
|
||||
}
|
||||
// reset material iterator, used for NLJ/NL connectby
|
||||
int rewind();
|
||||
private:
|
||||
int process_dump();
|
||||
int get_all_row_from_child(ObSQLSessionInfo &session);
|
||||
int get_all_batch_from_child(ObSQLSessionInfo &session);
|
||||
|
||||
bool need_dump()
|
||||
{ return sql_mem_processor_.get_data_size() > sql_mem_processor_.get_mem_bound(); }
|
||||
void destroy_mem_context()
|
||||
{
|
||||
if (nullptr != mem_context_) {
|
||||
DESTROY_CONTEXT(mem_context_);
|
||||
mem_context_ = nullptr;
|
||||
}
|
||||
}
|
||||
private:
|
||||
lib::MemoryContext mem_context_;
|
||||
ObChunkDatumStore datum_store_;
|
||||
ObChunkDatumStore::Iterator datum_store_it_;
|
||||
friend class ObValues;
|
||||
ObSqlWorkAreaProfile profile_;
|
||||
ObSqlMemMgrProcessor sql_mem_processor_;
|
||||
bool is_first_;
|
||||
ObMaterialOpImpl material_impl_;
|
||||
};
|
||||
|
||||
} // end namespace sql
|
||||
|
||||
309
src/sql/engine/basic/ob_material_op_impl.cpp
Normal file
309
src/sql/engine/basic/ob_material_op_impl.cpp
Normal file
@ -0,0 +1,309 @@
|
||||
/**
|
||||
* Copyright (c) 2021 OceanBase
|
||||
* OceanBase CE is licensed under Mulan PubL v2.
|
||||
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
||||
* You may obtain a copy of Mulan PubL v2 at:
|
||||
* http://license.coscl.org.cn/MulanPubL-2.0
|
||||
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
||||
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
||||
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
||||
* See the Mulan PubL v2 for more details.
|
||||
*/
|
||||
|
||||
#define USING_LOG_PREFIX SQL_ENG
|
||||
|
||||
#include "ob_material_op_impl.h"
|
||||
#include "sql/engine/ob_operator.h"
|
||||
#include "sql/engine/ob_tenant_sql_memory_manager.h"
|
||||
#include "storage/blocksstable/encoding/ob_encoding_query_util.h"
|
||||
#include "lib/container/ob_iarray.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
using namespace common;
|
||||
namespace sql
|
||||
{
|
||||
// Construct an un-initialized material implementation bound to the caller's
// monitor node and work-area profile. init() must be called before rows are added.
// Every member is initialized here, including io_event_observer_, so that no
// pointer member holds an indeterminate value before init()/reset() runs.
ObMaterialOpImpl::ObMaterialOpImpl(ObMonitorNode &op_monitor_info, ObSqlWorkAreaProfile &profile)
  : inited_(false),
    got_first_row_(false),
    tenant_id_(OB_INVALID_ID),
    exec_ctx_(nullptr),
    mem_context_(nullptr),
    datum_store_(),
    datum_store_it_(),
    eval_ctx_(nullptr),
    profile_(profile),
    sql_mem_processor_(profile, op_monitor_info),
    io_event_observer_(nullptr),
    input_rows_(OB_INVALID_ID),
    input_width_(OB_INVALID_ID),
    op_type_(PHY_INVALID),
    op_id_(UINT64_MAX)
{}
|
||||
|
||||
// Release all runtime state first, then tear down the memory context:
// reset() intentionally leaves the context alive, so it is destroyed here last.
ObMaterialOpImpl::~ObMaterialOpImpl()
{
  reset();
  destroy_mem_context();
}
|
||||
|
||||
void ObMaterialOpImpl::reset()
|
||||
{
|
||||
sql_mem_processor_.unregister_profile();
|
||||
io_event_observer_ = nullptr;
|
||||
datum_store_.reset();
|
||||
datum_store_it_.reset();
|
||||
got_first_row_ = false;
|
||||
inited_ = false;
|
||||
// can not destroy mem_entify here, the memory may hold by %iter_ or %datum_store_
|
||||
}
|
||||
|
||||
int ObMaterialOpImpl::init(const uint64_t tenant_id,
|
||||
ObEvalCtx *eval_ctx,
|
||||
ObExecContext *exec_ctx,
|
||||
ObIOEventObserver *observer,
|
||||
const int64_t default_block_size)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (inited_) {
|
||||
ret = OB_INIT_TWICE;
|
||||
LOG_WARN("init twice");
|
||||
} else if (OB_INVALID_ID == tenant_id) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(tenant_id));
|
||||
} else if (OB_ISNULL(eval_ctx) || OB_ISNULL(exec_ctx)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("get null argument", K(eval_ctx), K(exec_ctx));
|
||||
} else {
|
||||
tenant_id_ = tenant_id;
|
||||
eval_ctx_ = eval_ctx;
|
||||
exec_ctx_ = exec_ctx;
|
||||
io_event_observer_ = observer;
|
||||
if (OB_ISNULL(mem_context_)) {
|
||||
lib::ContextParam param;
|
||||
param.set_mem_attr(tenant_id, ObModIds::OB_SQL_SORT_ROW, ObCtxIds::WORK_AREA)
|
||||
.set_properties(lib::USE_TL_PAGE_OPTIONAL);
|
||||
if (OB_FAIL(CURRENT_CONTEXT->CREATE_CONTEXT(mem_context_, param))) {
|
||||
LOG_WARN("create entity failed");
|
||||
} else if (OB_ISNULL(mem_context_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("null memory entity returned");
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(datum_store_.init(UINT64_MAX, tenant_id_, ObCtxIds::WORK_AREA))) {
|
||||
LOG_WARN("init row store failed");
|
||||
} else {
|
||||
datum_store_.set_allocator(mem_context_->get_malloc_allocator());
|
||||
datum_store_.set_callback(&sql_mem_processor_);
|
||||
datum_store_.set_io_event_observer(io_event_observer_);
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
inited_ = true;
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
// Evaluate `exprs` in the bound eval context and append the resulting row.
// On success `store_row` points at the stored copy; on failure it is untouched.
int ObMaterialOpImpl::add_row(const common::ObIArray<ObExpr*> &exprs,
                              const ObChunkDatumStore::StoredRow *&store_row)
{
  int ret = OB_SUCCESS;
  ObChunkDatumStore::StoredRow *new_row = NULL;
  if (OB_FAIL(before_add_row())) {
    LOG_WARN("before add row process failed");
  } else if (OB_SUCC(datum_store_.add_row(exprs, eval_ctx_, &new_row))) {
    store_row = new_row;
  } else {
    LOG_WARN("add store row failed", K(mem_context_->used()), K(get_memory_limit()));
  }
  return ret;
}
|
||||
|
||||
// Append a copy of an existing stored row.
// On success `store_row` points at the stored copy; on failure it is untouched.
int ObMaterialOpImpl::add_row(const ObChunkDatumStore::StoredRow &src_sr,
                              const ObChunkDatumStore::StoredRow *&store_row)
{
  int ret = OB_SUCCESS;
  ObChunkDatumStore::StoredRow *new_row = NULL;
  if (OB_FAIL(before_add_row())) {
    LOG_WARN("before add row process failed");
  } else if (OB_SUCC(datum_store_.add_row(src_sr, &new_row))) {
    store_row = new_row;
  } else {
    LOG_WARN("add store row failed", K(mem_context_->used()), K(get_memory_limit()));
  }
  return ret;
}
|
||||
|
||||
int ObMaterialOpImpl::add_row(const ObDatum *src_datums,
|
||||
const int64_t datum_cnt,
|
||||
const int64_t extra_size,
|
||||
const ObChunkDatumStore::StoredRow *&store_row)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObChunkDatumStore::StoredRow *sr = NULL;
|
||||
if (OB_FAIL(before_add_row())) {
|
||||
LOG_WARN("before add row process failed");
|
||||
} else if (OB_FAIL(datum_store_.add_row(src_datums, datum_cnt, extra_size, &sr))) {
|
||||
LOG_WARN("add store row failed", K(mem_context_->used()), K(get_memory_limit()));
|
||||
} else {
|
||||
store_row = sr;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
// Append a batch of rows produced by `exprs`; `skip` and `batch_size` are
// forwarded to the datum store unchanged. The stored-row count reported by the
// store is not exposed to the caller.
int ObMaterialOpImpl::add_batch(const common::ObIArray<ObExpr *> &exprs,
                                const ObBitVector &skip,
                                const int64_t batch_size)
{
  int ret = OB_SUCCESS;
  int64_t stored_rows = -1;
  if (OB_FAIL(before_add_row())) {
    LOG_WARN("before add row process failed");
  } else {
    if (OB_FAIL(datum_store_.add_batch(exprs, *eval_ctx_, skip, batch_size, stored_rows))) {
      LOG_WARN("add store row failed", K(mem_context_->used()), K(get_memory_limit()));
    }
  }
  return ret;
}
|
||||
|
||||
int ObMaterialOpImpl::finish_add_row()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(datum_store_.finish_add_row(false))) {
|
||||
LOG_WARN("failed to finish add row to row store");
|
||||
} else if (OB_FAIL(datum_store_.begin(datum_store_it_))) {
|
||||
LOG_WARN("failed to begin iterator for chunk row store");
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
int ObMaterialOpImpl::before_add_row()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (!inited_) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("not init");
|
||||
} else if (OB_UNLIKELY(!got_first_row_)) {
|
||||
int64_t size = OB_INVALID_ID == input_rows_ ? 0 : input_rows_ * input_width_;
|
||||
if (OB_FAIL(sql_mem_processor_.init(&mem_context_->get_malloc_allocator(),
|
||||
tenant_id_, size, op_type_,
|
||||
op_id_, exec_ctx_))) {
|
||||
LOG_WARN("failed to init sql memory manager processor", K(ret));
|
||||
} else {
|
||||
got_first_row_ = true;
|
||||
datum_store_.set_dir_id(sql_mem_processor_.get_dir_id());
|
||||
LOG_TRACE("trace init sql mem mgr for material", K(size), K(input_width_),
|
||||
K(profile_.get_cache_size()), K(profile_.get_expect_size()));
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(process_dump())) {
|
||||
LOG_WARN("failed to process dump", K(ret));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObMaterialOpImpl::process_dump()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
bool updated = false;
|
||||
bool dumped = false;
|
||||
UNUSED(updated);
|
||||
// 对于material由于需要保序,所以dump实现方式是,会dump掉所有的,同时可以保留最后的内存数据。目前选择一定是从前往后dump
|
||||
// 还一种方式实现是,dump剩下到最大内存量的数据,以后写入数据则必须dump,但这种方式必须对内存进行伸缩处理
|
||||
if (OB_FAIL(sql_mem_processor_.update_max_available_mem_size_periodically(
|
||||
&mem_context_->get_malloc_allocator(),
|
||||
[&](int64_t cur_cnt){ return datum_store_.get_row_cnt_in_memory() > cur_cnt; },
|
||||
updated))) {
|
||||
LOG_WARN("failed to update max available memory size periodically", K(ret));
|
||||
} else if (need_dump() && GCONF.is_sql_operator_dump_enabled()
|
||||
&& OB_FAIL(sql_mem_processor_.extend_max_memory_size(
|
||||
&mem_context_->get_malloc_allocator(),
|
||||
[&](int64_t max_memory_size) {
|
||||
return sql_mem_processor_.get_data_size() > max_memory_size;
|
||||
},
|
||||
dumped, sql_mem_processor_.get_data_size()))) {
|
||||
LOG_WARN("failed to extend max memory size", K(ret));
|
||||
} else if (dumped) {
|
||||
if (OB_FAIL(datum_store_.dump(false, true))) {
|
||||
LOG_WARN("failed to dump row store", K(ret));
|
||||
} else {
|
||||
sql_mem_processor_.reset();
|
||||
sql_mem_processor_.set_number_pass(1);
|
||||
LOG_TRACE("trace material dump",
|
||||
K(sql_mem_processor_.get_data_size()),
|
||||
K(datum_store_.get_row_cnt_in_memory()),
|
||||
K(sql_mem_processor_.get_mem_bound()));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
// Fetch the next stored row through the iterator, passing `exprs` and the bound
// eval context. OB_ITER_END is returned without a warning; other errors are logged.
int ObMaterialOpImpl::get_next_row(const common::ObIArray<ObExpr*> &exprs)
{
  int ret = OB_SUCCESS;
  if (OB_FAIL(datum_store_it_.get_next_row(exprs, *eval_ctx_)) && OB_ITER_END != ret) {
    LOG_WARN("get row from row store failed");
  }
  return ret;
}
|
||||
|
||||
// Fetch the next stored row through the iterator into `sr`.
// OB_ITER_END is returned without a warning; other errors are logged.
int ObMaterialOpImpl::get_next_row(const ObChunkDatumStore::StoredRow *&sr)
{
  int ret = OB_SUCCESS;
  if (OB_FAIL(datum_store_it_.get_next_row(sr)) && OB_ITER_END != ret) {
    LOG_WARN("get row from row store failed");
  }
  return ret;
}
|
||||
|
||||
// Fetch up to `max_rows` stored rows through the iterator; `read_rows` is
// passed to the iterator as an out-parameter.
// OB_ITER_END is returned without a warning; other errors are logged.
int ObMaterialOpImpl::get_next_batch(const common::ObIArray<ObExpr*> &exprs,
                                     const int64_t max_rows,
                                     int64_t &read_rows)
{
  int ret = OB_SUCCESS;
  if (OB_FAIL(datum_store_it_.get_next_batch(exprs, *eval_ctx_, max_rows, read_rows))
      && OB_ITER_END != ret) {
    LOG_WARN("failed to get next batch from datum store");
  }
  return ret;
}
|
||||
|
||||
int ObMaterialOpImpl::rescan()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
got_first_row_ = false;
|
||||
datum_store_it_.reset();
|
||||
datum_store_.reset();
|
||||
inited_ = false;
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObMaterialOpImpl::reuse()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
got_first_row_ = false;
|
||||
datum_store_it_.reset();
|
||||
datum_store_.reset();
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObMaterialOpImpl::rewind()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
datum_store_it_.reset();
|
||||
return ret;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
124
src/sql/engine/basic/ob_material_op_impl.h
Normal file
124
src/sql/engine/basic/ob_material_op_impl.h
Normal file
@ -0,0 +1,124 @@
|
||||
/**
|
||||
* Copyright (c) 2021 OceanBase
|
||||
* OceanBase CE is licensed under Mulan PubL v2.
|
||||
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
||||
* You may obtain a copy of Mulan PubL v2 at:
|
||||
* http://license.coscl.org.cn/MulanPubL-2.0
|
||||
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
||||
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
||||
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
||||
* See the Mulan PubL v2 for more details.
|
||||
*/
|
||||
|
||||
#ifndef OCEANBASE_SQL_ENGINE_BASIC_MATERIAL_OP_IMPL_H_
|
||||
#define OCEANBASE_SQL_ENGINE_BASIC_MATERIAL_OP_IMPL_H_
|
||||
|
||||
#include "lib/container/ob_array.h"
|
||||
#include "lib/container/ob_heap.h"
|
||||
#include "sql/engine/basic/ob_chunk_datum_store.h"
|
||||
#include "sql/engine/ob_sql_mem_mgr_processor.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
namespace sql
|
||||
{
|
||||
|
||||
class ObMaterialOpImpl {
|
||||
public:
|
||||
explicit ObMaterialOpImpl(ObMonitorNode &op_monitor_info, ObSqlWorkAreaProfile &profile);
|
||||
virtual ~ObMaterialOpImpl();
|
||||
|
||||
void reset();
|
||||
void unregister_profile_if_necessary()
|
||||
{
|
||||
sql_mem_processor_.unregister_profile_if_necessary();
|
||||
}
|
||||
|
||||
int init(const uint64_t tenant_id,
|
||||
ObEvalCtx *eval_ctx,
|
||||
ObExecContext *exec_ctx,
|
||||
ObIOEventObserver *observer,
|
||||
const int64_t default_block_size = ObChunkDatumStore::BLOCK_SIZE);
|
||||
inline void set_input_rows(int64_t input_rows) { input_rows_ = input_rows; }
|
||||
inline void set_input_width(int64_t input_width) { input_width_ = input_width; }
|
||||
inline void set_operator_type(ObPhyOperatorType op_type) { op_type_ = op_type; }
|
||||
inline void set_operator_id(uint64_t op_id) { op_id_ = op_id; }
|
||||
int64_t get_material_row_count() const { return datum_store_.get_row_cnt(); }
|
||||
|
||||
// before add row process: update date used memory, try dump ...
|
||||
int before_add_row();
|
||||
|
||||
int add_row(const common::ObIArray<ObExpr*> &exprs,
|
||||
const ObChunkDatumStore::StoredRow *&store_row);
|
||||
int add_row(const ObChunkDatumStore::StoredRow &src_sr,
|
||||
const ObChunkDatumStore::StoredRow *&store_row);
|
||||
int add_row(const ObDatum *src_datums,
|
||||
const int64_t datum_cnt,
|
||||
const int64_t extra_size,
|
||||
const ObChunkDatumStore::StoredRow *&store_row);
|
||||
int add_row(const ObChunkDatumStore::StoredRow &sr)
|
||||
{
|
||||
const ObChunkDatumStore::StoredRow *store_row = NULL;
|
||||
return add_row(sr, store_row);
|
||||
}
|
||||
int add_row(const common::ObIArray<ObExpr*> &exprs)
|
||||
{
|
||||
const ObChunkDatumStore::StoredRow *store_row = NULL;
|
||||
return add_row(exprs, store_row);
|
||||
}
|
||||
|
||||
// add batch rows by selector
|
||||
int add_batch(const common::ObIArray<ObExpr *> &exprs,
|
||||
const ObBitVector &skip,
|
||||
const int64_t batch_size);
|
||||
int finish_add_row();
|
||||
|
||||
int get_next_row(const common::ObIArray<ObExpr*> &exprs);
|
||||
int get_next_row(const ObChunkDatumStore::StoredRow *&sr);
|
||||
|
||||
// get next batch rows, %max_cnt should equal or smaller than max batch size.
|
||||
// return OB_ITER_END for EOF
|
||||
int get_next_batch(const common::ObIArray<ObExpr*> &exprs,
|
||||
const int64_t max_rows,
|
||||
int64_t &read_rows);
|
||||
|
||||
int rescan();
|
||||
int reuse();
|
||||
// rewind get_next_row() iterator to begin.
|
||||
int rewind();
|
||||
|
||||
|
||||
private:
|
||||
int process_dump();
|
||||
bool need_dump()
|
||||
{ return sql_mem_processor_.get_data_size() > sql_mem_processor_.get_mem_bound(); }
|
||||
void destroy_mem_context()
|
||||
{
|
||||
if (nullptr != mem_context_) {
|
||||
DESTROY_CONTEXT(mem_context_);
|
||||
mem_context_ = nullptr;
|
||||
}
|
||||
}
|
||||
private:
|
||||
friend class ObValues;
|
||||
bool inited_;
|
||||
bool got_first_row_;
|
||||
int64_t tenant_id_;
|
||||
ObExecContext *exec_ctx_;
|
||||
lib::MemoryContext mem_context_;
|
||||
ObChunkDatumStore datum_store_;
|
||||
ObChunkDatumStore::Iterator datum_store_it_;
|
||||
ObEvalCtx *eval_ctx_;
|
||||
ObSqlWorkAreaProfile &profile_;
|
||||
ObSqlMemMgrProcessor sql_mem_processor_;
|
||||
ObIOEventObserver *io_event_observer_;
|
||||
int64_t input_rows_;
|
||||
int64_t input_width_;
|
||||
ObPhyOperatorType op_type_;
|
||||
uint64_t op_id_;
|
||||
};
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
#endif
|
||||
Reference in New Issue
Block a user