[FEAT MERGE] tablelock_42

Co-authored-by: YangEfei <yangyifei96@outlook.com>
Co-authored-by: joseph12138 <17862707376@163.com>
This commit is contained in:
obdev
2023-05-12 03:11:22 +00:00
committed by ob-robot
parent 3ed0e51931
commit a9b068daa1
23 changed files with 1324 additions and 290 deletions

View File

@ -90,6 +90,8 @@ enum ObLogBaseType
// for dup table trans
DUP_TABLE_LOG_BASE_TYPE = 26,
// for obj lock garbage collect service
OBJ_LOCK_GARBAGE_COLLECT_SERVICE_LOG_BASE_TYPE = 27,
// pay attention!!!
// add log type in log_base_type_to_string
// max value
@ -158,6 +160,8 @@ int log_base_type_to_string(const ObLogBaseType log_type,
strncpy(str ,"PADDING_LOG_ENTRY", str_len);
} else if (log_type == DUP_TABLE_LOG_BASE_TYPE) {
strncpy(str ,"DUP_TABLE", str_len);
} else if (log_type == OBJ_LOCK_GARBAGE_COLLECT_SERVICE_LOG_BASE_TYPE) {
strncpy(str ,"OBJ_LOCK_GARBAGE_COLLECT_SERVICE", str_len);
} else {
ret = OB_INVALID_ARGUMENT;
}

View File

@ -209,7 +209,8 @@ public:
v.no_more_test_ = true;
v.retry_type_ = RETRY_TYPE_NONE;
if (OB_ERR_INSUFFICIENT_PX_WORKER == v.err_ ||
OB_ERR_EXCLUSIVE_LOCK_CONFLICT == v.err_) {
OB_ERR_EXCLUSIVE_LOCK_CONFLICT == v.err_ ||
OB_ERR_EXCLUSIVE_LOCK_CONFLICT_NOWAIT == v.err_) {
v.client_ret_ = v.err_;
} else if (is_try_lock_row_err(v.session_.get_retry_info().get_last_query_retry_err())) {
// timeout caused by locking, should return OB_ERR_EXCLUSIVE_LOCK_CONFLICT

View File

@ -12,7 +12,6 @@
#define USING_LOG_PREFIX SQL_ENG
#include "sql/engine/cmd/ob_lock_table_executor.h"
#include "sql/resolver/ddl/ob_lock_table_stmt.h"
#include "sql/engine/ob_exec_context.h"
@ -35,10 +34,38 @@ int ObLockTableExecutor::execute(ObExecContext &ctx,
} else if (is_mysql_mode()) {
LOG_DEBUG("mysql mode do nothing");
} else {
if (OB_FAIL(ObSqlTransControl::lock_table(ctx,
stmt.get_table_id(),
stmt.get_lock_mode()))) {
LOG_WARN("fail lock table", K(ret), K(stmt.get_lock_mode()), K(stmt.get_table_id()));
const common::ObIArray<TableItem *> &table_items = stmt.get_table_items();
if (OB_UNLIKELY(table_items.empty())) {
ret = OB_INVALID_ARGUMENT;
LOG_ERROR("there's no table in the stmt", K(ret));
}
for (int64_t i = 0; OB_SUCC(ret) && i < table_items.count(); ++i) {
TableItem *table_item = table_items.at(i);
// handle compatibility
// If the version of cluster is updated than 4.1, it can use
// 'wait n' or 'no wait' grammar. Otherwise, it should follow
// the previous logic (i.e. try lock until trx / sql timeout)
int64_t wait_lock_seconds;
if (GET_MIN_CLUSTER_VERSION() > CLUSTER_VERSION_4_1_0_0) {
wait_lock_seconds = stmt.get_wait_lock_seconds();
} else {
wait_lock_seconds = -1;
}
if (OB_FAIL(ObSqlTransControl::lock_table(
ctx, table_item->ref_id_, table_item->part_ids_,
stmt.get_lock_mode(), wait_lock_seconds))) {
if ((OB_TRY_LOCK_ROW_CONFLICT == ret ||
OB_ERR_EXCLUSIVE_LOCK_CONFLICT == ret ||
OB_ERR_SHARED_LOCK_CONFLICT == ret) &&
wait_lock_seconds >= 0) {
ret = OB_ERR_EXCLUSIVE_LOCK_CONFLICT_NOWAIT;
}
LOG_WARN("fail lock table", K(ret), K(stmt.get_lock_mode()),
K(wait_lock_seconds), K(table_item->ref_id_),
K(table_item->part_ids_));
}
}
bool explicit_trans = session->has_explicit_start_trans();
bool ac = false;
@ -57,6 +84,5 @@ int ObLockTableExecutor::execute(ObExecContext &ctx,
return ret;
}
} // sql
} // oceanbase
} // namespace sql
} // namespace oceanbase

View File

