[fix] Use fmt::to_string replace memory buffer::data() (#8311)
This commit is contained in:
@ -209,7 +209,7 @@ Status BaseScanner::fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool) {
|
||||
"column({}) value is incorrect while strict mode is {}, "
|
||||
"src value is {}",
|
||||
slot_desc->col_name(), _strict_mode, raw_string);
|
||||
return error_msg.data();
|
||||
return fmt::to_string(error_msg);
|
||||
},
|
||||
&_scanner_eof));
|
||||
_counter->num_rows_filtered++;
|
||||
@ -227,7 +227,7 @@ Status BaseScanner::fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool) {
|
||||
error_msg,
|
||||
"column({}) values is null while columns is not nullable",
|
||||
slot_desc->col_name());
|
||||
return error_msg.data();
|
||||
return fmt::to_string(error_msg);
|
||||
},
|
||||
&_scanner_eof));
|
||||
_counter->num_rows_filtered++;
|
||||
|
||||
@ -479,7 +479,7 @@ Status BrokerScanner::_line_to_src_tuple(const Slice& line) {
|
||||
[]() -> std::string {
|
||||
fmt::memory_buffer error_msg;
|
||||
fmt::format_to(error_msg, "{}", "Unable to display");
|
||||
return error_msg.data();
|
||||
return fmt::to_string(error_msg);
|
||||
}, &_scanner_eof));
|
||||
_counter->num_rows_filtered++;
|
||||
_success = false;
|
||||
@ -514,7 +514,7 @@ Status BrokerScanner::_line_to_src_tuple(const Slice& line) {
|
||||
fmt::format_to(error_msg, "{}", "actual column number is less than schema column number.");
|
||||
fmt::format_to(error_msg, "actual number: {}, column separator: [{}], ", _split_values.size(), _value_separator);
|
||||
fmt::format_to(error_msg, "line delimiter: [{}], schema number: {}; ", _line_delimiter, _src_slot_descs.size());
|
||||
return error_msg.data();
|
||||
return fmt::to_string(error_msg);
|
||||
}, &_scanner_eof));
|
||||
_counter->num_rows_filtered++;
|
||||
_success = false;
|
||||
@ -527,7 +527,7 @@ Status BrokerScanner::_line_to_src_tuple(const Slice& line) {
|
||||
fmt::format_to(error_msg, "{}", "actual column number is more than schema column number.");
|
||||
fmt::format_to(error_msg, "actual number: {}, column separator: [{}], ", _split_values.size(), _value_separator);
|
||||
fmt::format_to(error_msg, "line delimiter: [{}], schema number: {}; ", _line_delimiter, _src_slot_descs.size());
|
||||
return error_msg.data();
|
||||
return fmt::to_string(error_msg);
|
||||
}, &_scanner_eof));
|
||||
_counter->num_rows_filtered++;
|
||||
_success = false;
|
||||
|
||||
@ -406,7 +406,7 @@ Status JsonReader::_parse_json_doc(size_t* size, bool* eof) {
|
||||
fmt::format_to(error_msg, "Parse json data for JsonDoc failed. code: {}, error info: {}",
|
||||
_origin_json_doc.GetParseError(), rapidjson::GetParseError_En(_origin_json_doc.GetParseError()));
|
||||
RETURN_IF_ERROR(_state->append_error_msg_to_file([&]() -> std::string { return std::string((char*)json_str, *size); },
|
||||
[&]() -> std::string { return error_msg.data(); }, _scanner_eof));
|
||||
[&]() -> std::string { return fmt::to_string(error_msg); }, _scanner_eof));
|
||||
_counter->num_rows_filtered++;
|
||||
if (*_scanner_eof) {
|
||||
// Case A: if _scanner_eof is set to true in "append_error_msg_to_file", which means
|
||||
@ -415,7 +415,7 @@ Status JsonReader::_parse_json_doc(size_t* size, bool* eof) {
|
||||
*eof = true;
|
||||
return Status::OK();
|
||||
}
|
||||
return Status::DataQualityError(error_msg.data());
|
||||
return Status::DataQualityError(fmt::to_string(error_msg));
|
||||
}
|
||||
|
||||
// set json root
|
||||
@ -426,14 +426,14 @@ Status JsonReader::_parse_json_doc(size_t* size, bool* eof) {
|
||||
fmt::memory_buffer error_msg;
|
||||
fmt::format_to(error_msg, "{}", "JSON Root not found.");
|
||||
RETURN_IF_ERROR(_state->append_error_msg_to_file([&]() -> std::string { return _print_json_value(_origin_json_doc); },
|
||||
[&]() -> std::string { return error_msg.data(); }, _scanner_eof));
|
||||
[&]() -> std::string { return fmt::to_string(error_msg); }, _scanner_eof));
|
||||
_counter->num_rows_filtered++;
|
||||
if (*_scanner_eof) {
|
||||
// Same as Case A
|
||||
*eof = true;
|
||||
return Status::OK();
|
||||
}
|
||||
return Status::DataQualityError(error_msg.data());
|
||||
return Status::DataQualityError(fmt::to_string(error_msg));
|
||||
}
|
||||
} else {
|
||||
_json_doc = &_origin_json_doc;
|
||||
@ -443,28 +443,28 @@ Status JsonReader::_parse_json_doc(size_t* size, bool* eof) {
|
||||
fmt::memory_buffer error_msg;
|
||||
fmt::format_to(error_msg, "{}", "JSON data is array-object, `strip_outer_array` must be TRUE.");
|
||||
RETURN_IF_ERROR(_state->append_error_msg_to_file([&]() -> std::string { return _print_json_value(_origin_json_doc); },
|
||||
[&]() -> std::string { return error_msg.data(); }, _scanner_eof));
|
||||
[&]() -> std::string { return fmt::to_string(error_msg); }, _scanner_eof));
|
||||
_counter->num_rows_filtered++;
|
||||
if (*_scanner_eof) {
|
||||
// Same as Case A
|
||||
*eof = true;
|
||||
return Status::OK();
|
||||
}
|
||||
return Status::DataQualityError(error_msg.data());
|
||||
return Status::DataQualityError(fmt::to_string(error_msg));
|
||||
}
|
||||
|
||||
if (!_json_doc->IsArray() && _strip_outer_array) {
|
||||
fmt::memory_buffer error_msg;
|
||||
fmt::format_to(error_msg, "{}", "JSON data is not an array-object, `strip_outer_array` must be FALSE.");
|
||||
RETURN_IF_ERROR(_state->append_error_msg_to_file([&]() -> std::string { return _print_json_value(_origin_json_doc); },
|
||||
[&]() -> std::string { return error_msg.data(); }, _scanner_eof));
|
||||
[&]() -> std::string { return fmt::to_string(error_msg); }, _scanner_eof));
|
||||
_counter->num_rows_filtered++;
|
||||
if (*_scanner_eof) {
|
||||
// Same as Case A
|
||||
*eof = true;
|
||||
return Status::OK();
|
||||
}
|
||||
return Status::DataQualityError(error_msg.data());
|
||||
return Status::DataQualityError(fmt::to_string(error_msg));
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
@ -539,7 +539,7 @@ Status JsonReader::_write_data_to_tuple(rapidjson::Value::ConstValueIterator val
|
||||
[&]() -> std::string {
|
||||
fmt::memory_buffer error_msg;
|
||||
fmt::format_to(error_msg, "Json value is null, but the column `{}` is not nullable.", desc->col_name());
|
||||
return error_msg.data();
|
||||
return fmt::to_string(error_msg);
|
||||
}, _scanner_eof));
|
||||
_counter->num_rows_filtered++;
|
||||
*valid = false;
|
||||
@ -600,7 +600,7 @@ Status JsonReader::_set_tuple_value(rapidjson::Value& objectValue, Tuple* tuple,
|
||||
[&]() -> std::string {
|
||||
fmt::memory_buffer error_msg;
|
||||
fmt::format_to(error_msg, "The column `{}` is not nullable, but it's not found in jsondata.", v->col_name());
|
||||
return error_msg.data();
|
||||
return fmt::to_string(error_msg);
|
||||
}, _scanner_eof));
|
||||
_counter->num_rows_filtered++;
|
||||
*valid = false; // current row is invalid
|
||||
@ -723,7 +723,7 @@ Status JsonReader::_write_values_by_jsonpath(rapidjson::Value& objectValue, MemP
|
||||
[&]() -> std::string {
|
||||
fmt::memory_buffer error_msg;
|
||||
fmt::format_to(error_msg, "The column `{}` is not nullable, but it's not found in jsondata.", slot_descs[i]->col_name());
|
||||
return error_msg.data();
|
||||
return fmt::to_string(error_msg);
|
||||
}, _scanner_eof));
|
||||
_counter->num_rows_filtered++;
|
||||
*valid = false; // current row is invalid
|
||||
|
||||
@ -27,16 +27,16 @@
|
||||
#include "runtime/primitive_type.h"
|
||||
#include "util/types.h"
|
||||
|
||||
#define ODBC_DISPOSE(h, ht, x, op) \
|
||||
{ \
|
||||
auto rc = x; \
|
||||
if (rc != SQL_SUCCESS && rc != SQL_SUCCESS_WITH_INFO) { \
|
||||
return error_status(op, handle_diagnostic_record(h, ht, rc)); \
|
||||
} \
|
||||
if (rc == SQL_ERROR) { \
|
||||
auto err_msg = std::string("Error in") + std::string(op); \
|
||||
return Status::InternalError(err_msg.c_str()); \
|
||||
} \
|
||||
#define ODBC_DISPOSE(h, ht, x, op) \
|
||||
{ \
|
||||
auto rc = x; \
|
||||
if (rc != SQL_SUCCESS && rc != SQL_SUCCESS_WITH_INFO) { \
|
||||
return error_status(fmt::to_string(op), handle_diagnostic_record(h, ht, rc)); \
|
||||
} \
|
||||
if (rc == SQL_ERROR) { \
|
||||
auto err_msg = std::string("Error in") + fmt::to_string(op); \
|
||||
return Status::InternalError(err_msg.c_str()); \
|
||||
} \
|
||||
}
|
||||
|
||||
static constexpr uint32_t SMALL_COLUMN_SIZE_BUFFER = 100;
|
||||
@ -292,7 +292,7 @@ Status ODBCConnector::append(const std::string& table_name, RowBatch* batch,
|
||||
fmt::memory_buffer err_out;
|
||||
fmt::format_to(err_out, "can't convert this type to mysql type. type = {}",
|
||||
_output_expr_ctxs[j]->root()->type().type);
|
||||
return Status::InternalError(err_out.data());
|
||||
return Status::InternalError(fmt::to_string(err_out));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -902,7 +902,7 @@ Status OlapTableSink::send(RuntimeState* state, RowBatch* input_batch) {
|
||||
fmt::memory_buffer buf;
|
||||
fmt::format_to(buf, "no partition for this tuple. tuple={}",
|
||||
Tuple::to_string(tuple, *_output_tuple_desc));
|
||||
return buf.data();
|
||||
return fmt::to_string(buf);
|
||||
},
|
||||
&stop_processing));
|
||||
_number_filtered_rows++;
|
||||
@ -1086,7 +1086,7 @@ Status OlapTableSink::_convert_batch(RuntimeState* state, RowBatch* input_batch,
|
||||
fmt::memory_buffer buf;
|
||||
fmt::format_to(buf, "null value for not null column, column={}",
|
||||
slot_desc->col_name());
|
||||
return buf.data();
|
||||
return fmt::to_string(buf);
|
||||
},
|
||||
&stop_processing));
|
||||
_number_filtered_rows++;
|
||||
@ -1221,7 +1221,7 @@ Status OlapTableSink::_validate_data(RuntimeState* state, RowBatch* batch, Bitma
|
||||
filter_bitmap->Set(row_no, true);
|
||||
RETURN_IF_ERROR(state->append_error_msg_to_file(
|
||||
[]() -> std::string { return ""; },
|
||||
[&]() -> std::string { return error_msg.data(); }, stop_processing));
|
||||
[&]() -> std::string { return fmt::to_string(error_msg); }, stop_processing));
|
||||
}
|
||||
}
|
||||
return Status::OK();
|
||||
|
||||
@ -461,8 +461,8 @@ Status RuntimeState::append_error_msg_to_file(std::function<std::string()> line,
|
||||
}
|
||||
|
||||
if (out.size() > 0) {
|
||||
(*_error_log_file) << out.data() << std::endl;
|
||||
export_load_error(out.data());
|
||||
(*_error_log_file) << fmt::to_string(out) << std::endl;
|
||||
export_load_error(fmt::to_string(out));
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -55,7 +55,7 @@ Status VMysqlTableWriter::open(const MysqlConnInfo& conn_info, const std::string
|
||||
if (res == nullptr) {
|
||||
fmt::memory_buffer err_ss;
|
||||
fmt::format_to(err_ss, "mysql_real_connect failed because : {}.", mysql_error(_mysql_conn));
|
||||
return Status::InternalError(err_ss.data());
|
||||
return Status::InternalError(fmt::to_string(err_ss.data()));
|
||||
}
|
||||
|
||||
// set character
|
||||
@ -63,7 +63,7 @@ Status VMysqlTableWriter::open(const MysqlConnInfo& conn_info, const std::string
|
||||
fmt::memory_buffer err_ss;
|
||||
fmt::format_to(err_ss, "mysql_set_character_set failed because : {}.",
|
||||
mysql_error(_mysql_conn));
|
||||
return Status::InternalError(err_ss.data());
|
||||
return Status::InternalError(fmt::to_string(err_ss.data()));
|
||||
}
|
||||
|
||||
_mysql_tbl = tbl;
|
||||
@ -190,7 +190,7 @@ Status VMysqlTableWriter::insert_row(vectorized::Block& block, size_t row) {
|
||||
fmt::memory_buffer err_out;
|
||||
fmt::format_to(err_out, "can't convert this type to mysql type. type = {}",
|
||||
_vec_output_expr_ctxs[i]->root()->type().type);
|
||||
return Status::InternalError(err_out.data());
|
||||
return Status::InternalError(fmt::to_string(err_out));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -202,7 +202,7 @@ Status VMysqlTableWriter::insert_row(vectorized::Block& block, size_t row) {
|
||||
fmt::memory_buffer err_ss;
|
||||
fmt::format_to(err_ss, "Insert to mysql server({}) failed, because: {}.",
|
||||
mysql_get_host_info(_mysql_conn), mysql_error(_mysql_conn));
|
||||
return Status::InternalError(err_ss.data());
|
||||
return Status::InternalError(fmt::to_string(err_ss));
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
|
||||
@ -116,11 +116,10 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block)
|
||||
RETURN_IF_ERROR(state->append_error_msg_to_file(
|
||||
[]() -> std::string { return ""; },
|
||||
[&]() -> std::string {
|
||||
fmt::memory_buffer buf;
|
||||
fmt::format_to(buf, "no partition for this tuple. tuple=[]");
|
||||
return buf.data();
|
||||
},
|
||||
&stop_processing));
|
||||
fmt::memory_buffer buf;
|
||||
fmt::format_to(buf, "no partition for this tuple. tuple=[]");
|
||||
return fmt::to_string(buf);
|
||||
}, &stop_processing));
|
||||
_number_filtered_rows++;
|
||||
if (stop_processing) {
|
||||
return Status::EndOfFile("Encountered unqualified data, stop processing");
|
||||
@ -164,10 +163,9 @@ Status VOlapTableSink::_validate_data(RuntimeState* state, vectorized::Block* bl
|
||||
const auto num_rows = block->rows();
|
||||
fmt::memory_buffer error_msg;
|
||||
auto set_invalid_and_append_error_msg = [&](int row) {
|
||||
filter_bitmap->Set(row, true);
|
||||
return state->append_error_msg_to_file(
|
||||
[]() -> std::string { return ""; },
|
||||
[&error_msg]() -> std::string { return error_msg.data(); }, stop_processing);
|
||||
filter_bitmap->Set(row, true);
|
||||
return state->append_error_msg_to_file([]() -> std::string { return ""; },
|
||||
[&error_msg]() -> std::string { return fmt::to_string(error_msg); }, stop_processing);
|
||||
};
|
||||
|
||||
for (int i = 0; i < _output_tuple_desc->slots().size(); ++i) {
|
||||
|
||||
Reference in New Issue
Block a user