bugfix: ignore conversion failure and create an empty file when there is no output data
This commit is contained in:
@ -114,31 +114,8 @@ int ObSelectIntoOp::inner_open()
|
||||
file_location_ = path.prefix_match_ci(OB_OSS_PREFIX)
|
||||
? IntoFileLocation::REMOTE_OSS
|
||||
: IntoFileLocation::SERVER_DISK;
|
||||
if (!MY_SPEC.is_single_) {
|
||||
input_file_name = file_location_ == IntoFileLocation::REMOTE_OSS
|
||||
? path.split_on('?').trim()
|
||||
: path;
|
||||
if (input_file_name.length() == 0 || path.length() == 0 || OB_ISNULL(input)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("get unexpected path or input is null", K(ret));
|
||||
} else {
|
||||
if (input_file_name.ptr()[input_file_name.length() - 1] == '/'){
|
||||
file_name_with_suffix.append_fmt("%sdata", to_cstring(input_file_name));
|
||||
} else {
|
||||
file_name_with_suffix.append_fmt("%s", to_cstring(input_file_name));
|
||||
}
|
||||
if (MY_SPEC.parallel_ > 1) {
|
||||
file_name_with_suffix.append_fmt("_%ld_%ld_%ld", input->sqc_id_, input->task_id_, split_file_id_);
|
||||
} else {
|
||||
file_name_with_suffix.append_fmt("_%ld", split_file_id_);
|
||||
}
|
||||
if (file_location_ == IntoFileLocation::REMOTE_OSS) {
|
||||
file_name_with_suffix.append_fmt("?%s", to_cstring(path));
|
||||
}
|
||||
path = file_name_with_suffix.string();
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
if (T_INTO_OUTFILE == into_type && !MY_SPEC.is_single_ && OB_FAIL(calc_first_file_path(path))) {
|
||||
LOG_WARN("failed to calc first file path", K(ret));
|
||||
} else if (file_location_ == IntoFileLocation::REMOTE_OSS) {
|
||||
ObString temp_url = path.split_on('?');
|
||||
temp_url.trim();
|
||||
@ -210,6 +187,8 @@ int ObSelectIntoOp::inner_get_next_row()
|
||||
if (OB_ISNULL(phy_plan_ctx = ctx_.get_physical_plan_ctx())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("get phy_plan_ctx failed", K(ret));
|
||||
} else if (T_INTO_OUTFILE == into_type && MY_SPEC.is_single_ && OB_FAIL(open_file())) {
|
||||
LOG_WARN("failed to open file", K(ret));
|
||||
}
|
||||
while (OB_SUCC(ret) && row_count < top_limit_cnt_) {
|
||||
clear_evaluated_flag();
|
||||
@ -258,6 +237,8 @@ int ObSelectIntoOp::inner_get_next_batch(const int64_t max_row_cnt)
|
||||
if (OB_ISNULL(phy_plan_ctx = ctx_.get_physical_plan_ctx())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("get phy_plan_ctx failed", K(ret));
|
||||
} else if (T_INTO_OUTFILE == into_type && MY_SPEC.is_single_ && OB_FAIL(open_file())) {
|
||||
LOG_WARN("failed to open file", K(ret));
|
||||
}
|
||||
bool stop_loop = false;
|
||||
bool is_iter_end = false;
|
||||
@ -388,7 +369,72 @@ int ObSelectIntoOp::get_row_str(const int64_t buf_len,
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObSelectIntoOp::open_file(bool delay_create)
|
||||
int ObSelectIntoOp::open_file()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (IntoFileLocation::REMOTE_OSS == file_location_) {
|
||||
ObIODOpt opt;
|
||||
ObIODOpts iod_opts;
|
||||
opt.set("AccessType", "appender");
|
||||
iod_opts.opts_ = &opt;
|
||||
iod_opts.opt_cnt_ = 1;
|
||||
bool is_exist = false;
|
||||
if (OB_FAIL(device_handle_->exist(url_.ptr(), is_exist))) {
|
||||
LOG_WARN("failed to check file exist", K(ret), K(url_));
|
||||
} else if (is_exist) {
|
||||
ret = OB_FILE_ALREADY_EXIST;
|
||||
LOG_WARN("file already exist", K(ret), K(url_));
|
||||
} else if (OB_FAIL(device_handle_->open(url_.ptr(), -1, 0, fd_, &iod_opts))) {
|
||||
LOG_WARN("failed to open file", K(ret));
|
||||
} else {
|
||||
is_file_opened_ = true;
|
||||
}
|
||||
} else if (IntoFileLocation::SERVER_DISK == file_location_) {
|
||||
if (OB_FAIL(file_appender_.create(url_, true))) {
|
||||
LOG_WARN("failed to create file", K(ret), K(url_));
|
||||
} else {
|
||||
is_file_opened_ = true;
|
||||
}
|
||||
} else {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected error. invalid file location", K(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
/**
 * Compute the name of the first output file and store it back into `path`.
 *
 * The generated name is built as:
 *   <input_file_name>            ("data" is appended when the name ends with '/')
 *   _<sqc_id>_<task_id>_<split>  when MY_SPEC.parallel_ > 1, otherwise _<split>
 *   ?<path>                      only for REMOTE_OSS
 * and is then copied into memory from ctx_.get_allocator() before being
 * assigned to `path`.
 *
 * For REMOTE_OSS the input name is taken from path.split_on('?').trim().
 * NOTE(review): this presumably leaves the text after '?' in `path`, which is
 * what gets re-appended behind '?' below -- confirm against ObString::split_on.
 *
 * Every append_fmt() result is checked, so a failed append is reported to the
 * caller instead of silently producing a truncated or wrong file path.
 *
 * @param path  in: user supplied path; out: path of the first output file.
 * @return OB_SUCCESS on success; OB_ERR_UNEXPECTED when the path/input file
 *         name is empty or the operator input is NULL; otherwise the error
 *         code of the failing append / string copy.
 */
int ObSelectIntoOp::calc_first_file_path(ObString &path)
{
  int ret = OB_SUCCESS;
  ObSqlString file_name_with_suffix;
  ObSelectIntoOpInput *input = static_cast<ObSelectIntoOpInput*>(input_);
  ObString input_file_name = file_location_ == IntoFileLocation::REMOTE_OSS
                             ? path.split_on('?').trim()
                             : path;
  if (input_file_name.length() == 0 || path.length() == 0 || OB_ISNULL(input)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("get unexpected path or input is null", K(ret));
  } else {
    // Base name: a trailing '/' gets the default file name "data".
    if (input_file_name.ptr()[input_file_name.length() - 1] == '/') {
      if (OB_FAIL(file_name_with_suffix.append_fmt("%sdata", to_cstring(input_file_name)))) {
        LOG_WARN("failed to append file name", K(ret));
      }
    } else if (OB_FAIL(file_name_with_suffix.append_fmt("%s", to_cstring(input_file_name)))) {
      LOG_WARN("failed to append file name", K(ret));
    }

    // Suffix: parallel execution tags the file with sqc id and task id as well.
    if (OB_FAIL(ret)) {
    } else if (MY_SPEC.parallel_ > 1) {
      if (OB_FAIL(file_name_with_suffix.append_fmt("_%ld_%ld_%ld",
                                                   input->sqc_id_,
                                                   input->task_id_,
                                                   split_file_id_))) {
        LOG_WARN("failed to append file suffix", K(ret));
      }
    } else if (OB_FAIL(file_name_with_suffix.append_fmt("_%ld", split_file_id_))) {
      LOG_WARN("failed to append file suffix", K(ret));
    }

    // REMOTE_OSS: put what is left in path back behind the '?'.
    if (OB_SUCC(ret) && file_location_ == IntoFileLocation::REMOTE_OSS) {
      if (OB_FAIL(file_name_with_suffix.append_fmt("?%s", to_cstring(path)))) {
        LOG_WARN("failed to append oss info", K(ret));
      }
    }

    // file_name_with_suffix is a local, so copy the result into allocator memory.
    if (OB_FAIL(ret)) {
    } else if (OB_FAIL(ob_write_string(ctx_.get_allocator(), file_name_with_suffix.string(), path))) {
      LOG_WARN("failed to write string", K(ret));
    }
  }
  return ret;
}
|
||||
|
||||
int ObSelectIntoOp::calc_next_file_path()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObSqlString url_with_suffix;
|
||||
@ -414,30 +460,6 @@ int ObSelectIntoOp::open_file(bool delay_create)
|
||||
LOG_WARN("fail to write string", K(ret));
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (IntoFileLocation::REMOTE_OSS == file_location_) {
|
||||
ObIODOpt opt;
|
||||
ObIODOpts iod_opts;
|
||||
opt.set("AccessType", "appender");
|
||||
iod_opts.opts_ = &opt;
|
||||
iod_opts.opt_cnt_ = 1;
|
||||
bool is_exist = false;
|
||||
if (OB_FAIL(device_handle_->exist(url_.ptr(), is_exist))) {
|
||||
LOG_WARN("fail to check file exist", K(ret), K(url_));
|
||||
} else if (is_exist) {
|
||||
ret = OB_FILE_ALREADY_EXIST;
|
||||
LOG_WARN("file already exist", K(ret), K(url_));
|
||||
} else if (!delay_create && OB_FAIL(device_handle_->open(url_.ptr(), -1, 0, fd_, &iod_opts))) {
|
||||
LOG_WARN("fail to open file", K(ret));
|
||||
}
|
||||
} else if (IntoFileLocation::SERVER_DISK == file_location_) {
|
||||
if (!delay_create && OB_FAIL(file_appender_.create(url_, true))) {
|
||||
LOG_WARN("create dumpfile failed", K(ret), K(url_));
|
||||
}
|
||||
} else {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected error. invalid file location", K(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -451,6 +473,7 @@ void ObSelectIntoOp::close_file()
|
||||
fd_.reset();
|
||||
}
|
||||
}
|
||||
is_file_opened_ = false;
|
||||
}
|
||||
|
||||
std::function<int(const char *, int64_t)> ObSelectIntoOp::get_flush_function()
|
||||
@ -458,26 +481,16 @@ std::function<int(const char *, int64_t)> ObSelectIntoOp::get_flush_function()
|
||||
return [this](const char *data, int64_t data_len) -> int
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (file_location_ == IntoFileLocation::SERVER_DISK) {
|
||||
if (!file_appender_.is_opened() && OB_FAIL(file_appender_.create(url_, true))) {
|
||||
LOG_WARN("failed to create file", K(ret), K(url_));
|
||||
} else if (OB_FAIL(file_appender_.append(data, data_len, false))) {
|
||||
if (!is_file_opened_ && OB_FAIL(open_file())) {
|
||||
LOG_WARN("failed to open file", K(ret), K(url_));
|
||||
} else if (file_location_ == IntoFileLocation::SERVER_DISK) {
|
||||
if (OB_FAIL(file_appender_.append(data, data_len, false))) {
|
||||
LOG_WARN("failed to append file", K(ret), K(data_len));
|
||||
}
|
||||
} else {
|
||||
ObIODOpt opt;
|
||||
ObIODOpts iod_opts;
|
||||
opt.set("AccessType", "appender");
|
||||
iod_opts.opts_ = &opt;
|
||||
iod_opts.opt_cnt_ = 1;
|
||||
bool is_exist = false;
|
||||
} else if (file_location_ == IntoFileLocation::REMOTE_OSS) {
|
||||
int64_t write_size = 0;
|
||||
int64_t begin_ts = ObTimeUtility::current_time();
|
||||
if (OB_FAIL(device_handle_->exist(url_.ptr(), is_exist))) {
|
||||
LOG_WARN("failed to check file exist", K(ret));
|
||||
} else if (!is_exist && OB_FAIL(device_handle_->open(url_.ptr(), -1, 0, fd_, &iod_opts))) {
|
||||
LOG_WARN("failed to open file", K(ret));
|
||||
} else if (OB_FAIL(device_handle_->write(fd_, data, data_len, write_size))) {
|
||||
if (OB_FAIL(device_handle_->write(fd_, data, data_len, write_size))) {
|
||||
LOG_WARN("failed to write device", K(ret));
|
||||
} else if (OB_UNLIKELY(write_size != data_len)) {
|
||||
ret = OB_IO_ERROR;
|
||||
@ -492,6 +505,9 @@ std::function<int(const char *, int64_t)> ObSelectIntoOp::get_flush_function()
|
||||
_OB_LOG(TRACE, "write oss stat, time:%ld write_size:%ld speed:%.2Lf MB/s total_write:%.2Lf MB",
|
||||
cost_time, write_size, speed, total_write);
|
||||
}
|
||||
} else {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected error. invalid file location", K(ret));
|
||||
}
|
||||
return ret;
|
||||
};
|
||||
@ -501,7 +517,6 @@ int ObSelectIntoOp::split_file()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t dummy_pos = 0;
|
||||
bool delay_create = true;
|
||||
if (OB_FAIL(flush_buf(dummy_pos))) {
|
||||
LOG_WARN("fail to flush buffer", K(ret));
|
||||
} else {
|
||||
@ -529,8 +544,8 @@ int ObSelectIntoOp::split_file()
|
||||
//create new file
|
||||
if (OB_SUCC(ret)) {
|
||||
split_file_id_++;
|
||||
if (OB_FAIL(open_file(delay_create))) {
|
||||
LOG_WARN("fail to open file", K(ret));
|
||||
if (OB_FAIL(calc_next_file_path())) {
|
||||
LOG_WARN("failed to calculate new file path", K(ret));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
@ -662,7 +677,8 @@ int ObSelectIntoOp::write_obj_to_file(const ObObj &obj, bool need_escape)
|
||||
if (OB_FAIL(ObFastStringScanner::foreach_char(str_to_escape,
|
||||
src_type,
|
||||
escape_printer_,
|
||||
escape_printer_.do_encode_))) {
|
||||
escape_printer_.do_encode_,
|
||||
escape_printer_.ignore_convert_failed_))) {
|
||||
if (OB_SIZE_OVERFLOW == ret) {
|
||||
if (i == 0 && OB_UNLIKELY(OB_SUCCESS != (tmp_ret = flush_buf(escape_printer_.pos_)))) {
|
||||
LOG_WARN("failed to flush buffer", K(tmp_ret), K(ret));
|
||||
@ -675,7 +691,7 @@ int ObSelectIntoOp::write_obj_to_file(const ObObj &obj, bool need_escape)
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
} else {
|
||||
LOG_WARN("failed to print plain str", K(ret));
|
||||
LOG_WARN("failed to print plain str", K(ret), K(src_type), K(escape_printer_.do_encode_));
|
||||
}
|
||||
} else {
|
||||
print_succ = true;
|
||||
@ -746,12 +762,17 @@ int ObSelectIntoOp::write_lob_to_file(const ObObj &obj, const ObExpr &expr, cons
|
||||
ObEvalCtx::TempAllocGuard tmp_alloc_g(eval_ctx_);
|
||||
common::ObArenaAllocator &temp_allocator = tmp_alloc_g.get_allocator();
|
||||
int64_t truncated_len = 0;
|
||||
bool stop_when_truncated = false;
|
||||
|
||||
if (OB_FAIL(lob_iter.init(0, NULL, &temp_allocator))) {
|
||||
LOG_WARN("init lob_iter failed ", K(ret), K(lob_iter));
|
||||
}
|
||||
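// When truncated_len == src_block_data.length(),
|
||||
// the current foreach_char call is processing only the invalid data at the end of the lob, i.e. the truncated data of the previous round; an infinite loop must be avoided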
|
||||
while (OB_SUCC(ret)
|
||||
&& (state = lob_iter.get_next_block(src_block_data)) == TEXTSTRING_ITER_NEXT) {
|
||||
// for an outrow lob this can be false only on the last iteration; an inrow lob is iterated only once, and it is false
|
||||
stop_when_truncated = (truncated_len != src_block_data.length()) && lob_iter.is_outrow_lob();
|
||||
if ((escape_printer_.buf_len_ - escape_printer_.pos_) < (src_block_data.length() * 5)
|
||||
&& OB_FAIL(flush_buf(escape_printer_.pos_))) {
|
||||
LOG_WARN("failed to flush buf", K(ret));
|
||||
@ -759,9 +780,10 @@ int ObSelectIntoOp::write_lob_to_file(const ObObj &obj, const ObExpr &expr, cons
|
||||
src_type,
|
||||
escape_printer_,
|
||||
escape_printer_.do_encode_,
|
||||
false,
|
||||
escape_printer_.ignore_convert_failed_,
|
||||
stop_when_truncated,
|
||||
&truncated_len))) {
|
||||
if (OB_ERR_DATA_TRUNCATED == ret) {
|
||||
if (OB_ERR_DATA_TRUNCATED == ret && stop_when_truncated) {
|
||||
lob_iter.set_reserved_byte_len(truncated_len);
|
||||
ret = OB_SUCCESS;
|
||||
} else {
|
||||
@ -847,13 +869,6 @@ int ObSelectIntoOp::into_outfile()
|
||||
const ObIArray<ObExpr*> &select_exprs = MY_SPEC.select_exprs_;
|
||||
ObDatum *datum = NULL;
|
||||
ObObj obj;
|
||||
if (is_first_) { // create file
|
||||
if (OB_FAIL(open_file(true))) {
|
||||
LOG_WARN("open file failed", K(ret), K(file_name_));
|
||||
} else {
|
||||
is_first_ = false;
|
||||
}
|
||||
}
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < select_exprs.count(); ++i) {
|
||||
if (OB_ISNULL(select_exprs.at(i))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
@ -891,13 +906,6 @@ int ObSelectIntoOp::into_outfile_batch(const ObBatchRows &brs)
|
||||
ObArray<ObDatumVector> datum_vectors;
|
||||
ObDatum *datum = NULL;
|
||||
ObObj obj;
|
||||
if (is_first_) { // create file
|
||||
if (OB_FAIL(open_file(true))) {
|
||||
LOG_WARN("open file failed", K(ret), K(file_name_));
|
||||
} else {
|
||||
is_first_ = false;
|
||||
}
|
||||
}
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < select_exprs.count(); ++i) {
|
||||
if (OB_FAIL(select_exprs.at(i)->eval_batch(eval_ctx_, *brs.skip_, brs.size_))) {
|
||||
LOG_WARN("failed to eval batch", K(ret));
|
||||
@ -1033,15 +1041,17 @@ int ObSelectIntoOp::prepare_escape_printer()
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t pos = 0;
|
||||
char *buf = NULL;
|
||||
int64_t buf_len = 5 * ObCharset::MAX_MB_LEN;
|
||||
int64_t buf_len = 6 * ObCharset::MAX_MB_LEN;
|
||||
// mb->wc
|
||||
int32_t wchar_enclose = char_enclose_;
|
||||
int32_t wchar_escape = char_escape_;
|
||||
int32_t wchar_field = 0;
|
||||
int32_t wchar_line = 0;
|
||||
int32_t wchar_zero = '\0';
|
||||
int32_t wchar_replace = 0;
|
||||
OZ(extract_fisrt_wchar_from_varhcar(MY_SPEC.field_str_, wchar_field));
|
||||
OZ(extract_fisrt_wchar_from_varhcar(MY_SPEC.line_str_, wchar_line));
|
||||
OZ(ObCharset::get_replace_character(MY_SPEC.cs_type_, wchar_replace));
|
||||
// wc->mb
|
||||
if (OB_ISNULL(buf = static_cast<char*>(ctx_.get_allocator().alloc(buf_len)))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
@ -1052,7 +1062,9 @@ int ObSelectIntoOp::prepare_escape_printer()
|
||||
OZ(print_wchar_to_buf(buf, buf_len, pos, wchar_zero, escape_printer_.zero_, MY_SPEC.cs_type_));
|
||||
OZ(print_wchar_to_buf(buf, buf_len, pos, wchar_field, escape_printer_.field_terminator_, MY_SPEC.cs_type_));
|
||||
OZ(print_wchar_to_buf(buf, buf_len, pos, wchar_line, escape_printer_.line_terminator_, MY_SPEC.cs_type_));
|
||||
OZ(print_wchar_to_buf(buf, buf_len, pos, wchar_replace, escape_printer_.convert_replacer_, MY_SPEC.cs_type_));
|
||||
escape_printer_.coll_type_ = MY_SPEC.cs_type_;
|
||||
escape_printer_.ignore_convert_failed_ = true; // TODO: provide user-defined interface
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
@ -110,6 +110,7 @@ public:
|
||||
has_escape_(false),
|
||||
has_lob_(false),
|
||||
has_json_(false),
|
||||
is_file_opened_(false),
|
||||
print_params_(),
|
||||
escape_printer_()
|
||||
{
|
||||
@ -119,7 +120,8 @@ public:
|
||||
struct ObEscapePrinter
|
||||
{
|
||||
ObEscapePrinter():
|
||||
need_enclose_(false), do_encode_(false), do_escape_(false), print_hex_(false) {}
|
||||
need_enclose_(false), do_encode_(false), do_escape_(false), print_hex_(false),
|
||||
ignore_convert_failed_(false) {}
|
||||
int operator() (const ObString &src_str, const ob_wc_t &unicode_value) {
|
||||
int ret = OB_SUCCESS;
|
||||
ObString dst_str = src_str;
|
||||
@ -129,6 +131,9 @@ public:
|
||||
ret = ObCharset::wc_mb(coll_type_, unicode_value, tmp_buf, ObCharset::MAX_MB_LEN, result_len);
|
||||
if (OB_SUCC(ret)) {
|
||||
dst_str = ObString(result_len, tmp_buf);
|
||||
} else if (ret == OB_ERR_INCORRECT_STRING_VALUE && ignore_convert_failed_) {
|
||||
dst_str = convert_replacer_;
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret) || !do_escape_ || print_hex_) {
|
||||
@ -155,11 +160,13 @@ public:
|
||||
ObString zero_;
|
||||
ObString field_terminator_;
|
||||
ObString line_terminator_;
|
||||
ObString convert_replacer_;
|
||||
ObCollationType coll_type_;
|
||||
bool need_enclose_;
|
||||
bool do_encode_;
|
||||
bool do_escape_;
|
||||
bool print_hex_;
|
||||
bool ignore_convert_failed_;
|
||||
char *buf_;
|
||||
int64_t buf_len_;
|
||||
int64_t pos_;
|
||||
@ -239,6 +246,7 @@ public:
|
||||
write_bytes_ = 0;
|
||||
split_file_id_ = 0;
|
||||
data_writer_.init(NULL, 0);
|
||||
is_file_opened_ = false;
|
||||
}
|
||||
|
||||
private:
|
||||
@ -268,7 +276,9 @@ private:
|
||||
int write_lob_to_file(const ObObj &obj, const ObExpr &expr, const ObDatum &datum);
|
||||
int try_split_file();
|
||||
int into_varlist();
|
||||
int open_file(bool delay_create = false);
|
||||
int open_file();
|
||||
int calc_next_file_path();
|
||||
int calc_first_file_path(ObString &path);
|
||||
int split_file();
|
||||
void close_file();
|
||||
std::function<int(const char *, int64_t)> get_flush_function();
|
||||
@ -297,6 +307,7 @@ private:
|
||||
bool has_escape_;
|
||||
bool has_lob_;
|
||||
bool has_json_;
|
||||
bool is_file_opened_;
|
||||
common::ObObjPrintParams print_params_;
|
||||
ObEscapePrinter escape_printer_;
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user