@ -16,6 +16,7 @@
#define USING_LOG_PREFIX SQL_EXE
#include "share/ob_schema_status_proxy.h" // ObSchemaStatusProxy
#include "share/schema/ob_tenant_schema_service.h"
#include "sql/ob_sql_trans_control.h"
#include "sql/engine/ob_physical_plan.h"
#include "sql/engine/ob_physical_plan_ctx.h"
@ -1090,7 +1091,9 @@ int ObSqlTransControl::acquire_tx_if_need_(transaction::ObTransService *txs, ObS
int ObSqlTransControl::lock_table(ObExecContext &exec_ctx,
const uint64_t table_id,
const ObTableLockMode lock_mode)
const ObIArray<ObObjectID> &part_ids,
const ObTableLockMode lock_mode,
const int64_t wait_lock_seconds)
{
int ret = OB_SUCCESS;
ObSQLSessionInfo *session = GET_MY_SESSION(exec_ctx);
@ -1106,26 +1109,58 @@ int ObSqlTransControl::lock_table(ObExecContext &exec_ctx,
OZ (txs->acquire_tx(session->get_tx_desc(), session->get_sessid()), *session);
}
ObTxParam tx_param;
ObLockTableRequest arg;
OZ (build_tx_param_(session, tx_param));
// calculate lock table timeout
int64_t lock_timeout_us = 0;
{
int64_t stmt_expire_ts = 0;
int64_t tx_expire_ts = 0;
int64_t lock_wait_expire_ts = 0;
OX (stmt_expire_ts = get_stmt_expire_ts(plan_ctx, *session));
OZ (get_trans_expire_ts(*session, tx_expire_ts));
OX (lock_timeout_us = MAX(200L, MIN(stmt_expire_ts, tx_expire_ts) - ObTimeUtility::current_time()));
if (wait_lock_seconds < 0) {
// It means that there's no opt about wait or no wait,
// so we just use the deafult timeout config here.
OX (lock_timeout_us = MAX(200L, MIN(stmt_expire_ts, tx_expire_ts) -
ObTimeUtility::current_time()));
} else {
// The priority of stmt_expire_ts and tx_expire_ts is higher than
// wait N. So if the statement or transaction is timeout, it should
// return error code, rather than wait until N seconds.
lock_wait_expire_ts =
MIN3(session->get_query_start_time() + wait_lock_seconds * 1000 * 1000, stmt_expire_ts, tx_expire_ts);
OX (lock_timeout_us = lock_wait_expire_ts - ObTimeUtility::current_time());
lock_timeout_us = lock_timeout_us < 0 ? 0 : lock_timeout_us;
}
}
arg.table_id_ = table_id;
arg.owner_id_ = 0;
arg.lock_mode_ = lock_mode;
arg.op_type_ = ObTableLockOpType::IN_TRANS_COMMON_LOCK;
arg.timeout_us_ = lock_timeout_us;
OZ (lock_service->lock_table(*session->get_tx_desc(),
tx_param,
arg),
tx_param, table_id, lock_mode, lock_timeout_us);
if (part_ids.empty()) {
ObLockTableRequest arg;
arg.table_id_ = table_id;
arg.owner_id_ = 0;
arg.lock_mode_ = lock_mode;
arg.op_type_ = ObTableLockOpType::IN_TRANS_COMMON_LOCK;
arg.timeout_us_ = lock_timeout_us;
OZ (lock_service->lock_table(*session->get_tx_desc(),
tx_param,
arg),
tx_param, table_id, lock_mode, lock_timeout_us);
} else {
ObLockPartitionRequest arg;
arg.table_id_ = table_id;
arg.owner_id_ = 0;
arg.lock_mode_ = lock_mode;
arg.op_type_ = ObTableLockOpType::IN_TRANS_COMMON_LOCK;
arg.timeout_us_ = lock_timeout_us;
for (int64_t i = 0; i < part_ids.count() && OB_SUCC(ret); ++i) {
arg.part_object_id_ = part_ids.at(i);
OZ(lock_service->lock_partition_or_subpartition(*session->get_tx_desc(),
tx_param, arg),
tx_param, table_id, lock_mode, lock_timeout_us);
}
}
return ret;
}
@ -1148,6 +1183,7 @@ int ObSqlTransControl::check_ls_readable(const uint64_t tenant_id,
|| !addr.is_valid()
|| max_stale_time_us <= 0) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ls_id), K(addr), K(max_stale_time_us));
} else if (observer::ObServer::get_instance().get_self() == addr) {
storage::ObLSService *ls_svr = MTL(storage::ObLSService *);

View File

@ -233,7 +233,9 @@ public:
static int get_trans_result(ObExecContext &exec_ctx, transaction::ObTxExecResult &trans_result);
static int lock_table(ObExecContext &exec_ctx,
const uint64_t table_id,
const transaction::tablelock::ObTableLockMode lock_mode);
const ObIArray<ObObjectID> &part_ids,
const transaction::tablelock::ObTableLockMode lock_mode,
const int64_t wait_lock_seconds);
static void clear_xa_branch(const transaction::ObXATransID &xid, transaction::ObTxDesc *&tx_desc);
static int check_ls_readable(const uint64_t tenant_id,
const share::ObLSID &ls_id,

View File

@ -14,6 +14,7 @@
#include "ob_lock_table_resolver.h"
#include "ob_lock_table_stmt.h"
#include "sql/resolver/dml/ob_dml_stmt.h"
#include "sql/parser/ob_parser_utils.h"
namespace oceanbase
{
@ -55,9 +56,9 @@ int ObLockTableResolver::resolve_mysql_mode(const ParseNode &parse_tree)
int ObLockTableResolver::resolve_oracle_mode(const ParseNode &parse_tree)
{
int ret = OB_SUCCESS;
ObLockTableStmt *lock_stmt = NULL;
ObLockTableStmt *lock_stmt = nullptr;
if (2 != parse_tree.num_child_) {
if (3 != parse_tree.num_child_) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("wrong node number", K(ret), K(parse_tree.num_child_));
} else if (OB_ISNULL(parse_tree.children_)
@ -72,62 +73,66 @@ int ObLockTableResolver::resolve_oracle_mode(const ParseNode &parse_tree)
stmt_ = lock_stmt;
}
// 1. resolve table items
// 1. resolve table item
if (OB_SUCC(ret)) {
ParseNode *table_node = parse_tree.children_[TABLE];
ParseNode *table_node = parse_tree.children_[TABLE_LIST];
if (OB_ISNULL(table_node)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid parse tree", K(ret));
} else if (OB_FAIL(resolve_table_relation_node(*table_node))) {
} else if (OB_FAIL(resolve_table_list(*table_node))) {
LOG_WARN("resolve table failed", K(ret));
}
}
// 2. resolve lock mode
if (OB_SUCC(ret)) {
if (OB_FAIL(resolve_lock_mode(*parse_tree.children_[LOCK_MODE]))) {
LOG_WARN("resolve where clause failed", K(ret));
LOG_WARN("resolve lock mode failed", K(ret));
}
}
// 3. resolve wait
if (OB_SUCC(ret)) {
// this node maybe null if user didn't input opt about wait
if (OB_NOT_NULL(parse_tree.children_[WAIT])) {
if (OB_FAIL(resolve_wait_lock(*parse_tree.children_[WAIT]))) {
LOG_WARN("resolve wait opt for table lock failed", K(ret));
}
}
}
return ret;
}
// TODO: yanyuan.cxf only deal with one table name.
int ObLockTableResolver::resolve_table_relation_node(const ParseNode &parse_tree)
int ObLockTableResolver::resolve_table_list(const ParseNode &table_list)
{
int ret = OB_SUCCESS;
// TODO: yanyuan.cxf release TableItem
ObString table_name;
ObString database_name;
const ObTableSchema *table_schema = NULL;
int64_t tenant_id = session_info_->get_effective_tenant_id();
uint64_t database_id = OB_INVALID_ID;
ObLockTableStmt *lock_stmt = get_lock_table_stmt();
TableItem *table_item = nullptr;
if (OB_ISNULL(lock_stmt)) {
if (OB_UNLIKELY(T_TABLE_REFERENCES != table_list.type_ &&
T_RELATION_FACTOR != table_list.type_) ||
OB_UNLIKELY(OB_ISNULL(table_list.children_)) ||
OB_UNLIKELY(table_list.num_child_ < 1)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(table_list.type_), K(table_list.num_child_));
} else if (OB_ISNULL(lock_stmt)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("lock stmt should not be null", K(ret));
} else if (OB_ISNULL(schema_checker_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("schema_checker should not be null", K(ret));
} else if (OB_FAIL(ObDDLResolver::resolve_table_relation_node(&parse_tree,
table_name,
database_name))) {
LOG_WARN("failed to resolve table relation node", K(ret));
} else if (OB_FAIL(schema_checker_->get_database_id(tenant_id,
database_name,
database_id))) {
LOG_WARN("failed to get database id", K(ret));
} else if (OB_FAIL(schema_checker_->get_table_schema(tenant_id,
database_name,
table_name,
false,
table_schema))){
LOG_WARN("failed to get table schema", K(ret));
} else if (OB_ISNULL(table_schema)) {
LOG_WARN("null table schema", K(ret));
} else {
lock_stmt->set_table_id(table_schema->get_table_id());
LOG_WARN("invalid lock table stmt", K(lock_stmt));
}
for (int64_t i = 0; OB_SUCC(ret) && i < table_list.num_child_; ++i) {
const ParseNode *table_node = table_list.children_[i];
const ObTableSchema *table_schema = nullptr;
if (OB_ISNULL(table_node)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("table node is null");
} else if (OB_FAIL(ObDMLResolver::resolve_table(*table_node, table_item))) {
LOG_WARN("failed to resolve table", K(ret));
} else if (table_item->is_function_table() || table_item->is_json_table()) {//兼容oracle行为
ret = OB_WRONG_TABLE_NAME;
LOG_WARN("invalid table name", K(ret));
} else {
LOG_DEBUG("succ to add lock table item", KPC(table_item));
}
}
return ret;
}
@ -145,5 +150,22 @@ int ObLockTableResolver::resolve_lock_mode(const ParseNode &parse_tree)
return ret;
}
int ObLockTableResolver::resolve_wait_lock(const ParseNode &parse_tree)
{
int ret = OB_SUCCESS;
ObLockTableStmt *lock_stmt = get_lock_table_stmt();
int64_t wait_lock_seconds = parse_tree.value_;
if (OB_ISNULL(lock_stmt)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("lock stmt should not be null");
} else if (wait_lock_seconds < 0) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("lock wait time should not be negative", K(wait_lock_seconds));
} else {
lock_stmt->set_wait_lock_seconds(parse_tree.value_);
}
return ret;
}
} // namespace sql
} // namespace oceanbase

