[CP] Fix merge join add row reset datum ptr
This commit is contained in:
@ -768,6 +768,7 @@ int ObRADatumStore::switch_idx_block(bool finish_add /* = false */)
|
|||||||
* 从operator的ObExpr的ObDatum中写入到ObChunkDatumStore时,使用add_row
|
* 从operator的ObExpr的ObDatum中写入到ObChunkDatumStore时,使用add_row
|
||||||
* 理论上只有这两个接口
|
* 理论上只有这两个接口
|
||||||
*/
|
*/
|
||||||
|
template<bool NEED_EVAL>
|
||||||
int ObRADatumStore::add_row(const common::ObIArray<ObExpr*> &exprs,
|
int ObRADatumStore::add_row(const common::ObIArray<ObExpr*> &exprs,
|
||||||
ObEvalCtx *ctx, StoredRow **stored_row)
|
ObEvalCtx *ctx, StoredRow **stored_row)
|
||||||
{
|
{
|
||||||
@ -776,7 +777,7 @@ int ObRADatumStore::add_row(const common::ObIArray<ObExpr*> &exprs,
|
|||||||
if (OB_UNLIKELY(!is_inited())) {
|
if (OB_UNLIKELY(!is_inited())) {
|
||||||
ret = OB_NOT_INIT;
|
ret = OB_NOT_INIT;
|
||||||
LOG_WARN("not init", K(ret));
|
LOG_WARN("not init", K(ret));
|
||||||
} else if (OB_FAIL(Block::row_store_size(exprs, *ctx, row_size, row_extend_size_))) {
|
} else if (OB_FAIL(Block::row_store_size<NEED_EVAL>(exprs, *ctx, row_size, row_extend_size_))) {
|
||||||
// row store size确保exprs被计算过
|
// row store size确保exprs被计算过
|
||||||
LOG_WARN("failed to calc store size");
|
LOG_WARN("failed to calc store size");
|
||||||
} else {
|
} else {
|
||||||
@ -802,6 +803,13 @@ int ObRADatumStore::add_row(const common::ObIArray<ObExpr*> &exprs,
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
template int ObRADatumStore::add_row<true>(const common::ObIArray<ObExpr*> &exprs,
|
||||||
|
ObEvalCtx *ctx,
|
||||||
|
StoredRow **stored_row);
|
||||||
|
template int ObRADatumStore::add_row<false>(const common::ObIArray<ObExpr*> &exprs,
|
||||||
|
ObEvalCtx *ctx,
|
||||||
|
StoredRow **stored_row);
|
||||||
|
|
||||||
int ObRADatumStore::add_row(const common::ObIArray<ObDatum> &datums,
|
int ObRADatumStore::add_row(const common::ObIArray<ObDatum> &datums,
|
||||||
StoredRow **stored_row)
|
StoredRow **stored_row)
|
||||||
{
|
{
|
||||||
|
|||||||
@ -71,6 +71,7 @@ public:
|
|||||||
|
|
||||||
// 这里暂时以ObExpr的数组形式写入数据到DatumStore,主要是为了上层Operator在写入数据时,可以无脑调用ObExpr的插入
|
// 这里暂时以ObExpr的数组形式写入数据到DatumStore,主要是为了上层Operator在写入数据时,可以无脑调用ObExpr的插入
|
||||||
// 可以看下面参数为common::ObDatum **datums的函数进行对比
|
// 可以看下面参数为common::ObDatum **datums的函数进行对比
|
||||||
|
template<bool NEED_EVAL = true>
|
||||||
static inline int row_copy_size(
|
static inline int row_copy_size(
|
||||||
const common::ObIArray<ObExpr*> &exprs, ObEvalCtx &ctx, int64_t &size)
|
const common::ObIArray<ObExpr*> &exprs, ObEvalCtx &ctx, int64_t &size)
|
||||||
{
|
{
|
||||||
@ -78,8 +79,9 @@ public:
|
|||||||
common::ObDatum *datum = nullptr;
|
common::ObDatum *datum = nullptr;
|
||||||
size = DATUM_SIZE * exprs.count();
|
size = DATUM_SIZE * exprs.count();
|
||||||
for (int64_t i = 0; i < exprs.count() && OB_SUCC(ret); ++i) {
|
for (int64_t i = 0; i < exprs.count() && OB_SUCC(ret); ++i) {
|
||||||
if (OB_FAIL(exprs.at(i)->eval(ctx, datum))) {
|
if (NEED_EVAL && OB_FAIL(exprs.at(i)->eval(ctx, datum))) {
|
||||||
SQL_ENG_LOG(WARN, "failed to eval expr datum", K(ret));
|
SQL_ENG_LOG(WARN, "failed to eval expr datum", K(ret));
|
||||||
|
} else if (!NEED_EVAL && FALSE_IT(datum = &exprs.at(i)->locate_expr_datum(ctx))) {
|
||||||
} else {
|
} else {
|
||||||
size += datum->len_;
|
size += datum->len_;
|
||||||
}
|
}
|
||||||
@ -143,6 +145,7 @@ public:
|
|||||||
{
|
{
|
||||||
return sizeof(Block) + row_store_size;
|
return sizeof(Block) + row_store_size;
|
||||||
}
|
}
|
||||||
|
template<bool NEED_EVAL>
|
||||||
static int inline row_store_size(const common::ObIArray<ObExpr*> &exprs,
|
static int inline row_store_size(const common::ObIArray<ObExpr*> &exprs,
|
||||||
ObEvalCtx &ctx,
|
ObEvalCtx &ctx,
|
||||||
int64_t &size,
|
int64_t &size,
|
||||||
@ -150,7 +153,7 @@ public:
|
|||||||
{
|
{
|
||||||
int ret = common::OB_SUCCESS;
|
int ret = common::OB_SUCCESS;
|
||||||
size = 0;
|
size = 0;
|
||||||
if (OB_FAIL(row_copy_size(exprs, ctx, size))) {
|
if (OB_FAIL(row_copy_size<NEED_EVAL>(exprs, ctx, size))) {
|
||||||
SQL_ENG_LOG(WARN, "failed to calc store row size", K(ret));
|
SQL_ENG_LOG(WARN, "failed to calc store row size", K(ret));
|
||||||
} else {
|
} else {
|
||||||
size += ROW_HEAD_SIZE + ROW_INDEX_SIZE + row_extend_size;
|
size += ROW_HEAD_SIZE + ROW_INDEX_SIZE + row_extend_size;
|
||||||
@ -346,6 +349,7 @@ public:
|
|||||||
// Keeping one memory block, reader must call reuse() too.
|
// Keeping one memory block, reader must call reuse() too.
|
||||||
void reuse();
|
void reuse();
|
||||||
|
|
||||||
|
template<bool NEED_EVAL = true>
|
||||||
int add_row(const common::ObIArray<ObExpr*> &exprs, ObEvalCtx *ctx,
|
int add_row(const common::ObIArray<ObExpr*> &exprs, ObEvalCtx *ctx,
|
||||||
StoredRow **stored_row = nullptr);
|
StoredRow **stored_row = nullptr);
|
||||||
int add_row(const common::ObIArray<common::ObDatum> &datums,
|
int add_row(const common::ObIArray<common::ObDatum> &datums,
|
||||||
|
|||||||
@ -1112,7 +1112,8 @@ int ObMergeJoinOp::ChildBatchFetcher::get_next_small_group(int64_t &cmp_res)
|
|||||||
ObRADatumStore::StoredRow *stored_row = NULL;
|
ObRADatumStore::StoredRow *stored_row = NULL;
|
||||||
guard.set_batch_idx(cur_idx_);
|
guard.set_batch_idx(cur_idx_);
|
||||||
guard.set_batch_size(brs_.size_);
|
guard.set_batch_size(brs_.size_);
|
||||||
if (OB_FAIL(datum_store_.add_row(*all_exprs_, &(merge_join_op_.eval_ctx_), &stored_row))) {
|
if (OB_FAIL(datum_store_.add_row<false>(*all_exprs_, &(merge_join_op_.eval_ctx_),
|
||||||
|
&stored_row))) {
|
||||||
LOG_WARN("add row failed", K(ret));
|
LOG_WARN("add row failed", K(ret));
|
||||||
} else if (OB_ISNULL(stored_row)) {
|
} else if (OB_ISNULL(stored_row)) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
@ -1182,7 +1183,8 @@ int ObMergeJoinOp::ChildBatchFetcher::get_next_equal_group(JoinRowList &row_list
|
|||||||
if (!greater_found) {
|
if (!greater_found) {
|
||||||
guard.set_batch_idx(cur_idx_);
|
guard.set_batch_idx(cur_idx_);
|
||||||
guard.set_batch_size(batch_size_);
|
guard.set_batch_size(batch_size_);
|
||||||
if (OB_FAIL(datum_store_.add_row(*all_exprs_, &(merge_join_op_.eval_ctx_), &new_stored_row))) {
|
if (OB_FAIL(datum_store_.add_row<false>(*all_exprs_, &(merge_join_op_.eval_ctx_),
|
||||||
|
&new_stored_row))) {
|
||||||
LOG_WARN("add row failed", K(ret));
|
LOG_WARN("add row failed", K(ret));
|
||||||
} else if (OB_ISNULL(new_stored_row)) {
|
} else if (OB_ISNULL(new_stored_row)) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
@ -1341,7 +1343,7 @@ int ObMergeJoinOp::store_group_first_row(
|
|||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
ObRADatumStore::StoredRow *res_row = NULL;
|
ObRADatumStore::StoredRow *res_row = NULL;
|
||||||
guard.set_batch_idx(child_fetcher.cur_idx_);
|
guard.set_batch_idx(child_fetcher.cur_idx_);
|
||||||
if (OB_FAIL(child_fetcher.datum_store_.add_row(*child_fetcher.all_exprs_,
|
if (OB_FAIL(child_fetcher.datum_store_.add_row<false>(*child_fetcher.all_exprs_,
|
||||||
&eval_ctx_, &res_row))) {
|
&eval_ctx_, &res_row))) {
|
||||||
LOG_WARN("add row failed", K(ret));
|
LOG_WARN("add row failed", K(ret));
|
||||||
} else if (OB_ISNULL(res_row)) {
|
} else if (OB_ISNULL(res_row)) {
|
||||||
|
|||||||
Reference in New Issue
Block a user