diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt index d3ae2849de..2ba617295c 100644 --- a/be/src/vec/CMakeLists.txt +++ b/be/src/vec/CMakeLists.txt @@ -258,6 +258,7 @@ set(VEC_FILES
 exec/scan/new_es_scan_node.cpp
 exec/format/csv/csv_reader.cpp
 exec/format/orc/vorc_reader.cpp
+ exec/format/json/new_json_reader.cpp
 )
 add_library(Vec STATIC
diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp b/be/src/vec/exec/format/json/new_json_reader.cpp new file mode 100644 index 0000000000..add32361e3 --- /dev/null +++ b/be/src/vec/exec/format/json/new_json_reader.cpp @@ -0,0 +1,754 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/format/json/new_json_reader.h"
+
+#include "common/compiler_util.h"
+#include "exec/plain_text_line_reader.h"
+#include "exprs/json_functions.h"
+#include "io/file_factory.h"
+#include "runtime/descriptors.h"
+#include "runtime/runtime_state.h"
+#include "vec/core/block.h"
+#include "vec/exec/scan/vscanner.h"
+namespace doris::vectorized {
+
+NewJsonReader::NewJsonReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounter* counter,
+                             const TFileScanRangeParams& params, const TFileRangeDesc& range,
+                             const std::vector<SlotDescriptor*>& file_slot_descs, bool* scanner_eof)
+        : _vhandle_json_callback(nullptr),
+          _state(state),
+          _profile(profile),
+          _counter(counter),
+          _params(params),
+          _range(range),
+          _file_slot_descs(file_slot_descs),
+          _file_reader(nullptr),
+          _file_reader_s(nullptr),
+          _real_file_reader(nullptr),
+          _line_reader(nullptr),
+          _reader_eof(false),
+          _skip_first_line(false),
+          _next_row(0),
+          _total_rows(0),
+          _value_allocator(_value_buffer, sizeof(_value_buffer)),
+          _parse_allocator(_parse_buffer, sizeof(_parse_buffer)),
+          _origin_json_doc(&_value_allocator, sizeof(_parse_buffer), &_parse_allocator),
+          _scanner_eof(scanner_eof) {
+    _file_format_type = _params.format_type;
+
+    _bytes_read_counter = ADD_COUNTER(_profile, "BytesRead", TUnit::BYTES);
+    _read_timer = ADD_TIMER(_profile, "ReadTime");
+    _file_read_timer = ADD_TIMER(_profile, "FileReadTime");
+}
+
+Status NewJsonReader::init_reader() {
+    RETURN_IF_ERROR(_get_range_params());
+
+    RETURN_IF_ERROR(_open_file_reader());
+    if (_read_json_by_line) {
+        RETURN_IF_ERROR(_open_line_reader());
+    }
+
+    // generate _parsed_jsonpaths and _parsed_json_root
+    RETURN_IF_ERROR(_parse_jsonpath_and_json_root());
+
+    // improve performance
+    if (_parsed_jsonpaths.empty()) { // input is a simple json-string
+        _vhandle_json_callback = &NewJsonReader::_vhandle_simple_json;
+    } else { // input is a complex json-string and a json-path
+        if (_strip_outer_array) {
+            _vhandle_json_callback = &NewJsonReader::_vhandle_flat_array_complex_json;
+        } else {
+            _vhandle_json_callback = &NewJsonReader::_vhandle_nested_complex_json;
+        }
+    }
+    return Status::OK();
+}
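+
+// Illustrative examples of which handler init_reader() picks (the sample inputs
+// below are assumptions for illustration, not taken from the patch):
+//   {"k": 1} or [{"k": 1}, ...] with no jsonpaths      -> _vhandle_simple_json
+//   jsonpaths + strip_outer_array, e.g. [{..}, {..}]   -> _vhandle_flat_array_complex_json
+//   jsonpaths over a nested object, e.g. {"item":{..}} -> _vhandle_nested_complex_json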
+
+Status NewJsonReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
+    if (_reader_eof == true) {
+        *eof = true;
+        return Status::OK();
+    }
+
+    const int batch_size = _state->batch_size();
+    auto columns = block->mutate_columns();
+
+    while (columns[0]->size() < batch_size && !_reader_eof) {
+        if (UNLIKELY(_read_json_by_line && _skip_first_line)) {
+            size_t size = 0;
+            const uint8_t* line_ptr = nullptr;
+            RETURN_IF_ERROR(_line_reader->read_line(&line_ptr, &size, &_reader_eof));
+            _skip_first_line = false;
+            continue;
+        }
+
+        bool is_empty_row = false;
+
+        RETURN_IF_ERROR(_read_json_column(columns, _file_slot_descs, &is_empty_row, &_reader_eof));
+        ++(*read_rows);
+        if (is_empty_row) {
+            // Read an empty row, just continue
+            continue;
+        }
+    }
+
+    columns.clear();
+    return Status::OK();
+}
+
+Status NewJsonReader::get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
+                                  std::unordered_set<std::string>* missing_cols) {
+    for (auto& slot : _file_slot_descs) {
+        name_to_type->emplace(slot->col_name(), slot->type());
+    }
+    return Status::OK();
+}
+
+Status NewJsonReader::_get_range_params() {
+    if (!_params.__isset.file_attributes) {
+        return Status::InternalError("BE can not get file_attributes");
+    }
+
+    // get line_delimiter
+    if (_params.file_attributes.__isset.text_params &&
+        _params.file_attributes.text_params.__isset.line_delimiter) {
+        _line_delimiter = _params.file_attributes.text_params.line_delimiter;
+        _line_delimiter_length = _line_delimiter.size();
+    }
+
+    if (_params.file_attributes.__isset.jsonpaths) {
+        _jsonpaths = _params.file_attributes.jsonpaths;
+    }
+    if (_params.file_attributes.__isset.json_root) {
+        _json_root = _params.file_attributes.json_root;
+    }
+    if (_params.file_attributes.__isset.read_json_by_line) {
+        _read_json_by_line = _params.file_attributes.read_json_by_line;
+    }
+    if (_params.file_attributes.__isset.strip_outer_array) {
+        _strip_outer_array = _params.file_attributes.strip_outer_array;
+    }
+    if (_params.file_attributes.__isset.num_as_string) {
+        _num_as_string = _params.file_attributes.num_as_string;
+    }
+    if (_params.file_attributes.__isset.fuzzy_parse) {
+        _fuzzy_parse = _params.file_attributes.fuzzy_parse;
+    }
+    return Status::OK();
+}
+
+Status NewJsonReader::_open_file_reader() {
+    int64_t start_offset = _range.start_offset;
+    if (start_offset != 0) {
+        start_offset -= 1;
+    }
+
+    if (_params.file_type == TFileType::FILE_STREAM) {
+        RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, _file_reader_s));
+        _real_file_reader = _file_reader_s.get();
+    } else {
+        RETURN_IF_ERROR(FileFactory::create_file_reader(
+                _profile, _params, _range.path, start_offset, _range.file_size, 0, _file_reader));
+        _real_file_reader = _file_reader.get();
+    }
+    return _real_file_reader->open();
+}
+
+Status NewJsonReader::_open_line_reader() {
+    int64_t size = _range.size;
+    if (_range.start_offset != 0) {
+        // When the fetched range doesn't start from 0, `size` needs to be enlarged by 1.
+        // TODO(ftw): check what if file_reader is stream_pipe? Is `size += 1` correct?
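+        // Note: _open_file_reader() moves start_offset back by 1 byte for non-zero offsets,
+        // so this range begins inside the line owned by the previous range; enlarging `size`
+        // by 1 and skipping the first (possibly partial) line keeps every line assigned to
+        // exactly one range (reasoning inferred from the start_offset adjustment above).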
+        size += 1;
+        _skip_first_line = true;
+    } else {
+        _skip_first_line = false;
+    }
+    _line_reader.reset(new PlainTextLineReader(_profile, _real_file_reader, nullptr, size,
+                                               _line_delimiter, _line_delimiter_length));
+    return Status::OK();
+}
+
+Status NewJsonReader::_parse_jsonpath_and_json_root() {
+    // parse jsonpaths
+    if (!_jsonpaths.empty()) {
+        rapidjson::Document jsonpaths_doc;
+        if (!jsonpaths_doc.Parse(_jsonpaths.c_str(), _jsonpaths.length()).HasParseError()) {
+            if (!jsonpaths_doc.IsArray()) {
+                return Status::InvalidArgument("Invalid json path: {}", _jsonpaths);
+            } else {
+                for (int i = 0; i < jsonpaths_doc.Size(); i++) {
+                    const rapidjson::Value& path = jsonpaths_doc[i];
+                    if (!path.IsString()) {
+                        return Status::InvalidArgument("Invalid json path: {}", _jsonpaths);
+                    }
+                    std::vector<JsonPath> parsed_paths;
+                    JsonFunctions::parse_json_paths(path.GetString(), &parsed_paths);
+                    _parsed_jsonpaths.push_back(std::move(parsed_paths));
+                }
+            }
+        } else {
+            return Status::InvalidArgument("Invalid json path: {}", _jsonpaths);
+        }
+    }
+
+    // parse jsonroot
+    if (!_json_root.empty()) {
+        JsonFunctions::parse_json_paths(_json_root, &_parsed_json_root);
+    }
+    return Status::OK();
+}
+
+Status NewJsonReader::_read_json_column(std::vector<MutableColumnPtr>& columns,
+                                        const std::vector<SlotDescriptor*>& slot_descs,
+                                        bool* is_empty_row, bool* eof) {
+    return (this->*_vhandle_json_callback)(columns, slot_descs, is_empty_row, eof);
+}
+
+Status NewJsonReader::_vhandle_simple_json(std::vector<MutableColumnPtr>& columns,
+                                           const std::vector<SlotDescriptor*>& slot_descs,
+                                           bool* is_empty_row, bool* eof) {
+    do {
+        bool valid = false;
+        if (_next_row >= _total_rows) { // parse json and generate document
+            Status st = _parse_json(is_empty_row, eof);
+            if (st.is_data_quality_error()) {
+                continue; // continue to read next
+            }
+            RETURN_IF_ERROR(st);
+            if (*is_empty_row == true) {
+                return Status::OK();
+            }
+            _name_map.clear();
+            rapidjson::Value* objectValue = nullptr;
+            if (_json_doc->IsArray()) {
+                _total_rows = _json_doc->Size();
+                if (_total_rows == 0) {
+                    // maybe an empty json was passed in, such as "[]"
+                    RETURN_IF_ERROR(_append_error_msg(*_json_doc, "Empty json line", "", nullptr));
+
+                    // TODO(ftw): check _reader_eof??
+                    if (_reader_eof) {
+                        *is_empty_row = true;
+                        return Status::OK();
+                    }
+                    continue;
+                }
+                objectValue = &(*_json_doc)[0];
+            } else {
+                _total_rows = 1; // only one row
+                objectValue = _json_doc;
+            }
+            _next_row = 0;
+            if (_fuzzy_parse) {
+                for (auto v : slot_descs) {
+                    for (int i = 0; i < objectValue->MemberCount(); ++i) {
+                        auto it = objectValue->MemberBegin() + i;
+                        if (v->col_name() == it->name.GetString()) {
+                            _name_map[v->col_name()] = i;
+                            break;
+                        }
+                    }
+                }
+            }
+        }
+
+        if (_json_doc->IsArray()) { // handle case: an array of objects
+            rapidjson::Value& objectValue = (*_json_doc)[_next_row]; // json object
+            RETURN_IF_ERROR(_set_column_value(objectValue, columns, slot_descs, &valid));
+        } else { // handle case: a single object
+            RETURN_IF_ERROR(_set_column_value(*_json_doc, columns, slot_descs, &valid));
+        }
+        _next_row++;
+        if (!valid) {
+            if (*_scanner_eof) {
+                // When _scanner_eof is true and valid is false, it means that we have encountered
+                // unqualified data and decided to stop the scan.
+                *is_empty_row = true;
+                // TODO(ftw): check *eof=true?
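+                // Setting eof here as well makes the outer get_next_block() loop exit
+                // immediately instead of polling for more data; this mirrors the Case A
+                // handling in _parse_json_doc() (an observation, not part of the patch).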
+                *eof = true;
+                return Status::OK();
+            }
+            continue;
+        }
+        *is_empty_row = false;
+        break; // get a valid row, then break
+    } while (_next_row <= _total_rows);
+    return Status::OK();
+}
+
+Status NewJsonReader::_vhandle_flat_array_complex_json(
+        std::vector<MutableColumnPtr>& columns, const std::vector<SlotDescriptor*>& slot_descs,
+        bool* is_empty_row, bool* eof) {
+    do {
+        if (_next_row >= _total_rows) {
+            Status st = _parse_json(is_empty_row, eof);
+            if (st.is_data_quality_error()) {
+                continue; // continue to read next
+            }
+            RETURN_IF_ERROR(st);
+            if (*is_empty_row == true) {
+                if (st == Status::OK()) {
+                    return Status::OK();
+                }
+                if (_total_rows == 0) {
+                    continue;
+                }
+            }
+        }
+        rapidjson::Value& objectValue = (*_json_doc)[_next_row++];
+        bool valid = true;
+        RETURN_IF_ERROR(_write_columns_by_jsonpath(objectValue, slot_descs, columns, &valid));
+        if (!valid) {
+            continue; // process next line
+        }
+        *is_empty_row = false;
+        break; // get a valid row, then break
+    } while (_next_row <= _total_rows);
+    return Status::OK();
+}
+
+Status NewJsonReader::_vhandle_nested_complex_json(std::vector<MutableColumnPtr>& columns,
+                                                   const std::vector<SlotDescriptor*>& slot_descs,
+                                                   bool* is_empty_row, bool* eof) {
+    while (true) {
+        Status st = _parse_json(is_empty_row, eof);
+        if (st.is_data_quality_error()) {
+            continue; // continue to read next
+        }
+        RETURN_IF_ERROR(st);
+        if (*is_empty_row == true) {
+            return Status::OK();
+        }
+        *is_empty_row = false;
+        break; // read a valid row
+    }
+    bool valid = true;
+    RETURN_IF_ERROR(_write_columns_by_jsonpath(*_json_doc, slot_descs, columns, &valid));
+    if (!valid) {
+        // there is only one line in this case, so if it returns false, just set is_empty_row
+        // to true so that the caller will continue reading the next line.
+        *is_empty_row = true;
+    }
+    return Status::OK();
+}
+
+Status NewJsonReader::_parse_json(bool* is_empty_row, bool* eof) {
+    size_t size = 0;
+    RETURN_IF_ERROR(_parse_json_doc(&size, eof));
+
+    // read all data, then return
+    if (size == 0 || *eof) {
+        *is_empty_row = true;
+        return Status::OK();
+    }
+
+    if (!_parsed_jsonpaths.empty() && _strip_outer_array) {
+        _total_rows = _json_doc->Size();
+        _next_row = 0;
+
+        if (_total_rows == 0) {
+            // meet an empty json array.
+            *is_empty_row = true;
+        }
+    }
+    return Status::OK();
+}
+
+// read one json string from line reader or file reader and parse it to json doc.
+// return Status::DataQualityError() if data has a quality error.
+// return other error if we encounter other problems.
+// return Status::OK() if parsing succeeds or EOF is reached.
+Status NewJsonReader::_parse_json_doc(size_t* size, bool* eof) {
+    // read a whole message
+    SCOPED_TIMER(_file_read_timer);
+    const uint8_t* json_str = nullptr;
+    std::unique_ptr<uint8_t[]> json_str_ptr;
+    if (_line_reader != nullptr) {
+        RETURN_IF_ERROR(_line_reader->read_line(&json_str, size, eof));
+    } else {
+        int64_t length = 0;
+        RETURN_IF_ERROR(_real_file_reader->read_one_message(&json_str_ptr, &length));
+        json_str = json_str_ptr.get();
+        *size = length;
+        if (length == 0) {
+            *eof = true;
+        }
+    }
+
+    COUNTER_UPDATE(_bytes_read_counter, *size);
+    if (*eof) {
+        return Status::OK();
+    }
+
+    // clear memory here.
+    _value_allocator.Clear();
+    _parse_allocator.Clear();
+    bool has_parse_error = false;
+    // parse jsondata to JsonDoc
+
+    // As the issue: https://github.com/Tencent/rapidjson/issues/1458
+    // Currently rapidjson only supports uint64_t, so loading larger integers is buggy.
+    // We use kParseNumbersAsStringsFlag.
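+    // For example (illustrative values): {"v": 123456789012345678901} overflows uint64_t
+    // and would otherwise fail to parse; with kParseNumbersAsStringsFlag the token is kept
+    // as the string "123456789012345678901" and converted by downstream cast expressions.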
+    if (_num_as_string) {
+        has_parse_error =
+                _origin_json_doc
+                        .Parse<rapidjson::kParseNumbersAsStringsFlag>((char*)json_str, *size)
+                        .HasParseError();
+    } else {
+        has_parse_error = _origin_json_doc.Parse((char*)json_str, *size).HasParseError();
+    }
+
+    if (has_parse_error) {
+        fmt::memory_buffer error_msg;
+        fmt::format_to(error_msg, "Parse json data for JsonDoc failed. code: {}, error info: {}",
+                       _origin_json_doc.GetParseError(),
+                       rapidjson::GetParseError_En(_origin_json_doc.GetParseError()));
+        RETURN_IF_ERROR(_state->append_error_msg_to_file(
+                [&]() -> std::string { return std::string((char*)json_str, *size); },
+                [&]() -> std::string { return fmt::to_string(error_msg); }, _scanner_eof));
+        _counter->num_rows_filtered++;
+        if (*_scanner_eof) {
+            // Case A: if _scanner_eof is set to true in "append_error_msg_to_file", which means
+            // we meet enough invalid rows and the scanner should be stopped.
+            // So we set eof to true and return OK, the caller will stop the process as we meet the end of file.
+            *eof = true;
+            return Status::OK();
+        }
+        return Status::DataQualityError(fmt::to_string(error_msg));
+    }
+
+    // set json root
+    if (_parsed_json_root.size() != 0) {
+        _json_doc = JsonFunctions::get_json_object_from_parsed_json(
+                _parsed_json_root, &_origin_json_doc, _origin_json_doc.GetAllocator());
+        if (_json_doc == nullptr) {
+            fmt::memory_buffer error_msg;
+            fmt::format_to(error_msg, "{}", "JSON Root not found.");
+            RETURN_IF_ERROR(_state->append_error_msg_to_file(
+                    [&]() -> std::string { return _print_json_value(_origin_json_doc); },
+                    [&]() -> std::string { return fmt::to_string(error_msg); }, _scanner_eof));
+            _counter->num_rows_filtered++;
+            if (*_scanner_eof) {
+                // Same as Case A
+                *eof = true;
+                return Status::OK();
+            }
+            return Status::DataQualityError(fmt::to_string(error_msg));
+        }
+    } else {
+        _json_doc = &_origin_json_doc;
+    }
+
+    if (_json_doc->IsArray() && !_strip_outer_array) {
+        fmt::memory_buffer error_msg;
+        fmt::format_to(error_msg, "{}",
+                       "JSON data is array-object, `strip_outer_array` must be TRUE.");
+        RETURN_IF_ERROR(_state->append_error_msg_to_file(
+                [&]() -> std::string { return _print_json_value(_origin_json_doc); },
+                [&]() -> std::string { return fmt::to_string(error_msg); }, _scanner_eof));
+        _counter->num_rows_filtered++;
+        if (*_scanner_eof) {
+            // Same as Case A
+            *eof = true;
+            return Status::OK();
+        }
+        return Status::DataQualityError(fmt::to_string(error_msg));
+    }
+
+    if (!_json_doc->IsArray() && _strip_outer_array) {
+        fmt::memory_buffer error_msg;
+        fmt::format_to(error_msg, "{}",
+                       "JSON data is not an array-object, `strip_outer_array` must be FALSE.");
+        RETURN_IF_ERROR(_state->append_error_msg_to_file(
+                [&]() -> std::string { return _print_json_value(_origin_json_doc); },
+                [&]() -> std::string { return fmt::to_string(error_msg); }, _scanner_eof));
+        _counter->num_rows_filtered++;
+        if (*_scanner_eof) {
+            // Same as Case A
+            *eof = true;
+            return Status::OK();
+        }
+        return Status::DataQualityError(fmt::to_string(error_msg));
+    }
+
+    return Status::OK();
+}
+
+// for simple format json
+// set valid to true and return OK on success.
+// set valid to false and return OK if we met an invalid row.
+// return other status if we encounter other problems.
+Status NewJsonReader::_set_column_value(rapidjson::Value& objectValue,
+                                        std::vector<MutableColumnPtr>& columns,
+                                        const std::vector<SlotDescriptor*>& slot_descs,
+                                        bool* valid) {
+    if (!objectValue.IsObject()) {
+        // Here we expect the incoming `objectValue` to be a Json Object, such as {"key" : "value"},
+        // not another type of Json format.
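+        // e.g. a top-level scalar like "abc" or an array such as [1, 2, 3] lands here and
+        // is counted as one filtered row (example values are illustrative).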
+        RETURN_IF_ERROR(_append_error_msg(objectValue, "Expect json object value", "", valid));
+        return Status::OK();
+    }
+
+    int ctx_idx = 0;
+    bool has_valid_value = false;
+    size_t cur_row_count = columns[0]->size();
+    for (auto slot_desc : slot_descs) {
+        if (!slot_desc->is_materialized()) {
+            continue;
+        }
+
+        int dest_index = ctx_idx++;
+        auto* column_ptr = columns[dest_index].get();
+        rapidjson::Value::ConstMemberIterator it = objectValue.MemberEnd();
+
+        if (_fuzzy_parse) {
+            auto idx_it = _name_map.find(slot_desc->col_name());
+            if (idx_it != _name_map.end() && idx_it->second < objectValue.MemberCount()) {
+                it = objectValue.MemberBegin() + idx_it->second;
+            }
+        } else {
+            it = objectValue.FindMember(
+                    rapidjson::Value(slot_desc->col_name().c_str(), slot_desc->col_name().size()));
+        }
+
+        if (it != objectValue.MemberEnd()) {
+            const rapidjson::Value& value = it->value;
+            RETURN_IF_ERROR(_write_data_to_column(&value, slot_desc, column_ptr, valid));
+            if (!(*valid)) {
+                return Status::OK();
+            }
+            has_valid_value = true;
+        } else { // not found
+            // When the entire row has no valid value, this row should be filtered,
+            // so the default value cannot be directly inserted here
+            if (!slot_desc->is_nullable()) {
+                RETURN_IF_ERROR(_append_error_msg(
+                        objectValue,
+                        "The column `{}` is not nullable, but it's not found in jsondata.",
+                        slot_desc->col_name(), valid));
+                break;
+            }
+        }
+    }
+    if (!has_valid_value) {
+        RETURN_IF_ERROR(_append_error_msg(objectValue, "All fields are null, this is an invalid row.",
+                                          "", valid));
+        return Status::OK();
+    }
+    ctx_idx = 0;
+    int nullcount = 0;
+    // fill missing slot
+    for (auto slot_desc : slot_descs) {
+        if (!slot_desc->is_materialized()) {
+            continue;
+        }
+        int dest_index = ctx_idx++;
+        auto* column_ptr = columns[dest_index].get();
+        if (column_ptr->size() < cur_row_count + 1) {
+            DCHECK(column_ptr->size() == cur_row_count);
+            column_ptr->assume_mutable()->insert_default();
+            ++nullcount;
+        }
+        DCHECK(column_ptr->size() == cur_row_count + 1);
+    }
+    // There is at least one valid value here
+    DCHECK(nullcount < columns.size());
+    *valid = true;
+    return Status::OK();
+}
+
+Status NewJsonReader::_write_data_to_column(rapidjson::Value::ConstValueIterator value,
+                                            SlotDescriptor* slot_desc,
+                                            vectorized::IColumn* column_ptr, bool* valid) {
+    const char* str_value = nullptr;
+    char tmp_buf[128] = {0};
+    int32_t wbytes = 0;
+    std::string json_str;
+
+    vectorized::ColumnNullable* nullable_column = nullptr;
+    if (slot_desc->is_nullable()) {
+        nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(column_ptr);
+        // For kNullType, insert_default() already puts 1 into the null map,
+        // so we only need to push 0 (not null) for the other types.
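+        // ColumnNullable keeps a null map beside the nested data column: push_back(0) marks
+        // "not null" and the real value is appended to the nested column below, while
+        // insert_default() appends 1 to the null map and a default value in one step.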
+        if (value->GetType() != rapidjson::Type::kNullType) {
+            nullable_column->get_null_map_data().push_back(0);
+        } else {
+            nullable_column->insert_default();
+        }
+        column_ptr = &nullable_column->get_nested_column();
+    }
+
+    switch (value->GetType()) {
+    case rapidjson::Type::kStringType:
+        str_value = value->GetString();
+        wbytes = strlen(str_value);
+        break;
+    case rapidjson::Type::kNumberType:
+        if (value->IsUint()) {
+            wbytes = sprintf(tmp_buf, "%u", value->GetUint());
+        } else if (value->IsInt()) {
+            wbytes = sprintf(tmp_buf, "%d", value->GetInt());
+        } else if (value->IsUint64()) {
+            wbytes = sprintf(tmp_buf, "%" PRIu64, value->GetUint64());
+        } else if (value->IsInt64()) {
+            wbytes = sprintf(tmp_buf, "%" PRId64, value->GetInt64());
+        } else {
+            wbytes = sprintf(tmp_buf, "%f", value->GetDouble());
+        }
+        str_value = tmp_buf;
+        break;
+    case rapidjson::Type::kFalseType:
+        wbytes = 1;
+        str_value = (char*)"0";
+        break;
+    case rapidjson::Type::kTrueType:
+        wbytes = 1;
+        str_value = (char*)"1";
+        break;
+    case rapidjson::Type::kNullType:
+        if (!slot_desc->is_nullable()) {
+            RETURN_IF_ERROR(_append_error_msg(
+                    *value, "Json value is null, but the column `{}` is not nullable.",
+                    slot_desc->col_name(), valid));
+            return Status::OK();
+        }
+        // return immediately to prevent repeated insert_data
+        *valid = true;
+        return Status::OK();
+    default:
+        // for other types like array or object, we convert them to string to save
+        json_str = NewJsonReader::_print_json_value(*value);
+        wbytes = json_str.size();
+        str_value = json_str.c_str();
+        break;
+    }
+
+    // TODO: if the vexpr can support another 'slot_desc type' than 'TYPE_VARCHAR',
+    // we need a function to support these types when inserting data into columns.
+    DCHECK(slot_desc->type().type == TYPE_VARCHAR);
+    assert_cast<ColumnString*>(column_ptr)->insert_data(str_value, wbytes);
+
+    *valid = true;
+    return Status::OK();
+}
+
+Status NewJsonReader::_write_columns_by_jsonpath(rapidjson::Value& objectValue,
+                                                 const std::vector<SlotDescriptor*>& slot_descs,
+                                                 std::vector<MutableColumnPtr>& columns,
+                                                 bool* valid) {
+    int ctx_idx = 0;
+    bool has_valid_value = false;
+    size_t cur_row_count = columns[0]->size();
+    for (auto slot_desc : slot_descs) {
+        if (!slot_desc->is_materialized()) {
+            continue;
+        }
+        int i = ctx_idx++;
+        auto* column_ptr = columns[i].get();
+        rapidjson::Value* json_values = nullptr;
+        bool wrap_explicitly = false;
+        if (LIKELY(i < _parsed_jsonpaths.size())) {
+            json_values = JsonFunctions::get_json_array_from_parsed_json(
+                    _parsed_jsonpaths[i], &objectValue, _origin_json_doc.GetAllocator(),
+                    &wrap_explicitly);
+        }
+
+        if (json_values == nullptr) {
+            // not matched in jsondata.
+            if (!slot_descs[i]->is_nullable()) {
+                RETURN_IF_ERROR(_append_error_msg(
+                        objectValue,
+                        "The column `{}` is not nullable, but it's not found in jsondata.",
+                        slot_descs[i]->col_name(), valid));
+                return Status::OK();
+            }
+        } else {
+            CHECK(json_values->IsArray());
+            if (json_values->Size() == 1 && wrap_explicitly) {
+                // NOTICE1: JsonFunctions::get_json_array_from_parsed_json() wraps a single matched
+                // json object in an array, so here we unwrap the array to get the real element.
+                // If json_values' size > 1, we matched a real array, not a wrapped one, so there
+                // is no need to unwrap.
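+                // e.g. (illustrative) jsonpath "$.item.id" on {"item": {"id": 1}} yields [1]
+                // with wrap_explicitly=true and is unwrapped to 1, while "$.tags" on
+                // {"tags": ["a", "b"]} yields the array itself with wrap_explicitly=false
+                // and is written as-is.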
+                json_values = &((*json_values)[0]);
+            }
+            RETURN_IF_ERROR(_write_data_to_column(json_values, slot_descs[i], column_ptr, valid));
+            if (!(*valid)) {
+                return Status::OK();
+            }
+            has_valid_value = true;
+        }
+    }
+    if (!has_valid_value) {
+        RETURN_IF_ERROR(_append_error_msg(
+                objectValue, "All fields are null or not matched, this is an invalid row.", "",
+                valid));
+        return Status::OK();
+    }
+    ctx_idx = 0;
+    for (auto slot_desc : slot_descs) {
+        if (!slot_desc->is_materialized()) {
+            continue;
+        }
+        int dest_index = ctx_idx++;
+        auto* column_ptr = columns[dest_index].get();
+        if (column_ptr->size() < cur_row_count + 1) {
+            DCHECK(column_ptr->size() == cur_row_count);
+            column_ptr->assume_mutable()->insert_default();
+        }
+        DCHECK(column_ptr->size() == cur_row_count + 1);
+    }
+    return Status::OK();
+}
+
+Status NewJsonReader::_append_error_msg(const rapidjson::Value& objectValue, std::string error_msg,
+                                        std::string col_name, bool* valid) {
+    std::string err_msg;
+    if (!col_name.empty()) {
+        fmt::memory_buffer error_buf;
+        fmt::format_to(error_buf, error_msg, col_name);
+        err_msg = fmt::to_string(error_buf);
+    } else {
+        err_msg = error_msg;
+    }
+
+    RETURN_IF_ERROR(_state->append_error_msg_to_file(
+            [&]() -> std::string { return NewJsonReader::_print_json_value(objectValue); },
+            [&]() -> std::string { return err_msg; }, _scanner_eof));
+
+    // TODO(ftw): check here?
+    if (*_scanner_eof == true) {
+        _reader_eof = true;
+    }
+
+    _counter->num_rows_filtered++;
+    if (valid != nullptr) {
+        // current row is invalid
+        *valid = false;
+    }
+    return Status::OK();
+}
+
+std::string NewJsonReader::_print_json_value(const rapidjson::Value& value) {
+    rapidjson::StringBuffer buffer;
+    buffer.Clear();
+    rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+    value.Accept(writer);
+    return std::string(buffer.GetString());
+}
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/json/new_json_reader.h b/be/src/vec/exec/format/json/new_json_reader.h new file mode 100644 index 0000000000..aee11535fb --- /dev/null +++ b/be/src/vec/exec/format/json/new_json_reader.h @@ -0,0 +1,150 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
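+
+// Typical usage (sketch based on VFileScanner::_get_next_reader() in this patch):
+//     NewJsonReader reader(state, profile, &counter, params, range, file_slot_descs,
+//                          &scanner_eof);
+//     RETURN_IF_ERROR(reader.init_reader());
+//     while (!eof) {
+//         RETURN_IF_ERROR(reader.get_next_block(&block, &read_rows, &eof));
+//     }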
+
+#pragma once
+
+#include <rapidjson/document.h>
+#include <rapidjson/error/en.h>
+#include <rapidjson/stringbuffer.h>
+#include <rapidjson/writer.h>
+
+#include "vec/exec/format/generic_reader.h"
+namespace doris {
+
+class FileReader;
+struct JsonPath;
+class LineReader;
+class SlotDescriptor;
+
+namespace vectorized {
+
+struct ScannerCounter;
+
+class NewJsonReader : public GenericReader {
+public:
+    NewJsonReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounter* counter,
+                  const TFileScanRangeParams& params, const TFileRangeDesc& range,
+                  const std::vector<SlotDescriptor*>& file_slot_descs, bool* scanner_eof);
+    ~NewJsonReader() override = default;
+
+    Status init_reader();
+    Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
+    Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
+                       std::unordered_set<std::string>* missing_cols) override;
+
+private:
+    Status _get_range_params();
+    Status _open_file_reader();
+    Status _open_line_reader();
+    Status _parse_jsonpath_and_json_root();
+
+    Status _read_json_column(std::vector<MutableColumnPtr>& columns,
+                             const std::vector<SlotDescriptor*>& slot_descs, bool* is_empty_row,
+                             bool* eof);
+
+    Status _vhandle_simple_json(std::vector<MutableColumnPtr>& columns,
+                                const std::vector<SlotDescriptor*>& slot_descs, bool* is_empty_row,
+                                bool* eof);
+
+    Status _vhandle_flat_array_complex_json(std::vector<MutableColumnPtr>& columns,
+                                            const std::vector<SlotDescriptor*>& slot_descs,
+                                            bool* is_empty_row, bool* eof);
+
+    Status _vhandle_nested_complex_json(std::vector<MutableColumnPtr>& columns,
+                                        const std::vector<SlotDescriptor*>& slot_descs,
+                                        bool* is_empty_row, bool* eof);
+
+    Status _parse_json(bool* is_empty_row, bool* eof);
+    Status _parse_json_doc(size_t* size, bool* eof);
+
+    Status _set_column_value(rapidjson::Value& objectValue, std::vector<MutableColumnPtr>& columns,
+                             const std::vector<SlotDescriptor*>& slot_descs, bool* valid);
+
+    Status _write_data_to_column(rapidjson::Value::ConstValueIterator value,
+                                 SlotDescriptor* slot_desc, vectorized::IColumn* column_ptr,
+                                 bool* valid);
+
+    Status _write_columns_by_jsonpath(rapidjson::Value& objectValue,
+                                      const std::vector<SlotDescriptor*>& slot_descs,
+                                      std::vector<MutableColumnPtr>& columns, bool* valid);
+
+    Status _append_error_msg(const rapidjson::Value& objectValue, std::string error_msg,
+                             std::string col_name, bool* valid);
+
+    std::string _print_json_value(const rapidjson::Value& value);
+
+private:
+    Status (NewJsonReader::*_vhandle_json_callback)(
+            std::vector<MutableColumnPtr>& columns,
+            const std::vector<SlotDescriptor*>& slot_descs, bool* is_empty_row, bool* eof);
+    RuntimeState* _state;
+    RuntimeProfile* _profile;
+    ScannerCounter* _counter;
+    const TFileScanRangeParams& _params;
+    const TFileRangeDesc& _range;
+    const std::vector<SlotDescriptor*>& _file_slot_descs;
+
+    // _file_reader_s is for the stream load pipe reader,
+    // and _file_reader is for other file readers.
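+    // Only one of the two owners is active for a given scan; _real_file_reader below is
+    // the raw pointer actually read from, whichever owner is in use.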
+    // TODO: refactor this to use only shared_ptr or unique_ptr
+    std::unique_ptr<FileReader> _file_reader;
+    std::shared_ptr<FileReader> _file_reader_s;
+    FileReader* _real_file_reader;
+    std::unique_ptr<LineReader> _line_reader;
+    bool _reader_eof;
+
+    TFileFormatType::type _file_format_type;
+
+    // When the fetched range doesn't start from 0, we always skip the first line
+    bool _skip_first_line;
+
+    std::string _line_delimiter;
+    int _line_delimiter_length;
+
+    int _next_row;
+    int _total_rows;
+
+    std::string _jsonpaths;
+    std::string _json_root;
+    bool _read_json_by_line;
+    bool _strip_outer_array;
+    bool _num_as_string;
+    bool _fuzzy_parse;
+
+    std::vector<std::vector<JsonPath>> _parsed_jsonpaths;
+    std::vector<JsonPath> _parsed_json_root;
+
+    char _value_buffer[4 * 1024 * 1024]; // 4MB
+    char _parse_buffer[512 * 1024];      // 512KB
+
+    typedef rapidjson::GenericDocument<rapidjson::UTF8<>, rapidjson::MemoryPoolAllocator<>,
+                                       rapidjson::MemoryPoolAllocator<>>
+            Document;
+    rapidjson::MemoryPoolAllocator<> _value_allocator;
+    rapidjson::MemoryPoolAllocator<> _parse_allocator;
+    Document _origin_json_doc; // origin json document object from parsed json string
+    rapidjson::Value* _json_doc; // _json_doc points to _origin_json_doc iff `json_root` is not set
+    std::unordered_map<std::string, int> _name_map;
+
+    bool* _scanner_eof;
+
+    RuntimeProfile::Counter* _bytes_read_counter;
+    RuntimeProfile::Counter* _read_timer;
+    RuntimeProfile::Counter* _file_read_timer;
+};
+} // namespace vectorized
+} // namespace doris
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index 3503e8c468..9df3722b95 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -31,6 +31,7 @@
 #include "runtime/raw_value.h"
 #include "runtime/runtime_state.h"
 #include "vec/exec/format/csv/csv_reader.h"
+#include "vec/exec/format/json/new_json_reader.h"
 #include "vec/exec/format/orc/vorc_reader.h"
 #include "vec/exec/format/parquet/vparquet_reader.h"
 #include "vec/exec/scan/new_file_scan_node.h"
@@ -494,6 +495,12 @@ Status VFileScanner::_get_next_reader() {
         init_status = ((CsvReader*)(_cur_reader.get()))->init_reader(_is_load);
         break;
     }
+    case TFileFormatType::FORMAT_JSON: {
+        _cur_reader.reset(new NewJsonReader(_state, _profile, &_counter, _params, range,
+                                            _file_slot_descs, &_scanner_eof));
+        init_status = ((NewJsonReader*)(_cur_reader.get()))->init_reader();
+        break;
+    }
     default:
         return Status::InternalError("Not supported file format: {}", _params.format_type);
     }
diff --git a/docker/thirdparties/docker-compose/hive/scripts/hive-metastore.sh b/docker/thirdparties/docker-compose/hive/scripts/hive-metastore.sh index 1fe80f6e84..0d07bf5e36 100755 --- a/docker/thirdparties/docker-compose/hive/scripts/hive-metastore.sh +++ b/docker/thirdparties/docker-compose/hive/scripts/hive-metastore.sh @@ -27,6 +27,8 @@
 echo "hadoop fs -mkdir /user/doris/"
 hadoop fs -mkdir -p /user/doris/
 echo "hadoop fs -put /mnt/scripts/tpch1.db /user/doris/"
 hadoop fs -put /mnt/scripts/tpch1.db /user/doris/
+echo "hadoop fs -put /mnt/scripts/json_format_test /user/doris/"
+hadoop fs -put /mnt/scripts/json_format_test /user/doris/
 echo "hive -f /mnt/scripts/create.hql"
 hive -f /mnt/scripts/create.hql
diff --git a/docker/thirdparties/docker-compose/hive/scripts/json_format_test/multi_line_json.json b/docker/thirdparties/docker-compose/hive/scripts/json_format_test/multi_line_json.json new file mode 100644 index 0000000000..b5079899fc --- /dev/null +++ b/docker/thirdparties/docker-compose/hive/scripts/json_format_test/multi_line_json.json @@ -0,0 +1,2 @@
+[{"id": 1, "city":
"beijing", "code": 1454547},{"id": 2, "city": "shanghai", "code": 1244264}, {"id": 3, "city": "guangzhou", "code": 528369},{"id": 4, "city": "shenzhen", "code": 594201},{"id": 5, "city": "hangzhou", "code": 594201}] +[{"id": 6, "city": "nanjing", "code": 2345672},{"id": 7, "city": "wuhan", "code": 2345673}, {"id": 8, "city": "chengdu", "code": 2345674},{"id": 9, "city": "xian", "code": 2345675},{"id": 10, "city": "hefei", "code": 2345676}] \ No newline at end of file diff --git a/docker/thirdparties/docker-compose/hive/scripts/json_format_test/multi_line_json_lack_column.json b/docker/thirdparties/docker-compose/hive/scripts/json_format_test/multi_line_json_lack_column.json new file mode 100644 index 0000000000..338eaa726f --- /dev/null +++ b/docker/thirdparties/docker-compose/hive/scripts/json_format_test/multi_line_json_lack_column.json @@ -0,0 +1,2 @@ +[{"id": 1, "code": 1454547},{"city": "shanghai", "id": 2, "code": 1244264}, {"id": 3, "code": 528369},{"id": 4, "code": 594202},{"city": "hangzhou", "id": 5, "code": 594201}] +[{"city": "nanjing", "id": 6, "code": 2345672},{"id": 7, "code": 2345673}, {"id": 8, "code": 2345674},{"city": "xian", "id": 9, "code": 2345675},{"id": 10, "code": 2345676}] diff --git a/docker/thirdparties/docker-compose/hive/scripts/json_format_test/multi_line_json_unorder.json b/docker/thirdparties/docker-compose/hive/scripts/json_format_test/multi_line_json_unorder.json new file mode 100644 index 0000000000..539d66df8d --- /dev/null +++ b/docker/thirdparties/docker-compose/hive/scripts/json_format_test/multi_line_json_unorder.json @@ -0,0 +1,2 @@ +[{"city": "beijing", "id": 1, "code": 1454547},{"city": "shanghai", "id": 2, "code": 1244264}, {"city": "guangzhou", "id": 3, "code": 528369},{"city": "shenzhen", "id": 4, "code": 594202},{"city": "hangzhou", "id": 5, "code": 594201}] +[{"city": "nanjing", "id": 6, "code": 2345672},{"city": "wuhan", "id": 7, "code": 2345673}, {"city": "chengdu", "id": 8, "code": 2345674},{"city": "xian", "id": 9, "code": 2345675},{"city": "hefei", "id": 10, "code": 2345676}] diff --git a/docker/thirdparties/docker-compose/hive/scripts/json_format_test/nest_json.json b/docker/thirdparties/docker-compose/hive/scripts/json_format_test/nest_json.json new file mode 100644 index 0000000000..b28159b2a0 --- /dev/null +++ b/docker/thirdparties/docker-compose/hive/scripts/json_format_test/nest_json.json @@ -0,0 +1,5 @@ +{"no": 1, "item": {"id": 1, "city": "beijing", "code": 2345671}} +{"no": 2, "item": {"id": 2, "city": "shanghai", "code": 2345672}} +{"no": 3, "item": {"id": 3, "city": "hangzhou", "code": 2345673}} +{"no": 4, "item": {"id": 4, "city": "shenzhen", "code": 2345674}} +{"no": 5, "item": {"id": 5, "city": "guangzhou", "code": 2345675}} diff --git a/docker/thirdparties/docker-compose/hive/scripts/json_format_test/simple_object_json.json b/docker/thirdparties/docker-compose/hive/scripts/json_format_test/simple_object_json.json new file mode 100644 index 0000000000..a7912466fd --- /dev/null +++ b/docker/thirdparties/docker-compose/hive/scripts/json_format_test/simple_object_json.json @@ -0,0 +1,12 @@ +{"id": 1, "city": "beijing", "code": 2345671} +{"id": 2, "city": "shanghai", "code": 2345672} +{"id": 3, "city": "guangzhou", "code": 2345673} +{"id": 4, "city": "shenzhen", "code": 2345674} +{"id": 5, "city": "hangzhou", "code": 2345675} +{"id": 6, "city": "nanjing", "code": 2345676} +{"id": 7, "city": "wuhan", "code": 2345677} +{"id": 8, "city": "chengdu", "code": 2345678} +{"id": 9, "city": "xian", "code": 2345679} +{"id": 10, "city": 
"hefei", "code": 23456710} +{"id": 10, "city": null, "code": 23456711} +{"id": 10, "city": "hefei", "code": null} diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java index 32aed58211..5854be7970 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java @@ -121,7 +121,10 @@ public class DataDescription { private String jsonPaths = ""; private String jsonRoot = ""; private boolean fuzzyParse = false; - private boolean readJsonByLine = false; + // the default must be true. + // So that for broker load, this is always true, + // and for stream load, it will set on demand. + private boolean readJsonByLine = true; private boolean numAsString = false; private String sequenceCol; @@ -616,6 +619,10 @@ public class DataDescription { return !Strings.isNullOrEmpty(srcTableName); } + public boolean isReadJsonByLine() { + return readJsonByLine; + } + /* * Analyze parsedExprMap and columnToHadoopFunction from columns, columns from path and columnMappingList * Example: diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index f62f81ea54..aa341ad283 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -4031,7 +4031,7 @@ public class Env { } } - public void renameColumn(Database db, OlapTable table, String colName, + private void renameColumn(Database db, OlapTable table, String colName, String newColName, boolean isReplay) throws DdlException { if (table.getState() != OlapTableState.NORMAL) { throw new DdlException("Table[" + table.getName() + "] is under " + table.getState()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java index 2762ca89d3..a89f48e52b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java @@ -256,9 +256,9 @@ public class BrokerFileGroup implements Writable { jsonPaths = dataDescription.getJsonPaths(); jsonRoot = dataDescription.getJsonRoot(); fuzzyParse = dataDescription.isFuzzyParse(); - // For broker load, we only support reading json format data line by line, - // so we set readJsonByLine to true here. - readJsonByLine = true; + // ATTN: for broker load, we only support reading json format data line by line, + // so if this is set to false, it must be stream load. 
+ readJsonByLine = dataDescription.isReadJsonByLine(); numAsString = dataDescription.isNumAsString(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java index 5d68f6edef..c47699142b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java @@ -42,7 +42,6 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.UserException; -import org.apache.doris.common.util.Util; import org.apache.doris.load.BrokerFileGroup; import org.apache.doris.load.LoadErrorHub; import org.apache.doris.load.loadv2.LoadTask; @@ -174,10 +173,6 @@ public class StreamLoadPlanner { // create scan node if (Config.enable_new_load_scan_node && Config.enable_vectorized_load) { ExternalFileScanNode fileScanNode = new ExternalFileScanNode(new PlanNodeId(0), scanTupleDesc); - if (!Util.isCsvFormat(taskInfo.getFormatType())) { - throw new AnalysisException( - "New stream load scan load not support non-csv type now: " + taskInfo.getFormatType()); - } // 1. create file group DataDescription dataDescription = new DataDescription(destTable.getName(), taskInfo); dataDescription.analyzeWithoutCheckPriv(db.getFullName()); diff --git a/regression-test/conf/regression-conf.groovy b/regression-test/conf/regression-conf.groovy index 00872d275c..6791f7cacd 100644 --- a/regression-test/conf/regression-conf.groovy +++ b/regression-test/conf/regression-conf.groovy @@ -78,3 +78,4 @@ pg_14_port=5442 // See `docker/thirdparties/start-thirdparties-docker.sh` enableHiveTest=false hms_port=9183 +hdfs_port=8120 diff --git a/regression-test/data/load_p0/broker_load/test_array_load.out b/regression-test/data/load_p0/broker_load/test_array_load.out index baff23e6f2..6c568b6ef1 100644 --- a/regression-test/data/load_p0/broker_load/test_array_load.out +++ b/regression-test/data/load_p0/broker_load/test_array_load.out @@ -39,3 +39,43 @@ 5 \N \N \N \N \N \N \N \N \N \N 100 [1, 2, 3] [32767, 32768, 32769] [65534, 65535, 65536] ['a', 'b', 'c'] ['hello', 'world'] [2022-07-13] [2022-07-13 12:30:00] [0.33, 0.67] [3.1415926, 0.878787878] [4.000000, 5.500000, 6.670000] +-- !select -- +1 [1, 2, 3, 4, 5] [32767, 32768, 32769] [65534, 65535, 65536] ['a', 'b', 'c', 'd', 'e'] ['hello', 'world'] [1991-01-01] [1991-01-01 00:00:00] [0.33, 0.67] [3.1415926, 0.878787878] [1, 1.2, 1.3] +2 [6, 7, 8, 9, 10] [32767, 32768, 32769] [65534, 65535, 65536] ['a', 'b', 'c', 'd', 'e'] ['hello', 'world'] [1991-01-01] [1991-01-01 00:00:00] [0.33, 0.67] [3.1415926, 0.878787878] [1, 1.2, 1.3] +3 [] [32767, 32768, 32769] [NULL, NULL, 65536] ['a', 'b', 'c', 'd', 'e'] ['happy', 'birthday'] [1991-01-01] [1991-01-01 00:00:00] [0.33, 0.67] [3.1415926, 0.878787878] [1, 1.2, 1.3] +4 [NULL] [32767, 32768, 32769] [NULL, NULL, 65536] ['a', 'b', 'c', 'd', 'e'] ['hello', 'world'] [1991-01-01] [1991-01-01 00:00:00] [0.33, 0.67] [3.1415926, 0.878787878] [1, 1.2, 1.3] +5 [NULL, NULL] [32767, 32768, NULL] [65534, NULL, 65536] ['a', 'b', 'c', 'd', 'e'] ['hello', 'world'] [1991-01-01] [1991-01-01 00:00:00] [0.33, 0.67] [3.1415926, 0.878787878] [1, 1.2, 1.3] +100 [1, 2, 3] [32767, 32768, 32769] [65534, 65535, 65536] ['a', 'b', 'c'] ['hello', 'world'] [2022-07-13] [2022-07-13 12:30:00] [0.33, 0.67] [3.1415926, 0.878787878] [4, 5.5, 6.67] + +-- !select -- +1 [1, 2, 3, 4, 5] [32767, 
32768, 32769] [65534, 65535, 65536] ['a', 'b', 'c', 'd', 'e'] ['hello', 'world'] [1991-01-01] [1991-01-01 00:00:00] [0.33, 0.67] [3.1415926, 0.878787878] [1.000000, 1.200000, 1.300000] +2 [6, 7, 8, 9, 10] [32767, 32768, 32769] [65534, 65535, 65536] ['a', 'b', 'c', 'd', 'e'] ['hello', 'world'] [1991-01-01] [1991-01-01 00:00:00] [0.33, 0.67] [3.1415926, 0.878787878] [1.000000, 1.200000, 1.300000] +3 [] [32767, 32768, 32769] [NULL, NULL, 65536] ['a', 'b', 'c', 'd', 'e'] ['happy', 'birthday'] [1991-01-01] [1991-01-01 00:00:00] [0.33, 0.67] [3.1415926, 0.878787878] [1.000000, 1.200000, 1.300000] +4 [NULL] [32767, 32768, 32769] [NULL, NULL, 65536] ['a', 'b', 'c', 'd', 'e'] ['hello', 'world'] [1991-01-01] [1991-01-01 00:00:00] [0.33, 0.67] [3.1415926, 0.878787878] [1.000000, 1.200000, 1.300000] +5 [NULL, NULL] [32767, 32768, NULL] [65534, NULL, 65536] ['a', 'b', 'c', 'd', 'e'] ['hello', 'world'] [1991-01-01] [1991-01-01 00:00:00] [0.33, 0.67] [3.1415926, 0.878787878] [1.000000, 1.200000, 1.300000] +100 [1, 2, 3] [32767, 32768, 32769] [65534, 65535, 65536] ['a', 'b', 'c'] ['hello', 'world'] [2022-07-13] [2022-07-13 12:30:00] [0.33, 0.67] [3.1415926, 0.878787878] [4.000000, 5.500000, 6.670000] + +-- !select -- +1 [1, 2, 3, 4, 5] [32767, 32768, 32769] [65534, 65535, 65536] ['a', 'b', 'c', 'd', 'e'] ['hello', 'world'] [1991-01-01] [1991-01-01 00:00:00] [0.33, 0.67] [3.1415926, 0.878787878] [1, 1.2, 1.3] +2 [6, 7, 8, 9, 10] [32767, 32768, 32769] [65534, 65535, 65536] ['a', 'b', 'c', 'd', 'e'] ['hello', 'world'] [1991-01-01] [1991-01-01 00:00:00] [0.33, 0.67] [3.1415926, 0.878787878] [1, 1.2, 1.3] +3 [] [32767, 32768, 32769] [NULL, NULL, 65536] ['a', 'b', 'c', 'd', 'e'] ['happy', 'birthday'] [1991-01-01] [1991-01-01 00:00:00] [0.33, 0.67] [3.1415926, 0.878787878] [1, 1.2, 1.3] +4 [NULL] [32767, 32768, 32769] [NULL, NULL, 65536] ['a', 'b', 'c', 'd', 'e'] ['hello', 'world'] [1991-01-01] [1991-01-01 00:00:00] [0.33, 0.67] [3.1415926, 0.878787878] [1, 1.2, 1.3] +5 [NULL, NULL] [32767, 32768, NULL] [65534, NULL, 65536] ['a', 'b', 'c', 'd', 'e'] ['hello', 'world'] [1991-01-01] [1991-01-01 00:00:00] [0.33, 0.67] [3.1415926, 0.878787878] [1, 1.2, 1.3] +100 [1, 2, 3] [32767, 32768, 32769] [65534, 65535, 65536] ['a', 'b', 'c'] ['hello', 'world'] [2022-07-13] [2022-07-13 12:30:00] [0.33, 0.67] [3.1415926, 0.878787878] [4, 5.5, 6.67] + +-- !select -- +1 [1, 2, 3, 4, 5] [32767, 32768, 32769] [65534, 65535, 65536] ['a', 'b', 'c', 'd', 'e'] ['hello', 'world'] [1991-01-01] [1991-01-01 00:00:00] [0.33, 0.67] [3.1415926, 0.878787878] [1.000000, 1.200000, 1.300000] +2 [6, 7, 8, 9, 10] [32767, 32768, 32769] [65534, 65535, 65536] ['a', 'b', 'c', 'd', 'e'] ['hello', 'world'] [1991-01-01] [1991-01-01 00:00:00] [0.33, 0.67] [3.1415926, 0.878787878] [1.000000, 1.200000, 1.300000] +3 [] [32767, 32768, 32769] [NULL, NULL, 65536] ['a', 'b', 'c', 'd', 'e'] ['happy', 'birthday'] [1991-01-01] [1991-01-01 00:00:00] [0.33, 0.67] [3.1415926, 0.878787878] [1.000000, 1.200000, 1.300000] +4 [NULL] [32767, 32768, 32769] [NULL, NULL, 65536] ['a', 'b', 'c', 'd', 'e'] ['hello', 'world'] [1991-01-01] [1991-01-01 00:00:00] [0.33, 0.67] [3.1415926, 0.878787878] [1.000000, 1.200000, 1.300000] +5 [NULL, NULL] [32767, 32768, NULL] [65534, NULL, 65536] ['a', 'b', 'c', 'd', 'e'] ['hello', 'world'] [1991-01-01] [1991-01-01 00:00:00] [0.33, 0.67] [3.1415926, 0.878787878] [1.000000, 1.200000, 1.300000] +100 [1, 2, 3] [32767, 32768, 32769] [65534, 65535, 65536] ['a', 'b', 'c'] ['hello', 'world'] [2022-07-13] [2022-07-13 12:30:00] [0.33, 0.67] [3.1415926, 
0.878787878] [4.000000, 5.500000, 6.670000] + +-- !select -- +1 [1, 2, 3, 4, 5] [32767, 32768, 32769] [65534, 65535, 65536] ['a', 'b', 'c', 'd', 'e'] ['hello', 'world'] [1991-01-01, 1992-02-02, 1993-03-03] [1991-01-01 00:00:00] [0.33, 0.67] [3.1415926, 0.878787878] [1.000000, 1.200000, 1.300000] +2 [1, 2, 3, 4, 5] [32767, 32768, 32769] [65534, 65535, 65536] ['a', 'b', 'c', 'd', 'e'] ['hello', 'world'] [1991-01-01, 1992-02-02, 1993-03-03] \N \N \N [1.000000, NULL, 1.300000] +3 \N \N \N \N \N \N \N \N \N \N +4 \N \N \N \N \N \N \N \N \N \N +5 \N \N \N \N \N \N \N \N \N \N +100 [1, 2, 3] [32767, 32768, 32769] [65534, 65535, 65536] ['a', 'b', 'c'] ['hello', 'world'] [2022-07-13] [2022-07-13 12:30:00] [0.33, 0.67] [3.1415926, 0.878787878] [4.000000, 5.500000, 6.670000] + diff --git a/regression-test/data/load_p0/stream_load/load_json_null_to_nullable.out b/regression-test/data/load_p0/stream_load/load_json_null_to_nullable.out index d564b5b99a..8bbc4be012 100644 --- a/regression-test/data/load_p0/stream_load/load_json_null_to_nullable.out +++ b/regression-test/data/load_p0/stream_load/load_json_null_to_nullable.out @@ -11,3 +11,15 @@ h h H H h h +-- !select -- +\N \N + +H H +h h + +-- !select -- +\N \N + +H H +h h + diff --git a/regression-test/data/load_p0/stream_load/load_json_with_jsonpath.out b/regression-test/data/load_p0/stream_load/load_json_with_jsonpath.out index 43037b624d..4918c59dd8 100644 --- a/regression-test/data/load_p0/stream_load/load_json_with_jsonpath.out +++ b/regression-test/data/load_p0/stream_load/load_json_with_jsonpath.out @@ -9,3 +9,13 @@ 1000 7395.231067 2000 \N +-- !select -- +22 \N +1000 7395.231067 +2000 \N + +-- !select -- +22 \N +1000 7395.231067 +2000 \N + diff --git a/regression-test/data/load_p0/stream_load/nest_json.json b/regression-test/data/load_p0/stream_load/nest_json.json index 90647c490b..b28159b2a0 100644 --- a/regression-test/data/load_p0/stream_load/nest_json.json +++ b/regression-test/data/load_p0/stream_load/nest_json.json @@ -1,2 +1,5 @@ {"no": 1, "item": {"id": 1, "city": "beijing", "code": 2345671}} {"no": 2, "item": {"id": 2, "city": "shanghai", "code": 2345672}} +{"no": 3, "item": {"id": 3, "city": "hangzhou", "code": 2345673}} +{"no": 4, "item": {"id": 4, "city": "shenzhen", "code": 2345674}} +{"no": 5, "item": {"id": 5, "city": "guangzhou", "code": 2345675}} diff --git a/regression-test/data/load_p0/stream_load/nest_json_array.json b/regression-test/data/load_p0/stream_load/nest_json_array.json new file mode 100644 index 0000000000..b8b3cf917d --- /dev/null +++ b/regression-test/data/load_p0/stream_load/nest_json_array.json @@ -0,0 +1,74 @@ +[ + { + "no": 1, + "item": { + "id": 1, + "city": [ + "zhejiang", + "hangzhou", + "xihu" + ], + "code": 2345671 + } + }, + { + "no": 2, + "item": { + "id": 2, + "city": [ + "zhejiang", + "hangzhou", + "xiaoshan" + ], + "code": 2345672 + } + }, + { + "no": 3, + "item": { + "id": 3, + "city": [ + "zhejiang", + "hangzhou", + "binjiang" + ], + "code": 2345673 + } + }, + { + "no": 4, + "item": { + "id": 4, + "city": [ + "zhejiang", + "hangzhou", + "shangcheng" + ], + "code": 2345674 + } + }, + { + "no": 5, + "item": { + "id": 5, + "city": [ + "zhejiang", + "hangzhou", + "tonglu" + ], + "code": 2345675 + } + }, + { + "no": 6, + "item": { + "id": 6, + "city": [ + "zhejiang", + "hangzhou", + "fuyang" + ], + "code": 2345676 + } + } +] \ No newline at end of file diff --git a/regression-test/data/load_p0/stream_load/simple_json2.json b/regression-test/data/load_p0/stream_load/simple_json2.json new file mode 
100644 index 0000000000..eb698453de --- /dev/null +++ b/regression-test/data/load_p0/stream_load/simple_json2.json @@ -0,0 +1,52 @@ +[ + { + "code": 2345671, + "id": 1, + "city": "beijing" + }, + { + "code": 2345672, + "id": 2, + "city": "shanghai" + }, + { + "code": 2345673, + "id": 3, + "city": "guangzhou" + }, + { + "code": 2345674, + "id": 4, + "city": "shenzhen" + }, + { + "code": 2345675, + "id": 5, + "city": "hangzhou" + }, + { + "code": 2345676, + "id": 6, + "city": "nanjing" + }, + { + "code": 2345677, + "id": 7, + "city": "wuhan" + }, + { + "code": 2345678, + "id": 8, + "city": "chengdu" + }, + { + "code": 2345679, + "id": 9, + "city": "xian" + }, + { + "code": 23456710, + "id": 10, + "city": "hefei" + } +] \ No newline at end of file diff --git a/regression-test/data/load_p0/stream_load/simple_json2_lack_one_column.json b/regression-test/data/load_p0/stream_load/simple_json2_lack_one_column.json new file mode 100644 index 0000000000..7b6b4ad800 --- /dev/null +++ b/regression-test/data/load_p0/stream_load/simple_json2_lack_one_column.json @@ -0,0 +1,48 @@ +[ + { + "code": 2345671, + "id": 1 + }, + { + "code": 2345672, + "id": 2, + "city": "shanghai" + }, + { + "code": 2345673, + "id": 3, + "city": "beijing" + }, + { + "code": 2345674, + "id": 4, + "city": "shenzhen" + }, + { + "code": 2345675, + "id": 5, + "city": "hangzhou" + }, + { + "code": 2345676, + "id": 6, + "city": "nanjing" + }, + { + "code": 2345677, + "id": 7 + }, + { + "code": 2345678, + "id": 8, + "city": "chengdu" + }, + { + "code": 2345679, + "id": 9 + }, + { + "code": 23456710, + "id": 10 + } +] \ No newline at end of file diff --git a/regression-test/data/load_p0/stream_load/test_hdfs_json_load.out b/regression-test/data/load_p0/stream_load/test_hdfs_json_load.out new file mode 100644 index 0000000000..594d2ec60a --- /dev/null +++ b/regression-test/data/load_p0/stream_load/test_hdfs_json_load.out @@ -0,0 +1,305 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !select1 -- +1 beijing 2345671 +2 shanghai 2345672 +3 guangzhou 2345673 +4 shenzhen 2345674 +5 hangzhou 2345675 +6 nanjing 2345676 +7 wuhan 2345677 +8 chengdu 2345678 +9 xian 2345679 +10 \N 23456711 +10 hefei 23456710 +200 changsha 3456789 + +-- !select1 -- +1 beijing 2345671 +2 shanghai 2345672 +3 guangzhou 2345673 +4 shenzhen 2345674 +5 hangzhou 2345675 +6 nanjing 2345676 +7 wuhan 2345677 +8 chengdu 2345678 +9 xian 2345679 +10 \N 23456711 +10 hefei 23456710 +200 changsha 3456789 + +-- !select2 -- +10 beijing 2345671 +20 shanghai 2345672 +30 guangzhou 2345673 +40 shenzhen 2345674 +50 hangzhou 2345675 +60 nanjing 2345676 +70 wuhan 2345677 +80 chengdu 2345678 +90 xian 2345679 +100 \N 23456711 +100 hefei 23456710 +200 changsha 3456789 + +-- !select2 -- +10 beijing 2345671 +20 shanghai 2345672 +30 guangzhou 2345673 +40 shenzhen 2345674 +50 hangzhou 2345675 +60 nanjing 2345676 +70 wuhan 2345677 +80 chengdu 2345678 +90 xian 2345679 +100 \N 23456711 +100 hefei 23456710 +200 changsha 3456789 + +-- !select3 -- +1 2345671 \N +2 2345672 \N +3 2345673 \N +4 2345674 \N +5 2345675 \N +6 2345676 \N +7 2345677 \N +8 2345678 \N +9 2345679 \N +10 \N \N +10 23456710 \N +10 23456711 \N +200 changsha 3456789 + +-- !select3 -- +1 2345671 \N +2 2345672 \N +3 2345673 \N +4 2345674 \N +5 2345675 \N +6 2345676 \N +7 2345677 \N +8 2345678 \N +9 2345679 \N +10 \N \N +10 23456710 \N +10 23456711 \N +200 changsha 3456789 + +-- !select4 -- +1 \N 210 +2 \N 220 +3 \N 230 +4 \N 240 +5 \N 250 +6 \N 260 +7 \N 270 +8 \N 280 +9 \N 290 +10 \N 900 +200 changsha 3456789 + +-- !select4 -- +1 \N 210 +2 \N 220 +3 \N 230 +4 \N 240 +5 \N 250 +6 \N 260 +7 \N 270 +8 \N 280 +9 \N 290 +10 \N 900 +200 changsha 3456789 + +-- !select5 -- +1 beijing 1454547 +2 shanghai 1244264 +3 guangzhou 528369 +4 shenzhen 594201 +5 hangzhou 594201 +6 nanjing 2345672 +7 wuhan 2345673 +8 chengdu 2345674 +9 xian 2345675 +10 hefei 2345676 +200 changsha 3456789 + +-- !select5 -- +1 beijing 1454547 +2 shanghai 1244264 +3 guangzhou 528369 +4 shenzhen 594201 +5 hangzhou 594201 +6 nanjing 2345672 +7 wuhan 2345673 +8 chengdu 2345674 +9 xian 2345675 +10 hefei 2345676 +200 changsha 3456789 + +-- !select6 -- +10 1454547 \N +20 1244264 \N +30 528369 \N +40 594201 \N +50 594201 \N +60 2345672 \N +70 2345673 \N +80 2345674 \N +90 2345675 \N +100 2345676 \N +200 changsha 3456789 + +-- !select6 -- +10 1454547 \N +20 1244264 \N +30 528369 \N +40 594201 \N +50 594201 \N +60 2345672 \N +70 2345673 \N +80 2345674 \N +90 2345675 \N +100 2345676 \N +200 changsha 3456789 + +-- !select7 -- +60 2345672 \N +70 2345673 \N +80 2345674 \N +90 2345675 \N +100 2345676 \N +200 changsha 3456789 + +-- !select7 -- +60 2345672 \N +70 2345673 \N +80 2345674 \N +90 2345675 \N +100 2345676 \N +200 changsha 3456789 + +-- !select8 -- +60 nanjing \N +70 wuhan \N +80 chengdu \N +90 xian \N +100 hefei \N +200 changsha 3456789 + +-- !select8 -- +60 nanjing \N +70 wuhan \N +80 chengdu \N +90 xian \N +100 hefei \N +200 changsha 3456789 + +-- !select9 -- +10 beijing 2345671 +20 shanghai 2345672 +30 hangzhou 2345673 +40 shenzhen 2345674 +50 guangzhou 2345675 +200 changsha 3456789 + +-- !select9 -- +10 beijing 2345671 +20 shanghai 2345672 +30 hangzhou 2345673 +40 shenzhen 2345674 +50 guangzhou 2345675 +200 changsha 3456789 + +-- !select10 -- +1 beijing 1454547 +2 shanghai 1244264 +3 guangzhou 528369 +4 shenzhen 594202 +5 hangzhou 594201 +6 nanjing 2345672 +7 wuhan 2345673 +8 chengdu 2345674 +9 xian 2345675 +10 hefei 2345676 +200 changsha 3456789 + 
+-- !select10 -- +1 beijing 1454547 +2 shanghai 1244264 +3 guangzhou 528369 +4 shenzhen 594202 +5 hangzhou 594201 +6 nanjing 2345672 +7 wuhan 2345673 +8 chengdu 2345674 +9 xian 2345675 +10 hefei 2345676 +200 changsha 3456789 + +-- !select11 -- +1 \N 1454547 +2 shanghai 1244264 +3 \N 528369 +4 \N 594202 +5 hangzhou 594201 +6 nanjing 2345672 +7 \N 2345673 +8 \N 2345674 +9 xian 2345675 +10 \N 2345676 +200 changsha 3456789 + +-- !select11 -- +1 \N 1454547 +2 shanghai 1244264 +3 \N 528369 +4 \N 594202 +5 hangzhou 594201 +6 nanjing 2345672 +7 \N 2345673 +8 \N 2345674 +9 xian 2345675 +10 \N 2345676 +200 changsha 3456789 + +-- !select12 -- +10 beijing \N +20 shanghai \N +30 hangzhou \N +40 shenzhen \N +50 guangzhou \N +200 changsha 3456789 + +-- !select12 -- +10 beijing \N +20 shanghai \N +30 hangzhou \N +40 shenzhen \N +50 guangzhou \N +200 changsha 3456789 + +-- !select13 -- +30 hangzhou \N +40 shenzhen \N +50 guangzhou \N +200 changsha 3456789 + +-- !select13 -- +30 hangzhou \N +40 shenzhen \N +50 guangzhou \N +200 changsha 3456789 + +-- !select14 -- +30 hangzhou 2345673 +40 shenzhen 2345674 +50 guangzhou 2345675 +200 changsha 3456789 + +-- !select14 -- +30 hangzhou 2345673 +40 shenzhen 2345674 +50 guangzhou 2345675 +200 changsha 3456789 + diff --git a/regression-test/data/load_p0/stream_load/test_json_load.out b/regression-test/data/load_p0/stream_load/test_json_load.out index b6df264df1..ebf706f594 100644 --- a/regression-test/data/load_p0/stream_load/test_json_load.out +++ b/regression-test/data/load_p0/stream_load/test_json_load.out @@ -1,5 +1,5 @@ -- This file is automatically generated. You should know what you did if you want to edit this --- !select -- +-- !select1 -- 1 beijing 2345671 2 shanghai 2345672 3 guangzhou 2345673 @@ -12,7 +12,20 @@ 10 hefei 23456710 200 changsha 3456789 --- !select -- +-- !select1 -- +1 beijing 2345671 +2 shanghai 2345672 +3 guangzhou 2345673 +4 shenzhen 2345674 +5 hangzhou 2345675 +6 nanjing 2345676 +7 wuhan 2345677 +8 chengdu 2345678 +9 xian 2345679 +10 hefei 23456710 +200 changsha 3456789 + +-- !select2 -- 10 beijing 2345671 20 shanghai 2345672 30 guangzhou 2345673 @@ -25,7 +38,20 @@ 100 hefei 23456710 200 changsha 3456789 --- !select -- +-- !select2 -- +10 beijing 2345671 +20 shanghai 2345672 +30 guangzhou 2345673 +40 shenzhen 2345674 +50 hangzhou 2345675 +60 nanjing 2345676 +70 wuhan 2345677 +80 chengdu 2345678 +90 xian 2345679 +100 hefei 23456710 +200 changsha 3456789 + +-- !select3 -- 1 2345671 2 2345672 3 2345673 @@ -38,7 +64,20 @@ 10 23456710 200 755 --- !select -- +-- !select3 -- +1 2345671 +2 2345672 +3 2345673 +4 2345674 +5 2345675 +6 2345676 +7 2345677 +8 2345678 +9 2345679 +10 23456710 +200 755 + +-- !select4 -- 1 210 2 220 3 230 @@ -51,7 +90,20 @@ 10 300 200 755 --- !select -- +-- !select4 -- +1 210 +2 220 +3 230 +4 240 +5 250 +6 260 +7 270 +8 280 +9 290 +10 300 +200 755 + +-- !select5 -- 1 1454547 2 1244264 3 528369 @@ -64,7 +116,20 @@ 10 2345676 200 755 --- !select -- +-- !select5 -- +1 1454547 +2 1244264 +3 528369 +4 594201 +5 594201 +6 2345672 +7 2345673 +8 2345674 +9 2345675 +10 2345676 +200 755 + +-- !select6 -- 10 1454547 20 1244264 30 528369 @@ -77,7 +142,12 @@ 100 2345676 200 755 --- !select -- +-- !select6 -- +10 1454547 +20 1244264 +30 528369 +40 594201 +50 594201 60 2345672 70 2345673 80 2345674 @@ -85,7 +155,7 @@ 100 2345676 200 755 --- !select -- +-- !select7 -- 60 2345672 70 2345673 80 2345674 @@ -93,15 +163,54 @@ 100 2345676 200 755 --- !select -- +-- !select7 -- +60 2345672 +70 2345673 +80 2345674 +90 2345675 +100 2345676 +200 
755 + +-- !select8 -- +60 2345672 +70 2345673 +80 2345674 +90 2345675 +100 2345676 +200 755 + +-- !select8 -- +60 2345672 +70 2345673 +80 2345674 +90 2345675 +100 2345676 +200 755 + +-- !select9 -- 10 beijing 2345671 20 shanghai 2345672 +30 hangzhou 2345673 +40 shenzhen 2345674 +50 guangzhou 2345675 200 changsha 3456789 --- !select -- +-- !select9 -- +10 beijing 2345671 +20 shanghai 2345672 +30 hangzhou 2345673 +40 shenzhen 2345674 +50 guangzhou 2345675 200 changsha 3456789 --- !select -- +-- !select10 -- +200 changsha 3456789 + +-- !select10 -- +200 changsha 3456789 + +-- !select11 -- +1 beijing 2345671 2 shanghai 2345672 3 guangzhou 2345673 4 shenzhen 2345674 @@ -110,12 +219,113 @@ 7 wuhan 2345677 8 chengdu 2345678 9 xian 2345679 +10 hefei 23456710 200 changsha 3456789 --- !select -- +-- !select11 -- +1 beijing 2345671 2 shanghai 2345672 3 guangzhou 2345673 4 shenzhen 2345674 +5 hangzhou 2345675 +6 nanjing 2345676 +7 wuhan 2345677 +8 chengdu 2345678 +9 xian 2345679 +10 hefei 23456710 +200 changsha 3456789 + +-- !select12 -- +1 \N 2345671 +2 shanghai 2345672 +3 beijing 2345673 +4 shenzhen 2345674 +5 hangzhou 2345675 +6 nanjing 2345676 +7 \N 2345677 +8 chengdu 2345678 +9 \N 2345679 +10 \N 23456710 +200 changsha 3456789 + +-- !select12 -- +1 \N 2345671 +2 shanghai 2345672 +3 beijing 2345673 +4 shenzhen 2345674 +5 hangzhou 2345675 +6 nanjing 2345676 +7 \N 2345677 +8 chengdu 2345678 +9 \N 2345679 +10 \N 23456710 +200 changsha 3456789 + +-- !select13 -- +2 shanghai 2345672 +3 beijing 2345673 +4 shenzhen 2345674 +5 hangzhou 2345675 +6 nanjing 2345676 +8 chengdu 2345678 +200 hangzhou 12345 + +-- !select13 -- +2 shanghai 2345672 +3 beijing 2345673 +4 shenzhen 2345674 +5 hangzhou 2345675 +6 nanjing 2345676 +8 chengdu 2345678 +200 hangzhou 12345 + +-- !select14 -- +10 2345671 \N +20 2345672 \N +30 2345673 \N +40 2345674 \N +50 2345675 \N +200 changsha 3456789 + +-- !select14 -- +10 2345671 \N +20 2345672 \N +30 2345673 \N +40 2345674 \N +50 2345675 \N +200 changsha 3456789 + +-- !select15 -- +10 beijing 2345671 +20 shanghai 2345672 +30 hangzhou 2345673 +40 shenzhen 2345674 +50 guangzhou 2345675 +200 changsha 3456789 + +-- !select15 -- +10 beijing 2345671 +20 shanghai 2345672 +30 hangzhou 2345673 +40 shenzhen 2345674 +50 guangzhou 2345675 +200 changsha 3456789 + +-- !select16 -- +1 xihu 2345671 +2 xiaoshan 2345672 +3 binjiang 2345673 +4 shangcheng 2345674 +5 tonglu 2345675 +6 fuyang 2345676 +200 changsha 3456789 + +-- !select16 -- +1 xihu 2345671 +2 xiaoshan 2345672 +3 binjiang 2345673 +4 shangcheng 2345674 +5 tonglu 2345675 +6 fuyang 2345676 200 changsha 3456789 --- !select -- diff --git a/regression-test/suites/load_p0/broker_load/test_array_load.groovy b/regression-test/suites/load_p0/broker_load/test_array_load.groovy index 5581a4928e..b462d3c00e 100644 --- a/regression-test/suites/load_p0/broker_load/test_array_load.groovy +++ b/regression-test/suites/load_p0/broker_load/test_array_load.groovy @@ -194,85 +194,95 @@ suite("test_array_load", "p0") { } } - // case1: import array data in json format and enable vectorized engine - try { - sql "DROP TABLE IF EXISTS ${testTable}" - - create_test_table.call(testTable, true) - - load_array_data.call(testTable, 'true', '', 'json', '', '', '', '', '', '', 'simple_array.json') - - // select the table and check whether the data is correct + def check_data_correct = {table_name -> sql "sync" - qt_select "select * from ${testTable} order by k1" - - } finally { - try_sql("DROP TABLE IF EXISTS ${testTable}") + // select the table and check whether the data is 
correct + qt_select "select * from ${table_name} order by k1" } - // case2: import array data in json format and disable vectorized engine try { - sql "DROP TABLE IF EXISTS ${testTable}" - - create_test_table.call(testTable, false) + for ( i in 0..1 ) { + // should be deleted after new_load_scan is ready + if (i == 1) { + sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "false");""" + } else { + sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "true");""" + } - load_array_data.call(testTable, 'true', '', 'json', '', '', '', '', '', '', 'simple_array.json') - - // select the table and check whether the data is correct - sql "sync" - qt_select "select * from ${testTable} order by k1" + // case1: import array data in json format and enable vectorized engine + try { + sql "DROP TABLE IF EXISTS ${testTable}" + + create_test_table.call(testTable, true) + load_array_data.call(testTable, 'true', '', 'json', '', '', '', '', '', '', 'simple_array.json') + + check_data_correct(testTable) + + } finally { + try_sql("DROP TABLE IF EXISTS ${testTable}") + } + + // case2: import array data in json format and disable vectorized engine + try { + sql "DROP TABLE IF EXISTS ${testTable}" + + create_test_table.call(testTable, false) + + load_array_data.call(testTable, 'true', '', 'json', '', '', '', '', '', '', 'simple_array.json') + + check_data_correct(testTable) + + } finally { + try_sql("DROP TABLE IF EXISTS ${testTable}") + } + + // case3: import array data in csv format and enable vectorized engine + try { + sql "DROP TABLE IF EXISTS ${testTable}" + + create_test_table.call(testTable, true) + + load_array_data.call(testTable, 'true', '', 'csv', '', '', '', '', '', '/', 'simple_array.csv') + + check_data_correct(testTable) + + } finally { + try_sql("DROP TABLE IF EXISTS ${testTable}") + } + + // case4: import array data in csv format and disable vectorized engine + try { + sql "DROP TABLE IF EXISTS ${testTable}" + + create_test_table.call(testTable, false) + + load_array_data.call(testTable, 'true', '', 'csv', '', '', '', '', '', '/', 'simple_array.csv') + + check_data_correct(testTable) + + } finally { + try_sql("DROP TABLE IF EXISTS ${testTable}") + } + + // case5: import array data not specify the format + try { + sql "DROP TABLE IF EXISTS ${testTable01}" + + create_test_table01.call(testTable01) + + load_array_data.call(testTable01, '', '', '', '', '', '', '', '', '/', 'simple_array.data') + + check_data_correct(testTable01) + + } finally { + // try_sql("DROP TABLE IF EXISTS ${testTable01}") + } + } } finally { - try_sql("DROP TABLE IF EXISTS ${testTable}") - } - - // case3: import array data in csv format and enable vectorized engine - try { - sql "DROP TABLE IF EXISTS ${testTable}" - - create_test_table.call(testTable, true) - - load_array_data.call(testTable, 'true', '', 'csv', '', '', '', '', '', '/', 'simple_array.csv') - - // select the table and check whether the data is correct - sql "sync" - qt_select "select * from ${testTable} order by k1" - - } finally { - try_sql("DROP TABLE IF EXISTS ${testTable}") + try_sql("""ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "false");""") } - // case4: import array data in csv format and disable vectorized engine - try { - sql "DROP TABLE IF EXISTS ${testTable}" - - create_test_table.call(testTable, false) - - load_array_data.call(testTable, 'true', '', 'csv', '', '', '', '', '', '/', 'simple_array.csv') - - // select the table and check whether the data is correct - sql "sync" - qt_select "select * from ${testTable} 
order by k1" - - } finally { - try_sql("DROP TABLE IF EXISTS ${testTable}") - } - - // case5: import array data not specify the format - try { - sql "DROP TABLE IF EXISTS ${testTable01}" - - create_test_table01.call(testTable01) - - load_array_data.call(testTable01, '', '', '', '', '', '', '', '', '/', 'simple_array.data') - - // select the table and check whether the data is correct - sql "sync" - qt_select "select * from ${testTable01} order by k1" - - } finally { - // try_sql("DROP TABLE IF EXISTS ${testTable01}") - } // if 'enableHdfs' in regression-conf.groovy has been set to true, // the test will run these case as below. diff --git a/regression-test/suites/load_p0/stream_load/load_json_column_exclude_schema_without_jsonpath.groovy b/regression-test/suites/load_p0/stream_load/load_json_column_exclude_schema_without_jsonpath.groovy index f3edfd225c..760af3344e 100644 --- a/regression-test/suites/load_p0/stream_load/load_json_column_exclude_schema_without_jsonpath.groovy +++ b/regression-test/suites/load_p0/stream_load/load_json_column_exclude_schema_without_jsonpath.groovy @@ -49,8 +49,11 @@ suite("test_load_json_column_exclude_schema_without_jsonpath", "p0") { """ } - def load_array_data = {table_name, strip_flag, read_flag, format_flag, exprs, json_paths, + def load_array_data = {new_json_reader_flag, table_name, strip_flag, read_flag, format_flag, exprs, json_paths, json_root, where_expr, fuzzy_flag, column_sep, file_name -> + // should be deleted after new_load_scan is ready + sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "${new_json_reader_flag}");""" + // load the json data streamLoad { table table_name @@ -91,7 +94,12 @@ suite("test_load_json_column_exclude_schema_without_jsonpath", "p0") { create_test_table.call(true) - load_array_data.call(testTable, 'true', '', 'json', '', '', '', '', '', '', 'json_column_match.json') + load_array_data.call('false', testTable, 'true', '', 'json', '', '', '', '', '', '', 'json_column_match.json') + + // test new json load, should be deleted after new_load_scan ready + sql "DROP TABLE IF EXISTS ${testTable}" + create_test_table.call(true) + load_array_data.call('true', testTable, 'true', '', 'json', '', '', '', '', '', '', 'json_column_match.json') } finally { try_sql("DROP TABLE IF EXISTS ${testTable}") @@ -103,7 +111,12 @@ suite("test_load_json_column_exclude_schema_without_jsonpath", "p0") { create_test_table.call(false) - load_array_data.call(testTable, 'true', '', 'json', '', '', '', '', '', '', 'json_column_match.json') + load_array_data.call('false', testTable, 'true', '', 'json', '', '', '', '', '', '', 'json_column_match.json') + + // test new json load, should be deleted after new_load_scan ready + sql "DROP TABLE IF EXISTS ${testTable}" + create_test_table.call(false) + load_array_data.call('true', testTable, 'true', '', 'json', '', '', '', '', '', '', 'json_column_match.json') } finally { try_sql("DROP TABLE IF EXISTS ${testTable}") diff --git a/regression-test/suites/load_p0/stream_load/load_json_null_to_nullable.groovy b/regression-test/suites/load_p0/stream_load/load_json_null_to_nullable.groovy index 4dfc5a9fcb..f934c038a2 100644 --- a/regression-test/suites/load_p0/stream_load/load_json_null_to_nullable.groovy +++ b/regression-test/suites/load_p0/stream_load/load_json_null_to_nullable.groovy @@ -40,8 +40,11 @@ suite("test_load_json_null_to_nullable", "p0") { """ } - def load_array_data = {table_name, strip_flag, read_flag, format_flag, exprs, json_paths, + def load_array_data = {new_json_reader_flag, 
table_name, strip_flag, read_flag, format_flag, exprs, json_paths, json_root, where_expr, fuzzy_flag, column_sep, file_name -> + // should be deleted after new_load_scan is ready + sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "${new_json_reader_flag}");""" + // load the json data streamLoad { table table_name @@ -74,6 +77,15 @@ suite("test_load_json_null_to_nullable", "p0") { assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0) } } + + // should be deleted after new_load_scan is ready + sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "false");""" + } + + def check_data_correct = {table_name -> + sql "sync" + // select the table and check whether the data is correct + qt_select "select * from ${table_name} order by k1" } // case1: import array data in json format and enable vectorized engine @@ -82,11 +94,15 @@ suite("test_load_json_null_to_nullable", "p0") { create_test_table.call(true) - load_array_data.call(testTable, 'true', '', 'json', '', '', '', '', '', '', 'test_char.json') + load_array_data.call('false', testTable, 'true', '', 'json', '', '', '', '', '', '', 'test_char.json') - sql "sync" - // select the table and check whether the data is correct - qt_select "select * from ${testTable} order by k1" + check_data_correct(testTable) + + // test new json load, should be deleted after new_load_scan ready + sql "DROP TABLE IF EXISTS ${testTable}" + create_test_table.call(true) + load_array_data.call('true', testTable, 'true', '', 'json', '', '', '', '', '', '', 'test_char.json') + check_data_correct(testTable) } finally { try_sql("DROP TABLE IF EXISTS ${testTable}") @@ -98,11 +114,15 @@ suite("test_load_json_null_to_nullable", "p0") { create_test_table.call(false) - load_array_data.call(testTable, 'true', '', 'json', '', '', '', '', '', '', 'test_char.json') + load_array_data.call('false', testTable, 'true', '', 'json', '', '', '', '', '', '', 'test_char.json') - sql "sync" - // select the table and check whether the data is correct - qt_select "select * from ${testTable} order by k1" + check_data_correct(testTable) + + // test new json load, should be deleted after new_load_scan ready + sql "DROP TABLE IF EXISTS ${testTable}" + create_test_table.call(false) + load_array_data.call('true', testTable, 'true', '', 'json', '', '', '', '', '', '', 'test_char.json') + check_data_correct(testTable) } finally { try_sql("DROP TABLE IF EXISTS ${testTable}") diff --git a/regression-test/suites/load_p0/stream_load/load_json_with_jsonpath.groovy b/regression-test/suites/load_p0/stream_load/load_json_with_jsonpath.groovy index b5fc9ea096..02ffd808e2 100644 --- a/regression-test/suites/load_p0/stream_load/load_json_with_jsonpath.groovy +++ b/regression-test/suites/load_p0/stream_load/load_json_with_jsonpath.groovy @@ -40,8 +40,12 @@ suite("test_load_json_with_jsonpath", "p0") { """ } - def load_array_data = {table_name, strip_flag, read_flag, format_flag, exprs, json_paths, + def load_array_data = {new_json_reader_flag, table_name, strip_flag, read_flag, format_flag, exprs, json_paths, json_root, where_expr, fuzzy_flag, column_sep, file_name -> + + // should be deleted after new_load_scan is ready + sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "${new_json_reader_flag}");""" + // load the json data streamLoad { table table_name @@ -74,6 +78,15 @@ suite("test_load_json_with_jsonpath", "p0") { assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0) } } + + // should be deleted after new_load_scan is ready + sql """ADMIN SET FRONTEND CONFIG 
("enable_new_load_scan_node" = "false");""" + } + + def check_data_correct = {table_name -> + sql "sync" + // select the table and check whether the data is correct + qt_select "select * from ${table_name} order by k1" } // case1: import array data in json format and enable vectorized engine @@ -82,11 +95,15 @@ suite("test_load_json_with_jsonpath", "p0") { create_test_table.call(true) - load_array_data.call(testTable, 'true', '', 'json', '', '["$.k1", "$.v1"]', '', '', '', '', 'test_load_with_jsonpath.json') + load_array_data.call('false', testTable, 'true', '', 'json', '', '["$.k1", "$.v1"]', '', '', '', '', 'test_load_with_jsonpath.json') - // select the table and check whether the data is correct - sql "sync" - qt_select "select * from ${testTable} order by k1" + check_data_correct(testTable) + + // test new json load, should be deleted after new_load_scan ready + sql "DROP TABLE IF EXISTS ${testTable}" + create_test_table.call(true) + load_array_data.call('true', testTable, 'true', '', 'json', '', '["$.k1", "$.v1"]', '', '', '', '', 'test_load_with_jsonpath.json') + check_data_correct(testTable) } finally { try_sql("DROP TABLE IF EXISTS ${testTable}") @@ -98,12 +115,19 @@ suite("test_load_json_with_jsonpath", "p0") { create_test_table.call(false) - load_array_data.call(testTable, 'true', '', 'json', '', '["$.k1", "$.v1"]', '', '', '', '', 'test_load_with_jsonpath.json') + load_array_data.call('false', testTable, 'true', '', 'json', '', '["$.k1", "$.v1"]', '', '', '', '', 'test_load_with_jsonpath.json') sql "sync" // select the table and check whether the data is correct qt_select "select * from ${testTable} order by k1" + + // test new json load, should be deleted after new_load_scan ready + sql "DROP TABLE IF EXISTS ${testTable}" + create_test_table.call(false) + load_array_data.call('true', testTable, 'true', '', 'json', '', '["$.k1", "$.v1"]', '', '', '', '', 'test_load_with_jsonpath.json') + check_data_correct(testTable) + } finally { try_sql("DROP TABLE IF EXISTS ${testTable}") } diff --git a/regression-test/suites/load_p0/stream_load/test_hdfs_json_load.groovy b/regression-test/suites/load_p0/stream_load/test_hdfs_json_load.groovy new file mode 100644 index 0000000000..4c7a1c162b --- /dev/null +++ b/regression-test/suites/load_p0/stream_load/test_hdfs_json_load.groovy @@ -0,0 +1,554 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+suite("test_hdfs_json_load", "p0") {
+    // define a sql table
+    def testTable = "test_hdfs_json_load"
+
+    def create_test_table1 = {testTablex ->
+        // multi-line sql
+        def result1 = sql """
+            CREATE TABLE IF NOT EXISTS ${testTablex} (
+            id INT DEFAULT '10',
+            city VARCHAR(32) DEFAULT '',
+            code BIGINT SUM DEFAULT '0')
+            DISTRIBUTED BY HASH(id) BUCKETS 10
+            PROPERTIES("replication_num" = "1");
+            """
+
+        // DDL/DML returns 1 row and 1 column; the only value is the updated row count
+        assertTrue(result1.size() == 1)
+        assertTrue(result1[0].size() == 1)
+        assertTrue(result1[0][0] == 0, "Create table should update 0 rows")
+
+        // insert 1 row to check whether the table is ok
+        def result2 = sql "INSERT INTO ${testTablex} (id, city, code) VALUES (200, 'changsha', 3456789)"
+        assertTrue(result2.size() == 1)
+        assertTrue(result2[0].size() == 1)
+        assertTrue(result2[0][0] == 1, "Insert should update 1 row")
+    }
+
+    def load_from_hdfs1 = {new_json_reader_flag, strip_flag, fuzzy_flag, testTablex, label, fileName,
+                           fsPath, hdfsUser, exprs, jsonpaths, json_root, columns_parameter, where ->
+        // should be deleted after new_load_scan is ready
+        sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "${new_json_reader_flag}");"""
+
+        def hdfsFilePath = "${fsPath}/user/doris/json_format_test/${fileName}"
+        def result1 = sql """
+                        LOAD LABEL ${label} (
+                            DATA INFILE("${hdfsFilePath}")
+                            INTO TABLE ${testTablex}
+                            FORMAT as "json"
+                            ${columns_parameter}
+                            ${exprs}
+                            ${where}
+                            properties(
+                                "json_root" = "${json_root}",
+                                "jsonpaths" = "${jsonpaths}",
+                                "strip_outer_array" = "${strip_flag}",
+                                "fuzzy_parse" = "${fuzzy_flag}"
+                            )
+                        )
+                        with HDFS (
+                            "fs.defaultFS"="${fsPath}",
+                            "hadoop.username" = "${hdfsUser}"
+                        )
+                        PROPERTIES (
+                            "timeout"="1200",
+                            "max_filter_ratio"="0"
+                        );
+                        """
+
+        assertTrue(result1.size() == 1)
+        assertTrue(result1[0].size() == 1)
+        assertTrue(result1[0][0] == 0, "Query OK, 0 rows affected")
+
+        // should be deleted after new_load_scan is ready
+        sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "false");"""
+    }
+
+    def check_load_result = {checklabel, testTablex ->
+        def max_try_milli_secs = 10000
+        while (max_try_milli_secs > 0) {
+            def result = sql "show load where label = '${checklabel}'"
+            if (result[0][2] == "FINISHED") {
+                log.info("LOAD FINISHED!")
+                break
+            } else {
+                sleep(1000) // wait 1 second every time
+                max_try_milli_secs -= 1000
+                if (max_try_milli_secs <= 0) {
+                    log.info("Broker load result: ${result}".toString())
+                    assertEquals(1, 2)
+                }
+            }
+        }
+    }
+
+    String hdfs_port = context.config.otherConfigs.get("hdfs_port")
+    def fsPath = "hdfs://127.0.0.1:${hdfs_port}"
+    // It's okay to use a random `hdfsUser`, but it cannot be empty.
+ def hdfsUser = "doris" + + + // case1: import simple json + def q1 = { + try { + def test_load_label1 = UUID.randomUUID().toString().replaceAll("-", "") + sql "DROP TABLE IF EXISTS ${testTable}" + + create_test_table1.call(testTable) + load_from_hdfs1.call("false", "false", "false", testTable, test_load_label1, "simple_object_json.json", + fsPath, hdfsUser, '', '', '', '', '') + + check_load_result(test_load_label1, testTable) + sql "sync" + qt_select1 "select * from ${testTable} order by id" + + // test new json reader + def test_load_label2 = UUID.randomUUID().toString().replaceAll("-", "") + sql "DROP TABLE IF EXISTS ${testTable}" + create_test_table1.call(testTable) + load_from_hdfs1.call("true", "false", "false", testTable, test_load_label2, "simple_object_json.json", + fsPath, hdfsUser, '', '', '', '', '') + + check_load_result(test_load_label2, testTable) + sql "sync" + qt_select1 "select * from ${testTable} order by id" + } finally { + try_sql("DROP TABLE IF EXISTS ${testTable}") + } + } + + // case2: import json and apply exprs + def q2 = { + try { + def test_load_label1 = UUID.randomUUID().toString().replaceAll("-", "") + sql "DROP TABLE IF EXISTS ${testTable}" + + create_test_table1.call(testTable) + load_from_hdfs1.call("false", "false", "false", testTable, test_load_label1, "simple_object_json.json", + fsPath, hdfsUser, "SET(id= id * 10)", '', '', '', '') + + check_load_result(test_load_label1, testTable) + sql "sync" + qt_select2 "select * from ${testTable} order by id" + + // test new json reader + def test_load_label2 = UUID.randomUUID().toString().replaceAll("-", "") + sql "DROP TABLE IF EXISTS ${testTable}" + create_test_table1.call(testTable) + load_from_hdfs1.call("true", "false", "false", testTable, test_load_label2, "simple_object_json.json", + fsPath, hdfsUser, "SET(id= id * 10)", '', '', '', '') + + check_load_result(test_load_label2, testTable) + sql "sync" + qt_select2 "select * from ${testTable} order by id" + } finally { + try_sql("DROP TABLE IF EXISTS ${testTable}") + } + } + + // case3: import json and apply jsonpaths + def q3 = { + try { + def test_load_label1 = UUID.randomUUID().toString().replaceAll("-", "") + sql "DROP TABLE IF EXISTS ${testTable}" + + create_test_table1.call(testTable) + load_from_hdfs1.call("false", "false", "false", testTable, test_load_label1, "simple_object_json.json", + fsPath, hdfsUser, '', """[\\"\$.id\\", \\"\$.code\\"]""", '', '', '') + + check_load_result(test_load_label1, testTable) + sql "sync" + qt_select3 "select * from ${testTable} order by id" + + // test new json reader + def test_load_label2 = UUID.randomUUID().toString().replaceAll("-", "") + sql "DROP TABLE IF EXISTS ${testTable}" + create_test_table1.call(testTable) + load_from_hdfs1.call("true", "false", "false", testTable, test_load_label2, "simple_object_json.json", + fsPath, hdfsUser, '', """[\\"\$.id\\", \\"\$.code\\"]""", '', '', '') + + check_load_result(test_load_label2, testTable) + sql "sync" + qt_select3 "select * from ${testTable} order by id" + } finally { + try_sql("DROP TABLE IF EXISTS ${testTable}") + } + } + + // case4: import json and apply jsonpaths & exprs + def q4 = { + try { + def test_load_label1 = UUID.randomUUID().toString().replaceAll("-", "") + sql "DROP TABLE IF EXISTS ${testTable}" + + create_test_table1.call(testTable) + load_from_hdfs1.call("false", "false", "false", testTable, test_load_label1, "simple_object_json.json", + fsPath, hdfsUser, "SET(code = id * 10 + 200)", """[\\"\$.id\\"]""", '', '', '') + + 
check_load_result(test_load_label1, testTable) + sql "sync" + qt_select4 "select * from ${testTable} order by id" + + // test new json reader + def test_load_label2 = UUID.randomUUID().toString().replaceAll("-", "") + sql "DROP TABLE IF EXISTS ${testTable}" + create_test_table1.call(testTable) + load_from_hdfs1.call("true", "false", "false", testTable, test_load_label2, "simple_object_json.json", + fsPath, hdfsUser, "SET(code = id * 10 + 200)", """[\\"\$.id\\"]""", '', '', '') + + check_load_result(test_load_label2, testTable) + sql "sync" + qt_select4 "select * from ${testTable} order by id" + } finally { + try_sql("DROP TABLE IF EXISTS ${testTable}") + } + } + + // case5: import json with line reader + def q5 = { + try { + def test_load_label1 = UUID.randomUUID().toString().replaceAll("-", "") + sql "DROP TABLE IF EXISTS ${testTable}" + + create_test_table1.call(testTable) + load_from_hdfs1.call("false", "true", "false", testTable, test_load_label1, "multi_line_json.json", + fsPath, hdfsUser, '', '', '', '', '') + + check_load_result(test_load_label1, testTable) + sql "sync" + qt_select5 "select * from ${testTable} order by id" + + // test new json reader + def test_load_label2 = UUID.randomUUID().toString().replaceAll("-", "") + sql "DROP TABLE IF EXISTS ${testTable}" + create_test_table1.call(testTable) + load_from_hdfs1.call("true", "true", "false", testTable, test_load_label2, "multi_line_json.json", + fsPath, hdfsUser, '', '', '', '', '') + + check_load_result(test_load_label2, testTable) + sql "sync" + qt_select5 "select * from ${testTable} order by id" + } finally { + try_sql("DROP TABLE IF EXISTS ${testTable}") + } + } + + // case6: import json use exprs and jsonpaths + def q6 = { + try { + def test_load_label1 = UUID.randomUUID().toString().replaceAll("-", "") + sql "DROP TABLE IF EXISTS ${testTable}" + + create_test_table1.call(testTable) + load_from_hdfs1.call("false", "true", "false", testTable, test_load_label1, "multi_line_json.json", + fsPath, hdfsUser, "SET(id = id * 10)", """[\\"\$.id\\", \\"\$.code\\"]""", '', '', '') + + check_load_result(test_load_label1, testTable) + sql "sync" + qt_select6 "select * from ${testTable} order by id" + + // test new json reader + def test_load_label2 = UUID.randomUUID().toString().replaceAll("-", "") + sql "DROP TABLE IF EXISTS ${testTable}" + create_test_table1.call(testTable) + load_from_hdfs1.call("true", "true", "false", testTable, test_load_label2, "multi_line_json.json", + fsPath, hdfsUser, "SET(id = id * 10)", """[\\"\$.id\\", \\"\$.code\\"]""", '', '', '') + + check_load_result(test_load_label2, testTable) + sql "sync" + qt_select6 "select * from ${testTable} order by id" + } finally { + try_sql("DROP TABLE IF EXISTS ${testTable}") + } + } + + // case7: import json use where + def q7 = { + try { + def test_load_label1 = UUID.randomUUID().toString().replaceAll("-", "") + sql "DROP TABLE IF EXISTS ${testTable}" + + create_test_table1.call(testTable) + load_from_hdfs1.call("false", "true", "false", testTable, test_load_label1, "multi_line_json.json", + fsPath, hdfsUser, "SET(id = id * 10)", """[\\"\$.id\\", \\"\$.code\\"]""", '', '', 'WHERE id>50') + + check_load_result(test_load_label1, testTable) + sql "sync" + qt_select7 "select * from ${testTable} order by id" + + // test new json reader + def test_load_label2 = UUID.randomUUID().toString().replaceAll("-", "") + sql "DROP TABLE IF EXISTS ${testTable}" + create_test_table1.call(testTable) + load_from_hdfs1.call("true", "true", "false", testTable, test_load_label2, 
"multi_line_json.json", + fsPath, hdfsUser, "SET(id = id * 10)", """[\\"\$.id\\", \\"\$.code\\"]""", '', '', 'WHERE id>50') + + check_load_result(test_load_label2, testTable) + sql "sync" + qt_select7 "select * from ${testTable} order by id" + } finally { + try_sql("DROP TABLE IF EXISTS ${testTable}") + } + } + + + // case8: import json use fuzzy_parse + def q8 = { + try { + def test_load_label1 = UUID.randomUUID().toString().replaceAll("-", "") + sql "DROP TABLE IF EXISTS ${testTable}" + + create_test_table1.call(testTable) + load_from_hdfs1.call("false", "true", "true", testTable, test_load_label1, "multi_line_json.json", + fsPath, hdfsUser, "SET(id = id * 10)", """[\\"\$.id\\", \\"\$.city\\"]""", '', '', 'WHERE id>50') + + check_load_result(test_load_label1, testTable) + sql "sync" + qt_select8 "select * from ${testTable} order by id" + + // test new json reader + def test_load_label2 = UUID.randomUUID().toString().replaceAll("-", "") + sql "DROP TABLE IF EXISTS ${testTable}" + create_test_table1.call(testTable) + load_from_hdfs1.call("true", "true", "true", testTable, test_load_label2, "multi_line_json.json", + fsPath, hdfsUser, "SET(id = id * 10)", """[\\"\$.id\\", \\"\$.city\\"]""", '', '', 'WHERE id>50') + + check_load_result(test_load_label2, testTable) + sql "sync" + qt_select8 "select * from ${testTable} order by id" + } finally { + try_sql("DROP TABLE IF EXISTS ${testTable}") + } + } + + // case9: import json use json_root + def q9 = { + try { + def test_load_label1 = UUID.randomUUID().toString().replaceAll("-", "") + sql "DROP TABLE IF EXISTS ${testTable}" + + create_test_table1.call(testTable) + load_from_hdfs1.call("false", "false", "true", testTable, test_load_label1, "nest_json.json", + fsPath, hdfsUser, "SET(id = id * 10)", '', '$.item', '', '') + + check_load_result(test_load_label1, testTable) + sql "sync" + qt_select9 "select * from ${testTable} order by id" + + // test new json reader + def test_load_label2 = UUID.randomUUID().toString().replaceAll("-", "") + sql "DROP TABLE IF EXISTS ${testTable}" + create_test_table1.call(testTable) + load_from_hdfs1.call("true", "false", "true", testTable, test_load_label2, "nest_json.json", + fsPath, hdfsUser, "SET(id = id * 10)", '', '$.item', '', '') + + check_load_result(test_load_label2, testTable) + sql "sync" + qt_select9 "select * from ${testTable} order by id" + } finally { + try_sql("DROP TABLE IF EXISTS ${testTable}") + } + } + + // case10: test json file which is unordered and no use json_path + def q10 = { + try { + def test_load_label1 = UUID.randomUUID().toString().replaceAll("-", "") + sql "DROP TABLE IF EXISTS ${testTable}" + + create_test_table1.call(testTable) + load_from_hdfs1.call("false", "true", "false", testTable, test_load_label1, "multi_line_json_unorder.json", + fsPath, hdfsUser, '', '', '', '', '') + + check_load_result(test_load_label1, testTable) + sql "sync" + qt_select10 "select * from ${testTable} order by id" + + // test new json reader + def test_load_label2 = UUID.randomUUID().toString().replaceAll("-", "") + sql "DROP TABLE IF EXISTS ${testTable}" + create_test_table1.call(testTable) + load_from_hdfs1.call("true", "true", "false", testTable, test_load_label2, "multi_line_json_unorder.json", + fsPath, hdfsUser, '', '', '', '', '') + + check_load_result(test_load_label2, testTable) + sql "sync" + qt_select10 "select * from ${testTable} order by id" + } finally { + try_sql("DROP TABLE IF EXISTS ${testTable}") + } + } + + // case11: test json file which is unordered and lack one column which is 
nullable + def q11 = { + try { + def test_load_label1 = UUID.randomUUID().toString().replaceAll("-", "") + sql "DROP TABLE IF EXISTS ${testTable}" + + create_test_table1.call(testTable) + load_from_hdfs1.call("false", "true", "false", testTable, test_load_label1, "multi_line_json_lack_column.json", + fsPath, hdfsUser, '', '', '', '', '') + + check_load_result(test_load_label1, testTable) + sql "sync" + qt_select11 "select * from ${testTable} order by id" + + // test new json reader + def test_load_label2 = UUID.randomUUID().toString().replaceAll("-", "") + sql "DROP TABLE IF EXISTS ${testTable}" + create_test_table1.call(testTable) + load_from_hdfs1.call("true", "true", "false", testTable, test_load_label2, "multi_line_json_lack_column.json", + fsPath, hdfsUser, '', '', '', '', '') + + check_load_result(test_load_label2, testTable) + sql "sync" + qt_select11 "select * from ${testTable} order by id" + } finally { + try_sql("DROP TABLE IF EXISTS ${testTable}") + } + } + + + // case12: use json_path and json_root + def q12 = { + try { + def test_load_label1 = UUID.randomUUID().toString().replaceAll("-", "") + sql "DROP TABLE IF EXISTS ${testTable}" + + create_test_table1.call(testTable) + load_from_hdfs1.call("false", "false", "false", testTable, test_load_label1, "nest_json.json", fsPath, hdfsUser, + "SET(id = id * 10)", """[\\"\$.id\\", \\"\$.city\\"]""", '$.item', '', '') + + check_load_result(test_load_label1, testTable) + sql "sync" + qt_select12 "select * from ${testTable} order by id" + + // test new json reader + def test_load_label2 = UUID.randomUUID().toString().replaceAll("-", "") + sql "DROP TABLE IF EXISTS ${testTable}" + create_test_table1.call(testTable) + load_from_hdfs1.call("true", "false", "false", testTable, test_load_label2, "nest_json.json", fsPath, hdfsUser, + "SET(id = id * 10)", """[\\"\$.id\\", \\"\$.city\\"]""", '$.item', '', '') + + check_load_result(test_load_label2, testTable) + sql "sync" + qt_select12 "select * from ${testTable} order by id" + } finally { + try_sql("DROP TABLE IF EXISTS ${testTable}") + } + } + + // case13: use json_path & json_root & where + def q13 = { + try { + def test_load_label1 = UUID.randomUUID().toString().replaceAll("-", "") + sql "DROP TABLE IF EXISTS ${testTable}" + + create_test_table1.call(testTable) + load_from_hdfs1.call("false", "false", "false", testTable, test_load_label1, "nest_json.json", fsPath, hdfsUser, + "SET(id = id * 10)", """[\\"\$.id\\", \\"\$.city\\"]""", '$.item', '', 'WHERE id>20') + + check_load_result(test_load_label1, testTable) + sql "sync" + qt_select13 "select * from ${testTable} order by id" + + // test new json reader + def test_load_label2 = UUID.randomUUID().toString().replaceAll("-", "") + sql "DROP TABLE IF EXISTS ${testTable}" + create_test_table1.call(testTable) + load_from_hdfs1.call("true", "false", "false", testTable, test_load_label2, "nest_json.json", fsPath, hdfsUser, + "SET(id = id * 10)", """[\\"\$.id\\", \\"\$.city\\"]""", '$.item', '', 'WHERE id>20') + + check_load_result(test_load_label2, testTable) + sql "sync" + qt_select13 "select * from ${testTable} order by id" + } finally { + try_sql("DROP TABLE IF EXISTS ${testTable}") + } + } + + // case14: use jsonpaths & json_root & where & columns + def q14 = { + try { + def test_load_label1 = UUID.randomUUID().toString().replaceAll("-", "") + sql "DROP TABLE IF EXISTS ${testTable}" + + create_test_table1.call(testTable) + load_from_hdfs1.call("false", "false", "false", testTable, test_load_label1, "nest_json.json", fsPath, hdfsUser, + "SET(id 
= id * 10)", """[\\"\$.id\\", \\"\$.code\\",\\"\$.city\\"]""", '$.item', + '(id, code, city)', 'WHERE id>20') + + check_load_result(test_load_label1, testTable) + sql "sync" + qt_select14 "select * from ${testTable} order by id" + + // test new json reader + def test_load_label2 = UUID.randomUUID().toString().replaceAll("-", "") + sql "DROP TABLE IF EXISTS ${testTable}" + create_test_table1.call(testTable) + load_from_hdfs1.call("true", "false", "false", testTable, test_load_label2, "nest_json.json", fsPath, hdfsUser, + "SET(id = id * 10)", """[\\"\$.id\\", \\"\$.code\\", \\"\$.city\\"]""", '$.item', + '(id, code, city)', 'WHERE id>20') + + check_load_result(test_load_label2, testTable) + sql "sync" + qt_select14 "select * from ${testTable} order by id" + } finally { + try_sql("DROP TABLE IF EXISTS ${testTable}") + } + } + + + + + String enabled = context.config.otherConfigs.get("enableHiveTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + log.info("Begin Test q1:") + q1() + log.info("Begin Test q2:") + q2() + log.info("Begin Test q3:") + q3() + log.info("Begin Test q4:") + q4() + log.info("Begin Test q5:") + q5() + log.info("Begin Test q6:") + q6() + log.info("Begin Test q7:") + q7() + log.info("Begin Test q8:") + q8() + log.info("Begin Test q9:") + q9() + log.info("Begin Test q10:") + q10() + log.info("Begin Test q11:") + q11() + log.info("Begin Test q12:") + q12() + log.info("Begin Test q13:") + q13() + log.info("Begin Test q14:") + q14() + } +} \ No newline at end of file diff --git a/regression-test/suites/load_p0/stream_load/test_json_load.groovy b/regression-test/suites/load_p0/stream_load/test_json_load.groovy index 8dc594c660..e066467e3e 100644 --- a/regression-test/suites/load_p0/stream_load/test_json_load.groovy +++ b/regression-test/suites/load_p0/stream_load/test_json_load.groovy @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. 
-suite("test_json_load", "p0") { +suite("test_json_load", "p0") { // define a sql table def testTable = "test_json_load" @@ -64,6 +64,30 @@ suite("test_json_load", "p0") { assertTrue(result2[0][0] == 1, "Insert should update 1 rows") } + // city is NOT NULL + def create_test_table3 = {testTablex -> + // multi-line sql + def result1 = sql """ + CREATE TABLE IF NOT EXISTS ${testTablex} ( + id INT DEFAULT '10', + city VARCHAR(32) NOT NULL, + code BIGINT SUM DEFAULT '0') + DISTRIBUTED BY HASH(id) BUCKETS 10 + PROPERTIES("replication_num" = "1"); + """ + + // DDL/DML return 1 row and 3 column, the only value is update row count + assertTrue(result1.size() == 1) + assertTrue(result1[0].size() == 1) + assertTrue(result1[0][0] == 0, "Create table should update 0 rows") + + // insert 1 row to check whether the table is ok + def result2 = sql "INSERT INTO ${testTablex} (id, city, code) VALUES (200, 'hangzhou', 12345)" + assertTrue(result2.size() == 1) + assertTrue(result2[0].size() == 1) + assertTrue(result2[0][0] == 1, "Insert should update 1 rows") + } + def test_invalid_json_array_table = { testTablex -> // multi-line sql def result1 = sql """ @@ -90,8 +114,11 @@ suite("test_json_load", "p0") { assertTrue(result1[0][0] == 0, "Create table should update 0 rows") } - def load_json_data = {label, strip_flag, read_flag, format_flag, exprs, json_paths, - json_root, where_expr, fuzzy_flag, file_name, ignore_failure=false -> + def load_json_data = {new_json_reader_flag, label, strip_flag, read_flag, format_flag, exprs, json_paths, + json_root, where_expr, fuzzy_flag, file_name, ignore_failure=false -> + // should be delete after new_load_scan is ready + sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "${new_json_reader_flag}");""" + // load the json data streamLoad { table "test_json_load" @@ -123,6 +150,9 @@ suite("test_json_load", "p0") { assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0) } } + + // should be deleted after new_load_scan is ready + sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "false");""" } def load_from_hdfs1 = {testTablex, label, hdfsFilePath, format, brokerName, hdfsUser, hdfsPasswd -> @@ -182,10 +212,20 @@ suite("test_json_load", "p0") { create_test_table1.call(testTable) - load_json_data.call('test_json_load_case1', 'true', '', 'json', '', '', '', '', '', 'simple_json.json') + load_json_data.call('false', 'test_json_load_case1', 'true', '', 'json', '', '', '', '', '', 'simple_json.json') sql "sync" - qt_select "select * from ${testTable} order by id" + qt_select1 "select * from ${testTable} order by id" + + // test new json reader + sql "DROP TABLE IF EXISTS ${testTable}" + + create_test_table1.call(testTable) + + load_json_data.call('true', 'test_json_load_case1_2', 'true', '', 'json', '', '', '', '', '', 'simple_json.json') + + sql "sync" + qt_select1 "select * from ${testTable} order by id" } finally { try_sql("DROP TABLE IF EXISTS ${testTable}") @@ -197,10 +237,20 @@ suite("test_json_load", "p0") { create_test_table1.call(testTable) - load_json_data.call('test_json_load_case2', 'true', '', 'json', 'id= id * 10', '', '', '', '', 'simple_json.json') + load_json_data.call('false', 'test_json_load_case2', 'true', '', 'json', 'id= id * 10', '', '', '', '', 'simple_json.json') sql "sync" - qt_select "select * from ${testTable} order by id" + qt_select2 "select * from ${testTable} order by id" + + // test new json reader + sql "DROP TABLE IF EXISTS ${testTable}" + + create_test_table1.call(testTable) + + load_json_data.call('true', 
'test_json_load_case2_2', 'true', '', 'json', 'id= id * 10', '', '', '', '', 'simple_json.json') + + sql "sync" + qt_select2 "select * from ${testTable} order by id" } finally { try_sql("DROP TABLE IF EXISTS ${testTable}") @@ -212,11 +262,22 @@ suite("test_json_load", "p0") { create_test_table2.call(testTable) - load_json_data.call('test_json_load_case3', 'true', '', 'json', '', '[\"$.id\", \"$.code\"]', + load_json_data.call('false', 'test_json_load_case3', 'true', '', 'json', '', '[\"$.id\", \"$.code\"]', '', '', '', 'simple_json.json') sql "sync" - qt_select "select * from ${testTable} order by id" + qt_select3 "select * from ${testTable} order by id" + + // test new json reader + sql "DROP TABLE IF EXISTS ${testTable}" + + create_test_table2.call(testTable) + + load_json_data.call('true', 'test_json_load_case3_2', 'true', '', 'json', '', '[\"$.id\", \"$.code\"]', + '', '', '', 'simple_json.json') + + sql "sync" + qt_select3 "select * from ${testTable} order by id" } finally { try_sql("DROP TABLE IF EXISTS ${testTable}") @@ -228,11 +289,22 @@ suite("test_json_load", "p0") { create_test_table2.call(testTable) - load_json_data.call('test_json_load_case4', 'true', '', 'json', 'code = id * 10 + 200', '[\"$.id\"]', + load_json_data.call('false', 'test_json_load_case4', 'true', '', 'json', 'code = id * 10 + 200', '[\"$.id\"]', '', '', '', 'simple_json.json') sql "sync" - qt_select "select * from ${testTable} order by id" + qt_select4 "select * from ${testTable} order by id" + + // test new json reader + sql "DROP TABLE IF EXISTS ${testTable}" + + create_test_table2.call(testTable) + + load_json_data.call('true', 'test_json_load_case4_2', 'true', '', 'json', 'code = id * 10 + 200', '[\"$.id\"]', + '', '', '', 'simple_json.json') + + sql "sync" + qt_select4 "select * from ${testTable} order by id" } finally { try_sql("DROP TABLE IF EXISTS ${testTable}") @@ -244,11 +316,22 @@ suite("test_json_load", "p0") { create_test_table2.call(testTable) - load_json_data.call('test_json_load_case5', 'true', 'true', 'json', '', '[\"$.id\", \"$.code\"]', + load_json_data.call('false', 'test_json_load_case5', 'true', 'true', 'json', '', '[\"$.id\", \"$.code\"]', '', '', '', 'multi_line_json.json') sql "sync" - qt_select "select * from ${testTable} order by id" + qt_select5 "select * from ${testTable} order by id" + + // test new json reader + sql "DROP TABLE IF EXISTS ${testTable}" + + create_test_table2.call(testTable) + + load_json_data.call('true', 'test_json_load_case5_2', 'true', 'true', 'json', '', '[\"$.id\", \"$.code\"]', + '', '', '', 'multi_line_json.json') + + sql "sync" + qt_select5 "select * from ${testTable} order by id" } finally { try_sql("DROP TABLE IF EXISTS ${testTable}") @@ -260,11 +343,23 @@ suite("test_json_load", "p0") { create_test_table2.call(testTable) - load_json_data.call('test_json_load_case6', 'true', 'true', 'json', 'id= id * 10', '[\"$.id\", \"$.code\"]', + load_json_data.call('false', 'test_json_load_case6', 'true', 'true', 'json', 'id= id * 10', '[\"$.id\", \"$.code\"]', '', '', '', 'multi_line_json.json') sql "sync" - qt_select "select * from ${testTable} order by id" + qt_select6 "select * from ${testTable} order by id" + + + // test new json reader + sql "DROP TABLE IF EXISTS ${testTable}" + + create_test_table2.call(testTable) + + load_json_data.call('true', 'test_json_load_case6_2', 'true', 'true', 'json', 'id= id * 10', '[\"$.id\", \"$.code\"]', + '', '', '', 'multi_line_json.json') + + sql "sync" + qt_select6 "select * from ${testTable} order by id" } finally { 
try_sql("DROP TABLE IF EXISTS ${testTable}") @@ -276,11 +371,22 @@ suite("test_json_load", "p0") { create_test_table2.call(testTable) - load_json_data.call('test_json_load_case7', 'true', 'true', 'json', 'id= id * 10', '[\"$.id\", \"$.code\"]', + load_json_data.call('false', 'test_json_load_case7', 'true', 'true', 'json', 'id= id * 10', '[\"$.id\", \"$.code\"]', '', 'id > 50', '', 'multi_line_json.json') sql "sync" - qt_select "select * from ${testTable} order by id" + qt_select7 "select * from ${testTable} order by id" + + // test new json reader + sql "DROP TABLE IF EXISTS ${testTable}" + + create_test_table2.call(testTable) + + load_json_data.call('true', 'test_json_load_case7_2', 'true', 'true', 'json', 'id= id * 10', '[\"$.id\", \"$.code\"]', + '', 'id > 50', '', 'multi_line_json.json') + + sql "sync" + qt_select7 "select * from ${testTable} order by id" } finally { try_sql("DROP TABLE IF EXISTS ${testTable}") @@ -292,11 +398,23 @@ suite("test_json_load", "p0") { create_test_table2.call(testTable) - load_json_data.call('test_json_load_case8', 'true', 'true', 'json', 'id= id * 10', '[\"$.id\", \"$.code\"]', + load_json_data.call('false', 'test_json_load_case8', 'true', 'true', 'json', 'id= id * 10', '[\"$.id\", \"$.code\"]', '', 'id > 50', 'true', 'multi_line_json.json') sql "sync" - qt_select "select * from ${testTable} order by id" + qt_select8 "select * from ${testTable} order by id" + + + // test new json reader + sql "DROP TABLE IF EXISTS ${testTable}" + + create_test_table2.call(testTable) + + load_json_data.call('true', 'test_json_load_case8_2', 'true', 'true', 'json', 'id= id * 10', '[\"$.id\", \"$.code\"]', + '', 'id > 50', 'true', 'multi_line_json.json') + + sql "sync" + qt_select8 "select * from ${testTable} order by id" } finally { try_sql("DROP TABLE IF EXISTS ${testTable}") @@ -308,26 +426,258 @@ suite("test_json_load", "p0") { create_test_table1.call(testTable) - load_json_data.call('test_json_load_case9', '', 'true', 'json', 'id= id * 10', '', + load_json_data.call('false', 'test_json_load_case9', '', 'true', 'json', 'id= id * 10', '', '$.item', '', 'true', 'nest_json.json') sql "sync" - qt_select "select * from ${testTable} order by id" + qt_select9 "select * from ${testTable} order by id" + + // test new json reader + sql "DROP TABLE IF EXISTS ${testTable}" + + create_test_table1.call(testTable) + + load_json_data.call('true', 'test_json_load_case9_2', '', 'true', 'json', 'id= id * 10', '', + '$.item', '', 'true', 'nest_json.json') + + sql "sync" + qt_select9 "select * from ${testTable} order by id" } finally { try_sql("DROP TABLE IF EXISTS ${testTable}") } + // case10: invalid json try { sql "DROP TABLE IF EXISTS ${testTable}" create_test_table1.call(testTable) - load_json_data.call('test_json_load_case10', '', 'true', 'json', 'id= id * 10', '', + load_json_data.call('false', 'test_json_load_case10', '', 'true', 'json', 'id= id * 10', '', '$.item', '', 'true', 'invalid_json.json', true) sql "sync" - qt_select "select * from ${testTable} order by id" + qt_select10 "select * from ${testTable} order by id" + + + // test new json reader + sql "DROP TABLE IF EXISTS ${testTable}" + + create_test_table1.call(testTable) + + load_json_data.call('true', 'test_json_load_case10_2', '', 'true', 'json', 'id= id * 10', '', + '$.item', '', 'true', 'invalid_json.json', true) + + sql "sync" + qt_select10 "select * from ${testTable} order by id" + + } finally { + try_sql("DROP TABLE IF EXISTS ${testTable}") + } + + // case11: test json file which is unordered and no use json_path + try 
{ + sql "DROP TABLE IF EXISTS ${testTable}" + + create_test_table1.call(testTable) + + load_json_data.call('false', 'test_json_load_case11', 'true', '', 'json', '', '', '', '', '', 'simple_json2.json') + + sql "sync" + qt_select11 "select * from ${testTable} order by id" + + // test new json reader + sql "DROP TABLE IF EXISTS ${testTable}" + + create_test_table1.call(testTable) + + load_json_data.call('true', 'test_json_load_case11_2', 'true', '', 'json', '', '', '', '', '', 'simple_json2.json') + + sql "sync" + qt_select11 "select * from ${testTable} order by id" + + } finally { + try_sql("DROP TABLE IF EXISTS ${testTable}") + } + + // case12: test json file which is unordered and lack one column which is nullable + try { + sql "DROP TABLE IF EXISTS ${testTable}" + + create_test_table1.call(testTable) + + load_json_data.call('false', 'test_json_load_case12', 'true', '', 'json', '', '', '', '', '', 'simple_json2_lack_one_column.json') + + sql "sync" + qt_select12 "select * from ${testTable} order by id" + + // test new json reader + sql "DROP TABLE IF EXISTS ${testTable}" + + create_test_table1.call(testTable) + + load_json_data.call('true', 'test_json_load_case12_2', 'true', '', 'json', '', '', '', '', '', 'simple_json2_lack_one_column.json') + + sql "sync" + qt_select12 "select * from ${testTable} order by id" + + } finally { + try_sql("DROP TABLE IF EXISTS ${testTable}") + } + + // case13: test json file which is unordered and lack one column which is not nullable + try { + sql "DROP TABLE IF EXISTS ${testTable}" + create_test_table3.call(testTable) + // should be delete after new_load_scan is ready + sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "false");""" + // load the json data + streamLoad { + table "${testTable}" + + // set http request header params + set 'strip_outer_array', "true" + set 'format', "json" + set 'max_filter_ratio', '1' + file "simple_json2_lack_one_column.json" // import json file + time 10000 // limit inflight 10s + + // if declared a check callback, the default check condition will ignore. + // So you must check all condition + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(json.NumberTotalRows, json.NumberLoadedRows + json.NumberUnselectedRows + json.NumberFilteredRows) + assertEquals(json.NumberFilteredRows, 4) + assertEquals(json.NumberLoadedRows, 6) + assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0) + } + } + // should be deleted after new_load_scan is ready + sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "false");""" + sql "sync" + qt_select13 "select * from ${testTable} order by id" + + + sql "DROP TABLE IF EXISTS ${testTable}" + create_test_table3.call(testTable) + // should be delete after new_load_scan is ready + sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "true");""" + // load the json data + streamLoad { + table "${testTable}" + + // set http request header params + set 'strip_outer_array', "true" + set 'format', "json" + set 'max_filter_ratio', '1' + file "simple_json2_lack_one_column.json" // import json file + time 10000 // limit inflight 10s + + // if declared a check callback, the default check condition will ignore. 
+ // So you must check all condition + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(json.NumberTotalRows, json.NumberLoadedRows + json.NumberUnselectedRows + json.NumberFilteredRows) + assertEquals(json.NumberFilteredRows, 4) + assertEquals(json.NumberLoadedRows, 6) + assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0) + } + } + // should be deleted after new_load_scan is ready + sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "false");""" + sql "sync" + qt_select13 "select * from ${testTable} order by id" + + } finally { + try_sql("DROP TABLE IF EXISTS ${testTable}") + } + + // case14: use json_path and json_root + try { + sql "DROP TABLE IF EXISTS ${testTable}" + + create_test_table1.call(testTable) + + load_json_data.call('false', 'test_json_load_case14', '', 'true', 'json', 'id= id * 10', '[\"$.id\", \"$.code\"]', + '$.item', '', 'true', 'nest_json.json') + + sql "sync" + qt_select14 "select * from ${testTable} order by id" + + // test new json reader + sql "DROP TABLE IF EXISTS ${testTable}" + + create_test_table1.call(testTable) + + load_json_data.call('true', 'test_json_load_case14_2', '', 'true', 'json', 'id= id * 10', '[\"$.id\", \"$.code\"]', + '$.item', '', 'true', 'nest_json.json') + + sql "sync" + qt_select14 "select * from ${testTable} order by id" + + } finally { + try_sql("DROP TABLE IF EXISTS ${testTable}") + } + + // case15: apply jsonpaths & exprs & json_root + try { + sql "DROP TABLE IF EXISTS ${testTable}" + + create_test_table1.call(testTable) + + load_json_data.call('false', 'test_json_load_case15', '', 'true', 'json', 'id, code, city, id= id * 10', + '[\"$.id\", \"$.code\", \"$.city\"]', '$.item', '', 'true', 'nest_json.json') + + sql "sync" + qt_select15 "select * from ${testTable} order by id" + + // test new json reader + sql "DROP TABLE IF EXISTS ${testTable}" + + create_test_table1.call(testTable) + + load_json_data.call('true', 'test_json_load_case15_2', '', 'true', 'json', 'id, code, city,id= id * 10', + '[\"$.id\", \"$.code\", \"$.city\"]', '$.item', '', 'true', 'nest_json.json') + + sql "sync" + qt_select15 "select * from ${testTable} order by id" + + } finally { + try_sql("DROP TABLE IF EXISTS ${testTable}") + } + + // case16: apply jsonpaths & exprs & json_root + try { + sql "DROP TABLE IF EXISTS ${testTable}" + + create_test_table1.call(testTable) + + load_json_data.call('false', 'test_json_load_case16', 'true', '', 'json', 'id, code, city', + '[\"$.id\", \"$.code\", \"$.city[2]\"]', '$.item', '', 'true', 'nest_json_array.json') + + sql "sync" + qt_select16 "select * from ${testTable} order by id" + + // test new json reader + sql "DROP TABLE IF EXISTS ${testTable}" + + create_test_table1.call(testTable) + + load_json_data.call('true', 'test_json_load_case16_2', 'true', '', 'json', 'id, code, city', + '[\"$.id\", \"$.code\", \"$.city[2]\"]', '$.item', '', 'true', 'nest_json_array.json') + + sql "sync" + qt_select16 "select * from ${testTable} order by id" } finally { try_sql("DROP TABLE IF EXISTS ${testTable}") @@ -342,7 +692,7 @@ suite("test_json_load", "p0") { def hdfs_file_path = uploadToHdfs "stream_load/simple_object_json.json" def format = "json" - // case11: import json use pre-filter exprs + // case17: import json use pre-filter exprs try { sql "DROP TABLE IF EXISTS ${testTable}" @@ -357,7 +707,7 @@ 
suite("test_json_load", "p0") { try_sql("DROP TABLE IF EXISTS ${testTable}") } - // case12: import json use pre-filter and where exprs + // case18: import json use pre-filter and where exprs try { sql "DROP TABLE IF EXISTS ${testTable}" @@ -372,12 +722,12 @@ suite("test_json_load", "p0") { try_sql("DROP TABLE IF EXISTS ${testTable}") } - // case13: invalid json + // case19: invalid json try { sql "DROP TABLE IF EXISTS ${testTable}" test_invalid_json_array_table.call(testTable) - load_json_data.call('test_json_load_case11', 'true', '', 'json', '', '', + load_json_data.call('false', 'test_json_load_case19', 'true', '', 'json', '', '', '', '', '', 'invalid_json_array.json', true) sql "sync" diff --git a/regression-test/suites/tpch_sf1_p0/multi_catalog_query_parquet/hive_catalog.groovy b/regression-test/suites/tpch_sf1_p0/multi_catalog_query_parquet/hive_catalog.groovy index 9db1324d4e..30d2798ef2 100644 --- a/regression-test/suites/tpch_sf1_p0/multi_catalog_query_parquet/hive_catalog.groovy +++ b/regression-test/suites/tpch_sf1_p0/multi_catalog_query_parquet/hive_catalog.groovy @@ -801,22 +801,23 @@ order by String[][] backends = sql """ show backends; """ assertTrue(backends.size() > 0) for (String[] backend in backends) { - StringBuilder setConfigCommand = new StringBuilder(); - setConfigCommand.append("curl -X POST http://") - setConfigCommand.append(backend[2]) - setConfigCommand.append(":") - setConfigCommand.append(backend[5]) - setConfigCommand.append("/api/update_config?") - String command1 = setConfigCommand.toString() + "enable_new_load_scan_node=true" - logger.info(command1) - String command2 = setConfigCommand.toString() + "enable_new_file_scanner=true" - logger.info(command2) - def process1 = command1.execute() - int code = process1.waitFor() - assertEquals(code, 0) - def process2 = command2.execute() - code = process1.waitFor() - assertEquals(code, 0) + // No need to set this config anymore, but leave this code sample here + // StringBuilder setConfigCommand = new StringBuilder(); + // setConfigCommand.append("curl -X POST http://") + // setConfigCommand.append(backend[2]) + // setConfigCommand.append(":") + // setConfigCommand.append(backend[5]) + // setConfigCommand.append("/api/update_config?") + // String command1 = setConfigCommand.toString() + "enable_new_load_scan_node=true" + // logger.info(command1) + // String command2 = setConfigCommand.toString() + "enable_new_file_scanner=true" + // logger.info(command2) + // def process1 = command1.execute() + // int code = process1.waitFor() + // assertEquals(code, 0) + // def process2 = command2.execute() + // code = process1.waitFor() + // assertEquals(code, 0) } }