View File

@ -14,7 +14,7 @@
#define OCEANBASE_SQL_RESOLVER_DML_OB_LOCK_TABLE_RESOLVER_
#include "sql/resolver/ddl/ob_lock_table_stmt.h"
#include "sql/resolver/ddl/ob_ddl_resolver.h"
#include "sql/resolver/dml/ob_dml_resolver.h"
namespace oceanbase
{
@ -25,15 +25,15 @@ class ObLockTableStmt;
// NOTE: yanyuan.cxf LOCK TABLE is dml at oracle, but it does not have
// SQL plan, so we treat it as ddl operator.
class ObLockTableResolver : public ObDDLResolver
class ObLockTableResolver : public ObDMLResolver
{
public:
static const int64_t TABLE = 0; /* 0. table node */
static const int64_t LOCK_MODE = 1; /* 1. lock mode node */
static const int64_t WAIT_NODE = 2; /* 2. wait node? */
static const int64_t TABLE_LIST = 0;
static const int64_t LOCK_MODE = 1;
static const int64_t WAIT = 2;
public:
explicit ObLockTableResolver(ObResolverParams &params)
: ObDDLResolver(params)
: ObDMLResolver(params)
{}
virtual ~ObLockTableResolver()
{}
@ -42,8 +42,9 @@ public:
private:
int resolve_mysql_mode(const ParseNode &parse_tree);
int resolve_oracle_mode(const ParseNode &parse_tree);
int resolve_table_relation_node(const ParseNode &parse_tree);
int resolve_table_list(const ParseNode &table_list);
int resolve_lock_mode(const ParseNode &parse_tree);
int resolve_wait_lock(const ParseNode &parse_tree);
DISALLOW_COPY_AND_ASSIGN(ObLockTableResolver);
};

View File

@ -12,37 +12,36 @@
#ifndef OCEANBASE_SQL_RESOLVER_DML_OB_LOCK_TABLE_STMT_
#define OCEANBASE_SQL_RESOLVER_DML_OB_LOCK_TABLE_STMT_
#include "sql/resolver/ddl/ob_ddl_stmt.h"
#include "sql/resolver/dml/ob_dml_stmt.h"
#include "sql/resolver/ob_cmd.h"
#include "share/schema/ob_schema_struct.h"
namespace oceanbase
{
namespace sql
{
class ObLockTableStmt : public ObStmt, public ObICmd
class ObLockTableStmt : public ObDMLStmt, public ObICmd
{
public:
explicit ObLockTableStmt()
: ObStmt(stmt::T_LOCK_TABLE),
: ObDMLStmt(stmt::T_LOCK_TABLE),
lock_mode_(0),
table_id_(0)
wait_lock_seconds_(-1)
{}
virtual ~ObLockTableStmt()
{}
virtual int get_cmd_type() const { return get_stmt_type(); }
void set_lock_mode(const int64_t lock_mode) { lock_mode_ = lock_mode; }
void set_table_id(const uint64_t table_id) { table_id_ = table_id; }
void set_wait_lock_seconds(const int64_t wait_lock_seconds) { wait_lock_seconds_ = wait_lock_seconds; }
int64_t get_lock_mode() const { return lock_mode_; }
uint64_t get_table_id() const { return table_id_; }
int64_t get_wait_lock_seconds() const { return wait_lock_seconds_; }
private:
int64_t lock_mode_;
uint64_t table_id_;
int64_t wait_lock_seconds_;
DISALLOW_COPY_AND_ASSIGN(ObLockTableStmt);
};
} // namespace sql
} // namespace oceanbase

View File

@ -173,6 +173,7 @@ int ObLS::init(const share::ObLSID &ls_id,
REGISTER_TO_LOGSERVICE(logservice::DDL_LOG_BASE_TYPE, &ls_ddl_log_handler_);
REGISTER_TO_LOGSERVICE(logservice::KEEP_ALIVE_LOG_BASE_TYPE, &keep_alive_ls_handler_);
REGISTER_TO_LOGSERVICE(logservice::GC_LS_LOG_BASE_TYPE, &gc_handler_);
REGISTER_TO_LOGSERVICE(logservice::OBJ_LOCK_GARBAGE_COLLECT_SERVICE_LOG_BASE_TYPE, &lock_table_);
REGISTER_TO_LOGSERVICE(logservice::RESERVED_SNAPSHOT_LOG_BASE_TYPE, &reserved_snapshot_clog_handler_);
REGISTER_TO_LOGSERVICE(logservice::MEDIUM_COMPACTION_LOG_BASE_TYPE, &medium_compaction_clog_handler_);
@ -641,6 +642,7 @@ void ObLS::destroy()
UNREGISTER_FROM_LOGSERVICE(logservice::DDL_LOG_BASE_TYPE, &ls_ddl_log_handler_);
UNREGISTER_FROM_LOGSERVICE(logservice::KEEP_ALIVE_LOG_BASE_TYPE, &keep_alive_ls_handler_);
UNREGISTER_FROM_LOGSERVICE(logservice::GC_LS_LOG_BASE_TYPE, &gc_handler_);
UNREGISTER_FROM_LOGSERVICE(logservice::OBJ_LOCK_GARBAGE_COLLECT_SERVICE_LOG_BASE_TYPE, &lock_table_);
UNREGISTER_FROM_LOGSERVICE(logservice::RESERVED_SNAPSHOT_LOG_BASE_TYPE, &reserved_snapshot_clog_handler_);
UNREGISTER_FROM_LOGSERVICE(logservice::MEDIUM_COMPACTION_LOG_BASE_TYPE, &medium_compaction_clog_handler_);
if (ls_meta_.ls_id_ == IDS_LS) {

View File

@ -486,6 +486,12 @@ public:
// int get_lock_op_iter(const ObLockID &lock_id,
// ObLockOpIterator &iter);
DELEGATE_WITH_RET(lock_table_, get_lock_op_iter, int);
// check and clear lock ops and obj locks in this ls (or lock_table)
// @param[in] force_compact, if it's set to true, the gc thread will
// force compact unlock op which is committed, even though there's
// no paired lock op.
// int check_and_clear_obj_lock(const bool force_compact)
DELEGATE_WITH_RET(lock_table_, check_and_clear_obj_lock, int);
// set the member_list of log_service
// @param [in] member_list, the member list to be set.

View File

@ -593,6 +593,18 @@ int ObLockMemtable::get_lock_op_iter(const ObLockID &lock_id,
return ret;
}
int ObLockMemtable::check_and_clear_obj_lock(const bool force_compact)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
TABLELOCK_LOG(WARN, "ObLockMemtable not inited.", K(ret));
} else if (OB_FAIL(obj_lock_map_.check_and_clear_obj_lock(force_compact))) {
TABLELOCK_LOG(WARN, "check and clear obj lock failed", K(ret));
}
return ret;
}
int ObLockMemtable::scan(
const ObTableIterParam &param,
ObTableAccessContext &context,

View File

@ -103,6 +103,14 @@ public:
int get_lock_op_iter(const ObLockID &lock_id,
ObLockOpIterator &iter);
// Iterate obj lock in lock map, and check 2 status of it:
// 1. Check whether the lock ops in the obj lock can be compacted.
// If it can be compacted (i.e. there're paired lock op and unlock
// op), remove them from the obj lock and recycle resources.
// 2. Check whether the obj lock itself is empty.
// If it's empty (i.e. no lock ops in it), remove it from the lock
// map and recycle resources.
int check_and_clear_obj_lock(const bool force_compact);
// ================ INHERITED FROM ObIMemtable ===============
// We need to inherient the memtable method for merge process to iterate the
// lock for dumping the lock table.

View File

@ -16,6 +16,7 @@
#include "storage/tx/ob_trans_define_v4.h"
#include "storage/tablelock/ob_table_lock_common.h"
#include "storage/tablelock/ob_lock_table.h"
#include "storage/tablelock/ob_table_lock_service.h"
#include "common/ob_tablet_id.h" // ObTabletID
#include "share/ob_rpc_struct.h" // ObBatchCreateTabletArg
@ -727,6 +728,57 @@ int ObLockTable::admin_update_lock_op(const ObTableLockOp &op_info,
return ret;
}
int ObLockTable::check_and_clear_obj_lock(const bool force_compact)
{
int ret = OB_SUCCESS;
ObTableHandleV2 handle;
ObLockMemtable *lock_memtable = nullptr;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObLockTable is not inited", K(ret));
} else if (OB_FAIL(get_lock_memtable(handle))) {
LOG_WARN("get lock memtable failed", K(ret));
} else if (OB_FAIL(handle.get_lock_memtable(lock_memtable))) {
LOG_WARN("get lock memtable from lock handle failed", K(ret));
} else if (OB_FAIL(lock_memtable->check_and_clear_obj_lock(force_compact))) {
LOG_WARN("check and clear obj lock failed", K(ret));
}
return ret;
}
int ObLockTable::switch_to_leader()
{
int ret = OB_SUCCESS;
ObTableLockService::ObOBJLockGarbageCollector *obj_lock_gc = nullptr;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObLockTable is not inited", K(ret));
} else if (OB_FAIL(MTL(ObTableLockService *)
->get_obj_lock_garbage_collector(obj_lock_gc))) {
LOG_WARN("can not get ObOBJLockGarbageCollector", K(ret));
} else if (OB_ISNULL(obj_lock_gc)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ObOBJLockGarbageCollector is null", K(ret));
} else {
if (OB_NOT_NULL(parent_)) {
LOG_INFO("start to check and clear obj lock when switch to leader", K(ret),
K(parent_->get_ls_id()));
}
ret = obj_lock_gc->obj_lock_gc_thread_pool_.commit_task_ignore_ret(
[this]() { return check_and_clear_obj_lock(true); });
}
if (OB_FAIL(ret)) {
if (OB_ISNULL(parent_)) {
LOG_WARN("parent ls of ObLockTable is null", K(ret));
} else {
LOG_WARN("collect obj lock garbage when switch to leader failed", K(ret),
K(parent_->get_ls_id()));
}
}
return ret;
}
} // tablelock
} // transaction
} // oceanbase

