support multi black servers
This commit is contained in:
@ -295,14 +295,28 @@ int ObBackupDataLSTaskMgr::update_black_server(
|
||||
const ObAddr &block_server)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t full_replica_num = 0;
|
||||
ObSqlString black_server_sql_string("");
|
||||
ObSEArray<ObAddr, OB_MAX_MEMBER_NUMBER> new_black_servers_;
|
||||
if (!block_server.is_valid() || !ls_attr.is_valid()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("[DATA_BACKUP]invalid argument", K(ret), K(block_server), K(ls_attr));
|
||||
} else if (OB_FAIL(lease_service.check_lease())) {
|
||||
} else if (OB_FAIL(ObBackupUtils::get_full_replica_num(ls_attr.tenant_id_, full_replica_num))) {
|
||||
LOG_WARN("failed to get full replica num", K(ret));
|
||||
} else if (ls_attr.black_servers_.count() + 1 == full_replica_num) {
|
||||
// all replicas are in black servers, clear the black servers.
|
||||
} else if (OB_FAIL(new_black_servers_.assign(ls_attr.black_servers_))) {
|
||||
LOG_WARN("failed to assign black servers", K(ret));
|
||||
} else if (OB_FAIL(new_black_servers_.push_back(block_server))) {
|
||||
LOG_WARN("failed to push back black server", K(ret));
|
||||
} else if (OB_FAIL(ls_attr.get_black_server_str(new_black_servers_, black_server_sql_string))) {
|
||||
LOG_WARN("failed to get black server str", K(ret), K(new_black_servers_));
|
||||
}
|
||||
if (FAILEDx(lease_service.check_lease())) {
|
||||
LOG_WARN("[DATA_BACKUP]failed to check lease", K(ret));
|
||||
} else if (OB_FAIL(ObBackupLSTaskOperator::update_black_server(
|
||||
sql_proxy, ls_attr.task_id_, ls_attr.tenant_id_, ls_attr.ls_id_, block_server))) {
|
||||
LOG_WARN("[DATA_BACKUP]failed to update block server", K(ret), K(ls_attr), K(block_server));
|
||||
sql_proxy, ls_attr.task_id_, ls_attr.tenant_id_, ls_attr.ls_id_, black_server_sql_string.string()))) {
|
||||
LOG_WARN("[DATA_BACKUP]failed to update block server", K(ret), K(ls_attr), K(black_server_sql_string));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -1514,24 +1514,22 @@ int ObBackupLSTaskOperator::update_black_server(
|
||||
const int64_t task_id,
|
||||
const uint64_t tenant_id,
|
||||
const ObLSID &ls_id,
|
||||
const ObAddr &block_server)
|
||||
const ObString &black_servers)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObSqlString sql;
|
||||
int64_t affected_rows = -1;
|
||||
ObDMLSqlSplicer dml;
|
||||
char black_server_str[OB_MAX_SERVER_ADDR_SIZE] = "";
|
||||
block_server.ip_port_to_string(black_server_str, OB_MAX_SERVER_ADDR_SIZE);
|
||||
if (task_id <= 0 || tenant_id == OB_INVALID_TENANT_ID || !ls_id.is_valid() || !block_server.is_valid()) {
|
||||
if (task_id <= 0 || tenant_id == OB_INVALID_TENANT_ID || !ls_id.is_valid()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("[DATA_BACKUP]invalid argument", K(ret), K(task_id), K(tenant_id), K(ls_id), K(block_server));
|
||||
LOG_WARN("[DATA_BACKUP]invalid argument", K(ret), K(task_id), K(tenant_id), K(ls_id));
|
||||
} else if (OB_FAIL(dml.add_pk_column(OB_STR_TENANT_ID, tenant_id))) {
|
||||
LOG_WARN("[DATA_BACKUP]failed to add column", K(ret));
|
||||
} else if (OB_FAIL(dml.add_pk_column(OB_STR_TASK_ID, task_id))) {
|
||||
LOG_WARN("[DATA_BACKUP]failed to add column", K(ret));
|
||||
} else if (OB_FAIL(dml.add_pk_column(OB_STR_LS_ID, ls_id.id()))) {
|
||||
LOG_WARN("[DATA_BACKUP]failed to add column", K(ret));
|
||||
} else if (OB_FAIL(dml.add_column(OB_STR_BLACK_LIST, black_server_str))) {
|
||||
} else if (OB_FAIL(dml.add_column(OB_STR_BLACK_LIST, black_servers.ptr()))) {
|
||||
LOG_WARN("[DATA_BACKUP]failed to add column", K(ret));
|
||||
} else if (OB_FAIL(dml.splice_update_sql(OB_ALL_BACKUP_LS_TASK_TNAME, sql))) {
|
||||
LOG_WARN("[DATA_BACKUP]failed to splice_update_sql", K(ret));
|
||||
@ -1838,7 +1836,7 @@ int ObBackupLSTaskOperator::do_parse_ls_result_(ObMySQLResult &result, ObBackupL
|
||||
char status_str[OB_DEFAULT_STATUS_LENTH] = "";
|
||||
char backup_type_str[OB_SYS_TASK_TYPE_LENGTH] = "";
|
||||
char task_type_str[64] = "";
|
||||
char black_list_str[OB_MAX_SERVER_ADDR_SIZE] = "";
|
||||
char black_list_str[OB_INNER_TABLE_DEFAULT_VALUE_LENTH] = "";
|
||||
char trace_id_str[OB_MAX_TRACE_ID_BUFFER_SIZE] = "";
|
||||
|
||||
EXTRACT_INT_FIELD_MYSQL(result, OB_STR_TASK_ID, ls_attr.task_id_, int64_t);
|
||||
@ -1851,7 +1849,7 @@ int ObBackupLSTaskOperator::do_parse_ls_result_(ObMySQLResult &result, ObBackupL
|
||||
EXTRACT_STRBUF_FIELD_MYSQL(result, OB_STR_TASK_TYPE, task_type_str, 64, real_length);
|
||||
EXTRACT_INT_FIELD_MYSQL(result, OB_STR_START_TS, ls_attr.start_ts_, int64_t);
|
||||
EXTRACT_INT_FIELD_MYSQL(result, OB_STR_END_TS, ls_attr.end_ts_, int64_t);
|
||||
EXTRACT_STRBUF_FIELD_MYSQL(result, OB_STR_BLACK_LIST, black_list_str, OB_MAX_SERVER_ADDR_SIZE, real_length);
|
||||
EXTRACT_STRBUF_FIELD_MYSQL(result, OB_STR_BLACK_LIST, black_list_str, OB_INNER_TABLE_DEFAULT_VALUE_LENTH, real_length);
|
||||
EXTRACT_INT_FIELD_MYSQL(result, OB_STR_DATE, ls_attr.backup_date_, int64_t);
|
||||
EXTRACT_INT_FIELD_MYSQL(result, OB_STR_TURN_ID, ls_attr.turn_id_, int64_t);
|
||||
EXTRACT_INT_FIELD_MYSQL(result, OB_STR_RETRY_ID, ls_attr.retry_id_, int64_t);
|
||||
@ -1879,8 +1877,8 @@ int ObBackupLSTaskOperator::do_parse_ls_result_(ObMySQLResult &result, ObBackupL
|
||||
LOG_WARN("[DATA_BACKUP]failed to set backup_type", K(ret), K(backup_type_str));
|
||||
} else if (OB_FAIL(ls_attr.task_type_.set_type(task_type_str))) {
|
||||
LOG_WARN("[DATA_BACKUP]failed to set task type", K(ret), K(task_type_str));
|
||||
} else if (OB_FAIL(parse_string_to_addr_(black_list_str, ls_attr.black_servers_))) {
|
||||
LOG_WARN("[DATA_BACKUP]failed to parse string to addr", K(ret));
|
||||
} else if (OB_FAIL(ls_attr.set_black_servers(black_list_str))) {
|
||||
LOG_WARN("[DATA_BACKUP]failed to parse black list str", K(ret));
|
||||
} else if (strcmp(trace_id_str, "") != 0 && OB_FAIL(ls_attr.task_trace_id_.set(trace_id_str))) {
|
||||
LOG_WARN("[DATA_BACKUP]failed to set task trace id", K(ret), K(trace_id_str));
|
||||
} else if (!ls_attr.dst_.set_ip_addr(server_str, static_cast<int32_t>(port))) {
|
||||
@ -1892,22 +1890,6 @@ int ObBackupLSTaskOperator::do_parse_ls_result_(ObMySQLResult &result, ObBackupL
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObBackupLSTaskOperator::parse_string_to_addr_(
|
||||
const char *str,
|
||||
ObIArray<common::ObAddr> &servers)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObAddr server;
|
||||
if (0 == strcmp(str, "")) {
|
||||
} else if (OB_FAIL(server.parse_from_cstring(str))) {
|
||||
LOG_WARN("[DATA_BACKUP]failed to parse server from cstring", K(ret), K(str));
|
||||
} else if (OB_FAIL(servers.push_back(server))) {
|
||||
LOG_WARN("[DATA_BACKUP]failed to push server back", K(server));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
int ObBackupLSTaskOperator::advance_status(
|
||||
common::ObISQLClient &proxy,
|
||||
const ObBackupLSTaskAttr &ls_attr,
|
||||
|
||||
@ -161,14 +161,13 @@ public:
|
||||
static int update_stats_(common::ObISQLClient &proxy, const int64_t task_id, const uint64_t tenant_id,
|
||||
const ObLSID &ls_id, const ObBackupStats &stats);
|
||||
static int update_black_server(common::ObISQLClient &proxy, const int64_t task_id, const uint64_t tenant_id,
|
||||
const ObLSID &ls_id, const ObAddr &block_server);
|
||||
const ObLSID &ls_id, const ObString &black_servers);
|
||||
static int delete_ls_task_without_sys(common::ObISQLClient &proxy, const uint64_t tenant_id, const int64_t task_id);
|
||||
private:
|
||||
static int fill_dml_with_ls_task_(const ObBackupLSTaskAttr &ls_attr, ObDMLSqlSplicer &dml);
|
||||
static int fill_select_ls_task_sql_(ObSqlString &sql);
|
||||
static int parse_ls_result_(sqlclient::ObMySQLResult &result, ObIArray<ObBackupLSTaskAttr> &ls_attrs);
|
||||
static int do_parse_ls_result_(sqlclient::ObMySQLResult &result, ObBackupLSTaskAttr &ls_attr);
|
||||
static int parse_string_to_addr_(const char *str, ObIArray<common::ObAddr> &servers);
|
||||
};
|
||||
|
||||
class ObBackupLSTaskInfoOperator : public ObBackupBaseTableOperator
|
||||
|
||||
@ -2049,6 +2049,26 @@ int ObBackupUtils::check_tenant_data_version_match(const uint64_t tenant_id, con
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObBackupUtils::get_full_replica_num(const uint64_t tenant_id, int64_t &replica_num)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
replica_num = 0;
|
||||
ObMultiVersionSchemaService *schema_service = nullptr;
|
||||
ObSchemaGetterGuard schema_guard;
|
||||
const ObTenantSchema *tenant_info = nullptr;
|
||||
if (OB_ISNULL(schema_service = GCTX.schema_service_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("schema service must not be null", K(ret));
|
||||
} else if (OB_FAIL(schema_service->get_tenant_schema_guard(tenant_id, schema_guard))) {
|
||||
LOG_WARN("[DATA_BACKUP]failed to get_tenant_schema_guard", KR(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(schema_guard.get_tenant_info(tenant_id, tenant_info))) {
|
||||
LOG_WARN("[DATA_BACKUP]failed to get tenant info", K(ret), K(tenant_id));
|
||||
} else {
|
||||
replica_num = tenant_info->get_full_replica_num();
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObBackupUtils::get_backup_info_default_timeout_ctx(ObTimeoutCtx &ctx)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -3505,6 +3525,58 @@ bool ObBackupLSTaskAttr::is_valid() const
|
||||
&& turn_id_ > 0;
|
||||
}
|
||||
|
||||
int ObBackupLSTaskAttr::get_black_server_str(const ObIArray<ObAddr> &black_servers, ObSqlString &sql_string) const
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
char black_server_str[OB_MAX_SERVER_ADDR_SIZE] = "";
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < black_servers.count(); i++) {
|
||||
const ObAddr &server = black_servers.at(i);
|
||||
if (OB_FALSE_IT(MEMSET(black_server_str, 0, OB_MAX_SERVER_ADDR_SIZE))) {
|
||||
} else if (OB_FALSE_IT(server.ip_port_to_string(black_server_str, OB_MAX_SERVER_ADDR_SIZE))) {
|
||||
} else if (OB_FAIL(sql_string.append_fmt("%.*s%s",
|
||||
static_cast<int>(OB_MAX_SERVER_ADDR_SIZE), black_server_str,
|
||||
i == black_servers.count() - 1 ? "" : ","))) {
|
||||
LOG_WARN("failed to append fmt black server", K(ret));
|
||||
} else if (sql_string.length() > OB_INNER_TABLE_DEFAULT_VALUE_LENTH) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid black servers", K(ret), K(sql_string));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObBackupLSTaskAttr::set_black_servers(const ObString &str)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
char *buf = nullptr;
|
||||
ObArenaAllocator allocator;
|
||||
int64_t buf_len = str.length() + 1;
|
||||
ObAddr server;
|
||||
if (OB_ISNULL(buf = static_cast<char *>(allocator.alloc(buf_len)))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_ERROR("fail to alloc string", K(ret), "buf length", buf_len);
|
||||
} else if (OB_FALSE_IT(MEMSET(buf, 0, buf_len))) {
|
||||
} else if (OB_FAIL(databuff_printf(buf, buf_len, "%.*s", static_cast<int>(str.length()), str.ptr()))) {
|
||||
LOG_WARN("fail to print str", K(ret), K(str), K(buf_len));
|
||||
} else {
|
||||
char *token = nullptr;
|
||||
char *saveptr = nullptr;
|
||||
token = buf;
|
||||
for (char *str = token; OB_SUCC(ret); str = nullptr) {
|
||||
token = ::STRTOK_R(str, ",", &saveptr);
|
||||
if (OB_ISNULL(token)) {
|
||||
break;
|
||||
} else if (OB_FALSE_IT(server.reset())) {
|
||||
} else if (OB_FAIL(server.parse_from_cstring(token))) {
|
||||
LOG_WARN("fail to assign backup set path", K(ret));
|
||||
} else if (OB_FAIL(black_servers_.push_back(server))) {
|
||||
LOG_WARN("fail to push back path", K(ret), K(server));
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObBackupLSTaskAttr::assign(const ObBackupLSTaskAttr &other)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
@ -1020,6 +1020,7 @@ public:
|
||||
share::SCN &start_replay_scn);
|
||||
static int get_backup_scn(const uint64_t &tenant_id, share::SCN &scn);
|
||||
static int check_tenant_data_version_match(const uint64_t tenant_id, const uint64_t data_version);
|
||||
static int get_full_replica_num(const uint64_t tenant_id, int64_t &replica_num);
|
||||
private:
|
||||
static const int64_t RETRY_INTERVAL = 10 * 1000 * 1000;
|
||||
static const int64_t MAX_RETRY_TIMES = 3;
|
||||
@ -1402,6 +1403,8 @@ struct ObBackupLSTaskAttr final
|
||||
~ObBackupLSTaskAttr() {}
|
||||
bool is_valid() const;
|
||||
int assign(const ObBackupLSTaskAttr &other);
|
||||
int get_black_server_str(const ObIArray<ObAddr> &black_servers, ObSqlString &sql_string) const;
|
||||
int set_black_servers(const ObString &str);
|
||||
TO_STRING_KV(K_(task_id), K_(tenant_id), K_(ls_id), K_(job_id), K_(backup_set_id), K_(backup_type), K_(task_type),
|
||||
K_(status), K_(start_ts), K_(end_ts), K_(backup_date), K_(black_servers), K_(dst), K_(task_trace_id),
|
||||
K_(stats), K_(start_turn_id), K_(turn_id), K_(retry_id), K_(result));
|
||||
|
||||
Reference in New Issue
Block a user