[debug](Variant) sanitize variant type and column in find_and_set_leave_value (#31436)
This commit is contained in:
@ -965,8 +965,15 @@ rapidjson::Value* find_leaf_node_by_path(rapidjson::Value& json, const PathInDat
|
||||
}
|
||||
|
||||
void find_and_set_leave_value(const IColumn* column, const PathInData& path,
|
||||
const DataTypeSerDeSPtr& type, rapidjson::Value& root,
|
||||
rapidjson::Document::AllocatorType& allocator, int row) {
|
||||
const DataTypeSerDeSPtr& type_serde, const DataTypePtr& type,
|
||||
rapidjson::Value& root, rapidjson::Document::AllocatorType& allocator,
|
||||
int row) {
|
||||
// sanitize type and column
|
||||
if (column->get_name() != type->create_column()->get_name()) {
|
||||
throw Exception(ErrorCode::INTERNAL_ERROR,
|
||||
"failed to set value for path {}, expected type {}, but got {} at row {}",
|
||||
path.get_path(), type->get_name(), column->get_name(), row);
|
||||
}
|
||||
const auto* nullable = assert_cast<const ColumnNullable*>(column);
|
||||
if (nullable->is_null_at(row)) {
|
||||
return;
|
||||
@ -980,7 +987,7 @@ void find_and_set_leave_value(const IColumn* column, const PathInData& path,
|
||||
LOG(FATAL) << "could not find path " << path.get_path()
|
||||
<< ", root: " << std::string(buffer.GetString(), buffer.GetSize());
|
||||
}
|
||||
type->write_one_cell_to_json(*column, *target, allocator, row);
|
||||
type_serde->write_one_cell_to_json(*column, *target, allocator, row);
|
||||
}
|
||||
|
||||
// compact null values
|
||||
@ -1088,7 +1095,8 @@ bool ColumnObject::serialize_one_row_to_json_format(int row, rapidjson::StringBu
|
||||
#endif
|
||||
for (const auto& subcolumn : subcolumns) {
|
||||
find_and_set_leave_value(subcolumn->data.get_finalized_column_ptr(), subcolumn->path,
|
||||
subcolumn->data.get_least_common_type_serde(), root,
|
||||
subcolumn->data.get_least_common_type_serde(),
|
||||
subcolumn->data.get_least_common_type(), root,
|
||||
doc_structure->GetAllocator(), row);
|
||||
}
|
||||
compact_null_values(root, doc_structure->GetAllocator());
|
||||
@ -1151,7 +1159,8 @@ void ColumnObject::merge_sparse_to_root_column() {
|
||||
continue;
|
||||
}
|
||||
find_and_set_leave_value(column, subcolumn->path,
|
||||
subcolumn->data.get_least_common_type_serde(), root,
|
||||
subcolumn->data.get_least_common_type_serde(),
|
||||
subcolumn->data.get_least_common_type(), root,
|
||||
doc_structure->GetAllocator(), i);
|
||||
}
|
||||
|
||||
|
||||
@ -15,17 +15,16 @@
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
suite("regression_test_variant_github_events_p0", "variant_type"){
|
||||
suite("regression_test_variant_github_events_p0", "nonConcurrent"){
|
||||
def backendId_to_backendIP = [:]
|
||||
def backendId_to_backendHttpPort = [:]
|
||||
getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
|
||||
def set_be_config = { key, value ->
|
||||
String backend_id;
|
||||
def backendId_to_backendIP = [:]
|
||||
def backendId_to_backendHttpPort = [:]
|
||||
getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
|
||||
|
||||
backend_id = backendId_to_backendIP.keySet()[0]
|
||||
def (code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), key, value)
|
||||
logger.info("update config: code=" + code + ", out=" + out + ", err=" + err)
|
||||
}
|
||||
for (String backend_id: backendId_to_backendIP.keySet()) {
|
||||
def (code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), key, value)
|
||||
logger.info("update config: code=" + code + ", out=" + out + ", err=" + err)
|
||||
}
|
||||
}
|
||||
sql "set enable_memtable_on_sink_node = true"
|
||||
def load_json_data = {table_name, file_name ->
|
||||
// load the json data
|
||||
@ -68,7 +67,7 @@ suite("regression_test_variant_github_events_p0", "variant_type"){
|
||||
DISTRIBUTED BY HASH(k) BUCKETS 4
|
||||
properties("replication_num" = "1", "disable_auto_compaction" = "false");
|
||||
"""
|
||||
set_be_config.call("variant_ratio_of_defaults_as_sparse_column", "0.95")
|
||||
set_be_config.call("variant_ratio_of_defaults_as_sparse_column", "1")
|
||||
// 2015
|
||||
load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2015-01-01-0.json'}""")
|
||||
load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2015-01-01-1.json'}""")
|
||||
|
||||
@ -15,7 +15,16 @@
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
suite("regression_test_variant_github_events_p0", "variant_type"){
|
||||
suite("regression_test_variant_github_events_p0", "nonConcurrent"){
|
||||
def backendId_to_backendIP = [:]
|
||||
def backendId_to_backendHttpPort = [:]
|
||||
getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
|
||||
def set_be_config = { key, value ->
|
||||
for (String backend_id: backendId_to_backendIP.keySet()) {
|
||||
def (code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), key, value)
|
||||
logger.info("update config: code=" + code + ", out=" + out + ", err=" + err)
|
||||
}
|
||||
}
|
||||
def load_json_data = {table_name, file_name ->
|
||||
// load the json data
|
||||
streamLoad {
|
||||
@ -43,6 +52,7 @@ suite("regression_test_variant_github_events_p0", "variant_type"){
|
||||
}
|
||||
}
|
||||
}
|
||||
set_be_config.call("variant_ratio_of_defaults_as_sparse_column", "1")
|
||||
|
||||
def table_name = "github_events"
|
||||
sql """DROP TABLE IF EXISTS ${table_name}"""
|
||||
|
||||
Reference in New Issue
Block a user