[FsBroker] Fix bug that broker cannot read file with %3A in name (#3028)

HDFS supports file names containing literal escape sequences, such as "2018-01-01 00%3A00%3A00",
so the broker should be able to read such files as well.

Also change the default broker log level to INFO.
This commit is contained in:
Mingyu Chen
2020-03-04 11:03:01 +08:00
committed by GitHub
parent 70cc6df415
commit c032d634f4
5 changed files with 124 additions and 15 deletions

View File

@ -23,12 +23,15 @@ import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Type;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.utframe.UtFrameUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.File;
import java.util.UUID;
public class QueryPlanTest {
@ -78,6 +81,52 @@ public class QueryPlanTest {
"PROPERTIES (\n" +
" \"replication_num\" = \"1\"\n" +
");");
createTable("CREATE TABLE test.`bigtable` (\n" +
" `k1` tinyint(4) NULL COMMENT \"\",\n" +
" `k2` smallint(6) NULL COMMENT \"\",\n" +
" `k3` int(11) NULL COMMENT \"\",\n" +
" `k4` bigint(20) NULL COMMENT \"\",\n" +
" `k5` decimal(9, 3) NULL COMMENT \"\",\n" +
" `k6` char(5) NULL COMMENT \"\",\n" +
" `k10` date NULL COMMENT \"\",\n" +
" `k11` datetime NULL COMMENT \"\",\n" +
" `k7` varchar(20) NULL COMMENT \"\",\n" +
" `k8` double MAX NULL COMMENT \"\",\n" +
" `k9` float SUM NULL COMMENT \"\"\n" +
") ENGINE=OLAP\n" +
"AGGREGATE KEY(`k1`, `k2`, `k3`, `k4`, `k5`, `k6`, `k10`, `k11`, `k7`)\n" +
"COMMENT \"OLAP\"\n" +
"DISTRIBUTED BY HASH(`k1`) BUCKETS 5\n" +
"PROPERTIES (\n" +
"\"replication_num\" = \"1\"\n" +
");");
createTable("CREATE TABLE test.`baseall` (\n" +
" `k1` tinyint(4) NULL COMMENT \"\",\n" +
" `k2` smallint(6) NULL COMMENT \"\",\n" +
" `k3` int(11) NULL COMMENT \"\",\n" +
" `k4` bigint(20) NULL COMMENT \"\",\n" +
" `k5` decimal(9, 3) NULL COMMENT \"\",\n" +
" `k6` char(5) NULL COMMENT \"\",\n" +
" `k10` date NULL COMMENT \"\",\n" +
" `k11` datetime NULL COMMENT \"\",\n" +
" `k7` varchar(20) NULL COMMENT \"\",\n" +
" `k8` double MAX NULL COMMENT \"\",\n" +
" `k9` float SUM NULL COMMENT \"\"\n" +
") ENGINE=OLAP\n" +
"AGGREGATE KEY(`k1`, `k2`, `k3`, `k4`, `k5`, `k6`, `k10`, `k11`, `k7`)\n" +
"COMMENT \"OLAP\"\n" +
"DISTRIBUTED BY HASH(`k1`) BUCKETS 5\n" +
"PROPERTIES (\n" +
"\"replication_num\" = \"1\"\n" +
");");
}
@AfterClass
public static void tearDown() {
// Clean up the temporary FE running dir created for this test run.
// File.delete() only removes files and EMPTY directories; runningDir is
// populated during the test, so it must be deleted recursively or the
// old code left residue behind on every run.
deleteRecursively(new File(runningDir));
}
// Recursively deletes the file tree rooted at 'file'. Best-effort:
// return values of delete() are intentionally ignored, matching the
// original best-effort cleanup semantics.
private static void deleteRecursively(File file) {
File[] children = file.listFiles();
if (children != null) {
for (File child : children) {
deleteRecursively(child);
}
}
file.delete();
}
private static void createTable(String sql) throws Exception {
@ -232,4 +281,12 @@ public class QueryPlanTest {
"type not match, originType=HLL, targeType=DOUBLE"
);
}
@Test
public void testTypeCast() throws Exception {
// cmy: this test has occasionally failed in our daily test env, so a
// regression case is kept here: explaining the subquery must succeed
// and leave the connection in EOF state.
String query = "select * from test.baseall a where k1 in (select k1 from test.bigtable b where k2 > 0 and k1 = 1);";
UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, "explain " + query);
MysqlStateType state = connectContext.getState().getStateType();
Assert.assertEquals(MysqlStateType.EOF, state);
}
}

