[FEAT MERGE] das framework optimization.

This commit is contained in:
rolandqi
2023-01-10 12:49:51 +00:00
committed by ob-robot
parent 82426bb509
commit 31171bb553
31 changed files with 1497 additions and 275 deletions

View File

@ -570,9 +570,11 @@ DEF_NAME(id, "id")
DEF_NAME_PAIR(get_das_id, "get_das_id")
DEF_NAME_PAIR(do_local_das_task, "do_local_das_task")
DEF_NAME_PAIR(do_remote_das_task, "do_remote_das_task")
DEF_NAME_PAIR(do_async_remote_das_task, "do_async_remote_das_task")
DEF_NAME_PAIR(do_sync_remote_das_task, "do_sync_remote_das_task")
DEF_NAME_PAIR(rescan_das_task, "rescan_das_task")
DEF_NAME_PAIR(das_rpc_process, "das_rpc_process")
DEF_NAME_PAIR(das_async_rpc_process, "das_async_rpc_process")
DEF_NAME_PAIR(das_sync_rpc_process, "das_sync_rpc_process")
DEF_NAME_PAIR(close_das_task, "close_das_task")
DEF_NAME_PAIR(fetch_das_extra_result, "fetch_das_extra_result")
DEF_NAME_PAIR(fetch_das_result_process, "fetch_das_result_process")

View File

@ -75,8 +75,10 @@ FLT_DEF_SPAN(com_query_process, "com_query process")
// for das
FLT_DEF_SPAN(get_das_id, "fetch das task id")
FLT_DEF_SPAN(do_local_das_task, "execute local das task")
FLT_DEF_SPAN(do_remote_das_task, "execute remote das task")
FLT_DEF_SPAN(das_rpc_process, "das task rpc process")
FLT_DEF_SPAN(do_async_remote_das_task, "execute async remote das task")
FLT_DEF_SPAN(das_async_rpc_process, "das task async rpc process")
FLT_DEF_SPAN(do_sync_remote_das_task, "execute sync remote das task")
FLT_DEF_SPAN(das_sync_rpc_process, "das task sync rpc process")
FLT_DEF_SPAN(rescan_das_task, "rescan das task")
FLT_DEF_SPAN(close_das_task, "close das task")
FLT_DEF_SPAN(fetch_das_extra_result, "fetch das extra result")

View File

@ -206,4 +206,5 @@ void oceanbase::observer::init_srv_xlator_for_executor(ObSrvRpcXlator *xlator) {
RPC_PROCESSOR(ObDASSyncFetchP);
RPC_PROCESSOR(ObDASAsyncEraseP);
RPC_PROCESSOR(ObRpcEraseIntermResultP, gctx_);
RPC_PROCESSOR(ObDASAsyncAccessP, gctx_);
}

View File

@ -1899,6 +1899,25 @@ void ObTenant::check_dtl()
}
}
void ObTenant::check_das()
{
int ret = OB_SUCCESS;
if (!is_virtual_tenant_id(id_)) {
ObTenantSwitchGuard guard(this);
if (OB_ISNULL(MTL(ObDataAccessService *))) {
LOG_WARN("failed to get das ptr", K(MTL_ID()));
} else {
double min_cpu = .0;
double max_cpu = .0;
if (OB_FAIL(GCTX.omt_->get_tenant_cpu(MTL_ID(), min_cpu, max_cpu))) {
LOG_WARN("failed to set das task max concurrency", K(MTL_ID()));
} else {
MTL(ObDataAccessService *)->set_max_concurrency(min_cpu);
}
}
}
}
void ObTenant::check_parallel_servers_target()
{
int ret = OB_SUCCESS;

View File

@ -581,6 +581,7 @@ private:
void check_resource_manager_plan();
// clean buffer on time
void check_dtl();
void check_das();
int construct_mtl_init_ctx(const ObTenantMeta &meta, share::ObTenantModuleInitCtx *&ctx);

View File

@ -163,7 +163,7 @@ int ObDASBatchScanOp::decode_task_result(ObIDASTaskResult *task_result)
return ret;
}
int ObDASBatchScanOp::fill_task_result(ObIDASTaskResult &task_result, bool &has_more)
int ObDASBatchScanOp::fill_task_result(ObIDASTaskResult &task_result, bool &has_more, int64_t &memory_limit)
{
int ret = OB_SUCCESS;
DASExpandIterator *expand_iter = nullptr;
@ -174,7 +174,7 @@ int ObDASBatchScanOp::fill_task_result(ObIDASTaskResult &task_result, bool &has_
} else {
result_ = expand_iter;
result_outputs_ = &(expand_iter->get_output_exprs());
if (OB_FAIL(ObDASScanOp::fill_task_result(task_result, has_more))) {
if (OB_FAIL(ObDASScanOp::fill_task_result(task_result, has_more, memory_limit))) {
LOG_WARN("fill task result failed", K(ret));
}
}

View File

@ -60,6 +60,13 @@ const int64_t OB_DAS_MAX_PACKET_SIZE = 2 * 1024 * 1024l - 8 * 1024;
const int64_t OB_DAS_MAX_TOTAL_PACKET_SIZE = 3 * OB_DAS_MAX_PACKET_SIZE;
} // namespace das
enum class ObDasTaskStatus: uint8_t
{
UNSTART = 0,
FAILED,
FINISHED
};
enum ObDASOpType
{
//can not adjust the order of DASOpType, append OpType at the last

View File

@ -66,7 +66,9 @@ int ObDASIndexDMLAdaptor<DAS_OP_TABLE_DELETE, ObDASDMLIterator>::write_rows(cons
ObDASDeleteOp::ObDASDeleteOp(ObIAllocator &op_alloc)
: ObIDASTaskOp(op_alloc),
del_ctdef_(nullptr),
del_rtdef_(nullptr)
del_rtdef_(nullptr),
write_buffer_(),
affected_rows_(0)
{
}
@ -92,6 +94,7 @@ int ObDASDeleteOp::open_op()
}
} else {
del_rtdef_->affected_rows_ += affected_rows;
affected_rows_ = affected_rows;
}
return ret;
}
@ -116,15 +119,16 @@ int ObDASDeleteOp::decode_task_result(ObIDASTaskResult *task_result)
return ret;
}
int ObDASDeleteOp::fill_task_result(ObIDASTaskResult &task_result, bool &has_more)
int ObDASDeleteOp::fill_task_result(ObIDASTaskResult &task_result, bool &has_more, int64_t &memory_limit)
{
int ret = OB_SUCCESS;
UNUSED(memory_limit);
#if !defined(NDEBUG)
CK(typeid(task_result) == typeid(ObDASDeleteResult));
#endif
if (OB_SUCC(ret)) {
ObDASDeleteResult &del_result = static_cast<ObDASDeleteResult&>(task_result);
del_result.set_affected_rows(del_rtdef_->affected_rows_);
del_result.set_affected_rows(affected_rows_);
has_more = false;
}
return ret;
@ -179,9 +183,10 @@ ObDASDeleteResult::~ObDASDeleteResult()
{
}
int ObDASDeleteResult::init(const ObIDASTaskOp &op)
int ObDASDeleteResult::init(const ObIDASTaskOp &op, common::ObIAllocator &alloc)
{
UNUSED(op);
UNUSED(alloc);
return OB_SUCCESS;
}

View File

@ -30,7 +30,7 @@ public:
virtual int open_op() override;
virtual int release_op() override;
virtual int decode_task_result(ObIDASTaskResult *task_result) override;
virtual int fill_task_result(ObIDASTaskResult &task_result, bool &has_more) override;
virtual int fill_task_result(ObIDASTaskResult &task_result, bool &has_more, int64_t &memory_limit) override;
virtual int init_task_info() override;
virtual int swizzling_remote_task(ObDASRemoteInfo *remote_info) override;
virtual const ObDASBaseCtDef *get_ctdef() const override { return del_ctdef_; }
@ -52,6 +52,7 @@ private:
const ObDASDelCtDef *del_ctdef_;
ObDASDelRtDef *del_rtdef_;
ObDASWriteBuffer write_buffer_;
int64_t affected_rows_; // local execute result, no need to serialize
};
class ObDASDeleteResult : public ObIDASTaskResult
@ -60,7 +61,7 @@ class ObDASDeleteResult : public ObIDASTaskResult
public:
ObDASDeleteResult();
virtual ~ObDASDeleteResult();
virtual int init(const ObIDASTaskOp &op) override;
virtual int init(const ObIDASTaskOp &op, common::ObIAllocator &alloc) override;
int64_t get_affected_rows() const { return affected_rows_; }
void set_affected_rows(int64_t affected_rows) { affected_rows_ = affected_rows; }
INHERIT_TO_STRING_KV("ObIDASTaskResult", ObIDASTaskResult,

View File

@ -20,6 +20,8 @@
#include "sql/das/ob_das_lock_op.h"
#include "sql/das/ob_das_extra_data.h"
#include "sql/das/ob_das_def_reg.h"
#include "sql/das/ob_das_rpc_processor.h"
#include "sql/das/ob_das_ref.h"
#include "share/datum/ob_datum_util.h"
#define STORE_DAS_OBJ(obj_store, das_obj, class_name) \
@ -180,6 +182,7 @@ ObDASTaskFactory::ObDASTaskFactory(ObIAllocator &allocator)
: das_op_store_(allocator),
das_result_store_(allocator),
das_extra_data_store_(allocator),
das_async_cb_store_(allocator),
ctdef_store_(allocator),
rtdef_store_(allocator),
allocator_(allocator)
@ -274,6 +277,28 @@ int ObDASTaskFactory::create_das_extra_data(ObDASExtraData *&extra_result)
return ret;
}
int ObDASTaskFactory::create_das_async_cb(
const common::ObSEArray<ObIDASTaskOp *, 2> &task_ops, const ObMemAttr &attr,
ObDASRef &das_ref, ObRpcDasAsyncAccessCallBack *&async_cb) {
int ret = OB_SUCCESS;
void *buffer = nullptr;
ObDasAsyncRpcCallBackContext *context = nullptr;
if (OB_ISNULL(buffer = allocator_.alloc(sizeof(ObDasAsyncRpcCallBackContext)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to allocate das async cb context memory", K(ret), K(sizeof(ObDasAsyncRpcCallBackContext)));
} else if (FALSE_IT(context = new (buffer) ObDasAsyncRpcCallBackContext(das_ref, task_ops))) {
} else if (OB_FAIL(context->init(attr))) {
LOG_WARN("fail to init das async cb context", K(ret));
} else if (OB_ISNULL(buffer = allocator_.alloc(sizeof(ObRpcDasAsyncAccessCallBack)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to allocate das async cb memory", K(ret), K(sizeof(ObRpcDasAsyncAccessCallBack)));
} else {
async_cb = new (buffer) ObRpcDasAsyncAccessCallBack(context);
STORE_DAS_OBJ(das_async_cb_store_, async_cb, ObRpcDasAsyncAccessCallBack);
}
return ret;
}
void ObDASTaskFactory::cleanup()
{
if (OB_LIKELY(!das_op_store_.empty())) {
@ -312,6 +337,19 @@ void ObDASTaskFactory::cleanup()
}
das_extra_data_store_.clear();
}
if (OB_UNLIKELY(!das_async_cb_store_.empty())) {
for (DasAsyncCallBackIter async_cb_iter = das_async_cb_store_.begin();
!async_cb_iter.is_end();
++async_cb_iter) {
ObRpcDasAsyncAccessCallBack *async_cb = async_cb_iter.get_item();
if (async_cb != nullptr) {
async_cb->get_async_cb_context()->~ObDasAsyncRpcCallBackContext();
async_cb->~ObRpcDasAsyncAccessCallBack();
async_cb = nullptr;
}
}
das_async_cb_store_.clear();
}
if (OB_UNLIKELY(!ctdef_store_.empty())) {
for (DasCtDefIter ctdef_iter = ctdef_store_.begin();
!ctdef_iter.is_end();

View File

@ -23,6 +23,8 @@ class ObIDASTaskResult;
class ObIDASTaskOp;
class ObIDASTaskResult;
class ObDASTaskArg;
class ObRpcDasAsyncAccessCallBack;
class ObDASRef;
class ObDASTaskFactory
{
@ -36,6 +38,9 @@ public:
int create_das_extra_data(ObDASExtraData *&extra_result);
int create_das_ctdef(ObDASOpType op_type, ObDASBaseCtDef *&ctdef);
int create_das_rtdef(ObDASOpType op_type, ObDASBaseRtDef *&rtdef);
int create_das_async_cb(const common::ObSEArray<ObIDASTaskOp *, 2> &task_ops,
const ObMemAttr &attr, ObDASRef &das_ref,
ObRpcDasAsyncAccessCallBack *&async_cb);
static int create_das_ctdef(ObDASOpType op_type, common::ObIAllocator &alloc, ObDASBaseCtDef *&ctdef);
static int create_das_rtdef(ObDASOpType op_type, common::ObIAllocator &alloc, ObDASBaseRtDef *&rtdef);
template <typename CtDef>
@ -62,6 +67,8 @@ private:
typedef DasResultStore::Iterator DasResultIter;
typedef common::ObObjStore<ObDASExtraData*, common::ObIAllocator&> DasExtraDataStore;
typedef DasExtraDataStore::Iterator DasExtraDataIter;
typedef common::ObObjStore<ObRpcDasAsyncAccessCallBack*, common::ObIAllocator&> DasAsyncCallBackStore;
typedef DasAsyncCallBackStore::Iterator DasAsyncCallBackIter;
typedef common::ObObjStore<ObDASBaseCtDef*, common::ObIAllocator&> DasCtDefStore;
typedef DasCtDefStore::Iterator DasCtDefIter;
typedef common::ObObjStore<ObDASBaseRtDef*, common::ObIAllocator&> DasRtDefStore;
@ -70,6 +77,7 @@ private:
DasOpStore das_op_store_;
DasResultStore das_result_store_;
DasExtraDataStore das_extra_data_store_;
DasAsyncCallBackStore das_async_cb_store_;
DasCtDefStore ctdef_store_;
DasRtDefStore rtdef_store_;
common::ObIAllocator &allocator_;

View File

@ -168,7 +168,7 @@ ObNewRowIterator *ObDASGroupScanOp::get_storage_scan_iter()
return iter;
}
int ObDASGroupScanOp::fill_task_result(ObIDASTaskResult &task_result, bool &has_more)
int ObDASGroupScanOp::fill_task_result(ObIDASTaskResult &task_result, bool &has_more, int64_t &memory_limit)
{
int ret = OB_SUCCESS;
if (NULL == group_lookup_op_) {
@ -177,7 +177,7 @@ int ObDASGroupScanOp::fill_task_result(ObIDASTaskResult &task_result, bool &has_
result_iter_ = group_lookup_op_;
set_is_exec_remote(true);
}
if (OB_FAIL(ObDASScanOp::fill_task_result(task_result, has_more))) {
if (OB_FAIL(ObDASScanOp::fill_task_result(task_result, has_more, memory_limit))) {
LOG_WARN("fail to fill task result", K(ret));
}

View File

@ -73,7 +73,7 @@ public:
ObNewRowIterator *get_storage_scan_iter() override;
int do_local_index_lookup() override;
int decode_task_result(ObIDASTaskResult *task_result) override;
int fill_task_result(ObIDASTaskResult &task_result, bool &has_more) override;
int fill_task_result(ObIDASTaskResult &task_result, bool &has_more, int64_t &memory_limit) override;
void set_is_exec_remote(bool v) { is_exec_remote_ = v; }
virtual bool need_all_output() override { return is_exec_remote_; }
TO_STRING_KV(K(iter_), KP(group_lookup_op_), K(group_size_), K(cur_group_idx_));

View File

@ -70,7 +70,9 @@ ObDASInsertOp::ObDASInsertOp(ObIAllocator &op_alloc)
: ObIDASTaskOp(op_alloc),
ins_ctdef_(nullptr),
ins_rtdef_(nullptr),
result_(nullptr)
result_(nullptr),
affected_rows_(0),
is_duplicated_(false)
{
}
@ -108,6 +110,7 @@ int ObDASInsertOp::insert_rows()
}
} else {
ins_rtdef_->affected_rows_ += affected_rows;
affected_rows_ = affected_rows;
}
return ret;
}
@ -161,6 +164,7 @@ int ObDASInsertOp::insert_row_with_fetch()
} else {
LOG_DEBUG("insert one row and conflicted", KPC(insert_row));
ins_rtdef_->is_duplicated_ = true;
is_duplicated_ = true;
}
}
}
@ -216,6 +220,7 @@ int ObDASInsertOp::insert_row_with_fetch()
} else {
LOG_DEBUG("insert one row and conflicted", KPC(insert_row));
ins_rtdef_->is_duplicated_ = true;
is_duplicated_ = true;
}
} else {
// 需要释放iter的内存, 否则会内存泄漏
@ -291,18 +296,18 @@ int ObDASInsertOp::decode_task_result(ObIDASTaskResult *task_result)
} else {
result_ = insert_result;
ins_rtdef_->affected_rows_ += insert_result->get_affected_rows();
ins_rtdef_->is_duplicated_ = ins_rtdef_->is_duplicated_ || insert_result->is_duplicated();
ins_rtdef_->is_duplicated_ |= insert_result->is_duplicated();
}
} else {
result_ = insert_result;
ins_rtdef_->affected_rows_ += insert_result->get_affected_rows();
ins_rtdef_->is_duplicated_ = ins_rtdef_->is_duplicated_ || insert_result->is_duplicated();
ins_rtdef_->is_duplicated_ |= insert_result->is_duplicated();
}
}
return ret;
}
int ObDASInsertOp::fill_task_result(ObIDASTaskResult &task_result, bool &has_more)
int ObDASInsertOp::fill_task_result(ObIDASTaskResult &task_result, bool &has_more, int64_t &memory_limit)
{
int ret = OB_SUCCESS;
#if !defined(NDEBUG)
@ -316,12 +321,13 @@ int ObDASInsertOp::fill_task_result(ObIDASTaskResult &task_result, bool &has_mor
if (OB_FAIL(store_conflict_row(ins_result))) {
LOG_WARN("fail to fetch conflict row", K(ret));
} else {
ins_result.set_affected_rows(ins_rtdef_->affected_rows_);
ins_result.set_is_duplicated(ins_rtdef_->is_duplicated_);
ins_result.set_affected_rows(affected_rows_);
ins_result.set_is_duplicated(is_duplicated_);
has_more = false;
memory_limit -= ins_result.get_result_buffer().get_mem_used();
}
} else {
ins_result.set_affected_rows(ins_rtdef_->affected_rows_);
ins_result.set_affected_rows(affected_rows_);
has_more = false;
}
}
@ -387,7 +393,9 @@ int ObDASInsertResult::get_next_row(ObNewRow *&row)
int ret = OB_SUCCESS;
ObNewRow *result_row = NULL;
if (OB_FAIL(result_newrow_iter_.get_next_row(result_row))) {
if (ret != OB_ITER_END) {
LOG_WARN("get next row from result iter failed", K(ret));
}
} else {
row = result_row;
}
@ -431,7 +439,7 @@ void ObDASInsertResult::reset()
output_types_ = nullptr;
}
int ObDASInsertResult::init(const ObIDASTaskOp &op)
int ObDASInsertResult::init(const ObIDASTaskOp &op, common::ObIAllocator &alloc)
{
int ret = OB_SUCCESS;
const ObDASInsertOp &ins_op = static_cast<const ObDASInsertOp&>(op);
@ -445,7 +453,7 @@ int ObDASInsertResult::init(const ObIDASTaskOp &op)
tenant_id = MTL_ID();
// replace和insert_up拉回冲突数据的das_write_buff暂时不需要带pay_load
if (!result_buffer_.is_inited()
&& OB_FAIL(result_buffer_.init(ins_op.op_alloc_, 0 /*row_extend_size*/, tenant_id, "DASInsRsultBuffer"))) {
&& OB_FAIL(result_buffer_.init(alloc, 0 /*row_extend_size*/, tenant_id, "DASInsRsultBuffer"))) {
LOG_WARN("init result buffer failed", K(ret));
}
}

