fix load data failed when tablet changed
This commit is contained in:
parent
947a2b17e6
commit
b1bd203810
@ -927,9 +927,6 @@ ObShuffleTaskHandle::~ObShuffleTaskHandle()
|
||||
if (OB_NOT_NULL(data_buffer)) {
|
||||
ob_free(data_buffer);
|
||||
}
|
||||
if (OB_NOT_NULL(escape_buffer)) {
|
||||
ob_free(escape_buffer);
|
||||
}
|
||||
}
|
||||
|
||||
int ObShuffleTaskHandle::expand_buf(const int64_t max_size, const int64_t to_buffer_size)
|
||||
@ -940,21 +937,16 @@ int ObShuffleTaskHandle::expand_buf(const int64_t max_size, const int64_t to_buf
|
||||
ret = OB_SIZE_OVERFLOW;
|
||||
LOG_WARN("buffer size not enough", K(ret));
|
||||
} else {
|
||||
void *buf1 = NULL;
|
||||
void *buf2 = NULL;
|
||||
if (OB_ISNULL(buf1 = ob_malloc(new_size, attr))
|
||||
|| OB_ISNULL(buf2 = ob_malloc(new_size, attr))) {
|
||||
char *buf = NULL;
|
||||
if (OB_ISNULL(buf = static_cast<char*>(ob_malloc(new_size * 2, attr)))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
} else {
|
||||
if (OB_NOT_NULL(data_buffer)) {
|
||||
ob_free(data_buffer);
|
||||
}
|
||||
data_buffer = new(buf1) ObLoadFileBuffer(
|
||||
data_buffer = new(buf) ObLoadFileBuffer(
|
||||
new_size - sizeof(ObLoadFileBuffer));
|
||||
if (OB_NOT_NULL(escape_buffer)) {
|
||||
ob_free(escape_buffer);
|
||||
}
|
||||
escape_buffer = new(buf2) ObLoadFileBuffer(
|
||||
escape_buffer = new(buf + new_size) ObLoadFileBuffer(
|
||||
new_size - sizeof(ObLoadFileBuffer));
|
||||
}
|
||||
}
|
||||
@ -3014,6 +3006,13 @@ int ObLoadDataSPImpl::ToolBox::init(ObExecContext &ctx, ObLoadDataStmt &load_stm
|
||||
handle->row_in_file.assign(obj_array, num_of_file_column);
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard(MTL_ID(), handle->schema_guard))) {
|
||||
LOG_WARN("get tenant schema guard failed", KR(ret));
|
||||
} else {
|
||||
handle->exec_ctx.get_sql_ctx()->schema_guard_ = &handle->schema_guard;
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret) || OB_FAIL(shuffle_resource.push_back(handle))) {
|
||||
handle->~ObShuffleTaskHandle();
|
||||
|
@ -511,6 +511,7 @@ struct ObShuffleTaskHandle {
|
||||
ObShuffleResult result;
|
||||
ObSEArray<ObParserErrRec, 16> err_records;
|
||||
common::ObMemAttr attr;
|
||||
share::schema::ObSchemaGetterGuard schema_guard;
|
||||
TO_STRING_KV("task_id", result.task_id_);
|
||||
};
|
||||
|
||||
|
@ -143,12 +143,7 @@ int ObRpcLoadDataShuffleTaskExecuteP::process()
|
||||
} else if (OB_ISNULL(handle)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("handle is null", K(ret));
|
||||
} else if (OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard(MTL_ID(), schema_guard_))) {
|
||||
//Confirmed with the load data owner that the inability to calculate the correct tablet_id here will not affect the execution,
|
||||
//so we use the latest schema version to obtain the guard
|
||||
LOG_WARN("get tenant schema guard failed", KR(ret));
|
||||
} else {
|
||||
handle->exec_ctx.get_sql_ctx()->schema_guard_ = &schema_guard_;
|
||||
} else {
|
||||
if (OB_UNLIKELY(THIS_WORKER.is_timeout())) {
|
||||
ret = OB_TIMEOUT;
|
||||
LOG_WARN("LOAD DATA shuffle task timeout", K(ret), K(task));
|
||||
|
@ -531,7 +531,6 @@ protected:
|
||||
int process();
|
||||
private:
|
||||
const observer::ObGlobalContext &gctx_;
|
||||
share::schema::ObSchemaGetterGuard schema_guard_;
|
||||
};
|
||||
|
||||
class ObRpcLoadDataShuffleTaskCallBack
|
||||
|
Loading…
x
Reference in New Issue
Block a user