diff --git a/deps/oblib/src/lib/compress/zstd/ob_zstd_wrapper.cpp b/deps/oblib/src/lib/compress/zstd/ob_zstd_wrapper.cpp index fd456978cc..51b1252f5f 100644 --- a/deps/oblib/src/lib/compress/zstd/ob_zstd_wrapper.cpp +++ b/deps/oblib/src/lib/compress/zstd/ob_zstd_wrapper.cpp @@ -241,6 +241,3 @@ int ObZstdWrapper::insert_block(void *ctx, const void *block, const size_t block return ret; } - - - diff --git a/deps/oblib/src/lib/compress/zstd_1_3_8/ob_zstd_wrapper.cpp b/deps/oblib/src/lib/compress/zstd_1_3_8/ob_zstd_wrapper.cpp index 0114c3cd3b..59615dfca5 100644 --- a/deps/oblib/src/lib/compress/zstd_1_3_8/ob_zstd_wrapper.cpp +++ b/deps/oblib/src/lib/compress/zstd_1_3_8/ob_zstd_wrapper.cpp @@ -241,6 +241,58 @@ int ObZstdWrapper::insert_block(void *ctx, const void *block, const size_t block return ret; } +int ObZstdWrapper::create_stream_dctx(const OB_ZSTD_customMem &ob_zstd_mem, void *&ctx) +{ + int ret = OB_SUCCESS; + size_t ret_code = 0; + ZSTD_DStream *dctx = NULL; + ZSTD_customMem zstd_mem; + zstd_mem.customAlloc = ob_zstd_mem.customAlloc; + zstd_mem.customFree = ob_zstd_mem.customFree; + zstd_mem.opaque = ob_zstd_mem.opaque; + ctx = NULL; + if (NULL == (dctx = ZSTD_createDStream_advanced(zstd_mem))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + } else { + ctx = dctx; + } + return ret; +} + +void ObZstdWrapper::free_stream_dctx(void *&ctx) +{ + ZSTD_DStream *dctx = static_cast(ctx); + ZSTD_freeDStream(dctx); +} + +int ObZstdWrapper::decompress_stream(void *ctx, const char *src, const size_t src_size, size_t &consumed_size, + char *dest, const size_t dest_capacity, size_t &decompressed_size) +{ + int ret = OB_SUCCESS; + + if (NULL == ctx + || NULL == src + || NULL == dest + || src_size <= 0 + || dest_capacity <= 0) { + ret = OB_INVALID_ARGUMENT; + } else { + consumed_size = 0; + decompressed_size = 0; + + ZSTD_DStream *dctx = static_cast(ctx); + ZSTD_outBuffer output = { dest, dest_capacity, 0 }; + ZSTD_inBuffer input = { src, src_size, 0 }; + int zstd_err = ZSTD_decompressStream(dctx, &output, &input); + if (0 != ZSTD_isError(zstd_err)) { + ret = OB_ERR_COMPRESS_DECOMPRESS_DATA; + } else { + consumed_size = input.pos; + decompressed_size = output.pos; + } + } + return ret; +} diff --git a/deps/oblib/src/lib/compress/zstd_1_3_8/ob_zstd_wrapper.h b/deps/oblib/src/lib/compress/zstd_1_3_8/ob_zstd_wrapper.h index c68cc94b30..752f1faf02 100644 --- a/deps/oblib/src/lib/compress/zstd_1_3_8/ob_zstd_wrapper.h +++ b/deps/oblib/src/lib/compress/zstd_1_3_8/ob_zstd_wrapper.h @@ -60,6 +60,11 @@ public: char *dest, const size_t dest_capacity, size_t &decompressed_size); static size_t compress_bound(const size_t src_size); static int insert_block(void *ctx, const void *block, const size_t block_size); + + static int create_stream_dctx(const OB_ZSTD_customMem &ob_zstd_mem, void *&ctx); + static void free_stream_dctx(void *&ctx); + static int decompress_stream(void *ctx, const char *src, const size_t src_size, size_t &consumed_size, + char *dest, const size_t dest_capacity, size_t &decompressed_size); }; #undef OB_PUBLIC_API diff --git a/deps/oblib/src/lib/string/ob_string.h b/deps/oblib/src/lib/string/ob_string.h index 452f84fa4f..6115bbc220 100644 --- a/deps/oblib/src/lib/string/ob_string.h +++ b/deps/oblib/src/lib/string/ob_string.h @@ -383,6 +383,26 @@ public: return match; } + inline bool suffix_match_ci(const ObString &obstr) const + { + bool match = false; + if (data_length_ < obstr.data_length_) { + } else if (0 == STRNCASECMP(ptr_ + data_length_ - obstr.data_length_, obstr.ptr_, obstr.data_length_)) { + match = true; + } + return match; + } + + inline bool suffix_match_ci(const char *str) const + { + bool match = false; + if (OB_NOT_NULL(str)) { + ObString obstr(str); + match = suffix_match_ci(obstr); + } + return match; + } + inline bool prefix_match(const char *str) const { obstr_size_t len = 0; diff --git a/src/sql/engine/cmd/ob_load_data_direct_impl.cpp b/src/sql/engine/cmd/ob_load_data_direct_impl.cpp index 970f7fe2ed..a365529359 100644 --- a/src/sql/engine/cmd/ob_load_data_direct_impl.cpp +++ b/src/sql/engine/cmd/ob_load_data_direct_impl.cpp @@ -42,7 +42,7 @@ using namespace omt; */ ObLoadDataDirectImpl::DataAccessParam::DataAccessParam() - : file_column_num_(0), file_cs_type_(CS_TYPE_INVALID) + : file_column_num_(0), file_cs_type_(CS_TYPE_INVALID), compression_format_(ObLoadCompressionFormat::NONE) { } @@ -498,12 +498,13 @@ int ObLoadDataDirectImpl::DataReader::init(const DataAccessParam &data_access_pa end_offset_ = data_desc.end_; ObFileReadParam file_read_param; - file_read_param.file_location_ = data_access_param.file_location_; - file_read_param.filename_ = data_desc.filename_; - file_read_param.access_info_ = data_access_param.access_info_; - file_read_param.packet_handle_ = &execute_ctx.exec_ctx_.get_session_info()->get_pl_query_sender()->get_packet_sender(); - file_read_param.session_ = execute_ctx.exec_ctx_.get_session_info(); - file_read_param.timeout_ts_ = THIS_WORKER.get_timeout_ts(); + file_read_param.file_location_ = data_access_param.file_location_; + file_read_param.filename_ = data_desc.filename_; + file_read_param.compression_format_ = data_access_param.compression_format_; + file_read_param.access_info_ = data_access_param.access_info_; + file_read_param.packet_handle_ = &execute_ctx.exec_ctx_.get_session_info()->get_pl_query_sender()->get_packet_sender(); + file_read_param.session_ = execute_ctx.exec_ctx_.get_session_info(); + file_read_param.timeout_ts_ = THIS_WORKER.get_timeout_ts(); if (OB_FAIL(ObFileReader::open(file_read_param, allocator_, file_reader_))) { LOG_WARN("failed to open file", KR(ret), K(data_desc)); @@ -810,7 +811,8 @@ int ObLoadDataDirectImpl::SimpleDataSplitUtils::split(const DataAccessParam &dat data_access_param.file_cs_type_))) { ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected data format", KR(ret), K(data_access_param)); - } else if (1 == count || (ObLoadFileLocation::CLIENT_DISK == data_access_param.file_location_)) { + } else if (1 == count || (ObLoadFileLocation::CLIENT_DISK == data_access_param.file_location_) + || data_access_param.compression_format_ != ObLoadCompressionFormat::NONE) { if (OB_FAIL(data_desc_iter.add_data_desc(data_desc))) { LOG_WARN("fail to push back", KR(ret)); } @@ -821,12 +823,13 @@ int ObLoadDataDirectImpl::SimpleDataSplitUtils::split(const DataAccessParam &dat int64_t end_offset = data_desc.end_; ObFileReadParam file_read_param; - file_read_param.file_location_ = data_access_param.file_location_; - file_read_param.filename_ = data_desc.filename_; - file_read_param.access_info_ = data_access_param.access_info_; - file_read_param.packet_handle_ = NULL; - file_read_param.session_ = NULL; - file_read_param.timeout_ts_ = THIS_WORKER.get_timeout_ts(); + file_read_param.file_location_ = data_access_param.file_location_; + file_read_param.filename_ = data_desc.filename_; + file_read_param.access_info_ = data_access_param.access_info_; + file_read_param.compression_format_ = data_access_param.compression_format_; + file_read_param.packet_handle_ = NULL; + file_read_param.session_ = NULL; + file_read_param.timeout_ts_ = THIS_WORKER.get_timeout_ts(); ObFileReader *file_reader = NULL; if (OB_FAIL(ObFileReader::open(file_read_param, allocator, file_reader))) { @@ -2412,6 +2415,7 @@ int ObLoadDataDirectImpl::init_execute_param() data_access_param.file_format_ = load_stmt_->get_data_struct_in_file(); data_access_param.file_cs_type_ = load_args.file_cs_type_; data_access_param.access_info_ = load_args.access_info_; + data_access_param.compression_format_ = load_args.compression_format_; } // column_ids_ if (OB_SUCC(ret)) { diff --git a/src/sql/engine/cmd/ob_load_data_direct_impl.h b/src/sql/engine/cmd/ob_load_data_direct_impl.h index 8d8873bf5e..8247693537 100644 --- a/src/sql/engine/cmd/ob_load_data_direct_impl.h +++ b/src/sql/engine/cmd/ob_load_data_direct_impl.h @@ -71,6 +71,7 @@ private: int64_t file_column_num_; // number of column in file ObDataInFileStruct file_format_; common::ObCollationType file_cs_type_; + ObLoadCompressionFormat compression_format_; }; struct LoadExecuteParam diff --git a/src/sql/engine/cmd/ob_load_data_file_reader.cpp b/src/sql/engine/cmd/ob_load_data_file_reader.cpp index e0080be034..32861094e0 100644 --- a/src/sql/engine/cmd/ob_load_data_file_reader.cpp +++ b/src/sql/engine/cmd/ob_load_data_file_reader.cpp @@ -18,23 +18,57 @@ #include "rpc/obmysql/ob_i_cs_mem_pool.h" #include "rpc/obmysql/ob_mysql_packet.h" #include "rpc/obmysql/packet/ompk_local_infile.h" +#include "lib/compress/zstd_1_3_8/ob_zstd_wrapper.h" namespace oceanbase { namespace sql { +const ObLabel MEMORY_LABEL = ObLabel("LoadDataReader"); + +#define MEMORY_ATTR ObMemAttr(MTL_ID(), MEMORY_LABEL) + /** * ObFileReadParam */ ObFileReadParam::ObFileReadParam() - : packet_handle_(NULL), + : compression_format_(ObLoadCompressionFormat::NONE), + packet_handle_(NULL), session_(NULL), timeout_ts_(-1) { } +int ObFileReadParam::parse_compression_format(ObString compression_name, ObString filename, ObLoadCompressionFormat &compression_format) +{ + int ret = OB_SUCCESS; + if (compression_name.length() == 0 || + 0 == compression_name.case_compare("none")) { + compression_format = ObLoadCompressionFormat::NONE; + } else if (0 == compression_name.case_compare("gzip")) { + compression_format = ObLoadCompressionFormat::GZIP; + } else if (0 == compression_name.case_compare("deflate")) { + compression_format = ObLoadCompressionFormat::DEFLATE; + } else if (0 == compression_name.case_compare("zstd")) { + compression_format = ObLoadCompressionFormat::ZSTD; + } else if (0 == compression_name.case_compare("auto")) { + if (filename.suffix_match_ci(".gz")) { + compression_format = ObLoadCompressionFormat::GZIP; + } else if (filename.suffix_match_ci(".deflate")) { + compression_format = ObLoadCompressionFormat::DEFLATE; + } else if (filename.suffix_match_ci(".zst") || filename.suffix_match_ci(".zstd")) { + compression_format = ObLoadCompressionFormat::ZSTD; + } else { + ret = OB_INVALID_ARGUMENT; + } + } else { + ret = OB_INVALID_ARGUMENT; + } + return ret; +} + /** * ObFileReader */ @@ -45,26 +79,24 @@ int ObFileReader::open(const ObFileReadParam ¶m, ObIAllocator &allocator, Ob file_reader = nullptr; if (param.file_location_ == ObLoadFileLocation::SERVER_DISK) { - ObRandomFileReader *tmp_reader = OB_NEWx(ObRandomFileReader, &allocator, allocator); + ObRandomFileReader *tmp_reader = OB_NEW(ObRandomFileReader, MEMORY_ATTR, allocator); if (OB_ISNULL(tmp_reader)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("failed to create ObRandomFileReader", K(ret)); } else if (OB_FAIL(tmp_reader->open(param.filename_))) { LOG_WARN("fail to open random file reader", KR(ret), K(param.filename_)); - tmp_reader->~ObRandomFileReader(); - allocator.free(tmp_reader); + OB_DELETE(ObRandomFileReader, MEMORY_ATTR, tmp_reader); } else { file_reader = tmp_reader; } } else if (param.file_location_ == ObLoadFileLocation::OSS) { - ObRandomOSSReader *tmp_reader = OB_NEWx(ObRandomOSSReader, &allocator, allocator); + ObRandomOSSReader *tmp_reader = OB_NEW(ObRandomOSSReader, MEMORY_ATTR, allocator); if (OB_ISNULL(tmp_reader)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("failed to create RandomOSSReader", K(ret)); } else if (OB_FAIL(tmp_reader->open(param.access_info_, param.filename_))) { LOG_WARN("fail to open random oss reader", KR(ret), K(param.filename_)); - tmp_reader->~ObRandomOSSReader(); - allocator.free(tmp_reader); + OB_DELETE(ObRandomOSSReader, MEMORY_ATTR, tmp_reader); } else { file_reader = tmp_reader; } @@ -73,14 +105,13 @@ int ObFileReader::open(const ObFileReadParam ¶m, ObIAllocator &allocator, Ob ret = OB_ERR_UNEXPECTED; LOG_WARN("cannot create packet stream file reader while the packet handle is null", K(ret)); } else { - ObPacketStreamFileReader *tmp_reader = OB_NEWx(ObPacketStreamFileReader, &allocator, allocator); + ObPacketStreamFileReader *tmp_reader = OB_NEW(ObPacketStreamFileReader, MEMORY_ATTR, allocator); if (OB_ISNULL(tmp_reader)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("failed to create ObPacketStreamFileReader", K(ret)); } else if (OB_FAIL(tmp_reader->open(param.filename_, *param.packet_handle_, param.session_, param.timeout_ts_))) { LOG_WARN("failed to open packet stream file reader", KR(ret), K(param.filename_)); - tmp_reader->~ObPacketStreamFileReader(); - allocator.free(tmp_reader); + OB_DELETE(ObPacketStreamFileReader, MEMORY_ATTR, tmp_reader); } else { file_reader = tmp_reader; } @@ -90,6 +121,36 @@ int ObFileReader::open(const ObFileReadParam ¶m, ObIAllocator &allocator, Ob LOG_WARN("not supported load file location", KR(ret), K(param.file_location_)); } + if (OB_SUCC(ret)) { + ObFileReader *decompress_reader = nullptr; + ret = open_decompress_reader(param, allocator, file_reader, decompress_reader); + if (OB_SUCC(ret) && OB_NOT_NULL(decompress_reader)) { + file_reader = decompress_reader; + } + } + + return ret; +} + +int ObFileReader::open_decompress_reader(const ObFileReadParam ¶m, + ObIAllocator &allocator, + ObFileReader *source_reader, + ObFileReader *&file_reader) +{ + int ret = OB_SUCCESS; + if (param.compression_format_ == ObLoadCompressionFormat::NONE) { + file_reader = source_reader; + } else { + ObDecompressFileReader *tmp_reader = OB_NEW(ObDecompressFileReader, MEMORY_ATTR, allocator); + if (OB_ISNULL(tmp_reader)) { + ret = OB_ALLOCATE_MEMORY_FAILED; + } else if (OB_FAIL(tmp_reader->open(param, source_reader))) { + LOG_WARN("failed to open decompress file reader"); + OB_DELETE(ObDecompressFileReader, MEMORY_ATTR, tmp_reader); + } else { + file_reader = tmp_reader; + } + } return ret; } @@ -471,5 +532,378 @@ bool ObPacketStreamFileReader::is_killed() const return NULL != session_ && (session_->is_query_killed() || session_->is_zombie()); } +/** + * ObDecompressor + */ +ObDecompressor::ObDecompressor(ObIAllocator &allocator) + : allocator_(allocator) +{} + +ObDecompressor::~ObDecompressor() +{ +} + +int ObDecompressor::create(ObLoadCompressionFormat format, ObIAllocator &allocator, ObDecompressor *&decompressor) +{ + int ret = OB_SUCCESS; + + decompressor = nullptr; + + switch (format) { + case ObLoadCompressionFormat::NONE: { + ret = OB_INVALID_ARGUMENT; + } break; + + case ObLoadCompressionFormat::GZIP: + case ObLoadCompressionFormat::DEFLATE: { + decompressor = OB_NEW(ObZlibDecompressor, MEMORY_ATTR, allocator); + } break; + + case ObLoadCompressionFormat::ZSTD: { + decompressor = OB_NEW(ObZstdDecompressor, MEMORY_ATTR, allocator); + } break; + + default: { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("unsupported compression format", K(format)); + } break; + } + + if (OB_SUCC(ret) && OB_NOT_NULL(decompressor)) { + if (OB_FAIL(decompressor->init())) { + LOG_WARN("failed to init decompressor", KR(ret)); + decompressor->destroy(); + OB_DELETE(ObDecompressor, MEMORY_ATTR, decompressor); + } + } + + return ret; +} + +/** + * ObDecompressFileReader + */ +const int64_t ObDecompressFileReader::COMPRESSED_DATA_BUFFER_SIZE = 2 * 1024 * 1024; + +ObDecompressFileReader::ObDecompressFileReader(ObIAllocator &allocator) + : ObStreamFileReader(allocator) +{} + +ObDecompressFileReader::~ObDecompressFileReader() +{ + if (OB_NOT_NULL(source_reader_)) { + OB_DELETE(ObFileReader, MEMORY_ATTR, source_reader_); + } + + if (OB_NOT_NULL(decompressor_)) { + OB_DELETE(ObDecompressor, MEMORY_ATTR, decompressor_); + } + + if (OB_NOT_NULL(compressed_data_)) { + allocator_.free(compressed_data_); + compressed_data_ = nullptr; + } +} + +int ObDecompressFileReader::open(const ObFileReadParam ¶m, ObFileReader *source_reader) +{ + int ret = OB_SUCCESS; + + if (param.compression_format_ == ObLoadCompressionFormat::NONE) { + ret = OB_INVALID_ARGUMENT; + } else if (OB_FAIL(ObDecompressor::create(param.compression_format_, allocator_, decompressor_))) { + LOG_WARN("failed to create decompressor", K(param.compression_format_), K(ret)); + } else if (OB_ISNULL(compressed_data_ = (char *)allocator_.alloc(COMPRESSED_DATA_BUFFER_SIZE))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("failed to allocate buffer.", K(COMPRESSED_DATA_BUFFER_SIZE)); + } else if (FALSE_IT(source_reader_ = source_reader)) { + } + + return ret; +} + +int ObDecompressFileReader::read(char *buf, int64_t capacity, int64_t &read_size) +{ + int ret = OB_SUCCESS; + + read_size = 0; + + if (OB_ISNULL(source_reader_)) { + ret = OB_NOT_INIT; + } else if (OB_ISNULL(buf) || capacity <= 0) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KP(buf), K(capacity)); + } else if (consumed_data_size_ >= compress_data_size_) { + if (!source_reader_->eof()) { + ret = read_compressed_data(); + } else { + eof_ = true; + } + } + + if (OB_SUCC(ret) && compress_data_size_ > consumed_data_size_) { + int64_t consumed_size = 0; + ret = decompressor_->decompress(compressed_data_ + consumed_data_size_, + compress_data_size_ - consumed_data_size_, + consumed_size, + buf, + capacity, + read_size); + if (OB_FAIL(ret)) { + LOG_WARN("failed to decompress", K(ret)); + } else { + consumed_data_size_ += consumed_size; + uncompressed_size_ += read_size; + } + } + + return ret; +} + +int ObDecompressFileReader::read_compressed_data() +{ + int ret = OB_SUCCESS; + char *read_buffer = compressed_data_; + if (OB_ISNULL(source_reader_)) { + ret = OB_NOT_INIT; + } else if (OB_UNLIKELY(consumed_data_size_ < compress_data_size_)) { + // backup data + const int64_t last_data_size = compress_data_size_ - consumed_data_size_; + MEMMOVE(compressed_data_, compressed_data_ + consumed_data_size_, last_data_size); + read_buffer = compressed_data_ + last_data_size; + consumed_data_size_ = 0; + compress_data_size_ = last_data_size; + } else if (consumed_data_size_ == compress_data_size_) { + consumed_data_size_ = 0; + compress_data_size_ = 0; + } + + if (OB_SUCC(ret)) { + // read data from source reader + int64_t read_size = 0; + int64_t capability = COMPRESSED_DATA_BUFFER_SIZE - compress_data_size_; + ret = source_reader_->read(read_buffer, capability, read_size); + if (OB_SUCC(ret)) { + compress_data_size_ += read_size; + } + } + return ret; +} + +/** + * ObZlibDecompressor + */ +voidpf zlib_alloc(voidpf opaque, uInt items, uInt size) +{ + voidpf ret = NULL; + ObIAllocator *allocator = static_cast(opaque); + if (OB_ISNULL(allocator)) { + } else { + ret = allocator->alloc(items * size); + } + return ret; +} + +void zlib_free(voidpf opaque, voidpf address) +{ + ObIAllocator *allocator = static_cast(opaque); + if (OB_ISNULL(allocator)) { + free(address); + } else { + allocator->free(address); + } +} + +ObZlibDecompressor::ObZlibDecompressor(ObIAllocator &allocator) + : ObDecompressor(allocator) +{} + +ObZlibDecompressor::~ObZlibDecompressor() +{ + this->destroy(); +} + +void ObZlibDecompressor::destroy() +{ + if (OB_NOT_NULL(zlib_stream_ptr_)) { + z_streamp zstream_ptr = static_cast(zlib_stream_ptr_); + inflateEnd(zstream_ptr); + zlib_stream_ptr_ = nullptr; + } +} + +int ObZlibDecompressor::init() +{ + int ret = OB_SUCCESS; + if (OB_NOT_NULL(zlib_stream_ptr_)) { + ret = OB_INIT_TWICE; + } else if (OB_ISNULL(zlib_stream_ptr_ = allocator_.alloc(sizeof(z_stream)))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("allocate memory failed: zlib stream object.", K(sizeof(z_stream))); + } else { + z_streamp zstream_ptr = static_cast(zlib_stream_ptr_); + zstream_ptr->zalloc = zlib_alloc; + zstream_ptr->zfree = zlib_free; + zstream_ptr->opaque = static_cast(&allocator_); + zstream_ptr->avail_in = 0; + zstream_ptr->next_in = Z_NULL; + + int zlib_ret = inflateInit2(zstream_ptr, 32 + MAX_WBITS); + if (Z_OK != zlib_ret) { + ret = OB_ERROR; + LIB_LOG(WARN, "failed to inflateInit2", K(zlib_ret)); + } + } + return ret; +} + +int ObZlibDecompressor::decompress(const char *src, int64_t src_size, int64_t &consumed_size, + char *dest, int64_t dest_capacity, int64_t &decompressed_size) +{ + int ret = OB_SUCCESS; + int zlib_ret = Z_OK; + z_streamp zstream_ptr = nullptr; + + if (OB_ISNULL(zlib_stream_ptr_)) { + ret = OB_NOT_INIT; + } else if (OB_ISNULL(src) || src_size <= 0) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KP(src), K(src_size)); + } else if (OB_ISNULL(dest) || dest_capacity <= 0) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KP(dest), K(dest_capacity)); + } else if (FALSE_IT(zstream_ptr = static_cast(zlib_stream_ptr_))) { + } else if (zstream_need_reset_) { + if (Z_OK != (zlib_ret = inflateReset(zstream_ptr))) { + ret = OB_ERR_COMPRESS_DECOMPRESS_DATA; + LOG_WARN("failed to reset zlib stream", K(zlib_ret)); + } else { + zstream_need_reset_ = false; + } + } + + if (OB_SUCC(ret)) { + zstream_ptr->avail_in = src_size; + zstream_ptr->next_in = (Bytef *)src; + + int64_t last_avail_in = zstream_ptr->avail_in; + int64_t last_total_out = zstream_ptr->total_out; + zstream_ptr->next_out = reinterpret_cast(dest); + zstream_ptr->avail_out = dest_capacity; + + zlib_ret = inflate(zstream_ptr, Z_NO_FLUSH); + if (Z_OK == zlib_ret || Z_STREAM_END == zlib_ret) { + LOG_TRACE("inflate success", + K(last_avail_in - zstream_ptr->avail_in), + K(zstream_ptr->total_out - last_total_out)); + + consumed_size = last_avail_in - zstream_ptr->avail_in; + decompressed_size = zstream_ptr->total_out - last_total_out; + + if (Z_STREAM_END == zlib_ret) { + LOG_DEBUG("got Z_STREAM_END"); + zstream_need_reset_ = true; + } + } else { + ret = OB_ERR_COMPRESS_DECOMPRESS_DATA; + LOG_WARN("failed to decompress", K(zlib_ret)); + } + } + + return ret; +} + +/** + * ObZstdDecompressor + */ +void *zstd_alloc(void* opaque, size_t size) +{ + void *ret = nullptr; + if (OB_ISNULL(opaque)) { + } else { + ObIAllocator *allocator = static_cast(opaque); + ret = allocator->alloc(size); + } + return ret; +} + +void zstd_free(void *opaque, void *address) +{ + if (OB_ISNULL(opaque)) { + } else { + ObIAllocator *allocator = static_cast(opaque); + allocator->free(address); + } +} + +ObZstdDecompressor::ObZstdDecompressor(ObIAllocator &allocator) + : ObDecompressor(allocator) +{} + +ObZstdDecompressor::~ObZstdDecompressor() +{ + this->destroy(); +} + +void ObZstdDecompressor::destroy() +{ + using ObZstdWrapper = oceanbase::common::zstd_1_3_8::ObZstdWrapper; + + if (OB_NOT_NULL(zstd_stream_context_)) { + ObZstdWrapper::free_stream_dctx(zstd_stream_context_); + zstd_stream_context_ = nullptr; + } +} + +int ObZstdDecompressor::init() +{ + using OB_ZSTD_customMem = oceanbase::common::zstd_1_3_8::OB_ZSTD_customMem; + using ObZstdWrapper = oceanbase::common::zstd_1_3_8::ObZstdWrapper; + + int ret = OB_SUCCESS; + + if (OB_NOT_NULL(zstd_stream_context_)) { + ret = OB_INIT_TWICE; + } else { + OB_ZSTD_customMem allocator; + allocator.customAlloc = zstd_alloc; + allocator.customFree = zstd_free; + allocator.opaque = &allocator_; + + ret = ObZstdWrapper::create_stream_dctx(allocator, zstd_stream_context_); + if (OB_FAIL(ret)) { + LOG_WARN("failed to create zstd stream context", K(ret)); + } + } + + return ret; +} + +int ObZstdDecompressor::decompress(const char *src, int64_t src_size, int64_t &consumed_size, + char *dest, int64_t dest_capacity, int64_t &decompressed_size) +{ + using ObZstdWrapper = oceanbase::common::zstd_1_3_8::ObZstdWrapper; + + int ret = OB_SUCCESS; + if (OB_ISNULL(zstd_stream_context_)) { + ret = OB_NOT_INIT; + } else if (OB_ISNULL(src) || src_size <= 0) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KP(src), K(src_size)); + } else if (OB_ISNULL(dest) || dest_capacity <= 0) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KP(dest), K(dest_capacity)); + } else { + size_t tmp_consumed_size = 0; + size_t tmp_decompressed_size = 0; + ret = ObZstdWrapper::decompress_stream(zstd_stream_context_, + src, src_size, tmp_consumed_size, + dest, dest_capacity, tmp_decompressed_size); + consumed_size = static_cast(tmp_consumed_size); + decompressed_size = static_cast(tmp_decompressed_size); + } + return ret; +} + } // namespace sql } // namespace oceanbase diff --git a/src/sql/engine/cmd/ob_load_data_file_reader.h b/src/sql/engine/cmd/ob_load_data_file_reader.h index d888f299a5..506b17c3ec 100644 --- a/src/sql/engine/cmd/ob_load_data_file_reader.h +++ b/src/sql/engine/cmd/ob_load_data_file_reader.h @@ -37,10 +37,14 @@ public: public: ObLoadFileLocation file_location_; ObString filename_; + ObLoadCompressionFormat compression_format_; share::ObBackupStorageInfo access_info_; observer::ObIMPPacketSender *packet_handle_; ObSQLSessionInfo *session_; int64_t timeout_ts_; // A job always has a deadline and file reading may cost a long time + +public: + static int parse_compression_format(ObString compression_name, ObString filename, ObLoadCompressionFormat &compression_format); }; class ObFileReader @@ -89,6 +93,12 @@ public: */ static int open(const ObFileReadParam ¶m, ObIAllocator &allocator, ObFileReader *& file_reader); +private: + static int open_decompress_reader(const ObFileReadParam ¶m, + ObIAllocator &allocator, + ObFileReader *source_reader, + ObFileReader *&file_reader); + protected: ObIAllocator &allocator_; }; @@ -200,6 +210,99 @@ private: bool eof_; }; +/** + * base class for stream decompressor + */ +class ObDecompressor +{ +public: + explicit ObDecompressor(ObIAllocator &allocator); + virtual ~ObDecompressor(); + + virtual int init() = 0; + virtual void destroy() = 0; + virtual int decompress(const char *src, int64_t src_size, int64_t &consumed_size, + char *dest, int64_t dest_capacity, int64_t &decompressed_size) = 0; + + static int create(ObLoadCompressionFormat format, ObIAllocator &allocator, ObDecompressor *&decompressor); + +protected: + ObIAllocator &allocator_; +}; + +/** + * stream decompress file reader + */ +class ObDecompressFileReader : public ObStreamFileReader +{ +public: + explicit ObDecompressFileReader(ObIAllocator &allocator); + virtual ~ObDecompressFileReader(); + + int open(const ObFileReadParam ¶m, ObFileReader *source_reader); + +public: + int read(char *buf, int64_t capability, int64_t &read_size) override; + int64_t get_offset() const override { return uncompressed_size_; } + bool eof() const override { return eof_; } + +private: + int read_compressed_data(); + +protected: + ObFileReader *source_reader_ = nullptr; + + char * compressed_data_ = nullptr; /// compressed data buffer + int64_t compress_data_size_ = 0; /// the valid data size in compressed data buffer + int64_t consumed_data_size_ = 0; /// handled buffer size in the compressed data buffer + int64_t uncompressed_size_ = 0; /// decompressed size from compressed data + + bool eof_ = false; + + ObDecompressor *decompressor_ = nullptr; + + static const int64_t COMPRESSED_DATA_BUFFER_SIZE; +}; + +/** + * gzip/deflate decompressor + */ +class ObZlibDecompressor : public ObDecompressor +{ +public: + explicit ObZlibDecompressor(ObIAllocator &allocator); + virtual ~ObZlibDecompressor(); + + int init() override; + void destroy() override; + + int decompress(const char *src, int64_t src_size, int64_t &consumed_size, + char *dest, int64_t dest_capacity, int64_t &decompressed_size) override; + +private: + void *zlib_stream_ptr_ = nullptr; + bool zstream_need_reset_ = false; // the zstreamptr should be reset if we got Z_STREAM_END +}; + +/** + * zstd decompressor + */ +class ObZstdDecompressor : public ObDecompressor +{ +public: + explicit ObZstdDecompressor(ObIAllocator &allocator); + virtual ~ObZstdDecompressor(); + + int init() override; + void destroy() override; + + int decompress(const char *src, int64_t src_size, int64_t &consumed_size, + char *dest, int64_t dest_capacity, int64_t &decompressed_size) override; + +private: + void *zstd_stream_context_ = nullptr; +}; + } // namespace sql } // namespace oceanbase diff --git a/src/sql/engine/cmd/ob_load_data_impl.cpp b/src/sql/engine/cmd/ob_load_data_impl.cpp index b978c31660..472ca6c737 100644 --- a/src/sql/engine/cmd/ob_load_data_impl.cpp +++ b/src/sql/engine/cmd/ob_load_data_impl.cpp @@ -2772,12 +2772,13 @@ int ObLoadDataSPImpl::ToolBox::init(ObExecContext &ctx, ObLoadDataStmt &load_stm } if (OB_SUCC(ret)) { - file_read_param.file_location_ = load_file_storage; - file_read_param.filename_ = load_args.file_name_; - file_read_param.access_info_ = load_args.access_info_; - file_read_param.packet_handle_ = &ctx.get_my_session()->get_pl_query_sender()->get_packet_sender(); - file_read_param.session_ = ctx.get_my_session(); - file_read_param.timeout_ts_ = THIS_WORKER.get_timeout_ts(); + file_read_param.file_location_ = load_file_storage; + file_read_param.filename_ = load_args.file_name_; + file_read_param.compression_format_ = load_args.compression_format_; + file_read_param.access_info_ = load_args.access_info_; + file_read_param.packet_handle_ = &ctx.get_my_session()->get_pl_query_sender()->get_packet_sender(); + file_read_param.session_ = ctx.get_my_session(); + file_read_param.timeout_ts_ = THIS_WORKER.get_timeout_ts(); if (OB_UNLIKELY(ObLoadFileLocation::OSS == load_file_storage && ObLoadDataFormat::CSV != load_args.access_info_.get_load_data_format())) { diff --git a/src/sql/parser/sql_parser_mysql_mode.y b/src/sql/parser/sql_parser_mysql_mode.y index 714aadc744..b57aca1427 100644 --- a/src/sql/parser/sql_parser_mysql_mode.y +++ b/src/sql/parser/sql_parser_mysql_mode.y @@ -466,7 +466,7 @@ END_P SET_VAR DELIMITER %type alter_column_behavior opt_set opt_position_column %type alter_system_stmt alter_system_set_parameter_actions alter_system_settp_actions settp_option alter_system_set_parameter_action server_info_list server_info alter_system_reset_parameter_actions alter_system_reset_parameter_action %type opt_comment opt_as -%type column_name relation_name function_name column_label var_name relation_name_or_string row_format_option +%type column_name relation_name function_name column_label var_name relation_name_or_string row_format_option compression_name %type audit_stmt audit_clause op_audit_tail_clause audit_operation_clause audit_all_shortcut_list audit_all_shortcut auditing_on_clause auditing_by_user_clause audit_user_list audit_user audit_user_with_host_name %type opt_hint_list hint_option select_with_opt_hint update_with_opt_hint delete_with_opt_hint hint_list_with_end global_hint transform_hint optimize_hint %type create_index_stmt index_name sort_column_list sort_column_key opt_index_option_list index_option opt_sort_column_key_length opt_index_using_algorithm index_using_algorithm visibility_option opt_constraint_name constraint_name create_with_opt_hint index_expr alter_with_opt_hint @@ -506,7 +506,7 @@ END_P SET_VAR DELIMITER %type balance_task_type opt_balance_task_type %type list_expr list_partition_element list_partition_expr list_partition_list list_partition_option opt_list_partition_list opt_list_subpartition_list list_subpartition_list list_subpartition_element drop_partition_name_list %type primary_zone_name change_tenant_name_or_tenant_id distribute_method distribute_method_list -%type load_data_stmt opt_load_local opt_duplicate opt_load_charset opt_load_ignore_rows infile_string +%type load_data_stmt opt_load_local opt_duplicate opt_compression opt_load_charset opt_load_ignore_rows infile_string %type lines_or_rows opt_field_or_var_spec field_or_vars_list field_or_vars opt_load_set_spec opt_load_data_extended_option_list load_data_extended_option_list load_data_extended_option %type load_set_list load_set_element load_data_with_opt_hint %type ret_type opt_agg @@ -4675,23 +4675,24 @@ NAME_OB *****************************************************************************/ load_data_stmt: load_data_with_opt_hint opt_load_local INFILE infile_string opt_duplicate INTO TABLE -relation_factor opt_use_partition opt_load_charset field_opt line_opt opt_load_ignore_rows +relation_factor opt_use_partition opt_compression opt_load_charset field_opt line_opt opt_load_ignore_rows opt_field_or_var_spec opt_load_set_spec opt_load_data_extended_option_list { (void) $9; - malloc_non_terminal_node($$, result->malloc_pool_, T_LOAD_DATA, 12, + malloc_non_terminal_node($$, result->malloc_pool_, T_LOAD_DATA, 13, $2, /* 0. local */ $4, /* 1. filename */ $5, /* 2. duplicate */ $8, /* 3. table */ - $10, /* 4. charset */ - $11, /* 5. field */ - $12, /* 6. line */ - $13, /* 7. ignore rows */ - $14, /* 8. field or vars */ - $15, /* 9. set field */ + $11, /* 4. charset */ + $12, /* 5. field */ + $13, /* 6. line */ + $14, /* 7. ignore rows */ + $15, /* 8. field or vars */ + $16, /* 9. set field */ $1, /* 10. hint */ - $16 /* 11. extended option list */ + $17, /* 11. extended option list */ + $10 /* 12. compression format */ ); } ; @@ -4743,6 +4744,26 @@ opt_load_local: } ; +opt_compression: +/* empty */ +{ + $$ = NULL; +} +| COMPRESSION opt_equal_mark compression_name +{ + (void)$2; + malloc_non_terminal_node($$, result->malloc_pool_, T_COMPRESSION, 1, $3); +} +; + +compression_name: +NAME_OB { $$ = $1; } +| unreserved_keyword +{ + get_non_reserved_node($$, result->malloc_pool_, @1.first_column, @1.last_column); +} +; + opt_duplicate: /* empty */ { $$= NULL; } | IGNORE { malloc_terminal_node($$, result->malloc_pool_, T_IGNORE); } diff --git a/src/sql/resolver/cmd/ob_load_data_resolver.cpp b/src/sql/resolver/cmd/ob_load_data_resolver.cpp index 4694c7b9bf..ff10289bb7 100644 --- a/src/sql/resolver/cmd/ob_load_data_resolver.cpp +++ b/src/sql/resolver/cmd/ob_load_data_resolver.cpp @@ -28,6 +28,7 @@ #include "share/backup/ob_backup_io_adapter.h" #include "share/backup/ob_backup_struct.h" #include "lib/restore/ob_storage_info.h" +#include "sql/engine/cmd/ob_load_data_file_reader.h" #include namespace oceanbase @@ -43,6 +44,7 @@ LOAD DATA [LOW_PRIORITY | CONCURRENT] [LOCAL] INFILE 'file_name' [REPLACE | IGNORE] INTO TABLE tbl_name [PARTITION (partition_name [, partition_name] ...)] + [COMPRESSION [=] compression_format] [CHARACTER SET charset_name] [{FIELDS | COLUMNS} [TERMINATED BY 'string'] @@ -215,6 +217,30 @@ int ObLoadDataResolver::resolve(const ParseNode &parse_tree) } } + if (OB_SUCC(ret)) { + /* opt_compression */ + ObLoadArgument &load_args = load_stmt->get_load_arguments(); + const ParseNode *child_node = node->children_[ENUM_OPT_COMPRESSION]; + if (NULL != child_node) { + if (OB_UNLIKELY(1 != child_node->num_child_) + || OB_ISNULL(child_node->children_) + || OB_ISNULL(child_node->children_[0]) + || T_COMPRESSION != child_node->type_) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("invalid child node", K(child_node->num_child_)); + } else { + ObString compression_name(static_cast(child_node->children_[0]->str_len_), + child_node->children_[0]->str_value_); + ret = ObFileReadParam::parse_compression_format(compression_name, load_args.file_name_, load_args.compression_format_); + if (OB_FAIL(ret)) { + LOG_USER_ERROR(OB_INVALID_ARGUMENT, "unknown compression format or cannot detect compression format by filename"); + } else { + LOG_TRACE("load data with compression format", K(load_args.compression_format_)); + } + } + } + } + if (OB_SUCC(ret)) { /* 4. opt_charset */ ObLoadArgument &load_args = load_stmt->get_load_arguments(); diff --git a/src/sql/resolver/cmd/ob_load_data_resolver.h b/src/sql/resolver/cmd/ob_load_data_resolver.h index 0c77ae20a9..e1719b25d9 100644 --- a/src/sql/resolver/cmd/ob_load_data_resolver.h +++ b/src/sql/resolver/cmd/ob_load_data_resolver.h @@ -79,6 +79,7 @@ private: ENUM_OPT_SET_FIELD, ENUM_OPT_HINT, ENUM_OPT_EXTENDED_OPTIONS, + ENUM_OPT_COMPRESSION, ENUM_TOTAL_COUNT }; ObStmtScope current_scope_; diff --git a/src/sql/resolver/cmd/ob_load_data_stmt.h b/src/sql/resolver/cmd/ob_load_data_stmt.h index 8d21920a93..3c7073f6ca 100644 --- a/src/sql/resolver/cmd/ob_load_data_stmt.h +++ b/src/sql/resolver/cmd/ob_load_data_stmt.h @@ -18,6 +18,7 @@ #include "sql/resolver/dml/ob_del_upd_stmt.h" #include "sql/resolver/dml/ob_hint.h" #include "share/backup/ob_backup_struct.h" + namespace oceanbase { namespace sql @@ -36,6 +37,14 @@ enum class ObLoadFileLocation { OSS, }; +enum class ObLoadCompressionFormat +{ + NONE, + GZIP, + DEFLATE, + ZSTD, +}; + class ObLoadFileIterator { public: @@ -63,7 +72,8 @@ struct ObLoadArgument database_id_(OB_INVALID_INDEX_INT64), table_id_(OB_INVALID_INDEX_INT64), is_csv_format_(false), - part_level_(share::schema::PARTITION_LEVEL_MAX) + part_level_(share::schema::PARTITION_LEVEL_MAX), + compression_format_(ObLoadCompressionFormat::NONE) {} @@ -81,7 +91,8 @@ struct ObLoadArgument K_(database_id), K_(table_id), K_(is_csv_format), - K_(file_iter)); + K_(file_iter), + K_(compression_format)); void assign(const ObLoadArgument &other) { load_file_storage_ = other.load_file_storage_; @@ -100,6 +111,7 @@ struct ObLoadArgument is_csv_format_ = other.is_csv_format_; part_level_ = other.part_level_; file_iter_.copy(other.file_iter_); + compression_format_ = other.compression_format_; } ObLoadFileLocation load_file_storage_; @@ -118,6 +130,7 @@ struct ObLoadArgument bool is_csv_format_; share::schema::ObPartitionLevel part_level_; ObLoadFileIterator file_iter_; + ObLoadCompressionFormat compression_format_; }; struct ObDataInFileStruct