Fix plan with das for update hang in px transmit operator
This commit is contained in:
		
							
								
								
									
										2
									
								
								build.sh
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								build.sh
									
									
									
									
									
								
							@ -201,7 +201,7 @@ function build
 | 
				
			|||||||
        ;;
 | 
					        ;;
 | 
				
			||||||
      xrpm)
 | 
					      xrpm)
 | 
				
			||||||
        STATIC_LINK_LGPL_DEPS_OPTION=OFF
 | 
					        STATIC_LINK_LGPL_DEPS_OPTION=OFF
 | 
				
			||||||
        do_build "$@" -DOB_BUILD_RPM=ON -DCMAKE_BUILD_TYPE=RelWithDebInfo -DOB_USE_LLD=$LLD_OPTION -DENABLE_FATAL_ERROR_HANG=OFF -DENABLE_AUTO_FDO=ON -DENABLE_THIN_LTO=ON -DOB_STATIC_LINK_LGPL_DEPS=$STATIC_LINK_LGPL_DEPS_OPTION
 | 
					        do_build "$@" -DOB_BUILD_RPM=ON -DCMAKE_BUILD_TYPE=RelWithDebInfo -DOB_USE_LLD=$LLD_OPTION -DENABLE_FATAL_ERROR_HANG=OFF -DENABLE_AUTO_FDO=ON -DOB_STATIC_LINK_LGPL_DEPS=$STATIC_LINK_LGPL_DEPS_OPTION
 | 
				
			||||||
        ;;
 | 
					        ;;
 | 
				
			||||||
      xenable_smart_var_check)
 | 
					      xenable_smart_var_check)
 | 
				
			||||||
        do_build "$@" -DCMAKE_BUILD_TYPE=Debug -DOB_USE_LLD=$LLD_OPTION -DENABLE_SMART_VAR_CHECK=ON -DOB_ENABLE_AVX2=ON
 | 
					        do_build "$@" -DCMAKE_BUILD_TYPE=Debug -DOB_USE_LLD=$LLD_OPTION -DENABLE_SMART_VAR_CHECK=ON -DOB_ENABLE_AVX2=ON
 | 
				
			||||||
 | 
				
			|||||||