View File

@ -33,7 +33,7 @@ public:
virtual int open_op() override;
virtual int release_op() override;
virtual int decode_task_result(ObIDASTaskResult *task_result) override;
virtual int fill_task_result(ObIDASTaskResult &task_result, bool &has_more) override;
virtual int fill_task_result(ObIDASTaskResult &task_result, bool &has_more, int64_t &memory_limit) override;
virtual int init_task_info() override;
virtual int swizzling_remote_task(ObDASRemoteInfo *remote_info) override;
virtual const ObDASBaseCtDef *get_ctdef() const override { return ins_ctdef_; }
@ -65,6 +65,8 @@ private:
ObDASInsRtDef *ins_rtdef_;
ObDASWriteBuffer insert_buffer_;
common::ObNewRowIterator *result_;
int64_t affected_rows_; // local execute result, no need to serialize
bool is_duplicated_; // local execute result, no need to serialize
};
typedef common::ObList<ObNewRowIterator *, common::ObIAllocator> ObDuplicatedIterList;
@ -101,7 +103,7 @@ class ObDASInsertResult : public ObIDASTaskResult, public common::ObNewRowIterat
public:
ObDASInsertResult();
virtual ~ObDASInsertResult();
virtual int init(const ObIDASTaskOp &op) override;
virtual int init(const ObIDASTaskOp &op, common::ObIAllocator &alloc) override;
virtual int get_next_row(ObNewRow *&row) override;
virtual int get_next_row() override;
virtual int get_next_rows(int64_t &count, int64_t capacity) override;

View File

