[feature-wip](new-scan) support Json reader (#13546)

Issue Number: close #12574
This PR adds `NewJsonReader`, which implements the `GenericReader` interface to support reading JSON-format files.

TODO:
1. Modify the `_scanner_eof` handling later.
2. Rename `NewJsonReader` to `JsonReader` once the old `JsonReader` is deleted.
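
For context, a minimal sketch (not part of this commit) of how a scanner is expected to drive the new reader, mirroring the `VFileScanner` change in this PR. The surrounding objects (state, profile, counter, params, range, slot descriptors) and the read loop are assumed to be supplied by the scanner framework in the Doris BE build context:

// Hedged sketch: only the constructor, init_reader(), and get_next_block() signatures
// are taken from this PR; the driving loop is an assumption.
#include "vec/exec/format/json/new_json_reader.h"

using namespace doris;

Status read_json_range(RuntimeState* state, RuntimeProfile* profile,
                       vectorized::ScannerCounter* counter,
                       const TFileScanRangeParams& params, const TFileRangeDesc& range,
                       const std::vector<SlotDescriptor*>& file_slot_descs,
                       vectorized::Block* block) {
    bool scanner_eof = false;
    vectorized::NewJsonReader reader(state, profile, counter, params, range,
                                     file_slot_descs, &scanner_eof);
    RETURN_IF_ERROR(reader.init_reader());

    bool eof = false;
    while (!eof) {
        size_t read_rows = 0;
        RETURN_IF_ERROR(reader.get_next_block(block, &read_rows, &eof));
        // hand the (partially) filled block to the upper operator here
    }
    return Status::OK();
}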
Tiewei Fang
2022-10-26 12:52:21 +08:00
committed by GitHub
parent 44c9163b3c
commit c418bbd2d1
31 changed files with 2820 additions and 154 deletions

View File

@ -258,6 +258,7 @@ set(VEC_FILES
exec/scan/new_es_scan_node.cpp
exec/format/csv/csv_reader.cpp
exec/format/orc/vorc_reader.cpp
exec/format/json/new_json_reader.cpp
)
add_library(Vec STATIC

View File

@ -0,0 +1,754 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "vec/exec/format/json/new_json_reader.h"
#include "common/compiler_util.h"
#include "exec/plain_text_line_reader.h"
#include "exprs/json_functions.h"
#include "io/file_factory.h"
#include "runtime/descriptors.h"
#include "runtime/runtime_state.h"
#include "vec/core/block.h"
#include "vec/exec/scan/vscanner.h"
namespace doris::vectorized {
NewJsonReader::NewJsonReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounter* counter,
const TFileScanRangeParams& params, const TFileRangeDesc& range,
const std::vector<SlotDescriptor*>& file_slot_descs, bool* scanner_eof)
: _vhandle_json_callback(nullptr),
_state(state),
_profile(profile),
_counter(counter),
_params(params),
_range(range),
_file_slot_descs(file_slot_descs),
_file_reader(nullptr),
_file_reader_s(nullptr),
_real_file_reader(nullptr),
_line_reader(nullptr),
_reader_eof(false),
_skip_first_line(false),
_next_row(0),
_total_rows(0),
_value_allocator(_value_buffer, sizeof(_value_buffer)),
_parse_allocator(_parse_buffer, sizeof(_parse_buffer)),
_origin_json_doc(&_value_allocator, sizeof(_parse_buffer), &_parse_allocator),
_scanner_eof(scanner_eof) {
_file_format_type = _params.format_type;
_bytes_read_counter = ADD_COUNTER(_profile, "BytesRead", TUnit::BYTES);
_read_timer = ADD_TIMER(_profile, "ReadTime");
_file_read_timer = ADD_TIMER(_profile, "FileReadTime");
}
Status NewJsonReader::init_reader() {
RETURN_IF_ERROR(_get_range_params());
RETURN_IF_ERROR(_open_file_reader());
if (_read_json_by_line) {
RETURN_IF_ERROR(_open_line_reader());
}
// generate _parsed_jsonpaths and _parsed_json_root
RETURN_IF_ERROR(_parse_jsonpath_and_json_root());
// select the handler up front to improve performance
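// Illustrative input shapes (assumed examples, drawn from the regression test data added in this PR):
//   simple json-string:              {"id": 1, "city": "beijing", "code": 2345671}
//   complex json-string + jsonpaths: [{"id": 1, ...}, {"id": 2, ...}] read with paths such as "$.id", "$.city"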
if (_parsed_jsonpaths.empty()) { // input is a simple json-string
_vhandle_json_callback = &NewJsonReader::_vhandle_simple_json;
} else { // input is a complex json-string and a json-path
if (_strip_outer_array) {
_vhandle_json_callback = &NewJsonReader::_vhandle_flat_array_complex_json;
} else {
_vhandle_json_callback = &NewJsonReader::_vhandle_nested_complex_json;
}
}
return Status::OK();
}
Status NewJsonReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
if (_reader_eof == true) {
*eof = true;
return Status::OK();
}
const int batch_size = _state->batch_size();
auto columns = block->mutate_columns();
while (columns[0]->size() < batch_size && !_reader_eof) {
if (UNLIKELY(_read_json_by_line && _skip_first_line)) {
size_t size = 0;
const uint8_t* line_ptr = nullptr;
RETURN_IF_ERROR(_line_reader->read_line(&line_ptr, &size, &_reader_eof));
_skip_first_line = false;
continue;
}
bool is_empty_row = false;
RETURN_IF_ERROR(_read_json_column(columns, _file_slot_descs, &is_empty_row, &_reader_eof));
++(*read_rows);
if (is_empty_row) {
// Read empty row, just continue
continue;
}
}
columns.clear();
return Status::OK();
}
Status NewJsonReader::get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
std::unordered_set<std::string>* missing_cols) {
for (auto& slot : _file_slot_descs) {
name_to_type->emplace(slot->col_name(), slot->type());
}
return Status::OK();
}
Status NewJsonReader::_get_range_params() {
if (!_params.__isset.file_attributes) {
return Status::InternalError("BE cat get file_attributes");
}
// get line_delimiter
if (_params.file_attributes.__isset.text_params &&
_params.file_attributes.text_params.__isset.line_delimiter) {
_line_delimiter = _params.file_attributes.text_params.line_delimiter;
_line_delimiter_length = _line_delimiter.size();
}
if (_params.file_attributes.__isset.jsonpaths) {
_jsonpaths = _params.file_attributes.jsonpaths;
}
if (_params.file_attributes.__isset.json_root) {
_json_root = _params.file_attributes.json_root;
}
if (_params.file_attributes.__isset.read_json_by_line) {
_read_json_by_line = _params.file_attributes.read_json_by_line;
}
if (_params.file_attributes.__isset.strip_outer_array) {
_strip_outer_array = _params.file_attributes.strip_outer_array;
}
if (_params.file_attributes.__isset.num_as_string) {
_num_as_string = _params.file_attributes.num_as_string;
}
if (_params.file_attributes.__isset.fuzzy_parse) {
_fuzzy_parse = _params.file_attributes.fuzzy_parse;
}
return Status::OK();
}
Status NewJsonReader::_open_file_reader() {
int64_t start_offset = _range.start_offset;
if (start_offset != 0) {
start_offset -= 1;
}
if (_params.file_type == TFileType::FILE_STREAM) {
RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, _file_reader_s));
_real_file_reader = _file_reader_s.get();
} else {
RETURN_IF_ERROR(FileFactory::create_file_reader(
_profile, _params, _range.path, start_offset, _range.file_size, 0, _file_reader));
_real_file_reader = _file_reader.get();
}
return _real_file_reader->open();
}
Status NewJsonReader::_open_line_reader() {
int64_t size = _range.size;
if (_range.start_offset != 0) {
// When the fetched range doesn't start from 0, size is increased by 1.
// TODO(ftw): what if file_reader is a stream pipe? Is `size += 1` still correct?
size += 1;
_skip_first_line = true;
} else {
_skip_first_line = false;
}
_line_reader.reset(new PlainTextLineReader(_profile, _real_file_reader, nullptr, size,
_line_delimiter, _line_delimiter_length));
return Status::OK();
}
Status NewJsonReader::_parse_jsonpath_and_json_root() {
// parse jsonpaths
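// e.g. the jsonpaths property is itself a JSON array of path strings,
// such as ["$.id", "$.city", "$.code"] (assumed example matching the test columns in this PR).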
if (!_jsonpaths.empty()) {
rapidjson::Document jsonpaths_doc;
if (!jsonpaths_doc.Parse(_jsonpaths.c_str(), _jsonpaths.length()).HasParseError()) {
if (!jsonpaths_doc.IsArray()) {
return Status::InvalidArgument("Invalid json path: {}", _jsonpaths);
} else {
for (int i = 0; i < jsonpaths_doc.Size(); i++) {
const rapidjson::Value& path = jsonpaths_doc[i];
if (!path.IsString()) {
return Status::InvalidArgument("Invalid json path: {}", _jsonpaths);
}
std::vector<JsonPath> parsed_paths;
JsonFunctions::parse_json_paths(path.GetString(), &parsed_paths);
_parsed_jsonpaths.push_back(std::move(parsed_paths));
}
}
} else {
return Status::InvalidArgument("Invalid json path: {}", _jsonpaths);
}
}
// parse jsonroot
if (!_json_root.empty()) {
JsonFunctions::parse_json_paths(_json_root, &_parsed_json_root);
}
return Status::OK();
}
Status NewJsonReader::_read_json_column(std::vector<MutableColumnPtr>& columns,
const std::vector<SlotDescriptor*>& slot_descs,
bool* is_empty_row, bool* eof) {
return (this->*_vhandle_json_callback)(columns, slot_descs, is_empty_row, eof);
}
Status NewJsonReader::_vhandle_simple_json(std::vector<MutableColumnPtr>& columns,
const std::vector<SlotDescriptor*>& slot_descs,
bool* is_empty_row, bool* eof) {
do {
bool valid = false;
if (_next_row >= _total_rows) { // parse json and generic document
Status st = _parse_json(is_empty_row, eof);
if (st.is_data_quality_error()) {
continue; // continue to read next
}
RETURN_IF_ERROR(st);
if (*is_empty_row == true) {
return Status::OK();
}
_name_map.clear();
rapidjson::Value* objectValue = nullptr;
if (_json_doc->IsArray()) {
_total_rows = _json_doc->Size();
if (_total_rows == 0) {
// may be passing an empty json, such as "[]"
RETURN_IF_ERROR(_append_error_msg(*_json_doc, "Empty json line", "", nullptr));
// TODO(ftw): check _reader_eof??
if (_reader_eof) {
*is_empty_row = true;
return Status::OK();
}
continue;
}
objectValue = &(*_json_doc)[0];
} else {
_total_rows = 1; // only one row
objectValue = _json_doc;
}
_next_row = 0;
if (_fuzzy_parse) {
for (auto v : slot_descs) {
for (int i = 0; i < objectValue->MemberCount(); ++i) {
auto it = objectValue->MemberBegin() + i;
if (v->col_name() == it->name.GetString()) {
_name_map[v->col_name()] = i;
break;
}
}
}
}
}
if (_json_doc->IsArray()) { // handle case 1
rapidjson::Value& objectValue = (*_json_doc)[_next_row]; // json object
RETURN_IF_ERROR(_set_column_value(objectValue, columns, slot_descs, &valid));
} else { // handle case 2
RETURN_IF_ERROR(_set_column_value(*_json_doc, columns, slot_descs, &valid));
}
_next_row++;
if (!valid) {
if (*_scanner_eof) {
// When _scanner_eof is true and valid is false, it means that we have encountered
// unqualified data and decided to stop the scan.
*is_empty_row = true;
// TODO(ftw): check *eof=true?
*eof = true;
return Status::OK();
}
continue;
}
*is_empty_row = false;
break; // get a valid row, then break
} while (_next_row <= _total_rows);
return Status::OK();
}
Status NewJsonReader::_vhandle_flat_array_complex_json(
std::vector<MutableColumnPtr>& columns, const std::vector<SlotDescriptor*>& slot_descs,
bool* is_empty_row, bool* eof) {
do {
if (_next_row >= _total_rows) {
Status st = _parse_json(is_empty_row, eof);
if (st.is_data_quality_error()) {
continue; // continue to read next
}
RETURN_IF_ERROR(st);
if (*is_empty_row == true) {
if (st == Status::OK()) {
return Status::OK();
}
if (_total_rows == 0) {
continue;
}
}
}
rapidjson::Value& objectValue = (*_json_doc)[_next_row++];
bool valid = true;
RETURN_IF_ERROR(_write_columns_by_jsonpath(objectValue, slot_descs, columns, &valid));
if (!valid) {
continue; // process next line
}
*is_empty_row = false;
break; // get a valid row, then break
} while (_next_row <= _total_rows);
return Status::OK();
}
Status NewJsonReader::_vhandle_nested_complex_json(std::vector<MutableColumnPtr>& columns,
const std::vector<SlotDescriptor*>& slot_descs,
bool* is_empty_row, bool* eof) {
while (true) {
Status st = _parse_json(is_empty_row, eof);
if (st.is_data_quality_error()) {
continue; // continue to read next
}
RETURN_IF_ERROR(st);
if (*is_empty_row == true) {
return Status::OK();
}
*is_empty_row = false;
break; // read a valid row
}
bool valid = true;
RETURN_IF_ERROR(_write_columns_by_jsonpath(*_json_doc, slot_descs, columns, &valid));
if (!valid) {
// there is only one line in this case, so if it returns false, just set is_empty_row to true
// so that the caller will continue reading the next line.
*is_empty_row = true;
}
return Status::OK();
}
Status NewJsonReader::_parse_json(bool* is_empty_row, bool* eof) {
size_t size = 0;
RETURN_IF_ERROR(_parse_json_doc(&size, eof));
// all data has been read, just return
if (size == 0 || *eof) {
*is_empty_row = true;
return Status::OK();
}
if (!_parsed_jsonpaths.empty() && _strip_outer_array) {
_total_rows = _json_doc->Size();
_next_row = 0;
if (_total_rows == 0) {
// meet an empty json array.
*is_empty_row = true;
}
}
return Status::OK();
}
// read one json string from line reader or file reader and parse it to json doc.
// return Status::DataQualityError() if data has quality error.
// return other errors if other problems are encountered.
// return Status::OK() if parsing succeeds or EOF is reached.
Status NewJsonReader::_parse_json_doc(size_t* size, bool* eof) {
// read a whole message
SCOPED_TIMER(_file_read_timer);
const uint8_t* json_str = nullptr;
std::unique_ptr<uint8_t[]> json_str_ptr;
if (_line_reader != nullptr) {
RETURN_IF_ERROR(_line_reader->read_line(&json_str, size, eof));
} else {
int64_t length = 0;
RETURN_IF_ERROR(_real_file_reader->read_one_message(&json_str_ptr, &length));
json_str = json_str_ptr.get();
*size = length;
if (length == 0) {
*eof = true;
}
}
_bytes_read_counter += *size;
if (*eof) {
return Status::OK();
}
// clear memory here.
_value_allocator.Clear();
_parse_allocator.Clear();
bool has_parse_error = false;
// parse jsondata to JsonDoc
// See the issue: https://github.com/Tencent/rapidjson/issues/1458
// rapidjson only supports integers up to uint64_t, so loading larger integers causes a bug. We use kParseNumbersAsStringsFlag as a workaround.
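// e.g. 18446744073709551616 (2^64) does not fit in uint64_t and would otherwise fail to parse.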
if (_num_as_string) {
has_parse_error =
_origin_json_doc
.Parse<rapidjson::kParseNumbersAsStringsFlag>((char*)json_str, *size)
.HasParseError();
} else {
has_parse_error = _origin_json_doc.Parse((char*)json_str, *size).HasParseError();
}
if (has_parse_error) {
fmt::memory_buffer error_msg;
fmt::format_to(error_msg, "Parse json data for JsonDoc failed. code: {}, error info: {}",
_origin_json_doc.GetParseError(),
rapidjson::GetParseError_En(_origin_json_doc.GetParseError()));
RETURN_IF_ERROR(_state->append_error_msg_to_file(
[&]() -> std::string { return std::string((char*)json_str, *size); },
[&]() -> std::string { return fmt::to_string(error_msg); }, _scanner_eof));
_counter->num_rows_filtered++;
if (*_scanner_eof) {
// Case A: if _scanner_eof is set to true in "append_error_msg_to_file", it means
// we have met enough invalid rows and the scanner should be stopped.
// So we set eof to true and return OK; the caller will stop the process as if it reached the end of the file.
*eof = true;
return Status::OK();
}
return Status::DataQualityError(fmt::to_string(error_msg));
}
// set json root
if (_parsed_json_root.size() != 0) {
_json_doc = JsonFunctions::get_json_object_from_parsed_json(
_parsed_json_root, &_origin_json_doc, _origin_json_doc.GetAllocator());
if (_json_doc == nullptr) {
fmt::memory_buffer error_msg;
fmt::format_to(error_msg, "{}", "JSON Root not found.");
RETURN_IF_ERROR(_state->append_error_msg_to_file(
[&]() -> std::string { return _print_json_value(_origin_json_doc); },
[&]() -> std::string { return fmt::to_string(error_msg); }, _scanner_eof));
_counter->num_rows_filtered++;
if (*_scanner_eof) {
// Same as Case A
*eof = true;
return Status::OK();
}
return Status::DataQualityError(fmt::to_string(error_msg));
}
} else {
_json_doc = &_origin_json_doc;
}
if (_json_doc->IsArray() && !_strip_outer_array) {
fmt::memory_buffer error_msg;
fmt::format_to(error_msg, "{}",
"JSON data is array-object, `strip_outer_array` must be TRUE.");
RETURN_IF_ERROR(_state->append_error_msg_to_file(
[&]() -> std::string { return _print_json_value(_origin_json_doc); },
[&]() -> std::string { return fmt::to_string(error_msg); }, _scanner_eof));
_counter->num_rows_filtered++;
if (*_scanner_eof) {
// Same as Case A
*eof = true;
return Status::OK();
}
return Status::DataQualityError(fmt::to_string(error_msg));
}
if (!_json_doc->IsArray() && _strip_outer_array) {
fmt::memory_buffer error_msg;
fmt::format_to(error_msg, "{}",
"JSON data is not an array-object, `strip_outer_array` must be FALSE.");
RETURN_IF_ERROR(_state->append_error_msg_to_file(
[&]() -> std::string { return _print_json_value(_origin_json_doc); },
[&]() -> std::string { return fmt::to_string(error_msg); }, _scanner_eof));
_counter->num_rows_filtered++;
if (*_scanner_eof) {
// Same as Case A
*eof = true;
return Status::OK();
}
return Status::DataQualityError(fmt::to_string(error_msg));
}
return Status::OK();
}
// For simple-format json:
// set valid to true and return OK on success.
// set valid to false and return OK if we met an invalid row.
// return another status if other problems are encountered.
Status NewJsonReader::_set_column_value(rapidjson::Value& objectValue,
std::vector<MutableColumnPtr>& columns,
const std::vector<SlotDescriptor*>& slot_descs,
bool* valid) {
if (!objectValue.IsObject()) {
// Here we expect the incoming `objectValue` to be a Json Object, such as {"key" : "value"},
// not other type of Json format.
RETURN_IF_ERROR(_append_error_msg(objectValue, "Expect json object value", "", valid));
return Status::OK();
}
int ctx_idx = 0;
bool has_valid_value = false;
size_t cur_row_count = columns[0]->size();
for (auto slot_desc : slot_descs) {
if (!slot_desc->is_materialized()) {
continue;
}
int dest_index = ctx_idx++;
auto* column_ptr = columns[dest_index].get();
rapidjson::Value::ConstMemberIterator it = objectValue.MemberEnd();
if (_fuzzy_parse) {
auto idx_it = _name_map.find(slot_desc->col_name());
if (idx_it != _name_map.end() && idx_it->second < objectValue.MemberCount()) {
it = objectValue.MemberBegin() + idx_it->second;
}
} else {
it = objectValue.FindMember(
rapidjson::Value(slot_desc->col_name().c_str(), slot_desc->col_name().size()));
}
if (it != objectValue.MemberEnd()) {
const rapidjson::Value& value = it->value;
RETURN_IF_ERROR(_write_data_to_column(&value, slot_desc, column_ptr, valid));
if (!(*valid)) {
return Status::OK();
}
has_valid_value = true;
} else { // not found
// When the entire row has no valid value, this row should be filtered,
// so the default value cannot be directly inserted here
if (!slot_desc->is_nullable()) {
RETURN_IF_ERROR(_append_error_msg(
objectValue,
"The column `{}` is not nullable, but it's not found in jsondata.",
slot_desc->col_name(), valid));
break;
}
}
}
if (!has_valid_value) {
RETURN_IF_ERROR(_append_error_msg(objectValue, "All fields is null, this is a invalid row.",
"", valid));
return Status::OK();
}
ctx_idx = 0;
int nullcount = 0;
// fill missing slot
for (auto slot_desc : slot_descs) {
if (!slot_desc->is_materialized()) {
continue;
}
int dest_index = ctx_idx++;
auto* column_ptr = columns[dest_index].get();
if (column_ptr->size() < cur_row_count + 1) {
DCHECK(column_ptr->size() == cur_row_count);
column_ptr->assume_mutable()->insert_default();
++nullcount;
}
DCHECK(column_ptr->size() == cur_row_count + 1);
}
// There is at least one valid value here
DCHECK(nullcount < columns.size());
*valid = true;
return Status::OK();
}
Status NewJsonReader::_write_data_to_column(rapidjson::Value::ConstValueIterator value,
SlotDescriptor* slot_desc,
vectorized::IColumn* column_ptr, bool* valid) {
const char* str_value = nullptr;
char tmp_buf[128] = {0};
int32_t wbytes = 0;
std::string json_str;
vectorized::ColumnNullable* nullable_column = nullptr;
if (slot_desc->is_nullable()) {
nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(column_ptr);
// kNullType will put 1 into the Null map, so there is no need to push 0 for kNullType.
if (value->GetType() != rapidjson::Type::kNullType) {
nullable_column->get_null_map_data().push_back(0);
} else {
nullable_column->insert_default();
}
column_ptr = &nullable_column->get_nested_column();
}
switch (value->GetType()) {
case rapidjson::Type::kStringType:
str_value = value->GetString();
wbytes = strlen(str_value);
break;
case rapidjson::Type::kNumberType:
if (value->IsUint()) {
wbytes = sprintf(tmp_buf, "%u", value->GetUint());
} else if (value->IsInt()) {
wbytes = sprintf(tmp_buf, "%d", value->GetInt());
} else if (value->IsUint64()) {
wbytes = sprintf(tmp_buf, "%" PRIu64, value->GetUint64());
} else if (value->IsInt64()) {
wbytes = sprintf(tmp_buf, "%" PRId64, value->GetInt64());
} else {
wbytes = sprintf(tmp_buf, "%f", value->GetDouble());
}
str_value = tmp_buf;
break;
case rapidjson::Type::kFalseType:
wbytes = 1;
str_value = (char*)"0";
break;
case rapidjson::Type::kTrueType:
wbytes = 1;
str_value = (char*)"1";
break;
case rapidjson::Type::kNullType:
if (!slot_desc->is_nullable()) {
RETURN_IF_ERROR(_append_error_msg(
*value, "Json value is null, but the column `{}` is not nullable.",
slot_desc->col_name(), valid));
return Status::OK();
}
// return immediately to prevent repeated insert_data calls
*valid = true;
return Status::OK();
default:
// for other types like array or object, convert the value to a string and store that
json_str = NewJsonReader::_print_json_value(*value);
wbytes = json_str.size();
str_value = json_str.c_str();
break;
}
// TODO: if the vexpr can support another 'slot_desc type' than 'TYPE_VARCHAR',
// we need use a function to support these types to insert data in columns.
DCHECK(slot_desc->type().type == TYPE_VARCHAR);
assert_cast<ColumnString*>(column_ptr)->insert_data(str_value, wbytes);
*valid = true;
return Status::OK();
}
Status NewJsonReader::_write_columns_by_jsonpath(rapidjson::Value& objectValue,
const std::vector<SlotDescriptor*>& slot_descs,
std::vector<MutableColumnPtr>& columns,
bool* valid) {
int ctx_idx = 0;
bool has_valid_value = false;
size_t cur_row_count = columns[0]->size();
for (auto slot_desc : slot_descs) {
if (!slot_desc->is_materialized()) {
continue;
}
int i = ctx_idx++;
auto* column_ptr = columns[i].get();
rapidjson::Value* json_values = nullptr;
bool wrap_explicitly = false;
if (LIKELY(i < _parsed_jsonpaths.size())) {
json_values = JsonFunctions::get_json_array_from_parsed_json(
_parsed_jsonpaths[i], &objectValue, _origin_json_doc.GetAllocator(),
&wrap_explicitly);
}
if (json_values == nullptr) {
// no match in jsondata.
if (!slot_descs[i]->is_nullable()) {
RETURN_IF_ERROR(_append_error_msg(
objectValue,
"The column `{}` is not nullable, but it's not found in jsondata.",
slot_descs[i]->col_name(), valid));
return Status::OK();
}
} else {
CHECK(json_values->IsArray());
if (json_values->Size() == 1 && wrap_explicitly) {
// NOTICE1: JsonFunctions::get_json_array_from_parsed_json() will wrap the single json object with an array.
// so here we unwrap the array to get the real element.
// if json_values' size > 1, it means we matched a real array, not a wrapped one, so there is no need to unwrap.
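// e.g. (assumed) the path "$.item.city" on {"item": {"city": "beijing"}} comes back as ["beijing"]
// with wrap_explicitly = true, so we unwrap it to the scalar "beijing".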
json_values = &((*json_values)[0]);
}
RETURN_IF_ERROR(_write_data_to_column(json_values, slot_descs[i], column_ptr, valid));
if (!(*valid)) {
return Status::OK();
}
has_valid_value = true;
}
}
if (!has_valid_value) {
RETURN_IF_ERROR(_append_error_msg(
objectValue, "All fields are null or not matched, this is an invalid row.", "",
valid));
return Status::OK();
}
ctx_idx = 0;
for (auto slot_desc : slot_descs) {
if (!slot_desc->is_materialized()) {
continue;
}
int dest_index = ctx_idx++;
auto* column_ptr = columns[dest_index].get();
if (column_ptr->size() < cur_row_count + 1) {
DCHECK(column_ptr->size() == cur_row_count);
column_ptr->assume_mutable()->insert_default();
}
DCHECK(column_ptr->size() == cur_row_count + 1);
}
return Status::OK();
}
Status NewJsonReader::_append_error_msg(const rapidjson::Value& objectValue, std::string error_msg,
std::string col_name, bool* valid) {
std::string err_msg;
if (!col_name.empty()) {
fmt::memory_buffer error_buf;
fmt::format_to(error_buf, error_msg, col_name);
err_msg = fmt::to_string(error_buf);
} else {
err_msg = error_msg;
}
RETURN_IF_ERROR(_state->append_error_msg_to_file(
[&]() -> std::string { return NewJsonReader::_print_json_value(objectValue); },
[&]() -> std::string { return err_msg; }, _scanner_eof));
// TODO(ftw): check here?
if (*_scanner_eof == true) {
_reader_eof = true;
}
_counter->num_rows_filtered++;
if (valid != nullptr) {
// current row is invalid
*valid = false;
}
return Status::OK();
}
std::string NewJsonReader::_print_json_value(const rapidjson::Value& value) {
rapidjson::StringBuffer buffer;
buffer.Clear();
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
value.Accept(writer);
return std::string(buffer.GetString());
}
} // namespace doris::vectorized

View File

@ -0,0 +1,150 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <rapidjson/document.h>
#include <rapidjson/error/en.h>
#include <rapidjson/stringbuffer.h>
#include <rapidjson/writer.h>
#include "vec/exec/format/generic_reader.h"
namespace doris {
class FileReader;
struct JsonPath;
class LineReader;
class SlotDescriptor;
namespace vectorized {
struct ScannerCounter;
class NewJsonReader : public GenericReader {
public:
NewJsonReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounter* counter,
const TFileScanRangeParams& params, const TFileRangeDesc& range,
const std::vector<SlotDescriptor*>& file_slot_descs, bool* scanner_eof);
~NewJsonReader() override = default;
Status init_reader();
Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
std::unordered_set<std::string>* missing_cols) override;
private:
Status _get_range_params();
Status _open_file_reader();
Status _open_line_reader();
Status _parse_jsonpath_and_json_root();
Status _read_json_column(std::vector<MutableColumnPtr>& columns,
const std::vector<SlotDescriptor*>& slot_descs, bool* is_empty_row,
bool* eof);
Status _vhandle_simple_json(std::vector<MutableColumnPtr>& columns,
const std::vector<SlotDescriptor*>& slot_descs, bool* is_empty_row,
bool* eof);
Status _vhandle_flat_array_complex_json(std::vector<MutableColumnPtr>& columns,
const std::vector<SlotDescriptor*>& slot_descs,
bool* is_empty_row, bool* eof);
Status _vhandle_nested_complex_json(std::vector<MutableColumnPtr>& columns,
const std::vector<SlotDescriptor*>& slot_descs,
bool* is_empty_row, bool* eof);
Status _parse_json(bool* is_empty_row, bool* eof);
Status _parse_json_doc(size_t* size, bool* eof);
Status _set_column_value(rapidjson::Value& objectValue, std::vector<MutableColumnPtr>& columns,
const std::vector<SlotDescriptor*>& slot_descs, bool* valid);
Status _write_data_to_column(rapidjson::Value::ConstValueIterator value,
SlotDescriptor* slot_desc, vectorized::IColumn* column_ptr,
bool* valid);
Status _write_columns_by_jsonpath(rapidjson::Value& objectValue,
const std::vector<SlotDescriptor*>& slot_descs,
std::vector<MutableColumnPtr>& columns, bool* valid);
Status _append_error_msg(const rapidjson::Value& objectValue, std::string error_msg,
std::string col_name, bool* valid);
std::string _print_json_value(const rapidjson::Value& value);
private:
Status (NewJsonReader::*_vhandle_json_callback)(
std::vector<vectorized::MutableColumnPtr>& columns,
const std::vector<SlotDescriptor*>& slot_descs, bool* is_empty_row, bool* eof);
RuntimeState* _state;
RuntimeProfile* _profile;
ScannerCounter* _counter;
const TFileScanRangeParams& _params;
const TFileRangeDesc& _range;
const std::vector<SlotDescriptor*>& _file_slot_descs;
// _file_reader_s is for the stream load pipe reader,
// and _file_reader is for other file readers.
// TODO: refactor this to use only shared_ptr or unique_ptr
std::unique_ptr<FileReader> _file_reader;
std::shared_ptr<FileReader> _file_reader_s;
FileReader* _real_file_reader;
std::unique_ptr<LineReader> _line_reader;
bool _reader_eof;
TFileFormatType::type _file_format_type;
// When the fetched range doesn't start from 0, we always skip the first line
bool _skip_first_line;
std::string _line_delimiter;
int _line_delimiter_length;
int _next_row;
int _total_rows;
std::string _jsonpaths;
std::string _json_root;
bool _read_json_by_line;
bool _strip_outer_array;
bool _num_as_string;
bool _fuzzy_parse;
std::vector<std::vector<JsonPath>> _parsed_jsonpaths;
std::vector<JsonPath> _parsed_json_root;
char _value_buffer[4 * 1024 * 1024]; // 4MB
char _parse_buffer[512 * 1024]; // 512KB
typedef rapidjson::GenericDocument<rapidjson::UTF8<>, rapidjson::MemoryPoolAllocator<>,
rapidjson::MemoryPoolAllocator<>>
Document;
rapidjson::MemoryPoolAllocator<> _value_allocator;
rapidjson::MemoryPoolAllocator<> _parse_allocator;
Document _origin_json_doc; // original json document object parsed from the json string
rapidjson::Value* _json_doc; // _json_doc points to _origin_json_doc iff `json_root` is not set
std::unordered_map<std::string, int> _name_map;
bool* _scanner_eof;
RuntimeProfile::Counter* _bytes_read_counter;
RuntimeProfile::Counter* _read_timer;
RuntimeProfile::Counter* _file_read_timer;
};
} // namespace vectorized
} // namespace doris

View File

@ -31,6 +31,7 @@
#include "runtime/raw_value.h"
#include "runtime/runtime_state.h"
#include "vec/exec/format/csv/csv_reader.h"
#include "vec/exec/format/json/new_json_reader.h"
#include "vec/exec/format/orc/vorc_reader.h"
#include "vec/exec/format/parquet/vparquet_reader.h"
#include "vec/exec/scan/new_file_scan_node.h"
@ -494,6 +495,12 @@ Status VFileScanner::_get_next_reader() {
init_status = ((CsvReader*)(_cur_reader.get()))->init_reader(_is_load);
break;
}
case TFileFormatType::FORMAT_JSON: {
_cur_reader.reset(new NewJsonReader(_state, _profile, &_counter, _params, range,
_file_slot_descs, &_scanner_eof));
init_status = ((NewJsonReader*)(_cur_reader.get()))->init_reader();
break;
}
default:
return Status::InternalError("Not supported file format: {}", _params.format_type);
}

View File

@ -27,6 +27,8 @@ echo "hadoop fs -mkdir /user/doris/"
hadoop fs -mkdir -p /user/doris/
echo "hadoop fs -put /mnt/scripts/tpch1.db /user/doris/"
hadoop fs -put /mnt/scripts/tpch1.db /user/doris/
echo "hadoop fs -put /mnt/scripts/json_format_test.db /user/doris/"
hadoop fs -put /mnt/scripts/json_format_test /user/doris/
echo "hive -f /mnt/scripts/create.hql"
hive -f /mnt/scripts/create.hql

View File

@ -0,0 +1,2 @@
[{"id": 1, "city": "beijing", "code": 1454547},{"id": 2, "city": "shanghai", "code": 1244264}, {"id": 3, "city": "guangzhou", "code": 528369},{"id": 4, "city": "shenzhen", "code": 594201},{"id": 5, "city": "hangzhou", "code": 594201}]
[{"id": 6, "city": "nanjing", "code": 2345672},{"id": 7, "city": "wuhan", "code": 2345673}, {"id": 8, "city": "chengdu", "code": 2345674},{"id": 9, "city": "xian", "code": 2345675},{"id": 10, "city": "hefei", "code": 2345676}]

View File

@ -0,0 +1,2 @@
[{"id": 1, "code": 1454547},{"city": "shanghai", "id": 2, "code": 1244264}, {"id": 3, "code": 528369},{"id": 4, "code": 594202},{"city": "hangzhou", "id": 5, "code": 594201}]
[{"city": "nanjing", "id": 6, "code": 2345672},{"id": 7, "code": 2345673}, {"id": 8, "code": 2345674},{"city": "xian", "id": 9, "code": 2345675},{"id": 10, "code": 2345676}]

View File

@ -0,0 +1,2 @@
[{"city": "beijing", "id": 1, "code": 1454547},{"city": "shanghai", "id": 2, "code": 1244264}, {"city": "guangzhou", "id": 3, "code": 528369},{"city": "shenzhen", "id": 4, "code": 594202},{"city": "hangzhou", "id": 5, "code": 594201}]
[{"city": "nanjing", "id": 6, "code": 2345672},{"city": "wuhan", "id": 7, "code": 2345673}, {"city": "chengdu", "id": 8, "code": 2345674},{"city": "xian", "id": 9, "code": 2345675},{"city": "hefei", "id": 10, "code": 2345676}]

View File

@ -0,0 +1,5 @@
{"no": 1, "item": {"id": 1, "city": "beijing", "code": 2345671}}
{"no": 2, "item": {"id": 2, "city": "shanghai", "code": 2345672}}
{"no": 3, "item": {"id": 3, "city": "hangzhou", "code": 2345673}}
{"no": 4, "item": {"id": 4, "city": "shenzhen", "code": 2345674}}
{"no": 5, "item": {"id": 5, "city": "guangzhou", "code": 2345675}}

View File

@ -0,0 +1,12 @@
{"id": 1, "city": "beijing", "code": 2345671}
{"id": 2, "city": "shanghai", "code": 2345672}
{"id": 3, "city": "guangzhou", "code": 2345673}
{"id": 4, "city": "shenzhen", "code": 2345674}
{"id": 5, "city": "hangzhou", "code": 2345675}
{"id": 6, "city": "nanjing", "code": 2345676}
{"id": 7, "city": "wuhan", "code": 2345677}
{"id": 8, "city": "chengdu", "code": 2345678}
{"id": 9, "city": "xian", "code": 2345679}
{"id": 10, "city": "hefei", "code": 23456710}
{"id": 10, "city": null, "code": 23456711}
{"id": 10, "city": "hefei", "code": null}

View File

@ -121,7 +121,10 @@ public class DataDescription {
private String jsonPaths = "";
private String jsonRoot = "";
private boolean fuzzyParse = false;
private boolean readJsonByLine = false;
// The default must be true,
// so that for broker load this is always true,
// and for stream load it will be set on demand.
private boolean readJsonByLine = true;
private boolean numAsString = false;
private String sequenceCol;
@ -616,6 +619,10 @@ public class DataDescription {
return !Strings.isNullOrEmpty(srcTableName);
}
public boolean isReadJsonByLine() {
return readJsonByLine;
}
/*
* Analyze parsedExprMap and columnToHadoopFunction from columns, columns from path and columnMappingList
* Example:

View File

@ -4031,7 +4031,7 @@ public class Env {
}
}
public void renameColumn(Database db, OlapTable table, String colName,
private void renameColumn(Database db, OlapTable table, String colName,
String newColName, boolean isReplay) throws DdlException {
if (table.getState() != OlapTableState.NORMAL) {
throw new DdlException("Table[" + table.getName() + "] is under " + table.getState());

View File

@ -256,9 +256,9 @@ public class BrokerFileGroup implements Writable {
jsonPaths = dataDescription.getJsonPaths();
jsonRoot = dataDescription.getJsonRoot();
fuzzyParse = dataDescription.isFuzzyParse();
// For broker load, we only support reading json format data line by line,
// so we set readJsonByLine to true here.
readJsonByLine = true;
// ATTN: for broker load, we only support reading json format data line by line,
// so if this is set to false, it must be stream load.
readJsonByLine = dataDescription.isReadJsonByLine();
numAsString = dataDescription.isNumAsString();
}
}

View File

@ -42,7 +42,6 @@ import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.Util;
import org.apache.doris.load.BrokerFileGroup;
import org.apache.doris.load.LoadErrorHub;
import org.apache.doris.load.loadv2.LoadTask;
@ -174,10 +173,6 @@ public class StreamLoadPlanner {
// create scan node
if (Config.enable_new_load_scan_node && Config.enable_vectorized_load) {
ExternalFileScanNode fileScanNode = new ExternalFileScanNode(new PlanNodeId(0), scanTupleDesc);
if (!Util.isCsvFormat(taskInfo.getFormatType())) {
throw new AnalysisException(
"New stream load scan load not support non-csv type now: " + taskInfo.getFormatType());
}
// 1. create file group
DataDescription dataDescription = new DataDescription(destTable.getName(), taskInfo);
dataDescription.analyzeWithoutCheckPriv(db.getFullName());

View File

@ -78,3 +78,4 @@ pg_14_port=5442
// See `docker/thirdparties/start-thirdparties-docker.sh`
enableHiveTest=false
hms_port=9183
hdfs_port=8120

View File

@ -39,3 +39,43 @@
5 \N \N \N \N \N \N \N \N \N \N
100 [1, 2, 3] [32767, 32768, 32769] [65534, 65535, 65536] ['a', 'b', 'c'] ['hello', 'world'] [2022-07-13] [2022-07-13 12:30:00] [0.33, 0.67] [3.1415926, 0.878787878] [4.000000, 5.500000, 6.670000]
-- !select --
1 [1, 2, 3, 4, 5] [32767, 32768, 32769] [65534, 65535, 65536] ['a', 'b', 'c', 'd', 'e'] ['hello', 'world'] [1991-01-01] [1991-01-01 00:00:00] [0.33, 0.67] [3.1415926, 0.878787878] [1, 1.2, 1.3]
2 [6, 7, 8, 9, 10] [32767, 32768, 32769] [65534, 65535, 65536] ['a', 'b', 'c', 'd', 'e'] ['hello', 'world'] [1991-01-01] [1991-01-01 00:00:00] [0.33, 0.67] [3.1415926, 0.878787878] [1, 1.2, 1.3]
3 [] [32767, 32768, 32769] [NULL, NULL, 65536] ['a', 'b', 'c', 'd', 'e'] ['happy', 'birthday'] [1991-01-01] [1991-01-01 00:00:00] [0.33, 0.67] [3.1415926, 0.878787878] [1, 1.2, 1.3]
4 [NULL] [32767, 32768, 32769] [NULL, NULL, 65536] ['a', 'b', 'c', 'd', 'e'] ['hello', 'world'] [1991-01-01] [1991-01-01 00:00:00] [0.33, 0.67] [3.1415926, 0.878787878] [1, 1.2, 1.3]
5 [NULL, NULL] [32767, 32768, NULL] [65534, NULL, 65536] ['a', 'b', 'c', 'd', 'e'] ['hello', 'world'] [1991-01-01] [1991-01-01 00:00:00] [0.33, 0.67] [3.1415926, 0.878787878] [1, 1.2, 1.3]
100 [1, 2, 3] [32767, 32768, 32769] [65534, 65535, 65536] ['a', 'b', 'c'] ['hello', 'world'] [2022-07-13] [2022-07-13 12:30:00] [0.33, 0.67] [3.1415926, 0.878787878] [4, 5.5, 6.67]
-- !select --
1 [1, 2, 3, 4, 5] [32767, 32768, 32769] [65534, 65535, 65536] ['a', 'b', 'c', 'd', 'e'] ['hello', 'world'] [1991-01-01] [1991-01-01 00:00:00] [0.33, 0.67] [3.1415926, 0.878787878] [1.000000, 1.200000, 1.300000]
2 [6, 7, 8, 9, 10] [32767, 32768, 32769] [65534, 65535, 65536] ['a', 'b', 'c', 'd', 'e'] ['hello', 'world'] [1991-01-01] [1991-01-01 00:00:00] [0.33, 0.67] [3.1415926, 0.878787878] [1.000000, 1.200000, 1.300000]
3 [] [32767, 32768, 32769] [NULL, NULL, 65536] ['a', 'b', 'c', 'd', 'e'] ['happy', 'birthday'] [1991-01-01] [1991-01-01 00:00:00] [0.33, 0.67] [3.1415926, 0.878787878] [1.000000, 1.200000, 1.300000]
4 [NULL] [32767, 32768, 32769] [NULL, NULL, 65536] ['a', 'b', 'c', 'd', 'e'] ['hello', 'world'] [1991-01-01] [1991-01-01 00:00:00] [0.33, 0.67] [3.1415926, 0.878787878] [1.000000, 1.200000, 1.300000]
5 [NULL, NULL] [32767, 32768, NULL] [65534, NULL, 65536] ['a', 'b', 'c', 'd', 'e'] ['hello', 'world'] [1991-01-01] [1991-01-01 00:00:00] [0.33, 0.67] [3.1415926, 0.878787878] [1.000000, 1.200000, 1.300000]
100 [1, 2, 3] [32767, 32768, 32769] [65534, 65535, 65536] ['a', 'b', 'c'] ['hello', 'world'] [2022-07-13] [2022-07-13 12:30:00] [0.33, 0.67] [3.1415926, 0.878787878] [4.000000, 5.500000, 6.670000]
-- !select --
1 [1, 2, 3, 4, 5] [32767, 32768, 32769] [65534, 65535, 65536] ['a', 'b', 'c', 'd', 'e'] ['hello', 'world'] [1991-01-01] [1991-01-01 00:00:00] [0.33, 0.67] [3.1415926, 0.878787878] [1, 1.2, 1.3]
2 [6, 7, 8, 9, 10] [32767, 32768, 32769] [65534, 65535, 65536] ['a', 'b', 'c', 'd', 'e'] ['hello', 'world'] [1991-01-01] [1991-01-01 00:00:00] [0.33, 0.67] [3.1415926, 0.878787878] [1, 1.2, 1.3]
3 [] [32767, 32768, 32769] [NULL, NULL, 65536] ['a', 'b', 'c', 'd', 'e'] ['happy', 'birthday'] [1991-01-01] [1991-01-01 00:00:00] [0.33, 0.67] [3.1415926, 0.878787878] [1, 1.2, 1.3]
4 [NULL] [32767, 32768, 32769] [NULL, NULL, 65536] ['a', 'b', 'c', 'd', 'e'] ['hello', 'world'] [1991-01-01] [1991-01-01 00:00:00] [0.33, 0.67] [3.1415926, 0.878787878] [1, 1.2, 1.3]
5 [NULL, NULL] [32767, 32768, NULL] [65534, NULL, 65536] ['a', 'b', 'c', 'd', 'e'] ['hello', 'world'] [1991-01-01] [1991-01-01 00:00:00] [0.33, 0.67] [3.1415926, 0.878787878] [1, 1.2, 1.3]
100 [1, 2, 3] [32767, 32768, 32769] [65534, 65535, 65536] ['a', 'b', 'c'] ['hello', 'world'] [2022-07-13] [2022-07-13 12:30:00] [0.33, 0.67] [3.1415926, 0.878787878] [4, 5.5, 6.67]
-- !select --
1 [1, 2, 3, 4, 5] [32767, 32768, 32769] [65534, 65535, 65536] ['a', 'b', 'c', 'd', 'e'] ['hello', 'world'] [1991-01-01] [1991-01-01 00:00:00] [0.33, 0.67] [3.1415926, 0.878787878] [1.000000, 1.200000, 1.300000]
2 [6, 7, 8, 9, 10] [32767, 32768, 32769] [65534, 65535, 65536] ['a', 'b', 'c', 'd', 'e'] ['hello', 'world'] [1991-01-01] [1991-01-01 00:00:00] [0.33, 0.67] [3.1415926, 0.878787878] [1.000000, 1.200000, 1.300000]
3 [] [32767, 32768, 32769] [NULL, NULL, 65536] ['a', 'b', 'c', 'd', 'e'] ['happy', 'birthday'] [1991-01-01] [1991-01-01 00:00:00] [0.33, 0.67] [3.1415926, 0.878787878] [1.000000, 1.200000, 1.300000]
4 [NULL] [32767, 32768, 32769] [NULL, NULL, 65536] ['a', 'b', 'c', 'd', 'e'] ['hello', 'world'] [1991-01-01] [1991-01-01 00:00:00] [0.33, 0.67] [3.1415926, 0.878787878] [1.000000, 1.200000, 1.300000]
5 [NULL, NULL] [32767, 32768, NULL] [65534, NULL, 65536] ['a', 'b', 'c', 'd', 'e'] ['hello', 'world'] [1991-01-01] [1991-01-01 00:00:00] [0.33, 0.67] [3.1415926, 0.878787878] [1.000000, 1.200000, 1.300000]
100 [1, 2, 3] [32767, 32768, 32769] [65534, 65535, 65536] ['a', 'b', 'c'] ['hello', 'world'] [2022-07-13] [2022-07-13 12:30:00] [0.33, 0.67] [3.1415926, 0.878787878] [4.000000, 5.500000, 6.670000]
-- !select --
1 [1, 2, 3, 4, 5] [32767, 32768, 32769] [65534, 65535, 65536] ['a', 'b', 'c', 'd', 'e'] ['hello', 'world'] [1991-01-01, 1992-02-02, 1993-03-03] [1991-01-01 00:00:00] [0.33, 0.67] [3.1415926, 0.878787878] [1.000000, 1.200000, 1.300000]
2 [1, 2, 3, 4, 5] [32767, 32768, 32769] [65534, 65535, 65536] ['a', 'b', 'c', 'd', 'e'] ['hello', 'world'] [1991-01-01, 1992-02-02, 1993-03-03] \N \N \N [1.000000, NULL, 1.300000]
3 \N \N \N \N \N \N \N \N \N \N
4 \N \N \N \N \N \N \N \N \N \N
5 \N \N \N \N \N \N \N \N \N \N
100 [1, 2, 3] [32767, 32768, 32769] [65534, 65535, 65536] ['a', 'b', 'c'] ['hello', 'world'] [2022-07-13] [2022-07-13 12:30:00] [0.33, 0.67] [3.1415926, 0.878787878] [4.000000, 5.500000, 6.670000]

View File

@ -11,3 +11,15 @@ h h
H H
h h
-- !select --
\N \N
H H
h h
-- !select --
\N \N
H H
h h

View File

@ -9,3 +9,13 @@
1000 7395.231067
2000 \N
-- !select --
22 \N
1000 7395.231067
2000 \N
-- !select --
22 \N
1000 7395.231067
2000 \N

View File

@ -1,2 +1,5 @@
{"no": 1, "item": {"id": 1, "city": "beijing", "code": 2345671}}
{"no": 2, "item": {"id": 2, "city": "shanghai", "code": 2345672}}
{"no": 3, "item": {"id": 3, "city": "hangzhou", "code": 2345673}}
{"no": 4, "item": {"id": 4, "city": "shenzhen", "code": 2345674}}
{"no": 5, "item": {"id": 5, "city": "guangzhou", "code": 2345675}}

View File

@ -0,0 +1,74 @@
[
{
"no": 1,
"item": {
"id": 1,
"city": [
"zhejiang",
"hangzhou",
"xihu"
],
"code": 2345671
}
},
{
"no": 2,
"item": {
"id": 2,
"city": [
"zhejiang",
"hangzhou",
"xiaoshan"
],
"code": 2345672
}
},
{
"no": 3,
"item": {
"id": 3,
"city": [
"zhejiang",
"hangzhou",
"binjiang"
],
"code": 2345673
}
},
{
"no": 4,
"item": {
"id": 4,
"city": [
"zhejiang",
"hangzhou",
"shangcheng"
],
"code": 2345674
}
},
{
"no": 5,
"item": {
"id": 5,
"city": [
"zhejiang",
"hangzhou",
"tonglu"
],
"code": 2345675
}
},
{
"no": 6,
"item": {
"id": 6,
"city": [
"zhejiang",
"hangzhou",
"fuyang"
],
"code": 2345676
}
}
]

View File

@ -0,0 +1,52 @@
[
{
"code": 2345671,
"id": 1,
"city": "beijing"
},
{
"code": 2345672,
"id": 2,
"city": "shanghai"
},
{
"code": 2345673,
"id": 3,
"city": "guangzhou"
},
{
"code": 2345674,
"id": 4,
"city": "shenzhen"
},
{
"code": 2345675,
"id": 5,
"city": "hangzhou"
},
{
"code": 2345676,
"id": 6,
"city": "nanjing"
},
{
"code": 2345677,
"id": 7,
"city": "wuhan"
},
{
"code": 2345678,
"id": 8,
"city": "chengdu"
},
{
"code": 2345679,
"id": 9,
"city": "xian"
},
{
"code": 23456710,
"id": 10,
"city": "hefei"
}
]

View File

@ -0,0 +1,48 @@
[
{
"code": 2345671,
"id": 1
},
{
"code": 2345672,
"id": 2,
"city": "shanghai"
},
{
"code": 2345673,
"id": 3,
"city": "beijing"
},
{
"code": 2345674,
"id": 4,
"city": "shenzhen"
},
{
"code": 2345675,
"id": 5,
"city": "hangzhou"
},
{
"code": 2345676,
"id": 6,
"city": "nanjing"
},
{
"code": 2345677,
"id": 7
},
{
"code": 2345678,
"id": 8,
"city": "chengdu"
},
{
"code": 2345679,
"id": 9
},
{
"code": 23456710,
"id": 10
}
]

View File

@ -0,0 +1,305 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select1 --
1 beijing 2345671
2 shanghai 2345672
3 guangzhou 2345673
4 shenzhen 2345674
5 hangzhou 2345675
6 nanjing 2345676
7 wuhan 2345677
8 chengdu 2345678
9 xian 2345679
10 \N 23456711
10 hefei 23456710
200 changsha 3456789
-- !select1 --
1 beijing 2345671
2 shanghai 2345672
3 guangzhou 2345673
4 shenzhen 2345674
5 hangzhou 2345675
6 nanjing 2345676
7 wuhan 2345677
8 chengdu 2345678
9 xian 2345679
10 \N 23456711
10 hefei 23456710
200 changsha 3456789
-- !select2 --
10 beijing 2345671
20 shanghai 2345672
30 guangzhou 2345673
40 shenzhen 2345674
50 hangzhou 2345675
60 nanjing 2345676
70 wuhan 2345677
80 chengdu 2345678
90 xian 2345679
100 \N 23456711
100 hefei 23456710
200 changsha 3456789
-- !select2 --
10 beijing 2345671
20 shanghai 2345672
30 guangzhou 2345673
40 shenzhen 2345674
50 hangzhou 2345675
60 nanjing 2345676
70 wuhan 2345677
80 chengdu 2345678
90 xian 2345679
100 \N 23456711
100 hefei 23456710
200 changsha 3456789
-- !select3 --
1 2345671 \N
2 2345672 \N
3 2345673 \N
4 2345674 \N
5 2345675 \N
6 2345676 \N
7 2345677 \N
8 2345678 \N
9 2345679 \N
10 \N \N
10 23456710 \N
10 23456711 \N
200 changsha 3456789
-- !select3 --
1 2345671 \N
2 2345672 \N
3 2345673 \N
4 2345674 \N
5 2345675 \N
6 2345676 \N
7 2345677 \N
8 2345678 \N
9 2345679 \N
10 \N \N
10 23456710 \N
10 23456711 \N
200 changsha 3456789
-- !select4 --
1 \N 210
2 \N 220
3 \N 230
4 \N 240
5 \N 250
6 \N 260
7 \N 270
8 \N 280
9 \N 290
10 \N 900
200 changsha 3456789
-- !select4 --
1 \N 210
2 \N 220
3 \N 230
4 \N 240
5 \N 250
6 \N 260
7 \N 270
8 \N 280
9 \N 290
10 \N 900
200 changsha 3456789
-- !select5 --
1 beijing 1454547
2 shanghai 1244264
3 guangzhou 528369
4 shenzhen 594201
5 hangzhou 594201
6 nanjing 2345672
7 wuhan 2345673
8 chengdu 2345674
9 xian 2345675
10 hefei 2345676
200 changsha 3456789
-- !select5 --
1 beijing 1454547
2 shanghai 1244264
3 guangzhou 528369
4 shenzhen 594201
5 hangzhou 594201
6 nanjing 2345672
7 wuhan 2345673
8 chengdu 2345674
9 xian 2345675
10 hefei 2345676
200 changsha 3456789
-- !select6 --
10 1454547 \N
20 1244264 \N
30 528369 \N
40 594201 \N
50 594201 \N
60 2345672 \N
70 2345673 \N
80 2345674 \N
90 2345675 \N
100 2345676 \N
200 changsha 3456789
-- !select6 --
10 1454547 \N
20 1244264 \N
30 528369 \N
40 594201 \N
50 594201 \N
60 2345672 \N
70 2345673 \N
80 2345674 \N
90 2345675 \N
100 2345676 \N
200 changsha 3456789
-- !select7 --
60 2345672 \N
70 2345673 \N
80 2345674 \N
90 2345675 \N
100 2345676 \N
200 changsha 3456789
-- !select7 --
60 2345672 \N
70 2345673 \N
80 2345674 \N
90 2345675 \N
100 2345676 \N
200 changsha 3456789
-- !select8 --
60 nanjing \N
70 wuhan \N
80 chengdu \N
90 xian \N
100 hefei \N
200 changsha 3456789
-- !select8 --
60 nanjing \N
70 wuhan \N
80 chengdu \N
90 xian \N
100 hefei \N
200 changsha 3456789
-- !select9 --
10 beijing 2345671
20 shanghai 2345672
30 hangzhou 2345673
40 shenzhen 2345674
50 guangzhou 2345675
200 changsha 3456789
-- !select9 --
10 beijing 2345671
20 shanghai 2345672
30 hangzhou 2345673
40 shenzhen 2345674
50 guangzhou 2345675
200 changsha 3456789
-- !select10 --
1 beijing 1454547
2 shanghai 1244264
3 guangzhou 528369
4 shenzhen 594202
5 hangzhou 594201
6 nanjing 2345672
7 wuhan 2345673
8 chengdu 2345674
9 xian 2345675
10 hefei 2345676
200 changsha 3456789
-- !select10 --
1 beijing 1454547
2 shanghai 1244264
3 guangzhou 528369
4 shenzhen 594202
5 hangzhou 594201
6 nanjing 2345672
7 wuhan 2345673
8 chengdu 2345674
9 xian 2345675
10 hefei 2345676
200 changsha 3456789
-- !select11 --
1 \N 1454547
2 shanghai 1244264
3 \N 528369
4 \N 594202
5 hangzhou 594201
6 nanjing 2345672
7 \N 2345673
8 \N 2345674
9 xian 2345675
10 \N 2345676
200 changsha 3456789
-- !select11 --
1 \N 1454547
2 shanghai 1244264
3 \N 528369
4 \N 594202
5 hangzhou 594201
6 nanjing 2345672
7 \N 2345673
8 \N 2345674
9 xian 2345675
10 \N 2345676
200 changsha 3456789
-- !select12 --
10 beijing \N
20 shanghai \N
30 hangzhou \N
40 shenzhen \N
50 guangzhou \N
200 changsha 3456789
-- !select12 --
10 beijing \N
20 shanghai \N
30 hangzhou \N
40 shenzhen \N
50 guangzhou \N
200 changsha 3456789
-- !select13 --
30 hangzhou \N
40 shenzhen \N
50 guangzhou \N
200 changsha 3456789
-- !select13 --
30 hangzhou \N
40 shenzhen \N
50 guangzhou \N
200 changsha 3456789
-- !select14 --
30 hangzhou 2345673
40 shenzhen 2345674
50 guangzhou 2345675
200 changsha 3456789
-- !select14 --
30 hangzhou 2345673
40 shenzhen 2345674
50 guangzhou 2345675
200 changsha 3456789

View File

@ -1,5 +1,5 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select --
-- !select1 --
1 beijing 2345671
2 shanghai 2345672
3 guangzhou 2345673
@ -12,7 +12,20 @@
10 hefei 23456710
200 changsha 3456789
-- !select --
-- !select1 --
1 beijing 2345671
2 shanghai 2345672
3 guangzhou 2345673
4 shenzhen 2345674
5 hangzhou 2345675
6 nanjing 2345676
7 wuhan 2345677
8 chengdu 2345678
9 xian 2345679
10 hefei 23456710
200 changsha 3456789
-- !select2 --
10 beijing 2345671
20 shanghai 2345672
30 guangzhou 2345673
@ -25,7 +38,20 @@
100 hefei 23456710
200 changsha 3456789
-- !select --
-- !select2 --
10 beijing 2345671
20 shanghai 2345672
30 guangzhou 2345673
40 shenzhen 2345674
50 hangzhou 2345675
60 nanjing 2345676
70 wuhan 2345677
80 chengdu 2345678
90 xian 2345679
100 hefei 23456710
200 changsha 3456789
-- !select3 --
1 2345671
2 2345672
3 2345673
@ -38,7 +64,20 @@
10 23456710
200 755
-- !select --
-- !select3 --
1 2345671
2 2345672
3 2345673
4 2345674
5 2345675
6 2345676
7 2345677
8 2345678
9 2345679
10 23456710
200 755
-- !select4 --
1 210
2 220
3 230
@ -51,7 +90,20 @@
10 300
200 755
-- !select --
-- !select4 --
1 210
2 220
3 230
4 240
5 250
6 260
7 270
8 280
9 290
10 300
200 755
-- !select5 --
1 1454547
2 1244264
3 528369
@ -64,7 +116,20 @@
10 2345676
200 755
-- !select --
-- !select5 --
1 1454547
2 1244264
3 528369
4 594201
5 594201
6 2345672
7 2345673
8 2345674
9 2345675
10 2345676
200 755
-- !select6 --
10 1454547
20 1244264
30 528369
@ -77,7 +142,12 @@
100 2345676
200 755
-- !select --
-- !select6 --
10 1454547
20 1244264
30 528369
40 594201
50 594201
60 2345672
70 2345673
80 2345674
@ -85,7 +155,7 @@
100 2345676
200 755
-- !select --
-- !select7 --
60 2345672
70 2345673
80 2345674
@ -93,15 +163,54 @@
100 2345676
200 755
-- !select --
-- !select7 --
60 2345672
70 2345673
80 2345674
90 2345675
100 2345676
200 755
-- !select8 --
60 2345672
70 2345673
80 2345674
90 2345675
100 2345676
200 755
-- !select8 --
60 2345672
70 2345673
80 2345674
90 2345675
100 2345676
200 755
-- !select9 --
10 beijing 2345671
20 shanghai 2345672
30 hangzhou 2345673
40 shenzhen 2345674
50 guangzhou 2345675
200 changsha 3456789
-- !select --
-- !select9 --
10 beijing 2345671
20 shanghai 2345672
30 hangzhou 2345673
40 shenzhen 2345674
50 guangzhou 2345675
200 changsha 3456789
-- !select --
-- !select10 --
200 changsha 3456789
-- !select10 --
200 changsha 3456789
-- !select11 --
1 beijing 2345671
2 shanghai 2345672
3 guangzhou 2345673
4 shenzhen 2345674
@ -110,12 +219,113 @@
7 wuhan 2345677
8 chengdu 2345678
9 xian 2345679
10 hefei 23456710
200 changsha 3456789
-- !select --
-- !select11 --
1 beijing 2345671
2 shanghai 2345672
3 guangzhou 2345673
4 shenzhen 2345674
5 hangzhou 2345675
6 nanjing 2345676
7 wuhan 2345677
8 chengdu 2345678
9 xian 2345679
10 hefei 23456710
200 changsha 3456789
-- !select12 --
1 \N 2345671
2 shanghai 2345672
3 beijing 2345673
4 shenzhen 2345674
5 hangzhou 2345675
6 nanjing 2345676
7 \N 2345677
8 chengdu 2345678
9 \N 2345679
10 \N 23456710
200 changsha 3456789
-- !select12 --
1 \N 2345671
2 shanghai 2345672
3 beijing 2345673
4 shenzhen 2345674
5 hangzhou 2345675
6 nanjing 2345676
7 \N 2345677
8 chengdu 2345678
9 \N 2345679
10 \N 23456710
200 changsha 3456789
-- !select13 --
2 shanghai 2345672
3 beijing 2345673
4 shenzhen 2345674
5 hangzhou 2345675
6 nanjing 2345676
8 chengdu 2345678
200 hangzhou 12345
-- !select13 --
2 shanghai 2345672
3 beijing 2345673
4 shenzhen 2345674
5 hangzhou 2345675
6 nanjing 2345676
8 chengdu 2345678
200 hangzhou 12345
-- !select14 --
10 2345671 \N
20 2345672 \N
30 2345673 \N
40 2345674 \N
50 2345675 \N
200 changsha 3456789
-- !select14 --
10 2345671 \N
20 2345672 \N
30 2345673 \N
40 2345674 \N
50 2345675 \N
200 changsha 3456789
-- !select15 --
10 beijing 2345671
20 shanghai 2345672
30 hangzhou 2345673
40 shenzhen 2345674
50 guangzhou 2345675
200 changsha 3456789
-- !select15 --
10 beijing 2345671
20 shanghai 2345672
30 hangzhou 2345673
40 shenzhen 2345674
50 guangzhou 2345675
200 changsha 3456789
-- !select16 --
1 xihu 2345671
2 xiaoshan 2345672
3 binjiang 2345673
4 shangcheng 2345674
5 tonglu 2345675
6 fuyang 2345676
200 changsha 3456789
-- !select16 --
1 xihu 2345671
2 xiaoshan 2345672
3 binjiang 2345673
4 shangcheng 2345674
5 tonglu 2345675
6 fuyang 2345676
200 changsha 3456789
-- !select --

View File

@ -194,85 +194,95 @@ suite("test_array_load", "p0") {
}
}
// case1: import array data in json format and enable vectorized engine
try {
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table.call(testTable, true)
load_array_data.call(testTable, 'true', '', 'json', '', '', '', '', '', '', 'simple_array.json')
// select the table and check whether the data is correct
def check_data_correct = {table_name ->
sql "sync"
qt_select "select * from ${testTable} order by k1"
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
// select the table and check whether the data is correct
qt_select "select * from ${table_name} order by k1"
}
// case2: import array data in json format and disable vectorized engine
try {
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table.call(testTable, false)
for ( i in 0..1 ) {
// should be deleted after new_load_scan is ready
if (i == 1) {
sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "false");"""
} else {
sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "true");"""
}
load_array_data.call(testTable, 'true', '', 'json', '', '', '', '', '', '', 'simple_array.json')
// select the table and check whether the data is correct
sql "sync"
qt_select "select * from ${testTable} order by k1"
// case1: import array data in json format and enable vectorized engine
try {
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table.call(testTable, true)
load_array_data.call(testTable, 'true', '', 'json', '', '', '', '', '', '', 'simple_array.json')
check_data_correct(testTable)
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
}
// case2: import array data in json format and disable vectorized engine
try {
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table.call(testTable, false)
load_array_data.call(testTable, 'true', '', 'json', '', '', '', '', '', '', 'simple_array.json')
check_data_correct(testTable)
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
}
// case3: import array data in csv format and enable vectorized engine
try {
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table.call(testTable, true)
load_array_data.call(testTable, 'true', '', 'csv', '', '', '', '', '', '/', 'simple_array.csv')
check_data_correct(testTable)
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
}
// case4: import array data in csv format and disable vectorized engine
try {
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table.call(testTable, false)
load_array_data.call(testTable, 'true', '', 'csv', '', '', '', '', '', '/', 'simple_array.csv')
check_data_correct(testTable)
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
}
// case5: import array data not specify the format
try {
sql "DROP TABLE IF EXISTS ${testTable01}"
create_test_table01.call(testTable01)
load_array_data.call(testTable01, '', '', '', '', '', '', '', '', '/', 'simple_array.data')
check_data_correct(testTable01)
} finally {
// try_sql("DROP TABLE IF EXISTS ${testTable01}")
}
}
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
}
// case3: import array data in csv format and enable vectorized engine
try {
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table.call(testTable, true)
load_array_data.call(testTable, 'true', '', 'csv', '', '', '', '', '', '/', 'simple_array.csv')
// select the table and check whether the data is correct
sql "sync"
qt_select "select * from ${testTable} order by k1"
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
try_sql("""ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "false");""")
}
// case4: import array data in csv format and disable vectorized engine
try {
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table.call(testTable, false)
load_array_data.call(testTable, 'true', '', 'csv', '', '', '', '', '', '/', 'simple_array.csv')
// select the table and check whether the data is correct
sql "sync"
qt_select "select * from ${testTable} order by k1"
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
}
// case5: import array data not specify the format
try {
sql "DROP TABLE IF EXISTS ${testTable01}"
create_test_table01.call(testTable01)
load_array_data.call(testTable01, '', '', '', '', '', '', '', '', '/', 'simple_array.data')
// select the table and check whether the data is correct
sql "sync"
qt_select "select * from ${testTable01} order by k1"
} finally {
// try_sql("DROP TABLE IF EXISTS ${testTable01}")
}
// if 'enableHdfs' in regression-conf.groovy has been set to true,
// the test will run these cases as below.

View File

@ -49,8 +49,11 @@ suite("test_load_json_column_exclude_schema_without_jsonpath", "p0") {
"""
}
def load_array_data = {table_name, strip_flag, read_flag, format_flag, exprs, json_paths,
def load_array_data = {new_json_reader_flag, table_name, strip_flag, read_flag, format_flag, exprs, json_paths,
json_root, where_expr, fuzzy_flag, column_sep, file_name ->
// should be deleted after new_load_scan is ready
sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "${new_json_reader_flag}");"""
// load the json data
streamLoad {
table table_name
@ -91,7 +94,12 @@ suite("test_load_json_column_exclude_schema_without_jsonpath", "p0") {
create_test_table.call(true)
load_array_data.call(testTable, 'true', '', 'json', '', '', '', '', '', '', 'json_column_match.json')
load_array_data.call('false', testTable, 'true', '', 'json', '', '', '', '', '', '', 'json_column_match.json')
// test new json load, should be deleted after new_load_scan ready
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table.call(true)
load_array_data.call('true', testTable, 'true', '', 'json', '', '', '', '', '', '', 'json_column_match.json')
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
@ -103,7 +111,12 @@ suite("test_load_json_column_exclude_schema_without_jsonpath", "p0") {
create_test_table.call(false)
load_array_data.call(testTable, 'true', '', 'json', '', '', '', '', '', '', 'json_column_match.json')
load_array_data.call('false', testTable, 'true', '', 'json', '', '', '', '', '', '', 'json_column_match.json')
// test new json load, should be deleted after new_load_scan ready
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table.call(false)
load_array_data.call('true', testTable, 'true', '', 'json', '', '', '', '', '', '', 'json_column_match.json')
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")

View File

@ -40,8 +40,11 @@ suite("test_load_json_null_to_nullable", "p0") {
"""
}
def load_array_data = {table_name, strip_flag, read_flag, format_flag, exprs, json_paths,
def load_array_data = {new_json_reader_flag, table_name, strip_flag, read_flag, format_flag, exprs, json_paths,
json_root, where_expr, fuzzy_flag, column_sep, file_name ->
// should be deleted after new_load_scan is ready
sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "${new_json_reader_flag}");"""
// load the json data
streamLoad {
table table_name
@ -74,6 +77,15 @@ suite("test_load_json_null_to_nullable", "p0") {
assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
}
}
// should be deleted after new_load_scan is ready
sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "false");"""
}
def check_data_correct = {table_name ->
sql "sync"
// select the table and check whether the data is correct
qt_select "select * from ${table_name} order by k1"
}
// case1: import array data in json format and enable vectorized engine
@ -82,11 +94,15 @@ suite("test_load_json_null_to_nullable", "p0") {
create_test_table.call(true)
load_array_data.call(testTable, 'true', '', 'json', '', '', '', '', '', '', 'test_char.json')
load_array_data.call('false', testTable, 'true', '', 'json', '', '', '', '', '', '', 'test_char.json')
sql "sync"
// select the table and check whether the data is correct
qt_select "select * from ${testTable} order by k1"
check_data_correct(testTable)
// test new json load, should be deleted after new_load_scan ready
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table.call(true)
load_array_data.call('true', testTable, 'true', '', 'json', '', '', '', '', '', '', 'test_char.json')
check_data_correct(testTable)
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
@ -98,11 +114,15 @@ suite("test_load_json_null_to_nullable", "p0") {
create_test_table.call(false)
load_array_data.call(testTable, 'true', '', 'json', '', '', '', '', '', '', 'test_char.json')
load_array_data.call('false', testTable, 'true', '', 'json', '', '', '', '', '', '', 'test_char.json')
sql "sync"
// select the table and check whether the data is correct
qt_select "select * from ${testTable} order by k1"
check_data_correct(testTable)
// test new json load, should be deleted after new_load_scan ready
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table.call(false)
load_array_data.call('true', testTable, 'true', '', 'json', '', '', '', '', '', '', 'test_char.json')
check_data_correct(testTable)
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")

View File

@ -40,8 +40,12 @@ suite("test_load_json_with_jsonpath", "p0") {
"""
}
def load_array_data = {table_name, strip_flag, read_flag, format_flag, exprs, json_paths,
def load_array_data = {new_json_reader_flag, table_name, strip_flag, read_flag, format_flag, exprs, json_paths,
json_root, where_expr, fuzzy_flag, column_sep, file_name ->
// should be deleted after new_load_scan is ready
sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "${new_json_reader_flag}");"""
// load the json data
streamLoad {
table table_name
@ -74,6 +78,15 @@ suite("test_load_json_with_jsonpath", "p0") {
assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
}
}
// should be deleted after new_load_scan is ready
sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "false");"""
}
def check_data_correct = {table_name ->
sql "sync"
// select the table and check whether the data is correct
qt_select "select * from ${table_name} order by k1"
}
// case1: import array data in json format and enable vectorized engine
@ -82,11 +95,15 @@ suite("test_load_json_with_jsonpath", "p0") {
create_test_table.call(true)
load_array_data.call(testTable, 'true', '', 'json', '', '["$.k1", "$.v1"]', '', '', '', '', 'test_load_with_jsonpath.json')
load_array_data.call('false', testTable, 'true', '', 'json', '', '["$.k1", "$.v1"]', '', '', '', '', 'test_load_with_jsonpath.json')
// select the table and check whether the data is correct
sql "sync"
qt_select "select * from ${testTable} order by k1"
check_data_correct(testTable)
// test new json load, should be deleted after new_load_scan ready
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table.call(true)
load_array_data.call('true', testTable, 'true', '', 'json', '', '["$.k1", "$.v1"]', '', '', '', '', 'test_load_with_jsonpath.json')
check_data_correct(testTable)
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
@ -98,12 +115,19 @@ suite("test_load_json_with_jsonpath", "p0") {
create_test_table.call(false)
load_array_data.call(testTable, 'true', '', 'json', '', '["$.k1", "$.v1"]', '', '', '', '', 'test_load_with_jsonpath.json')
load_array_data.call('false', testTable, 'true', '', 'json', '', '["$.k1", "$.v1"]', '', '', '', '', 'test_load_with_jsonpath.json')
sql "sync"
// select the table and check whether the data is correct
qt_select "select * from ${testTable} order by k1"
// test new json load, should be deleted after new_load_scan ready
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table.call(false)
load_array_data.call('true', testTable, 'true', '', 'json', '', '["$.k1", "$.v1"]', '', '', '', '', 'test_load_with_jsonpath.json')
check_data_correct(testTable)
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
}

View File

@ -0,0 +1,554 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
suite("test_hdfs_json_load", "p0") {
// define a sql table
def testTable = "test_hdfs_json_load"
def create_test_table1 = {testTablex ->
// multi-line sql
def result1 = sql """
CREATE TABLE IF NOT EXISTS ${testTablex} (
id INT DEFAULT '10',
city VARCHAR(32) DEFAULT '',
code BIGINT SUM DEFAULT '0')
DISTRIBUTED BY HASH(id) BUCKETS 10
PROPERTIES("replication_num" = "1");
"""
// DDL/DML returns a single row with a single column; the only value is the updated row count
assertTrue(result1.size() == 1)
assertTrue(result1[0].size() == 1)
assertTrue(result1[0][0] == 0, "Create table should update 0 rows")
// insert 1 row to check whether the table is ok
def result2 = sql "INSERT INTO ${testTablex} (id, city, code) VALUES (200, 'changsha', 3456789)"
assertTrue(result2.size() == 1)
assertTrue(result2[0].size() == 1)
assertTrue(result2[0][0] == 1, "Insert should update 1 row")
}
def load_from_hdfs1 = {new_json_reader_flag, strip_flag, fuzzy_flag, testTablex, label, fileName,
fsPath, hdfsUser, exprs, jsonpaths, json_root, columns_parameter, where ->
// should be deleted after new_load_scan is ready
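// "true" routes the broker load through the new scan node (the new json reader),
// "false" keeps the legacy path, so each case below is executed against both readers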
sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "${new_json_reader_flag}");"""
def hdfsFilePath = "${fsPath}/user/doris/json_format_test/${fileName}"
def result1= sql """
LOAD LABEL ${label} (
DATA INFILE("${hdfsFilePath}")
INTO TABLE ${testTablex}
FORMAT as "json"
${columns_parameter}
${exprs}
${where}
properties(
"json_root" = "${json_root}",
"jsonpaths" = "${jsonpaths}",
"strip_outer_array" = "${strip_flag}",
"fuzzy_parse" = "${fuzzy_flag}"
)
)
with HDFS (
"fs.defaultFS"="${fsPath}",
"hadoop.username" = "${hdfsUser}"
)
PROPERTIES (
"timeout"="1200",
"max_filter_ratio"="0"
);
"""
assertTrue(result1.size() == 1)
assertTrue(result1[0].size() == 1)
assertTrue(result1[0][0] == 0, "Query OK, 0 rows affected")
// should be deleted after new_load_scan is ready
sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "false");"""
}
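// For readability of the cases below, the positional arguments of load_from_hdfs1 are:
//   (new_json_reader_flag, strip_flag, fuzzy_flag, testTablex, label, fileName,
//    fsPath, hdfsUser, exprs, jsonpaths, json_root, columns_parameter, where)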
def check_load_result = {checklabel, testTablex ->
max_try_milli_secs = 10000
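// poll `show load` once per second; the job must reach FINISHED within max_try_milli_secs,
// otherwise the assertEquals below deliberately fails the test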
while(max_try_milli_secs) {
result = sql "show load where label = '${checklabel}'"
if(result[0][2] == "FINISHED") {
log.info("LOAD FINISHED!")
break
} else {
sleep(1000) // wait 1 second every time
max_try_milli_secs -= 1000
if(max_try_milli_secs <= 0) {
log.info("Broker load result: ${result}".toString())
assertEquals(1, 2)
}
}
}
}
String hdfs_port = context.config.otherConfigs.get("hdfs_port")
def fsPath = "hdfs://127.0.0.1:${hdfs_port}"
// It's okay to use a random `hdfsUser`, but it cannot be empty.
def hdfsUser = "doris"
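// For orientation only (the real fixtures under /user/doris/json_format_test/ may differ):
// the multi_line_json*.json files are loaded with strip_outer_array=true in the cases below,
// and nest_json.json keeps its records under a root key extracted via json_root "$.item".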
// case1: import simple json
def q1 = {
try {
def test_load_label1 = UUID.randomUUID().toString().replaceAll("-", "")
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table1.call(testTable)
load_from_hdfs1.call("false", "false", "false", testTable, test_load_label1, "simple_object_json.json",
fsPath, hdfsUser, '', '', '', '', '')
check_load_result(test_load_label1, testTable)
sql "sync"
qt_select1 "select * from ${testTable} order by id"
// test new json reader
def test_load_label2 = UUID.randomUUID().toString().replaceAll("-", "")
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table1.call(testTable)
load_from_hdfs1.call("true", "false", "false", testTable, test_load_label2, "simple_object_json.json",
fsPath, hdfsUser, '', '', '', '', '')
check_load_result(test_load_label2, testTable)
sql "sync"
qt_select1 "select * from ${testTable} order by id"
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
}
}
// case2: import json and apply exprs
def q2 = {
try {
def test_load_label1 = UUID.randomUUID().toString().replaceAll("-", "")
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table1.call(testTable)
load_from_hdfs1.call("false", "false", "false", testTable, test_load_label1, "simple_object_json.json",
fsPath, hdfsUser, "SET(id= id * 10)", '', '', '', '')
check_load_result(test_load_label1, testTable)
sql "sync"
qt_select2 "select * from ${testTable} order by id"
// test new json reader
def test_load_label2 = UUID.randomUUID().toString().replaceAll("-", "")
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table1.call(testTable)
load_from_hdfs1.call("true", "false", "false", testTable, test_load_label2, "simple_object_json.json",
fsPath, hdfsUser, "SET(id= id * 10)", '', '', '', '')
check_load_result(test_load_label2, testTable)
sql "sync"
qt_select2 "select * from ${testTable} order by id"
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
}
}
// case3: import json and apply jsonpaths
def q3 = {
try {
def test_load_label1 = UUID.randomUUID().toString().replaceAll("-", "")
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table1.call(testTable)
load_from_hdfs1.call("false", "false", "false", testTable, test_load_label1, "simple_object_json.json",
fsPath, hdfsUser, '', """[\\"\$.id\\", \\"\$.code\\"]""", '', '', '')
check_load_result(test_load_label1, testTable)
sql "sync"
qt_select3 "select * from ${testTable} order by id"
// test new json reader
def test_load_label2 = UUID.randomUUID().toString().replaceAll("-", "")
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table1.call(testTable)
load_from_hdfs1.call("true", "false", "false", testTable, test_load_label2, "simple_object_json.json",
fsPath, hdfsUser, '', """[\\"\$.id\\", \\"\$.code\\"]""", '', '', '')
check_load_result(test_load_label2, testTable)
sql "sync"
qt_select3 "select * from ${testTable} order by id"
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
}
}
// case4: import json and apply jsonpaths & exprs
def q4 = {
try {
def test_load_label1 = UUID.randomUUID().toString().replaceAll("-", "")
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table1.call(testTable)
load_from_hdfs1.call("false", "false", "false", testTable, test_load_label1, "simple_object_json.json",
fsPath, hdfsUser, "SET(code = id * 10 + 200)", """[\\"\$.id\\"]""", '', '', '')
check_load_result(test_load_label1, testTable)
sql "sync"
qt_select4 "select * from ${testTable} order by id"
// test new json reader
def test_load_label2 = UUID.randomUUID().toString().replaceAll("-", "")
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table1.call(testTable)
load_from_hdfs1.call("true", "false", "false", testTable, test_load_label2, "simple_object_json.json",
fsPath, hdfsUser, "SET(code = id * 10 + 200)", """[\\"\$.id\\"]""", '', '', '')
check_load_result(test_load_label2, testTable)
sql "sync"
qt_select4 "select * from ${testTable} order by id"
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
}
}
// case5: import json with line reader
def q5 = {
try {
def test_load_label1 = UUID.randomUUID().toString().replaceAll("-", "")
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table1.call(testTable)
load_from_hdfs1.call("false", "true", "false", testTable, test_load_label1, "multi_line_json.json",
fsPath, hdfsUser, '', '', '', '', '')
check_load_result(test_load_label1, testTable)
sql "sync"
qt_select5 "select * from ${testTable} order by id"
// test new json reader
def test_load_label2 = UUID.randomUUID().toString().replaceAll("-", "")
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table1.call(testTable)
load_from_hdfs1.call("true", "true", "false", testTable, test_load_label2, "multi_line_json.json",
fsPath, hdfsUser, '', '', '', '', '')
check_load_result(test_load_label2, testTable)
sql "sync"
qt_select5 "select * from ${testTable} order by id"
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
}
}
// case6: import json using exprs and jsonpaths
def q6 = {
try {
def test_load_label1 = UUID.randomUUID().toString().replaceAll("-", "")
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table1.call(testTable)
load_from_hdfs1.call("false", "true", "false", testTable, test_load_label1, "multi_line_json.json",
fsPath, hdfsUser, "SET(id = id * 10)", """[\\"\$.id\\", \\"\$.code\\"]""", '', '', '')
check_load_result(test_load_label1, testTable)
sql "sync"
qt_select6 "select * from ${testTable} order by id"
// test new json reader
def test_load_label2 = UUID.randomUUID().toString().replaceAll("-", "")
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table1.call(testTable)
load_from_hdfs1.call("true", "true", "false", testTable, test_load_label2, "multi_line_json.json",
fsPath, hdfsUser, "SET(id = id * 10)", """[\\"\$.id\\", \\"\$.code\\"]""", '', '', '')
check_load_result(test_load_label2, testTable)
sql "sync"
qt_select6 "select * from ${testTable} order by id"
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
}
}
// case7: import json using where
def q7 = {
try {
def test_load_label1 = UUID.randomUUID().toString().replaceAll("-", "")
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table1.call(testTable)
load_from_hdfs1.call("false", "true", "false", testTable, test_load_label1, "multi_line_json.json",
fsPath, hdfsUser, "SET(id = id * 10)", """[\\"\$.id\\", \\"\$.code\\"]""", '', '', 'WHERE id>50')
check_load_result(test_load_label1, testTable)
sql "sync"
qt_select7 "select * from ${testTable} order by id"
// test new json reader
def test_load_label2 = UUID.randomUUID().toString().replaceAll("-", "")
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table1.call(testTable)
load_from_hdfs1.call("true", "true", "false", testTable, test_load_label2, "multi_line_json.json",
fsPath, hdfsUser, "SET(id = id * 10)", """[\\"\$.id\\", \\"\$.code\\"]""", '', '', 'WHERE id>50')
check_load_result(test_load_label2, testTable)
sql "sync"
qt_select7 "select * from ${testTable} order by id"
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
}
}
// case8: import json using fuzzy_parse
def q8 = {
try {
def test_load_label1 = UUID.randomUUID().toString().replaceAll("-", "")
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table1.call(testTable)
load_from_hdfs1.call("false", "true", "true", testTable, test_load_label1, "multi_line_json.json",
fsPath, hdfsUser, "SET(id = id * 10)", """[\\"\$.id\\", \\"\$.city\\"]""", '', '', 'WHERE id>50')
check_load_result(test_load_label1, testTable)
sql "sync"
qt_select8 "select * from ${testTable} order by id"
// test new json reader
def test_load_label2 = UUID.randomUUID().toString().replaceAll("-", "")
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table1.call(testTable)
load_from_hdfs1.call("true", "true", "true", testTable, test_load_label2, "multi_line_json.json",
fsPath, hdfsUser, "SET(id = id * 10)", """[\\"\$.id\\", \\"\$.city\\"]""", '', '', 'WHERE id>50')
check_load_result(test_load_label2, testTable)
sql "sync"
qt_select8 "select * from ${testTable} order by id"
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
}
}
// case9: import json using json_root
def q9 = {
try {
def test_load_label1 = UUID.randomUUID().toString().replaceAll("-", "")
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table1.call(testTable)
load_from_hdfs1.call("false", "false", "true", testTable, test_load_label1, "nest_json.json",
fsPath, hdfsUser, "SET(id = id * 10)", '', '$.item', '', '')
check_load_result(test_load_label1, testTable)
sql "sync"
qt_select9 "select * from ${testTable} order by id"
// test new json reader
def test_load_label2 = UUID.randomUUID().toString().replaceAll("-", "")
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table1.call(testTable)
load_from_hdfs1.call("true", "false", "true", testTable, test_load_label2, "nest_json.json",
fsPath, hdfsUser, "SET(id = id * 10)", '', '$.item', '', '')
check_load_result(test_load_label2, testTable)
sql "sync"
qt_select9 "select * from ${testTable} order by id"
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
}
}
// case10: test json file which is unordered and does not use json_path
def q10 = {
try {
def test_load_label1 = UUID.randomUUID().toString().replaceAll("-", "")
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table1.call(testTable)
load_from_hdfs1.call("false", "true", "false", testTable, test_load_label1, "multi_line_json_unorder.json",
fsPath, hdfsUser, '', '', '', '', '')
check_load_result(test_load_label1, testTable)
sql "sync"
qt_select10 "select * from ${testTable} order by id"
// test new json reader
def test_load_label2 = UUID.randomUUID().toString().replaceAll("-", "")
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table1.call(testTable)
load_from_hdfs1.call("true", "true", "false", testTable, test_load_label2, "multi_line_json_unorder.json",
fsPath, hdfsUser, '', '', '', '', '')
check_load_result(test_load_label2, testTable)
sql "sync"
qt_select10 "select * from ${testTable} order by id"
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
}
}
// case11: test json file which is unordered and lacks one column which is nullable
def q11 = {
try {
def test_load_label1 = UUID.randomUUID().toString().replaceAll("-", "")
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table1.call(testTable)
load_from_hdfs1.call("false", "true", "false", testTable, test_load_label1, "multi_line_json_lack_column.json",
fsPath, hdfsUser, '', '', '', '', '')
check_load_result(test_load_label1, testTable)
sql "sync"
qt_select11 "select * from ${testTable} order by id"
// test new json reader
def test_load_label2 = UUID.randomUUID().toString().replaceAll("-", "")
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table1.call(testTable)
load_from_hdfs1.call("true", "true", "false", testTable, test_load_label2, "multi_line_json_lack_column.json",
fsPath, hdfsUser, '', '', '', '', '')
check_load_result(test_load_label2, testTable)
sql "sync"
qt_select11 "select * from ${testTable} order by id"
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
}
}
// case12: use json_path and json_root
def q12 = {
try {
def test_load_label1 = UUID.randomUUID().toString().replaceAll("-", "")
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table1.call(testTable)
load_from_hdfs1.call("false", "false", "false", testTable, test_load_label1, "nest_json.json", fsPath, hdfsUser,
"SET(id = id * 10)", """[\\"\$.id\\", \\"\$.city\\"]""", '$.item', '', '')
check_load_result(test_load_label1, testTable)
sql "sync"
qt_select12 "select * from ${testTable} order by id"
// test new json reader
def test_load_label2 = UUID.randomUUID().toString().replaceAll("-", "")
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table1.call(testTable)
load_from_hdfs1.call("true", "false", "false", testTable, test_load_label2, "nest_json.json", fsPath, hdfsUser,
"SET(id = id * 10)", """[\\"\$.id\\", \\"\$.city\\"]""", '$.item', '', '')
check_load_result(test_load_label2, testTable)
sql "sync"
qt_select12 "select * from ${testTable} order by id"
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
}
}
// case13: use json_path & json_root & where
def q13 = {
try {
def test_load_label1 = UUID.randomUUID().toString().replaceAll("-", "")
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table1.call(testTable)
load_from_hdfs1.call("false", "false", "false", testTable, test_load_label1, "nest_json.json", fsPath, hdfsUser,
"SET(id = id * 10)", """[\\"\$.id\\", \\"\$.city\\"]""", '$.item', '', 'WHERE id>20')
check_load_result(test_load_label1, testTable)
sql "sync"
qt_select13 "select * from ${testTable} order by id"
// test new json reader
def test_load_label2 = UUID.randomUUID().toString().replaceAll("-", "")
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table1.call(testTable)
load_from_hdfs1.call("true", "false", "false", testTable, test_load_label2, "nest_json.json", fsPath, hdfsUser,
"SET(id = id * 10)", """[\\"\$.id\\", \\"\$.city\\"]""", '$.item', '', 'WHERE id>20')
check_load_result(test_load_label2, testTable)
sql "sync"
qt_select13 "select * from ${testTable} order by id"
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
}
}
// case14: use jsonpaths & json_root & where & columns
def q14 = {
try {
def test_load_label1 = UUID.randomUUID().toString().replaceAll("-", "")
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table1.call(testTable)
load_from_hdfs1.call("false", "false", "false", testTable, test_load_label1, "nest_json.json", fsPath, hdfsUser,
"SET(id = id * 10)", """[\\"\$.id\\", \\"\$.code\\",\\"\$.city\\"]""", '$.item',
'(id, code, city)', 'WHERE id>20')
check_load_result(test_load_label1, testTable)
sql "sync"
qt_select14 "select * from ${testTable} order by id"
// test new json reader
def test_load_label2 = UUID.randomUUID().toString().replaceAll("-", "")
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table1.call(testTable)
load_from_hdfs1.call("true", "false", "false", testTable, test_load_label2, "nest_json.json", fsPath, hdfsUser,
"SET(id = id * 10)", """[\\"\$.id\\", \\"\$.code\\", \\"\$.city\\"]""", '$.item',
'(id, code, city)', 'WHERE id>20')
check_load_result(test_load_label2, testTable)
sql "sync"
qt_select14 "select * from ${testTable} order by id"
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
}
}
String enabled = context.config.otherConfigs.get("enableHiveTest")
if (enabled != null && enabled.equalsIgnoreCase("true")) {
log.info("Begin Test q1:")
q1()
log.info("Begin Test q2:")
q2()
log.info("Begin Test q3:")
q3()
log.info("Begin Test q4:")
q4()
log.info("Begin Test q5:")
q5()
log.info("Begin Test q6:")
q6()
log.info("Begin Test q7:")
q7()
log.info("Begin Test q8:")
q8()
log.info("Begin Test q9:")
q9()
log.info("Begin Test q10:")
q10()
log.info("Begin Test q11:")
q11()
log.info("Begin Test q12:")
q12()
log.info("Begin Test q13:")
q13()
log.info("Begin Test q14:")
q14()
}
}

View File

@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
suite("test_json_load", "p0") {
suite("test_json_load", "p0") {
// define a sql table
def testTable = "test_json_load"
@ -64,6 +64,30 @@ suite("test_json_load", "p0") {
assertTrue(result2[0][0] == 1, "Insert should update 1 row")
}
// city is NOT NULL
def create_test_table3 = {testTablex ->
// multi-line sql
def result1 = sql """
CREATE TABLE IF NOT EXISTS ${testTablex} (
id INT DEFAULT '10',
city VARCHAR(32) NOT NULL,
code BIGINT SUM DEFAULT '0')
DISTRIBUTED BY HASH(id) BUCKETS 10
PROPERTIES("replication_num" = "1");
"""
// DDL/DML returns a single row with a single column; the only value is the updated row count
assertTrue(result1.size() == 1)
assertTrue(result1[0].size() == 1)
assertTrue(result1[0][0] == 0, "Create table should update 0 rows")
// insert 1 row to check whether the table is ok
def result2 = sql "INSERT INTO ${testTablex} (id, city, code) VALUES (200, 'hangzhou', 12345)"
assertTrue(result2.size() == 1)
assertTrue(result2[0].size() == 1)
assertTrue(result2[0][0] == 1, "Insert should update 1 row")
}
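// because city is declared NOT NULL here, json rows that omit the city key are expected to be
// filtered during the load (case13 below asserts NumberFilteredRows accordingly)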
def test_invalid_json_array_table = { testTablex ->
// multi-line sql
def result1 = sql """
@ -90,8 +114,11 @@ suite("test_json_load", "p0") {
assertTrue(result1[0][0] == 0, "Create table should update 0 rows")
}
def load_json_data = {label, strip_flag, read_flag, format_flag, exprs, json_paths,
json_root, where_expr, fuzzy_flag, file_name, ignore_failure=false ->
def load_json_data = {new_json_reader_flag, label, strip_flag, read_flag, format_flag, exprs, json_paths,
json_root, where_expr, fuzzy_flag, file_name, ignore_failure=false ->
// should be deleted after new_load_scan is ready
sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "${new_json_reader_flag}");"""
// load the json data
streamLoad {
table "test_json_load"
@ -123,6 +150,9 @@ suite("test_json_load", "p0") {
assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
}
}
// should be deleted after new_load_scan is ready
sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "false");"""
}
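// call order for load_json_data:
//   (new_json_reader_flag, label, strip_flag, read_flag, format_flag, exprs, json_paths,
//    json_root, where_expr, fuzzy_flag, file_name[, ignore_failure])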
def load_from_hdfs1 = {testTablex, label, hdfsFilePath, format, brokerName, hdfsUser, hdfsPasswd ->
@ -182,10 +212,20 @@ suite("test_json_load", "p0") {
create_test_table1.call(testTable)
load_json_data.call('test_json_load_case1', 'true', '', 'json', '', '', '', '', '', 'simple_json.json')
load_json_data.call('false', 'test_json_load_case1', 'true', '', 'json', '', '', '', '', '', 'simple_json.json')
sql "sync"
qt_select "select * from ${testTable} order by id"
qt_select1 "select * from ${testTable} order by id"
// test new json reader
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table1.call(testTable)
load_json_data.call('true', 'test_json_load_case1_2', 'true', '', 'json', '', '', '', '', '', 'simple_json.json')
sql "sync"
qt_select1 "select * from ${testTable} order by id"
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
@ -197,10 +237,20 @@ suite("test_json_load", "p0") {
create_test_table1.call(testTable)
load_json_data.call('test_json_load_case2', 'true', '', 'json', 'id= id * 10', '', '', '', '', 'simple_json.json')
load_json_data.call('false', 'test_json_load_case2', 'true', '', 'json', 'id= id * 10', '', '', '', '', 'simple_json.json')
sql "sync"
qt_select "select * from ${testTable} order by id"
qt_select2 "select * from ${testTable} order by id"
// test new json reader
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table1.call(testTable)
load_json_data.call('true', 'test_json_load_case2_2', 'true', '', 'json', 'id= id * 10', '', '', '', '', 'simple_json.json')
sql "sync"
qt_select2 "select * from ${testTable} order by id"
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
@ -212,11 +262,22 @@ suite("test_json_load", "p0") {
create_test_table2.call(testTable)
load_json_data.call('test_json_load_case3', 'true', '', 'json', '', '[\"$.id\", \"$.code\"]',
load_json_data.call('false', 'test_json_load_case3', 'true', '', 'json', '', '[\"$.id\", \"$.code\"]',
'', '', '', 'simple_json.json')
sql "sync"
qt_select "select * from ${testTable} order by id"
qt_select3 "select * from ${testTable} order by id"
// test new json reader
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table2.call(testTable)
load_json_data.call('true', 'test_json_load_case3_2', 'true', '', 'json', '', '[\"$.id\", \"$.code\"]',
'', '', '', 'simple_json.json')
sql "sync"
qt_select3 "select * from ${testTable} order by id"
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
@ -228,11 +289,22 @@ suite("test_json_load", "p0") {
create_test_table2.call(testTable)
load_json_data.call('test_json_load_case4', 'true', '', 'json', 'code = id * 10 + 200', '[\"$.id\"]',
load_json_data.call('false', 'test_json_load_case4', 'true', '', 'json', 'code = id * 10 + 200', '[\"$.id\"]',
'', '', '', 'simple_json.json')
sql "sync"
qt_select "select * from ${testTable} order by id"
qt_select4 "select * from ${testTable} order by id"
// test new json reader
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table2.call(testTable)
load_json_data.call('true', 'test_json_load_case4_2', 'true', '', 'json', 'code = id * 10 + 200', '[\"$.id\"]',
'', '', '', 'simple_json.json')
sql "sync"
qt_select4 "select * from ${testTable} order by id"
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
@ -244,11 +316,22 @@ suite("test_json_load", "p0") {
create_test_table2.call(testTable)
load_json_data.call('test_json_load_case5', 'true', 'true', 'json', '', '[\"$.id\", \"$.code\"]',
load_json_data.call('false', 'test_json_load_case5', 'true', 'true', 'json', '', '[\"$.id\", \"$.code\"]',
'', '', '', 'multi_line_json.json')
sql "sync"
qt_select "select * from ${testTable} order by id"
qt_select5 "select * from ${testTable} order by id"
// test new json reader
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table2.call(testTable)
load_json_data.call('true', 'test_json_load_case5_2', 'true', 'true', 'json', '', '[\"$.id\", \"$.code\"]',
'', '', '', 'multi_line_json.json')
sql "sync"
qt_select5 "select * from ${testTable} order by id"
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
@ -260,11 +343,23 @@ suite("test_json_load", "p0") {
create_test_table2.call(testTable)
load_json_data.call('test_json_load_case6', 'true', 'true', 'json', 'id= id * 10', '[\"$.id\", \"$.code\"]',
load_json_data.call('false', 'test_json_load_case6', 'true', 'true', 'json', 'id= id * 10', '[\"$.id\", \"$.code\"]',
'', '', '', 'multi_line_json.json')
sql "sync"
qt_select "select * from ${testTable} order by id"
qt_select6 "select * from ${testTable} order by id"
// test new json reader
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table2.call(testTable)
load_json_data.call('true', 'test_json_load_case6_2', 'true', 'true', 'json', 'id= id * 10', '[\"$.id\", \"$.code\"]',
'', '', '', 'multi_line_json.json')
sql "sync"
qt_select6 "select * from ${testTable} order by id"
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
@ -276,11 +371,22 @@ suite("test_json_load", "p0") {
create_test_table2.call(testTable)
load_json_data.call('test_json_load_case7', 'true', 'true', 'json', 'id= id * 10', '[\"$.id\", \"$.code\"]',
load_json_data.call('false', 'test_json_load_case7', 'true', 'true', 'json', 'id= id * 10', '[\"$.id\", \"$.code\"]',
'', 'id > 50', '', 'multi_line_json.json')
sql "sync"
qt_select "select * from ${testTable} order by id"
qt_select7 "select * from ${testTable} order by id"
// test new json reader
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table2.call(testTable)
load_json_data.call('true', 'test_json_load_case7_2', 'true', 'true', 'json', 'id= id * 10', '[\"$.id\", \"$.code\"]',
'', 'id > 50', '', 'multi_line_json.json')
sql "sync"
qt_select7 "select * from ${testTable} order by id"
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
@ -292,11 +398,23 @@ suite("test_json_load", "p0") {
create_test_table2.call(testTable)
load_json_data.call('test_json_load_case8', 'true', 'true', 'json', 'id= id * 10', '[\"$.id\", \"$.code\"]',
load_json_data.call('false', 'test_json_load_case8', 'true', 'true', 'json', 'id= id * 10', '[\"$.id\", \"$.code\"]',
'', 'id > 50', 'true', 'multi_line_json.json')
sql "sync"
qt_select "select * from ${testTable} order by id"
qt_select8 "select * from ${testTable} order by id"
// test new json reader
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table2.call(testTable)
load_json_data.call('true', 'test_json_load_case8_2', 'true', 'true', 'json', 'id= id * 10', '[\"$.id\", \"$.code\"]',
'', 'id > 50', 'true', 'multi_line_json.json')
sql "sync"
qt_select8 "select * from ${testTable} order by id"
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
@ -308,26 +426,258 @@ suite("test_json_load", "p0") {
create_test_table1.call(testTable)
load_json_data.call('test_json_load_case9', '', 'true', 'json', 'id= id * 10', '',
load_json_data.call('false', 'test_json_load_case9', '', 'true', 'json', 'id= id * 10', '',
'$.item', '', 'true', 'nest_json.json')
sql "sync"
qt_select "select * from ${testTable} order by id"
qt_select9 "select * from ${testTable} order by id"
// test new json reader
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table1.call(testTable)
load_json_data.call('true', 'test_json_load_case9_2', '', 'true', 'json', 'id= id * 10', '',
'$.item', '', 'true', 'nest_json.json')
sql "sync"
qt_select9 "select * from ${testTable} order by id"
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
}
// case10: invalid json
try {
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table1.call(testTable)
load_json_data.call('test_json_load_case10', '', 'true', 'json', 'id= id * 10', '',
load_json_data.call('false', 'test_json_load_case10', '', 'true', 'json', 'id= id * 10', '',
'$.item', '', 'true', 'invalid_json.json', true)
sql "sync"
qt_select "select * from ${testTable} order by id"
qt_select10 "select * from ${testTable} order by id"
// test new json reader
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table1.call(testTable)
load_json_data.call('true', 'test_json_load_case10_2', '', 'true', 'json', 'id= id * 10', '',
'$.item', '', 'true', 'invalid_json.json', true)
sql "sync"
qt_select10 "select * from ${testTable} order by id"
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
}
// case11: test json file which is unordered and does not use json_path
try {
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table1.call(testTable)
load_json_data.call('false', 'test_json_load_case11', 'true', '', 'json', '', '', '', '', '', 'simple_json2.json')
sql "sync"
qt_select11 "select * from ${testTable} order by id"
// test new json reader
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table1.call(testTable)
load_json_data.call('true', 'test_json_load_case11_2', 'true', '', 'json', '', '', '', '', '', 'simple_json2.json')
sql "sync"
qt_select11 "select * from ${testTable} order by id"
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
}
// case12: test json file which is unordered and lacks one column which is nullable
try {
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table1.call(testTable)
load_json_data.call('false', 'test_json_load_case12', 'true', '', 'json', '', '', '', '', '', 'simple_json2_lack_one_column.json')
sql "sync"
qt_select12 "select * from ${testTable} order by id"
// test new json reader
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table1.call(testTable)
load_json_data.call('true', 'test_json_load_case12_2', 'true', '', 'json', '', '', '', '', '', 'simple_json2_lack_one_column.json')
sql "sync"
qt_select12 "select * from ${testTable} order by id"
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
}
// case13: test json file which is unordered and lacks one column which is not nullable
try {
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table3.call(testTable)
// should be deleted after new_load_scan is ready
sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "false");"""
// load the json data
streamLoad {
table "${testTable}"
// set http request header params
set 'strip_outer_array', "true"
set 'format', "json"
set 'max_filter_ratio', '1'
file "simple_json2_lack_one_column.json" // import json file
time 10000 // limit inflight 10s
// if a check callback is declared, the default check conditions are ignored,
// so you must check all conditions yourself
check { result, exception, startTime, endTime ->
if (exception != null) {
throw exception
}
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
assertEquals(json.NumberTotalRows, json.NumberLoadedRows + json.NumberUnselectedRows + json.NumberFilteredRows)
assertEquals(json.NumberFilteredRows, 4)
assertEquals(json.NumberLoadedRows, 6)
assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
}
}
// should be deleted after new_load_scan is ready
sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "false");"""
sql "sync"
qt_select13 "select * from ${testTable} order by id"
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table3.call(testTable)
// should be deleted after new_load_scan is ready
sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "true");"""
// load the json data
streamLoad {
table "${testTable}"
// set http request header params
set 'strip_outer_array', "true"
set 'format', "json"
set 'max_filter_ratio', '1'
file "simple_json2_lack_one_column.json" // import json file
time 10000 // limit inflight 10s
// if a check callback is declared, the default check conditions are ignored,
// so you must check all conditions yourself
check { result, exception, startTime, endTime ->
if (exception != null) {
throw exception
}
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
assertEquals(json.NumberTotalRows, json.NumberLoadedRows + json.NumberUnselectedRows + json.NumberFilteredRows)
assertEquals(json.NumberFilteredRows, 4)
assertEquals(json.NumberLoadedRows, 6)
assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
}
}
// should be deleted after new_load_scan is ready
sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "false");"""
sql "sync"
qt_select13 "select * from ${testTable} order by id"
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
}
// case14: use json_path and json_root
try {
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table1.call(testTable)
load_json_data.call('false', 'test_json_load_case14', '', 'true', 'json', 'id= id * 10', '[\"$.id\", \"$.code\"]',
'$.item', '', 'true', 'nest_json.json')
sql "sync"
qt_select14 "select * from ${testTable} order by id"
// test new json reader
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table1.call(testTable)
load_json_data.call('true', 'test_json_load_case14_2', '', 'true', 'json', 'id= id * 10', '[\"$.id\", \"$.code\"]',
'$.item', '', 'true', 'nest_json.json')
sql "sync"
qt_select14 "select * from ${testTable} order by id"
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
}
// case15: apply jsonpaths & exprs & json_root
try {
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table1.call(testTable)
load_json_data.call('false', 'test_json_load_case15', '', 'true', 'json', 'id, code, city, id= id * 10',
'[\"$.id\", \"$.code\", \"$.city\"]', '$.item', '', 'true', 'nest_json.json')
sql "sync"
qt_select15 "select * from ${testTable} order by id"
// test new json reader
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table1.call(testTable)
load_json_data.call('true', 'test_json_load_case15_2', '', 'true', 'json', 'id, code, city,id= id * 10',
'[\"$.id\", \"$.code\", \"$.city\"]', '$.item', '', 'true', 'nest_json.json')
sql "sync"
qt_select15 "select * from ${testTable} order by id"
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
}
// case16: apply jsonpaths & exprs & json_root
try {
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table1.call(testTable)
load_json_data.call('false', 'test_json_load_case16', 'true', '', 'json', 'id, code, city',
'[\"$.id\", \"$.code\", \"$.city[2]\"]', '$.item', '', 'true', 'nest_json_array.json')
sql "sync"
qt_select16 "select * from ${testTable} order by id"
// test new json reader
sql "DROP TABLE IF EXISTS ${testTable}"
create_test_table1.call(testTable)
load_json_data.call('true', 'test_json_load_case16_2', 'true', '', 'json', 'id, code, city',
'[\"$.id\", \"$.code\", \"$.city[2]\"]', '$.item', '', 'true', 'nest_json_array.json')
sql "sync"
qt_select16 "select * from ${testTable} order by id"
} finally {
try_sql("DROP TABLE IF EXISTS ${testTable}")
@ -342,7 +692,7 @@ suite("test_json_load", "p0") {
def hdfs_file_path = uploadToHdfs "stream_load/simple_object_json.json"
def format = "json"
// case11: import json using pre-filter exprs
// case17: import json using pre-filter exprs
try {
sql "DROP TABLE IF EXISTS ${testTable}"
@ -357,7 +707,7 @@ suite("test_json_load", "p0") {
try_sql("DROP TABLE IF EXISTS ${testTable}")
}
// case12: import json using pre-filter and where exprs
// case18: import json using pre-filter and where exprs
try {
sql "DROP TABLE IF EXISTS ${testTable}"
@ -372,12 +722,12 @@ suite("test_json_load", "p0") {
try_sql("DROP TABLE IF EXISTS ${testTable}")
}
// case13: invalid json
// case19: invalid json
try {
sql "DROP TABLE IF EXISTS ${testTable}"
test_invalid_json_array_table.call(testTable)
load_json_data.call('test_json_load_case11', 'true', '', 'json', '', '',
load_json_data.call('false', 'test_json_load_case19', 'true', '', 'json', '', '',
'', '', '', 'invalid_json_array.json', true)
sql "sync"

View File

@ -801,22 +801,23 @@ order by
String[][] backends = sql """ show backends; """
assertTrue(backends.size() > 0)
for (String[] backend in backends) {
StringBuilder setConfigCommand = new StringBuilder();
setConfigCommand.append("curl -X POST http://")
setConfigCommand.append(backend[2])
setConfigCommand.append(":")
setConfigCommand.append(backend[5])
setConfigCommand.append("/api/update_config?")
String command1 = setConfigCommand.toString() + "enable_new_load_scan_node=true"
logger.info(command1)
String command2 = setConfigCommand.toString() + "enable_new_file_scanner=true"
logger.info(command2)
def process1 = command1.execute()
int code = process1.waitFor()
assertEquals(code, 0)
def process2 = command2.execute()
code = process1.waitFor()
assertEquals(code, 0)
// No need to set this config anymore, but leave this code sample here
// StringBuilder setConfigCommand = new StringBuilder();
// setConfigCommand.append("curl -X POST http://")
// setConfigCommand.append(backend[2])
// setConfigCommand.append(":")
// setConfigCommand.append(backend[5])
// setConfigCommand.append("/api/update_config?")
// String command1 = setConfigCommand.toString() + "enable_new_load_scan_node=true"
// logger.info(command1)
// String command2 = setConfigCommand.toString() + "enable_new_file_scanner=true"
// logger.info(command2)
// def process1 = command1.execute()
// int code = process1.waitFor()
// assertEquals(code, 0)
// def process2 = command2.execute()
// code = process2.waitFor()
// assertEquals(code, 0)
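// (the retained sample toggled enable_new_load_scan_node / enable_new_file_scanner on every
// backend through its /api/update_config HTTP endpoint)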
}
}