Fix circular wait between row lock and tenant rpc thread
This commit is contained in:
@ -3263,7 +3263,7 @@ int ObDDLTaskRecordOperator::select_for_update(
|
|||||||
} else {
|
} else {
|
||||||
SMART_VAR(ObMySQLProxy::MySQLResult, res) {
|
SMART_VAR(ObMySQLProxy::MySQLResult, res) {
|
||||||
sqlclient::ObMySQLResult *result = NULL;
|
sqlclient::ObMySQLResult *result = NULL;
|
||||||
if (OB_FAIL(sql_string.assign_fmt("SELECT status, execution_id FROM %s WHERE task_id = %lu FOR UPDATE",
|
if (OB_FAIL(sql_string.assign_fmt("SELECT status, execution_id FROM %s WHERE task_id = %lu FOR UPDATE NOWAIT",
|
||||||
OB_ALL_DDL_TASK_STATUS_TNAME, task_id))) {
|
OB_ALL_DDL_TASK_STATUS_TNAME, task_id))) {
|
||||||
LOG_WARN("assign sql string failed", K(ret), K(task_id), K(tenant_id));
|
LOG_WARN("assign sql string failed", K(ret), K(task_id), K(tenant_id));
|
||||||
} else if (OB_FAIL(trans.read(res, tenant_id, sql_string.ptr()))) {
|
} else if (OB_FAIL(trans.read(res, tenant_id, sql_string.ptr()))) {
|
||||||
|
|||||||
@ -483,12 +483,20 @@ int ObGlobalStatProxy::get(
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int ObGlobalStatProxy::select_snapshot_gc_scn_for_update_nowait(
|
||||||
|
common::ObISQLClient &sql_client,
|
||||||
|
const uint64_t tenant_id,
|
||||||
|
SCN &snapshot_gc_scn)
|
||||||
|
{
|
||||||
|
return inner_get_snapshot_gc_scn_(sql_client, tenant_id, snapshot_gc_scn, "FOR UPDATE NOWAIT");
|
||||||
|
}
|
||||||
|
|
||||||
int ObGlobalStatProxy::select_snapshot_gc_scn_for_update(
|
int ObGlobalStatProxy::select_snapshot_gc_scn_for_update(
|
||||||
common::ObISQLClient &sql_client,
|
common::ObISQLClient &sql_client,
|
||||||
const uint64_t tenant_id,
|
const uint64_t tenant_id,
|
||||||
SCN &snapshot_gc_scn)
|
SCN &snapshot_gc_scn)
|
||||||
{
|
{
|
||||||
return inner_get_snapshot_gc_scn_(sql_client, tenant_id, snapshot_gc_scn, true);
|
return inner_get_snapshot_gc_scn_(sql_client, tenant_id, snapshot_gc_scn, "FOR UPDATE");
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObGlobalStatProxy::get_snapshot_gc_scn(
|
int ObGlobalStatProxy::get_snapshot_gc_scn(
|
||||||
@ -496,14 +504,14 @@ int ObGlobalStatProxy::get_snapshot_gc_scn(
|
|||||||
const uint64_t tenant_id,
|
const uint64_t tenant_id,
|
||||||
SCN &snapshot_gc_scn)
|
SCN &snapshot_gc_scn)
|
||||||
{
|
{
|
||||||
return inner_get_snapshot_gc_scn_(sql_client, tenant_id, snapshot_gc_scn, false);
|
return inner_get_snapshot_gc_scn_(sql_client, tenant_id, snapshot_gc_scn, "");
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObGlobalStatProxy::inner_get_snapshot_gc_scn_(
|
int ObGlobalStatProxy::inner_get_snapshot_gc_scn_(
|
||||||
common::ObISQLClient &sql_client,
|
common::ObISQLClient &sql_client,
|
||||||
const uint64_t tenant_id,
|
const uint64_t tenant_id,
|
||||||
SCN &snapshot_gc_scn,
|
SCN &snapshot_gc_scn,
|
||||||
const bool is_for_update)
|
const char *for_update_str)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
uint64_t snapshot_gc_scn_val = 0;
|
uint64_t snapshot_gc_scn_val = 0;
|
||||||
@ -512,7 +520,7 @@ int ObGlobalStatProxy::inner_get_snapshot_gc_scn_(
|
|||||||
ObSqlString sql;
|
ObSqlString sql;
|
||||||
if (OB_FAIL(sql.assign_fmt(
|
if (OB_FAIL(sql.assign_fmt(
|
||||||
"SELECT column_value FROM %s WHERE TABLE_NAME = '__all_global_stat' AND COLUMN_NAME"
|
"SELECT column_value FROM %s WHERE TABLE_NAME = '__all_global_stat' AND COLUMN_NAME"
|
||||||
" = 'snapshot_gc_scn' %s", OB_ALL_CORE_TABLE_TNAME, (is_for_update ? "FOR UPDATE" : "")))) {
|
" = 'snapshot_gc_scn' %s", OB_ALL_CORE_TABLE_TNAME, for_update_str))) {
|
||||||
LOG_WARN("assign sql failed", K(ret));
|
LOG_WARN("assign sql failed", K(ret));
|
||||||
} else if (OB_FAIL(sql_client.read(res, tenant_id, sql.ptr()))) {
|
} else if (OB_FAIL(sql_client.read(res, tenant_id, sql.ptr()))) {
|
||||||
LOG_WARN("execute sql failed", K(ret), K(sql));
|
LOG_WARN("execute sql failed", K(ret), K(sql));
|
||||||
|
|||||||
@ -82,6 +82,9 @@ public:
|
|||||||
|
|
||||||
virtual int get_snapshot_info(int64_t &snapshot_gc_scn,
|
virtual int get_snapshot_info(int64_t &snapshot_gc_scn,
|
||||||
int64_t &gc_schema_version);
|
int64_t &gc_schema_version);
|
||||||
|
static int select_snapshot_gc_scn_for_update_nowait(common::ObISQLClient &sql_client,
|
||||||
|
const uint64_t tenant_id,
|
||||||
|
SCN &snapshot_gc_scn);
|
||||||
static int select_snapshot_gc_scn_for_update(common::ObISQLClient &sql_client,
|
static int select_snapshot_gc_scn_for_update(common::ObISQLClient &sql_client,
|
||||||
const uint64_t tenant_id,
|
const uint64_t tenant_id,
|
||||||
SCN &snapshot_gc_scn);
|
SCN &snapshot_gc_scn);
|
||||||
@ -107,7 +110,7 @@ private:
|
|||||||
static int inner_get_snapshot_gc_scn_(common::ObISQLClient &sql_client,
|
static int inner_get_snapshot_gc_scn_(common::ObISQLClient &sql_client,
|
||||||
const uint64_t tenant_id,
|
const uint64_t tenant_id,
|
||||||
SCN &snapshot_gc_scn,
|
SCN &snapshot_gc_scn,
|
||||||
const bool is_for_update);
|
const char *for_update_str);
|
||||||
int update(const ObGlobalStatItem::ItemList &list, const bool is_incremental = false);
|
int update(const ObGlobalStatItem::ItemList &list, const bool is_incremental = false);
|
||||||
int get(ObGlobalStatItem::ItemList &list, bool for_update = false);
|
int get(ObGlobalStatItem::ItemList &list, bool for_update = false);
|
||||||
|
|
||||||
|
|||||||
@ -204,7 +204,7 @@ int ObSnapshotTableProxy::batch_add_snapshot(
|
|||||||
info.snapshot_scn_ = snapshot_scn;
|
info.snapshot_scn_ = snapshot_scn;
|
||||||
info.schema_version_ = schema_version;
|
info.schema_version_ = schema_version;
|
||||||
info.comment_ = comment;
|
info.comment_ = comment;
|
||||||
if (OB_FAIL(ObGlobalStatProxy::select_snapshot_gc_scn_for_update(trans, tenant_id, snapshot_gc_scn))) {
|
if (OB_FAIL(ObGlobalStatProxy::select_snapshot_gc_scn_for_update_nowait(trans, tenant_id, snapshot_gc_scn))) {
|
||||||
LOG_WARN("fail to select gc timstamp for update", KR(ret), K(info), K(tenant_id));
|
LOG_WARN("fail to select gc timstamp for update", KR(ret), K(info), K(tenant_id));
|
||||||
}
|
}
|
||||||
while (OB_SUCC(ret) && report_idx < tablet_id_array.count()) {
|
while (OB_SUCC(ret) && report_idx < tablet_id_array.count()) {
|
||||||
|
|||||||
Reference in New Issue
Block a user