fix row col switch estimate row get tables
This commit is contained in:
12
deps/oblib/src/lib/container/ob_mask_set2.h
vendored
12
deps/oblib/src/lib/container/ob_mask_set2.h
vendored
@ -16,9 +16,9 @@
|
||||
#include "ob_bit_set.h"
|
||||
#include "ob_iarray.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
namespace common
|
||||
namespace oceanbase
|
||||
{
|
||||
namespace common
|
||||
{
|
||||
|
||||
template<typename T>
|
||||
@ -150,9 +150,9 @@ public:
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
bool is_mask(const T &key)
|
||||
{
|
||||
bool bool_ret = false;
|
||||
bool is_mask(const T &key)
|
||||
{
|
||||
bool bool_ret = false;
|
||||
if (is_inited_) {
|
||||
for (int64_t i = 0; i < array_->count(); ++i) {
|
||||
if (array_->at(i) == key && bitset_.has_member(i)) {
|
||||
|
@ -805,9 +805,22 @@ int ObMediumCompactionScheduleFunc::init_co_major_merge_type(
|
||||
ObMediumCompactionInfo &medium_info)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObSSTable *first_sstable = static_cast<ObSSTable *>(result.handle_.get_table(0));
|
||||
ObCOSSTableV2 *co_sstable = nullptr;
|
||||
ObCOMajorMergePolicy::ObCOMajorMergeType major_merge_type = ObCOMajorMergePolicy::INVALID_CO_MAJOR_MERGE_TYPE;
|
||||
if (OB_FAIL(ObCOMajorMergePolicy::decide_co_major_merge_type(
|
||||
result,
|
||||
ObTabletTableIterator iter;
|
||||
ObSEArray<ObITable*, OB_DEFAULT_SE_ARRAY_COUNT> tables;
|
||||
if (OB_ISNULL(first_sstable) || OB_UNLIKELY(!first_sstable->is_co_sstable())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("first sstable in tables handle is null or not co sstable", K(ret), K(result.handle_));
|
||||
} else if (FALSE_IT(co_sstable = static_cast<ObCOSSTableV2 *>(first_sstable))) {
|
||||
} else if (OB_FAIL(iter.set_tablet_handle(tablet_handle_))) {
|
||||
LOG_WARN("failed to set tablet handle", K(ret), K(iter), K(tablet_handle_));
|
||||
} else if (OB_FAIL(iter.get_read_tables_from_tablet(medium_info.medium_snapshot_, false/*allow_no_ready_read*/, false/*major_sstable_only*/, tables))) {
|
||||
LOG_WARN("failed to get read tables for estimate row cnt", K(ret), K(medium_info), K(iter));
|
||||
} else if (OB_FAIL(ObCOMajorMergePolicy::decide_co_major_merge_type(
|
||||
*co_sstable,
|
||||
tables,
|
||||
medium_info.storage_schema_,
|
||||
tablet_handle_,
|
||||
major_merge_type))) {
|
||||
|
@ -1768,7 +1768,7 @@ int ObCOMajorMergePolicy::decide_co_major_sstable_status(
|
||||
|
||||
int ObCOMajorMergePolicy::estimate_row_cnt_for_major_merge(
|
||||
const uint64_t table_id,
|
||||
const ObTablesHandleArray &tables_handle,
|
||||
const ObIArray<ObITable *> &tables,
|
||||
const ObStorageSchema &storage_schema,
|
||||
const ObTabletHandle &tablet_handle,
|
||||
int64_t &estimate_row_cnt)
|
||||
@ -1781,7 +1781,6 @@ int ObCOMajorMergePolicy::estimate_row_cnt_for_major_merge(
|
||||
false, /*full row scan flag, obsoleted*/
|
||||
false, /*index back*/
|
||||
false); /*query_stat*/
|
||||
ObSEArray<ObITable*, OB_DEFAULT_SE_ARRAY_COUNT> tables;
|
||||
estimate_row_cnt = 0;
|
||||
#ifdef ERRSIM
|
||||
ret = OB_E(EventTable::EN_COMPACTION_ESTIMATE_ROW_FAILED) ret;
|
||||
@ -1794,9 +1793,7 @@ int ObCOMajorMergePolicy::estimate_row_cnt_for_major_merge(
|
||||
* 1. if tables.empty(), no writes, row count = 0
|
||||
* 2. else, do estimate, use logical row count instead of physical
|
||||
*/
|
||||
if (OB_FAIL(tables_handle.get_tables(tables))) {
|
||||
LOG_WARN("failed to get tables", K(ret), K(tables));
|
||||
} else if (!tables.empty()) {
|
||||
if (!tables.empty()) {
|
||||
ObTableEstimateBaseInput base_input(query_flag, table_id, transaction::ObTransID(), tables, tablet_handle);
|
||||
ObDatumRange whole_range;
|
||||
whole_range.set_whole_range();
|
||||
@ -1840,26 +1837,20 @@ bool ObCOMajorMergePolicy::whether_to_rebuild_column_store(
|
||||
}
|
||||
|
||||
int ObCOMajorMergePolicy::decide_co_major_merge_type(
|
||||
const ObGetMergeTablesResult &result,
|
||||
const ObCOSSTableV2 &co_sstable,
|
||||
const ObIArray<ObITable *> &tables,
|
||||
const ObStorageSchema &storage_schema,
|
||||
const ObTabletHandle &tablet_handle,
|
||||
ObCOMajorMergeType &major_merge_type)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObSSTable *first_sstable = static_cast<ObSSTable *>(result.handle_.get_table(0));
|
||||
ObCOSSTableV2 *co_sstable = nullptr;
|
||||
ObCOMajorSSTableStatus major_sstable_status = ObCOMajorSSTableStatus::INVALID_CO_MAJOR_SSTABLE_STATUS;
|
||||
int64_t estimate_row_cnt = 0;
|
||||
ObTabletID tablet_id;
|
||||
const ObTabletID tablet_id = co_sstable.get_key().tablet_id_;
|
||||
|
||||
if (OB_ISNULL(first_sstable) || OB_UNLIKELY(!first_sstable->is_co_sstable())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("first sstable in tables handle is null or not co sstable", K(ret), K(result.handle_));
|
||||
} else if (FALSE_IT(co_sstable = static_cast<ObCOSSTableV2 *>(first_sstable))) {
|
||||
} else if (FALSE_IT(tablet_id = co_sstable->get_key().tablet_id_)) {
|
||||
} else if (OB_FAIL(decide_co_major_sstable_status(*co_sstable, storage_schema, major_sstable_status))) {
|
||||
if (OB_FAIL(decide_co_major_sstable_status(co_sstable, storage_schema, major_sstable_status))) {
|
||||
LOG_WARN("failed to decide co major sstable status");
|
||||
} else if (OB_FAIL(estimate_row_cnt_for_major_merge(tablet_id.id(), result.handle_, storage_schema, tablet_handle, estimate_row_cnt))) {
|
||||
} else if (OB_FAIL(estimate_row_cnt_for_major_merge(tablet_id.id(), tables, storage_schema, tablet_handle, estimate_row_cnt))) {
|
||||
// if estimate row cnt failed, make major sstable match schema
|
||||
major_merge_type = is_major_sstable_match_schema(major_sstable_status) ? BUILD_COLUMN_STORE_MERGE : REBUILD_COLUMN_STORE_MERGE;
|
||||
LOG_WARN("failed to estimate row count for co major merge, build column store by default", "estimate_ret", ret, K(major_sstable_status), K(major_merge_type));
|
||||
@ -1878,7 +1869,7 @@ int ObCOMajorMergePolicy::decide_co_major_merge_type(
|
||||
} else {
|
||||
major_merge_type = BUILD_ROW_STORE_MERGE;
|
||||
}
|
||||
LOG_DEBUG("[RowColSwitch] finish decide major merge type", K(tablet_id), K(major_sstable_status), K(major_merge_type), K(estimate_row_cnt), K(column_cnt));
|
||||
LOG_INFO("[RowColSwitch] finish decide major merge type", K(tablet_id), K(co_sstable), K(tables), K(major_sstable_status), K(major_merge_type), K(estimate_row_cnt), K(column_cnt));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
@ -325,7 +325,7 @@ public:
|
||||
ObCOMajorSSTableStatus &major_sstable_status);
|
||||
static int estimate_row_cnt_for_major_merge(
|
||||
const uint64_t table_id,
|
||||
const ObTablesHandleArray &tables_handle,
|
||||
const ObIArray<ObITable *> &tables,
|
||||
const ObStorageSchema &storage_schema,
|
||||
const ObTabletHandle &tablet_handle,
|
||||
int64_t &estimate_row_cnt);
|
||||
@ -337,7 +337,8 @@ public:
|
||||
const int64_t &estimate_row_cnt,
|
||||
const int64_t &column_cnt);
|
||||
static int decide_co_major_merge_type(
|
||||
const ObGetMergeTablesResult &result,
|
||||
const ObCOSSTableV2 &co_sstable,
|
||||
const ObIArray<ObITable *> &tables,
|
||||
const ObStorageSchema &storage_schema,
|
||||
const ObTabletHandle &tablet_handle,
|
||||
ObCOMajorMergeType &major_merge_type);
|
||||
|
@ -297,5 +297,35 @@ int ObTabletTableIterator::refresh_read_tables_from_tablet(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTabletTableIterator::get_read_tables_from_tablet(
|
||||
const int64_t snapshot_version,
|
||||
const bool allow_no_ready_read,
|
||||
const bool major_sstable_only,
|
||||
ObIArray<ObITable *> &tables)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(refresh_read_tables_from_tablet(snapshot_version, allow_no_ready_read, major_sstable_only))) {
|
||||
LOG_WARN("failed to refresh read tables", K(ret), K(snapshot_version), K(allow_no_ready_read), K(major_sstable_only), KPC(this));
|
||||
} else {
|
||||
while(OB_SUCC(ret)) {
|
||||
ObITable *table = nullptr;
|
||||
if (OB_FAIL(table_store_iter_.get_next(table))) {
|
||||
if (OB_LIKELY(OB_ITER_END == ret)) {
|
||||
ret = OB_SUCCESS;
|
||||
break;
|
||||
} else {
|
||||
STORAGE_LOG(WARN, "failed to get next table iter", K(ret), KPC(this));
|
||||
}
|
||||
} else if (OB_ISNULL(table)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("get nullptr table", K(ret), KP(table), KPC(this));
|
||||
} else if (OB_FAIL(tables.push_back(table))) {
|
||||
LOG_WARN("failed to push back table", K(ret), K(tables), KPC(table));
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
} // end namespace storage
|
||||
} // end namespace oceanbase
|
||||
|
@ -100,6 +100,11 @@ public:
|
||||
const int64_t snapshot_version,
|
||||
const bool allow_no_ready_read,
|
||||
const bool major_sstable_only = false);
|
||||
int get_read_tables_from_tablet(
|
||||
const int64_t snapshot_version,
|
||||
const bool allow_no_ready_read,
|
||||
const bool major_sstable_only,
|
||||
ObIArray<ObITable *> &tables);
|
||||
TO_STRING_KV(K_(tablet_handle), K_(transfer_src_handle), K_(table_store_iter));
|
||||
private:
|
||||
ObTabletHandle tablet_handle_;
|
||||
|
Reference in New Issue
Block a user