check expired ddl kv or sstable in ddl merge task
This commit is contained in:
		@ -289,8 +289,8 @@ int ObDDLTableMergeTask::process()
 | 
			
		||||
    } else if (merge_param_.start_scn_ > SCN::min_scn() && merge_param_.start_scn_ < ddl_param.start_scn_) {
 | 
			
		||||
      ret = OB_TASK_EXPIRED;
 | 
			
		||||
      LOG_INFO("ddl merge task expired, do nothing", K(merge_param_), "new_start_scn", ddl_param.start_scn_);
 | 
			
		||||
    } else if (OB_FAIL(ObTabletDDLUtil::get_compact_scn(ddl_table_iter, frozen_ddl_kvs_, compact_start_scn, compact_end_scn))) {
 | 
			
		||||
      LOG_WARN("get compact scn failed", K(ret), K(merge_param_));
 | 
			
		||||
    } else if (OB_FAIL(ObTabletDDLUtil::get_compact_scn(ddl_param.start_scn_, ddl_table_iter, frozen_ddl_kvs_, compact_start_scn, compact_end_scn))) {
 | 
			
		||||
      LOG_WARN("get compact scn failed", K(ret), K(merge_param_), K(ddl_param), K(ddl_table_iter), K(frozen_ddl_kvs_));
 | 
			
		||||
    } else if (ddl_param.commit_scn_.is_valid_and_not_min() && compact_end_scn > ddl_param.commit_scn_) {
 | 
			
		||||
      ret = OB_ERR_SYS;
 | 
			
		||||
      LOG_WARN("compact end scn is larger than commit scn", K(ret), K(ddl_param), K(compact_end_scn), K(frozen_ddl_kvs_), K(ddl_table_iter));
 | 
			
		||||
@ -1140,7 +1140,46 @@ int ObTabletDDLUtil::compact_ddl_kv(
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int check_ddl_sstable_expired(const SCN &ddl_start_scn, ObTableStoreIterator &ddl_sstable_iter)
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  ObITable *table = nullptr;
 | 
			
		||||
  ObSSTable *ddl_sstable = nullptr;
 | 
			
		||||
  ObSSTableMetaHandle meta_handle;
 | 
			
		||||
  if (0 == ddl_sstable_iter.count()) {
 | 
			
		||||
    // do nothing
 | 
			
		||||
  } else if (OB_FAIL(ddl_sstable_iter.get_boundary_table(false, table))) {
 | 
			
		||||
    LOG_WARN("get first ddl sstable failed", K(ret));
 | 
			
		||||
  } else if (OB_ISNULL(ddl_sstable = static_cast<ObSSTable *>(table))) {
 | 
			
		||||
    ret = OB_ERR_UNEXPECTED;
 | 
			
		||||
    LOG_WARN("ddl sstable is null", K(ret), KPC(table));
 | 
			
		||||
  } else if (OB_FAIL(ddl_sstable->get_meta(meta_handle))) {
 | 
			
		||||
    LOG_WARN("get meta handle failed", K(ret));
 | 
			
		||||
  } else if (meta_handle.get_sstable_meta().get_ddl_scn() < ddl_start_scn) {
 | 
			
		||||
    ret = OB_TASK_EXPIRED;
 | 
			
		||||
    LOG_WARN("ddl sstable is expired", K(ret), K(meta_handle.get_sstable_meta()), K(ddl_start_scn));
 | 
			
		||||
  }
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int check_ddl_kv_expired(const SCN &ddl_start_scn, const ObIArray<ObDDLKVHandle> &frozen_ddl_kvs)
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  ObDDLKV *ddl_kv = nullptr;
 | 
			
		||||
  if (frozen_ddl_kvs.empty()) {
 | 
			
		||||
    // do nothing
 | 
			
		||||
  } else if (OB_ISNULL(ddl_kv = frozen_ddl_kvs.at(0).get_obj())) {
 | 
			
		||||
    ret = OB_ERR_UNEXPECTED;
 | 
			
		||||
    LOG_WARN("ddl kv is null", K(ret), K(frozen_ddl_kvs));
 | 
			
		||||
  } else if (ddl_kv->get_ddl_start_scn() < ddl_start_scn) {
 | 
			
		||||
    ret = OB_TASK_EXPIRED;
 | 
			
		||||
    LOG_WARN("ddl sstable is expired", K(ret), KPC(ddl_kv), K(ddl_start_scn));
 | 
			
		||||
  }
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int ObTabletDDLUtil::get_compact_scn(
 | 
			
		||||
    const SCN &ddl_start_scn,
 | 
			
		||||
    ObTableStoreIterator &ddl_sstable_iter,
 | 
			
		||||
    const ObIArray<ObDDLKVHandle> &frozen_ddl_kvs,
 | 
			
		||||
    SCN &compact_start_scn,
 | 
			
		||||
@ -1154,11 +1193,15 @@ int ObTabletDDLUtil::get_compact_scn(
 | 
			
		||||
  if (OB_UNLIKELY((0 == ddl_sstable_iter.count() && frozen_ddl_kvs.empty()))) {
 | 
			
		||||
    ret = OB_INVALID_ARGUMENT;
 | 
			
		||||
    LOG_WARN("invalid argument", K(ret), K(ddl_sstable_iter.count()), K(frozen_ddl_kvs.count()));
 | 
			
		||||
  } else if (OB_FAIL(check_ddl_sstable_expired(ddl_start_scn, ddl_sstable_iter))) {
 | 
			
		||||
    LOG_WARN("check ddl sstable expired failed", K(ret), K(ddl_start_scn), K(ddl_sstable_iter));
 | 
			
		||||
  } else if (ddl_sstable_iter.count() > 0 && OB_FAIL(check_data_continue(ddl_sstable_iter, is_data_continue, compact_start_scn, compact_end_scn))) {
 | 
			
		||||
    LOG_WARN("check ddl sstable continue failed", K(ret));
 | 
			
		||||
  } else if (!is_data_continue) {
 | 
			
		||||
    ret = OB_ERR_UNEXPECTED;
 | 
			
		||||
    LOG_WARN("ddl sstable not continuous", K(ret), K(ddl_sstable_iter));
 | 
			
		||||
  } else if (OB_FAIL(check_ddl_kv_expired(ddl_start_scn, frozen_ddl_kvs))) {
 | 
			
		||||
    LOG_WARN("check ddl kv expired failed", K(ret), K(ddl_start_scn), K(frozen_ddl_kvs));
 | 
			
		||||
  } else if (frozen_ddl_kvs.count() > 0 && OB_FAIL(check_data_continue(frozen_ddl_kvs, is_data_continue, compact_start_scn, compact_end_scn))) {
 | 
			
		||||
    LOG_WARN("check ddl sstable continue failed", K(ret));
 | 
			
		||||
  } else if (!is_data_continue) {
 | 
			
		||||
 | 
			
		||||
		Reference in New Issue
	
	Block a user