[CP] [OBCDC] Fix flow_controll Issue for sysbench oltp_insert test

This commit is contained in:
SanmuWangZJU 2024-02-02 22:42:30 +00:00 committed by ob-robot
parent b85c110e1e
commit 6bfc404874
10 changed files with 147 additions and 62 deletions

View File

@ -870,8 +870,10 @@ void ObDictTableMeta::reset()
database_id_ = OB_INVALID_ID;
table_id_ = OB_INVALID_ID;
schema_version_ = OB_INVALID_TIMESTAMP;
allocator_->free(table_name_.ptr());
table_name_.reset();
if (table_name_.ptr() != nullptr) {
allocator_->free(table_name_.ptr());
table_name_.reset();
}
name_case_mode_ = common::OB_NAME_CASE_INVALID;
table_type_ = schema::ObTableType::USER_TABLE;
table_mode_.reset();

View File

@ -115,7 +115,7 @@ private:
ObDictMetaType meta_type_;
ObDictMetaStorageType storage_type_;
int64_t dict_serialized_length_;
};
}; // end of ObDictMetaHeader
class ObDictTenantMeta
{
@ -189,7 +189,7 @@ private:
int64_t drop_tenant_time_;
bool in_recyclebin_;
share::ObLSArray ls_arr_;
};
}; // end of ObDictTenantMeta
class ObDictDatabaseMeta
{
@ -250,7 +250,7 @@ private:
common::ObCollationType collation_type_;//default:utf8mb4_general_ci
common::ObNameCaseMode name_case_mode_;
bool in_recyclebin_;
};
}; // end of ObDictDatabaseMeta
class ObDictColumnMeta
{
@ -361,7 +361,7 @@ private:
common::ObSEArray<uint64_t, 2> column_ref_ids_;
uint64_t udt_set_id_;
uint64_t sub_type_;
};
}; // end of ObDictColumnMeta
class ObDictTableMeta
{
@ -527,7 +527,7 @@ private:
common::ObIndexColumn *index_cols_;
uint64_t data_table_id_;
uint64_t association_table_id_;
};
}; // end of ObDictTableMeta
} // namespace datadict
} // namespace oceanbase

View File

