diff --git a/src/observer/mysql/ob_query_retry_ctrl.cpp b/src/observer/mysql/ob_query_retry_ctrl.cpp index 7b5eacddf0..874299b03b 100644 --- a/src/observer/mysql/ob_query_retry_ctrl.cpp +++ b/src/observer/mysql/ob_query_retry_ctrl.cpp @@ -266,6 +266,17 @@ public: return exec_ctx.get_table_direct_insert_ctx().get_is_direct(); } + bool is_load_local(ObRetryParam &v) const + { + bool bret = false; + const ObICmd *cmd = v.result_.get_cmd(); + if (OB_NOT_NULL(cmd) && cmd->get_cmd_type() == stmt::T_LOAD_DATA) { + const ObLoadDataStmt *load_data_stmt = static_cast(cmd); + bret = load_data_stmt->get_load_arguments().load_file_storage_ == ObLoadFileLocation::CLIENT_DISK; + } + return bret; + } + virtual void test(ObRetryParam &v) const override { int err = v.err_; @@ -287,6 +298,10 @@ public: v.retry_type_ = RETRY_TYPE_NONE; } v.no_more_test_ = true; + } else if (is_load_local(v)) { + v.client_ret_ = err; + v.retry_type_ = RETRY_TYPE_NONE; + v.no_more_test_ = true; } else if (is_direct_load(v)) { if (is_direct_load_retry_err(err)) { try_packet_retry(v); diff --git a/src/sql/engine/cmd/ob_load_data_file_reader.cpp b/src/sql/engine/cmd/ob_load_data_file_reader.cpp index b1819db722..04a235a95b 100644 --- a/src/sql/engine/cmd/ob_load_data_file_reader.cpp +++ b/src/sql/engine/cmd/ob_load_data_file_reader.cpp @@ -297,24 +297,27 @@ ObPacketStreamFileReader::~ObPacketStreamFileReader() { int ret = OB_SUCCESS; + LOG_INFO("load data local try to receive all packets from client if eof is false", K_(eof)); + // We read all data from client before close the file. // We will stop to handle the process while something error. // But the client must send all file content to us and the // normal SQL processor cannot handle the packets, so we // eat all packets with file content. - timeout_ts_ = -1; + // We will wait at most 10 seconds if there is no more data come in. const int64_t wait_timeout = 10 * 1000000L; // seconds - int64_t wait_deadline = ObTimeUtility::current_time() + wait_timeout; + timeout_ts_ = ObTimeUtility::current_time() + wait_timeout; int64_t last_received_size = received_size_; - while (!eof_ && OB_SUCC(ret) && ObTimeUtility::current_time() <= wait_deadline) { + while (!eof_ && OB_SUCC(ret) && ObTimeUtility::current_time() <= timeout_ts_) { ret = receive_packet(); if (received_size_ > last_received_size) { last_received_size = received_size_; - wait_deadline = ObTimeUtility::current_time() + wait_timeout; + timeout_ts_ = ObTimeUtility::current_time() + wait_timeout; } } arena_allocator_.reset(); + LOG_INFO("load data local file reader exit"); } int ObPacketStreamFileReader::open(const ObString &filename, @@ -335,7 +338,7 @@ int ObPacketStreamFileReader::open(const ObString &filename, } else if (OB_FAIL(packet_handle.flush_buffer(false/*is_last*/))) { LOG_INFO("failed to flush socket buffer while send local infile packet", K(ret), K(filename)); } else { - LOG_TRACE("send filename to client success", K(filename)); + LOG_INFO("[load data local]send filename to client success", K(filename)); observer::ObSMConnection *sm_connection = session->get_sm_connection(); if (OB_NOT_NULL(sm_connection) && @@ -351,6 +354,7 @@ int ObPacketStreamFileReader::open(const ObString &filename, received_size_ = 0; read_size_ = 0; eof_ = false; + LOG_INFO("[load data local] open socket file reader", K_(timeout_ts)); } return ret; } @@ -388,7 +392,7 @@ int ObPacketStreamFileReader::read(char *buf, int64_t count, int64_t &read_size) if (is_timeout()) { ret = OB_TIMEOUT; - LOG_WARN("load data reader file timeout", KR(ret)); + LOG_WARN("load data won't read more data from client as the task was timeout", KR(ret), K_(timeout_ts)); } else if (session_ != NULL && session_->is_query_killed()) { ret = OB_ERR_QUERY_INTERRUPTED; LOG_WARN("load data reader terminated as the query is killed", KR(ret));