[obkv] fix for codescan
This commit is contained in:
@ -208,6 +208,7 @@ int ObTableRpcImpl::aggregate_tablet_by_server(const ObIArray<ObTabletLocation>
|
|||||||
int tmp_ret = ret;
|
int tmp_ret = ret;
|
||||||
FreeIdxArrayFunc free_idx_array_fn;
|
FreeIdxArrayFunc free_idx_array_fn;
|
||||||
if (OB_FAIL(server_tablet_map.foreach_refactored(free_idx_array_fn))) {
|
if (OB_FAIL(server_tablet_map.foreach_refactored(free_idx_array_fn))) {
|
||||||
|
//overwrite ret
|
||||||
LOG_WARN("failed to do foreach", K(ret));
|
LOG_WARN("failed to do foreach", K(ret));
|
||||||
}
|
}
|
||||||
ret = OB_SUCCESS == tmp_ret ? ret : tmp_ret;
|
ret = OB_SUCCESS == tmp_ret ? ret : tmp_ret;
|
||||||
|
@ -958,8 +958,7 @@ int ObTableServiceClientImpl::get_tablet_location(const ObString &table_name, co
|
|||||||
0, ObReplicaType::REPLICA_TYPE_FULL, ObReplicaProperty(),
|
0, ObReplicaType::REPLICA_TYPE_FULL, ObReplicaProperty(),
|
||||||
role_status, 1))) {
|
role_status, 1))) {
|
||||||
LOG_WARN("fail to init tablet replication location", K(ret));
|
LOG_WARN("fail to init tablet replication location", K(ret));
|
||||||
}
|
} else if (OB_FAIL(tablet_location.add_replica_location(fake_leader_loc))) {
|
||||||
if (OB_FAIL(tablet_location.add_replica_location(fake_leader_loc))) {
|
|
||||||
LOG_WARN("failed to push back", K(ret));
|
LOG_WARN("failed to push back", K(ret));
|
||||||
} else {
|
} else {
|
||||||
tablet_location.set_tenant_id(tenant_id_);
|
tablet_location.set_tenant_id(tenant_id_);
|
||||||
|
@ -126,7 +126,7 @@ uint64_t ObTableBatchExecuteP::get_request_checksum()
|
|||||||
int ObTableBatchExecuteP::response(const int retcode)
|
int ObTableBatchExecuteP::response(const int retcode)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
if (!need_retry_in_queue_ && !had_do_response()) {
|
if (!need_retry_in_queue_ && !is_async_response()) {
|
||||||
// For HKV table, modify the value of timetamp to be positive
|
// For HKV table, modify the value of timetamp to be positive
|
||||||
if (ObTableEntityType::ET_HKV == arg_.entity_type_) {
|
if (ObTableEntityType::ET_HKV == arg_.entity_type_) {
|
||||||
const int64_t N = result_.count();
|
const int64_t N = result_.count();
|
||||||
|
@ -315,7 +315,7 @@ uint64_t ObTableApiExecuteP::get_request_checksum()
|
|||||||
int ObTableApiExecuteP::response(const int retcode)
|
int ObTableApiExecuteP::response(const int retcode)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
if (!need_retry_in_queue_ && !had_do_response()) {
|
if (!need_retry_in_queue_ && !is_async_response()) {
|
||||||
if (OB_SUCC(ret) && ObTableEntityType::ET_HKV == arg_.entity_type_) {
|
if (OB_SUCC(ret) && ObTableEntityType::ET_HKV == arg_.entity_type_) {
|
||||||
// @note modify the value of timestamp to be positive
|
// @note modify the value of timestamp to be positive
|
||||||
ret = ObTableRpcProcessorUtil::negate_htable_timestamp(result_entity_);
|
ret = ObTableRpcProcessorUtil::negate_htable_timestamp(result_entity_);
|
||||||
@ -437,7 +437,7 @@ ObTableAPITransCb *ObTableApiExecuteP::new_callback(rpc::ObRequest *req)
|
|||||||
int ObTableApiExecuteP::before_response(int error_code)
|
int ObTableApiExecuteP::before_response(int error_code)
|
||||||
{
|
{
|
||||||
// NOTE: when check_timeout failed, the result.entity_ is null, and serialize result cause coredump
|
// NOTE: when check_timeout failed, the result.entity_ is null, and serialize result cause coredump
|
||||||
if (!had_do_response() && OB_ISNULL(result_.get_entity())) {
|
if (!is_async_response() && OB_ISNULL(result_.get_entity())) {
|
||||||
result_.set_entity(result_entity_);
|
result_.set_entity(result_entity_);
|
||||||
}
|
}
|
||||||
return ObTableRpcProcessor::before_response(error_code);
|
return ObTableRpcProcessor::before_response(error_code);
|
||||||
|
@ -667,6 +667,7 @@ int ObTableQuerySyncP::destory_query_session(bool need_rollback_trans)
|
|||||||
|
|
||||||
ObQuerySyncMgr::get_instance().get_locker(query_session_id_).lock();
|
ObQuerySyncMgr::get_instance().get_locker(query_session_id_).lock();
|
||||||
if (OB_ISNULL(query_session_)) {
|
if (OB_ISNULL(query_session_)) {
|
||||||
|
//overwrite ret
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("Unexpected null value", K(ret), KP_(query_session));
|
LOG_WARN("Unexpected null value", K(ret), KP_(query_session));
|
||||||
} else if (OB_FAIL(ObQuerySyncMgr::get_instance().get_query_session_map()->erase_refactored(query_session_id_))) {
|
} else if (OB_FAIL(ObQuerySyncMgr::get_instance().get_query_session_map()->erase_refactored(query_session_id_))) {
|
||||||
|
@ -249,7 +249,7 @@ ObTableApiProcessorBase::ObTableApiProcessorBase(const ObGlobalContext &gctx)
|
|||||||
need_retry_in_queue_(false),
|
need_retry_in_queue_(false),
|
||||||
retry_count_(0),
|
retry_count_(0),
|
||||||
trans_desc_(NULL),
|
trans_desc_(NULL),
|
||||||
had_do_response_(false),
|
is_async_response_(false),
|
||||||
user_client_addr_(),
|
user_client_addr_(),
|
||||||
sess_stat_guard_(MTL_ID(), ObActiveSessionGuard::get_stat().session_id_)
|
sess_stat_guard_(MTL_ID(), ObActiveSessionGuard::get_stat().session_id_)
|
||||||
{
|
{
|
||||||
@ -261,7 +261,7 @@ void ObTableApiProcessorBase::reset_ctx()
|
|||||||
{
|
{
|
||||||
trans_state_ptr_->reset();
|
trans_state_ptr_->reset();
|
||||||
trans_desc_ = NULL;
|
trans_desc_ = NULL;
|
||||||
had_do_response_ = false;
|
is_async_response_ = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObTableApiProcessorBase::get_ls_id(const ObTabletID &tablet_id, ObLSID &ls_id)
|
int ObTableApiProcessorBase::get_ls_id(const ObTabletID &tablet_id, ObLSID &ls_id)
|
||||||
@ -567,6 +567,7 @@ int ObTableApiProcessorBase::sync_end_trans_(bool is_rollback, transaction::ObTx
|
|||||||
|
|
||||||
int tmp_ret = ret;
|
int tmp_ret = ret;
|
||||||
if (OB_FAIL(txs->release_tx(*trans_desc))) {
|
if (OB_FAIL(txs->release_tx(*trans_desc))) {
|
||||||
|
//overwrite ret
|
||||||
LOG_ERROR("release tx failed", K(ret), KPC(trans_desc));
|
LOG_ERROR("release tx failed", K(ret), KPC(trans_desc));
|
||||||
}
|
}
|
||||||
if (lock_handle != nullptr) {
|
if (lock_handle != nullptr) {
|
||||||
@ -606,7 +607,7 @@ int ObTableApiProcessorBase::async_commit_trans(rpc::ObRequest *req, int64_t tim
|
|||||||
callback.callback(ret);
|
callback.callback(ret);
|
||||||
}
|
}
|
||||||
// ignore the return code of end_trans
|
// ignore the return code of end_trans
|
||||||
had_do_response_ = true; // don't send response in this worker thread
|
is_async_response_ = true; // don't send response in this worker thread
|
||||||
// @note the req_ may be freed, req_processor can not be read any more.
|
// @note the req_ may be freed, req_processor can not be read any more.
|
||||||
// The req_has_wokenup_ MUST set to be true, otherwise req_processor will invoke req_->set_process_start_end_diff, cause memory core
|
// The req_has_wokenup_ MUST set to be true, otherwise req_processor will invoke req_->set_process_start_end_diff, cause memory core
|
||||||
// @see ObReqProcessor::run() req_->set_process_start_end_diff(ObTimeUtility::current_time());
|
// @see ObReqProcessor::run() req_->set_process_start_end_diff(ObTimeUtility::current_time());
|
||||||
@ -931,7 +932,7 @@ int ObTableRpcProcessor<T>::process()
|
|||||||
if (OB_FAIL(process_with_retry(RpcProcessor::arg_.credential_, get_timeout_ts()))) {
|
if (OB_FAIL(process_with_retry(RpcProcessor::arg_.credential_, get_timeout_ts()))) {
|
||||||
if (OB_NOT_NULL(request_string_)) { // request_string_ has been generated if enable sql_audit
|
if (OB_NOT_NULL(request_string_)) { // request_string_ has been generated if enable sql_audit
|
||||||
LOG_WARN("fail to process table_api request", K(ret), K(stat_event_type_), K(request_string_));
|
LOG_WARN("fail to process table_api request", K(ret), K(stat_event_type_), K(request_string_));
|
||||||
} else if (had_do_response()) { // req_ may be freed
|
} else if (is_async_response_) { // req_ may be freed
|
||||||
LOG_WARN("fail to process table_api request", K(ret), K(stat_event_type_));
|
LOG_WARN("fail to process table_api request", K(ret), K(stat_event_type_));
|
||||||
} else {
|
} else {
|
||||||
LOG_WARN("fail to process table_api request", K(ret), K(stat_event_type_), "request", RpcProcessor::arg_);
|
LOG_WARN("fail to process table_api request", K(ret), K(stat_event_type_), "request", RpcProcessor::arg_);
|
||||||
@ -965,7 +966,7 @@ int ObTableRpcProcessor<T>::response(int error_code)
|
|||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
// if it is waiting for retry in queue, the response can NOT be sent.
|
// if it is waiting for retry in queue, the response can NOT be sent.
|
||||||
if (!need_retry_in_queue_ && !had_do_response()) {
|
if (!need_retry_in_queue_ && !is_async_response()) {
|
||||||
const ObRpcPacket *rpc_pkt = &reinterpret_cast<const ObRpcPacket&>(this->req_->get_packet());
|
const ObRpcPacket *rpc_pkt = &reinterpret_cast<const ObRpcPacket&>(this->req_->get_packet());
|
||||||
if (ObTableRpcProcessorUtil::need_do_move_response(error_code, *rpc_pkt)) {
|
if (ObTableRpcProcessorUtil::need_do_move_response(error_code, *rpc_pkt)) {
|
||||||
// response rerouting packet
|
// response rerouting packet
|
||||||
|
@ -139,7 +139,7 @@ public:
|
|||||||
int get_tablet_by_rowkey(uint64_t table_id, const ObIArray<ObRowkey> &rowkeys,
|
int get_tablet_by_rowkey(uint64_t table_id, const ObIArray<ObRowkey> &rowkeys,
|
||||||
ObIArray<ObTabletID> &tablet_ids);
|
ObIArray<ObTabletID> &tablet_ids);
|
||||||
inline transaction::ObTxReadSnapshot &get_tx_snapshot() { return tx_snapshot_; }
|
inline transaction::ObTxReadSnapshot &get_tx_snapshot() { return tx_snapshot_; }
|
||||||
inline bool had_do_response() const { return had_do_response_; }
|
inline bool is_async_response() const { return is_async_response_; }
|
||||||
int get_table_id(const ObString &table_name, const uint64_t arg_table_id, uint64_t &real_table_id) const;
|
int get_table_id(const ObString &table_name, const uint64_t arg_table_id, uint64_t &real_table_id) const;
|
||||||
protected:
|
protected:
|
||||||
virtual int check_arg() = 0;
|
virtual int check_arg() = 0;
|
||||||
@ -188,7 +188,7 @@ protected:
|
|||||||
// trans control
|
// trans control
|
||||||
sql::TransState trans_state_;
|
sql::TransState trans_state_;
|
||||||
transaction::ObTxDesc *trans_desc_;
|
transaction::ObTxDesc *trans_desc_;
|
||||||
bool had_do_response_; // asynchronous transactions return packet in advance
|
bool is_async_response_; // asynchronous transactions return packet in advance
|
||||||
sql::TransState *trans_state_ptr_;
|
sql::TransState *trans_state_ptr_;
|
||||||
transaction::ObTxReadSnapshot tx_snapshot_;
|
transaction::ObTxReadSnapshot tx_snapshot_;
|
||||||
ObAddr user_client_addr_;
|
ObAddr user_client_addr_;
|
||||||
|
Reference in New Issue
Block a user