From 9b0627ec4d73e57ea98cb83a45b27e93e89899b6 Mon Sep 17 00:00:00 2001 From: wjhh2008 Date: Sun, 13 Nov 2022 02:00:10 +0800 Subject: [PATCH] fix load data insert binary data bug --- deps/oblib/src/common/object/ob_obj_funcs.h | 16 +++++++++++----- deps/oblib/src/common/object/ob_object.h | 3 ++- .../src/lib/mysqlclient/ob_isql_connection.h | 2 ++ .../lib/mysqlclient/ob_mysql_connection.cpp | 7 +++++++ .../src/lib/mysqlclient/ob_mysql_connection.h | 2 ++ .../src/lib/mysqlclient/ob_mysql_proxy.cpp | 10 +++++----- .../src/lib/mysqlclient/ob_mysql_proxy.h | 2 +- src/observer/ob_inner_sql_connection.h | 6 +++--- src/sql/engine/cmd/ob_load_data_impl.cpp | 2 +- .../engine/expr/ob_expr_to_outfile_row.cpp | 19 ++++++++++--------- 10 files changed, 44 insertions(+), 25 deletions(-) diff --git a/deps/oblib/src/common/object/ob_obj_funcs.h b/deps/oblib/src/common/object/ob_obj_funcs.h index b4a5cdccb..86fd02dd9 100644 --- a/deps/oblib/src/common/object/ob_obj_funcs.h +++ b/deps/oblib/src/common/object/ob_obj_funcs.h @@ -1108,8 +1108,10 @@ inline int obj_print_plain_str(const ObObj &obj, char *buffer, ObCharsetType src_type = ObCharset::charset_type_by_coll(obj.get_collation_type()); \ ObCharsetType dst_type = ObCharset::charset_type_by_coll(params.cs_type_); \ if (src_type == CHARSET_BINARY || src_type == dst_type || src_type == CHARSET_INVALID) {\ - if (params.use_memcpy_) { \ - ret = databuff_memcpy(buffer, length, pos, obj.get_string_len(), obj.get_string_ptr()); \ + if (obj.get_collation_type() == CS_TYPE_BINARY && params.binary_string_print_hex_) { \ + ret = hex_print(obj.get_string_ptr(), obj.get_string_len(), buffer, length, pos); \ + } else if (params.use_memcpy_) { \ + ret = databuff_memcpy(buffer, length, pos, obj.get_string_len(), obj.get_string_ptr()); \ } else { \ ret = databuff_printf(buffer, length, pos, "%.*s", obj.get_string_len(), obj.get_string_ptr()); \ } \ @@ -2501,9 +2503,11 @@ template <> int ret = OB_SUCCESS; \ ObCharsetType src_type = ObCharset::charset_type_by_coll(obj.get_collation_type()); \ ObCharsetType dst_type = ObCharset::charset_type_by_coll(params.cs_type_); \ - if (src_type == dst_type) { \ + if (src_type == CHARSET_BINARY && params.binary_string_print_hex_) { \ + ret = hex_print(obj.get_string_ptr(), obj.get_string_len(), buffer, length, pos); \ + } else if (src_type == dst_type) { \ if (params.use_memcpy_) { \ - ret = databuff_memcpy(buffer, length, pos, obj.get_string_len(), obj.get_string_ptr()); \ + ret = databuff_memcpy(buffer, length, pos, obj.get_string_len(), obj.get_string_ptr()); \ } else { \ ret = databuff_printf(buffer, length, pos, "%.*s", obj.get_string_len(), obj.get_string_ptr()); \ } \ @@ -2667,7 +2671,9 @@ inline int obj_print_plain_str(const ObObj &obj, char *buffer, int64_ UNUSED(params); int ret = OB_SUCCESS; ObString str = obj.get_lob_print_string(length - pos); - if (params.use_memcpy_) { + if (obj.get_collation_type() == CS_TYPE_BINARY && params.binary_string_print_hex_) { + ret = hex_print(str.ptr(), str.length(), buffer, length, pos); + } else if (params.use_memcpy_) { ret = databuff_memcpy(buffer, length, pos, str.length(), str.ptr()); } else { ret = databuff_printf(buffer, length, pos, "%.*s", str.length(), str.ptr()); diff --git a/deps/oblib/src/common/object/ob_object.h b/deps/oblib/src/common/object/ob_object.h index 6d7a3820b..7b83a84b9 100644 --- a/deps/oblib/src/common/object/ob_object.h +++ b/deps/oblib/src/common/object/ob_object.h @@ -656,7 +656,8 @@ struct ObObjPrintParams uint32_t use_memcpy_:1; uint32_t skip_escape_:1; uint32_t beginning_space_:1; - uint32_t reserved_:27; + uint32_t binary_string_print_hex_:1; + uint32_t reserved_:26; }; }; }; diff --git a/deps/oblib/src/lib/mysqlclient/ob_isql_connection.h b/deps/oblib/src/lib/mysqlclient/ob_isql_connection.h index 1c4cc447d..64e629cae 100644 --- a/deps/oblib/src/lib/mysqlclient/ob_isql_connection.h +++ b/deps/oblib/src/lib/mysqlclient/ob_isql_connection.h @@ -86,6 +86,8 @@ public: bool is_from_pl = false) = 0; virtual int execute_write(const uint64_t tenant_id, const char *sql, int64_t &affected_rows, bool is_user_sql = false) = 0; + virtual int execute_write(const uint64_t tenant_id, const ObString &sql, + int64_t &affected_rows, bool is_user_sql = false) = 0; // transaction interface virtual int start_transaction(const uint64_t &tenant_id, bool with_snap_shot = false) = 0; diff --git a/deps/oblib/src/lib/mysqlclient/ob_mysql_connection.cpp b/deps/oblib/src/lib/mysqlclient/ob_mysql_connection.cpp index 104b490c4..ac5e54bc3 100644 --- a/deps/oblib/src/lib/mysqlclient/ob_mysql_connection.cpp +++ b/deps/oblib/src/lib/mysqlclient/ob_mysql_connection.cpp @@ -371,6 +371,13 @@ int ObMySQLConnection::switch_tenant(const uint64_t tenant_id) return ret; } +int ObMySQLConnection::execute_write(const uint64_t tenant_id, const ObString &sql, + int64_t &affected_rows, bool is_user_sql) +{ + UNUSEDx(tenant_id, sql, affected_rows, is_user_sql); + return OB_NOT_SUPPORTED; +} + int ObMySQLConnection::execute_write(const uint64_t tenant_id, const char *sql, int64_t &affected_rows, bool is_user_sql) { diff --git a/deps/oblib/src/lib/mysqlclient/ob_mysql_connection.h b/deps/oblib/src/lib/mysqlclient/ob_mysql_connection.h index ae8b4667e..a2b512352 100644 --- a/deps/oblib/src/lib/mysqlclient/ob_mysql_connection.h +++ b/deps/oblib/src/lib/mysqlclient/ob_mysql_connection.h @@ -72,6 +72,8 @@ public: virtual int execute_read(const int64_t cluster_id, const uint64_t tenant_id, const ObString &sql, ObISQLClient::ReadResult &res, bool is_user_sql = false, bool is_from_pl = false) override; + virtual int execute_write(const uint64_t tenant_id, const ObString &sql, + int64_t &affected_rows, bool is_user_sql = false) override; virtual int execute_write(const uint64_t tenant_id, const char *sql, int64_t &affected_rows, bool is_user_sql = false) override; diff --git a/deps/oblib/src/lib/mysqlclient/ob_mysql_proxy.cpp b/deps/oblib/src/lib/mysqlclient/ob_mysql_proxy.cpp index d9db2b6b4..6817bed46 100644 --- a/deps/oblib/src/lib/mysqlclient/ob_mysql_proxy.cpp +++ b/deps/oblib/src/lib/mysqlclient/ob_mysql_proxy.cpp @@ -138,14 +138,14 @@ int ObCommonSqlProxy::write(const uint64_t tenant_id, const char *sql, int64_t & return ret; } -int ObCommonSqlProxy::write(const uint64_t tenant_id, const char *sql, +int ObCommonSqlProxy::write(const uint64_t tenant_id, const ObString sql, int64_t &affected_rows, int64_t compatibility_mode, const ObSessionParam *param /* = nullptr*/) { int ret = OB_SUCCESS; bool is_user_sql = false; int64_t start = ::oceanbase::common::ObTimeUtility::current_time(); ObISQLConnection *conn = NULL; - if (OB_ISNULL(sql)) { + if (OB_UNLIKELY(sql.empty())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("empty sql"); } else if (OB_FAIL(acquire(tenant_id, conn))) { @@ -155,7 +155,7 @@ int ObCommonSqlProxy::write(const uint64_t tenant_id, const char *sql, LOG_WARN("connection can not be NULL"); } else if (!is_active()) { // check client active after connection acquired ret = OB_INACTIVE_SQL_CLIENT; - LOG_WARN("in active sql client", K(ret), KCSTRING(sql)); + LOG_WARN("in active sql client", K(ret), K(sql)); } int64_t old_compatibility_mode; int64_t old_sql_mode = 0; @@ -203,7 +203,7 @@ int ObCommonSqlProxy::write(const uint64_t tenant_id, const char *sql, } if (OB_SUCC(ret)) { if (OB_FAIL(conn->execute_write(tenant_id, sql, affected_rows, is_user_sql))) { - LOG_WARN("execute sql failed", K(ret), K(conn), K(start), KCSTRING(sql)); + LOG_WARN("execute sql failed", K(ret), K(conn), K(start), K(sql)); } else if (old_compatibility_mode != compatibility_mode && OB_FAIL(conn->set_session_variable("ob_compatibility_mode", old_compatibility_mode))) { LOG_WARN("fail to recover inner connection sql mode", K(ret)); @@ -220,7 +220,7 @@ int ObCommonSqlProxy::write(const uint64_t tenant_id, const char *sql, } } close(conn, ret); - LOG_TRACE("execute sql with sql mode", KCSTRING(sql), K(compatibility_mode), K(ret)); + LOG_TRACE("execute sql with sql mode", K(sql), K(compatibility_mode), K(ret)); return ret; } diff --git a/deps/oblib/src/lib/mysqlclient/ob_mysql_proxy.h b/deps/oblib/src/lib/mysqlclient/ob_mysql_proxy.h index 5d533b91b..25cf2dbc7 100644 --- a/deps/oblib/src/lib/mysqlclient/ob_mysql_proxy.h +++ b/deps/oblib/src/lib/mysqlclient/ob_mysql_proxy.h @@ -109,7 +109,7 @@ public: using ObISQLClient::read; // execute update sql virtual int write(const uint64_t tenant_id, const char *sql, int64_t &affected_rows) override; - int write(const uint64_t tenant_id, const char *sql, int64_t &affected_rows, int64_t compatibility_mode, + int write(const uint64_t tenant_id, const ObString sql, int64_t &affected_rows, int64_t compatibility_mode, const ObSessionParam *session_param = nullptr); using ObISQLClient::write; diff --git a/src/observer/ob_inner_sql_connection.h b/src/observer/ob_inner_sql_connection.h index 9701df8fb..67d57874f 100644 --- a/src/observer/ob_inner_sql_connection.h +++ b/src/observer/ob_inner_sql_connection.h @@ -139,9 +139,9 @@ public: virtual int execute_write(const uint64_t tenant_id, const char *sql, int64_t &affected_rows, bool is_user_sql = false) override; - int execute_write(const uint64_t tenant_id, const ObString &sql, - int64_t &affected_rows, - bool is_user_sql = false); + virtual int execute_write(const uint64_t tenant_id, const ObString &sql, + int64_t &affected_rows, + bool is_user_sql = false) override; virtual int start_transaction(const uint64_t &tenant_id, bool with_snap_shot = false) override; virtual int register_multi_data_source(const uint64_t &tenant_id, const share::ObLSID ls_id, diff --git a/src/sql/engine/cmd/ob_load_data_impl.cpp b/src/sql/engine/cmd/ob_load_data_impl.cpp index 25eb3972e..a2f18a326 100644 --- a/src/sql/engine/cmd/ob_load_data_impl.cpp +++ b/src/sql/engine/cmd/ob_load_data_impl.cpp @@ -1228,7 +1228,7 @@ int ObLoadDataSPImpl::exec_insert(ObInsertTask &task, ObInsertResult& result) param.is_load_data_exec_ = true; if (OB_SUCC(ret) && OB_FAIL(GCTX.sql_proxy_->write(task.tenant_id_, - sql_str.ptr(), + sql_str.string(), affected_rows, get_compatibility_mode(), ¶m))) { diff --git a/src/sql/engine/expr/ob_expr_to_outfile_row.cpp b/src/sql/engine/expr/ob_expr_to_outfile_row.cpp index 02fad7a6c..76f49f22c 100644 --- a/src/sql/engine/expr/ob_expr_to_outfile_row.cpp +++ b/src/sql/engine/expr/ob_expr_to_outfile_row.cpp @@ -10,9 +10,9 @@ * See the Mulan PubL v2 for more details. */ -#define USING_LOG_PREFIX SQL_ENG - -#include "sql/engine/expr/ob_expr_to_outfile_row.h" +#define USING_LOG_PREFIX SQL_ENG + +#include "sql/engine/expr/ob_expr_to_outfile_row.h" #include #include "lib/oblog/ob_log.h" #include "objit/common/ob_item_type.h" @@ -123,12 +123,13 @@ int ObExprToOutfileRow::calc_outfile_info(const ObExpr &expr, LOG_WARN("session is null", K(ret)); } else if (OB_ISNULL(out_info.print_params_.tz_info_ = session->get_timezone_info())) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("fail to get timezone info", K(ret)); - } else { - out_info.print_params_.use_memcpy_ = true; - out_info.is_optional_ = expr.locate_param_datum(ctx, PARAM_OPTIONAL).get_bool(); - } - + LOG_WARN("fail to get timezone info", K(ret)); + } else { + out_info.print_params_.use_memcpy_ = true; + out_info.print_params_.binary_string_print_hex_ = lib::is_oracle_mode(); + out_info.is_optional_ = expr.locate_param_datum(ctx, PARAM_OPTIONAL).get_bool(); + } + for (int i = 0; OB_SUCC(ret) && i < PARAM_SELECT_ITEM; ++i) { OZ(expr.locate_param_datum(ctx, i).to_obj(objs_array[i], expr.args_[i]->obj_meta_, expr.args_[i]->obj_datum_map_));