Add a tenant_guard to prevent the tenant from being migrated during the intermediate result transmission process.

This commit is contained in:
obdev
2023-08-04 09:48:41 +00:00
committed by ob-robot
parent 5da97d0102
commit 0fc560ea70

View File

@ -439,33 +439,40 @@ int ObDTLIntermResultManager::process_interm_result(ObDtlLinkedBuffer *buffer, i
if (OB_ISNULL(buffer)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("fail to process buffer", K(ret));
} else if (buffer->is_batch_info_valid()) {
const ObSArray<ObDtlBatchInfo> &infos = buffer->get_batch_info();
for (int64_t i = 0; i < infos.count() && OB_SUCC(ret); i++) {
} else {
// Prevent the tenant from being migrated during the intermediate result transmission process, leading to error 4013.
MAKE_TENANT_SWITCH_SCOPE_GUARD(tenant_guard);
if (OB_FAIL(tenant_guard.switch_to(buffer->tenant_id()))) {
LOG_WARN("switch tenant failed", K(buffer->tenant_id()));
} else if (buffer->is_batch_info_valid()) {
const ObSArray<ObDtlBatchInfo> &infos = buffer->get_batch_info();
for (int64_t i = 0; i < infos.count() && OB_SUCC(ret); i++) {
ObDTLIntermResultKey key;
const ObDtlBatchInfo &batch_info = infos.at(i);
key.time_us_ = buffer->timeout_ts();
key.batch_id_ = batch_info.batch_id_;
key.channel_id_ = channel_id;
const int64_t start_pos = batch_info.start_;
const int64_t length = batch_info.end_ - start_pos;
const int64_t rows = batch_info.rows_;
const bool is_eof = infos.count() - 1 == i ? buffer->is_eof() : true;
if (OB_FAIL(process_interm_result_inner(*buffer, key, start_pos, length, rows, is_eof, false))) {
LOG_WARN("process interm result inner", K(ret));
}
}
LOG_TRACE("process interm result", K(buffer->size()), K(buffer->get_batch_info().count()),
K(buffer->get_batch_info()));
} else {
ObDTLIntermResultKey key;
const ObDtlBatchInfo &batch_info = infos.at(i);
key.time_us_ = buffer->timeout_ts();
key.batch_id_ = batch_info.batch_id_;
key.batch_id_ = buffer->get_batch_id();
key.channel_id_ = channel_id;
const int64_t start_pos = batch_info.start_;
const int64_t length = batch_info.end_ - start_pos;
const int64_t rows = batch_info.rows_;
const bool is_eof = infos.count() - 1 == i ? buffer->is_eof() : true;
if (OB_FAIL(process_interm_result_inner(*buffer, key, start_pos, length, rows, is_eof, false))) {
if (OB_FAIL(process_interm_result_inner(*buffer, key, 0, buffer->size(), 0, buffer->is_eof(), true))) {
LOG_WARN("process interm result inner", K(ret));
}
}
LOG_TRACE("process interm result", K(buffer->size()), K(buffer->get_batch_info().count()),
K(buffer->get_batch_info()));
} else {
ObDTLIntermResultKey key;
key.time_us_ = buffer->timeout_ts();
key.batch_id_ = buffer->get_batch_id();
key.channel_id_ = channel_id;
if (OB_FAIL(process_interm_result_inner(*buffer, key, 0, buffer->size(), 0, buffer->is_eof(), true))) {
LOG_WARN("process interm result inner", K(ret));
}
}
return ret;
}