From 0fc560ea70647052c5075961aa4dd2ad47cd86a2 Mon Sep 17 00:00:00 2001 From: obdev Date: Fri, 4 Aug 2023 09:48:41 +0000 Subject: [PATCH] Add a tenant_guard to prevent the tenant from being migrated during the intermediate result transmission process. --- src/sql/dtl/ob_dtl_interm_result_manager.cpp | 47 +++++++++++--------- 1 file changed, 27 insertions(+), 20 deletions(-) diff --git a/src/sql/dtl/ob_dtl_interm_result_manager.cpp b/src/sql/dtl/ob_dtl_interm_result_manager.cpp index 2afb66fb9f..06ea94f6c8 100644 --- a/src/sql/dtl/ob_dtl_interm_result_manager.cpp +++ b/src/sql/dtl/ob_dtl_interm_result_manager.cpp @@ -439,33 +439,40 @@ int ObDTLIntermResultManager::process_interm_result(ObDtlLinkedBuffer *buffer, i if (OB_ISNULL(buffer)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("fail to process buffer", K(ret)); - } else if (buffer->is_batch_info_valid()) { - const ObSArray &infos = buffer->get_batch_info(); - for (int64_t i = 0; i < infos.count() && OB_SUCC(ret); i++) { + } else { + // Prevent the tenant from being migrated during the intermediate result transmission process, leading to error 4013. + MAKE_TENANT_SWITCH_SCOPE_GUARD(tenant_guard); + if (OB_FAIL(tenant_guard.switch_to(buffer->tenant_id()))) { + LOG_WARN("switch tenant failed", K(buffer->tenant_id())); + } else if (buffer->is_batch_info_valid()) { + const ObSArray &infos = buffer->get_batch_info(); + for (int64_t i = 0; i < infos.count() && OB_SUCC(ret); i++) { + ObDTLIntermResultKey key; + const ObDtlBatchInfo &batch_info = infos.at(i); + key.time_us_ = buffer->timeout_ts(); + key.batch_id_ = batch_info.batch_id_; + key.channel_id_ = channel_id; + const int64_t start_pos = batch_info.start_; + const int64_t length = batch_info.end_ - start_pos; + const int64_t rows = batch_info.rows_; + const bool is_eof = infos.count() - 1 == i ? buffer->is_eof() : true; + if (OB_FAIL(process_interm_result_inner(*buffer, key, start_pos, length, rows, is_eof, false))) { + LOG_WARN("process interm result inner", K(ret)); + } + } + LOG_TRACE("process interm result", K(buffer->size()), K(buffer->get_batch_info().count()), + K(buffer->get_batch_info())); + } else { ObDTLIntermResultKey key; - const ObDtlBatchInfo &batch_info = infos.at(i); key.time_us_ = buffer->timeout_ts(); - key.batch_id_ = batch_info.batch_id_; + key.batch_id_ = buffer->get_batch_id(); key.channel_id_ = channel_id; - const int64_t start_pos = batch_info.start_; - const int64_t length = batch_info.end_ - start_pos; - const int64_t rows = batch_info.rows_; - const bool is_eof = infos.count() - 1 == i ? buffer->is_eof() : true; - if (OB_FAIL(process_interm_result_inner(*buffer, key, start_pos, length, rows, is_eof, false))) { + if (OB_FAIL(process_interm_result_inner(*buffer, key, 0, buffer->size(), 0, buffer->is_eof(), true))) { LOG_WARN("process interm result inner", K(ret)); } } - LOG_TRACE("process interm result", K(buffer->size()), K(buffer->get_batch_info().count()), - K(buffer->get_batch_info())); - } else { - ObDTLIntermResultKey key; - key.time_us_ = buffer->timeout_ts(); - key.batch_id_ = buffer->get_batch_id(); - key.channel_id_ = channel_id; - if (OB_FAIL(process_interm_result_inner(*buffer, key, 0, buffer->size(), 0, buffer->is_eof(), true))) { - LOG_WARN("process interm result inner", K(ret)); - } } + return ret; }