load data local task shouldn't retry

This commit is contained in:
hnwyllmm
2024-02-06 07:12:20 +00:00
committed by ob-robot
parent 03b6636e04
commit 6f86116f83
2 changed files with 25 additions and 6 deletions

View File

@ -266,6 +266,17 @@ public:
return exec_ctx.get_table_direct_insert_ctx().get_is_direct(); return exec_ctx.get_table_direct_insert_ctx().get_is_direct();
} }
bool is_load_local(ObRetryParam &v) const
{
bool bret = false;
const ObICmd *cmd = v.result_.get_cmd();
if (OB_NOT_NULL(cmd) && cmd->get_cmd_type() == stmt::T_LOAD_DATA) {
const ObLoadDataStmt *load_data_stmt = static_cast<const ObLoadDataStmt *>(cmd);
bret = load_data_stmt->get_load_arguments().load_file_storage_ == ObLoadFileLocation::CLIENT_DISK;
}
return bret;
}
virtual void test(ObRetryParam &v) const override virtual void test(ObRetryParam &v) const override
{ {
int err = v.err_; int err = v.err_;
@ -287,6 +298,10 @@ public:
v.retry_type_ = RETRY_TYPE_NONE; v.retry_type_ = RETRY_TYPE_NONE;
} }
v.no_more_test_ = true; v.no_more_test_ = true;
} else if (is_load_local(v)) {
v.client_ret_ = err;
v.retry_type_ = RETRY_TYPE_NONE;
v.no_more_test_ = true;
} else if (is_direct_load(v)) { } else if (is_direct_load(v)) {
if (is_direct_load_retry_err(err)) { if (is_direct_load_retry_err(err)) {
try_packet_retry(v); try_packet_retry(v);

View File

@ -297,24 +297,27 @@ ObPacketStreamFileReader::~ObPacketStreamFileReader()
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
LOG_INFO("load data local try to receive all packets from client if eof is false", K_(eof));
// We read all data from client before close the file. // We read all data from client before close the file.
// We will stop to handle the process while something error. // We will stop to handle the process while something error.
// But the client must send all file content to us and the // But the client must send all file content to us and the
// normal SQL processor cannot handle the packets, so we // normal SQL processor cannot handle the packets, so we
// eat all packets with file content. // eat all packets with file content.
timeout_ts_ = -1;
// We will wait at most 10 seconds if there is no more data come in. // We will wait at most 10 seconds if there is no more data come in.
const int64_t wait_timeout = 10 * 1000000L; // seconds const int64_t wait_timeout = 10 * 1000000L; // seconds
int64_t wait_deadline = ObTimeUtility::current_time() + wait_timeout; timeout_ts_ = ObTimeUtility::current_time() + wait_timeout;
int64_t last_received_size = received_size_; int64_t last_received_size = received_size_;
while (!eof_ && OB_SUCC(ret) && ObTimeUtility::current_time() <= wait_deadline) { while (!eof_ && OB_SUCC(ret) && ObTimeUtility::current_time() <= timeout_ts_) {
ret = receive_packet(); ret = receive_packet();
if (received_size_ > last_received_size) { if (received_size_ > last_received_size) {
last_received_size = received_size_; last_received_size = received_size_;
wait_deadline = ObTimeUtility::current_time() + wait_timeout; timeout_ts_ = ObTimeUtility::current_time() + wait_timeout;
} }
} }
arena_allocator_.reset(); arena_allocator_.reset();
LOG_INFO("load data local file reader exit");
} }
int ObPacketStreamFileReader::open(const ObString &filename, int ObPacketStreamFileReader::open(const ObString &filename,
@ -335,7 +338,7 @@ int ObPacketStreamFileReader::open(const ObString &filename,
} else if (OB_FAIL(packet_handle.flush_buffer(false/*is_last*/))) { } else if (OB_FAIL(packet_handle.flush_buffer(false/*is_last*/))) {
LOG_INFO("failed to flush socket buffer while send local infile packet", K(ret), K(filename)); LOG_INFO("failed to flush socket buffer while send local infile packet", K(ret), K(filename));
} else { } else {
LOG_TRACE("send filename to client success", K(filename)); LOG_INFO("[load data local]send filename to client success", K(filename));
observer::ObSMConnection *sm_connection = session->get_sm_connection(); observer::ObSMConnection *sm_connection = session->get_sm_connection();
if (OB_NOT_NULL(sm_connection) && if (OB_NOT_NULL(sm_connection) &&
@ -351,6 +354,7 @@ int ObPacketStreamFileReader::open(const ObString &filename,
received_size_ = 0; received_size_ = 0;
read_size_ = 0; read_size_ = 0;
eof_ = false; eof_ = false;
LOG_INFO("[load data local] open socket file reader", K_(timeout_ts));
} }
return ret; return ret;
} }
@ -388,7 +392,7 @@ int ObPacketStreamFileReader::read(char *buf, int64_t count, int64_t &read_size)
if (is_timeout()) { if (is_timeout()) {
ret = OB_TIMEOUT; ret = OB_TIMEOUT;
LOG_WARN("load data reader file timeout", KR(ret)); LOG_WARN("load data won't read more data from client as the task was timeout", KR(ret), K_(timeout_ts));
} else if (session_ != NULL && session_->is_query_killed()) { } else if (session_ != NULL && session_->is_query_killed()) {
ret = OB_ERR_QUERY_INTERRUPTED; ret = OB_ERR_QUERY_INTERRUPTED;
LOG_WARN("load data reader terminated as the query is killed", KR(ret)); LOG_WARN("load data reader terminated as the query is killed", KR(ret));