[CP] move compression format into csv format
This commit is contained in:
parent
1e4b1b8de4
commit
b4e818c3e7
@ -5731,7 +5731,7 @@ int ObSchemaPrinter::print_external_table_file_info(const ObTableSchema &table_s
|
||||
if (OB_SUCC(ret) && ObExternalFileFormat::CSV_FORMAT == format.format_type_) {
|
||||
const ObCSVGeneralFormat &csv = format.csv_format_;
|
||||
const ObOriginFileFormat &origin_format = format.origin_file_format_str_;
|
||||
const char *compression_name = compression_format_to_string(format.compression_format_);
|
||||
const char *compression_name = compression_algorithm_to_string(csv.compression_algorithm_);
|
||||
if (OB_FAIL(0 != csv.line_term_str_.case_compare(ObDataInFileStruct::DEFAULT_LINE_TERM_STR) &&
|
||||
databuff_printf(buf, buf_len, pos, "\n LINE_DELIMITER = %.*s,", origin_format.origin_line_term_str_.length(), origin_format.origin_line_term_str_.ptr()))) {
|
||||
SHARE_SCHEMA_LOG(WARN, "fail to print LINE_DELIMITER", K(ret));
|
||||
@ -5761,7 +5761,7 @@ int ObSchemaPrinter::print_external_table_file_info(const ObTableSchema &table_s
|
||||
} else if (OB_FAIL(0 != csv.null_if_.count() &&
|
||||
databuff_printf(buf, buf_len, pos, "\n NULL_IF = (%.*s),", origin_format.origin_null_if_str_.length(), origin_format.origin_null_if_str_.ptr()))) {
|
||||
SHARE_SCHEMA_LOG(WARN, "fail to print NULL_IF", K(ret));
|
||||
} else if (ObLoadCompressionFormat::NONE != format.compression_format_ &&
|
||||
} else if (ObCSVGeneralFormat::ObCSVCompression::NONE != csv.compression_algorithm_ &&
|
||||
OB_FAIL(databuff_printf(buf, buf_len, pos, "\n COMPRESSION = %.*s,",
|
||||
static_cast<int>(STRLEN(compression_name)), compression_name))) {
|
||||
SHARE_SCHEMA_LOG(WARN, "fail to print compression", K(ret));
|
||||
|
@ -43,7 +43,7 @@ using namespace omt;
|
||||
*/
|
||||
|
||||
// Default constructor: zero columns, invalid collation, and no compression.
// NOTE(review): this is a rendered diff hunk; the two initializer lines below are the
// pre-change (ObLoadCompressionFormat) and post-change (ObCSVGeneralFormat::ObCSVCompression)
// versions of the same line.
ObLoadDataDirectImpl::DataAccessParam::DataAccessParam()
|
||||
: file_column_num_(0), file_cs_type_(CS_TYPE_INVALID), compression_format_(ObLoadCompressionFormat::NONE)
|
||||
: file_column_num_(0), file_cs_type_(CS_TYPE_INVALID), compression_format_(ObCSVGeneralFormat::ObCSVCompression::NONE)
|
||||
{
|
||||
}
|
||||
|
||||
@ -820,7 +820,7 @@ int ObLoadDataDirectImpl::SimpleDataSplitUtils::split(const DataAccessParam &dat
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected data format", KR(ret), K(data_access_param));
|
||||
} else if (1 == count || (ObLoadFileLocation::CLIENT_DISK == data_access_param.file_location_)
|
||||
|| data_access_param.compression_format_ != ObLoadCompressionFormat::NONE) {
|
||||
|| data_access_param.compression_format_ != ObCSVGeneralFormat::ObCSVCompression::NONE) {
|
||||
if (OB_FAIL(data_desc_iter.add_data_desc(data_desc))) {
|
||||
LOG_WARN("fail to push back", KR(ret));
|
||||
}
|
||||
|
@ -71,7 +71,7 @@ private:
|
||||
int64_t file_column_num_; // number of column in file
|
||||
ObDataInFileStruct file_format_;
|
||||
common::ObCollationType file_cs_type_;
|
||||
ObLoadCompressionFormat compression_format_;
|
||||
ObCSVGeneralFormat::ObCSVCompression compression_format_;
|
||||
};
|
||||
|
||||
struct LoadExecuteParam
|
||||
|
@ -37,21 +37,23 @@ const ObLabel MEMORY_LABEL = ObLabel("LoadDataReader");
|
||||
*/
|
||||
|
||||
// Default constructor: no compression, null packet handle and session, and timeout_ts_ = -1.
// NOTE(review): this is a rendered diff hunk; the two compression_format_ initializer lines are
// the pre-change and post-change versions of the same line.
ObFileReadParam::ObFileReadParam()
|
||||
: compression_format_(ObLoadCompressionFormat::NONE),
|
||||
: compression_format_(ObCSVGeneralFormat::ObCSVCompression::NONE),
|
||||
packet_handle_(NULL),
|
||||
session_(NULL),
|
||||
timeout_ts_(-1)
|
||||
{
|
||||
}
|
||||
|
||||
// Resolves the compression to use for a file.
//   - compression_name: user-supplied compression name; an empty name yields NONE.
//   - filename: consulted only when the name parses as AUTO, to infer the compression
//     from the filename suffix.
//   - compression_format: output; receives the resolved value.
// Returns OB_SUCCESS for an empty name; otherwise the status of the parse / suffix helper.
// NOTE(review): this is a rendered diff hunk; both the pre-change signature and branches
// (ObLoadCompressionFormat, compression_format_from_*) and the post-change ones
// (ObCSVGeneralFormat::ObCSVCompression, compression_algorithm_from_*) appear below.
int ObFileReadParam::parse_compression_format(ObString compression_name, ObString filename, ObLoadCompressionFormat &compression_format)
|
||||
int ObFileReadParam::parse_compression_format(ObString compression_name,
|
||||
ObString filename,
|
||||
ObCSVGeneralFormat::ObCSVCompression &compression_format)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (compression_name.length() == 0) {
|
||||
compression_format = ObLoadCompressionFormat::NONE;
|
||||
// Parse failure: the branch body is intentionally empty and nothing is logged here.
// Presumably OB_FAIL has already stored the error in ret -- confirm against the macro.
} else if (OB_FAIL(compression_format_from_string(compression_name, compression_format))) {
|
||||
} else if (ObLoadCompressionFormat::AUTO == compression_format) {
|
||||
ret = compression_format_from_suffix(filename, compression_format);
|
||||
compression_format = ObCSVGeneralFormat::ObCSVCompression::NONE;
|
||||
// Parse failure: the branch body is intentionally empty and nothing is logged here.
} else if (OB_FAIL(compression_algorithm_from_string(compression_name, compression_format))) {
|
||||
} else if (ObCSVGeneralFormat::ObCSVCompression::AUTO == compression_format) {
|
||||
// AUTO is replaced by whatever the filename suffix indicates.
ret = compression_algorithm_from_suffix(filename, compression_format);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -132,7 +134,7 @@ int ObFileReader::open_decompress_reader(const ObFileReadParam ¶m,
|
||||
ObFileReader *&file_reader)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (param.compression_format_ == ObLoadCompressionFormat::NONE) {
|
||||
if (param.compression_format_ == ObCSVGeneralFormat::ObCSVCompression::NONE) {
|
||||
file_reader = source_reader;
|
||||
} else {
|
||||
ObDecompressFileReader *tmp_reader = OB_NEW(ObDecompressFileReader, MEMORY_ATTR, allocator);
|
||||
@ -546,23 +548,25 @@ ObDecompressor::~ObDecompressor()
|
||||
{
|
||||
}
|
||||
|
||||
int ObDecompressor::create(ObLoadCompressionFormat format, ObIAllocator &allocator, ObDecompressor *&decompressor)
|
||||
int ObDecompressor::create(ObCSVGeneralFormat::ObCSVCompression format,
|
||||
ObIAllocator &allocator,
|
||||
ObDecompressor *&decompressor)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
decompressor = nullptr;
|
||||
|
||||
switch (format) {
|
||||
case ObLoadCompressionFormat::NONE: {
|
||||
case ObCSVGeneralFormat::ObCSVCompression::NONE: {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
} break;
|
||||
|
||||
case ObLoadCompressionFormat::GZIP:
|
||||
case ObLoadCompressionFormat::DEFLATE: {
|
||||
case ObCSVGeneralFormat::ObCSVCompression::GZIP:
|
||||
case ObCSVGeneralFormat::ObCSVCompression::DEFLATE: {
|
||||
decompressor = OB_NEW(ObZlibDecompressor, MEMORY_ATTR, allocator, format);
|
||||
} break;
|
||||
|
||||
case ObLoadCompressionFormat::ZSTD: {
|
||||
case ObCSVGeneralFormat::ObCSVCompression::ZSTD: {
|
||||
decompressor = OB_NEW(ObZstdDecompressor, MEMORY_ATTR, allocator);
|
||||
} break;
|
||||
|
||||
@ -620,7 +624,7 @@ int ObDecompressFileReader::open(const ObFileReadParam ¶m, ObFileReader *sou
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
if (param.compression_format_ == ObLoadCompressionFormat::NONE) {
|
||||
if (param.compression_format_ == ObCSVGeneralFormat::ObCSVCompression::NONE) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
} else if (OB_FAIL(ObDecompressor::create(param.compression_format_, allocator_, decompressor_))) {
|
||||
LOG_WARN("failed to create decompressor", K(param.compression_format_), K(ret));
|
||||
@ -725,7 +729,8 @@ void zlib_free(voidpf opaque, voidpf address)
|
||||
}
|
||||
}
|
||||
|
||||
// Constructs a zlib-backed decompressor and remembers the requested compression format.
// ObDecompressor::create instantiates this class for both GZIP and DEFLATE.
// NOTE(review): this is a rendered diff hunk; the first signature line is the pre-change version
// and the two lines after it are the post-change version.
ObZlibDecompressor::ObZlibDecompressor(ObIAllocator &allocator, ObLoadCompressionFormat compression_format)
|
||||
ObZlibDecompressor::ObZlibDecompressor(ObIAllocator &allocator,
|
||||
ObCSVGeneralFormat::ObCSVCompression compression_format)
|
||||
: ObDecompressor(allocator), compression_format_(compression_format)
|
||||
{}
|
||||
|
||||
|
@ -38,14 +38,16 @@ public:
|
||||
public:
|
||||
ObLoadFileLocation file_location_;
|
||||
ObString filename_;
|
||||
ObLoadCompressionFormat compression_format_;
|
||||
ObCSVGeneralFormat::ObCSVCompression compression_format_;
|
||||
share::ObBackupStorageInfo access_info_;
|
||||
observer::ObIMPPacketSender *packet_handle_;
|
||||
ObSQLSessionInfo *session_;
|
||||
int64_t timeout_ts_; // A job always has a deadline and file reading may cost a long time
|
||||
|
||||
public:
|
||||
static int parse_compression_format(ObString compression_name, ObString filename, ObLoadCompressionFormat &compression_format);
|
||||
static int parse_compression_format(ObString compression_name,
|
||||
ObString filename,
|
||||
ObCSVGeneralFormat::ObCSVCompression &compression_format);
|
||||
};
|
||||
|
||||
class ObFileReader
|
||||
@ -226,9 +228,11 @@ public:
|
||||
virtual int decompress(const char *src, int64_t src_size, int64_t &consumed_size,
|
||||
char *dest, int64_t dest_capacity, int64_t &decompressed_size) = 0;
|
||||
|
||||
virtual ObLoadCompressionFormat compression_format() const = 0;
|
||||
virtual ObCSVGeneralFormat::ObCSVCompression compression_format() const = 0;
|
||||
|
||||
static int create(ObLoadCompressionFormat format, ObIAllocator &allocator, ObDecompressor *&decompressor);
|
||||
static int create(ObCSVGeneralFormat::ObCSVCompression format,
|
||||
ObIAllocator &allocator,
|
||||
ObDecompressor *&decompressor);
|
||||
static void destroy(ObDecompressor *decompressor);
|
||||
|
||||
protected:
|
||||
@ -275,7 +279,8 @@ protected:
|
||||
class ObZlibDecompressor : public ObDecompressor
|
||||
{
|
||||
public:
|
||||
explicit ObZlibDecompressor(ObIAllocator &allocator, ObLoadCompressionFormat compression_format);
|
||||
explicit ObZlibDecompressor(ObIAllocator &allocator,
|
||||
ObCSVGeneralFormat::ObCSVCompression compression_format);
|
||||
virtual ~ObZlibDecompressor();
|
||||
|
||||
int init() override;
|
||||
@ -284,13 +289,13 @@ public:
|
||||
int decompress(const char *src, int64_t src_size, int64_t &consumed_size,
|
||||
char *dest, int64_t dest_capacity, int64_t &decompressed_size) override;
|
||||
|
||||
ObLoadCompressionFormat compression_format() const override { return compression_format_; }
|
||||
ObCSVGeneralFormat::ObCSVCompression compression_format() const override { return compression_format_; }
|
||||
|
||||
private:
|
||||
void *zlib_stream_ptr_ = nullptr;
|
||||
bool zstream_need_reset_ = false; // the zstreamptr should be reset if we got Z_STREAM_END
|
||||
|
||||
ObLoadCompressionFormat compression_format_;
|
||||
ObCSVGeneralFormat::ObCSVCompression compression_format_;
|
||||
};
|
||||
|
||||
/**
|
||||
@ -308,7 +313,10 @@ public:
|
||||
int decompress(const char *src, int64_t src_size, int64_t &consumed_size,
|
||||
char *dest, int64_t dest_capacity, int64_t &decompressed_size) override;
|
||||
|
||||
ObLoadCompressionFormat compression_format() const override { return ObLoadCompressionFormat::ZSTD; }
|
||||
ObCSVGeneralFormat::ObCSVCompression compression_format() const override
|
||||
{
|
||||
return ObCSVGeneralFormat::ObCSVCompression::ZSTD;
|
||||
}
|
||||
private:
|
||||
void *zstd_stream_context_ = nullptr;
|
||||
};
|
||||
|
@ -461,6 +461,8 @@ int64_t ObCSVGeneralFormat::to_json_kv_string(char *buf, const int64_t buf_len)
|
||||
J_ARRAY_END();
|
||||
J_COMMA();
|
||||
databuff_printf(buf, buf_len, pos, "\"%s\":%s", OPTION_NAMES[idx++], STR_BOOL(empty_field_as_null_));
|
||||
J_COMMA();
|
||||
databuff_printf(buf, buf_len, pos, "\"%s\":\"%s\"", OPTION_NAMES[idx++], compression_algorithm_to_string(compression_algorithm_));
|
||||
return pos;
|
||||
}
|
||||
|
||||
@ -554,6 +556,14 @@ int ObCSVGeneralFormat::load_from_json_data(json::Pair *&node, ObIAllocator &all
|
||||
}
|
||||
node = node->get_next();
|
||||
}
|
||||
if (OB_NOT_NULL(node) && 0 == node->name_.case_compare(OPTION_NAMES[idx++])
|
||||
&& json::JT_STRING == node->value_->get_type()) {
|
||||
if (OB_FAIL(compression_algorithm_from_string(node->value_->get_string(), compression_algorithm_))) {
|
||||
LOG_WARN("failed to convert string to compression", K(ret));
|
||||
} else {
|
||||
node = node->get_next();
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -631,51 +641,53 @@ int ObOriginFileFormat::load_from_json_data(json::Pair *&node, ObIAllocator &all
|
||||
return ret;
|
||||
}
|
||||
|
||||
// Returns the upper-case name of a compression value as a string literal:
// "NONE", "AUTO", "GZIP", "DEFLATE" or "ZSTD". Any value without a case label
// (including INVALID) yields "INVALID".
// NOTE(review): this is a rendered diff hunk; the pre-change function
// (compression_format_to_string over ObLoadCompressionFormat) and the post-change function
// (compression_algorithm_to_string over ObCSVGeneralFormat::ObCSVCompression) are interleaved.
const char *compression_format_to_string(ObLoadCompressionFormat compression_format)
|
||||
const char *compression_algorithm_to_string(ObCSVGeneralFormat::ObCSVCompression compression_algorithm)
|
||||
{
|
||||
switch (compression_format) {
|
||||
case ObLoadCompressionFormat::NONE: return "NONE";
|
||||
case ObLoadCompressionFormat::AUTO: return "AUTO";
|
||||
case ObLoadCompressionFormat::GZIP: return "GZIP";
|
||||
case ObLoadCompressionFormat::DEFLATE: return "DEFLATE";
|
||||
case ObLoadCompressionFormat::ZSTD: return "ZSTD";
|
||||
switch (compression_algorithm) {
|
||||
case ObCSVGeneralFormat::ObCSVCompression::NONE: return "NONE";
|
||||
case ObCSVGeneralFormat::ObCSVCompression::AUTO: return "AUTO";
|
||||
case ObCSVGeneralFormat::ObCSVCompression::GZIP: return "GZIP";
|
||||
case ObCSVGeneralFormat::ObCSVCompression::DEFLATE: return "DEFLATE";
|
||||
case ObCSVGeneralFormat::ObCSVCompression::ZSTD: return "ZSTD";
|
||||
// Fallback for every value not listed above.
default: return "INVALID";
|
||||
}
|
||||
}
|
||||
|
||||
// Parses a compression name into its enum value.
//   - compression_name: compared with ObString::case_compare (presumably case-insensitive --
//     confirm) against "none", "gzip", "deflate", "zstd" and "auto"; an empty name is
//     treated like "none".
//   - output parameter: receives the matching value, or INVALID when nothing matches.
// Returns OB_SUCCESS on a match and OB_INVALID_ARGUMENT for an unrecognized name.
// NOTE(review): this is a rendered diff hunk; each assignment appears twice, first in its
// pre-change form (compression_format / ObLoadCompressionFormat) and then in its post-change
// form (compression_algorithm / ObCSVGeneralFormat::ObCSVCompression).
int compression_format_from_string(ObString compression_name, ObLoadCompressionFormat &compression_format)
|
||||
int compression_algorithm_from_string(ObString compression_name,
|
||||
ObCSVGeneralFormat::ObCSVCompression &compression_algorithm)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
if (compression_name.length() == 0 ||
|
||||
0 == compression_name.case_compare("none")) {
|
||||
compression_format = ObLoadCompressionFormat::NONE;
|
||||
compression_algorithm = ObCSVGeneralFormat::ObCSVCompression::NONE;
|
||||
} else if (0 == compression_name.case_compare("gzip")) {
|
||||
compression_format = ObLoadCompressionFormat::GZIP;
|
||||
compression_algorithm = ObCSVGeneralFormat::ObCSVCompression::GZIP;
|
||||
} else if (0 == compression_name.case_compare("deflate")) {
|
||||
compression_format = ObLoadCompressionFormat::DEFLATE;
|
||||
compression_algorithm = ObCSVGeneralFormat::ObCSVCompression::DEFLATE;
|
||||
} else if (0 == compression_name.case_compare("zstd")) {
|
||||
compression_format = ObLoadCompressionFormat::ZSTD;
|
||||
compression_algorithm = ObCSVGeneralFormat::ObCSVCompression::ZSTD;
|
||||
} else if (0 == compression_name.case_compare("auto")) {
|
||||
compression_format = ObLoadCompressionFormat::AUTO;
|
||||
compression_algorithm = ObCSVGeneralFormat::ObCSVCompression::AUTO;
|
||||
} else {
|
||||
// Unknown name: report the error and leave the output in the INVALID state.
ret = OB_INVALID_ARGUMENT;
|
||||
compression_format = ObLoadCompressionFormat::INVALID;
|
||||
compression_algorithm = ObCSVGeneralFormat::ObCSVCompression::INVALID;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
// Infers the compression from a filename suffix using ObString::suffix_match_ci:
//   ".gz" -> GZIP, ".deflate" -> DEFLATE, ".zst" or ".zstd" -> ZSTD, anything else -> NONE.
// ret is initialised to OB_SUCCESS and never modified, so this always returns OB_SUCCESS.
// NOTE(review): this is a rendered diff hunk; each assignment appears twice, first in its
// pre-change form (compression_format / ObLoadCompressionFormat) and then in its post-change
// form (compression_algorithm / ObCSVGeneralFormat::ObCSVCompression).
int compression_format_from_suffix(ObString filename, ObLoadCompressionFormat &compression_format)
|
||||
int compression_algorithm_from_suffix(ObString filename,
|
||||
ObCSVGeneralFormat::ObCSVCompression &compression_algorithm)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (filename.suffix_match_ci(".gz")) {
|
||||
compression_format = ObLoadCompressionFormat::GZIP;
|
||||
compression_algorithm = ObCSVGeneralFormat::ObCSVCompression::GZIP;
|
||||
} else if (filename.suffix_match_ci(".deflate")) {
|
||||
compression_format = ObLoadCompressionFormat::DEFLATE;
|
||||
compression_algorithm = ObCSVGeneralFormat::ObCSVCompression::DEFLATE;
|
||||
} else if (filename.suffix_match_ci(".zst") || filename.suffix_match_ci(".zstd")) {
|
||||
compression_format = ObLoadCompressionFormat::ZSTD;
|
||||
compression_algorithm = ObCSVGeneralFormat::ObCSVCompression::ZSTD;
|
||||
} else {
|
||||
// No known suffix: the file is treated as uncompressed.
compression_format = ObLoadCompressionFormat::NONE;
|
||||
compression_algorithm = ObCSVGeneralFormat::ObCSVCompression::NONE;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -700,11 +712,6 @@ int64_t ObExternalFileFormat::to_string(char *buf, const int64_t buf_len) const
|
||||
pos += 0;
|
||||
}
|
||||
|
||||
if (compression_format_ != ObLoadCompressionFormat::NONE) {
|
||||
J_COMMA();
|
||||
databuff_print_kv(buf, buf_len, pos, "\"COMPRESSION\"", compression_format_to_string(compression_format_));
|
||||
}
|
||||
|
||||
J_OBJ_END();
|
||||
return pos;
|
||||
}
|
||||
@ -755,13 +762,6 @@ int ObExternalFileFormat::load_from_string(const ObString &str, ObIAllocator &al
|
||||
LOG_WARN("invalid format type", K(ret), K(format_type_str));
|
||||
break;
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret) && OB_NOT_NULL(format_type_node)
|
||||
&& 0 == format_type_node->name_.case_compare("COMPRESSION")
|
||||
&& format_type_node->value_->get_type() == json::JT_STRING) {
|
||||
ObString compression_format_str = format_type_node->value_->get_string();
|
||||
OZ(compression_format_from_string(compression_format_str, compression_format_));
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
|
@ -93,7 +93,8 @@ struct ObCSVGeneralFormat {
|
||||
trim_space_(false),
|
||||
null_if_(),
|
||||
empty_field_as_null_(false),
|
||||
file_column_nums_(0)
|
||||
file_column_nums_(0),
|
||||
compression_algorithm_(ObCSVCompression::NONE)
|
||||
{}
|
||||
static constexpr const char *OPTION_NAMES[] = {
|
||||
"LINE_DELIMITER",
|
||||
@ -106,6 +107,16 @@ struct ObCSVGeneralFormat {
|
||||
"TRIM_SPACE",
|
||||
"NULL_IF_EXETERNAL",
|
||||
"EMPTY_FIELD_AS_NULL",
|
||||
"COMPRESSION"
|
||||
};
|
||||
// Compression applied to a CSV data file. Declared as a plain (unscoped) enum nested in
// ObCSVGeneralFormat, with explicit numeric values.
enum ObCSVCompression
|
||||
{
|
||||
// Stored by compression_algorithm_from_string when the name is not recognized.
INVALID = -1,
|
||||
// Uncompressed; the file readers use the source reader directly.
NONE = 0,
|
||||
// Resolved per file from the filename suffix (see compression_algorithm_from_suffix).
AUTO = 1,
|
||||
// GZIP and DEFLATE are both handled by ObZlibDecompressor.
GZIP = 2,
|
||||
DEFLATE = 3,
|
||||
// Handled by ObZstdDecompressor.
ZSTD = 4,
|
||||
};
|
||||
common::ObString line_start_str_;
|
||||
common::ObString line_term_str_;
|
||||
@ -121,6 +132,7 @@ struct ObCSVGeneralFormat {
|
||||
bool empty_field_as_null_;
|
||||
|
||||
int64_t file_column_nums_;
|
||||
ObCSVCompression compression_algorithm_;
|
||||
|
||||
int init_format(const ObDataInFileStruct &format,
|
||||
int64_t file_column_nums,
|
||||
@ -129,7 +141,7 @@ struct ObCSVGeneralFormat {
|
||||
int load_from_json_data(json::Pair *&node, common::ObIAllocator &allocator);
|
||||
|
||||
TO_STRING_KV(K(cs_type_), K(file_column_nums_), K(line_start_str_), K(field_enclosed_char_),
|
||||
K(field_escaped_char_), K(field_term_str_), K(line_term_str_));
|
||||
K(field_escaped_char_), K(field_term_str_), K(line_term_str_), K(compression_algorithm_));
|
||||
OB_UNIS_VERSION(1);
|
||||
};
|
||||
|
||||
@ -572,25 +584,17 @@ struct ObOriginFileFormat
|
||||
common::ObString origin_null_if_str_;
|
||||
};
|
||||
|
||||
enum class ObLoadCompressionFormat
|
||||
{
|
||||
INVALID,
|
||||
NONE,
|
||||
AUTO,
|
||||
GZIP,
|
||||
DEFLATE,
|
||||
ZSTD,
|
||||
};
|
||||
|
||||
const char *compression_format_to_string(ObLoadCompressionFormat compression_format);
|
||||
int compression_format_from_string(ObString compression_name, ObLoadCompressionFormat &compression_format);
|
||||
const char *compression_algorithm_to_string(ObCSVGeneralFormat::ObCSVCompression compression_algorithm);
|
||||
int compression_algorithm_from_string(ObString compression_name,
|
||||
ObCSVGeneralFormat::ObCSVCompression &compression_algorithm);
|
||||
|
||||
/**
|
||||
* guess compression format from filename suffix
|
||||
*
|
||||
* Returns NONE if none of the known compression formats match.
|
||||
*/
|
||||
int compression_format_from_suffix(ObString filename, ObLoadCompressionFormat &compression_format);
|
||||
int compression_algorithm_from_suffix(ObString filename,
|
||||
ObCSVGeneralFormat::ObCSVCompression &compression_algorithm);
|
||||
|
||||
struct ObExternalFileFormat
|
||||
{
|
||||
@ -617,7 +621,7 @@ struct ObExternalFileFormat
|
||||
OPT_BINARY_AS_TEXT = 1 << 1,
|
||||
};
|
||||
|
||||
ObExternalFileFormat() : format_type_(INVALID_FORMAT), compression_format_(ObLoadCompressionFormat::NONE) {}
|
||||
ObExternalFileFormat() : format_type_(INVALID_FORMAT) {}
|
||||
|
||||
int64_t to_string(char* buf, const int64_t buf_len) const;
|
||||
int load_from_string(const common::ObString &str, common::ObIAllocator &allocator);
|
||||
@ -627,7 +631,6 @@ struct ObExternalFileFormat
|
||||
FormatType format_type_;
|
||||
sql::ObCSVGeneralFormat csv_format_;
|
||||
sql::ObODPSGeneralFormat odps_format_;
|
||||
ObLoadCompressionFormat compression_format_;
|
||||
uint64_t options_;
|
||||
static const char *FORMAT_TYPE_STR[];
|
||||
};
|
||||
|
@ -352,7 +352,7 @@ const int64_t ObExternalStreamFileReader::COMPRESSED_DATA_BUFFER_SIZE = 2 * 1024
|
||||
|
||||
int ObExternalStreamFileReader::init(const common::ObString &location,
|
||||
const ObString &access_info,
|
||||
ObLoadCompressionFormat compression_format,
|
||||
ObCSVGeneralFormat::ObCSVCompression compression_format,
|
||||
ObIAllocator &allocator)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -381,9 +381,9 @@ int ObExternalStreamFileReader::open(const ObString &filename)
|
||||
} else {
|
||||
is_file_end_ = false;
|
||||
|
||||
ObLoadCompressionFormat this_file_compression_format = compression_format_;
|
||||
if (this_file_compression_format == ObLoadCompressionFormat::AUTO
|
||||
&& OB_FAIL(compression_format_from_suffix(filename, this_file_compression_format))) {
|
||||
ObCSVGeneralFormat::ObCSVCompression this_file_compression_format = compression_format_;
|
||||
if (this_file_compression_format == ObCSVGeneralFormat::ObCSVCompression::AUTO
|
||||
&& OB_FAIL(compression_algorithm_from_suffix(filename, this_file_compression_format))) {
|
||||
LOG_WARN("failed to dectect compression format from filename", K(ret), K(filename));
|
||||
}
|
||||
|
||||
@ -528,12 +528,12 @@ int ObExternalStreamFileReader::read_compressed_data()
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObExternalStreamFileReader::create_decompressor(ObLoadCompressionFormat compression_format)
|
||||
int ObExternalStreamFileReader::create_decompressor(ObCSVGeneralFormat::ObCSVCompression compression_format)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_ISNULL(allocator_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
} else if (compression_format == ObLoadCompressionFormat::NONE) {
|
||||
} else if (compression_format == ObCSVGeneralFormat::ObCSVCompression::NONE) {
|
||||
ObDecompressor::destroy(decompressor_);
|
||||
decompressor_ = nullptr;
|
||||
} else if (OB_NOT_NULL(decompressor_) && decompressor_->compression_format() == compression_format) {
|
||||
@ -802,7 +802,7 @@ int ObCSVTableRowIterator::init(const storage::ObTableScanParam *scan_param)
|
||||
OZ (ObExternalTableRowIterator::init(scan_param));
|
||||
OZ (parser_.init(scan_param->external_file_format_.csv_format_));
|
||||
OZ (file_reader_.init(scan_param_->external_file_location_, scan_param->external_file_access_info_,
|
||||
scan_param_->external_file_format_.compression_format_, malloc_alloc_));
|
||||
scan_param_->external_file_format_.csv_format_.compression_algorithm_, malloc_alloc_));
|
||||
OZ (expand_buf());
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
|
@ -70,7 +70,7 @@ public:
|
||||
|
||||
int init(const common::ObString &location,
|
||||
const ObString &access_info,
|
||||
ObLoadCompressionFormat compression_format,
|
||||
ObCSVGeneralFormat::ObCSVCompression compression_format,
|
||||
ObIAllocator &allocator);
|
||||
|
||||
int open(const ObString &filename);
|
||||
@ -96,7 +96,7 @@ private:
|
||||
*
|
||||
* There is no need to create a new decompressor if the compression_format is the same as the decompressor's.
|
||||
*/
|
||||
int create_decompressor(ObLoadCompressionFormat compression_format);
|
||||
int create_decompressor(ObCSVGeneralFormat::ObCSVCompression compression_format);
|
||||
|
||||
private:
|
||||
ObExternalDataAccessDriver data_access_driver_;
|
||||
@ -113,7 +113,7 @@ private:
|
||||
ObIAllocator *allocator_ = nullptr;
|
||||
|
||||
/// the compression format specified in `create external table` statement
|
||||
ObLoadCompressionFormat compression_format_ = ObLoadCompressionFormat::NONE;
|
||||
ObCSVGeneralFormat::ObCSVCompression compression_format_ = ObCSVGeneralFormat::ObCSVCompression::NONE;
|
||||
|
||||
static const char * MEMORY_LABEL;
|
||||
static const int64_t COMPRESSED_DATA_BUFFER_SIZE;
|
||||
|
@ -66,7 +66,7 @@ struct ObLoadArgument
|
||||
table_id_(OB_INVALID_INDEX_INT64),
|
||||
is_csv_format_(false),
|
||||
part_level_(share::schema::PARTITION_LEVEL_MAX),
|
||||
compression_format_(ObLoadCompressionFormat::NONE)
|
||||
compression_format_(ObCSVGeneralFormat::ObCSVCompression::NONE)
|
||||
|
||||
{}
|
||||
|
||||
@ -123,7 +123,7 @@ struct ObLoadArgument
|
||||
bool is_csv_format_;
|
||||
share::schema::ObPartitionLevel part_level_;
|
||||
ObLoadFileIterator file_iter_;
|
||||
ObLoadCompressionFormat compression_format_;
|
||||
ObCSVGeneralFormat::ObCSVCompression compression_format_;
|
||||
};
|
||||
|
||||
struct ObDataInFileStruct
|
||||
|
@ -3237,7 +3237,7 @@ int ObDDLResolver::resolve_file_format(const ParseNode *node, ObExternalFileForm
|
||||
}
|
||||
case T_COMPRESSION: {
|
||||
ObString string_v = ObString(node->children_[0]->str_len_, node->children_[0]->str_value_).trim();
|
||||
ret = compression_format_from_string(string_v, format.compression_format_);
|
||||
ret = compression_algorithm_from_string(string_v, format.csv_format_.compression_algorithm_);
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
@ -3245,14 +3245,6 @@ int ObDDLResolver::resolve_file_format(const ParseNode *node, ObExternalFileForm
|
||||
LOG_WARN("invalid file format option", K(ret), K(node->type_));
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)
|
||||
&& format.format_type_ == ObExternalFileFormat::PARQUET_FORMAT
|
||||
&& format.compression_format_ != ObLoadCompressionFormat::NONE) {
|
||||
LOG_WARN("parquet file doesn't support compression", K(format.compression_format_));
|
||||
ret = OB_NOT_SUPPORTED;
|
||||
LOG_USER_ERROR(OB_NOT_SUPPORTED, "parquet file with compression");
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user