use timeout from THIS_WORKER to limit duration of get read tables in query route
This commit is contained in:
		@ -1274,21 +1274,26 @@ int ObMultipleMerge::refresh_tablet_iter()
 | 
				
			|||||||
  } else {
 | 
					  } else {
 | 
				
			||||||
    // reset first, in case get_read_tables fail and rowkey_read_info_ become dangling
 | 
					    // reset first, in case get_read_tables fail and rowkey_read_info_ become dangling
 | 
				
			||||||
    access_param_->iter_param_.rowkey_read_info_ = nullptr;
 | 
					    access_param_->iter_param_.rowkey_read_info_ = nullptr;
 | 
				
			||||||
    const common::ObTabletID tablet_id = get_table_param_->tablet_iter_.get_tablet()->get_tablet_meta().tablet_id_;
 | 
					    const int64_t remain_timeout = THIS_WORKER.get_timeout_remain();
 | 
				
			||||||
    if (OB_FAIL(MTL(ObLSService*)->get_ls(access_ctx_->ls_id_, ls_handle, ObLSGetMod::STORAGE_MOD))) {
 | 
					    const share::ObLSID &ls_id = access_ctx_->ls_id_;
 | 
				
			||||||
      LOG_WARN("failed to get ls", K(ret));
 | 
					    const common::ObTabletID &tablet_id = get_table_param_->tablet_iter_.get_tablet()->get_tablet_meta().tablet_id_;
 | 
				
			||||||
 | 
					    if (OB_UNLIKELY(remain_timeout <= 0)) {
 | 
				
			||||||
 | 
					      ret = OB_TIMEOUT;
 | 
				
			||||||
 | 
					      LOG_WARN("timeout reached", K(ret), K(ls_id), K(tablet_id), K(remain_timeout));
 | 
				
			||||||
 | 
					    } else if (OB_FAIL(MTL(ObLSService*)->get_ls(ls_id, ls_handle, ObLSGetMod::STORAGE_MOD))) {
 | 
				
			||||||
 | 
					      LOG_WARN("failed to get ls", K(ret), K(ls_id));
 | 
				
			||||||
    } else if (OB_ISNULL(ls_handle.get_ls())) {
 | 
					    } else if (OB_ISNULL(ls_handle.get_ls())) {
 | 
				
			||||||
      ret = OB_ERR_UNEXPECTED;
 | 
					      ret = OB_ERR_UNEXPECTED;
 | 
				
			||||||
      LOG_WARN("ls is null", K(ret), K(ls_handle));
 | 
					      LOG_WARN("ls is null", K(ret), K(ls_handle));
 | 
				
			||||||
    } else if (OB_FAIL(ls_handle.get_ls()->get_tablet_svr()->get_read_tables(
 | 
					    } else if (OB_FAIL(ls_handle.get_ls()->get_tablet_svr()->get_read_tables(
 | 
				
			||||||
        tablet_id,
 | 
					        tablet_id,
 | 
				
			||||||
        ObTabletCommon::DEFAULT_GET_TABLET_DURATION_US,
 | 
					        remain_timeout,
 | 
				
			||||||
        get_table_param_->sample_info_.is_no_sample()
 | 
					        get_table_param_->sample_info_.is_no_sample()
 | 
				
			||||||
          ? access_ctx_->store_ctx_->mvcc_acc_ctx_.get_snapshot_version().get_val_for_tx()
 | 
					          ? access_ctx_->store_ctx_->mvcc_acc_ctx_.get_snapshot_version().get_val_for_tx()
 | 
				
			||||||
          : INT64_MAX,
 | 
					          : INT64_MAX,
 | 
				
			||||||
        get_table_param_->tablet_iter_,
 | 
					        get_table_param_->tablet_iter_,
 | 
				
			||||||
        false/*allow_not_ready*/))) {
 | 
					        false/*allow_not_ready*/))) {
 | 
				
			||||||
      LOG_WARN("failed to refresh tablet iterator", K(ret), K_(get_table_param), KP_(access_param));
 | 
					      LOG_WARN("failed to refresh tablet iterator", K(ret), K(ls_id), K_(get_table_param), KP_(access_param));
 | 
				
			||||||
    } else {
 | 
					    } else {
 | 
				
			||||||
      get_table_param_->refreshed_merge_ = this;
 | 
					      get_table_param_->refreshed_merge_ = this;
 | 
				
			||||||
      access_param_->iter_param_.rowkey_read_info_ =
 | 
					      access_param_->iter_param_.rowkey_read_info_ =
 | 
				
			||||||
 | 
				
			|||||||
@ -2384,8 +2384,12 @@ int ObLSTabletService::check_read_info_same(const AllowToReadMgr::AllowToReadInf
 | 
				
			|||||||
  return ret;
 | 
					  return ret;
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// ATTENTION!
 | 
				
			||||||
 | 
					// here we pass VALUE rather than REF for tablet id,
 | 
				
			||||||
 | 
					// because tablet id may be from iter, which will be reset in function,
 | 
				
			||||||
 | 
					// thus tablet id will be invalid
 | 
				
			||||||
int ObLSTabletService::get_read_tables(
 | 
					int ObLSTabletService::get_read_tables(
 | 
				
			||||||
    const common::ObTabletID &tablet_id,
 | 
					    const common::ObTabletID tablet_id,
 | 
				
			||||||
    const int64_t timeout_us,
 | 
					    const int64_t timeout_us,
 | 
				
			||||||
    const int64_t snapshot_version,
 | 
					    const int64_t snapshot_version,
 | 
				
			||||||
    ObTabletTableIterator &iter,
 | 
					    ObTabletTableIterator &iter,
 | 
				
			||||||
 | 
				
			|||||||
@ -289,7 +289,7 @@ public:
 | 
				
			|||||||
      const bool for_replay = false,
 | 
					      const bool for_replay = false,
 | 
				
			||||||
      const share::SCN clog_checkpoint_scn = share::SCN::min_scn());
 | 
					      const share::SCN clog_checkpoint_scn = share::SCN::min_scn());
 | 
				
			||||||
  int get_read_tables(
 | 
					  int get_read_tables(
 | 
				
			||||||
      const common::ObTabletID &tablet_id,
 | 
					      const common::ObTabletID tablet_id,
 | 
				
			||||||
      const int64_t timeout_us,
 | 
					      const int64_t timeout_us,
 | 
				
			||||||
      const int64_t snapshot_version,
 | 
					      const int64_t snapshot_version,
 | 
				
			||||||
      ObTabletTableIterator &iter,
 | 
					      ObTabletTableIterator &iter,
 | 
				
			||||||
 | 
				
			|||||||
@ -178,24 +178,20 @@ int ObStorageTableGuard::refresh_and_protect_table(ObRelativeTable &relative_tab
 | 
				
			|||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  while (OB_SUCC(ret) && need_to_refresh_table(*iter.table_iter())) {
 | 
					  while (OB_SUCC(ret) && need_to_refresh_table(*iter.table_iter())) {
 | 
				
			||||||
    if (OB_FAIL(store_ctx_.ls_->get_tablet_svr()->get_read_tables(
 | 
					    const int64_t remain_timeout = THIS_WORKER.get_timeout_remain();
 | 
				
			||||||
 | 
					    if (OB_UNLIKELY(remain_timeout <= 0)) {
 | 
				
			||||||
 | 
					      ret = OB_TRANS_STMT_TIMEOUT;
 | 
				
			||||||
 | 
					    } else if (OB_FAIL(store_ctx_.ls_->get_tablet_svr()->get_read_tables(
 | 
				
			||||||
        tablet_id,
 | 
					        tablet_id,
 | 
				
			||||||
        ObTabletCommon::DEFAULT_GET_TABLET_DURATION_US,
 | 
					        remain_timeout,
 | 
				
			||||||
        store_ctx_.mvcc_acc_ctx_.get_snapshot_version().get_val_for_tx(),
 | 
					        store_ctx_.mvcc_acc_ctx_.get_snapshot_version().get_val_for_tx(),
 | 
				
			||||||
        iter,
 | 
					        iter,
 | 
				
			||||||
        relative_table.allow_not_ready()))) {
 | 
					        relative_table.allow_not_ready()))) {
 | 
				
			||||||
      LOG_WARN("fail to get read tables", K(ret), K(ls_id), K(tablet_id),
 | 
					      LOG_WARN("fail to get read tables", K(ret), K(ls_id), K(remain_timeout),
 | 
				
			||||||
           "table id", relative_table.get_table_id());
 | 
					           "table_id", relative_table.get_table_id());
 | 
				
			||||||
    } else {
 | 
					    } else {
 | 
				
			||||||
      // no worry. iter will hold tablet reference and its life cycle is longer than guard
 | 
					      // no worry. iter will hold tablet reference and its life cycle is longer than guard
 | 
				
			||||||
      tablet_ = iter.get_tablet();
 | 
					      tablet_ = iter.get_tablet();
 | 
				
			||||||
      // TODO: check if session is killed
 | 
					 | 
				
			||||||
      if (store_ctx_.timeout_ > 0) {
 | 
					 | 
				
			||||||
        const int64_t query_left_time = store_ctx_.timeout_ - ObTimeUtility::current_time();
 | 
					 | 
				
			||||||
        if (query_left_time <= 0) {
 | 
					 | 
				
			||||||
          ret = OB_TRANS_STMT_TIMEOUT;
 | 
					 | 
				
			||||||
        }
 | 
					 | 
				
			||||||
      }
 | 
					 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user