[FEAT MERGE]:Oracle Json Supported
This commit is contained in:
@ -60,7 +60,12 @@ OB_DEF_SERIALIZE(ObAggrInfo)
|
||||
bucket_num_param_expr_,
|
||||
rollup_idx_,
|
||||
grouping_idxs_,
|
||||
group_idxs_
|
||||
group_idxs_,
|
||||
format_json_,
|
||||
strict_json_,
|
||||
absent_on_null_,
|
||||
returning_type_,
|
||||
with_unique_keys_
|
||||
);
|
||||
if (T_FUN_AGG_UDF == get_expr_type()) {
|
||||
OB_UNIS_ENCODE(*dll_udf_);
|
||||
@ -93,7 +98,12 @@ OB_DEF_DESERIALIZE(ObAggrInfo)
|
||||
bucket_num_param_expr_,
|
||||
rollup_idx_,
|
||||
grouping_idxs_,
|
||||
group_idxs_
|
||||
group_idxs_,
|
||||
format_json_,
|
||||
strict_json_,
|
||||
absent_on_null_,
|
||||
returning_type_,
|
||||
with_unique_keys_
|
||||
);
|
||||
if (T_FUN_AGG_UDF == get_expr_type()) {
|
||||
CK(NULL != alloc_);
|
||||
@ -135,7 +145,12 @@ OB_DEF_SERIALIZE_SIZE(ObAggrInfo)
|
||||
bucket_num_param_expr_,
|
||||
rollup_idx_,
|
||||
grouping_idxs_,
|
||||
group_idxs_
|
||||
group_idxs_,
|
||||
format_json_,
|
||||
strict_json_,
|
||||
absent_on_null_,
|
||||
returning_type_,
|
||||
with_unique_keys_
|
||||
);
|
||||
if (T_FUN_AGG_UDF == get_expr_type()) {
|
||||
OB_UNIS_ADD_LEN(*dll_udf_);
|
||||
@ -591,7 +606,9 @@ int ObAggregateProcessor::init()
|
||||
T_FUN_KEEP_WM_CONCAT == aggr_info.get_expr_type() ||
|
||||
T_FUN_WM_CONCAT == aggr_info.get_expr_type() ||
|
||||
T_FUN_JSON_ARRAYAGG == aggr_info.get_expr_type() ||
|
||||
T_FUN_JSON_OBJECTAGG == aggr_info.get_expr_type());
|
||||
T_FUN_ORA_JSON_ARRAYAGG == aggr_info.get_expr_type() ||
|
||||
T_FUN_JSON_OBJECTAGG == aggr_info.get_expr_type() ||
|
||||
T_FUN_ORA_JSON_OBJECTAGG == aggr_info.get_expr_type());
|
||||
has_order_by_ |= aggr_info.has_order_by_;
|
||||
if (!has_extra_) {
|
||||
has_extra_ |= aggr_info.has_distinct_;
|
||||
@ -1540,8 +1557,10 @@ int ObAggregateProcessor::generate_group_row(GroupRow *&new_group_row,
|
||||
case T_FUN_WM_CONCAT:
|
||||
case T_FUN_PL_AGG_UDF:
|
||||
case T_FUN_HYBRID_HIST:
|
||||
case T_FUN_JSON_ARRAYAGG:
|
||||
case T_FUN_JSON_OBJECTAGG: {
|
||||
case T_FUN_JSON_ARRAYAGG:
|
||||
case T_FUN_ORA_JSON_ARRAYAGG:
|
||||
case T_FUN_JSON_OBJECTAGG:
|
||||
case T_FUN_ORA_JSON_OBJECTAGG: {
|
||||
void *tmp_buf = NULL;
|
||||
if (OB_ISNULL(tmp_buf = aggr_alloc_.alloc(sizeof(GroupConcatExtraResult)))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
@ -1756,7 +1775,7 @@ int ObAggregateProcessor::rollup_base_process(
|
||||
}
|
||||
} else {
|
||||
if (aggr_info.has_distinct_) {
|
||||
if(OB_FAIL(rollup_distinct(aggr_cell, rollup_cell))) {
|
||||
if (OB_FAIL(rollup_distinct(aggr_cell, rollup_cell))) {
|
||||
LOG_WARN("failed to rollup aggregation results", K(ret));
|
||||
}
|
||||
} else {
|
||||
@ -1930,7 +1949,9 @@ int ObAggregateProcessor::rollup_aggregation(AggrCell &aggr_cell, AggrCell &roll
|
||||
case T_FUN_PL_AGG_UDF:
|
||||
case T_FUN_HYBRID_HIST:
|
||||
case T_FUN_JSON_ARRAYAGG:
|
||||
case T_FUN_JSON_OBJECTAGG: {
|
||||
case T_FUN_ORA_JSON_ARRAYAGG:
|
||||
case T_FUN_JSON_OBJECTAGG:
|
||||
case T_FUN_ORA_JSON_OBJECTAGG: {
|
||||
GroupConcatExtraResult *aggr_extra = NULL;
|
||||
GroupConcatExtraResult *rollup_extra = NULL;
|
||||
if (OB_ISNULL(aggr_extra = static_cast<GroupConcatExtraResult *>(aggr_cell.get_extra()))
|
||||
@ -2116,7 +2137,9 @@ int ObAggregateProcessor::prepare_aggr_result(const ObChunkDatumStore::StoredRow
|
||||
case T_FUN_PL_AGG_UDF:
|
||||
case T_FUN_HYBRID_HIST:
|
||||
case T_FUN_JSON_ARRAYAGG:
|
||||
case T_FUN_JSON_OBJECTAGG: {
|
||||
case T_FUN_ORA_JSON_ARRAYAGG:
|
||||
case T_FUN_JSON_OBJECTAGG:
|
||||
case T_FUN_ORA_JSON_OBJECTAGG: {
|
||||
GroupConcatExtraResult *extra = NULL;
|
||||
if (OB_ISNULL(extra = static_cast<GroupConcatExtraResult *>(aggr_cell.get_extra()))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
@ -2334,7 +2357,9 @@ int ObAggregateProcessor::process_aggr_batch_result(
|
||||
case T_FUN_PL_AGG_UDF:
|
||||
case T_FUN_HYBRID_HIST:
|
||||
case T_FUN_JSON_ARRAYAGG:
|
||||
case T_FUN_JSON_OBJECTAGG: {
|
||||
case T_FUN_ORA_JSON_ARRAYAGG:
|
||||
case T_FUN_JSON_OBJECTAGG:
|
||||
case T_FUN_ORA_JSON_OBJECTAGG: {
|
||||
GroupConcatExtraResult *extra_info = NULL;
|
||||
if (OB_ISNULL(extra_info = static_cast<GroupConcatExtraResult *>(aggr_cell.get_extra()))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
@ -2349,11 +2374,11 @@ int ObAggregateProcessor::process_aggr_batch_result(
|
||||
}
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < len; i++) {
|
||||
ObExpr *tmp = NULL;
|
||||
if (OB_FAIL(param_exprs->at(i, tmp))){
|
||||
if (OB_FAIL(param_exprs->at(i, tmp))) {
|
||||
LOG_WARN("fail to get param_exprs[i]", K(ret));
|
||||
} else {
|
||||
bool is_bool = (tmp->is_boolean_ == 1);
|
||||
if (OB_FAIL(extra_info->set_bool_mark(i, is_bool))){
|
||||
if (OB_FAIL(extra_info->set_bool_mark(i, is_bool))) {
|
||||
LOG_WARN("fail to set_bool_mark", K(ret));
|
||||
}
|
||||
}
|
||||
@ -2548,7 +2573,9 @@ int ObAggregateProcessor::process_aggr_result(const ObChunkDatumStore::StoredRow
|
||||
case T_FUN_PL_AGG_UDF:
|
||||
case T_FUN_HYBRID_HIST:
|
||||
case T_FUN_JSON_ARRAYAGG:
|
||||
case T_FUN_JSON_OBJECTAGG: {
|
||||
case T_FUN_ORA_JSON_ARRAYAGG:
|
||||
case T_FUN_JSON_OBJECTAGG:
|
||||
case T_FUN_ORA_JSON_OBJECTAGG: {
|
||||
GroupConcatExtraResult *extra = NULL;
|
||||
if (OB_ISNULL(extra = static_cast<GroupConcatExtraResult *>(aggr_cell.get_extra()))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
@ -2846,6 +2873,15 @@ int ObAggregateProcessor::collect_aggr_result(
|
||||
break;
|
||||
}
|
||||
|
||||
case T_FUN_ORA_JSON_ARRAYAGG: {
|
||||
GroupConcatExtraResult *extra = static_cast<GroupConcatExtraResult *>(aggr_cell.get_extra());
|
||||
if (OB_FAIL(get_ora_json_arrayagg_result(aggr_info, extra, result))) {
|
||||
LOG_WARN("failed to get json_arrayagg result", K(ret));
|
||||
} else {
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case T_FUN_JSON_OBJECTAGG: {
|
||||
GroupConcatExtraResult *extra = static_cast<GroupConcatExtraResult *>(aggr_cell.get_extra());
|
||||
if (OB_FAIL(get_json_objectagg_result(aggr_info, extra, result))) {
|
||||
@ -2855,6 +2891,15 @@ int ObAggregateProcessor::collect_aggr_result(
|
||||
break;
|
||||
}
|
||||
|
||||
case T_FUN_ORA_JSON_OBJECTAGG: {
|
||||
GroupConcatExtraResult *extra = static_cast<GroupConcatExtraResult *>(aggr_cell.get_extra());
|
||||
if (OB_FAIL(get_ora_json_objectagg_result(aggr_info, extra, result))) {
|
||||
LOG_WARN("failed to get json_objectagg result", K(ret));
|
||||
} else {
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case T_FUN_GROUP_CONCAT: {
|
||||
GroupConcatExtraResult *extra = NULL;
|
||||
ObString sep_str;
|
||||
@ -5652,6 +5697,7 @@ int ObAggregateProcessor::get_json_arrayagg_result(const ObAggrInfo &aggr_info,
|
||||
if (OB_FAIL(ObJsonExprHelper::transform_scalar_2jsonBase(converted_datum, val_type,
|
||||
&tmp_alloc, scale,
|
||||
eval_ctx_.exec_ctx_.get_my_session()->get_timezone_info(),
|
||||
eval_ctx_.exec_ctx_.get_my_session(),
|
||||
json_val, false))) {
|
||||
LOG_WARN("failed: parse value to jsonBase", K(ret), K(val_type));
|
||||
}
|
||||
@ -5660,7 +5706,7 @@ int ObAggregateProcessor::get_json_arrayagg_result(const ObAggrInfo &aggr_info,
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(json_array.array_append(json_val))) {
|
||||
LOG_WARN("failed: json array append json value", K(ret));
|
||||
} else if(json_array.get_serialize_size() > OB_MAX_PACKET_LENGTH) {
|
||||
} else if (json_array.get_serialize_size() > OB_MAX_PACKET_LENGTH) {
|
||||
ret = OB_ERR_TOO_LONG_STRING_IN_CONCAT;
|
||||
LOG_WARN("result of json_arrayagg is too long", K(ret), K(json_array.get_serialize_size()),
|
||||
K(OB_MAX_PACKET_LENGTH));
|
||||
@ -5684,6 +5730,180 @@ int ObAggregateProcessor::get_json_arrayagg_result(const ObAggrInfo &aggr_info,
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObAggregateProcessor::get_ora_json_arrayagg_result(const ObAggrInfo &aggr_info,
|
||||
GroupConcatExtraResult *&extra,
|
||||
ObDatum &concat_result)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
common::ObArenaAllocator tmp_alloc;
|
||||
if (OB_ISNULL(extra) || OB_UNLIKELY(extra->empty())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("get unpexcted null", K(ret), K(extra));
|
||||
} else if (extra->is_iterated() && OB_FAIL(extra->rewind())) {
|
||||
// Group concat row may be iterated in rollup_process(), rewind here.
|
||||
LOG_WARN("rewind failed", KPC(extra), K(ret));
|
||||
} else if (!extra->is_iterated() && OB_FAIL(extra->finish_add_row())) {
|
||||
LOG_WARN("finish_add_row failed", KPC(extra), K(ret));
|
||||
} else {
|
||||
const ObChunkDatumStore::StoredRow *storted_row = NULL;
|
||||
ObJsonBuffer json_array_buf(&aggr_alloc_);
|
||||
ObObj *tmp_obj = NULL;
|
||||
if (OB_FAIL(json_array_buf.append("["))) {
|
||||
LOG_WARN("fail to append curly brace", K(ret));
|
||||
}
|
||||
bool inited_tmp_obj = false;
|
||||
while (OB_SUCC(ret) && OB_SUCC(extra->get_next_row(storted_row))) {
|
||||
if (OB_ISNULL(storted_row)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("get unexpected null", K(ret), K(storted_row));
|
||||
} else {
|
||||
// get type
|
||||
if (!inited_tmp_obj && OB_ISNULL(tmp_obj = static_cast<ObObj*>(tmp_alloc.alloc(
|
||||
sizeof(ObObj) * (storted_row->cnt_))))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("failed to allocate memory", K(ret), K(tmp_obj));
|
||||
} else if (!inited_tmp_obj && FALSE_IT(inited_tmp_obj = true)) {
|
||||
} else if (OB_FAIL(convert_datum_to_obj(aggr_info, *storted_row, tmp_obj, storted_row->cnt_))) {
|
||||
LOG_WARN("failed to convert datum to obj", K(ret));
|
||||
} else if (storted_row->cnt_ < 1) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("get unexpected column count", K(ret), K(storted_row));
|
||||
} else {
|
||||
ObObjType val_type = tmp_obj->get_type();
|
||||
const ObExpr *expr = aggr_info.param_exprs_.at(0);
|
||||
if (val_type == ObNumberType && aggr_info.format_json_) {
|
||||
ret = OB_ERR_INVALID_TYPE_FOR_OP;
|
||||
LOG_USER_ERROR(OB_ERR_INVALID_TYPE_FOR_OP, "CHAR", ob_obj_type_str(val_type));
|
||||
} else {
|
||||
if (ob_is_null(val_type) && !aggr_info.absent_on_null_) {
|
||||
continue;
|
||||
}
|
||||
ObCollationType cs_type = tmp_obj->get_collation_type();
|
||||
ObScale scale = tmp_obj->get_scale();
|
||||
ObIJsonBase *json_val = NULL;
|
||||
ObDatum converted_datum;
|
||||
converted_datum.set_datum(storted_row->cells()[0]);
|
||||
// convert string charset if needed
|
||||
if ((ob_is_string_type(val_type) || (ob_is_lob_locator(val_type) && !ObJsonExprHelper::is_cs_type_bin(cs_type)) || ob_is_json(val_type) || ob_is_raw(val_type))
|
||||
&& (ObCharset::charset_type_by_coll(cs_type) != CHARSET_UTF8MB4)) {
|
||||
ObString origin_str;
|
||||
if (ob_is_lob_locator(val_type)) {
|
||||
const ObLobLocator &lob_locator = converted_datum.get_lob_locator();
|
||||
origin_str.assign(const_cast<char *>(lob_locator.get_payload_ptr()), lob_locator.payload_size_);
|
||||
val_type = ObVarcharType;
|
||||
} else {
|
||||
origin_str = converted_datum.get_string();
|
||||
}
|
||||
ObString converted_str;
|
||||
if (OB_FAIL(ObExprUtil::convert_string_collation(origin_str, cs_type, converted_str,
|
||||
CS_TYPE_UTF8MB4_BIN, tmp_alloc))) {
|
||||
LOG_WARN("convert string collation failed", K(ret), K(cs_type), K(origin_str.length()));
|
||||
} else {
|
||||
converted_datum.set_string(converted_str);
|
||||
cs_type = CS_TYPE_UTF8MB4_BIN;
|
||||
}
|
||||
}
|
||||
ObEvalCtx ctx(eval_ctx_);
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (ObJsonExprHelper::is_convertible_to_json(val_type) || ob_is_raw(val_type)) {
|
||||
|
||||
if (OB_FAIL(ObJsonExprHelper::transform_convertible_2String(*expr, ctx, converted_datum, val_type,
|
||||
cs_type, json_array_buf,
|
||||
aggr_info.format_json_,
|
||||
aggr_info.strict_json_, 0))) {
|
||||
LOG_WARN("failed: parse value to jsonBase", K(ret), K(val_type));
|
||||
}
|
||||
} else {
|
||||
if (OB_FAIL(ObJsonExprHelper::transform_scalar_2String(ctx, converted_datum,
|
||||
val_type, scale,
|
||||
eval_ctx_.exec_ctx_.get_my_session()->get_timezone_info(),
|
||||
json_array_buf))) {
|
||||
LOG_WARN("failed: parse value to jsonBase", K(ret), K(val_type));
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (json_array_buf.length() > OB_MAX_PACKET_LENGTH) {
|
||||
ret = OB_ERR_TOO_LONG_STRING_IN_CONCAT;
|
||||
LOG_WARN("result of json_arrar is too long", K(ret), K(json_array_buf.length()),
|
||||
K(OB_MAX_PACKET_LENGTH));
|
||||
} else if (json_array_buf.append(",")) {
|
||||
LOG_WARN("fail to append comma", K(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}//end of while
|
||||
if (ret != OB_ITER_END && ret != OB_SUCCESS) {
|
||||
LOG_WARN("fail to get next row", K(ret));
|
||||
} else {
|
||||
ret = OB_SUCCESS;
|
||||
ParseNode parse_node;
|
||||
parse_node.value_ = aggr_info.returning_type_;
|
||||
ObObjType obj_type = static_cast<ObObjType>(parse_node.int16_values_[OB_NODE_CAST_TYPE_IDX]);
|
||||
ObCollationType obj_cs_type = static_cast<ObCollationType>(parse_node.int16_values_[OB_NODE_CAST_COLL_IDX]);
|
||||
if (json_array_buf.length() > 1) {
|
||||
char *end_of_obj = json_array_buf.ptr() + json_array_buf.length() - 1;
|
||||
*end_of_obj = ']';
|
||||
} else if (OB_FAIL(json_array_buf.append("]"))) {
|
||||
LOG_WARN("fail to append curly brace", K(ret));
|
||||
}
|
||||
|
||||
int32_t dst_len = parse_node.int32_values_[OB_NODE_CAST_C_LEN_IDX];
|
||||
const ObString res_str(json_array_buf.length(), json_array_buf.ptr());
|
||||
ObJsonNode* json_node = NULL;
|
||||
uint32_t parse_flag = ObJsonParser::JSN_STRICT_FLAG;
|
||||
|
||||
ADD_FLAG_IF_NEED(!aggr_info.strict_json_, parse_flag, ObJsonParser::JSN_RELAXED_FLAG);
|
||||
ADD_FLAG_IF_NEED(aggr_info.with_unique_keys_, parse_flag, ObJsonParser::JSN_UNIQUE_FLAG);
|
||||
|
||||
if (obj_type == ObJsonType) {
|
||||
ADD_FLAG_IF_NEED(true, parse_flag, ObJsonParser::JSN_UNIQUE_FLAG);
|
||||
}
|
||||
if (obj_type == ObJsonType && OB_FAIL(ObJsonParser::get_tree(&tmp_alloc, res_str, json_node, parse_flag))) {
|
||||
LOG_WARN("fail to get json base", K(ret), K(res_str));
|
||||
} else if (ob_is_string_type(obj_type) || ob_is_lob_locator(obj_type) || ob_is_raw(obj_type)) {
|
||||
if (ob_is_string_type(obj_type) || ob_is_raw(obj_type)) {
|
||||
if (obj_type == ObVarcharType && res_str.length() > dst_len) {
|
||||
char res_ptr[OB_MAX_DECIMAL_PRECISION] = {0};
|
||||
if (OB_ISNULL(ObCharset::lltostr(dst_len, res_ptr, 10, 1))) {
|
||||
LOG_WARN("dst_len fail to string.", K(ret));
|
||||
}
|
||||
ret = OB_OPERATE_OVERFLOW;
|
||||
LOG_USER_ERROR(OB_OPERATE_OVERFLOW, res_ptr, "json_arrayagg");
|
||||
} else {
|
||||
concat_result.set_string(res_str);
|
||||
}
|
||||
} else {
|
||||
ObLobLocator *result = nullptr;
|
||||
char *total_buf = NULL;
|
||||
const int64_t total_buf_len = sizeof(ObLobLocator) + res_str.length();
|
||||
if (OB_ISNULL(total_buf = aggr_info.expr_->get_str_res_mem(eval_ctx_, total_buf_len))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("Failed to allocate memory for lob locator", K(ret), K(total_buf_len));
|
||||
} else if (FALSE_IT(result = reinterpret_cast<ObLobLocator *> (total_buf))) {
|
||||
} else if (OB_FAIL(result->init(res_str))) {
|
||||
LOG_WARN("Failed to init lob locator", K(ret), K(res_str), K(result));
|
||||
} else {
|
||||
concat_result.set_lob_locator(*result);
|
||||
}
|
||||
}
|
||||
} else if (ob_is_json(obj_type)) {
|
||||
ObString raw_binary_str;
|
||||
if (OB_FAIL(json_node->get_raw_binary(raw_binary_str, &aggr_alloc_))) {
|
||||
LOG_WARN("get result binary failed", K(ret));
|
||||
} else {
|
||||
concat_result.set_string(raw_binary_str);
|
||||
}
|
||||
} else {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("get unpexcted returning type", K(ret), K(obj_type));
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObAggregateProcessor::get_json_objectagg_result(const ObAggrInfo &aggr_info,
|
||||
GroupConcatExtraResult *&extra,
|
||||
ObDatum &concat_result)
|
||||
@ -5711,7 +5931,7 @@ int ObAggregateProcessor::get_json_objectagg_result(const ObAggrInfo &aggr_info,
|
||||
if (OB_ISNULL(storted_row)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("get unexpected null", K(ret), K(storted_row));
|
||||
} else if(storted_row->cnt_ != col_num) {
|
||||
} else if (storted_row->cnt_ != col_num) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("get unexpected column count", K(ret), K(storted_row->cnt_));
|
||||
} else {
|
||||
@ -5790,6 +6010,7 @@ int ObAggregateProcessor::get_json_objectagg_result(const ObAggrInfo &aggr_info,
|
||||
if (OB_FAIL(ObJsonExprHelper::transform_scalar_2jsonBase(converted_datum, val_type1,
|
||||
&tmp_alloc, scale1,
|
||||
eval_ctx_.exec_ctx_.get_my_session()->get_timezone_info(),
|
||||
eval_ctx_.exec_ctx_.get_my_session(),
|
||||
json_val, false))) {
|
||||
LOG_WARN("failed: parse value to jsonBase", K(ret), K(val_type1));
|
||||
}
|
||||
@ -5798,7 +6019,7 @@ int ObAggregateProcessor::get_json_objectagg_result(const ObAggrInfo &aggr_info,
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(json_object.object_add(key_data, json_val))) {
|
||||
LOG_WARN("failed: json object add json value", K(ret));
|
||||
} else if(json_object.get_serialize_size() > OB_MAX_PACKET_LENGTH) {
|
||||
} else if (json_object.get_serialize_size() > OB_MAX_PACKET_LENGTH) {
|
||||
ret = OB_ERR_TOO_LONG_STRING_IN_CONCAT;
|
||||
LOG_WARN("result of json_objectagg is too long", K(ret), K(json_object.get_serialize_size()),
|
||||
K(OB_MAX_PACKET_LENGTH));
|
||||
@ -5826,6 +6047,235 @@ int ObAggregateProcessor::get_json_objectagg_result(const ObAggrInfo &aggr_info,
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObAggregateProcessor::check_key_valid(common::hash::ObHashSet<ObString> &view_key_names, const ObString &key_name)
|
||||
{
|
||||
INIT_SUCC(ret);
|
||||
|
||||
if (OB_HASH_EXIST == view_key_names.exist_refactored(key_name)) {
|
||||
ret = OB_ERR_DUPLICATE_KEY;
|
||||
LOG_WARN("duplicate key", K(ret));
|
||||
} else if (OB_FAIL(view_key_names.set_refactored(key_name, 0))) {
|
||||
LOG_WARN("store key to vector failed", K(ret), K(view_key_names.size()));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObAggregateProcessor::get_ora_json_objectagg_result(const ObAggrInfo &aggr_info,
|
||||
GroupConcatExtraResult *&extra,
|
||||
ObDatum &concat_result)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
common::ObArenaAllocator tmp_alloc;
|
||||
const int64_t MAX_BUCKET_NUM = 1024;
|
||||
common::hash::ObHashSet<ObString> view_key_names;
|
||||
if (OB_FAIL(view_key_names.create(MAX_BUCKET_NUM))) {
|
||||
LOG_WARN("init hash failed", K(ret), K(MAX_BUCKET_NUM));
|
||||
} else if (OB_ISNULL(extra) || OB_UNLIKELY(extra->empty())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("get unpexcted null", K(ret), K(extra));
|
||||
} else if (extra->is_iterated() && OB_FAIL(extra->rewind())) {
|
||||
// Group concat row may be iterated in rollup_process(), rewind here.
|
||||
LOG_WARN("rewind failed", KPC(extra), K(ret));
|
||||
} else if (!extra->is_iterated() && OB_FAIL(extra->finish_add_row())) {
|
||||
LOG_WARN("finish_add_row failed", KPC(extra), K(ret));
|
||||
} else {
|
||||
const ObChunkDatumStore::StoredRow *storted_row = NULL;
|
||||
ObJsonBuffer json_object_buf(&aggr_alloc_);
|
||||
if (OB_FAIL(json_object_buf.append("{"))) {
|
||||
LOG_WARN("fail to append curly brace", K(ret));
|
||||
}
|
||||
bool inited_tmp_obj = false;
|
||||
ObObj *tmp_obj = NULL;
|
||||
while (OB_SUCC(ret) && OB_SUCC(extra->get_next_row(storted_row))) {
|
||||
if (OB_ISNULL(storted_row)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("get unexpected null", K(ret), K(storted_row));
|
||||
} else {
|
||||
// get obj
|
||||
if (!inited_tmp_obj && OB_ISNULL(tmp_obj = static_cast<ObObj*>(tmp_alloc.alloc(
|
||||
sizeof(ObObj) * (storted_row->cnt_))))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("failed to allocate memory", K(ret), K(tmp_obj));
|
||||
} else if (!inited_tmp_obj && FALSE_IT(inited_tmp_obj = true)) {
|
||||
} else if (OB_FAIL(convert_datum_to_obj(aggr_info, *storted_row, tmp_obj, storted_row->cnt_))) {
|
||||
LOG_WARN("failed to convert datum to obj", K(ret));
|
||||
} else if (tmp_obj[0].get_type() == ObNullType) {
|
||||
ret = OB_ERR_JSON_DOCUMENT_NULL_KEY;
|
||||
LOG_WARN("null type for json_objectagg key");
|
||||
} else if (ob_is_string_type(tmp_obj[0].get_type())
|
||||
&& tmp_obj[0].get_collation_type() == CS_TYPE_BINARY) {
|
||||
// not support binary charset as mysql
|
||||
LOG_WARN("unsuport json string type with binary charset",
|
||||
K(tmp_obj[0].get_type()), K(tmp_obj[0].get_collation_type()));
|
||||
ret = OB_ERR_INVALID_JSON_CHARSET;
|
||||
LOG_USER_ERROR(OB_ERR_INVALID_JSON_CHARSET);
|
||||
} else if (NULL == tmp_obj[0].get_string_ptr()) {
|
||||
ret = OB_ERR_NULL_VALUE;
|
||||
LOG_WARN("unexpected null result", K(ret), K(tmp_obj[0]));
|
||||
} else {
|
||||
ObObjType val_type1 = tmp_obj[1].get_type();
|
||||
const ObExpr *expr = aggr_info.param_exprs_.at(1);
|
||||
if (val_type1 == ObNumberType && aggr_info.format_json_) {
|
||||
ret = OB_ERR_INVALID_TYPE_FOR_OP;
|
||||
LOG_USER_ERROR(OB_ERR_INVALID_TYPE_FOR_OP, "CHAR", ob_obj_type_str(val_type1));
|
||||
} else {
|
||||
if (ob_is_null(val_type1) && (aggr_info.absent_on_null_ >= 1)) {
|
||||
continue;
|
||||
}
|
||||
ObObjType val_type0 = tmp_obj[0].get_type();
|
||||
ObCollationType cs_type0 = tmp_obj[0].get_collation_type();
|
||||
ObScale scale1 = tmp_obj[1].get_scale();
|
||||
ObCollationType cs_type1 = tmp_obj[1].get_collation_type();
|
||||
ObString key_string = tmp_obj[0].get_string();
|
||||
if (OB_SUCC(ret) && ObCharset::charset_type_by_coll(cs_type0) != CHARSET_UTF8MB4) {
|
||||
ObString converted_key_str;
|
||||
if (OB_FAIL(ObExprUtil::convert_string_collation(key_string, cs_type0, converted_key_str,
|
||||
CS_TYPE_UTF8MB4_BIN, tmp_alloc))) {
|
||||
LOG_WARN("convert key string collation failed", K(ret), K(cs_type0), K(key_string.length()));
|
||||
} else {
|
||||
key_string = converted_key_str;
|
||||
}
|
||||
}
|
||||
|
||||
// get key and value, and append to json_object
|
||||
if (OB_SUCC(ret) && aggr_info.with_unique_keys_
|
||||
&& OB_FAIL(check_key_valid(view_key_names, key_string))) {
|
||||
LOG_WARN("duplicate key", K(ret), K(key_string));
|
||||
}
|
||||
ObJsonString ob_str(key_string.ptr(), key_string.length());
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(ob_str.print(json_object_buf, true, false, 0, true))) {
|
||||
LOG_WARN("fail to print json node", K(ret));
|
||||
} else if (OB_FAIL(json_object_buf.append(":"))) {
|
||||
LOG_WARN("fail to append colon", K(ret));
|
||||
} else {
|
||||
ObDatum converted_datum;
|
||||
converted_datum.set_datum(storted_row->cells()[1]);
|
||||
if ((ob_is_string_type(val_type1) || (ob_is_lob_locator(val_type1) && !ObJsonExprHelper::is_cs_type_bin(cs_type1)) || ob_is_json(val_type1) || ob_is_raw(val_type1))
|
||||
&& (ObCharset::charset_type_by_coll(cs_type1) != CHARSET_UTF8MB4)) {
|
||||
ObString origin_str;
|
||||
if (ob_is_lob_locator(val_type1)) {
|
||||
const ObLobLocator &lob_locator = converted_datum.get_lob_locator();
|
||||
origin_str.assign(const_cast<char *>(lob_locator.get_payload_ptr()), lob_locator.payload_size_);
|
||||
val_type1 = ObVarcharType;
|
||||
} else {
|
||||
origin_str = converted_datum.get_string();
|
||||
}
|
||||
ObString converted_str;
|
||||
if (OB_FAIL(ObExprUtil::convert_string_collation(origin_str, cs_type1, converted_str,
|
||||
CS_TYPE_UTF8MB4_BIN, tmp_alloc))) {
|
||||
LOG_WARN("convert string collation failed", K(ret), K(cs_type1), K(origin_str.length()));
|
||||
} else {
|
||||
converted_datum.set_string(converted_str);
|
||||
cs_type1 = CS_TYPE_UTF8MB4_BIN;
|
||||
}
|
||||
}
|
||||
|
||||
ObIJsonBase *json_val = NULL;
|
||||
ObString val_data;
|
||||
ObEvalCtx ctx(eval_ctx_);
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (ObJsonExprHelper::is_convertible_to_json(val_type1) || ob_is_raw(val_type1)) {
|
||||
if (OB_FAIL(ObJsonExprHelper::transform_convertible_2String(*expr, ctx, converted_datum, val_type1,
|
||||
cs_type1, json_object_buf,
|
||||
aggr_info.format_json_,
|
||||
aggr_info.strict_json_, 0))) {
|
||||
LOG_WARN("failed: parse value to jsonBase", K(ret), K(val_type1));
|
||||
}
|
||||
} else {
|
||||
if (OB_FAIL(ObJsonExprHelper::transform_scalar_2String(ctx, converted_datum,
|
||||
val_type1, scale1,
|
||||
eval_ctx_.exec_ctx_.get_my_session()->get_timezone_info(),
|
||||
json_object_buf))) {
|
||||
LOG_WARN("failed: parse value to jsonBase", K(ret), K(val_type1));
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (json_object_buf.length() > OB_MAX_PACKET_LENGTH) {
|
||||
ret = OB_ERR_TOO_LONG_STRING_IN_CONCAT;
|
||||
LOG_WARN("result of json_objectagg is too long", K(ret), K(json_object_buf.length()),
|
||||
K(OB_MAX_PACKET_LENGTH));
|
||||
} else if (json_object_buf.append(",")) {
|
||||
LOG_WARN("fail to append comma", K(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}//end of while
|
||||
if (ret == OB_ERR_JSON_DOCUMENT_NULL_KEY) {
|
||||
LOG_USER_ERROR(OB_ERR_JSON_DOCUMENT_NULL_KEY);
|
||||
}
|
||||
if (ret != OB_ITER_END && ret != OB_SUCCESS) {
|
||||
LOG_WARN("fail to get next row", K(ret));
|
||||
} else {
|
||||
ret = OB_SUCCESS;
|
||||
ParseNode parse_node;
|
||||
parse_node.value_ = aggr_info.returning_type_;
|
||||
ObObjType obj_type = static_cast<ObObjType>(parse_node.int16_values_[OB_NODE_CAST_TYPE_IDX]);
|
||||
ObCollationType obj_cs_type = static_cast<ObCollationType>(parse_node.int16_values_[OB_NODE_CAST_COLL_IDX]);
|
||||
if (json_object_buf.length() > 1) {
|
||||
char *end_of_obj = json_object_buf.ptr() + json_object_buf.length() - 1;
|
||||
*end_of_obj = '}';
|
||||
} else if (OB_FAIL(json_object_buf.append("}"))) {
|
||||
LOG_WARN("fail to append curly brace", K(ret));
|
||||
}
|
||||
const ObString res_str(json_object_buf.length(), json_object_buf.ptr());
|
||||
ObJsonNode* json_node = NULL;
|
||||
int32_t dst_len = parse_node.int32_values_[OB_NODE_CAST_C_LEN_IDX];
|
||||
uint32_t parse_flag = ObJsonParser::JSN_STRICT_FLAG;
|
||||
|
||||
ADD_FLAG_IF_NEED(!aggr_info.strict_json_, parse_flag, ObJsonParser::JSN_RELAXED_FLAG);
|
||||
ADD_FLAG_IF_NEED(aggr_info.with_unique_keys_, parse_flag, ObJsonParser::JSN_UNIQUE_FLAG);
|
||||
|
||||
if (obj_type == ObJsonType) {
|
||||
ADD_FLAG_IF_NEED(true, parse_flag, ObJsonParser::JSN_UNIQUE_FLAG);
|
||||
}
|
||||
if (obj_type == ObJsonType && OB_FAIL(ObJsonParser::get_tree(&tmp_alloc, res_str, json_node, parse_flag))) {
|
||||
LOG_WARN("fail to get json base", K(ret));
|
||||
} else if (ob_is_string_type(obj_type) || ob_is_lob_locator(obj_type) || ob_is_raw(obj_type)) {
|
||||
if (ob_is_string_type(obj_type) || ob_is_raw(obj_type)) {
|
||||
if (obj_type == ObVarcharType && res_str.length() > dst_len) {
|
||||
char res_ptr[OB_MAX_DECIMAL_PRECISION] = {0};
|
||||
if (OB_ISNULL(ObCharset::lltostr(dst_len, res_ptr, 10, 1))) {
|
||||
LOG_WARN("dst_len fail to string.", K(ret));
|
||||
}
|
||||
ret = OB_OPERATE_OVERFLOW;
|
||||
LOG_USER_ERROR(OB_OPERATE_OVERFLOW, res_ptr, "json_objectagg");
|
||||
} else {
|
||||
concat_result.set_string(res_str);
|
||||
}
|
||||
} else {
|
||||
ObLobLocator *result = nullptr;
|
||||
char *total_buf = NULL;
|
||||
const int64_t total_buf_len = sizeof(ObLobLocator) + res_str.length();
|
||||
if (OB_ISNULL(total_buf = aggr_info.expr_->get_str_res_mem(eval_ctx_, total_buf_len))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("Failed to allocate memory for lob locator", K(ret), K(total_buf_len));
|
||||
} else if (FALSE_IT(result = reinterpret_cast<ObLobLocator *> (total_buf))) {
|
||||
} else if (OB_FAIL(result->init(res_str))) {
|
||||
LOG_WARN("Failed to init lob locator", K(ret), K(res_str), K(result));
|
||||
} else {
|
||||
concat_result.set_lob_locator(*result);
|
||||
}
|
||||
}
|
||||
} else if (ob_is_json(obj_type)) {
|
||||
ObString raw_binary_str;
|
||||
if (OB_FAIL(json_node->get_raw_binary(raw_binary_str, &aggr_alloc_))) {
|
||||
LOG_WARN("get result binary failed", K(ret));
|
||||
} else {
|
||||
concat_result.set_string(raw_binary_str);
|
||||
}
|
||||
} else {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("get unpexcted returning type", K(ret), K(obj_type));
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObAggregateProcessor::single_row_agg(GroupRow &group_row, ObEvalCtx &eval_ctx)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
Reference in New Issue
Block a user