[FEAT MERGE] load local files

This commit is contained in:
hnwyllmm
2024-02-07 20:25:14 +00:00
committed by ob-robot
parent c7fe4c3f69
commit acd0ec6efd
45 changed files with 1671 additions and 503 deletions

View File

@ -9,7 +9,6 @@
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX SQL_ENG
#include "sql/engine/cmd/ob_load_data_direct_impl.h"
@ -20,6 +19,7 @@
#include "observer/table_load/ob_table_load_table_ctx.h"
#include "observer/table_load/ob_table_load_task.h"
#include "observer/table_load/ob_table_load_task_scheduler.h"
#include "observer/mysql/ob_query_driver.h"
#include "share/schema/ob_schema_getter_guard.h"
#include "share/ob_device_manager.h"
#include "share/backup/ob_backup_io_adapter.h"
@ -226,208 +226,6 @@ int ObLoadDataDirectImpl::Logger::log_error_line(const ObString &file_name, int6
return ret;
}
/**
* RandomFileReader
*/
ObLoadDataDirectImpl::RandomFileReader::RandomFileReader() : is_inited_(false)
{
}
ObLoadDataDirectImpl::RandomFileReader::~RandomFileReader()
{
}
int ObLoadDataDirectImpl::RandomFileReader::open(const DataAccessParam &data_access_param, const ObString &filename)
{
int ret = OB_SUCCESS;
UNUSED(data_access_param);
if (IS_INIT) {
ret = OB_INIT_TWICE;
LOG_WARN("RandomFileReader init twice", KR(ret), KP(this));
} else if (OB_FAIL(file_reader_.open(filename.ptr(), false))) {
LOG_WARN("fail to open file", KR(ret), K(filename));
} else {
filename_ = filename;
is_inited_ = true;
}
return ret;
}
int ObLoadDataDirectImpl::RandomFileReader::pread(char *buf, int64_t count, int64_t offset, int64_t &read_size)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("RandomFileReader not init", KR(ret), KP(this));
} else if (OB_FAIL(file_reader_.pread(buf, count, offset, read_size))) {
LOG_WARN("fail to pread file buf", KR(ret), K(count), K(offset), K(read_size));
}
return ret;
}
int ObLoadDataDirectImpl::RandomFileReader::get_file_size(int64_t &file_size)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("RandomFileReader not init", KR(ret), KP(this));
} else {
file_size = ::get_file_size(filename_.ptr());
}
return ret;
}
/**
* RandomOSSReader
*/
ObLoadDataDirectImpl::RandomOSSReader::RandomOSSReader() : device_handle_(nullptr), is_inited_(false)
{
}
ObLoadDataDirectImpl::RandomOSSReader::~RandomOSSReader()
{
if (fd_.is_valid()) {
device_handle_->close(fd_);
fd_.reset();
}
if (nullptr != device_handle_) {
common::ObDeviceManager::get_instance().release_device(device_handle_);
device_handle_ = nullptr;
}
}
int ObLoadDataDirectImpl::RandomOSSReader::open(const DataAccessParam &data_access_param,
const ObString &filename)
{
int ret = OB_SUCCESS;
ObIODOpt opt;
ObIODOpts iod_opts;
ObBackupIoAdapter util;
iod_opts.opts_ = &opt;
iod_opts.opt_cnt_ = 0;
if (IS_INIT) {
ret = OB_INIT_TWICE;
LOG_WARN("RandomOSSReader init twice", KR(ret), KP(this));
} else if (OB_FAIL(
util.get_and_init_device(device_handle_, &data_access_param.access_info_, filename))) {
LOG_WARN("fail to get device manager", KR(ret), K(filename));
} else if (OB_FAIL(util.set_access_type(&iod_opts, false, 1))) {
LOG_WARN("fail to set access type", KR(ret));
} else if (OB_FAIL(device_handle_->open(to_cstring(filename), -1, 0, fd_, &iod_opts))) {
LOG_WARN("fail to open oss file", KR(ret), K(filename));
} else {
is_inited_ = true;
}
return ret;
}
int ObLoadDataDirectImpl::RandomOSSReader::pread(char *buf, int64_t count, int64_t offset,
int64_t &read_size)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("RandomOSSReader not init", KR(ret), KP(this));
} else if (OB_FAIL(device_handle_->pread(fd_, offset, count, buf, read_size))) {
LOG_WARN("fail to pread oss buf", KR(ret), K(offset), K(count), K(read_size));
}
return ret;
}
int ObLoadDataDirectImpl::RandomOSSReader::get_file_size(int64_t &file_size)
{
int ret = OB_SUCCESS;
ObBackupIoAdapter util;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("RandomOSSReader not init", KR(ret), KP(this));
} else if (OB_FAIL(util.get_file_size(device_handle_, fd_, file_size))) {
LOG_WARN("fail to get oss file size", KR(ret), K(file_size));
}
return ret;
}
/**
* SequentialDataAccessor
*/
ObLoadDataDirectImpl::SequentialDataAccessor::SequentialDataAccessor()
: random_io_device_(nullptr), offset_(0), is_inited_(false)
{
}
ObLoadDataDirectImpl::SequentialDataAccessor::~SequentialDataAccessor()
{
}
int ObLoadDataDirectImpl::SequentialDataAccessor::init(const DataAccessParam &data_access_param,
const ObString &filename)
{
int ret = OB_SUCCESS;
if (IS_INIT) {
ret = OB_INIT_TWICE;
LOG_WARN("ObLoadDataDirectImpl::SequentialDataAccessor init twice", KR(ret), KP(this));
} else if (OB_UNLIKELY(!data_access_param.is_valid() || filename.empty())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), K(data_access_param), K(filename));
} else {
if (data_access_param.file_location_ == ObLoadFileLocation::SERVER_DISK) {
if (OB_FAIL(random_file_reader_.open(data_access_param, filename))) {
LOG_WARN("fail to open random file reader", KR(ret), K(filename));
} else {
random_io_device_ = &random_file_reader_;
}
} else if (data_access_param.file_location_ == ObLoadFileLocation::OSS) {
if (OB_FAIL(random_oss_reader_.open(data_access_param, filename))) {
LOG_WARN("fail to open random oss reader", KR(ret), K(filename));
} else {
random_io_device_ = &random_oss_reader_;
}
} else {
ret = OB_NOT_SUPPORTED;
LOG_WARN("not supported load file location", KR(ret), K(data_access_param.file_location_));
FORWARD_USER_ERROR_MSG(ret, "not supported load file location");
}
if (OB_SUCC(ret)) {
is_inited_ = true;
}
}
return ret;
}
int ObLoadDataDirectImpl::SequentialDataAccessor::read(char *buf, int64_t count, int64_t &read_size)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObLoadDataDirectImpl::SequentialDataAccessor not init", KR(ret), KP(this));
} else if (OB_UNLIKELY(nullptr == buf || count <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), KP(buf), K(count));
} else {
if (OB_FAIL(random_io_device_->pread(buf, count, offset_, read_size))) {
LOG_WARN("fail to do pread", KR(ret), K(offset_));
} else {
offset_ += read_size;
}
}
return ret;
}
int ObLoadDataDirectImpl::SequentialDataAccessor::get_file_size(int64_t &file_size)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObLoadDataDirectImpl::SequentialDataAccessor not init", KR(ret), KP(this));
} else if (OB_FAIL(random_io_device_->get_file_size(file_size))) {
LOG_WARN("fail to get random io device file size", KR(ret), K(file_size));
}
return ret;
}
/**
* DataDescIterator
*/
@ -637,10 +435,23 @@ void ObLoadDataDirectImpl::DataBuffer::swap(DataBuffer &other)
*/
ObLoadDataDirectImpl::DataReader::DataReader()
: execute_ctx_(nullptr), end_offset_(0), read_raw_(false), is_iter_end_(false), is_inited_(false)
: execute_ctx_(nullptr),
file_reader_(nullptr),
end_offset_(0),
read_raw_(false),
is_iter_end_(false),
is_inited_(false)
{
}
ObLoadDataDirectImpl::DataReader::~DataReader()
{
if (OB_NOT_NULL(file_reader_)) {
file_reader_->~ObFileReader();
file_reader_ = nullptr;
}
}
int ObLoadDataDirectImpl::DataReader::init(const DataAccessParam &data_access_param,
LoadExecuteContext &execute_ctx,
const DataDesc &data_desc, bool read_raw)
@ -665,13 +476,28 @@ int ObLoadDataDirectImpl::DataReader::init(const DataAccessParam &data_access_pa
}
if (OB_SUCC(ret)) {
end_offset_ = data_desc.end_;
if (OB_FAIL(io_accessor_.init(data_access_param, data_desc.filename_))) {
LOG_WARN("fail to init io device", KR(ret), K(data_desc));
} else if (end_offset_ == -1 && OB_FAIL(io_accessor_.get_file_size(end_offset_))) {
LOG_WARN("fail to get file size", KR(ret), K(data_desc));
} else {
io_accessor_.seek(data_desc.start_);
ATOMIC_AAF(&execute_ctx_->job_stat_->total_bytes_, (end_offset_ - data_desc.start_));
ObFileReadParam file_read_param;
file_read_param.file_location_ = data_access_param.file_location_;
file_read_param.filename_ = data_desc.filename_;
file_read_param.access_info_ = data_access_param.access_info_;
file_read_param.packet_handle_ = &execute_ctx.exec_ctx_.get_session_info()->get_pl_query_sender()->get_packet_sender();
file_read_param.session_ = execute_ctx.exec_ctx_.get_session_info();
file_read_param.timeout_ts_ = THIS_WORKER.get_timeout_ts();
if (OB_FAIL(ObFileReader::open(file_read_param, *execute_ctx_->allocator_, file_reader_))) {
LOG_WARN("failed to open file", KR(ret), K(data_desc));
} else if (file_reader_->seekable()) {
if (end_offset_ == -1 && OB_FAIL(file_reader_->get_file_size(end_offset_))) {
LOG_WARN("fail to get file size", KR(ret), K(data_desc));
} else {
file_reader_->seek(data_desc.start_);
ATOMIC_AAF(&execute_ctx_->job_stat_->total_bytes_, (end_offset_ - data_desc.start_));
}
} else if (data_desc.start_ != 0) {
ret = OB_NOT_SUPPORTED; // should not happen
LOG_WARN("file reader asked to seek while not supported by unseekable file", KR(ret), K(file_read_param));
}
}
if (OB_SUCC(ret)) {
@ -681,6 +507,32 @@ int ObLoadDataDirectImpl::DataReader::init(const DataAccessParam &data_access_pa
return ret;
}
int ObLoadDataDirectImpl::DataReader::read_buffer(ObLoadFileBuffer &file_buffer)
{
int ret = OB_SUCCESS;
int64_t read_count = file_buffer.get_remain_len();
if (0 == read_count) {
ret = OB_BUF_NOT_ENOUGH;
LOG_WARN("cannot read more data as buffer is full", KR(ret));
} else if (!is_end_file()) {
int64_t read_size = 0;
if (end_offset_ > 0 && read_count > (end_offset_ - file_reader_->get_offset())) {
read_count = end_offset_ - file_reader_->get_offset();
}
if (OB_FAIL(file_reader_->readn(file_buffer.current_ptr(), read_count, read_size))) {
LOG_WARN("fail to read file", KR(ret));
} else if (0 == read_size) {
LOG_TRACE("read nothing", K(is_end_file()));
} else {
file_buffer.update_pos(read_size); // 更新buffer中数据长度
LOG_TRACE("read file sucess", K(read_size));
ATOMIC_AAF(&execute_ctx_->job_stat_->read_bytes_, read_size);
}
}
return ret;
}
int ObLoadDataDirectImpl::DataReader::get_next_buffer(ObLoadFileBuffer &file_buffer,
int64_t &line_count, int64_t limit)
{
@ -704,25 +556,14 @@ int ObLoadDataDirectImpl::DataReader::get_next_buffer(ObLoadFileBuffer &file_buf
if (OB_FAIL(data_trimer_.recover_incomplate_data(file_buffer))) {
LOG_WARN("fail to recover incomplate data", KR(ret));
}
// 2. 从文件里读取后续的数据
else if (!is_end_file()) {
int64_t read_count = 0;
int64_t read_size = 0;
if (FALSE_IT(read_count =
MIN(file_buffer.get_remain_len(), end_offset_ - io_accessor_.get_offset()))) {
} else if (OB_FAIL(io_accessor_.read(file_buffer.current_ptr(), read_count, read_size))) {
LOG_WARN("fail to read file", KR(ret));
} else if (OB_UNLIKELY(read_count != read_size)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected read size", KR(ret), K(read_count), K(read_size), K(end_offset_));
} else {
file_buffer.update_pos(read_size); // 更新buffer中数据长度
ATOMIC_AAF(&execute_ctx_->job_stat_->read_bytes_, read_size);
}
}
// 3. 从buffer中找出完整的行,剩下的数据缓存到data_trimer
// 2. 读取数据,然后从buffer中找出完整的行,剩下的数据缓存到data_trimer
if (OB_SUCC(ret)) {
if (!file_buffer.is_valid()) {
int64_t complete_cnt = limit;
int64_t complete_len = 0;
if (OB_FAIL(read_buffer(file_buffer))) {
LOG_WARN("failed to read buffer as there is not enough data to parse", KR(ret));
} else if (!file_buffer.is_valid()) {
is_iter_end_ = true;
ret = OB_ITER_END;
} else {
@ -760,19 +601,37 @@ int ObLoadDataDirectImpl::DataReader::get_next_raw_buffer(DataBuffer &data_buffe
} else if (is_end_file()) {
ret = OB_ITER_END;
} else if (data_buffer.get_remain_length() > 0) {
const int64_t read_count =
MIN(data_buffer.get_remain_length(), end_offset_ - io_accessor_.get_offset());
int64_t read_count = data_buffer.get_remain_length();
if (file_reader_->seekable() && read_count > end_offset_ - file_reader_->get_offset()) {
read_count = end_offset_ - file_reader_->get_offset();
}
int64_t read_size = 0;
if (OB_FAIL(io_accessor_.read(data_buffer.data() + data_buffer.get_data_length(), read_count,
if (OB_FAIL(file_reader_->readn(data_buffer.data() + data_buffer.get_data_length(), read_count,
read_size))) {
LOG_WARN("fail to read file", KR(ret));
} else if (OB_UNLIKELY(read_count != read_size)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected read size", KR(ret), K(read_count), K(read_size), K(end_offset_));
} else {
} else if (read_size > 0) {
data_buffer.update_data_length(read_size);
ATOMIC_AAF(&execute_ctx_->job_stat_->read_bytes_, read_size);
}
} else {
// read_size == 0
if (is_end_file()) {
ret = OB_ITER_END;
} else {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("read buffer got unexpected bytes.", K(read_size), K(read_count), K(ret));
}
}
}
return ret;
}
bool ObLoadDataDirectImpl::DataReader::is_end_file() const
{
bool ret = false;
if (file_reader_->eof()) {
ret = true;
} else if (end_offset_ > 0) {
ret = file_reader_->get_offset() >= end_offset_;
}
return ret;
}
@ -931,16 +790,32 @@ int ObLoadDataDirectImpl::SimpleDataSplitUtils::split(const DataAccessParam &dat
data_access_param.file_cs_type_))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected data format", KR(ret), K(data_access_param));
} else if (1 == count) {
} else if (1 == count || (ObLoadFileLocation::CLIENT_DISK == data_access_param.file_location_)) {
if (OB_FAIL(data_desc_iter.add_data_desc(data_desc))) {
LOG_WARN("fail to push back", KR(ret));
}
} else {
ObArenaAllocator allocator;
allocator.set_tenant_id(MTL_ID());
int64_t end_offset = data_desc.end_;
SequentialDataAccessor io_device;
if (OB_FAIL(io_device.init(data_access_param, data_desc.filename_))) {
LOG_WARN("fail to init io device", KR(ret), K(data_desc.filename_));
} else if (-1 == end_offset && OB_FAIL(io_device.get_file_size(end_offset))) {
ObFileReadParam file_read_param;
file_read_param.file_location_ = data_access_param.file_location_;
file_read_param.filename_ = data_desc.filename_;
file_read_param.access_info_ = data_access_param.access_info_;
file_read_param.packet_handle_ = NULL;
file_read_param.session_ = NULL;
file_read_param.timeout_ts_ = THIS_WORKER.get_timeout_ts();
ObFileReader *file_reader = NULL;
if (OB_FAIL(ObFileReader::open(file_read_param, allocator, file_reader))) {
LOG_WARN("failed to open file.", KR(ret), K(data_desc));
} else if (!file_reader->seekable()) {
if (OB_FAIL(data_desc_iter.add_data_desc(data_desc))) {
LOG_WARN("fail to push back", KR(ret));
}
} else if (-1 == end_offset && OB_FAIL(file_reader->get_file_size(end_offset))) {
LOG_WARN("fail to get io device file size", KR(ret), K(end_offset));
} else {
const int64_t file_size = end_offset - data_desc.start_;
@ -953,26 +828,24 @@ int ObLoadDataDirectImpl::SimpleDataSplitUtils::split(const DataAccessParam &dat
const char line_term_char = data_access_param.file_format_.line_term_str_.ptr()[0];
const int64_t buf_size = (128LL << 10) + 1;
const int64_t split_size = file_size / count;
ObArenaAllocator allocator;
char *buf = nullptr;
int64_t read_size = 0;
DataDesc data_desc_ret;
data_desc_ret.file_idx_ = data_desc.file_idx_;
data_desc_ret.filename_ = data_desc.filename_;
data_desc_ret.start_ = data_desc.start_;
allocator.set_tenant_id(MTL_ID());
if (OB_ISNULL(buf = static_cast<char *>(allocator.alloc(buf_size)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to alloc memory", KR(ret));
}
for (int64_t i = 0; OB_SUCC(ret) && i < count - 1; ++i) {
int64_t read_offset = data_desc.start_ + split_size * (i + 1);
io_device.seek(read_offset);
file_reader->seek(read_offset);
char *found = nullptr;
while (OB_SUCC(ret) && end_offset > io_device.get_offset() && nullptr == found) {
read_offset = io_device.get_offset();
while (OB_SUCC(ret) && end_offset > file_reader->get_offset() && nullptr == found) {
read_offset = file_reader->get_offset();
const int64_t read_count = MIN(end_offset - read_offset, buf_size - 1);
if (OB_FAIL(io_device.read(buf, read_count, read_size))) {
if (OB_FAIL(file_reader->readn(buf, read_count, read_size))) {
LOG_WARN("fail to do read", KR(ret), K(read_offset), K(read_count));
} else if (OB_UNLIKELY(read_count != read_size)) {
ret = OB_ERR_UNEXPECTED;
@ -1004,6 +877,11 @@ int ObLoadDataDirectImpl::SimpleDataSplitUtils::split(const DataAccessParam &dat
}
}
}
if (OB_NOT_NULL(file_reader)) {
file_reader->~ObFileReader();
allocator.free(file_reader);
}
}
return ret;
}
@ -1139,6 +1017,7 @@ int ObLoadDataDirectImpl::FileLoadExecutor::execute()
LOG_WARN("fail to prepare execute", KR(ret));
}
LOG_TRACE("file load executor prepare execute done", K(ret));
while (OB_SUCC(ret) && OB_SUCC(execute_ctx_->exec_ctx_.check_status())) {
TaskHandle *handle = nullptr;
if (OB_FAIL(get_next_task_handle(handle))) {
@ -1177,8 +1056,9 @@ int ObLoadDataDirectImpl::FileLoadExecutor::execute()
LOG_WARN("fail to handle all task result", KR(ret));
}
}
}
LOG_TRACE("large file load executor init done", K(ret));
return ret;
}