fix memtable retrive problem
This commit is contained in:
		@ -11,6 +11,8 @@
 | 
			
		||||
 */
 | 
			
		||||
 | 
			
		||||
#define USING_LOG_PREFIX STORAGE
 | 
			
		||||
#define PRINT_TS_WRAPPER(x) (ObPrintTableStore(*(x.get_member())))
 | 
			
		||||
 | 
			
		||||
#include "ob_partition_merge_policy.h"
 | 
			
		||||
#include "share/ob_debug_sync_point.h"
 | 
			
		||||
#include "share/ob_debug_sync.h"
 | 
			
		||||
@ -166,9 +168,9 @@ int ObPartitionMergePolicy::get_mini_merge_tables(
 | 
			
		||||
  ObTenantFreezeInfoMgr::NeighbourFreezeInfo freeze_info;
 | 
			
		||||
  int64_t merge_inc_base_version = tablet.get_snapshot_version();
 | 
			
		||||
  const ObMergeType merge_type = param.merge_type_;
 | 
			
		||||
  ObSEArray<ObTableHandleV2, MAX_MEMSTORE_CNT> memtable_handles;
 | 
			
		||||
  result.reset();
 | 
			
		||||
  ObTabletMemberWrapper<ObTabletTableStore> table_store_wrapper;
 | 
			
		||||
  ObSEArray<ObITable *, MAX_MEMSTORE_CNT> memtables;
 | 
			
		||||
  DEBUG_SYNC(BEFORE_GET_MINOR_MGERGE_TABLES);
 | 
			
		||||
  if (OB_FAIL(tablet.fetch_table_store(table_store_wrapper))) {
 | 
			
		||||
    LOG_WARN("fail to fetch table store", K(ret));
 | 
			
		||||
@ -181,16 +183,16 @@ int ObPartitionMergePolicy::get_mini_merge_tables(
 | 
			
		||||
  } else if (table_store_wrapper.get_member()->get_minor_sstables().count() >= MAX_SSTABLE_CNT_IN_STORAGE) {
 | 
			
		||||
    ret = OB_SIZE_OVERFLOW;
 | 
			
		||||
    LOG_ERROR("Too many sstables, delay mini merge until sstable count falls below MAX_SSTABLE_CNT",
 | 
			
		||||
              K(ret), K(table_store_wrapper), K(tablet));
 | 
			
		||||
              K(ret), K(PRINT_TS_WRAPPER(table_store_wrapper)), K(tablet));
 | 
			
		||||
    // add compaction diagnose info
 | 
			
		||||
    ObPartitionMergePolicy::diagnose_table_count_unsafe(MINI_MERGE, tablet);
 | 
			
		||||
  } else if (OB_FAIL(tablet.get_memtable_mgr()->get_all_memtables(memtable_handles))) {
 | 
			
		||||
  } else if (OB_FAIL((table_store_wrapper.get_member()->get_memtables(memtables, true)))) {
 | 
			
		||||
    LOG_WARN("failed to get all memtables from memtable mgr", K(ret));
 | 
			
		||||
  } else if (OB_FAIL(get_neighbour_freeze_info(merge_inc_base_version,
 | 
			
		||||
                                               table_store_wrapper.get_member()->get_major_sstables().get_boundary_table(true),
 | 
			
		||||
                                               freeze_info))) {
 | 
			
		||||
    LOG_WARN("failed to get next major freeze", K(ret), K(merge_inc_base_version), K(table_store_wrapper));
 | 
			
		||||
  } else if (OB_FAIL(find_mini_merge_tables(param, freeze_info, ls, tablet, memtable_handles, result))) {
 | 
			
		||||
    LOG_WARN("failed to get next major freeze", K(ret), K(merge_inc_base_version), K(PRINT_TS_WRAPPER(table_store_wrapper)));
 | 
			
		||||
  } else if (OB_FAIL(find_mini_merge_tables(param, freeze_info, ls, tablet, memtables, result))) {
 | 
			
		||||
    if (OB_NO_NEED_MERGE != ret) {
 | 
			
		||||
      LOG_WARN("failed to find mini merge tables", K(ret), K(freeze_info));
 | 
			
		||||
    }
 | 
			
		||||
@ -207,7 +209,7 @@ int ObPartitionMergePolicy::find_mini_merge_tables(
 | 
			
		||||
    const ObTenantFreezeInfoMgr::NeighbourFreezeInfo &freeze_info,
 | 
			
		||||
    ObLS &ls,
 | 
			
		||||
    const storage::ObTablet &tablet,
 | 
			
		||||
    ObIArray<ObTableHandleV2> &memtable_handles,
 | 
			
		||||
    ObIArray<ObITable *> &memtables,
 | 
			
		||||
    ObGetMergeTablesResult &result)
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
@ -223,15 +225,15 @@ int ObPartitionMergePolicy::find_mini_merge_tables(
 | 
			
		||||
  ObIMemtable *memtable = nullptr;
 | 
			
		||||
  const ObTabletID &tablet_id = tablet.get_tablet_meta().tablet_id_;
 | 
			
		||||
  bool need_update_snapshot_version = false;
 | 
			
		||||
  for (int64_t i = 0; OB_SUCC(ret) && i < memtable_handles.count(); ++i) {
 | 
			
		||||
    if (OB_ISNULL(memtable = static_cast<ObIMemtable *>(memtable_handles.at(i).get_table()))) {
 | 
			
		||||
  for (int64_t i = 0; OB_SUCC(ret) && i < memtables.count(); ++i) {
 | 
			
		||||
    if (OB_ISNULL(memtable = static_cast<ObIMemtable *>(memtables.at(i)))) {
 | 
			
		||||
      ret = OB_ERR_SYS;
 | 
			
		||||
      LOG_ERROR("memtable must not null", K(ret), K(tablet));
 | 
			
		||||
    } else if (OB_UNLIKELY(memtable->is_active_memtable())) {
 | 
			
		||||
      LOG_DEBUG("skip active memtable", K(i), KPC(memtable), K(memtable_handles));
 | 
			
		||||
      LOG_DEBUG("skip active memtable", K(i), KPC(memtable), K(memtables));
 | 
			
		||||
      break;
 | 
			
		||||
    } else if (!memtable->can_be_minor_merged()) {
 | 
			
		||||
      FLOG_INFO("memtable cannot mini merge now", K(ret), K(i), KPC(memtable), K(max_snapshot_version), K(memtable_handles), K(param));
 | 
			
		||||
      FLOG_INFO("memtable cannot mini merge now", K(ret), K(i), KPC(memtable), K(max_snapshot_version), K(memtables), K(param));
 | 
			
		||||
      break;
 | 
			
		||||
    } else if (memtable->get_end_scn() <= clog_checkpoint_scn) {
 | 
			
		||||
      if (!tablet_id.is_special_merge_tablet() &&
 | 
			
		||||
@ -391,7 +393,7 @@ int ObPartitionMergePolicy::get_boundary_snapshot_version(
 | 
			
		||||
  } else if (OB_FAIL(get_neighbour_freeze_info(merge_inc_base_version,
 | 
			
		||||
                                               last_major_table,
 | 
			
		||||
                                               freeze_info))) {
 | 
			
		||||
    LOG_WARN("failed to get freeze info", K(ret), K(merge_inc_base_version), K(table_store_wrapper));
 | 
			
		||||
    LOG_WARN("failed to get freeze info", K(ret), K(merge_inc_base_version), K(PRINT_TS_WRAPPER(table_store_wrapper)));
 | 
			
		||||
  } else if (check_table_cnt && table_store_wrapper.get_member()->get_table_count() >= OB_UNSAFE_TABLE_CNT) {
 | 
			
		||||
    max_snapshot = INT64_MAX;
 | 
			
		||||
    if (table_store_wrapper.get_member()->get_table_count() >= OB_EMERGENCY_TABLE_CNT) {
 | 
			
		||||
@ -634,7 +636,7 @@ int ObPartitionMergePolicy::deal_hist_minor_merge(
 | 
			
		||||
    if (0 == (max_snapshot_version = MTL(ObTenantFreezeInfoMgr*)->get_latest_frozen_version())) {
 | 
			
		||||
      // no freeze info found, wait normal mini minor to free sstable
 | 
			
		||||
      ret = OB_NO_NEED_MERGE;
 | 
			
		||||
      LOG_WARN("No freeze range to do hist minor merge for buiding index", K(ret), K(table_store_wrapper));
 | 
			
		||||
      LOG_WARN("No freeze range to do hist minor merge for buiding index", K(ret), K(PRINT_TS_WRAPPER(table_store_wrapper)));
 | 
			
		||||
    }
 | 
			
		||||
  } else {
 | 
			
		||||
    ObTenantFreezeInfoMgr::NeighbourFreezeInfo freeze_info;
 | 
			
		||||
@ -796,17 +798,17 @@ int ObPartitionMergePolicy::refine_mini_merge_result(
 | 
			
		||||
  } else if (result.scn_range_.start_scn_ > last_table->get_end_scn()) {
 | 
			
		||||
    ret = OB_ERR_UNEXPECTED;
 | 
			
		||||
    LOG_ERROR("Unexpected uncontinuous scn_range in mini merge",
 | 
			
		||||
              K(ret), K(result), KPC(last_table), K(table_store_wrapper), K(tablet));
 | 
			
		||||
              K(ret), K(result), KPC(last_table), K(PRINT_TS_WRAPPER(table_store_wrapper)), K(tablet));
 | 
			
		||||
  } else if (result.scn_range_.start_scn_ < last_table->get_end_scn()
 | 
			
		||||
      && !tablet.get_tablet_meta().tablet_id_.is_special_merge_tablet()) {
 | 
			
		||||
    // fix start_scn to make scn_range continuous in migrate phase for issue 42832934
 | 
			
		||||
    if (result.scn_range_.end_scn_ <= last_table->get_end_scn()) {
 | 
			
		||||
      ret = OB_ERR_UNEXPECTED;
 | 
			
		||||
      LOG_WARN("No need mini merge memtable which is covered by existing sstable",
 | 
			
		||||
               K(ret), K(result), KPC(last_table), K(table_store_wrapper), K(tablet));
 | 
			
		||||
               K(ret), K(result), KPC(last_table), K(PRINT_TS_WRAPPER(table_store_wrapper)), K(tablet));
 | 
			
		||||
    } else {
 | 
			
		||||
      result.scn_range_.start_scn_ = last_table->get_end_scn();
 | 
			
		||||
      FLOG_INFO("Fix mini merge result scn range", K(ret), K(result), KPC(last_table), K(table_store_wrapper), K(tablet));
 | 
			
		||||
      FLOG_INFO("Fix mini merge result scn range", K(ret), K(result), KPC(last_table), K(PRINT_TS_WRAPPER(table_store_wrapper)), K(tablet));
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
  return ret;
 | 
			
		||||
@ -1306,7 +1308,7 @@ int ObAdaptiveMergePolicy::get_meta_merge_tables(
 | 
			
		||||
      param.merge_type_, ls, tablet, result.version_range_))) {
 | 
			
		||||
    LOG_WARN("failed to get multi version_start", K(ret));
 | 
			
		||||
  } else {
 | 
			
		||||
    FLOG_INFO("succeed to get meta major merge tables", K(result), K(table_store_wrapper));
 | 
			
		||||
    FLOG_INFO("succeed to get meta major merge tables", K(result), K(PRINT_TS_WRAPPER(table_store_wrapper)));
 | 
			
		||||
  }
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
@ -1364,7 +1366,7 @@ int ObAdaptiveMergePolicy::find_meta_major_tables(
 | 
			
		||||
      for (int64_t i = 0; OB_SUCC(ret) && i < minor_tables.count(); ++i) {
 | 
			
		||||
        if (OB_ISNULL(table = minor_tables[i]) || !table->is_multi_version_minor_sstable()) {
 | 
			
		||||
          ret = OB_ERR_UNEXPECTED;
 | 
			
		||||
          LOG_WARN("get unexpected table", K(ret), K(i), K(table_store_wrapper));
 | 
			
		||||
          LOG_WARN("get unexpected table", K(ret), K(i), K(PRINT_TS_WRAPPER(table_store_wrapper)));
 | 
			
		||||
        } else if (table->get_upper_trans_version() <= base_table->get_snapshot_version()) {
 | 
			
		||||
          // skip minor sstable which has been merged
 | 
			
		||||
          continue;
 | 
			
		||||
@ -1388,7 +1390,7 @@ int ObAdaptiveMergePolicy::find_meta_major_tables(
 | 
			
		||||
      if (OB_FAIL(ret)) {
 | 
			
		||||
      } else if (tx_determ_table_cnt < 2) {
 | 
			
		||||
        ret = OB_NO_NEED_MERGE;
 | 
			
		||||
        LOG_INFO("no enough table for meta merge", K(ret), K(result), K(table_store_wrapper));
 | 
			
		||||
        LOG_INFO("no enough table for meta merge", K(ret), K(result), K(PRINT_TS_WRAPPER(table_store_wrapper)));
 | 
			
		||||
      } else if (inc_row_cnt < TRANS_STATE_DETERM_ROW_CNT_THRESHOLD
 | 
			
		||||
        || inc_row_cnt < INC_ROW_COUNT_PERCENTAGE_THRESHOLD * base_row_cnt) {
 | 
			
		||||
        ret = OB_NO_NEED_MERGE;
 | 
			
		||||
@ -1403,7 +1405,7 @@ int ObAdaptiveMergePolicy::find_meta_major_tables(
 | 
			
		||||
    } else if (tx_determ_table_cnt < 2) {
 | 
			
		||||
      ret = OB_NO_NEED_MERGE;
 | 
			
		||||
      if (REACH_TENANT_TIME_INTERVAL(60 * 1000 * 1000/*60s*/)) {
 | 
			
		||||
        LOG_INFO("no enough table for meta merge", K(ret), K(result), K(table_store_wrapper));
 | 
			
		||||
        LOG_INFO("no enough table for meta merge", K(ret), K(result), K(PRINT_TS_WRAPPER(table_store_wrapper)));
 | 
			
		||||
      }
 | 
			
		||||
    } else if (inc_row_cnt < TRANS_STATE_DETERM_ROW_CNT_THRESHOLD
 | 
			
		||||
      || inc_row_cnt < INC_ROW_COUNT_PERCENTAGE_THRESHOLD * base_row_cnt) {
 | 
			
		||||
 | 
			
		||||
@ -107,7 +107,7 @@ private:
 | 
			
		||||
      const storage::ObTenantFreezeInfoMgr::NeighbourFreezeInfo &freeze_info,
 | 
			
		||||
      ObLS &ls,
 | 
			
		||||
      const storage::ObTablet &tablet,
 | 
			
		||||
      ObIArray<storage::ObTableHandleV2> &memtable_handles,
 | 
			
		||||
      ObIArray<storage::ObITable *> &memtables,
 | 
			
		||||
      storage::ObGetMergeTablesResult &result);
 | 
			
		||||
 | 
			
		||||
  static int find_minor_merge_tables(
 | 
			
		||||
 | 
			
		||||
		Reference in New Issue
	
	Block a user