@ -41,7 +41,9 @@ namespace sql
ObDASLockOp::ObDASLockOp(ObIAllocator &op_alloc)
: ObIDASTaskOp(op_alloc),
lock_ctdef_(nullptr),
lock_rtdef_(nullptr)
lock_rtdef_(nullptr),
lock_buffer_(),
affected_rows_(0)
{
}
@ -68,6 +70,7 @@ int ObDASLockOp::open_op()
}
} else {
lock_rtdef_->affected_rows_ += affected_rows;
affected_rows_ = affected_rows;
}
return ret;
}
@ -92,15 +95,16 @@ int ObDASLockOp::decode_task_result(ObIDASTaskResult *task_result)
return ret;
}
int ObDASLockOp::fill_task_result(ObIDASTaskResult &task_result, bool &has_more)
int ObDASLockOp::fill_task_result(ObIDASTaskResult &task_result, bool &has_more, int64_t &memory_limit)
{
int ret = OB_SUCCESS;
UNUSED(memory_limit);
#if !defined(NDEBUG)
CK(typeid(task_result) == typeid(ObDASLockResult));
#endif
if (OB_SUCC(ret)) {
ObDASLockResult &lock_result = static_cast<ObDASLockResult&>(task_result);
lock_result.set_affected_rows(lock_rtdef_->affected_rows_);
lock_result.set_affected_rows(affected_rows_);
has_more = false;
}
return ret;
@ -158,9 +162,10 @@ ObDASLockResult::~ObDASLockResult()
{
}
int ObDASLockResult::init(const ObIDASTaskOp &op)
int ObDASLockResult::init(const ObIDASTaskOp &op, common::ObIAllocator &alloc)
{
UNUSED(op);
UNUSED(alloc);
return OB_SUCCESS;
}

View File

@ -30,7 +30,7 @@ public:
virtual int open_op() override;
virtual int release_op() override;
virtual int decode_task_result(ObIDASTaskResult *task_result) override;
virtual int fill_task_result(ObIDASTaskResult &task_result, bool &has_more) override;
virtual int fill_task_result(ObIDASTaskResult &task_result, bool &has_more, int64_t &memory_limit) override;
virtual int init_task_info() override;
virtual int swizzling_remote_task(ObDASRemoteInfo *remote_info) override;
virtual const ObDASBaseCtDef *get_ctdef() const override { return lock_ctdef_; }
@ -52,6 +52,7 @@ private:
const ObDASLockCtDef *lock_ctdef_;
ObDASLockRtDef *lock_rtdef_;
ObDASWriteBuffer lock_buffer_;
int64_t affected_rows_; // local execute result, no need to serialize
};
class ObDASLockResult : public ObIDASTaskResult
@ -60,7 +61,7 @@ class ObDASLockResult : public ObIDASTaskResult
public:
ObDASLockResult();
virtual ~ObDASLockResult();
virtual int init(const ObIDASTaskOp &op) override;
virtual int init(const ObIDASTaskOp &op, common::ObIAllocator &alloc) override;
int64_t get_affected_rows() const { return affected_rows_; }
void set_affected_rows(int64_t affected_rows) { affected_rows_ = affected_rows; }
INHERIT_TO_STRING_KV("ObIDASTaskResult", ObIDASTaskResult,

View File

@ -19,6 +19,8 @@
#include "sql/das/ob_das_utils.h"
#include "storage/tx/ob_trans_service.h"
#include "sql/engine/ob_exec_context.h"
#include "sql/das/ob_das_rpc_processor.h"
namespace oceanbase
{
using namespace common;
@ -47,11 +49,23 @@ ObDASRef::ObDASRef(ObEvalCtx &eval_ctx, ObExecContext &exec_ctx)
frozen_op_node_(nullptr),
expr_frame_info_(nullptr),
wild_datum_info_(eval_ctx),
aggregated_tasks_(das_alloc_),
lookup_cnt_(0),
task_cnt_(0),
task_map_(),
max_das_task_concurrency_(1),
das_task_concurrency_limit_(1),
cond_(),
async_cb_list_(das_alloc_),
flags_(0)
{
int ret = OB_SUCCESS;
max_das_task_concurrency_ = MTL(ObDataAccessService *)->get_das_concurrency_limit();
OB_ASSERT(max_das_task_concurrency_ > 0);
das_task_concurrency_limit_ = max_das_task_concurrency_;
if (OB_FAIL(cond_.init(ObWaitEventIds::DAS_ASYNC_RPC_LOCK_WAIT))) {
LOG_ERROR("Failed to init thread cond", K(ret), K(MTL_ID()));
}
}
DASOpResultIter ObDASRef::begin_result_iter()
@ -214,19 +228,198 @@ bool ObDASRef::is_all_local_task() const
int ObDASRef::execute_all_task()
{
int ret = OB_SUCCESS;
bool DAS_TASK_AGGREGATION = false;
if (DAS_TASK_AGGREGATION) {
// TODO(roland.qk): DAS task aggregation.
const bool async = GET_MIN_CLUSTER_VERSION() >= CLUSTER_VERSION_4_1_0_0;
// move local aggregated tasks to last for better concurrency.
if (OB_FAIL(move_local_tasks_to_last())) {
LOG_WARN("failed to move local tasks to last.", K(ret));
} else {
DASTaskIter task_iter = begin_task_iter();
while (OB_SUCC(ret) && !task_iter.is_end()) {
if (OB_FAIL(MTL(ObDataAccessService*)->execute_das_task(*this, **task_iter))) {
LOG_WARN("execute das task failed", K(ret));
uint32_t finished_cnt = 0;
while (finished_cnt < aggregated_tasks_.get_size() && OB_SUCC(ret)) {
finished_cnt = 0;
// execute tasks follows aggregated task state machine.
DLIST_FOREACH_X(curr, aggregated_tasks_.get_obj_list(), OB_SUCC(ret)) {
ObDasAggregatedTasks* aggregated_task = curr->get_obj();
if (aggregated_task->has_unstart_tasks() && OB_FAIL(MTL(ObDataAccessService *)
->execute_das_task(*this, *aggregated_task, async))) {
LOG_WARN("failed to execute aggregated das task", KR(ret));
} else {
LOG_DEBUG("successfully executing aggregated task", "server", aggregated_task->server_);
}
}
if (OB_FAIL(ret)) {
if (check_rcode_can_retry(ret, batched_tasks_.get_last_node()->get_obj()->get_ref_table_id())) {
ret = OB_SUCCESS; // we ignore current error code, since all error code should be checked whether can be retried.
} else {
int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(wait_executing_tasks())) {
LOG_WARN("failed to wait all tasks", K(ret));
}
break;
}
}
// wait all existing tasks to be finished
if (OB_SUCC(ret) && OB_FAIL(wait_executing_tasks())) {
LOG_WARN("failed to process all async remote tasks", K(ret));
if (check_rcode_can_retry(ret, batched_tasks_.get_last_node()->get_obj()->get_ref_table_id())) {
ret = OB_SUCCESS;
}
}
if (OB_SUCC(ret)) {
// check das task status.
DLIST_FOREACH_X(curr, aggregated_tasks_.get_obj_list(), OB_SUCC(ret)) {
ObDasAggregatedTasks* aggregated_task = curr->get_obj();
if (aggregated_task->has_unstart_tasks()) {
if (aggregated_task->has_failed_tasks()) {
if (OB_FAIL(aggregated_task->failed_tasks_can_retry())) {
LOG_WARN("failed das aggreagted task cannot retry.", K(ret));
} else {
// retry all failed tasks.
common::ObSEArray<ObIDASTaskOp *, 2> failed_tasks;
int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(aggregated_task->get_failed_tasks(failed_tasks))) {
LOG_WARN("failed to get failed tasks", K(ret));
} else if (failed_tasks.count() == 0) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to get failed tasks");
} else {
for (int i = 0; OB_SUCC(ret) && i < failed_tasks.count(); i++) {
if (OB_FAIL(MTL(ObDataAccessService *)->retry_das_task(*this, *failed_tasks.at(i)))) {
LOG_WARN("Failed to retry das task", K(ret));
break;
}
}
}
}
} else {
// proceed while loop for other unfinished tasks.
}
} else {
++finished_cnt;
#if !defined(NDEBUG)
OB_ASSERT(aggregated_task->high_priority_tasks_.get_size() == 0);
OB_ASSERT(aggregated_task->tasks_.get_size() == 0);
OB_ASSERT(aggregated_task->failed_tasks_.get_size() == 0);
OB_ASSERT(aggregated_task->success_tasks_.get_size() != 0);
DLIST_FOREACH_X(curr_task, aggregated_task->success_tasks_, true) {
ObIDASTaskOp *tmp_task = curr_task->get_data();
OB_ASSERT(ObDasTaskStatus::FINISHED == tmp_task->get_task_status());
}
#endif
}
}
LOG_DEBUG("current das task status", K(finished_cnt), K(aggregated_tasks_.get_size()));
}
}
}
return ret;
}
bool ObDASRef::check_rcode_can_retry(int ret, int64_t ref_table_id)
{
bool bret = false;
if ((is_master_changed_error(ret) ||
is_partition_change_error(ret) ||
OB_REPLICA_NOT_READABLE == ret) &&
GCONF._enable_partition_level_retry &&
!is_virtual_table(ref_table_id)) {
bret = true; // we ignore current error code, since all error code should be checked whether can be retried.
}
return bret;
}
int ObDASRef::wait_executing_tasks()
{
int ret = OB_SUCCESS;
if (get_current_concurrency() < max_das_task_concurrency_) {
ObThreadCondGuard guard(cond_);
while (OB_SUCC(ret) && get_current_concurrency() < max_das_task_concurrency_) {
// we cannot use ObCond here because it can not explicitly lock mutex, causing concurrency problem.
if (OB_FAIL(cond_.wait())) {
LOG_WARN("failed to wait all das tasks to be finished.", K(ret));
}
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(process_remote_task_resp())) {
LOG_WARN("failed to process remote task resp", K(ret));
}
}
return ret;
}
int ObDASRef::wait_all_tasks()
{
// won't implement until das async execution.
return OB_UNIMPLEMENTED_FEATURE;
}
int ObDASRef::allocate_async_das_cb(ObRpcDasAsyncAccessCallBack *&async_cb, const common::ObSEArray<ObIDASTaskOp*, 2> &task_ops)
{
int ret = OB_SUCCESS;
OB_ASSERT(async_cb == nullptr);
ObDASTaskFactory &das_factory = get_das_factory();
if (OB_FAIL(das_factory.create_das_async_cb(task_ops, das_alloc_.get_attr(), *this, async_cb))) {
LOG_WARN("failed to create das async cb", K(ret));
} else if (OB_ISNULL(async_cb)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to allocate async cb obj", K(ret));
} else if (OB_FAIL(async_cb_list_.store_obj(async_cb))) {
LOG_WARN("failed to store async cb obj", K(ret));
}
return ret;
}
void ObDASRef::remove_async_das_cb(ObRpcDasAsyncAccessCallBack *das_async_cb)
{
bool removed = false;
DLIST_FOREACH_X(curr, async_cb_list_.get_obj_list(), !removed) {
if (curr->get_obj() == das_async_cb) {
async_cb_list_.get_obj_list().remove(curr);
removed = true;
LOG_DEBUG("found remove node", K(das_async_cb));
}
++task_iter;
}
}
int ObDASRef::process_remote_task_resp()
{
int ret = OB_SUCCESS;
DLIST_FOREACH_X(curr, async_cb_list_.get_obj_list(), OB_SUCC(ret)) {
const sql::ObDASTaskResp &task_resp = curr->get_obj()->get_task_resp();
const common::ObSEArray<ObIDASTaskOp*, 2> &task_ops = curr->get_obj()->get_task_ops();
if (OB_UNLIKELY(OB_SUCCESS != task_resp.get_rpc_rcode())) {
LOG_WARN("das async rpc error", K(task_resp.get_rpc_rcode()));
for (int i = 0; i < task_ops.count(); i++) {
task_ops.at(i)->set_task_status(ObDasTaskStatus::FAILED);
task_ops.at(i)->errcode_ = task_resp.get_rpc_rcode();
if (OB_FAIL(task_ops.at(i)->state_advance())) {
LOG_WARN("failed to advance das task state", K(ret));
}
}
ret = task_resp.get_rpc_rcode();
} else if (OB_FAIL(MTL(ObDataAccessService *)->process_task_resp(*this, task_resp, task_ops))) {
LOG_WARN("failed to process das async task resp", K(ret), K(task_resp));
}
}
async_cb_list_.clear(); // no need to hold async cb anymore. destructor would be called in das factory.
return ret;
}
int ObDASRef::move_local_tasks_to_last()
{
int ret = OB_SUCCESS;
bool found_local_tasks = false;
const common::ObAddr &ctrl_addr = MTL(ObDataAccessService *)->get_ctrl_addr();
DLIST_FOREACH_X(curr, aggregated_tasks_.get_obj_list(), !found_local_tasks) {
ObDasAggregatedTasks* aggregated_task = curr->get_obj();
if (aggregated_task->server_ == ctrl_addr) {
if (!aggregated_tasks_.get_obj_list().move_to_last(curr)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to move local task to last", KR(ret), K(*aggregated_task));
}
found_local_tasks = true;
}
}
return ret;
}
@ -279,6 +472,7 @@ int ObDASRef::close_all_task()
session->get_trans_result().set_incomplete();
}
batched_tasks_.destroy();
aggregated_tasks_.destroy();
if (task_map_.created()) {
task_map_.destroy();
}
@ -310,10 +504,43 @@ int ObDASRef::create_das_task(const ObDASTabletLoc *tablet_loc,
task_op->set_tablet_loc(tablet_loc);
if (OB_FAIL(task_op->init_task_info())) {
LOG_WARN("init task info failed", K(ret));
} else if (OB_FAIL(add_aggregated_task(task_op))) {
LOG_WARN("failed to add aggregated task", KR(ret));
}
}
if (OB_SUCC(ret) && OB_FAIL(add_batched_task(task_op))) {
LOG_WARN("add batched task failed", KR(ret), KPC(task_op));
return ret;
}
int ObDASRef::add_aggregated_task(ObIDASTaskOp *das_task)
{
int ret = OB_SUCCESS;
bool aggregated = false;
DLIST_FOREACH_X(curr, aggregated_tasks_.get_obj_list(), !aggregated && OB_SUCC(ret)) {
ObDasAggregatedTasks* aggregated_task = curr->get_obj();
if (aggregated_task->server_ == das_task->tablet_loc_->server_) {
if (OB_FAIL(aggregated_task->push_back_task(das_task))) {
LOG_WARN("failed to add aggregated tasks", KR(ret));
} else {
aggregated = true;
}
}
}
if (OB_FAIL(ret)) {
} else if (!aggregated) {
void *buf = das_alloc_.alloc(sizeof(ObDasAggregatedTasks));
ObDasAggregatedTasks *agg_tasks = nullptr;
if (OB_ISNULL(buf)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to allocate memory for aggregated tasks", KR(ret));
} else if (FALSE_IT(agg_tasks = new(buf) ObDasAggregatedTasks(das_alloc_))) {
} else if (OB_FAIL(aggregated_tasks_.store_obj(agg_tasks))) {
LOG_WARN("failed to add aggregated tasks", KR(ret));
} else if (OB_FAIL(agg_tasks->push_back_task(das_task))) {
LOG_WARN("failed to add task", K(ret));
}
}
if (OB_SUCC(ret) && OB_FAIL(add_batched_task(das_task))) {
LOG_WARN("add batched task failed", KR(ret), KPC(das_task));
}
return ret;
}
@ -322,6 +549,7 @@ void ObDASRef::reset()
{
das_factory_.cleanup();
batched_tasks_.destroy();
aggregated_tasks_.destroy();
lookup_cnt_ = 0;
task_cnt_ = 0;
if (task_map_.created()) {
@ -340,6 +568,7 @@ void ObDASRef::reuse()
{
das_factory_.cleanup();
batched_tasks_.destroy();
aggregated_tasks_.destroy();
lookup_cnt_ = 0;
task_cnt_ = 0;
if (task_map_.created()) {
@ -354,5 +583,297 @@ void ObDASRef::reuse()
das_alloc_.set_alloc(reuse_alloc_);
}
}
int32_t ObDASRef::get_current_concurrency() const
{
return ATOMIC_LOAD(&das_task_concurrency_limit_);
};
void ObDASRef::inc_concurrency_limit()
{
ATOMIC_INC(&das_task_concurrency_limit_);
}
void ObDASRef::inc_concurrency_limit_with_signal()
{
ObThreadCondGuard guard(cond_);
if (__sync_add_and_fetch(&das_task_concurrency_limit_, 1) == max_das_task_concurrency_) {
cond_.signal();
}
}
int ObDASRef::dec_concurrency_limit()
{
int ret = OB_SUCCESS;
int32_t cur = get_current_concurrency();
int32_t next = cur - 1;
if (OB_UNLIKELY(0 == cur)) {
ret = OB_SIZE_OVERFLOW;
} else {
while (ATOMIC_CAS(&das_task_concurrency_limit_, cur, next) != cur) {
cur = get_current_concurrency();
next = cur - 1;
if (OB_UNLIKELY(0 == cur)) {
ret = OB_SIZE_OVERFLOW;
break;
}
}
}
return ret;
}
// not thread safe.
int ObDASRef::acquire_task_execution_resource()
{
int ret = OB_SUCCESS;
OB_ASSERT(get_current_concurrency() >= 0);
if (OB_FAIL(dec_concurrency_limit())) {
LOG_WARN("failed to acquire das execution resource", K(ret), K(get_current_concurrency()));
}
if (OB_UNLIKELY(OB_SIZE_OVERFLOW == ret)) {
ret = OB_SUCCESS;
ObThreadCondGuard guard(cond_);
if (OB_FAIL(cond_.wait(get_exec_ctx().get_my_session()->get_query_timeout_ts() -
ObTimeUtility::current_time()))) {
LOG_WARN("failed to acquire das task execution resource", K(ret), K(get_current_concurrency()));
} else if (OB_FAIL(dec_concurrency_limit())) {
LOG_WARN("failed to acquire das execution resource", K(ret), K(get_current_concurrency()));
}
}
return ret;
}
void ObDasAggregatedTasks::reset()
{
server_.reset();
high_priority_tasks_.reset();
tasks_.reset();
failed_tasks_.reset();
success_tasks_.reset();
}
void ObDasAggregatedTasks::reuse()
{
server_.reset();
high_priority_tasks_.reset();
tasks_.reset();
failed_tasks_.reset();
success_tasks_.reset();
}
int ObDasAggregatedTasks::push_back_task(ObIDASTaskOp *das_task)
{
int ret = OB_SUCCESS;
if (das_task->get_cur_agg_list()) {
// if task already have linked list (in task retry), remove it first
das_task->get_cur_agg_list()->remove(&das_task->get_node());
}
if (high_priority_tasks_.get_size() == 0 && tasks_.get_size() == 0) {
server_ = das_task->get_tablet_loc()->server_;
}
if (ObDASOpType::DAS_OP_TABLE_DELETE == das_task->get_type()) {
// we move all DELETE das op to high priority tasks anyway.
if (OB_UNLIKELY(!high_priority_tasks_.add_last(&das_task->get_node()))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to push back high priority task", K(ret));
} else {
das_task->set_cur_agg_list(&high_priority_tasks_);
}
} else if (OB_UNLIKELY(!tasks_.add_last(&das_task->get_node()))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to push back normal das task", K(ret));
} else {
das_task->set_cur_agg_list(&tasks_);
}
if (OB_SUCC(ret) && !das_task->get_agg_tasks()) {
das_task->set_agg_tasks(this);
}
return ret;
}
int ObDasAggregatedTasks::get_aggregated_tasks(
common::ObSEArray<ObIDASTaskOp *, 2> &tasks) {
int ret = OB_SUCCESS;
ObIDASTaskOp *cur_task = nullptr;
// 1. if have failed tasks, should explicitly get failed task via get_failed_tasks().
if (OB_UNLIKELY(failed_tasks_.get_size() != 0)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("das aggregated failed task exist. couldn't get unstarted tasks.", K(ret));
}
// 2. if no failed tasks exist and have unfinished high priority aggregated tasks, return all high priority tasks.
if (tasks.count() == 0 && OB_SUCC(ret)) {
DLIST_FOREACH_X(curr, high_priority_tasks_, OB_SUCC(ret)) {
cur_task = curr->get_data();
OB_ASSERT(cur_task != nullptr);
OB_ASSERT(cur_task->get_cur_agg_list() == &high_priority_tasks_);
OB_ASSERT(ObDASOpType::DAS_OP_TABLE_DELETE == cur_task->get_type());
OB_ASSERT(ObDasTaskStatus::UNSTART == cur_task->get_task_status());
if (OB_FAIL(tasks.push_back(cur_task))) {
LOG_WARN("failed to push back high prio tasks", KR(ret), K(cur_task));
}
}
}
// 3. if no unfinished high priority aggregated tasks exist, return all normal aggregated tasks.
if (tasks.count() == 0 && OB_SUCC(ret)) {
DLIST_FOREACH_X(curr, tasks_, OB_SUCC(ret)) {
cur_task = curr->get_data();
OB_ASSERT(cur_task != nullptr);
OB_ASSERT(cur_task->get_cur_agg_list() == &tasks_);
OB_ASSERT(ObDASOpType::DAS_OP_TABLE_DELETE != cur_task->get_type());
OB_ASSERT(ObDasTaskStatus::UNSTART == cur_task->get_task_status());
if (OB_FAIL(tasks.push_back(cur_task))) {
LOG_WARN("failed to push back high prio tasks", KR(ret), K(cur_task));
}
}
}
return ret;
}
int ObDasAggregatedTasks::get_aggregated_tasks(
common::ObSEArray<common::ObSEArray<ObIDASTaskOp *, 2>, 2> &task_groups,
int64_t count)
{
int ret = OB_SUCCESS;
ObIDASTaskOp *cur_task = nullptr;
// 1. if have failed tasks, should explicitly get failed task via get_failed_tasks().
if (OB_UNLIKELY(failed_tasks_.get_size() != 0)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("das aggregated failed task exist. couldn't get unstarted tasks.", K(ret));
} else if (OB_LIKELY(count == 1)) {
if (OB_FAIL(task_groups.push_back(common::ObSEArray<ObIDASTaskOp *, 2>()))) {
LOG_WARN("failed to push back", K(ret));
} else if (OB_FAIL(get_aggregated_tasks(task_groups.at(0)))) {
LOG_WARN("Failed to get aggregated das task", KR(ret));
}
} else {
// 2. if no failed tasks exist and have unfinished high priority aggregated tasks, return all high priority tasks.
if (task_groups.count() == 0 && high_priority_tasks_.get_size() != 0 && OB_SUCC(ret)) {
int idx = 0;
if (high_priority_tasks_.get_size() < count) {
count = high_priority_tasks_.get_size();
}
for (int i = 0; OB_SUCC(ret) && i < count; i++) {
if (OB_FAIL(task_groups.push_back(common::ObSEArray<ObIDASTaskOp *, 2>()))) {
LOG_WARN("failed to push back", K(ret));
}
}
DLIST_FOREACH_X(curr, high_priority_tasks_, OB_SUCC(ret)) {
cur_task = curr->get_data();
OB_ASSERT(cur_task != nullptr);
OB_ASSERT(ObDASOpType::DAS_OP_TABLE_DELETE == cur_task->get_type());
OB_ASSERT(ObDasTaskStatus::UNSTART == cur_task->get_task_status());
if (OB_FAIL(task_groups.at(idx++ % count).push_back(cur_task))) {
LOG_WARN("failed to push back high prio tasks", KR(ret), K(cur_task));
}
}
}
// 3. if no unfinished high priority aggregated tasks exist, return all normal aggregated tasks.
if (task_groups.count() == 0 && tasks_.get_size() != 0 && OB_SUCC(ret)) {
int idx = 0;
if (tasks_.get_size() < count) {
count = tasks_.get_size();
}
for (int i = 0; OB_SUCC(ret) && i < count; i++) {
if (OB_FAIL(task_groups.push_back(common::ObSEArray<ObIDASTaskOp *, 2>()))) {
LOG_WARN("failed to push back", K(ret));
}
}
DLIST_FOREACH_X(curr, tasks_, OB_SUCC(ret)) {
cur_task = curr->get_data();
OB_ASSERT(cur_task != nullptr);
OB_ASSERT(ObDASOpType::DAS_OP_TABLE_DELETE != cur_task->get_type());
OB_ASSERT(ObDasTaskStatus::UNSTART == cur_task->get_task_status());
if (OB_FAIL(task_groups.at(idx++ % count).push_back(cur_task))) {
LOG_WARN("failed to push back high prio tasks", KR(ret), K(cur_task));
}
}
}
}
return ret;
}
int ObDasAggregatedTasks::move_to_success_tasks(ObIDASTaskOp *das_task)
{
int ret = OB_SUCCESS;
das_task->get_cur_agg_list()->remove(&das_task->get_node());
if (OB_UNLIKELY(!success_tasks_.add_last(&das_task->get_node()))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to move task to success tasks", KR(ret));
} else {
das_task->set_cur_agg_list(&success_tasks_);
}
return ret;
}
int ObDasAggregatedTasks::move_to_failed_tasks(ObIDASTaskOp *das_task)
{
int ret = OB_SUCCESS;
das_task->get_cur_agg_list()->remove(&das_task->get_node());
if (OB_UNLIKELY(!failed_tasks_.add_last(&das_task->get_node()))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to move task to success tasks", KR(ret));
} else {
das_task->set_cur_agg_list(&failed_tasks_);
}
return ret;
}
int ObDasAggregatedTasks::get_failed_tasks(common::ObSEArray<ObIDASTaskOp *, 2> &tasks)
{
int ret = OB_SUCCESS;
ObIDASTaskOp *cur_task = nullptr;
DLIST_FOREACH_X(curr, failed_tasks_, OB_SUCC(ret)) {
cur_task = curr->get_data();
OB_ASSERT(cur_task != nullptr);
OB_ASSERT(ObDasTaskStatus::FAILED == cur_task->get_task_status());
if (OB_FAIL(tasks.push_back(cur_task))) {
LOG_WARN("failed to push back high prio tasks", KR(ret), K(cur_task));
}
}
return ret;
}
int ObDasAggregatedTasks::failed_tasks_can_retry() const
{
int ret = OB_SUCCESS;
bool can_retry = true;
if (!GCONF._enable_partition_level_retry) {
can_retry = false;
}
ObIDASTaskOp *cur_task = nullptr;
DLIST_FOREACH_X(curr, failed_tasks_, can_retry) {
cur_task = curr->get_data();
ret = cur_task->get_errcode();
if ((is_master_changed_error(ret) ||
is_partition_change_error(ret) ||
OB_REPLICA_NOT_READABLE == ret) &&
cur_task->can_part_retry()) {
} else {
can_retry = false;
}
}
#if !defined(NDEBUG)
if (can_retry) {
OB_ASSERT(OB_SUCCESS != ret);;
}
#endif
return can_retry ? OB_SUCCESS : ret; // return the error code that can't be retried.
};
bool ObDasAggregatedTasks::has_unstart_tasks() const
{
return high_priority_tasks_.get_size() != 0 ||
tasks_.get_size() != 0 ||
failed_tasks_.get_size() != 0;
}
int32_t ObDasAggregatedTasks::get_unstart_task_size() const
{
return tasks_.get_size() + high_priority_tasks_.get_size();
}
} // namespace sql
} // namespace oceanbase

View File

@ -17,12 +17,56 @@
#include "sql/das/ob_das_factory.h"
#include "sql/das/ob_das_def_reg.h"
#include "storage/tx/ob_trans_service.h"
namespace oceanbase
{
namespace sql
{
class ObDASScanOp;
class ObDASInsertOp;
class ObRpcDasAsyncAccessCallBack;
struct ObDasAggregatedTasks
{
public:
ObDasAggregatedTasks(common::ObIAllocator &allocator)
: server_(),
high_priority_tasks_(),
tasks_(),
failed_tasks_(),
success_tasks_() {}
~ObDasAggregatedTasks() { reset(); };
void reset();
void reuse();
int push_back_task(ObIDASTaskOp *das_task);
// not thread safe
int move_to_success_tasks(ObIDASTaskOp *das_task);
// not thread safe
int move_to_failed_tasks(ObIDASTaskOp *das_task);
/**
* get aggregated tasks.
* @param tasks: task array to output aggregated tasks.
*/
int get_aggregated_tasks(common::ObSEArray<ObIDASTaskOp *, 2> &tasks);
/**
* get aggregated tasks.
* @param task_groups: task array's array to output aggregated tasks.
* @param count: how many aggregated tasks should be generated.
*/
int get_aggregated_tasks(common::ObSEArray<common::ObSEArray<ObIDASTaskOp *, 2>, 2> &task_groups, int64_t count);
int get_failed_tasks(common::ObSEArray<ObIDASTaskOp *, 2> &tasks);
bool has_failed_tasks() const { return failed_tasks_.get_size() > 0; };
int failed_tasks_can_retry() const;
bool has_unstart_tasks() const;
int32_t get_unstart_task_size() const;
TO_STRING_KV(K_(server), K(high_priority_tasks_.get_size()), K(tasks_.get_size()), K(failed_tasks_.get_size()), K(success_tasks_.get_size()));
common::ObAddr server_;
DasTaskLinkedList high_priority_tasks_;
DasTaskLinkedList tasks_;
DasTaskLinkedList failed_tasks_;
DasTaskLinkedList success_tasks_;
};
struct DasRefKey
{
public:
@ -63,6 +107,7 @@ public:
template <typename DASOp>
bool has_das_op(const ObDASTabletLoc *tablet_loc, DASOp *&das_op);
ObIDASTaskOp* find_das_task(const ObDASTabletLoc *tablet_loc, ObDASOpType op_type);
int add_aggregated_task(ObIDASTaskOp *das_task);
int add_batched_task(ObIDASTaskOp *das_task);
//创建一个DAS Task,并由das_ref持有
template <typename DASOp>
@ -78,22 +123,32 @@ public:
void set_execute_directly(bool v) { execute_directly_ = v; }
bool is_execute_directly() const { return execute_directly_; }
common::ObIAllocator &get_das_alloc() { return das_alloc_; }
int pick_del_task_to_first();
void print_all_das_task();
void set_frozen_node();
const ObExprFrameInfo *get_expr_frame_info() const { return expr_frame_info_; }
void set_expr_frame_info(const ObExprFrameInfo *info) { expr_frame_info_ = info; }
ObEvalCtx &get_eval_ctx() { return eval_ctx_; };
void reset();
void reuse();
void set_lookup_iter(DASOpResultIter *lookup_iter) { wild_datum_info_.lookup_iter_ = lookup_iter; }
int32_t get_current_concurrency() const;
void inc_concurrency_limit();
void inc_concurrency_limit_with_signal();
int dec_concurrency_limit();
int32_t get_max_concurrency() const { return max_das_task_concurrency_; };
int acquire_task_execution_resource();
int get_aggregated_tasks_count() const { return aggregated_tasks_.get_size(); }
int wait_all_tasks();
int allocate_async_das_cb(ObRpcDasAsyncAccessCallBack *&async_cb, const common::ObSEArray<ObIDASTaskOp*, 2> &task_ops);
void remove_async_das_cb(ObRpcDasAsyncAccessCallBack *das_async_cb);
private:
DISABLE_COPY_ASSIGN(ObDASRef);
int create_task_map();
int move_local_tasks_to_last();
int wait_executing_tasks();
int process_remote_task_resp();
bool check_rcode_can_retry(int ret, int64_t ref_table_id);
private:
typedef common::ObObjNode<ObIDASTaskOp*> DasOpNode;
//declare das allocator
@ -111,9 +166,16 @@ private:
DasOpNode *frozen_op_node_; // 初始为链表的head节点,冻结一次之后为链表的最后一个节点
const ObExprFrameInfo *expr_frame_info_;
DASOpResultIter::WildDatumPtrInfo wild_datum_info_;
typedef common::ObObjStore<ObDasAggregatedTasks *, common::ObIAllocator&> DasAggregatedTaskList;
DasAggregatedTaskList aggregated_tasks_;
int64_t lookup_cnt_;
int64_t task_cnt_;
ObDASRefMap task_map_;
int32_t max_das_task_concurrency_;
int32_t das_task_concurrency_limit_;
common::ObThreadCond cond_;
typedef common::ObObjStore<ObRpcDasAsyncAccessCallBack *, common::ObIAllocator&> DasAsyncCbList;
DasAsyncCbList async_cb_list_;
public:
//all flags
union {

View File

@ -17,15 +17,17 @@
#include "sql/engine/ob_exec_context.h"
#include "observer/ob_server_struct.h"
#include "storage/tx/ob_trans_service.h"
#include "observer/ob_srv_network_frame.h"
namespace oceanbase
{
namespace sql
{
int ObDASSyncAccessP::init()
template<obrpc::ObRpcPacketCode pcode>
int ObDASBaseAccessP<pcode>::init()
{
int ret = OB_SUCCESS;
ObDASTaskArg &task = arg_;
ObDASSyncAccessP::get_das_factory() = &das_factory_;
ObDASTaskArg &task = RpcProcessor::arg_;
ObDASBaseAccessP<pcode>::get_das_factory() = &das_factory_;
das_remote_info_.exec_ctx_ = &exec_ctx_;
das_remote_info_.frame_info_ = &frame_info_;
task.set_remote_info(&das_remote_info_);
@ -33,64 +35,72 @@ int ObDASSyncAccessP::init()
return ret;
}
int ObDASSyncAccessP::before_process()
template<obrpc::ObRpcPacketCode pcode>
int ObDASBaseAccessP<pcode>::before_process()
{
int ret = OB_SUCCESS;
ObDASTaskArg &task = arg_;
ObDASTaskResp &task_resp = result_;
ObIDASTaskResult *task_result = nullptr;
ObDASTaskArg &task = RpcProcessor::arg_;
ObDASTaskResp &task_resp = RpcProcessor::result_;
ObMemAttr mem_attr;
mem_attr.tenant_id_ = task.get_task_op()->get_tenant_id();
mem_attr.label_ = "DASRpcPCtx";
exec_ctx_.get_allocator().set_attr(mem_attr);
ObDASTaskFactory *das_factory = ObDASSyncAccessP::get_das_factory();
if (OB_ISNULL(das_factory)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("das factory is not inited", K(ret));
} else if (OB_FAIL(ObDASSyncRpcProcessor::before_process())) {
if (OB_FAIL(RpcProcessor::before_process())) {
LOG_WARN("do rpc processor before_process failed", K(ret));
} else if (das_remote_info_.need_calc_expr_ &&
OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard(MTL_ID(), schema_guard_))) {
LOG_WARN("fail to get schema guard", K(ret));
} else if (OB_FAIL(das_factory->create_das_task_result(task.get_task_op()->get_type(),
task_result))) {
LOG_WARN("create das task result failed", K(ret), K(task));
} else if (OB_FAIL(task_result->init(*task.get_task_op()))) {
LOG_WARN("init task result failed", K(ret), KPC(task_result), KPC(task.get_task_op()));
} else if (OB_FAIL(task_resp.add_op_result(task_result))) {
LOG_WARN("failed to add das op result", K(ret), K(*task_result));
} else {
exec_ctx_.get_sql_ctx()->schema_guard_ = &schema_guard_;
}
return ret;
}
int ObDASSyncAccessP::process()
ERRSIM_POINT_DEF(EN_DAS_SIMULATE_EXTRA_RESULT_MEMORY_LIMIT);
template<obrpc::ObRpcPacketCode pcode>
int ObDASBaseAccessP<pcode>::process()
{
int ret = OB_SUCCESS;
NG_TRACE(das_rpc_process_begin);
FLTSpanGuard(das_rpc_process);
ObDASTaskArg &task = arg_;
ObDASTaskResp &task_resp = result_;
ObIDASTaskOp *task_op = task.get_task_op();
ObIDASTaskResult *task_result = task_resp.get_op_result();
LOG_DEBUG("DAS base access remote process", K_(RpcProcessor::arg));
ObDASTaskArg &task = RpcProcessor::arg_;
ObDASTaskResp &task_resp = RpcProcessor::result_;
const common::ObSEArray<ObIDASTaskOp*, 2> &task_ops = task.get_task_ops();
common::ObSEArray<ObIDASTaskResult*, 2> &task_results = task_resp.get_op_results();
ObDASTaskFactory *das_factory = ObDASBaseAccessP<pcode>::get_das_factory();
ObIDASTaskResult *op_result = nullptr;
ObIDASTaskOp *task_op = nullptr;
bool has_more = false;
ObDASOpType task_type = DAS_OP_INVALID;
//regardless of the success of the task execution, the fllowing meta info must be set
task_result->set_task_id(task_op->get_task_id());
int64_t memory_limit = das::OB_DAS_MAX_PACKET_SIZE;
#ifdef ERRSIM
if (EN_DAS_SIMULATE_EXTRA_RESULT_MEMORY_LIMIT) {
memory_limit = -EN_DAS_SIMULATE_EXTRA_RESULT_MEMORY_LIMIT;
LOG_INFO("das simulate extra result memory limit", K(memory_limit));
}
#endif
//regardless of the success of the task execution, the following meta info must be set
task_resp.set_ctrl_svr(task.get_ctrl_svr());
task_resp.set_runner_svr(task.get_runner_svr());
if (OB_ISNULL(task_op) || OB_ISNULL(task_result)) {
if (task_ops.count() == 0 || task_results.count() != 0) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("task op is nullptr", K(ret), K(task_op), K(task_result));
LOG_WARN("task op unexpected", K(ret), K(task_ops), K(task_results));
} else {
for (int i = 0; OB_SUCC(ret) && i < task_ops.count(); i++) {
task_op = task_ops.at(i);
if (OB_FAIL(das_factory->create_das_task_result(task_op->get_type(), op_result))) {
LOG_WARN("create das task result failed", K(ret));
} else if (OB_FAIL(task_resp.add_op_result(op_result))) {
LOG_WARN("failed to add op result", K(ret));
} else if (OB_FAIL(op_result->init(*task_op, CURRENT_CONTEXT->get_arena_allocator()))) {
LOG_WARN("failed to init op result", K(ret));
} else if (FALSE_IT(op_result->set_task_id(task_op->get_task_id()))) {
} else if (OB_FAIL(task_op->start_das_task())) {
LOG_WARN("start das task failed", K(ret));
} else if (OB_FAIL(task_op->fill_task_result(*task_result, has_more))) {
} else if (OB_FAIL(task_op->fill_task_result(*task_results.at(i), has_more, memory_limit))) {
LOG_WARN("fill task result to controller failed", K(ret));
} else if (OB_UNLIKELY(has_more) && OB_FAIL(task_op->fill_extra_result())) {
LOG_WARN("fill extra result to controller failed", KR(ret));
} else {
task_type = task_op->get_type();
task_resp.set_has_more(has_more);
ObWarningBuffer *wb = ob_get_tsi_warning_buffer();
if (wb != nullptr) {
@ -99,7 +109,6 @@ int ObDASSyncAccessP::process()
}
}
//因为end_task还有可能失败,需要通过RPC将end_task的返回值带回到scheduler上
if (OB_NOT_NULL(task_op)) {
int tmp_ret = task_op->end_das_task();
if (OB_SUCCESS != tmp_ret) {
LOG_WARN("end das task failed", K(ret), K(tmp_ret), K(task));
@ -121,21 +130,36 @@ int ObDASSyncAccessP::process()
task_resp.set_err_code(ret);
if (OB_SUCCESS != ret) {
task_resp.store_err_msg(ob_get_tsi_err_msg(ret));
LOG_WARN("process das sync access task failed", K(ret),
LOG_WARN("process das access task failed", K(ret),
K(task.get_ctrl_svr()), K(task.get_runner_svr()));
}
if (has_more || memory_limit < 0) {
/**
* das serialized execution.
* If current resp buffer is overflow. We would reply result
* directly. following un-executed tasks would be executed
* later remotely.
*
* the insert op won't set has_more flag, but if it exceed the
* threshold of memory_limit, we should reply anyway.
*/
LOG_DEBUG("reply das result due to memory limit exceeded.",
K(has_more), K(memory_limit), K(i), K(task_ops.count()));
break;
}
LOG_DEBUG("process das base access task", K(ret), KPC(task_op), KPC(op_result), K(has_more));
}
}
LOG_DEBUG("process das sync access task", K(ret), K(task), KPC(task_result), K(has_more));
NG_TRACE_EXT(das_rpc_process_end, OB_ID(type), task_type);
return OB_SUCCESS;
}
int ObDASSyncAccessP::after_process(int error_code)
template<obrpc::ObRpcPacketCode pcode>
int ObDASBaseAccessP<pcode>::after_process(int error_code)
{
int ret = OB_SUCCESS;
const int64_t elapsed_time = common::ObTimeUtility::current_time() - get_receive_timestamp();
if (OB_FAIL(ObDASSyncRpcProcessor::after_process(error_code))) {
LOG_WARN("do das sync base rpc process failed", K(ret));
const int64_t elapsed_time = common::ObTimeUtility::current_time() - RpcProcessor::get_receive_timestamp();
if (OB_FAIL(RpcProcessor::after_process(error_code))) {
LOG_WARN("do das base rpc process failed", K(ret));
} else if (elapsed_time >= ObServerConfig::get_instance().trace_log_slow_query_watermark) {
//slow das task, print trace info
FORCE_PRINT_TRACE(THE_TRACE, "[slow das rpc process]");
@ -144,18 +168,88 @@ int ObDASSyncAccessP::after_process(int error_code)
return OB_SUCCESS;
}
void ObDASSyncAccessP::cleanup()
template<obrpc::ObRpcPacketCode pcode>
void ObDASBaseAccessP<pcode>::cleanup()
{
ObActiveSessionGuard::setup_default_ash();
das_factory_.cleanup();
ObDASSyncAccessP::get_das_factory() = nullptr;
ObDASBaseAccessP<pcode>::get_das_factory() = nullptr;
if (das_remote_info_.trans_desc_ != nullptr) {
MTL(transaction::ObTransService*)->release_tx(*das_remote_info_.trans_desc_);
das_remote_info_.trans_desc_ = nullptr;
}
ObDASSyncRpcProcessor::cleanup();
RpcProcessor::cleanup();
}
int ObDASSyncAccessP::process()
{
int ret = OB_SUCCESS;
LOG_DEBUG("DAS sync access remote process", K_(arg));
NG_TRACE(das_sync_rpc_process_begin);
FLTSpanGuard(das_sync_rpc_process);
if (OB_FAIL(ObDASSyncRpcProcessor::process())) {
LOG_WARN("failed to process das sync rpc", K(ret));
}
NG_TRACE(das_sync_rpc_process_end);
return OB_SUCCESS;
}
int ObDASAsyncAccessP::process()
{
int ret = OB_SUCCESS;
LOG_DEBUG("DAS async access remote process", K_(arg));
NG_TRACE(das_async_rpc_process_begin);
FLTSpanGuard(das_async_rpc_process);
if (OB_FAIL(ObDASAsyncRpcProcessor::process())) {
LOG_WARN("failed to process das async rpc", K(ret));
}
NG_TRACE(das_async_rpc_process_end);
return OB_SUCCESS;
}
void ObRpcDasAsyncAccessCallBack::on_timeout()
{
int ret = OB_SUCCESS;
LOG_WARN("das async task timeout", K(get_task_ops()));
result_.set_rpc_rcode(OB_TIMEOUT);
context_->get_das_ref().inc_concurrency_limit_with_signal();
}
void ObRpcDasAsyncAccessCallBack::on_invalid()
{
int ret = OB_SUCCESS;
// a valid packet on protocol level, but can't decode it.
LOG_WARN("das async task invalid", K(get_task_ops()));
result_.set_rpc_rcode(OB_INVALID_ERROR);
context_->get_das_ref().inc_concurrency_limit_with_signal();
}
void ObRpcDasAsyncAccessCallBack::set_args(const Request &arg)
{
UNUSED(arg);
}
int ObRpcDasAsyncAccessCallBack::process()
{
int ret = OB_SUCCESS;
LOG_DEBUG("DAS async access callback process", K_(result));
context_->get_das_ref().inc_concurrency_limit_with_signal();
return ret;
}
oceanbase::rpc::frame::ObReqTransport::AsyncCB *ObRpcDasAsyncAccessCallBack::clone(
const oceanbase::rpc::frame::SPAlloc &alloc) const {
UNUSED(alloc);
return const_cast<rpc::frame::ObReqTransport::AsyncCB *>(
static_cast<const rpc::frame::ObReqTransport::AsyncCB * const>(this));
}
int ObDasAsyncRpcCallBackContext::init(const ObMemAttr &attr)
{
alloc_.set_attr(attr);
return task_ops_.get_copy_assign_ret();
};
int ObDASSyncFetchP::process()
{
int ret = OB_SUCCESS;

View File

@ -29,23 +29,23 @@ struct ObGlobalContext;
}
namespace sql
{
typedef obrpc::ObRpcProcessor<obrpc::ObDASRpcProxy::ObRpc<obrpc::OB_DAS_SYNC_ACCESS> > ObDASSyncRpcProcessor;
typedef obrpc::ObRpcProcessor<obrpc::ObDASRpcProxy::ObRpc<obrpc::OB_DAS_SYNC_FETCH_RESULT> > ObDASSyncFetchResRpcProcessor;
typedef obrpc::ObRpcProcessor<obrpc::ObDASRpcProxy::ObRpc<obrpc::OB_DAS_ASYNC_ERASE_RESULT> > ObDASAsyncEraseResRpcProcessor;
class ObDASSyncAccessP : public ObDASSyncRpcProcessor
template<obrpc::ObRpcPacketCode pcode>
class ObDASBaseAccessP : public obrpc::ObRpcProcessor<obrpc::ObDASRpcProxy::ObRpc<pcode>>
{
public:
ObDASSyncAccessP(const observer::ObGlobalContext &gctx)
typedef obrpc::ObRpcProcessor<obrpc::ObDASRpcProxy::ObRpc<pcode>> RpcProcessor;
ObDASBaseAccessP(const observer::ObGlobalContext &gctx)
: das_factory_(CURRENT_CONTEXT->get_arena_allocator()),
exec_ctx_(CURRENT_CONTEXT->get_arena_allocator(), gctx.session_mgr_),
frame_info_(CURRENT_CONTEXT->get_arena_allocator()),
das_remote_info_()
{
set_preserve_recv_data();
RpcProcessor::set_preserve_recv_data();
}
virtual ~ObDASSyncAccessP() {}
virtual ~ObDASBaseAccessP() {}
virtual int init();
virtual int before_process();
virtual int process();
@ -56,7 +56,7 @@ public:
RLOCAL(ObDASTaskFactory*, g_das_fatory);
return g_das_fatory;
}
private:
protected:
ObDASTaskFactory das_factory_;
ObDesExecContext exec_ctx_;
ObExprFrameInfo frame_info_;
@ -64,6 +64,67 @@ private:
ObDASRemoteInfo das_remote_info_;
};
class ObDASSyncAccessP final : public ObDASBaseAccessP<obrpc::OB_DAS_SYNC_ACCESS> {
public:
typedef ObDASBaseAccessP<obrpc::OB_DAS_SYNC_ACCESS> ObDASSyncRpcProcessor;
ObDASSyncAccessP(const observer::ObGlobalContext &gctx)
: ObDASSyncRpcProcessor(gctx) {}
virtual ~ObDASSyncAccessP() {}
virtual int process();
};
class ObDASAsyncAccessP final : public ObDASBaseAccessP<obrpc::OB_DAS_ASYNC_ACCESS> {
public:
typedef ObDASBaseAccessP<obrpc::OB_DAS_ASYNC_ACCESS> ObDASAsyncRpcProcessor;
ObDASAsyncAccessP(const observer::ObGlobalContext &gctx)
: ObDASAsyncRpcProcessor(gctx) {}
virtual ~ObDASAsyncAccessP() {}
virtual int process();
};
class ObDasAsyncRpcCallBackContext
{
public:
ObDasAsyncRpcCallBackContext(ObDASRef &das_ref, const common::ObSEArray<ObIDASTaskOp*, 2> &task_ops)
: das_ref_(das_ref), task_ops_(task_ops), alloc_() {}
~ObDasAsyncRpcCallBackContext() = default;
int init(const ObMemAttr &attr);
ObDASRef &get_das_ref() { return das_ref_; };
const common::ObSEArray<ObIDASTaskOp*, 2> &get_task_ops() const { return task_ops_; };
common::ObArenaAllocator &get_alloc() { return alloc_; };
private:
ObDASRef &das_ref_;
const common::ObSEArray<ObIDASTaskOp*, 2> task_ops_;
common::ObArenaAllocator alloc_; // used for async rpc result allocation.
};
class ObRpcDasAsyncAccessCallBack
: public obrpc::ObDASRpcProxy::AsyncCB<obrpc::OB_DAS_ASYNC_ACCESS>
{
public:
ObRpcDasAsyncAccessCallBack(ObDasAsyncRpcCallBackContext *context)
: context_(context)
{
// we need das_factory to allocate task op result on receiving rpc response.
result_.set_das_factory(&context->get_das_ref().get_das_factory());
}
~ObRpcDasAsyncAccessCallBack() = default;
void on_timeout() override;
void on_invalid() override;
void set_args(const Request &arg);
oceanbase::rpc::frame::ObReqTransport::AsyncCB *clone(
const oceanbase::rpc::frame::SPAlloc &alloc) const;
virtual int process();
const common::ObSEArray<ObIDASTaskResult*, 2> &get_op_results() const { return result_.get_op_results(); };
common::ObSEArray<ObIDASTaskResult*, 2> &get_op_results() { return result_.get_op_results(); };
const sql::ObDASTaskResp &get_task_resp() const { return result_; };
const common::ObSEArray<ObIDASTaskOp*, 2> &get_task_ops() const { return context_->get_task_ops(); };
common::ObIAllocator &get_result_alloc() { return context_->get_alloc(); }
ObDasAsyncRpcCallBackContext *get_async_cb_context() { return context_; };
private:
ObDasAsyncRpcCallBackContext *context_;
};
class ObDASSyncFetchP : public ObDASSyncFetchResRpcProcessor
{
public:

View File

@ -16,6 +16,7 @@
#include "share/ob_define.h"
#include "rpc/obrpc/ob_rpc_proxy.h"
#include "observer/ob_server_struct.h"
#include "share/rpc/ob_async_rpc_proxy.h"
namespace oceanbase
{
namespace obrpc
@ -31,7 +32,9 @@ public:
RPC_S(@PR5 sync_fetch_das_result, obrpc::OB_DAS_SYNC_FETCH_RESULT, (sql::ObDASDataFetchReq), sql::ObDASDataFetchRes);
// async rpc to erase das task result
RPC_AP(@PR5 async_erase_das_result, obrpc::OB_DAS_ASYNC_ERASE_RESULT, (sql::ObDASDataEraseReq));
RPC_AP(@PR5 das_async_access, obrpc::OB_DAS_ASYNC_ACCESS, (sql::ObDASTaskArg), sql::ObDASTaskResp);
};
} // namespace obrpc
} // namespace oceanbase
#endif /* OBDEV_SRC_SQL_DAS_OB_DAS_RPC_PROXY_H_ */

View File

@ -442,7 +442,7 @@ int ObDASScanOp::decode_task_result(ObIDASTaskResult *task_result)
//远程执行返回的TSC result,通过RPC回包带回给DAS Scheduler,
//如果结果集超过一个RPC,标记RPC包为has_more,剩余结果集通过DTL传输回DAS Scheduler
int ObDASScanOp::fill_task_result(ObIDASTaskResult &task_result, bool &has_more)
int ObDASScanOp::fill_task_result(ObIDASTaskResult &task_result, bool &has_more, int64_t &memory_limit)
{
int ret = OB_SUCCESS;
bool added = false;
@ -512,7 +512,7 @@ int ObDASScanOp::fill_task_result(ObIDASTaskResult &task_result, bool &has_more)
// simulate a datum store overflow error, send the remaining result through RPC
has_more = true;
} else if (OB_UNLIKELY(OB_FAIL(datum_store.try_add_batch(result_output, &eval_ctx,
remain_row_cnt_, das::OB_DAS_MAX_PACKET_SIZE,
remain_row_cnt_, memory_limit,
added)))) {
LOG_WARN("try add row to datum store failed", K(ret));
} else if (!added) {
@ -528,6 +528,7 @@ int ObDASScanOp::fill_task_result(ObIDASTaskResult &task_result, bool &has_more)
if (OB_ITER_END == ret) {
ret = OB_SUCCESS;
}
memory_limit -= datum_store.get_mem_used();
return ret;
}
@ -703,8 +704,9 @@ int ObDASScanResult::init_result_iter(const ExprFixedArray *output_exprs, ObEval
return ret;
}
int ObDASScanResult::init(const ObIDASTaskOp &op)
int ObDASScanResult::init(const ObIDASTaskOp &op, common::ObIAllocator &alloc)
{
UNUSED(alloc);
int ret = OB_SUCCESS;
const ObDASScanOp &scan_op = static_cast<const ObDASScanOp&>(op);
uint64_t tenant_id = MTL_ID();

View File

@ -154,7 +154,7 @@ public:
storage::ObTableScanParam &get_scan_param() { return scan_param_; }
const storage::ObTableScanParam &get_scan_param() const { return scan_param_; }
virtual int decode_task_result(ObIDASTaskResult *task_result) override;
virtual int fill_task_result(ObIDASTaskResult &task_result, bool &has_more) override;
virtual int fill_task_result(ObIDASTaskResult &task_result, bool &has_more, int64_t &memory_limit) override;
virtual int fill_extra_result() override;
virtual int init_task_info() override { return common::OB_SUCCESS; }
virtual int swizzling_remote_task(ObDASRemoteInfo *remote_info) override;
@ -212,7 +212,7 @@ class ObDASScanResult : public ObIDASTaskResult, public common::ObNewRowIterator
public:
ObDASScanResult();
virtual ~ObDASScanResult();
virtual int init(const ObIDASTaskOp &op) override;
virtual int init(const ObIDASTaskOp &op, common::ObIAllocator &alloc) override;
virtual int get_next_row(ObNewRow *&row) override;
virtual int get_next_row() override;
virtual int get_next_rows(int64_t &count, int64_t capacity) override;

View File

@ -79,7 +79,10 @@ OB_DEF_DESERIALIZE(ObDASRemoteInfo)
int ctdef_cnt = 0;
int rtdef_cnt = 0;
ObEvalCtx *eval_ctx = nullptr;
ObDASTaskFactory *das_factory = ObDASSyncAccessP::get_das_factory();
ObDASTaskFactory *das_factory =
ObDASAsyncAccessP::get_das_factory() != nullptr
? ObDASAsyncAccessP::get_das_factory()
: ObDASSyncAccessP::get_das_factory();
#if !defined(NDEBUG)
CK(typeid(*exec_ctx_) == typeid(ObDesExecContext));
#endif
@ -189,9 +192,20 @@ int ObIDASTaskOp::start_das_task()
}
}
}
// no need to advance state here because this function could be called on remote executor.
if (OB_FAIL(ret)) {
set_task_status(ObDasTaskStatus::FAILED);
} else {
set_task_status(ObDasTaskStatus::FINISHED);
}
return ret;
}
void ObIDASTaskOp::set_task_status(ObDasTaskStatus status)
{
task_status_ = status;
};
int ObIDASTaskOp::end_das_task()
{
int ret = OB_SUCCESS;
@ -252,7 +266,10 @@ OB_DEF_DESERIALIZE(ObDASTaskArg)
ObDASOpType op_type = DAS_OP_INVALID;
int64_t count = 0;
ObIDASTaskOp *task_op = nullptr;
ObDASTaskFactory *das_factory = ObDASSyncAccessP::get_das_factory();
ObDASTaskFactory *das_factory =
ObDASAsyncAccessP::get_das_factory() != nullptr
? ObDASAsyncAccessP::get_das_factory()
: ObDASSyncAccessP::get_das_factory();
CK(OB_NOT_NULL(das_factory));
LST_DO_CODE(OB_UNIS_DECODE,
timeout_ts_,
@ -302,23 +319,43 @@ OB_DEF_SERIALIZE_SIZE(ObDASTaskArg)
int ObDASTaskArg::add_task_op(ObIDASTaskOp *task_op)
{
// we currently only support single task per ObDASTaskArg.
// TODO(roland.qk):remove this assert check after we enable DAS task aggregation.
OB_ASSERT(task_ops_.count() == 0);
return task_ops_.push_back(task_op);
}
ObIDASTaskOp *ObDASTaskArg::get_task_op()
{
OB_ASSERT(task_ops_.count() == 1);
return task_ops_.at(0);
}
int ObIDASTaskOp::state_advance()
{
int ret = OB_SUCCESS;
OB_ASSERT(cur_agg_list_ != nullptr);
OB_ASSERT(task_status_ != ObDasTaskStatus::UNSTART);
if (task_status_ == ObDasTaskStatus::FINISHED) {
if (OB_FAIL(get_agg_tasks()->move_to_success_tasks(this))) {
LOG_WARN("failed to move task to success tasks", KR(ret));
}
} else if (task_status_ == ObDasTaskStatus::FAILED) {
if (OB_FAIL(get_agg_tasks()->move_to_failed_tasks(this))) {
LOG_WARN("failed to move task to success tasks", KR(ret));
}
} else {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid task state",KR(ret), K_(task_status));
}
return ret;
}
ObDASTaskResp::ObDASTaskResp()
: has_more_(false),
ctrl_svr_(),
runner_svr_(),
op_results_()
op_results_(),
rcode_(),
trans_result_(),
das_factory_(nullptr),
rpc_rcode_(OB_SUCCESS)
{
}
@ -381,21 +418,23 @@ OB_DEF_DESERIALIZE(ObDASTaskResp)
{
int ret = OB_SUCCESS;
int64_t count = 0;
ObIDASTaskResult *op_result = nullptr;
LST_DO_CODE(OB_UNIS_DECODE,
has_more_,
ctrl_svr_,
runner_svr_);
if (OB_SUCC(ret) && OB_FAIL(serialization::decode_vi64(buf, data_len, pos, &count))) {
LOG_WARN("fail to decode ob array count", K(ret));
} else if (count != op_results_.count()) {
} else if (count > op_results_.count()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("receive das task response count mismatch", K(count), K(op_results_.count()));
}
while (op_results_.count() > count) {
op_results_.pop_back();
}
OB_ASSERT(op_results_.count() == count);
for (int64_t i = 0; OB_SUCC(ret) && i < count; i++) {
if (OB_ISNULL(op_results_.at(i))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("op result is nullptr", K(ret), K_(op_results));
} else if (OB_FAIL(serialization::decode(buf, data_len, pos, *op_results_.at(i)))) {
if (OB_FAIL(serialization::decode(buf, data_len, pos, *op_results_.at(i)))) {
LOG_WARN("fail to decode array item", K(ret), K(i), K(count));
}
}
@ -424,18 +463,9 @@ OB_DEF_SERIALIZE_SIZE(ObDASTaskResp)
int ObDASTaskResp::add_op_result(ObIDASTaskResult *op_result)
{
// we currently only support single task per ObDASTaskResp.
// TODO(roland.qk):remove this assert check after we enable DAS task aggregation.
OB_ASSERT(op_results_.count() == 0);
return op_results_.push_back(op_result);
}
ObIDASTaskResult *ObDASTaskResp::get_op_result()
{
OB_ASSERT(op_results_.count() == 1);
return op_results_.at(0);
}
OB_SERIALIZE_MEMBER(ObIDASTaskResult, task_id_);
OB_SERIALIZE_MEMBER(ObDASDataFetchReq, tenant_id_, task_id_);

View File

@ -21,6 +21,7 @@
#include "storage/access/ob_dml_param.h"
#include "sql/engine/basic/ob_chunk_datum_store.h"
#include "lib/list/ob_obj_store.h"
#include "rpc/obrpc/ob_rpc_processor.h"
namespace oceanbase
{
namespace common
@ -34,6 +35,11 @@ class ObIDASTaskResult;
class ObDASExtraData;
class ObExprFrameInfo;
class ObDASScanOp;
class ObDASTaskFactory;
class ObDasAggregatedTasks;
typedef ObDLinkNode<ObIDASTaskOp*> DasTaskNode;
typedef ObDList<DasTaskNode> DasTaskLinkedList;
struct ObDASRemoteInfo
{
@ -78,8 +84,11 @@ public:
class ObIDASTaskOp
{
friend class ObDataAccessService;
friend class ObDASSyncAccessP;
template<obrpc::ObRpcPacketCode pcode>
friend class ObDASBaseAccessP;
friend class ObDASRef;
friend class ObRpcDasAsyncAccessCallBack;
friend class ObDataAccessService;
OB_UNIS_VERSION_V(1);
public:
ObIDASTaskOp(common::ObIAllocator &op_alloc)
@ -94,8 +103,14 @@ public:
op_alloc_(op_alloc),
related_ctdefs_(op_alloc),
related_rtdefs_(op_alloc),
related_tablet_ids_(op_alloc)
related_tablet_ids_(op_alloc),
task_status_(ObDasTaskStatus::UNSTART),
das_task_node_(),
agg_tasks_(nullptr),
cur_agg_list_(nullptr),
op_result_(nullptr)
{
das_task_node_.get_data() = this;
}
virtual ~ObIDASTaskOp() { }
@ -104,7 +119,7 @@ public:
void set_tablet_id(const common::ObTabletID &tablet_id) { tablet_id_ = tablet_id; }
const common::ObTabletID &get_tablet_id() const { return tablet_id_; }
void set_task_id(const int64_t task_id) { task_id_ = task_id; }
int64_t get_task_id() { return task_id_; }
int64_t get_task_id() const { return task_id_; }
void set_ls_id(const share::ObLSID &ls_id) { ls_id_ = ls_id; }
const share::ObLSID &get_ls_id() const { return ls_id_; }
void set_tablet_loc(const ObDASTabletLoc *tablet_loc) { tablet_loc_ = tablet_loc; }
@ -113,10 +128,11 @@ public:
virtual int decode_task_result(ObIDASTaskResult *task_result) = 0;
//远程执行填充第一个RPC结果,并返回是否还有剩余的RPC结果
virtual int fill_task_result(ObIDASTaskResult &task_result,
bool &has_more)
bool &has_more, int64_t &memory_limit)
{
UNUSED(task_result);
UNUSED(has_more);
UNUSED(memory_limit);
return OB_NOT_IMPLEMENT;
}
virtual int fill_extra_result()
@ -132,6 +148,9 @@ public:
DASRtDefFixedArray &get_related_rtdefs() { return related_rtdefs_; }
ObTabletIDFixedArray &get_related_tablet_ids() { return related_tablet_ids_; }
virtual int dump_data() const { return common::OB_SUCCESS; }
const DasTaskNode &get_node() const { return das_task_node_; };
DasTaskNode &get_node() { return das_task_node_; };
int get_errcode() const { return errcode_; };
VIRTUAL_TO_STRING_KV(K_(tenant_id),
K_(task_id),
K_(op_type),
@ -148,7 +167,9 @@ public:
KPC_(tablet_loc),
K_(related_ctdefs),
K_(related_rtdefs),
K_(related_tablet_ids));
K_(task_status),
K_(related_tablet_ids),
K_(das_task_node));
public:
void set_tenant_id(uint64_t tenant_id) { tenant_id_ = tenant_id; }
uint64_t get_tenant_id() const { return tenant_id_; }
@ -164,6 +185,22 @@ public:
bool is_in_retry() const { return in_part_retry_ || in_stmt_retry_; }
void set_need_switch_param(bool v) { need_switch_param_ = v; }
bool need_switch_param() const { return need_switch_param_; }
void set_task_status(ObDasTaskStatus status);
ObDasTaskStatus get_task_status() const { return task_status_; };
const ObDasAggregatedTasks *get_agg_tasks() const { return agg_tasks_; };
ObDasAggregatedTasks *get_agg_tasks() { return agg_tasks_; };
void set_agg_tasks(ObDasAggregatedTasks *agg_tasks)
{
OB_ASSERT(agg_tasks != nullptr);
OB_ASSERT(agg_tasks_ == nullptr);
agg_tasks_ = agg_tasks;
};
// NOT THREAD SAFE. We only advance state on das controller.
int state_advance();
void set_cur_agg_list(DasTaskLinkedList *list) { cur_agg_list_ = list; };
DasTaskLinkedList *get_cur_agg_list() { return cur_agg_list_; };
void set_remote_op_result(ObIDASTaskResult *op_result) { op_result_ = op_result; };
protected:
int start_das_task();
int end_das_task();
@ -200,6 +237,11 @@ protected:
DASCtDefFixedArray related_ctdefs_;
DASRtDefFixedArray related_rtdefs_;
ObTabletIDFixedArray related_tablet_ids_;
ObDasTaskStatus task_status_; // do not serialize
DasTaskNode das_task_node_; // tasks's linked list node, do not serialize
ObDasAggregatedTasks *agg_tasks_; // task's agg task, do not serialize
DasTaskLinkedList *cur_agg_list_; // task's agg_list, do not serialize
ObIDASTaskResult* op_result_; // as async result, do not serialize
public:
const static uint32_t DAS_ROW_EXTEND_SIZE = 16;
@ -213,7 +255,7 @@ class ObIDASTaskResult
public:
ObIDASTaskResult() : task_id_(0) { }
virtual ~ObIDASTaskResult() { }
virtual int init(const ObIDASTaskOp &task_op) = 0;
virtual int init(const ObIDASTaskOp &task_op, common::ObIAllocator &alloc) = 0;
virtual int link_extra_result(ObDASExtraData &extra_result)
{
UNUSED(extra_result);
@ -278,6 +320,8 @@ public:
int add_task_op(ObIDASTaskOp *task_op);
ObIDASTaskOp *get_task_op();
const common::ObSEArray<ObIDASTaskOp*, 2> &get_task_ops() const { return task_ops_; };
common::ObSEArray<ObIDASTaskOp*, 2> &get_task_ops() { return task_ops_; };
void set_remote_info(ObDASRemoteInfo *remote_info) { remote_info_ = remote_info; }
ObDASRemoteInfo *get_remote_info() { return remote_info_; }
common::ObAddr &get_runner_svr() { return runner_svr_; }
@ -305,7 +349,8 @@ class ObDASTaskResp
public:
ObDASTaskResp();
int add_op_result(ObIDASTaskResult *op_result);
ObIDASTaskResult *get_op_result();
const common::ObSEArray<ObIDASTaskResult*, 2> &get_op_results() const { return op_results_; };
common::ObSEArray<ObIDASTaskResult*, 2> &get_op_results() { return op_results_; };
void set_err_code(int err_code) { rcode_.rcode_ = err_code; }
int get_err_code() const { return rcode_.rcode_; }
const obrpc::ObRpcResultCode &get_rcode() const { return rcode_; }
@ -316,8 +361,12 @@ public:
bool has_more() const { return has_more_; }
void set_ctrl_svr(const common::ObAddr &ctrl_svr) { ctrl_svr_ = ctrl_svr; }
void set_runner_svr(const common::ObAddr &runner_svr) { runner_svr_ = runner_svr; }
common::ObAddr get_runner_svr() { return runner_svr_; }
common::ObAddr get_runner_svr() const { return runner_svr_; }
transaction::ObTxExecResult &get_trans_result() { return trans_result_; }
const transaction::ObTxExecResult &get_trans_result() const { return trans_result_; }
void set_das_factory(ObDASTaskFactory *das_factory) { das_factory_ = das_factory; };
void set_rpc_rcode(int rcode) { rpc_rcode_ = rcode; };
int get_rpc_rcode() const { return rpc_rcode_; };
TO_STRING_KV(K_(has_more),
K_(ctrl_svr),
K_(runner_svr),
@ -331,6 +380,8 @@ private:
common::ObSEArray<ObIDASTaskResult*, 2> op_results_; // 对应operation的结果信息,这是一个接口类,具体的定义由DML Service解析
obrpc::ObRpcResultCode rcode_; //返回的错误信息
transaction::ObTxExecResult trans_result_;
ObDASTaskFactory *das_factory_; // no need to serialize
int rpc_rcode_; // store async remote rpc error code. no need to serialize
};
template <typename T>

View File

@ -279,7 +279,9 @@ int ObDASIndexDMLAdaptor<DAS_OP_TABLE_UPDATE, ObDASUpdIterator>::write_rows(cons
ObDASUpdateOp::ObDASUpdateOp(ObIAllocator &op_alloc)
: ObIDASTaskOp(op_alloc),
upd_ctdef_(nullptr),
upd_rtdef_(nullptr)
upd_rtdef_(nullptr),
write_buffer_(),
affected_rows_(0)
{
}
@ -305,6 +307,7 @@ int ObDASUpdateOp::open_op()
}
} else {
upd_rtdef_->affected_rows_ += affected_rows;
affected_rows_ = affected_rows;
}
return ret;
}
@ -329,15 +332,16 @@ int ObDASUpdateOp::decode_task_result(ObIDASTaskResult *task_result)
return ret;
}
int ObDASUpdateOp::fill_task_result(ObIDASTaskResult &task_result, bool &has_more)
int ObDASUpdateOp::fill_task_result(ObIDASTaskResult &task_result, bool &has_more, int64_t &memory_limit)
{
int ret = OB_SUCCESS;
UNUSED(memory_limit);
#if !defined(NDEBUG)
CK(typeid(task_result) == typeid(ObDASUpdateResult));
#endif
if (OB_SUCC(ret)) {
ObDASUpdateResult &del_result = static_cast<ObDASUpdateResult&>(task_result);
del_result.set_affected_rows(upd_rtdef_->affected_rows_);
del_result.set_affected_rows(affected_rows_);
has_more = false;
}
return ret;
@ -392,9 +396,10 @@ ObDASUpdateResult::~ObDASUpdateResult()
{
}
int ObDASUpdateResult::init(const ObIDASTaskOp &op)
int ObDASUpdateResult::init(const ObIDASTaskOp &op, common::ObIAllocator &alloc)
{
UNUSED(op);
UNUSED(alloc);
return OB_SUCCESS;
}

View File

@ -30,7 +30,7 @@ public:
virtual int open_op() override;
virtual int release_op() override;
virtual int decode_task_result(ObIDASTaskResult *task_result) override;
virtual int fill_task_result(ObIDASTaskResult &task_result, bool &has_more) override;
virtual int fill_task_result(ObIDASTaskResult &task_result, bool &has_more, int64_t &memory_limit) override;
virtual int init_task_info() override;
virtual int swizzling_remote_task(ObDASRemoteInfo *remote_info) override;
virtual const ObDASBaseCtDef *get_ctdef() const override { return upd_ctdef_; }
@ -50,6 +50,7 @@ private:
const ObDASUpdCtDef *upd_ctdef_;
ObDASUpdRtDef *upd_rtdef_;
ObDASWriteBuffer write_buffer_;
int64_t affected_rows_; // local execute result, no need to serialize
};
class ObDASUpdateResult : public ObIDASTaskResult
@ -58,7 +59,7 @@ class ObDASUpdateResult : public ObIDASTaskResult
public:
ObDASUpdateResult();
virtual ~ObDASUpdateResult();
virtual int init(const ObIDASTaskOp &op) override;
virtual int init(const ObIDASTaskOp &op, common::ObIAllocator &alloc) override;
int64_t get_affected_rows() const { return affected_rows_; }
void set_affected_rows(int64_t affected_rows) { affected_rows_ = affected_rows; }
INHERIT_TO_STRING_KV("ObIDASTaskResult", ObIDASTaskResult,

View File

@ -16,6 +16,7 @@
#include "sql/das/ob_das_define.h"
#include "sql/das/ob_das_extra_data.h"
#include "sql/das/ob_das_ref.h"
#include "sql/das/ob_das_rpc_processor.h"
#include "sql/das/ob_das_utils.h"
#include "sql/ob_phy_table_location.h"
#include "sql/engine/ob_exec_context.h"
@ -28,6 +29,16 @@ using namespace common;
using namespace transaction;
namespace sql
{
ObDataAccessService::ObDataAccessService()
: das_rpc_proxy_(),
ctrl_addr_(),
id_cache_(),
task_result_mgr_(),
das_concurrency_limit_(INT32_MAX)
{
}
ObDataAccessService &ObDataAccessService::get_instance()
{
static ObDataAccessService instance;
@ -70,29 +81,33 @@ int ObDataAccessService::init(rpc::frame::ObReqTransport *transport, const ObAdd
return ret;
}
int ObDataAccessService::execute_das_task(ObDASRef &das_ref, ObIDASTaskOp &task_op)
{
int ObDataAccessService::execute_das_task(
ObDASRef &das_ref, ObDasAggregatedTasks &task_ops, bool async) {
int ret = OB_SUCCESS;
if (OB_LIKELY(das_ref.is_execute_directly())) {
common::ObSEArray<ObIDASTaskOp *, 2> task_wrapper;
NG_TRACE(do_local_das_task_begin);
FLTSpanGuard(do_local_das_task);
if (OB_FAIL(task_op.start_das_task())) {
LOG_WARN("start das task failed", K(ret), K(task_op));
while (OB_SUCC(ret) && OB_SUCC(task_ops.get_aggregated_tasks(task_wrapper)) &&
task_wrapper.count() != 0) {
for (int i = 0; OB_SUCC(ret) && i < task_wrapper.count(); i++) {
if (OB_FAIL(task_wrapper.at(i)->start_das_task())) {
int tmp_ret = OB_SUCCESS;
LOG_WARN("start das task failed", K(ret), K(*task_wrapper.at(i)));
if (OB_TMP_FAIL(task_wrapper.at(i)->state_advance())) {
LOG_WARN("failed to advance das task state.",K(ret));
}
} else {
if (OB_FAIL(task_wrapper.at(i)->state_advance())) {
LOG_WARN("failed to advance das task state.",K(ret));
}
}
}
task_wrapper.reuse();
}
NG_TRACE(do_local_das_task_end);
} else {
ret = execute_dist_das_task(das_ref, task_op);
task_op.errcode_ = ret;
}
OB_ASSERT(task_op.errcode_ == ret);
if (OB_FAIL(ret) && GCONF._enable_partition_level_retry && task_op.can_part_retry()) {
//only fast select can be retry with partition level
int tmp_ret = retry_das_task(das_ref, task_op);
if (OB_SUCCESS == tmp_ret) {
ret = OB_SUCCESS;
} else {
LOG_WARN("failed to retry das task", K(tmp_ret));
}
} else if (OB_FAIL(execute_dist_das_task(das_ref, task_ops, async))) {
LOG_WARN("failed to execute dist das task", K(ret));
}
return ret;
}
@ -132,25 +147,58 @@ int ObDataAccessService::get_das_task_id(int64_t &das_id)
return ret;
}
OB_NOINLINE int ObDataAccessService::execute_dist_das_task(ObDASRef &das_ref, ObIDASTaskOp &task_op)
void ObDataAccessService::calc_das_task_parallelism(const ObDASRef &das_ref,
const ObDasAggregatedTasks &task_ops, int &target_parallelism)
{
// const int divide_async_task_threshold = 1024;
// if (task_ops.get_unstart_task_size() > divide_async_task_threshold) {
// target_parallelism = ceil(static_cast<double>(das_ref.get_max_concurrency()) / das_ref.get_aggregated_tasks_count());
// } else {
// target_parallelism = 1;
// }
target_parallelism = 1;
}
OB_NOINLINE int ObDataAccessService::execute_dist_das_task(
ObDASRef &das_ref, ObDasAggregatedTasks &task_ops, bool async) {
int ret = OB_SUCCESS;
ObExecContext &exec_ctx = das_ref.get_exec_ctx();
ObSQLSessionInfo *session = exec_ctx.get_my_session();
ObDASTaskArg task_arg;
if (OB_FAIL(task_arg.add_task_op(&task_op))) {
LOG_WARN("failed to add das task op", K(ret), K(task_op));
} else {
task_arg.set_timeout_ts(session->get_query_timeout_ts());
task_arg.set_ctrl_svr(ctrl_addr_);
task_arg.get_runner_svr() = task_op.tablet_loc_->server_;
if (task_arg.is_local_task()) {
if (OB_FAIL(do_local_das_task(das_ref, task_arg))) {
LOG_WARN("do local das task failed", K(ret), K(task_op));
task_arg.get_runner_svr() = task_ops.server_;
int target_parallelism = 0;
calc_das_task_parallelism(das_ref, task_ops, target_parallelism);
common::ObSEArray<common::ObSEArray<ObIDASTaskOp *, 2>, 2> task_groups;
if (OB_FAIL(task_ops.get_aggregated_tasks(task_groups, target_parallelism))) {
LOG_WARN("failed to get das task groups", K(ret));
}
} else if (OB_FAIL(do_remote_das_task(das_ref, task_arg))) {
OB_ASSERT(target_parallelism >= task_groups.count());
LOG_DEBUG("current dist das task group", K(ret), K(task_groups), K(task_ops));
for (int i = 0; OB_SUCC(ret) && i < task_groups.count(); i++) {
task_arg.get_task_ops() = task_groups.at(i);
if (OB_FAIL(task_arg.get_task_ops().get_copy_assign_ret())) {
LOG_WARN("failed to copy das task", K(ret));
} else if (task_arg.is_local_task()) {
if (OB_FAIL(do_local_das_task(das_ref, task_arg))) {
LOG_WARN("do local das task failed", K(ret), K(task_arg));
}
} else if (OB_FAIL(das_ref.acquire_task_execution_resource())) {
LOG_WARN("failed to acquire execution resource", K(ret));
} else {
if (async) {
if (OB_FAIL(do_async_remote_das_task(das_ref, task_ops, task_arg))) {
das_ref.inc_concurrency_limit();
LOG_WARN("do remote das task failed", K(ret));
}
} else {
if (OB_FAIL(do_sync_remote_das_task(das_ref, task_ops, task_arg))) {
LOG_WARN("do remote das task failed", K(ret));
}
}
}
task_arg.get_task_ops().reuse();
}
return ret;
}
@ -182,8 +230,12 @@ int ObDataAccessService::refresh_partition_location(ObDASRef &das_ref, ObIDASTas
int ObDataAccessService::retry_das_task(ObDASRef &das_ref, ObIDASTaskOp &task_op)
{
int ret = task_op.errcode_;
while (!is_virtual_table(task_op.get_ref_table_id()) &&
(is_master_changed_error(ret) || is_partition_change_error(ret) || OB_REPLICA_NOT_READABLE == ret)) {
ObArenaAllocator tmp_alloc;
ObDasAggregatedTasks das_task_wrapper(tmp_alloc);
while ((is_master_changed_error(ret) ||
is_partition_change_error(ret) ||
OB_REPLICA_NOT_READABLE == ret)
&& !is_virtual_table(task_op.get_ref_table_id())) {
if (!can_fast_fail(task_op)) {
task_op.in_part_retry_ = true;
das_ref.get_exec_ctx().get_my_session()->set_session_in_retry(true, ret);
@ -193,17 +245,31 @@ int ObDataAccessService::retry_das_task(ObDASRef &das_ref, ObIDASTaskOp &task_op
LOG_WARN("query is timeout, terminate retry", K(ret));
} else if (OB_FAIL(refresh_partition_location(das_ref, task_op))) {
LOG_WARN("refresh partition location failed", K(ret));
} else if (OB_FAIL(execute_dist_das_task(das_ref, task_op))) {
} else if (FALSE_IT(das_task_wrapper.reuse())) {
} else if (FALSE_IT(task_op.set_task_status(ObDasTaskStatus::UNSTART))) {
} else if (OB_FAIL(das_task_wrapper.push_back_task(&task_op))) {
LOG_WARN("failed to push back task", K(ret));
} else if (OB_FAIL(execute_dist_das_task(das_ref, das_task_wrapper, false))) {
task_op.errcode_ = ret;
LOG_WARN("execute dist das task failed", K(ret));
} else {
LOG_DEBUG("retry das task success!", K(task_op));
}
} else {
break;
}
}
if (OB_FAIL(ret)) {
int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(task_op.state_advance())) {
LOG_WARN("failed to reset das task to original agg list.", K(ret));
}
}
OB_ASSERT(das_task_wrapper.has_unstart_tasks() == false &&
das_task_wrapper.success_tasks_.get_size() == 0);
return ret;
}
bool ObDataAccessService::can_fast_fail(const ObIDASTaskOp &task_op) const
{
bool bret = false;
@ -247,11 +313,17 @@ int ObDataAccessService::rescan_das_task(ObDASRef &das_ref, ObDASScanOp &scan_op
int ret = OB_SUCCESS;
NG_TRACE(rescan_das_task_begin);
FLTSpanGuard(rescan_das_task);
ObArenaAllocator tmp_alloc;
ObDasAggregatedTasks das_task_wrapper(tmp_alloc);
if (scan_op.is_local_task()) {
if (OB_FAIL(scan_op.rescan())) {
LOG_WARN("rescan das task failed", K(ret));
}
} else if (OB_FAIL(execute_dist_das_task(das_ref, scan_op))) {
} else if (OB_FAIL(das_task_wrapper.push_back_task(&scan_op))) {
LOG_WARN("failed to push back", K(ret));
} else if (OB_FAIL(execute_dist_das_task(das_ref, das_task_wrapper, false))) {
scan_op.errcode_ = ret;
LOG_WARN("execute dist das task failed", K(ret));
}
OB_ASSERT(scan_op.errcode_ == ret);
@ -268,32 +340,128 @@ int ObDataAccessService::rescan_das_task(ObDASRef &das_ref, ObDASScanOp &scan_op
return ret;
}
int ObDataAccessService::do_local_das_task(ObDASRef &das_ref, ObDASTaskArg &task_arg)
{
int ObDataAccessService::do_local_das_task(ObDASRef &das_ref,
ObDASTaskArg &task_arg) {
UNUSED(das_ref);
int ret = OB_SUCCESS;
LOG_DEBUG("begin to do local das task", K(task_arg));
ObIDASTaskOp *task_op = task_arg.get_task_op();
NG_TRACE(do_local_das_task_begin);
FLTSpanGuard(do_local_das_task);
if (OB_FAIL(task_op->start_das_task())) {
const common::ObSEArray<ObIDASTaskOp*, 2> &task_ops = task_arg.get_task_ops();
LOG_DEBUG("begin to do local das task", K(task_arg));
for (int64_t i = 0; OB_SUCC(ret) && i < task_ops.count(); i++) {
if (OB_FAIL(task_ops.at(i)->start_das_task())) {
LOG_WARN("start local das task failed", K(ret));
int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(task_ops.at(i)->state_advance())) {
LOG_WARN("failed to advance das task state.",K(ret));
}
break;
} else {
if (OB_FAIL(task_ops.at(i)->state_advance())) {
LOG_WARN("failed to advance das task state.",K(ret));
}
}
}
NG_TRACE(do_local_das_task_end);
return ret;
}
int ObDataAccessService::do_remote_das_task(ObDASRef &das_ref, ObDASTaskArg &task_arg)
{
int ObDataAccessService::do_async_remote_das_task(
ObDASRef &das_ref, ObDasAggregatedTasks &aggregated_tasks,
ObDASTaskArg &task_arg) {
int ret = OB_SUCCESS;
void *resp_buf = nullptr;
NG_TRACE(do_remote_das_task_begin);
FLTSpanGuard(do_remote_das_task);
NG_TRACE(do_async_remote_das_task_begin);
FLTSpanGuard(do_async_remote_das_task);
ObSQLSessionInfo *session = das_ref.get_exec_ctx().get_my_session();
ObPhysicalPlanCtx *plan_ctx = das_ref.get_exec_ctx().get_physical_plan_ctx();
int64_t timeout = plan_ctx->get_timeout_timestamp() - ObTimeUtility::current_time();
#ifdef ERRSIM
int inject_timeout = -E(EventTable::EN_DAS_SIMULATE_ASYNC_RPC_TIMEOUT) OB_SUCCESS;
if (OB_SUCCESS != inject_timeout) {
LOG_INFO("das async rpc simulate timeout", K(inject_timeout));
timeout = inject_timeout - 10;
}
#endif
uint64_t tenant_id = session->get_rpc_tenant_id();
ObIDASTaskOp *task_op = task_arg.get_task_op();
common::ObSEArray<ObIDASTaskOp*, 2> &task_ops = task_arg.get_task_ops();
ObDASRemoteInfo remote_info;
remote_info.exec_ctx_ = &das_ref.get_exec_ctx();
remote_info.frame_info_ = das_ref.get_expr_frame_info();
remote_info.trans_desc_ = session->get_tx_desc();
remote_info.snapshot_ = *task_op->get_snapshot();
remote_info.need_tx_ = (remote_info.trans_desc_ != nullptr);
task_arg.set_remote_info(&remote_info);
ObDASRemoteInfo::get_remote_info() = &remote_info;
ObIDASTaskResult *op_result = nullptr;
ObRpcDasAsyncAccessCallBack *das_async_cb = nullptr;
if (OB_FAIL(das_ref.allocate_async_das_cb(das_async_cb, task_ops))) {
LOG_WARN("failed to allocate das async cb", K(ret));
}
// prepare op result in advance avoiding racing condition.
for (int64_t i = 0; OB_SUCC(ret) && i < task_ops.count(); i++) {
if (OB_FAIL(das_ref.get_das_factory().create_das_task_result(task_ops.at(i)->get_type(), op_result))) {
LOG_WARN("failed to create das task result", K(ret));
} else if (OB_ISNULL(op_result)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to get op result", K(ret));
} else if (OB_FAIL(das_async_cb->get_op_results().push_back(op_result))) {
LOG_WARN("failed to add task result", K(ret));
} else if (OB_FAIL(op_result->init(*task_ops.at(i), das_async_cb->get_result_alloc()))) {
LOG_WARN("failed to init task result", K(ret));
}
}
LOG_DEBUG("begin to do remote das task", K(task_arg));
if (OB_FAIL(ret)) {
} else if (OB_FAIL(collect_das_task_info(task_arg, remote_info))) {
LOG_WARN("collect das task info failed", K(ret));
} else if (OB_UNLIKELY(timeout <= 0)) {
ret = OB_TIMEOUT;
LOG_WARN("das is timeout", K(ret), K(plan_ctx->get_timeout_timestamp()), K(timeout));
} else if (OB_FAIL(das_rpc_proxy_
.to(task_arg.get_runner_svr())
.by(tenant_id)
.timeout(timeout)
.das_async_access(task_arg, das_async_cb))) {
LOG_WARN("rpc remote sync access failed", K(ret), K(task_arg));
// RPC fail, add task's LSID to trans_result
// indicate some transaction participant may touched
for (int i = 0; i < task_ops.count(); i++) {
session->get_trans_result().add_touched_ls(task_ops.at(i)->get_ls_id());
}
}
if (OB_FAIL(ret)) {
if (nullptr != das_async_cb) {
das_ref.remove_async_das_cb(das_async_cb);
}
for (int i = 0; i < task_ops.count(); i++) {
task_ops.at(i)->errcode_ = ret;
task_ops.at(i)->set_task_status(ObDasTaskStatus::FAILED);
int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(task_ops.at(i)->state_advance())) {
LOG_WARN("failed to advance das task state", K(tmp_ret));
}
}
}
NG_TRACE_EXT(do_async_remote_das_task_end, OB_Y(ret), OB_ID(addr), task_arg.get_runner_svr());
return ret;
}
int ObDataAccessService::do_sync_remote_das_task(
ObDASRef &das_ref, ObDasAggregatedTasks &aggregated_tasks,
ObDASTaskArg &task_arg) {
int ret = OB_SUCCESS;
void *resp_buf = nullptr;
NG_TRACE(do_sync_remote_das_task_begin);
FLTSpanGuard(do_sync_remote_das_task);
ObSQLSessionInfo *session = das_ref.get_exec_ctx().get_my_session();
ObPhysicalPlanCtx *plan_ctx = das_ref.get_exec_ctx().get_physical_plan_ctx();
int64_t timeout = plan_ctx->get_timeout_timestamp() - ObTimeUtility::current_time();
uint64_t tenant_id = session->get_rpc_tenant_id();
ObIDASTaskOp *task_op = task_arg.get_task_op();
common::ObSEArray<ObIDASTaskOp*, 2> &task_ops = task_arg.get_task_ops();
ObIDASTaskResult *op_result = nullptr;
ObDASExtraData *extra_result = nullptr;
ObDASRemoteInfo remote_info;
@ -307,17 +475,28 @@ int ObDataAccessService::do_remote_das_task(ObDASRef &das_ref, ObDASTaskArg &tas
LOG_DEBUG("begin to do remote das task", K(task_arg));
SMART_VAR(ObDASTaskResp, task_resp) {
if (OB_FAIL(collect_das_task_info(task_arg, remote_info))) {
// we need das_factory to allocate task op result on receiving rpc response.
task_resp.set_das_factory(&das_ref.get_das_factory());
// prepare op result in advance avoiding racing condition.
for (int64_t i = 0; OB_SUCC(ret) && i < task_ops.count(); i++) {
if (OB_FAIL(das_ref.get_das_factory().create_das_task_result(task_ops.at(i)->get_type(), op_result))) {
LOG_WARN("failed to create das task result", K(ret));
} else if (OB_ISNULL(op_result)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to get op result", K(ret));
} else if (OB_FAIL(task_resp.get_op_results().push_back(op_result))) {
LOG_WARN("failed to add task result", K(ret));
} else if (OB_FAIL(op_result->init(*task_ops.at(i), das_ref.get_das_alloc()))) {
LOG_WARN("failed to init task result", K(ret));
}
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(collect_das_task_info(task_arg, remote_info))) {
LOG_WARN("collect das task info failed", K(ret));
} else if (OB_FAIL(das_ref.get_das_factory().create_das_task_result(task_op->get_type(), op_result))) {
LOG_WARN("create das task result failed", K(ret));
} else if (OB_FAIL(op_result->init(*task_op))) {
LOG_WARN("init task result failed", K(ret));
} else if (OB_UNLIKELY(timeout <= 0)) {
ret = OB_TIMEOUT;
LOG_WARN("das is timeout", K(ret), K(plan_ctx->get_timeout_timestamp()), K(timeout));
} else if (OB_FAIL(task_resp.add_op_result(op_result))) {
LOG_WARN("failed to add op result", K(ret));
} else if (OB_FAIL(das_rpc_proxy_
.to(task_arg.get_runner_svr())
.by(tenant_id)
@ -326,20 +505,92 @@ int ObDataAccessService::do_remote_das_task(ObDASRef &das_ref, ObDASTaskArg &tas
LOG_WARN("rpc remote sync access failed", K(ret), K(task_arg));
// RPC fail, add task's LSID to trans_result
// indicate some transaction participant may touched
session->get_trans_result().add_touched_ls(task_op->get_ls_id());
} else {
for (int i = 0; i < task_ops.count(); i++) {
session->get_trans_result().add_touched_ls(task_ops.at(i)->get_ls_id());
}
}
if (OB_FAIL(ret)) {
for (int i = 0; i < task_ops.count(); i++) {
task_ops.at(i)->errcode_ = ret;
task_ops.at(i)->set_task_status(ObDasTaskStatus::FAILED);
int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(task_ops.at(i)->state_advance())) {
LOG_WARN("failed to advance das task state", K(tmp_ret));
}
}
} else if (OB_FAIL(process_task_resp(das_ref, task_resp, task_ops))) {
LOG_WARN("failed to process task resp", K(ret), K(task_arg), K(task_resp));
}
}
das_ref.inc_concurrency_limit();
NG_TRACE_EXT(do_sync_remote_das_task_end, OB_Y(ret), OB_ID(addr), task_arg.get_runner_svr());
return ret;
}
int ObDataAccessService::process_task_resp(ObDASRef &das_ref, const ObDASTaskResp &task_resp, const common::ObSEArray<ObIDASTaskOp*, 2> &task_ops)
{
int ret = OB_SUCCESS;
int save_ret = OB_SUCCESS;
ObSQLSessionInfo *session = das_ref.get_exec_ctx().get_my_session();
ObIDASTaskOp *task_op = nullptr;
ObIDASTaskResult *op_result = nullptr;
ObDASExtraData *extra_result = nullptr;
const common::ObSEArray<ObIDASTaskResult*, 2> &op_results = task_resp.get_op_results();
ObDASUtils::log_user_error_and_warn(task_resp.get_rcode());
if (OB_FAIL(task_resp.get_err_code())) {
LOG_WARN("error occurring in remote das task", K(ret), K(task_arg));
} else if (OB_FAIL(task_op->decode_task_result(op_result))) {
for (int i = 0; i < op_results.count() - 1; i++) {
// even if error happened durning iteration, we should iter to the end.
task_op = task_ops.at(i);
op_result = op_results.at(i);
OB_ASSERT(op_result != nullptr);
if (OB_FAIL(task_op->decode_task_result(op_result))) {
task_op->set_task_status(ObDasTaskStatus::FAILED);
task_op->errcode_ = ret;
LOG_WARN("decode das task result failed", K(ret));
int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(task_op->state_advance())) {
LOG_WARN("failed to advance das task state.",K(ret));
}
} else {
task_op->set_task_status(ObDasTaskStatus::FINISHED);
if (OB_FAIL(task_op->state_advance())) {
LOG_WARN("failed to advance das task state.",K(ret));
}
}
}
// special check for last valid op result
if (OB_UNLIKELY(op_results.count() == 0)) {
// no op response. first task op must have error.
task_op = task_ops.at(0);
op_result = nullptr;
OB_ASSERT(task_resp.get_err_code() != OB_SUCCESS);
} else {
task_op = task_ops.at(op_results.count() - 1);
op_result = op_results.at(op_results.count() - 1);
}
if (OB_FAIL(ret)) {
save_ret = ret;
ret = OB_SUCCESS;
}
// task_resp's error code indicate the last valid op result.
if (OB_FAIL(task_resp.get_err_code())) {
LOG_WARN("error occurring in remote das task", K(ret));
OB_ASSERT(op_results.count() <= task_ops.count());
} else {
// decode last op result
if (OB_FAIL(task_op->decode_task_result(op_result))) {
LOG_WARN("decode das task result failed", K(ret));
}
}
// has_more flag only take effect on last valid op result.
if (OB_FAIL(ret)) {
} else if (task_resp.has_more()
&& OB_FAIL(setup_extra_result(das_ref, task_resp,
task_op, extra_result))) {
&& OB_FAIL(setup_extra_result(das_ref, task_resp, task_op, extra_result))) {
LOG_WARN("setup extra result failed", KR(ret));
} else if (task_resp.has_more() && OB_FAIL(op_result->link_extra_result(*extra_result))) {
LOG_WARN("link extra result failed", K(ret));
}
// even if trans result have a failure. It only take effect on the last valid op result.
if (OB_NOT_NULL(session->get_tx_desc())) {
int tmp_ret = MTL(transaction::ObTransService*)
->add_tx_exec_result(*session->get_tx_desc(),
@ -349,16 +600,35 @@ int ObDataAccessService::do_remote_das_task(ObDASRef &das_ref, ObDASTaskArg &tas
}
ret = COVER_SUCC(tmp_ret);
}
if (OB_FAIL(ret)) {
// if error happened, it must be the last.
task_op->set_task_status(ObDasTaskStatus::FAILED);
task_op->errcode_ = ret;
int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(task_op->state_advance())) {
LOG_WARN("failed to advance das task state.",K(ret));
}
} else {
// if no error happened, all tasks were executed successfully.
task_op->set_task_status(ObDasTaskStatus::FINISHED);
if (OB_FAIL(task_op->state_advance())) {
LOG_WARN("failed to advance das task state.",K(ret));
}
if (OB_UNLIKELY(OB_SUCCESS != save_ret)) {
ret = COVER_SUCC(save_ret);
}
}
NG_TRACE_EXT(do_remote_das_task_end, OB_Y(ret), OB_ID(addr), task_arg.get_runner_svr());
return ret;
}
int ObDataAccessService::collect_das_task_info(ObDASTaskArg &task_arg, ObDASRemoteInfo &remote_info)
{
int ret = OB_SUCCESS;
ObIDASTaskOp *task_op = task_arg.get_task_op();
common::ObSEArray<ObIDASTaskOp*, 2> &task_ops = task_arg.get_task_ops();
ObIDASTaskOp *task_op = nullptr;
for (int i = 0; OB_SUCC(ret) && i < task_ops.count(); i++) {
task_op = task_ops.at(i);
if (task_op->get_ctdef() != nullptr) {
remote_info.has_expr_ |= task_op->get_ctdef()->has_expr();
remote_info.need_calc_expr_ |= task_op->get_ctdef()->has_pdfilter_or_calc_expr();
@ -379,12 +649,13 @@ int ObDataAccessService::collect_das_task_info(ObDASTaskArg &task_arg, ObDASRemo
LOG_WARN("append task op related rtdefs to remote info failed", K(ret));
}
}
}
return ret;
}
int ObDataAccessService::setup_extra_result(ObDASRef &das_ref,
ObDASTaskResp &task_resp,
ObIDASTaskOp *task_op,
const ObDASTaskResp &task_resp,
const ObIDASTaskOp *task_op,
ObDASExtraData *&extra_result)
{
int ret = OB_SUCCESS;
@ -406,5 +677,18 @@ int ObDataAccessService::setup_extra_result(ObDASRef &das_ref,
}
return ret;
}
void ObDataAccessService::set_max_concurrency(int32_t cpu_count)
{
const int32_t das_concurrent_upper_limit = 8;
const int32_t das_concurrent_factor = 5;
if (OB_UNLIKELY(!is_user_tenant(MTL_ID()))) {
das_concurrency_limit_ = 1;
} else {
das_concurrency_limit_ = min(cpu_count / das_concurrent_factor + 1, das_concurrent_upper_limit);
}
LOG_DEBUG("set current tenant's das max concurrency", K_(das_concurrency_limit), K(MTL_ID()));
}
} // namespace sql
} // namespace oceanbase

View File

@ -16,6 +16,7 @@
#include "sql/das/ob_das_rpc_proxy.h"
#include "sql/das/ob_das_id_cache.h"
#include "sql/das/ob_das_task_result.h"
#include "sql/das/ob_das_ref.h"
namespace oceanbase
{
namespace sql
@ -28,17 +29,15 @@ class ObDASExtraData;
class ObDataAccessService
{
public:
ObDataAccessService()
: das_rpc_proxy_(),
ctrl_addr_()
{ }
ObDataAccessService();
~ObDataAccessService() = default;
static int mtl_init(ObDataAccessService* &das);
static void mtl_destroy(ObDataAccessService* &das);
int init(rpc::frame::ObReqTransport *transport,
const common::ObAddr &self_addr);
//开启DAS Task分区相关的事务控制,并执行task对应的op
int execute_das_task(ObDASRef &das_ref, ObIDASTaskOp &task_op);
int execute_das_task(ObDASRef &das_ref,
ObDasAggregatedTasks &task_ops, bool async = true);
//关闭DAS Task的执行流程,并释放task持有的资源,并结束相关的事务控制
int end_das_task(ObDASRef &das_ref, ObIDASTaskOp &task_op);
int get_das_task_id(int64_t &das_id);
@ -46,24 +45,32 @@ public:
obrpc::ObDASRpcProxy &get_rpc_proxy() { return das_rpc_proxy_; }
ObDASTaskResultMgr &get_task_res_mgr() { return task_result_mgr_; }
static ObDataAccessService &get_instance();
const common::ObAddr &get_ctrl_addr() const { return ctrl_addr_; };
void set_max_concurrency(int32_t cpu_count);
int32_t get_das_concurrency_limit() const { return das_concurrency_limit_; };
int retry_das_task(ObDASRef &das_ref, ObIDASTaskOp &task_op);
int setup_extra_result(ObDASRef &das_ref,
const ObDASTaskResp &task_resp,
const ObIDASTaskOp *task_op,
ObDASExtraData *&extra_result);
int process_task_resp(ObDASRef &das_ref, const ObDASTaskResp &task_resp, const common::ObSEArray<ObIDASTaskOp*, 2> &task_ops);
private:
int execute_dist_das_task(ObDASRef &das_ref, ObIDASTaskOp &task_op);
int execute_dist_das_task(ObDASRef &das_ref,
ObDasAggregatedTasks &task_ops, bool async = true);
int clear_task_exec_env(ObDASRef &das_ref, ObIDASTaskOp &task_op);
int refresh_partition_location(ObDASRef &das_ref, ObIDASTaskOp &task_op);
int retry_das_task(ObDASRef &das_ref, ObIDASTaskOp &task_op);
int do_local_das_task(ObDASRef &das_ref, ObDASTaskArg &task_arg);
int do_remote_das_task(ObDASRef &das_ref, ObDASTaskArg &das_task);
int setup_extra_result(ObDASRef &das_ref,
ObDASTaskResp &task_resp,
ObIDASTaskOp *task_op,
ObDASExtraData *&extra_result);
int do_async_remote_das_task(ObDASRef &das_ref, ObDasAggregatedTasks &aggregated_tasks, ObDASTaskArg &task_arg);
int do_sync_remote_das_task(ObDASRef &das_ref, ObDasAggregatedTasks &aggregated_tasks, ObDASTaskArg &task_arg);
int collect_das_task_info(ObDASTaskArg &task_arg, ObDASRemoteInfo &remote_info);
bool can_fast_fail(const ObIDASTaskOp &task_op) const;
void calc_das_task_parallelism(const ObDASRef &das_ref, const ObDasAggregatedTasks &task_ops, int &target_parallelism);
private:
obrpc::ObDASRpcProxy das_rpc_proxy_;
common::ObAddr ctrl_addr_;
ObDASIDCache id_cache_;
ObDASTaskResultMgr task_result_mgr_;
int32_t das_concurrency_limit_;
};
} // namespace sql
} // namespace oceanbase

View File

@ -505,7 +505,8 @@ struct ObInsRtDef : ObDMLBaseRtDef
das_rtdef_(),
related_rtdefs_()
{ }
TO_STRING_KV(K_(das_rtdef),
INHERIT_TO_STRING_KV("ObDMLBaseRtDef", ObDMLBaseRtDef,
K_(das_rtdef),
K_(related_rtdefs));
ObDASInsRtDef das_rtdef_;
DASInsRtDefArray related_rtdefs_;