[fix](hdfs) read 'fs.defaultFS' from core-site.xml for hdfs load which has no default fs (#34217) (#34372)
bp #34217

Co-authored-by: slothever <18522955+wsjz@users.noreply.github.com>
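For context on the Hadoop side of the title: a plain org.apache.hadoop.conf.Configuration loads core-default.xml plus any core-site.xml found on the classpath, so a 'fs.defaultFS' set there is visible even when the load statement itself names no filesystem. Below is a minimal standalone sketch of that lookup, not code from this commit; the class name DefaultFsProbe is hypothetical, and it assumes hadoop-common is on the classpath.

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class DefaultFsProbe {
    public static void main(String[] args) {
        // A plain Configuration loads core-default.xml and any core-site.xml
        // found on the classpath, so 'fs.defaultFS' set there is picked up
        // without being passed in the load statement's properties.
        Configuration conf = new Configuration();
        String defaultFs = conf.get("fs.defaultFS");     // e.g. "hdfs://nameservice1", or null if unset
        URI defaultUri = FileSystem.getDefaultUri(conf); // same lookup, with a file:/// fallback
        System.out.println("fs.defaultFS = " + defaultFs);
        System.out.println("default URI  = " + defaultUri);
    }
}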
@@ -46,9 +46,11 @@ import org.apache.doris.thrift.TUniqueId;

 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.Path;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;

+import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;

@@ -316,6 +318,10 @@ public class FileGroupInfo {
             rangeDesc.setSize(rangeBytes);
             rangeDesc.setFileSize(fileStatus.size);
             rangeDesc.setColumnsFromPath(columnsFromPath);
+            if (getFileType() == TFileType.FILE_HDFS) {
+                URI fileUri = new Path(fileStatus.path).toUri();
+                rangeDesc.setFsName(fileUri.getScheme() + "://" + fileUri.getAuthority());
+            }
         } else {
            // for stream load
            if (getFileType() == TFileType.FILE_LOCAL) {
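An aside on the FILE_HDFS branch added above: org.apache.hadoop.fs.Path normalizes the location string, and its toUri() exposes the scheme and authority whose concatenation becomes the range's fs name. A small standalone illustration follows, with the hypothetical class name FsNameFromPath (not part of this commit):

import java.net.URI;

import org.apache.hadoop.fs.Path;

public class FsNameFromPath {
    public static void main(String[] args) {
        // Fully qualified location: scheme "hdfs", authority "nameservice1".
        URI uri = new Path("hdfs://nameservice1/user/doris/data/nest_json.json").toUri();
        String fsName = uri.getScheme() + "://" + uri.getAuthority();
        System.out.println(fsName); // hdfs://nameservice1

        // A bare path carries neither scheme nor authority (both null here),
        // which is when a cluster-level fs.defaultFS from core-site.xml matters.
        URI bare = new Path("/user/doris/data/nest_json.json").toUri();
        System.out.println(bare.getScheme() + " / " + bare.getAuthority()); // null / null
    }
}

For a bare path with no scheme, both parts come back null, which is exactly the situation where the default filesystem from core-site.xml has to fill in.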
@@ -311,3 +311,8 @@
 50	guangzhou	2345675
 200	changsha	3456789

+-- !select15 --
+30	hangzhou	2345673
+40	shenzhen	2345674
+50	guangzhou	2345675
+200	changsha	3456789
@@ -77,6 +77,39 @@ suite("test_hdfs_json_load", "p0,external,external_docker,external_docker_hive,h
         assertTrue(result1[0][0] == 0, "Query OK, 0 rows affected")
     }

+    def load_from_hdfs2 = {new_json_reader_flag, strip_flag, fuzzy_flag, testTablex, label, fileName,
+                           fsPath, hdfsUser, exprs, jsonpaths, json_root, columns_parameter, where ->
+        def hdfsFilePath = "${fsPath}/user/doris/preinstalled_data/json_format_test/${fileName}"
+        def result1 = sql """
+                        LOAD LABEL ${label} (
+                            DATA INFILE("${hdfsFilePath}")
+                            INTO TABLE ${testTablex}
+                            FORMAT as "json"
+                            ${columns_parameter}
+                            ${exprs}
+                            ${where}
+                            properties(
+                                "json_root" = "${json_root}",
+                                "jsonpaths" = "${jsonpaths}",
+                                "strip_outer_array" = "${strip_flag}",
+                                "fuzzy_parse" = "${fuzzy_flag}"
+                            )
+                        )
+                        with HDFS (
+                            "hadoop.username" = "${hdfsUser}"
+                        )
+                        PROPERTIES (
+                            "timeout"="1200"
+                        );
+                        """
+
+        println "${result1}"
+
+        assertTrue(result1.size() == 1)
+        assertTrue(result1[0].size() == 1)
+        assertTrue(result1[0][0] == 0, "Query OK, 0 rows affected")
+    }
+
     def check_load_result = {checklabel, testTablex ->
         def max_try_milli_secs = 30000
         while(max_try_milli_secs) {
@@ -514,7 +547,23 @@ suite("test_hdfs_json_load", "p0,external,external_docker,external_docker_hive,h
         }
     }

+    // case15: verify no default FS properties
+    def q15 = {
+        try {
+            def test_load_label1 = UUID.randomUUID().toString().replaceAll("-", "")
+            sql "DROP TABLE IF EXISTS ${testTable}"
+            create_test_table1.call(testTable)
+            load_from_hdfs2.call("false", "false", "false", testTable, test_load_label1, "nest_json.json", fsPath, hdfsUser,
+                                 "SET(id = id * 10)", """[\\"\$.id\\", \\"\$.code\\",\\"\$.city\\"]""", '$.item',
+                                 '(id, code, city)', 'WHERE id>20')
+
+            check_load_result(test_load_label1, testTable)
+            sql "sync"
+            qt_select15 "select * from ${testTable} order by id"
+        } finally {
+            try_sql("DROP TABLE IF EXISTS ${testTable}")
+        }
+    }
+

    String enabled = context.config.otherConfigs.get("enableHiveTest")
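Note how case15 exercises the fix: the WITH HDFS clause of load_from_hdfs2 passes only "hadoop.username", so the statement itself names no default filesystem; the namenode has to come from the DATA INFILE URI (or, failing that, the cluster's core-site.xml), which is the code path changed in FileGroupInfo above.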
@@ -547,5 +596,7 @@ suite("test_hdfs_json_load", "p0,external,external_docker,external_docker_hive,h
         q13()
         log.info("Begin Test q14:")
         q14()
+        log.info("Begin Test q15:")
+        q15()
     }
 }