diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt index f0a86ed1ab..f90d293435 100644 --- a/be/CMakeLists.txt +++ b/be/CMakeLists.txt @@ -345,6 +345,12 @@ SET(CMAKE_CXX_FLAGS "${CXX_COMMON_FLAGS} ${CMAKE_CXX_FLAGS}") message(STATUS "Compiler Flags: ${CMAKE_CXX_FLAGS}") +if (CMAKE_GENERATOR STREQUAL "Ninja" AND NOT DISABLE_COLORED_BUILD) + # Turn on colored output. https://github.com/ninja-build/ninja/wiki/FAQ + set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fdiagnostics-color=always") + set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fdiagnostics-color=always") +endif () + # Thrift requires these two definitions for some types that we use add_definitions(-DHAVE_INTTYPES_H -DHAVE_NETINET_IN_H) diff --git a/be/src/exec/json_scanner.cpp b/be/src/exec/json_scanner.cpp index a07bf8ac99..4433f174e7 100644 --- a/be/src/exec/json_scanner.cpp +++ b/be/src/exec/json_scanner.cpp @@ -139,6 +139,7 @@ Status JsonScanner::open_next_reader() { std::string jsonpath = ""; bool strip_outer_array = false; bool num_as_string = false; + bool fuzzy_parse = false; if (range.__isset.jsonpaths) { jsonpath = range.jsonpaths; @@ -152,8 +153,11 @@ Status JsonScanner::open_next_reader() { if (range.__isset.num_as_string) { num_as_string = range.num_as_string; } - _cur_file_reader = - new JsonReader(_state, _counter, _profile, file, strip_outer_array, num_as_string); + if (range.__isset.fuzzy_parse) { + fuzzy_parse = range.fuzzy_parse; + } + _cur_file_reader = new JsonReader(_state, _counter, _profile, file, strip_outer_array, + num_as_string, fuzzy_parse); RETURN_IF_ERROR(_cur_file_reader->init(jsonpath, json_root)); return Status::OK(); @@ -185,7 +189,8 @@ rapidjson::Value::ConstValueIterator JsonDataInternal::get_next() { ////// class JsonReader JsonReader::JsonReader(RuntimeState* state, ScannerCounter* counter, RuntimeProfile* profile, - FileReader* file_reader, bool strip_outer_array, bool num_as_string) + FileReader* file_reader, bool strip_outer_array, bool num_as_string, + bool fuzzy_parse) : _handle_json_callback(nullptr), _next_line(0), _total_lines(0), @@ -196,6 +201,7 @@ JsonReader::JsonReader(RuntimeState* state, ScannerCounter* counter, RuntimeProf _closed(false), _strip_outer_array(strip_outer_array), _num_as_string(num_as_string), + _fuzzy_parse(fuzzy_parse), _json_doc(nullptr) { _bytes_read_counter = ADD_COUNTER(_profile, "BytesRead", TUnit::BYTES); _read_timer = ADD_TIMER(_profile, "ReadTime"); @@ -272,7 +278,7 @@ void JsonReader::_close() { Status JsonReader::_parse_json_doc(bool* eof) { // read a whole message, must be delete json_str by `delete[]` SCOPED_TIMER(_file_read_timer); - std::unique_ptr json_str; + std::unique_ptr json_str; size_t length = 0; RETURN_IF_ERROR(_file_reader->read_one_message(&json_str, &length)); _bytes_read_counter += length; @@ -300,7 +306,8 @@ Status JsonReader::_parse_json_doc(bool* eof) { str_error << "Parse json data for JsonDoc failed. code = " << _origin_json_doc.GetParseError() << ", error-info:" << rapidjson::GetParseError_En(_origin_json_doc.GetParseError()); - _state->append_error_msg_to_file(std::string((char*)json_str.get(), length), str_error.str()); + _state->append_error_msg_to_file(std::string((char*)json_str.get(), length), + str_error.str()); _counter->num_rows_filtered++; return Status::DataQualityError(str_error.str()); } @@ -427,9 +434,7 @@ void JsonReader::_write_data_to_tuple(rapidjson::Value::ConstValueIterator value // for simple format json void JsonReader::_set_tuple_value(rapidjson::Value& objectValue, Tuple* tuple, const std::vector& slot_descs, - const std::vector& value_key, MemPool* tuple_pool, bool* valid) { - DCHECK(slot_descs.size() == value_key.size()); if (!objectValue.IsObject()) { // Here we expect the incoming `objectValue` to be a Json Object, such as {"key" : "value"}, // not other type of Json format. @@ -441,21 +446,30 @@ void JsonReader::_set_tuple_value(rapidjson::Value& objectValue, Tuple* tuple, } int nullcount = 0; - for (int i = 0; i < slot_descs.size(); ++i) { - rapidjson::Value::ConstMemberIterator it = objectValue.FindMember(value_key[i]); + for (auto v : slot_descs) { + rapidjson::Value::ConstMemberIterator it = objectValue.MemberEnd(); + if (_fuzzy_parse) { + auto idx_it = _name_map.find(v->col_name()); + if (idx_it != _name_map.end() && idx_it->second < objectValue.MemberCount()) { + it = objectValue.MemberBegin() + idx_it->second; + } + } else { + it = objectValue.FindMember( + rapidjson::Value(v->col_name().c_str(), v->col_name().size())); + } if (it != objectValue.MemberEnd()) { const rapidjson::Value& value = it->value; - _write_data_to_tuple(&value, slot_descs[i], tuple, tuple_pool, valid); + _write_data_to_tuple(&value, v, tuple, tuple_pool, valid); if (!(*valid)) { return; } } else { // not found - if (slot_descs[i]->is_nullable()) { - tuple->set_null(slot_descs[i]->null_indicator_offset()); + if (v->is_nullable()) { + tuple->set_null(v->null_indicator_offset()); nullcount++; } else { std::stringstream str_error; - str_error << "The column `" << slot_descs[i]->col_name() + str_error << "The column `" << v->col_name() << "` is not nullable, but it's not found in jsondata."; _state->append_error_msg_to_file(_print_json_value(objectValue), str_error.str()); _counter->num_rows_filtered++; @@ -486,11 +500,6 @@ void JsonReader::_set_tuple_value(rapidjson::Value& objectValue, Tuple* tuple, */ Status JsonReader::_handle_simple_json(Tuple* tuple, const std::vector& slot_descs, MemPool* tuple_pool, bool* eof) { - // If you use a string as the key to find the json object, strlen will be called every time, so the key is constructed in advance - std::vector value_key; - for (auto v : slot_descs) { - value_key.emplace_back(v->col_name().c_str(), v->col_name().size()); - } do { bool valid = false; if (_next_line >= _total_lines) { // parse json and generic document @@ -502,6 +511,8 @@ Status JsonReader::_handle_simple_json(Tuple* tuple, const std::vectorIsArray()) { _total_lines = _json_doc->Size(); if (_total_lines == 0) { @@ -513,18 +524,30 @@ Status JsonReader::_handle_simple_json(Tuple* tuple, const std::vectornum_rows_filtered++; continue; } + objectValue = &(*_json_doc)[0]; } else { _total_lines = 1; // only one row + objectValue = _json_doc; } - _next_line = 0; + if (_fuzzy_parse) { + for (auto v : slot_descs) { + for (int i = 0; i < objectValue->MemberCount(); ++i) { + auto it = objectValue->MemberBegin() + i; + if (v->col_name() == it->name.GetString()) { + _name_map[v->col_name()] = i; + break; + } + } + } + } } if (_json_doc->IsArray()) { // handle case 1 rapidjson::Value& objectValue = (*_json_doc)[_next_line]; // json object - _set_tuple_value(objectValue, tuple, slot_descs, value_key, tuple_pool, &valid); + _set_tuple_value(objectValue, tuple, slot_descs, tuple_pool, &valid); } else { // handle case 2 - _set_tuple_value(*_json_doc, tuple, slot_descs, value_key, tuple_pool, &valid); + _set_tuple_value(*_json_doc, tuple, slot_descs, tuple_pool, &valid); } _next_line++; if (!valid) { diff --git a/be/src/exec/json_scanner.h b/be/src/exec/json_scanner.h index 59759782d3..86bc2dd82a 100644 --- a/be/src/exec/json_scanner.h +++ b/be/src/exec/json_scanner.h @@ -27,6 +27,7 @@ #include #include #include +#include #include #include "common/status.h" @@ -104,7 +105,8 @@ struct JsonPath; class JsonReader { public: JsonReader(RuntimeState* state, ScannerCounter* counter, RuntimeProfile* profile, - FileReader* file_reader, bool strip_outer_array, bool num_as_string); + FileReader* file_reader, bool strip_outer_array, bool num_as_string, + bool fuzzy_parse); ~JsonReader(); @@ -129,8 +131,7 @@ private: const uint8_t* value, int32_t len); Status _parse_json_doc(bool* eof); void _set_tuple_value(rapidjson::Value& objectValue, Tuple* tuple, - const std::vector& slot_descs, - const std::vector& value_key, MemPool* tuple_pool, + const std::vector& slot_descs, MemPool* tuple_pool, bool* valid); void _write_data_to_tuple(rapidjson::Value::ConstValueIterator value, SlotDescriptor* desc, Tuple* tuple, MemPool* tuple_pool, bool* valid); @@ -153,6 +154,7 @@ private: bool _closed; bool _strip_outer_array; bool _num_as_string; + bool _fuzzy_parse; RuntimeProfile::Counter* _bytes_read_counter; RuntimeProfile::Counter* _read_timer; RuntimeProfile::Counter* _file_read_timer; @@ -162,6 +164,7 @@ private: rapidjson::Document _origin_json_doc; // origin json document object from parsed json string rapidjson::Value* _json_doc; // _json_doc equals _final_json_doc iff not set `json_root` + std::unordered_map _name_map; }; } // namespace doris diff --git a/be/src/http/action/mini_load.cpp b/be/src/http/action/mini_load.cpp index 0a185ce8bd..879bc34a32 100644 --- a/be/src/http/action/mini_load.cpp +++ b/be/src/http/action/mini_load.cpp @@ -305,6 +305,15 @@ Status MiniLoadAction::_merge_header(HttpRequest* http_req, } else { (*params)[HTTP_STRIP_OUTER_ARRAY] = "false"; } + if (!http_req->header(HTTP_FUZZY_PARSE).empty()) { + if (boost::iequals(http_req->header(HTTP_FUZZY_PARSE), "true")) { + (*params)[HTTP_FUZZY_PARSE] = "true"; + } else { + (*params)[HTTP_FUZZY_PARSE] = "false"; + } + } else { + (*params)[HTTP_FUZZY_PARSE] = "false"; + } if (!http_req->header(HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL).empty()) { (*params)[HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL] = http_req->header(HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL); @@ -923,6 +932,7 @@ void MiniLoadAction::_new_handle(HttpRequest* req) { } std::string str = ctx->to_json_for_mini_load(); + str += '\n'; HttpChannel::send_reply(req, str); } diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index c9abc73b8b..1c9b7c30c9 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -128,6 +128,8 @@ void StreamLoadAction::handle(HttpRequest* req) { } auto str = ctx->to_json(); + // add new line at end + str = str + '\n'; HttpChannel::send_reply(req, str); // update statstics @@ -195,6 +197,8 @@ int StreamLoadAction::on_header(HttpRequest* req) { ctx->body_sink->cancel(); } auto str = ctx->to_json(); + // add new line at end + str = str + '\n'; HttpChannel::send_reply(req, str); streaming_load_current_processing->increment(-1); return -1; @@ -408,6 +412,15 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext* } else { request.__set_num_as_string(false); } + if (!http_req->header(HTTP_FUZZY_PARSE).empty()) { + if (boost::iequals(http_req->header(HTTP_FUZZY_PARSE), "true")) { + request.__set_fuzzy_parse(true); + } else { + request.__set_fuzzy_parse(false); + } + } else { + request.__set_fuzzy_parse(false); + } if (!http_req->header(HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL).empty()) { request.__set_sequence_col( http_req->header(HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL)); diff --git a/be/src/http/http_common.h b/be/src/http/http_common.h index f6a89e2006..5681db6e0c 100644 --- a/be/src/http/http_common.h +++ b/be/src/http/http_common.h @@ -40,6 +40,8 @@ static const std::string HTTP_JSONPATHS = "jsonpaths"; static const std::string HTTP_JSONROOT = "json_root"; static const std::string HTTP_STRIP_OUTER_ARRAY = "strip_outer_array"; static const std::string HTTP_NUM_AS_STRING = "num_as_string"; +static const std::string HTTP_FUZZY_PARSE = "fuzzy_parse"; + static const std::string HTTP_MERGE_TYPE = "merge_type"; static const std::string HTTP_DELETE_CONDITION = "delete"; static const std::string HTTP_FUNCTION_COLUMN = "function_column"; diff --git a/docs/en/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md b/docs/en/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md index 39d587b859..5440c7fbd9 100644 --- a/docs/en/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md +++ b/docs/en/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md @@ -127,6 +127,8 @@ json_root is a valid JSONPATH string that specifies the root node of the JSON Do The type of data merging supports three types: APPEND, DELETE, and MERGE. APPEND is the default value, which means that all this batch of data needs to be appended to the existing data. DELETE means to delete all rows with the same key as this batch of data. MERGE semantics Need to be used in conjunction with the delete condition, which means that the data that meets the delete condition is processed according to DELETE semantics and the rest is processed according to APPEND semantics +`fuzzy_parse` Boolean type, true to indicate that parse json schema as the first line, this can make import more faster,but need all key keep the order of first line, default value is false. Only use for json format. + RETURN VALUES After the load is completed, the related content of this load will be returned in Json format. Current field included diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md index 82ebd447f9..4c4ecc0f22 100644 --- a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md +++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md @@ -96,6 +96,8 @@ under the License. function_column.sequence_col: 只适用于UNIQUE_KEYS,相同key列下,保证value列按照source_sequence列进行REPLACE, source_sequence可以是数据源中的列,也可以是表结构中的一列。 + + fuzzy_parse: 布尔类型,为true表示json将以第一行为schema 进行解析,开启这个选项可以提高json 导入效率,但是要求要求所有json 对象的key的顺序和第一行一致, 默认为false,仅用于json 格式 RETURN VALUES 导入完成后,会以Json格式返回这次导入的相关内容。当前包括一下字段 diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java index 4f16572819..69ca52046e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java @@ -52,6 +52,7 @@ public class AlterRoutineLoadStmt extends DdlStmt { .add(CreateRoutineLoadStmt.JSONROOT) .add(CreateRoutineLoadStmt.STRIP_OUTER_ARRAY) .add(CreateRoutineLoadStmt.NUM_AS_STRING) + .add(CreateRoutineLoadStmt.FUZZY_PARSE) .add(LoadStmt.STRICT_MODE) .add(LoadStmt.TIMEZONE) .build(); @@ -188,6 +189,12 @@ public class AlterRoutineLoadStmt extends DdlStmt { analyzedJobProperties.put(jobProperties.get(CreateRoutineLoadStmt.NUM_AS_STRING), String.valueOf(numAsString)); } + + if (jobProperties.containsKey(CreateRoutineLoadStmt.FUZZY_PARSE)) { + boolean fuzzyParse = Boolean.valueOf(jobProperties.get(CreateRoutineLoadStmt.FUZZY_PARSE)); + analyzedJobProperties.put(jobProperties.get(CreateRoutineLoadStmt.FUZZY_PARSE), + String.valueOf(fuzzyParse)); + } } private void checkDataSourceProperties() throws AnalysisException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java index 7b1c0db277..ee629becea 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java @@ -102,6 +102,7 @@ public class CreateRoutineLoadStmt extends DdlStmt { public static final String JSONPATHS = "jsonpaths"; public static final String JSONROOT = "json_root"; public static final String NUM_AS_STRING = "num_as_string"; + public static final String FUZZY_PARSE = "fuzzy_parse"; // kafka type properties public static final String KAFKA_BROKER_LIST_PROPERTY = "kafka_broker_list"; @@ -124,6 +125,7 @@ public class CreateRoutineLoadStmt extends DdlStmt { .add(JSONPATHS) .add(STRIP_OUTER_ARRAY) .add(NUM_AS_STRING) + .add(FUZZY_PARSE) .add(JSONROOT) .add(LoadStmt.STRICT_MODE) .add(LoadStmt.TIMEZONE) @@ -168,6 +170,7 @@ public class CreateRoutineLoadStmt extends DdlStmt { private String jsonRoot = ""; // MUST be a jsonpath string private boolean stripOuterArray = false; private boolean numAsString = false; + private boolean fuzzyParse = false; // kafka related properties private String kafkaBrokerList; @@ -262,6 +265,10 @@ public class CreateRoutineLoadStmt extends DdlStmt { return numAsString; } + public boolean isFuzzyParse() { + return fuzzyParse; + } + public String getJsonPaths() { return jsonPaths; } @@ -439,6 +446,7 @@ public class CreateRoutineLoadStmt extends DdlStmt { jsonRoot = jobProperties.get(JSONROOT); stripOuterArray = Boolean.valueOf(jobProperties.getOrDefault(STRIP_OUTER_ARRAY, "false")); numAsString = Boolean.valueOf(jobProperties.getOrDefault(NUM_AS_STRING, "false")); + fuzzyParse = Boolean.valueOf(jobProperties.getOrDefault(FUZZY_PARSE, "false")); } else { throw new UserException("Format type is invalid. format=`" + format + "`"); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java index 64cfd765d3..577aa99805 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java @@ -116,6 +116,7 @@ public class DataDescription { private boolean stripOuterArray = false; private String jsonPaths = ""; private String jsonRoot = ""; + private boolean fuzzyParse = false; private String sequenceCol; @@ -477,6 +478,14 @@ public class DataDescription { this.stripOuterArray = stripOuterArray; } + public boolean isFuzzyParse() { + return fuzzyParse; + } + + public void setFuzzyParse(boolean fuzzyParse) { + this.fuzzyParse = fuzzyParse; + } + public String getJsonPaths() { return jsonPaths; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java index 8cd60138da..cfca47263f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java @@ -107,6 +107,7 @@ public class LoadStmt extends DdlStmt { public static final String KEY_IN_PARAM_JSONPATHS = "jsonpaths"; public static final String KEY_IN_PARAM_JSONROOT = "json_root"; public static final String KEY_IN_PARAM_STRIP_OUTER_ARRAY = "strip_outer_array"; + public static final String KEY_IN_PARAM_FUZZY_PARSE = "fuzzy_parse"; public static final String KEY_IN_PARAM_MERGE_TYPE = "merge_type"; public static final String KEY_IN_PARAM_DELETE_CONDITION = "delete"; public static final String KEY_IN_PARAM_FUNCTION_COLUMN = "function_column"; diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/UploadAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/UploadAction.java index 0a09c5bba6..61b7f807c3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/UploadAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/UploadAction.java @@ -271,6 +271,7 @@ public class UploadAction extends RestBaseController { public String stripOuterArray; public String jsonRoot; public String numAsString; + public String fuzzyParse; public LoadContext(HttpServletRequest request, String db, String tbl, String user, String passwd, TmpFileMgr.TmpFile file) { @@ -302,6 +303,7 @@ public class UploadAction extends RestBaseController { this.stripOuterArray = request.getHeader("strip_outer_array"); this.numAsString = request.getHeader("num_as_string"); this.jsonRoot = request.getHeader("json_root"); + this.fuzzyParse = request.getHeader("fuzzy_parse"); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java index fc7df6bdc8..6278ff89c4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java @@ -99,6 +99,7 @@ public class BrokerFileGroup implements Writable { private boolean stripOuterArray = false; private String jsonPaths = ""; private String jsonRoot = ""; + private boolean fuzzyParse = true; // for unit test and edit log persistence private BrokerFileGroup() { @@ -227,6 +228,7 @@ public class BrokerFileGroup implements Writable { stripOuterArray = dataDescription.isStripOuterArray(); jsonPaths = dataDescription.getJsonPaths(); jsonRoot = dataDescription.getJsonRoot(); + fuzzyParse = dataDescription.isFuzzyParse(); } } @@ -326,6 +328,14 @@ public class BrokerFileGroup implements Writable { this.stripOuterArray = stripOuterArray; } + public boolean isFuzzyParse() { + return fuzzyParse; + } + + public void setFuzzyParse(boolean fuzzyParse) { + this.fuzzyParse = fuzzyParse; + } + public String getJsonPaths() { return jsonPaths; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index f1d858b766..3824df9452 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -198,6 +198,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl private static final String PROPS_NUM_AS_STRING = "num_as_string"; private static final String PROPS_JSONPATHS = "jsonpaths"; private static final String PROPS_JSONROOT = "json_root"; + private static final String PROPS_FUZZY_PARSE = "fuzzy_parse"; protected int currentTaskConcurrentNum; protected RoutineLoadProgress progress; @@ -308,6 +309,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl jobProperties.put(PROPS_NUM_AS_STRING, "false"); jobProperties.put(PROPS_JSONPATHS, ""); jobProperties.put(PROPS_JSONROOT, ""); + jobProperties.put(PROPS_FUZZY_PARSE, "false"); } else if (stmt.getFormat().equals("json")) { jobProperties.put(PROPS_FORMAT, "json"); if (!Strings.isNullOrEmpty(stmt.getJsonPaths())) { @@ -330,6 +332,11 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl } else { jobProperties.put(PROPS_NUM_AS_STRING, "false"); } + if (stmt.isFuzzyParse()) { + jobProperties.put(PROPS_FUZZY_PARSE, "true"); + } else { + jobProperties.put(PROPS_FUZZY_PARSE, "false"); + } } else { throw new UserException("Invalid format type."); @@ -560,6 +567,10 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl return Boolean.valueOf(jobProperties.get(PROPS_NUM_AS_STRING)); } + public boolean isFuzzyParse() { + return Boolean.valueOf(jobProperties.get(PROPS_FUZZY_PARSE)); + } + @Override public String getPath() { return null; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java index e7dd2feaad..5923d9c10c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java @@ -427,6 +427,7 @@ public class BrokerScanNode extends LoadScanNode { rangeDesc.setStripOuterArray(context.fileGroup.isStripOuterArray()); rangeDesc.setJsonpaths(context.fileGroup.getJsonPaths()); rangeDesc.setJsonRoot(context.fileGroup.getJsonRoot()); + rangeDesc.setFuzzyParse(context.fileGroup.isFuzzyParse()); } brokerScanRange(curLocations).addToRanges(rangeDesc); curFileOffset += rangeBytes; @@ -451,6 +452,7 @@ public class BrokerScanNode extends LoadScanNode { rangeDesc.setStripOuterArray(context.fileGroup.isStripOuterArray()); rangeDesc.setJsonpaths(context.fileGroup.getJsonPaths()); rangeDesc.setJsonRoot(context.fileGroup.getJsonRoot()); + rangeDesc.setFuzzyParse(context.fileGroup.isFuzzyParse()); } brokerScanRange(curLocations).addToRanges(rangeDesc); curFileOffset = 0; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java index 4b8a7ef2eb..b31578bc37 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java @@ -94,6 +94,7 @@ public class StreamLoadScanNode extends LoadScanNode { } rangeDesc.setStripOuterArray(taskInfo.isStripOuterArray()); rangeDesc.setNumAsString(taskInfo.isNumAsString()); + rangeDesc.setFuzzyParse(taskInfo.isFuzzyParse()); } rangeDesc.splittable = false; switch (taskInfo.getFileType()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java index 9d2d853b56..24fc3e4f5e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java @@ -470,6 +470,7 @@ public class MultiLoadMgr { boolean stripOuterArray = false; String jsonPaths = ""; String jsonRoot = ""; + boolean fuzzyParse = false; if (properties != null) { colString = properties.get(LoadStmt.KEY_IN_PARAM_COLUMNS); String columnSeparatorStr = properties.get(LoadStmt.KEY_IN_PARAM_COLUMN_SEPARATOR); @@ -502,6 +503,8 @@ public class MultiLoadMgr { properties.getOrDefault(LoadStmt.KEY_IN_PARAM_STRIP_OUTER_ARRAY, "false")); jsonPaths = properties.getOrDefault(LoadStmt.KEY_IN_PARAM_JSONPATHS, ""); jsonRoot = properties.getOrDefault(LoadStmt.KEY_IN_PARAM_JSONROOT, ""); + fuzzyParse = Boolean.valueOf( + properties.getOrDefault(LoadStmt.KEY_IN_PARAM_FUZZY_PARSE, "false")); } } DataDescription dataDescription = new DataDescription(tbl, partitionNames, files, null, columnSeparator, @@ -518,6 +521,7 @@ public class MultiLoadMgr { dataDescription.setJsonPaths(jsonPaths); dataDescription.setJsonRoot(jsonRoot); dataDescription.setStripOuterArray(stripOuterArray); + dataDescription.setFuzzyParse(fuzzyParse); return dataDescription; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java index 7692d31754..ec335e1bd2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java @@ -42,6 +42,7 @@ public interface LoadTaskInfo { public String getJsonPaths(); public String getJsonRoot(); public boolean isStripOuterArray(); + public boolean isFuzzyParse(); public boolean isNumAsString(); public String getPath(); public List getColumnExprDescs(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java index 2baf1b1577..7ccdd46a1d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java @@ -60,6 +60,7 @@ public class StreamLoadTask implements LoadTaskInfo { private boolean numAsString; private String jsonPaths; private String jsonRoot; + private boolean fuzzyParse; // optional private List columnExprDescs = Lists.newArrayList(); @@ -85,6 +86,7 @@ public class StreamLoadTask implements LoadTaskInfo { this.jsonRoot = ""; this.stripOuterArray = false; this.numAsString = false; + this.fuzzyParse = false; } public TUniqueId getId() { @@ -148,6 +150,15 @@ public class StreamLoadTask implements LoadTaskInfo { return numAsString; } + @Override + public boolean isFuzzyParse() { + return fuzzyParse; + } + + public void setFuzzyParse(boolean fuzzyParse) { + this.fuzzyParse = fuzzyParse; + } + public void setStripOuterArray(boolean stripOuterArray) { this.stripOuterArray = stripOuterArray; } @@ -239,6 +250,7 @@ public class StreamLoadTask implements LoadTaskInfo { } stripOuterArray = request.isStripOuterArray(); numAsString = request.isNumAsString(); + fuzzyParse = request.isFuzzyParse(); } if (request.isSetMergeType()) { try { diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 35182f56be..b10fbe7d16 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -575,6 +575,7 @@ struct TStreamLoadPutRequest { 28: optional string delete_condition 29: optional string sequence_col 30: optional bool num_as_string + 31: optional bool fuzzy_parse } struct TStreamLoadPutResult { diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index ffc921004d..1f83574f49 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -133,6 +133,7 @@ struct TBrokerRangeDesc { 13: optional string json_root; // it's usefull when format_type == FORMAT_JSON 14: optional bool num_as_string; + 15: optional bool fuzzy_parse; } struct TBrokerScanRangeParams { diff --git a/run-be-ut.sh b/run-be-ut.sh index dafd484681..f9c205efa8 100755 --- a/run-be-ut.sh +++ b/run-be-ut.sh @@ -107,9 +107,15 @@ if [ ! -d ${CMAKE_BUILD_DIR} ]; then fi cd ${CMAKE_BUILD_DIR} +GENERATOR="Unix Makefiles" +BUILD_SYSTEM="make" +if ninja --version 2>/dev/null; then + GENERATOR="Ninja" + BUILD_SYSTEM="ninja" +fi -${CMAKE_CMD} ../ -DWITH_MYSQL=OFF -DMAKE_TEST=ON -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE} -make -j${PARALLEL} +${CMAKE_CMD} -G "${GENERATOR}" ../ -DWITH_MYSQL=OFF -DMAKE_TEST=ON -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE} +${BUILD_SYSTEM} -j${PARALLEL} if [ ${RUN} -ne 1 ]; then echo "Finished"