Add retry in direct load for read/write tmp file
This commit is contained in:
		@ -191,10 +191,8 @@ int ObDirectLoadDataBlockReader<Header, T>::read_next_buffer()
 | 
			
		||||
    buf_size_ = data_size;
 | 
			
		||||
    // read buffer
 | 
			
		||||
    const int64_t read_size = MIN(buf_capacity_ - buf_size_, read_size_);
 | 
			
		||||
    if (OB_FAIL(file_io_handle_.aio_pread(buf_ + buf_size_, read_size, offset_))) {
 | 
			
		||||
      STORAGE_LOG(WARN, "fail to do aio read from tmp file", KR(ret));
 | 
			
		||||
    } else if (OB_FAIL(file_io_handle_.wait())) {
 | 
			
		||||
      STORAGE_LOG(WARN, "fail to wait io finish", KR(ret), K(io_timeout_ms_));
 | 
			
		||||
    if (OB_FAIL(file_io_handle_.pread(buf_ + buf_size_, read_size, offset_))) {
 | 
			
		||||
      STORAGE_LOG(WARN, "fail to do pread from tmp file", KR(ret));
 | 
			
		||||
    } else {
 | 
			
		||||
      buf_size_ += read_size;
 | 
			
		||||
      offset_ += read_size;
 | 
			
		||||
 | 
			
		||||
@ -216,7 +216,7 @@ int ObDirectLoadDataBlockWriter<Header, T>::flush_buffer()
 | 
			
		||||
    int64_t buf_size = 0;
 | 
			
		||||
    if (OB_FAIL(data_block_writer_.build_data_block(buf, buf_size))) {
 | 
			
		||||
      STORAGE_LOG(WARN, "fail to build data block", KR(ret));
 | 
			
		||||
    } else if (OB_FAIL(file_io_handle_.aio_write(buf, data_block_size_))) {
 | 
			
		||||
    } else if (OB_FAIL(file_io_handle_.write(buf, data_block_size_))) {
 | 
			
		||||
      STORAGE_LOG(WARN, "fail to do aio write tmp file", KR(ret));
 | 
			
		||||
    } else if (nullptr != callback_ && OB_FAIL(callback_->write(buf, data_block_size_, offset_))) {
 | 
			
		||||
      STORAGE_LOG(WARN, "fail to callback write", KR(ret));
 | 
			
		||||
@ -245,7 +245,7 @@ int ObDirectLoadDataBlockWriter<Header, T>::flush_extra_buffer(const T &item)
 | 
			
		||||
          data_block_writer_.build_data_block(item, extra_buf_, extra_buf_size_, data_size))) {
 | 
			
		||||
      STORAGE_LOG(WARN, "fail to build data block", KR(ret));
 | 
			
		||||
    } else if (FALSE_IT(data_block_size = ALIGN_UP(data_size, DIO_ALIGN_SIZE))) {
 | 
			
		||||
    } else if (OB_FAIL(file_io_handle_.aio_write(extra_buf_, data_block_size))) {
 | 
			
		||||
    } else if (OB_FAIL(file_io_handle_.write(extra_buf_, data_block_size))) {
 | 
			
		||||
      STORAGE_LOG(WARN, "fail to do aio write tmp file", KR(ret));
 | 
			
		||||
    } else if (nullptr != callback_ &&
 | 
			
		||||
               OB_FAIL(callback_->write(extra_buf_, data_block_size, offset_))) {
 | 
			
		||||
 | 
			
		||||
@ -410,10 +410,10 @@ int ObDirectLoadDataBlockWriter2::close()
 | 
			
		||||
    LOG_WARN("ObDirectLoadDataBlockWriter2 is closed", KR(ret));
 | 
			
		||||
  } else if (buf_pos_ > header_length_ && OB_FAIL(flush_buffer(buf_size_, buf_))) {
 | 
			
		||||
    LOG_WARN("fail to flush buffer", KR(ret));
 | 
			
		||||
  } else if (OB_FAIL(file_io_handle_.wait())) {
 | 
			
		||||
    LOG_WARN("fail to wait io finish", KR(ret));
 | 
			
		||||
  } else {
 | 
			
		||||
    reset();
 | 
			
		||||
  }
 | 
			
		||||
  if (OB_SUCC(ret)) {
 | 
			
		||||
    is_closed_ = true;
 | 
			
		||||
  }
 | 
			
		||||
  return ret;
 | 
			
		||||
@ -428,7 +428,7 @@ int ObDirectLoadDataBlockWriter2::flush_buffer(int64_t buf_size, char *buf)
 | 
			
		||||
  if (OB_FAIL(header_.serialize(buf, buf_size, pos))) {
 | 
			
		||||
    LOG_WARN("fail to serialize data block header", KR(ret), K(buf_size), KP(buf_), K(pos));
 | 
			
		||||
  } else {
 | 
			
		||||
    if (OB_FAIL(file_io_handle_.aio_write(buf, buf_size))) {
 | 
			
		||||
    if (OB_FAIL(file_io_handle_.write(buf, buf_size))) {
 | 
			
		||||
      LOG_WARN("fail to do aio write data file", KR(ret));
 | 
			
		||||
    } else {
 | 
			
		||||
      ObDirectLoadIndexBlockItem item;
 | 
			
		||||
@ -560,7 +560,7 @@ int ObDirectLoadIndexBlockWriter::flush_buffer()
 | 
			
		||||
  if (OB_FAIL(header_.serialize(buf_, buf_size_, pos))) {
 | 
			
		||||
    LOG_WARN("fail to serialize data block header", KR(ret), K(buf_size_), K(pos), KP(buf_));
 | 
			
		||||
  } else {
 | 
			
		||||
    if (OB_FAIL(file_io_handle_.aio_write(buf_, buf_size_))) {
 | 
			
		||||
    if (OB_FAIL(file_io_handle_.write(buf_, buf_size_))) {
 | 
			
		||||
      LOG_WARN("fail to do aio write index file", KR(ret));
 | 
			
		||||
    } else {
 | 
			
		||||
      header_.start_offset_ = offset_;
 | 
			
		||||
@ -584,10 +584,10 @@ int ObDirectLoadIndexBlockWriter::close()
 | 
			
		||||
    LOG_WARN("ObDirectLoadIndexBlockWriter is closed", KR(ret));
 | 
			
		||||
  } else if (buf_pos_ > header_length_ && OB_FAIL(flush_buffer())) {
 | 
			
		||||
    LOG_WARN("fail to flush buffer", KR(ret));
 | 
			
		||||
  } else if (OB_FAIL(file_io_handle_.wait())) {
 | 
			
		||||
    LOG_WARN("fail to wait io finish", KR(ret));
 | 
			
		||||
  } else {
 | 
			
		||||
    reset();
 | 
			
		||||
  }
 | 
			
		||||
  if (OB_SUCC(ret)) {
 | 
			
		||||
    is_closed_ = true;
 | 
			
		||||
  }
 | 
			
		||||
  return ret;
 | 
			
		||||
@ -667,13 +667,9 @@ int ObDirectLoadIndexBlockReader::read_buffer(int64_t idx)
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  uint64_t offset = 0;
 | 
			
		||||
  offset = buf_size_ * idx;
 | 
			
		||||
  if (OB_FAIL(file_io_handle_.aio_pread(buf_, buf_size_, offset))) {
 | 
			
		||||
  if (OB_FAIL(file_io_handle_.pread(buf_, buf_size_, offset))) {
 | 
			
		||||
    if (OB_UNLIKELY(OB_ITER_END != ret)) {
 | 
			
		||||
      LOG_WARN("fail to do aio read from index file", KR(ret));
 | 
			
		||||
    }
 | 
			
		||||
  } else {
 | 
			
		||||
    if (OB_FAIL(file_io_handle_.wait())) {
 | 
			
		||||
      LOG_WARN("fail to wait io finish", KR(ret));
 | 
			
		||||
      LOG_WARN("fail to do pread from index file", KR(ret));
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
  return ret;
 | 
			
		||||
 | 
			
		||||
@ -168,6 +168,7 @@ int ObDirectLoadTmpFilesHandle::get_file(int64_t idx,
 | 
			
		||||
 */
 | 
			
		||||
 | 
			
		||||
ObDirectLoadTmpFileIOHandle::ObDirectLoadTmpFileIOHandle()
 | 
			
		||||
  : tmp_file_(nullptr), is_cancel_(false)
 | 
			
		||||
{
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -178,31 +179,96 @@ ObDirectLoadTmpFileIOHandle::~ObDirectLoadTmpFileIOHandle()
 | 
			
		||||
 | 
			
		||||
void ObDirectLoadTmpFileIOHandle::reset()
 | 
			
		||||
{
 | 
			
		||||
  tmp_file_ = nullptr;
 | 
			
		||||
  file_handle_.reset();
 | 
			
		||||
  io_info_.reset();
 | 
			
		||||
  file_io_handle_.reset();
 | 
			
		||||
  is_cancel_ = false;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int ObDirectLoadTmpFileIOHandle::open(const ObDirectLoadTmpFileHandle &file_handle)
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  ObDirectLoadTmpFile *tmp_file = file_handle.get_file();
 | 
			
		||||
  int64_t file_size = 0;
 | 
			
		||||
  if (OB_UNLIKELY(!file_handle.is_valid())) {
 | 
			
		||||
    ret = OB_INVALID_ARGUMENT;
 | 
			
		||||
    LOG_WARN("invalid args", KR(ret), K(file_handle));
 | 
			
		||||
  } else if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.get_tmp_file_size(tmp_file->get_file_id().fd_,
 | 
			
		||||
                                                                file_size))) {
 | 
			
		||||
    LOG_WARN("fail to get tmp file size", KR(ret), KPC(tmp_file));
 | 
			
		||||
  } else if (OB_UNLIKELY(file_size != tmp_file->get_file_size())) {
 | 
			
		||||
    ret = OB_ERR_UNEXPECTED;
 | 
			
		||||
    LOG_WARN("unexpected file size", KR(ret), K(file_size), KPC(tmp_file));
 | 
			
		||||
  } else {
 | 
			
		||||
    reset();
 | 
			
		||||
    if (OB_FAIL(file_handle_.assign(file_handle))) {
 | 
			
		||||
      LOG_WARN("fail to assign file handle", KR(ret));
 | 
			
		||||
    } else {
 | 
			
		||||
      tmp_file_ = tmp_file;
 | 
			
		||||
      io_info_.tenant_id_ = MTL_ID();
 | 
			
		||||
      io_info_.dir_id_ = file_handle_.get_file()->get_file_id().dir_id_;
 | 
			
		||||
      io_info_.fd_ = file_handle_.get_file()->get_file_id().fd_;
 | 
			
		||||
      io_info_.dir_id_ = tmp_file_->get_file_id().dir_id_;
 | 
			
		||||
      io_info_.fd_ = tmp_file_->get_file_id().fd_;
 | 
			
		||||
      io_info_.io_desc_.set_group_id(ObIOModule::DIRECT_LOAD_IO);
 | 
			
		||||
      io_info_.io_timeout_ms_ = GCONF._data_storage_io_timeout / 1000L;
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int ObDirectLoadTmpFileIOHandle::aio_read(char *buf, int64_t size)
 | 
			
		||||
int ObDirectLoadTmpFileIOHandle::check_status()
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  if (OB_UNLIKELY(is_cancel_)) {
 | 
			
		||||
    ret = OB_CANCELED;
 | 
			
		||||
    LOG_WARN("tmp file io canceled", KR(ret));
 | 
			
		||||
  } else if (OB_UNLIKELY(THIS_WORKER.is_timeout())) {
 | 
			
		||||
    ret = OB_TIMEOUT;
 | 
			
		||||
    LOG_WARN("worker timeout", KR(ret), K(THIS_WORKER.get_timeout_ts()));
 | 
			
		||||
  }
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int ObDirectLoadTmpFileIOHandle::pread(char *buf, int64_t size, int64_t offset)
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  if (OB_UNLIKELY(!is_valid())) {
 | 
			
		||||
    ret = OB_FILE_NOT_EXIST;
 | 
			
		||||
    LOG_WARN("tmp file not set", KR(ret));
 | 
			
		||||
  } else if (OB_UNLIKELY(nullptr == buf || size <= 0 || offset < 0)) {
 | 
			
		||||
    ret = OB_INVALID_ARGUMENT;
 | 
			
		||||
    LOG_WARN("invalid args", KR(ret), KP(buf), K(size), K(offset));
 | 
			
		||||
  } else if (OB_UNLIKELY(offset + size > tmp_file_->get_file_size())) {
 | 
			
		||||
    ret = OB_ERR_UNEXPECTED;
 | 
			
		||||
    LOG_WARN("unexpected read out of file size", KR(ret), KPC_(tmp_file), K(size), K(offset));
 | 
			
		||||
  } else {
 | 
			
		||||
    int64_t retry_cnt = 0;
 | 
			
		||||
    io_info_.size_ = size;
 | 
			
		||||
    io_info_.buf_ = buf;
 | 
			
		||||
    io_info_.io_desc_.set_wait_event(ObWaitEventIds::DB_FILE_DATA_READ);
 | 
			
		||||
    while (OB_SUCC(ret)) {
 | 
			
		||||
      if (OB_FAIL(check_status())) {
 | 
			
		||||
        LOG_WARN("fail to check status", KR(ret));
 | 
			
		||||
      } else if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.pread(io_info_, offset, file_io_handle_))) {
 | 
			
		||||
        LOG_WARN("fail to do pread from tmp file", KR(ret), K_(io_info), K(offset));
 | 
			
		||||
        if (OB_LIKELY(is_retry_err(ret))) {
 | 
			
		||||
          if (++retry_cnt <= MAX_RETRY_CNT) {
 | 
			
		||||
            ret = OB_SUCCESS;
 | 
			
		||||
            LOG_INFO("retry pread tmp file", K(retry_cnt), K_(io_info), K(size), K(offset));
 | 
			
		||||
          }
 | 
			
		||||
        } else if (OB_ITER_END == ret) {
 | 
			
		||||
          ret = OB_ERR_UNEXPECTED;
 | 
			
		||||
          LOG_WARN("unexpected read out of file size", KR(ret), K_(io_info), K(size), K(offset));
 | 
			
		||||
        }
 | 
			
		||||
      } else {
 | 
			
		||||
        break;
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int ObDirectLoadTmpFileIOHandle::write(char *buf, int64_t size)
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  if (OB_UNLIKELY(!is_valid())) {
 | 
			
		||||
@ -212,13 +278,40 @@ int ObDirectLoadTmpFileIOHandle::aio_read(char *buf, int64_t size)
 | 
			
		||||
    ret = OB_INVALID_ARGUMENT;
 | 
			
		||||
    LOG_WARN("invalid args", KR(ret), KP(buf), K(size));
 | 
			
		||||
  } else {
 | 
			
		||||
    io_info_.size_ = size;
 | 
			
		||||
    int64_t retry_cnt = 0;
 | 
			
		||||
    io_info_.buf_ = buf;
 | 
			
		||||
    io_info_.io_desc_.set_group_id(ObIOModule::DIRECT_LOAD_IO);
 | 
			
		||||
    io_info_.io_desc_.set_wait_event(ObWaitEventIds::DB_FILE_DATA_READ);
 | 
			
		||||
    if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.aio_read(io_info_, file_io_handle_))) {
 | 
			
		||||
      if (OB_UNLIKELY(OB_ITER_END != ret)) {
 | 
			
		||||
        LOG_WARN("fail to do aio read from tmp file", KR(ret), K_(io_info));
 | 
			
		||||
    io_info_.size_ = size;
 | 
			
		||||
    io_info_.io_desc_.set_wait_event(ObWaitEventIds::DB_FILE_INDEX_BUILD_WRITE);
 | 
			
		||||
    while (OB_SUCC(ret)) {
 | 
			
		||||
      if (OB_FAIL(check_status())) {
 | 
			
		||||
        LOG_WARN("fail to check status", KR(ret));
 | 
			
		||||
      }
 | 
			
		||||
      // TODO(suzhi.yt): 先保留原来的调用, aio_write提交成功就相当于写成功了
 | 
			
		||||
      else if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.aio_write(io_info_, file_io_handle_))) {
 | 
			
		||||
        LOG_WARN("fail to do aio write to tmp file", KR(ret), K_(io_info));
 | 
			
		||||
        if (OB_LIKELY(is_retry_err(ret))) {
 | 
			
		||||
          if (++retry_cnt <= MAX_RETRY_CNT) {
 | 
			
		||||
            ret = OB_SUCCESS;
 | 
			
		||||
            int64_t new_file_size = 0;
 | 
			
		||||
            if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.get_tmp_file_size(tmp_file_->get_file_id().fd_,
 | 
			
		||||
                                                                   new_file_size))) {
 | 
			
		||||
              LOG_WARN("fail to get tmp file size", KR(ret), KPC_(tmp_file));
 | 
			
		||||
            } else {
 | 
			
		||||
              const int64_t write_size = new_file_size - tmp_file_->get_file_size();
 | 
			
		||||
              tmp_file_->inc_file_size(write_size);
 | 
			
		||||
              io_info_.buf_ += write_size;
 | 
			
		||||
              io_info_.size_ -= write_size;
 | 
			
		||||
              if (io_info_.size_ > 0) {
 | 
			
		||||
                LOG_INFO("retry aio write tmp file", K(retry_cnt), K_(io_info));
 | 
			
		||||
              } else {
 | 
			
		||||
                break;
 | 
			
		||||
              }
 | 
			
		||||
            }
 | 
			
		||||
          }
 | 
			
		||||
        }
 | 
			
		||||
      } else {
 | 
			
		||||
        tmp_file_->inc_file_size(io_info_.size_);
 | 
			
		||||
        break;
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
@ -234,66 +327,30 @@ int ObDirectLoadTmpFileIOHandle::aio_pread(char *buf, int64_t size, int64_t offs
 | 
			
		||||
  } else if (OB_UNLIKELY(nullptr == buf || size <= 0 || offset < 0)) {
 | 
			
		||||
    ret = OB_INVALID_ARGUMENT;
 | 
			
		||||
    LOG_WARN("invalid args", KR(ret), KP(buf), K(size), K(offset));
 | 
			
		||||
  } else if (OB_UNLIKELY(offset + size > tmp_file_->get_file_size())) {
 | 
			
		||||
    ret = OB_ERR_UNEXPECTED;
 | 
			
		||||
    LOG_WARN("unexpected read out of file size", KR(ret), KPC_(tmp_file), K(size), K(offset));
 | 
			
		||||
  } else {
 | 
			
		||||
    int64_t retry_cnt = 0;
 | 
			
		||||
    io_info_.size_ = size;
 | 
			
		||||
    io_info_.buf_ = buf;
 | 
			
		||||
    io_info_.io_desc_.set_group_id(ObIOModule::DIRECT_LOAD_IO);
 | 
			
		||||
    io_info_.io_desc_.set_wait_event(ObWaitEventIds::DB_FILE_DATA_READ);
 | 
			
		||||
    if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.aio_pread(io_info_, offset, file_io_handle_))) {
 | 
			
		||||
      if (OB_UNLIKELY(OB_ITER_END != ret)) {
 | 
			
		||||
    while (OB_SUCC(ret)) {
 | 
			
		||||
      if (OB_FAIL(check_status())) {
 | 
			
		||||
        LOG_WARN("fail to check status", KR(ret));
 | 
			
		||||
      } else if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.aio_pread(io_info_, offset, file_io_handle_))) {
 | 
			
		||||
        LOG_WARN("fail to do aio pread from tmp file", KR(ret), K_(io_info), K(offset));
 | 
			
		||||
        if (OB_LIKELY(is_retry_err(ret))) {
 | 
			
		||||
          if (++retry_cnt <= MAX_RETRY_CNT) {
 | 
			
		||||
            ret = OB_SUCCESS;
 | 
			
		||||
            LOG_INFO("retry aio pread tmp file", K(retry_cnt), K_(io_info), K(offset));
 | 
			
		||||
          }
 | 
			
		||||
        } else if (OB_ITER_END == ret) {
 | 
			
		||||
          ret = OB_ERR_UNEXPECTED;
 | 
			
		||||
          LOG_WARN("unexpected read out of file size", KR(ret), KPC_(tmp_file), K(size), K(offset));
 | 
			
		||||
        }
 | 
			
		||||
  }
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int ObDirectLoadTmpFileIOHandle::read(char *buf, int64_t &size)
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  if (OB_UNLIKELY(!is_valid())) {
 | 
			
		||||
    ret = OB_FILE_NOT_EXIST;
 | 
			
		||||
    LOG_WARN("tmp file not set", KR(ret));
 | 
			
		||||
  } else if (OB_UNLIKELY(nullptr == buf || size <= 0)) {
 | 
			
		||||
    ret = OB_INVALID_ARGUMENT;
 | 
			
		||||
    LOG_WARN("invalid args", KR(ret), KP(buf), K(size));
 | 
			
		||||
      } else {
 | 
			
		||||
    io_info_.size_ = size;
 | 
			
		||||
    io_info_.buf_ = buf;
 | 
			
		||||
    io_info_.io_desc_.set_group_id(ObIOModule::DIRECT_LOAD_IO);
 | 
			
		||||
    io_info_.io_desc_.set_wait_event(ObWaitEventIds::DB_FILE_DATA_READ);
 | 
			
		||||
    io_info_.io_timeout_ms_ = GCONF._data_storage_io_timeout / 1000L;
 | 
			
		||||
    if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.read(io_info_, file_io_handle_))) {
 | 
			
		||||
      if (OB_UNLIKELY(OB_ITER_END != ret)) {
 | 
			
		||||
        LOG_WARN("fail to do read from tmp file", KR(ret), K_(io_info));
 | 
			
		||||
      } else {
 | 
			
		||||
        size = file_io_handle_.get_data_size();
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int ObDirectLoadTmpFileIOHandle::pread(char *buf, int64_t &size, int64_t offset)
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  if (OB_UNLIKELY(!is_valid())) {
 | 
			
		||||
    ret = OB_FILE_NOT_EXIST;
 | 
			
		||||
    LOG_WARN("tmp file not set", KR(ret));
 | 
			
		||||
  } else if (OB_UNLIKELY(nullptr == buf || size <= 0)) {
 | 
			
		||||
    ret = OB_INVALID_ARGUMENT;
 | 
			
		||||
    LOG_WARN("invalid args", KR(ret), KP(buf), K(size));
 | 
			
		||||
  } else {
 | 
			
		||||
    io_info_.size_ = size;
 | 
			
		||||
    io_info_.buf_ = buf;
 | 
			
		||||
    io_info_.io_desc_.set_group_id(ObIOModule::DIRECT_LOAD_IO);
 | 
			
		||||
    io_info_.io_desc_.set_wait_event(ObWaitEventIds::DB_FILE_DATA_READ);
 | 
			
		||||
    io_info_.io_timeout_ms_ = GCONF._data_storage_io_timeout / 1000L;
 | 
			
		||||
    if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.pread(io_info_, offset, file_io_handle_))) {
 | 
			
		||||
      if (OB_UNLIKELY(OB_ITER_END != ret)) {
 | 
			
		||||
        LOG_WARN("fail to do pread from tmp file", KR(ret), K_(io_info), K(offset));
 | 
			
		||||
      } else {
 | 
			
		||||
        size = file_io_handle_.get_data_size();
 | 
			
		||||
        break;
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
@ -310,34 +367,41 @@ int ObDirectLoadTmpFileIOHandle::aio_write(char *buf, int64_t size)
 | 
			
		||||
    ret = OB_INVALID_ARGUMENT;
 | 
			
		||||
    LOG_WARN("invalid args", KR(ret), KP(buf), K(size));
 | 
			
		||||
  } else {
 | 
			
		||||
    int64_t retry_cnt = 0;
 | 
			
		||||
    io_info_.size_ = size;
 | 
			
		||||
    io_info_.buf_ = buf;
 | 
			
		||||
    io_info_.io_desc_.set_group_id(ObIOModule::DIRECT_LOAD_IO);
 | 
			
		||||
    io_info_.io_desc_.set_wait_event(ObWaitEventIds::DB_FILE_INDEX_BUILD_WRITE);
 | 
			
		||||
    io_info_.io_timeout_ms_ = GCONF._data_storage_io_timeout / 1000L;
 | 
			
		||||
    if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.aio_write(io_info_, file_io_handle_))) {
 | 
			
		||||
    while (OB_SUCC(ret)) {
 | 
			
		||||
      if (OB_FAIL(check_status())) {
 | 
			
		||||
        LOG_WARN("fail to check status", KR(ret));
 | 
			
		||||
      }
 | 
			
		||||
      // aio_write提交成功就相当于写成功了
 | 
			
		||||
      else if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.aio_write(io_info_, file_io_handle_))) {
 | 
			
		||||
        LOG_WARN("fail to do aio write to tmp file", KR(ret), K_(io_info));
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int ObDirectLoadTmpFileIOHandle::write(char *buf, int64_t size)
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  if (OB_UNLIKELY(!is_valid())) {
 | 
			
		||||
    ret = OB_FILE_NOT_EXIST;
 | 
			
		||||
    LOG_WARN("tmp file not set", KR(ret));
 | 
			
		||||
  } else if (OB_UNLIKELY(nullptr == buf || size <= 0)) {
 | 
			
		||||
    ret = OB_INVALID_ARGUMENT;
 | 
			
		||||
    LOG_WARN("invalid args", KR(ret), KP(buf), K(size));
 | 
			
		||||
        if (OB_LIKELY(is_retry_err(ret))) {
 | 
			
		||||
          if (++retry_cnt <= MAX_RETRY_CNT) {
 | 
			
		||||
            ret = OB_SUCCESS;
 | 
			
		||||
            int64_t new_file_size = 0;
 | 
			
		||||
            if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.get_tmp_file_size(tmp_file_->get_file_id().fd_,
 | 
			
		||||
                                                                   new_file_size))) {
 | 
			
		||||
              LOG_WARN("fail to get tmp file size", KR(ret), KPC_(tmp_file));
 | 
			
		||||
            } else {
 | 
			
		||||
    io_info_.size_ = size;
 | 
			
		||||
    io_info_.buf_ = buf;
 | 
			
		||||
    io_info_.io_desc_.set_group_id(ObIOModule::DIRECT_LOAD_IO);
 | 
			
		||||
    io_info_.io_desc_.set_wait_event(ObWaitEventIds::DB_FILE_INDEX_BUILD_WRITE);
 | 
			
		||||
    if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.write(io_info_))) {
 | 
			
		||||
      LOG_WARN("fail to do write to tmp file", KR(ret), K_(io_info));
 | 
			
		||||
              const int64_t write_size = new_file_size - tmp_file_->get_file_size();
 | 
			
		||||
              tmp_file_->inc_file_size(write_size);
 | 
			
		||||
              io_info_.buf_ += write_size;
 | 
			
		||||
              io_info_.size_ -= write_size;
 | 
			
		||||
              if (io_info_.size_ > 0) {
 | 
			
		||||
                LOG_INFO("retry aio write tmp file", K(retry_cnt), K_(io_info));
 | 
			
		||||
              } else {
 | 
			
		||||
                break;
 | 
			
		||||
              }
 | 
			
		||||
            }
 | 
			
		||||
          }
 | 
			
		||||
        }
 | 
			
		||||
      } else {
 | 
			
		||||
        tmp_file_->inc_file_size(io_info_.size_);
 | 
			
		||||
        break;
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
  return ret;
 | 
			
		||||
@ -346,8 +410,27 @@ int ObDirectLoadTmpFileIOHandle::write(char *buf, int64_t size)
 | 
			
		||||
int ObDirectLoadTmpFileIOHandle::wait()
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  if (OB_FAIL(file_io_handle_.wait())) {
 | 
			
		||||
  if (OB_UNLIKELY(!is_valid())) {
 | 
			
		||||
    ret = OB_FILE_NOT_EXIST;
 | 
			
		||||
    LOG_WARN("tmp file not set", KR(ret));
 | 
			
		||||
  } else {
 | 
			
		||||
    int64_t retry_cnt = 0;
 | 
			
		||||
    while (OB_SUCC(ret)) {
 | 
			
		||||
      if (OB_FAIL(check_status())) {
 | 
			
		||||
        LOG_WARN("fail to check status", KR(ret));
 | 
			
		||||
      } else if (OB_FAIL(file_io_handle_.wait())) {
 | 
			
		||||
        LOG_WARN("fail to wait io finish", KR(ret));
 | 
			
		||||
        if (OB_LIKELY(is_retry_err(ret))) {
 | 
			
		||||
          if (++retry_cnt <= MAX_RETRY_CNT) {
 | 
			
		||||
            ret = OB_SUCCESS;
 | 
			
		||||
            LOG_INFO("retry wait tmp file io finish", K(retry_cnt), KPC_(tmp_file),
 | 
			
		||||
                     K_(file_io_handle));
 | 
			
		||||
          }
 | 
			
		||||
        }
 | 
			
		||||
      } else {
 | 
			
		||||
        break;
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -39,7 +39,7 @@ class ObDirectLoadTmpFile
 | 
			
		||||
{
 | 
			
		||||
public:
 | 
			
		||||
  ObDirectLoadTmpFile(ObDirectLoadTmpFileManager *file_mgr, const ObDirectLoadTmpFileId &file_id)
 | 
			
		||||
    : file_mgr_(file_mgr), file_id_(file_id), ref_count_(0)
 | 
			
		||||
    : file_mgr_(file_mgr), file_id_(file_id), file_size_(0), ref_count_(0)
 | 
			
		||||
  {
 | 
			
		||||
  }
 | 
			
		||||
  bool is_valid() const { return nullptr != file_mgr_ && file_id_.is_valid(); }
 | 
			
		||||
@ -48,10 +48,13 @@ public:
 | 
			
		||||
  int64_t dec_ref_count() { return ATOMIC_AAF(&ref_count_, -1); }
 | 
			
		||||
  ObDirectLoadTmpFileManager *get_file_mgr() const { return file_mgr_; }
 | 
			
		||||
  const ObDirectLoadTmpFileId &get_file_id() const { return file_id_; }
 | 
			
		||||
  TO_STRING_KV(KP_(file_mgr), K_(file_id), K_(ref_count));
 | 
			
		||||
  int64_t get_file_size() const { return file_size_; }
 | 
			
		||||
  void inc_file_size(int64_t size) { file_size_ += size; }
 | 
			
		||||
  TO_STRING_KV(KP_(file_mgr), K_(file_id), K_(file_size), K_(ref_count));
 | 
			
		||||
private:
 | 
			
		||||
  ObDirectLoadTmpFileManager *const file_mgr_;
 | 
			
		||||
  const ObDirectLoadTmpFileId file_id_;
 | 
			
		||||
  int64_t file_size_;
 | 
			
		||||
  int64_t ref_count_ CACHE_ALIGNED;
 | 
			
		||||
  DISABLE_COPY_ASSIGN(ObDirectLoadTmpFile);
 | 
			
		||||
};
 | 
			
		||||
@ -94,28 +97,31 @@ private:
 | 
			
		||||
 | 
			
		||||
class ObDirectLoadTmpFileIOHandle final
 | 
			
		||||
{
 | 
			
		||||
  static const uint64_t MAX_RETRY_CNT = 3;
 | 
			
		||||
public:
 | 
			
		||||
  ObDirectLoadTmpFileIOHandle();
 | 
			
		||||
  ~ObDirectLoadTmpFileIOHandle();
 | 
			
		||||
  void reset();
 | 
			
		||||
  bool is_valid() const { return file_handle_.is_valid(); }
 | 
			
		||||
  int open(const ObDirectLoadTmpFileHandle &file_handle);
 | 
			
		||||
  int aio_read(char *buf, int64_t size);
 | 
			
		||||
  int aio_pread(char *buf, int64_t size, int64_t offset);
 | 
			
		||||
  int read(char *buf, int64_t &size);
 | 
			
		||||
  int pread(char *buf, int64_t &size, int64_t offset);
 | 
			
		||||
  int aio_write(char *buf, int64_t size);
 | 
			
		||||
  int pread(char *buf, int64_t size, int64_t offset);
 | 
			
		||||
  int write(char *buf, int64_t size);
 | 
			
		||||
  int aio_pread(char *buf, int64_t size, int64_t offset);
 | 
			
		||||
  int aio_write(char *buf, int64_t size);
 | 
			
		||||
  int wait();
 | 
			
		||||
  // for aio read to get real read size when ret = OB_ITER_END
 | 
			
		||||
  OB_INLINE int64_t get_data_size() { return file_io_handle_.get_data_size(); }
 | 
			
		||||
  OB_INLINE void cancel() { is_cancel_ = true; }
 | 
			
		||||
  static int seek(const ObDirectLoadTmpFileHandle &file_handle, int64_t offset, int whence);
 | 
			
		||||
  static int sync(const ObDirectLoadTmpFileHandle &file_handle, int64_t timeout_ms);
 | 
			
		||||
  TO_STRING_KV(K_(file_handle));
 | 
			
		||||
  static bool is_retry_err(int ret_code) { return OB_TIMEOUT == ret_code; }
 | 
			
		||||
  TO_STRING_KV(K_(file_handle), K_(io_info));
 | 
			
		||||
private:
 | 
			
		||||
  int check_status();
 | 
			
		||||
private:
 | 
			
		||||
  ObDirectLoadTmpFileHandle file_handle_;
 | 
			
		||||
  ObDirectLoadTmpFile *tmp_file_;
 | 
			
		||||
  blocksstable::ObTmpFileIOInfo io_info_;
 | 
			
		||||
  blocksstable::ObTmpFileIOHandle file_io_handle_;
 | 
			
		||||
  bool is_cancel_;
 | 
			
		||||
  DISABLE_COPY_ASSIGN(ObDirectLoadTmpFileIOHandle);
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
		Reference in New Issue
	
	Block a user