diff --git a/fe/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java b/fe/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java index 7a05fba64b..faf18c9b3e 100644 --- a/fe/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java +++ b/fe/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java @@ -41,7 +41,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; * maintain the colocate table related indexes and meta */ public class ColocateTableIndex implements Writable { - private ReentrantReadWriteLock lock; + private transient ReentrantReadWriteLock lock; // group_id -> table_ids private Multimap group2Tables; @@ -174,6 +174,19 @@ public class ColocateTableIndex implements Writable { } } + public boolean isGroupExist(long groupId) { + readLock(); + try { + return group2DB.containsKey(groupId); + } finally { + readUnlock(); + } + } + + public Set getBalancingGroupIds() { + return balancingGroups; + } + public long getGroup(long tableId) { readLock(); try { @@ -184,10 +197,6 @@ public class ColocateTableIndex implements Writable { } } - public Set getBalancingGroupIds() { - return balancingGroups; - } - public Set getAllGroupIds() { readLock(); try { diff --git a/fe/src/main/java/org/apache/doris/http/BaseRequest.java b/fe/src/main/java/org/apache/doris/http/BaseRequest.java index 23e72525f6..24e8f9ddf6 100644 --- a/fe/src/main/java/org/apache/doris/http/BaseRequest.java +++ b/fe/src/main/java/org/apache/doris/http/BaseRequest.java @@ -21,6 +21,7 @@ import com.google.common.base.Strings; import com.google.common.collect.Maps; import java.net.InetSocketAddress; +import java.nio.charset.Charset; import java.util.List; import java.util.Map; import java.util.Set; @@ -28,9 +29,11 @@ import java.util.Set; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.http.Cookie; import io.netty.handler.codec.http.CookieDecoder; +import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.QueryStringDecoder; +import org.apache.doris.common.DdlException; public class BaseRequest { protected ChannelHandlerContext context; @@ -115,6 +118,15 @@ public class BaseRequest { return params.get(key); } + + public String getContent() throws DdlException { + if (request instanceof FullHttpRequest) { + FullHttpRequest fullHttpRequest = (FullHttpRequest) request; + return fullHttpRequest.content().toString(Charset.forName("UTF-8")); + } else { + throw new DdlException("Invalid request"); + } + } // get an array parameter. // eg. ?a=1&a=2 diff --git a/fe/src/main/java/org/apache/doris/http/Http API Reference b/fe/src/main/java/org/apache/doris/http/Http API Reference new file mode 100644 index 0000000000..83ab4d31ef --- /dev/null +++ b/fe/src/main/java/org/apache/doris/http/Http API Reference @@ -0,0 +1,113 @@ +# Http API Reference +This page documents all of the API endpoints for Doris FE + + +## Colocate Meta +You can get the ids for table, group, db, backend by `List all colocate meta` or +Doris FE Web or `SHOW PROC` mysql command. + +The following API need Admin privilege. + +### List all colocate meta + +`Get /api/colocate` + +**Response Sample** + +``` +{ + "colocate_meta": { + "group2Tables": { + "372686": 372686, + "372686": 372700 + }, + "table2Groups": { + "372686": 372686, + "372700": 372686 + }, + "group2DBs": { + "372686": 10004 + }, + "group2BackendsPerBucketSeq": { + "372686": [ + [ + 10001, + 10002, + 10003 + ], + [ + 10001, + 10002, + 10003 + ], + [ + 10001, + 10002, + 10003 + ] + ] + }, + "balancingGroups": [] + }, + "status": "OK" +} +``` + +### Add table to colocate group +`POST /api/colocate/table_group` + +**Request Parameters** + +- table_id: the id for table +- group_id: the id for group (it's the same as colocate parent table id) +- db_id: the id for DB + +### Remove table from colocate group +`DELETE /api/colocate/table` + +**Request Parameters** + +- table_id: the id for table + +### Mark colocate group balancing +`POST /api/colocate/balancing_group` + +**Request Parameters** +- group_id: the id for group (it's the same as colocate parent table id) + +### Mark colocate group stable +`DELETE /api/colocate/balancing_group` + +**Request Parameters** +- group_id: the id for group (it's the same as colocate parent table id) + +### Update backendsPerBucketSeq meta +`POST /api/colocate/bucketseq` + +**Request Parameters** +- group_id: the id for group (it's the same as colocate parent table id) + +**Request Body** + +the json format content for backendsPerBucketSeq meta. the following is an example: + +``` +[ + [ + 10001, + 10002, + 10003 + ], + [ + 10001, + 10002, + 10003 + ], + [ + 10001, + 10002, + 10003 + ] +] +``` + diff --git a/fe/src/main/java/org/apache/doris/http/HttpServer.java b/fe/src/main/java/org/apache/doris/http/HttpServer.java index 06cdbf4359..9a5e212776 100644 --- a/fe/src/main/java/org/apache/doris/http/HttpServer.java +++ b/fe/src/main/java/org/apache/doris/http/HttpServer.java @@ -28,6 +28,8 @@ import org.apache.doris.http.action.SessionAction; import org.apache.doris.http.action.StaticResourceAction; import org.apache.doris.http.action.SystemAction; import org.apache.doris.http.action.VariableAction; +import org.apache.doris.http.common.DorisHttpPostObjectAggregator; +import org.apache.doris.http.meta.ColocateMetaService; import org.apache.doris.http.meta.MetaService.CheckAction; import org.apache.doris.http.meta.MetaService.DumpAction; import org.apache.doris.http.meta.MetaService.ImageAction; @@ -140,6 +142,11 @@ public class HttpServer { RowCountAction.registerAction(controller); CheckDecommissionAction.registerAction(controller); MetaReplayerCheckAction.registerAction(controller); + ColocateMetaService.BucketSeqAction.registerAction(controller); + ColocateMetaService.ColocateMetaAction.registerAction(controller); + ColocateMetaService.BalancingGroupAction.registerAction(controller); + ColocateMetaService.TableAction.registerAction(controller); + ColocateMetaService.TableGroupAction.registerAction(controller); // meta service action File imageDir = MetaHelper.getMasterImageDir(); @@ -163,8 +170,8 @@ public class HttpServer { protected class PaloHttpServerInitializer extends ChannelInitializer { @Override protected void initChannel(SocketChannel ch) throws Exception { - ch.pipeline().addLast("codec", new HttpServerCodec()); - // ch.pipeline().addLast("compressor", new HttpContentCompressor()); + ch.pipeline().addLast(new HttpServerCodec()); + ch.pipeline().addLast(new DorisHttpPostObjectAggregator(100 * 65536)); ch.pipeline().addLast(new ChunkedWriteHandler()); ch.pipeline().addLast(new HttpServerHandler(controller, qeService)); } diff --git a/fe/src/main/java/org/apache/doris/http/HttpServerHandler.java b/fe/src/main/java/org/apache/doris/http/HttpServerHandler.java index 43248f5e58..30e58d66b8 100644 --- a/fe/src/main/java/org/apache/doris/http/HttpServerHandler.java +++ b/fe/src/main/java/org/apache/doris/http/HttpServerHandler.java @@ -57,9 +57,6 @@ public class HttpServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); - if (action == null) { - ctx.close(); - } } @Override @@ -108,7 +105,7 @@ public class HttpServerHandler extends ChannelInboundHandlerAdapter { String uri = request.getRequest().uri(); // ignore this request, which is a default request from client's browser. if (uri.endsWith("/favicon.ico")) { - return null; + return NotFoundAction.getNotFoundAction(); } else if (uri.equals("/")) { return new IndexAction(controller); } diff --git a/fe/src/main/java/org/apache/doris/http/common/DorisHttpPostObjectAggregator.java b/fe/src/main/java/org/apache/doris/http/common/DorisHttpPostObjectAggregator.java new file mode 100644 index 0000000000..d904c421a3 --- /dev/null +++ b/fe/src/main/java/org/apache/doris/http/common/DorisHttpPostObjectAggregator.java @@ -0,0 +1,61 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.http.common; + +import io.netty.channel.ChannelPipeline; +import io.netty.handler.codec.http.HttpContent; +import io.netty.handler.codec.http.HttpMessage; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpObject; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.HttpRequest; + +/* + * only handle post request, avoid conflicting with {@link LoadAction} + * don't handle 100-continue header + */ +public class DorisHttpPostObjectAggregator extends HttpObjectAggregator { + private boolean startAggregated = false; + + public DorisHttpPostObjectAggregator(int maxContentLength) { + super(maxContentLength, false); + } + + @Override + protected boolean isStartMessage(HttpObject msg) throws Exception { + if (msg instanceof HttpMessage) { + HttpRequest request = (HttpRequest) msg; + if (request.method().equals(HttpMethod.POST)) { + startAggregated = true; + return true; + } + } + return false; + } + + @Override + protected boolean isContentMessage(HttpObject msg) throws Exception { + return msg instanceof HttpContent && startAggregated; + } + + // Doris FE needn't handle 100-continue header + @Override + protected Object newContinueResponse(HttpMessage start, int maxContentLength, ChannelPipeline pipeline) { + return null; + } +} diff --git a/fe/src/main/java/org/apache/doris/http/meta/ColocateMetaService.java b/fe/src/main/java/org/apache/doris/http/meta/ColocateMetaService.java new file mode 100644 index 0000000000..bfa18c3f01 --- /dev/null +++ b/fe/src/main/java/org/apache/doris/http/meta/ColocateMetaService.java @@ -0,0 +1,272 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.http.meta; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpResponseStatus; +import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.ColocateTableIndex; +import org.apache.doris.catalog.Database; +import org.apache.doris.common.DdlException; +import org.apache.doris.http.ActionController; +import org.apache.doris.http.BaseRequest; +import org.apache.doris.http.BaseResponse; +import org.apache.doris.http.IllegalArgException; +import org.apache.doris.http.rest.RestBaseAction; +import org.apache.doris.http.rest.RestBaseResult; +import org.apache.doris.http.rest.RestResult; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.persist.ColocatePersistInfo; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.List; +import java.lang.reflect.Type; + +/* + * the colocate meta define in {@link ColocateTableIndex} + */ +public class ColocateMetaService { + private static final Logger LOG = LogManager.getLogger(ColocateMetaService.class); + private static final String GROUP_ID = "group_id"; + private static final String TABLE_ID = "table_id"; + private static final String DB_ID = "db_id"; + + private static ColocateTableIndex colocateIndex = Catalog.getCurrentColocateIndex(); + + private static long checkAndGetGroupId(BaseRequest request) throws DdlException { + long groupId = Long.valueOf(request.getSingleParameter(GROUP_ID).trim()); + if (!colocateIndex.isGroupExist(groupId)) { + throw new DdlException("the group " + groupId + "isn't exist"); + } + return groupId; + } + + private static long getTableId(BaseRequest request) throws DdlException { + return Long.valueOf(request.getSingleParameter(TABLE_ID).trim()); + } + + public static class ColocateMetaBaseAction extends RestBaseAction { + ColocateMetaBaseAction(ActionController controller) { + super(controller); + } + + @Override + public void executeWithoutPassword(AuthorizationInfo authInfo, BaseRequest request, BaseResponse response) + throws DdlException { + if (redirectToMaster(request, response)) { + return; + } + checkGlobalAuth(authInfo, PrivPredicate.ADMIN); + executeInMasterWithAdmin(authInfo, request, response); + } + + protected void executeInMasterWithAdmin(AuthorizationInfo authInfo, BaseRequest request, BaseResponse response) + throws DdlException { + throw new DdlException("Not implemented"); + } + } + + + // get all colocate meta + public static class ColocateMetaAction extends ColocateMetaBaseAction { + ColocateMetaAction(ActionController controller) { + super(controller); + } + + public static void registerAction(ActionController controller) throws IllegalArgException { + ColocateMetaAction action = new ColocateMetaAction(controller); + controller.registerHandler(HttpMethod.GET, "/api/colocate", action); + } + + @Override + public void executeInMasterWithAdmin(AuthorizationInfo authInfo, BaseRequest request, BaseResponse response) + throws DdlException { + response.setContentType("application/json"); + RestResult result = new RestResult(); + result.addResultEntry("colocate_meta", Catalog.getCurrentColocateIndex()); + sendResult(request, response, result); + } + } + + // add a table to a colocate group + public static class TableGroupAction extends ColocateMetaBaseAction { + TableGroupAction(ActionController controller) { + super(controller); + } + + public static void registerAction(ActionController controller) throws IllegalArgException { + TableGroupAction action = new TableGroupAction(controller); + controller.registerHandler(HttpMethod.POST, "/api/colocate/table_group", action); + } + + @Override + public void executeInMasterWithAdmin(AuthorizationInfo authInfo, BaseRequest request, BaseResponse response) + throws DdlException { + long groupId = checkAndGetGroupId(request); + long tableId = getTableId(request); + long dbId = Long.valueOf(request.getSingleParameter(DB_ID).trim()); + + Database database = Catalog.getInstance().getDb(dbId); + if (database == null) { + throw new DdlException("the db " + dbId + " isn't exist"); + } + if (database.getTable(tableId) == null) { + throw new DdlException("the table " + tableId + " isn't exist"); + } + if (database.getTable(groupId) == null) { + throw new DdlException("the parent table " + groupId + " isn't exist"); + } + + LOG.info("will add table {} to group {}", tableId, groupId); + colocateIndex.addTableToGroup(dbId, tableId, groupId); + ColocatePersistInfo info = ColocatePersistInfo.CreateForAddTable(tableId, groupId, dbId, new ArrayList<>()); + Catalog.getInstance().getEditLog().logColocateAddTable(info); + LOG.info("table {} has added to group {}", tableId, groupId); + + sendResult(request, response); + } + } + + // remove a table from a colocate group + public static class TableAction extends ColocateMetaBaseAction { + TableAction(ActionController controller) { + super(controller); + } + + public static void registerAction(ActionController controller) throws IllegalArgException { + TableAction action = new TableAction(controller); + controller.registerHandler(HttpMethod.DELETE, "/api/colocate/table", action); + } + + @Override + public void executeInMasterWithAdmin(AuthorizationInfo authInfo, BaseRequest request, BaseResponse response) + throws DdlException { + long tableId = getTableId(request); + + LOG.info("will delete table {} from colocate meta", tableId); + Catalog.getCurrentColocateIndex().removeTable(tableId); + ColocatePersistInfo colocateInfo = ColocatePersistInfo.CreateForRemoveTable(tableId); + Catalog.getInstance().getEditLog().logColocateRemoveTable(colocateInfo); + LOG.info("table {} has deleted from colocate meta", tableId); + + sendResult(request, response); + } + } + + // mark a colocate group to balancing or stable + public static class BalancingGroupAction extends ColocateMetaBaseAction { + BalancingGroupAction(ActionController controller) { + super(controller); + } + + public static void registerAction(ActionController controller) throws IllegalArgException { + BalancingGroupAction action = new BalancingGroupAction(controller); + controller.registerHandler(HttpMethod.POST, "/api/colocate/balancing_group", action); + controller.registerHandler(HttpMethod.DELETE, "/api/colocate/balancing_group", action); + } + + @Override + public void executeInMasterWithAdmin(AuthorizationInfo authInfo, BaseRequest request, BaseResponse response) + throws DdlException { + long groupId = checkAndGetGroupId(request); + + HttpMethod method = request.getRequest().method(); + if (method.equals(HttpMethod.POST)) { + colocateIndex.markGroupBalancing(groupId); + ColocatePersistInfo info = ColocatePersistInfo.CreateForMarkBalancing(groupId); + Catalog.getInstance().getEditLog().logColocateMarkBalancing(info); + LOG.info("mark colocate group {} balancing", groupId); + } else if (method.equals(HttpMethod.DELETE)) { + colocateIndex.markGroupStable(groupId); + ColocatePersistInfo info = ColocatePersistInfo.CreateForMarkStable(groupId); + Catalog.getInstance().getEditLog().logColocateMarkStable(info); + LOG.info("mark colocate group {} stable", groupId); + } else { + response.appendContent(new RestBaseResult("HTTP method is not allowed.").toJson()); + writeResponse(request, response, HttpResponseStatus.METHOD_NOT_ALLOWED); + } + + sendResult(request, response); + } + } + + // update a backendsPerBucketSeq meta for a colocate group + public static class BucketSeqAction extends ColocateMetaBaseAction { + private static final Logger LOG = LogManager.getLogger(BucketSeqAction.class); + + BucketSeqAction(ActionController controller) { + super(controller); + } + + public static void registerAction(ActionController controller) throws IllegalArgException { + BucketSeqAction action = new BucketSeqAction(controller); + controller.registerHandler(HttpMethod.POST, "/api/colocate/bucketseq", action); + } + + @Override + public void executeInMasterWithAdmin(AuthorizationInfo authInfo, BaseRequest request, BaseResponse response) + throws DdlException { + final String clusterName = authInfo.cluster; + if (Strings.isNullOrEmpty(clusterName)) { + throw new DdlException("No cluster selected."); + } + long groupId = checkAndGetGroupId(request); + + String meta = request.getContent(); + Type type = new TypeToken>>() {}.getType(); + List> backendsPerBucketSeq = new Gson().fromJson(meta, type); + LOG.info("HttpServer {}", backendsPerBucketSeq); + + List clusterBackendIds = Catalog.getCurrentSystemInfo().getClusterBackendIds(clusterName, true); + //check the Backend id + for (List backendIds : backendsPerBucketSeq) { + for (Long beId : backendIds) { + if (!clusterBackendIds.contains(beId)) { + throw new DdlException("The backend " + beId + " is not exist or alive"); + } + } + } + + int metaSize = colocateIndex.getBackendsPerBucketSeq(groupId).size(); + Preconditions.checkState(backendsPerBucketSeq.size() == metaSize, + backendsPerBucketSeq.size() + " vs. " + metaSize); + updateBackendPerBucketSeq(groupId, backendsPerBucketSeq); + LOG.info("the group {} backendsPerBucketSeq meta has updated", groupId); + + sendResult(request, response); + } + + private void updateBackendPerBucketSeq(Long groupId, List> backendsPerBucketSeq) { + colocateIndex.markGroupBalancing(groupId); + ColocatePersistInfo info1 = ColocatePersistInfo.CreateForMarkBalancing(groupId); + Catalog.getInstance().getEditLog().logColocateMarkBalancing(info1); + + colocateIndex.addBackendsPerBucketSeq(groupId, backendsPerBucketSeq); + ColocatePersistInfo info2 = ColocatePersistInfo.CreateForBackendsPerBucketSeq(groupId, + backendsPerBucketSeq); + Catalog.getInstance().getEditLog().logColocateBackendsPerBucketSeq(info2); + } + } + +}