[FEAT MERGE] external table support compressed csv file

This commit is contained in:
hnwyllmm 2024-08-22 05:36:29 +00:00 committed by ob-robot
parent b974a3dc7d
commit 048b7033d7
9 changed files with 429 additions and 73 deletions

View File

@ -44,27 +44,11 @@ ObFileReadParam::ObFileReadParam()
int ObFileReadParam::parse_compression_format(ObString compression_name, ObString filename, ObLoadCompressionFormat &compression_format)
{
int ret = OB_SUCCESS;
if (compression_name.length() == 0 ||
0 == compression_name.case_compare("none")) {
if (compression_name.length() == 0) {
compression_format = ObLoadCompressionFormat::NONE;
} else if (0 == compression_name.case_compare("gzip")) {
compression_format = ObLoadCompressionFormat::GZIP;
} else if (0 == compression_name.case_compare("deflate")) {
compression_format = ObLoadCompressionFormat::DEFLATE;
} else if (0 == compression_name.case_compare("zstd")) {
compression_format = ObLoadCompressionFormat::ZSTD;
} else if (0 == compression_name.case_compare("auto")) {
if (filename.suffix_match_ci(".gz")) {
compression_format = ObLoadCompressionFormat::GZIP;
} else if (filename.suffix_match_ci(".deflate")) {
compression_format = ObLoadCompressionFormat::DEFLATE;
} else if (filename.suffix_match_ci(".zst") || filename.suffix_match_ci(".zstd")) {
compression_format = ObLoadCompressionFormat::ZSTD;
} else {
ret = OB_INVALID_ARGUMENT;
}
} else {
ret = OB_INVALID_ARGUMENT;
} else if (OB_FAIL(compression_format_from_string(compression_name, compression_format))) {
} else if (ObLoadCompressionFormat::AUTO == compression_format) {
ret = compression_format_from_suffix(filename, compression_format);
}
return ret;
}
@ -563,7 +547,7 @@ int ObDecompressor::create(ObLoadCompressionFormat format, ObIAllocator &allocat
case ObLoadCompressionFormat::GZIP:
case ObLoadCompressionFormat::DEFLATE: {
decompressor = OB_NEW(ObZlibDecompressor, MEMORY_ATTR, allocator);
decompressor = OB_NEW(ObZlibDecompressor, MEMORY_ATTR, allocator, format);
} break;
case ObLoadCompressionFormat::ZSTD: {
@ -579,14 +563,22 @@ int ObDecompressor::create(ObLoadCompressionFormat format, ObIAllocator &allocat
if (OB_SUCC(ret) && OB_NOT_NULL(decompressor)) {
if (OB_FAIL(decompressor->init())) {
LOG_WARN("failed to init decompressor", KR(ret));
decompressor->destroy();
OB_DELETE(ObDecompressor, MEMORY_ATTR, decompressor);
ObDecompressor::destroy(decompressor);
decompressor = nullptr;
}
}
return ret;
}
void ObDecompressor::destroy(ObDecompressor *decompressor)
{
if (OB_NOT_NULL(decompressor)) {
decompressor->destroy();
OB_DELETE(ObDecompressor, MEMORY_ATTR, decompressor);
}
}
/**
* ObDecompressFileReader
*/
@ -603,7 +595,7 @@ ObDecompressFileReader::~ObDecompressFileReader()
}
if (OB_NOT_NULL(decompressor_)) {
OB_DELETE(ObDecompressor, MEMORY_ATTR, decompressor_);
ObDecompressor::destroy(decompressor_);
}
if (OB_NOT_NULL(compressed_data_)) {
@ -721,8 +713,8 @@ void zlib_free(voidpf opaque, voidpf address)
}
}
ObZlibDecompressor::ObZlibDecompressor(ObIAllocator &allocator)
: ObDecompressor(allocator)
ObZlibDecompressor::ObZlibDecompressor(ObIAllocator &allocator, ObLoadCompressionFormat compression_format)
: ObDecompressor(allocator), compression_format_(compression_format)
{}
ObZlibDecompressor::~ObZlibDecompressor()

View File

@ -18,6 +18,7 @@
#include "lib/allocator/ob_allocator.h"
#include "lib/file/ob_file.h"
#include "sql/resolver/cmd/ob_load_data_stmt.h"
#include "sql/engine/cmd/ob_load_data_parser.h"
#include "share/backup/ob_backup_struct.h"
#include "observer/mysql/obmp_packet_sender.h"
@ -225,7 +226,10 @@ public:
virtual int decompress(const char *src, int64_t src_size, int64_t &consumed_size,
char *dest, int64_t dest_capacity, int64_t &decompressed_size) = 0;
virtual ObLoadCompressionFormat compression_format() const = 0;
static int create(ObLoadCompressionFormat format, ObIAllocator &allocator, ObDecompressor *&decompressor);
static void destroy(ObDecompressor *decompressor);
protected:
ObIAllocator &allocator_;
@ -271,7 +275,7 @@ protected:
class ObZlibDecompressor : public ObDecompressor
{
public:
explicit ObZlibDecompressor(ObIAllocator &allocator);
explicit ObZlibDecompressor(ObIAllocator &allocator, ObLoadCompressionFormat compression_format);
virtual ~ObZlibDecompressor();
int init() override;
@ -280,9 +284,13 @@ public:
int decompress(const char *src, int64_t src_size, int64_t &consumed_size,
char *dest, int64_t dest_capacity, int64_t &decompressed_size) override;
ObLoadCompressionFormat compression_format() const override { return compression_format_; }
private:
void *zlib_stream_ptr_ = nullptr;
bool zstream_need_reset_ = false; // the zstreamptr should be reset if we got Z_STREAM_END
ObLoadCompressionFormat compression_format_;
};
/**
@ -300,6 +308,7 @@ public:
int decompress(const char *src, int64_t src_size, int64_t &consumed_size,
char *dest, int64_t dest_capacity, int64_t &decompressed_size) override;
ObLoadCompressionFormat compression_format() const override { return ObLoadCompressionFormat::ZSTD; }
private:
void *zstd_stream_context_ = nullptr;
};

View File

@ -336,6 +336,54 @@ int ObOriginFileFormat::load_from_json_data(json::Pair *&node, ObIAllocator &all
return ret;
}
const char *compression_format_to_string(ObLoadCompressionFormat compression_format)
{
switch (compression_format) {
case ObLoadCompressionFormat::NONE: return "NONE";
case ObLoadCompressionFormat::AUTO: return "AUTO";
case ObLoadCompressionFormat::GZIP: return "GZIP";
case ObLoadCompressionFormat::DEFLATE: return "DEFLATE";
case ObLoadCompressionFormat::ZSTD: return "ZSTD";
default: return "INVALID";
}
}
int compression_format_from_string(ObString compression_name, ObLoadCompressionFormat &compression_format)
{
int ret = OB_SUCCESS;
if (compression_name.length() == 0 ||
0 == compression_name.case_compare("none")) {
compression_format = ObLoadCompressionFormat::NONE;
} else if (0 == compression_name.case_compare("gzip")) {
compression_format = ObLoadCompressionFormat::GZIP;
} else if (0 == compression_name.case_compare("deflate")) {
compression_format = ObLoadCompressionFormat::DEFLATE;
} else if (0 == compression_name.case_compare("zstd")) {
compression_format = ObLoadCompressionFormat::ZSTD;
} else if (0 == compression_name.case_compare("auto")) {
compression_format = ObLoadCompressionFormat::AUTO;
} else {
ret = OB_INVALID_ARGUMENT;
compression_format = ObLoadCompressionFormat::INVALID;
}
return ret;
}
int compression_format_from_suffix(ObString filename, ObLoadCompressionFormat &compression_format)
{
int ret = OB_SUCCESS;
if (filename.suffix_match_ci(".gz")) {
compression_format = ObLoadCompressionFormat::GZIP;
} else if (filename.suffix_match_ci(".deflate")) {
compression_format = ObLoadCompressionFormat::DEFLATE;
} else if (filename.suffix_match_ci(".zst") || filename.suffix_match_ci(".zstd")) {
compression_format = ObLoadCompressionFormat::ZSTD;
} else {
compression_format = ObLoadCompressionFormat::NONE;
}
return ret;
}
int64_t ObExternalFileFormat::to_string(char *buf, const int64_t buf_len) const
{
int64_t pos = 0;
@ -354,6 +402,11 @@ int64_t ObExternalFileFormat::to_string(char *buf, const int64_t buf_len) const
pos += 0;
}
if (compression_format_ != ObLoadCompressionFormat::NONE) {
J_COMMA();
databuff_print_kv(buf, buf_len, pos, "\"COMPRESSION\"", compression_format_to_string(compression_format_));
}
J_OBJ_END();
return pos;
}
@ -400,6 +453,13 @@ int ObExternalFileFormat::load_from_string(const ObString &str, ObIAllocator &al
LOG_WARN("invalid format type", K(ret), K(format_type_str));
break;
}
if (OB_SUCC(ret) && OB_NOT_NULL(format_type_node)
&& 0 == format_type_node->name_.case_compare("COMPRESSION")
&& format_type_node->value_->get_type() == json::JT_STRING) {
ObString compression_format_str = format_type_node->value_->get_string();
OZ(compression_format_from_string(compression_format_str, compression_format_));
}
}
}
return ret;

View File

@ -489,6 +489,26 @@ struct ObOriginFileFormat
common::ObString origin_null_if_str_;
};
enum class ObLoadCompressionFormat
{
INVALID,
NONE,
AUTO,
GZIP,
DEFLATE,
ZSTD,
};
const char *compression_format_to_string(ObLoadCompressionFormat compression_format);
int compression_format_from_string(ObString compression_name, ObLoadCompressionFormat &compression_format);
/**
* guess compression format from filename suffix
*
* Return NONE if none of the known compression format matches.
*/
int compression_format_from_suffix(ObString filename, ObLoadCompressionFormat &compression_format);
struct ObExternalFileFormat
{
struct StringData {
@ -512,7 +532,7 @@ struct ObExternalFileFormat
OPT_BINARY_AS_TEXT = 1 << 1,
};
ObExternalFileFormat() : format_type_(INVALID_FORMAT) {}
ObExternalFileFormat() : format_type_(INVALID_FORMAT), compression_format_(ObLoadCompressionFormat::NONE) {}
int64_t to_string(char* buf, const int64_t buf_len) const;
int load_from_string(const common::ObString &str, common::ObIAllocator &allocator);
@ -521,6 +541,7 @@ struct ObExternalFileFormat
ObOriginFileFormat origin_file_format_str_;
FormatType format_type_;
sql::ObCSVGeneralFormat csv_format_;
ObLoadCompressionFormat compression_format_;
uint64_t options_;
static const char *FORMAT_TYPE_STR[];

View File

@ -22,6 +22,7 @@
#include "share/ob_device_manager.h"
#include "lib/utility/ob_macro_utils.h"
#include "sql/engine/table/ob_parquet_table_row_iter.h"
#include "sql/engine/cmd/ob_load_data_file_reader.h"
namespace oceanbase
{
@ -330,6 +331,215 @@ int ObExternalDataAccessDriver::init(const ObString &location, const ObString &a
return ret;
}
ObExternalStreamFileReader::~ObExternalStreamFileReader()
{
reset();
}
const char * ObExternalStreamFileReader::MEMORY_LABEL = "ExternalReader";
const int64_t ObExternalStreamFileReader::COMPRESSED_DATA_BUFFER_SIZE = 2 * 1024 * 1024;
int ObExternalStreamFileReader::init(const common::ObString &location,
const ObString &access_info,
ObLoadCompressionFormat compression_format,
ObIAllocator &allocator)
{
int ret = OB_SUCCESS;
if (OB_NOT_NULL(allocator_)) {
ret = OB_INIT_TWICE;
} else if (OB_FAIL(data_access_driver_.init(location, access_info))) {
LOG_WARN("failed to init data access driver", K(ret), K(location), K(access_info));
} else {
allocator_ = &allocator;
compression_format_ = compression_format;
}
return ret;
}
int ObExternalStreamFileReader::open(const ObString &filename)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(allocator_)) {
ret = OB_NOT_INIT;
} else if (data_access_driver_.is_opened()) {
ret = OB_INIT_TWICE;
} else if (OB_FAIL(data_access_driver_.open(filename.ptr()))) {
LOG_WARN("failed to open file", K(ret), K(filename));
} else if (OB_FAIL(data_access_driver_.get_file_size(filename.ptr(), file_size_))) {
LOG_WARN("failed to get file size", K(ret), K(filename));
} else {
ObLoadCompressionFormat this_file_compression_format = compression_format_;
if (this_file_compression_format == ObLoadCompressionFormat::AUTO
&& OB_FAIL(compression_format_from_suffix(filename, this_file_compression_format))) {
LOG_WARN("failed to dectect compression format from filename", K(ret), K(filename));
}
if (OB_SUCC(ret) && OB_FAIL(create_decompressor(this_file_compression_format))) {
LOG_WARN("failed to create decompressor", K(ret));
}
}
LOG_TRACE("open file done", K(filename), K(ret));
return ret;
}
void ObExternalStreamFileReader::close()
{
if (data_access_driver_.is_opened()) {
data_access_driver_.close();
is_file_end_ = true;
file_offset_ = 0;
file_size_ = 0;
LOG_DEBUG("close file");
}
}
void ObExternalStreamFileReader::reset()
{
close();
if (OB_NOT_NULL(compressed_data_) && OB_NOT_NULL(allocator_)) {
allocator_->free(compressed_data_);
compressed_data_ = nullptr;
}
if (OB_NOT_NULL(decompressor_)) {
ObDecompressor::destroy(decompressor_);
decompressor_ = nullptr;
}
allocator_ = nullptr;
}
bool ObExternalStreamFileReader::eof()
{
return is_file_end_;
}
int ObExternalStreamFileReader::read(char *buf, int64_t buf_len, int64_t &read_size)
{
int ret = OB_SUCCESS;
read_size = 0;
if (OB_ISNULL(buf) || buf_len <= 0) {
ret = OB_INVALID_ARGUMENT;
} else if (OB_ISNULL(decompressor_)) {
ret = read_from_driver(buf, buf_len, read_size);
is_file_end_ = file_offset_ >= file_size_;
LOG_DEBUG("read file", K(is_file_end_), K(file_offset_), K(file_size_), K(read_size));
} else {
ret = read_decompress(buf, buf_len, read_size);
}
return ret;
}
int ObExternalStreamFileReader::read_from_driver(char *buf, int64_t buf_len, int64_t &read_size)
{
int ret = OB_SUCCESS;
read_size = 0;
if (OB_ISNULL(buf) || buf_len <= 0) {
ret = OB_INVALID_ARGUMENT;
} else if(OB_FAIL(data_access_driver_.pread(buf, buf_len, file_offset_, read_size))) {
LOG_WARN("failed to read data from data access driver", K(ret), K(file_offset_), K(buf_len));
} else {
file_offset_ += read_size;
}
return ret;
}
int ObExternalStreamFileReader::read_decompress(char *buf, int64_t buf_len, int64_t &read_size)
{
int ret = OB_SUCCESS;
read_size = 0;
if (!data_access_driver_.is_opened()) {
ret = OB_NOT_INIT;
} else if (OB_ISNULL(buf) || buf_len <= 0) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KP(buf), K(buf_len));
} else if (consumed_data_size_ >= compress_data_size_) {
if (file_offset_ < file_size_) {
ret = read_compressed_data();
} else {
is_file_end_ = true;
}
}
if (OB_SUCC(ret) && compress_data_size_ > consumed_data_size_) {
int64_t consumed_size = 0;
ret = decompressor_->decompress(compressed_data_ + consumed_data_size_,
compress_data_size_ - consumed_data_size_,
consumed_size,
buf,
buf_len,
read_size);
if (OB_FAIL(ret)) {
LOG_WARN("failed to decompress", K(ret));
} else {
consumed_data_size_ += consumed_size;
uncompressed_size_ += read_size;
}
}
return ret;
}
int ObExternalStreamFileReader::read_compressed_data()
{
int ret = OB_SUCCESS;
char *read_buffer = compressed_data_;
if (!data_access_driver_.is_opened()) {
ret = OB_NOT_INIT;
} else if (OB_UNLIKELY(consumed_data_size_ < compress_data_size_)) {
// backup data
const int64_t last_data_size = compress_data_size_ - consumed_data_size_;
MEMMOVE(compressed_data_, compressed_data_ + consumed_data_size_, last_data_size);
read_buffer = compressed_data_ + last_data_size;
consumed_data_size_ = 0;
compress_data_size_ = last_data_size;
} else if (consumed_data_size_ == compress_data_size_) {
consumed_data_size_ = 0;
compress_data_size_ = 0;
}
if (OB_SUCC(ret)) {
// read data from source reader
int64_t read_size = 0;
int64_t capacity = COMPRESSED_DATA_BUFFER_SIZE - compress_data_size_;
ret = read_from_driver(read_buffer, capacity, read_size);
if (OB_SUCC(ret)) {
compress_data_size_ += read_size;
}
}
return ret;
}
int ObExternalStreamFileReader::create_decompressor(ObLoadCompressionFormat compression_format)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(allocator_)) {
ret = OB_NOT_INIT;
} else if (compression_format == ObLoadCompressionFormat::NONE) {
ObDecompressor::destroy(decompressor_);
decompressor_ = nullptr;
} else if (OB_NOT_NULL(decompressor_) && decompressor_->compression_format() == compression_format) {
// do nothing
} else {
if (OB_NOT_NULL(decompressor_)) {
ObDecompressor::destroy(decompressor_);
decompressor_ = nullptr;
}
if (OB_FAIL(ObDecompressor::create(compression_format, *allocator_, decompressor_))) {
LOG_WARN("failed to create decompressor", K(ret));
} else if (OB_ISNULL(compressed_data_) &&
OB_ISNULL(compressed_data_ = (char *)allocator_->alloc(COMPRESSED_DATA_BUFFER_SIZE))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to allocate memory", K(COMPRESSED_DATA_BUFFER_SIZE));
}
}
return ret;
}
int ObExternalTableAccessService::table_scan(
ObVTableScanParam &param,
@ -453,7 +663,7 @@ int ObCSVTableRowIterator::expand_buf()
if (nullptr != old_buf) {
new_buf_len = state_.buf_len_ * 2;
} else {
if (data_access_driver_.get_storage_type() != OB_STORAGE_FILE) {
if (file_reader_.get_storage_type() != OB_STORAGE_FILE) {
//for better performance
new_buf_len = OB_MALLOC_BIG_BLOCK_SIZE;
} else {
@ -548,11 +758,12 @@ int ObCSVTableRowIterator::init(const storage::ObTableScanParam *scan_param)
arena_alloc_.set_attr(lib::ObMemAttr(scan_param->tenant_id_, "CSVRowIter"));
OZ (ObExternalTableRowIterator::init(scan_param));
OZ (parser_.init(scan_param->external_file_format_.csv_format_));
OZ (data_access_driver_.init(scan_param_->external_file_location_, scan_param->external_file_access_info_));
OZ (file_reader_.init(scan_param_->external_file_location_, scan_param->external_file_access_info_,
scan_param_->external_file_format_.compression_format_, malloc_alloc_));
OZ (expand_buf());
if (OB_SUCC(ret)) {
if (data_access_driver_.get_storage_type() == OB_STORAGE_FILE) {
if (file_reader_.get_storage_type() == OB_STORAGE_FILE) {
if (OB_ISNULL(state_.ip_port_buf_ = static_cast<char *>(arena_alloc_.alloc(max_ipv6_port_length)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to alloc memory", K(ret));
@ -653,11 +864,9 @@ int ObCSVTableRowIterator::open_next_file()
{
int ret = OB_SUCCESS;
ObString location = scan_param_->external_file_location_;
int64_t file_size = 0;
if (data_access_driver_.is_opened()) {
data_access_driver_.close();
}
file_reader_.close();
do {
ObString file_url;
int64_t file_id = 0;
@ -665,6 +874,8 @@ int ObCSVTableRowIterator::open_next_file()
int64_t start_line = 0;
int64_t end_line = 0;
int64_t task_idx = state_.file_idx_++;
file_size = 0;
url_.reuse();
ret = get_next_file_and_line_number(task_idx, file_url, file_id, part_id, start_line, end_line);
if (OB_FAIL(ret)) {
@ -695,8 +906,9 @@ int ObCSVTableRowIterator::open_next_file()
OZ (url_.append_fmt("%.*s%s%.*s", location.length(), location.ptr(),
(location.empty() || location[location.length() - 1] == '/') ? "" : split_char,
file_url.length(), file_url.ptr()));
OZ (data_access_driver_.get_file_size(url_.string(), state_.file_size_));
if (OB_SUCC(ret) && data_access_driver_.get_storage_type() == OB_STORAGE_FILE) {
// skip empty file and non-exist file
OZ (file_reader_.get_data_access_driver().get_file_size(url_.string(), file_size));
if (OB_SUCC(ret) && file_reader_.get_storage_type() == OB_STORAGE_FILE) {
ObSqlString full_name;
if (state_.ip_port_len_ == 0) {
OZ (GCONF.self_addr_.addr_to_buffer(state_.ip_port_buf_, max_ipv6_port_length, state_.ip_port_len_));
@ -708,10 +920,11 @@ int ObCSVTableRowIterator::open_next_file()
}
}
LOG_DEBUG("try next file", K(ret), K(url_), K(file_url), K(state_));
} while (OB_SUCC(ret) && 0 >= state_.file_size_); //skip empty file
OZ (data_access_driver_.open(url_.ptr()), url_);
} while (OB_SUCC(ret) && file_size <= 0);
LOG_DEBUG("open external file", K(ret), K(url_), K(state_.file_size_), K(location));
OZ(file_reader_.open(url_.ptr()));
LOG_DEBUG("open external file", K(ret), K(url_), K(location));
return ret;
}
@ -719,15 +932,14 @@ int ObCSVTableRowIterator::open_next_file()
int ObCSVTableRowIterator::load_next_buf()
{
int ret = OB_SUCCESS;
int64_t read_size = 0;
do {
char *next_load_pos = NULL;
int64_t next_buf_len = 0;
if (state_.is_end_file_) {
if (file_reader_.eof()) {
if (OB_FAIL(open_next_file())) {
//do not print log
} else {
state_.is_end_file_ = false;
state_.file_offset_ = 0;
next_load_pos = state_.buf_;
next_buf_len = state_.buf_len_;
}
@ -747,17 +959,20 @@ int ObCSVTableRowIterator::load_next_buf()
}
if (OB_SUCC(ret)) {
int64_t read_size = 0;
OZ (data_access_driver_.pread(next_load_pos, next_buf_len, state_.file_offset_, read_size));
// `read` may return read_size 0.
// If we read a compressed empty file, we need to read it twice
// to know that we have reached the end of the file. The first
// time we read the original file data and decompress it, we get
// 0 bytes, and the second time we read it to know that we have
// reached the end of the file.
OZ (file_reader_.read(next_load_pos, next_buf_len, read_size));
if (OB_SUCC(ret)) {
state_.file_offset_ += read_size;
state_.pos_ = state_.buf_;
state_.data_end_ = next_load_pos + read_size;
state_.is_end_file_ = (state_.file_offset_ >= state_.file_size_);
}
}
} while (false);
} while (OB_SUCC(ret) && read_size <= 0);
return ret;
}
@ -773,7 +988,7 @@ int ObCSVTableRowIterator::skip_lines()
do {
nrows = state_.skip_lines_;
OZ (parser_.scan(state_.pos_, state_.data_end_, nrows, nullptr, nullptr,
temp_handle, error_msgs, state_.is_end_file_));
temp_handle, error_msgs, file_reader_.eof()));
error_msgs.reuse();
state_.skip_lines_ -= nrows;
} while (OB_SUCC(ret) && state_.skip_lines_ > 0 && OB_SUCC(load_next_buf()));
@ -820,7 +1035,7 @@ int ObCSVTableRowIterator::get_next_row()
for (int i = 0; OB_SUCC(ret) && i < file_column_exprs_.count(); ++i) {
ObDatum &datum = file_column_exprs_.at(i)->locate_datum_for_write(eval_ctx_);
if (file_column_exprs_.at(i)->type_ == T_PSEUDO_EXTERNAL_FILE_URL) {
if (csv_iter_->data_access_driver_.get_storage_type() == OB_STORAGE_FILE) {
if (csv_iter_->file_reader_.get_storage_type() == OB_STORAGE_FILE) {
datum.set_string(csv_iter_->state_.file_with_url_.ptr(), csv_iter_->state_.file_with_url_.length());
} else {
datum.set_string(csv_iter_->state_.cur_file_name_.ptr(), csv_iter_->state_.cur_file_name_.length());
@ -865,11 +1080,10 @@ int ObCSVTableRowIterator::get_next_row()
nrows = MIN(1, state_.line_count_limit_);
if (OB_UNLIKELY(0 == nrows)) {
// if line_count_limit = 0, get next file.
state_.is_end_file_ = true;
} else {
ret = parser_.scan<decltype(handle_one_line), true>(state_.pos_, state_.data_end_, nrows,
state_.escape_buf_, state_.escape_buf_end_,
handle_one_line, error_msgs, state_.is_end_file_);
handle_one_line, error_msgs, file_reader_.eof());
if (OB_FAIL(ret)) {
LOG_WARN("fail to scan csv", K(ret));
} else if (OB_UNLIKELY(error_msgs.count() > 0)) {
@ -950,7 +1164,7 @@ int ObCSVTableRowIterator::get_next_rows(int64_t &count, int64_t capacity)
for (int i = 0; OB_SUCC(ret) && i < file_column_exprs_.count(); ++i) {
ObDatum *datums = file_column_exprs_.at(i)->locate_batch_datums(eval_ctx_);
if (file_column_exprs_.at(i)->type_ == T_PSEUDO_EXTERNAL_FILE_URL) {
if (csv_iter_->data_access_driver_.get_storage_type() == OB_STORAGE_FILE) {
if (csv_iter_->file_reader_.get_storage_type() == OB_STORAGE_FILE) {
datums[returned_row_cnt_].set_string(csv_iter_->state_.file_with_url_.ptr(), csv_iter_->state_.file_with_url_.length());
} else {
datums[returned_row_cnt_].set_string(csv_iter_->state_.cur_file_name_.ptr(), csv_iter_->state_.cur_file_name_.length());
@ -995,11 +1209,10 @@ int ObCSVTableRowIterator::get_next_rows(int64_t &count, int64_t capacity)
nrows = MIN(batch_size, state_.line_count_limit_);
if (OB_UNLIKELY(0 == nrows)) {
// if line_count_limit = 0, get next file.
state_.is_end_file_ = true;
} else {
ret = parser_.scan<decltype(handle_one_line), true>(state_.pos_, state_.data_end_, nrows,
state_.escape_buf_, state_.escape_buf_end_, handle_one_line,
error_msgs, state_.is_end_file_);
error_msgs, file_reader_.eof());
if (OB_FAIL(ret)) {
LOG_WARN("fail to scan csv", K(ret));
} else if (OB_UNLIKELY(error_msgs.count() > 0)) {

View File

@ -30,6 +30,7 @@ namespace common
namespace sql {
class ObExprRegexpSessionVariables;
class ObDecompressor;
class ObExternalDataAccessDriver
{
@ -62,6 +63,63 @@ private:
ObIOFd fd_;
};
class ObExternalStreamFileReader final
{
public:
// ObExternalStreamFileReader();
~ObExternalStreamFileReader();
void reset();
int init(const common::ObString &location,
const ObString &access_info,
ObLoadCompressionFormat compression_format,
ObIAllocator &allocator);
int open(const ObString &filename);
void close();
/**
* read data into buffer. decompress source data if need
*/
int read(char *buf, int64_t buf_len, int64_t &read_size);
bool eof();
common::ObStorageType get_storage_type() { return data_access_driver_.get_storage_type(); }
ObExternalDataAccessDriver &get_data_access_driver() { return data_access_driver_; }
private:
int read_from_driver(char *buf, int64_t buf_len, int64_t &read_size);
int read_decompress(char *buf, int64_t buf_len, int64_t &read_size);
int read_compressed_data(); // read data from driver into compressed buffer
/**
* create the decompressor if need
*
* It's no need to create new decompressor if the compression_format is the seem as decompressor's.
*/
int create_decompressor(ObLoadCompressionFormat compression_format);
private:
ObExternalDataAccessDriver data_access_driver_;
bool is_file_end_ = true;
int64_t file_offset_ = 0;
int64_t file_size_ = 0;
ObDecompressor *decompressor_ = nullptr;
char * compressed_data_ = nullptr; /// compressed data buffer
int64_t compress_data_size_ = 0; /// the valid data size in compressed data buffer
int64_t consumed_data_size_ = 0; /// handled buffer size in the compressed data buffer
int64_t uncompressed_size_ = 0; /// decompressed size from compressed data
ObIAllocator *allocator_ = nullptr;
/// the compression format specified in `create external table` statement
ObLoadCompressionFormat compression_format_ = ObLoadCompressionFormat::NONE;
static const char * MEMORY_LABEL;
static const int64_t COMPRESSED_DATA_BUFFER_SIZE;
};
class ObExternalTableRowIterator : public common::ObNewRowIterator {
public:
@ -109,7 +167,7 @@ public:
StateValues() :
buf_(nullptr), buf_len_(OB_MALLOC_NORMAL_BLOCK_SIZE),
pos_(nullptr), data_end_(nullptr), escape_buf_(nullptr), escape_buf_end_(nullptr),
is_end_file_(true), file_idx_(0), file_offset_(0), file_size_(0), skip_lines_(0),
file_idx_(0), skip_lines_(0),
cur_file_id_(MIN_EXTERNAL_TABLE_FILE_ID), cur_line_number_(MIN_EXTERNAL_TABLE_LINE_NUMBER),
line_count_limit_(INT64_MAX), part_id_(0), part_list_val_(), ip_port_buf_(NULL), ip_port_len_(0), file_with_url_() {}
char *buf_;
@ -118,10 +176,7 @@ public:
const char *data_end_;
char *escape_buf_;
char *escape_buf_end_;
bool is_end_file_;
int64_t file_idx_;
int64_t file_offset_;
int64_t file_size_;
int64_t skip_lines_;
common::ObString cur_file_name_;
int64_t cur_file_id_;
@ -135,10 +190,7 @@ public:
void reuse() {
pos_ = buf_;
data_end_ = buf_;
is_end_file_ = true;
file_idx_ = 0;
file_offset_ = 0;
file_size_ = 0;
skip_lines_ = 0;
cur_file_name_.reset();
cur_file_id_ = MIN_EXTERNAL_TABLE_FILE_ID;
@ -149,8 +201,8 @@ public:
ip_port_len_ = 0;
file_with_url_.reset();
}
TO_STRING_KV(KP(buf_), K(buf_len_), KP(pos_), KP(data_end_), K(is_end_file_), K(file_idx_),
K(file_offset_), K(file_size_), K(skip_lines_), K(line_count_limit_),
TO_STRING_KV(KP(buf_), K(buf_len_), KP(pos_), KP(data_end_), K(file_idx_),
K(skip_lines_), K(line_count_limit_),
K(cur_file_name_), K(cur_file_id_), K(cur_line_number_), K(line_count_limit_), K_(part_id), K_(ip_port_len), K_(file_with_url));
};
@ -186,7 +238,7 @@ private:
common::ObMalloc malloc_alloc_; //for internal data buffers
common::ObArenaAllocator arena_alloc_;
ObCSVGeneralParser parser_;
ObExternalDataAccessDriver data_access_driver_;
ObExternalStreamFileReader file_reader_;
ObSqlString url_;
ObExpr *file_name_expr_;
};

View File

@ -8233,6 +8233,10 @@ TYPE COMP_EQ STRING_VALUE
{
malloc_non_terminal_node($$, result->malloc_pool_, T_EMPTY_FIELD_AS_NULL, 1, $3);
}
| COMPRESSION COMP_EQ compression_name
{
malloc_non_terminal_node($$, result->malloc_pool_, T_COMPRESSION, 1, $3);
}
;
/*****************************************************************************

View File

@ -37,14 +37,6 @@ enum class ObLoadFileLocation {
OSS,
};
enum class ObLoadCompressionFormat
{
NONE,
GZIP,
DEFLATE,
ZSTD,
};
class ObLoadFileIterator
{
public:

View File

@ -2826,11 +2826,24 @@ int ObDDLResolver::resolve_file_format(const ParseNode *node, ObExternalFileForm
format.csv_format_.empty_field_as_null_ = node->children_[0]->value_;
break;
}
case T_COMPRESSION: {
ObString string_v = ObString(node->children_[0]->str_len_, node->children_[0]->str_value_).trim();
ret = compression_format_from_string(string_v, format.compression_format_);
break;
}
default: {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid file format option", K(ret), K(node->type_));
}
}
if (OB_SUCC(ret)
&& format.format_type_ == ObExternalFileFormat::PARQUET_FORMAT
&& format.compression_format_ != ObLoadCompressionFormat::NONE) {
LOG_WARN("parquet file doesn't support compression", K(format.compression_format_));
ret = OB_NOT_SUPPORTED;
LOG_USER_ERROR(OB_NOT_SUPPORTED, "parquet file with compression");
}
}
return ret;
}