diff --git a/src/observer/table/ob_table_cg_service.cpp b/src/observer/table/ob_table_cg_service.cpp index 55a268066b..5379c5cc49 100644 --- a/src/observer/table/ob_table_cg_service.cpp +++ b/src/observer/table/ob_table_cg_service.cpp @@ -793,32 +793,42 @@ int ObTableExprCgService::refresh_delete_exprs_frame(ObTableCtx &ctx, { int ret = OB_SUCCESS; ObSEArray rowkey; - ObObj k_obj; - ObObj q_obj; - ObObj t_obj; - int64_t time = 0; + if (ObTableEntityType::ET_HKV == ctx.get_entity_type()) { + ObObj k_obj; + ObObj q_obj; + ObObj t_obj; + int64_t time = 0; - // htable场景rowkey都在properties中,所以需要从properties中提取出rowkey - if (OB_FAIL(entity.get_property(ObHTableConstants::ROWKEY_CNAME_STR, k_obj))) { - LOG_WARN("fail to get K", K(ret)); - } else if (OB_FAIL(entity.get_property(ObHTableConstants::CQ_CNAME_STR, q_obj))) { - LOG_WARN("fail to get Q", K(ret)); - } else if (OB_FAIL(entity.get_property(ObHTableConstants::VERSION_CNAME_STR, t_obj))) { - LOG_WARN("fail to get T", K(ret)); - } else if (OB_FAIL(rowkey.push_back(k_obj))) { - LOG_WARN("fail to push back k_obj", K(ret), K(k_obj)); - } else if (OB_FAIL(rowkey.push_back(q_obj))) { - LOG_WARN("fail to push back q_obj", K(ret), K(q_obj)); - } else if (FALSE_IT(time = t_obj.get_int())) { - // do nothing - } else if (FALSE_IT(t_obj.set_int(-1 * time))) { - // do nothing - } else if (OB_FAIL(rowkey.push_back(t_obj))) { - LOG_WARN("fail to push back t_obj", K(ret), K(t_obj)); - } else if (OB_FAIL(refresh_rowkey_exprs_frame(ctx, exprs, rowkey))) { - LOG_WARN("fail to init rowkey exprs frame", K(ret), K(ctx), K(rowkey)); - } else if (OB_FAIL(refresh_properties_exprs_frame(ctx, exprs, entity))) { - LOG_WARN("fail to init properties exprs frame", K(ret), K(ctx)); + // htable场景rowkey都在properties中,所以需要从properties中提取出rowkey + if (OB_FAIL(entity.get_property(ObHTableConstants::ROWKEY_CNAME_STR, k_obj))) { + LOG_WARN("fail to get K", K(ret)); + } else if (OB_FAIL(entity.get_property(ObHTableConstants::CQ_CNAME_STR, q_obj))) { + LOG_WARN("fail to get Q", K(ret)); + } else if (OB_FAIL(entity.get_property(ObHTableConstants::VERSION_CNAME_STR, t_obj))) { + LOG_WARN("fail to get T", K(ret)); + } else if (OB_FAIL(rowkey.push_back(k_obj))) { + LOG_WARN("fail to push back k_obj", K(ret), K(k_obj)); + } else if (OB_FAIL(rowkey.push_back(q_obj))) { + LOG_WARN("fail to push back q_obj", K(ret), K(q_obj)); + } else if (FALSE_IT(time = t_obj.get_int())) { + // do nothing + } else if (FALSE_IT(t_obj.set_int(-1 * time))) { + // do nothing + } else if (OB_FAIL(rowkey.push_back(t_obj))) { + LOG_WARN("fail to push back t_obj", K(ret), K(t_obj)); + } + } else { + if (OB_FAIL(rowkey.assign(entity.get_rowkey_objs()))) { + LOG_WARN("fail to assign", K(ret), K(entity.get_rowkey_objs())); + } + } + + if (OB_SUCC(ret)) { + if (OB_FAIL(refresh_rowkey_exprs_frame(ctx, exprs, rowkey))) { + LOG_WARN("fail to init rowkey exprs frame", K(ret), K(ctx), K(rowkey)); + } else if (OB_FAIL(refresh_properties_exprs_frame(ctx, exprs, entity))) { + LOG_WARN("fail to init properties exprs frame", K(ret), K(ctx)); + } } return ret; @@ -934,35 +944,43 @@ int ObTableExprCgService::refresh_rowkey_exprs_frame(ObTableCtx &ctx, LOG_WARN("invalid column item count", K(ret), K(items), K(schema_rowkey_cnt)); } - for (int64_t i = 0, rowkey_idx = 0; OB_SUCC(ret) && i < schema_rowkey_cnt; i++) { + // not always the primary key is the prefix of table schema + // e.g., create table test(a varchar(1024), b int primary key); + for (int64_t i = 0; OB_SUCC(ret) && i < items.count(); i++) { const ObTableColumnItem &item = items.at(i); - const ObExpr *expr = exprs.at(i); - if (T_FUN_SYS_AUTOINC_NEXTVAL == expr->type_) { - ObObj null_obj; - null_obj.set_null(); - const ObObj *tmp_obj = is_full_filled ? &rowkey.at(rowkey_idx) : &null_obj; - if (OB_FAIL(write_autoinc_datum(ctx, *expr, eval_ctx, *tmp_obj))) { - LOG_WARN("fail to write auto increment datum", K(ret), K(is_full_filled), K(*expr), K(*tmp_obj)); - } - if (is_full_filled) { - rowkey_idx++; - } - } else if (!is_full_filled && IS_DEFAULT_NOW_OBJ(item.default_value_)) { - ObDatum *tmp_datum = nullptr; - if (OB_FAIL(expr->eval(eval_ctx, tmp_datum))) { - LOG_WARN("fail to eval current timestamp expr", K(ret)); - } + int64_t rowkey_position = item.rowkey_position_; + if (rowkey_position <= 0) { + // normal column, do nothing } else { - if (rowkey_idx >= entity_rowkey_cnt) { - ret = OB_INDEX_OUT_OF_RANGE; - LOG_WARN("idx out of range", K(ret), K(i), K(entity_rowkey_cnt)); - } else if (OB_ISNULL(expr)) { - ret = OB_ERR_UNDEFINED; - LOG_WARN("expr is null", K(ret)); - } else if (OB_FAIL(write_datum(ctx, ctx.get_allocator(), *expr, eval_ctx, rowkey.at(rowkey_idx)))) { - LOG_WARN("fail to write datum", K(ret), K(rowkey_idx), K(rowkey.at(rowkey_idx)), K(*expr)); + const ObTableColumnItem &item = items.at(i); + const ObExpr *expr = exprs.at(i); + if (T_FUN_SYS_AUTOINC_NEXTVAL == expr->type_) { + if (is_full_filled && rowkey_position > entity_rowkey_cnt) { + ret = OB_INDEX_OUT_OF_RANGE; + LOG_WARN("idx out of range", K(ret), K(i), K(rowkey_position), K(entity_rowkey_cnt)); + } else { + ObObj null_obj; + null_obj.set_null(); + const ObObj *tmp_obj = is_full_filled ? &rowkey.at(rowkey_position-1) : &null_obj; + if (OB_FAIL(write_autoinc_datum(ctx, *expr, eval_ctx, *tmp_obj))) { + LOG_WARN("fail to write auto increment datum", K(ret), K(is_full_filled), K(*expr), K(*tmp_obj)); + } + } + } else if (!is_full_filled && IS_DEFAULT_NOW_OBJ(item.default_value_)) { + ObDatum *tmp_datum = nullptr; + if (OB_FAIL(expr->eval(eval_ctx, tmp_datum))) { + LOG_WARN("fail to eval current timestamp expr", K(ret)); + } } else { - rowkey_idx++; + if (rowkey_position > entity_rowkey_cnt) { + ret = OB_INDEX_OUT_OF_RANGE; + LOG_WARN("idx out of range", K(ret), K(i), K(entity_rowkey_cnt), K(rowkey_position)); + } else if (OB_ISNULL(expr)) { + ret = OB_ERR_UNDEFINED; + LOG_WARN("expr is null", K(ret)); + } else if (OB_FAIL(write_datum(ctx, ctx.get_allocator(), *expr, eval_ctx, rowkey.at(rowkey_position-1)))) { + LOG_WARN("fail to write datum", K(ret), K(rowkey_position), K(rowkey.at(rowkey_position-1)), K(*expr)); + } } } } @@ -998,44 +1016,48 @@ int ObTableExprCgService::refresh_properties_exprs_frame(ObTableCtx &ctx, } else { ObObj prop_value; const ObObj *obj = nullptr; - const int64_t rowkey_column_cnt = table_schema->get_rowkey_column_num(); - for (int64_t i = rowkey_column_cnt; OB_SUCC(ret) && i < items.count(); i++) { + // not always the primary key is the prefix of table schema + // e.g., create table test(a varchar(1024), b int primary key); + for (int64_t i = 0; OB_SUCC(ret) && i < items.count(); i++) { const ObTableColumnItem &item = items.at(i); - const ObExpr *expr = exprs.at(i); - if (item.is_generated_column_) { // generate column need eval first - ObDatum *tmp_datum = nullptr; - if (OB_FAIL(expr->eval(eval_ctx, tmp_datum))) { - LOG_WARN("fail to eval generate expr", K(ret)); - } + if (item.rowkey_position_ > 0) { + // rowkey column, do nothing } else { - // 这里使用schema的列名在entity中查找property,有可能出现本身entity中的prop_name是不对的,导致找不到 - bool not_found = (OB_SEARCH_NOT_FOUND == entity.get_property(item.column_name_, prop_value)); - if (not_found) { - obj = &item.default_value_; - } else { - obj = &prop_value; - } - if (T_FUN_SYS_AUTOINC_NEXTVAL == expr->type_) { - ObObj null_obj; - null_obj.set_null(); - obj = not_found ? &null_obj : &prop_value; - if (OB_FAIL(write_autoinc_datum(ctx, *expr, eval_ctx, *obj))) { - LOG_WARN("fail to write auto increment datum", K(ret), K(not_found), K(*expr), K(*obj)); - } - } else if (not_found && IS_DEFAULT_NOW_OBJ(item.default_value_)) { + const ObExpr *expr = exprs.at(i); + if (item.is_generated_column_) { // generate column need eval first ObDatum *tmp_datum = nullptr; if (OB_FAIL(expr->eval(eval_ctx, tmp_datum))) { - LOG_WARN("fail to eval current timestamp expr", K(ret)); + LOG_WARN("fail to eval generate expr", K(ret)); } } else { - if (OB_FAIL(write_datum(ctx, ctx.get_allocator(), *expr, eval_ctx, *obj))) { - LOG_WARN("fail to write datum", K(ret), K(*obj), K(*expr)); + // 这里使用schema的列名在entity中查找property,有可能出现本身entity中的prop_name是不对的,导致找不到 + bool not_found = (OB_SEARCH_NOT_FOUND == entity.get_property(item.column_name_, prop_value)); + if (not_found) { + obj = &item.default_value_; + } else { + obj = &prop_value; + } + if (T_FUN_SYS_AUTOINC_NEXTVAL == expr->type_) { + ObObj null_obj; + null_obj.set_null(); + obj = not_found ? &null_obj : &prop_value; + if (OB_FAIL(write_autoinc_datum(ctx, *expr, eval_ctx, *obj))) { + LOG_WARN("fail to write auto increment datum", K(ret), K(not_found), K(*expr), K(*obj)); + } + } else if (not_found && IS_DEFAULT_NOW_OBJ(item.default_value_)) { + ObDatum *tmp_datum = nullptr; + if (OB_FAIL(expr->eval(eval_ctx, tmp_datum))) { + LOG_WARN("fail to eval current timestamp expr", K(ret)); + } + } else { + if (OB_FAIL(write_datum(ctx, ctx.get_allocator(), *expr, eval_ctx, *obj))) { + LOG_WARN("fail to write datum", K(ret), K(*obj), K(*expr)); + } } } } } } - return ret; } diff --git a/src/observer/table/ob_table_context.cpp b/src/observer/table/ob_table_context.cpp index 3084082ddd..d23b5991c4 100644 --- a/src/observer/table/ob_table_context.cpp +++ b/src/observer/table/ob_table_context.cpp @@ -142,6 +142,7 @@ int ObTableCtx::construct_column_items() item.is_auto_increment_ = col_schema->is_autoincrement(); item.generated_expr_str_ = item.default_value_.get_string(); item.auto_filled_timestamp_ = col_schema->is_on_update_current_timestamp(); + item.rowkey_position_ = col_schema->get_rowkey_position(); if (item.is_auto_increment_ && OB_FAIL(add_auto_inc_param(*col_schema))) { LOG_WARN("fail to add auto inc param", K(ret), K(item)); } else if (item.is_generated_column_ diff --git a/src/observer/table/ob_table_context.h b/src/observer/table/ob_table_context.h index 2196c2d18e..2992f62d8e 100644 --- a/src/observer/table/ob_table_context.h +++ b/src/observer/table/ob_table_context.h @@ -33,7 +33,8 @@ struct ObTableColumnItem : public sql::ColumnItem is_generated_column_(false), is_stored_generated_column_(false), is_virtual_generated_column_(false), - is_auto_increment_(false) + is_auto_increment_(false), + rowkey_position_(-1) {} TO_STRING_KV("ColumnItem", static_cast(*this), KPC_(raw_expr), @@ -43,7 +44,8 @@ struct ObTableColumnItem : public sql::ColumnItem K_(cascaded_column_ids), K_(generated_expr_str), K_(dependant_exprs), - K_(is_auto_increment)); + K_(is_auto_increment), + K_(rowkey_position)); sql::ObRawExpr *raw_expr_; // column ref expr or calculate expr bool is_generated_column_; bool is_stored_generated_column_; @@ -54,6 +56,7 @@ struct ObTableColumnItem : public sql::ColumnItem common::ObString generated_expr_str_; common::ObSEArray dependant_exprs_; bool is_auto_increment_; + int64_t rowkey_position_; // greater than zero if this is rowkey column, 0 if this is common column }; struct ObTableAssignment : public sql::ObAssignment @@ -147,6 +150,7 @@ public: return_rowkey_ = false; cur_cluster_version_ = GET_MIN_CLUSTER_VERSION(); is_ttl_table_ = false; + is_skip_scan_ = false; } virtual ~ObTableCtx() {} @@ -179,7 +183,8 @@ public: K_(is_for_insertup), K_(entity_type), K_(cur_cluster_version), - K_(is_ttl_table)); + K_(is_ttl_table), + K_(is_skip_scan)); public: //////////////////////////////////////// getter //////////////////////////////////////////////// // for common @@ -291,6 +296,9 @@ public: && operation_type_ != ObTableOperationType::GET && operation_type_ != ObTableOperationType::SCAN; } + // for delete + OB_INLINE void set_skip_scan(bool skip_scan) { is_skip_scan_ = skip_scan; } + OB_INLINE bool is_skip_scan() { return is_skip_scan_; } public: // 基于 table name 初始化common部分(不包括expr_info_, exec_ctx_) int init_common(ObTableApiCredential &credential, @@ -462,6 +470,8 @@ private: // for lob adapt uint64_t cur_cluster_version_; bool is_ttl_table_; + // for delete skip scan + bool is_skip_scan_; private: DISALLOW_COPY_AND_ASSIGN(ObTableCtx); }; diff --git a/src/observer/table/ob_table_delete_executor.cpp b/src/observer/table/ob_table_delete_executor.cpp index 069ed01437..ac98e14dc8 100644 --- a/src/observer/table/ob_table_delete_executor.cpp +++ b/src/observer/table/ob_table_delete_executor.cpp @@ -32,6 +32,11 @@ int ObTableApiDeleteExecutor::open() LOG_WARN("fail to oepn ObTableApiModifyExecutor", K(ret)); } else if (OB_FAIL(generate_del_rtdef(del_spec_.get_ctdef(), del_rtdef_))) { LOG_WARN("fail to generate delete rtdef"); + } else { + if (tb_ctx_.is_skip_scan()) { + set_entity(tb_ctx_.get_entity()); + set_skip_scan(true); + } } return ret; diff --git a/src/observer/table/ob_table_modify_executor.cpp b/src/observer/table/ob_table_modify_executor.cpp index 1a55101c75..633f812a64 100644 --- a/src/observer/table/ob_table_modify_executor.cpp +++ b/src/observer/table/ob_table_modify_executor.cpp @@ -413,19 +413,22 @@ int ObTableApiModifyExecutor::to_expr_skip_old(const ObChunkDatumStore::StoredRo const ObTableUpdCtDef &upd_ctdef) { int ret = OB_SUCCESS; - const ObTableSchema *table_schema = tb_ctx_.get_table_schema(); const ObIArray &new_row = upd_ctdef.new_row_; - if (OB_UNLIKELY(store_row.cnt_ != new_row.count())) { + const ObIArray& column_items = tb_ctx_.get_column_items(); + if (OB_UNLIKELY(store_row.cnt_ != new_row.count()) || OB_UNLIKELY(new_row.count() != column_items.count())) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("datum count mismatch", K(ret), K(store_row.cnt_), K(new_row.count())); + LOG_WARN("datum count mismatch", K(ret), K(store_row.cnt_), K(new_row.count()), K(column_items.count())); } else { // 1. refresh rowkey expr datum - const int64_t rowkey_col_cnt = tb_ctx_.get_table_schema()->get_rowkey_column_num(); - for (uint64_t i = 0; OB_SUCC(ret) && i < rowkey_col_cnt; ++i) { + // not always the primary key is the prefix of table schema + // e.g., create table test(a varchar(1024), b int primary key); + for (uint64_t i = 0; OB_SUCC(ret) && i < column_items.count(); ++i) { const ObExpr *expr = new_row.at(i); - expr->locate_expr_datum(eval_ctx_) = store_row.cells()[i]; - expr->get_eval_info(eval_ctx_).evaluated_ = true; - expr->get_eval_info(eval_ctx_).projected_ = true; + if (column_items.at(i).rowkey_position_ > 0) { + expr->locate_expr_datum(eval_ctx_) = store_row.cells()[i]; + expr->get_eval_info(eval_ctx_).evaluated_ = true; + expr->get_eval_info(eval_ctx_).projected_ = true; + } } // 2. refresh assign column expr datum diff --git a/src/observer/table/ob_table_rpc_processor.cpp b/src/observer/table/ob_table_rpc_processor.cpp index c4f05f003c..2d322e52dd 100644 --- a/src/observer/table/ob_table_rpc_processor.cpp +++ b/src/observer/table/ob_table_rpc_processor.cpp @@ -38,6 +38,8 @@ using namespace oceanbase::table; using namespace oceanbase::share; using namespace oceanbase::obrpc; +const ObString ObTableApiProcessorBase::OBKV_TRACE_INFO = ObString::make_string("OBKV Operation"); + int ObTableLoginP::process() { int ret = OB_SUCCESS; @@ -510,11 +512,12 @@ int ObTableApiProcessorBase::end_trans(bool is_rollback, rpc::ObRequest *req, in int ObTableApiProcessorBase::sync_end_trans(bool is_rollback, int64_t timeout_ts, ObHTableLockHandle *lock_handle /*nullptr*/) { - return sync_end_trans_(is_rollback, trans_desc_, timeout_ts, lock_handle); + return sync_end_trans_(is_rollback, trans_desc_, timeout_ts, lock_handle, &OBKV_TRACE_INFO); } int ObTableApiProcessorBase::sync_end_trans_(bool is_rollback, transaction::ObTxDesc *&trans_desc, - int64_t timeout_ts, ObHTableLockHandle *lock_handle /*nullptr*/) + int64_t timeout_ts, ObHTableLockHandle *lock_handle /*nullptr*/, + const ObString *trace_info /*nullptr*/) { int ret = OB_SUCCESS; @@ -524,7 +527,7 @@ int ObTableApiProcessorBase::sync_end_trans_(bool is_rollback, transaction::ObTx if (OB_FAIL(txs->rollback_tx(*trans_desc))) { LOG_WARN("fail rollback trans when session terminate", K(ret), KPC(trans_desc)); } - } else if (OB_FAIL(txs->commit_tx(*trans_desc, stmt_timeout_ts))) { + } else if (OB_FAIL(txs->commit_tx(*trans_desc, stmt_timeout_ts, trace_info))) { ACTIVE_SESSION_FLAG_SETTER_GUARD(in_committing); LOG_WARN("fail commit trans when session terminate", K(ret), KPC(trans_desc), K(stmt_timeout_ts)); @@ -544,7 +547,8 @@ int ObTableApiProcessorBase::sync_end_trans_(bool is_rollback, transaction::ObTx return ret; } -int ObTableApiProcessorBase::async_commit_trans(rpc::ObRequest *req, int64_t timeout_ts, ObHTableLockHandle *lock_handle /*nullptr*/) +int ObTableApiProcessorBase::async_commit_trans(rpc::ObRequest *req, int64_t timeout_ts, + ObHTableLockHandle *lock_handle /*nullptr*/) { int ret = OB_SUCCESS; transaction::ObTransService *txs = MTL(transaction::ObTransService*); diff --git a/src/observer/table/ob_table_rpc_processor.h b/src/observer/table/ob_table_rpc_processor.h index eeb37b87d6..e20eb79028 100644 --- a/src/observer/table/ob_table_rpc_processor.h +++ b/src/observer/table/ob_table_rpc_processor.h @@ -127,7 +127,8 @@ public: static int sync_end_trans_(bool is_rollback, transaction::ObTxDesc *&trans_desc, int64_t timeout_ts, - table::ObHTableLockHandle *lock_handle = nullptr); + table::ObHTableLockHandle *lock_handle = nullptr, + const ObString *trace_info = nullptr); // for get int init_read_trans(const table::ObTableConsistencyLevel consistency_level, @@ -190,6 +191,7 @@ protected: bool had_do_response_; // asynchronous transactions return packet in advance sql::TransState *trans_state_ptr_; transaction::ObTxReadSnapshot tx_snapshot_; + static const ObString OBKV_TRACE_INFO; }; template diff --git a/src/observer/table/ttl/ob_table_ttl_task.cpp b/src/observer/table/ttl/ob_table_ttl_task.cpp index 898e3ab17d..3c312ffb17 100644 --- a/src/observer/table/ttl/ob_table_ttl_task.cpp +++ b/src/observer/table/ttl/ob_table_ttl_task.cpp @@ -28,6 +28,7 @@ using namespace oceanbase::share; using namespace oceanbase::table; using namespace oceanbase::rootserver; +const ObString ObTableTTLDeleteTask::TTL_TRACE_INFO = ObString::make_string("TTL Delete"); /** * ---------------------------------------- ObTableTTLDeleteTask ---------------------------------------- @@ -215,7 +216,8 @@ int ObTableTTLDeleteTask::process_one() if (trans_state.is_start_trans_executed() && trans_state.is_start_trans_success()) { int tmp_ret = ret; - if (OB_FAIL(ObTableApiProcessorBase::sync_end_trans_(OB_SUCCESS != ret, trans_desc, get_timeout_ts()))) { + if (OB_FAIL(ObTableApiProcessorBase::sync_end_trans_(OB_SUCCESS != ret, trans_desc, get_timeout_ts(), + nullptr, &TTL_TRACE_INFO))) { LOG_WARN("fail to end trans", KR(ret)); } ret = (OB_SUCCESS == tmp_ret) ? ret : tmp_ret; @@ -279,6 +281,7 @@ int ObTableTTLDeleteTask::process_ttl_delete(const ObITableEntity &new_entity, SMART_VAR(ObTableCtx, delete_ctx, allocator_) { if (OB_FAIL(init_tb_ctx(new_entity, delete_ctx))) { LOG_WARN("fail to init table ctx", K(ret), K(new_entity)); + } else if (FALSE_IT(delete_ctx.set_skip_scan(true))) { } else if (OB_FAIL(delete_ctx.init_trans(trans_desc, snapshot))) { LOG_WARN("fail to init trans", K(ret), K(delete_ctx)); } else if (OB_FAIL(ObTableOpWrapper::process_op(delete_ctx, op_result))) { @@ -429,9 +432,22 @@ int ObTableTTLDag::fill_info_param(compaction::ObIBasicInfoParam *&out_param, Ob } ObTableTTLDeleteRowIterator::ObTableTTLDeleteRowIterator() -: is_inited_(false), max_version_(0), time_to_live_ms_(0), limit_del_rows_(-1), cur_del_rows_(0), - cur_version_(0), cur_rowkey_(), cur_qualifier_(), max_version_cnt_(0), ttl_cnt_(0), scan_cnt_(0), - is_last_row_ttl_(true), is_hbase_table_(false), last_row_(nullptr), rowkey_cnt_(0) + : allocator_(ObMemAttr(MTL_ID(), "TTLDelRowIter")), + is_inited_(false), + max_version_(0), + time_to_live_ms_(0), + limit_del_rows_(-1), + cur_del_rows_(0), + cur_version_(0), + cur_rowkey_(), + cur_qualifier_(), + max_version_cnt_(0), + ttl_cnt_(0), + scan_cnt_(0), + is_last_row_ttl_(true), + is_hbase_table_(false), + last_row_(nullptr), + rowkey_cnt_(0) { } @@ -467,6 +483,20 @@ int ObTableTTLDeleteRowIterator::init(const schema::ObTableSchema &table_schema, } } } + + for (uint64_t i = 0; OB_SUCC(ret) && i < full_column_ids.count(); i++) { + const ObColumnSchemaV2 *column_schema = nullptr; + uint64_t column_id = full_column_ids.at(i); + if (OB_ISNULL(column_schema = table_schema.get_column_schema(column_id))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("fail to get column schema", KR(ret), K(column_id)); + } else if (!column_schema->is_rowkey_column()) { + PropertyPair pair(i, column_schema->get_column_name_str()); + if (OB_FAIL(properties_pairs_.push_back(pair))) { + LOG_WARN("fail to push back", KR(ret), K(i), "column_name", column_schema->get_column_name_str()); + } + } + } } if (OB_SUCC(ret) && is_hbase_table_ && ttl_operation.start_rowkey_.is_valid()) { @@ -566,6 +596,12 @@ int ObTableTTLDeleteTask::execute_ttl_delete(ObTableTTLDeleteRowIterator &ttl_ro LOG_WARN("fail to add rowkey value", K(ret)); } } + for (int64_t i = 0; OB_SUCC(ret) && i < ttl_row_iter.properties_pairs_.count(); i++) { + ObTableTTLDeleteRowIterator::PropertyPair &pair = ttl_row_iter.properties_pairs_.at(i); + if (OB_FAIL(delete_entity_.set_property(pair.property_name_, row->get_cell(pair.cell_idx_)))) { + LOG_WARN("fail to add rowkey value", K(ret), K(i), K_(ttl_row_iter.properties_pairs)); + } + } if (OB_SUCC(ret)) { int64_t tmp_affect_rows = 0; diff --git a/src/observer/table/ttl/ob_table_ttl_task.h b/src/observer/table/ttl/ob_table_ttl_task.h index d79a26eb0f..4ef60fe417 100644 --- a/src/observer/table/ttl/ob_table_ttl_task.h +++ b/src/observer/table/ttl/ob_table_ttl_task.h @@ -34,7 +34,22 @@ public: virtual int get_next_row(ObNewRow*& row); int init(const share::schema::ObTableSchema &table_schema, const table::ObTableTTLOperation &ttl_operation); int64_t get_rowkey_column_cnt() const { return rowkey_cnt_; } + public: + struct PropertyPair + { + PropertyPair() = default; + PropertyPair(uint64_t cell_idx, const common::ObString &property_name) + : cell_idx_(cell_idx), + property_name_(property_name) + {} + uint64_t cell_idx_; + common::ObString property_name_; + TO_STRING_KV(K_(cell_idx), K_(property_name)); + }; + +public: + common::ObArenaAllocator allocator_; bool is_inited_; int32_t max_version_; int64_t time_to_live_ms_; // ttl in millisecond @@ -51,8 +66,10 @@ public: ObNewRow *last_row_; common::ObTableTTLChecker ttl_checker_; int64_t rowkey_cnt_; - + // map new row -> rowkey column common::ObSArray rowkey_cell_ids_; + // map new row -> normal column + common::ObSArray properties_pairs_; }; @@ -93,9 +110,9 @@ public: private: static const int64_t RETRY_INTERVAL = 30 * 60 * 1000 * 1000l; // 30min - static const int64_t PER_TASK_DEL_ROWS = 10000l; + static const int64_t PER_TASK_DEL_ROWS = 1024l; static const int64_t ONE_TASK_TIMEOUT = 1 * 60 * 1000 * 1000l; // 1min - + static const ObString TTL_TRACE_INFO; private: int process_one(); diff --git a/src/observer/table/ttl/ob_tenant_ttl_manager.cpp b/src/observer/table/ttl/ob_tenant_ttl_manager.cpp index 340b3d1f09..311aec7c84 100644 --- a/src/observer/table/ttl/ob_tenant_ttl_manager.cpp +++ b/src/observer/table/ttl/ob_tenant_ttl_manager.cpp @@ -37,6 +37,8 @@ void ObClearTTLHistoryTask::runTimerTask() } else if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("ob clear ttl history task is not init", KR(ret)); + } else if (is_paused_) { + // timer paused, do nothing } else if (ObTTLUtil::check_can_do_work()) { int ret = OB_SUCCESS; const int64_t now = ObTimeUtility::current_time(); @@ -86,6 +88,16 @@ int ObClearTTLHistoryTask::init(const uint64_t tenant_id, common::ObMySQLProxy & return ret; } +void ObClearTTLHistoryTask::resume() +{ + is_paused_ = false; +} + +void ObClearTTLHistoryTask::pause() +{ + is_paused_ = true; +} + int ObTTLTaskScheduler::init(const uint64_t tenant_id, common::ObMySQLProxy &sql_proxy) { int ret = OB_SUCCESS; @@ -519,6 +531,16 @@ void ObTTLTaskScheduler::reset_local_tenant_task() tenant_task_.reset(); } +void ObTTLTaskScheduler::resume() +{ + is_paused_ = false; +} + +void ObTTLTaskScheduler::pause() +{ + is_paused_ = true; +} + int ObTenantTTLManager::init(const uint64_t tenant_id, ObMySQLProxy &sql_proxy) { int ret = OB_SUCCESS; @@ -591,6 +613,8 @@ void ObTTLTaskScheduler::runTimerTask() } else if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("ttl task mgr not init", KR(ret)); + } else if (is_paused_) { + // timer paused, do nothing } else if (OB_FAIL(reload_tenant_task())) { LOG_WARN("fail to process tenant task", KR(ret), K_(tenant_id)); } else if (OB_FAIL(try_add_periodic_task())) { @@ -663,6 +687,18 @@ int ObTTLTaskScheduler::move_all_task_to_history_table() return ret; } +void ObTenantTTLManager::resume() +{ + clear_ttl_history_task_.resume(); + task_scheduler_.resume(); + task_scheduler_.set_need_reload(true);} + +void ObTenantTTLManager::pause() +{ + clear_ttl_history_task_.pause(); + task_scheduler_.pause(); +} + int ObTTLTaskScheduler::check_task_need_move(bool &need_move) { int ret = OB_SUCCESS; diff --git a/src/observer/table/ttl/ob_tenant_ttl_manager.h b/src/observer/table/ttl/ob_tenant_ttl_manager.h index 5106c8e5f5..6c78f49121 100644 --- a/src/observer/table/ttl/ob_tenant_ttl_manager.h +++ b/src/observer/table/ttl/ob_tenant_ttl_manager.h @@ -35,12 +35,15 @@ public: ObClearTTLHistoryTask() : sql_proxy_(nullptr), is_inited_(false), - tenant_id_(OB_INVALID_TENANT_ID) + tenant_id_(OB_INVALID_TENANT_ID), + is_paused_(false) {} ~ObClearTTLHistoryTask() {} int init(const uint64_t tenant_id, common::ObMySQLProxy &sql_proxy); virtual void runTimerTask() override; void destroy() {} + void pause(); + void resume(); static const int64_t OB_KV_TTL_GC_INTERVAL = 30 * 1000L * 1000L; // 30s static const int64_t OB_KV_TTL_GC_COUNT_PER_TASK = 4096L; @@ -48,6 +51,7 @@ private: common::ObMySQLProxy *sql_proxy_; bool is_inited_; uint64_t tenant_id_; + bool is_paused_; }; struct ObTTLServerInfo @@ -90,7 +94,8 @@ class ObTTLTaskScheduler : public common::ObTimerTask { public: ObTTLTaskScheduler() - : del_ten_arr_(), sql_proxy_(nullptr), is_inited_(false), periodic_launched_(false), need_reload_(true) + : del_ten_arr_(), sql_proxy_(nullptr), is_inited_(false), periodic_launched_(false), + need_reload_(true), is_paused_(false) {} ~ObTTLTaskScheduler() {} @@ -107,6 +112,9 @@ public: int try_add_periodic_task(); void set_need_reload(bool need_reload) { need_reload_ = need_reload; } + + void pause(); + void resume(); private: virtual bool is_enable_ttl(); @@ -158,6 +166,7 @@ private: bool need_reload_; lib::ObMutex mutex_; ObArray tablet_table_pairs_; + bool is_paused_; const int64_t OB_TTL_TASK_RETRY_INTERVAL = 15*1000*1000; // 15s }; @@ -180,6 +189,8 @@ public: void stop(); void destroy(); int handle_user_ttl(const obrpc::ObTTLRequestArg& arg); + void resume(); + void pause(); private: bool is_inited_; ObClearTTLHistoryTask clear_ttl_history_task_; diff --git a/src/observer/table/ttl/ob_ttl_service.cpp b/src/observer/table/ttl/ob_ttl_service.cpp index ad396d71f7..88b0d2a25b 100644 --- a/src/observer/table/ttl/ob_ttl_service.cpp +++ b/src/observer/table/ttl/ob_ttl_service.cpp @@ -49,8 +49,14 @@ int ObTTLService::switch_to_leader() } } if (OB_SUCC(ret)) { - if (OB_FAIL(tenant_ttl_mgr_->start())) { - LOG_WARN("fail to start tenant_ttl_mgr", K_(tenant_id), KR(ret)); + if (!has_start_) { + if (OB_FAIL(tenant_ttl_mgr_->start())) { + LOG_WARN("fail to start tenant_ttl_mgr", K_(tenant_id), KR(ret)); + } else { + has_start_ = true; + } + } else { + tenant_ttl_mgr_->resume(); } } } @@ -75,8 +81,9 @@ void ObTTLService::inner_switch_to_follower() { FLOG_INFO("ttl_service: switch_to_follower", K_(tenant_id)); const int64_t start_time_us = ObTimeUtility::current_time(); - stop(); - wait(); + if (OB_NOT_NULL(tenant_ttl_mgr_)) { + tenant_ttl_mgr_->pause(); + } const int64_t cost_us = ObTimeUtility::current_time() - start_time_us; FLOG_INFO("ttl_service: switch_to_follower", K_(tenant_id), K(cost_us), KP_(tenant_ttl_mgr)); } diff --git a/src/observer/table/ttl/ob_ttl_service.h b/src/observer/table/ttl/ob_ttl_service.h index 6da04643b0..6ad5638bf1 100644 --- a/src/observer/table/ttl/ob_ttl_service.h +++ b/src/observer/table/ttl/ob_ttl_service.h @@ -29,7 +29,8 @@ public: ObTTLService() : is_inited_(false), tenant_id_(OB_INVALID_TENANT_ID), - tenant_ttl_mgr_(nullptr) + tenant_ttl_mgr_(nullptr), + has_start_(false) {} virtual ~ObTTLService(); int init(const uint64_t tenant_id); @@ -73,6 +74,7 @@ private: bool is_inited_; int64_t tenant_id_; ObTenantTTLManager *tenant_ttl_mgr_; + bool has_start_; }; } } diff --git a/src/share/parameter/ob_parameter_seed.ipp b/src/share/parameter/ob_parameter_seed.ipp index af556359e7..b7f7574a5b 100755 --- a/src/share/parameter/ob_parameter_seed.ipp +++ b/src/share/parameter/ob_parameter_seed.ipp @@ -1623,6 +1623,10 @@ DEF_TIME(kv_ttl_history_recycle_interval, OB_TENANT_PARAMETER, "7d", "[1d, 180d] DEF_BOOL(enable_kv_ttl, OB_TENANT_PARAMETER, "False", "specifies whether ttl task is enbled", ObParameterAttr(Section::ROOT_SERVICE, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); +DEF_INT(ttl_thread_score, OB_TENANT_PARAMETER, "0", "[0,100]", + "the current work thread score of ttl thread. Range: [0,100] in integer. Especially, 0 means default value", + ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); + DEF_STR_WITH_CHECKER(sql_protocol_min_tls_version, OB_CLUSTER_PARAMETER, "none", common::ObConfigSQLTlsVersionChecker, "SQL SSL control options, used to specify the minimum SSL/TLS version number. " diff --git a/src/share/scheduler/ob_dag_scheduler.cpp b/src/share/scheduler/ob_dag_scheduler.cpp index ceecc0c99e..080e22aa4d 100755 --- a/src/share/scheduler/ob_dag_scheduler.cpp +++ b/src/share/scheduler/ob_dag_scheduler.cpp @@ -1566,6 +1566,7 @@ void ObTenantDagScheduler::reload_config() set_thread_score(ObDagPrio::DAG_PRIO_HA_MID, tenant_config->ha_mid_thread_score); set_thread_score(ObDagPrio::DAG_PRIO_HA_LOW, tenant_config->ha_low_thread_score); set_thread_score(ObDagPrio::DAG_PRIO_DDL, tenant_config->ddl_thread_score); + set_thread_score(ObDagPrio::DAG_PRIO_TTL, tenant_config->ttl_thread_score); } } diff --git a/tools/deploy/mysql_test/test_suite/inner_table/r/mysql/all_virtual_sys_parameter_stat.result b/tools/deploy/mysql_test/test_suite/inner_table/r/mysql/all_virtual_sys_parameter_stat.result index 108c5bb4dc..c265011ec6 100644 --- a/tools/deploy/mysql_test/test_suite/inner_table/r/mysql/all_virtual_sys_parameter_stat.result +++ b/tools/deploy/mysql_test/test_suite/inner_table/r/mysql/all_virtual_sys_parameter_stat.result @@ -229,6 +229,7 @@ tenant_sql_net_thread_count tenant_task_queue_size trace_log_slow_query_watermark trx_2pc_retry_interval +ttl_thread_score undo_retention user_block_cache_priority user_row_cache_priority