@ -215,51 +215,27 @@ int ObCDCLobDataMerger::push_lob_column_(
const auto seq_no_st = transaction::ObTxSEQ::cast_from_int(lob_data_out_row_ctx->seq_no_st_);
const uint32_t seq_no_cnt = lob_data_out_row_ctx->seq_no_cnt_;
const uint32_t del_seq_no_cnt = lob_data_out_row_ctx->del_seq_no_cnt_;
const uint32_t insert_seq_no_cnt = seq_no_cnt - del_seq_no_cnt;
LobColumnFragmentCtxList new_lob_col_fra_ctx_list;
LobColumnFragmentCtxList old_lob_col_fra_ctx_list;
if (is_empty_sql) {
// do nothing
} else if (lob_data_get_ctx.is_insert()) {
if (OB_FAIL(lob_data_get_ctx.new_lob_col_ctx_.init(seq_no_cnt, allocator))) {
if (OB_FAIL(check_empty_outrow_lob_col_(lob_data_get_ctx, seq_no_cnt, del_seq_no_cnt, is_update_outrow_lob_from_empty_to_empty))) {
LOG_ERROR("check_empty_outrow_lob_col_ failed", K(lob_data_get_ctx), K(seq_no_cnt), K(del_seq_no_cnt), K(is_update_outrow_lob_from_empty_to_empty));
} else if (OB_FAIL(lob_data_get_ctx.new_lob_col_ctx_.init(insert_seq_no_cnt, allocator))) {
LOG_ERROR("lob_data_get_ctx new_lob_col_ctx_ init failed", KR(ret), K(seq_no_cnt),
K(lob_data_get_ctx));
} else if (OB_FAIL(get_lob_col_fra_ctx_list_(true/*is_new_col*/, seq_no_st, seq_no_cnt, allocator,
lob_data_get_ctx, new_lob_col_fra_ctx_list))) {
lob_data_get_ctx, new_lob_col_fra_ctx_list))) {
LOG_ERROR("get_lob_col_fra_ctx_list_ failed", KR(ret), K(seq_no_st), K(seq_no_cnt),
K(new_lob_col_fra_ctx_list));
}
} else if (lob_data_get_ctx.is_update()) {
const int64_t insert_seq_no_cnt = seq_no_cnt - del_seq_no_cnt;
// NOTICE:
// 1. Update LOB column data from in_row to out_row, the del_seq_no_cnt is 0
// 2. Update LOB column data from out_row to empty string, the insert_seq_no_cnt is 0
//
// 3. Currently, LOB column data is stored in out_row in these cases:
// 3.1 Length of column data is larger than 4K
// 3.2. Length of column data is less than 4K(even if column data is empty string),
// but was larger than 4K(stored out_row) and not update to NULL until this trans.
if (del_seq_no_cnt == 0 && insert_seq_no_cnt == del_seq_no_cnt) {
// empty out_row update to empty out_row
// Under normal circumstances, this scenario should not occur;
// Abnormaly circumstances in OBServer version less than 4.2.1 BP2 and 4.1.0 BP4, please refer case t/libobcdc/lob_empty_outrow_udpate.test
const uint64_t cluster_version = TCTX.global_info_.get_min_cluster_version();
const bool skip_ob_version_exist_known_issues = (cluster_version == 0) // can't get cluster_version, may in direct mode
|| (cluster_version < CLUSTER_VERSION_4_2_1_2) // ob version less than 4213 and 4102 has known issues will result in this scenario.
|| (cluster_version <= CLUSTER_VERSION_4_1_0_2);
const bool can_ignore_empty_outrow_update = (1 == TCONF.skip_empty_outrow_lob_update) || skip_ob_version_exist_known_issues;
if (can_ignore_empty_outrow_update) {
is_update_outrow_lob_from_empty_to_empty = true;
} else {
ret = OB_NOT_SUPPORTED;
LOG_ERROR("[FATAL] [OUTROW_LOB] unexpected update outrow lob from empty to empty, config skip_empty_outrow_lob_update = 1 if necessary",
KR(ret), K(can_ignore_empty_outrow_update), K(cluster_version));
}
}
if (FAILEDx(lob_data_get_ctx.old_lob_col_ctx_.init(del_seq_no_cnt, allocator))) {
if (OB_FAIL(check_empty_outrow_lob_col_(lob_data_get_ctx, seq_no_cnt, del_seq_no_cnt, is_update_outrow_lob_from_empty_to_empty))) {
LOG_ERROR("check_empty_outrow_lob_col_ failed", K(lob_data_get_ctx), K(seq_no_cnt), K(del_seq_no_cnt), K(is_update_outrow_lob_from_empty_to_empty));
} else if (OB_FAIL(lob_data_get_ctx.old_lob_col_ctx_.init(del_seq_no_cnt, allocator))) {
LOG_ERROR("lob_data_get_ctx old_lob_col_ctx_ init failed", KR(ret), K(del_seq_no_cnt),
K(lob_data_get_ctx));
} else if (OB_FAIL(get_lob_col_fra_ctx_list_(false/*is_new_col*/, seq_no_st, del_seq_no_cnt,
@ -312,6 +288,46 @@ int ObCDCLobDataMerger::push_lob_column_(
return ret;
}
int ObCDCLobDataMerger::check_empty_outrow_lob_col_(
ObLobDataGetCtx &lob_data_get_ctx,
uint32_t seq_no_cnt,
uint32_t del_seq_no_cnt,
bool &is_update_outrow_lob_from_empty_to_empty)
{
int ret = OB_SUCCESS;
const int64_t insert_seq_no_cnt = seq_no_cnt - del_seq_no_cnt;
const bool is_empty_lob = lob_data_get_ctx.is_insert() ? insert_seq_no_cnt == 0 : (insert_seq_no_cnt == 0 && del_seq_no_cnt == 0);
// NOTICE:
// 1. Update LOB column data from in_row to out_row, the del_seq_no_cnt is 0
// 2. Update LOB column data from out_row to empty string, the insert_seq_no_cnt is 0
//
// 3. Currently, LOB column data is stored in out_row in these cases:
// 3.1 Length of column data is larger than 4K
// 3.2. Length of column data is less than 4K(even if column data is empty string),
// but was larger than 4K(stored out_row) and not update to NULL until this trans.
if (is_empty_lob) {
// empty out_row update to empty out_row
// Under normal circumstances, this scenario should not occur;
// Abnormaly circumstances in OBServer version less than 4.2.1 BP2 and 4.1.0 BP4, please refer case t/libobcdc/lob_empty_outrow_udpate.test
const uint64_t cluster_version = TCTX.global_info_.get_min_cluster_version();
const bool skip_ob_version_exist_known_issues = (cluster_version == 0) // can't get cluster_version, may in direct mode
|| (cluster_version < CLUSTER_VERSION_4_2_1_2) // ob version less than 4213 and 4102 has known issues will result in this scenario.
|| (cluster_version <= CLUSTER_VERSION_4_1_0_2);
const bool can_ignore_empty_outrow_update = (1 == TCONF.skip_empty_outrow_lob_update) || skip_ob_version_exist_known_issues;
if (can_ignore_empty_outrow_update) {
is_update_outrow_lob_from_empty_to_empty = true;
} else {
ret = OB_NOT_SUPPORTED;
LOG_ERROR("[FATAL] [OUTROW_LOB] unexpected update outrow lob from empty to empty, config skip_empty_outrow_lob_update = 1 if necessary",
KR(ret), K(can_ignore_empty_outrow_update), K(cluster_version));
}
}
return ret;
}
int ObCDCLobDataMerger::get_lob_col_fra_ctx_list_(
const bool is_new_col,
const transaction::ObTxSEQ &seq_no_start,

View File

@ -81,6 +81,11 @@ private:
ObLobDataOutRowCtxList &task,
ObLobDataGetCtx &lob_data_get_ctx,
volatile bool &stop_flag);
int check_empty_outrow_lob_col_(
ObLobDataGetCtx &lob_data_get_ctx,
uint32_t seq_no_cnt,
uint32_t del_seq_no_cnt,
bool &is_update_outrow_lob_from_empty_to_empty);
int get_lob_col_fra_ctx_list_(
const bool is_new_col,
const transaction::ObTxSEQ &seq_no_start,

View File

@ -106,6 +106,7 @@ int ObCDCTenantSQLServerProvider::prepare_refresh()
} else if (OB_FAIL(query_tenant_server_())) {
LOG_WARN("query_tenant_server_ failed", KR(ret));
}
LOG_INFO("prepare_refresh tenant sql_server_provider done", KR(ret));
return ret;
}
@ -120,6 +121,7 @@ int ObCDCTenantSQLServerProvider::end_refresh()
} else if (OB_FAIL(refresh_server_lock_.unlock())) {
LOG_ERROR("release refresh_server_lock_ failed", KR(ret), K(this));
}
LOG_INFO("refresh tenant sql_server_provider end", KR(ret));
return ret;
}
@ -296,7 +298,7 @@ int ObCDCTenantSQLServerProvider::query_tenant_server_()
LOG_WARN("tenant_server_list is empty but tenant is not dropped", K(tenant_id), K(tenant_server_list));
}
} else {
LOG_DEBUG("find tenant servers", K(tenant_id), K(tenant_server_list));
LOG_INFO("find tenant servers", K(tenant_id), K(tenant_server_list));
}
if (OB_NOT_NULL(tenant_server_list)) {

View File

@ -133,9 +133,10 @@ int ObLogLsGetter::get_ls_ids(
// query and set
if (OB_FAIL(query_and_set_tenant_ls_info_(tenant_id, snapshot_ts))) {
LOG_ERROR("query_and_set_tenant_ls_info_ failed", KR(ret), K(tenant_id));
} else if (OB_FAIL(tenant_ls_ids_cache_.get(tenant_id, ls_ids))) {
LOG_ERROR("get tenant_ls_ids from cache failed", KR(ret), K(tenant_id), K(snapshot_ts));
}
}
} else {
}
if (OB_SUCC(ret)) {
@ -144,6 +145,10 @@ int ObLogLsGetter::get_ls_ids(
LOG_ERROR("ls_id_array push_back failed", KR(ret), K(ls_id_array));
}
}
if (OB_SUCC(ret)) {
LOG_INFO("get_ls_ids succ", K(tenant_id), K(ls_id_array), K(snapshot_ts));
}
}
return ret;

