Support decimal and timestamp type in orc load (#2759)

This commit is contained in:
HangyuanLiu
2020-01-15 07:40:30 +08:00
committed by ZHAO Chun
parent 64b2291347
commit a36193dfab
9 changed files with 222 additions and 9 deletions

View File

@ -233,9 +233,43 @@ Status ORCScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof) {
str_slot->len = wbytes;
break;
}
//TODO (lhy) : support more orc type
case orc::TIMESTAMP:
case orc::DECIMAL:
case orc::DECIMAL: {
int precision = ((orc::Decimal64VectorBatch*) cvb)->precision;
int scale = ((orc::Decimal64VectorBatch*) cvb)->scale;
//Decimal64VectorBatch handles decimal columns with precision no greater than 18.
//Decimal128VectorBatch handles the others.
std::string decimal_str;
if (precision <= 18) {
decimal_str = std::to_string(((orc::Decimal64VectorBatch*) cvb)->values[_current_line_of_group]);
} else {
decimal_str = ((orc::Decimal128VectorBatch*) cvb)->values[_current_line_of_group].toString();
}
//Orc api will fill in 0 at the end, so size must greater than scale
std::string v = decimal_str.substr(0, decimal_str.size() - scale) + "."
+ decimal_str.substr(decimal_str.size() - scale);
str_slot->ptr = reinterpret_cast<char*>(tuple_pool->allocate(v.size()));
memcpy(str_slot->ptr, v.c_str(), v.size());
str_slot->len = v.size();
break;
}
case orc::TIMESTAMP: {
int64_t timestamp = ((orc::TimestampVectorBatch*) cvb)->data[_current_line_of_group];
std::string timezone = _state->timezone();
DateTimeValue dtv;
if (!dtv.from_unixtime(timestamp, timezone)) {
std::stringstream str_error;
str_error << "Parse timestamp (" + std::to_string(timestamp) + ") error";
LOG(WARNING) << str_error.str();
return Status::InternalError(str_error.str());
}
char* buf_end = dtv.to_string((char*) tmp_buf);
wbytes = buf_end - (char*) tmp_buf -1;
str_slot->ptr = reinterpret_cast<char*>(tuple_pool->allocate(wbytes));
memcpy(str_slot->ptr, tmp_buf, wbytes);
str_slot->len = wbytes;
break;
}
default: {
std::stringstream str_error;
str_error << "The field name(" << slot_desc->col_name() << ") type not support. ";

View File

@ -29,6 +29,7 @@
#include "exec/local_file_reader.h"
#include "exec/orc_scanner.h"
#include "exprs/cast_functions.h"
#include "exprs/decimal_operators.h"
#include "runtime/descriptors.h"
#include "runtime/runtime_state.h"
#include "runtime/row_batch.h"
@ -48,6 +49,7 @@ public:
static void SetUpTestCase() {
UserFunctionCache::instance()->init("./be/test/runtime/test_data/user_function_cache/normal");
CastFunctions::init();
DecimalOperators::init();
}
protected:
@ -523,6 +525,183 @@ TEST_F(OrcScannerTest, normal2) {
scanner.close();
}
TEST_F(OrcScannerTest, normal3) {
TBrokerScanRangeParams params;
TTypeDesc varchar_type;
{
TTypeNode node;
node.__set_type(TTypeNodeType::SCALAR);
TScalarType scalar_type;
scalar_type.__set_type(TPrimitiveType::VARCHAR);
scalar_type.__set_len(65535);
node.__set_scalar_type(scalar_type);
varchar_type.types.push_back(node);
}
TTypeDesc decimal_type;
{
TTypeNode node;
node.__set_type(TTypeNodeType::SCALAR);
TScalarType scalar_type;
scalar_type.__set_type(TPrimitiveType::DECIMAL);
node.__set_scalar_type(scalar_type);
decimal_type.types.push_back(node);
}
TTypeDesc datetime_type;
{
TTypeNode node;
node.__set_type(TTypeNodeType::SCALAR);
TScalarType scalar_type;
scalar_type.__set_type(TPrimitiveType::DATETIME);
node.__set_scalar_type(scalar_type);
datetime_type.types.push_back(node);
}
{
for (int i = 0; i < 5; ++i) {
TExprNode cast_expr;
cast_expr.node_type = TExprNodeType::CAST_EXPR;
cast_expr.type = decimal_type;
cast_expr.__set_opcode(TExprOpcode::CAST);
cast_expr.__set_num_children(1);
cast_expr.__set_output_scale(-1);
cast_expr.__isset.fn = true;
cast_expr.fn.name.function_name = "casttodecimal";
cast_expr.fn.binary_type = TFunctionBinaryType::BUILTIN;
cast_expr.fn.arg_types.push_back(varchar_type);
cast_expr.fn.ret_type = decimal_type;
cast_expr.fn.has_var_args = false;
cast_expr.fn.__set_signature("cast_to_decimal_val(VARCHAR(*))");
cast_expr.fn.__isset.scalar_fn = true;
cast_expr.fn.scalar_fn.symbol = "doris::DecimalOperators::cast_to_decimal_val";
TExprNode slot_ref;
slot_ref.node_type = TExprNodeType::SLOT_REF;
slot_ref.type = varchar_type;
slot_ref.num_children = 0;
slot_ref.__isset.slot_ref = true;
slot_ref.slot_ref.slot_id = i;
slot_ref.slot_ref.tuple_id = 0;
TExpr expr;
expr.nodes.push_back(cast_expr);
expr.nodes.push_back(slot_ref);
params.expr_of_dest_slot.emplace(6 + i, expr);
params.src_slot_ids.push_back(i);
}
{
TExprNode cast_expr;
cast_expr.node_type = TExprNodeType::CAST_EXPR;
cast_expr.type = datetime_type;
cast_expr.__set_opcode(TExprOpcode::CAST);
cast_expr.__set_num_children(1);
cast_expr.__set_output_scale(-1);
cast_expr.__isset.fn = true;
cast_expr.fn.name.function_name = "casttodatetime";
cast_expr.fn.binary_type = TFunctionBinaryType::BUILTIN;
cast_expr.fn.arg_types.push_back(varchar_type);
cast_expr.fn.ret_type = datetime_type;
cast_expr.fn.has_var_args = false;
cast_expr.fn.__set_signature("cast_to_datetime_val(VARCHAR(*))");
cast_expr.fn.__isset.scalar_fn = true;
cast_expr.fn.scalar_fn.symbol = "doris::CastFunctions::cast_to_datetime_val";
TExprNode slot_ref;
slot_ref.node_type = TExprNodeType::SLOT_REF;
slot_ref.type = varchar_type;
slot_ref.num_children = 0;
slot_ref.__isset.slot_ref = true;
slot_ref.slot_ref.slot_id = 5;
slot_ref.slot_ref.tuple_id = 0;
TExpr expr;
expr.nodes.push_back(cast_expr);
expr.nodes.push_back(slot_ref);
params.expr_of_dest_slot.emplace(11, expr);
params.src_slot_ids.push_back(5);
}
}
params.__set_src_tuple_id(0);
params.__set_dest_tuple_id(1);
//init_desc_table
TDescriptorTable t_desc_table;
// table descriptors
TTableDescriptor t_table_desc;
t_table_desc.id = 0;
t_table_desc.tableType = TTableType::BROKER_TABLE;
t_table_desc.numCols = 0;
t_table_desc.numClusteringCols = 0;
t_desc_table.tableDescriptors.push_back(t_table_desc);
t_desc_table.__isset.tableDescriptors = true;
TDescriptorTableBuilder dtb;
TTupleDescriptorBuilder src_tuple_builder;
src_tuple_builder.add_slot(
TSlotDescriptorBuilder().string_type(65535).nullable(true).column_name("col1").column_pos(1).build());
src_tuple_builder.add_slot(
TSlotDescriptorBuilder().string_type(65535).nullable(true).column_name("col2").column_pos(2).build());
src_tuple_builder.add_slot(
TSlotDescriptorBuilder().string_type(65535).nullable(true).column_name("col3").column_pos(3).build());
src_tuple_builder.add_slot(
TSlotDescriptorBuilder().string_type(65535).nullable(true).column_name("col4").column_pos(4).build());
src_tuple_builder.add_slot(
TSlotDescriptorBuilder().string_type(65535).nullable(true).column_name("col5").column_pos(5).build());
src_tuple_builder.add_slot(
TSlotDescriptorBuilder().string_type(65535).nullable(true).column_name("col6").column_pos(6).build());
src_tuple_builder.build(&dtb);
TTupleDescriptorBuilder dest_tuple_builder;
dest_tuple_builder.add_slot(
TSlotDescriptorBuilder().decimal_type(10,9).column_name("col1").column_pos(1).build());
dest_tuple_builder.add_slot(
TSlotDescriptorBuilder().decimal_type(7,5).column_name("col2").column_pos(2).build());
dest_tuple_builder.add_slot(
TSlotDescriptorBuilder().decimal_type(10,9).column_name("col3").column_pos(3).build());
dest_tuple_builder.add_slot(
TSlotDescriptorBuilder().decimal_type(10,5).column_name("col4").column_pos(4).build());
dest_tuple_builder.add_slot(
TSlotDescriptorBuilder().decimal_type(10,5).column_name("col5").column_pos(5).build());
dest_tuple_builder.add_slot(
TSlotDescriptorBuilder().type(TYPE_DATETIME).column_name("col5").column_pos(6).build());
dest_tuple_builder.build(&dtb);
t_desc_table = dtb.desc_tbl();
DescriptorTbl::create(&_obj_pool, t_desc_table, &_desc_tbl);
_runtime_state.set_desc_tbl(_desc_tbl);
std::vector<TBrokerRangeDesc> ranges;
TBrokerRangeDesc rangeDesc;
rangeDesc.start_offset = 0;
rangeDesc.size = -1;
rangeDesc.format_type = TFileFormatType::FORMAT_ORC;
rangeDesc.splittable = false;
rangeDesc.path = "./be/test/exec/test_data/orc_scanner/decimal_and_timestamp.orc";
rangeDesc.file_type = TFileType::FILE_LOCAL;
ranges.push_back(rangeDesc);
ORCScanner scanner(&_runtime_state, _profile, params, ranges, _addresses, &_counter);
ASSERT_TRUE(scanner.open().ok());
MemTracker tracker;
MemPool tuple_pool(&tracker);
Tuple *tuple = (Tuple *) tuple_pool.allocate(_desc_tbl->get_tuple_descriptor(1)->byte_size());
bool eof = false;
ASSERT_TRUE(scanner.get_next(tuple, &tuple_pool, &eof).ok());
ASSERT_EQ(Tuple::to_string(tuple, *_desc_tbl->get_tuple_descriptor(1)),
"(1.123456789 1.12 1.1234500000 1.12345 1.12345 2020-01-14 22:12:19)");
scanner.close();
}
} // end namespace doris
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);

