Fix migration dest copy macro block inner retry return 4070
This commit is contained in:
		
				
					committed by
					
						
						wangzelin.wzl
					
				
			
			
				
	
			
			
			
						parent
						
							e7a71c9f57
						
					
				
				
					commit
					f13a795692
				
			@ -135,8 +135,7 @@ ObPhysicalCopyTask::ObPhysicalCopyTask()
 | 
				
			|||||||
    copy_ctx_(nullptr),
 | 
					    copy_ctx_(nullptr),
 | 
				
			||||||
    finish_task_(nullptr),
 | 
					    finish_task_(nullptr),
 | 
				
			||||||
    copy_table_key_(),
 | 
					    copy_table_key_(),
 | 
				
			||||||
    copy_macro_range_info_(),
 | 
					    copy_macro_range_info_()
 | 
				
			||||||
    index_block_rebuilder_()
 | 
					 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -157,8 +156,6 @@ int ObPhysicalCopyTask::init(
 | 
				
			|||||||
    LOG_WARN("physical copy task get invalid argument", K(ret), KPC(copy_ctx), KPC(finish_task));
 | 
					    LOG_WARN("physical copy task get invalid argument", K(ret), KPC(copy_ctx), KPC(finish_task));
 | 
				
			||||||
  } else if (OB_FAIL(build_macro_block_copy_info_(finish_task))) {
 | 
					  } else if (OB_FAIL(build_macro_block_copy_info_(finish_task))) {
 | 
				
			||||||
    LOG_WARN("failed to build macro block copy info", K(ret), KPC(copy_ctx));
 | 
					    LOG_WARN("failed to build macro block copy info", K(ret), KPC(copy_ctx));
 | 
				
			||||||
  } else if (OB_FAIL(index_block_rebuilder_.init(*copy_ctx->sstable_index_builder_))) {
 | 
					 | 
				
			||||||
    LOG_WARN("failed to init index rebuilder", K(ret), KPC(copy_ctx));
 | 
					 | 
				
			||||||
  } else {
 | 
					  } else {
 | 
				
			||||||
    copy_ctx_ = copy_ctx;
 | 
					    copy_ctx_ = copy_ctx;
 | 
				
			||||||
    finish_task_ = finish_task;
 | 
					    finish_task_ = finish_task;
 | 
				
			||||||
@ -211,8 +208,6 @@ int ObPhysicalCopyTask::process()
 | 
				
			|||||||
        ret = OB_ERR_SYS;
 | 
					        ret = OB_ERR_SYS;
 | 
				
			||||||
        LOG_ERROR("list count not match", K(ret), KPC(copy_macro_range_info_),
 | 
					        LOG_ERROR("list count not match", K(ret), KPC(copy_macro_range_info_),
 | 
				
			||||||
            K(copied_ctx.get_macro_block_count()), K(copied_ctx));
 | 
					            K(copied_ctx.get_macro_block_count()), K(copied_ctx));
 | 
				
			||||||
      } else if (OB_FAIL(index_block_rebuilder_.close())) {
 | 
					 | 
				
			||||||
        LOG_WARN("failed to close index block builder", K(ret), K(copied_ctx));
 | 
					 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    LOG_INFO("physical copy task finish", K(ret), KPC(copy_macro_range_info_), KPC(copy_ctx_));
 | 
					    LOG_INFO("physical copy task finish", K(ret), KPC(copy_macro_range_info_), KPC(copy_ctx_));
 | 
				
			||||||
@ -244,7 +239,7 @@ int ObPhysicalCopyTask::fetch_macro_block_with_retry_(
 | 
				
			|||||||
      if (retry_times > 0) {
 | 
					      if (retry_times > 0) {
 | 
				
			||||||
        LOG_INFO("retry get major block", K(retry_times));
 | 
					        LOG_INFO("retry get major block", K(retry_times));
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
      if (OB_FAIL(fetch_macro_block_(copied_ctx))) {
 | 
					      if (OB_FAIL(fetch_macro_block_(retry_times, copied_ctx))) {
 | 
				
			||||||
        STORAGE_LOG(WARN, "failed to fetch major block", K(ret), K(retry_times));
 | 
					        STORAGE_LOG(WARN, "failed to fetch major block", K(ret), K(retry_times));
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -264,20 +259,25 @@ int ObPhysicalCopyTask::fetch_macro_block_with_retry_(
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
int ObPhysicalCopyTask::fetch_macro_block_(
 | 
					int ObPhysicalCopyTask::fetch_macro_block_(
 | 
				
			||||||
 | 
					    const int64_t retry_times,
 | 
				
			||||||
    ObMacroBlocksWriteCtx &copied_ctx)
 | 
					    ObMacroBlocksWriteCtx &copied_ctx)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
  int ret = OB_SUCCESS;
 | 
					  int ret = OB_SUCCESS;
 | 
				
			||||||
  ObStorageHAMacroBlockWriter *writer = NULL;
 | 
					  ObStorageHAMacroBlockWriter *writer = NULL;
 | 
				
			||||||
  ObICopyMacroBlockReader *reader = NULL;
 | 
					  ObICopyMacroBlockReader *reader = NULL;
 | 
				
			||||||
 | 
					  ObIndexBlockRebuilder index_block_rebuilder;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  if (!is_inited_) {
 | 
					  if (!is_inited_) {
 | 
				
			||||||
    ret = OB_NOT_INIT;
 | 
					    ret = OB_NOT_INIT;
 | 
				
			||||||
    LOG_WARN("physical copy physical task do not init", K(ret));
 | 
					    LOG_WARN("physical copy physical task do not init", K(ret));
 | 
				
			||||||
  } else {
 | 
					  } else {
 | 
				
			||||||
    LOG_INFO("init reader", K(copy_table_key_));
 | 
					    LOG_INFO("init reader", K(copy_table_key_));
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    if (OB_FAIL(get_macro_block_reader_(reader))) {
 | 
					    if (OB_FAIL(index_block_rebuilder.init(*copy_ctx_->sstable_index_builder_))) {
 | 
				
			||||||
 | 
					      LOG_WARN("failed to init index block rebuilder", K(ret), K(copy_table_key_));
 | 
				
			||||||
 | 
					    } else if (OB_FAIL(get_macro_block_reader_(reader))) {
 | 
				
			||||||
      LOG_WARN("fail to get macro block reader", K(ret));
 | 
					      LOG_WARN("fail to get macro block reader", K(ret));
 | 
				
			||||||
    } else if (OB_FAIL(get_macro_block_writer_(reader, writer))) {
 | 
					    } else if (OB_FAIL(get_macro_block_writer_(reader, &index_block_rebuilder, writer))) {
 | 
				
			||||||
      LOG_WARN("failed to get macro block writer", K(ret), K(copy_table_key_));
 | 
					      LOG_WARN("failed to get macro block writer", K(ret), K(copy_table_key_));
 | 
				
			||||||
    } else if (OB_FAIL(writer->process(copied_ctx))) {
 | 
					    } else if (OB_FAIL(writer->process(copied_ctx))) {
 | 
				
			||||||
      LOG_WARN("failed to process writer", K(ret), K(copy_table_key_));
 | 
					      LOG_WARN("failed to process writer", K(ret), K(copy_table_key_));
 | 
				
			||||||
@ -287,6 +287,23 @@ int ObPhysicalCopyTask::fetch_macro_block_(
 | 
				
			|||||||
          K(copied_ctx.get_macro_block_count()), K(copied_ctx));
 | 
					          K(copied_ctx.get_macro_block_count()), K(copied_ctx));
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#ifdef ERRSIM
 | 
				
			||||||
 | 
					    if (OB_SUCC(ret)) {
 | 
				
			||||||
 | 
					      ret = E(EventTable::EN_MIGRATE_FETCH_MACRO_BLOCK) OB_SUCCESS;
 | 
				
			||||||
 | 
					      if (OB_FAIL(ret)) {
 | 
				
			||||||
 | 
					        if (retry_times == 0) {
 | 
				
			||||||
 | 
					        } else {
 | 
				
			||||||
 | 
					          ret = OB_SUCCESS;
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					        STORAGE_LOG(ERROR, "fake EN_MIGRATE_FETCH_MACRO_BLOCK", K(ret));
 | 
				
			||||||
 | 
					      }
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					#endif
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    if (FAILEDx(index_block_rebuilder.close())) {
 | 
				
			||||||
 | 
					      LOG_WARN("failed to close index block builder", K(ret), K(copied_ctx));
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    if (NULL != reader) {
 | 
					    if (NULL != reader) {
 | 
				
			||||||
      free_macro_block_reader_(reader);
 | 
					      free_macro_block_reader_(reader);
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
@ -406,6 +423,7 @@ int ObPhysicalCopyTask::get_macro_block_restore_reader_(
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
int ObPhysicalCopyTask::get_macro_block_writer_(
 | 
					int ObPhysicalCopyTask::get_macro_block_writer_(
 | 
				
			||||||
    ObICopyMacroBlockReader *reader,
 | 
					    ObICopyMacroBlockReader *reader,
 | 
				
			||||||
 | 
					    ObIndexBlockRebuilder *index_block_rebuilder,
 | 
				
			||||||
    ObStorageHAMacroBlockWriter *&writer)
 | 
					    ObStorageHAMacroBlockWriter *&writer)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
  int ret = OB_SUCCESS;
 | 
					  int ret = OB_SUCCESS;
 | 
				
			||||||
@ -413,16 +431,16 @@ int ObPhysicalCopyTask::get_macro_block_writer_(
 | 
				
			|||||||
  if (!is_inited_) {
 | 
					  if (!is_inited_) {
 | 
				
			||||||
    ret = OB_NOT_INIT;
 | 
					    ret = OB_NOT_INIT;
 | 
				
			||||||
    LOG_WARN("physical copy task do not init", K(ret));
 | 
					    LOG_WARN("physical copy task do not init", K(ret));
 | 
				
			||||||
  } else if (OB_ISNULL(reader)) {
 | 
					  } else if (OB_ISNULL(reader) || OB_ISNULL(index_block_rebuilder)) {
 | 
				
			||||||
    ret = OB_INVALID_ARGUMENT;
 | 
					    ret = OB_INVALID_ARGUMENT;
 | 
				
			||||||
    LOG_WARN("macro block writer get invalid argument", K(ret), KP(reader));
 | 
					    LOG_WARN("macro block writer get invalid argument", K(ret), KP(reader), KP(index_block_rebuilder));
 | 
				
			||||||
  } else {
 | 
					  } else {
 | 
				
			||||||
    void *buf = mtl_malloc(sizeof(ObStorageHAMacroBlockWriter), "MacroObWriter");
 | 
					    void *buf = mtl_malloc(sizeof(ObStorageHAMacroBlockWriter), "MacroObWriter");
 | 
				
			||||||
    if (OB_ISNULL(buf)) {
 | 
					    if (OB_ISNULL(buf)) {
 | 
				
			||||||
      ret = OB_ALLOCATE_MEMORY_FAILED;
 | 
					      ret = OB_ALLOCATE_MEMORY_FAILED;
 | 
				
			||||||
      LOG_WARN("failed to alloc memory", K(ret), KP(buf));
 | 
					      LOG_WARN("failed to alloc memory", K(ret), KP(buf));
 | 
				
			||||||
    } else if (FALSE_IT(writer = new(buf) ObStorageHAMacroBlockWriter())) {
 | 
					    } else if (FALSE_IT(writer = new(buf) ObStorageHAMacroBlockWriter())) {
 | 
				
			||||||
    } else if (OB_FAIL(writer->init(copy_ctx_->tenant_id_, reader, &index_block_rebuilder_))) {
 | 
					    } else if (OB_FAIL(writer->init(copy_ctx_->tenant_id_, reader, index_block_rebuilder))) {
 | 
				
			||||||
      STORAGE_LOG(WARN, "failed to init ob reader", K(ret), KPC(copy_ctx_));
 | 
					      STORAGE_LOG(WARN, "failed to init ob reader", K(ret), KPC(copy_ctx_));
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
				
			|||||||
@ -99,6 +99,7 @@ private:
 | 
				
			|||||||
  int fetch_macro_block_with_retry_(
 | 
					  int fetch_macro_block_with_retry_(
 | 
				
			||||||
      ObMacroBlocksWriteCtx &copied_ctx);
 | 
					      ObMacroBlocksWriteCtx &copied_ctx);
 | 
				
			||||||
  int fetch_macro_block_(
 | 
					  int fetch_macro_block_(
 | 
				
			||||||
 | 
					      const int64_t retry_times,
 | 
				
			||||||
      ObMacroBlocksWriteCtx &copied_ctx);
 | 
					      ObMacroBlocksWriteCtx &copied_ctx);
 | 
				
			||||||
  int build_macro_block_copy_info_(ObPhysicalCopyFinishTask *finish_task);
 | 
					  int build_macro_block_copy_info_(ObPhysicalCopyFinishTask *finish_task);
 | 
				
			||||||
  int get_macro_block_reader_(
 | 
					  int get_macro_block_reader_(
 | 
				
			||||||
@ -111,6 +112,7 @@ private:
 | 
				
			|||||||
      ObICopyMacroBlockReader *&reader);
 | 
					      ObICopyMacroBlockReader *&reader);
 | 
				
			||||||
  int get_macro_block_writer_(
 | 
					  int get_macro_block_writer_(
 | 
				
			||||||
      ObICopyMacroBlockReader *reader,
 | 
					      ObICopyMacroBlockReader *reader,
 | 
				
			||||||
 | 
					      ObIndexBlockRebuilder *index_block_rebuilder,
 | 
				
			||||||
      ObStorageHAMacroBlockWriter *&writer);
 | 
					      ObStorageHAMacroBlockWriter *&writer);
 | 
				
			||||||
  void free_macro_block_reader_(ObICopyMacroBlockReader *&reader);
 | 
					  void free_macro_block_reader_(ObICopyMacroBlockReader *&reader);
 | 
				
			||||||
  void free_macro_block_writer_(ObStorageHAMacroBlockWriter *&writer);
 | 
					  void free_macro_block_writer_(ObStorageHAMacroBlockWriter *&writer);
 | 
				
			||||||
@ -126,7 +128,6 @@ private:
 | 
				
			|||||||
  ObPhysicalCopyFinishTask *finish_task_;
 | 
					  ObPhysicalCopyFinishTask *finish_task_;
 | 
				
			||||||
  ObITable::TableKey copy_table_key_;
 | 
					  ObITable::TableKey copy_table_key_;
 | 
				
			||||||
  const ObCopyMacroRangeInfo *copy_macro_range_info_;
 | 
					  const ObCopyMacroRangeInfo *copy_macro_range_info_;
 | 
				
			||||||
  ObIndexBlockRebuilder index_block_rebuilder_;
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
  DISALLOW_COPY_AND_ASSIGN(ObPhysicalCopyTask);
 | 
					  DISALLOW_COPY_AND_ASSIGN(ObPhysicalCopyTask);
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
 | 
				
			|||||||
							
								
								
									
										28
									
								
								tools/deploy/init_for_ce.sql
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										28
									
								
								tools/deploy/init_for_ce.sql
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,28 @@
 | 
				
			|||||||
 | 
					system sleep 5;
 | 
				
			||||||
 | 
					alter system set balancer_idle_time = '10s';
 | 
				
			||||||
 | 
					create user if not exists 'admin' IDENTIFIED BY 'admin';
 | 
				
			||||||
 | 
					use oceanbase;
 | 
				
			||||||
 | 
					create database if not exists test;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					use test;
 | 
				
			||||||
 | 
					grant all on *.* to 'admin' WITH GRANT OPTION;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					alter system set enable_syslog_wf=false;
 | 
				
			||||||
 | 
					set @@session.ob_query_timeout = 200000000;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					source init_create_tenant_routines.sql;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					call adjust_sys_resource();
 | 
				
			||||||
 | 
					call create_tenant_by_memory_resource('mysql', 'mysql');
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					set @@session.ob_query_timeout = 10000000;
 | 
				
			||||||
 | 
					system sleep 5;
 | 
				
			||||||
 | 
					alter tenant sys set variables recyclebin = 'on';
 | 
				
			||||||
 | 
					alter tenant sys set variables ob_enable_truncate_flashback = 'on';
 | 
				
			||||||
 | 
					alter tenant mysql set variables ob_tcp_invited_nodes='%';
 | 
				
			||||||
 | 
					alter tenant mysql set variables recyclebin = 'on';
 | 
				
			||||||
 | 
					alter tenant mysql set variables ob_enable_truncate_flashback = 'on';
 | 
				
			||||||
 | 
					alter system set ob_compaction_schedule_interval = '10s' tenant all;
 | 
				
			||||||
 | 
					alter system set merger_check_interval = '10s' tenant all;
 | 
				
			||||||
		Reference in New Issue
	
	Block a user