View File

@ -14,6 +14,7 @@
#include "ob_log_sequencer1.h"
#include <math.h>
#include "lib/string/ob_string.h" // ObString
#include "lib/atomic/ob_atomic.h"
#include "lib/thread/ob_thread_name.h"
@ -228,7 +229,10 @@ int ObLogSequencer::push(PartTransTask *part_trans_task, volatile bool &stop_fla
hash_value = ATOMIC_FAA(&round_value_, 1);
}
void *push_task = static_cast<void *>(part_trans_task);
RETRY_FUNC(stop_flag, *(static_cast<ObMQThread *>(this)), push, push_task, hash_value, DATA_OP_TIMEOUT);
if (is_global_heartbeat || OB_SUCC(wait_until_ready_queue_not_busy_(stop_flag))) {
RETRY_FUNC(stop_flag, *(static_cast<ObMQThread *>(this)), push, push_task, hash_value, DATA_OP_TIMEOUT);
}
if (OB_SUCC(ret)) {
(void)ATOMIC_AAF(&queue_part_trans_task_count_, 1);
@ -807,20 +811,19 @@ int ObLogSequencer::push_task_into_committer_(PartTransTask *task,
return ret;
}
int ObLogSequencer::handle_participants_ready_trans_(const bool is_dml_trans,
int ObLogSequencer::handle_participants_ready_trans_(
const bool is_dml_trans,
TransCtx *trans_ctx,
volatile bool &stop_flag)
{
int ret = OB_SUCCESS;
stop_flag = stop_flag;
uint64_t tenant_id = OB_INVALID_TENANT_ID;
if (OB_ISNULL(trans_ctx)) {
LOG_ERROR("invalid argument", K(trans_ctx));
ret = OB_INVALID_ARGUMENT;
} else {
// Avoiding TransCtx recycling
uint64_t tenant_id = OB_INVALID_TENANT_ID;
ObLogTenantGuard guard;
ObLogTenant *tenant = NULL;
@ -839,23 +842,67 @@ int ObLogSequencer::handle_participants_ready_trans_(const bool is_dml_trans,
}
}
}
}
if (OB_SUCC(ret)) {
TrxSortElem &trx_sort_elem = trans_ctx->get_trx_sort_elem();
{
ObByteLockGuard guard(trans_queue_lock_);
ready_trans_queue_.push(trx_sort_elem);
}
// signal push_ready_trans_to_seq_queue_
ready_queue_cond_.signal();
_DSTAT("[TRANS_QUEUE] TENANT_ID=%lu TRANS_ID=%s QUEUE_SIZE=(%lu/%ld) IS_DML=%d",
tenant_id,
to_cstring(trx_sort_elem),
ready_trans_queue_.size(),
seq_trans_queue_.get_curr_total(),
is_dml_trans);
if (OB_SUCC(ret)) {
TrxSortElem &trx_sort_elem = trans_ctx->get_trx_sort_elem();
{
ObByteLockGuard guard(trans_queue_lock_);
ready_trans_queue_.push(trx_sort_elem);
}
// signal push_ready_trans_to_seq_queue_
ready_queue_cond_.signal();
_DSTAT("[TRANS_QUEUE] TENANT_ID=%lu TRANS_ID=%s QUEUE_SIZE=(%lu/%ld) IS_DML=%d",
tenant_id,
to_cstring(trx_sort_elem),
ready_trans_queue_.size(),
seq_trans_queue_.get_curr_total(),
is_dml_trans);
}
return ret;
}
int ObLogSequencer::wait_until_ready_queue_not_busy_(volatile bool &stop_flag)
{
int ret = OB_SUCCESS;
const int64_t seq_trans_queue_cap = seq_trans_queue_.capacity();
const int64_t seq_trans_threshold = seq_trans_queue_cap * TCONF.pause_redo_dispatch_task_count_threshold / 100;
bool ready_queue_busy = true;
const int64_t start_ts = get_timestamp();
const int64_t base_sleep_us_if_busy = 100;
while (ready_queue_busy && !stop_flag) {
// ready_queue_busy = true if: seq_trans_queue is not empty and memory hold touch warn threshold
const int64_t mem_hold = lib::get_memory_hold();
const int64_t mem_limit = TCONF.memory_limit;
const double mem_warn_threshold = mem_limit * TCONF.memory_usage_warn_threshold / 100.0;
const int64_t seq_trans_cnt = seq_trans_queue_.get_curr_total();
if (mem_hold < mem_warn_threshold) {
ready_queue_busy = (seq_trans_cnt >= seq_trans_queue_cap);
} else if (mem_hold < mem_limit) {
ready_queue_busy = seq_trans_cnt > seq_trans_threshold;
} else {
// should not disaptch if exist trans not handled
ready_queue_busy = seq_trans_cnt > 0;
}
if (ready_queue_busy) {
if (REACH_TIME_INTERVAL_THREAD_LOCAL(PRINT_SEQ_INFO_INTERVAL)) {
LOG_INFO("[FLOW_CONTROL][PAUSE_DISPATCH_TO_SEQUENCER]",
K(mem_hold),
"sequencer_queue_size", queue_part_trans_task_count_,
"ready_queue_size", ready_trans_queue_.size(),
K(seq_trans_cnt));
}
ob_usleep(100);
}
}
if (stop_flag) {
ret = OB_IN_STOP_STATE;
}
return ret;

View File

@ -169,6 +169,7 @@ private:
int handle_participants_ready_trans_(const bool is_dml_trans,
TransCtx *trans_ctx,
volatile bool &stop_flag);
int wait_until_ready_queue_not_busy_(volatile bool &stop_flag);
// Once the participants are gathered, the entire DML transaction is processed
// TODO
int handle_dml_trans_(ObLogTenant &tenant, TransCtx &trans_ctx, volatile bool &stop_flag);

View File

@ -0,0 +1,3 @@
LOG_FILE=log/libobcdc.log
watch -n 1 "if [ -f $LOG_FILE ]; then grep NEED_PAUSE log/libobcdc.log | awk '{print \$10, \$11, \$12, \$13, \$15}' | tail -n 5; fi"

View File

@ -73,7 +73,7 @@ public:
int build_table_schema(ObTableSchema &table_schema)
{
int ret = OB_SUCCESS;
int column_cnt = 5;
int column_cnt = 500;
table_schema.set_tenant_id(get_random_uint64_());
table_schema.set_database_id(get_random_uint64_());
table_schema.set_table_id(get_random_uint64_());
@ -101,6 +101,10 @@ public:
column_schema.schema_version_ = 2341235;
ObObjMeta meta;
meta.set_collation_type(ObCollationType::CS_TYPE_GB18030_BIN);
common::ObString ext_info("enum {a, b, c}");
ObArray<common::ObString> extend_info;
extend_info.push_back(ext_info);
column_schema.set_extended_type_info(extend_info);
column_schema.meta_type_ = meta;
return ret;
}