fix hash gby can not dump in 1st stage when close by pass && splite expr in hash map memory into user tenant

This commit is contained in:
18523270951@163.com
2023-11-13 12:09:09 +00:00
committed by ob-robot
parent ccbaaa781d
commit 81b3514d20
3 changed files with 35 additions and 331 deletions

View File

@ -81,31 +81,6 @@ bool Row<ObDatum>::equal_key(const Row<ObDatum> &other, void **cmp_funcs, const
return equal_ret;
}
template <>
bool Row<ObObj>::equal_key(const Row<ObObj> &other, void **cmp_funcs, const int idx) const
{
UNUSED(cmp_funcs);/**nullptr**/
bool equal_ret = false;
if (OB_ISNULL(other.elems_) || OB_ISNULL(elems_)) {
} else {
bool is_equal = true;
int curr_idx = idx;
for (int i = 0; is_equal && 0 != curr_idx; ++i, curr_idx = curr_idx >> 1) {
if (1 == (curr_idx & 1)) {
if (elems_[i].is_null() && other.elems_[i].is_null()) {
//true
} else if (elems_[i].is_null() || other.elems_[i].is_null()) {
is_equal = false;
} else {
is_equal = elems_[i] == other.elems_[i];
}
}
}
equal_ret = is_equal;
}
return equal_ret;
}
template <>
int Row<ObDatum>::hash_key(void **hash_funcs, const int idx, uint64_t seed, uint64_t &hash_val) const
{
@ -126,27 +101,6 @@ int Row<ObDatum>::hash_key(void **hash_funcs, const int idx, uint64_t seed, uint
return ret;
}
template <>
int Row<ObObj>::hash_key(void **hash_funcs, const int idx, uint64_t seed, uint64_t &hash_val) const
{
UNUSED(hash_funcs);/**nullptr**/
int ret = OB_SUCCESS;
hash_val = 0;
if (OB_ISNULL(elems_)) {
} else {
int curr_idx = idx;
for (int i = 0; 0 != curr_idx && OB_SUCC(ret); ++i, curr_idx = curr_idx >> 1) {
if (1 == (curr_idx & 1)) {
ret = elems_[i].hash(seed, seed);
} else {
continue;
}
}
hash_val = seed;
}
return ret;
}
template <>
int Row<ObDatum>::compare_with_null(const Row<ObDatum> &other,
void **cmp_funcs,
@ -177,36 +131,6 @@ int Row<ObDatum>::compare_with_null(const Row<ObDatum> &other,
}
return ret;
}
//0 for true, -1 for false, 1 for NULL
template <>
int Row<ObObj>::compare_with_null(const Row<ObObj> &other,
void **cmp_funcs,
const int64_t row_dimension,
int &exist_ret) const
{
UNUSED(cmp_funcs);/**nullptr**/
int ret = OB_SUCCESS;
if (OB_ISNULL(other.elems_) || OB_ISNULL(elems_)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("NULL pointer param", K(ret));
} else if (row_dimension > 0) {
exist_ret = ObExprInHashMap<ObDatum>::HASH_CMP_TRUE;
for (int i = 0;
ObExprInHashMap<ObObj>::HASH_CMP_FALSE != exist_ret && i < row_dimension; ++i) {
if (elems_[i].is_null() || other.elems_[i].is_null()) {
exist_ret = ObExprInHashMap<ObObj>::HASH_CMP_UNKNOWN;
} else {
int cmp_ret = (elems_[i] == other.elems_[i]) ? 0 : -1;
if (0 != cmp_ret) {
exist_ret = ObExprInHashMap<ObObj>::HASH_CMP_FALSE;
} else {
//do nothing
}
}
}
}
return ret;
}
template <class T>
int Row<T>::set_elem(T *elems)
@ -243,6 +167,7 @@ int ObExprInHashMap<T>::set_refactored(const Row<T> &row)
tmp_row_key.meta_ = &meta_;
if (OB_ISNULL(arr_ptr = const_cast<ObArray<Row<T>> *> (map_.get(tmp_row_key)))) {
ObArray<Row<T>> arr;
arr.set_tenant_id(MTL_ID());
if (OB_FAIL(arr.push_back(row))) {
LOG_WARN("failed to load row", K(ret));
} else {
@ -325,40 +250,6 @@ int ObExprInHashSet<T>::exist_refactored(const Row<T> &row, bool &is_exist)
return OB_SUCCESS;
}
int ObExprInOrNotIn::ObExprInCtx::init_hashset(int64_t param_num)
{
return hashset_.create(param_num * 2);
}
int ObExprInOrNotIn::ObExprInCtx::init_hashset_vecs(int64_t param_num,
int64_t row_dimension,
ObExecContext *exec_ctx)
{
int ret = OB_SUCCESS;
hashset_vecs_ = NULL;
int vecs_buf_size = sizeof(ObExprInHashMap<ObObj> ) * (1 << row_dimension);
if (OB_ISNULL(hashset_vecs_ =
(ObExprInHashMap<ObObj> *)
((exec_ctx->get_allocator()).alloc(vecs_buf_size)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to allocate memory", K(ret));
} else {
for (int64_t i = 0; i < (1 << row_dimension); ++i) {
new (&hashset_vecs_[i]) ObExprInHashMap<ObObj> ();
}
for (int64_t i = 0; OB_SUCC(ret) && i < (1 << row_dimension); ++i) {
hashset_vecs_[i].set_meta_idx(i);
hashset_vecs_[i].set_meta_dimension(row_dimension);
if (OB_FAIL(hashset_vecs_[i].create(param_num))){
LOG_WARN("create static_engine_hashset_vecs failed", K(ret), K(i));
}
}
}
row_dimension_ = row_dimension;
return ret;
}
int ObExprInOrNotIn::ObExprInCtx::init_static_engine_hashset(int64_t param_num)
{
static_engine_hashset_.set_meta_idx(1);
@ -367,19 +258,19 @@ int ObExprInOrNotIn::ObExprInCtx::init_static_engine_hashset(int64_t param_num)
return static_engine_hashset_.create(param_num * 2);
}
int ObExprInOrNotIn::ObExprInCtx::init_static_engine_hashset_vecs(int64_t param_num,
int64_t row_dimension,
ObExecContext *exec_ctx)
{
int ret = OB_SUCCESS;
static_engine_hashset_vecs_ = NULL;
int64_t vecs_buf_size = sizeof(ObExprInHashMap<ObDatum> ) * (1 << row_dimension);
if (OB_ISNULL(static_engine_hashset_vecs_ =
(ObExprInHashMap<ObDatum> *)
((exec_ctx->get_allocator()).alloc(vecs_buf_size)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to allocate memory", K(ret));
} else {
int ObExprInOrNotIn::ObExprInCtx::init_static_engine_hashset_vecs(int64_t param_num,
int64_t row_dimension,
ObExecContext *exec_ctx)
{
int ret = OB_SUCCESS;
static_engine_hashset_vecs_ = NULL;
int64_t vecs_buf_size = sizeof(ObExprInHashMap<ObDatum> ) * (1 << row_dimension);
if (OB_ISNULL(static_engine_hashset_vecs_ =
(ObExprInHashMap<ObDatum> *)
((exec_ctx->get_allocator()).alloc(vecs_buf_size)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to allocate memory", K(ret));
} else {
for (int64_t i = 0; i < (1 << row_dimension); ++i) {
new (&static_engine_hashset_vecs_[i]) ObExprInHashMap<ObDatum> ();
}
@ -387,36 +278,12 @@ int ObExprInOrNotIn::ObExprInCtx::init_static_engine_hashset(int64_t param_num)
static_engine_hashset_vecs_[i].set_meta_idx(i);
static_engine_hashset_vecs_[i].set_meta_dimension(row_dimension);
if (OB_FAIL(static_engine_hashset_vecs_[i].create(param_num))) {
LOG_WARN("create static_engine_hashset_vecs failed", K(ret), K(i));
LOG_WARN("create static_engine_hashset_vecs failed", K(ret), K(i));
}
}
}
row_dimension_ = row_dimension;
return ret;
}
int ObExprInOrNotIn::ObExprInCtx::add_to_hashset(const ObObj &obj)
{
int ret = hashset_.set_refactored(obj);
if (OB_FAIL(ret)) {
LOG_WARN("failed to add to hashset", K(ret), K(obj));
}
return ret;
}
int ObExprInOrNotIn::ObExprInCtx::add_to_hashset_vecs(const Row<ObObj> &row,
const int idx)
{
int ret = OB_SUCCESS;
if (idx >= (1 << row_dimension_)) {
ret = OB_INVALID_ARGUMENT;
} else {
ret = hashset_vecs_[idx].set_refactored(row);
}
if (OB_FAIL(ret)) {
LOG_WARN("failed to add to hashset_vecs", K(ret), K(idx));
}
return ret;
}
int ObExprInOrNotIn::ObExprInCtx::add_to_static_engine_hashset(const Row<common::ObDatum> &row)
@ -444,34 +311,6 @@ add_to_static_engine_hashset_vecs(const Row<common::ObDatum> &row, const int idx
return ret;
}
int ObExprInOrNotIn::ObExprInCtx::exist_in_hashset(const ObObj &obj, bool &is_exist) const
{
int ret = hashset_.exist_refactored(obj);
if (OB_HASH_EXIST == ret) {
ret = OB_SUCCESS;
is_exist = true;
} else if (OB_HASH_NOT_EXIST == ret) {
ret = OB_SUCCESS;
is_exist = false;
} else {
LOG_WARN("failed to search in hashset", K(ret), K(obj));
}
return OB_SUCCESS;
}
int ObExprInOrNotIn::ObExprInCtx::exist_in_hashset_vecs(const Row<ObObj> &row,
const int idx,
int &exist_ret) const
{
int ret = OB_SUCCESS;
if (idx >= (1 << row_dimension_)) {
ret = OB_INVALID_ARGUMENT;
} else if (OB_FAIL(hashset_vecs_[idx].exist_refactored(row, exist_ret))){
LOG_WARN("failed to find in hash map", K(ret));
}
return ret;
}
int ObExprInOrNotIn::ObExprInCtx::exist_in_static_engine_hashset(const Row<ObDatum> &row,
bool &is_exist)
{
@ -506,13 +345,6 @@ int ObExprInOrNotIn::ObExprInCtx::set_cmp_types(const ObExprCalcType &cmp_type,
return ret;
}
int ObExprInOrNotIn::ObExprInCtx::init_cmp_types(const int64_t row_dimension,
ObExecContext *exec_ctx)
{
cmp_types_.set_allocator(&(exec_ctx->get_allocator()));
return cmp_types_.init(row_dimension);
}
const ObExprCalcType &ObExprInOrNotIn::ObExprInCtx::get_cmp_types(const int64_t idx) const
{
@ -558,31 +390,6 @@ get_hashset_vecs_all_null(const int64_t idx, bool &is_all_null) const
return ret;
}
int ObExprInOrNotIn::ObExprInCtx::
init_right_objs(int64_t param_num,
int64_t row_dimension,
ObExecContext *exec_ctx)
{
int ret = OB_SUCCESS;
right_objs_ = NULL;
int objs_buf_size = sizeof(ObObj *) * param_num; //ObObj *指针数组大小
if (OB_ISNULL(right_objs_ =
(ObObj **)
((exec_ctx->get_allocator()).alloc(objs_buf_size)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to allocate memory for ObObj **", K(ret));
} else {
for (int i =0; OB_SUCC(ret) && i < param_num; ++i) {//初始化每个ObObj *
if (OB_ISNULL(right_objs_[i] =
static_cast<ObObj *> (((exec_ctx->get_allocator()).alloc(sizeof(ObObj) * row_dimension)))) ){
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to allocate memory for ObObj *", K(ret), K(i));
}
}
}
return ret;
}
int ObExprInOrNotIn::ObExprInCtx::
init_right_datums(int64_t param_num,
int64_t row_dimension,
@ -628,25 +435,6 @@ init_cmp_funcs(int64_t func_cnt,
return ret;
}
int ObExprInOrNotIn::ObExprInCtx::set_right_obj(int64_t row_num,
int64_t col_num,
const int right_param_num,
const common::ObObj &obj)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(right_objs_)) {
ret = OB_NOT_INIT;
LOG_WARN("right_datums is not init", K(ret));
} else if (row_num < 0 || row_num >= right_param_num
|| col_num < 0 || col_num >= row_dimension_) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("row_num or col_num out of bounds", K(ret));
} else {
right_objs_[row_num][col_num] = obj;
}
return ret;
}
int ObExprInOrNotIn::ObExprInCtx::set_right_datum(int64_t row_num,
int64_t col_num,
const int right_param_num,
@ -748,26 +536,6 @@ int ObExprInOrNotIn::eval_pl_udt_in(const ObExpr &expr,
return ret;
}
inline int ObExprInOrNotIn::to_type(const ObObjType expect_type,
const ObCollationType expect_cs_type,
ObCastCtx &cast_ctx,
const ObObj &in_obj,
ObObj &out_obj) const
{
int ret = OB_SUCCESS;
if (ob_is_string_type(expect_type) && in_obj.is_string_type()
&& in_obj.get_collation_type() == expect_cs_type) {
out_obj = in_obj;
out_obj.set_type(expect_type);
out_obj.set_collation_type(expect_cs_type);
} else {
// otherwise ObObjCaster::to_type() will use cast_ctx.dest_collation_ as dst cs type
cast_ctx.dest_collation_ = CS_TYPE_INVALID;
ret = ObObjCaster::to_type(expect_type, expect_cs_type, cast_ctx, in_obj, out_obj);
}
return ret;
}
inline bool ObExprInOrNotIn::need_hash(ObExecContext *exec_ctx) const
{
return is_param_all_const() && is_param_all_same_type() && is_param_all_same_cs_type()
@ -786,28 +554,10 @@ ObExprIn::ObExprIn(ObIAllocator &alloc)
: ObExprInOrNotIn(alloc, T_OP_IN, N_IN)
{}
void ObExprIn::set_result(ObObj &result, bool is_exist, bool param_exist_null) const
{
if (!is_exist && param_exist_null) {
result.set_null();
} else {
result.set_int(static_cast<int64_t>(is_exist));
}
}
ObExprNotIn::ObExprNotIn(ObIAllocator &alloc)
: ObExprInOrNotIn(alloc, T_OP_NOT_IN, N_NOT_IN)
{}
void ObExprNotIn::set_result(ObObj &result, bool is_exist, bool param_exist_null) const
{
if (!is_exist && param_exist_null) {
result.set_null();
} else {
result.set_int(static_cast<int64_t>(!is_exist));
}
}
int ObExprInOrNotIn::cg_expr(ObExprCGCtx &expr_cg_ctx,
const ObRawExpr &raw_expr,
ObExpr &rt_expr) const