View File

@ -8,6 +8,6 @@ log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,S
log4j.appender.D = org.apache.log4j.DailyRollingFileAppender
log4j.appender.D.File = ${BROKER_LOG_DIR}/apache_hdfs_broker.log
log4j.appender.D.Append = true
log4j.appender.D.Threshold = DEBUG
log4j.appender.D.Threshold = INFO
log4j.appender.D.layout = org.apache.log4j.PatternLayout
log4j.appender.D.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss} [ %t:%r ] - [ %p ] %m%n

View File

@ -523,7 +523,8 @@ public class FileSystemManager {
"errors while get file pos from output stream");
}
if (currentStreamOffset != offset) {
logger.warn("invalid offset, current read offset is "
// it's ok, when reading some format like parquet, it is not a sequential read
logger.debug("invalid offset, current read offset is "
+ currentStreamOffset + " is not equal to request offset "
+ offset + " seek to it");
try {

View File

@ -26,7 +26,6 @@ import org.apache.logging.log4j.Logger;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
@ -48,8 +47,12 @@ public class WildcardURI {
public WildcardURI(String path) {
try {
// 1. call URLEncoder.encode to encode all special character, like /, *, [, %
// 2. recover the : and /
// 3. the space(" ") will be encoded to "+", we have to change it to "%20"
// example can be found in WildcardURITest.java
String encodedPath = URLEncoder.encode(path, StandardCharsets.UTF_8.toString()).replaceAll("%3A",
":").replaceAll("%2F", "/");
":").replaceAll("%2F", "/").replaceAll("\\+", "%20");
uri = new URI(encodedPath);
uri.normalize();
} catch (UnsupportedEncodingException | URISyntaxException e) {
@ -64,17 +67,11 @@ public class WildcardURI {
return uri;
}
public String getPath() {
try {
return URLDecoder.decode(uri.getPath(), StandardCharsets.UTF_8.toString());
} catch (UnsupportedEncodingException e) {
logger.warn("failed to get path: " + uri.getPath(), e);
throw new BrokerException(TBrokerOperationStatusCode.INVALID_INPUT_FILE_PATH,
e, "failed to get path {} ", uri.getPath());
}
}
// Returns the authority component (e.g. "host" or "host:port") of the
// wrapped URI, unchanged.
public String getAuthority() {
return uri.getAuthority();
}
// Returns the raw path of the wrapped URI without URL-decoding it.
// The previous implementation ran the path through URLDecoder, which
// turned literal escape sequences in file names (e.g. "%3A") into ":"
// and made such files unreadable; returning the path as-is fixes that.
public String getPath() {
return uri.getPath();
}
}

View File

@ -0,0 +1,54 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.broker.hdfs;
import org.apache.doris.common.WildcardURI;
import org.junit.Assert;
import org.junit.Test;
public class WildcardURITest {

// Verifies that WildcardURI.getPath() preserves the path component
// exactly as given: glob wildcards, spaces, '+', '#' and pre-encoded
// sequences such as "%3A" must all survive the round trip.
@Test
public void test() {
assertPathPreserved("hdfs://host", "/testdata/20180[8-9]*");
assertPathPreserved("hdfs://host", "/testdata/2018+ 0[8-9]*");
assertPathPreserved("hdfs://host", "/testdata/2018-01-01 00%3A00%3A00");
assertPathPreserved("hdfs://host", "/testdata/2018-01-01 00*");
assertPathPreserved("hdfs://host", "/testdata/2018-01-01#123#");
assertPathPreserved("hdfs://host", "/testdata/2018-01-01#123 +#*");
}

// Builds a WildcardURI from authority + path and asserts that getPath()
// returns the original path untouched.
private void assertPathPreserved(String authority, String path) {
WildcardURI wildcardURI = new WildcardURI(authority + path);
Assert.assertEquals(path, wildcardURI.getPath());
}
}