Interm Result Management Is Split into Tenants

obdev
2023-10-19 06:39:26 +00:00
committed by ob-robot
parent c72190a2c0
commit a68da2aa9c
17 changed files with 249 additions and 198 deletions
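The heart of this patch: ObDTLIntermResultManager stops being a process-wide singleton (getInstance()) and becomes a tenant-local service, created by mtl_init/mtl_destroy and fetched with MTL(ObDTLIntermResultManager*). A minimal standalone sketch of that shift, using hypothetical names rather than OceanBase's real MTL machinery:

#include <cstdint>
#include <map>
#include <memory>
#include <mutex>

class IntermResultManager {
public:
  explicit IntermResultManager(uint64_t tenant_id) : tenant_id_(tenant_id) {}
  uint64_t tenant_id() const { return tenant_id_; }
private:
  uint64_t tenant_id_;  // each instance now serves exactly one tenant
};

// Before: every tenant shared one global instance.
IntermResultManager &get_instance() {
  static IntermResultManager the_manager(0);  // tenant id is meaningless here
  return the_manager;
}

// After: a per-tenant registry; managers are created and destroyed with
// their tenant (mtl_init / mtl_destroy in the real code).
class TenantRegistry {
public:
  IntermResultManager *get_or_create(uint64_t tenant_id) {
    std::lock_guard<std::mutex> guard(lock_);
    auto it = managers_.find(tenant_id);
    if (it == managers_.end()) {
      it = managers_.emplace(
          tenant_id, std::make_unique<IntermResultManager>(tenant_id)).first;
    }
    return it->second.get();
  }
private:
  std::mutex lock_;
  std::map<uint64_t, std::unique_ptr<IntermResultManager>> managers_;
};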

View File

@ -291,11 +291,6 @@ int ObDtl::init()
is_inited_ = true;
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(ObDTLIntermResultManager::getInstance().init())) {
LOG_WARN("fail to init dtl internal result manager", K(ret));
}
}
return ret;
}

View File

@ -380,7 +380,7 @@ int ObDtlBasicChannel::mock_eof_buffer(int64_t timeout_ts)
buffer->seq_no() = 1;
buffer->pos() = 0;
if (use_interm_result_) {
if (OB_FAIL(ObDTLIntermResultManager::process_interm_result(buffer, id_))) {
if (OB_FAIL(MTL(ObDTLIntermResultManager*)->process_interm_result(buffer, id_))) {
LOG_WARN("fail to process internal result", K(ret));
}
} else {
@ -663,30 +663,32 @@ int ObDtlBasicChannel::process1(
ObDTLIntermResultKey key;
key.channel_id_ = id_;
key.batch_id_ = batch_id_;
if (channel_is_eof_) {
ret = OB_EAGAIN;
} else if (OB_FAIL(ObDTLIntermResultManager::getInstance().atomic_get_interm_result_info(
key, result_info_guard_))) {
if (is_px_channel()) {
MTL_SWITCH(tenant_id_) {
if (channel_is_eof_) {
ret = OB_EAGAIN;
} else if (ignore_error()) {
ret = OB_SUCCESS;
} else if (OB_FAIL(MTL(ObDTLIntermResultManager*)->atomic_get_interm_result_info(
key, result_info_guard_))) {
if (is_px_channel()) {
ret = OB_EAGAIN;
} else if (ignore_error()) {
ret = OB_SUCCESS;
} else {
LOG_WARN("fail to get row store", K(ret));
}
LOG_TRACE("fail to get row store", K(ret), K(key.batch_id_), K(key.channel_id_));
} else if (FALSE_IT(result_info = result_info_guard_.result_info_)) {
} else if (OB_SUCCESS != result_info->ret_) {
ret = result_info->ret_;
LOG_WARN("the interm result info meet a error", K(ret));
} else if (!result_info->is_store_valid()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("there is no row store in internal result", K(ret));
} else if (OB_FAIL(DTL_IR_STORE_DO(*result_info, finish_add_row, true))) {
LOG_WARN("failed to finish add row", K(ret));
} else {
LOG_WARN("fail to get row store", K(ret));
}
LOG_TRACE("fail to get row store", K(ret), K(key.batch_id_), K(key.channel_id_));
} else if (FALSE_IT(result_info = result_info_guard_.result_info_)) {
} else if (OB_SUCCESS != result_info->ret_) {
ret = result_info->ret_;
LOG_WARN("the interm result info meet a error", K(ret));
} else if (!result_info->is_store_valid()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("there is no row store in internal result", K(ret));
} else if (OB_FAIL(DTL_IR_STORE_DO(*result_info, finish_add_row, true))) {
LOG_WARN("failed to finish add row", K(ret));
} else {
if (OB_FAIL(result_info->datum_store_->begin(datum_iter_))) {
LOG_WARN("begin iterator failed", K(ret));
if (OB_FAIL(result_info->datum_store_->begin(datum_iter_))) {
LOG_WARN("begin iterator failed", K(ret));
}
}
}
if (OB_SUCC(ret) && !channel_is_eof()) {
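The hunk above wraps the whole read path in MTL_SWITCH(tenant_id_), which runs the block in the target tenant's context so MTL(ObDTLIntermResultManager*) resolves to that tenant's manager. A hedged sketch of the idea behind such a guard (invented names, not the real implementation):

#include <cstdint>

thread_local uint64_t g_current_tenant = 0;  // stand-in for the ambient tenant context

// RAII guard: enter a tenant's context for one scope, restore the old one after.
class TenantSwitchGuard {
public:
  explicit TenantSwitchGuard(uint64_t tenant_id) : saved_(g_current_tenant) {
    g_current_tenant = tenant_id;
  }
  ~TenantSwitchGuard() { g_current_tenant = saved_; }
private:
  uint64_t saved_;
};

int process_in_tenant(uint64_t tenant_id) {
  TenantSwitchGuard guard(tenant_id);  // roughly MTL_SWITCH(tenant_id_) { ... }
  // ...anything here sees tenant-local services, e.g. the per-tenant manager...
  return 0;
}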

View File

@ -11,6 +11,9 @@
*/
#define USING_LOG_PREFIX SQL_DTL
#include "observer/omt/ob_tenant.h"
#include "observer/omt/ob_multi_tenant.h"
#include "observer/ob_server_struct.h"
#include "ob_dtl_interm_result_manager.h"
#include "storage/blocksstable/ob_tmp_file.h"
#include "observer/virtual_table/ob_all_virtual_dtl_interm_result_monitor.h"
@ -23,45 +26,6 @@ using namespace common;
using namespace sql;
using namespace oceanbase::sql::dtl;
void ObDTLIntermResultGC::runTimerTask()
{
int ret = OB_SUCCESS;
cur_time_ = oceanbase::common::ObTimeUtility::current_time();
expire_keys_.reset();
// Previously this was done inside the foreach, i.e. incremented once per element;
// if the bucket count is a fairly large prime this can take very long and keeps changing, so the branch was almost never entered, a big bug.
++dump_count_;
interm_cnt_ = 0;
// every 10 seconds, dump row_stores that are not timed out && not in use
if (OB_SUCC(ret)) {
if (OB_FAIL(ObDTLIntermResultManager::getInstance().dump_result_info(*this))) {
LOG_WARN("fail to for each row store", K(ret));
} else {
int64_t dump_cost = oceanbase::common::ObTimeUtility::current_time() - cur_time_;
LOG_INFO("dump dtl interm result cost(us)", K(dump_cost), K(ret),
"interm count", interm_cnt_, "dump count", dump_count_);
}
}
clean_cnt_ = 0;
interm_cnt_ = 0;
cur_time_ = oceanbase::common::ObTimeUtility::current_time();
// clean up timed-out row_stores
if (OB_SUCC(ret)) {
if (OB_FAIL(ObDTLIntermResultManager::getInstance().clear_timeout_result_info(*this))) {
LOG_WARN("fail to for each row store", K(ret));
} else {
int64_t clear_cost = oceanbase::common::ObTimeUtility::current_time() - cur_time_;
LOG_INFO("clear dtl interm result cost(us)", K(clear_cost), K(ret),
K(expire_keys_.count()), "dump count", dump_count_,
"interm count", interm_cnt_, "clean count", clean_cnt_);
}
}
}
void ObDTLIntermResultGC::reset()
{
expire_keys_.reset();
@ -77,9 +41,6 @@ int ObDTLIntermResultGC::operator() (common::hash::HashMapPair<ObDTLIntermResult
OB_SUCCESS == entry.second->ret_ &&
cur_time_ - entry.first.start_time_ > DUMP_TIME_THRESHOLD &&
dis < 0) {
if (NULL != tenant_guard_) {
tenant_guard_->switch_to(entry.second->datum_store_->get_tenant_id());
}
int64_t dump_time = oceanbase::common::ObTimeUtility::current_time();
if (OB_FAIL(DTL_IR_STORE_DO(*entry.second, dump, false, true))) {
LOG_WARN("fail to dump interm row store", K(ret));
@ -95,7 +56,7 @@ int ObDTLIntermResultGC::operator() (common::hash::HashMapPair<ObDTLIntermResult
if (OB_SUCCESS != ret) {
entry.second->ret_ = ret;
// free interm result info datum store in advance for memory optimization.
ObDTLIntermResultManager::getInstance().free_interm_result_info_store(entry.second);
ObDTLIntermResultManager::free_interm_result_info_store(entry.second);
}
}
@ -166,50 +127,51 @@ void ObAtomicAppendPartBlockCall::operator() (common::hash::HashMapPair<ObDTLInt
}
}
int ObEraseTenantIntermResultInfo::operator() (common::hash::HashMapPair<ObDTLIntermResultKey, ObDTLIntermResultInfo *> &entry)
{
int ret = OB_SUCCESS;
if (entry.second->tenant_id_ == tenant_id_) {
if (OB_FAIL(expire_keys_.push_back(entry.first))) {
LOG_WARN("push back failed", K(ret));
ret_ = ret;
}
}
return OB_SUCCESS;
}
ObDTLIntermResultManager &ObDTLIntermResultManager::getInstance()
{
static ObDTLIntermResultManager the_ir_manager;
return the_ir_manager;
}
int ObDTLIntermResultManager::init()
{
int ret = OB_SUCCESS;
auto attr = SET_USE_500("HashBuckDTLINT");
uint64_t tenant_id = MTL_ID();
ObMemAttr attr(tenant_id, "HashBuckDTLINT");
int64_t tenant_mem_limit = lib::get_tenant_memory_limit(tenant_id);
double mem_factor = static_cast<double>(tenant_mem_limit) / lib::get_memory_limit();
// less memory for meta tenant
if (is_meta_tenant(tenant_id)) {
mem_factor = mem_factor * 0.01;
}
if (IS_INIT) {
ret = OB_INIT_TWICE;
} else if (OB_FAIL(map_.create(BUCKET_NUM,
} else if (OB_FAIL(map_.create(static_cast<int64_t>(DEFAULT_BUCKET_NUM * mem_factor),
attr, attr))) {
LOG_WARN("create hash table failed", K(ret));
} else if (OB_FAIL(TG_SCHEDULE(lib::TGDefIDs::ServerGTimer, gc_,
ObDTLIntermResultGC::REFRESH_INTERVAL, true))) {
LOG_WARN("schedule interm result gc failed", K(ret));
} else {
is_inited_ = true;
}
return ret;
}
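The new init() no longer uses the flat 500k-bucket table: each tenant's map is sized by its share of server memory, with meta tenants cut to 1% of that share. A worked example under assumed limits (64 GB server, 8 GB tenant):

#include <cstdint>
#include <iostream>

int main() {
  const int64_t DEFAULT_BUCKET_NUM = 500000;
  const int64_t server_mem_limit = 64LL << 30;  // assumed: 64 GB server limit
  const int64_t tenant_mem_limit = 8LL << 30;   // assumed: 8 GB tenant limit
  double mem_factor =
      static_cast<double>(tenant_mem_limit) / server_mem_limit;  // 0.125
  const bool is_meta_tenant = false;
  if (is_meta_tenant) {
    mem_factor *= 0.01;  // "less memory for meta tenant"
  }
  // 500000 * 0.125 = 62500 buckets for this tenant's hash map
  std::cout << static_cast<int64_t>(DEFAULT_BUCKET_NUM * mem_factor) << "\n";
  return 0;
}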
int ObDTLIntermResultManager::mtl_init(ObDTLIntermResultManager *&dtl_interm_result_manager)
{
return dtl_interm_result_manager->init();
}
void ObDTLIntermResultManager::destroy()
{
if (IS_INIT) {
erase_tenant_interm_result_info();
map_.destroy();
}
}
void ObDTLIntermResultManager::mtl_destroy(ObDTLIntermResultManager *&dtl_interm_result_manager)
{
if (nullptr != dtl_interm_result_manager) {
dtl_interm_result_manager->destroy();
ob_delete(dtl_interm_result_manager);
dtl_interm_result_manager = nullptr;
}
}
int ObDTLIntermResultManager::get_interm_result_info(ObDTLIntermResultKey &key,
ObDTLIntermResultInfo &result_info)
{
@ -228,20 +190,17 @@ int ObDTLIntermResultManager::create_interm_result_info(ObMemAttr &attr,
const ObDTLIntermResultMonitorInfo &monitor_info)
{
int ret = OB_SUCCESS;
void *ptr = NULL;
ObDTLIntermResultInfo *result_info = NULL;
void *result_info_buf = NULL;
const int64_t size = sizeof(ObChunkDatumStore);
if (OB_ISNULL(result_info_buf =
static_cast<ObDTLIntermResultInfo *>(ob_malloc(sizeof(ObDTLIntermResultInfo), attr)))) {
void *datum_store_buf = NULL;
if (OB_ISNULL(result_info_buf = ob_malloc(sizeof(ObDTLIntermResultInfo), attr))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to alloc dtl interm result info", K(ret));
} else if (OB_ISNULL(ptr = ob_malloc(size, attr))) {
} else if (OB_ISNULL(datum_store_buf = ob_malloc(sizeof(ObChunkDatumStore), attr))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to alloc ob chunk row store ret", K(ret));
LOG_WARN("fail to alloc ob chunk datum store", K(ret));
} else {
result_info = new(result_info_buf) ObDTLIntermResultInfo();
result_info->datum_store_ = new(ptr) ObChunkDatumStore("DtlIntermRes");
ObDTLIntermResultInfo *result_info = new(result_info_buf) ObDTLIntermResultInfo();
result_info->datum_store_ = new(datum_store_buf) ObChunkDatumStore("DtlIntermRes");
result_info->is_read_ = false;
result_info->trace_id_ = *ObCurTraceId::get_trace_id();
result_info->monitor_info_ = monitor_info;
@ -252,8 +211,8 @@ int ObDTLIntermResultManager::create_interm_result_info(ObMemAttr &attr,
if (NULL != result_info_buf) {
ob_free(result_info_buf);
}
if (NULL != ptr) {
ob_free(ptr);
if (NULL != datum_store_buf) {
ob_free(datum_store_buf);
}
}
return ret;
@ -327,7 +286,7 @@ int ObDTLIntermResultManager::clear_timeout_result_info(ObDTLIntermResultGC &gc)
if (OB_SUCC(ret)) {
for (int i = 0; i < gc.expire_keys_.count(); ++i) {
ObDTLIntermResultKey &key = gc.expire_keys_.at(i);
if (OB_FAIL(ObDTLIntermResultManager::getInstance().erase_interm_result_info(key))) {
if (OB_FAIL(erase_interm_result_info(key))) {
LOG_WARN("fail to erase row store", K(key), K(ret));
}
}
@ -339,13 +298,10 @@ int ObDTLIntermResultManager::clear_timeout_result_info(ObDTLIntermResultGC &gc)
int ObDTLIntermResultManager::dump_result_info(ObDTLIntermResultGC &gc)
{
int ret = OB_SUCCESS;
MAKE_TENANT_SWITCH_SCOPE_GUARD(guard);
gc.gc_type_ = ObDTLIntermResultGC::DUMP;
gc.tenant_guard_ = &guard;
if (OB_FAIL(map_.foreach_refactored(gc))) {
LOG_WARN("fail to get row store in result manager", K(ret));
}
gc.tenant_guard_ = NULL;
return ret;
}
@ -408,31 +364,23 @@ int ObDTLIntermResultManager::generate_monitor_info_rows(observer::ObDTLIntermRe
return ret;
}
int ObDTLIntermResultManager::erase_tenant_interm_result_info(int64_t tenant_id)
int ObDTLIntermResultManager::erase_tenant_interm_result_info()
{
int ret = OB_SUCCESS;
ObEraseTenantIntermResultInfo eraser;
eraser.tenant_id_ = tenant_id;
if (OB_FAIL(map_.foreach_refactored(eraser))) {
LOG_WARN("fail to get tenant result info in result manager", K(ret), K(tenant_id));
} else {
ret = eraser.ret_;
for (int i = 0; i < eraser.expire_keys_.count(); ++i) {
ObDTLIntermResultKey &key = eraser.expire_keys_.at(i);
int tmp_ret = OB_SUCCESS;
if (OB_SUCCESS != (tmp_ret = ObDTLIntermResultManager::getInstance().erase_interm_result_info(key))) {
if (OB_HASH_NOT_EXIST != tmp_ret) {
LOG_WARN("fail to erase result info", K(key), K(ret));
ret = tmp_ret;
}
for (auto iter = map_.begin(); iter != map_.end(); ++iter) {
ObDTLIntermResultKey &key = iter->first;
int tmp_ret = OB_SUCCESS;
if (OB_SUCCESS != (tmp_ret = erase_interm_result_info(key))) {
if (OB_HASH_NOT_EXIST != tmp_ret) {
LOG_WARN("fail to erase result info", K(key), K(tmp_ret));
ret = tmp_ret;
}
}
if (eraser.expire_keys_.count() < 100) {
LOG_INFO("erase_tenant_interm_result_info", K(tenant_id), K(eraser.expire_keys_));
} else {
LOG_INFO("erase_tenant_interm_result_info", K(tenant_id), K(eraser.expire_keys_.count()));
}
}
if (OB_SUCC(ret)) {
LOG_INFO("erase_tenant_interm_result_info", K(MTL_ID()), K(map_.size()));
}
return ret;
}
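With the map now owned by a single tenant, erase_tenant_interm_result_info no longer needs the ObEraseTenantIntermResultInfo functor to filter by tenant_id; it simply drains its own map. A generic sketch of such a drain, collecting keys first since std::unordered_map invalidates an iterator whose element is erased (OceanBase's hash map has its own rules):

#include <cstdint>
#include <unordered_map>
#include <vector>

struct ResultInfo { int ret = 0; };

void erase_all(std::unordered_map<int64_t, ResultInfo *> &map) {
  std::vector<int64_t> keys;
  keys.reserve(map.size());
  for (const auto &entry : map) {
    keys.push_back(entry.first);  // snapshot keys before mutating the map
  }
  for (const int64_t key : keys) {
    map.erase(key);  // the real erase also drops the ref count / frees the store
  }
}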
@ -489,14 +437,13 @@ int ObDTLIntermResultManager::process_interm_result_inner(ObDtlLinkedBuffer &buf
{
int ret = OB_SUCCESS;
ObDTLIntermResultInfo result_info;
if (OB_FAIL(ObDTLIntermResultManager::getInstance().get_interm_result_info(key,
result_info))) {
if (OB_FAIL(get_interm_result_info(key, result_info))) {
if (OB_HASH_NOT_EXIST == ret) {
ObDTLIntermResultInfoGuard result_info_guard;
ObMemAttr attr(buffer.tenant_id(), "DtlIntermRes", common::ObCtxIds::EXECUTE_CTX_ID);
key.start_time_ = oceanbase::common::ObTimeUtility::current_time();
ret = OB_SUCCESS;
if (OB_FAIL(ObDTLIntermResultManager::getInstance().create_interm_result_info(attr,
if (OB_FAIL(create_interm_result_info(attr,
result_info_guard,
ObDTLIntermResultMonitorInfo(buffer.get_dfo_key().qc_id_,
buffer.get_dfo_id(), buffer.get_sqc_id())))) {
@ -505,7 +452,7 @@ int ObDTLIntermResultManager::process_interm_result_inner(ObDtlLinkedBuffer &buf
*result_info_guard.result_info_, init,
0, buffer.tenant_id(), common::ObCtxIds::EXECUTE_CTX_ID, "DtlIntermRes"))) {
LOG_WARN("fail to init buffer", K(ret));
} else if (OB_FAIL(ObDTLIntermResultManager::getInstance().insert_interm_result_info(key, result_info_guard.result_info_))) {
} else if (OB_FAIL(insert_interm_result_info(key, result_info_guard.result_info_))) {
LOG_WARN("fail to insert row store", K(ret));
} else {
int reg_dm_ret = ObDetectManagerUtils::single_dfo_register_check_item_into_dm(
@ -526,7 +473,7 @@ int ObDTLIntermResultManager::process_interm_result_inner(ObDtlLinkedBuffer &buf
LOG_WARN("out of index", K(ret), K(buffer.size()), K(start_pos), K(length));
} else if (append_whole_block) {
ObAtomicAppendBlockCall call(buffer.buf(), length, is_eof);
if (OB_FAIL(ObDTLIntermResultManager::getInstance().atomic_append_block(key, call))) {
if (OB_FAIL(atomic_append_block(key, call))) {
if (OB_HASH_NOT_EXIST == ret &&
oceanbase::common::ObTimeUtility::current_time() > key.time_us_) {
ret = OB_TIMEOUT;
@ -540,7 +487,7 @@ int ObDTLIntermResultManager::process_interm_result_inner(ObDtlLinkedBuffer &buf
}
} else {
ObAtomicAppendPartBlockCall call(buffer.buf(), start_pos, length, rows, is_eof);
if (OB_FAIL(ObDTLIntermResultManager::getInstance().atomic_append_part_block(key, call))) {
if (OB_FAIL(atomic_append_part_block(key, call))) {
if (OB_HASH_NOT_EXIST == ret &&
oceanbase::common::ObTimeUtility::current_time() > key.time_us_) {
ret = OB_TIMEOUT;
@ -559,7 +506,7 @@ int ObDTLIntermResultManager::process_interm_result_inner(ObDtlLinkedBuffer &buf
// Note: in theory there is no concurrency issue here, because the channel is
// point-to-point and sends serially, so once this is received no other thread can be sending on this channel.
// Try to release it from the hash table first (frees memory earlier; the timer could also clean it up).
ObDTLIntermResultManager::getInstance().erase_interm_result_info(key);
erase_interm_result_info(key);
}
return ret;
}
@ -585,6 +532,46 @@ void ObDTLIntermResultManager::dec_interm_result_ref_count(ObDTLIntermResultInfo
}
}
void ObDTLIntermResultManager::runTimerTask()
{
int ret = OB_SUCCESS;
gc_.cur_time_ = oceanbase::common::ObTimeUtility::current_time();
gc_.expire_keys_.reset();
// Previously, it was placed inside a foreach loop,
// which means that each element would ++gc_.dump_count_.
// In this case, if the cache is a relatively large prime number,
// it may take a long time and it keeps changing,
// so it is unlikely to enter the loop, causing a big bug.
++gc_.dump_count_;
gc_.interm_cnt_ = 0;
// dump every_10_seconds && not_expired && unused row_store
if (OB_SUCC(ret)) {
if (OB_FAIL(dump_result_info(gc_))) {
LOG_WARN("fail to for each row store", K(ret));
} else {
int64_t dump_cost = oceanbase::common::ObTimeUtility::current_time() - gc_.cur_time_;
LOG_INFO("dump dtl interm result cost(us)", K(dump_cost), K(ret),
"interm count", gc_.interm_cnt_, "dump count", gc_.dump_count_);
}
}
gc_.clean_cnt_ = 0;
gc_.interm_cnt_ = 0;
gc_.cur_time_ = oceanbase::common::ObTimeUtility::current_time();
// Cleaning up expired row_store
if (OB_SUCC(ret)) {
if (OB_FAIL(clear_timeout_result_info(gc_))) {
LOG_WARN("fail to for each row store", K(ret));
} else {
int64_t clear_cost = oceanbase::common::ObTimeUtility::current_time() - gc_.cur_time_;
LOG_INFO("clear dtl interm result cost(us)", K(clear_cost), K(ret),
K(gc_.expire_keys_.count()), "dump count", gc_.dump_count_,
"interm count", gc_.interm_cnt_, "clean count", gc_.clean_cnt_);
}
}
}
void ObDTLIntermResultInfoGuard::set_result_info(ObDTLIntermResultInfo &result_info)
{
if (&result_info != result_info_) {
@ -601,3 +588,37 @@ void ObDTLIntermResultInfoGuard::reset()
result_info_ = NULL;
}
}
int ObDTLIntermResultManager::mtl_start(ObDTLIntermResultManager *&dtl_interm_result_manager)
{
int ret = OB_SUCCESS;
if (OB_LIKELY(nullptr != dtl_interm_result_manager)) {
if (OB_FAIL(TG_SCHEDULE(MTL(omt::ObSharedTimer*)->get_tg_id(), dtl_interm_result_manager->get_gc_task(),
ObDTLIntermResultGCTask::REFRESH_INTERVAL, true))) {
LOG_WARN("failed to scheduler flush all task", K(ret));
} else {
dtl_interm_result_manager->get_gc_task().disable_timeout_check();
dtl_interm_result_manager->get_gc_task().dtl_interm_result_manager_ = dtl_interm_result_manager;
}
}
return ret;
}
void ObDTLIntermResultManager::mtl_stop(ObDTLIntermResultManager *&dtl_interm_result_manager)
{
if (OB_LIKELY(nullptr != dtl_interm_result_manager)) {
TG_CANCEL_TASK(MTL(omt::ObSharedTimer*)->get_tg_id(), dtl_interm_result_manager->get_gc_task());
}
}
void ObDTLIntermResultManager::mtl_wait(ObDTLIntermResultManager *&dtl_interm_result_manager)
{
if (OB_LIKELY(nullptr != dtl_interm_result_manager)) {
TG_WAIT_TASK(MTL(omt::ObSharedTimer*)->get_tg_id(), dtl_interm_result_manager->get_gc_task());
}
}
void ObDTLIntermResultGCTask::runTimerTask()
{
dtl_interm_result_manager_->runTimerTask();
}
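mtl_start/mtl_stop/mtl_wait above hook the GC task into the tenant's shared timer, replacing the old ServerGTimer schedule, so the 10-second GC starts and stops with its tenant. A simplified lifecycle sketch with a plain thread standing in for the timer (all names hypothetical):

#include <atomic>
#include <chrono>
#include <thread>

struct GCTask {
  void run_once() { /* dump, then clear timed-out results for this tenant */ }
};

class TenantTimer {
public:
  // ~ mtl_start: schedule the task at a fixed interval
  void schedule(GCTask &task, std::chrono::seconds interval) {
    worker_ = std::thread([this, &task, interval] {
      while (!stopped_.load()) {
        std::this_thread::sleep_for(interval);
        if (!stopped_.load()) {
          task.run_once();
        }
      }
    });
  }
  void stop() { stopped_.store(true); }  // ~ mtl_stop: cancel further runs
  void wait() {                          // ~ mtl_wait: join the in-flight run
    if (worker_.joinable()) {
      worker_.join();
    }
  }
private:
  std::atomic<bool> stopped_{false};
  std::thread worker_;
};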

View File

@ -41,7 +41,7 @@ struct ObDTLIntermResultMonitorInfo
ObDTLIntermResultMonitorInfo(int64_t qc_id, int64_t dfo_id, int64_t sqc_id) :
qc_id_(qc_id), dfo_id_(dfo_id), sqc_id_(sqc_id)
{ }
TO_STRING_KV(K_(qc_id), K_(dfo_id), K_(sqc_id));
int64_t qc_id_;
int64_t dfo_id_;
int64_t sqc_id_;
@ -50,7 +50,7 @@ struct ObDTLIntermResultMonitorInfo
class ObDtlLinkedBuffer;
struct ObDTLIntermResultKey
{
ObDTLIntermResultKey() : channel_id_(0), time_us_(0),
start_time_(0), batch_id_(0) {}
int64_t channel_id_;
int64_t time_us_;
@ -127,21 +127,19 @@ struct ObDTLIntermResultInfoGuard
#define DTL_IR_STORE_DO(ir, act, ...) \
((ir).datum_store_->act(__VA_ARGS__))
class ObDTLIntermResultGC : public common::ObTimerTask
class ObDTLIntermResultGC
{
friend class ObDTLIntermResultManager;
public:
ObDTLIntermResultGC()
: cur_time_(0), expire_keys_(), gc_type_(NOT_INIT), dump_count_(0),
interm_cnt_(0), clean_cnt_(0), tenant_guard_(nullptr)
interm_cnt_(0), clean_cnt_(0)
{}
virtual ~ObDTLIntermResultGC() = default;
void reset();
int operator() (common::hash::HashMapPair<ObDTLIntermResultKey,
ObDTLIntermResultInfo *> &entry);
void runTimerTask();
public:
const static int64_t REFRESH_INTERVAL = 10 * 1000L * 1000L; // 10-second interval
const static int64_t DUMP_TIME_THRESHOLD = 10 * 1000L * 1000L; // dump after 10 seconds
const static int64_t CLEAR_TIME_THRESHOLD = 10 * 1000L * 1000L; // clear after 10 seconds
public:
@ -159,10 +157,8 @@ private:
int64_t dump_count_;
int64_t interm_cnt_;
int64_t clean_cnt_;
share::ObTenantSwitchGuard *tenant_guard_;
};
class ObAtomicGetIntermResultInfoCall
{
public:
@ -210,26 +206,22 @@ public:
bool is_eof_;
};
class ObEraseTenantIntermResultInfo
class ObDTLIntermResultGCTask : public common::ObTimerTask
{
public:
ObEraseTenantIntermResultInfo() : tenant_id_(OB_INVALID_ID), expire_keys_(), ret_(common::OB_SUCCESS) {}
~ObEraseTenantIntermResultInfo() = default;
int operator() (common::hash::HashMapPair<ObDTLIntermResultKey,
ObDTLIntermResultInfo *> &entry);
public:
uint64_t tenant_id_;
common::ObSEArray<ObDTLIntermResultKey, 64> expire_keys_;
int ret_;
ObDTLIntermResultGCTask() : dtl_interm_result_manager_(NULL) {}
virtual ~ObDTLIntermResultGCTask() {}
virtual void runTimerTask() override;
const static int64_t REFRESH_INTERVAL = 10 * 1000L * 1000L; // 10-second interval
ObDTLIntermResultManager *dtl_interm_result_manager_;
};
class ObDTLIntermResultManager
{
friend class ObDTLIntermResultGC;
friend class ObDTLIntermResultGCTask;
public:
static ObDTLIntermResultManager &getInstance();
static int process_interm_result(ObDtlLinkedBuffer *buffer, int64_t channel_id);
static int process_interm_result_inner(ObDtlLinkedBuffer &buffer,
int process_interm_result(ObDtlLinkedBuffer *buffer, int64_t channel_id);
int process_interm_result_inner(ObDtlLinkedBuffer &buffer,
ObDTLIntermResultKey &key,
int64_t start_pos,
int64_t length,
@ -252,24 +244,34 @@ public:
int atomic_append_block(ObDTLIntermResultKey &key, ObAtomicAppendBlockCall &call);
int atomic_append_part_block(ObDTLIntermResultKey &key, ObAtomicAppendPartBlockCall &call);
int init();
static int mtl_init(ObDTLIntermResultManager* &dtl_interm_result_manager);
void destroy();
static void mtl_destroy(ObDTLIntermResultManager *&dtl_interm_result_manager);
int generate_monitor_info_rows(observer::ObDTLIntermResultMonitorInfoGetter &monitor_info_getter);
int erase_tenant_interm_result_info(int64_t tenant_id);
int erase_tenant_interm_result_info();
static void free_interm_result_info_store(ObDTLIntermResultInfo *result_info);
static void free_interm_result_info(ObDTLIntermResultInfo *result_info);
static void inc_interm_result_ref_count(ObDTLIntermResultInfo *result_info);
static void dec_interm_result_ref_count(ObDTLIntermResultInfo *&result_info);
void runTimerTask();
static int mtl_start(ObDTLIntermResultManager *&dtl_interm_result_manager);
static void mtl_stop(ObDTLIntermResultManager *&dtl_interm_result_manager);
static void mtl_wait(ObDTLIntermResultManager *&dtl_interm_result_manager);
ObDTLIntermResultGCTask &get_gc_task() { return gc_task_; }
ObDTLIntermResultManager();
~ObDTLIntermResultManager();
private:
// Since this interm result manager was a global structure, bucket_num was set to 500k to reduce lock contention for performance.
static const int64_t BUCKET_NUM = 500000; // 500k
// Since this interm result manager was a global structure, bucket_num was set to 500k to reduce lock contention for performance.
static const int64_t DEFAULT_BUCKET_NUM = 500000; // 500k
static const int64_t MAX_TENANT_MEM_LIMIT = 17179869184; //16G
private:
MAP map_;
bool is_inited_;
int64_t dir_id_;
ObDTLIntermResultGC gc_;
ObDTLIntermResultGCTask gc_task_;
private:
ObDTLIntermResultManager();
~ObDTLIntermResultManager();
DISALLOW_COPY_AND_ASSIGN(ObDTLIntermResultManager);
};
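ObDTLIntermResultInfoGuard (set_result_info/reset in the .cpp above) pins a result info by reference count so the GC cannot free it while a reader holds it. A minimal RAII sketch of that pattern, with invented names:

#include <atomic>

struct ResultInfo {
  std::atomic<int> ref_cnt{0};
};

class ResultInfoGuard {
public:
  ~ResultInfoGuard() { reset(); }
  void set(ResultInfo &info) {
    reset();                    // drop whatever was held before
    info.ref_cnt.fetch_add(1);  // pin: GC skips entries still referenced
    info_ = &info;
  }
  void reset() {
    if (info_ != nullptr) {
      info_->ref_cnt.fetch_sub(1);  // unpin; the last owner frees the store
      info_ = nullptr;
    }
  }
private:
  ResultInfo *info_ = nullptr;
};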

View File

@ -81,8 +81,10 @@ int ObDtlLocalChannel::send_shared_message(ObDtlLinkedBuffer *&buf)
is_first = buf->is_data_msg() && 1 == buf->seq_no();
is_eof = buf->is_eof();
if (buf->is_data_msg() && buf->use_interm_result()) {
if (OB_FAIL(ObDTLIntermResultManager::process_interm_result(buf, peer_id_))) {
LOG_WARN("fail to process internal result", K(ret));
MTL_SWITCH(buf->tenant_id()) {
if (OB_FAIL(MTL(ObDTLIntermResultManager*)->process_interm_result(buf, peer_id_))) {
LOG_WARN("fail to process internal result", K(ret));
}
}
} else if (OB_FAIL(DTL.get_channel(peer_id_, chan))) {
int tmp_ret = ret;

View File

@ -42,8 +42,10 @@ int ObDtlSendMessageP::process_msg(ObDtlRpcDataResponse &response, ObDtlSendArgs
ObDtlChannel *chan = nullptr;
response.is_block_ = false;
if (arg.buffer_.is_data_msg() && arg.buffer_.use_interm_result()) {
if (OB_FAIL(ObDTLIntermResultManager::process_interm_result(&arg.buffer_, arg.chid_))) {
LOG_WARN("fail to process internal result", K(ret));
MTL_SWITCH(arg.buffer_.tenant_id()) {
if (OB_FAIL(MTL(ObDTLIntermResultManager*)->process_interm_result(&arg.buffer_, arg.chid_))) {
LOG_WARN("fail to process internal result", K(ret));
}
}
} else if (OB_FAIL(DTL.get_channel(arg.chid_, chan))) {
int tmp_ret = ret;
@ -164,7 +166,7 @@ int ObDtlSendMessageP::process_px_bloom_filter_data(ObDtlLinkedBuffer *&buffer)
ObPxBloomFilter *filter = NULL;
if (OB_FAIL(ObDtlLinkedBuffer::deserialize_msg_header(*buffer, header))) {
LOG_WARN("fail to decode header of buffer", K(ret));
}
if (OB_SUCC(ret)) {
const char *buf = buffer->buf();
int64_t size = buffer->size();
@ -172,13 +174,13 @@ int ObDtlSendMessageP::process_px_bloom_filter_data(ObDtlLinkedBuffer *&buffer)
if (OB_FAIL(common::serialization::decode(buf, size, pos, bf_data))) {
LOG_WARN("fail to decode bloom filter data", K(ret));
} else {
ObPXBloomFilterHashWrapper bf_key(bf_data.tenant_id_, bf_data.filter_id_,
bf_data.server_id_, bf_data.px_sequence_id_, 0/*task_id*/);
if (OB_FAIL(ObPxBloomFilterManager::instance().get_px_bf_for_merge_filter(
bf_key, filter))) {
LOG_WARN("fail to get px bloom filter", K(ret));
}
// get_px_bf_for_merge_filter increases the filter's reference count only on success
if (OB_SUCC(ret) && OB_NOT_NULL(filter)) {
if (OB_FAIL(filter->merge_filter(&bf_data.filter_))) {
LOG_WARN("fail to merge filter", K(ret));
@ -187,7 +189,7 @@ int ObDtlSendMessageP::process_px_bloom_filter_data(ObDtlLinkedBuffer *&buffer)
}
// After the merge and process operations complete, the reference count must be decreased.
(void)filter->dec_merge_filter_count();
}
}
}
}

View File

@ -365,7 +365,7 @@ int ObTempTableAccessOp::locate_interm_result(int64_t result_id)
// The current operation of obtaining intermediate results and
// the operation of the background thread of dumping intermediate results
// are mutually exclusive
if (OB_FAIL(dtl::ObDTLIntermResultManager::getInstance().atomic_get_interm_result_info(
if (OB_FAIL(MTL(dtl::ObDTLIntermResultManager*)->atomic_get_interm_result_info(
dtl_int_key, result_info_guard_))) {
LOG_WARN("failed to create row store.", K(ret));
} else if (FALSE_IT(result_info = result_info_guard_.result_info_)) {

View File

@ -260,7 +260,7 @@ int ObTempTableInsertOp::init_chunk_row_store(ObDTLIntermResultInfo *&chunk_row_
uint64_t tenant_id = ctx_.get_my_session()->get_effective_tenant_id();
ObMemAttr mem_attr(tenant_id, "TempTableInsert", ObCtxIds::WORK_AREA);
dtl::ObDTLIntermResultInfoGuard result_info_guard;
if (OB_FAIL(dtl::ObDTLIntermResultManager::getInstance().create_interm_result_info(
if (OB_FAIL(MTL(dtl::ObDTLIntermResultManager*)->create_interm_result_info(
mem_attr,
result_info_guard,
dtl::ObDTLIntermResultMonitorInfo(
@ -305,11 +305,11 @@ int ObTempTableInsertOp::insert_chunk_row_store()
if (OB_ISNULL(phy_plan_ctx = GET_PHY_PLAN_CTX(ctx_))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("phy_plan_ctx is NULL", K(ret));
} else if (!MY_SPEC.is_distributed_ &&
all_datum_store_.empty() &&
OB_FAIL(init_chunk_row_store(chunk_row_store))) {
// a local temp table needs an empty row store as a placeholder
LOG_WARN("failed to init chunk row store", K(ret));
} else if (!MY_SPEC.is_distributed_ && all_datum_store_.count() != 1) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("local temp table shoud have one chunk row store", K(ret));
@ -334,12 +334,12 @@ int ObTempTableInsertOp::insert_chunk_row_store()
row_store->set_eof(true);
// the chunk row store does not need to manage dump logic
row_store->is_read_ = true;
if (OB_FAIL(dtl::ObDTLIntermResultManager::getInstance().insert_interm_result_info(
if (OB_FAIL(MTL(dtl::ObDTLIntermResultManager*)->insert_interm_result_info(
dtl_int_key, row_store))) {
LOG_WARN("failed to insert row store.", K(ret), K(dtl_int_key.channel_id_));
} else if (OB_FAIL(keys_insert.push_back(dtl_int_key))) {
LOG_WARN("failed to push back key", K(ret));
dtl::ObDTLIntermResultManager::getInstance().erase_interm_result_info(dtl_int_key);
MTL(dtl::ObDTLIntermResultManager*)->erase_interm_result_info(dtl_int_key);
} else {
row_store->datum_store_->reset_callback();
ObPxSqcHandler *handler = ctx_.get_sqc_handler();
@ -361,7 +361,7 @@ int ObTempTableInsertOp::insert_chunk_row_store()
if (OB_FAIL(ret)) {
// error handling
for (int64_t i = 0; i < keys_insert.count(); ++i) {
dtl::ObDTLIntermResultManager::getInstance().erase_interm_result_info(keys_insert.at(i));
MTL(dtl::ObDTLIntermResultManager*)->erase_interm_result_info(keys_insert.at(i));
}
} else {
clear_all_datum_store();
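The error path above (the "error handling" loop) undoes every insert made so far: keys_insert records what succeeded, so failure leaves no orphaned stores behind. A generic sketch of that register-then-rollback shape:

#include <cstdint>
#include <unordered_map>
#include <vector>

using Map = std::unordered_map<int64_t, int>;

// Insert all items or none: on the first failure, erase what was inserted.
bool insert_all(Map &map, const std::vector<std::pair<int64_t, int>> &items) {
  std::vector<int64_t> inserted;
  for (const auto &item : items) {
    if (!map.emplace(item.first, item.second).second) {
      for (const int64_t key : inserted) {
        map.erase(key);  // roll back earlier inserts
      }
      return false;
    }
    inserted.push_back(item.first);
  }
  return true;
}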

View File

@ -240,7 +240,7 @@ int ObTempTableTransformationOp::destory_local_interm_results(ObIArray<uint64_t>
LOG_TRACE("destory interm results", K(get_exec_ctx().get_addr()), K(result_ids));
for (int64_t i = 0; OB_SUCC(ret) && i < result_ids.count(); ++i) {
dtl_int_key.channel_id_ = result_ids.at(i);
if (OB_FAIL(dtl::ObDTLIntermResultManager::getInstance().erase_interm_result_info(
if (OB_FAIL(MTL(dtl::ObDTLIntermResultManager*)->erase_interm_result_info(
dtl_int_key))) {
if (OB_HASH_NOT_EXIST == ret) {
ret = OB_SUCCESS;

View File

@ -382,7 +382,7 @@ int ObPxReceiveOp::inner_rescan()
channel->reset_state();
channel->set_batch_id(ctx_.get_px_batch_id());
channel->reset_px_row_iterator();
release_channel_ret = ObDTLIntermResultManager::getInstance().erase_interm_result_info(key);
release_channel_ret = MTL(ObDTLIntermResultManager*)->erase_interm_result_info(key);
if (release_channel_ret != common::OB_SUCCESS) {
LOG_WARN("fail to release recieve internal result", KR(release_channel_ret), K(ret));
}
@ -541,7 +541,7 @@ int ObPxReceiveOp::erase_dtl_interm_result()
for (int64_t batch_id = ctx_.get_px_batch_id();
batch_id < PX_RESCAN_BATCH_ROW_COUNT && OB_SUCC(ret); batch_id++) {
key.batch_id_ = batch_id;
if (OB_FAIL(ObDTLIntermResultManager::getInstance().erase_interm_result_info(key))) {
if (OB_FAIL(MTL(ObDTLIntermResultManager*)->erase_interm_result_info(key))) {
if (OB_HASH_NOT_EXIST == ret) {
ret = OB_SUCCESS;
break;

View File

@ -1121,7 +1121,7 @@ int ObPxCoordOp::erase_dtl_interm_result()
key.channel_id_ = ci.chid_;
for (int j = 0; j < last_px_batch_rescan_size_; ++j) {
key.batch_id_ = j;
if (OB_FAIL(ObDTLIntermResultManager::getInstance().erase_interm_result_info(key))) {
if (OB_FAIL(MTL(ObDTLIntermResultManager*)->erase_interm_result_info(key))) {
LOG_TRACE("fail to release recieve internal result", K(ret));
}
}

View File

@ -660,7 +660,7 @@ int ObPxCleanDtlIntermResP::process()
key.channel_id_ = ch_set.get_ch_info_set().at(ch_idx).chid_;
for (int64_t batch_id = 0; batch_id < batch_size && OB_SUCC(ret); batch_id++) {
key.batch_id_= batch_id;
if (OB_FAIL(dtl::ObDTLIntermResultManager::getInstance().erase_interm_result_info(key))) {
if (OB_FAIL(MTL(dtl::ObDTLIntermResultManager*)->erase_interm_result_info(key))) {
if (OB_HASH_NOT_EXIST == ret) {
// interm result is written from batch_id = 0 to batch_size,
// if some errors happen when batch_id = i, no interm result of batch_id > i will be written.
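That comment carries the loop's invariant: batches are written in batch_id order, so the first missing id proves nothing later exists and the loop can stop early. A generic sketch (standard containers, not OceanBase's map):

#include <cstdint>
#include <set>

void erase_batches(std::set<int64_t> &results, const int64_t batch_size) {
  for (int64_t batch_id = 0; batch_id < batch_size; ++batch_id) {
    if (results.erase(batch_id) == 0) {
      break;  // written in order: a missing id means none follow
    }
  }
}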

View File

@ -68,11 +68,10 @@ int ObRpcEraseIntermResultP::process()
int ret = OB_SUCCESS;
LOG_TRACE("receive erase interm result request", K(arg_));
dtl::ObDTLIntermResultKey dtl_int_key;
dtl::ObDTLIntermResultManager &mgr = dtl::ObDTLIntermResultManager::getInstance();
ObIArray<uint64_t> &interm_result_ids = arg_.interm_result_ids_;
for (int64_t i = 0; OB_SUCC(ret) && i < interm_result_ids.count(); ++i) {
dtl_int_key.channel_id_ = interm_result_ids.at(i);
if (OB_FAIL(mgr.erase_interm_result_info(dtl_int_key))) {
if (OB_FAIL(MTL(dtl::ObDTLIntermResultManager*)->erase_interm_result_info(dtl_int_key))) {
LOG_WARN("failed to erase interm result info in manager.", K(ret));
}
}