diff --git a/src/rootserver/ob_root_service.cpp b/src/rootserver/ob_root_service.cpp index e9588e168b..4c6cf49aaa 100644 --- a/src/rootserver/ob_root_service.cpp +++ b/src/rootserver/ob_root_service.cpp @@ -7762,7 +7762,7 @@ int ObRootService::observer_copy_local_index_sstable(const obrpc::ObServerCopyLo dest_replica.member_ = ObReplicaMember(r->server_, ObTimeUtility::current_time(), r->replica_type_, r->get_memstore_percent()); dest_replica.unit_id_ = r->unit_id_; - data_size = r->data_size_; + data_size = arg.data_size_ != 0 ? arg.data_size_ : r->data_size_; } else if (r->server_ != data_src) { // by pass } else if (OB_FAIL(server_manager_.check_server_alive(r->server_, alive))) { diff --git a/src/share/ob_rpc_struct.cpp b/src/share/ob_rpc_struct.cpp index d15f052481..fb693fefc6 100644 --- a/src/share/ob_rpc_struct.cpp +++ b/src/share/ob_rpc_struct.cpp @@ -2642,7 +2642,7 @@ bool ObCopySSTableBatchArg::is_valid() const return is_valid; } -OB_SERIALIZE_MEMBER(ObServerCopyLocalIndexSSTableArg, data_src_, dst_, pkey_, index_table_id_, cluster_id_); +OB_SERIALIZE_MEMBER(ObServerCopyLocalIndexSSTableArg, data_src_, dst_, pkey_, index_table_id_, cluster_id_, data_size_); bool ObServerCopyLocalIndexSSTableArg::is_valid() const { diff --git a/src/share/ob_rpc_struct.h b/src/share/ob_rpc_struct.h index 9ddbc2ac9a..3a214e8b8e 100644 --- a/src/share/ob_rpc_struct.h +++ b/src/share/ob_rpc_struct.h @@ -3485,12 +3485,12 @@ struct ObServerCopyLocalIndexSSTableArg { public: ObServerCopyLocalIndexSSTableArg() - : data_src_(), dst_(), pkey_(), index_table_id_(common::OB_INVALID_ID), cluster_id_(common::OB_INVALID_ID) + : data_src_(), dst_(), pkey_(), index_table_id_(common::OB_INVALID_ID), cluster_id_(common::OB_INVALID_ID), data_size_(0) {} public: bool is_valid() const; - TO_STRING_KV(K_(data_src), K_(dst), K_(pkey), K_(index_table_id), K_(cluster_id)); + TO_STRING_KV(K_(data_src), K_(dst), K_(pkey), K_(index_table_id), K_(cluster_id), K_(data_size)); public: common::ObAddr data_src_; @@ -3498,6 +3498,7 @@ public: common::ObPartitionKey pkey_; uint64_t index_table_id_; int64_t cluster_id_; + int64_t data_size_; }; struct ObBackupBatchArg { diff --git a/src/storage/ob_build_index_scheduler.cpp b/src/storage/ob_build_index_scheduler.cpp index b679e1cea6..11fbb61ad7 100644 --- a/src/storage/ob_build_index_scheduler.cpp +++ b/src/storage/ob_build_index_scheduler.cpp @@ -994,6 +994,8 @@ int ObBuildIndexScheduleTask::send_copy_replica_rpc() // if the source and destination are the same, it means that this replica builds the index sstable itself, // just retry the scheduling process will get the right way to next state ret = OB_EAGAIN; + } else if (OB_FAIL(get_data_size(arg.data_size_))) { + STORAGE_LOG(WARN, "fail to get data size", K(ret)); } else if (OB_FAIL(ObMultiVersionSchemaService::get_instance().get_tenant_full_schema_guard( extract_tenant_id(index_id_), schema_guard))) { STORAGE_LOG(WARN, "fail to get schema guard", K(ret), K(schema_version_)); @@ -1392,6 +1394,49 @@ int ObBuildIndexScheduleTask::schedule_dag() return ret; } +int ObBuildIndexScheduleTask::get_data_size(int64_t &data_size) +{ + int ret = OB_SUCCESS; + ObSqlString sql; + SMART_VAR(ObMySQLProxy::MySQLResult, res) + { + sqlclient::ObMySQLResult *result = NULL; + char ip[common::OB_MAX_SERVER_ADDR_SIZE] = ""; + if (OB_INVALID_ID == index_id_ || !pkey_.is_valid() || !candidate_replica_.is_valid()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid arguments", K(ret), K(index_id_), K(pkey_), K(candidate_replica_)); + } else if (!candidate_replica_.ip_to_string(ip, sizeof(ip))) { + LOG_WARN("fail to convert ObAddr to ip", K(ret)); + } else if (OB_FAIL(sql.assign_fmt( + "SELECT used_size, MAX(major_version) from %s " + "WHERE tenant_id = %ld AND table_id = %ld AND partition_id = %ld AND sstable_id = %ld " + "AND svr_ip = '%s' AND svr_port = %d", + OB_ALL_VIRTUAL_STORAGE_STAT_TNAME, + extract_tenant_id(index_id_), + pkey_.get_table_id(), + pkey_.get_partition_id(), + index_id_, + ip, + candidate_replica_.get_port()))) { + STORAGE_LOG(WARN, "fail to assign sql", K(ret)); + } else if (OB_FAIL(GCTX.sql_proxy_->read(res, sql.ptr()))) { + LOG_WARN("fail to execute sql", K(ret), K(sql)); + } else if (OB_ISNULL(result = res.get_result())) { + ret = OB_ERR_UNEXPECTED; + STORAGE_LOG(WARN, "error unexpected, query result must not be NULL", K(ret)); + } else if (OB_FAIL(result->next())) { + if (OB_LIKELY(OB_ITER_END == ret)) { + ret = OB_SUCCESS; + } else { + LOG_WARN("fail to get next row", K(ret)); + } + } else { + EXTRACT_INT_FIELD_MYSQL(*result, "used_size", data_size, int64_t); + } + } + return ret; +} + int ObBuildIndexScheduleTask::process() { int ret = OB_SUCCESS; diff --git a/src/storage/ob_build_index_scheduler.h b/src/storage/ob_build_index_scheduler.h index ff75ca6fca..37c5bef939 100644 --- a/src/storage/ob_build_index_scheduler.h +++ b/src/storage/ob_build_index_scheduler.h @@ -139,6 +139,7 @@ private: int get_candidate_source_replica(const bool need_refresh = false); int unique_index_checking(const bool is_leader); int rollback_state(const int state); + int get_data_size(int64_t &data_size); private: static const int64_t COPY_BUILD_INDEX_DATA_TIMEOUT = 10 * 1000 * 1000LL; // 10s