diff --git a/src/observer/table_load/ob_table_load_coordinator.h b/src/observer/table_load/ob_table_load_coordinator.h index ce1b8b6d30..bb0a1d0963 100644 --- a/src/observer/table_load/ob_table_load_coordinator.h +++ b/src/observer/table_load/ob_table_load_coordinator.h @@ -8,6 +8,7 @@ #include "observer/table_load/ob_table_load_struct.h" #include "share/table/ob_table_load_array.h" #include "share/table/ob_table_load_define.h" +#include "share/table/ob_table_load_sql_statistics.h" #include "share/table/ob_table_load_row_array.h" namespace oceanbase diff --git a/src/observer/table_load/ob_table_load_merger.cpp b/src/observer/table_load/ob_table_load_merger.cpp index 9027d51fda..fd6c347867 100644 --- a/src/observer/table_load/ob_table_load_merger.cpp +++ b/src/observer/table_load/ob_table_load_merger.cpp @@ -303,6 +303,58 @@ int ObTableLoadMerger::build_merge_ctx() return ret; } +int ObTableLoadMerger::collect_dml_stat(ObTableLoadDmlStat &dml_stats) +{ + int ret = OB_SUCCESS; + if (store_ctx_->is_fast_heap_table_) { + ObDirectLoadMultiMap tables; + ObArray trans_store_array; + if (OB_FAIL(tables.init())) { + LOG_WARN("fail to init table", KR(ret)); + } else if (OB_FAIL(store_ctx_->get_committed_trans_stores(trans_store_array))) { + LOG_WARN("fail to get trans store", KR(ret)); + } else { + for (int i = 0; OB_SUCC(ret) && i < trans_store_array.count(); ++i) { + ObTableLoadTransStore *trans_store = trans_store_array.at(i); + for (int j = 0; OB_SUCC(ret) && j < trans_store->session_store_array_.count(); ++j) { + ObTableLoadTransStore::SessionStore * session_store = trans_store->session_store_array_.at(j); + for (int k = 0 ; OB_SUCC(ret) && k < session_store->partition_table_array_.count(); ++k) { + ObIDirectLoadPartitionTable *table = session_store->partition_table_array_.at(k); + ObDirectLoadFastHeapTable *sstable = nullptr; + if (OB_ISNULL(sstable = dynamic_cast(table))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected not heap sstable", KR(ret), KPC(table)); + } else { + const ObTabletID &tablet_id = sstable->get_tablet_id(); + if (OB_FAIL(tables.add(tablet_id, sstable))) { + LOG_WARN("fail to add tables", KR(ret), KPC(sstable)); + } + } + } + } + } + for (int i = 0; OB_SUCC(ret) && i < merge_ctx_.get_tablet_merge_ctxs().count(); ++i) { + ObDirectLoadTabletMergeCtx *tablet_ctx = merge_ctx_.get_tablet_merge_ctxs().at(i); + ObArray heap_table_array ; + if (OB_FAIL(tables.get(tablet_ctx->get_tablet_id(), heap_table_array))) { + LOG_WARN("get heap sstable failed", KR(ret)); + } else if (OB_FAIL(tablet_ctx->collect_dml_stat(heap_table_array, dml_stats))) { + LOG_WARN("fail to collect sql statics", KR(ret)); + } + } + } + } else { + for (int i = 0; OB_SUCC(ret) && i < merge_ctx_.get_tablet_merge_ctxs().count(); ++i) { + ObDirectLoadTabletMergeCtx *tablet_ctx = merge_ctx_.get_tablet_merge_ctxs().at(i); + ObArray heap_table_array ; + if (OB_FAIL(tablet_ctx->collect_dml_stat(heap_table_array, dml_stats))) { + LOG_WARN("fail to collect sql statics", KR(ret)); + } + } + } + return ret; +} + int ObTableLoadMerger::collect_sql_statistics(ObTableLoadSqlStatistics &sql_statistics) { int ret = OB_SUCCESS; diff --git a/src/observer/table_load/ob_table_load_merger.h b/src/observer/table_load/ob_table_load_merger.h index ecb507d02e..3c6a18ef83 100644 --- a/src/observer/table_load/ob_table_load_merger.h +++ b/src/observer/table_load/ob_table_load_merger.h @@ -30,6 +30,7 @@ public: void stop(); int handle_table_compact_success(); int collect_sql_statistics(table::ObTableLoadSqlStatistics &sql_statistics); + int collect_dml_stat(table::ObTableLoadDmlStat &dml_stats); private: int build_merge_ctx(); int start_merge(); diff --git a/src/observer/table_load/ob_table_load_store.cpp b/src/observer/table_load/ob_table_load_store.cpp index 8c4a4d6bac..57669d4d8a 100644 --- a/src/observer/table_load/ob_table_load_store.cpp +++ b/src/observer/table_load/ob_table_load_store.cpp @@ -16,6 +16,7 @@ #include "observer/table_load/ob_table_load_trans_store.h" #include "observer/table_load/ob_table_load_utils.h" #include "storage/direct_load/ob_direct_load_insert_table_ctx.h" +#include "share/stat/ob_opt_stat_monitor_manager.h" namespace oceanbase { @@ -283,6 +284,7 @@ int ObTableLoadStore::commit(ObTableLoadResultInfo &result_info, ObTableLoadSqlS LOG_WARN("ObTableLoadStore not init", KR(ret), KP(this)); } else { LOG_INFO("store commit"); + ObTableLoadDmlStat dml_stats; obsys::ObWLockGuard guard(store_ctx_->get_status_lock()); if (OB_FAIL(store_ctx_->check_status_unlock(ObTableLoadStatusType::MERGED))) { LOG_WARN("fail to check store status", KR(ret)); @@ -293,6 +295,10 @@ int ObTableLoadStore::commit(ObTableLoadResultInfo &result_info, ObTableLoadSqlS } else if (param_.online_opt_stat_gather_ && OB_FAIL(store_ctx_->merger_->collect_sql_statistics(sql_statistics))) { LOG_WARN("fail to collect sql stats", KR(ret)); + } else if (OB_FAIL(store_ctx_->merger_->collect_dml_stat(dml_stats))) { + LOG_WARN("fail to build dml stat", KR(ret)); + } else if (OB_FAIL(ObOptStatMonitorManager::get_instance().update_dml_stat_info_from_direct_load(dml_stats.dml_stat_array_))) { + LOG_WARN("fail to update dml stat info", KR(ret)); } else if (OB_FAIL(store_ctx_->set_status_commit_unlock())) { LOG_WARN("fail to set store status commit", KR(ret)); } else { diff --git a/src/observer/table_load/ob_table_load_store.h b/src/observer/table_load/ob_table_load_store.h index 9f2d0fa30b..751637aea9 100644 --- a/src/observer/table_load/ob_table_load_store.h +++ b/src/observer/table_load/ob_table_load_store.h @@ -9,6 +9,7 @@ #include "share/table/ob_table_load_array.h" #include "share/table/ob_table_load_define.h" #include "share/table/ob_table_load_row_array.h" +#include "share/table/ob_table_load_sql_statistics.h" namespace oceanbase { diff --git a/src/share/CMakeLists.txt b/src/share/CMakeLists.txt index db9a904e82..1a1e9b28bc 100644 --- a/src/share/CMakeLists.txt +++ b/src/share/CMakeLists.txt @@ -250,6 +250,7 @@ ob_set_subtarget(ob_share common_mixed detect/ob_detect_rpc_proxy.cpp detect/ob_detect_rpc_processor.cpp detect/ob_detect_manager_utils.cpp + table/ob_table_load_sql_statistics.cpp ) ob_set_subtarget(ob_share tablet diff --git a/src/share/stat/ob_opt_stat_monitor_manager.cpp b/src/share/stat/ob_opt_stat_monitor_manager.cpp index bbb3f323ed..9872adeff2 100644 --- a/src/share/stat/ob_opt_stat_monitor_manager.cpp +++ b/src/share/stat/ob_opt_stat_monitor_manager.cpp @@ -495,11 +495,9 @@ int ObOptStatMonitorManager::update_tenant_dml_stat_info(uint64_t tenant_id) ObSqlString value_sql; int count = 0; for (auto iter = dml_stat_map->begin(); OB_SUCC(ret) && iter != dml_stat_map->end(); ++iter) { - if (OB_FAIL(get_dml_stat_sql(tenant_id, - iter->first, - iter->second, - 0 != count, // need_add_comma - value_sql))) { + if (OB_FAIL(get_dml_stat_sql(tenant_id, iter->second, + 0 != count, // need_add_comma + value_sql))) { LOG_WARN("failed to get dml stat sql", K(ret)); } else if (UPDATE_OPT_STAT_BATCH_CNT == ++count) { if (OB_FAIL(exec_insert_monitor_modified_sql(tenant_id, value_sql))) { @@ -917,24 +915,25 @@ int ObOptStatMonitorManager::exec_insert_monitor_modified_sql(uint64_t tenant_id LOG_WARN("failed to append string", K(ret)); } else if (OB_FAIL(mysql_proxy_->write(tenant_id, insert_sql.ptr(), affected_rows))) { LOG_WARN("fail to exec sql", K(insert_sql), K(ret)); + } else { + LOG_TRACE("succeed to exec insert monitor modified sql", K(tenant_id), K(values_sql)); } return ret; } int ObOptStatMonitorManager::get_dml_stat_sql(const uint64_t tenant_id, - const StatKey &dml_stat_key, const ObOptDmlStat &dml_stat, const bool need_add_comma, ObSqlString &sql_string) { int ret = OB_SUCCESS; share::ObDMLSqlSplicer dml_splicer; - uint64_t table_id = dml_stat_key.first; + uint64_t table_id = dml_stat.table_id_; uint64_t ext_tenant_id = share::schema::ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id); uint64_t pure_table_id = share::schema::ObSchemaUtils::get_extract_schema_id(tenant_id, table_id); if (OB_FAIL(dml_splicer.add_pk_column("tenant_id", ext_tenant_id)) || OB_FAIL(dml_splicer.add_pk_column("table_id", pure_table_id)) || - OB_FAIL(dml_splicer.add_pk_column("tablet_id", dml_stat_key.second)) || + OB_FAIL(dml_splicer.add_pk_column("tablet_id", dml_stat.tablet_id_)) || OB_FAIL(dml_splicer.add_column("inserts", dml_stat.insert_row_count_)) || OB_FAIL(dml_splicer.add_column("updates", dml_stat.update_row_count_)) || OB_FAIL(dml_splicer.add_column("deletes", dml_stat.delete_row_count_))) { @@ -998,6 +997,38 @@ int ObOptStatMonitorManager::clean_useless_dml_stat_info(uint64_t tenant_id) return ret; } +int ObOptStatMonitorManager::update_dml_stat_info_from_direct_load(const ObIArray &dml_stats) +{ + int ret = OB_SUCCESS; + ObSqlString value_sql; + int count = 0; + uint64_t tenant_id = 0; + LOG_TRACE("begin to update dml stat info from direct load", K(dml_stats)); + for (int64_t i = 0; OB_SUCC(ret) && i < dml_stats.count(); ++i) { + if (OB_ISNULL(dml_stats.at(i))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get unexpcted error", K(ret), K(dml_stats.at(i))); + } else { + tenant_id = dml_stats.at(i)->tenant_id_; + if (OB_FAIL(get_dml_stat_sql(tenant_id, *dml_stats.at(i), 0 != count, value_sql))) { + LOG_WARN("failed to get dml stat sql", K(ret)); + } else if (UPDATE_OPT_STAT_BATCH_CNT == ++count) { + if (OB_FAIL(exec_insert_monitor_modified_sql(tenant_id, value_sql))) { + LOG_WARN("failed to exec insert sql", K(ret)); + } else { + count = 0; + value_sql.reset(); + } + } + } + } + if (OB_SUCC(ret) && count != 0) { + if (OB_FAIL(exec_insert_monitor_modified_sql(tenant_id, value_sql))) { + LOG_WARN("failed to exec insert sql", K(ret)); + } + } + return ret; +} } } diff --git a/src/share/stat/ob_opt_stat_monitor_manager.h b/src/share/stat/ob_opt_stat_monitor_manager.h index b1919f5afa..9560c8b6ab 100644 --- a/src/share/stat/ob_opt_stat_monitor_manager.h +++ b/src/share/stat/ob_opt_stat_monitor_manager.h @@ -168,7 +168,6 @@ public: const bool need_add_comma, ObSqlString &sql_string); int get_dml_stat_sql(const uint64_t tenant_id, - const StatKey &dml_stat_key, const ObOptDmlStat &dml_stat, const bool need_add_comma, ObSqlString &sql_string); @@ -190,6 +189,8 @@ public: int clean_useless_dml_stat_info(uint64_t tenant_id); + int update_dml_stat_info_from_direct_load(const ObIArray &dml_stats); + private: DISALLOW_COPY_AND_ASSIGN(ObOptStatMonitorManager); const static int64_t UPDATE_OPT_STAT_BATCH_CNT = 200; diff --git a/src/share/table/ob_table_load_define.cpp b/src/share/table/ob_table_load_define.cpp index f221763501..9125f69a3a 100644 --- a/src/share/table/ob_table_load_define.cpp +++ b/src/share/table/ob_table_load_define.cpp @@ -41,101 +41,5 @@ OB_SERIALIZE_MEMBER_SIMPLE(ObTableLoadResultInfo, skipped_, warnings_); -OB_DEF_SERIALIZE(ObTableLoadSqlStatistics) -{ - int ret = OB_SUCCESS; - OB_UNIS_ENCODE(table_stat_array_.count()); - for (int64_t i = 0; OB_SUCC(ret) && i < table_stat_array_.count(); i++) { - if (table_stat_array_.at(i) != nullptr) { - OB_UNIS_ENCODE(*table_stat_array_.at(i)); - } - } - OB_UNIS_ENCODE(col_stat_array_.count()); - for (int64_t i = 0; OB_SUCC(ret) && i < col_stat_array_.count(); i++) { - if (col_stat_array_.at(i) != nullptr) { - OB_UNIS_ENCODE(*col_stat_array_.at(i)); - } - } - return ret; -} - -OB_DEF_DESERIALIZE(ObTableLoadSqlStatistics) -{ - int ret = OB_SUCCESS; - reset(); - int64_t size = 0; - OB_UNIS_DECODE(size) - for (int64_t i = 0; OB_SUCC(ret) && i < size; ++i) { - ObOptTableStat table_stat; - ObOptTableStat *copied_table_stat = nullptr; - if (OB_FAIL(table_stat.deserialize(buf, data_len, pos))) { - LOG_WARN("deserialize datum store failed", K(ret), K(i)); - } else { - int64_t size = table_stat.size(); - char *new_buf = nullptr; - if (OB_ISNULL(new_buf = static_cast(allocator_.alloc(size)))) { - ret = OB_ALLOCATE_MEMORY_FAILED; - OB_LOG(WARN, "fail to allocate buffer", KR(ret), K(size)); - } else if (OB_FAIL(table_stat.deep_copy(new_buf, size, copied_table_stat))) { - OB_LOG(WARN, "fail to copy table stat", KR(ret)); - } else if (OB_FAIL(table_stat_array_.push_back(copied_table_stat))) { - OB_LOG(WARN, "fail to add table stat", KR(ret)); - } - if (OB_FAIL(ret)) { - if (copied_table_stat != nullptr) { - copied_table_stat->~ObOptTableStat(); - copied_table_stat = nullptr; - } - if(new_buf != nullptr) { - allocator_.free(new_buf); - new_buf = nullptr; - } - } - } - } - size = 0; - OB_UNIS_DECODE(size) - for (int64_t i = 0; OB_SUCC(ret) && i < size; ++i) { - ObOptOSGColumnStat *osg_col_stat = NULL; - if (OB_ISNULL(osg_col_stat = ObOptOSGColumnStat::create_new_osg_col_stat(allocator_)) || - OB_ISNULL(osg_col_stat->col_stat_)) { - ret = OB_ALLOCATE_MEMORY_FAILED; - OB_LOG(WARN, "failed to create col stat"); - } else if (OB_FAIL(osg_col_stat->deserialize(buf, data_len, pos))) { - OB_LOG(WARN, "deserialize datum store failed", K(ret), K(i)); - } else if (OB_FAIL(osg_col_stat->deep_copy(*osg_col_stat))) { - OB_LOG(WARN, "fail to deep copy", K(ret)); - } else if (OB_FAIL(col_stat_array_.push_back(osg_col_stat))) { - OB_LOG(WARN, "fail to add table stat", KR(ret)); - } - if (OB_FAIL(ret)) { - if (osg_col_stat != nullptr) { - osg_col_stat->~ObOptOSGColumnStat(); - osg_col_stat = nullptr; - } - } - } - return ret; -} - -OB_DEF_SERIALIZE_SIZE(ObTableLoadSqlStatistics) -{ - int ret = OB_SUCCESS; - int64_t len = 0; - OB_UNIS_ADD_LEN(table_stat_array_.count()); - for (int64_t i = 0; OB_SUCC(ret) && i < table_stat_array_.count(); i++) { - if (table_stat_array_.at(i) != nullptr) { - OB_UNIS_ADD_LEN(*table_stat_array_.at(i)); - } - } - OB_UNIS_ADD_LEN(col_stat_array_.count()); - for (int64_t i = 0; OB_SUCC(ret) && i < col_stat_array_.count(); i++) { - if (col_stat_array_.at(i) != nullptr) { - OB_UNIS_ADD_LEN(*col_stat_array_.at(i)); - } - } - return len; -} - } // namespace table } // namespace oceanbase diff --git a/src/share/table/ob_table_load_define.h b/src/share/table/ob_table_load_define.h index f6e2a6a189..09f65dd717 100644 --- a/src/share/table/ob_table_load_define.h +++ b/src/share/table/ob_table_load_define.h @@ -11,9 +11,6 @@ #include "lib/oblog/ob_log_module.h" #include "lib/utility/ob_print_utils.h" #include "share/rc/ob_tenant_base.h" -#include "share/stat/ob_opt_table_stat.h" -#include "share/stat/ob_opt_column_stat.h" -#include "share/stat/ob_opt_osg_column_stat.h" namespace oceanbase { @@ -409,165 +406,5 @@ public: uint64_t warnings_ CACHE_ALIGNED; }; -struct ObTableLoadSqlStatistics -{ - OB_UNIS_VERSION(1); -public: - ObTableLoadSqlStatistics() : allocator_("TLD_Opstat") { allocator_.set_tenant_id(MTL_ID()); } - ~ObTableLoadSqlStatistics() { reset();} - void reset() { - for (int64_t i = 0; i < col_stat_array_.count(); ++i) { - ObOptOSGColumnStat *col_stat = col_stat_array_.at(i); - if (col_stat != nullptr) { - col_stat->~ObOptOSGColumnStat(); - } - } - col_stat_array_.reset(); - for (int64_t i = 0; i < table_stat_array_.count(); ++i) { - ObOptTableStat *table_stat = table_stat_array_.at(i); - if (table_stat != nullptr) { - table_stat->~ObOptTableStat(); - } - } - table_stat_array_.reset(); - allocator_.reset(); - }; - bool is_empty() const - { - return table_stat_array_.count() == 0 || col_stat_array_.count() == 0; - } - int allocate_table_stat(ObOptTableStat *&table_stat) - { - int ret = OB_SUCCESS; - ObOptTableStat *new_table_stat = OB_NEWx(ObOptTableStat, (&allocator_)); - if (OB_ISNULL(new_table_stat)) { - ret = OB_ALLOCATE_MEMORY_FAILED; - OB_LOG(WARN, "fail to allocate buffer", KR(ret)); - } else if (OB_FAIL(table_stat_array_.push_back(new_table_stat))) { - OB_LOG(WARN, "fail to push back", KR(ret)); - } else { - table_stat = new_table_stat; - } - if (OB_FAIL(ret)) { - if (new_table_stat != nullptr) { - new_table_stat->~ObOptTableStat(); - allocator_.free(new_table_stat); - new_table_stat = nullptr; - } - } - return ret; - } - int allocate_col_stat(ObOptOSGColumnStat *&col_stat) - { - int ret = OB_SUCCESS; - ObOptOSGColumnStat *new_osg_col_stat = ObOptOSGColumnStat::create_new_osg_col_stat(allocator_); - if (OB_ISNULL(new_osg_col_stat) || OB_ISNULL(new_osg_col_stat->col_stat_)) { - ret = OB_ALLOCATE_MEMORY_FAILED; - OB_LOG(WARN, "fail to allocate buffer", KR(ret)); - } else if (OB_FAIL(col_stat_array_.push_back(new_osg_col_stat))) { - OB_LOG(WARN, "fail to push back", KR(ret)); - } else { - col_stat = new_osg_col_stat; - } - if (OB_FAIL(ret)) { - if (new_osg_col_stat != nullptr) { - new_osg_col_stat->~ObOptOSGColumnStat(); - allocator_.free(new_osg_col_stat); - new_osg_col_stat = nullptr; - } - } - return ret; - } - int add(const ObTableLoadSqlStatistics& other) - { - int ret = OB_SUCCESS; - for (int64_t i = 0; OB_SUCC(ret)&& i < other.table_stat_array_.count(); ++i) { - ObOptTableStat *table_stat = other.table_stat_array_.at(i); - if (table_stat != nullptr) { - ObOptTableStat *copied_table_stat = nullptr; - int64_t size = table_stat->size(); - char *new_buf = nullptr; - if (OB_ISNULL(new_buf = static_cast(allocator_.alloc(size)))) { - ret = OB_ALLOCATE_MEMORY_FAILED; - OB_LOG(WARN, "fail to allocate buffer", KR(ret), K(size)); - } else if (OB_FAIL(table_stat->deep_copy(new_buf, size, copied_table_stat))) { - OB_LOG(WARN, "fail to copy table stat", KR(ret)); - } else if (OB_FAIL(table_stat_array_.push_back(copied_table_stat))) { - OB_LOG(WARN, "fail to add table stat", KR(ret)); - } - if (OB_FAIL(ret)) { - if (copied_table_stat != nullptr) { - copied_table_stat->~ObOptTableStat(); - copied_table_stat = nullptr; - } - if(new_buf != nullptr) { - allocator_.free(new_buf); - new_buf = nullptr; - } - } - } - } - for (int64_t i = 0; OB_SUCC(ret)&& i < other.col_stat_array_.count(); ++i) { - ObOptOSGColumnStat *col_stat = other.col_stat_array_.at(i); - ObOptOSGColumnStat *copied_col_stat = nullptr; - if (OB_ISNULL(col_stat)) { - ret = OB_ERR_UNEXPECTED; - OB_LOG(WARN, "get unexpected null"); - } else if (OB_ISNULL(copied_col_stat = ObOptOSGColumnStat::create_new_osg_col_stat(allocator_))) { - ret = OB_ALLOCATE_MEMORY_FAILED; - OB_LOG(WARN, "failed to create new col stat"); - } else if (OB_FAIL(copied_col_stat->deep_copy(*col_stat))) { - OB_LOG(WARN, "fail to copy col stat", KR(ret)); - } else if (OB_FAIL(col_stat_array_.push_back(copied_col_stat))) { - OB_LOG(WARN, "fail to add col stat", KR(ret)); - } - if (OB_FAIL(ret)) { - if (copied_col_stat != nullptr) { - copied_col_stat->~ObOptOSGColumnStat(); - copied_col_stat = nullptr; - } - } - } - return ret; - } - - int get_col_stat_array(ObIArray &col_stat_array) - { - int ret = OB_SUCCESS; - for (int64_t i = 0; OB_SUCC(ret) && i < col_stat_array_.count(); ++i) { - if (OB_ISNULL(col_stat_array_.at(i))) { - ret = OB_ERR_UNEXPECTED; - OB_LOG(WARN, "get unexpected null"); - } else if (OB_FAIL(col_stat_array_.at(i)->set_min_max_datum_to_obj())) { - OB_LOG(WARN, "failed to persistence min max"); - } else if (OB_FAIL(col_stat_array.push_back(col_stat_array_.at(i)->col_stat_))) { - OB_LOG(WARN, "failed to push back col stat"); - } - } - return ret; - } - - int persistence_col_stats() - { - int ret = OB_SUCCESS; - for (int64_t i = 0; OB_SUCC(ret) && i < col_stat_array_.count(); ++i) { - if (OB_ISNULL(col_stat_array_.at(i))) { - ret = OB_ERR_UNEXPECTED; - OB_LOG(WARN, "get unexpected null"); - } else if (OB_FAIL(col_stat_array_.at(i)->set_min_max_datum_to_obj())) { - OB_LOG(WARN, "failed to persistence min max"); - } - } - return ret; - } - - TO_STRING_KV(K_(col_stat_array), K_(table_stat_array)); -public: - common::ObSEArray table_stat_array_; - common::ObSEArray col_stat_array_; - common::ObArenaAllocator allocator_; -}; - - } // namespace table } // namespace oceanbase diff --git a/src/share/table/ob_table_load_dml_stat.h b/src/share/table/ob_table_load_dml_stat.h new file mode 100644 index 0000000000..f208b1e958 --- /dev/null +++ b/src/share/table/ob_table_load_dml_stat.h @@ -0,0 +1,70 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#pragma once + +#include "share/stat/ob_stat_define.h" + +namespace oceanbase +{ +namespace common +{ +class ObOptDmlStat; +} // namespace common +namespace table +{ +struct ObTableLoadDmlStat +{ +public: + ObTableLoadDmlStat() : allocator_("TLD_Dmlstat") { allocator_.set_tenant_id(MTL_ID()); } + ~ObTableLoadDmlStat() { reset(); } + void reset() + { + for (int64_t i = 0; i < dml_stat_array_.count(); ++i) { + ObOptDmlStat *col_stat = dml_stat_array_.at(i); + if (col_stat != nullptr) { + col_stat->~ObOptDmlStat(); + } + } + dml_stat_array_.reset(); + allocator_.reset(); + } + bool is_empty() const { return dml_stat_array_.count() == 0; } + int allocate_dml_stat(ObOptDmlStat *&dml_stat) + { + int ret = OB_SUCCESS; + ObOptDmlStat *new_dml_stat = OB_NEWx(ObOptDmlStat, (&allocator_)); + if (OB_ISNULL(new_dml_stat)) { + ret = OB_ALLOCATE_MEMORY_FAILED; + OB_LOG(WARN, "fail to allocate buffer", KR(ret)); + } else if (OB_FAIL(dml_stat_array_.push_back(new_dml_stat))) { + OB_LOG(WARN, "fail to push back", KR(ret)); + } else { + dml_stat = new_dml_stat; + } + if (OB_FAIL(ret)) { + if (new_dml_stat != nullptr) { + new_dml_stat->~ObOptDmlStat(); + allocator_.free(new_dml_stat); + new_dml_stat = nullptr; + } + } + return ret; + } + TO_STRING_KV(K_(dml_stat_array)); +public: + common::ObSEArray dml_stat_array_; + common::ObArenaAllocator allocator_; +}; + +} // namespace table +} // namespace oceanbase diff --git a/src/share/table/ob_table_load_rpc_struct.h b/src/share/table/ob_table_load_rpc_struct.h index 668f04cd91..58300bfbb1 100644 --- a/src/share/table/ob_table_load_rpc_struct.h +++ b/src/share/table/ob_table_load_rpc_struct.h @@ -8,6 +8,7 @@ #include "lib/net/ob_addr.h" #include "ob_table_load_array.h" #include "ob_table_load_define.h" +#include "share/table/ob_table_load_sql_statistics.h" #include "share/table/ob_table_load_row_array.h" #include "sql/resolver/cmd/ob_load_data_stmt.h" #include "sql/session/ob_sql_session_info.h" diff --git a/src/share/table/ob_table_load_sql_statistics.cpp b/src/share/table/ob_table_load_sql_statistics.cpp new file mode 100644 index 0000000000..134257ffe9 --- /dev/null +++ b/src/share/table/ob_table_load_sql_statistics.cpp @@ -0,0 +1,263 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#define USING_LOG_PREFIX CLIENT + +#include "ob_table_load_sql_statistics.h" + +namespace oceanbase +{ +namespace table +{ + +void ObTableLoadSqlStatistics::reset() +{ + for (int64_t i = 0; i < col_stat_array_.count(); ++i) { + ObOptOSGColumnStat *col_stat = col_stat_array_.at(i); + if (col_stat != nullptr) { + col_stat->~ObOptOSGColumnStat(); + } + } + col_stat_array_.reset(); + for (int64_t i = 0; i < table_stat_array_.count(); ++i) { + ObOptTableStat *table_stat = table_stat_array_.at(i); + if (table_stat != nullptr) { + table_stat->~ObOptTableStat(); + } + } + table_stat_array_.reset(); + allocator_.reset(); +} + +int ObTableLoadSqlStatistics::allocate_table_stat(ObOptTableStat *&table_stat) +{ + int ret = OB_SUCCESS; + ObOptTableStat *new_table_stat = OB_NEWx(ObOptTableStat, (&allocator_)); + if (OB_ISNULL(new_table_stat)) { + ret = OB_ALLOCATE_MEMORY_FAILED; + OB_LOG(WARN, "fail to allocate buffer", KR(ret)); + } else if (OB_FAIL(table_stat_array_.push_back(new_table_stat))) { + OB_LOG(WARN, "fail to push back", KR(ret)); + } else { + table_stat = new_table_stat; + } + if (OB_FAIL(ret)) { + if (new_table_stat != nullptr) { + new_table_stat->~ObOptTableStat(); + allocator_.free(new_table_stat); + new_table_stat = nullptr; + } + } + return ret; +} + +int ObTableLoadSqlStatistics::allocate_col_stat(ObOptOSGColumnStat *&col_stat) +{ + int ret = OB_SUCCESS; + ObOptOSGColumnStat *new_osg_col_stat = ObOptOSGColumnStat::create_new_osg_col_stat(allocator_); + if (OB_ISNULL(new_osg_col_stat) || OB_ISNULL(new_osg_col_stat->col_stat_)) { + ret = OB_ALLOCATE_MEMORY_FAILED; + OB_LOG(WARN, "fail to allocate buffer", KR(ret)); + } else if (OB_FAIL(col_stat_array_.push_back(new_osg_col_stat))) { + OB_LOG(WARN, "fail to push back", KR(ret)); + } else { + col_stat = new_osg_col_stat; + } + if (OB_FAIL(ret)) { + if (new_osg_col_stat != nullptr) { + new_osg_col_stat->~ObOptOSGColumnStat(); + allocator_.free(new_osg_col_stat); + new_osg_col_stat = nullptr; + } + } + return ret; +} + +int ObTableLoadSqlStatistics::add(const ObTableLoadSqlStatistics& other) +{ + int ret = OB_SUCCESS; + for (int64_t i = 0; OB_SUCC(ret)&& i < other.table_stat_array_.count(); ++i) { + ObOptTableStat *table_stat = other.table_stat_array_.at(i); + if (table_stat != nullptr) { + ObOptTableStat *copied_table_stat = nullptr; + int64_t size = table_stat->size(); + char *new_buf = nullptr; + if (OB_ISNULL(new_buf = static_cast(allocator_.alloc(size)))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + OB_LOG(WARN, "fail to allocate buffer", KR(ret), K(size)); + } else if (OB_FAIL(table_stat->deep_copy(new_buf, size, copied_table_stat))) { + OB_LOG(WARN, "fail to copy table stat", KR(ret)); + } else if (OB_FAIL(table_stat_array_.push_back(copied_table_stat))) { + OB_LOG(WARN, "fail to add table stat", KR(ret)); + } + if (OB_FAIL(ret)) { + if (copied_table_stat != nullptr) { + copied_table_stat->~ObOptTableStat(); + copied_table_stat = nullptr; + } + if(new_buf != nullptr) { + allocator_.free(new_buf); + new_buf = nullptr; + } + } + } + } + for (int64_t i = 0; OB_SUCC(ret)&& i < other.col_stat_array_.count(); ++i) { + ObOptOSGColumnStat *col_stat = other.col_stat_array_.at(i); + ObOptOSGColumnStat *copied_col_stat = nullptr; + if (OB_ISNULL(col_stat)) { + ret = OB_ERR_UNEXPECTED; + OB_LOG(WARN, "get unexpected null"); + } else if (OB_ISNULL(copied_col_stat = ObOptOSGColumnStat::create_new_osg_col_stat(allocator_))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + OB_LOG(WARN, "failed to create new col stat"); + } else if (OB_FAIL(copied_col_stat->deep_copy(*col_stat))) { + OB_LOG(WARN, "fail to copy col stat", KR(ret)); + } else if (OB_FAIL(col_stat_array_.push_back(copied_col_stat))) { + OB_LOG(WARN, "fail to add col stat", KR(ret)); + } + if (OB_FAIL(ret)) { + if (copied_col_stat != nullptr) { + copied_col_stat->~ObOptOSGColumnStat(); + copied_col_stat = nullptr; + } + } + } + return ret; +} + +int ObTableLoadSqlStatistics::get_col_stat_array(ObIArray &col_stat_array) +{ + int ret = OB_SUCCESS; + for (int64_t i = 0; OB_SUCC(ret) && i < col_stat_array_.count(); ++i) { + if (OB_ISNULL(col_stat_array_.at(i))) { + ret = OB_ERR_UNEXPECTED; + OB_LOG(WARN, "get unexpected null"); + } else if (OB_FAIL(col_stat_array_.at(i)->set_min_max_datum_to_obj())) { + OB_LOG(WARN, "failed to persistence min max"); + } else if (OB_FAIL(col_stat_array.push_back(col_stat_array_.at(i)->col_stat_))) { + OB_LOG(WARN, "failed to push back col stat"); + } + } + return ret; +} + +int ObTableLoadSqlStatistics::persistence_col_stats() +{ + int ret = OB_SUCCESS; + for (int64_t i = 0; OB_SUCC(ret) && i < col_stat_array_.count(); ++i) { + if (OB_ISNULL(col_stat_array_.at(i))) { + ret = OB_ERR_UNEXPECTED; + OB_LOG(WARN, "get unexpected null"); + } else if (OB_FAIL(col_stat_array_.at(i)->set_min_max_datum_to_obj())) { + OB_LOG(WARN, "failed to persistence min max"); + } + } + return ret; +} + +OB_DEF_SERIALIZE(ObTableLoadSqlStatistics) +{ + int ret = OB_SUCCESS; + OB_UNIS_ENCODE(table_stat_array_.count()); + for (int64_t i = 0; OB_SUCC(ret) && i < table_stat_array_.count(); i++) { + if (table_stat_array_.at(i) != nullptr) { + OB_UNIS_ENCODE(*table_stat_array_.at(i)); + } + } + OB_UNIS_ENCODE(col_stat_array_.count()); + for (int64_t i = 0; OB_SUCC(ret) && i < col_stat_array_.count(); i++) { + if (col_stat_array_.at(i) != nullptr) { + OB_UNIS_ENCODE(*col_stat_array_.at(i)); + } + } + return ret; +} + +OB_DEF_DESERIALIZE(ObTableLoadSqlStatistics) +{ + int ret = OB_SUCCESS; + reset(); + int64_t size = 0; + OB_UNIS_DECODE(size) + for (int64_t i = 0; OB_SUCC(ret) && i < size; ++i) { + ObOptTableStat table_stat; + ObOptTableStat *copied_table_stat = nullptr; + if (OB_FAIL(table_stat.deserialize(buf, data_len, pos))) { + LOG_WARN("deserialize datum store failed", K(ret), K(i)); + } else { + int64_t size = table_stat.size(); + char *new_buf = nullptr; + if (OB_ISNULL(new_buf = static_cast(allocator_.alloc(size)))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + OB_LOG(WARN, "fail to allocate buffer", KR(ret), K(size)); + } else if (OB_FAIL(table_stat.deep_copy(new_buf, size, copied_table_stat))) { + OB_LOG(WARN, "fail to copy table stat", KR(ret)); + } else if (OB_FAIL(table_stat_array_.push_back(copied_table_stat))) { + OB_LOG(WARN, "fail to add table stat", KR(ret)); + } + if (OB_FAIL(ret)) { + if (copied_table_stat != nullptr) { + copied_table_stat->~ObOptTableStat(); + copied_table_stat = nullptr; + } + if(new_buf != nullptr) { + allocator_.free(new_buf); + new_buf = nullptr; + } + } + } + } + size = 0; + OB_UNIS_DECODE(size) + for (int64_t i = 0; OB_SUCC(ret) && i < size; ++i) { + ObOptOSGColumnStat *osg_col_stat = NULL; + if (OB_ISNULL(osg_col_stat = ObOptOSGColumnStat::create_new_osg_col_stat(allocator_)) || + OB_ISNULL(osg_col_stat->col_stat_)) { + ret = OB_ALLOCATE_MEMORY_FAILED; + OB_LOG(WARN, "failed to create col stat"); + } else if (OB_FAIL(osg_col_stat->deserialize(buf, data_len, pos))) { + OB_LOG(WARN, "deserialize datum store failed", K(ret), K(i)); + } else if (OB_FAIL(col_stat_array_.push_back(osg_col_stat))) { + OB_LOG(WARN, "fail to add table stat", KR(ret)); + } + if (OB_FAIL(ret)) { + if (osg_col_stat != nullptr) { + osg_col_stat->~ObOptOSGColumnStat(); + osg_col_stat = nullptr; + } + } + } + return ret; +} + +OB_DEF_SERIALIZE_SIZE(ObTableLoadSqlStatistics) +{ + int ret = OB_SUCCESS; + int64_t len = 0; + OB_UNIS_ADD_LEN(table_stat_array_.count()); + for (int64_t i = 0; OB_SUCC(ret) && i < table_stat_array_.count(); i++) { + if (table_stat_array_.at(i) != nullptr) { + OB_UNIS_ADD_LEN(*table_stat_array_.at(i)); + } + } + OB_UNIS_ADD_LEN(col_stat_array_.count()); + for (int64_t i = 0; OB_SUCC(ret) && i < col_stat_array_.count(); i++) { + if (col_stat_array_.at(i) != nullptr) { + OB_UNIS_ADD_LEN(*col_stat_array_.at(i)); + } + } + return len; +} + +} // namespace table +} // namespace oceanbase diff --git a/src/share/table/ob_table_load_sql_statistics.h b/src/share/table/ob_table_load_sql_statistics.h new file mode 100644 index 0000000000..c4a94aee80 --- /dev/null +++ b/src/share/table/ob_table_load_sql_statistics.h @@ -0,0 +1,48 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#pragma once + +#include "share/stat/ob_opt_table_stat.h" +#include "share/stat/ob_opt_column_stat.h" +#include "share/stat/ob_opt_osg_column_stat.h" + +namespace oceanbase +{ +namespace table +{ + +struct ObTableLoadSqlStatistics +{ + OB_UNIS_VERSION(1); +public: + ObTableLoadSqlStatistics() : allocator_("TLD_Opstat") { allocator_.set_tenant_id(MTL_ID()); } + ~ObTableLoadSqlStatistics() { reset(); } + void reset(); + bool is_empty() const + { + return table_stat_array_.count() == 0 || col_stat_array_.count() == 0; + } + int allocate_table_stat(ObOptTableStat *&table_stat); + int allocate_col_stat(ObOptOSGColumnStat *&col_stat); + int add(const ObTableLoadSqlStatistics& other); + int get_col_stat_array(ObIArray &col_stat_array); + int persistence_col_stats(); + TO_STRING_KV(K_(col_stat_array), K_(table_stat_array)); +public: + common::ObSEArray table_stat_array_; + common::ObSEArray col_stat_array_; + common::ObArenaAllocator allocator_; +}; + +} // namespace table +} // namespace oceanbase diff --git a/src/storage/direct_load/ob_direct_load_merge_ctx.cpp b/src/storage/direct_load/ob_direct_load_merge_ctx.cpp index 24e91687f8..f825c7fdb5 100644 --- a/src/storage/direct_load/ob_direct_load_merge_ctx.cpp +++ b/src/storage/direct_load/ob_direct_load_merge_ctx.cpp @@ -211,7 +211,6 @@ int ObDirectLoadTabletMergeCtx::collect_sql_statistics( int64_t table_avg_len = 0; int64_t col_cnt = param_.table_data_desc_.column_count_; ObOptTableStat *table_stat = nullptr; - ObOptDmlStat dml_stat; StatLevel stat_level; if (table_schema->get_part_level() == PARTITION_LEVEL_ZERO) { stat_level = TABLE_LEVEL; @@ -287,13 +286,6 @@ int ObDirectLoadTabletMergeCtx::collect_sql_statistics( table_stat->set_object_type(stat_level); table_stat->set_row_count(table_row_cnt); table_stat->set_avg_row_size(table_avg_len); - dml_stat.tenant_id_ = tenant_id; - dml_stat.table_id_ = param_.target_table_id_; - dml_stat.tablet_id_ = get_target_tablet_id().id(); - dml_stat.insert_row_count_ = table_row_cnt; - if (OB_FAIL(ObOptStatMonitorManager::get_instance().update_local_cache(tenant_id, dml_stat))) { - LOG_WARN("failed to update dml stat local cache", K(ret)); - } } // persistence col stat once a merge task finished if (OB_SUCC(ret) && OB_FAIL(sql_statistics.persistence_col_stats())) { @@ -304,6 +296,32 @@ int ObDirectLoadTabletMergeCtx::collect_sql_statistics( return ret; } +int ObDirectLoadTabletMergeCtx::collect_dml_stat(const common::ObIArray &fast_heap_table_array, ObTableLoadDmlStat &dml_stats) +{ + int ret = OB_SUCCESS; + const uint64_t tenant_id = MTL_ID(); + int64_t insert_row_cnt = 0; + ObOptDmlStat *dml_stat = nullptr; + if (OB_FAIL(dml_stats.allocate_dml_stat(dml_stat))) { + LOG_WARN("fail to allocate table stat", KR(ret)); + } else { + // scan task_array + for (int64_t i = 0; OB_SUCC(ret) && i < task_array_.count(); ++i) { + insert_row_cnt += task_array_.at(i)->get_row_count(); + } + // scan fast heap table + for (int64_t i = 0; OB_SUCC(ret) && i < fast_heap_table_array.count(); ++i) { + insert_row_cnt += fast_heap_table_array.at(i)->get_row_count(); + } + dml_stat->tenant_id_ = tenant_id; + dml_stat->table_id_ = param_.target_table_id_; + dml_stat->tablet_id_ = target_tablet_id_.id(); + dml_stat->insert_row_count_ = insert_row_cnt; + } + return ret; +} + + int ObDirectLoadTabletMergeCtx::init_sstable_array( const ObIArray &table_array) { diff --git a/src/storage/direct_load/ob_direct_load_merge_ctx.h b/src/storage/direct_load/ob_direct_load_merge_ctx.h index 64e7d4cc76..36ff3bec26 100644 --- a/src/storage/direct_load/ob_direct_load_merge_ctx.h +++ b/src/storage/direct_load/ob_direct_load_merge_ctx.h @@ -12,6 +12,8 @@ #include "share/stat/ob_opt_osg_column_stat.h" #include "share/stat/ob_opt_table_stat.h" #include "share/table/ob_table_load_define.h" +#include "share/table/ob_table_load_dml_stat.h" +#include "share/table/ob_table_load_sql_statistics.h" #include "storage/direct_load/ob_direct_load_origin_table.h" #include "storage/direct_load/ob_direct_load_table_data_desc.h" #include "storage/direct_load/ob_direct_load_fast_heap_table.h" @@ -101,6 +103,8 @@ public: int inc_finish_count(bool &is_ready); int collect_sql_statistics( const common::ObIArray &fast_heap_table_array, table::ObTableLoadSqlStatistics &sql_statistics); + int collect_dml_stat(const common::ObIArray &fast_heap_table_array, + table::ObTableLoadDmlStat &dml_stats); const ObDirectLoadMergeParam &get_param() const { return param_; } const common::ObTabletID &get_tablet_id() const { return tablet_id_; } const common::ObTabletID &get_target_tablet_id() const { return target_tablet_id_; } diff --git a/src/storage/tx_storage/ob_access_service.cpp b/src/storage/tx_storage/ob_access_service.cpp index efadd657ad..60894fc3dc 100644 --- a/src/storage/tx_storage/ob_access_service.cpp +++ b/src/storage/tx_storage/ob_access_service.cpp @@ -663,7 +663,7 @@ int ObAccessService::insert_rows( column_ids, row_iter, affected_rows); - if (OB_SUCC(ret)) { + if (OB_SUCC(ret) && !dml_param.is_direct_insert()) { int tmp_ret = audit_tablet_opt_dml_stat(dml_param, tablet_id, ObOptDmlStatType::TABLET_OPT_INSERT_STAT,