View File

@ -17,6 +17,7 @@
#include "lib/worker.h"
#include "storage/ob_i_table.h"
#include "storage/tablelock/ob_obj_lock.h"
#include "logservice/ob_log_base_type.h"
namespace oceanbase
{
@ -62,7 +63,9 @@ struct ObLockParam;
class ObTableLockOp;
class ObLockMemtable;
class ObLockTable
class ObLockTable : public logservice::ObIReplaySubHandler,
public logservice::ObIRoleChangeSubHandler,
public logservice::ObICheckpointSubHandler
{
public:
ObLockTable()
@ -129,6 +132,24 @@ public:
const share::SCN &commit_version,
const share::SCN &commit_scn,
const ObTableLockOpStatus status);
// check and clear paired lock ops which can be compacted,
// and clear empty obj locks to recycle resources.
// See the ObLockMemtable::check_and_clear_obj_lock for deatails.
int check_and_clear_obj_lock(const bool force_compact);
// for replay
int replay(const void *buffer,
const int64_t nbytes,
const palf::LSN &lsn,
const share::SCN &scn) override { return OB_SUCCESS; }
// for checkpoint
share::SCN get_rec_scn() override { return share::SCN::max_scn(); }
int flush(share::SCN &rec_scn) override { return OB_SUCCESS; }
// for role change
void switch_to_follower_forcedly() override{};
int switch_to_leader() override;
int switch_to_follower_gracefully() override { return OB_SUCCESS; }
int resume_leader() override { return OB_SUCCESS; }
private:
// We use the method to recover the lock_table for reboot.
int restore_lock_table_(storage::ObITable &sstable);

View File

@ -181,8 +181,6 @@ int ObOBJLock::slow_lock(
ObTableLockOpLinkNode *lock_op_node = NULL;
uint64_t tenant_id = MTL_ID();
bool conflict_with_dml_lock = false;
bool unused_is_compacted = false;
const bool is_force_compact = true;
ObMemAttr attr(tenant_id, "ObTableLockOp");
// 1. check lock conflict.
// 2. record lock op.
@ -193,8 +191,7 @@ int ObOBJLock::slow_lock(
} else if (OB_FAIL(check_allow_lock_(lock_op,
lock_mode_in_same_trans,
conflict_tx_set,
conflict_with_dml_lock,
allocator))) {
conflict_with_dml_lock))) {
if (OB_TRY_LOCK_ROW_CONFLICT != ret) {
LOG_WARN("check allow lock failed", K(ret), K(lock_op));
}
@ -399,6 +396,8 @@ int ObOBJLock::update_lock_status(const ObTableLockOp &lock_op,
allocator,
unused_is_compacted))) {
LOG_WARN("compact tablelock failed", K(tmp_ret), K(lock_op));
} else {
drop_op_list_if_empty_(lock_op.lock_mode_, op_list, allocator);
}
}
if (OB_SUCC(ret) &&
@ -453,8 +452,6 @@ int ObOBJLock::fast_lock(
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
bool is_compacted = false;
const bool is_force_compact = true;
{
// lock first time
RDLockGuard guard(rwlock_);
@ -469,31 +466,6 @@ int ObOBJLock::fast_lock(
LOG_DEBUG("succeed create lock ", K(lock_op));
}
}
// compact if need.
if (OB_TRY_LOCK_ROW_CONFLICT == ret) {
WRLockGuard guard(rwlock_);
if (is_deleted_) {
ret = OB_EAGAIN;
need_retry = false;
} else if(OB_TMP_FAIL(compact_tablelock_(allocator, is_compacted, is_force_compact))) {
// compact the obj lock to make sure the lock op that need compact will
// not block the lock operation next time.
LOG_WARN("compact tablelock failed", K(tmp_ret), K(lock_op));
}
}
if (OB_TRY_LOCK_ROW_CONFLICT == ret && is_compacted) {
RDLockGuard guard(rwlock_);
if (OB_FAIL(try_fast_lock_(lock_op,
lock_mode_in_same_trans,
need_retry,
conflict_tx_set))) {
if (OB_TRY_LOCK_ROW_CONFLICT != ret) {
LOG_WARN("try fast lock failed", KR(ret), K(lock_op));
}
} else {
LOG_DEBUG("succeed create lock ", K(lock_op));
}
}
// 1. need retry basic conditions
if (ret == OB_TRY_LOCK_ROW_CONFLICT && !param.is_try_lock_) {
need_retry = true;
@ -590,7 +562,6 @@ int ObOBJLock::lock(
}
}
LOG_DEBUG("ObOBJLock::lock finish", K(ret), K(conflict_tx_set));
return ret;
}
@ -749,6 +720,17 @@ int ObOBJLock::get_table_lock_store_info(
return ret;
}
int ObOBJLock::compact_tablelock(ObMalloc &allocator,
bool &is_compacted,
const bool is_force) {
int ret = OB_SUCCESS;
WRLockGuard guard(rwlock_);
if (OB_FAIL(compact_tablelock_(allocator, is_compacted, is_force))) {
LOG_WARN("compact table lock failed", K(ret), K(is_compacted), K(is_force));
}
return ret;
}
bool ObOBJLockMap::GetTableLockStoreInfoFunctor::operator() (
ObOBJLock *obj_lock)
{
@ -828,6 +810,50 @@ int ObOBJLockMap::get_lock_op_iter(const ObLockID &lock_id,
return ret;
}
int ObOBJLockMap::check_and_clear_obj_lock(const bool force_compact)
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
ObLockIDIterator lock_id_iter;
ObLockID lock_id;
ObOBJLock *obj_lock = nullptr;
bool is_compacted = false;
if (OB_FAIL(get_lock_id_iter(lock_id_iter))) {
TABLELOCK_LOG(WARN, "get lock id iterator failed", K(ret));
} else {
do {
if (OB_FAIL(lock_id_iter.get_next(lock_id))) {
if (OB_ITER_END != ret) {
TABLELOCK_LOG(WARN, "fail to get next obj lock", K(ret));
}
} else if (OB_FAIL(get_obj_lock_with_ref_(lock_id, obj_lock))) {
if (ret != OB_OBJ_LOCK_NOT_EXIST) {
TABLELOCK_LOG(WARN, "get obj lock failed", K(ret), K(lock_id));
} else {
// Concurrent deletion may occur here. If it is found
// that the obj lock cannot be gotten, it will continue
// to iterate the remaining obj lock.
TABLELOCK_LOG(WARN, "obj lock has been deleted", K(ret), K(lock_id));
ret = OB_SUCCESS;
continue;
}
} else {
if (OB_TMP_FAIL(
obj_lock->compact_tablelock(allocator_, is_compacted, force_compact))) {
TABLELOCK_LOG(WARN, "compact table lock failed", K(ret), K(tmp_ret),
K(lock_id));
}
drop_obj_lock_if_empty_(lock_id, obj_lock);
if (OB_NOT_NULL(obj_lock)) {
lock_map_.revert(obj_lock);
}
}
} while (OB_SUCC(ret));
}
ret = OB_ITER_END == ret ? OB_SUCCESS : ret;
return ret;
}
bool ObOBJLockMap::GetMinCommittedDDLLogtsFunctor::operator() (
ObOBJLock *obj_lock)
{
@ -848,7 +874,6 @@ bool ObOBJLockMap::GetMinCommittedDDLLogtsFunctor::operator() (
return bool_ret;
}
int ObOBJLock::check_op_allow_lock_(const ObTableLockOp &lock_op)
{
int ret = OB_SUCCESS;
@ -919,42 +944,6 @@ int ObOBJLock::check_allow_unlock_(
return ret;
}
int ObOBJLock::check_allow_lock_(
const ObTableLockOp &lock_op,
const ObTableLockMode &lock_mode_in_same_trans,
ObTxIDSet &conflict_tx_set,
bool &conflict_with_dml_lock,
ObMalloc &allocator)
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
bool is_compacted = false;
const bool is_force_compact = true;
if (OB_FAIL(check_allow_lock_(lock_op,
lock_mode_in_same_trans,
conflict_tx_set,
conflict_with_dml_lock))) {
if (OB_TRY_LOCK_ROW_CONFLICT != ret) {
LOG_WARN("check allow lock failed", K(ret), K(lock_op));
} else if (OB_TMP_FAIL(compact_tablelock_(allocator, is_compacted, is_force_compact))) {
// compact the obj lock to make sure the lock op that need compact will
// not block the lock operation next time.
LOG_WARN("compact tablelock failed", K(tmp_ret), K(lock_op));
} else if (!is_compacted) {
// do nothing
} else if (OB_FAIL(check_allow_lock_(lock_op,
lock_mode_in_same_trans,
conflict_tx_set,
conflict_with_dml_lock))) {
if (OB_TRY_LOCK_ROW_CONFLICT != ret) {
LOG_WARN("check allow lock failed", K(ret), K(lock_op));
}
}
}
return ret;
}
int ObOBJLock::check_allow_lock_(
const ObTableLockOp &lock_op,
const ObTableLockMode &lock_mode_in_same_trans,
@ -1018,43 +1007,17 @@ int ObOBJLock::check_allow_lock(
const bool only_check_dml_lock)
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
bool is_compacted = false;
const bool is_force_compact = true;
if (OB_UNLIKELY(!lock_op.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument.", K(ret), K(lock_op));
} else {
// prevent from create new lock op.
// may be we need compact the lock.
{
RDLockGuard guard(rwlock_);
ret = check_allow_lock_(lock_op,
lock_mode_in_same_trans,
conflict_tx_set,
conflict_with_dml_lock,
include_finish_tx,
only_check_dml_lock);
}
// compact if need.
if (OB_TRY_LOCK_ROW_CONFLICT == ret) {
WRLockGuard guard(rwlock_);
if(OB_TMP_FAIL(compact_tablelock_(allocator, is_compacted, is_force_compact))) {
// compact the obj lock to make sure the lock op that need compact will
// not block the lock operation next time.
LOG_WARN("compact tablelock failed", K(tmp_ret), K(lock_op));
}
}
// recheck if compacted
if (OB_TRY_LOCK_ROW_CONFLICT == ret && is_compacted) {
RDLockGuard guard(rwlock_);
ret = check_allow_lock_(lock_op,
lock_mode_in_same_trans,
conflict_tx_set,
conflict_with_dml_lock,
include_finish_tx,
only_check_dml_lock);
}
RDLockGuard guard(rwlock_);
ret = check_allow_lock_(lock_op,
lock_mode_in_same_trans,
conflict_tx_set,
conflict_with_dml_lock,
include_finish_tx,
only_check_dml_lock);
}
return ret;
}
@ -1588,6 +1551,7 @@ int ObOBJLock::compact_tablelock_(ObTableLockOpList *&op_list,
// do nothing
}
}
drop_op_list_if_empty_(unlock_op.lock_mode_, op_list, allocator);
if (OB_OBJ_LOCK_NOT_EXIST == ret) {
// compact finished succeed
ret = OB_SUCCESS;
@ -1724,12 +1688,11 @@ int ObOBJLock::get_op_list(const ObTableLockMode mode,
void ObOBJLock::drop_op_list_if_empty_(
const ObTableLockMode mode,
const ObTableLockOpList *op_list,
ObTableLockOpList *&op_list,
ObMalloc &allocator)
{
int ret = OB_SUCCESS;
int map_index = 0;
ObTableLockOpList *tmp_op_list = NULL;
if (OB_ISNULL(op_list) || OB_UNLIKELY(!is_lock_mode_valid(mode))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(op_list), K(mode));
@ -1738,13 +1701,12 @@ void ObOBJLock::drop_op_list_if_empty_(
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid lock mode", K(ret), K(mode), K(map_index));
} else if (op_list->get_size() == 0) {
if (OB_ISNULL(tmp_op_list = map_[map_index])) {
// empty list, do nothing
} else {
tmp_op_list->~ObTableLockOpList();
allocator.free(tmp_op_list);
map_[map_index] = NULL;
}
op_list->~ObTableLockOpList();
allocator.free(op_list);
map_[map_index] = NULL;
// We have to set op_list to NULL to avoid
// visit the op_list by the pointer again
op_list = NULL;
}
}
@ -1931,10 +1893,6 @@ int ObOBJLockMap::lock(
} else {
LOG_DEBUG("succeed create lock ", K(lock_op));
}
if (OB_FAIL(ret) && OB_NOT_NULL(obj_lock)) {
// drop map, should never fail.
drop_obj_lock_if_empty_(lock_op.lock_id_, obj_lock);
}
if (OB_NOT_NULL(obj_lock)) {
lock_map_.revert(obj_lock);
}
@ -1979,10 +1937,6 @@ int ObOBJLockMap::unlock(
} else {
LOG_DEBUG("succeed create unlock op ", K(lock_op));
}
if (OB_FAIL(ret) && OB_NOT_NULL(obj_lock)) {
// drop map, should never fail.
(void)drop_obj_lock_if_empty_(lock_op.lock_id_, obj_lock);
}
if (OB_NOT_NULL(obj_lock)) {
lock_map_.revert(obj_lock);
}
@ -2019,8 +1973,6 @@ void ObOBJLockMap::remove_lock_record(const ObTableLockOp &lock_op)
}
} else {
obj_lock->remove_lock_op(lock_op, allocator_);
// TODO: GC lock with gc thread.
drop_obj_lock_if_empty_(lock_op.lock_id_, obj_lock);
lock_map_.revert(obj_lock);
}
LOG_DEBUG("ObOBJLockMap::remove_lock_record finish.", K(ret));
@ -2085,10 +2037,6 @@ int ObOBJLockMap::recover_obj_lock(const ObTableLockOp &lock_op)
} else {
LOG_DEBUG("succeed create lock ", K(lock_op));
}
if (OB_FAIL(ret) && OB_NOT_NULL(obj_lock)) {
// drop map, should never fail.
drop_obj_lock_if_empty_(lock_op.lock_id_, obj_lock);
}
if (OB_NOT_NULL(obj_lock)) {
lock_map_.revert(obj_lock);
}
@ -2199,8 +2147,7 @@ void ObOBJLockMap::drop_obj_lock_if_empty_(
WRLockGuard guard(obj_lock->rwlock_);
// lock and delete flag make sure no one insert a new op.
// but maybe have deleted by another concurrent thread.
if (obj_lock->size_without_lock() == 0 &&
!obj_lock->is_deleted()) {
if (obj_lock->size_without_lock() == 0 && !obj_lock->is_deleted()) {
obj_lock->set_deleted();
if (OB_FAIL(get_obj_lock_with_ref_(lock_id, recheck_ptr))) {
if (ret != OB_OBJ_LOCK_NOT_EXIST) {
@ -2211,6 +2158,8 @@ void ObOBJLockMap::drop_obj_lock_if_empty_(
KP(recheck_ptr));
} else if (OB_FAIL(lock_map_.del(lock_id, obj_lock))) {
LOG_WARN("remove obj lock from map failed. ", K(ret), K(lock_id));
} else {
LOG_DEBUG("remove obj lock successfully", K(ret), K(lock_id), KPC(obj_lock));
}
}
}
@ -2222,7 +2171,6 @@ void ObOBJLockMap::drop_obj_lock_if_empty_(
K(obj_lock));
}
}
}
}

