fix the core of heap table direct load
This commit is contained in:
@ -196,9 +196,9 @@ int ObPDMLOpDataDriver::fill_cache_unitl_cache_full_or_child_iter_end(ObExecCont
|
|||||||
}
|
}
|
||||||
} else if (is_skipped) {
|
} else if (is_skipped) {
|
||||||
//need to skip this row
|
//need to skip this row
|
||||||
} else if ((is_heap_table_insert_ && !is_direct_load) // direct-load generates hidden pk by itself
|
} else if (is_heap_table_insert_
|
||||||
&& OB_FAIL(set_heap_table_hidden_pk(row, tablet_id))) {
|
&& OB_FAIL(set_heap_table_hidden_pk(row, tablet_id, is_direct_load))) {
|
||||||
LOG_WARN("fail to set heap table hidden pk", K(ret), K(*row), K(tablet_id));
|
LOG_WARN("fail to set heap table hidden pk", K(ret), K(*row), K(tablet_id), K(is_direct_load));
|
||||||
} else if (OB_FAIL(cache_.add_row(*row, tablet_id))) {
|
} else if (OB_FAIL(cache_.add_row(*row, tablet_id))) {
|
||||||
if (!with_barrier_ && OB_EXCEED_MEM_LIMIT == ret) {
|
if (!with_barrier_ && OB_EXCEED_MEM_LIMIT == ret) {
|
||||||
// 目前暂时不支持缓存最后一行数据
|
// 目前暂时不支持缓存最后一行数据
|
||||||
@ -406,33 +406,57 @@ int ObPDMLOpDataDriver::switch_row_iter_to_next_partition()
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObPDMLOpDataDriver::set_heap_table_hidden_pk(const ObExprPtrIArray *&row, ObTabletID &tablet_id)
|
int ObPDMLOpDataDriver::set_heap_table_hidden_pk(
|
||||||
|
const ObExprPtrIArray *&row,
|
||||||
|
ObTabletID &tablet_id,
|
||||||
|
const bool is_direct_load)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
uint64_t autoinc_seq = 0;
|
uint64_t pk_value = 0;
|
||||||
ObSQLSessionInfo *my_session = eval_ctx_->exec_ctx_.get_my_session();
|
if (!is_direct_load) {
|
||||||
uint64_t tenant_id = my_session->get_effective_tenant_id();
|
uint64_t autoinc_seq = 0;
|
||||||
if (OB_FAIL(ObDMLService::get_heap_table_hidden_pk(tenant_id,
|
ObSQLSessionInfo *my_session = eval_ctx_->exec_ctx_.get_my_session();
|
||||||
tablet_id,
|
uint64_t tenant_id = my_session->get_effective_tenant_id();
|
||||||
autoinc_seq))) {
|
if (OB_FAIL(ObDMLService::get_heap_table_hidden_pk(tenant_id,
|
||||||
LOG_WARN("fail to het hidden pk", K(ret), K(tablet_id), K(tenant_id));
|
tablet_id,
|
||||||
} else {
|
autoinc_seq))) {
|
||||||
ObExpr *auto_inc_expr = nullptr;
|
LOG_WARN("fail to get hidden pk", KR(ret), K(tablet_id), K(tenant_id));
|
||||||
uint64_t next_autoinc_val = 0;
|
|
||||||
for (int64_t i = 0; OB_SUCC(ret) && i < row->count(); ++i) {
|
|
||||||
if (row->at(i)->type_ == T_TABLET_AUTOINC_NEXTVAL) {
|
|
||||||
auto_inc_expr = row->at(i);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (OB_ISNULL(auto_inc_expr)) {
|
|
||||||
ret = OB_ERR_UNEXPECTED;
|
|
||||||
LOG_WARN("cannot find tablet autoinc expr", KPC(row));
|
|
||||||
} else {
|
} else {
|
||||||
ObDatum &datum = auto_inc_expr->locate_datum_for_write(*eval_ctx_);
|
pk_value = autoinc_seq;
|
||||||
datum.set_uint(autoinc_seq);
|
}
|
||||||
auto_inc_expr->set_evaluated_projected(*eval_ctx_);
|
} else {
|
||||||
|
// init the datum with a simple value to avoid core in project_storage_row(),
|
||||||
|
// direct-load will generate the real hidden pk later by itself
|
||||||
|
pk_value = 0;
|
||||||
|
}
|
||||||
|
if (OB_SUCC(ret)) {
|
||||||
|
if (OB_FAIL(set_heap_table_hidden_pk_value(row, tablet_id, pk_value))) {
|
||||||
|
LOG_WARN("fail to set heap table hidden pk value", KR(ret), K(tablet_id), K(pk_value));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int ObPDMLOpDataDriver::set_heap_table_hidden_pk_value(
|
||||||
|
const ObExprPtrIArray *&row,
|
||||||
|
ObTabletID &tablet_id,
|
||||||
|
const uint64_t pk_value)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
ObExpr *auto_inc_expr = nullptr;
|
||||||
|
for (int64_t i = 0; OB_SUCC(ret) && i < row->count(); ++i) {
|
||||||
|
if (T_TABLET_AUTOINC_NEXTVAL == row->at(i)->type_) {
|
||||||
|
auto_inc_expr = row->at(i);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (OB_ISNULL(auto_inc_expr)) {
|
||||||
|
ret = OB_ERR_UNEXPECTED;
|
||||||
|
LOG_WARN("cannot find tablet autoinc expr", KR(ret), KPC(row));
|
||||||
|
} else {
|
||||||
|
ObDatum &datum = auto_inc_expr->locate_datum_for_write(*eval_ctx_);
|
||||||
|
datum.set_uint(pk_value);
|
||||||
|
auto_inc_expr->set_evaluated_projected(*eval_ctx_);
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
@ -76,7 +76,12 @@ private:
|
|||||||
int barrier(ObExecContext &ctx);
|
int barrier(ObExecContext &ctx);
|
||||||
int next_row_from_cache_for_returning(const ObExprPtrIArray &row);
|
int next_row_from_cache_for_returning(const ObExprPtrIArray &row);
|
||||||
int write_partitions(ObExecContext &ctx);
|
int write_partitions(ObExecContext &ctx);
|
||||||
int set_heap_table_hidden_pk(const ObExprPtrIArray *&row, common::ObTabletID &tablet_id);
|
int set_heap_table_hidden_pk(const ObExprPtrIArray *&row,
|
||||||
|
common::ObTabletID &tablet_id,
|
||||||
|
const bool is_direct_load = false);
|
||||||
|
int set_heap_table_hidden_pk_value(const ObExprPtrIArray *&row,
|
||||||
|
common::ObTabletID &tablet_id,
|
||||||
|
const uint64_t pk_value);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
// 因为 cache 中会缓存多个分区的数据,迭代的过程中需要
|
// 因为 cache 中会缓存多个分区的数据,迭代的过程中需要
|
||||||
|
Reference in New Issue
Block a user