Add ColocateMetaService (#562)

This commit is contained in:
kangkaisen
2019-01-24 11:20:12 +08:00
committed by ZHAO Chun
parent 2912a16b16
commit aeb89ab4d3
7 changed files with 482 additions and 11 deletions

View File

@ -41,7 +41,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
* maintain the colocate table related indexes and meta
*/
public class ColocateTableIndex implements Writable {
private ReentrantReadWriteLock lock;
private transient ReentrantReadWriteLock lock;
// group_id -> table_ids
private Multimap<Long, Long> group2Tables;
@ -174,6 +174,19 @@ public class ColocateTableIndex implements Writable {
}
}
public boolean isGroupExist(long groupId) {
readLock();
try {
return group2DB.containsKey(groupId);
} finally {
readUnlock();
}
}
public Set<Long> getBalancingGroupIds() {
return balancingGroups;
}
public long getGroup(long tableId) {
readLock();
try {
@ -184,10 +197,6 @@ public class ColocateTableIndex implements Writable {
}
}
public Set<Long> getBalancingGroupIds() {
return balancingGroups;
}
public Set<Long> getAllGroupIds() {
readLock();
try {

View File

@ -21,6 +21,7 @@ import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -28,9 +29,11 @@ import java.util.Set;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.Cookie;
import io.netty.handler.codec.http.CookieDecoder;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.QueryStringDecoder;
import org.apache.doris.common.DdlException;
public class BaseRequest {
protected ChannelHandlerContext context;
@ -115,6 +118,15 @@ public class BaseRequest {
return params.get(key);
}
public String getContent() throws DdlException {
if (request instanceof FullHttpRequest) {
FullHttpRequest fullHttpRequest = (FullHttpRequest) request;
return fullHttpRequest.content().toString(Charset.forName("UTF-8"));
} else {
throw new DdlException("Invalid request");
}
}
// get an array parameter.
// eg. ?a=1&a=2

View File

@ -0,0 +1,113 @@
# Http API Reference
This page documents all of the API endpoints for Doris FE
## Colocate Meta
You can get the ids for table, group, db, backend by `List all colocate meta` or
Doris FE Web or `SHOW PROC` mysql command.
The following API need Admin privilege.
### List all colocate meta
`Get /api/colocate`
**Response Sample**
```
{
"colocate_meta": {
"group2Tables": {
"372686": 372686,
"372686": 372700
},
"table2Groups": {
"372686": 372686,
"372700": 372686
},
"group2DBs": {
"372686": 10004
},
"group2BackendsPerBucketSeq": {
"372686": [
[
10001,
10002,
10003
],
[
10001,
10002,
10003
],
[
10001,
10002,
10003
]
]
},
"balancingGroups": []
},
"status": "OK"
}
```
### Add table to colocate group
`POST /api/colocate/table_group`
**Request Parameters**
- table_id: the id for table
- group_id: the id for group (it's the same as colocate parent table id)
- db_id: the id for DB
### Remove table from colocate group
`DELETE /api/colocate/table`
**Request Parameters**
- table_id: the id for table
### Mark colocate group balancing
`POST /api/colocate/balancing_group`
**Request Parameters**
- group_id: the id for group (it's the same as colocate parent table id)
### Mark colocate group stable
`DELETE /api/colocate/balancing_group`
**Request Parameters**
- group_id: the id for group (it's the same as colocate parent table id)
### Update backendsPerBucketSeq meta
`POST /api/colocate/bucketseq`
**Request Parameters**
- group_id: the id for group (it's the same as colocate parent table id)
**Request Body**
the json format content for backendsPerBucketSeq meta. the following is an example:
```
[
[
10001,
10002,
10003
],
[
10001,
10002,
10003
],
[
10001,
10002,
10003
]
]
```

View File

@ -28,6 +28,8 @@ import org.apache.doris.http.action.SessionAction;
import org.apache.doris.http.action.StaticResourceAction;
import org.apache.doris.http.action.SystemAction;
import org.apache.doris.http.action.VariableAction;
import org.apache.doris.http.common.DorisHttpPostObjectAggregator;
import org.apache.doris.http.meta.ColocateMetaService;
import org.apache.doris.http.meta.MetaService.CheckAction;
import org.apache.doris.http.meta.MetaService.DumpAction;
import org.apache.doris.http.meta.MetaService.ImageAction;
@ -140,6 +142,11 @@ public class HttpServer {
RowCountAction.registerAction(controller);
CheckDecommissionAction.registerAction(controller);
MetaReplayerCheckAction.registerAction(controller);
ColocateMetaService.BucketSeqAction.registerAction(controller);
ColocateMetaService.ColocateMetaAction.registerAction(controller);
ColocateMetaService.BalancingGroupAction.registerAction(controller);
ColocateMetaService.TableAction.registerAction(controller);
ColocateMetaService.TableGroupAction.registerAction(controller);
// meta service action
File imageDir = MetaHelper.getMasterImageDir();
@ -163,8 +170,8 @@ public class HttpServer {
protected class PaloHttpServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("codec", new HttpServerCodec());
// ch.pipeline().addLast("compressor", new HttpContentCompressor());
ch.pipeline().addLast(new HttpServerCodec());
ch.pipeline().addLast(new DorisHttpPostObjectAggregator(100 * 65536));
ch.pipeline().addLast(new ChunkedWriteHandler());
ch.pipeline().addLast(new HttpServerHandler(controller, qeService));
}

View File

@ -57,9 +57,6 @@ public class HttpServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
if (action == null) {
ctx.close();
}
}
@Override
@ -108,7 +105,7 @@ public class HttpServerHandler extends ChannelInboundHandlerAdapter {
String uri = request.getRequest().uri();
// ignore this request, which is a default request from client's browser.
if (uri.endsWith("/favicon.ico")) {
return null;
return NotFoundAction.getNotFoundAction();
} else if (uri.equals("/")) {
return new IndexAction(controller);
}

View File

@ -0,0 +1,61 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.http.common;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpMessage;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequest;
/*
* only handle post request, avoid conflicting with {@link LoadAction}
* don't handle 100-continue header
*/
public class DorisHttpPostObjectAggregator extends HttpObjectAggregator {
private boolean startAggregated = false;
public DorisHttpPostObjectAggregator(int maxContentLength) {
super(maxContentLength, false);
}
@Override
protected boolean isStartMessage(HttpObject msg) throws Exception {
if (msg instanceof HttpMessage) {
HttpRequest request = (HttpRequest) msg;
if (request.method().equals(HttpMethod.POST)) {
startAggregated = true;
return true;
}
}
return false;
}
@Override
protected boolean isContentMessage(HttpObject msg) throws Exception {
return msg instanceof HttpContent && startAggregated;
}
// Doris FE needn't handle 100-continue header
@Override
protected Object newContinueResponse(HttpMessage start, int maxContentLength, ChannelPipeline pipeline) {
return null;
}
}

View File

@ -0,0 +1,272 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.http.meta;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.ColocateTableIndex;
import org.apache.doris.catalog.Database;
import org.apache.doris.common.DdlException;
import org.apache.doris.http.ActionController;
import org.apache.doris.http.BaseRequest;
import org.apache.doris.http.BaseResponse;
import org.apache.doris.http.IllegalArgException;
import org.apache.doris.http.rest.RestBaseAction;
import org.apache.doris.http.rest.RestBaseResult;
import org.apache.doris.http.rest.RestResult;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.persist.ColocatePersistInfo;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
import java.lang.reflect.Type;
/*
* the colocate meta define in {@link ColocateTableIndex}
*/
public class ColocateMetaService {
private static final Logger LOG = LogManager.getLogger(ColocateMetaService.class);
private static final String GROUP_ID = "group_id";
private static final String TABLE_ID = "table_id";
private static final String DB_ID = "db_id";
private static ColocateTableIndex colocateIndex = Catalog.getCurrentColocateIndex();
private static long checkAndGetGroupId(BaseRequest request) throws DdlException {
long groupId = Long.valueOf(request.getSingleParameter(GROUP_ID).trim());
if (!colocateIndex.isGroupExist(groupId)) {
throw new DdlException("the group " + groupId + "isn't exist");
}
return groupId;
}
private static long getTableId(BaseRequest request) throws DdlException {
return Long.valueOf(request.getSingleParameter(TABLE_ID).trim());
}
public static class ColocateMetaBaseAction extends RestBaseAction {
ColocateMetaBaseAction(ActionController controller) {
super(controller);
}
@Override
public void executeWithoutPassword(AuthorizationInfo authInfo, BaseRequest request, BaseResponse response)
throws DdlException {
if (redirectToMaster(request, response)) {
return;
}
checkGlobalAuth(authInfo, PrivPredicate.ADMIN);
executeInMasterWithAdmin(authInfo, request, response);
}
protected void executeInMasterWithAdmin(AuthorizationInfo authInfo, BaseRequest request, BaseResponse response)
throws DdlException {
throw new DdlException("Not implemented");
}
}
// get all colocate meta
public static class ColocateMetaAction extends ColocateMetaBaseAction {
ColocateMetaAction(ActionController controller) {
super(controller);
}
public static void registerAction(ActionController controller) throws IllegalArgException {
ColocateMetaAction action = new ColocateMetaAction(controller);
controller.registerHandler(HttpMethod.GET, "/api/colocate", action);
}
@Override
public void executeInMasterWithAdmin(AuthorizationInfo authInfo, BaseRequest request, BaseResponse response)
throws DdlException {
response.setContentType("application/json");
RestResult result = new RestResult();
result.addResultEntry("colocate_meta", Catalog.getCurrentColocateIndex());
sendResult(request, response, result);
}
}
// add a table to a colocate group
public static class TableGroupAction extends ColocateMetaBaseAction {
TableGroupAction(ActionController controller) {
super(controller);
}
public static void registerAction(ActionController controller) throws IllegalArgException {
TableGroupAction action = new TableGroupAction(controller);
controller.registerHandler(HttpMethod.POST, "/api/colocate/table_group", action);
}
@Override
public void executeInMasterWithAdmin(AuthorizationInfo authInfo, BaseRequest request, BaseResponse response)
throws DdlException {
long groupId = checkAndGetGroupId(request);
long tableId = getTableId(request);
long dbId = Long.valueOf(request.getSingleParameter(DB_ID).trim());
Database database = Catalog.getInstance().getDb(dbId);
if (database == null) {
throw new DdlException("the db " + dbId + " isn't exist");
}
if (database.getTable(tableId) == null) {
throw new DdlException("the table " + tableId + " isn't exist");
}
if (database.getTable(groupId) == null) {
throw new DdlException("the parent table " + groupId + " isn't exist");
}
LOG.info("will add table {} to group {}", tableId, groupId);
colocateIndex.addTableToGroup(dbId, tableId, groupId);
ColocatePersistInfo info = ColocatePersistInfo.CreateForAddTable(tableId, groupId, dbId, new ArrayList<>());
Catalog.getInstance().getEditLog().logColocateAddTable(info);
LOG.info("table {} has added to group {}", tableId, groupId);
sendResult(request, response);
}
}
// remove a table from a colocate group
public static class TableAction extends ColocateMetaBaseAction {
TableAction(ActionController controller) {
super(controller);
}
public static void registerAction(ActionController controller) throws IllegalArgException {
TableAction action = new TableAction(controller);
controller.registerHandler(HttpMethod.DELETE, "/api/colocate/table", action);
}
@Override
public void executeInMasterWithAdmin(AuthorizationInfo authInfo, BaseRequest request, BaseResponse response)
throws DdlException {
long tableId = getTableId(request);
LOG.info("will delete table {} from colocate meta", tableId);
Catalog.getCurrentColocateIndex().removeTable(tableId);
ColocatePersistInfo colocateInfo = ColocatePersistInfo.CreateForRemoveTable(tableId);
Catalog.getInstance().getEditLog().logColocateRemoveTable(colocateInfo);
LOG.info("table {} has deleted from colocate meta", tableId);
sendResult(request, response);
}
}
// mark a colocate group to balancing or stable
public static class BalancingGroupAction extends ColocateMetaBaseAction {
BalancingGroupAction(ActionController controller) {
super(controller);
}
public static void registerAction(ActionController controller) throws IllegalArgException {
BalancingGroupAction action = new BalancingGroupAction(controller);
controller.registerHandler(HttpMethod.POST, "/api/colocate/balancing_group", action);
controller.registerHandler(HttpMethod.DELETE, "/api/colocate/balancing_group", action);
}
@Override
public void executeInMasterWithAdmin(AuthorizationInfo authInfo, BaseRequest request, BaseResponse response)
throws DdlException {
long groupId = checkAndGetGroupId(request);
HttpMethod method = request.getRequest().method();
if (method.equals(HttpMethod.POST)) {
colocateIndex.markGroupBalancing(groupId);
ColocatePersistInfo info = ColocatePersistInfo.CreateForMarkBalancing(groupId);
Catalog.getInstance().getEditLog().logColocateMarkBalancing(info);
LOG.info("mark colocate group {} balancing", groupId);
} else if (method.equals(HttpMethod.DELETE)) {
colocateIndex.markGroupStable(groupId);
ColocatePersistInfo info = ColocatePersistInfo.CreateForMarkStable(groupId);
Catalog.getInstance().getEditLog().logColocateMarkStable(info);
LOG.info("mark colocate group {} stable", groupId);
} else {
response.appendContent(new RestBaseResult("HTTP method is not allowed.").toJson());
writeResponse(request, response, HttpResponseStatus.METHOD_NOT_ALLOWED);
}
sendResult(request, response);
}
}
// update a backendsPerBucketSeq meta for a colocate group
public static class BucketSeqAction extends ColocateMetaBaseAction {
private static final Logger LOG = LogManager.getLogger(BucketSeqAction.class);
BucketSeqAction(ActionController controller) {
super(controller);
}
public static void registerAction(ActionController controller) throws IllegalArgException {
BucketSeqAction action = new BucketSeqAction(controller);
controller.registerHandler(HttpMethod.POST, "/api/colocate/bucketseq", action);
}
@Override
public void executeInMasterWithAdmin(AuthorizationInfo authInfo, BaseRequest request, BaseResponse response)
throws DdlException {
final String clusterName = authInfo.cluster;
if (Strings.isNullOrEmpty(clusterName)) {
throw new DdlException("No cluster selected.");
}
long groupId = checkAndGetGroupId(request);
String meta = request.getContent();
Type type = new TypeToken<List<List<Long>>>() {}.getType();
List<List<Long>> backendsPerBucketSeq = new Gson().fromJson(meta, type);
LOG.info("HttpServer {}", backendsPerBucketSeq);
List<Long> clusterBackendIds = Catalog.getCurrentSystemInfo().getClusterBackendIds(clusterName, true);
//check the Backend id
for (List<Long> backendIds : backendsPerBucketSeq) {
for (Long beId : backendIds) {
if (!clusterBackendIds.contains(beId)) {
throw new DdlException("The backend " + beId + " is not exist or alive");
}
}
}
int metaSize = colocateIndex.getBackendsPerBucketSeq(groupId).size();
Preconditions.checkState(backendsPerBucketSeq.size() == metaSize,
backendsPerBucketSeq.size() + " vs. " + metaSize);
updateBackendPerBucketSeq(groupId, backendsPerBucketSeq);
LOG.info("the group {} backendsPerBucketSeq meta has updated", groupId);
sendResult(request, response);
}
private void updateBackendPerBucketSeq(Long groupId, List<List<Long>> backendsPerBucketSeq) {
colocateIndex.markGroupBalancing(groupId);
ColocatePersistInfo info1 = ColocatePersistInfo.CreateForMarkBalancing(groupId);
Catalog.getInstance().getEditLog().logColocateMarkBalancing(info1);
colocateIndex.addBackendsPerBucketSeq(groupId, backendsPerBucketSeq);
ColocatePersistInfo info2 = ColocatePersistInfo.CreateForBackendsPerBucketSeq(groupId,
backendsPerBucketSeq);
Catalog.getInstance().getEditLog().logColocateBackendsPerBucketSeq(info2);
}
}
}