View File

@ -121,7 +121,9 @@ public:
int get_table_lock_store_info(
ObIArray<ObTableLockOp> &store_arr,
const share::SCN &freeze_scn);
int compact_tablelock(ObMalloc &allocator,
bool &is_compacted,
const bool is_force = false);
void reset(ObMalloc &allocator);
void reset_without_lock(ObMalloc &allocator);
int size_without_lock() const;
@ -139,12 +141,6 @@ public:
private:
void print_() const;
void reset_(ObMalloc &allocator);
int check_allow_lock_(
const ObTableLockOp &lock_op,
const ObTableLockMode &lock_mode_in_same_trans,
ObTxIDSet &conflict_tx_set,
bool &conflict_with_dml_lock,
ObMalloc &allocator);
int check_allow_lock_(
const ObTableLockOp &lock_op,
const ObTableLockMode &lock_mode_in_same_trans,
@ -203,7 +199,7 @@ private:
ObTableLockOpList *&op_list);
void drop_op_list_if_empty_(
const ObTableLockMode mode,
const ObTableLockOpList *op_list,
ObTableLockOpList *&op_list,
ObMalloc &allocator);
void delete_lock_op_from_list_(
const ObTableLockOp &lock_op,
@ -357,6 +353,8 @@ public:
// @param[out] iter, the iterator returned.
int get_lock_op_iter(const ObLockID &lock_id,
ObLockOpIterator &iter);
// check all obj locks in the lock map, and clear it if it's empty.
int check_and_clear_obj_lock(const bool force_compact);
private:
class LockIDIterFunctor
{

View File

@ -23,6 +23,7 @@
#include "share/schema/ob_tenant_schema_service.h"
#include "storage/tx/ob_trans_deadlock_adapter.h"
#include "storage/tx/ob_trans_service.h"
#include "storage/tx_storage/ob_ls_service.h"
namespace oceanbase
{
@ -91,6 +92,134 @@ ObTableLockService::ObTableLockCtx::ObTableLockCtx(const ObTableLockTaskType tas
obj_id_ = obj_id;
}
int64_t ObTableLockService::ObOBJLockGarbageCollector::GARBAGE_COLLECT_PRECISION = 100_ms;
int64_t ObTableLockService::ObOBJLockGarbageCollector::GARBAGE_COLLECT_EXEC_INTERVAL = 10_s;
int64_t ObTableLockService::ObOBJLockGarbageCollector::GARBAGE_COLLECT_TIMEOUT = 10_min;
ObTableLockService::ObOBJLockGarbageCollector::ObOBJLockGarbageCollector()
: timer_(),
timer_handle_(),
last_success_timestamp_(0) {}
ObTableLockService::ObOBJLockGarbageCollector::~ObOBJLockGarbageCollector() {}
int ObTableLockService::ObOBJLockGarbageCollector::start()
{
int ret = OB_SUCCESS;
if (OB_FAIL(obj_lock_gc_thread_pool_.init_and_start(
OBJ_LOCK_GC_THREAD_NUM))) {
LOG_WARN(
"fail to init and start gc thread pool for ObTableLockService::ObOBJLockGarbageCollector",
KR(ret));
} else if (OB_FAIL(timer_.init_and_start(obj_lock_gc_thread_pool_,
GARBAGE_COLLECT_PRECISION,
"OBJLockGC"))) {
LOG_WARN("fail to init and start timer for ObTableLockService::ObOBJLockGarbageCollector",
K(ret), KPC(this));
} else if (OB_FAIL(timer_.schedule_task_repeat(
timer_handle_, GARBAGE_COLLECT_EXEC_INTERVAL,
[this]() mutable {
int ret = OB_SUCCESS;
if (OB_FAIL(garbage_collect_for_all_ls_())) {
check_and_report_timeout_();
LOG_WARN(
"check and clear obj lock failed, will retry later",
K(ret), K(last_success_timestamp_), KPC(this));
} else {
last_success_timestamp_ = ObClockGenerator::getClock();
LOG_DEBUG("check and clear obj lock successfully", K(ret),
K(last_success_timestamp_), KPC(this));
}
return false;
}))) {
LOG_ERROR("ObTableLockService::ObOBJLockGarbageCollector schedules repeat task failed",
K(ret), KPC(this));
} else {
LOG_INFO("ObTableLockService::ObOBJLockGarbageCollector starts successfully", K(ret),
KPC(this));
}
return ret;
}
void ObTableLockService::ObOBJLockGarbageCollector::stop()
{
timer_handle_.stop();
LOG_INFO("ObTableLockService::ObOBJLockGarbageCollector stops successfully", KPC(this));
}
void ObTableLockService::ObOBJLockGarbageCollector::wait()
{
timer_handle_.wait();
LOG_INFO("ObTableLockService::ObOBJLockGarbageCollector waits successfully", KPC(this));
}
int ObTableLockService::ObOBJLockGarbageCollector::garbage_collect_right_now()
{
int ret = OB_SUCCESS;
if (!timer_.is_running()) {
ret = OB_NOT_INIT;
LOG_WARN("timer of ObTableLockService::ObOBJLockGarbageCollector is not running", K(ret));
} else if (!timer_handle_.is_running()) {
ret = OB_NOT_INIT;
LOG_WARN("timer_handle of ObTableLockService::ObOBJLockGarbageCollector is not running", K(ret));
} else if (OB_FAIL(timer_handle_.reschedule_after(10))) {
LOG_WARN("reschedule task for ObTableLockService::ObOBJLockGarbageCollector failed", K(ret));
}
return ret;
}
int ObTableLockService::ObOBJLockGarbageCollector::garbage_collect_for_all_ls_()
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
ObSharedGuard<ObLSIterator> ls_iter_guard;
ObLSService *ls_service = nullptr;
ObLS *ls = nullptr;
if (!timer_.is_running()) {
ret = OB_NOT_INIT;
LOG_WARN("timer of ObTableLockService::ObOBJLockGarbageCollector is not running", K(ret));
} else if (!timer_handle_.is_running()) {
ret = OB_NOT_INIT;
LOG_WARN("timer_handle of ObTableLockService::ObOBJLockGarbageCollector is not running", K(ret));
} else if (OB_ISNULL(ls_service = MTL(ObLSService *))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("mtl ObLSService should not be null", K(ret));
} else if (ls_service->get_ls_iter(ls_iter_guard,
ObLSGetMod::TABLELOCK_MOD)) {
LOG_WARN("fail to get ls iterator", K(ret));
} else {
do {
if (OB_FAIL(ls_iter_guard->get_next(ls))) {
if (OB_ITER_END != ret) {
LOG_WARN("get next iter failed", K(ret));
}
} else if (OB_TMP_FAIL(ls->check_and_clear_obj_lock(false))) {
LOG_WARN("check and clear obj lock failed", K(ret), K(tmp_ret),
K(ls->get_ls_id()));
} else {
LOG_INFO("start to check and clear obj lock", K(ls->get_ls_id()));
}
} while (OB_SUCC(ret));
}
ret = OB_ITER_END == ret ? OB_SUCCESS : ret;
return ret;
}
void ObTableLockService::ObOBJLockGarbageCollector::check_and_report_timeout_()
{
int ret = OB_SUCCESS;
int current_timestamp = ObClockGenerator::getClock();
if (last_success_timestamp_ > current_timestamp) {
LOG_ERROR("last success timestamp is not correct", K(current_timestamp),
K(last_success_timestamp_), KPC(this));
} else if (current_timestamp - last_success_timestamp_ >
GARBAGE_COLLECT_TIMEOUT &&
last_success_timestamp_ != 0) {
LOG_ERROR("task failed too many times", K(current_timestamp),
K(last_success_timestamp_), KPC(this));
}
}
bool ObTableLockService::ObTableLockCtx::is_timeout() const
{
return ObTimeUtility::current_time() >= abs_timeout_ts_;
@ -182,15 +311,18 @@ int ObTableLockService::init()
int ObTableLockService::start()
{
obj_lock_garbage_collector_.start();
return OB_SUCCESS;
}
void ObTableLockService::stop()
{
obj_lock_garbage_collector_.stop();
}
void ObTableLockService::wait()
{
obj_lock_garbage_collector_.wait();
}
void ObTableLockService::destroy()
@ -366,6 +498,10 @@ int ObTableLockService::lock_table(ObTxDesc &tx_desc,
LOG_WARN("invalid argument", K(ret), K(tx_desc), K(arg), K(tx_desc.is_valid()),
K(tx_param.is_valid()), K(arg.is_valid()));
} else {
// origin_timeout_us_ and timeout_us_ are both set as timeout_us_, which
// is set by user in the 'WAIT n' option.
// Furthermore, if timeout_us_ is 0, this lock will be judged as a try
// lock semantics. It meets the actual semantics of 'NOWAIT' option.
ObTableLockCtx ctx(LOCK_TABLE, arg.table_id_, arg.timeout_us_, arg.timeout_us_);
ctx.is_in_trans_ = true;
ctx.tx_desc_ = &tx_desc;
@ -460,6 +596,41 @@ int ObTableLockService::unlock_tablet(ObTxDesc &tx_desc,
return ret;
}
int ObTableLockService::lock_partition_or_subpartition(ObTxDesc &tx_desc,
const ObTxParam &tx_param,
const ObLockPartitionRequest &arg)
{
int ret = OB_SUCCESS;
ObPartitionLevel part_level = PARTITION_LEVEL_MAX;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("lock service is not inited", K(ret));
} else if (OB_FAIL(get_table_partition_level_(arg.table_id_, part_level))) {
LOG_WARN("can not get table partition level", K(ret), K(arg));
} else {
switch (part_level) {
case PARTITION_LEVEL_ONE: {
if (OB_FAIL(lock_partition(tx_desc, tx_param, arg))) {
LOG_WARN("lock partition failed", K(ret), K(arg));
}
break;
}
case PARTITION_LEVEL_TWO: {
if (OB_FAIL(lock_subpartition(tx_desc, tx_param, arg))) {
LOG_WARN("lock subpartition failed", K(ret), K(arg));
}
break;
}
default: {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected partition level", K(ret), K(arg), K(part_level));
}
}
}
return ret;
}
int ObTableLockService::lock_partition(ObTxDesc &tx_desc,
const ObTxParam &tx_param,
const ObLockPartitionRequest &arg)
@ -623,6 +794,31 @@ int ObTableLockService::unlock_obj(ObTxDesc &tx_desc,
return ret;
}
int ObTableLockService::garbage_collect_right_now()
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObTableLockService is not be inited", K(ret));
} else if (OB_FAIL(obj_lock_garbage_collector_.garbage_collect_right_now())) {
LOG_WARN("garbage collect right now failed", K(ret));
} else {
LOG_DEBUG("garbage collect right now");
}
return ret;
}
int ObTableLockService::get_obj_lock_garbage_collector(ObOBJLockGarbageCollector *&obj_lock_garbage_collector)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObTableLockService is not be inited", K(ret));
} else {
obj_lock_garbage_collector = &obj_lock_garbage_collector_;
}
return ret;
}
int ObTableLockService::process_lock_task_(ObTableLockCtx &ctx,
const ObTableLockMode lock_mode,
const ObTableLockOwnerID lock_owner)
@ -1072,6 +1268,33 @@ int ObTableLockService::deal_with_deadlock_(ObTableLockCtx &ctx)
return ret;
}
int ObTableLockService::get_table_partition_level_(const ObTableID table_id,
ObPartitionLevel &part_level)
{
int ret = OB_SUCCESS;
ObMultiVersionSchemaService *schema_service = MTL(ObTenantSchemaService*)->get_schema_service();
ObRefreshSchemaStatus schema_status;
ObTableSchema *table_schema = nullptr;
ObArenaAllocator allocator("TableSchema");
schema_status.tenant_id_ = MTL_ID();
if (OB_ISNULL(schema_service)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("can not get schema service", K(ret));
} else if (OB_FAIL(schema_service->get_schema_service()
->get_table_schema(schema_status,
table_id,
INT64_MAX - 1 /* refresh the newest schema */,
*sql_proxy_,
allocator,
table_schema))) {
LOG_WARN("can not get table schema", K(ret), K(table_id));
} else {
part_level = table_schema->get_part_level();
}
return ret;
}
int ObTableLockService::pack_batch_request_(ObTableLockCtx &ctx,
const ObTableLockTaskType task_type,
const ObTableLockMode &lock_mode,
@ -1155,7 +1378,6 @@ int ObTableLockService::batch_rpc_handle_(RpcProxy &proxy_batch,
const ObLockIDArray &lock_ids = data->second;
ObLockTaskBatchRequest request;
ObAddr addr;
ObTableLockTaskResult result;
if (OB_FAIL(ls_array.push_back(ls_id))) {
LOG_WARN("push_back lsid failed", K(ret), K(ls_id));
@ -1176,7 +1398,7 @@ int ObTableLockService::batch_rpc_handle_(RpcProxy &proxy_batch,
timeout_us,
ctx.tx_desc_->get_tenant_id(),
request))) {
LOG_WARN("failed to all async rpc", KR(ret), K(addr),
LOG_WARN("failed to call async rpc", KR(ret), K(addr),
K(ctx.abs_timeout_ts_), K(request));
} else {
rpc_count++;

View File

@ -17,16 +17,14 @@
#include "common/ob_tablet_id.h"
#include "share/ob_ls_id.h"
#include "share/ob_occam_timer.h"
#include "sql/ob_sql_trans_control.h"
#include "storage/tablelock/ob_table_lock_common.h"
#include "storage/tablelock/ob_table_lock_rpc_proxy.h"
#include "storage/tablelock/ob_table_lock_rpc_struct.h"
#include "lib/utility/ob_macro_utils.h"
namespace oceanbase
{
namespace rpc
{
namespace frame
@ -142,6 +140,36 @@ private:
K(current_savepoint_), K(need_rollback_ls_),
K(tablet_list_), K(schema_version_), K(tx_is_killed_), K(stmt_savepoint_));
};
public:
class ObOBJLockGarbageCollector
{
static const int OBJ_LOCK_GC_THREAD_NUM = 2;
public:
friend class ObLockTable;
ObOBJLockGarbageCollector();
~ObOBJLockGarbageCollector();
public:
int start();
void stop();
void wait();
int garbage_collect_right_now();
TO_STRING_KV(KP(this),
K_(last_success_timestamp));
private:
int garbage_collect_for_all_ls_();
void check_and_report_timeout_();
public:
static int64_t GARBAGE_COLLECT_PRECISION;
static int64_t GARBAGE_COLLECT_EXEC_INTERVAL;
static int64_t GARBAGE_COLLECT_TIMEOUT;
private:
common::ObOccamThreadPool obj_lock_gc_thread_pool_;
common::ObOccamTimer timer_;
common::ObOccamTimerTaskRAIIHandle timer_handle_;
int64_t last_success_timestamp_;
};
public:
typedef hash::ObHashMap<ObLockID, share::ObLSID> LockMap;
@ -149,6 +177,7 @@ public:
ObTableLockService()
: location_service_(nullptr),
sql_proxy_(nullptr),
obj_lock_garbage_collector_(),
is_inited_(false) {}
~ObTableLockService() {}
int init();
@ -225,6 +254,9 @@ public:
int unlock_tablet(ObTxDesc &tx_desc,
const ObTxParam &tx_param,
const ObUnLockTabletRequest &arg);
int lock_partition_or_subpartition(ObTxDesc &tx_desc,
const ObTxParam &tx_param,
const ObLockPartitionRequest &arg);
int lock_partition(ObTxDesc &tx_desc,
const ObTxParam &tx_param,
const ObLockPartitionRequest &arg);
@ -243,6 +275,8 @@ public:
int unlock_obj(ObTxDesc &tx_desc,
const ObTxParam &tx_param,
const ObUnLockObjRequest &arg);
int garbage_collect_right_now();
int get_obj_lock_garbage_collector(ObOBJLockGarbageCollector *&obj_lock_garbage_collector);
private:
bool need_retry_single_task_(const ObTableLockCtx &ctx,
const int64_t ret) const;
@ -355,6 +389,7 @@ private:
const ObTableLockOwnerID lock_owner);
// used by deadlock detector.
int deal_with_deadlock_(ObTableLockCtx &ctx);
int get_table_partition_level_(const ObTableID table_id, ObPartitionLevel &part_level);
DISALLOW_COPY_AND_ASSIGN(ObTableLockService);
private:
@ -364,6 +399,7 @@ private:
share::ObLocationService *location_service_;
common::ObMySQLProxy *sql_proxy_;
ObOBJLockGarbageCollector obj_lock_garbage_collector_;
bool is_inited_;
};