View File

@ -36,7 +36,7 @@ Doris 支持多种导入方式。建议先完整阅读本文档,再根据所
为适配不同的数据导入需求,Doris 系统提供了5种不同的导入方式。每种导入方式支持不同的数据源,存在不同的使用方式(异步,同步)。
所有导入方式都支持 csv 数据格式。其中 Broker load 还支持 parquet 数据格式。
所有导入方式都支持 csv 数据格式。其中 Broker load 还支持 parquet 和 orc 数据格式。
每个导入方式的说明请参阅单个导入方式的操作手册。

View File

@ -84,7 +84,7 @@ under the License.
file_type:
用于指定导入文件的类型,例如:parquet、csv。默认值通过文件后缀名判断。
用于指定导入文件的类型,例如:parquet、orc、csv。默认值通过文件后缀名判断。
column_list:

View File

@ -93,7 +93,7 @@ under the License.
file_type:
用于指定导入文件的类型,例如:parquet、csv。默认值通过文件后缀名判断。
用于指定导入文件的类型,例如:parquet、orc、csv。默认值通过文件后缀名判断。
column_list:

View File

@ -36,7 +36,7 @@ Doris supports multiple imports. It is recommended to read this document in full
To adapt to different data import requirements, Doris system provides five different import methods. Each import mode supports different data sources and has different usage modes (asynchronous, synchronous).
All import methods support CSV data format. Broker load also supports parquet data format.
All import methods support CSV data format. Broker load also supports parquet and orc data format.
For instructions on each import mode, please refer to the operation manual for a single import mode.

View File

@ -87,7 +87,7 @@ under the License.
file_type:
Used to specify the type of imported file, such as parquet, csv. Default values are determined by the file suffix name.
Used to specify the type of imported file, such as parquet, orc, csv. Default values are determined by the file suffix name.
column_list:

View File

@ -92,7 +92,7 @@ For example, the separator X01 of the hive file is specified as "\ x01"
File type:
Used to specify the type of imported file, such as parquet, csv. The default value is determined by the file suffix name.
Used to specify the type of imported file, such as parquet, orc, csv. The default value is determined by the file suffix name.
column_list: