fix dml stat

This commit is contained in:
yongshige
2023-06-09 02:18:12 +00:00
committed by ob-robot
parent b371559fcb
commit ad49eedade
17 changed files with 516 additions and 277 deletions

View File

@ -8,6 +8,7 @@
#include "observer/table_load/ob_table_load_struct.h"
#include "share/table/ob_table_load_array.h"
#include "share/table/ob_table_load_define.h"
#include "share/table/ob_table_load_sql_statistics.h"
#include "share/table/ob_table_load_row_array.h"
namespace oceanbase

View File

@ -303,6 +303,58 @@ int ObTableLoadMerger::build_merge_ctx()
return ret;
}
int ObTableLoadMerger::collect_dml_stat(ObTableLoadDmlStat &dml_stats)
{
int ret = OB_SUCCESS;
if (store_ctx_->is_fast_heap_table_) {
ObDirectLoadMultiMap<ObTabletID, ObDirectLoadFastHeapTable *> tables;
ObArray<ObTableLoadTransStore *> trans_store_array;
if (OB_FAIL(tables.init())) {
LOG_WARN("fail to init table", KR(ret));
} else if (OB_FAIL(store_ctx_->get_committed_trans_stores(trans_store_array))) {
LOG_WARN("fail to get trans store", KR(ret));
} else {
for (int i = 0; OB_SUCC(ret) && i < trans_store_array.count(); ++i) {
ObTableLoadTransStore *trans_store = trans_store_array.at(i);
for (int j = 0; OB_SUCC(ret) && j < trans_store->session_store_array_.count(); ++j) {
ObTableLoadTransStore::SessionStore * session_store = trans_store->session_store_array_.at(j);
for (int k = 0 ; OB_SUCC(ret) && k < session_store->partition_table_array_.count(); ++k) {
ObIDirectLoadPartitionTable *table = session_store->partition_table_array_.at(k);
ObDirectLoadFastHeapTable *sstable = nullptr;
if (OB_ISNULL(sstable = dynamic_cast<ObDirectLoadFastHeapTable *>(table))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected not heap sstable", KR(ret), KPC(table));
} else {
const ObTabletID &tablet_id = sstable->get_tablet_id();
if (OB_FAIL(tables.add(tablet_id, sstable))) {
LOG_WARN("fail to add tables", KR(ret), KPC(sstable));
}
}
}
}
}
for (int i = 0; OB_SUCC(ret) && i < merge_ctx_.get_tablet_merge_ctxs().count(); ++i) {
ObDirectLoadTabletMergeCtx *tablet_ctx = merge_ctx_.get_tablet_merge_ctxs().at(i);
ObArray<ObDirectLoadFastHeapTable *> heap_table_array ;
if (OB_FAIL(tables.get(tablet_ctx->get_tablet_id(), heap_table_array))) {
LOG_WARN("get heap sstable failed", KR(ret));
} else if (OB_FAIL(tablet_ctx->collect_dml_stat(heap_table_array, dml_stats))) {
LOG_WARN("fail to collect sql statics", KR(ret));
}
}
}
} else {
for (int i = 0; OB_SUCC(ret) && i < merge_ctx_.get_tablet_merge_ctxs().count(); ++i) {
ObDirectLoadTabletMergeCtx *tablet_ctx = merge_ctx_.get_tablet_merge_ctxs().at(i);
ObArray<ObDirectLoadFastHeapTable *> heap_table_array ;
if (OB_FAIL(tablet_ctx->collect_dml_stat(heap_table_array, dml_stats))) {
LOG_WARN("fail to collect sql statics", KR(ret));
}
}
}
return ret;
}
int ObTableLoadMerger::collect_sql_statistics(ObTableLoadSqlStatistics &sql_statistics)
{
int ret = OB_SUCCESS;

View File

@ -30,6 +30,7 @@ public:
void stop();
int handle_table_compact_success();
int collect_sql_statistics(table::ObTableLoadSqlStatistics &sql_statistics);
int collect_dml_stat(table::ObTableLoadDmlStat &dml_stats);
private:
int build_merge_ctx();
int start_merge();

View File

@ -16,6 +16,7 @@
#include "observer/table_load/ob_table_load_trans_store.h"
#include "observer/table_load/ob_table_load_utils.h"
#include "storage/direct_load/ob_direct_load_insert_table_ctx.h"
#include "share/stat/ob_opt_stat_monitor_manager.h"
namespace oceanbase
{
@ -283,6 +284,7 @@ int ObTableLoadStore::commit(ObTableLoadResultInfo &result_info, ObTableLoadSqlS
LOG_WARN("ObTableLoadStore not init", KR(ret), KP(this));
} else {
LOG_INFO("store commit");
ObTableLoadDmlStat dml_stats;
obsys::ObWLockGuard guard(store_ctx_->get_status_lock());
if (OB_FAIL(store_ctx_->check_status_unlock(ObTableLoadStatusType::MERGED))) {
LOG_WARN("fail to check store status", KR(ret));
@ -293,6 +295,10 @@ int ObTableLoadStore::commit(ObTableLoadResultInfo &result_info, ObTableLoadSqlS
} else if (param_.online_opt_stat_gather_ &&
OB_FAIL(store_ctx_->merger_->collect_sql_statistics(sql_statistics))) {
LOG_WARN("fail to collect sql stats", KR(ret));
} else if (OB_FAIL(store_ctx_->merger_->collect_dml_stat(dml_stats))) {
LOG_WARN("fail to build dml stat", KR(ret));
} else if (OB_FAIL(ObOptStatMonitorManager::get_instance().update_dml_stat_info_from_direct_load(dml_stats.dml_stat_array_))) {
LOG_WARN("fail to update dml stat info", KR(ret));
} else if (OB_FAIL(store_ctx_->set_status_commit_unlock())) {
LOG_WARN("fail to set store status commit", KR(ret));
} else {

View File

@ -9,6 +9,7 @@
#include "share/table/ob_table_load_array.h"
#include "share/table/ob_table_load_define.h"
#include "share/table/ob_table_load_row_array.h"
#include "share/table/ob_table_load_sql_statistics.h"
namespace oceanbase
{

View File

@ -250,6 +250,7 @@ ob_set_subtarget(ob_share common_mixed
detect/ob_detect_rpc_proxy.cpp
detect/ob_detect_rpc_processor.cpp
detect/ob_detect_manager_utils.cpp
table/ob_table_load_sql_statistics.cpp
)
ob_set_subtarget(ob_share tablet

View File

@ -495,11 +495,9 @@ int ObOptStatMonitorManager::update_tenant_dml_stat_info(uint64_t tenant_id)
ObSqlString value_sql;
int count = 0;
for (auto iter = dml_stat_map->begin(); OB_SUCC(ret) && iter != dml_stat_map->end(); ++iter) {
if (OB_FAIL(get_dml_stat_sql(tenant_id,
iter->first,
iter->second,
0 != count, // need_add_comma
value_sql))) {
if (OB_FAIL(get_dml_stat_sql(tenant_id, iter->second,
0 != count, // need_add_comma
value_sql))) {
LOG_WARN("failed to get dml stat sql", K(ret));
} else if (UPDATE_OPT_STAT_BATCH_CNT == ++count) {
if (OB_FAIL(exec_insert_monitor_modified_sql(tenant_id, value_sql))) {
@ -917,24 +915,25 @@ int ObOptStatMonitorManager::exec_insert_monitor_modified_sql(uint64_t tenant_id
LOG_WARN("failed to append string", K(ret));
} else if (OB_FAIL(mysql_proxy_->write(tenant_id, insert_sql.ptr(), affected_rows))) {
LOG_WARN("fail to exec sql", K(insert_sql), K(ret));
} else {
LOG_TRACE("succeed to exec insert monitor modified sql", K(tenant_id), K(values_sql));
}
return ret;
}
int ObOptStatMonitorManager::get_dml_stat_sql(const uint64_t tenant_id,
const StatKey &dml_stat_key,
const ObOptDmlStat &dml_stat,
const bool need_add_comma,
ObSqlString &sql_string)
{
int ret = OB_SUCCESS;
share::ObDMLSqlSplicer dml_splicer;
uint64_t table_id = dml_stat_key.first;
uint64_t table_id = dml_stat.table_id_;
uint64_t ext_tenant_id = share::schema::ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id);
uint64_t pure_table_id = share::schema::ObSchemaUtils::get_extract_schema_id(tenant_id, table_id);
if (OB_FAIL(dml_splicer.add_pk_column("tenant_id", ext_tenant_id)) ||
OB_FAIL(dml_splicer.add_pk_column("table_id", pure_table_id)) ||
OB_FAIL(dml_splicer.add_pk_column("tablet_id", dml_stat_key.second)) ||
OB_FAIL(dml_splicer.add_pk_column("tablet_id", dml_stat.tablet_id_)) ||
OB_FAIL(dml_splicer.add_column("inserts", dml_stat.insert_row_count_)) ||
OB_FAIL(dml_splicer.add_column("updates", dml_stat.update_row_count_)) ||
OB_FAIL(dml_splicer.add_column("deletes", dml_stat.delete_row_count_))) {
@ -998,6 +997,38 @@ int ObOptStatMonitorManager::clean_useless_dml_stat_info(uint64_t tenant_id)
return ret;
}
int ObOptStatMonitorManager::update_dml_stat_info_from_direct_load(const ObIArray<ObOptDmlStat *> &dml_stats)
{
int ret = OB_SUCCESS;
ObSqlString value_sql;
int count = 0;
uint64_t tenant_id = 0;
LOG_TRACE("begin to update dml stat info from direct load", K(dml_stats));
for (int64_t i = 0; OB_SUCC(ret) && i < dml_stats.count(); ++i) {
if (OB_ISNULL(dml_stats.at(i))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpcted error", K(ret), K(dml_stats.at(i)));
} else {
tenant_id = dml_stats.at(i)->tenant_id_;
if (OB_FAIL(get_dml_stat_sql(tenant_id, *dml_stats.at(i), 0 != count, value_sql))) {
LOG_WARN("failed to get dml stat sql", K(ret));
} else if (UPDATE_OPT_STAT_BATCH_CNT == ++count) {
if (OB_FAIL(exec_insert_monitor_modified_sql(tenant_id, value_sql))) {
LOG_WARN("failed to exec insert sql", K(ret));
} else {
count = 0;
value_sql.reset();
}
}
}
}
if (OB_SUCC(ret) && count != 0) {
if (OB_FAIL(exec_insert_monitor_modified_sql(tenant_id, value_sql))) {
LOG_WARN("failed to exec insert sql", K(ret));
}
}
return ret;
}
}
}

View File

@ -168,7 +168,6 @@ public:
const bool need_add_comma,
ObSqlString &sql_string);
int get_dml_stat_sql(const uint64_t tenant_id,
const StatKey &dml_stat_key,
const ObOptDmlStat &dml_stat,
const bool need_add_comma,
ObSqlString &sql_string);
@ -190,6 +189,8 @@ public:
int clean_useless_dml_stat_info(uint64_t tenant_id);
int update_dml_stat_info_from_direct_load(const ObIArray<ObOptDmlStat *> &dml_stats);
private:
DISALLOW_COPY_AND_ASSIGN(ObOptStatMonitorManager);
const static int64_t UPDATE_OPT_STAT_BATCH_CNT = 200;

View File

@ -41,101 +41,5 @@ OB_SERIALIZE_MEMBER_SIMPLE(ObTableLoadResultInfo,
skipped_,
warnings_);
OB_DEF_SERIALIZE(ObTableLoadSqlStatistics)
{
int ret = OB_SUCCESS;
OB_UNIS_ENCODE(table_stat_array_.count());
for (int64_t i = 0; OB_SUCC(ret) && i < table_stat_array_.count(); i++) {
if (table_stat_array_.at(i) != nullptr) {
OB_UNIS_ENCODE(*table_stat_array_.at(i));
}
}
OB_UNIS_ENCODE(col_stat_array_.count());
for (int64_t i = 0; OB_SUCC(ret) && i < col_stat_array_.count(); i++) {
if (col_stat_array_.at(i) != nullptr) {
OB_UNIS_ENCODE(*col_stat_array_.at(i));
}
}
return ret;
}
OB_DEF_DESERIALIZE(ObTableLoadSqlStatistics)
{
int ret = OB_SUCCESS;
reset();
int64_t size = 0;
OB_UNIS_DECODE(size)
for (int64_t i = 0; OB_SUCC(ret) && i < size; ++i) {
ObOptTableStat table_stat;
ObOptTableStat *copied_table_stat = nullptr;
if (OB_FAIL(table_stat.deserialize(buf, data_len, pos))) {
LOG_WARN("deserialize datum store failed", K(ret), K(i));
} else {
int64_t size = table_stat.size();
char *new_buf = nullptr;
if (OB_ISNULL(new_buf = static_cast<char *>(allocator_.alloc(size)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
OB_LOG(WARN, "fail to allocate buffer", KR(ret), K(size));
} else if (OB_FAIL(table_stat.deep_copy(new_buf, size, copied_table_stat))) {
OB_LOG(WARN, "fail to copy table stat", KR(ret));
} else if (OB_FAIL(table_stat_array_.push_back(copied_table_stat))) {
OB_LOG(WARN, "fail to add table stat", KR(ret));
}
if (OB_FAIL(ret)) {
if (copied_table_stat != nullptr) {
copied_table_stat->~ObOptTableStat();
copied_table_stat = nullptr;
}
if(new_buf != nullptr) {
allocator_.free(new_buf);
new_buf = nullptr;
}
}
}
}
size = 0;
OB_UNIS_DECODE(size)
for (int64_t i = 0; OB_SUCC(ret) && i < size; ++i) {
ObOptOSGColumnStat *osg_col_stat = NULL;
if (OB_ISNULL(osg_col_stat = ObOptOSGColumnStat::create_new_osg_col_stat(allocator_)) ||
OB_ISNULL(osg_col_stat->col_stat_)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
OB_LOG(WARN, "failed to create col stat");
} else if (OB_FAIL(osg_col_stat->deserialize(buf, data_len, pos))) {
OB_LOG(WARN, "deserialize datum store failed", K(ret), K(i));
} else if (OB_FAIL(osg_col_stat->deep_copy(*osg_col_stat))) {
OB_LOG(WARN, "fail to deep copy", K(ret));
} else if (OB_FAIL(col_stat_array_.push_back(osg_col_stat))) {
OB_LOG(WARN, "fail to add table stat", KR(ret));
}
if (OB_FAIL(ret)) {
if (osg_col_stat != nullptr) {
osg_col_stat->~ObOptOSGColumnStat();
osg_col_stat = nullptr;
}
}
}
return ret;
}
OB_DEF_SERIALIZE_SIZE(ObTableLoadSqlStatistics)
{
int ret = OB_SUCCESS;
int64_t len = 0;
OB_UNIS_ADD_LEN(table_stat_array_.count());
for (int64_t i = 0; OB_SUCC(ret) && i < table_stat_array_.count(); i++) {
if (table_stat_array_.at(i) != nullptr) {
OB_UNIS_ADD_LEN(*table_stat_array_.at(i));
}
}
OB_UNIS_ADD_LEN(col_stat_array_.count());
for (int64_t i = 0; OB_SUCC(ret) && i < col_stat_array_.count(); i++) {
if (col_stat_array_.at(i) != nullptr) {
OB_UNIS_ADD_LEN(*col_stat_array_.at(i));
}
}
return len;
}
} // namespace table
} // namespace oceanbase

View File

@ -11,9 +11,6 @@
#include "lib/oblog/ob_log_module.h"
#include "lib/utility/ob_print_utils.h"
#include "share/rc/ob_tenant_base.h"
#include "share/stat/ob_opt_table_stat.h"
#include "share/stat/ob_opt_column_stat.h"
#include "share/stat/ob_opt_osg_column_stat.h"
namespace oceanbase
{
@ -409,165 +406,5 @@ public:
uint64_t warnings_ CACHE_ALIGNED;
};
struct ObTableLoadSqlStatistics
{
OB_UNIS_VERSION(1);
public:
ObTableLoadSqlStatistics() : allocator_("TLD_Opstat") { allocator_.set_tenant_id(MTL_ID()); }
~ObTableLoadSqlStatistics() { reset();}
void reset() {
for (int64_t i = 0; i < col_stat_array_.count(); ++i) {
ObOptOSGColumnStat *col_stat = col_stat_array_.at(i);
if (col_stat != nullptr) {
col_stat->~ObOptOSGColumnStat();
}
}
col_stat_array_.reset();
for (int64_t i = 0; i < table_stat_array_.count(); ++i) {
ObOptTableStat *table_stat = table_stat_array_.at(i);
if (table_stat != nullptr) {
table_stat->~ObOptTableStat();
}
}
table_stat_array_.reset();
allocator_.reset();
};
bool is_empty() const
{
return table_stat_array_.count() == 0 || col_stat_array_.count() == 0;
}
int allocate_table_stat(ObOptTableStat *&table_stat)
{
int ret = OB_SUCCESS;
ObOptTableStat *new_table_stat = OB_NEWx(ObOptTableStat, (&allocator_));
if (OB_ISNULL(new_table_stat)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
OB_LOG(WARN, "fail to allocate buffer", KR(ret));
} else if (OB_FAIL(table_stat_array_.push_back(new_table_stat))) {
OB_LOG(WARN, "fail to push back", KR(ret));
} else {
table_stat = new_table_stat;
}
if (OB_FAIL(ret)) {
if (new_table_stat != nullptr) {
new_table_stat->~ObOptTableStat();
allocator_.free(new_table_stat);
new_table_stat = nullptr;
}
}
return ret;
}
int allocate_col_stat(ObOptOSGColumnStat *&col_stat)
{
int ret = OB_SUCCESS;
ObOptOSGColumnStat *new_osg_col_stat = ObOptOSGColumnStat::create_new_osg_col_stat(allocator_);
if (OB_ISNULL(new_osg_col_stat) || OB_ISNULL(new_osg_col_stat->col_stat_)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
OB_LOG(WARN, "fail to allocate buffer", KR(ret));
} else if (OB_FAIL(col_stat_array_.push_back(new_osg_col_stat))) {
OB_LOG(WARN, "fail to push back", KR(ret));
} else {
col_stat = new_osg_col_stat;
}
if (OB_FAIL(ret)) {
if (new_osg_col_stat != nullptr) {
new_osg_col_stat->~ObOptOSGColumnStat();
allocator_.free(new_osg_col_stat);
new_osg_col_stat = nullptr;
}
}
return ret;
}
int add(const ObTableLoadSqlStatistics& other)
{
int ret = OB_SUCCESS;
for (int64_t i = 0; OB_SUCC(ret)&& i < other.table_stat_array_.count(); ++i) {
ObOptTableStat *table_stat = other.table_stat_array_.at(i);
if (table_stat != nullptr) {
ObOptTableStat *copied_table_stat = nullptr;
int64_t size = table_stat->size();
char *new_buf = nullptr;
if (OB_ISNULL(new_buf = static_cast<char *>(allocator_.alloc(size)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
OB_LOG(WARN, "fail to allocate buffer", KR(ret), K(size));
} else if (OB_FAIL(table_stat->deep_copy(new_buf, size, copied_table_stat))) {
OB_LOG(WARN, "fail to copy table stat", KR(ret));
} else if (OB_FAIL(table_stat_array_.push_back(copied_table_stat))) {
OB_LOG(WARN, "fail to add table stat", KR(ret));
}
if (OB_FAIL(ret)) {
if (copied_table_stat != nullptr) {
copied_table_stat->~ObOptTableStat();
copied_table_stat = nullptr;
}
if(new_buf != nullptr) {
allocator_.free(new_buf);
new_buf = nullptr;
}
}
}
}
for (int64_t i = 0; OB_SUCC(ret)&& i < other.col_stat_array_.count(); ++i) {
ObOptOSGColumnStat *col_stat = other.col_stat_array_.at(i);
ObOptOSGColumnStat *copied_col_stat = nullptr;
if (OB_ISNULL(col_stat)) {
ret = OB_ERR_UNEXPECTED;
OB_LOG(WARN, "get unexpected null");
} else if (OB_ISNULL(copied_col_stat = ObOptOSGColumnStat::create_new_osg_col_stat(allocator_))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
OB_LOG(WARN, "failed to create new col stat");
} else if (OB_FAIL(copied_col_stat->deep_copy(*col_stat))) {
OB_LOG(WARN, "fail to copy col stat", KR(ret));
} else if (OB_FAIL(col_stat_array_.push_back(copied_col_stat))) {
OB_LOG(WARN, "fail to add col stat", KR(ret));
}
if (OB_FAIL(ret)) {
if (copied_col_stat != nullptr) {
copied_col_stat->~ObOptOSGColumnStat();
copied_col_stat = nullptr;
}
}
}
return ret;
}
int get_col_stat_array(ObIArray<ObOptColumnStat*> &col_stat_array)
{
int ret = OB_SUCCESS;
for (int64_t i = 0; OB_SUCC(ret) && i < col_stat_array_.count(); ++i) {
if (OB_ISNULL(col_stat_array_.at(i))) {
ret = OB_ERR_UNEXPECTED;
OB_LOG(WARN, "get unexpected null");
} else if (OB_FAIL(col_stat_array_.at(i)->set_min_max_datum_to_obj())) {
OB_LOG(WARN, "failed to persistence min max");
} else if (OB_FAIL(col_stat_array.push_back(col_stat_array_.at(i)->col_stat_))) {
OB_LOG(WARN, "failed to push back col stat");
}
}
return ret;
}
int persistence_col_stats()
{
int ret = OB_SUCCESS;
for (int64_t i = 0; OB_SUCC(ret) && i < col_stat_array_.count(); ++i) {
if (OB_ISNULL(col_stat_array_.at(i))) {
ret = OB_ERR_UNEXPECTED;
OB_LOG(WARN, "get unexpected null");
} else if (OB_FAIL(col_stat_array_.at(i)->set_min_max_datum_to_obj())) {
OB_LOG(WARN, "failed to persistence min max");
}
}
return ret;
}
TO_STRING_KV(K_(col_stat_array), K_(table_stat_array));
public:
common::ObSEArray<ObOptTableStat *, 64> table_stat_array_;
common::ObSEArray<ObOptOSGColumnStat *, 64> col_stat_array_;
common::ObArenaAllocator allocator_;
};
} // namespace table
} // namespace oceanbase

View File

@ -0,0 +1,70 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#pragma once
#include "share/stat/ob_stat_define.h"
namespace oceanbase
{
namespace common
{
class ObOptDmlStat;
} // namespace common
namespace table
{
struct ObTableLoadDmlStat
{
public:
ObTableLoadDmlStat() : allocator_("TLD_Dmlstat") { allocator_.set_tenant_id(MTL_ID()); }
~ObTableLoadDmlStat() { reset(); }
void reset()
{
for (int64_t i = 0; i < dml_stat_array_.count(); ++i) {
ObOptDmlStat *col_stat = dml_stat_array_.at(i);
if (col_stat != nullptr) {
col_stat->~ObOptDmlStat();
}
}
dml_stat_array_.reset();
allocator_.reset();
}
bool is_empty() const { return dml_stat_array_.count() == 0; }
int allocate_dml_stat(ObOptDmlStat *&dml_stat)
{
int ret = OB_SUCCESS;
ObOptDmlStat *new_dml_stat = OB_NEWx(ObOptDmlStat, (&allocator_));
if (OB_ISNULL(new_dml_stat)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
OB_LOG(WARN, "fail to allocate buffer", KR(ret));
} else if (OB_FAIL(dml_stat_array_.push_back(new_dml_stat))) {
OB_LOG(WARN, "fail to push back", KR(ret));
} else {
dml_stat = new_dml_stat;
}
if (OB_FAIL(ret)) {
if (new_dml_stat != nullptr) {
new_dml_stat->~ObOptDmlStat();
allocator_.free(new_dml_stat);
new_dml_stat = nullptr;
}
}
return ret;
}
TO_STRING_KV(K_(dml_stat_array));
public:
common::ObSEArray<ObOptDmlStat *, 64, common::ModulePageAllocator, true> dml_stat_array_;
common::ObArenaAllocator allocator_;
};
} // namespace table
} // namespace oceanbase

View File

@ -8,6 +8,7 @@
#include "lib/net/ob_addr.h"
#include "ob_table_load_array.h"
#include "ob_table_load_define.h"
#include "share/table/ob_table_load_sql_statistics.h"
#include "share/table/ob_table_load_row_array.h"
#include "sql/resolver/cmd/ob_load_data_stmt.h"
#include "sql/session/ob_sql_session_info.h"

View File

@ -0,0 +1,263 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX CLIENT
#include "ob_table_load_sql_statistics.h"
namespace oceanbase
{
namespace table
{
void ObTableLoadSqlStatistics::reset()
{
for (int64_t i = 0; i < col_stat_array_.count(); ++i) {
ObOptOSGColumnStat *col_stat = col_stat_array_.at(i);
if (col_stat != nullptr) {
col_stat->~ObOptOSGColumnStat();
}
}
col_stat_array_.reset();
for (int64_t i = 0; i < table_stat_array_.count(); ++i) {
ObOptTableStat *table_stat = table_stat_array_.at(i);
if (table_stat != nullptr) {
table_stat->~ObOptTableStat();
}
}
table_stat_array_.reset();
allocator_.reset();
}
int ObTableLoadSqlStatistics::allocate_table_stat(ObOptTableStat *&table_stat)
{
int ret = OB_SUCCESS;
ObOptTableStat *new_table_stat = OB_NEWx(ObOptTableStat, (&allocator_));
if (OB_ISNULL(new_table_stat)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
OB_LOG(WARN, "fail to allocate buffer", KR(ret));
} else if (OB_FAIL(table_stat_array_.push_back(new_table_stat))) {
OB_LOG(WARN, "fail to push back", KR(ret));
} else {
table_stat = new_table_stat;
}
if (OB_FAIL(ret)) {
if (new_table_stat != nullptr) {
new_table_stat->~ObOptTableStat();
allocator_.free(new_table_stat);
new_table_stat = nullptr;
}
}
return ret;
}
int ObTableLoadSqlStatistics::allocate_col_stat(ObOptOSGColumnStat *&col_stat)
{
int ret = OB_SUCCESS;
ObOptOSGColumnStat *new_osg_col_stat = ObOptOSGColumnStat::create_new_osg_col_stat(allocator_);
if (OB_ISNULL(new_osg_col_stat) || OB_ISNULL(new_osg_col_stat->col_stat_)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
OB_LOG(WARN, "fail to allocate buffer", KR(ret));
} else if (OB_FAIL(col_stat_array_.push_back(new_osg_col_stat))) {
OB_LOG(WARN, "fail to push back", KR(ret));
} else {
col_stat = new_osg_col_stat;
}
if (OB_FAIL(ret)) {
if (new_osg_col_stat != nullptr) {
new_osg_col_stat->~ObOptOSGColumnStat();
allocator_.free(new_osg_col_stat);
new_osg_col_stat = nullptr;
}
}
return ret;
}
int ObTableLoadSqlStatistics::add(const ObTableLoadSqlStatistics& other)
{
int ret = OB_SUCCESS;
for (int64_t i = 0; OB_SUCC(ret)&& i < other.table_stat_array_.count(); ++i) {
ObOptTableStat *table_stat = other.table_stat_array_.at(i);
if (table_stat != nullptr) {
ObOptTableStat *copied_table_stat = nullptr;
int64_t size = table_stat->size();
char *new_buf = nullptr;
if (OB_ISNULL(new_buf = static_cast<char *>(allocator_.alloc(size)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
OB_LOG(WARN, "fail to allocate buffer", KR(ret), K(size));
} else if (OB_FAIL(table_stat->deep_copy(new_buf, size, copied_table_stat))) {
OB_LOG(WARN, "fail to copy table stat", KR(ret));
} else if (OB_FAIL(table_stat_array_.push_back(copied_table_stat))) {
OB_LOG(WARN, "fail to add table stat", KR(ret));
}
if (OB_FAIL(ret)) {
if (copied_table_stat != nullptr) {
copied_table_stat->~ObOptTableStat();
copied_table_stat = nullptr;
}
if(new_buf != nullptr) {
allocator_.free(new_buf);
new_buf = nullptr;
}
}
}
}
for (int64_t i = 0; OB_SUCC(ret)&& i < other.col_stat_array_.count(); ++i) {
ObOptOSGColumnStat *col_stat = other.col_stat_array_.at(i);
ObOptOSGColumnStat *copied_col_stat = nullptr;
if (OB_ISNULL(col_stat)) {
ret = OB_ERR_UNEXPECTED;
OB_LOG(WARN, "get unexpected null");
} else if (OB_ISNULL(copied_col_stat = ObOptOSGColumnStat::create_new_osg_col_stat(allocator_))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
OB_LOG(WARN, "failed to create new col stat");
} else if (OB_FAIL(copied_col_stat->deep_copy(*col_stat))) {
OB_LOG(WARN, "fail to copy col stat", KR(ret));
} else if (OB_FAIL(col_stat_array_.push_back(copied_col_stat))) {
OB_LOG(WARN, "fail to add col stat", KR(ret));
}
if (OB_FAIL(ret)) {
if (copied_col_stat != nullptr) {
copied_col_stat->~ObOptOSGColumnStat();
copied_col_stat = nullptr;
}
}
}
return ret;
}
int ObTableLoadSqlStatistics::get_col_stat_array(ObIArray<ObOptColumnStat*> &col_stat_array)
{
int ret = OB_SUCCESS;
for (int64_t i = 0; OB_SUCC(ret) && i < col_stat_array_.count(); ++i) {
if (OB_ISNULL(col_stat_array_.at(i))) {
ret = OB_ERR_UNEXPECTED;
OB_LOG(WARN, "get unexpected null");
} else if (OB_FAIL(col_stat_array_.at(i)->set_min_max_datum_to_obj())) {
OB_LOG(WARN, "failed to persistence min max");
} else if (OB_FAIL(col_stat_array.push_back(col_stat_array_.at(i)->col_stat_))) {
OB_LOG(WARN, "failed to push back col stat");
}
}
return ret;
}
int ObTableLoadSqlStatistics::persistence_col_stats()
{
int ret = OB_SUCCESS;
for (int64_t i = 0; OB_SUCC(ret) && i < col_stat_array_.count(); ++i) {
if (OB_ISNULL(col_stat_array_.at(i))) {
ret = OB_ERR_UNEXPECTED;
OB_LOG(WARN, "get unexpected null");
} else if (OB_FAIL(col_stat_array_.at(i)->set_min_max_datum_to_obj())) {
OB_LOG(WARN, "failed to persistence min max");
}
}
return ret;
}
OB_DEF_SERIALIZE(ObTableLoadSqlStatistics)
{
int ret = OB_SUCCESS;
OB_UNIS_ENCODE(table_stat_array_.count());
for (int64_t i = 0; OB_SUCC(ret) && i < table_stat_array_.count(); i++) {
if (table_stat_array_.at(i) != nullptr) {
OB_UNIS_ENCODE(*table_stat_array_.at(i));
}
}
OB_UNIS_ENCODE(col_stat_array_.count());
for (int64_t i = 0; OB_SUCC(ret) && i < col_stat_array_.count(); i++) {
if (col_stat_array_.at(i) != nullptr) {
OB_UNIS_ENCODE(*col_stat_array_.at(i));
}
}
return ret;
}
OB_DEF_DESERIALIZE(ObTableLoadSqlStatistics)
{
int ret = OB_SUCCESS;
reset();
int64_t size = 0;
OB_UNIS_DECODE(size)
for (int64_t i = 0; OB_SUCC(ret) && i < size; ++i) {
ObOptTableStat table_stat;
ObOptTableStat *copied_table_stat = nullptr;
if (OB_FAIL(table_stat.deserialize(buf, data_len, pos))) {
LOG_WARN("deserialize datum store failed", K(ret), K(i));
} else {
int64_t size = table_stat.size();
char *new_buf = nullptr;
if (OB_ISNULL(new_buf = static_cast<char *>(allocator_.alloc(size)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
OB_LOG(WARN, "fail to allocate buffer", KR(ret), K(size));
} else if (OB_FAIL(table_stat.deep_copy(new_buf, size, copied_table_stat))) {
OB_LOG(WARN, "fail to copy table stat", KR(ret));
} else if (OB_FAIL(table_stat_array_.push_back(copied_table_stat))) {
OB_LOG(WARN, "fail to add table stat", KR(ret));
}
if (OB_FAIL(ret)) {
if (copied_table_stat != nullptr) {
copied_table_stat->~ObOptTableStat();
copied_table_stat = nullptr;
}
if(new_buf != nullptr) {
allocator_.free(new_buf);
new_buf = nullptr;
}
}
}
}
size = 0;
OB_UNIS_DECODE(size)
for (int64_t i = 0; OB_SUCC(ret) && i < size; ++i) {
ObOptOSGColumnStat *osg_col_stat = NULL;
if (OB_ISNULL(osg_col_stat = ObOptOSGColumnStat::create_new_osg_col_stat(allocator_)) ||
OB_ISNULL(osg_col_stat->col_stat_)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
OB_LOG(WARN, "failed to create col stat");
} else if (OB_FAIL(osg_col_stat->deserialize(buf, data_len, pos))) {
OB_LOG(WARN, "deserialize datum store failed", K(ret), K(i));
} else if (OB_FAIL(col_stat_array_.push_back(osg_col_stat))) {
OB_LOG(WARN, "fail to add table stat", KR(ret));
}
if (OB_FAIL(ret)) {
if (osg_col_stat != nullptr) {
osg_col_stat->~ObOptOSGColumnStat();
osg_col_stat = nullptr;
}
}
}
return ret;
}
OB_DEF_SERIALIZE_SIZE(ObTableLoadSqlStatistics)
{
int ret = OB_SUCCESS;
int64_t len = 0;
OB_UNIS_ADD_LEN(table_stat_array_.count());
for (int64_t i = 0; OB_SUCC(ret) && i < table_stat_array_.count(); i++) {
if (table_stat_array_.at(i) != nullptr) {
OB_UNIS_ADD_LEN(*table_stat_array_.at(i));
}
}
OB_UNIS_ADD_LEN(col_stat_array_.count());
for (int64_t i = 0; OB_SUCC(ret) && i < col_stat_array_.count(); i++) {
if (col_stat_array_.at(i) != nullptr) {
OB_UNIS_ADD_LEN(*col_stat_array_.at(i));
}
}
return len;
}
} // namespace table
} // namespace oceanbase

View File

@ -0,0 +1,48 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#pragma once
#include "share/stat/ob_opt_table_stat.h"
#include "share/stat/ob_opt_column_stat.h"
#include "share/stat/ob_opt_osg_column_stat.h"
namespace oceanbase
{
namespace table
{
struct ObTableLoadSqlStatistics
{
OB_UNIS_VERSION(1);
public:
ObTableLoadSqlStatistics() : allocator_("TLD_Opstat") { allocator_.set_tenant_id(MTL_ID()); }
~ObTableLoadSqlStatistics() { reset(); }
void reset();
bool is_empty() const
{
return table_stat_array_.count() == 0 || col_stat_array_.count() == 0;
}
int allocate_table_stat(ObOptTableStat *&table_stat);
int allocate_col_stat(ObOptOSGColumnStat *&col_stat);
int add(const ObTableLoadSqlStatistics& other);
int get_col_stat_array(ObIArray<ObOptColumnStat*> &col_stat_array);
int persistence_col_stats();
TO_STRING_KV(K_(col_stat_array), K_(table_stat_array));
public:
common::ObSEArray<ObOptTableStat *, 64, common::ModulePageAllocator, true> table_stat_array_;
common::ObSEArray<ObOptOSGColumnStat *, 64, common::ModulePageAllocator, true> col_stat_array_;
common::ObArenaAllocator allocator_;
};
} // namespace table
} // namespace oceanbase

View File

@ -211,7 +211,6 @@ int ObDirectLoadTabletMergeCtx::collect_sql_statistics(
int64_t table_avg_len = 0;
int64_t col_cnt = param_.table_data_desc_.column_count_;
ObOptTableStat *table_stat = nullptr;
ObOptDmlStat dml_stat;
StatLevel stat_level;
if (table_schema->get_part_level() == PARTITION_LEVEL_ZERO) {
stat_level = TABLE_LEVEL;
@ -287,13 +286,6 @@ int ObDirectLoadTabletMergeCtx::collect_sql_statistics(
table_stat->set_object_type(stat_level);
table_stat->set_row_count(table_row_cnt);
table_stat->set_avg_row_size(table_avg_len);
dml_stat.tenant_id_ = tenant_id;
dml_stat.table_id_ = param_.target_table_id_;
dml_stat.tablet_id_ = get_target_tablet_id().id();
dml_stat.insert_row_count_ = table_row_cnt;
if (OB_FAIL(ObOptStatMonitorManager::get_instance().update_local_cache(tenant_id, dml_stat))) {
LOG_WARN("failed to update dml stat local cache", K(ret));
}
}
// persistence col stat once a merge task finished
if (OB_SUCC(ret) && OB_FAIL(sql_statistics.persistence_col_stats())) {
@ -304,6 +296,32 @@ int ObDirectLoadTabletMergeCtx::collect_sql_statistics(
return ret;
}
int ObDirectLoadTabletMergeCtx::collect_dml_stat(const common::ObIArray<ObDirectLoadFastHeapTable *> &fast_heap_table_array, ObTableLoadDmlStat &dml_stats)
{
int ret = OB_SUCCESS;
const uint64_t tenant_id = MTL_ID();
int64_t insert_row_cnt = 0;
ObOptDmlStat *dml_stat = nullptr;
if (OB_FAIL(dml_stats.allocate_dml_stat(dml_stat))) {
LOG_WARN("fail to allocate table stat", KR(ret));
} else {
// scan task_array
for (int64_t i = 0; OB_SUCC(ret) && i < task_array_.count(); ++i) {
insert_row_cnt += task_array_.at(i)->get_row_count();
}
// scan fast heap table
for (int64_t i = 0; OB_SUCC(ret) && i < fast_heap_table_array.count(); ++i) {
insert_row_cnt += fast_heap_table_array.at(i)->get_row_count();
}
dml_stat->tenant_id_ = tenant_id;
dml_stat->table_id_ = param_.target_table_id_;
dml_stat->tablet_id_ = target_tablet_id_.id();
dml_stat->insert_row_count_ = insert_row_cnt;
}
return ret;
}
int ObDirectLoadTabletMergeCtx::init_sstable_array(
const ObIArray<ObIDirectLoadPartitionTable *> &table_array)
{

View File

@ -12,6 +12,8 @@
#include "share/stat/ob_opt_osg_column_stat.h"
#include "share/stat/ob_opt_table_stat.h"
#include "share/table/ob_table_load_define.h"
#include "share/table/ob_table_load_dml_stat.h"
#include "share/table/ob_table_load_sql_statistics.h"
#include "storage/direct_load/ob_direct_load_origin_table.h"
#include "storage/direct_load/ob_direct_load_table_data_desc.h"
#include "storage/direct_load/ob_direct_load_fast_heap_table.h"
@ -101,6 +103,8 @@ public:
int inc_finish_count(bool &is_ready);
int collect_sql_statistics(
const common::ObIArray<ObDirectLoadFastHeapTable *> &fast_heap_table_array, table::ObTableLoadSqlStatistics &sql_statistics);
int collect_dml_stat(const common::ObIArray<ObDirectLoadFastHeapTable *> &fast_heap_table_array,
table::ObTableLoadDmlStat &dml_stats);
const ObDirectLoadMergeParam &get_param() const { return param_; }
const common::ObTabletID &get_tablet_id() const { return tablet_id_; }
const common::ObTabletID &get_target_tablet_id() const { return target_tablet_id_; }

View File

@ -663,7 +663,7 @@ int ObAccessService::insert_rows(
column_ids,
row_iter,
affected_rows);
if (OB_SUCC(ret)) {
if (OB_SUCC(ret) && !dml_param.is_direct_insert()) {
int tmp_ret = audit_tablet_opt_dml_stat(dml_param,
tablet_id,
ObOptDmlStatType::TABLET_OPT_INSERT_STAT,