diff --git a/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h b/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h index c6e7da4fd9..ce071b3514 100644 --- a/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h +++ b/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h @@ -204,6 +204,7 @@ PCODE_DEF(OB_REVOKE_SYSPRIV, 0x270) PCODE_DEF(OB_EXECUTE_RANGE_PART_SPLIT, 0x271) PCODE_DEF(OB_CLUSTER_ACTION_VERIFY, 0x273) PCODE_DEF(OB_SUBMIT_BUILD_INDEX_TASK, 0x274) +PCODE_DEF(OB_FETCH_SSTABLE_SIZE, 0x275) // system admin commnad PCODE_DEF(OB_ADMIN_SWITCH_REPLICA_ROLE, 0x280) diff --git a/src/observer/ob_rpc_processor_simple.cpp b/src/observer/ob_rpc_processor_simple.cpp index 601633929f..225acb58f2 100644 --- a/src/observer/ob_rpc_processor_simple.cpp +++ b/src/observer/ob_rpc_processor_simple.cpp @@ -1992,5 +1992,37 @@ int ObKillPartTransCtxP::process() return ret; } +int ObFetchSstableSizeP::process() +{ + int ret = OB_SUCCESS; + ObPartitionStorage *storage = NULL; + ObTableHandle sstable_handle; + ObIPartitionGroupGuard guard; + ObPGPartitionGuard pg_partition_guard; + ObSSTable *sstable = nullptr; + if (OB_FAIL(gctx_.par_ser_->get_partition(arg_.pkey_, guard)) + || NULL == guard.get_partition_group()) { + LOG_WARN("get_partition_storage fail", "pkey", arg_); + ret = OB_ENTRY_NOT_EXIST; + } else if (OB_FAIL(guard.get_partition_group()->get_pg_partition(arg_.pkey_, pg_partition_guard))) { + LOG_WARN("get pg partition fail", K(ret), "pkey", arg_.pkey_); + } else if (OB_ISNULL(storage = static_cast + (pg_partition_guard.get_pg_partition()->get_storage()))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get_partition_storage fail", "pkey", arg_.pkey_); + } else if (OB_FAIL(storage->get_partition_store().get_last_major_sstable(arg_.index_id_, sstable_handle))) { + LOG_WARN("fail to get index major sstable handle", K(ret), K_(arg)); + } else if (OB_FAIL(sstable_handle.get_sstable(sstable))) { + LOG_WARN("fail to get index major sstable handle", K(ret), K_(arg)); + } else if (OB_ISNULL(sstable)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("index sstable is null", K(ret), K_(arg)); + } else { + const blocksstable::ObSSTableMeta &sstable_meta = sstable->get_meta(); + result_.size_ = sstable_meta.get_total_macro_block_count() * OB_FILE_SYSTEM.get_macro_block_size(); + } + return ret; +} + } // end of namespace observer } // end of namespace oceanbase diff --git a/src/observer/ob_rpc_processor_simple.h b/src/observer/ob_rpc_processor_simple.h index e859e85405..8a6b915a00 100644 --- a/src/observer/ob_rpc_processor_simple.h +++ b/src/observer/ob_rpc_processor_simple.h @@ -226,6 +226,8 @@ OB_DEFINE_PROCESSOR_S(Common, OB_GET_MASTER_RS, ObRpcGetMasterRSP); OB_DEFINE_PROCESSOR_S(Srv, OB_CHECK_PHYSICAL_FLASHBACK_SUCC, ObCheckPhysicalFlashbackSUCCP); OB_DEFINE_PROCESSOR_S(Srv, OB_RENEW_IN_ZONE_HB, ObRenewInZoneHbP); OB_DEFINE_PROCESSOR_S(Srv, OB_KILL_PART_TRANS_CTX, ObKillPartTransCtxP); +OB_DEFINE_PROCESSOR_S(Srv, OB_FETCH_SSTABLE_SIZE, ObFetchSstableSizeP); + } // end of namespace observer } // end of namespace oceanbase diff --git a/src/observer/ob_srv_xlator_storage.cpp b/src/observer/ob_srv_xlator_storage.cpp index 5de150717c..580294646c 100644 --- a/src/observer/ob_srv_xlator_storage.cpp +++ b/src/observer/ob_srv_xlator_storage.cpp @@ -131,4 +131,5 @@ void oceanbase::observer::init_srv_xlator_for_storage(ObSrvRpcXlator* xlator) RPC_PROCESSOR(ObRpcValidateBackupBatchP, gctx_); RPC_PROCESSOR(ObRpcBackupArchiveLogBatchP, gctx_); RPC_PROCESSOR(ObRpcBackupBackupsetBatchP, gctx_); + RPC_PROCESSOR(ObFetchSstableSizeP, gctx_); } diff --git a/src/rootserver/ob_root_service.cpp b/src/rootserver/ob_root_service.cpp index 87164b118f..b2696ddf92 100644 --- a/src/rootserver/ob_root_service.cpp +++ b/src/rootserver/ob_root_service.cpp @@ -7804,7 +7804,10 @@ int ObRootService::observer_copy_local_index_sstable(const obrpc::ObServerCopyLo } else if (OB_FAIL(rebalance_task_mgr_.add_task(task))) { LOG_WARN("fail to add task", K(ret), K(task)); } else { - LOG_INFO("add copy local index sstable task", K(ret)); + ROOTSERVICE_EVENT_ADD("balancer", "add_copy_sstable_task", + "partition", ReplicaInfo(pkey, data_size), + "source", src_member, + "destination", dest_replica); } } } diff --git a/src/share/ob_rpc_struct.cpp b/src/share/ob_rpc_struct.cpp index 590f44e4d2..0d00bfe843 100755 --- a/src/share/ob_rpc_struct.cpp +++ b/src/share/ob_rpc_struct.cpp @@ -4939,5 +4939,30 @@ int ObSubmitBuildIndexTaskArg::assign(const ObSubmitBuildIndexTaskArg &other) OB_SERIALIZE_MEMBER((ObSubmitBuildIndexTaskArg, ObDDLArg), index_tid_); +OB_SERIALIZE_MEMBER(ObFetchSstableSizeArg, pkey_, index_id_); + +int ObFetchSstableSizeArg::assign(const ObFetchSstableSizeArg &other) +{ + int ret = OB_SUCCESS; + if (this == &other) { + } else { + pkey_ = other.pkey_; + index_id_ = other.index_id_; + } + return ret; +} + +OB_SERIALIZE_MEMBER(ObFetchSstableSizeRes, size_); + +int ObFetchSstableSizeRes::assign(const ObFetchSstableSizeRes &other) +{ + int ret = OB_SUCCESS; + if (this == &other) { + } else { + size_ = other.size_; + } + return ret; +} + } // end namespace obrpc } // namespace oceanbase diff --git a/src/share/ob_rpc_struct.h b/src/share/ob_rpc_struct.h index eda618ffa9..eaf19bd7aa 100755 --- a/src/share/ob_rpc_struct.h +++ b/src/share/ob_rpc_struct.h @@ -8346,6 +8346,47 @@ public: uint64_t index_tid_; }; +struct ObFetchSstableSizeArg final { + OB_UNIS_VERSION(1); + +public: + ObFetchSstableSizeArg() : pkey_(), index_id_(-1) + {} + ~ObFetchSstableSizeArg() + {} + bool is_valid() const + { + return pkey_.is_valid() && index_id_ > 0; + } + int assign(const ObFetchSstableSizeArg &other); + TO_STRING_KV(K_(pkey), K_(index_id)); + +public: + common::ObPartitionKey pkey_; + int64_t index_id_; + +private: + DISALLOW_COPY_AND_ASSIGN(ObFetchSstableSizeArg); +}; + +struct ObFetchSstableSizeRes final { + OB_UNIS_VERSION(1); + +public: + ObFetchSstableSizeRes() : size_(0) + {} + ~ObFetchSstableSizeRes() + {} + int assign(const ObFetchSstableSizeRes &other); + TO_STRING_KV(K_(size)); + +public: + int64_t size_; + +private: + DISALLOW_COPY_AND_ASSIGN(ObFetchSstableSizeRes); +}; + } // end namespace obrpc } // end namespace oceanbase #endif diff --git a/src/share/ob_srv_rpc_proxy.h b/src/share/ob_srv_rpc_proxy.h index b27fb06945..05e4fa5eef 100644 --- a/src/share/ob_srv_rpc_proxy.h +++ b/src/share/ob_srv_rpc_proxy.h @@ -208,6 +208,8 @@ public: RPC_S(PR5 renew_in_zone_hb, OB_RENEW_IN_ZONE_HB, (share::ObInZoneHbRequest), share::ObInZoneHbResponse); RPC_AP(PR5 batch_get_protection_level, OB_BATCH_GET_PROTECTION_LEVEL, (ObBatchCheckLeaderArg), ObBatchCheckRes); RPC_S(PR5 kill_part_trans_ctx, OB_KILL_PART_TRANS_CTX, (obrpc::ObKillPartTransCtxArg)); + RPC_S(PR5 fetch_sstable_size, obrpc::OB_FETCH_SSTABLE_SIZE, (obrpc::ObFetchSstableSizeArg), + obrpc::ObFetchSstableSizeRes); }; // end of class ObSrvRpcProxy } // namespace obrpc diff --git a/src/storage/ob_build_index_scheduler.cpp b/src/storage/ob_build_index_scheduler.cpp index 0c0871222f..5951f7fe5f 100644 --- a/src/storage/ob_build_index_scheduler.cpp +++ b/src/storage/ob_build_index_scheduler.cpp @@ -1419,42 +1419,18 @@ int ObBuildIndexScheduleTask::schedule_dag() int ObBuildIndexScheduleTask::get_data_size(int64_t &data_size) { int ret = OB_SUCCESS; - ObSqlString sql; - SMART_VAR(ObMySQLProxy::MySQLResult, res) - { - sqlclient::ObMySQLResult *result = NULL; - char ip[common::OB_MAX_SERVER_ADDR_SIZE] = ""; - if (OB_INVALID_ID == index_id_ || !pkey_.is_valid() || !candidate_replica_.is_valid()) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid arguments", K(ret), K(index_id_), K(pkey_), K(candidate_replica_)); - } else if (!candidate_replica_.ip_to_string(ip, sizeof(ip))) { - LOG_WARN("fail to convert ObAddr to ip", K(ret)); - } else if (OB_FAIL(sql.assign_fmt( - "SELECT used_size, MAX(major_version) from %s " - "WHERE tenant_id = %ld AND table_id = %ld AND partition_id = %ld AND sstable_id = %ld " - "AND svr_ip = '%s' AND svr_port = %d", - OB_ALL_VIRTUAL_STORAGE_STAT_TNAME, - extract_tenant_id(index_id_), - pkey_.get_table_id(), - pkey_.get_partition_id(), - index_id_, - ip, - candidate_replica_.get_port()))) { - STORAGE_LOG(WARN, "fail to assign sql", K(ret)); - } else if (OB_FAIL(GCTX.sql_proxy_->read(res, sql.ptr()))) { - LOG_WARN("fail to execute sql", K(ret), K(sql)); - } else if (OB_ISNULL(result = res.get_result())) { - ret = OB_ERR_UNEXPECTED; - STORAGE_LOG(WARN, "error unexpected, query result must not be NULL", K(ret)); - } else if (OB_FAIL(result->next())) { - if (OB_LIKELY(OB_ITER_END == ret)) { - ret = OB_SUCCESS; - } else { - LOG_WARN("fail to get next row", K(ret)); - } - } else { - EXTRACT_INT_FIELD_MYSQL(*result, "used_size", data_size, int64_t); - } + data_size = 0; + ObFetchSstableSizeArg arg; + ObFetchSstableSizeRes res; + arg.pkey_ = pkey_; + arg.index_id_ = index_id_; + if (OB_UNLIKELY(candidate_replica_.is_valid())) { + ret = OB_ERR_UNEXPECTED; + STORAGE_LOG(WARN, "candidate replica is invalid", K(ret), K(candidate_replica_)); + } else if (OB_FAIL(GCTX.srv_rpc_proxy_->to(candidate_replica_).fetch_sstable_size(arg, res))) { + STORAGE_LOG(WARN, "fail to get sstable size", K(ret), K(arg)); + } else { + data_size = res.size_; } return ret; } diff --git a/src/storage/ob_build_index_task.cpp b/src/storage/ob_build_index_task.cpp index bda16e300f..3f1a19a4d0 100644 --- a/src/storage/ob_build_index_task.cpp +++ b/src/storage/ob_build_index_task.cpp @@ -146,10 +146,11 @@ int ObBuildIndexDag::fill_comment(char* buf, const int64_t buf_len) const STORAGE_LOG(WARN, "ObBuildIndexDag has not been inited", K(ret)); } else if (OB_FAIL(databuff_printf(buf, buf_len, - "build index task: key=%s index_id=%ld snapshot_version=%ld", + "build index task: pkey=%s index_id=%ld snapshot_version=%ld parallelism=%ld", to_cstring(pkey_), index_id, - param_.snapshot_version_))) { + param_.snapshot_version_, + param_.concurrent_cnt_))) { STORAGE_LOG(WARN, "failed to fill comment", K(ret), K(pkey_)); }