[CP]cherry-pick current_timestamp from 32x
This commit is contained in:
@ -142,9 +142,13 @@ int ObTableApiCacheGuard::create_cache_key(ObTableCtx *tb_ctx)
|
||||
|| operation_type == ObTableOperationType::Type::INSERT_OR_UPDATE
|
||||
|| operation_type == ObTableOperationType::Type::INCREMENT
|
||||
|| operation_type == ObTableOperationType::Type::APPEND) {
|
||||
const ObTableCtx::ObAssignIds &assign_ids = tb_ctx->get_assign_ids();
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < assign_ids.count(); i++) {
|
||||
if (OB_FAIL(cache_key_.op_column_ids_.push_back(assign_ids.at(i).column_id_))) {
|
||||
const ObIArray<ObTableAssignment> &assigns = tb_ctx->get_assignments();
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < assigns.count(); i++) {
|
||||
const ObTableAssignment &tmp_assign = assigns.at(i);
|
||||
if (OB_ISNULL(tmp_assign.column_item_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("assign column item is null", K(ret), K(tmp_assign));
|
||||
} else if (OB_FAIL(cache_key_.op_column_ids_.push_back(tmp_assign.column_item_->column_id_))) {
|
||||
LOG_WARN("fail to add assign column id", K(ret), K(i));
|
||||
}
|
||||
}
|
||||
|
@ -55,7 +55,7 @@ struct ObTableApiCacheKey: public ObILibCacheKey
|
||||
int64_t schema_version_;
|
||||
ObTableOperationType::Type operation_type_;
|
||||
bool is_ttl_table_;
|
||||
common::ObArray<uint64_t> op_column_ids_;
|
||||
common::ObSEArray<uint64_t, 32> op_column_ids_;
|
||||
};
|
||||
|
||||
class ObTableApiCacheNode: public ObILibCacheNode
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -36,10 +36,6 @@ public:
|
||||
static int generate_exprs(ObTableCtx &ctx,
|
||||
common::ObIAllocator &allocator,
|
||||
ObExprFrameInfo &expr_frame_info);
|
||||
// 基于table schema构造全表列原生列引用表达式和生成列表达式
|
||||
static int generate_column_raw_exprs(ObTableCtx &ctx);
|
||||
// 构造更新需要的表达式
|
||||
static int generate_update_raw_exprs(ObTableCtx &ctx);
|
||||
// 基于原生表达式生成表达式内存布局
|
||||
static int generate_expr_frame_info(ObTableCtx &ctx,
|
||||
common::ObIAllocator &allocator,
|
||||
@ -60,26 +56,13 @@ public:
|
||||
const common::ObIArray<sql::ObExpr *> &delta_exprs,
|
||||
const ObTableEntity &entity);
|
||||
static int refresh_update_exprs_frame(ObTableCtx &ctx,
|
||||
const common::ObIArray<sql::ObExpr *> &old_row,
|
||||
const common::ObIArray<sql::ObExpr *> &new_row,
|
||||
const common::ObIArray<sql::ObExpr *> &full_assign_row,
|
||||
const ObTableCtx::ObAssignIds &assign_ids,
|
||||
const ObTableEntity &entity);
|
||||
static int refresh_insert_up_exprs_frame(ObTableCtx &ctx,
|
||||
const common::ObIArray<sql::ObExpr *> &ins_new_row,
|
||||
const common::ObIArray<sql::ObExpr *> &delta_exprs,
|
||||
const common::ObIArray<sql::ObExpr *> &delta_row,
|
||||
const ObTableEntity &entity);
|
||||
static int refresh_generated_column_related_frame(ObTableCtx &ctx,
|
||||
const common::ObIArray<sql::ObExpr *> &old_row,
|
||||
const common::ObIArray<sql::ObExpr *> &full_assign_row,
|
||||
const ObTableCtx::ObAssignIds &assign_ids,
|
||||
const ObColumnSchemaV2 &col_schema);
|
||||
static int generate_assign_exprs(ObTableCtx &ctx,
|
||||
const ObTableCtx::ObAssignIds &assign_ids,
|
||||
common::ObIArray<sql::ObRawExpr *> &assign_exprs);
|
||||
private:
|
||||
static int init_datum_param_store(ObTableCtx &ctx,
|
||||
int64_t capacity);
|
||||
static int refresh_exprs_frame(ObTableCtx &ctx,
|
||||
const common::ObIArray<sql::ObExpr *> &exprs,
|
||||
const ObTableEntity &entity);
|
||||
@ -90,42 +73,52 @@ private:
|
||||
const common::ObIArray<sql::ObExpr *> &exprs,
|
||||
const ObTableEntity &entity);
|
||||
static int refresh_assign_exprs_frame(ObTableCtx &ctx,
|
||||
const common::ObIArray<sql::ObExpr *> &old_rows,
|
||||
const common::ObIArray<sql::ObExpr *> &new_rows,
|
||||
const common::ObIArray<sql::ObExpr *> &full_assign_rows,
|
||||
const ObTableCtx::ObAssignIds &assign_ids,
|
||||
const common::ObIArray<sql::ObExpr *> &new_row,
|
||||
const ObTableEntity &entity);
|
||||
static int refresh_delta_exprs_frame(ObTableCtx &ctx,
|
||||
const common::ObIArray<sql::ObExpr *> &delta_exprs,
|
||||
const common::ObIArray<sql::ObExpr *> &delta_row,
|
||||
const ObTableEntity &entity);
|
||||
static int generate_full_assign_raw_exprs(ObTableCtx &ctx);
|
||||
static int genreate_filter_exprs(ObTableCtx &ctx);
|
||||
private:
|
||||
// 通过column_name在表达式数组获取列引用表达式
|
||||
static ObRawExpr* get_ref_raw_expr(const common::ObIArray<sql::ObRawExpr *> &all_exprs,
|
||||
const common::ObString &col_name);
|
||||
// 构造生成列表达式
|
||||
static int build_generated_column_expr(ObTableCtx &ctx,
|
||||
sql::ObColumnRefRawExpr &col_expr,
|
||||
const common::ObString &expr_str,
|
||||
const common::ObIArray<sql::ObRawExpr *> &exprs);
|
||||
// 处理生成列表达式
|
||||
static int resolve_generated_column_expr(ObTableCtx &ctx);
|
||||
// 构造列引用原生表达式
|
||||
static int generate_column_ref_raw_expr(ObTableCtx &ctx,
|
||||
const ObColumnSchemaV2 &col_schema,
|
||||
sql::ObRawExpr *&expr);
|
||||
// 构建列自增表达式
|
||||
static int generate_autoinc_nextval_expr(ObTableCtx &ctx,
|
||||
ObRawExpr *&expr,
|
||||
const ObColumnSchemaV2 &col_schema);
|
||||
|
||||
static int build_expire_expr(ObTableCtx &ctx, sql::ObRawExpr *&expire_expr);
|
||||
static int generate_assignments(ObTableCtx &ctx);
|
||||
|
||||
static int generate_filter_exprs(ObTableCtx &ctx);
|
||||
|
||||
static int generate_delta_expr(ObTableCtx &ctx, ObTableAssignment &assign);
|
||||
|
||||
static int generate_assign_expr(ObTableCtx &ctx, ObTableAssignment &assign);
|
||||
|
||||
static int build_generated_column_expr(ObTableCtx &ctx,
|
||||
ObTableColumnItem &item,
|
||||
const ObString &expr_str,
|
||||
sql::ObRawExpr *&expr,
|
||||
sql::ObRawExpr *delta_expr = nullptr);
|
||||
|
||||
static int generate_autoinc_nextval_expr(ObTableCtx &ctx,
|
||||
const ObTableColumnItem &item,
|
||||
sql::ObRawExpr *&expr);
|
||||
|
||||
static int generate_expire_expr(ObTableCtx &ctx, sql::ObRawExpr *&expr);
|
||||
|
||||
static int generate_current_timestamp_expr(ObTableCtx &ctx,
|
||||
const ObTableColumnItem &item,
|
||||
sql::ObRawExpr *&expr);
|
||||
|
||||
static int generate_all_column_exprs(ObTableCtx &ctx);
|
||||
|
||||
static int resolve_exprs(ObTableCtx &ctx);
|
||||
|
||||
static int add_extra_column_exprs(ObTableCtx &ctx);
|
||||
|
||||
static int write_datum(ObTableCtx &ctx,
|
||||
common::ObIAllocator &allocator,
|
||||
const sql::ObExpr &expr,
|
||||
sql::ObEvalCtx &eval_ctx,
|
||||
const ObObj &obj);
|
||||
|
||||
static int write_autoinc_datum(ObTableCtx &ctx,
|
||||
const sql::ObExpr &expr,
|
||||
sql::ObEvalCtx &eval_ctx,
|
||||
const ObObj &obj);
|
||||
private:
|
||||
DISALLOW_COPY_AND_ASSIGN(ObTableExprCgService);
|
||||
};
|
||||
@ -148,7 +141,6 @@ public:
|
||||
ObTableInsCtDef &ins_ctdef);
|
||||
static int generate_update_ctdef(ObTableCtx &ctx,
|
||||
ObIAllocator &allocator,
|
||||
const ObTableCtx::ObAssignIds &assign_ids,
|
||||
ObTableUpdCtDef &upd_ctdef);
|
||||
static int generate_delete_ctdef(ObTableCtx &ctx,
|
||||
ObIAllocator &allocator,
|
||||
@ -158,7 +150,6 @@ public:
|
||||
ObTableReplaceCtDef &replace_ctdef);
|
||||
static int generate_insert_up_ctdef(ObTableCtx &ctx,
|
||||
ObIAllocator &allocator,
|
||||
const ObTableCtx::ObAssignIds &assign_ids,
|
||||
ObTableInsUpdCtDef &ins_up_ctdef);
|
||||
static int generate_lock_ctdef(ObTableCtx &ctx,
|
||||
ObTableLockCtDef &lock_ctdef);
|
||||
@ -173,16 +164,13 @@ private:
|
||||
ObTableDmlBaseCtDef &base_ctdef,
|
||||
common::ObIArray<sql::ObRawExpr*> &old_row,
|
||||
common::ObIArray<sql::ObRawExpr*> &new_row);
|
||||
static int generate_column_ids(ObTableCtx &ctx,
|
||||
const common::ObIArray<sql::ObRawExpr*> &exprs,
|
||||
common::ObIArray<uint64_t> &column_ids);
|
||||
static int generate_column_ids(ObTableCtx &ctx, common::ObIArray<uint64_t> &column_ids);
|
||||
static int generate_das_ins_ctdef(ObTableCtx &ctx,
|
||||
uint64_t index_tid,
|
||||
sql::ObDASInsCtDef &das_ins_ctdef,
|
||||
const common::ObIArray<sql::ObRawExpr*> &new_row);
|
||||
static int generate_das_upd_ctdef(ObTableCtx &ctx,
|
||||
uint64_t index_tid,
|
||||
const common::ObIArray<sql::ObRawExpr *> &assign_exprs,
|
||||
sql::ObDASUpdCtDef &das_upd_ctdef,
|
||||
const common::ObIArray<sql::ObRawExpr*> &old_row,
|
||||
const common::ObIArray<sql::ObRawExpr*> &new_row,
|
||||
@ -196,12 +184,9 @@ private:
|
||||
sql::ObDASLockCtDef &das_lock_ctdef,
|
||||
const common::ObIArray<sql::ObRawExpr*> &old_row);
|
||||
static int generate_updated_column_ids(ObTableCtx &ctx,
|
||||
const common::ObIArray<sql::ObRawExpr *> &assign_exprs,
|
||||
const common::ObIArray<uint64_t> &column_ids,
|
||||
common::ObIArray<uint64_t> &updated_column_ids);
|
||||
static int generate_upd_assign_infos(ObTableCtx &ctx,
|
||||
ObIAllocator &allocator,
|
||||
const common::ObIArray<sql::ObRawExpr *> &assign_exprs,
|
||||
ObTableUpdCtDef &udp_ctdef);
|
||||
static int generate_das_base_ctdef(uint64_t index_tid,
|
||||
ObTableCtx &ctx,
|
||||
@ -223,7 +208,6 @@ private:
|
||||
sql::DASInsCtDefArray &ins_ctdefs);
|
||||
static int generate_related_upd_ctdef(ObTableCtx &ctx,
|
||||
ObIAllocator &allocator,
|
||||
const common::ObIArray<sql::ObRawExpr *> &assign_exprs,
|
||||
const common::ObIArray<sql::ObRawExpr*> &old_row,
|
||||
const common::ObIArray<sql::ObRawExpr*> &new_row,
|
||||
const common::ObIArray<sql::ObRawExpr*> &full_row,
|
||||
@ -232,11 +216,13 @@ private:
|
||||
ObIAllocator &allocator,
|
||||
const common::ObIArray<sql::ObRawExpr*> &old_row,
|
||||
sql::DASDelCtDefArray &del_ctdefs);
|
||||
static int get_rowkey_exprs(ObTableCtx &ctx,
|
||||
common::ObIArray<sql::ObRawExpr*> &rowkey_exprs);
|
||||
|
||||
static int get_rowkey_exprs(ObTableCtx &ctx, common::ObIArray<sql::ObRawExpr*> &rowkey_exprs);
|
||||
|
||||
static int generate_table_rowkey_info(ObTableCtx &ctx,
|
||||
ObTableInsCtDef &ins_ctdef);
|
||||
static int generate_tsc_ctdef(ObTableCtx &ctx,
|
||||
common::ObIArray<sql::ObRawExpr *> &access_exprs,
|
||||
sql::ObDASScanCtDef &tsc_ctdef);
|
||||
static int generate_single_constraint_info(ObTableCtx &ctx,
|
||||
const share::schema::ObTableSchema &index_schema,
|
||||
@ -247,7 +233,7 @@ private:
|
||||
static int generate_constraint_ctdefs(ObTableCtx &ctx,
|
||||
ObIAllocator &allocator,
|
||||
sql::ObRowkeyCstCtdefArray &cst_ctdefs);
|
||||
static int replace_exprs_with_dependant(const common::ObIArray<sql::ObRawExpr *> &src_exprs,
|
||||
static int replace_exprs_with_dependant(ObTableCtx &ctx,
|
||||
common::ObIArray<sql::ObRawExpr *> &dst_exprs);
|
||||
private:
|
||||
DISALLOW_COPY_AND_ASSIGN(ObTableDmlCgService);
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -17,6 +17,7 @@
|
||||
#include "sql/resolver/expr/ob_raw_expr_util.h"
|
||||
#include "sql/engine/dml/ob_dml_ctx_define.h"
|
||||
#include "sql/das/ob_das_scan_op.h" // for ObDASScanRtDef
|
||||
#include "sql/resolver/dml/ob_dml_stmt.h"
|
||||
#include "share/table/ob_table.h"
|
||||
#include "ob_table_session_pool.h"
|
||||
|
||||
@ -24,23 +25,64 @@ namespace oceanbase
|
||||
{
|
||||
namespace table
|
||||
{
|
||||
|
||||
// 用于存放ctx下自增列的信息
|
||||
struct ObTableAutoInc
|
||||
struct ObTableColumnItem : public sql::ColumnItem
|
||||
{
|
||||
public:
|
||||
ObTableAutoInc()
|
||||
: param_(),
|
||||
auto_inc_column_id_(OB_INVALID_ID),
|
||||
auto_inc_column_name_()
|
||||
{
|
||||
}
|
||||
TO_STRING_KV(K_(param),
|
||||
K_(auto_inc_column_id),
|
||||
K_(auto_inc_column_name));
|
||||
AutoincParam param_;
|
||||
uint64_t auto_inc_column_id_;
|
||||
ObString auto_inc_column_name_;
|
||||
ObTableColumnItem()
|
||||
: sql::ColumnItem(),
|
||||
raw_expr_(nullptr),
|
||||
is_generated_column_(false),
|
||||
is_stored_generated_column_(false),
|
||||
is_virtual_generated_column_(false),
|
||||
is_auto_increment_(false)
|
||||
{}
|
||||
TO_STRING_KV("ColumnItem", static_cast<const sql::ColumnItem &>(*this),
|
||||
KPC_(raw_expr),
|
||||
K_(is_generated_column),
|
||||
K_(is_stored_generated_column),
|
||||
K_(is_virtual_generated_column),
|
||||
K_(cascaded_column_ids),
|
||||
K_(generated_expr_str),
|
||||
K_(dependant_exprs),
|
||||
K_(is_auto_increment));
|
||||
sql::ObRawExpr *raw_expr_; // column ref expr or calculate expr
|
||||
bool is_generated_column_;
|
||||
bool is_stored_generated_column_;
|
||||
bool is_virtual_generated_column_;
|
||||
common::ObSEArray<uint64_t, 8> cascaded_column_ids_;
|
||||
// default equal item.default_value_.get_string()
|
||||
// specific value in append and increment operation
|
||||
common::ObString generated_expr_str_;
|
||||
common::ObSEArray<sql::ObRawExpr*, 8, common::ModulePageAllocator, true> dependant_exprs_;
|
||||
bool is_auto_increment_;
|
||||
};
|
||||
|
||||
struct ObTableAssignment : public sql::ObAssignment
|
||||
{
|
||||
ObTableAssignment()
|
||||
: sql::ObAssignment(),
|
||||
column_item_(nullptr),
|
||||
is_inc_or_append_(false),
|
||||
delta_expr_(nullptr),
|
||||
is_assigned_(false)
|
||||
{}
|
||||
ObTableAssignment(ObTableColumnItem *item)
|
||||
: sql::ObAssignment(),
|
||||
column_item_(item),
|
||||
is_inc_or_append_(false),
|
||||
delta_expr_(nullptr),
|
||||
is_assigned_(false)
|
||||
{}
|
||||
TO_STRING_KV("ObAssignment", static_cast<const sql::ObAssignment &>(*this),
|
||||
KPC_(column_item),
|
||||
K_(is_inc_or_append),
|
||||
KPC_(delta_expr),
|
||||
K_(assign_value),
|
||||
K_(is_assigned));
|
||||
ObTableColumnItem *column_item_;
|
||||
bool is_inc_or_append_; // for append/increment
|
||||
sql::ObColumnRefRawExpr *delta_expr_; // for append/increment
|
||||
common::ObObj assign_value_;
|
||||
bool is_assigned_; // did user assign specific value or not
|
||||
};
|
||||
|
||||
enum ObTableExecutorType
|
||||
@ -62,19 +104,6 @@ enum ObTableExecutorType
|
||||
// 2.在try_process()中进行初始化
|
||||
class ObTableCtx
|
||||
{
|
||||
public:
|
||||
struct ObAssignId {
|
||||
ObAssignId()
|
||||
: idx_(OB_INVALID_ID),
|
||||
column_id_(OB_INVALID_ID)
|
||||
{}
|
||||
TO_STRING_KV("index", idx_,
|
||||
"column_id", column_id_);
|
||||
uint64_t idx_;
|
||||
uint64_t column_id_;
|
||||
};
|
||||
typedef common::ObFixedArray<ObAssignId, common::ObIAllocator> ObAssignIds;
|
||||
typedef std::pair<sql::ObColumnRefRawExpr*, common::ObArray<sql::ObRawExpr*>> ObGenDenpendantsPair;
|
||||
public:
|
||||
explicit ObTableCtx(common::ObIAllocator &allocator)
|
||||
: allocator_(allocator),
|
||||
@ -84,11 +113,8 @@ public:
|
||||
expr_factory_(allocator_),
|
||||
all_exprs_(false),
|
||||
loc_meta_(allocator_),
|
||||
assign_ids_(allocator_),
|
||||
agg_cell_proj_(allocator_),
|
||||
auto_inc_param_(),
|
||||
has_auto_inc_(false),
|
||||
all_column_ref_exprs_()
|
||||
has_auto_inc_(false)
|
||||
{
|
||||
// common
|
||||
is_init_ = false;
|
||||
@ -134,6 +160,8 @@ public:
|
||||
K_(index_tablet_id),
|
||||
K_(ls_id),
|
||||
K_(tenant_schema_version),
|
||||
K_(column_items),
|
||||
K_(assigns),
|
||||
// scan to string
|
||||
K_(is_scan),
|
||||
K_(is_index_scan),
|
||||
@ -172,6 +200,9 @@ public:
|
||||
OB_INLINE sql::ObExecContext& get_exec_ctx() { return exec_ctx_; }
|
||||
OB_INLINE sql::ObRawExprFactory& get_expr_factory() { return expr_factory_; }
|
||||
OB_INLINE sql::ObRawExprUniqueSet& get_all_exprs() { return all_exprs_; }
|
||||
OB_INLINE ObIArray<sql::ObRawExpr *>& get_all_exprs_array() {
|
||||
return const_cast<ObIArray<ObRawExpr *> &>(all_exprs_.get_expr_array());
|
||||
}
|
||||
OB_INLINE sql::ObSQLSessionInfo& get_session_info()
|
||||
{ return sess_guard_.get_sess_info();}
|
||||
OB_INLINE const sql::ObSQLSessionInfo& get_session_info() const
|
||||
@ -179,6 +210,10 @@ public:
|
||||
OB_INLINE int64_t get_tenant_schema_version() const { return tenant_schema_version_; }
|
||||
OB_INLINE ObTableOperationType::Type get_opertion_type() const { return operation_type_; }
|
||||
OB_INLINE bool is_init() const { return is_init_; }
|
||||
OB_INLINE const ObIArray<ObTableColumnItem>& get_column_items() const { return column_items_; }
|
||||
OB_INLINE ObIArray<ObTableColumnItem>& get_column_items() { return column_items_; }
|
||||
OB_INLINE const ObIArray<ObTableAssignment>& get_assignments() const { return assigns_; }
|
||||
OB_INLINE ObIArray<ObTableAssignment>& get_assignments() { return assigns_; }
|
||||
// for scan
|
||||
OB_INLINE bool is_scan() const { return is_scan_; }
|
||||
OB_INLINE bool is_index_scan() const { return is_index_scan_; }
|
||||
@ -202,16 +237,15 @@ public:
|
||||
OB_INLINE const common::ObIArray<common::ObString>& get_query_col_names() const { return query_col_names_; }
|
||||
// for update
|
||||
OB_INLINE bool is_for_update() const { return is_for_update_; }
|
||||
OB_INLINE const common::ObIArray<common::ObString>& get_expr_strs() const { return expr_strs_; }
|
||||
OB_INLINE bool is_inc_or_append() const
|
||||
{
|
||||
return ObTableOperationType::Type::APPEND == operation_type_
|
||||
|| ObTableOperationType::Type::INCREMENT == operation_type_;
|
||||
}
|
||||
OB_INLINE ObIArray<sql::ObRawExpr *>& get_old_row_exprs() { return old_row_exprs_; }
|
||||
OB_INLINE ObIArray<sql::ObRawExpr *>& get_full_assign_exprs() { return full_assign_exprs_; }
|
||||
OB_INLINE ObIArray<sql::ObRawExpr *>& get_delta_exprs() { return delta_exprs_; }
|
||||
OB_INLINE const ObAssignIds& get_assign_ids() const { return assign_ids_; }
|
||||
OB_INLINE bool is_dml() const
|
||||
{
|
||||
return ObTableOperationType::Type::GET != operation_type_ && !is_scan_;
|
||||
}
|
||||
// for dml
|
||||
OB_INLINE const ObIArray<common::ObTableID>& get_related_index_ids() const { return related_index_ids_; }
|
||||
OB_INLINE bool is_for_insertup() const { return is_for_insertup_; }
|
||||
@ -228,24 +262,11 @@ public:
|
||||
OB_INLINE bool return_affected_entity() const { return return_affected_entity_;}
|
||||
OB_INLINE bool return_rowkey() const { return return_rowkey_;}
|
||||
OB_INLINE uint64_t get_cur_cluster_version() const { return cur_cluster_version_;}
|
||||
OB_INLINE common::ObIArray<ObGenDenpendantsPair>& get_gen_dependants_pairs()
|
||||
{
|
||||
return gen_dependants_pairs_;
|
||||
}
|
||||
OB_INLINE const common::ObIArray<ObGenDenpendantsPair>& get_gen_dependants_pairs() const
|
||||
{
|
||||
return gen_dependants_pairs_;
|
||||
}
|
||||
OB_INLINE bool has_generated_column() const { return table_schema_->has_generated_column(); }
|
||||
// for aggregate
|
||||
OB_INLINE const common::ObIArray<uint64_t> &get_agg_projs() const { return agg_cell_proj_; }
|
||||
// for auto inc
|
||||
OB_INLINE uint64_t get_auto_inc_column_id() { return auto_inc_param_.auto_inc_column_id_; }
|
||||
OB_INLINE ObString get_auto_inc_column_name() { return auto_inc_param_.auto_inc_column_name_; }
|
||||
OB_INLINE ObPhysicalPlanCtx *get_physical_plan_ctx() { return exec_ctx_.get_physical_plan_ctx(); }
|
||||
OB_INLINE bool has_auto_inc() { return has_auto_inc_; }
|
||||
// for rowkey constraint info
|
||||
OB_INLINE common::ObIArray<ObColumnRefRawExpr*> &get_all_column_ref_exprs() { return all_column_ref_exprs_; }
|
||||
//////////////////////////////////////// setter ////////////////////////////////////////////////
|
||||
// for common
|
||||
OB_INLINE void set_init_flag(bool is_init) { is_init_ = is_init; }
|
||||
@ -261,8 +282,6 @@ public:
|
||||
// for htable
|
||||
OB_INLINE void set_batch_operation(const ObTableBatchOperation *batch_op) { batch_op_ = batch_op; }
|
||||
// for auto inc
|
||||
OB_INLINE void set_auto_inc_column_id(const uint64_t &auto_inc_column_id) { auto_inc_param_.auto_inc_column_id_ = auto_inc_column_id; }
|
||||
OB_INLINE void set_auto_inc_column_name(const ObString &auto_inc_column_name) { auto_inc_param_.auto_inc_column_name_ = auto_inc_column_name; }
|
||||
OB_INLINE bool need_auto_inc_expr()
|
||||
{
|
||||
// delete/update/get/scan操作只需要生成列引用表达式
|
||||
@ -311,14 +330,19 @@ public:
|
||||
int init_trans(transaction::ObTxDesc *trans_desc,
|
||||
const transaction::ObTxReadSnapshot &tx_snapshot);
|
||||
int init_das_context(ObDASCtx &das_ctx);
|
||||
int init_physical_plan_ctx(int64_t timeout_ts, int64_t tenant_schema_version);
|
||||
// 更新全局自增值
|
||||
int update_auto_inc_value();
|
||||
// init table context for ttl operation
|
||||
bool is_ttl_table() const { return is_ttl_table_; }
|
||||
|
||||
void set_is_ttl_table(bool is_ttl_table) { is_ttl_table_ = is_ttl_table; }
|
||||
int init_phy_plan_ctx();
|
||||
int init_ttl_delete(ObRowkey &start_key);
|
||||
int get_column_item_by_column_id(uint64_t column_id, const ObTableColumnItem *&item) const;
|
||||
int get_column_item_by_expr(sql::ObRawExpr *raw_expr, const ObTableColumnItem *&item) const;
|
||||
int get_column_item_by_expr(sql::ObColumnRefRawExpr *expr, const ObTableColumnItem *&item) const;
|
||||
int get_expr_from_column_items(const common::ObString &col_name, sql::ObRawExpr *&expr) const;
|
||||
int get_expr_from_assignments(const common::ObString &col_name, sql::ObRawExpr *&expr) const;
|
||||
public:
|
||||
// convert lob的allocator需要保证obj写入表达式后才能析构
|
||||
static int convert_lob(common::ObIAllocator &allocator, ObObj &obj);
|
||||
@ -336,8 +360,8 @@ private:
|
||||
// for dml
|
||||
int init_dml_related_tid();
|
||||
// for update
|
||||
int init_assign_ids(ObAssignIds &assign_ids,
|
||||
const ObTableEntity &entity);
|
||||
int init_assignments(const ObTableEntity &entity);
|
||||
int add_stored_generated_column_assignment(const ObTableAssignment &assign);
|
||||
// Init size of aggregation project array.
|
||||
//
|
||||
// @param [in] size The agg size
|
||||
@ -350,17 +374,12 @@ private:
|
||||
// @return Returns OB_SUCCESS on success, error code otherwise.
|
||||
int add_aggregate_proj(int64_t cell_idx, const common::ObString &column_name, const ObIArray<ObTableAggregation> &aggregations);
|
||||
|
||||
AutoincParam &get_auto_inc_param() { return auto_inc_param_.param_; }
|
||||
|
||||
// Add auto inc param to phy_plan_ctx.
|
||||
//
|
||||
// @param [in] phy_plan_ctx The phy_plan_ctx.
|
||||
// @return Returns OB_SUCCESS on success, error code otherwise.
|
||||
int add_auto_inc_param(ObPhysicalPlanCtx &phy_plan_ctx);
|
||||
int add_auto_inc_param(const share::schema::ObColumnSchemaV2 &column_schema);
|
||||
|
||||
private:
|
||||
int construct_column_items();
|
||||
int cons_column_type(const share::schema::ObColumnSchemaV2 &column_schema,
|
||||
sql::ObExprResType &column_type);
|
||||
sql::ObExprResType &column_type);
|
||||
int adjust_column_type(const ObExprResType &column_type, ObObj &obj);
|
||||
int adjust_column(const ObColumnSchemaV2 &col_schema, ObObj &obj);
|
||||
int adjust_rowkey();
|
||||
@ -401,6 +420,8 @@ private:
|
||||
ObTableApiSessGuard sess_guard_;
|
||||
sql::ObDASTableLocMeta loc_meta_;
|
||||
int64_t tenant_schema_version_;
|
||||
common::ObSEArray<ObTableColumnItem, 8> column_items_;
|
||||
common::ObSEArray<ObTableAssignment, 8> assigns_;
|
||||
// for scan
|
||||
bool is_scan_;
|
||||
bool is_index_scan_;
|
||||
@ -409,38 +430,30 @@ private:
|
||||
bool is_get_;
|
||||
bool read_latest_; // default true, false in single get and multi get
|
||||
common::ObQueryFlag::ScanOrder scan_order_;
|
||||
common::ObArray<sql::ObRawExpr*> select_exprs_;
|
||||
common::ObArray<sql::ObRawExpr*> rowkey_exprs_;
|
||||
common::ObArray<sql::ObRawExpr*> index_exprs_;
|
||||
common::ObArray<sql::ObRawExpr*> filter_exprs_;
|
||||
common::ObArray<uint64_t> select_col_ids_; // 基于schema序的select column id
|
||||
common::ObArray<uint64_t> query_col_ids_; // 用户查询的select column id
|
||||
common::ObArray<common::ObString> query_col_names_; // 用户查询的select column name,引用的是schema上的列名
|
||||
common::ObArray<uint64_t> index_col_ids_;
|
||||
common::ObSEArray<sql::ObRawExpr*, 32> select_exprs_;
|
||||
common::ObSEArray<sql::ObRawExpr*, 16> rowkey_exprs_;
|
||||
common::ObSEArray<sql::ObRawExpr*, 16> index_exprs_;
|
||||
common::ObSEArray<sql::ObRawExpr*, 8> filter_exprs_;
|
||||
common::ObSEArray<uint64_t, 32> select_col_ids_; // 基于schema序的select column id
|
||||
common::ObSEArray<uint64_t, 32> query_col_ids_; // 用户查询的select column id
|
||||
common::ObSEArray<common::ObString, 32> query_col_names_; // 用户查询的select column name,引用的是schema上的列名
|
||||
common::ObSEArray<uint64_t, 16> index_col_ids_;
|
||||
const share::schema::ObTableSchema *index_schema_;
|
||||
int64_t offset_;
|
||||
int64_t limit_;
|
||||
common::ObSEArray<common::ObNewRange, 16> key_ranges_;
|
||||
// for generate column
|
||||
common::ObArray<ObGenDenpendantsPair> gen_dependants_pairs_; // 生成列及其依赖列数组
|
||||
// for update
|
||||
bool is_for_update_;
|
||||
ObTableOperationType::Type operation_type_;
|
||||
common::ObArray<sql::ObRawExpr*> old_row_exprs_;
|
||||
common::ObArray<sql::ObRawExpr*> full_assign_exprs_;
|
||||
ObAssignIds assign_ids_;
|
||||
// agg cell index in schema
|
||||
common::ObFixedArray<uint64_t, common::ObIAllocator> agg_cell_proj_;
|
||||
// for auto inc
|
||||
ObTableAutoInc auto_inc_param_;
|
||||
bool has_auto_inc_;
|
||||
// for increment/append
|
||||
common::ObSEArray<common::ObString, 8> expr_strs_;
|
||||
common::ObArray<sql::ObRawExpr*> delta_exprs_; // for increment/append
|
||||
bool return_affected_entity_;
|
||||
bool return_rowkey_;
|
||||
// for dml
|
||||
common::ObSEArray<common::ObTableID, 4, common::ModulePageAllocator, true> related_index_ids_;
|
||||
common::ObSEArray<common::ObTableID, 16> related_index_ids_;
|
||||
bool is_for_insertup_;
|
||||
ObTableEntityType entity_type_;
|
||||
const ObITableEntity *entity_;
|
||||
@ -448,8 +461,6 @@ private:
|
||||
const ObTableBatchOperation *batch_op_;
|
||||
// for lob adapt
|
||||
uint64_t cur_cluster_version_;
|
||||
// for rowkey constraint info
|
||||
common::ObSEArray<ObColumnRefRawExpr*, 8, common::ModulePageAllocator, true> all_column_ref_exprs_;
|
||||
bool is_ttl_table_;
|
||||
private:
|
||||
DISALLOW_COPY_AND_ASSIGN(ObTableCtx);
|
||||
@ -563,8 +574,7 @@ public:
|
||||
ObTableUpdCtDef(common::ObIAllocator &alloc)
|
||||
: ObTableDmlBaseCtDef(alloc),
|
||||
full_row_(alloc),
|
||||
full_assign_row_(alloc),
|
||||
delta_exprs_(alloc),
|
||||
delta_row_(alloc),
|
||||
das_ctdef_(alloc),
|
||||
assign_columns_(alloc),
|
||||
related_ctdefs_(alloc),
|
||||
@ -576,14 +586,12 @@ public:
|
||||
{
|
||||
}
|
||||
TO_STRING_KV(K_(full_row),
|
||||
K_(full_assign_row),
|
||||
K_(delta_exprs),
|
||||
K_(delta_row),
|
||||
K_(das_ctdef),
|
||||
K_(assign_columns),
|
||||
K_(related_ctdefs));
|
||||
ExprFixedArray full_row_;
|
||||
ExprFixedArray full_assign_row_;
|
||||
ExprFixedArray delta_exprs_; // for increment/append
|
||||
ExprFixedArray delta_row_;
|
||||
ObDASUpdCtDef das_ctdef_;
|
||||
ColContentFixedArray assign_columns_;
|
||||
DASUpdCtDefArray related_ctdefs_;
|
||||
|
@ -371,8 +371,6 @@ int ObTableApiExecuteP::process_get()
|
||||
ObNewRow *row = nullptr;
|
||||
if (OB_FAIL(check_arg2())) {
|
||||
LOG_WARN("fail to check arg", K(ret));
|
||||
} else if (OB_FAIL(init_tb_ctx())) {
|
||||
LOG_WARN("fail to init table ctx", K(ret));
|
||||
} else if (OB_FAIL(init_read_trans(arg_.consistency_level_,
|
||||
tb_ctx_.get_ls_id(),
|
||||
tb_ctx_.get_timeout_ts()))) {
|
||||
|
@ -72,18 +72,9 @@ void ObTableApiExecutor::clear_evaluated_flag()
|
||||
ObExprFrameInfo *expr_info = const_cast<ObExprFrameInfo *>(tb_ctx_.get_expr_frame_info());
|
||||
if (OB_NOT_NULL(expr_info)) {
|
||||
for (int64_t i = 0; i < expr_info->rt_exprs_.count(); i++) {
|
||||
if (!tb_ctx_.has_auto_inc()) { // 如果是inc/append场景下进行了自增操作,则不应该清自增列转换的flag,特判
|
||||
const ObExpr &expr = expr_info->rt_exprs_.at(i);
|
||||
if (expr.type_ != T_FUN_SYS_AUTOINC_NEXTVAL) {
|
||||
expr_info->rt_exprs_.at(i).clear_evaluated_flag(eval_ctx_);
|
||||
} else {
|
||||
if (expr_info->rt_exprs_.at(i).type_ == T_FUN_COLUMN_CONV) {
|
||||
if (expr_info->rt_exprs_.at(i).args_[4]->type_ == T_FUN_SYS_AUTOINC_NEXTVAL) {
|
||||
// do nothing
|
||||
} else {
|
||||
expr_info->rt_exprs_.at(i).clear_evaluated_flag(eval_ctx_);
|
||||
}
|
||||
} else {
|
||||
expr_info->rt_exprs_.at(i).clear_evaluated_flag(eval_ctx_);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -87,13 +87,12 @@ int ObTableApiInsertUpExecutor::refresh_exprs_frame(const ObTableEntity *entity)
|
||||
const ObTableInsCtDef &ins_ctdef = insert_up_spec_.get_ctdef().ins_ctdef_;
|
||||
const ObTableUpdCtDef &upd_ctdef = insert_up_spec_.get_ctdef().upd_ctdef_;
|
||||
|
||||
clear_evaluated_flag();
|
||||
if (OB_ISNULL(entity)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("entity is null", K(ret));
|
||||
} else if (OB_FAIL(ObTableExprCgService::refresh_insert_up_exprs_frame(tb_ctx_,
|
||||
ins_ctdef.new_row_,
|
||||
upd_ctdef.delta_exprs_,
|
||||
upd_ctdef.delta_row_,
|
||||
*entity))) {
|
||||
LOG_WARN("fail to refresh insert up exprs frame", K(ret), K(*entity));
|
||||
}
|
||||
@ -108,8 +107,11 @@ int ObTableApiInsertUpExecutor::get_next_row_from_child()
|
||||
|
||||
if (cur_idx_ >= 1) {
|
||||
ret = OB_ITER_END;
|
||||
} else if (OB_FAIL(refresh_exprs_frame(entity))) {
|
||||
LOG_WARN("fail to refresh exprs frame", K(ret));
|
||||
} else {
|
||||
clear_evaluated_flag();
|
||||
if (OB_FAIL(refresh_exprs_frame(entity))) {
|
||||
LOG_WARN("fail to refresh exprs frame", K(ret));
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
@ -158,6 +160,21 @@ int ObTableApiInsertUpExecutor::try_update_row()
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableApiInsertUpExecutor::cache_insert_row()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const ObExprPtrIArray &new_row_exprs = get_primary_table_new_row();
|
||||
|
||||
if (OB_FAIL(ObChunkDatumStore::StoredRow::build(insert_row_, new_row_exprs, eval_ctx_, allocator_))) {
|
||||
LOG_WARN("fail to build stored row", K(ret), K(new_row_exprs));
|
||||
} else if (OB_ISNULL(insert_row_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("cache insert row is null", K(ret));
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
// 通过主键在conflict_checker_中找到冲突旧行,执行更新
|
||||
// 注意,这里更新后还可能出现二级索引冲突,eg:
|
||||
// create table t (C1 int, C2 varchar(10), primary key(C1), UNIQUE KEY idx_c2 (C2));
|
||||
@ -170,45 +187,32 @@ int ObTableApiInsertUpExecutor::do_insert_up_cache()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObSEArray<ObConflictValue, 1> constraint_values;
|
||||
bool is_skipped = false;
|
||||
ObChunkDatumStore::StoredRow *insert_row = NULL;
|
||||
ObTableUpdRtDef &upd_rtdef = insert_up_rtdef_.upd_rtdef_;
|
||||
const ObTableEntity *entity = static_cast<const ObTableEntity*>(tb_ctx_.get_entity());
|
||||
const ObExprPtrIArray &new_row_exprs = get_primary_table_insert_row();
|
||||
|
||||
// new_row_exprs因为冲突已经被conflict_checker_刷为冲突行,因此需要重新刷一遍
|
||||
if (OB_FAIL(refresh_exprs_frame(entity))) {
|
||||
LOG_WARN("fail to refresh exprs frame", K(ret));
|
||||
} else if (OB_FAIL(ObChunkDatumStore::StoredRow::build(insert_row, new_row_exprs, eval_ctx_, allocator_))) {
|
||||
LOG_WARN("fail to build stored row", K(ret), K(new_row_exprs));
|
||||
if (OB_ISNULL(insert_row_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("insert row is NULL", K(ret));
|
||||
} else if (OB_FAIL(conflict_checker_.check_duplicate_rowkey(insert_row_, constraint_values, true))) {
|
||||
LOG_WARN("fail to check duplicated key", K(ret), KPC_(insert_row));
|
||||
} else {
|
||||
if (OB_ISNULL(insert_row)) {
|
||||
upd_rtdef.found_rows_++;
|
||||
const ObChunkDatumStore::StoredRow *upd_new_row = insert_row_;
|
||||
const ObChunkDatumStore::StoredRow *upd_old_row = constraint_values.at(0).current_datum_row_;
|
||||
if (OB_ISNULL(upd_old_row)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("insert row is NULL", K(ret));
|
||||
} else if (OB_FAIL(conflict_checker_.check_duplicate_rowkey(insert_row,
|
||||
constraint_values,
|
||||
true))) {
|
||||
LOG_WARN("fail to check duplicated key", K(ret), KPC(insert_row));
|
||||
} else {
|
||||
upd_rtdef.found_rows_++;
|
||||
const ObChunkDatumStore::StoredRow *upd_new_row = insert_row;
|
||||
const ObChunkDatumStore::StoredRow *upd_old_row = constraint_values.at(0).current_datum_row_;
|
||||
if (OB_ISNULL(upd_old_row)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("upd_old_row is NULL", K(ret));
|
||||
} else if (OB_FAIL(check_whether_row_change(*upd_old_row,
|
||||
*upd_new_row,
|
||||
insert_up_spec_.get_ctdef().upd_ctdef_,
|
||||
is_row_changed_))) {
|
||||
LOG_WARN("fail to check whether row change", K(ret));
|
||||
} else if (is_row_changed_) {
|
||||
// do update
|
||||
clear_evaluated_flag();
|
||||
if (OB_FAIL(conflict_checker_.update_row(upd_new_row, upd_old_row))) {
|
||||
LOG_WARN("fail to update row in conflict_checker", K(ret), KPC(upd_new_row), KPC(upd_old_row));
|
||||
} else {
|
||||
upd_changed_rows_++;
|
||||
}
|
||||
LOG_WARN("upd_old_row is NULL", K(ret));
|
||||
} else if (OB_FAIL(check_whether_row_change(*upd_old_row,
|
||||
*upd_new_row,
|
||||
insert_up_spec_.get_ctdef().upd_ctdef_,
|
||||
is_row_changed_))) {
|
||||
LOG_WARN("fail to check whether row change", K(ret));
|
||||
} else if (is_row_changed_) {
|
||||
// do update
|
||||
clear_evaluated_flag();
|
||||
if (OB_FAIL(conflict_checker_.update_row(upd_new_row, upd_old_row))) {
|
||||
LOG_WARN("fail to update row in conflict_checker", K(ret), KPC(upd_new_row), KPC(upd_old_row));
|
||||
} else {
|
||||
upd_changed_rows_++;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -258,7 +262,6 @@ int ObTableApiInsertUpExecutor::do_update(const ObRowkey &constraint_rowkey,
|
||||
insert_up_rtdef_.upd_rtdef_,
|
||||
upd_rtctx_));
|
||||
OZ(to_expr_skip_old(*constraint_value.current_datum_row_,
|
||||
constraint_rowkey,
|
||||
insert_up_spec_.get_ctdef().upd_ctdef_));
|
||||
clear_evaluated_flag();
|
||||
OZ(insert_upd_new_row_to_das(insert_up_spec_.get_ctdef().upd_ctdef_,
|
||||
@ -267,7 +270,6 @@ int ObTableApiInsertUpExecutor::do_update(const ObRowkey &constraint_rowkey,
|
||||
} else if (NULL == constraint_value.baseline_datum_row_ &&
|
||||
NULL != constraint_value.current_datum_row_) { // 单单是唯一索引冲突的时候,会走这个分支
|
||||
OZ(to_expr_skip_old(*constraint_value.current_datum_row_,
|
||||
constraint_rowkey,
|
||||
insert_up_spec_.get_ctdef().upd_ctdef_));
|
||||
OZ(insert_upd_new_row_to_das(insert_up_spec_.get_ctdef().upd_ctdef_,
|
||||
insert_up_rtdef_.upd_rtdef_,
|
||||
@ -295,6 +297,8 @@ int ObTableApiInsertUpExecutor::get_next_row()
|
||||
} else if (!is_duplicated()) {
|
||||
insert_rows_ = 1;
|
||||
LOG_TRACE("try insert is not duplicated", K(ret), K(insert_rows_));
|
||||
} else if (OB_FAIL(cache_insert_row())) {
|
||||
LOG_WARN("fail to cache insert row", K(ret));
|
||||
} else if (OB_FAIL(fetch_conflict_rowkey(conflict_checker_))) {
|
||||
LOG_WARN("fail to fetch conflict row", K(ret));
|
||||
} else if (OB_FAIL(reset_das_env(insert_up_rtdef_.ins_rtdef_))) {
|
||||
|
@ -25,8 +25,7 @@ public:
|
||||
ObTableApiInsertUpSpec(common::ObIAllocator &alloc, const ObTableExecutorType type)
|
||||
: ObTableApiModifySpec(alloc, type),
|
||||
insert_up_ctdef_(alloc),
|
||||
conflict_checker_ctdef_(alloc),
|
||||
all_saved_exprs_(alloc)
|
||||
conflict_checker_ctdef_(alloc)
|
||||
{
|
||||
}
|
||||
public:
|
||||
@ -34,12 +33,9 @@ public:
|
||||
OB_INLINE ObTableInsUpdCtDef& get_ctdef() { return insert_up_ctdef_; }
|
||||
OB_INLINE const sql::ObConflictCheckerCtdef& get_conflict_checker_ctdef() const { return conflict_checker_ctdef_; }
|
||||
OB_INLINE sql::ObConflictCheckerCtdef& get_conflict_checker_ctdef() { return conflict_checker_ctdef_; }
|
||||
OB_INLINE const common::ObIArray<sql::ObExpr *>& get_all_saved_exprs() const { return all_saved_exprs_; }
|
||||
OB_INLINE common::ObIArray<sql::ObExpr *>& get_all_saved_exprs() { return all_saved_exprs_; }
|
||||
private:
|
||||
ObTableInsUpdCtDef insert_up_ctdef_;
|
||||
sql::ObConflictCheckerCtdef conflict_checker_ctdef_;
|
||||
sql::ExprFixedArray all_saved_exprs_;
|
||||
private:
|
||||
DISALLOW_COPY_AND_ASSIGN(ObTableApiInsertUpSpec);
|
||||
};
|
||||
@ -52,6 +48,7 @@ public:
|
||||
allocator_(ObModIds::TABLE_PROC, OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()),
|
||||
insert_up_spec_(spec),
|
||||
insert_up_rtdef_(),
|
||||
insert_row_(nullptr),
|
||||
insert_rows_(0),
|
||||
upd_changed_rows_(0),
|
||||
upd_rtctx_(eval_ctx_, exec_ctx_, get_fake_modify_op()),
|
||||
@ -110,6 +107,7 @@ private:
|
||||
int try_insert_row();
|
||||
int try_update_row();
|
||||
int do_insert_up_cache();
|
||||
int cache_insert_row();
|
||||
int prepare_final_insert_up_task();
|
||||
int do_update(const ObRowkey &constraint_rowkey,
|
||||
const sql::ObConflictValue &constraint_value);
|
||||
@ -119,6 +117,7 @@ private:
|
||||
common::ObArenaAllocator allocator_;
|
||||
const ObTableApiInsertUpSpec &insert_up_spec_;
|
||||
ObTableInsUpdRtDef insert_up_rtdef_;
|
||||
ObChunkDatumStore::StoredRow *insert_row_;
|
||||
int64_t insert_rows_;
|
||||
int64_t upd_changed_rows_;
|
||||
sql::ObDMLRtCtx upd_rtctx_;
|
||||
|
@ -410,7 +410,6 @@ int ObTableApiModifyExecutor::check_whether_row_change(const ObChunkDatumStore::
|
||||
}
|
||||
|
||||
int ObTableApiModifyExecutor::to_expr_skip_old(const ObChunkDatumStore::StoredRow &store_row,
|
||||
const ObRowkey &constraint_rowkey,
|
||||
const ObTableUpdCtDef &upd_ctdef)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -430,37 +429,35 @@ int ObTableApiModifyExecutor::to_expr_skip_old(const ObChunkDatumStore::StoredRo
|
||||
}
|
||||
|
||||
// 2. refresh assign column expr datum
|
||||
const ObTableCtx::ObAssignIds &assign_ids = tb_ctx_.get_assign_ids();
|
||||
const int64_t N = assign_ids.count();
|
||||
for (uint64_t i = 0; OB_SUCC(ret) && i < N; ++i) {
|
||||
uint64_t assign_id = assign_ids.at(i).idx_;
|
||||
const ObColumnSchemaV2 *col_schema = nullptr;
|
||||
if (OB_ISNULL(col_schema = table_schema->get_column_schema_by_idx(assign_id))) {
|
||||
ret = OB_SCHEMA_ERROR;
|
||||
LOG_WARN("fail to get column schema", K(ret), K(assign_id), K(*table_schema));
|
||||
} else if (assign_id >= store_row.cnt_) {
|
||||
ret = OB_ERROR_OUT_OF_RANGE;
|
||||
LOG_WARN("assign idx out of range", K(ret), K(assign_id), K(store_row.cnt_));
|
||||
} else if (assign_id >= new_row.count()) {
|
||||
ret = OB_ERROR_OUT_OF_RANGE;
|
||||
LOG_WARN("assign idx out of range", K(ret), K(assign_id), K(new_row.count()));
|
||||
} else if (col_schema->is_virtual_generated_column()) {
|
||||
const ObIArray<ObTableAssignment> &assigns = tb_ctx_.get_assignments();
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < assigns.count(); i++) {
|
||||
const ObTableAssignment &assign = assigns.at(i);
|
||||
if (OB_ISNULL(assign.column_item_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("should not have virtual generated expr", K(ret));
|
||||
} else if (col_schema->is_stored_generated_column()) {
|
||||
ObTableCtx &ctx = const_cast<ObTableCtx &>(tb_ctx_);
|
||||
if (OB_FAIL(ObTableExprCgService::refresh_generated_column_related_frame(ctx,
|
||||
upd_ctdef.old_row_,
|
||||
upd_ctdef.full_assign_row_,
|
||||
assign_ids,
|
||||
*col_schema))) {
|
||||
LOG_WARN("fail to refresh generated column related frame", K(ret), K(ctx), K(*col_schema));
|
||||
}
|
||||
LOG_WARN("assign column item is null", K(ret), K(assign));
|
||||
} else if (new_row.count() < assign.column_item_->col_idx_) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected assign projector_index_", K(ret), K(new_row), K(assign.column_item_));
|
||||
} else if (assign.column_item_->is_virtual_generated_column_) {
|
||||
ret = OB_NOT_SUPPORTED;
|
||||
LOG_WARN("virtual generated column not support to update", K(ret), K(assign));
|
||||
} else {
|
||||
const ObExpr *expr = new_row.at(assign_id);
|
||||
expr->locate_expr_datum(eval_ctx_) = store_row.cells()[assign_id];
|
||||
expr->get_eval_info(eval_ctx_).evaluated_ = true;
|
||||
expr->get_eval_info(eval_ctx_).projected_ = true;
|
||||
ObExpr *expr = new_row.at(assign.column_item_->col_idx_);
|
||||
if (OB_ISNULL(expr)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("expr is null", K(ret));
|
||||
} else if (assign.column_item_->is_stored_generated_column_) {
|
||||
// do nothing, stored generated column not need to fill
|
||||
} else if (assign.column_item_->auto_filled_timestamp_ && !assign.is_assigned_) {
|
||||
ObDatum *tmp_datum = nullptr;
|
||||
if (OB_FAIL(expr->eval(eval_ctx_, tmp_datum))) {
|
||||
LOG_WARN("fail to eval current timestamp expr", K(ret));
|
||||
}
|
||||
} else {
|
||||
expr->locate_expr_datum(eval_ctx_) = store_row.cells()[assign.column_item_->col_idx_];
|
||||
expr->get_eval_info(eval_ctx_).evaluated_ = true;
|
||||
expr->get_eval_info(eval_ctx_).projected_ = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -108,7 +108,6 @@ protected:
|
||||
const ObTableUpdCtDef &upd_ctdef,
|
||||
bool &is_row_changed);
|
||||
int to_expr_skip_old(const ObChunkDatumStore::StoredRow &store_row,
|
||||
const ObRowkey &constraint_rowkey,
|
||||
const ObTableUpdCtDef &upd_ctdef);
|
||||
int generate_del_rtdef_for_update(const ObTableUpdCtDef &upd_ctdef,
|
||||
ObTableUpdRtDef &upd_rtdef);
|
||||
|
@ -81,47 +81,49 @@ int ObTableOpWrapper::process_affected_entity(ObTableCtx &tb_ctx,
|
||||
} else if (OB_FAIL(op_result.get_entity(result_entity))) {
|
||||
LOG_WARN("fail to get result entity", K(ret), K(result_entity));
|
||||
} else {
|
||||
ObIAllocator &allocator = tb_ctx.get_allocator();
|
||||
const ObIArray<ObExpr *> *full_assign_exprs = nullptr;
|
||||
const ObIArray<ObExpr *> *ins_exprs = nullptr;
|
||||
const ObIArray<ObExpr *> *upd_exprs = nullptr;
|
||||
bool use_insert_expr = false;
|
||||
if (TABLE_API_EXEC_INSERT_UP == spec.get_type()) {
|
||||
const ObTableApiInsertUpSpec &ins_up_spec = static_cast<const ObTableApiInsertUpSpec&>(spec);
|
||||
full_assign_exprs = &ins_up_spec.get_ctdef().upd_ctdef_.full_assign_row_;
|
||||
ins_exprs = &ins_up_spec.get_ctdef().ins_ctdef_.new_row_;
|
||||
upd_exprs = &ins_up_spec.get_ctdef().upd_ctdef_.new_row_;
|
||||
use_insert_expr = !static_cast<ObTableApiInsertUpExecutor&>(executor).is_insert_duplicated();
|
||||
} else {
|
||||
ObTableApiTTLExecutor &ttl_executor = static_cast<ObTableApiTTLExecutor&>(executor);
|
||||
const ObTableApiTTLSpec &ttl_spec = static_cast<const ObTableApiTTLSpec&>(spec);
|
||||
full_assign_exprs = &ttl_spec.get_ctdef().upd_ctdef_.full_assign_row_;
|
||||
ins_exprs = &ttl_spec.get_ctdef().ins_ctdef_.new_row_;
|
||||
upd_exprs = &ttl_spec.get_ctdef().upd_ctdef_.new_row_;
|
||||
use_insert_expr = !ttl_executor.is_insert_duplicated() || ttl_executor.is_expired();
|
||||
}
|
||||
const ObTableCtx::ObAssignIds &assign_ids = tb_ctx.get_assign_ids();
|
||||
const int64_t N = assign_ids.count();
|
||||
ObObj *obj_array = static_cast<ObObj*>(allocator.alloc(sizeof(ObObj) * N));
|
||||
|
||||
ObIArray<ObTableAssignment> &assigns = tb_ctx.get_assignments();
|
||||
ObIAllocator &allocator = tb_ctx.get_allocator();
|
||||
ObObj *obj_array = static_cast<ObObj*>(allocator.alloc(sizeof(ObObj) * assigns.count()));
|
||||
if (OB_ISNULL(obj_array)) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("faild to alloc memory for objs", K(ret));
|
||||
} else if (OB_ISNULL(full_assign_exprs)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("full assign exprs is null", K(ret));
|
||||
LOG_WARN("faild to alloc memory for objs", K(ret), K(assigns.count()));
|
||||
} else if (OB_ISNULL(ins_exprs)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("insert exprs is null", K(ret));
|
||||
}
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < N; i++) {
|
||||
uint64_t idx = assign_ids.at(i).idx_;
|
||||
uint64_t column_id = assign_ids.at(i).column_id_;
|
||||
const ObColumnSchemaV2 *column_schema = nullptr;
|
||||
if (OB_ISNULL(column_schema = tb_ctx.get_table_schema()->get_column_schema(column_id))) {
|
||||
ret = OB_ERR_COLUMN_NOT_FOUND;
|
||||
LOG_WARN("column not exist", K(ret), K(column_id));
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < assigns.count(); i++) {
|
||||
ObTableAssignment &assign = assigns.at(i);
|
||||
uint64_t project_idx = OB_INVALID_ID;
|
||||
if (OB_ISNULL(assign.column_item_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("assign column item is nullptr", K(ret), K(assign));
|
||||
} else if (FALSE_IT(project_idx = assign.column_item_->col_idx_)) {
|
||||
} else if (use_insert_expr && ins_exprs->count() <= project_idx) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected insert index", K(ret), K(ins_exprs), K(assign));
|
||||
} else if (!use_insert_expr && upd_exprs->count() <= project_idx) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected assign index", K(ret), K(upd_exprs), K(assign));
|
||||
} else {
|
||||
ObObj &obj = obj_array[i];
|
||||
ObExpr *rt_expr = use_insert_expr ? ins_exprs->at(idx) : full_assign_exprs->at(idx);
|
||||
ObExpr *rt_expr = use_insert_expr ? ins_exprs->at(project_idx) : upd_exprs->at(project_idx);
|
||||
ObDatum *datum = nullptr;
|
||||
const ObString &column_name = column_schema->get_column_name_str();
|
||||
if (OB_FAIL(rt_expr->eval(executor.get_eval_ctx(), datum))) {
|
||||
LOG_WARN("fail to eval datum", K(ret), K(*rt_expr));
|
||||
} else if (OB_FAIL(datum->to_obj(obj, rt_expr->obj_meta_))) {
|
||||
@ -129,8 +131,8 @@ int ObTableOpWrapper::process_affected_entity(ObTableCtx &tb_ctx,
|
||||
} else if (is_lob_storage(obj.get_type())
|
||||
&& OB_FAIL(ObTableCtx::read_real_lob(allocator, obj))) {
|
||||
LOG_WARN("fail to read lob", K(ret), K(obj));
|
||||
} else if (OB_FAIL(result_entity->set_property(column_name, obj))) {
|
||||
LOG_WARN("fail to set property", K(ret), K(column_name), K(obj));
|
||||
} else if (OB_FAIL(result_entity->set_property(assign.column_item_->column_name_, obj))) {
|
||||
LOG_WARN("fail to set property", K(ret), K(assign), K(obj));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -25,7 +25,7 @@ int ObTableQueryUtils::check_htable_query_args(const ObTableQuery &query,
|
||||
int ret = OB_SUCCESS;
|
||||
const ObIArray<ObString> &select_columns = tb_ctx.get_query_col_names();
|
||||
int64_t N = select_columns.count();
|
||||
if (N != 4) {
|
||||
if (N != 4 && N != 5) { // htable maybe has prefix generated column
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("TableQuery with htable_filter should select 4 columns", K(ret), K(N));
|
||||
}
|
||||
|
@ -208,18 +208,11 @@ int ObTableApiReplaceExecutor::do_insert()
|
||||
const ObTableEntity *entity = static_cast<const ObTableEntity*>(tb_ctx_.get_entity());
|
||||
const ObTableReplaceCtDef &ctdef = replace_spec_.get_ctdef();
|
||||
|
||||
if (OB_FAIL(refresh_exprs_frame(entity))) {
|
||||
LOG_WARN("fail to refresh expr frame", K(ret));
|
||||
} else if (tb_ctx_.has_auto_inc()) {
|
||||
for (int64_t i = 0; i < ctdef.ins_ctdef_.new_row_.count(); i++) { // 在自增的场景下,由于自增列的列引用表达式被用户输入的值覆盖
|
||||
if (ctdef.ins_ctdef_.new_row_.at(i)->type_ == T_FUN_COLUMN_CONV) { // 故需要手动清空eval的flag
|
||||
ctdef.ins_ctdef_.new_row_.at(i)->get_eval_info(eval_ctx_).evaluated_ = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
// do nothing
|
||||
if (OB_ISNULL(insert_row_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("insert row is null", K(ret));
|
||||
} else if (OB_FAIL(insert_row_->to_expr(get_primary_table_new_row(), eval_ctx_))) {
|
||||
LOG_WARN("stored row to expr faild", K(ret));
|
||||
} else if (OB_FAIL(insert_row_to_das(ctdef.ins_ctdef_, replace_rtdef_.ins_rtdef_))) {
|
||||
LOG_WARN("shuffle insert row failed", K(ret));
|
||||
} else {
|
||||
@ -229,6 +222,21 @@ int ObTableApiReplaceExecutor::do_insert()
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableApiReplaceExecutor::cache_insert_row()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const ObExprPtrIArray &new_row_exprs = get_primary_table_new_row();
|
||||
|
||||
if (OB_FAIL(ObChunkDatumStore::StoredRow::build(insert_row_, new_row_exprs, eval_ctx_, allocator_))) {
|
||||
LOG_WARN("fail to build stored row", K(ret), K(new_row_exprs));
|
||||
} else if (OB_ISNULL(insert_row_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("cache insert row is null", K(ret));
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableApiReplaceExecutor::prepare_final_replace_task()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -278,6 +286,8 @@ int ObTableApiReplaceExecutor::get_next_row()
|
||||
LOG_WARN("fail to post all das task", K(ret));
|
||||
} else if (!is_duplicated()) {
|
||||
LOG_DEBUG("try insert is not duplicated", K(ret));
|
||||
} else if (OB_FAIL(cache_insert_row())) {
|
||||
LOG_WARN("fail to cache insert row", K(ret));
|
||||
} else if (OB_FAIL(fetch_conflict_rowkey(conflict_checker_))) {
|
||||
LOG_WARN("fail to fetch conflict row", K(ret));
|
||||
} else if (OB_FAIL(reset_das_env(replace_rtdef_.ins_rtdef_))) {
|
||||
|
@ -48,6 +48,7 @@ public:
|
||||
: ObTableApiModifyExecutor(ctx),
|
||||
allocator_(ObModIds::TABLE_PROC, OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()),
|
||||
replace_spec_(replace_spec),
|
||||
insert_row_(NULL),
|
||||
insert_rows_(0),
|
||||
delete_rows_(0),
|
||||
conflict_checker_(allocator_, eval_ctx_, replace_spec_.get_conflict_checker_ctdef()),
|
||||
@ -93,6 +94,7 @@ private:
|
||||
const ObChunkDatumStore::StoredRow *replace_row,
|
||||
const ObChunkDatumStore::StoredRow *delete_row);
|
||||
int prepare_final_replace_task();
|
||||
int cache_insert_row();
|
||||
int do_delete(ObConflictRowMap *primary_map);
|
||||
int do_insert();
|
||||
int reuse();
|
||||
@ -100,6 +102,7 @@ private:
|
||||
common::ObArenaAllocator allocator_;
|
||||
const ObTableApiReplaceSpec &replace_spec_;
|
||||
ObTableReplaceRtDef replace_rtdef_;
|
||||
ObChunkDatumStore::StoredRow *insert_row_;
|
||||
int64_t insert_rows_;
|
||||
int64_t delete_rows_;
|
||||
sql::ObConflictChecker conflict_checker_;
|
||||
|
@ -21,8 +21,7 @@ namespace oceanbase
|
||||
{
|
||||
namespace table
|
||||
{
|
||||
int ObTableApiUpdateExecutor::process_single_operation(const ObTableEntity *entity,
|
||||
const ObTableCtx::ObAssignIds &assign_ids)
|
||||
int ObTableApiUpdateExecutor::process_single_operation(const ObTableEntity *entity)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
common::ObIArray<ObNewRange> &key_ranges = tb_ctx_.get_key_ranges();
|
||||
@ -49,10 +48,7 @@ int ObTableApiUpdateExecutor::process_single_operation(const ObTableEntity *enti
|
||||
LOG_WARN("fail to get next row", K(ret));
|
||||
}
|
||||
} else if (OB_FAIL(ObTableExprCgService::refresh_update_exprs_frame(tb_ctx_,
|
||||
upd_ctdef->old_row_,
|
||||
upd_ctdef->new_row_,
|
||||
upd_ctdef->full_assign_row_,
|
||||
assign_ids,
|
||||
*entity))) {
|
||||
LOG_WARN("fail to refresh update exprs frame", K(ret), K(*entity), K(cur_idx_));
|
||||
}
|
||||
@ -69,7 +65,7 @@ int ObTableApiUpdateExecutor::get_next_row_from_child()
|
||||
|
||||
if (cur_idx_ >= 1) {
|
||||
ret = OB_ITER_END;
|
||||
} else if (OB_FAIL(process_single_operation(entity, tb_ctx_.get_assign_ids()))) {
|
||||
} else if (OB_FAIL(process_single_operation(entity))) {
|
||||
if (OB_ITER_END != ret) {
|
||||
LOG_WARN("fail to process single update operation", K(ret));
|
||||
}
|
||||
|
@ -53,8 +53,7 @@ private:
|
||||
int get_next_row_from_child();
|
||||
int update_row_to_das();
|
||||
int upd_rows_post_proc();
|
||||
int process_single_operation(const ObTableEntity *entity,
|
||||
const ObTableCtx::ObAssignIds &assign_ids);
|
||||
int process_single_operation(const ObTableEntity *entity);
|
||||
private:
|
||||
const ObTableApiUpdateSpec &upd_spec_;
|
||||
ObTableUpdRtDef upd_rtdef_;
|
||||
|
@ -93,7 +93,7 @@ int ObTableApiTTLExecutor::refresh_exprs_frame(const ObTableEntity *entity)
|
||||
LOG_WARN("entity is null", K(ret));
|
||||
} else if (OB_FAIL(ObTableExprCgService::refresh_ttl_exprs_frame(tb_ctx_,
|
||||
ins_ctdef.new_row_,
|
||||
upd_ctdef.delta_exprs_,
|
||||
upd_ctdef.delta_row_,
|
||||
*entity))) {
|
||||
LOG_WARN("fail to refresh ttl exprs frame", K(ret), K(*entity));
|
||||
}
|
||||
@ -277,7 +277,6 @@ int ObTableApiTTLExecutor::update_row_to_das()
|
||||
ttl_rtdef_.upd_rtdef_,
|
||||
upd_rtctx_));
|
||||
OZ(to_expr_skip_old(*constraint_value.current_datum_row_,
|
||||
constraint_rowkey,
|
||||
ttl_spec_.get_ctdef().upd_ctdef_));
|
||||
clear_evaluated_flag();
|
||||
OZ(insert_upd_new_row_to_das(ttl_spec_.get_ctdef().upd_ctdef_,
|
||||
@ -286,7 +285,6 @@ int ObTableApiTTLExecutor::update_row_to_das()
|
||||
} else if (NULL == constraint_value.baseline_datum_row_ &&
|
||||
NULL != constraint_value.current_datum_row_) { // 单单是唯一索引冲突的时候,会走这个分支
|
||||
OZ(to_expr_skip_old(*constraint_value.current_datum_row_,
|
||||
constraint_rowkey,
|
||||
ttl_spec_.get_ctdef().upd_ctdef_));
|
||||
OZ(insert_upd_new_row_to_das(ttl_spec_.get_ctdef().upd_ctdef_,
|
||||
ttl_rtdef_.upd_rtdef_,
|
||||
|
@ -166,7 +166,8 @@ void TestCreateExecutor::fake_ctx_init_common(ObTableCtx &fake_ctx, ObTableSchem
|
||||
g_sess_node_val.is_inited_ = true;
|
||||
g_sess_node_val.sess_info_.test_init(0, 0, 0, NULL);
|
||||
g_sess_node_val.sess_info_.load_all_sys_vars(schema_guard_);
|
||||
fake_ctx.init_phy_plan_ctx();
|
||||
fake_ctx.init_physical_plan_ctx(0, 1);
|
||||
ASSERT_EQ(OB_SUCCESS, fake_ctx.construct_column_items());
|
||||
}
|
||||
|
||||
TEST_F(TestCreateExecutor, scan)
|
||||
@ -296,7 +297,7 @@ TEST_F(TestCreateExecutor, update)
|
||||
|
||||
ASSERT_EQ(OB_SUCCESS, ObTableExprCgService::generate_exprs(fake_ctx, allocator_, fake_expr_info));
|
||||
fake_ctx.set_expr_info(&fake_expr_info);
|
||||
ASSERT_EQ(6, fake_ctx.get_all_exprs().get_expr_array().count());
|
||||
ASSERT_EQ(4, fake_ctx.get_all_exprs().get_expr_array().count());
|
||||
ObTableApiSpec *root_spec = nullptr;
|
||||
ObTableApiExecutor *executor = nullptr;
|
||||
// generate update spec tree
|
||||
@ -343,7 +344,7 @@ TEST_F(TestCreateExecutor, insertup)
|
||||
ASSERT_EQ(OB_SUCCESS, fake_ctx.init_insert_up());
|
||||
ASSERT_EQ(OB_SUCCESS, ObTableExprCgService::generate_exprs(fake_ctx, allocator_, fake_expr_info));
|
||||
fake_ctx.set_expr_info(&fake_expr_info);
|
||||
ASSERT_EQ(6, fake_ctx.get_all_exprs().get_expr_array().count());
|
||||
ASSERT_EQ(4, fake_ctx.get_all_exprs().get_expr_array().count());
|
||||
ObTableApiSpec *root_spec = nullptr;
|
||||
ObTableApiExecutor *executor = nullptr;
|
||||
|
||||
|
Reference in New Issue
Block a user