discard ObChunkRowDE mod && do not push view task when queue size almost full

This commit is contained in:
18523270951@163.com
2023-09-09 11:44:07 +00:00
committed by ob-robot
parent e1081698d6
commit de8174ebf2
36 changed files with 68 additions and 55 deletions

View File

@ -97,7 +97,7 @@ OB_DEF_DESERIALIZE(ObDynamicSamplePieceMsg)
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("allocate memory failed", K(ret));
} else {
ObChunkDatumStore *tmp_store = new (tmp_buf) ObChunkDatumStore;
ObChunkDatumStore *tmp_store = new (tmp_buf) ObChunkDatumStore("DYN_SAMPLE_CTX");
if (OB_FAIL(tmp_store->deserialize(buf, data_len, pos))) {
LOG_WARN("deserialize datum store failed", K(ret), K(i));
} else if (OB_FAIL(row_stores_.push_back(tmp_store))) {
@ -306,7 +306,7 @@ int ObDynamicSamplePieceMsgCtx::init(const ObIArray<uint64_t> &tablet_ids)
LOG_WARN("allocate memory failed", K(ret), K(tablet_ids.count()));
}
for (int64_t i = 0; OB_SUCC(ret) && i < tablet_ids.count(); ++i) {
ObChunkDatumStore *sample_store = new (buf + i * sizeof(ObChunkDatumStore)) ObChunkDatumStore;
ObChunkDatumStore *sample_store = new (buf + i * sizeof(ObChunkDatumStore)) ObChunkDatumStore("DYN_SAMPLE_CTX");
if (OB_FAIL(sample_store->init(0, tenant_id_, ObCtxIds::DEFAULT_CTX_ID,
"DYN_SAMPLE_CTX", false/*enable dump*/))) {
LOG_WARN("init sample chunk store failed", K(ret), K(i));

View File

@ -76,7 +76,7 @@ public:
using WholeMsgProvider = ObWholeMsgProvider<ObWinbufWholeMsg>;
public:
ObWinbufWholeMsg() : ready_state_(0), is_empty_(true), is_datum_(false),
row_store_(), datum_store_(), assign_allocator_()
row_store_(), datum_store_("PXDhWinbuf"), assign_allocator_()
{}
~ObWinbufWholeMsg() = default;
int assign(const ObWinbufWholeMsg &other, common::ObIAllocator *allocator = NULL);

View File

@ -514,7 +514,7 @@ int ObPxDistTransmitOp::build_row_sample_piece_msg(int64_t expected_range_count,
int64_t tenant_id = ctx_.get_my_session()->get_effective_tenant_id();
ObChunkDatumStore *sample_store = OB_NEWx(ObChunkDatumStore, (&ctx_.get_allocator()));
ObChunkDatumStore *sample_store = OB_NEWx(ObChunkDatumStore, &ctx_.get_allocator(), "DYN_SAMPLE_CTX");
OV(NULL != sample_store, OB_ALLOCATE_MEMORY_FAILED);
bool sample_store_dump = false;
@ -577,7 +577,7 @@ int ObPxDistTransmitSpec::register_to_datahub(ObExecContext &ctx) const
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("allocate memory failed", K(ret));
} else {
ObChunkDatumStore *sample_store = new (chunk_buf) ObChunkDatumStore;
ObChunkDatumStore *sample_store = new (chunk_buf) ObChunkDatumStore("DYN_SAMPLE_CTX");
if (OB_FAIL(sample_store->init(0,
ctx.get_my_session()->get_effective_tenant_id(),
ObCtxIds::DEFAULT_CTX_ID, "DYN_SAMPLE_CTX", false/*enable dump*/))) {

View File

@ -512,7 +512,7 @@ int ObPxMSReceiveOp::GlobalOrderInput::create_chunk_datum_store(
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("create ra row store fail", K(ret));
} else {
row_store = new (buf) ObChunkDatumStore();
row_store = new (buf) ObChunkDatumStore("PxMSRecvGlobal");
// TODO: llongzhong.wlz 这里应该使用一个参数来控制row_store存储的数据量,或者SQL内存管理自动控制
int64_t mem_limit = 0;
row_store->set_allocator(*alloc_);

View File

@ -187,7 +187,7 @@ private:
public:
explicit LocalOrderInput()
: MergeSortInput(nullptr, nullptr, false),
datum_store_()
datum_store_("PxMSRecvLocal")
{
get_row_store_ = &datum_store_;
add_row_store_ = &datum_store_;