[CP] fix trans of ddl filling lob submit after lob tablet dropped.
This commit is contained in:
@ -706,6 +706,7 @@ int ObComplementDataDag::report_replica_build_status()
|
||||
LOG_WARN("fail to send build ddl single replica response", K(ret), K(arg));
|
||||
}
|
||||
}
|
||||
DEBUG_SYNC(HOLD_DDL_COMPLEMENT_DAG_AFTER_REPORT_FINISH);
|
||||
FLOG_INFO("complement data finished", K(ret), K(context_.complement_data_ret_));
|
||||
return ret;
|
||||
}
|
||||
@ -748,6 +749,42 @@ int ObComplementDataDag::fill_dag_key(char *buf, const int64_t buf_len) const
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObComplementDataDag::check_and_exit_on_demand()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(!is_inited_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("dag has not been initialized", K(ret));
|
||||
} else {
|
||||
DEBUG_SYNC(HOLD_DDL_COMPLEMENT_DAG_WHEN_APPEND_ROW);
|
||||
SMART_VAR(ObMySQLProxy::MySQLResult, res) {
|
||||
ObSqlString sql_string;
|
||||
sqlclient::ObMySQLResult *result = nullptr;
|
||||
if (OB_TMP_FAIL(sql_string.assign_fmt("SELECT status FROM %s WHERE task_id = %lu", share::OB_ALL_DDL_TASK_STATUS_TNAME, param_.task_id_))) {
|
||||
LOG_WARN("assign sql string failed", K(tmp_ret), K(param_));
|
||||
} else if (OB_TMP_FAIL(GCTX.sql_proxy_->read(res, param_.dest_tenant_id_, sql_string.ptr()))) {
|
||||
LOG_WARN("fail to execute sql", K(tmp_ret), K(sql_string));
|
||||
} else if (OB_ISNULL(result = res.get_result())) {
|
||||
tmp_ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("error unexpected, query result must not be NULL", K(tmp_ret));
|
||||
} else if (OB_TMP_FAIL(result->next())) {
|
||||
if (OB_ENTRY_NOT_EXIST == tmp_ret) {
|
||||
ret = OB_CANCELED;
|
||||
}
|
||||
LOG_WARN("iterate next failed", K(ret), K(tmp_ret));
|
||||
} else {
|
||||
int task_status = 0;
|
||||
EXTRACT_INT_FIELD_MYSQL(*result, "status", task_status, int);
|
||||
if (OB_SUCC(ret)) {
|
||||
ret = task_status == ObDDLTaskStatus::REDEFINITION ? ret : OB_CANCELED;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
ObComplementPrepareTask::ObComplementPrepareTask()
|
||||
: ObITask(TASK_TYPE_COMPLEMENT_PREPARE), is_inited_(false), param_(nullptr), context_(nullptr)
|
||||
{
|
||||
@ -1286,6 +1323,9 @@ int ObComplementWriteTask::append_lob(
|
||||
int ObComplementWriteTask::append_row(ObScan *scan)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObComplementDataDag *current_dag = nullptr;
|
||||
const int64_t CHECK_DAG_NEED_EXIT_INTERVAL = 10000; // 1w rows.
|
||||
ObDataStoreDesc data_desc;
|
||||
HEAP_VARS_4((ObMacroBlockWriter, writer),
|
||||
(ObSchemaGetterGuard, schema_guard),
|
||||
(ObRelativeTable, relative_table),
|
||||
@ -1303,6 +1343,8 @@ int ObComplementWriteTask::append_row(ObScan *scan)
|
||||
int64_t t2 = 0;
|
||||
int64_t t3 = 0;
|
||||
int64_t lob_cnt = 0;
|
||||
int64_t row_scanned = 0;
|
||||
int64_t row_inserted = 0;
|
||||
ObArenaAllocator lob_allocator(ObModIds::OB_LOB_ACCESS_BUFFER, OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID());
|
||||
ObStoreRow reshaped_row;
|
||||
reshaped_row.flag_.set_flag(ObDmlFlag::DF_INSERT);
|
||||
@ -1357,7 +1399,7 @@ int ObComplementWriteTask::append_row(ObScan *scan)
|
||||
LOG_WARN("hidden table key is invalid", K(ret), K(hidden_table_key));
|
||||
} else if (OB_FAIL(sstable_redo_writer.init(param_->dest_ls_id_, param_->dest_tablet_id_))) {
|
||||
LOG_WARN("fail to init sstable redo writer", K(ret));
|
||||
} else if (OB_UNLIKELY(nullptr == static_cast<ObComplementDataDag *>(get_dag()))) {
|
||||
} else if (OB_ISNULL(current_dag = static_cast<ObComplementDataDag *>(get_dag()))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("the dag of this task is null", K(ret));
|
||||
} else if (OB_FAIL(tenant_direct_load_mgr->get_tablet_mgr_and_check_major(
|
||||
@ -1450,7 +1492,9 @@ int ObComplementWriteTask::append_row(ObScan *scan)
|
||||
} else {
|
||||
t2 = ObTimeUtility::current_time();
|
||||
get_next_row_time += t2 - t1;
|
||||
context_->row_scanned_++;
|
||||
if (++row_scanned % 100 == 0) {
|
||||
(void) ATOMIC_AAF(&context_->row_scanned_, 100);
|
||||
}
|
||||
if (!ddl_committed && OB_FAIL(writer.append_row(datum_row))) {
|
||||
LOG_WARN("fail to append row to macro block", K(ret), K(datum_row));
|
||||
}
|
||||
@ -1474,7 +1518,14 @@ int ObComplementWriteTask::append_row(ObScan *scan)
|
||||
if (OB_SUCC(ret)) {
|
||||
t3 = ObTimeUtility::current_time();
|
||||
append_row_time += t3 - t2;
|
||||
context_->row_inserted_++;
|
||||
if (++row_inserted % 100 == 0) {
|
||||
(void) ATOMIC_AAF(&context_->row_inserted_, 100);
|
||||
}
|
||||
if (row_inserted % CHECK_DAG_NEED_EXIT_INTERVAL == 0) {
|
||||
if (OB_FAIL(current_dag->check_and_exit_on_demand())) {
|
||||
LOG_WARN("dag check and exit on demand failed", K(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
lob_allocator.reuse(); // reuse after append_row to macro block to save memory
|
||||
}
|
||||
@ -1485,6 +1536,8 @@ int ObComplementWriteTask::append_row(ObScan *scan)
|
||||
LOG_WARN("close lob sstable slice failed", K(ret));
|
||||
}
|
||||
}
|
||||
(void) ATOMIC_AAF(&context_->row_scanned_, row_scanned % 100);
|
||||
(void) ATOMIC_AAF(&context_->row_inserted_, row_inserted % 100);
|
||||
LOG_INFO("print append row to macro block cost time", K(ret), K(task_id_), K(context_->row_inserted_),
|
||||
K(get_next_row_time), K(append_row_time));
|
||||
ObRowReshapeUtil::free_row_reshape(allocator, reshape_ptr, 1);
|
||||
|
||||
@ -185,6 +185,7 @@ public:
|
||||
virtual bool is_ha_dag() const override { return false; }
|
||||
// report replica build status to RS.
|
||||
int report_replica_build_status();
|
||||
int check_and_exit_on_demand();
|
||||
private:
|
||||
bool is_inited_;
|
||||
ObComplementDataParam param_;
|
||||
|
||||
Reference in New Issue
Block a user