[FEAT MERGE] LOAD DATA supports compressed files
Co-authored-by: ant-ob-hengtang <zj458684356@gmail.com>
This commit is contained in:
@ -241,6 +241,3 @@ int ObZstdWrapper::insert_block(void *ctx, const void *block, const size_t block
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -241,6 +241,58 @@ int ObZstdWrapper::insert_block(void *ctx, const void *block, const size_t block
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int ObZstdWrapper::create_stream_dctx(const OB_ZSTD_customMem &ob_zstd_mem, void *&ctx)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
size_t ret_code = 0;
|
||||||
|
ZSTD_DStream *dctx = NULL;
|
||||||
|
ZSTD_customMem zstd_mem;
|
||||||
|
|
||||||
|
zstd_mem.customAlloc = ob_zstd_mem.customAlloc;
|
||||||
|
zstd_mem.customFree = ob_zstd_mem.customFree;
|
||||||
|
zstd_mem.opaque = ob_zstd_mem.opaque;
|
||||||
|
|
||||||
|
ctx = NULL;
|
||||||
|
|
||||||
|
if (NULL == (dctx = ZSTD_createDStream_advanced(zstd_mem))) {
|
||||||
|
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||||
|
} else {
|
||||||
|
ctx = dctx;
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
void ObZstdWrapper::free_stream_dctx(void *&ctx)
|
||||||
|
{
|
||||||
|
ZSTD_DStream *dctx = static_cast<ZSTD_DStream *>(ctx);
|
||||||
|
ZSTD_freeDStream(dctx);
|
||||||
|
}
|
||||||
|
|
||||||
|
int ObZstdWrapper::decompress_stream(void *ctx, const char *src, const size_t src_size, size_t &consumed_size,
|
||||||
|
char *dest, const size_t dest_capacity, size_t &decompressed_size)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
|
||||||
|
if (NULL == ctx
|
||||||
|
|| NULL == src
|
||||||
|
|| NULL == dest
|
||||||
|
|| src_size <= 0
|
||||||
|
|| dest_capacity <= 0) {
|
||||||
|
ret = OB_INVALID_ARGUMENT;
|
||||||
|
} else {
|
||||||
|
consumed_size = 0;
|
||||||
|
decompressed_size = 0;
|
||||||
|
|
||||||
|
ZSTD_DStream *dctx = static_cast<ZSTD_DStream *>(ctx);
|
||||||
|
ZSTD_outBuffer output = { dest, dest_capacity, 0 };
|
||||||
|
ZSTD_inBuffer input = { src, src_size, 0 };
|
||||||
|
int zstd_err = ZSTD_decompressStream(dctx, &output, &input);
|
||||||
|
if (0 != ZSTD_isError(zstd_err)) {
|
||||||
|
ret = OB_ERR_COMPRESS_DECOMPRESS_DATA;
|
||||||
|
} else {
|
||||||
|
consumed_size = input.pos;
|
||||||
|
decompressed_size = output.pos;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|||||||
@ -60,6 +60,11 @@ public:
|
|||||||
char *dest, const size_t dest_capacity, size_t &decompressed_size);
|
char *dest, const size_t dest_capacity, size_t &decompressed_size);
|
||||||
static size_t compress_bound(const size_t src_size);
|
static size_t compress_bound(const size_t src_size);
|
||||||
static int insert_block(void *ctx, const void *block, const size_t block_size);
|
static int insert_block(void *ctx, const void *block, const size_t block_size);
|
||||||
|
|
||||||
|
static int create_stream_dctx(const OB_ZSTD_customMem &ob_zstd_mem, void *&ctx);
|
||||||
|
static void free_stream_dctx(void *&ctx);
|
||||||
|
static int decompress_stream(void *ctx, const char *src, const size_t src_size, size_t &consumed_size,
|
||||||
|
char *dest, const size_t dest_capacity, size_t &decompressed_size);
|
||||||
};
|
};
|
||||||
|
|
||||||
#undef OB_PUBLIC_API
|
#undef OB_PUBLIC_API
|
||||||
|
|||||||
20
deps/oblib/src/lib/string/ob_string.h
vendored
20
deps/oblib/src/lib/string/ob_string.h
vendored
@ -383,6 +383,26 @@ public:
|
|||||||
return match;
|
return match;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
inline bool suffix_match_ci(const ObString &obstr) const
|
||||||
|
{
|
||||||
|
bool match = false;
|
||||||
|
if (data_length_ < obstr.data_length_) {
|
||||||
|
} else if (0 == STRNCASECMP(ptr_ + data_length_ - obstr.data_length_, obstr.ptr_, obstr.data_length_)) {
|
||||||
|
match = true;
|
||||||
|
}
|
||||||
|
return match;
|
||||||
|
}
|
||||||
|
|
||||||
|
inline bool suffix_match_ci(const char *str) const
|
||||||
|
{
|
||||||
|
bool match = false;
|
||||||
|
if (OB_NOT_NULL(str)) {
|
||||||
|
ObString obstr(str);
|
||||||
|
match = suffix_match_ci(obstr);
|
||||||
|
}
|
||||||
|
return match;
|
||||||
|
}
|
||||||
|
|
||||||
inline bool prefix_match(const char *str) const
|
inline bool prefix_match(const char *str) const
|
||||||
{
|
{
|
||||||
obstr_size_t len = 0;
|
obstr_size_t len = 0;
|
||||||
|
|||||||
@ -42,7 +42,7 @@ using namespace omt;
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
ObLoadDataDirectImpl::DataAccessParam::DataAccessParam()
|
ObLoadDataDirectImpl::DataAccessParam::DataAccessParam()
|
||||||
: file_column_num_(0), file_cs_type_(CS_TYPE_INVALID)
|
: file_column_num_(0), file_cs_type_(CS_TYPE_INVALID), compression_format_(ObLoadCompressionFormat::NONE)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -500,6 +500,7 @@ int ObLoadDataDirectImpl::DataReader::init(const DataAccessParam &data_access_pa
|
|||||||
ObFileReadParam file_read_param;
|
ObFileReadParam file_read_param;
|
||||||
file_read_param.file_location_ = data_access_param.file_location_;
|
file_read_param.file_location_ = data_access_param.file_location_;
|
||||||
file_read_param.filename_ = data_desc.filename_;
|
file_read_param.filename_ = data_desc.filename_;
|
||||||
|
file_read_param.compression_format_ = data_access_param.compression_format_;
|
||||||
file_read_param.access_info_ = data_access_param.access_info_;
|
file_read_param.access_info_ = data_access_param.access_info_;
|
||||||
file_read_param.packet_handle_ = &execute_ctx.exec_ctx_.get_session_info()->get_pl_query_sender()->get_packet_sender();
|
file_read_param.packet_handle_ = &execute_ctx.exec_ctx_.get_session_info()->get_pl_query_sender()->get_packet_sender();
|
||||||
file_read_param.session_ = execute_ctx.exec_ctx_.get_session_info();
|
file_read_param.session_ = execute_ctx.exec_ctx_.get_session_info();
|
||||||
@ -810,7 +811,8 @@ int ObLoadDataDirectImpl::SimpleDataSplitUtils::split(const DataAccessParam &dat
|
|||||||
data_access_param.file_cs_type_))) {
|
data_access_param.file_cs_type_))) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("unexpected data format", KR(ret), K(data_access_param));
|
LOG_WARN("unexpected data format", KR(ret), K(data_access_param));
|
||||||
} else if (1 == count || (ObLoadFileLocation::CLIENT_DISK == data_access_param.file_location_)) {
|
} else if (1 == count || (ObLoadFileLocation::CLIENT_DISK == data_access_param.file_location_)
|
||||||
|
|| data_access_param.compression_format_ != ObLoadCompressionFormat::NONE) {
|
||||||
if (OB_FAIL(data_desc_iter.add_data_desc(data_desc))) {
|
if (OB_FAIL(data_desc_iter.add_data_desc(data_desc))) {
|
||||||
LOG_WARN("fail to push back", KR(ret));
|
LOG_WARN("fail to push back", KR(ret));
|
||||||
}
|
}
|
||||||
@ -824,6 +826,7 @@ int ObLoadDataDirectImpl::SimpleDataSplitUtils::split(const DataAccessParam &dat
|
|||||||
file_read_param.file_location_ = data_access_param.file_location_;
|
file_read_param.file_location_ = data_access_param.file_location_;
|
||||||
file_read_param.filename_ = data_desc.filename_;
|
file_read_param.filename_ = data_desc.filename_;
|
||||||
file_read_param.access_info_ = data_access_param.access_info_;
|
file_read_param.access_info_ = data_access_param.access_info_;
|
||||||
|
file_read_param.compression_format_ = data_access_param.compression_format_;
|
||||||
file_read_param.packet_handle_ = NULL;
|
file_read_param.packet_handle_ = NULL;
|
||||||
file_read_param.session_ = NULL;
|
file_read_param.session_ = NULL;
|
||||||
file_read_param.timeout_ts_ = THIS_WORKER.get_timeout_ts();
|
file_read_param.timeout_ts_ = THIS_WORKER.get_timeout_ts();
|
||||||
@ -2412,6 +2415,7 @@ int ObLoadDataDirectImpl::init_execute_param()
|
|||||||
data_access_param.file_format_ = load_stmt_->get_data_struct_in_file();
|
data_access_param.file_format_ = load_stmt_->get_data_struct_in_file();
|
||||||
data_access_param.file_cs_type_ = load_args.file_cs_type_;
|
data_access_param.file_cs_type_ = load_args.file_cs_type_;
|
||||||
data_access_param.access_info_ = load_args.access_info_;
|
data_access_param.access_info_ = load_args.access_info_;
|
||||||
|
data_access_param.compression_format_ = load_args.compression_format_;
|
||||||
}
|
}
|
||||||
// column_ids_
|
// column_ids_
|
||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
|
|||||||
@ -71,6 +71,7 @@ private:
|
|||||||
int64_t file_column_num_; // number of column in file
|
int64_t file_column_num_; // number of column in file
|
||||||
ObDataInFileStruct file_format_;
|
ObDataInFileStruct file_format_;
|
||||||
common::ObCollationType file_cs_type_;
|
common::ObCollationType file_cs_type_;
|
||||||
|
ObLoadCompressionFormat compression_format_;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct LoadExecuteParam
|
struct LoadExecuteParam
|
||||||
|
|||||||
@ -18,23 +18,57 @@
|
|||||||
#include "rpc/obmysql/ob_i_cs_mem_pool.h"
|
#include "rpc/obmysql/ob_i_cs_mem_pool.h"
|
||||||
#include "rpc/obmysql/ob_mysql_packet.h"
|
#include "rpc/obmysql/ob_mysql_packet.h"
|
||||||
#include "rpc/obmysql/packet/ompk_local_infile.h"
|
#include "rpc/obmysql/packet/ompk_local_infile.h"
|
||||||
|
#include "lib/compress/zstd_1_3_8/ob_zstd_wrapper.h"
|
||||||
|
|
||||||
namespace oceanbase
|
namespace oceanbase
|
||||||
{
|
{
|
||||||
namespace sql
|
namespace sql
|
||||||
{
|
{
|
||||||
|
|
||||||
|
const ObLabel MEMORY_LABEL = ObLabel("LoadDataReader");
|
||||||
|
|
||||||
|
#define MEMORY_ATTR ObMemAttr(MTL_ID(), MEMORY_LABEL)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* ObFileReadParam
|
* ObFileReadParam
|
||||||
*/
|
*/
|
||||||
|
|
||||||
ObFileReadParam::ObFileReadParam()
|
ObFileReadParam::ObFileReadParam()
|
||||||
: packet_handle_(NULL),
|
: compression_format_(ObLoadCompressionFormat::NONE),
|
||||||
|
packet_handle_(NULL),
|
||||||
session_(NULL),
|
session_(NULL),
|
||||||
timeout_ts_(-1)
|
timeout_ts_(-1)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Translate a user-supplied compression name into ObLoadCompressionFormat.
 *
 * Names are matched case-insensitively: "" / "none", "gzip", "deflate", "zstd".
 * "auto" derives the format from the file name suffix (.gz, .deflate, .zst, .zstd).
 *
 * @param compression_name    name given by the user
 * @param filename            file name, consulted only for "auto"
 * @param compression_format  [out] resolved format; NONE whenever an error is returned
 * @return OB_SUCCESS, or OB_INVALID_ARGUMENT for an unknown name or, with "auto",
 *         a file name whose suffix is not recognized
 */
int ObFileReadParam::parse_compression_format(ObString compression_name, ObString filename, ObLoadCompressionFormat &compression_format)
{
  int ret = OB_SUCCESS;

  // Give the output a defined value up front so callers never read garbage on failure.
  compression_format = ObLoadCompressionFormat::NONE;

  if (compression_name.length() == 0 ||
      0 == compression_name.case_compare("none")) {
    // already NONE
  } else if (0 == compression_name.case_compare("gzip")) {
    compression_format = ObLoadCompressionFormat::GZIP;
  } else if (0 == compression_name.case_compare("deflate")) {
    compression_format = ObLoadCompressionFormat::DEFLATE;
  } else if (0 == compression_name.case_compare("zstd")) {
    compression_format = ObLoadCompressionFormat::ZSTD;
  } else if (0 == compression_name.case_compare("auto")) {
    if (filename.suffix_match_ci(".gz")) {
      compression_format = ObLoadCompressionFormat::GZIP;
    } else if (filename.suffix_match_ci(".deflate")) {
      compression_format = ObLoadCompressionFormat::DEFLATE;
    } else if (filename.suffix_match_ci(".zst") || filename.suffix_match_ci(".zstd")) {
      compression_format = ObLoadCompressionFormat::ZSTD;
    } else {
      ret = OB_INVALID_ARGUMENT;
      LOG_WARN("cannot detect compression format from file name", KR(ret), K(filename));
    }
  } else {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("unknown compression format name", KR(ret), K(compression_name));
  }
  return ret;
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* ObFileReader
|
* ObFileReader
|
||||||
*/
|
*/
|
||||||
@ -45,26 +79,24 @@ int ObFileReader::open(const ObFileReadParam ¶m, ObIAllocator &allocator, Ob
|
|||||||
file_reader = nullptr;
|
file_reader = nullptr;
|
||||||
|
|
||||||
if (param.file_location_ == ObLoadFileLocation::SERVER_DISK) {
|
if (param.file_location_ == ObLoadFileLocation::SERVER_DISK) {
|
||||||
ObRandomFileReader *tmp_reader = OB_NEWx(ObRandomFileReader, &allocator, allocator);
|
ObRandomFileReader *tmp_reader = OB_NEW(ObRandomFileReader, MEMORY_ATTR, allocator);
|
||||||
if (OB_ISNULL(tmp_reader)) {
|
if (OB_ISNULL(tmp_reader)) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("failed to create ObRandomFileReader", K(ret));
|
LOG_WARN("failed to create ObRandomFileReader", K(ret));
|
||||||
} else if (OB_FAIL(tmp_reader->open(param.filename_))) {
|
} else if (OB_FAIL(tmp_reader->open(param.filename_))) {
|
||||||
LOG_WARN("fail to open random file reader", KR(ret), K(param.filename_));
|
LOG_WARN("fail to open random file reader", KR(ret), K(param.filename_));
|
||||||
tmp_reader->~ObRandomFileReader();
|
OB_DELETE(ObRandomFileReader, MEMORY_ATTR, tmp_reader);
|
||||||
allocator.free(tmp_reader);
|
|
||||||
} else {
|
} else {
|
||||||
file_reader = tmp_reader;
|
file_reader = tmp_reader;
|
||||||
}
|
}
|
||||||
} else if (param.file_location_ == ObLoadFileLocation::OSS) {
|
} else if (param.file_location_ == ObLoadFileLocation::OSS) {
|
||||||
ObRandomOSSReader *tmp_reader = OB_NEWx(ObRandomOSSReader, &allocator, allocator);
|
ObRandomOSSReader *tmp_reader = OB_NEW(ObRandomOSSReader, MEMORY_ATTR, allocator);
|
||||||
if (OB_ISNULL(tmp_reader)) {
|
if (OB_ISNULL(tmp_reader)) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("failed to create RandomOSSReader", K(ret));
|
LOG_WARN("failed to create RandomOSSReader", K(ret));
|
||||||
} else if (OB_FAIL(tmp_reader->open(param.access_info_, param.filename_))) {
|
} else if (OB_FAIL(tmp_reader->open(param.access_info_, param.filename_))) {
|
||||||
LOG_WARN("fail to open random oss reader", KR(ret), K(param.filename_));
|
LOG_WARN("fail to open random oss reader", KR(ret), K(param.filename_));
|
||||||
tmp_reader->~ObRandomOSSReader();
|
OB_DELETE(ObRandomOSSReader, MEMORY_ATTR, tmp_reader);
|
||||||
allocator.free(tmp_reader);
|
|
||||||
} else {
|
} else {
|
||||||
file_reader = tmp_reader;
|
file_reader = tmp_reader;
|
||||||
}
|
}
|
||||||
@ -73,14 +105,13 @@ int ObFileReader::open(const ObFileReadParam ¶m, ObIAllocator &allocator, Ob
|
|||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("cannot create packet stream file reader while the packet handle is null", K(ret));
|
LOG_WARN("cannot create packet stream file reader while the packet handle is null", K(ret));
|
||||||
} else {
|
} else {
|
||||||
ObPacketStreamFileReader *tmp_reader = OB_NEWx(ObPacketStreamFileReader, &allocator, allocator);
|
ObPacketStreamFileReader *tmp_reader = OB_NEW(ObPacketStreamFileReader, MEMORY_ATTR, allocator);
|
||||||
if (OB_ISNULL(tmp_reader)) {
|
if (OB_ISNULL(tmp_reader)) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("failed to create ObPacketStreamFileReader", K(ret));
|
LOG_WARN("failed to create ObPacketStreamFileReader", K(ret));
|
||||||
} else if (OB_FAIL(tmp_reader->open(param.filename_, *param.packet_handle_, param.session_, param.timeout_ts_))) {
|
} else if (OB_FAIL(tmp_reader->open(param.filename_, *param.packet_handle_, param.session_, param.timeout_ts_))) {
|
||||||
LOG_WARN("failed to open packet stream file reader", KR(ret), K(param.filename_));
|
LOG_WARN("failed to open packet stream file reader", KR(ret), K(param.filename_));
|
||||||
tmp_reader->~ObPacketStreamFileReader();
|
OB_DELETE(ObPacketStreamFileReader, MEMORY_ATTR, tmp_reader);
|
||||||
allocator.free(tmp_reader);
|
|
||||||
} else {
|
} else {
|
||||||
file_reader = tmp_reader;
|
file_reader = tmp_reader;
|
||||||
}
|
}
|
||||||
@ -90,6 +121,36 @@ int ObFileReader::open(const ObFileReadParam ¶m, ObIAllocator &allocator, Ob
|
|||||||
LOG_WARN("not supported load file location", KR(ret), K(param.file_location_));
|
LOG_WARN("not supported load file location", KR(ret), K(param.file_location_));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (OB_SUCC(ret)) {
|
||||||
|
ObFileReader *decompress_reader = nullptr;
|
||||||
|
ret = open_decompress_reader(param, allocator, file_reader, decompress_reader);
|
||||||
|
if (OB_SUCC(ret) && OB_NOT_NULL(decompress_reader)) {
|
||||||
|
file_reader = decompress_reader;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
int ObFileReader::open_decompress_reader(const ObFileReadParam ¶m,
|
||||||
|
ObIAllocator &allocator,
|
||||||
|
ObFileReader *source_reader,
|
||||||
|
ObFileReader *&file_reader)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
if (param.compression_format_ == ObLoadCompressionFormat::NONE) {
|
||||||
|
file_reader = source_reader;
|
||||||
|
} else {
|
||||||
|
ObDecompressFileReader *tmp_reader = OB_NEW(ObDecompressFileReader, MEMORY_ATTR, allocator);
|
||||||
|
if (OB_ISNULL(tmp_reader)) {
|
||||||
|
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||||
|
} else if (OB_FAIL(tmp_reader->open(param, source_reader))) {
|
||||||
|
LOG_WARN("failed to open decompress file reader");
|
||||||
|
OB_DELETE(ObDecompressFileReader, MEMORY_ATTR, tmp_reader);
|
||||||
|
} else {
|
||||||
|
file_reader = tmp_reader;
|
||||||
|
}
|
||||||
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -471,5 +532,378 @@ bool ObPacketStreamFileReader::is_killed() const
|
|||||||
return NULL != session_ && (session_->is_query_killed() || session_->is_zombie());
|
return NULL != session_ && (session_->is_query_killed() || session_->is_zombie());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ObDecompressor
|
||||||
|
*/
|
||||||
|
// Remembers the allocator reference; concrete decompressors set up their state in init().
ObDecompressor::ObDecompressor(ObIAllocator &allocator)
    : allocator_(allocator)
{
}
|
||||||
|
|
||||||
|
// Nothing to release at the base level; subclasses clean up in their own destructors.
ObDecompressor::~ObDecompressor() {}
|
||||||
|
|
||||||
|
/**
 * Factory: build and initialize the stream decompressor for `format`.
 * GZIP and DEFLATE both use ObZlibDecompressor; ZSTD uses ObZstdDecompressor.
 *
 * @param format        compression format; NONE and unknown values are rejected
 * @param allocator     handed to the decompressor's constructor
 * @param decompressor  [out] initialized decompressor on success, nullptr otherwise
 * @return OB_SUCCESS; OB_INVALID_ARGUMENT for NONE/unknown formats;
 *         OB_ALLOCATE_MEMORY_FAILED when the object cannot be allocated;
 *         otherwise the error returned by the decompressor's init()
 */
int ObDecompressor::create(ObLoadCompressionFormat format, ObIAllocator &allocator, ObDecompressor *&decompressor)
{
  int ret = OB_SUCCESS;

  decompressor = nullptr;

  switch (format) {
    case ObLoadCompressionFormat::NONE: {
      ret = OB_INVALID_ARGUMENT;
    } break;

    case ObLoadCompressionFormat::GZIP:
    case ObLoadCompressionFormat::DEFLATE: {
      decompressor = OB_NEW(ObZlibDecompressor, MEMORY_ATTR, allocator);
    } break;

    case ObLoadCompressionFormat::ZSTD: {
      decompressor = OB_NEW(ObZstdDecompressor, MEMORY_ATTR, allocator);
    } break;

    default: {
      ret = OB_INVALID_ARGUMENT;
      LOG_WARN("unsupported compression format", K(format));
    } break;
  }

  if (OB_FAIL(ret)) {
    // format rejected above
  } else if (OB_ISNULL(decompressor)) {
    // A failed allocation must not be reported as success: callers dereference the result.
    ret = OB_ALLOCATE_MEMORY_FAILED;
    LOG_WARN("failed to allocate decompressor", KR(ret), K(format));
  } else if (OB_FAIL(decompressor->init())) {
    LOG_WARN("failed to init decompressor", KR(ret));
    decompressor->destroy();
    OB_DELETE(ObDecompressor, MEMORY_ATTR, decompressor);
    decompressor = nullptr;
  }

  return ret;
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ObDecompressFileReader
|
||||||
|
*/
|
||||||
|
const int64_t ObDecompressFileReader::COMPRESSED_DATA_BUFFER_SIZE = 2 * 1024 * 1024;
|
||||||
|
|
||||||
|
// Forwards the allocator to the stream-reader base; the reader is unusable until open().
ObDecompressFileReader::ObDecompressFileReader(ObIAllocator &allocator)
    : ObStreamFileReader(allocator)
{
}
|
||||||
|
|
||||||
|
// Tears down what open() acquired: the wrapped source reader, the decompressor,
// and the compressed-data staging buffer.
ObDecompressFileReader::~ObDecompressFileReader()
{
  if (nullptr != source_reader_) {
    OB_DELETE(ObFileReader, MEMORY_ATTR, source_reader_);
  }

  if (nullptr != decompressor_) {
    OB_DELETE(ObDecompressor, MEMORY_ATTR, decompressor_);
  }

  if (nullptr != compressed_data_) {
    // The staging buffer came from allocator_ in open(), so it goes back there.
    allocator_.free(compressed_data_);
    compressed_data_ = nullptr;
  }
}
|
||||||
|
|
||||||
|
int ObDecompressFileReader::open(const ObFileReadParam ¶m, ObFileReader *source_reader)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
|
||||||
|
if (param.compression_format_ == ObLoadCompressionFormat::NONE) {
|
||||||
|
ret = OB_INVALID_ARGUMENT;
|
||||||
|
} else if (OB_FAIL(ObDecompressor::create(param.compression_format_, allocator_, decompressor_))) {
|
||||||
|
LOG_WARN("failed to create decompressor", K(param.compression_format_), K(ret));
|
||||||
|
} else if (OB_ISNULL(compressed_data_ = (char *)allocator_.alloc(COMPRESSED_DATA_BUFFER_SIZE))) {
|
||||||
|
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||||
|
LOG_WARN("failed to allocate buffer.", K(COMPRESSED_DATA_BUFFER_SIZE));
|
||||||
|
} else if (FALSE_IT(source_reader_ = source_reader)) {
|
||||||
|
}
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
int ObDecompressFileReader::read(char *buf, int64_t capacity, int64_t &read_size)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
|
||||||
|
read_size = 0;
|
||||||
|
|
||||||
|
if (OB_ISNULL(source_reader_)) {
|
||||||
|
ret = OB_NOT_INIT;
|
||||||
|
} else if (OB_ISNULL(buf) || capacity <= 0) {
|
||||||
|
ret = OB_INVALID_ARGUMENT;
|
||||||
|
LOG_WARN("invalid argument", KP(buf), K(capacity));
|
||||||
|
} else if (consumed_data_size_ >= compress_data_size_) {
|
||||||
|
if (!source_reader_->eof()) {
|
||||||
|
ret = read_compressed_data();
|
||||||
|
} else {
|
||||||
|
eof_ = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (OB_SUCC(ret) && compress_data_size_ > consumed_data_size_) {
|
||||||
|
int64_t consumed_size = 0;
|
||||||
|
ret = decompressor_->decompress(compressed_data_ + consumed_data_size_,
|
||||||
|
compress_data_size_ - consumed_data_size_,
|
||||||
|
consumed_size,
|
||||||
|
buf,
|
||||||
|
capacity,
|
||||||
|
read_size);
|
||||||
|
if (OB_FAIL(ret)) {
|
||||||
|
LOG_WARN("failed to decompress", K(ret));
|
||||||
|
} else {
|
||||||
|
consumed_data_size_ += consumed_size;
|
||||||
|
uncompressed_size_ += read_size;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
int ObDecompressFileReader::read_compressed_data()
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
char *read_buffer = compressed_data_;
|
||||||
|
if (OB_ISNULL(source_reader_)) {
|
||||||
|
ret = OB_NOT_INIT;
|
||||||
|
} else if (OB_UNLIKELY(consumed_data_size_ < compress_data_size_)) {
|
||||||
|
// backup data
|
||||||
|
const int64_t last_data_size = compress_data_size_ - consumed_data_size_;
|
||||||
|
MEMMOVE(compressed_data_, compressed_data_ + consumed_data_size_, last_data_size);
|
||||||
|
read_buffer = compressed_data_ + last_data_size;
|
||||||
|
consumed_data_size_ = 0;
|
||||||
|
compress_data_size_ = last_data_size;
|
||||||
|
} else if (consumed_data_size_ == compress_data_size_) {
|
||||||
|
consumed_data_size_ = 0;
|
||||||
|
compress_data_size_ = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (OB_SUCC(ret)) {
|
||||||
|
// read data from source reader
|
||||||
|
int64_t read_size = 0;
|
||||||
|
int64_t capability = COMPRESSED_DATA_BUFFER_SIZE - compress_data_size_;
|
||||||
|
ret = source_reader_->read(read_buffer, capability, read_size);
|
||||||
|
if (OB_SUCC(ret)) {
|
||||||
|
compress_data_size_ += read_size;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ObZlibDecompressor
|
||||||
|
*/
|
||||||
|
/**
 * zlib allocation hook: allocates items * size bytes from the ObIAllocator passed
 * as `opaque`. Returns NULL when no allocator was supplied.
 */
voidpf zlib_alloc(voidpf opaque, uInt items, uInt size)
{
  voidpf ret = NULL;
  ObIAllocator *allocator = static_cast<ObIAllocator *>(opaque);
  if (OB_ISNULL(allocator)) {
    // no allocator: report allocation failure to zlib
  } else {
    // Multiply in 64 bits; the product of two uInt values can wrap in 32-bit arithmetic.
    const int64_t total_size = static_cast<int64_t>(items) * static_cast<int64_t>(size);
    ret = allocator->alloc(total_size);
  }
  return ret;
}
|
||||||
|
|
||||||
|
/**
 * zlib deallocation hook: returns `address` to the ObIAllocator passed as `opaque`.
 *
 * Without an allocator nothing is done: zlib_alloc never hands out memory in that
 * case, so there is nothing this hook could legitimately release (and it must not
 * pass such a pointer to libc free()).
 */
void zlib_free(voidpf opaque, voidpf address)
{
  ObIAllocator *allocator = static_cast<ObIAllocator *>(opaque);
  if (OB_NOT_NULL(allocator)) {
    allocator->free(address);
  }
}
|
||||||
|
|
||||||
|
// Only records the allocator; the zlib stream itself is created in init().
ObZlibDecompressor::ObZlibDecompressor(ObIAllocator &allocator)
    : ObDecompressor(allocator)
{
}
|
||||||
|
|
||||||
|
// Releases the zlib stream, if one is still held.
ObZlibDecompressor::~ObZlibDecompressor()
{
  destroy();
}
|
||||||
|
|
||||||
|
void ObZlibDecompressor::destroy()
|
||||||
|
{
|
||||||
|
if (OB_NOT_NULL(zlib_stream_ptr_)) {
|
||||||
|
z_streamp zstream_ptr = static_cast<z_streamp>(zlib_stream_ptr_);
|
||||||
|
inflateEnd(zstream_ptr);
|
||||||
|
zlib_stream_ptr_ = nullptr;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int ObZlibDecompressor::init()
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
if (OB_NOT_NULL(zlib_stream_ptr_)) {
|
||||||
|
ret = OB_INIT_TWICE;
|
||||||
|
} else if (OB_ISNULL(zlib_stream_ptr_ = allocator_.alloc(sizeof(z_stream)))) {
|
||||||
|
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||||
|
LOG_WARN("allocate memory failed: zlib stream object.", K(sizeof(z_stream)));
|
||||||
|
} else {
|
||||||
|
z_streamp zstream_ptr = static_cast<z_streamp>(zlib_stream_ptr_);
|
||||||
|
zstream_ptr->zalloc = zlib_alloc;
|
||||||
|
zstream_ptr->zfree = zlib_free;
|
||||||
|
zstream_ptr->opaque = static_cast<voidpf>(&allocator_);
|
||||||
|
zstream_ptr->avail_in = 0;
|
||||||
|
zstream_ptr->next_in = Z_NULL;
|
||||||
|
|
||||||
|
int zlib_ret = inflateInit2(zstream_ptr, 32 + MAX_WBITS);
|
||||||
|
if (Z_OK != zlib_ret) {
|
||||||
|
ret = OB_ERROR;
|
||||||
|
LIB_LOG(WARN, "failed to inflateInit2", K(zlib_ret));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
int ObZlibDecompressor::decompress(const char *src, int64_t src_size, int64_t &consumed_size,
|
||||||
|
char *dest, int64_t dest_capacity, int64_t &decompressed_size)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
int zlib_ret = Z_OK;
|
||||||
|
z_streamp zstream_ptr = nullptr;
|
||||||
|
|
||||||
|
if (OB_ISNULL(zlib_stream_ptr_)) {
|
||||||
|
ret = OB_NOT_INIT;
|
||||||
|
} else if (OB_ISNULL(src) || src_size <= 0) {
|
||||||
|
ret = OB_INVALID_ARGUMENT;
|
||||||
|
LOG_WARN("invalid argument", KP(src), K(src_size));
|
||||||
|
} else if (OB_ISNULL(dest) || dest_capacity <= 0) {
|
||||||
|
ret = OB_INVALID_ARGUMENT;
|
||||||
|
LOG_WARN("invalid argument", KP(dest), K(dest_capacity));
|
||||||
|
} else if (FALSE_IT(zstream_ptr = static_cast<z_streamp>(zlib_stream_ptr_))) {
|
||||||
|
} else if (zstream_need_reset_) {
|
||||||
|
if (Z_OK != (zlib_ret = inflateReset(zstream_ptr))) {
|
||||||
|
ret = OB_ERR_COMPRESS_DECOMPRESS_DATA;
|
||||||
|
LOG_WARN("failed to reset zlib stream", K(zlib_ret));
|
||||||
|
} else {
|
||||||
|
zstream_need_reset_ = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (OB_SUCC(ret)) {
|
||||||
|
zstream_ptr->avail_in = src_size;
|
||||||
|
zstream_ptr->next_in = (Bytef *)src;
|
||||||
|
|
||||||
|
int64_t last_avail_in = zstream_ptr->avail_in;
|
||||||
|
int64_t last_total_out = zstream_ptr->total_out;
|
||||||
|
zstream_ptr->next_out = reinterpret_cast<Bytef *>(dest);
|
||||||
|
zstream_ptr->avail_out = dest_capacity;
|
||||||
|
|
||||||
|
zlib_ret = inflate(zstream_ptr, Z_NO_FLUSH);
|
||||||
|
if (Z_OK == zlib_ret || Z_STREAM_END == zlib_ret) {
|
||||||
|
LOG_TRACE("inflate success",
|
||||||
|
K(last_avail_in - zstream_ptr->avail_in),
|
||||||
|
K(zstream_ptr->total_out - last_total_out));
|
||||||
|
|
||||||
|
consumed_size = last_avail_in - zstream_ptr->avail_in;
|
||||||
|
decompressed_size = zstream_ptr->total_out - last_total_out;
|
||||||
|
|
||||||
|
if (Z_STREAM_END == zlib_ret) {
|
||||||
|
LOG_DEBUG("got Z_STREAM_END");
|
||||||
|
zstream_need_reset_ = true;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
ret = OB_ERR_COMPRESS_DECOMPRESS_DATA;
|
||||||
|
LOG_WARN("failed to decompress", K(zlib_ret));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ObZstdDecompressor
|
||||||
|
*/
|
||||||
|
void *zstd_alloc(void* opaque, size_t size)
|
||||||
|
{
|
||||||
|
void *ret = nullptr;
|
||||||
|
if (OB_ISNULL(opaque)) {
|
||||||
|
} else {
|
||||||
|
ObIAllocator *allocator = static_cast<ObIAllocator *>(opaque);
|
||||||
|
ret = allocator->alloc(size);
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
void zstd_free(void *opaque, void *address)
|
||||||
|
{
|
||||||
|
if (OB_ISNULL(opaque)) {
|
||||||
|
} else {
|
||||||
|
ObIAllocator *allocator = static_cast<ObIAllocator *>(opaque);
|
||||||
|
allocator->free(address);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Only records the allocator; the zstd stream context is created in init().
ObZstdDecompressor::ObZstdDecompressor(ObIAllocator &allocator)
    : ObDecompressor(allocator)
{
}
|
||||||
|
|
||||||
|
// Releases the zstd stream context, if one is still held.
ObZstdDecompressor::~ObZstdDecompressor()
{
  destroy();
}
|
||||||
|
|
||||||
|
void ObZstdDecompressor::destroy()
|
||||||
|
{
|
||||||
|
using ObZstdWrapper = oceanbase::common::zstd_1_3_8::ObZstdWrapper;
|
||||||
|
|
||||||
|
if (OB_NOT_NULL(zstd_stream_context_)) {
|
||||||
|
ObZstdWrapper::free_stream_dctx(zstd_stream_context_);
|
||||||
|
zstd_stream_context_ = nullptr;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int ObZstdDecompressor::init()
|
||||||
|
{
|
||||||
|
using OB_ZSTD_customMem = oceanbase::common::zstd_1_3_8::OB_ZSTD_customMem;
|
||||||
|
using ObZstdWrapper = oceanbase::common::zstd_1_3_8::ObZstdWrapper;
|
||||||
|
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
|
||||||
|
if (OB_NOT_NULL(zstd_stream_context_)) {
|
||||||
|
ret = OB_INIT_TWICE;
|
||||||
|
} else {
|
||||||
|
OB_ZSTD_customMem allocator;
|
||||||
|
allocator.customAlloc = zstd_alloc;
|
||||||
|
allocator.customFree = zstd_free;
|
||||||
|
allocator.opaque = &allocator_;
|
||||||
|
|
||||||
|
ret = ObZstdWrapper::create_stream_dctx(allocator, zstd_stream_context_);
|
||||||
|
if (OB_FAIL(ret)) {
|
||||||
|
LOG_WARN("failed to create zstd stream context", K(ret));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
int ObZstdDecompressor::decompress(const char *src, int64_t src_size, int64_t &consumed_size,
|
||||||
|
char *dest, int64_t dest_capacity, int64_t &decompressed_size)
|
||||||
|
{
|
||||||
|
using ObZstdWrapper = oceanbase::common::zstd_1_3_8::ObZstdWrapper;
|
||||||
|
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
if (OB_ISNULL(zstd_stream_context_)) {
|
||||||
|
ret = OB_NOT_INIT;
|
||||||
|
} else if (OB_ISNULL(src) || src_size <= 0) {
|
||||||
|
ret = OB_INVALID_ARGUMENT;
|
||||||
|
LOG_WARN("invalid argument", KP(src), K(src_size));
|
||||||
|
} else if (OB_ISNULL(dest) || dest_capacity <= 0) {
|
||||||
|
ret = OB_INVALID_ARGUMENT;
|
||||||
|
LOG_WARN("invalid argument", KP(dest), K(dest_capacity));
|
||||||
|
} else {
|
||||||
|
size_t tmp_consumed_size = 0;
|
||||||
|
size_t tmp_decompressed_size = 0;
|
||||||
|
ret = ObZstdWrapper::decompress_stream(zstd_stream_context_,
|
||||||
|
src, src_size, tmp_consumed_size,
|
||||||
|
dest, dest_capacity, tmp_decompressed_size);
|
||||||
|
consumed_size = static_cast<int64_t>(tmp_consumed_size);
|
||||||
|
decompressed_size = static_cast<int64_t>(tmp_decompressed_size);
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace sql
|
} // namespace sql
|
||||||
} // namespace oceanbase
|
} // namespace oceanbase
|
||||||
|
|||||||
@ -37,10 +37,14 @@ public:
|
|||||||
public:
|
public:
|
||||||
ObLoadFileLocation file_location_;
|
ObLoadFileLocation file_location_;
|
||||||
ObString filename_;
|
ObString filename_;
|
||||||
|
ObLoadCompressionFormat compression_format_;
|
||||||
share::ObBackupStorageInfo access_info_;
|
share::ObBackupStorageInfo access_info_;
|
||||||
observer::ObIMPPacketSender *packet_handle_;
|
observer::ObIMPPacketSender *packet_handle_;
|
||||||
ObSQLSessionInfo *session_;
|
ObSQLSessionInfo *session_;
|
||||||
int64_t timeout_ts_; // A job always has a deadline and file reading may cost a long time
|
int64_t timeout_ts_; // A job always has a deadline and file reading may cost a long time
|
||||||
|
|
||||||
|
public:
|
||||||
|
static int parse_compression_format(ObString compression_name, ObString filename, ObLoadCompressionFormat &compression_format);
|
||||||
};
|
};
|
||||||
|
|
||||||
class ObFileReader
|
class ObFileReader
|
||||||
@ -89,6 +93,12 @@ public:
|
|||||||
*/
|
*/
|
||||||
static int open(const ObFileReadParam &param, ObIAllocator &allocator, ObFileReader *& file_reader);
|
static int open(const ObFileReadParam &param, ObIAllocator &allocator, ObFileReader *& file_reader);
|
||||||
|
|
||||||
|
private:
|
||||||
|
static int open_decompress_reader(const ObFileReadParam ¶m,
|
||||||
|
ObIAllocator &allocator,
|
||||||
|
ObFileReader *source_reader,
|
||||||
|
ObFileReader *&file_reader);
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
ObIAllocator &allocator_;
|
ObIAllocator &allocator_;
|
||||||
};
|
};
|
||||||
@ -200,6 +210,99 @@ private:
|
|||||||
bool eof_;
|
bool eof_;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* base class for stream decompressor
|
||||||
|
*/
|
||||||
|
class ObDecompressor
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
explicit ObDecompressor(ObIAllocator &allocator);
|
||||||
|
virtual ~ObDecompressor();
|
||||||
|
|
||||||
|
virtual int init() = 0;
|
||||||
|
virtual void destroy() = 0;
|
||||||
|
virtual int decompress(const char *src, int64_t src_size, int64_t &consumed_size,
|
||||||
|
char *dest, int64_t dest_capacity, int64_t &decompressed_size) = 0;
|
||||||
|
|
||||||
|
static int create(ObLoadCompressionFormat format, ObIAllocator &allocator, ObDecompressor *&decompressor);
|
||||||
|
|
||||||
|
protected:
|
||||||
|
ObIAllocator &allocator_;
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* stream decompress file reader
|
||||||
|
*/
|
||||||
|
class ObDecompressFileReader : public ObStreamFileReader
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
explicit ObDecompressFileReader(ObIAllocator &allocator);
|
||||||
|
virtual ~ObDecompressFileReader();
|
||||||
|
|
||||||
|
int open(const ObFileReadParam ¶m, ObFileReader *source_reader);
|
||||||
|
|
||||||
|
public:
|
||||||
|
int read(char *buf, int64_t capability, int64_t &read_size) override;
|
||||||
|
int64_t get_offset() const override { return uncompressed_size_; }
|
||||||
|
bool eof() const override { return eof_; }
|
||||||
|
|
||||||
|
private:
|
||||||
|
int read_compressed_data();
|
||||||
|
|
||||||
|
protected:
|
||||||
|
ObFileReader *source_reader_ = nullptr;
|
||||||
|
|
||||||
|
char * compressed_data_ = nullptr; /// compressed data buffer
|
||||||
|
int64_t compress_data_size_ = 0; /// the valid data size in compressed data buffer
|
||||||
|
int64_t consumed_data_size_ = 0; /// handled buffer size in the compressed data buffer
|
||||||
|
int64_t uncompressed_size_ = 0; /// decompressed size from compressed data
|
||||||
|
|
||||||
|
bool eof_ = false;
|
||||||
|
|
||||||
|
ObDecompressor *decompressor_ = nullptr;
|
||||||
|
|
||||||
|
static const int64_t COMPRESSED_DATA_BUFFER_SIZE;
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* gzip/deflate decompressor
|
||||||
|
*/
|
||||||
|
class ObZlibDecompressor : public ObDecompressor
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
explicit ObZlibDecompressor(ObIAllocator &allocator);
|
||||||
|
virtual ~ObZlibDecompressor();
|
||||||
|
|
||||||
|
int init() override;
|
||||||
|
void destroy() override;
|
||||||
|
|
||||||
|
int decompress(const char *src, int64_t src_size, int64_t &consumed_size,
|
||||||
|
char *dest, int64_t dest_capacity, int64_t &decompressed_size) override;
|
||||||
|
|
||||||
|
private:
|
||||||
|
void *zlib_stream_ptr_ = nullptr;
|
||||||
|
bool zstream_need_reset_ = false; // the zstreamptr should be reset if we got Z_STREAM_END
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* zstd decompressor
|
||||||
|
*/
|
||||||
|
class ObZstdDecompressor : public ObDecompressor
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
explicit ObZstdDecompressor(ObIAllocator &allocator);
|
||||||
|
virtual ~ObZstdDecompressor();
|
||||||
|
|
||||||
|
int init() override;
|
||||||
|
void destroy() override;
|
||||||
|
|
||||||
|
int decompress(const char *src, int64_t src_size, int64_t &consumed_size,
|
||||||
|
char *dest, int64_t dest_capacity, int64_t &decompressed_size) override;
|
||||||
|
|
||||||
|
private:
|
||||||
|
void *zstd_stream_context_ = nullptr;
|
||||||
|
};
|
||||||
|
|
||||||
} // namespace sql
|
} // namespace sql
|
||||||
} // namespace oceanbase
|
} // namespace oceanbase
|
||||||
|
|
||||||
|
|||||||
@ -2774,6 +2774,7 @@ int ObLoadDataSPImpl::ToolBox::init(ObExecContext &ctx, ObLoadDataStmt &load_stm
|
|||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
file_read_param.file_location_ = load_file_storage;
|
file_read_param.file_location_ = load_file_storage;
|
||||||
file_read_param.filename_ = load_args.file_name_;
|
file_read_param.filename_ = load_args.file_name_;
|
||||||
|
file_read_param.compression_format_ = load_args.compression_format_;
|
||||||
file_read_param.access_info_ = load_args.access_info_;
|
file_read_param.access_info_ = load_args.access_info_;
|
||||||
file_read_param.packet_handle_ = &ctx.get_my_session()->get_pl_query_sender()->get_packet_sender();
|
file_read_param.packet_handle_ = &ctx.get_my_session()->get_pl_query_sender()->get_packet_sender();
|
||||||
file_read_param.session_ = ctx.get_my_session();
|
file_read_param.session_ = ctx.get_my_session();
|
||||||
|
|||||||
@ -466,7 +466,7 @@ END_P SET_VAR DELIMITER
|
|||||||
%type <node> alter_column_behavior opt_set opt_position_column
|
%type <node> alter_column_behavior opt_set opt_position_column
|
||||||
%type <node> alter_system_stmt alter_system_set_parameter_actions alter_system_settp_actions settp_option alter_system_set_parameter_action server_info_list server_info alter_system_reset_parameter_actions alter_system_reset_parameter_action
|
%type <node> alter_system_stmt alter_system_set_parameter_actions alter_system_settp_actions settp_option alter_system_set_parameter_action server_info_list server_info alter_system_reset_parameter_actions alter_system_reset_parameter_action
|
||||||
%type <node> opt_comment opt_as
|
%type <node> opt_comment opt_as
|
||||||
%type <node> column_name relation_name function_name column_label var_name relation_name_or_string row_format_option
|
%type <node> column_name relation_name function_name column_label var_name relation_name_or_string row_format_option compression_name
|
||||||
%type <node> audit_stmt audit_clause op_audit_tail_clause audit_operation_clause audit_all_shortcut_list audit_all_shortcut auditing_on_clause auditing_by_user_clause audit_user_list audit_user audit_user_with_host_name
|
%type <node> audit_stmt audit_clause op_audit_tail_clause audit_operation_clause audit_all_shortcut_list audit_all_shortcut auditing_on_clause auditing_by_user_clause audit_user_list audit_user audit_user_with_host_name
|
||||||
%type <node> opt_hint_list hint_option select_with_opt_hint update_with_opt_hint delete_with_opt_hint hint_list_with_end global_hint transform_hint optimize_hint
|
%type <node> opt_hint_list hint_option select_with_opt_hint update_with_opt_hint delete_with_opt_hint hint_list_with_end global_hint transform_hint optimize_hint
|
||||||
%type <node> create_index_stmt index_name sort_column_list sort_column_key opt_index_option_list index_option opt_sort_column_key_length opt_index_using_algorithm index_using_algorithm visibility_option opt_constraint_name constraint_name create_with_opt_hint index_expr alter_with_opt_hint
|
%type <node> create_index_stmt index_name sort_column_list sort_column_key opt_index_option_list index_option opt_sort_column_key_length opt_index_using_algorithm index_using_algorithm visibility_option opt_constraint_name constraint_name create_with_opt_hint index_expr alter_with_opt_hint
|
||||||
@ -506,7 +506,7 @@ END_P SET_VAR DELIMITER
|
|||||||
%type <node> balance_task_type opt_balance_task_type
|
%type <node> balance_task_type opt_balance_task_type
|
||||||
%type <node> list_expr list_partition_element list_partition_expr list_partition_list list_partition_option opt_list_partition_list opt_list_subpartition_list list_subpartition_list list_subpartition_element drop_partition_name_list
|
%type <node> list_expr list_partition_element list_partition_expr list_partition_list list_partition_option opt_list_partition_list opt_list_subpartition_list list_subpartition_list list_subpartition_element drop_partition_name_list
|
||||||
%type <node> primary_zone_name change_tenant_name_or_tenant_id distribute_method distribute_method_list
|
%type <node> primary_zone_name change_tenant_name_or_tenant_id distribute_method distribute_method_list
|
||||||
%type <node> load_data_stmt opt_load_local opt_duplicate opt_load_charset opt_load_ignore_rows infile_string
|
%type <node> load_data_stmt opt_load_local opt_duplicate opt_compression opt_load_charset opt_load_ignore_rows infile_string
|
||||||
%type <node> lines_or_rows opt_field_or_var_spec field_or_vars_list field_or_vars opt_load_set_spec opt_load_data_extended_option_list load_data_extended_option_list load_data_extended_option
|
%type <node> lines_or_rows opt_field_or_var_spec field_or_vars_list field_or_vars opt_load_set_spec opt_load_data_extended_option_list load_data_extended_option_list load_data_extended_option
|
||||||
%type <node> load_set_list load_set_element load_data_with_opt_hint
|
%type <node> load_set_list load_set_element load_data_with_opt_hint
|
||||||
%type <node> ret_type opt_agg
|
%type <node> ret_type opt_agg
|
||||||
@ -4675,23 +4675,24 @@ NAME_OB
|
|||||||
*****************************************************************************/
|
*****************************************************************************/
|
||||||
load_data_stmt:
|
load_data_stmt:
|
||||||
load_data_with_opt_hint opt_load_local INFILE infile_string opt_duplicate INTO TABLE
|
load_data_with_opt_hint opt_load_local INFILE infile_string opt_duplicate INTO TABLE
|
||||||
relation_factor opt_use_partition opt_load_charset field_opt line_opt opt_load_ignore_rows
|
relation_factor opt_use_partition opt_compression opt_load_charset field_opt line_opt opt_load_ignore_rows
|
||||||
opt_field_or_var_spec opt_load_set_spec opt_load_data_extended_option_list
|
opt_field_or_var_spec opt_load_set_spec opt_load_data_extended_option_list
|
||||||
{
|
{
|
||||||
(void) $9;
|
(void) $9;
|
||||||
malloc_non_terminal_node($$, result->malloc_pool_, T_LOAD_DATA, 12,
|
malloc_non_terminal_node($$, result->malloc_pool_, T_LOAD_DATA, 13,
|
||||||
$2, /* 0. local */
|
$2, /* 0. local */
|
||||||
$4, /* 1. filename */
|
$4, /* 1. filename */
|
||||||
$5, /* 2. duplicate */
|
$5, /* 2. duplicate */
|
||||||
$8, /* 3. table */
|
$8, /* 3. table */
|
||||||
$10, /* 4. charset */
|
$11, /* 4. charset */
|
||||||
$11, /* 5. field */
|
$12, /* 5. field */
|
||||||
$12, /* 6. line */
|
$13, /* 6. line */
|
||||||
$13, /* 7. ignore rows */
|
$14, /* 7. ignore rows */
|
||||||
$14, /* 8. field or vars */
|
$15, /* 8. field or vars */
|
||||||
$15, /* 9. set field */
|
$16, /* 9. set field */
|
||||||
$1, /* 10. hint */
|
$1, /* 10. hint */
|
||||||
$16 /* 11. extended option list */
|
$17, /* 11. extended option list */
|
||||||
|
$10 /* 12. compression format */
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
;
|
;
|
||||||
@ -4743,6 +4744,26 @@ opt_load_local:
|
|||||||
}
|
}
|
||||||
;
|
;
|
||||||
|
|
||||||
|
/* Optional `COMPRESSION [=] compression_name` clause of LOAD DATA.
 * Yields NULL when the clause is absent, otherwise a T_COMPRESSION node whose
 * single child is the compression name node. */
opt_compression:
/* empty */
{
  $$ = NULL;
}
| COMPRESSION opt_equal_mark compression_name
{
  (void)$2; /* the optional '=' carries no information */
  malloc_non_terminal_node($$, result->malloc_pool_, T_COMPRESSION, 1, $3);
}
;
|
||||||
|
|
||||||
|
/* Name of a compression format: either a plain identifier or an unreserved
 * keyword, which is turned back into a name node. */
compression_name:
NAME_OB { $$ = $1; }
| unreserved_keyword
{
  get_non_reserved_node($$, result->malloc_pool_, @1.first_column, @1.last_column);
}
;
|
||||||
|
|
||||||
opt_duplicate:
|
opt_duplicate:
|
||||||
/* empty */ { $$= NULL; }
|
/* empty */ { $$= NULL; }
|
||||||
| IGNORE { malloc_terminal_node($$, result->malloc_pool_, T_IGNORE); }
|
| IGNORE { malloc_terminal_node($$, result->malloc_pool_, T_IGNORE); }
|
||||||
|
|||||||
@ -28,6 +28,7 @@
|
|||||||
#include "share/backup/ob_backup_io_adapter.h"
|
#include "share/backup/ob_backup_io_adapter.h"
|
||||||
#include "share/backup/ob_backup_struct.h"
|
#include "share/backup/ob_backup_struct.h"
|
||||||
#include "lib/restore/ob_storage_info.h"
|
#include "lib/restore/ob_storage_info.h"
|
||||||
|
#include "sql/engine/cmd/ob_load_data_file_reader.h"
|
||||||
#include <glob.h>
|
#include <glob.h>
|
||||||
|
|
||||||
namespace oceanbase
|
namespace oceanbase
|
||||||
@ -43,6 +44,7 @@ LOAD DATA [LOW_PRIORITY | CONCURRENT] [LOCAL] INFILE 'file_name'
|
|||||||
[REPLACE | IGNORE]
|
[REPLACE | IGNORE]
|
||||||
INTO TABLE tbl_name
|
INTO TABLE tbl_name
|
||||||
[PARTITION (partition_name [, partition_name] ...)]
|
[PARTITION (partition_name [, partition_name] ...)]
|
||||||
|
[COMPRESSION [=] compression_format]
|
||||||
[CHARACTER SET charset_name]
|
[CHARACTER SET charset_name]
|
||||||
[{FIELDS | COLUMNS}
|
[{FIELDS | COLUMNS}
|
||||||
[TERMINATED BY 'string']
|
[TERMINATED BY 'string']
|
||||||
@ -215,6 +217,30 @@ int ObLoadDataResolver::resolve(const ParseNode &parse_tree)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (OB_SUCC(ret)) {
|
||||||
|
/* opt_compression */
|
||||||
|
ObLoadArgument &load_args = load_stmt->get_load_arguments();
|
||||||
|
const ParseNode *child_node = node->children_[ENUM_OPT_COMPRESSION];
|
||||||
|
if (NULL != child_node) {
|
||||||
|
if (OB_UNLIKELY(1 != child_node->num_child_)
|
||||||
|
|| OB_ISNULL(child_node->children_)
|
||||||
|
|| OB_ISNULL(child_node->children_[0])
|
||||||
|
|| T_COMPRESSION != child_node->type_) {
|
||||||
|
ret = OB_ERR_UNEXPECTED;
|
||||||
|
LOG_WARN("invalid child node", K(child_node->num_child_));
|
||||||
|
} else {
|
||||||
|
ObString compression_name(static_cast<int32_t>(child_node->children_[0]->str_len_),
|
||||||
|
child_node->children_[0]->str_value_);
|
||||||
|
ret = ObFileReadParam::parse_compression_format(compression_name, load_args.file_name_, load_args.compression_format_);
|
||||||
|
if (OB_FAIL(ret)) {
|
||||||
|
LOG_USER_ERROR(OB_INVALID_ARGUMENT, "unknown compression format or cannot detect compression format by filename");
|
||||||
|
} else {
|
||||||
|
LOG_TRACE("load data with compression format", K(load_args.compression_format_));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
/* 4. opt_charset */
|
/* 4. opt_charset */
|
||||||
ObLoadArgument &load_args = load_stmt->get_load_arguments();
|
ObLoadArgument &load_args = load_stmt->get_load_arguments();
|
||||||
|
|||||||
@ -79,6 +79,7 @@ private:
|
|||||||
ENUM_OPT_SET_FIELD,
|
ENUM_OPT_SET_FIELD,
|
||||||
ENUM_OPT_HINT,
|
ENUM_OPT_HINT,
|
||||||
ENUM_OPT_EXTENDED_OPTIONS,
|
ENUM_OPT_EXTENDED_OPTIONS,
|
||||||
|
ENUM_OPT_COMPRESSION,
|
||||||
ENUM_TOTAL_COUNT
|
ENUM_TOTAL_COUNT
|
||||||
};
|
};
|
||||||
ObStmtScope current_scope_;
|
ObStmtScope current_scope_;
|
||||||
|
|||||||
@ -18,6 +18,7 @@
|
|||||||
#include "sql/resolver/dml/ob_del_upd_stmt.h"
|
#include "sql/resolver/dml/ob_del_upd_stmt.h"
|
||||||
#include "sql/resolver/dml/ob_hint.h"
|
#include "sql/resolver/dml/ob_hint.h"
|
||||||
#include "share/backup/ob_backup_struct.h"
|
#include "share/backup/ob_backup_struct.h"
|
||||||
|
|
||||||
namespace oceanbase
|
namespace oceanbase
|
||||||
{
|
{
|
||||||
namespace sql
|
namespace sql
|
||||||
@ -36,6 +37,14 @@ enum class ObLoadFileLocation {
|
|||||||
OSS,
|
OSS,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Compression format of the input file of LOAD DATA.
// Enumerator values are written out explicitly; they equal the implicit values
// of the original declaration order.
enum class ObLoadCompressionFormat
{
  NONE    = 0,
  GZIP    = 1,
  DEFLATE = 2,
  ZSTD    = 3,
};
|
||||||
|
|
||||||
class ObLoadFileIterator
|
class ObLoadFileIterator
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
@ -63,7 +72,8 @@ struct ObLoadArgument
|
|||||||
database_id_(OB_INVALID_INDEX_INT64),
|
database_id_(OB_INVALID_INDEX_INT64),
|
||||||
table_id_(OB_INVALID_INDEX_INT64),
|
table_id_(OB_INVALID_INDEX_INT64),
|
||||||
is_csv_format_(false),
|
is_csv_format_(false),
|
||||||
part_level_(share::schema::PARTITION_LEVEL_MAX)
|
part_level_(share::schema::PARTITION_LEVEL_MAX),
|
||||||
|
compression_format_(ObLoadCompressionFormat::NONE)
|
||||||
|
|
||||||
{}
|
{}
|
||||||
|
|
||||||
@ -81,7 +91,8 @@ struct ObLoadArgument
|
|||||||
K_(database_id),
|
K_(database_id),
|
||||||
K_(table_id),
|
K_(table_id),
|
||||||
K_(is_csv_format),
|
K_(is_csv_format),
|
||||||
K_(file_iter));
|
K_(file_iter),
|
||||||
|
K_(compression_format));
|
||||||
|
|
||||||
void assign(const ObLoadArgument &other) {
|
void assign(const ObLoadArgument &other) {
|
||||||
load_file_storage_ = other.load_file_storage_;
|
load_file_storage_ = other.load_file_storage_;
|
||||||
@ -100,6 +111,7 @@ struct ObLoadArgument
|
|||||||
is_csv_format_ = other.is_csv_format_;
|
is_csv_format_ = other.is_csv_format_;
|
||||||
part_level_ = other.part_level_;
|
part_level_ = other.part_level_;
|
||||||
file_iter_.copy(other.file_iter_);
|
file_iter_.copy(other.file_iter_);
|
||||||
|
compression_format_ = other.compression_format_;
|
||||||
}
|
}
|
||||||
|
|
||||||
ObLoadFileLocation load_file_storage_;
|
ObLoadFileLocation load_file_storage_;
|
||||||
@ -118,6 +130,7 @@ struct ObLoadArgument
|
|||||||
bool is_csv_format_;
|
bool is_csv_format_;
|
||||||
share::schema::ObPartitionLevel part_level_;
|
share::schema::ObPartitionLevel part_level_;
|
||||||
ObLoadFileIterator file_iter_;
|
ObLoadFileIterator file_iter_;
|
||||||
|
ObLoadCompressionFormat compression_format_;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct ObDataInFileStruct
|
struct ObDataInFileStruct
|
||||||
|
|||||||
Reference in New Issue
Block a user