[Fix](CCR) Use tableId as the credential for CCR syncer instead of tableName (#21466)
This commit is contained in:
@ -914,6 +914,24 @@ public class FrontendServiceImpl implements FrontendService.Iface {
|
||||
return result;
|
||||
}
|
||||
|
||||
private List<String> getTableNames(String cluster, String dbName, List<Long> tableIds) throws UserException {
|
||||
final String fullDbName = ClusterNamespace.getFullName(cluster, dbName);
|
||||
Database db = Env.getCurrentInternalCatalog().getDbNullable(fullDbName);
|
||||
if (db == null) {
|
||||
throw new UserException(String.format("can't find db named: %s", dbName));
|
||||
}
|
||||
List<String> tableNames = Lists.newArrayList();
|
||||
for (Long id : tableIds) {
|
||||
Table table = db.getTableNullable(id);
|
||||
if (table == null) {
|
||||
throw new UserException(String.format("can't find table id: %d in db: %s", id, dbName));
|
||||
}
|
||||
tableNames.add(table.getName());
|
||||
}
|
||||
|
||||
return tableNames;
|
||||
}
|
||||
|
||||
private void checkPasswordAndPrivs(String cluster, String user, String passwd, String db, String tbl,
|
||||
String clientIp, PrivPredicate predicate) throws AuthenticationException {
|
||||
checkPasswordAndPrivs(cluster, user, passwd, db, Lists.newArrayList(tbl), clientIp, predicate);
|
||||
@ -1070,8 +1088,8 @@ public class FrontendServiceImpl implements FrontendService.Iface {
|
||||
if (!request.isSetDb()) {
|
||||
throw new UserException("db is not set");
|
||||
}
|
||||
if (!request.isSetTables()) {
|
||||
throw new UserException("tables is not set");
|
||||
if (!request.isSetTableIds()) {
|
||||
throw new UserException("table ids is not set");
|
||||
}
|
||||
if (!request.isSetLabel()) {
|
||||
throw new UserException("label is not set");
|
||||
@ -1084,7 +1102,9 @@ public class FrontendServiceImpl implements FrontendService.Iface {
|
||||
|
||||
// step 1: check auth
|
||||
if (Strings.isNullOrEmpty(request.getToken())) {
|
||||
checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(), request.getTables(),
|
||||
// lookup table ids && convert into tableNameList
|
||||
List<String> tableNameList = getTableNames(cluster, request.getDb(), request.getTableIds());
|
||||
checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(), tableNameList,
|
||||
request.getUserIp(), PrivPredicate.LOAD);
|
||||
}
|
||||
|
||||
@ -1106,15 +1126,8 @@ public class FrontendServiceImpl implements FrontendService.Iface {
|
||||
}
|
||||
|
||||
// step 4: fetch all tableIds
|
||||
// lookup tables && convert into tableIdList
|
||||
List<Long> tableIdList = Lists.newArrayList();
|
||||
for (String tblName : request.getTables()) {
|
||||
Table table = db.getTableOrMetaException(tblName, TableType.OLAP);
|
||||
if (table == null) {
|
||||
throw new UserException("unknown table, table=" + tblName);
|
||||
}
|
||||
tableIdList.add(table.getId());
|
||||
}
|
||||
// table ids is checked at step 1
|
||||
List<Long> tableIdList = request.getTableIds();
|
||||
|
||||
// step 5: get timeout
|
||||
long timeoutSecond = request.isSetTimeout() ? request.getTimeout() : Config.stream_load_default_timeout_second;
|
||||
@ -2320,8 +2333,10 @@ public class FrontendServiceImpl implements FrontendService.Iface {
|
||||
// step 4: fetch all tableIds
|
||||
// lookup tables && convert into tableIdList
|
||||
long tableId = -1;
|
||||
String tableName = request.getTable();
|
||||
if (!Strings.isNullOrEmpty(tableName)) {
|
||||
if (request.isSetTableId()) {
|
||||
tableId = request.getTableId();
|
||||
} else if (request.isSetTable()) {
|
||||
String tableName = request.getTable();
|
||||
Table table = db.getTableOrMetaException(tableName, TableType.OLAP);
|
||||
if (table == null) {
|
||||
throw new UserException("unknown table, table=" + tableName);
|
||||
|
||||
Reference in New Issue
Block a user