fix: fix the core induced by check rowkey distinct when rowkey column has virtual generated column
This commit is contained in:
@ -719,8 +719,14 @@ int ObConflictChecker::build_data_table_range(ObNewRange &lookup_range)
|
|||||||
for (int64_t i = 0; OB_SUCC(ret) && i < rowkey_cnt; ++i) {
|
for (int64_t i = 0; OB_SUCC(ret) && i < rowkey_cnt; ++i) {
|
||||||
ObObj tmp_obj;
|
ObObj tmp_obj;
|
||||||
ObExpr *expr = checker_ctdef_.data_table_rowkey_expr_.at(i);
|
ObExpr *expr = checker_ctdef_.data_table_rowkey_expr_.at(i);
|
||||||
ObDatum &col_datum = expr->locate_expr_datum(eval_ctx_);
|
ObDatum *col_datum = nullptr;
|
||||||
if (OB_FAIL(col_datum.to_obj(tmp_obj, expr->obj_meta_, expr->obj_datum_map_))) {
|
if (OB_ISNULL(expr)) {
|
||||||
|
LOG_WARN("expr in rowkey is nullptr", K(ret), K(i));
|
||||||
|
} else if (OB_FAIL(expr->eval(eval_ctx_, col_datum))) {
|
||||||
|
LOG_WARN("failed to evaluate expr in rowkey", K(ret), K(i));
|
||||||
|
} else if (OB_ISNULL(col_datum)) {
|
||||||
|
LOG_WARN("evaluated column datum in rowkey is nullptr", K(ret), K(i));
|
||||||
|
} else if (OB_FAIL(col_datum->to_obj(tmp_obj, expr->obj_meta_, expr->obj_datum_map_))) {
|
||||||
LOG_WARN("convert datum to obj failed", K(ret));
|
LOG_WARN("convert datum to obj failed", K(ret));
|
||||||
}
|
}
|
||||||
// 这里需要做深拷贝
|
// 这里需要做深拷贝
|
||||||
|
|||||||
@ -953,7 +953,7 @@ struct ObDMLRtCtx
|
|||||||
bool need_non_sub_full_task()
|
bool need_non_sub_full_task()
|
||||||
{ return das_task_status_.need_non_sub_full_task(); }
|
{ return das_task_status_.need_non_sub_full_task(); }
|
||||||
void add_cached_row_size(const int64_t row_size) { cached_row_size_ += row_size; }
|
void add_cached_row_size(const int64_t row_size) { cached_row_size_ += row_size; }
|
||||||
int64_t get_cached_row_size() const { return cached_row_size_; }
|
int64_t get_row_buffer_size() const { return cached_row_size_; }
|
||||||
|
|
||||||
ObDASRef das_ref_;
|
ObDASRef das_ref_;
|
||||||
DasTaskStatus das_task_status_;
|
DasTaskStatus das_task_status_;
|
||||||
|
|||||||
@ -174,9 +174,15 @@ int ObDMLService::check_rowkey_whether_distinct(const ObExprPtrIArray &row,
|
|||||||
ObObj *tmp_obj_ptr = tmp_table_rowkey.get_obj_ptr();
|
ObObj *tmp_obj_ptr = tmp_table_rowkey.get_obj_ptr();
|
||||||
for (int64_t i = 0; OB_SUCC(ret) && i < rowkey_cnt; ++i) {
|
for (int64_t i = 0; OB_SUCC(ret) && i < rowkey_cnt; ++i) {
|
||||||
ObExpr *expr = row.at(i);
|
ObExpr *expr = row.at(i);
|
||||||
ObDatum &col_datum = expr->locate_expr_datum(eval_ctx);
|
ObDatum *col_datum = nullptr;
|
||||||
if (OB_FAIL(col_datum.to_obj(tmp_obj_ptr[i], expr->obj_meta_, expr->obj_datum_map_))) {
|
if (OB_ISNULL(expr)) {
|
||||||
LOG_WARN("convert datum to obj failed", K(ret));
|
LOG_WARN("expr in rowkey is nullptr", K(ret), K(i));
|
||||||
|
} else if (OB_FAIL(expr->eval(eval_ctx, col_datum))) {
|
||||||
|
LOG_WARN("failed to evaluate expr in rowkey", K(ret), K(i));
|
||||||
|
} else if (OB_ISNULL(col_datum)) {
|
||||||
|
LOG_WARN("evaluated column datum in rowkey is nullptr", K(ret), K(i));
|
||||||
|
} else if (OB_FAIL(col_datum->to_obj(tmp_obj_ptr[i], expr->obj_meta_, expr->obj_datum_map_))) {
|
||||||
|
LOG_WARN("convert datum to obj failed", K(ret), K(i));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1561,7 +1567,7 @@ int ObDMLService::write_row_to_das_op(const ObDASDMLBaseCtDef &ctdef,
|
|||||||
if (OB_SUCC(ret) && buffer_full) {
|
if (OB_SUCC(ret) && buffer_full) {
|
||||||
need_retry = true;
|
need_retry = true;
|
||||||
if (REACH_COUNT_INTERVAL(10)) { // print log per 10 times.
|
if (REACH_COUNT_INTERVAL(10)) { // print log per 10 times.
|
||||||
LOG_INFO("DAS write buffer full, ", K(dml_op->get_row_cnt()), K(dml_rtctx.das_ref_.get_das_mem_used()), K(dml_rtctx.get_cached_row_size()));
|
LOG_INFO("DAS write buffer full, ", K(dml_op->get_row_cnt()), K(dml_rtctx.das_ref_.get_das_mem_used()), K(dml_rtctx.get_row_buffer_size()));
|
||||||
}
|
}
|
||||||
dml_rtctx.das_ref_.set_frozen_node();
|
dml_rtctx.das_ref_.set_frozen_node();
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1106,9 +1106,9 @@ int ObTableModifyOp::discharge_das_write_buffer()
|
|||||||
if (OB_UNLIKELY(simulate_buffer_size > 0)) {
|
if (OB_UNLIKELY(simulate_buffer_size > 0)) {
|
||||||
buffer_size_limit = simulate_buffer_size;
|
buffer_size_limit = simulate_buffer_size;
|
||||||
}
|
}
|
||||||
if (dml_rtctx_.get_cached_row_size() >= buffer_size_limit) {
|
if (dml_rtctx_.get_row_buffer_size() >= buffer_size_limit) {
|
||||||
LOG_INFO("DASWriteBuffer full, now to write storage",
|
LOG_INFO("DASWriteBuffer full, now to write storage",
|
||||||
"buffer memory", dml_rtctx_.das_ref_.get_das_alloc().used(), K(dml_rtctx_.get_cached_row_size()));
|
"buffer memory", dml_rtctx_.das_ref_.get_das_alloc().used(), K(dml_rtctx_.get_row_buffer_size()));
|
||||||
ret = submit_all_dml_task();
|
ret = submit_all_dml_task();
|
||||||
} else if (execute_single_row_) {
|
} else if (execute_single_row_) {
|
||||||
ret = submit_all_dml_task();
|
ret = submit_all_dml_task();
|
||||||
|
|||||||
Reference in New Issue
Block a user