pick #38215 --------- Co-authored-by: Zou Xinyi <zouxinyi@selectdb.com>
This commit is contained in:
@ -17,6 +17,8 @@
|
||||
|
||||
#include "result_sink_operator.h"
|
||||
|
||||
#include <sys/select.h>
|
||||
|
||||
#include <memory>
|
||||
|
||||
#include "common/config.h"
|
||||
@ -96,7 +98,8 @@ Status ResultSinkLocalState::open(RuntimeState* state) {
|
||||
}
|
||||
case TResultSinkType::ARROW_FLIGHT_PROTOCAL: {
|
||||
std::shared_ptr<arrow::Schema> arrow_schema;
|
||||
RETURN_IF_ERROR(convert_expr_ctxs_arrow_schema(_output_vexpr_ctxs, &arrow_schema));
|
||||
RETURN_IF_ERROR(convert_expr_ctxs_arrow_schema(_output_vexpr_ctxs, &arrow_schema,
|
||||
state->timezone()));
|
||||
state->exec_env()->result_mgr()->register_arrow_schema(state->fragment_instance_id(),
|
||||
arrow_schema);
|
||||
_writer.reset(new (std::nothrow) vectorized::VArrowFlightResultWriter(
|
||||
|
||||
@ -46,7 +46,8 @@ namespace doris {
|
||||
|
||||
using strings::Substitute;
|
||||
|
||||
Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr<arrow::DataType>* result) {
|
||||
Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr<arrow::DataType>* result,
|
||||
const std::string& timezone) {
|
||||
switch (type.type) {
|
||||
case TYPE_NULL:
|
||||
*result = arrow::null();
|
||||
@ -96,11 +97,11 @@ Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr<arrow::
|
||||
break;
|
||||
case TYPE_DATETIMEV2:
|
||||
if (type.scale > 3) {
|
||||
*result = std::make_shared<arrow::TimestampType>(arrow::TimeUnit::MICRO);
|
||||
*result = std::make_shared<arrow::TimestampType>(arrow::TimeUnit::MICRO, timezone);
|
||||
} else if (type.scale > 0) {
|
||||
*result = std::make_shared<arrow::TimestampType>(arrow::TimeUnit::MILLI);
|
||||
*result = std::make_shared<arrow::TimestampType>(arrow::TimeUnit::MILLI, timezone);
|
||||
} else {
|
||||
*result = std::make_shared<arrow::TimestampType>(arrow::TimeUnit::SECOND);
|
||||
*result = std::make_shared<arrow::TimestampType>(arrow::TimeUnit::SECOND, timezone);
|
||||
}
|
||||
break;
|
||||
case TYPE_DECIMALV2:
|
||||
@ -120,7 +121,7 @@ Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr<arrow::
|
||||
case TYPE_ARRAY: {
|
||||
DCHECK_EQ(type.children.size(), 1);
|
||||
std::shared_ptr<arrow::DataType> item_type;
|
||||
static_cast<void>(convert_to_arrow_type(type.children[0], &item_type));
|
||||
static_cast<void>(convert_to_arrow_type(type.children[0], &item_type, timezone));
|
||||
*result = std::make_shared<arrow::ListType>(item_type);
|
||||
break;
|
||||
}
|
||||
@ -128,8 +129,8 @@ Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr<arrow::
|
||||
DCHECK_EQ(type.children.size(), 2);
|
||||
std::shared_ptr<arrow::DataType> key_type;
|
||||
std::shared_ptr<arrow::DataType> val_type;
|
||||
static_cast<void>(convert_to_arrow_type(type.children[0], &key_type));
|
||||
static_cast<void>(convert_to_arrow_type(type.children[1], &val_type));
|
||||
static_cast<void>(convert_to_arrow_type(type.children[0], &key_type, timezone));
|
||||
static_cast<void>(convert_to_arrow_type(type.children[1], &val_type, timezone));
|
||||
*result = std::make_shared<arrow::MapType>(key_type, val_type);
|
||||
break;
|
||||
}
|
||||
@ -138,7 +139,7 @@ Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr<arrow::
|
||||
std::vector<std::shared_ptr<arrow::Field>> fields;
|
||||
for (size_t i = 0; i < type.children.size(); i++) {
|
||||
std::shared_ptr<arrow::DataType> field_type;
|
||||
static_cast<void>(convert_to_arrow_type(type.children[i], &field_type));
|
||||
static_cast<void>(convert_to_arrow_type(type.children[i], &field_type, timezone));
|
||||
fields.push_back(std::make_shared<arrow::Field>(type.field_names[i], field_type,
|
||||
type.contains_nulls[i]));
|
||||
}
|
||||
@ -156,20 +157,22 @@ Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr<arrow::
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status convert_to_arrow_field(SlotDescriptor* desc, std::shared_ptr<arrow::Field>* field) {
|
||||
Status convert_to_arrow_field(SlotDescriptor* desc, std::shared_ptr<arrow::Field>* field,
|
||||
const std::string& timezone) {
|
||||
std::shared_ptr<arrow::DataType> type;
|
||||
RETURN_IF_ERROR(convert_to_arrow_type(desc->type(), &type));
|
||||
RETURN_IF_ERROR(convert_to_arrow_type(desc->type(), &type, timezone));
|
||||
*field = arrow::field(desc->col_name(), type, desc->is_nullable());
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status convert_block_arrow_schema(const vectorized::Block& block,
|
||||
std::shared_ptr<arrow::Schema>* result) {
|
||||
std::shared_ptr<arrow::Schema>* result,
|
||||
const std::string& timezone) {
|
||||
std::vector<std::shared_ptr<arrow::Field>> fields;
|
||||
for (const auto& type_and_name : block) {
|
||||
std::shared_ptr<arrow::DataType> arrow_type;
|
||||
RETURN_IF_ERROR(convert_to_arrow_type(type_and_name.type->get_type_as_type_descriptor(),
|
||||
&arrow_type));
|
||||
&arrow_type, timezone));
|
||||
fields.push_back(std::make_shared<arrow::Field>(type_and_name.name, arrow_type,
|
||||
type_and_name.type->is_nullable()));
|
||||
}
|
||||
@ -178,12 +181,13 @@ Status convert_block_arrow_schema(const vectorized::Block& block,
|
||||
}
|
||||
|
||||
Status convert_to_arrow_schema(const RowDescriptor& row_desc,
|
||||
std::shared_ptr<arrow::Schema>* result) {
|
||||
std::shared_ptr<arrow::Schema>* result,
|
||||
const std::string& timezone) {
|
||||
std::vector<std::shared_ptr<arrow::Field>> fields;
|
||||
for (auto tuple_desc : row_desc.tuple_descriptors()) {
|
||||
for (auto desc : tuple_desc->slots()) {
|
||||
std::shared_ptr<arrow::Field> field;
|
||||
RETURN_IF_ERROR(convert_to_arrow_field(desc, &field));
|
||||
RETURN_IF_ERROR(convert_to_arrow_field(desc, &field, timezone));
|
||||
fields.push_back(field);
|
||||
}
|
||||
}
|
||||
@ -192,12 +196,13 @@ Status convert_to_arrow_schema(const RowDescriptor& row_desc,
|
||||
}
|
||||
|
||||
Status convert_expr_ctxs_arrow_schema(const vectorized::VExprContextSPtrs& output_vexpr_ctxs,
|
||||
std::shared_ptr<arrow::Schema>* result) {
|
||||
std::shared_ptr<arrow::Schema>* result,
|
||||
const std::string& timezone) {
|
||||
std::vector<std::shared_ptr<arrow::Field>> fields;
|
||||
for (int i = 0; i < output_vexpr_ctxs.size(); i++) {
|
||||
std::shared_ptr<arrow::DataType> arrow_type;
|
||||
auto root_expr = output_vexpr_ctxs.at(i)->root();
|
||||
RETURN_IF_ERROR(convert_to_arrow_type(root_expr->type(), &arrow_type));
|
||||
RETURN_IF_ERROR(convert_to_arrow_type(root_expr->type(), &arrow_type, timezone));
|
||||
auto field_name = root_expr->is_slot_ref() && !root_expr->expr_label().empty()
|
||||
? root_expr->expr_label()
|
||||
: fmt::format("{}_{}", root_expr->data_type()->get_name(), i);
|
||||
|
||||
@ -41,17 +41,20 @@ namespace doris {
|
||||
|
||||
class RowDescriptor;
|
||||
|
||||
Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr<arrow::DataType>* result);
|
||||
Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr<arrow::DataType>* result,
|
||||
const std::string& timezone);
|
||||
|
||||
// Convert Doris RowDescriptor to Arrow Schema.
|
||||
Status convert_to_arrow_schema(const RowDescriptor& row_desc,
|
||||
std::shared_ptr<arrow::Schema>* result);
|
||||
std::shared_ptr<arrow::Schema>* result, const std::string& timezone);
|
||||
|
||||
Status convert_block_arrow_schema(const vectorized::Block& block,
|
||||
std::shared_ptr<arrow::Schema>* result);
|
||||
std::shared_ptr<arrow::Schema>* result,
|
||||
const std::string& timezone);
|
||||
|
||||
Status convert_expr_ctxs_arrow_schema(const vectorized::VExprContextSPtrs& output_vexpr_ctxs,
|
||||
std::shared_ptr<arrow::Schema>* result);
|
||||
std::shared_ptr<arrow::Schema>* result,
|
||||
const std::string& timezone);
|
||||
|
||||
Status serialize_record_batch(const arrow::RecordBatch& record_batch, std::string* result);
|
||||
|
||||
|
||||
@ -266,7 +266,8 @@ Status VParquetTransformer::_parse_schema() {
|
||||
std::vector<std::shared_ptr<arrow::Field>> fields;
|
||||
for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) {
|
||||
std::shared_ptr<arrow::DataType> type;
|
||||
RETURN_IF_ERROR(convert_to_arrow_type(_output_vexpr_ctxs[i]->root()->type(), &type));
|
||||
RETURN_IF_ERROR(convert_to_arrow_type(_output_vexpr_ctxs[i]->root()->type(), &type,
|
||||
_state->timezone()));
|
||||
if (_parquet_schemas != nullptr) {
|
||||
std::shared_ptr<arrow::Field> field =
|
||||
arrow::field(_parquet_schemas->operator[](i).schema_column_name, type,
|
||||
|
||||
@ -88,7 +88,7 @@ Status MemoryScratchSink::send(RuntimeState* state, Block* input_block, bool eos
|
||||
*input_block, &block));
|
||||
std::shared_ptr<arrow::Schema> block_arrow_schema;
|
||||
// After expr executed, use recaculated schema as final schema
|
||||
RETURN_IF_ERROR(convert_block_arrow_schema(block, &block_arrow_schema));
|
||||
RETURN_IF_ERROR(convert_block_arrow_schema(block, &block_arrow_schema, state->timezone()));
|
||||
RETURN_IF_ERROR(convert_to_arrow_batch(block, block_arrow_schema, arrow::default_memory_pool(),
|
||||
&result, _timezone_obj));
|
||||
_queue->blocking_put(result);
|
||||
|
||||
@ -18,6 +18,7 @@
|
||||
#include "vec/sink/vresult_sink.h"
|
||||
|
||||
#include <fmt/format.h>
|
||||
#include <sys/select.h>
|
||||
#include <time.h>
|
||||
|
||||
#include <new>
|
||||
@ -105,7 +106,8 @@ Status VResultSink::prepare(RuntimeState* state) {
|
||||
}
|
||||
case TResultSinkType::ARROW_FLIGHT_PROTOCAL: {
|
||||
std::shared_ptr<arrow::Schema> arrow_schema;
|
||||
RETURN_IF_ERROR(convert_expr_ctxs_arrow_schema(_output_vexpr_ctxs, &arrow_schema));
|
||||
RETURN_IF_ERROR(convert_expr_ctxs_arrow_schema(_output_vexpr_ctxs, &arrow_schema,
|
||||
state->timezone()));
|
||||
state->exec_env()->result_mgr()->register_arrow_schema(state->fragment_instance_id(),
|
||||
arrow_schema);
|
||||
_writer.reset(new (std::nothrow) VArrowFlightResultWriter(_sender.get(), _output_vexpr_ctxs,
|
||||
|
||||
@ -489,7 +489,7 @@ void serialize_and_deserialize_arrow_test() {
|
||||
RowDescriptor row_desc(&tuple_desc, true);
|
||||
// arrow schema
|
||||
std::shared_ptr<arrow::Schema> _arrow_schema;
|
||||
EXPECT_EQ(convert_to_arrow_schema(row_desc, &_arrow_schema), Status::OK());
|
||||
EXPECT_EQ(convert_to_arrow_schema(row_desc, &_arrow_schema, "UTC"), Status::OK());
|
||||
|
||||
// serialize
|
||||
std::shared_ptr<arrow::RecordBatch> result;
|
||||
@ -623,7 +623,7 @@ TEST(DataTypeSerDeArrowTest, DataTypeMapNullKeySerDeTest) {
|
||||
RowDescriptor row_desc(&tuple_desc, true);
|
||||
// arrow schema
|
||||
std::shared_ptr<arrow::Schema> _arrow_schema;
|
||||
EXPECT_EQ(convert_to_arrow_schema(row_desc, &_arrow_schema), Status::OK());
|
||||
EXPECT_EQ(convert_to_arrow_schema(row_desc, &_arrow_schema, "UTC"), Status::OK());
|
||||
|
||||
// serialize
|
||||
std::shared_ptr<arrow::RecordBatch> result;
|
||||
|
||||
Reference in New Issue
Block a user