@ -164,16 +164,6 @@ void ObPxTransmitOp::destroy()
 | 
				
			|||||||
  ObTransmitOp::destroy();
 | 
					  ObTransmitOp::destroy();
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
bool ObPxTransmitOp::is_dml_type_match_iter_end(bool need_drive_dml_query)
 | 
					 | 
				
			||||||
{
 | 
					 | 
				
			||||||
  // In the new engine, insert, update, and delete no longer do real dml in inner_open,
 | 
					 | 
				
			||||||
  // do real dml in get_next_row, so the original early termination logic needs to be modified,
 | 
					 | 
				
			||||||
  // and other dml operators can still retain this optimization, such as replace, merge, etc.
 | 
					 | 
				
			||||||
  return child_->get_spec().is_dml_operator() &&
 | 
					 | 
				
			||||||
         !child_->get_spec().is_pdml_operator() &&
 | 
					 | 
				
			||||||
         !need_drive_dml_query;
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
int ObPxTransmitOp::inner_open()
 | 
					int ObPxTransmitOp::inner_open()
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
  int ret = OB_SUCCESS;
 | 
					  int ret = OB_SUCCESS;
 | 
				
			||||||
@ -226,9 +216,6 @@ int ObPxTransmitOp::fetch_first_row()
 | 
				
			|||||||
      OB_ISNULL(phy_plan = phy_plan_ctx->get_phy_plan())) {
 | 
					      OB_ISNULL(phy_plan = phy_plan_ctx->get_phy_plan())) {
 | 
				
			||||||
    ret = OB_ERR_UNEXPECTED;
 | 
					    ret = OB_ERR_UNEXPECTED;
 | 
				
			||||||
    LOG_WARN("null phy_plan or phy_plan_ctx", K(phy_plan_ctx), K(phy_plan), K(ret));
 | 
					    LOG_WARN("null phy_plan or phy_plan_ctx", K(phy_plan_ctx), K(phy_plan), K(ret));
 | 
				
			||||||
  } else if (is_dml_type_match_iter_end(phy_plan->need_drive_dml_query_)) {
 | 
					 | 
				
			||||||
    iter_end_ = true;
 | 
					 | 
				
			||||||
    LOG_TRACE("transmit iter end", K(ret), K(iter_end_));
 | 
					 | 
				
			||||||
  } else {
 | 
					  } else {
 | 
				
			||||||
    const ObBatchRows *brs = NULL;
 | 
					    const ObBatchRows *brs = NULL;
 | 
				
			||||||
    if (is_vectorized()) {
 | 
					    if (is_vectorized()) {
 | 
				
			||||||
 | 
				
			|||||||
@ -1,4 +1,5 @@
 | 
				
			|||||||
/**
 | 
					/**
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 * Copyright (c) 2021 OceanBase
 | 
					 * Copyright (c) 2021 OceanBase
 | 
				
			||||||
 * OceanBase CE is licensed under Mulan PubL v2.
 | 
					 * OceanBase CE is licensed under Mulan PubL v2.
 | 
				
			||||||
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 | 
					 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 | 
				
			||||||
@ -148,7 +149,6 @@ private:
 | 
				
			|||||||
  int broadcast_eof_row();
 | 
					  int broadcast_eof_row();
 | 
				
			||||||
  int next_row();
 | 
					  int next_row();
 | 
				
			||||||
  int set_rollup_hybrid_keys(ObSliceIdxCalc &slice_calc);
 | 
					  int set_rollup_hybrid_keys(ObSliceIdxCalc &slice_calc);
 | 
				
			||||||
  bool is_dml_type_match_iter_end(bool need_drive_dml_query);
 | 
					 | 
				
			||||||
  int fetch_first_row();
 | 
					  int fetch_first_row();
 | 
				
			||||||
  int set_expect_range_count();
 | 
					  int set_expect_range_count();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
				
			|||||||
@ -179,7 +179,7 @@ int ObLSMemberTable::on_commit_create_tablets(const obrpc::ObBatchCreateTabletAr
 | 
				
			|||||||
    if (OB_FAIL(tablet_svr->on_abort_create_tablets(*arg, trans_flags))) {
 | 
					    if (OB_FAIL(tablet_svr->on_abort_create_tablets(*arg, trans_flags))) {
 | 
				
			||||||
      LOG_WARN("fail to on_abort_create_tablets", KR(ret), KPC(arg), K(trans_flags));
 | 
					      LOG_WARN("fail to on_abort_create_tablets", KR(ret), KPC(arg), K(trans_flags));
 | 
				
			||||||
    } else {
 | 
					    } else {
 | 
				
			||||||
      ls->get_tablet_gc_handler()->set_tablet_gc_trigger();
 | 
					      ls->get_tablet_gc_handler()->set_tablet_persist_trigger();
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
  if (OB_FAIL(ret)) {
 | 
					  if (OB_FAIL(ret)) {
 | 
				
			||||||
@ -358,7 +358,7 @@ int ObLSMemberTable::on_commit_remove_tablets(const obrpc::ObBatchRemoveTabletAr
 | 
				
			|||||||
    if (OB_FAIL(tablet_svr->on_commit_remove_tablets(*arg, trans_flags))) {
 | 
					    if (OB_FAIL(tablet_svr->on_commit_remove_tablets(*arg, trans_flags))) {
 | 
				
			||||||
      LOG_WARN("fail to on_commit_remove_tablets", KR(ret), KPC(arg), K(trans_flags));
 | 
					      LOG_WARN("fail to on_commit_remove_tablets", KR(ret), KPC(arg), K(trans_flags));
 | 
				
			||||||
    } else {
 | 
					    } else {
 | 
				
			||||||
      ls->get_tablet_gc_handler()->set_tablet_gc_trigger();
 | 
					      ls->get_tablet_gc_handler()->set_tablet_persist_trigger();
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
  } else if (OB_FAIL(tablet_svr->on_abort_remove_tablets(*arg, trans_flags))) {
 | 
					  } else if (OB_FAIL(tablet_svr->on_abort_remove_tablets(*arg, trans_flags))) {
 | 
				
			||||||
    LOG_WARN("fail to on_abort_remove_tablets", KR(ret), KPC(arg), K(trans_flags));
 | 
					    LOG_WARN("fail to on_abort_remove_tablets", KR(ret), KPC(arg), K(trans_flags));
 | 
				
			||||||
 | 
				
			|||||||
@ -450,7 +450,6 @@ int ObTabletGCHandler::offline()
 | 
				
			|||||||
void ObTabletGCHandler::online()
 | 
					void ObTabletGCHandler::online()
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
  set_tablet_persist_trigger();
 | 
					  set_tablet_persist_trigger();
 | 
				
			||||||
  set_tablet_gc_trigger();
 | 
					 | 
				
			||||||
  set_start();
 | 
					  set_start();
 | 
				
			||||||
  STORAGE_LOG(INFO, "tablet gc handler online", KPC(this), KPC(ls_), K(ls_->get_ls_meta()));
 | 
					  STORAGE_LOG(INFO, "tablet gc handler online", KPC(this), KPC(ls_), K(ls_->get_ls_meta()));
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
@ -28,6 +28,7 @@ namespace checkpoint
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
class ObTabletGCHandler
 | 
					class ObTabletGCHandler
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
 | 
					  friend class ObTabletGCService;
 | 
				
			||||||
public:
 | 
					public:
 | 
				
			||||||
  ObTabletGCHandler()
 | 
					  ObTabletGCHandler()
 | 
				
			||||||
    : ls_(NULL),
 | 
					    : ls_(NULL),
 | 
				
			||||||
@ -50,7 +51,6 @@ public:
 | 
				
			|||||||
  static bool is_tablet_gc_trigger(uint8_t tablet_persist_trigger)
 | 
					  static bool is_tablet_gc_trigger(uint8_t tablet_persist_trigger)
 | 
				
			||||||
  { return 0 != (tablet_persist_trigger & 2); }
 | 
					  { return 0 != (tablet_persist_trigger & 2); }
 | 
				
			||||||
  void set_tablet_persist_trigger();
 | 
					  void set_tablet_persist_trigger();
 | 
				
			||||||
  void set_tablet_gc_trigger();
 | 
					 | 
				
			||||||
  uint8_t get_tablet_persist_trigger_and_reset();
 | 
					  uint8_t get_tablet_persist_trigger_and_reset();
 | 
				
			||||||
  int get_unpersist_tablet_ids(common::ObTabletIDArray &unpersist_create_tablet_ids,
 | 
					  int get_unpersist_tablet_ids(common::ObTabletIDArray &unpersist_create_tablet_ids,
 | 
				
			||||||
                               bool only_deleted = false);
 | 
					                               bool only_deleted = false);
 | 
				
			||||||
@ -71,6 +71,7 @@ private:
 | 
				
			|||||||
  bool is_finish() { obsys::ObWLockGuard lock(wait_lock_, false); return lock.acquired(); }
 | 
					  bool is_finish() { obsys::ObWLockGuard lock(wait_lock_, false); return lock.acquired(); }
 | 
				
			||||||
  void set_stop() { ATOMIC_STORE(&update_enabled_, false); }
 | 
					  void set_stop() { ATOMIC_STORE(&update_enabled_, false); }
 | 
				
			||||||
  void set_start() { ATOMIC_STORE(&update_enabled_, true); }
 | 
					  void set_start() { ATOMIC_STORE(&update_enabled_, true); }
 | 
				
			||||||
 | 
					  void set_tablet_gc_trigger();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
public:
 | 
					public:
 | 
				
			||||||
  obsys::ObRWLock wait_lock_;
 | 
					  obsys::ObRWLock wait_lock_;
 | 
				
			||||||
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user