diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 688c331f4d..93f0cd8966 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1733,6 +1733,9 @@ public class Config extends ConfigBase { @ConfField public static boolean enable_pipeline_load = false; + @ConfField + public static boolean enable_resource_group = false; + @ConfField(mutable = false, masterOnly = true) public static int backend_rpc_timeout_ms = 60000; // 1 min diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index b4dc528fab..fbda3f252a 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -386,6 +386,7 @@ terminal String KW_GRAPH, KW_GROUP, KW_GROUPING, + KW_GROUPS, KW_HASH, KW_HAVING, KW_HDFS, @@ -1941,10 +1942,20 @@ create_stmt ::= {: RESULT = new AlterTableStmt(tableName, Lists.newArrayList(new CreateIndexClause(tableName, new IndexDef(indexName, ifNotExists, cols, indexType, properties, comment), false))); :} - /* resource */ - | KW_CREATE opt_external:isExternal KW_RESOURCE opt_if_not_exists:ifNotExists ident_or_text:resourceName opt_properties:properties + /* external resource */ + | KW_CREATE KW_EXTERNAL KW_RESOURCE opt_if_not_exists:ifNotExists ident_or_text:resourceName opt_properties:properties {: - RESULT = new CreateResourceStmt(isExternal, ifNotExists, resourceName, properties); + RESULT = new CreateResourceStmt(true, ifNotExists, resourceName, properties); + :} + /* resource */ + | KW_CREATE KW_RESOURCE opt_if_not_exists:ifNotExists ident_or_text:resourceName opt_properties:properties + {: + RESULT = new CreateResourceStmt(false, ifNotExists, resourceName, properties); + :} + /* resource group*/ + | KW_CREATE KW_RESOURCE KW_GROUP opt_if_not_exists:ifNotExists ident_or_text:resourceGroupName opt_properties:properties + {: + RESULT = new CreateResourceGroupStmt(ifNotExists, resourceGroupName, properties); :} /* encryptkey */ | KW_CREATE KW_ENCRYPTKEY opt_if_not_exists:ifNotExists encryptkey_name:keyName KW_AS STRING_LITERAL:keyString @@ -3853,6 +3864,10 @@ show_param ::= {: RESULT = new ShowResourcesStmt(parser.where, orderByClause, limitClause); :} + | KW_RESOURCE KW_GROUPS + {: + RESULT = new ShowResourceGroupsStmt(); + :} | KW_BACKENDS {: RESULT = new ShowBackendsStmt(); @@ -7286,6 +7301,8 @@ keyword ::= {: RESULT = id; :} | KW_GROUPING:id {: RESULT = id; :} + | KW_GROUPS:id + {: RESULT = id; :} | KW_DYNAMIC:id {: RESULT = id; :} | time_unit:id diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java index 1a9fa518dd..142e019251 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java @@ -69,6 +69,7 @@ import org.apache.doris.rewrite.mvrewrite.ExprToSlotRefRule; import org.apache.doris.rewrite.mvrewrite.HLLHashToSlotRefRule; import org.apache.doris.rewrite.mvrewrite.NDVToHll; import org.apache.doris.rewrite.mvrewrite.ToBitmapToSlotRefRule; +import org.apache.doris.thrift.TPipelineResourceGroup; import org.apache.doris.thrift.TQueryGlobals; import com.google.common.base.Joiner; @@ -402,6 +403,8 @@ public class Analyzer { private final Map> migrateFailedConjuncts = Maps.newHashMap(); + public List tResourceGroups; + public GlobalState(Env env, ConnectContext context) { this.env = env; this.context = context; @@ -580,6 +583,14 @@ public class Analyzer { return explicitViewAlias; } + public void setResourceGroups(List tResourceGroups) { + globalState.tResourceGroups = tResourceGroups; + } + + public List getResourceGroups() { + return globalState.tResourceGroups; + } + /** * Registers a local view definition with this analyzer. Throws an exception if a view * definition with the same alias has already been registered or if the number of diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateResourceGroupStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateResourceGroupStmt.java new file mode 100644 index 0000000000..b09de46412 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateResourceGroupStmt.java @@ -0,0 +1,82 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.analysis; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.FeNameFormat; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.PrintableMap; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.qe.ConnectContext; + +import java.util.Map; + +public class CreateResourceGroupStmt extends DdlStmt { + + private final boolean ifNotExists; + + private final String resourceGroupName; + private final Map properties; + + public CreateResourceGroupStmt(boolean ifNotExists, String resourceGroupName, Map properties) { + this.ifNotExists = ifNotExists; + this.resourceGroupName = resourceGroupName; + this.properties = properties; + } + + public boolean isIfNotExists() { + return ifNotExists; + } + + public String getResourceGroupName() { + return resourceGroupName; + } + + public Map getProperties() { + return properties; + } + + @Override + public void analyze(Analyzer analyzer) throws UserException { + super.analyze(analyzer); + + // check auth + if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN"); + } + + // check name + FeNameFormat.checkResourceGroupName(resourceGroupName); + + if (properties == null || properties.isEmpty()) { + throw new AnalysisException("Resource group properties can't be null"); + } + } + + @Override + public String toSql() { + StringBuilder sb = new StringBuilder(); + sb.append("CREATE "); + sb.append("RESOURCE GROUP '").append(resourceGroupName).append("' "); + sb.append("PROPERTIES(").append(new PrintableMap<>(properties, " = ", true, false)).append(")"); + return sb.toString(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java index 49d7311845..88cc3798c5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java @@ -25,7 +25,6 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.util.ParseUtil; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.mysql.privilege.PrivPredicate; -import org.apache.doris.mysql.privilege.UserResource; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.GlobalVariable; import org.apache.doris.qe.SessionVariable; @@ -116,13 +115,6 @@ public class SetVar { result = (LiteralExpr) literalExpr; - // Need to check if group is valid - if (variable.equalsIgnoreCase(SessionVariable.RESOURCE_VARIABLE)) { - if (result != null && !UserResource.isValidGroup(result.getStringValue())) { - throw new AnalysisException("Invalid resource group, now we support {low, normal, high}."); - } - } - if (variable.equalsIgnoreCase(GlobalVariable.DEFAULT_ROWSET_TYPE)) { if (result != null && !HeartbeatFlags.isValidRowsetType(result.getStringValue())) { throw new AnalysisException("Invalid rowset type, now we support {alpha, beta}."); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowResourceGroupsStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowResourceGroupsStmt.java new file mode 100644 index 0000000000..41a2d34d47 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowResourceGroupsStmt.java @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.analysis; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.common.UserException; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.ShowResultSetMetaData; +import org.apache.doris.resource.resourcegroup.ResourceGroupMgr; + +public class ShowResourceGroupsStmt extends ShowStmt { + + public ShowResourceGroupsStmt() {} + + @Override + public void analyze(Analyzer analyzer) throws UserException { + super.analyze(analyzer); + } + + @Override + public String toSql() { + return "SHOW RESOURCE GROUPS"; + } + + @Override + public String toString() { + return toSql(); + } + + @Override + public ShowResultSetMetaData getMetaData() { + ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder(); + for (String title : ResourceGroupMgr.RESOURCE_GROUP_PROC_NODE_TITLE_NAMES) { + builder.addColumn(new Column(title, ScalarType.createVarchar(30))); + } + return builder.build(); + } + + @Override + public RedirectStatus getRedirectStatus() { + if (ConnectContext.get().getSessionVariable().getForwardToMaster()) { + return RedirectStatus.FORWARD_NO_SYNC; + } else { + return RedirectStatus.NO_FORWARD; + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index a68c2e573a..d1bea53bc1 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -211,6 +211,7 @@ import org.apache.doris.qe.GlobalVariable; import org.apache.doris.qe.JournalObservable; import org.apache.doris.qe.VariableMgr; import org.apache.doris.resource.Tag; +import org.apache.doris.resource.resourcegroup.ResourceGroupMgr; import org.apache.doris.service.FrontendOptions; import org.apache.doris.statistics.AnalysisManager; import org.apache.doris.statistics.AnalysisTaskScheduler; @@ -447,6 +448,8 @@ public class Env { private AtomicLong stmtIdCounter; + private ResourceGroupMgr resourceGroupMgr; + public List getFrontends(FrontendNodeType nodeType) { if (nodeType == null) { // get all @@ -649,6 +652,7 @@ public class Env { this.analysisManager = new AnalysisManager(); } this.globalFunctionMgr = new GlobalFunctionMgr(); + this.resourceGroupMgr = new ResourceGroupMgr(); } public static void destroyCheckpoint() { @@ -716,6 +720,10 @@ public class Env { return auditEventProcessor; } + public ResourceGroupMgr getResourceGroupMgr() { + return resourceGroupMgr; + } + // use this to get correct ClusterInfoService instance public static SystemInfoService getCurrentSystemInfo() { return getCurrentEnv().getClusterInfo(); @@ -1314,6 +1322,8 @@ public class Env { Config.rpc_port); editLog.logMasterInfo(masterInfo); + this.resourceGroupMgr.init(); + // for master, the 'isReady' is set behind. // but we are sure that all metadata is replayed if we get here. // so no need to check 'isReady' flag in this method @@ -1880,6 +1890,12 @@ public class Env { return checksum; } + public long loadResourceGroups(DataInputStream in, long checksum) throws IOException { + resourceGroupMgr = ResourceGroupMgr.read(in); + LOG.info("finished replay resource groups from image"); + return checksum; + } + public long loadSmallFiles(DataInputStream in, long checksum) throws IOException { smallFileMgr.readFields(in); LOG.info("finished replay smallFiles from image"); @@ -2145,6 +2161,11 @@ public class Env { return checksum; } + public long saveResourceGroups(CountingDataOutputStream dos, long checksum) throws IOException { + Env.getCurrentEnv().getResourceGroupMgr().write(dos); + return checksum; + } + public long saveSmallFiles(CountingDataOutputStream dos, long checksum) throws IOException { smallFileMgr.write(dos); return checksum; diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeNameFormat.java b/fe/fe-core/src/main/java/org/apache/doris/common/FeNameFormat.java index 1e6eb0f195..71192a235d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/FeNameFormat.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeNameFormat.java @@ -129,6 +129,10 @@ public class FeNameFormat { checkCommonName("resource", resourceName); } + public static void checkResourceGroupName(String resourceGroupName) throws AnalysisException { + checkCommonName("resource group", resourceGroupName); + } + public static void checkCommonName(String type, String name) throws AnalysisException { if (Strings.isNullOrEmpty(name) || !name.matches(getCommonNameRegex())) { ErrorReport.reportAnalysisException(ErrorCode.ERR_WRONG_NAME_FORMAT, type, name); diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java index 9c08ff555e..bd748b0d3d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java @@ -113,6 +113,7 @@ import org.apache.doris.plugin.PluginInfo; import org.apache.doris.policy.DropPolicyLog; import org.apache.doris.policy.Policy; import org.apache.doris.policy.StoragePolicy; +import org.apache.doris.resource.resourcegroup.ResourceGroup; import org.apache.doris.system.Backend; import org.apache.doris.system.Frontend; import org.apache.doris.transaction.TransactionState; @@ -801,6 +802,11 @@ public class JournalEntity implements Writable { isRead = true; break; } + case OperationType.OP_CREATE_RESOURCE_GROUP: { + data = ResourceGroup.read(in); + isRead = true; + break; + } default: { IOException e = new IOException(); LOG.error("UNKNOWN Operation Type {}", opCode, e); diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index 9f53e48746..af7d4d9143 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -79,6 +79,7 @@ import org.apache.doris.plugin.PluginInfo; import org.apache.doris.policy.DropPolicyLog; import org.apache.doris.policy.Policy; import org.apache.doris.policy.StoragePolicy; +import org.apache.doris.resource.resourcegroup.ResourceGroup; import org.apache.doris.system.Backend; import org.apache.doris.system.Frontend; import org.apache.doris.transaction.TransactionState; @@ -1001,6 +1002,11 @@ public class EditLog { env.getCatalogMgr().replayRefreshExternalPartitions(log); break; } + case OperationType.OP_CREATE_RESOURCE_GROUP: { + final ResourceGroup resourceGroup = (ResourceGroup) journal.getData(); + env.getResourceGroupMgr().replayCreateResourceGroup(resourceGroup); + break; + } case OperationType.OP_INIT_EXTERNAL_TABLE: { // Do nothing. break; @@ -1548,6 +1554,10 @@ public class EditLog { logEdit(OperationType.OP_ALTER_RESOURCE, resource); } + public void logCreateResourceGroup(ResourceGroup resourceGroup) { + logEdit(OperationType.OP_CREATE_RESOURCE_GROUP, resourceGroup); + } + public void logAlterStoragePolicy(StoragePolicy storagePolicy) { logEdit(OperationType.OP_ALTER_STORAGE_POLICY, storagePolicy); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java index ee6496f173..a4eeacbec8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java @@ -280,6 +280,9 @@ public class OperationType { public static final short OP_UPDATE_COOLDOWN_CONF = 401; public static final short OP_COOLDOWN_DELETE = 402; + // resource group 410 ~ 419 + public static final short OP_CREATE_RESOURCE_GROUP = 410; + /** * Get opcode name by op code. **/ diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaPersistMethod.java b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaPersistMethod.java index a55116bc2e..7b151198cf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaPersistMethod.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaPersistMethod.java @@ -209,6 +209,12 @@ public class MetaPersistMethod { metaPersistMethod.writeMethod = Env.class.getDeclaredMethod("saveGlobalFunction", CountingDataOutputStream.class, long.class); break; + case "resourceGroups": + metaPersistMethod.readMethod = + Env.class.getDeclaredMethod("loadResourceGroups", DataInputStream.class, long.class); + metaPersistMethod.writeMethod = + Env.class.getDeclaredMethod("saveResourceGroups", CountingDataOutputStream.class, long.class); + break; default: break; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/meta/PersistMetaModules.java b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/PersistMetaModules.java index 953a3f3cb1..257fe982f0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/meta/PersistMetaModules.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/PersistMetaModules.java @@ -38,7 +38,7 @@ public class PersistMetaModules { "masterInfo", "frontends", "backends", "datasource", "db", "alterJob", "recycleBin", "globalVariable", "cluster", "broker", "resources", "exportJob", "syncJob", "backupHandler", "paloAuth", "transactionState", "colocateTableIndex", "routineLoadJobs", "loadJobV2", "smallFiles", - "plugins", "deleteHandler", "sqlBlockRule", "policy", "mtmvJobManager", "globalFunction"); + "plugins", "deleteHandler", "sqlBlockRule", "policy", "mtmvJobManager", "globalFunction", "resourceGroups"); static { MODULES_MAP = Maps.newHashMap(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 6615198f22..dc13f74829 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -86,6 +86,7 @@ import org.apache.doris.thrift.TPaloScanRange; import org.apache.doris.thrift.TPipelineFragmentParams; import org.apache.doris.thrift.TPipelineFragmentParamsList; import org.apache.doris.thrift.TPipelineInstanceParams; +import org.apache.doris.thrift.TPipelineResourceGroup; import org.apache.doris.thrift.TPlanFragmentDestination; import org.apache.doris.thrift.TPlanFragmentExecParams; import org.apache.doris.thrift.TQueryGlobals; @@ -265,6 +266,8 @@ public class Coordinator { private StatsErrorEstimator statsErrorEstimator; + private List tResourceGroups = Lists.newArrayList(); + private static class BackendHash implements Funnel { @Override public void funnel(Backend backend, PrimitiveSink primitiveSink) { @@ -346,6 +349,7 @@ public class Coordinator { nextInstanceId.setHi(queryId.hi); nextInstanceId.setLo(queryId.lo + 1); this.assignedRuntimeFilters = planner.getRuntimeFilters(); + this.tResourceGroups = analyzer == null ? null : analyzer.getResourceGroups(); } // Used for broker load task/export task/update coordinator @@ -3147,6 +3151,9 @@ public class Coordinator { params.setFragment(fragment.toThrift()); params.setLocalParams(Lists.newArrayList()); params.setSharedScanOpt(sharedScanOpt); + if (tResourceGroups != null) { + params.setResourceGroups(tResourceGroups); + } res.put(instanceExecParam.host, params); } TPipelineFragmentParams params = res.get(instanceExecParam.host); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java index af82985471..0110292e7e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java @@ -62,6 +62,7 @@ import org.apache.doris.analysis.CreateMaterializedViewStmt; import org.apache.doris.analysis.CreateMultiTableMaterializedViewStmt; import org.apache.doris.analysis.CreatePolicyStmt; import org.apache.doris.analysis.CreateRepositoryStmt; +import org.apache.doris.analysis.CreateResourceGroupStmt; import org.apache.doris.analysis.CreateResourceStmt; import org.apache.doris.analysis.CreateRoleStmt; import org.apache.doris.analysis.CreateRoutineLoadStmt; @@ -259,6 +260,8 @@ public class DdlExecutor { env.getResourceMgr().createResource((CreateResourceStmt) ddlStmt); } else if (ddlStmt instanceof DropResourceStmt) { env.getResourceMgr().dropResource((DropResourceStmt) ddlStmt); + } else if (ddlStmt instanceof CreateResourceGroupStmt) { + env.getResourceGroupMgr().createResourceGroup((CreateResourceGroupStmt) ddlStmt); } else if (ddlStmt instanceof CreateDataSyncJobStmt) { CreateDataSyncJobStmt createSyncJobStmt = (CreateDataSyncJobStmt) ddlStmt; SyncJobManager syncJobMgr = env.getSyncJobManager(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java index 709bea5b31..0ce5bcd054 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java @@ -74,6 +74,7 @@ import org.apache.doris.analysis.ShowProcStmt; import org.apache.doris.analysis.ShowProcesslistStmt; import org.apache.doris.analysis.ShowQueryProfileStmt; import org.apache.doris.analysis.ShowRepositoriesStmt; +import org.apache.doris.analysis.ShowResourceGroupsStmt; import org.apache.doris.analysis.ShowResourcesStmt; import org.apache.doris.analysis.ShowRestoreStmt; import org.apache.doris.analysis.ShowRolesStmt; @@ -324,6 +325,8 @@ public class ShowExecutor { handleShowBroker(); } else if (stmt instanceof ShowResourcesStmt) { handleShowResources(); + } else if (stmt instanceof ShowResourceGroupsStmt) { + handleShowResourceGroups(); } else if (stmt instanceof ShowExportStmt) { handleShowExport(); } else if (stmt instanceof ShowBackendsStmt) { @@ -1796,6 +1799,13 @@ public class ShowExecutor { resultSet = new ShowResultSet(showStmt.getMetaData(), rows); } + private void handleShowResourceGroups() { + ShowResourceGroupsStmt showStmt = (ShowResourceGroupsStmt) stmt; + List> resourceGroupsInfos = Env.getCurrentEnv().getResourceGroupMgr().getResourcesInfo(); + + resultSet = new ShowResultSet(showStmt.getMetaData(), resourceGroupsInfos); + } + private void handleShowExport() throws AnalysisException { ShowExportStmt showExportStmt = (ShowExportStmt) stmt; Env env = Env.getCurrentEnv(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 43985ae290..6f615a6260 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -869,6 +869,10 @@ public class StmtExecutor implements ProfileWriter { if (parsedStmt instanceof QueryStmt || parsedStmt instanceof InsertStmt || parsedStmt instanceof CreateTableAsSelectStmt) { + if (Config.enable_resource_group && context.sessionVariable.enablePipelineEngine()) { + analyzer.setResourceGroups(analyzer.getEnv().getResourceGroupMgr() + .getResourceGroup(context.sessionVariable.resourceGroup)); + } Map tableMap = Maps.newTreeMap(); QueryStmt queryStmt; Set parentViewNameSet = Sets.newHashSet(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/ResourceGroup.java b/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/ResourceGroup.java new file mode 100644 index 0000000000..92f02fecb9 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/ResourceGroup.java @@ -0,0 +1,128 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.resource.resourcegroup; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.io.Text; +import org.apache.doris.common.io.Writable; +import org.apache.doris.common.proc.BaseProcResult; +import org.apache.doris.persist.gson.GsonUtils; +import org.apache.doris.thrift.TPipelineResourceGroup; + +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import com.google.gson.annotations.SerializedName; +import org.apache.commons.lang.StringUtils; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Map; + +public class ResourceGroup implements Writable { + + public static final String CPU_SHARE = "cpu_share"; + + private static final ImmutableSet REQUIRED_PROPERTIES_NAME = new ImmutableSet.Builder().add( + CPU_SHARE).build(); + + private static final ImmutableSet ALL_PROPERTIES_NAME = new ImmutableSet.Builder().add( + CPU_SHARE).build(); + + @SerializedName(value = "id") + private long id; + + @SerializedName(value = "name") + private String name; + + @SerializedName(value = "properties") + private Map properties; + + // Version update required after alter resource group + @SerializedName(value = "version") + private long version; + + private ResourceGroup(long id, String name, Map properties) { + this.id = id; + this.name = name; + this.properties = properties; + this.version = 0; + } + + public static ResourceGroup createResourceGroup(String name, Map properties) throws DdlException { + checkProperties(properties); + return new ResourceGroup(Env.getCurrentEnv().getNextId(), name, properties); + } + + private static void checkProperties(Map properties) throws DdlException { + for (String propertyName : properties.keySet()) { + if (!ALL_PROPERTIES_NAME.contains(propertyName)) { + throw new DdlException("Property " + propertyName + " is not supported."); + } + } + for (String propertyName : REQUIRED_PROPERTIES_NAME) { + if (!properties.containsKey(propertyName)) { + throw new DdlException("Property " + propertyName + " is required."); + } + } + + String cpuSchedulingWeight = properties.get(CPU_SHARE); + if (!StringUtils.isNumeric(cpuSchedulingWeight) || Long.parseLong(cpuSchedulingWeight) <= 0) { + throw new DdlException(CPU_SHARE + " requires a positive integer."); + } + } + + public long getId() { + return id; + } + + public String getName() { + return name; + } + + public Map getProperties() { + return properties; + } + + public void getProcNodeData(BaseProcResult result) { + for (Map.Entry entry : properties.entrySet()) { + result.addRow(Lists.newArrayList(String.valueOf(id), name, entry.getKey(), entry.getValue())); + } + } + + @Override + public String toString() { + return GsonUtils.GSON.toJson(this); + } + + public TPipelineResourceGroup toThrift() { + return new TPipelineResourceGroup().setId(id).setName(name).setProperties(properties).setVersion(version); + } + + @Override + public void write(DataOutput out) throws IOException { + String json = GsonUtils.GSON.toJson(this); + Text.writeString(out, json); + } + + public static ResourceGroup read(DataInput in) throws IOException { + String json = Text.readString(in); + return GsonUtils.GSON.fromJson(json, ResourceGroup.class); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/ResourceGroupMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/ResourceGroupMgr.java new file mode 100644 index 0000000000..058eee4471 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/ResourceGroupMgr.java @@ -0,0 +1,197 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.resource.resourcegroup; + +import org.apache.doris.analysis.CreateResourceGroupStmt; +import org.apache.doris.catalog.Env; +import org.apache.doris.common.Config; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.UserException; +import org.apache.doris.common.io.Text; +import org.apache.doris.common.io.Writable; +import org.apache.doris.common.proc.BaseProcResult; +import org.apache.doris.common.proc.ProcNodeInterface; +import org.apache.doris.common.proc.ProcResult; +import org.apache.doris.persist.gson.GsonPostProcessable; +import org.apache.doris.persist.gson.GsonUtils; +import org.apache.doris.thrift.TPipelineResourceGroup; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.gson.annotations.SerializedName; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +public class ResourceGroupMgr implements Writable, GsonPostProcessable { + + private static final Logger LOG = LogManager.getLogger(ResourceGroupMgr.class); + + public static final String DEFAULT_GROUP_NAME = "normal"; + + public static final ImmutableList RESOURCE_GROUP_PROC_NODE_TITLE_NAMES = new ImmutableList.Builder() + .add("Id").add("Name").add("Item").add("Value") + .build(); + + @SerializedName(value = "idToResourceGroup") + private final Map idToResourceGroup = Maps.newHashMap(); + + private final Map nameToResourceGroup = Maps.newHashMap(); + + private final ResourceProcNode procNode = new ResourceProcNode(); + + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + + public ResourceGroupMgr() { + } + + private void readLock() { + lock.readLock().lock(); + } + + private void readUnlock() { + lock.readLock().unlock(); + } + + private void writeLock() { + lock.writeLock().lock(); + } + + private void writeUnlock() { + lock.writeLock().unlock(); + } + + public void init() { + if (Config.enable_resource_group) { + checkAndCreateDefaultGroup(); + } + } + + public List getResourceGroup(String groupName) throws UserException { + List resourceGroups = Lists.newArrayList(); + readLock(); + try { + ResourceGroup resourceGroup = nameToResourceGroup.get(groupName); + if (resourceGroup == null) { + throw new UserException("Resource group " + groupName + " does not exist"); + } + // need to check resource group privs + resourceGroups.add(resourceGroup.toThrift()); + } finally { + readUnlock(); + } + return resourceGroups; + } + + private void checkAndCreateDefaultGroup() { + ResourceGroup defaultResourceGroup = null; + writeLock(); + try { + if (nameToResourceGroup.containsKey(DEFAULT_GROUP_NAME)) { + return; + } + Map properties = Maps.newHashMap(); + properties.put(ResourceGroup.CPU_SHARE, "10"); + defaultResourceGroup = ResourceGroup.createResourceGroup(DEFAULT_GROUP_NAME, properties); + nameToResourceGroup.put(DEFAULT_GROUP_NAME, defaultResourceGroup); + idToResourceGroup.put(defaultResourceGroup.getId(), defaultResourceGroup); + Env.getCurrentEnv().getEditLog().logCreateResourceGroup(defaultResourceGroup); + } catch (DdlException e) { + LOG.warn("Create resource group " + DEFAULT_GROUP_NAME + " fail"); + } finally { + writeUnlock(); + } + LOG.info("Create resource group success: {}", defaultResourceGroup); + } + + public void createResourceGroup(CreateResourceGroupStmt stmt) throws DdlException { + ResourceGroup resourceGroup = ResourceGroup.createResourceGroup(stmt.getResourceGroupName(), + stmt.getProperties()); + String resourceGroupNameName = resourceGroup.getName(); + writeLock(); + try { + if (nameToResourceGroup.putIfAbsent(resourceGroupNameName, resourceGroup) != null) { + if (stmt.isIfNotExists()) { + return; + } + throw new DdlException("Resource group " + resourceGroupNameName + " already exist"); + } + idToResourceGroup.put(resourceGroup.getId(), resourceGroup); + Env.getCurrentEnv().getEditLog().logCreateResourceGroup(resourceGroup); + } finally { + writeUnlock(); + } + LOG.info("Create resource group success: {}", resourceGroup); + } + + public void replayCreateResourceGroup(ResourceGroup resourceGroup) { + writeLock(); + try { + nameToResourceGroup.put(resourceGroup.getName(), resourceGroup); + idToResourceGroup.put(resourceGroup.getId(), resourceGroup); + } finally { + writeUnlock(); + } + } + + public List> getResourcesInfo() { + return procNode.fetchResult().getRows(); + } + + @Override + public void write(DataOutput out) throws IOException { + String json = GsonUtils.GSON.toJson(this); + Text.writeString(out, json); + } + + public static ResourceGroupMgr read(DataInput in) throws IOException { + String json = Text.readString(in); + return GsonUtils.GSON.fromJson(json, ResourceGroupMgr.class); + } + + @Override + public void gsonPostProcess() throws IOException { + idToResourceGroup.forEach( + (id, resourceGroup) -> nameToResourceGroup.put(resourceGroup.getName(), resourceGroup)); + } + + public class ResourceProcNode implements ProcNodeInterface { + @Override + public ProcResult fetchResult() { + BaseProcResult result = new BaseProcResult(); + result.setNames(RESOURCE_GROUP_PROC_NODE_TITLE_NAMES); + readLock(); + try { + for (ResourceGroup resourceGroup : idToResourceGroup.values()) { + // need to check resource group privs + resourceGroup.getProcNodeData(result); + } + } finally { + readUnlock(); + } + return result; + } + } +} diff --git a/fe/fe-core/src/main/jflex/sql_scanner.flex b/fe/fe-core/src/main/jflex/sql_scanner.flex index 1ab037e4af..85cad5f058 100644 --- a/fe/fe-core/src/main/jflex/sql_scanner.flex +++ b/fe/fe-core/src/main/jflex/sql_scanner.flex @@ -244,6 +244,7 @@ import org.apache.doris.qe.SqlModeHelper; keywordMap.put("graph", new Integer(SqlParserSymbols.KW_GRAPH)); keywordMap.put("group", new Integer(SqlParserSymbols.KW_GROUP)); keywordMap.put("grouping", new Integer(SqlParserSymbols.KW_GROUPING)); + keywordMap.put("groups", new Integer(SqlParserSymbols.KW_GROUPS)); keywordMap.put("hash", new Integer(SqlParserSymbols.KW_HASH)); keywordMap.put("having", new Integer(SqlParserSymbols.KW_HAVING)); keywordMap.put("hdfs", new Integer(SqlParserSymbols.KW_HDFS)); diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index dc1a947ef6..1807ef1dd5 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -576,6 +576,13 @@ struct TPipelineInstanceParams { 6: optional i32 backend_num } +struct TPipelineResourceGroup { + 1: optional i64 id + 2: optional string name + 3: optional map properties + 4: optional i64 version +} + // ExecPlanFragment struct TPipelineFragmentParams { 1: required PaloInternalServiceVersion protocol_version @@ -604,6 +611,7 @@ struct TPipelineFragmentParams { 23: optional Planner.TPlanFragment fragment 24: list local_params 25: optional bool shared_scan_opt = false; + 26: optional list resource_groups } struct TPipelineFragmentParamsList {