fix upgrade error && open adaptive_merge_schedule
This commit is contained in:
parent
fade38c7d2
commit
d2ef3db981
@ -232,18 +232,6 @@ int ObAllVirtualTableMgr::process_curr_tenant(common::ObNewRow *&row)
|
||||
case TABLE_TYPE:
|
||||
cur_row_.cells_[i].set_int(table_key.table_type_);
|
||||
break;
|
||||
case DATA_CHECKSUM: {
|
||||
int64_t data_checksum = 0;
|
||||
if (table->is_memtable()) {
|
||||
// memtable has no data checksum, do nothing
|
||||
} else if (table->is_co_sstable()) {
|
||||
data_checksum = static_cast<storage::ObCOSSTableV2 *>(table)->get_cs_meta().data_checksum_;
|
||||
} else if (table->is_sstable()) {
|
||||
data_checksum = static_cast<blocksstable::ObSSTable *>(table)->get_data_checksum();
|
||||
}
|
||||
cur_row_.cells_[i].set_int(data_checksum);
|
||||
break;
|
||||
}
|
||||
case SIZE: {
|
||||
int64_t size = 0;
|
||||
if (table->is_memtable()) {
|
||||
@ -314,6 +302,18 @@ int ObAllVirtualTableMgr::process_curr_tenant(common::ObNewRow *&row)
|
||||
case CG_IDX:
|
||||
cur_row_.cells_[i].set_int(table_key.get_column_group_id());
|
||||
break;
|
||||
case DATA_CHECKSUM: {
|
||||
int64_t data_checksum = 0;
|
||||
if (table->is_memtable()) {
|
||||
// memtable has no data checksum, do nothing
|
||||
} else if (table->is_co_sstable()) {
|
||||
data_checksum = static_cast<storage::ObCOSSTableV2 *>(table)->get_cs_meta().data_checksum_;
|
||||
} else if (table->is_sstable()) {
|
||||
data_checksum = static_cast<blocksstable::ObSSTable *>(table)->get_data_checksum();
|
||||
}
|
||||
cur_row_.cells_[i].set_int(data_checksum);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
SERVER_LOG(WARN, "invalid col_id", K(ret), K(col_id));
|
||||
|
@ -46,7 +46,6 @@ class ObAllVirtualTableMgr : public common::ObVirtualTableScannerIterator,
|
||||
START_LOG_SCN,
|
||||
END_LOG_SCN,
|
||||
UPPER_TRANS_VERSION,
|
||||
DATA_CHECKSUM,
|
||||
SIZE,
|
||||
DATA_BLOCK_CNT,
|
||||
INDEX_BLOCK_CNT,
|
||||
@ -56,7 +55,8 @@ class ObAllVirtualTableMgr : public common::ObVirtualTableScannerIterator,
|
||||
CONTAIN_UNCOMMITTED_ROW,
|
||||
NESTED_OFFSET,
|
||||
NESTED_SIZE,
|
||||
CG_IDX
|
||||
CG_IDX,
|
||||
DATA_CHECKSUM
|
||||
};
|
||||
public:
|
||||
ObAllVirtualTableMgr();
|
||||
|
@ -180,9 +180,19 @@ int ObAllVirtualTxDataTable::get_next_tx_data_table_(ObITable *&tx_data_table)
|
||||
} else if (FALSE_IT(tablet = tablet_handle_.get_obj())) {
|
||||
} else if (OB_FAIL(tablet->fetch_table_store(table_store_wrapper_))) {
|
||||
SERVER_LOG(WARN, "fail to fetch table store", K(ret));
|
||||
} else if (OB_FAIL(table_store_wrapper_.get_member()->get_minor_sstables().get_all_tables(sstable_handles_))) {
|
||||
SERVER_LOG(WARN, "fail to get sstable handles", KR(ret));
|
||||
} else {
|
||||
const ObSSTableArray &minor_tables = table_store_wrapper_.get_member()->get_minor_sstables();
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < minor_tables.count(); ++i) {
|
||||
if (OB_ISNULL(minor_tables[i])) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
SERVER_LOG(WARN, "get unexpected null sstable", KR(ret));
|
||||
} else if (OB_FAIL(sstable_handles_.push_back(minor_tables[i]))) {
|
||||
SERVER_LOG(WARN, "fail to add sstable", KR(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
// iterate from the newest memtable in memtable handles
|
||||
memtable_array_pos_ = memtable_handles_.count() - 1;
|
||||
// iterate from the newest sstable in sstable handles
|
||||
|
@ -2813,21 +2813,6 @@ int ObInnerTableSchema::all_virtual_table_mgr_schema(ObTableSchema &table_schema
|
||||
false); //is_autoincrement
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
ADD_COLUMN_SCHEMA("data_checksum", //column_name
|
||||
++column_id, //column_id
|
||||
0, //rowkey_id
|
||||
0, //index_id
|
||||
0, //part_key_pos
|
||||
ObIntType, //column_type
|
||||
CS_TYPE_INVALID, //column_collation_type
|
||||
sizeof(int64_t), //column_length
|
||||
-1, //column_precision
|
||||
-1, //column_scale
|
||||
false, //is_nullable
|
||||
false); //is_autoincrement
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
ADD_COLUMN_SCHEMA("size", //column_name
|
||||
++column_id, //column_id
|
||||
@ -2977,6 +2962,21 @@ int ObInnerTableSchema::all_virtual_table_mgr_schema(ObTableSchema &table_schema
|
||||
false, //is_nullable
|
||||
false); //is_autoincrement
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
ADD_COLUMN_SCHEMA("data_checksum", //column_name
|
||||
++column_id, //column_id
|
||||
0, //rowkey_id
|
||||
0, //index_id
|
||||
0, //part_key_pos
|
||||
ObIntType, //column_type
|
||||
CS_TYPE_INVALID, //column_collation_type
|
||||
sizeof(int64_t), //column_length
|
||||
-1, //column_precision
|
||||
-1, //column_scale
|
||||
false, //is_nullable
|
||||
false); //is_autoincrement
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
table_schema.get_part_option().set_part_num(1);
|
||||
table_schema.set_part_level(PARTITION_LEVEL_ONE);
|
||||
|
@ -3253,21 +3253,6 @@ int ObInnerTableSchema::all_virtual_table_mgr_ora_schema(ObTableSchema &table_sc
|
||||
false); //is_autoincrement
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
ADD_COLUMN_SCHEMA("DATA_CHECKSUM", //column_name
|
||||
++column_id, //column_id
|
||||
0, //rowkey_id
|
||||
0, //index_id
|
||||
0, //part_key_pos
|
||||
ObNumberType, //column_type
|
||||
CS_TYPE_INVALID, //column_collation_type
|
||||
38, //column_length
|
||||
38, //column_precision
|
||||
0, //column_scale
|
||||
false, //is_nullable
|
||||
false); //is_autoincrement
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
ADD_COLUMN_SCHEMA("SIZE", //column_name
|
||||
++column_id, //column_id
|
||||
@ -3417,6 +3402,21 @@ int ObInnerTableSchema::all_virtual_table_mgr_ora_schema(ObTableSchema &table_sc
|
||||
false, //is_nullable
|
||||
false); //is_autoincrement
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
ADD_COLUMN_SCHEMA("DATA_CHECKSUM", //column_name
|
||||
++column_id, //column_id
|
||||
0, //rowkey_id
|
||||
0, //index_id
|
||||
0, //part_key_pos
|
||||
ObNumberType, //column_type
|
||||
CS_TYPE_INVALID, //column_collation_type
|
||||
38, //column_length
|
||||
38, //column_precision
|
||||
0, //column_scale
|
||||
false, //is_nullable
|
||||
false); //is_autoincrement
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
table_schema.get_part_option().set_part_num(1);
|
||||
table_schema.set_part_level(PARTITION_LEVEL_ONE);
|
||||
|
@ -9930,7 +9930,6 @@ def_table_schema(
|
||||
('start_log_scn', 'uint'),
|
||||
('end_log_scn', 'uint'),
|
||||
('upper_trans_version', 'uint'),
|
||||
('data_checksum', 'int'),
|
||||
('size', 'int'),
|
||||
('data_block_count', 'int'),
|
||||
('index_block_count', 'int'),
|
||||
@ -9941,6 +9940,7 @@ def_table_schema(
|
||||
('nested_offset', 'int'),
|
||||
('nested_size', 'int'),
|
||||
('cg_idx', 'int'),
|
||||
('data_checksum', 'int'),
|
||||
],
|
||||
partition_columns = ['svr_ip', 'svr_port'],
|
||||
vtable_route_policy = 'distributed',
|
||||
|
@ -958,7 +958,7 @@ DEF_BOOL(_ob_enable_fast_freeze, OB_TENANT_PARAMETER, "True",
|
||||
"specifies whether the tenant's fast freeze is enabled"
|
||||
"Value: True:turned on; False: turned off",
|
||||
ObParameterAttr(Section::TENANT, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
|
||||
DEF_BOOL(_enable_adaptive_merge_schedule, OB_TENANT_PARAMETER, "False",
|
||||
DEF_BOOL(_enable_adaptive_merge_schedule, OB_TENANT_PARAMETER, "True",
|
||||
"specifies whether the tenant's adaptive merge scheduling is enabled"
|
||||
"Value: True:turned on; False: turned off",
|
||||
ObParameterAttr(Section::TENANT, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
|
||||
|
@ -2178,6 +2178,7 @@ int ObDagPrioScheduler::do_rank_compaction_dags_(
|
||||
int ret = OB_SUCCESS;
|
||||
rank_dags.reset();
|
||||
compaction::ObCompactionDagRanker ranker;
|
||||
common::ObSEArray<compaction::ObTabletMergeDag *, 32> need_rank_dags;
|
||||
const int64_t cur_time = common::ObTimeUtility::current_time();
|
||||
|
||||
if (OB_UNLIKELY(batch_size <= 0)) {
|
||||
@ -2200,7 +2201,11 @@ int ObDagPrioScheduler::do_rank_compaction_dags_(
|
||||
} else if (ObIDag::DAG_STATUS_READY != dag_status) { // dag status must be INITING or READY
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
COMMON_LOG(WARN, "dag in rank list must be ready", K(ret), K(dag_status), KPC(compaction_dag));
|
||||
} else if (OB_FAIL(rank_dags.push_back(compaction_dag))) {
|
||||
} else if (is_meta_major_merge(compaction_dag->get_param().merge_type_)) {
|
||||
if (OB_FAIL(rank_dags.push_back(compaction_dag))) {
|
||||
COMMON_LOG(WARN, "failed to add meta merge dag", K(ret), KPC(compaction_dag));
|
||||
}
|
||||
} else if (OB_FAIL(need_rank_dags.push_back(compaction_dag))) {
|
||||
COMMON_LOG(WARN, "failed to add compaction dag", K(ret), KPC(compaction_dag));
|
||||
} else {
|
||||
ranker.update(cur_time, compaction_dag->get_param().compaction_param_);
|
||||
@ -2208,16 +2213,22 @@ int ObDagPrioScheduler::do_rank_compaction_dags_(
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
cur = cur->get_next();
|
||||
if (rank_dags.count() >= batch_size) {
|
||||
if (rank_dags.count() + need_rank_dags.count() >= batch_size) {
|
||||
break; // reached max rank count, stop adding ready dags
|
||||
}
|
||||
}
|
||||
} // end while
|
||||
|
||||
if (OB_SUCC(ret) && !rank_dags.empty() && ranker.is_valid()) {
|
||||
if (OB_FAIL(ranker.sort(rank_dags))) {
|
||||
if (OB_SUCC(ret) && !need_rank_dags.empty() && ranker.is_valid()) {
|
||||
if (OB_FAIL(ranker.sort(need_rank_dags))) {
|
||||
COMMON_LOG(WARN, "failed to sort compaction dags, move dags to ready list directly",
|
||||
K(ret), K(rank_dags));
|
||||
K(ret), K(need_rank_dags));
|
||||
} else {
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < need_rank_dags.count(); ++i) {
|
||||
if (OB_FAIL(rank_dags.push_back(need_rank_dags.at(i)))) {
|
||||
COMMON_LOG(WARN, "failed to add rank dags", K(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -2303,7 +2314,7 @@ int ObDagPrioScheduler::rank_compaction_dags_()
|
||||
// Time Statistics
|
||||
int64_t cost_time = common::ObTimeUtility::fast_current_time() - cur_time;
|
||||
if (REACH_TENANT_TIME_INTERVAL(DUMP_STATUS_INTERVAL)) {
|
||||
COMMON_LOG(INFO, "[DAG_RERANK] Ranking compaction dags costs: ", K(cost_time), K_(priority), K(ret), K(tmp_ret));
|
||||
COMMON_LOG(INFO, "[ADAPTIVE_SCHED] Ranking compaction dags costs: ", K(cost_time), K_(priority), K(ret), K(tmp_ret));
|
||||
}
|
||||
}
|
||||
|
||||
@ -2336,7 +2347,7 @@ void ObDagPrioScheduler::try_update_adaptive_task_limit_(const int64_t batch_siz
|
||||
const int64_t mem_allow_max_thread = lib::get_tenant_memory_remain(MTL_ID()) * ADAPTIVE_PERCENT / estimate_mem_per_thread;
|
||||
if (mem_allow_max_thread >= adaptive_task_limit_ * 5) {
|
||||
++adaptive_task_limit_;
|
||||
FLOG_INFO("[ADAPTIVE_DAG] increment adaptive task limit", K(priority_), K(adaptive_task_limit_));
|
||||
FLOG_INFO("[ADAPTIVE_SCHED] increment adaptive task limit", K(priority_), K(adaptive_task_limit_));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -2350,7 +2361,7 @@ int ObDagPrioScheduler::pop_task_from_ready_list_(ObITask *&task)
|
||||
|
||||
// adaptive compaction scheduling
|
||||
if (is_rank_dag_prio() && OB_TMP_FAIL(rank_compaction_dags_())) {
|
||||
COMMON_LOG(WARN, "[DAG_RERANK] Failed to rank compaction dags", K(tmp_ret), K_(priority));
|
||||
COMMON_LOG(WARN, "[ADAPTIVE_SCHED] Failed to rank compaction dags", K(tmp_ret), K_(priority));
|
||||
}
|
||||
|
||||
if (!dag_list_[READY_DAG_LIST].is_empty()) {
|
||||
@ -3165,7 +3176,7 @@ int ObDagPrioScheduler::deal_with_finish_task(
|
||||
// dag can retry & this task is the last running task
|
||||
if (OB_ALLOCATE_MEMORY_FAILED == error_code && is_mini_compaction_dag(dag->get_type())) {
|
||||
if (static_cast<compaction::ObTabletMergeDag *>(dag)->is_reserve_mode() &&
|
||||
MTL(ObTenantCompactionMemPool *)->is_emergency_mode()) {
|
||||
MTL(ObTenantCompactionMemPool *)->is_emergency_mode() && !MTL_IS_MINI_MODE()) {
|
||||
COMMON_LOG(ERROR, "reserve mode dag failed to alloc mem unexpectly", KPC(dag)); // tmp debug log for reserve mode, remove later.
|
||||
}
|
||||
MTL(ObTenantCompactionMemPool *)->set_memory_mode(ObTenantCompactionMemPool::EMERGENCY_MODE);
|
||||
@ -3348,11 +3359,12 @@ bool ObDagPrioScheduler::check_need_load_shedding_(const bool for_schedule)
|
||||
} else if (load_shedding_factor <= 1 || !is_rank_dag_prio()) {
|
||||
// no need to load shedding
|
||||
} else {
|
||||
const int64_t load_shedding_limit = MAX(2, limits_ / load_shedding_factor);
|
||||
const int64_t load_shedding_limit = MAX(2, adaptive_task_limit_ / load_shedding_factor);
|
||||
if (running_task_cnts_ > load_shedding_limit + extra_limit) {
|
||||
need_shedding = true;
|
||||
if (REACH_TENANT_TIME_INTERVAL(30_s)) {
|
||||
FLOG_INFO("DagScheduler needs to load shedding", K(load_shedding_factor), K(extra_limit), K_(limits));
|
||||
FLOG_INFO("[ADAPTIVE_SCHED] DagScheduler needs to load shedding", K(load_shedding_factor), K(for_schedule),
|
||||
K(extra_limit), K_(adaptive_task_limit), K_(running_task_cnts), K_(priority));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -919,8 +919,7 @@ private:
|
||||
}
|
||||
OB_INLINE bool is_mini_compaction_dag(ObDagType::ObDagTypeEnum dag_type) const
|
||||
{
|
||||
return ObDagType::DAG_TYPE_MINI_MERGE == dag_type ||
|
||||
ObDagType::DAG_TYPE_TX_TABLE_MERGE == dag_type;
|
||||
return ObDagType::DAG_TYPE_MINI_MERGE == dag_type;
|
||||
}
|
||||
OB_INLINE bool is_minor_compaction_dag(ObDagType::ObDagTypeEnum dag_type) const
|
||||
{
|
||||
|
@ -301,8 +301,8 @@ int ObTenantCompactionMemPool::init()
|
||||
chunk_allocator_.set_tenant_id(MTL_ID());
|
||||
piece_allocator_.set_tenant_id(MTL_ID());
|
||||
max_block_num_ = MTL_IS_MINI_MODE()
|
||||
? MAX_MEMORY_LIMIT / ObCompactionBufferChunk::DEFAULT_BLOCK_SIZE
|
||||
: MAX_MEMORY_LIMIT / (ObCompactionBufferChunk::DEFAULT_BLOCK_SIZE * 2);
|
||||
? MINI_MODE_CHUNK_MEMORY_LIMIT / ObCompactionBufferChunk::DEFAULT_BLOCK_SIZE
|
||||
: CHUNK_MEMORY_LIMIT / ObCompactionBufferChunk::DEFAULT_BLOCK_SIZE;
|
||||
total_block_num_ = 0;
|
||||
is_inited_ = true;
|
||||
}
|
||||
@ -493,8 +493,8 @@ int ObTenantCompactionMemPool::try_shrink()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObSpinLockGuard guard(chunk_lock_);
|
||||
|
||||
if (max_block_num_ > total_block_num_) {
|
||||
// not reserve mem in mini mode
|
||||
if (!MTL_IS_MINI_MODE() && max_block_num_ > total_block_num_) {
|
||||
// do nothing
|
||||
} else if (used_block_num_ <= total_block_num_ / 2) {
|
||||
// Less than half of blocks were used, need shrink
|
||||
@ -544,7 +544,9 @@ void ObTenantCompactionMemPool::MemPoolShrinkTask::runTimerTask()
|
||||
bool ObTenantCompactionMemPool::acquire_reserve_mem()
|
||||
{
|
||||
bool bret = false;
|
||||
bret = ATOMIC_BCAS(&reserve_mode_signal_, 1, 0);
|
||||
if (!MTL_IS_MINI_MODE()) {
|
||||
bret = ATOMIC_BCAS(&reserve_mode_signal_, 1, 0);
|
||||
}
|
||||
return bret;
|
||||
}
|
||||
|
||||
|
@ -18,6 +18,7 @@
|
||||
#include "lib/task/ob_timer.h"
|
||||
#include "lib/lock/ob_spin_lock.h"
|
||||
#include "lib/allocator/page_arena.h"
|
||||
#include "lib/literals/ob_literals.h"
|
||||
#include "storage/blocksstable/ob_data_buffer.h"
|
||||
|
||||
|
||||
@ -138,9 +139,10 @@ private:
|
||||
};
|
||||
|
||||
public:
|
||||
static constexpr int64_t MAX_MEMORY_LIMIT = 2 << 26; // 128MB
|
||||
static constexpr int64_t RESERVE_MEM_SIZE = 2 << 24; // 32MB
|
||||
static constexpr int64_t CHECK_SHRINK_INTERVAL = 120L * 1000L * 1000L; // 120s
|
||||
static constexpr int64_t CHUNK_MEMORY_LIMIT = 128_MB;
|
||||
static constexpr int64_t MINI_MODE_CHUNK_MEMORY_LIMIT = 32_MB;
|
||||
static constexpr int64_t RESERVE_MEM_SIZE = 32_MB;
|
||||
static constexpr int64_t CHECK_SHRINK_INTERVAL = 120_s;
|
||||
private:
|
||||
MemPoolShrinkTask mem_shrink_task_;
|
||||
common::DefaultPageAllocator chunk_allocator_;
|
||||
|
@ -868,18 +868,17 @@ int ObStorageHATabletsBuilder::get_major_sstable_max_snapshot_(
|
||||
int64_t &max_snapshot_version)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObArray<ObITable *> sstables;
|
||||
ObArray<ObSSTableWrapper> sstables;
|
||||
|
||||
max_snapshot_version = 0;
|
||||
if (!is_inited_) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("storage ha tablets builder do not init", K(ret));
|
||||
} else if (major_sstable_array.count() > 0 && OB_FAIL(major_sstable_array.get_all_tables(sstables))) {
|
||||
} else if (major_sstable_array.count() > 0 && OB_FAIL(major_sstable_array.get_all_table_wrappers(sstables))) {
|
||||
LOG_WARN("failed to get all tables", K(ret), K(param_));
|
||||
} else {
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < sstables.count(); ++i) {
|
||||
const ObITable *table = sstables.at(i);
|
||||
const ObSSTable *sstable = nullptr;
|
||||
const ObITable *table = sstables.at(i).get_sstable();
|
||||
|
||||
if (OB_ISNULL(table)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
@ -914,7 +913,10 @@ int ObStorageHATabletsBuilder::get_minor_scn_range_(
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
scn_range.reset();
|
||||
ObArray<ObITable *> sstables;
|
||||
|
||||
ObArray<ObSSTableWrapper> sstables;
|
||||
scn_range.start_scn_ = ObTabletMeta::INIT_CLOG_CHECKPOINT_SCN;
|
||||
scn_range.end_scn_ = ObTabletMeta::INIT_CLOG_CHECKPOINT_SCN;
|
||||
|
||||
if (!is_inited_) {
|
||||
ret = OB_NOT_INIT;
|
||||
@ -922,11 +924,11 @@ int ObStorageHATabletsBuilder::get_minor_scn_range_(
|
||||
} else if (OB_ISNULL(tablet)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("get remote logical minor scn range get invalid argument", K(ret), KP(tablet));
|
||||
} else if (minor_sstable_array.count() > 0 && OB_FAIL(minor_sstable_array.get_all_tables(sstables))) {
|
||||
} else if (minor_sstable_array.count() > 0 && OB_FAIL(minor_sstable_array.get_all_table_wrappers(sstables))) {
|
||||
LOG_WARN("failed to get all tables", K(ret), K(param_));
|
||||
} else {
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < sstables.count(); ++i) {
|
||||
const ObITable *table = sstables.at(i);
|
||||
const ObITable *table = sstables.at(i).get_sstable();
|
||||
|
||||
if (OB_ISNULL(table)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
@ -996,7 +998,7 @@ int ObStorageHATabletsBuilder::get_ddl_sstable_min_start_scn_(
|
||||
SCN &max_start_scn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObArray<ObITable *> sstables;
|
||||
ObArray<ObSSTableWrapper> sstables;
|
||||
max_start_scn = SCN::max_scn();
|
||||
|
||||
if (!is_inited_) {
|
||||
@ -1005,11 +1007,11 @@ int ObStorageHATabletsBuilder::get_ddl_sstable_min_start_scn_(
|
||||
} else if (ddl_sstable_array.empty()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("ddl sstable should not be empty", K(ret));
|
||||
} else if (OB_FAIL(ddl_sstable_array.get_all_tables(sstables))) {
|
||||
} else if (OB_FAIL(ddl_sstable_array.get_all_table_wrappers(sstables))) {
|
||||
LOG_WARN("failed to get all tables", K(ret), K(param_));
|
||||
} else {
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < sstables.count(); ++i) {
|
||||
const ObITable *table = sstables.at(i);
|
||||
const ObITable *table = sstables.at(i).get_sstable();
|
||||
|
||||
if (OB_ISNULL(table)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
|
@ -2402,16 +2402,7 @@ void ObMemtable::fill_compaction_param_(
|
||||
compaction_param.replay_interval_ = get_start_scn().get_val_for_tx() - ls_handle_.get_ls()->get_ls_meta().get_clog_checkpoint_scn().get_val_for_tx();
|
||||
compaction_param.last_end_scn_ = get_end_scn();
|
||||
compaction_param.add_time_ = current_time;
|
||||
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t total_bytes = 0;
|
||||
int64_t total_rows = 0;
|
||||
if (OB_FAIL(estimate_phy_size(nullptr, nullptr, total_bytes, total_rows))) {
|
||||
compaction_param.estimate_phy_size_ = get_occupied_size();
|
||||
TRANS_LOG(WARN, "failed to estimate memtable phy size", K(ret));
|
||||
} else {
|
||||
compaction_param.estimate_phy_size_ = total_bytes;
|
||||
}
|
||||
compaction_param.estimate_phy_size_ = mt_stat_.row_size_;
|
||||
}
|
||||
|
||||
bool ObMemtable::is_active_memtable() const
|
||||
|
@ -51,6 +51,10 @@ namespace memtable
|
||||
class ObMemtableScanIterator;
|
||||
class ObMemtableGetIterator;
|
||||
|
||||
|
||||
/*
|
||||
* Attention! When tx is rollback, insert/update/delete row count and size will not reduced accordingly
|
||||
*/
|
||||
struct ObMtStat
|
||||
{
|
||||
ObMtStat() { reset(); }
|
||||
|
@ -522,10 +522,20 @@ void ObTenantSysLoadShedder::refresh_sys_load()
|
||||
// do nothing
|
||||
} else if (REACH_TENANT_TIME_INTERVAL(CPU_TIME_SAMPLING_INTERVAL)) {
|
||||
load_shedding_factor_ = 1;
|
||||
(void) refresh_cpu_utility();
|
||||
|
||||
if (1 >= load_shedding_factor_) {
|
||||
(void) refresh_cpu_usage();
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
double max_cpu_cnt = 0;
|
||||
double min_cpu_cnt = 0;
|
||||
|
||||
if (OB_TMP_FAIL(GCTX.omt_->get_tenant_cpu(MTL_ID(), min_cpu_cnt, max_cpu_cnt))) {
|
||||
LOG_WARN_RET(tmp_ret, "failed to get tennant cpu cnt", "tenant_id", MTL_ID());
|
||||
} else {
|
||||
min_cpu_cnt_ = min_cpu_cnt;
|
||||
max_cpu_cnt_ = max_cpu_cnt;
|
||||
}
|
||||
|
||||
if (min_cpu_cnt_ > 0 && max_cpu_cnt_ > 0) {
|
||||
(void) refresh_cpu_utility();
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -536,30 +546,31 @@ int ObTenantSysLoadShedder::refresh_cpu_utility()
|
||||
int64_t curr_cpu_time = last_cpu_time_;
|
||||
int64_t inc_cpu_time = 0;
|
||||
int64_t physical_cpu_utility = 0;
|
||||
double max_cpu_cnt = 0; // placeholder
|
||||
|
||||
if (OB_FAIL(GCTX.omt_->get_tenant_cpu(MTL_ID(), min_cpu_cnt_, max_cpu_cnt))) {
|
||||
LOG_WARN("failed to get tennant cpu cnt", K(ret));
|
||||
} else if (OB_FAIL(GCTX.omt_->get_tenant_cpu_time(MTL_ID(), curr_cpu_time))) {
|
||||
if (OB_FAIL(GCTX.omt_->get_tenant_cpu_time(MTL_ID(), curr_cpu_time))) {
|
||||
LOG_WARN("failed to get tennant cpu cnt", K(ret));
|
||||
} else {
|
||||
const int64_t curr_sample_time = ObTimeUtility::fast_current_time();
|
||||
if (0 == last_sample_time_) {
|
||||
if (0 == last_sample_time_ || 0 == last_cpu_time_) {
|
||||
// first time sample, no need to calculate cpu utility
|
||||
} else {
|
||||
inc_cpu_time = curr_cpu_time - last_cpu_time_;
|
||||
physical_cpu_utility = inc_cpu_time * 100 / (curr_sample_time - last_sample_time_);
|
||||
}
|
||||
last_sample_time_ = curr_sample_time;
|
||||
last_cpu_time_ = curr_cpu_time;
|
||||
|
||||
if (physical_cpu_utility >= min_cpu_cnt_ * CPU_UTIL_THRESHOLD) {
|
||||
if (physical_cpu_utility > max_cpu_cnt_ * 100) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("cpu utility unexpected bigger than cpu cnt", K(ret), K(max_cpu_cnt_),
|
||||
K(physical_cpu_utility), K(inc_cpu_time), K(curr_sample_time), K(last_sample_time_));
|
||||
} else if (physical_cpu_utility >= max_cpu_cnt_ * 100 * CPU_TIME_THRESHOLD) {
|
||||
ATOMIC_STORE(&load_shedding_factor_, DEFAULT_LOAD_SHEDDING_FACTOR);
|
||||
effect_time_ = ObTimeUtility::fast_current_time();
|
||||
FLOG_INFO("[ADAPTIVE_SCHED] refresh cpu utility", K(ret), K(load_shedding_factor_), K(min_cpu_cnt_),
|
||||
K(physical_cpu_utility), K(inc_cpu_time), K(curr_sample_time), K(last_sample_time_));
|
||||
}
|
||||
}
|
||||
|
||||
// debug log, remove later
|
||||
FLOG_INFO("BatMan refresh cpu utility", K(ret), K(load_shedding_factor_), K(min_cpu_cnt_), K(inc_cpu_time), K(physical_cpu_utility));
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -567,15 +578,15 @@ int ObTenantSysLoadShedder::refresh_cpu_usage()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
// tenant_cpu_usage is a relatively large value, it includes the wait_time on lock, RPC, IO and so on.
|
||||
if (OB_FAIL(GCTX.omt_->get_tenant_cpu_usage(MTL_ID(), cpu_usage_))) {
|
||||
LOG_WARN("failed to get tenant cpu usage", K(ret));
|
||||
} else if (cpu_usage_ * 100 >= CPU_UTIL_THRESHOLD) {
|
||||
} else if (cpu_usage_ * 100 >= max_cpu_cnt_ * CPU_USAGE_THRESHOLD) {
|
||||
effect_time_ = ObTimeUtility::fast_current_time();
|
||||
ATOMIC_STORE(&load_shedding_factor_, DEFAULT_LOAD_SHEDDING_FACTOR);
|
||||
}
|
||||
|
||||
// debug log, remove later
|
||||
FLOG_INFO("BatMan refresh cpu usage", K(ret), K(load_shedding_factor_), "cpu_usage_percent", cpu_usage_ * 100 * 100);
|
||||
FLOG_INFO("[ADAPTIVE_SCHED] refresh cpu usage", K(ret), K(load_shedding_factor_), "cpu_usage_percent", cpu_usage_ * 100 * 100);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -323,7 +323,7 @@ public:
|
||||
void refresh_sys_load();
|
||||
int64_t get_load_shedding_factor() const { return ATOMIC_LOAD(&load_shedding_factor_); }
|
||||
|
||||
TO_STRING_KV(K_(load_shedding_factor), K_(last_cpu_time), K_(cpu_usage), K_(min_cpu_cnt), K_(effect_time));
|
||||
TO_STRING_KV(K_(load_shedding_factor), K_(last_cpu_time), K_(cpu_usage), K_(min_cpu_cnt), K_(max_cpu_cnt), K_(effect_time));
|
||||
private:
|
||||
int refresh_cpu_utility();
|
||||
int refresh_cpu_usage();
|
||||
@ -331,8 +331,9 @@ private:
|
||||
public:
|
||||
static const int64_t DEFAULT_LOAD_SHEDDING_FACTOR = 2;
|
||||
static const int64_t CPU_TIME_SAMPLING_INTERVAL = 20_s; //20 * 1000 * 1000 us
|
||||
static constexpr double CPU_UTIL_THRESHOLD = 0.6; // 60%
|
||||
static const int64_t SHEDDER_EXPIRE_TIME = 10_min;
|
||||
static constexpr double CPU_TIME_THRESHOLD = 0.6; // 60%
|
||||
static constexpr double CPU_USAGE_THRESHOLD = 0.8; // 80%
|
||||
static const int64_t SHEDDER_EXPIRE_TIME = 2_min;
|
||||
private:
|
||||
int64_t effect_time_;
|
||||
int64_t last_sample_time_;
|
||||
@ -340,6 +341,7 @@ private:
|
||||
int64_t last_cpu_time_;
|
||||
double cpu_usage_;
|
||||
double min_cpu_cnt_;
|
||||
double max_cpu_cnt_;
|
||||
};
|
||||
|
||||
|
||||
|
@ -71,7 +71,6 @@ public:
|
||||
blocksstable::ObSSTable *operator[](const int64_t pos) const;
|
||||
blocksstable::ObSSTable *at(const int64_t pos) const;
|
||||
ObITable *get_boundary_table(const bool is_last) const;
|
||||
int get_all_tables(ObIArray<ObITable *> &tables) const;
|
||||
int get_all_table_wrappers(ObIArray<ObSSTableWrapper> &tables, const bool need_unpack = false) const;
|
||||
int get_table(const ObITable::TableKey &table_key, ObSSTableWrapper &wrapper) const;
|
||||
int inc_macro_ref(bool &is_success) const;
|
||||
@ -85,6 +84,7 @@ public:
|
||||
OB_INLINE bool empty() const { return 0 == cnt_; }
|
||||
TO_STRING_KV(K_(cnt), KP_(sstable_array), K_(serialize_table_type), K_(is_inited));
|
||||
private:
|
||||
int get_all_tables(ObIArray<ObITable *> &tables) const;
|
||||
int inc_meta_ref_cnt(bool &inc_success) const;
|
||||
int inc_data_ref_cnt(bool &inc_success) const;
|
||||
void dec_meta_ref_cnt() const;
|
||||
|
@ -1614,7 +1614,6 @@ tablet_id bigint(20) NO NULL
|
||||
start_log_scn bigint(20) unsigned NO NULL
|
||||
end_log_scn bigint(20) unsigned NO NULL
|
||||
upper_trans_version bigint(20) unsigned NO NULL
|
||||
data_checksum bigint(20) NO NULL
|
||||
size bigint(20) NO NULL
|
||||
data_block_count bigint(20) NO NULL
|
||||
index_block_count bigint(20) NO NULL
|
||||
@ -1625,6 +1624,7 @@ contain_uncommitted_row varchar(3) NO NULL
|
||||
nested_offset bigint(20) NO NULL
|
||||
nested_size bigint(20) NO NULL
|
||||
cg_idx bigint(20) NO NULL
|
||||
data_checksum bigint(20) NO NULL
|
||||
select /*+QUERY_TIMEOUT(60000000)*/ IF(count(*) >= 0, 1, 0) from oceanbase.__all_virtual_table_mgr;
|
||||
IF(count(*) >= 0, 1, 0)
|
||||
1
|
||||
|
@ -2162,7 +2162,6 @@ tablet_id bigint(20) NO NULL
|
||||
start_log_scn bigint(20) unsigned NO NULL
|
||||
end_log_scn bigint(20) unsigned NO NULL
|
||||
upper_trans_version bigint(20) unsigned NO NULL
|
||||
data_checksum bigint(20) NO NULL
|
||||
size bigint(20) NO NULL
|
||||
data_block_count bigint(20) NO NULL
|
||||
index_block_count bigint(20) NO NULL
|
||||
@ -2173,6 +2172,7 @@ contain_uncommitted_row varchar(3) NO NULL
|
||||
nested_offset bigint(20) NO NULL
|
||||
nested_size bigint(20) NO NULL
|
||||
cg_idx bigint(20) NO NULL
|
||||
data_checksum bigint(20) NO NULL
|
||||
select /*+QUERY_TIMEOUT(60000000)*/ IF(count(*) >= 0, 1, 0) from oceanbase.__all_virtual_table_mgr;
|
||||
IF(count(*) >= 0, 1, 0)
|
||||
1
|
||||
|
Loading…
x
Reference in New Issue
Block a user