diff --git a/deps/oblib/src/lib/charset/ob_charset.cpp b/deps/oblib/src/lib/charset/ob_charset.cpp index ba50e5e126..24d4a9eae4 100644 --- a/deps/oblib/src/lib/charset/ob_charset.cpp +++ b/deps/oblib/src/lib/charset/ob_charset.cpp @@ -3066,6 +3066,20 @@ bool ObCharset::is_cs_unicode(ObCollationType collation_type) return is_cs_unicode; } +int ObCharset::get_replace_character(ObCollationType collation_type, int32_t &replaced_char_unicode) +{ + int ret = OB_SUCCESS; + if (is_cs_unicode(collation_type)) { + replaced_char_unicode = OB_CS_REPLACEMENT_CHARACTER; + } else if (!is_cs_nonascii(collation_type)) { + replaced_char_unicode = '?'; + } else { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get unexpected collation type", K(ret)); + } + return ret; +} + bool ObCharset::is_cjk_charset(ObCollationType collation_type) { ObCharsetType cs_type = ObCharset::charset_type_by_coll(collation_type); diff --git a/deps/oblib/src/lib/charset/ob_charset.h b/deps/oblib/src/lib/charset/ob_charset.h index 04b8014932..e5efb0216c 100644 --- a/deps/oblib/src/lib/charset/ob_charset.h +++ b/deps/oblib/src/lib/charset/ob_charset.h @@ -541,6 +541,7 @@ public: static bool is_cs_nonascii(ObCollationType collation_type); static bool is_cs_unicode(ObCollationType collation_type); + static int get_replace_character(ObCollationType collation_type, int32_t &replaced_char_unicode); static bool is_cjk_charset(ObCollationType collation_type); static bool is_valid_connection_collation(ObCollationType collation_type); static const char* get_oracle_charset_name_by_charset_type(ObCharsetType charset_type); diff --git a/deps/oblib/src/lib/charset/ob_charset_string_helper.h b/deps/oblib/src/lib/charset/ob_charset_string_helper.h index 55240c05a8..6b66031ce5 100644 --- a/deps/oblib/src/lib/charset/ob_charset_string_helper.h +++ b/deps/oblib/src/lib/charset/ob_charset_string_helper.h @@ -61,6 +61,8 @@ inline int ob_charset_char_len(const unsigned char *s, const un mb_len = 3; } else if (c < 0xf8) { mb_len = 4; + } 
else { + mb_len = 1; /* Illegal mb head */ } if (s + mb_len > e) { mb_len = OB_CS_TOOSMALL; @@ -195,6 +197,8 @@ inline int ob_charset_char_len(const unsigned char *s, const un if (OB_LIKELY(s + 3 < e)) { mb_len = 4; } + } else { + mb_len = 1; /* Illegal low_c */ } } } @@ -614,6 +618,7 @@ public: static int foreach_char_prototype(const ObString &str, HANDLE_FUNC &func, bool ignore_convert_failed = false, + bool stop_when_truncated = false, int64_t *truncated_len = NULL) { int ret = OB_SUCCESS; @@ -621,20 +626,23 @@ public: const char* end = str.ptr() + str.length(); int64_t step = 0; ob_wc_t unicode = -1; + int32_t replace_wc = 0; for (; OB_SUCC(ret) && begin < end; begin += step) { if (DO_DECODE) { step = ob_charset_decode_unicode(pointer_cast(begin), pointer_cast(end), unicode); } else { step = ob_charset_char_len(pointer_cast(begin), pointer_cast(end)); } - if (OB_UNLIKELY(step <= OB_CS_TOOSMALL)) { - ret = OB_ERR_DATA_TRUNCATED; - if (OB_NOT_NULL(truncated_len)) { - *truncated_len = end - begin; - } - } else if (OB_UNLIKELY(step <= 0)) { - if (ignore_convert_failed) { + if (OB_UNLIKELY(step <= 0)) { + if (ignore_convert_failed && !(stop_when_truncated && step <= OB_CS_TOOSMALL)) { + ret = OB_SUCCESS; step = 1; + unicode = -1; + } else if (step <= OB_CS_TOOSMALL) { + ret = OB_ERR_DATA_TRUNCATED; + if (OB_NOT_NULL(truncated_len)) { + *truncated_len = end - begin; + } } else { ret = OB_ERR_INCORRECT_STRING_VALUE; } @@ -662,44 +670,45 @@ public: HANDLE_FUNC &func, bool convert_unicode = true, bool ignore_convert_failed = false, + bool stop_when_truncated = false, int64_t *truncated_len = NULL) { int ret = OB_SUCCESS; switch (cs_type) { case CHARSET_UTF8MB4: ret = convert_unicode ? 
- foreach_char_prototype(str, func, ignore_convert_failed, truncated_len) - : foreach_char_prototype(str, func, ignore_convert_failed, truncated_len); + foreach_char_prototype(str, func, ignore_convert_failed, stop_when_truncated, truncated_len) + : foreach_char_prototype(str, func, ignore_convert_failed, stop_when_truncated, truncated_len); break; case CHARSET_GBK: ret = convert_unicode ? - foreach_char_prototype(str, func, ignore_convert_failed, truncated_len) - : foreach_char_prototype(str, func, ignore_convert_failed, truncated_len); + foreach_char_prototype(str, func, ignore_convert_failed, stop_when_truncated, truncated_len) + : foreach_char_prototype(str, func, ignore_convert_failed, stop_when_truncated, truncated_len); break; case CHARSET_GB18030: ret = convert_unicode ? - foreach_char_prototype(str, func, ignore_convert_failed, truncated_len) - : foreach_char_prototype(str, func, ignore_convert_failed, truncated_len); + foreach_char_prototype(str, func, ignore_convert_failed, stop_when_truncated, truncated_len) + : foreach_char_prototype(str, func, ignore_convert_failed, stop_when_truncated, truncated_len); break; case CHARSET_GB18030_2022: ret = convert_unicode ? - foreach_char_prototype(str, func, ignore_convert_failed, truncated_len) - : foreach_char_prototype(str, func, ignore_convert_failed, truncated_len); + foreach_char_prototype(str, func, ignore_convert_failed, stop_when_truncated, truncated_len) + : foreach_char_prototype(str, func, ignore_convert_failed, stop_when_truncated, truncated_len); break; case CHARSET_UTF16: ret = convert_unicode ? - foreach_char_prototype(str, func, ignore_convert_failed, truncated_len) - : foreach_char_prototype(str, func, ignore_convert_failed, truncated_len); + foreach_char_prototype(str, func, ignore_convert_failed, stop_when_truncated, truncated_len) + : foreach_char_prototype(str, func, ignore_convert_failed, stop_when_truncated, truncated_len); break; case CHARSET_LATIN1: ret = convert_unicode ? 
- foreach_char_prototype(str, func, ignore_convert_failed, truncated_len) - : foreach_char_prototype(str, func, ignore_convert_failed, truncated_len); + foreach_char_prototype(str, func, ignore_convert_failed, stop_when_truncated, truncated_len) + : foreach_char_prototype(str, func, ignore_convert_failed, stop_when_truncated, truncated_len); break; case CHARSET_BINARY: ret = convert_unicode ? - foreach_char_prototype(str, func, ignore_convert_failed, truncated_len) - : foreach_char_prototype(str, func, ignore_convert_failed, truncated_len); + foreach_char_prototype(str, func, ignore_convert_failed, stop_when_truncated, truncated_len) + : foreach_char_prototype(str, func, ignore_convert_failed, stop_when_truncated, truncated_len); break; default: ret = OB_ERR_UNEXPECTED; @@ -744,30 +753,31 @@ public: ObCharsetType in_cs_type = ObCharset::charset_type_by_coll(src_coll_type); ObCharsetType out_cs_type = ObCharset::charset_type_by_coll(out_coll_type); int64_t truncated_len = 0; + bool stop_when_truncated = false; switch (out_cs_type) { case CHARSET_UTF8MB4: { Encoder encoder(buf, buf_len, pos, replaced_char); - ret = foreach_char(str, in_cs_type, encoder, true, !report_error, &truncated_len); + ret = foreach_char(str, in_cs_type, encoder, true, !report_error, stop_when_truncated, &truncated_len); break; } case CHARSET_GBK: { Encoder encoder(buf, buf_len, pos, replaced_char); - ret = foreach_char(str, in_cs_type, encoder, true, !report_error, &truncated_len); + ret = foreach_char(str, in_cs_type, encoder, true, !report_error, stop_when_truncated, &truncated_len); break; } case CHARSET_GB18030: { Encoder encoder(buf, buf_len, pos, replaced_char); - ret = foreach_char(str, in_cs_type, encoder, true, !report_error, &truncated_len); + ret = foreach_char(str, in_cs_type, encoder, true, !report_error, stop_when_truncated, &truncated_len); break; } case CHARSET_GB18030_2022: { Encoder encoder(buf, buf_len, pos, replaced_char); - ret = foreach_char(str, in_cs_type, encoder, 
true, !report_error, &truncated_len); + ret = foreach_char(str, in_cs_type, encoder, true, !report_error, stop_when_truncated, &truncated_len); break; } case CHARSET_UTF16: { Encoder encoder(buf, buf_len, pos, replaced_char); - ret = foreach_char(str, in_cs_type, encoder, true, !report_error, &truncated_len); + ret = foreach_char(str, in_cs_type, encoder, true, !report_error, stop_when_truncated, &truncated_len); break; } default: { diff --git a/src/sql/engine/basic/ob_select_into_op.cpp b/src/sql/engine/basic/ob_select_into_op.cpp index 1970e29aa0..b77c7608a8 100644 --- a/src/sql/engine/basic/ob_select_into_op.cpp +++ b/src/sql/engine/basic/ob_select_into_op.cpp @@ -114,31 +114,8 @@ int ObSelectIntoOp::inner_open() file_location_ = path.prefix_match_ci(OB_OSS_PREFIX) ? IntoFileLocation::REMOTE_OSS : IntoFileLocation::SERVER_DISK; - if (!MY_SPEC.is_single_) { - input_file_name = file_location_ == IntoFileLocation::REMOTE_OSS - ? path.split_on('?').trim() - : path; - if (input_file_name.length() == 0 || path.length() == 0 || OB_ISNULL(input)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("get unexpected path or input is null", K(ret)); - } else { - if (input_file_name.ptr()[input_file_name.length() - 1] == '/'){ - file_name_with_suffix.append_fmt("%sdata", to_cstring(input_file_name)); - } else { - file_name_with_suffix.append_fmt("%s", to_cstring(input_file_name)); - } - if (MY_SPEC.parallel_ > 1) { - file_name_with_suffix.append_fmt("_%ld_%ld_%ld", input->sqc_id_, input->task_id_, split_file_id_); - } else { - file_name_with_suffix.append_fmt("_%ld", split_file_id_); - } - if (file_location_ == IntoFileLocation::REMOTE_OSS) { - file_name_with_suffix.append_fmt("?%s", to_cstring(path)); - } - path = file_name_with_suffix.string(); - } - } - if (OB_FAIL(ret)) { + if (T_INTO_OUTFILE == into_type && !MY_SPEC.is_single_ && OB_FAIL(calc_first_file_path(path))) { + LOG_WARN("failed to calc first file path", K(ret)); } else if (file_location_ == IntoFileLocation::REMOTE_OSS) { 
ObString temp_url = path.split_on('?'); temp_url.trim(); @@ -210,6 +187,8 @@ int ObSelectIntoOp::inner_get_next_row() if (OB_ISNULL(phy_plan_ctx = ctx_.get_physical_plan_ctx())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("get phy_plan_ctx failed", K(ret)); + } else if (T_INTO_OUTFILE == into_type && MY_SPEC.is_single_ && OB_FAIL(open_file())) { + LOG_WARN("failed to open file", K(ret)); } while (OB_SUCC(ret) && row_count < top_limit_cnt_) { clear_evaluated_flag(); @@ -258,6 +237,8 @@ int ObSelectIntoOp::inner_get_next_batch(const int64_t max_row_cnt) if (OB_ISNULL(phy_plan_ctx = ctx_.get_physical_plan_ctx())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("get phy_plan_ctx failed", K(ret)); + } else if (T_INTO_OUTFILE == into_type && MY_SPEC.is_single_ && OB_FAIL(open_file())) { + LOG_WARN("failed to open file", K(ret)); } bool stop_loop = false; bool is_iter_end = false; @@ -388,7 +369,72 @@ int ObSelectIntoOp::get_row_str(const int64_t buf_len, return ret; } -int ObSelectIntoOp::open_file(bool delay_create) +int ObSelectIntoOp::open_file() +{ + int ret = OB_SUCCESS; + if (IntoFileLocation::REMOTE_OSS == file_location_) { + ObIODOpt opt; + ObIODOpts iod_opts; + opt.set("AccessType", "appender"); + iod_opts.opts_ = &opt; + iod_opts.opt_cnt_ = 1; + bool is_exist = false; + if (OB_FAIL(device_handle_->exist(url_.ptr(), is_exist))) { + LOG_WARN("failed to check file exist", K(ret), K(url_)); + } else if (is_exist) { + ret = OB_FILE_ALREADY_EXIST; + LOG_WARN("file already exist", K(ret), K(url_)); + } else if (OB_FAIL(device_handle_->open(url_.ptr(), -1, 0, fd_, &iod_opts))) { + LOG_WARN("failed to open file", K(ret)); + } else { + is_file_opened_ = true; + } + } else if (IntoFileLocation::SERVER_DISK == file_location_) { + if (OB_FAIL(file_appender_.create(url_, true))) { + LOG_WARN("failed to create file", K(ret), K(url_)); + } else { + is_file_opened_ = true; + } + } else { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected error. 
invalid file location", K(ret)); + } + return ret; +} + +int ObSelectIntoOp::calc_first_file_path(ObString &path) +{ + int ret = OB_SUCCESS; + ObSqlString file_name_with_suffix; + ObSelectIntoOpInput *input = static_cast(input_); + ObString input_file_name = file_location_ == IntoFileLocation::REMOTE_OSS + ? path.split_on('?').trim() + : path; + if (input_file_name.length() == 0 || path.length() == 0 || OB_ISNULL(input)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get unexpected path or input is null", K(ret)); + } else { + if (input_file_name.ptr()[input_file_name.length() - 1] == '/'){ + file_name_with_suffix.append_fmt("%sdata", to_cstring(input_file_name)); + } else { + file_name_with_suffix.append_fmt("%s", to_cstring(input_file_name)); + } + if (MY_SPEC.parallel_ > 1) { + file_name_with_suffix.append_fmt("_%ld_%ld_%ld", input->sqc_id_, input->task_id_, split_file_id_); + } else { + file_name_with_suffix.append_fmt("_%ld", split_file_id_); + } + if (file_location_ == IntoFileLocation::REMOTE_OSS) { + file_name_with_suffix.append_fmt("?%s", to_cstring(path)); + } + if (OB_FAIL(ob_write_string(ctx_.get_allocator(), file_name_with_suffix.string(), path))) { + LOG_WARN("failed to write string", K(ret)); + } + } + return ret; +} + +int ObSelectIntoOp::calc_next_file_path() { int ret = OB_SUCCESS; ObSqlString url_with_suffix; @@ -414,30 +460,6 @@ int ObSelectIntoOp::open_file(bool delay_create) LOG_WARN("fail to write string", K(ret)); } } - if (OB_FAIL(ret)) { - } else if (IntoFileLocation::REMOTE_OSS == file_location_) { - ObIODOpt opt; - ObIODOpts iod_opts; - opt.set("AccessType", "appender"); - iod_opts.opts_ = &opt; - iod_opts.opt_cnt_ = 1; - bool is_exist = false; - if (OB_FAIL(device_handle_->exist(url_.ptr(), is_exist))) { - LOG_WARN("fail to check file exist", K(ret), K(url_)); - } else if (is_exist) { - ret = OB_FILE_ALREADY_EXIST; - LOG_WARN("file already exist", K(ret), K(url_)); - } else if (!delay_create && OB_FAIL(device_handle_->open(url_.ptr(), -1, 0, 
fd_, &iod_opts))) { - LOG_WARN("fail to open file", K(ret)); - } - } else if (IntoFileLocation::SERVER_DISK == file_location_) { - if (!delay_create && OB_FAIL(file_appender_.create(url_, true))) { - LOG_WARN("create dumpfile failed", K(ret), K(url_)); - } - } else { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected error. invalid file location", K(ret)); - } return ret; } @@ -451,6 +473,7 @@ void ObSelectIntoOp::close_file() fd_.reset(); } } + is_file_opened_ = false; } std::function ObSelectIntoOp::get_flush_function() @@ -458,26 +481,16 @@ std::function ObSelectIntoOp::get_flush_function() return [this](const char *data, int64_t data_len) -> int { int ret = OB_SUCCESS; - if (file_location_ == IntoFileLocation::SERVER_DISK) { - if (!file_appender_.is_opened() && OB_FAIL(file_appender_.create(url_, true))) { - LOG_WARN("failed to create file", K(ret), K(url_)); - } else if (OB_FAIL(file_appender_.append(data, data_len, false))) { + if (!is_file_opened_ && OB_FAIL(open_file())) { + LOG_WARN("failed to open file", K(ret), K(url_)); + } else if (file_location_ == IntoFileLocation::SERVER_DISK) { + if (OB_FAIL(file_appender_.append(data, data_len, false))) { LOG_WARN("failed to append file", K(ret), K(data_len)); } - } else { - ObIODOpt opt; - ObIODOpts iod_opts; - opt.set("AccessType", "appender"); - iod_opts.opts_ = &opt; - iod_opts.opt_cnt_ = 1; - bool is_exist = false; + } else if (file_location_ == IntoFileLocation::REMOTE_OSS) { int64_t write_size = 0; int64_t begin_ts = ObTimeUtility::current_time(); - if (OB_FAIL(device_handle_->exist(url_.ptr(), is_exist))) { - LOG_WARN("failed to check file exist", K(ret)); - } else if (!is_exist && OB_FAIL(device_handle_->open(url_.ptr(), -1, 0, fd_, &iod_opts))) { - LOG_WARN("failed to open file", K(ret)); - } else if (OB_FAIL(device_handle_->write(fd_, data, data_len, write_size))) { + if (OB_FAIL(device_handle_->write(fd_, data, data_len, write_size))) { LOG_WARN("failed to write device", K(ret)); } else if 
(OB_UNLIKELY(write_size != data_len)) { ret = OB_IO_ERROR; @@ -492,6 +505,9 @@ std::function ObSelectIntoOp::get_flush_function() _OB_LOG(TRACE, "write oss stat, time:%ld write_size:%ld speed:%.2Lf MB/s total_write:%.2Lf MB", cost_time, write_size, speed, total_write); } + } else { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected error. invalid file location", K(ret)); } return ret; }; @@ -501,7 +517,6 @@ int ObSelectIntoOp::split_file() { int ret = OB_SUCCESS; int64_t dummy_pos = 0; - bool delay_create = true; if (OB_FAIL(flush_buf(dummy_pos))) { LOG_WARN("fail to flush buffer", K(ret)); } else { @@ -529,8 +544,8 @@ int ObSelectIntoOp::split_file() //create new file if (OB_SUCC(ret)) { split_file_id_++; - if (OB_FAIL(open_file(delay_create))) { - LOG_WARN("fail to open file", K(ret)); + if (OB_FAIL(calc_next_file_path())) { + LOG_WARN("failed to calculate new file path", K(ret)); } } return ret; @@ -662,7 +677,8 @@ int ObSelectIntoOp::write_obj_to_file(const ObObj &obj, bool need_escape) if (OB_FAIL(ObFastStringScanner::foreach_char(str_to_escape, src_type, escape_printer_, - escape_printer_.do_encode_))) { + escape_printer_.do_encode_, + escape_printer_.ignore_convert_failed_))) { if (OB_SIZE_OVERFLOW == ret) { if (i == 0 && OB_UNLIKELY(OB_SUCCESS != (tmp_ret = flush_buf(escape_printer_.pos_)))) { LOG_WARN("failed to flush buffer", K(tmp_ret), K(ret)); @@ -675,7 +691,7 @@ int ObSelectIntoOp::write_obj_to_file(const ObObj &obj, bool need_escape) ret = OB_SUCCESS; } } else { - LOG_WARN("failed to print plain str", K(ret)); + LOG_WARN("failed to print plain str", K(ret), K(src_type), K(escape_printer_.do_encode_)); } } else { print_succ = true; @@ -746,12 +762,17 @@ int ObSelectIntoOp::write_lob_to_file(const ObObj &obj, const ObExpr &expr, cons ObEvalCtx::TempAllocGuard tmp_alloc_g(eval_ctx_); common::ObArenaAllocator &temp_allocator = tmp_alloc_g.get_allocator(); int64_t truncated_len = 0; + bool stop_when_truncated = false; if (OB_FAIL(lob_iter.init(0, NULL, 
&temp_allocator))) { LOG_WARN("init lob_iter failed ", K(ret), K(lob_iter)); } + // When truncated_len == src_block_data.length(), + // the current foreach_char call is processing only the invalid data at the end of the lob, i.e. the truncated data left over from the previous round; this case must be detected to avoid an infinite loop while (OB_SUCC(ret) && (state = lob_iter.get_next_block(src_block_data)) == TEXTSTRING_ITER_NEXT) { + // for an outrow lob this can be false only on the last iteration; an inrow lob is iterated only once, and it is false + stop_when_truncated = (truncated_len != src_block_data.length()) && lob_iter.is_outrow_lob(); if ((escape_printer_.buf_len_ - escape_printer_.pos_) < (src_block_data.length() * 5) && OB_FAIL(flush_buf(escape_printer_.pos_))) { LOG_WARN("failed to flush buf", K(ret)); @@ -759,9 +780,10 @@ int ObSelectIntoOp::write_lob_to_file(const ObObj &obj, const ObExpr &expr, cons src_type, escape_printer_, escape_printer_.do_encode_, - false, + escape_printer_.ignore_convert_failed_, + stop_when_truncated, &truncated_len))) { - if (OB_ERR_DATA_TRUNCATED == ret) { + if (OB_ERR_DATA_TRUNCATED == ret && stop_when_truncated) { lob_iter.set_reserved_byte_len(truncated_len); ret = OB_SUCCESS; } else { @@ -847,13 +869,6 @@ int ObSelectIntoOp::into_outfile() const ObIArray &select_exprs = MY_SPEC.select_exprs_; ObDatum *datum = NULL; ObObj obj; - if (is_first_) { // create file - if (OB_FAIL(open_file(true))) { - LOG_WARN("open file failed", K(ret), K(file_name_)); - } else { - is_first_ = false; - } - } for (int64_t i = 0; OB_SUCC(ret) && i < select_exprs.count(); ++i) { if (OB_ISNULL(select_exprs.at(i))) { ret = OB_ERR_UNEXPECTED; @@ -891,13 +906,6 @@ int ObSelectIntoOp::into_outfile_batch(const ObBatchRows &brs) ObArray datum_vectors; ObDatum *datum = NULL; ObObj obj; - if (is_first_) { // create file - if (OB_FAIL(open_file(true))) { - LOG_WARN("open file failed", K(ret), K(file_name_)); - } else { - is_first_ = false; - } - } for (int64_t i = 0; OB_SUCC(ret) && i < select_exprs.count(); ++i) { if (OB_FAIL(select_exprs.at(i)->eval_batch(eval_ctx_, *brs.skip_, brs.size_))) { LOG_WARN("failed to eval batch", K(ret)); @@ -1033,15 +1041,17 @@ 
int ObSelectIntoOp::prepare_escape_printer() int ret = OB_SUCCESS; int64_t pos = 0; char *buf = NULL; - int64_t buf_len = 5 * ObCharset::MAX_MB_LEN; + int64_t buf_len = 6 * ObCharset::MAX_MB_LEN; // mb->wc int32_t wchar_enclose = char_enclose_; int32_t wchar_escape = char_escape_; int32_t wchar_field = 0; int32_t wchar_line = 0; int32_t wchar_zero = '\0'; + int32_t wchar_replace = 0; OZ(extract_fisrt_wchar_from_varhcar(MY_SPEC.field_str_, wchar_field)); OZ(extract_fisrt_wchar_from_varhcar(MY_SPEC.line_str_, wchar_line)); + OZ(ObCharset::get_replace_character(MY_SPEC.cs_type_, wchar_replace)); // wc->mb if (OB_ISNULL(buf = static_cast(ctx_.get_allocator().alloc(buf_len)))) { ret = OB_ALLOCATE_MEMORY_FAILED; @@ -1052,7 +1062,9 @@ int ObSelectIntoOp::prepare_escape_printer() OZ(print_wchar_to_buf(buf, buf_len, pos, wchar_zero, escape_printer_.zero_, MY_SPEC.cs_type_)); OZ(print_wchar_to_buf(buf, buf_len, pos, wchar_field, escape_printer_.field_terminator_, MY_SPEC.cs_type_)); OZ(print_wchar_to_buf(buf, buf_len, pos, wchar_line, escape_printer_.line_terminator_, MY_SPEC.cs_type_)); + OZ(print_wchar_to_buf(buf, buf_len, pos, wchar_replace, escape_printer_.convert_replacer_, MY_SPEC.cs_type_)); escape_printer_.coll_type_ = MY_SPEC.cs_type_; + escape_printer_.ignore_convert_failed_ = true; // TODO: provide user-defined interface return ret; } diff --git a/src/sql/engine/basic/ob_select_into_op.h b/src/sql/engine/basic/ob_select_into_op.h index 2bbd471e17..6e86f03b83 100644 --- a/src/sql/engine/basic/ob_select_into_op.h +++ b/src/sql/engine/basic/ob_select_into_op.h @@ -110,6 +110,7 @@ public: has_escape_(false), has_lob_(false), has_json_(false), + is_file_opened_(false), print_params_(), escape_printer_() { @@ -119,7 +120,8 @@ public: struct ObEscapePrinter { ObEscapePrinter(): - need_enclose_(false), do_encode_(false), do_escape_(false), print_hex_(false) {} + need_enclose_(false), do_encode_(false), do_escape_(false), print_hex_(false), + ignore_convert_failed_(false) 
{} int operator() (const ObString &src_str, const ob_wc_t &unicode_value) { int ret = OB_SUCCESS; ObString dst_str = src_str; @@ -129,6 +131,9 @@ public: ret = ObCharset::wc_mb(coll_type_, unicode_value, tmp_buf, ObCharset::MAX_MB_LEN, result_len); if (OB_SUCC(ret)) { dst_str = ObString(result_len, tmp_buf); + } else if (ret == OB_ERR_INCORRECT_STRING_VALUE && ignore_convert_failed_) { + dst_str = convert_replacer_; + ret = OB_SUCCESS; } } if (OB_FAIL(ret) || !do_escape_ || print_hex_) { @@ -155,11 +160,13 @@ public: ObString zero_; ObString field_terminator_; ObString line_terminator_; + ObString convert_replacer_; ObCollationType coll_type_; bool need_enclose_; bool do_encode_; bool do_escape_; bool print_hex_; + bool ignore_convert_failed_; char *buf_; int64_t buf_len_; int64_t pos_; @@ -239,6 +246,7 @@ public: write_bytes_ = 0; split_file_id_ = 0; data_writer_.init(NULL, 0); + is_file_opened_ = false; } private: @@ -268,7 +276,9 @@ private: int write_lob_to_file(const ObObj &obj, const ObExpr &expr, const ObDatum &datum); int try_split_file(); int into_varlist(); - int open_file(bool delay_create = false); + int open_file(); + int calc_next_file_path(); + int calc_first_file_path(ObString &path); int split_file(); void close_file(); std::function get_flush_function(); @@ -297,6 +307,7 @@ private: bool has_escape_; bool has_lob_; bool has_json_; + bool is_file_opened_; common::ObObjPrintParams print_params_; ObEscapePrinter escape_printer_; }; diff --git a/src/sql/resolver/dml/ob_select_resolver.cpp b/src/sql/resolver/dml/ob_select_resolver.cpp index 7bd912f124..cfaf633f76 100644 --- a/src/sql/resolver/dml/ob_select_resolver.cpp +++ b/src/sql/resolver/dml/ob_select_resolver.cpp @@ -4970,6 +4970,10 @@ int ObSelectResolver::resolve_into_clause(const ParseNode *node) if (CHARSET_INVALID == (charset_type = ObCharset::charset_type(charset.trim()))) { ret = OB_ERR_UNKNOWN_CHARSET; LOG_USER_ERROR(OB_ERR_UNKNOWN_CHARSET, charset.length(), charset.ptr()); + } else if 
(CHARSET_UTF16 == charset_type) { + ret = OB_NOT_SUPPORTED; + LOG_WARN("select into outfile character set utf16", K(ret)); + LOG_USER_ERROR(OB_NOT_SUPPORTED, "upload data using utf16"); } else { into_item->cs_type_ = ObCharset::get_default_collation(charset_type); }