[enhancement](broker) broker load support tencent cos (#12801)

This commit is contained in:
xueweizhang
2022-11-22 21:51:15 +08:00
committed by GitHub
parent 6eeebd47a9
commit 2eca51f3ba
9 changed files with 206 additions and 38 deletions

View File

@@ -366,25 +366,25 @@ WITH BROKER broker_name
8. Import a batch of data from HDFS, specify the timeout and filter ratio. Broker with clear text my_hdfs_broker. Simple authentication. And delete the columns in the original data that match the columns with v2 greater than 100 in the imported data, and other columns are imported normally
```sql
LOAD LABEL example_db.label8
(
MERGE DATA INFILE("HDFS://test:802/input/file")
INTO TABLE `my_table`
(k1, k2, k3, v2, v1)
DELETE ON v2 > 100
)
WITH HDFS
(
"hadoop.username"="user",
"password"="pass"
)
PROPERTIES
(
"timeout" = "3600",
"max_filter_ratio" = "0.1"
);
```
Import using the MERGE method. `my_table` must be a Unique Key table. When the value of the v2 column in the imported data is greater than 100, the row is treated as a delete row.
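For context, a minimal sketch of a Unique Key table that such a MERGE load could target; the column types, bucket count, and replication setting are illustrative assumptions, not part of this commit:
```sql
-- Hypothetical target table for the MERGE example above.
-- Types, bucket count, and replication_num are assumptions.
CREATE TABLE example_db.my_table
(
    k1 INT,
    k2 INT,
    k3 INT,
    v1 INT,
    v2 INT
)
UNIQUE KEY (k1, k2, k3)
DISTRIBUTED BY HASH(k1) BUCKETS 8
PROPERTIES ("replication_num" = "1");
```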
@@ -392,21 +392,21 @@ WITH BROKER broker_name
9. Specify the source_sequence column when importing to ensure the replacement order in the UNIQUE_KEYS table:
```sql
LOAD LABEL example_db.label9
(
DATA INFILE("HDFS://test:802/input/file")
INTO TABLE `my_table`
COLUMNS TERMINATED BY ","
(k1,k2,source_sequence,v1,v2)
ORDER BY source_sequence
)
WITH HDFS
(
"hadoop.username"="user",
"password"="pass"
)
```
`my_table` must be a Unique Key model table with a sequence column (Sequence Col) specified. The data will be ordered according to the value of the `source_sequence` column in the source data.
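A minimal sketch of such a table, assuming the sequence column is enabled via the `function_column.sequence_type` table property; column types and bucketing are illustrative assumptions:
```sql
-- Hypothetical Unique Key table with a sequence column enabled.
-- Types, bucket count, and replication_num are assumptions.
CREATE TABLE example_db.my_table
(
    k1 INT,
    k2 INT,
    v1 INT,
    v2 INT
)
UNIQUE KEY (k1, k2)
DISTRIBUTED BY HASH(k1) BUCKETS 8
PROPERTIES
(
    "replication_num" = "1",
    "function_column.sequence_type" = "int"
);
```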
@@ -459,6 +459,24 @@ WITH BROKER broker_name
"max_filter_ratio"="0.1"
);
```
11. Load data in CSV format from COS (Tencent Cloud Object Storage).
```sql
LOAD LABEL example_db.label10
(
DATA INFILE("cosn://my_bucket/input/file.csv")
INTO TABLE `my_table`
(k1, k2, k3)
)
WITH BROKER "broker_name"
(
"fs.cosn.userinfo.secretId" = "xxx",
"fs.cosn.userinfo.secretKey" = "xxxx",
"fs.cosn.bucket.endpoint_suffix" = "cos.xxxxxxxxx.myqcloud.com"
)
```
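After submission, the state of the load job can be checked with `SHOW LOAD`, sketched here with the label from the example above:
```sql
-- Check the state of the COS load job submitted above.
SHOW LOAD FROM example_db WHERE LABEL = "label10";
```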
### Keywords
BROKER, LOAD

View File

@@ -189,6 +189,21 @@ PROPERTIES (
)
```
9. Export all data in the testTbl table to COS (Tencent Cloud Object Storage).
```sql
EXPORT TABLE testTbl TO "cosn://my_bucket/export/a/b/c"
PROPERTIES (
"column_separator"=",",
"line_delimiter" = "\n"
) WITH BROKER "broker_name"
(
"fs.cosn.userinfo.secretId" = "xxx",
"fs.cosn.userinfo.secretKey" = "xxxx",
"fs.cosn.bucket.endpoint_suffix" = "cos.xxxxxxxxx.myqcloud.com"
)
```
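The export job runs asynchronously; its state can be checked with `SHOW EXPORT`, sketched here for the current database:
```sql
-- List export jobs, including the COS export above, with their states.
SHOW EXPORT;
```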
### Keywords
EXPORT

View File

@@ -187,7 +187,7 @@ illustrate:
);
````
5. Export the query result of the select statement to the file `s3a://${bucket_name}/path/result.txt`. Specify the export format as csv.
After the export is complete, an identity file is generated.
```sql
@@ -285,6 +285,26 @@ illustrate:
If the final generated file is not larger than 100MB, it will be: `result_0.csv`.
If larger than 100MB, it may be `result_0.csv, result_1.csv, ...`.
9. Export the query result of the select statement to the file `cosn://${bucket_name}/path/result.txt` on Tencent Cloud Object Storage (COS). Specify the export format as csv.
After the export is complete, an identity file is generated.
```sql
select k1,k2,v1 from tbl1 limit 100000
into outfile "cosn://my_bucket/export/my_file_"
FORMAT AS CSV
PROPERTIES
(
"broker.name" = "broker_name",
"broker.fs.cosn.userinfo.secretId" = "xxx",
"broker.fs.cosn.userinfo.secretKey" = "xxxx",
"broker.fs.cosn.bucket.endpoint_suffix" = "https://cos.xxxxxx.myqcloud.com/",
"column_separator" = ",",
"line_delimiter" = "\n",
"max_file_size" = "1024MB",
"success_file_name" = "SUCCESS"
)
```
### keywords
OUTFILE

View File

@@ -457,6 +457,22 @@ WITH BROKER broker_name
"timeout"="1200",
"max_filter_ratio"="0.1"
);
11. Load data in CSV format from COS (Tencent Cloud Object Storage).
```sql
LOAD LABEL example_db.label10
(
DATA INFILE("cosn://my_bucket/input/file.csv")
INTO TABLE `my_table`
(k1, k2, k3)
)
WITH BROKER "broker_name"
(
"fs.cosn.userinfo.secretId" = "xxx",
"fs.cosn.userinfo.secretKey" = "xxxx",
"fs.cosn.bucket.endpoint_suffix" = "cos.xxxxxxxxx.myqcloud.com"
)
```
### Keywords

View File

@@ -188,6 +188,21 @@ PROPERTIES (
)
```
9. Export all data in the testTbl table to COS (Tencent Cloud Object Storage).
```sql
EXPORT TABLE testTbl TO "cosn://my_bucket/export/a/b/c"
PROPERTIES (
"column_separator"=",",
"line_delimiter" = "\n"
) WITH BROKER "broker_name"
(
"fs.cosn.userinfo.secretId" = "xxx",
"fs.cosn.userinfo.secretKey" = "xxxx",
"fs.cosn.bucket.endpoint_suffix" = "cos.xxxxxxxxx.myqcloud.com"
)
```
### Keywords
EXPORT

View File

@@ -188,7 +188,7 @@ INTO OUTFILE "file_path"
);
```
5. Export the query result of the select statement to the file `s3a://${bucket_name}/path/result.txt`. Specify the export format as csv.
After the export is complete, an identity file is generated.
```sql
@@ -287,6 +287,26 @@ INTO OUTFILE "file_path"
If the final generated file is not larger than 100MB, it will be: `result_0.csv`.
If larger than 100MB, it may be `result_0.csv, result_1.csv, ...`.
9. Export the query result of the select statement to the file `cosn://${bucket_name}/path/result.txt` on Tencent Cloud Object Storage (COS). Specify the export format as csv.
After the export is complete, an identity file is generated.
```sql
select k1,k2,v1 from tbl1 limit 100000
into outfile "cosn://my_bucket/export/my_file_"
FORMAT AS CSV
PROPERTIES
(
"broker.name" = "broker_name",
"broker.fs.cosn.userinfo.secretId" = "xxx",
"broker.fs.cosn.userinfo.secretKey" = "xxxx",
"broker.fs.cosn.bucket.endpoint_suffix" = "https://cos.xxxxxx.myqcloud.com/",
"column_separator" = ",",
"line_delimiter" = "\n",
"max_file_size" = "1024MB",
"success_file_name" = "SUCCESS"
)
```
### keywords
SELECT, INTO, OUTFILE

View File

@@ -240,9 +240,10 @@ public class ExportStmt extends StatementBase {
&& !schema.equalsIgnoreCase("ofs")
&& !schema.equalsIgnoreCase("obs")
&& !schema.equalsIgnoreCase("oss")
&& !schema.equalsIgnoreCase("s3a")
&& !schema.equalsIgnoreCase("cosn"))) {
throw new AnalysisException("Invalid broker path. please use valid 'hdfs://', 'ofs://', 'obs://',"
+ " 'oss://', 's3a://' or 'cosn://' path.");
}
} else if (type == StorageBackend.StorageType.S3) {
if (schema == null || !schema.equalsIgnoreCase("s3")) {

View File

@@ -292,6 +292,12 @@ under the License.
<artifactId>hadoop-aliyun</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.qcloud.cos/hadoop-cos -->
<dependency>
<groupId>com.qcloud.cos</groupId>
<artifactId>hadoop-cos</artifactId>
<version>2.8.5-8.1.8</version>
</dependency>
</dependencies>
<build>
<finalName>apache_hdfs_broker</finalName>

View File

@@ -70,6 +70,7 @@ public class FileSystemManager {
private static final String CHDFS_SCHEME = "ofs";
private static final String OBS_SCHEME = "obs";
private static final String OSS_SCHEME = "oss";
private static final String COS_SCHEME = "cosn";
private static final String USER_NAME_KEY = "username";
private static final String PASSWORD_KEY = "password";
@@ -124,6 +125,13 @@ public class FileSystemManager {
private static final String FS_OSS_IMPL_DISABLE_CACHE = "fs.oss.impl.disable.cache";
private static final String FS_OSS_IMPL = "fs.oss.impl";
// arguments for cos
private static final String FS_COS_ACCESS_KEY = "fs.cosn.userinfo.secretId";
private static final String FS_COS_SECRET_KEY = "fs.cosn.userinfo.secretKey";
private static final String FS_COS_ENDPOINT = "fs.cosn.bucket.endpoint_suffix";
private static final String FS_COS_IMPL = "fs.cosn.impl";
private static final String FS_COS_IMPL_DISABLE_CACHE = "fs.cosn.impl.disable.cache";
private ScheduledExecutorService handleManagementPool = Executors.newScheduledThreadPool(2);
private int readBufferSize = 128 << 10; // 128k
@@ -187,6 +195,8 @@ public class FileSystemManager {
brokerFileSystem = getOBSFileSystem(path, properties);
} else if (scheme.equals(OSS_SCHEME)) {
brokerFileSystem = getOSSFileSystem(path, properties);
} else if (scheme.equals(COS_SCHEME)) {
brokerFileSystem = getCOSFileSystem(path, properties);
} else {
throw new BrokerException(TBrokerOperationStatusCode.INVALID_INPUT_FILE_PATH,
"invalid path. scheme is not supported");
@@ -681,6 +691,53 @@ FileSystemManager {
}
}
/**
* Visible for test.
* <p>
* File system handles are cached; the cache identity is endpoint + bucket + accessKey_secretKey.
*
* @param path the COS path being accessed, e.g. cosn://my_bucket/input/file
* @param properties broker properties carrying fs.cosn.userinfo.secretId/secretKey and the endpoint suffix
* @return a cached or newly created BrokerFileSystem for the given path
*/
public BrokerFileSystem getCOSFileSystem(String path, Map<String, String> properties) {
WildcardURI pathUri = new WildcardURI(path);
String accessKey = properties.getOrDefault(FS_COS_ACCESS_KEY, "");
String secretKey = properties.getOrDefault(FS_COS_SECRET_KEY, "");
String endpoint = properties.getOrDefault(FS_COS_ENDPOINT, "");
String disableCache = properties.getOrDefault(FS_COS_IMPL_DISABLE_CACHE, "true");
// endpoint is the server host, pathUri.getUri().getHost() is the bucket
// we should use these two params as the host identity, because FileSystem will cache both.
String host = COS_SCHEME + "://" + endpoint + "/" + pathUri.getUri().getHost();
String cosUgi = accessKey + "," + secretKey;
FileSystemIdentity fileSystemIdentity = new FileSystemIdentity(host, cosUgi);
BrokerFileSystem fileSystem = updateCachedFileSystem(fileSystemIdentity, properties);
fileSystem.getLock().lock();
try {
if (fileSystem.getDFSFileSystem() == null) {
logger.info("could not find file system for path " + path + " create a new one");
// create a new filesystem
Configuration conf = new Configuration();
conf.set(FS_COS_ACCESS_KEY, accessKey);
conf.set(FS_COS_SECRET_KEY, secretKey);
conf.set(FS_COS_ENDPOINT, endpoint);
conf.set(FS_COS_IMPL, "org.apache.hadoop.fs.CosFileSystem");
conf.set(FS_COS_IMPL_DISABLE_CACHE, disableCache);
FileSystem cosFileSystem = FileSystem.get(pathUri.getUri(), conf);
fileSystem.setFileSystem(cosFileSystem);
}
return fileSystem;
} catch (Exception e) {
logger.error("errors while connecting to " + path, e);
throw new BrokerException(TBrokerOperationStatusCode.NOT_AUTHORIZED, e);
} finally {
fileSystem.getLock().unlock();
}
}
public List<TBrokerFileStatus> listPath(String path, boolean fileNameOnly, Map<String, String> properties) {
List<TBrokerFileStatus> resultFileStatus = null;
WildcardURI pathUri = new WildcardURI(path);