[Improvement](outfile) Support output null in parquet writer (#12970)

Gabriel
2022-09-29 13:36:30 +08:00
committed by GitHub
parent 29fc167548
commit c2fae109c3
10 changed files with 435 additions and 441 deletions
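The core of the change: the BE writer used to pass nullptr definition levels to parquet-cpp and substitute a default value for every NULL row, so NULLs were silently written out as concrete values; this commit threads real definition levels through each WriteBatch call, and the FE now declares nullable result columns as OPTIONAL in the Parquet schema. A minimal sketch of the parquet-cpp contract this relies on (an illustration against the Apache Arrow parquet API, not the Doris writer itself): for an OPTIONAL column, definition level 1 means the value is present and 0 means NULL, and the values buffer holds only the non-null entries, densely packed.

    #include <parquet/api/writer.h>
    #include <cstddef>
    #include <cstdint>
    #include <vector>

    // Hypothetical helper, for illustration only: write one nullable int64 column.
    void write_nullable_int64(parquet::Int64Writer* writer, const int64_t* values,
                              const uint8_t* null_map, std::size_t n) {
        std::vector<int16_t> def_levels(n);
        std::vector<int64_t> non_null; // WriteBatch consumes only non-null values
        non_null.reserve(n);
        for (std::size_t i = 0; i < n; ++i) {
            def_levels[i] = null_map[i] ? 0 : 1;
            if (!null_map[i]) non_null.push_back(values[i]);
        }
        // num_values counts definition levels, not values; rep_levels stays
        // nullptr because scalar (non-nested) columns have no repetition.
        writer->WriteBatch(static_cast<int64_t>(n), def_levels.data(), nullptr,
                           non_null.data());
    }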

View File

@@ -490,99 +490,49 @@ void OlapBlockDataConvertor::OlapColumnDataConvertorDate::set_source_column(
const ColumnWithTypeAndName& typed_column, size_t row_pos, size_t num_rows) {
OlapBlockDataConvertor::OlapColumnDataConvertorPaddedPODArray<uint24_t>::set_source_column(
typed_column, row_pos, num_rows);
if (is_date_v2(typed_column.type)) {
from_date_v2_ = true;
} else {
from_date_v2_ = false;
}
}
Status OlapBlockDataConvertor::OlapColumnDataConvertorDate::convert_to_olap() {
assert(_typed_column.column);
if (from_date_v2_) {
const vectorized::ColumnVector<vectorized::UInt32>* column_datetime = nullptr;
if (_nullmap) {
auto nullable_column =
assert_cast<const vectorized::ColumnNullable*>(_typed_column.column.get());
column_datetime = assert_cast<const vectorized::ColumnVector<vectorized::UInt32>*>(
nullable_column->get_nested_column_ptr().get());
} else {
column_datetime = assert_cast<const vectorized::ColumnVector<vectorized::UInt32>*>(
_typed_column.column.get());
}
assert(column_datetime);
const DateV2Value<DateV2ValueType>* datetime_cur =
(const DateV2Value<DateV2ValueType>*)(column_datetime->get_data().data()) +
_row_pos;
const DateV2Value<DateV2ValueType>* datetime_end = datetime_cur + _num_rows;
uint24_t* value = _values.data();
if (_nullmap) {
const UInt8* nullmap_cur = _nullmap + _row_pos;
while (datetime_cur != datetime_end) {
if (!*nullmap_cur) {
*value = datetime_cur->to_olap_date();
} else {
// do nothing
}
++value;
++datetime_cur;
++nullmap_cur;
}
assert(nullmap_cur == _nullmap + _row_pos + _num_rows &&
value == _values.get_end_ptr());
} else {
while (datetime_cur != datetime_end) {
*value = datetime_cur->to_olap_date();
++value;
++datetime_cur;
}
assert(value == _values.get_end_ptr());
}
return Status::OK();
const vectorized::ColumnVector<vectorized::Int64>* column_datetime = nullptr;
if (_nullmap) {
auto nullable_column =
assert_cast<const vectorized::ColumnNullable*>(_typed_column.column.get());
column_datetime = assert_cast<const vectorized::ColumnVector<vectorized::Int64>*>(
nullable_column->get_nested_column_ptr().get());
} else {
const vectorized::ColumnVector<vectorized::Int64>* column_datetime = nullptr;
if (_nullmap) {
auto nullable_column =
assert_cast<const vectorized::ColumnNullable*>(_typed_column.column.get());
column_datetime = assert_cast<const vectorized::ColumnVector<vectorized::Int64>*>(
nullable_column->get_nested_column_ptr().get());
} else {
column_datetime = assert_cast<const vectorized::ColumnVector<vectorized::Int64>*>(
_typed_column.column.get());
}
assert(column_datetime);
const VecDateTimeValue* datetime_cur =
(const VecDateTimeValue*)(column_datetime->get_data().data()) + _row_pos;
const VecDateTimeValue* datetime_end = datetime_cur + _num_rows;
uint24_t* value = _values.data();
if (_nullmap) {
const UInt8* nullmap_cur = _nullmap + _row_pos;
while (datetime_cur != datetime_end) {
if (!*nullmap_cur) {
*value = datetime_cur->to_olap_date();
} else {
// do nothing
}
++value;
++datetime_cur;
++nullmap_cur;
}
assert(nullmap_cur == _nullmap + _row_pos + _num_rows &&
value == _values.get_end_ptr());
} else {
while (datetime_cur != datetime_end) {
*value = datetime_cur->to_olap_date();
++value;
++datetime_cur;
}
assert(value == _values.get_end_ptr());
}
return Status::OK();
column_datetime = assert_cast<const vectorized::ColumnVector<vectorized::Int64>*>(
_typed_column.column.get());
}
assert(column_datetime);
const VecDateTimeValue* datetime_cur =
(const VecDateTimeValue*)(column_datetime->get_data().data()) + _row_pos;
const VecDateTimeValue* datetime_end = datetime_cur + _num_rows;
uint24_t* value = _values.data();
if (_nullmap) {
const UInt8* nullmap_cur = _nullmap + _row_pos;
while (datetime_cur != datetime_end) {
if (!*nullmap_cur) {
*value = datetime_cur->to_olap_date();
} else {
// do nothing
}
++value;
++datetime_cur;
++nullmap_cur;
}
assert(nullmap_cur == _nullmap + _row_pos + _num_rows && value == _values.get_end_ptr());
} else {
while (datetime_cur != datetime_end) {
*value = datetime_cur->to_olap_date();
++value;
++datetime_cur;
}
assert(value == _values.get_end_ptr());
}
return Status::OK();
}
// class OlapBlockDataConvertor::OlapColumnDataConvertorJsonb
@@ -660,99 +610,49 @@ void OlapBlockDataConvertor::OlapColumnDataConvertorDateTime::set_source_column(
const ColumnWithTypeAndName& typed_column, size_t row_pos, size_t num_rows) {
OlapBlockDataConvertor::OlapColumnDataConvertorPaddedPODArray<uint64_t>::set_source_column(
typed_column, row_pos, num_rows);
if (is_date_v2_or_datetime_v2(typed_column.type)) {
from_datetime_v2_ = true;
} else {
from_datetime_v2_ = false;
}
}
Status OlapBlockDataConvertor::OlapColumnDataConvertorDateTime::convert_to_olap() {
assert(_typed_column.column);
if (from_datetime_v2_) {
const vectorized::ColumnVector<vectorized::UInt64>* column_datetimev2 = nullptr;
if (_nullmap) {
auto nullable_column =
assert_cast<const vectorized::ColumnNullable*>(_typed_column.column.get());
column_datetimev2 = assert_cast<const vectorized::ColumnVector<vectorized::UInt64>*>(
nullable_column->get_nested_column_ptr().get());
} else {
column_datetimev2 = assert_cast<const vectorized::ColumnVector<vectorized::UInt64>*>(
_typed_column.column.get());
}
assert(column_datetimev2);
const DateV2Value<DateTimeV2ValueType>* datetime_cur =
(const DateV2Value<DateTimeV2ValueType>*)(column_datetimev2->get_data().data()) +
_row_pos;
const DateV2Value<DateTimeV2ValueType>* datetime_end = datetime_cur + _num_rows;
uint64_t* value = _values.data();
if (_nullmap) {
const UInt8* nullmap_cur = _nullmap + _row_pos;
while (datetime_cur != datetime_end) {
if (!*nullmap_cur) {
*value = datetime_cur->to_olap_datetime();
} else {
// do nothing
}
++value;
++datetime_cur;
++nullmap_cur;
}
assert(nullmap_cur == _nullmap + _row_pos + _num_rows &&
value == _values.get_end_ptr());
} else {
while (datetime_cur != datetime_end) {
*value = datetime_cur->to_olap_datetime();
++value;
++datetime_cur;
}
assert(value == _values.get_end_ptr());
}
return Status::OK();
const vectorized::ColumnVector<vectorized::Int64>* column_datetime = nullptr;
if (_nullmap) {
auto nullable_column =
assert_cast<const vectorized::ColumnNullable*>(_typed_column.column.get());
column_datetime = assert_cast<const vectorized::ColumnVector<vectorized::Int64>*>(
nullable_column->get_nested_column_ptr().get());
} else {
const vectorized::ColumnVector<vectorized::Int64>* column_datetime = nullptr;
if (_nullmap) {
auto nullable_column =
assert_cast<const vectorized::ColumnNullable*>(_typed_column.column.get());
column_datetime = assert_cast<const vectorized::ColumnVector<vectorized::Int64>*>(
nullable_column->get_nested_column_ptr().get());
} else {
column_datetime = assert_cast<const vectorized::ColumnVector<vectorized::Int64>*>(
_typed_column.column.get());
}
assert(column_datetime);
const VecDateTimeValue* datetime_cur =
(const VecDateTimeValue*)(column_datetime->get_data().data()) + _row_pos;
const VecDateTimeValue* datetime_end = datetime_cur + _num_rows;
uint64_t* value = _values.data();
if (_nullmap) {
const UInt8* nullmap_cur = _nullmap + _row_pos;
while (datetime_cur != datetime_end) {
if (!*nullmap_cur) {
*value = datetime_cur->to_olap_datetime();
} else {
// do nothing
}
++value;
++datetime_cur;
++nullmap_cur;
}
assert(nullmap_cur == _nullmap + _row_pos + _num_rows &&
value == _values.get_end_ptr());
} else {
while (datetime_cur != datetime_end) {
*value = datetime_cur->to_olap_datetime();
++value;
++datetime_cur;
}
assert(value == _values.get_end_ptr());
}
return Status::OK();
column_datetime = assert_cast<const vectorized::ColumnVector<vectorized::Int64>*>(
_typed_column.column.get());
}
assert(column_datetime);
const VecDateTimeValue* datetime_cur =
(const VecDateTimeValue*)(column_datetime->get_data().data()) + _row_pos;
const VecDateTimeValue* datetime_end = datetime_cur + _num_rows;
uint64_t* value = _values.data();
if (_nullmap) {
const UInt8* nullmap_cur = _nullmap + _row_pos;
while (datetime_cur != datetime_end) {
if (!*nullmap_cur) {
*value = datetime_cur->to_olap_datetime();
} else {
// do nothing
}
++value;
++datetime_cur;
++nullmap_cur;
}
assert(nullmap_cur == _nullmap + _row_pos + _num_rows && value == _values.get_end_ptr());
} else {
while (datetime_cur != datetime_end) {
*value = datetime_cur->to_olap_datetime();
++value;
++datetime_cur;
}
assert(value == _values.get_end_ptr());
}
return Status::OK();
}
Status OlapBlockDataConvertor::OlapColumnDataConvertorDecimal::convert_to_olap() {
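The two convertors above drop their from_date_v2_/from_datetime_v2_ flags and branches: DATEV2/DATETIMEV2 columns no longer route through the v1 date convertors, so only the Int64-backed VecDateTimeValue path remains, and NULL slots are simply skipped by walking the null map in lockstep with the data. That skip-nulls loop, reduced to a standalone sketch (the names are illustrative, not the Doris classes):

    #include <cstddef>
    #include <cstdint>

    // Convert only the rows the null map marks as present; NULL slots are
    // left untouched, since readers never consult them.
    template <typename Src, typename Dst, typename Fn>
    void convert_skipping_nulls(const Src* src, const uint8_t* null_map,
                                Dst* dst, std::size_t n, Fn convert) {
        for (std::size_t i = 0; i < n; ++i) {
            if (null_map == nullptr || null_map[i] == 0) {
                dst[i] = convert(src[i]);
            }
        }
    }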

View File

@@ -209,9 +209,6 @@ private:
void set_source_column(const ColumnWithTypeAndName& typed_column, size_t row_pos,
size_t num_rows) override;
Status convert_to_olap() override;
private:
bool from_date_v2_;
};
class OlapColumnDataConvertorDateTime : public OlapColumnDataConvertorPaddedPODArray<uint64_t> {
@@ -219,9 +216,6 @@ private:
void set_source_column(const ColumnWithTypeAndName& typed_column, size_t row_pos,
size_t num_rows) override;
Status convert_to_olap() override;
private:
bool from_datetime_v2_;
};
class OlapColumnDataConvertorDecimal
@@ -277,11 +271,6 @@ private:
void set_source_column(const ColumnWithTypeAndName& typed_column, size_t row_pos,
size_t num_rows) override {
OlapColumnDataConvertorBase::set_source_column(typed_column, row_pos, num_rows);
if (is_date(typed_column.type)) {
from_date_to_date_v2_ = true;
} else {
from_date_to_date_v2_ = false;
}
}
const void* get_data() const override { return values_; }
@@ -296,67 +285,24 @@ private:
}
Status convert_to_olap() override {
if (UNLIKELY(from_date_to_date_v2_)) {
const vectorized::ColumnVector<vectorized::Int64>* column_datetime = nullptr;
if (_nullmap) {
auto nullable_column = assert_cast<const vectorized::ColumnNullable*>(
_typed_column.column.get());
column_datetime =
assert_cast<const vectorized::ColumnVector<vectorized::Int64>*>(
nullable_column->get_nested_column_ptr().get());
} else {
column_datetime =
assert_cast<const vectorized::ColumnVector<vectorized::Int64>*>(
_typed_column.column.get());
}
assert(column_datetime);
const VecDateTimeValue* datetime_cur =
(const VecDateTimeValue*)(column_datetime->get_data().data()) + _row_pos;
const VecDateTimeValue* datetime_end = datetime_cur + _num_rows;
uint32_t* value = const_cast<uint32_t*>(values_);
if (_nullmap) {
const UInt8* nullmap_cur = _nullmap + _row_pos;
while (datetime_cur != datetime_end) {
if (!*nullmap_cur) {
*value = datetime_cur->to_date_v2();
} else {
// do nothing
}
++value;
++datetime_cur;
++nullmap_cur;
}
} else {
while (datetime_cur != datetime_end) {
*value = datetime_cur->to_date_v2();
++value;
++datetime_cur;
}
}
return Status::OK();
const vectorized::ColumnVector<uint32>* column_data = nullptr;
if (_nullmap) {
auto nullable_column =
assert_cast<const vectorized::ColumnNullable*>(_typed_column.column.get());
column_data = assert_cast<const vectorized::ColumnVector<uint32>*>(
nullable_column->get_nested_column_ptr().get());
} else {
const vectorized::ColumnVector<uint32>* column_data = nullptr;
if (_nullmap) {
auto nullable_column = assert_cast<const vectorized::ColumnNullable*>(
_typed_column.column.get());
column_data = assert_cast<const vectorized::ColumnVector<uint32>*>(
nullable_column->get_nested_column_ptr().get());
} else {
column_data = assert_cast<const vectorized::ColumnVector<uint32>*>(
_typed_column.column.get());
}
assert(column_data);
values_ = (const uint32*)(column_data->get_data().data()) + _row_pos;
return Status::OK();
column_data = assert_cast<const vectorized::ColumnVector<uint32>*>(
_typed_column.column.get());
}
assert(column_data);
values_ = (const uint32*)(column_data->get_data().data()) + _row_pos;
return Status::OK();
}
private:
const uint32_t* values_ = nullptr;
bool from_date_to_date_v2_;
};
class OlapColumnDataConvertorDateTimeV2 : public OlapColumnDataConvertorBase {
@@ -367,11 +313,6 @@ private:
void set_source_column(const ColumnWithTypeAndName& typed_column, size_t row_pos,
size_t num_rows) override {
OlapColumnDataConvertorBase::set_source_column(typed_column, row_pos, num_rows);
if (is_date_or_datetime(typed_column.type)) {
from_datetime_to_datetime_v2_ = true;
} else {
from_datetime_to_datetime_v2_ = false;
}
}
const void* get_data() const override { return values_; }
@@ -386,67 +327,24 @@ private:
}
Status convert_to_olap() override {
if (UNLIKELY(from_datetime_to_datetime_v2_)) {
const vectorized::ColumnVector<vectorized::Int64>* column_datetime = nullptr;
if (_nullmap) {
auto nullable_column = assert_cast<const vectorized::ColumnNullable*>(
_typed_column.column.get());
column_datetime =
assert_cast<const vectorized::ColumnVector<vectorized::Int64>*>(
nullable_column->get_nested_column_ptr().get());
} else {
column_datetime =
assert_cast<const vectorized::ColumnVector<vectorized::Int64>*>(
_typed_column.column.get());
}
assert(column_datetime);
const VecDateTimeValue* datetime_cur =
(const VecDateTimeValue*)(column_datetime->get_data().data()) + _row_pos;
const VecDateTimeValue* datetime_end = datetime_cur + _num_rows;
uint64_t* value = const_cast<uint64_t*>(values_);
if (_nullmap) {
const UInt8* nullmap_cur = _nullmap + _row_pos;
while (datetime_cur != datetime_end) {
if (!*nullmap_cur) {
*value = datetime_cur->to_datetime_v2();
} else {
// do nothing
}
++value;
++datetime_cur;
++nullmap_cur;
}
} else {
while (datetime_cur != datetime_end) {
*value = datetime_cur->to_datetime_v2();
++value;
++datetime_cur;
}
}
return Status::OK();
const vectorized::ColumnVector<uint64_t>* column_data = nullptr;
if (_nullmap) {
auto nullable_column =
assert_cast<const vectorized::ColumnNullable*>(_typed_column.column.get());
column_data = assert_cast<const vectorized::ColumnVector<uint64_t>*>(
nullable_column->get_nested_column_ptr().get());
} else {
const vectorized::ColumnVector<uint64_t>* column_data = nullptr;
if (_nullmap) {
auto nullable_column = assert_cast<const vectorized::ColumnNullable*>(
_typed_column.column.get());
column_data = assert_cast<const vectorized::ColumnVector<uint64_t>*>(
nullable_column->get_nested_column_ptr().get());
} else {
column_data = assert_cast<const vectorized::ColumnVector<uint64_t>*>(
_typed_column.column.get());
}
assert(column_data);
values_ = (const uint64_t*)(column_data->get_data().data()) + _row_pos;
return Status::OK();
column_data = assert_cast<const vectorized::ColumnVector<uint64_t>*>(
_typed_column.column.get());
}
assert(column_data);
values_ = (const uint64_t*)(column_data->get_data().data()) + _row_pos;
return Status::OK();
}
private:
const uint64_t* values_ = nullptr;
bool from_datetime_to_datetime_v2_;
};
// decimalv3 don't need to do any convert

View File

@@ -781,17 +781,6 @@ public:
return val;
}
uint64_t to_olap_datetime() const {
uint64_t date_val =
date_v2_value_.year_ * 10000 + date_v2_value_.month_ * 100 + date_v2_value_.day_;
uint64_t time_val = 0;
if constexpr (is_datetime) {
time_val = date_v2_value_.hour_ * 10000 + date_v2_value_.minute_ * 100 +
date_v2_value_.second_;
}
return date_val * 1000000 + time_val;
}
bool to_format_string(const char* format, int len, char* to) const;
bool from_date_format_str(const char* format, int format_len, const char* value,
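The deleted to_olap_datetime() member packed a DateV2Value into a decimal-encoded uint64; it becomes dead code because the writer now renders DATEV2/DATETIMEV2 values as strings via to_buffer. For reference, what the removed member computed, restated as a free function (a sketch, not a Doris API):

    #include <cstdint>

    // Pack a date-time into the decimal layout YYYYMMDDhhmmss,
    // e.g. 2017-10-01 00:00:00 -> 20171001000000.
    uint64_t pack_olap_datetime(uint64_t year, uint64_t month, uint64_t day,
                                uint64_t hour, uint64_t minute, uint64_t second) {
        uint64_t date_val = year * 10000 + month * 100 + day;
        uint64_t time_val = hour * 10000 + minute * 100 + second;
        return date_val * 1000000 + time_val;
    }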

View File

@@ -89,20 +89,17 @@ void VParquetWriterWrapper::parse_schema(const std::vector<TParquetSchema>& parq
#define DISPATCH_PARQUET_NUMERIC_WRITER(WRITER, COLUMN_TYPE, NATIVE_TYPE) \
parquet::RowGroupWriter* rgWriter = get_rg_writer(); \
parquet::WRITER* col_writer = static_cast<parquet::WRITER*>(rgWriter->column(i)); \
__int128 default_value = 0; \
if (null_map != nullptr) { \
auto& null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data(); \
for (size_t row_id = 0; row_id < sz; row_id++) { \
col_writer->WriteBatch(1, nullptr, nullptr, \
(*null_map)[row_id] != 0 \
? reinterpret_cast<const NATIVE_TYPE*>(&default_value) \
: reinterpret_cast<const NATIVE_TYPE*>( \
assert_cast<const COLUMN_TYPE&>(*col) \
.get_data_at(row_id) \
.data)); \
def_level[row_id] = null_data[row_id] == 0; \
} \
col_writer->WriteBatch(sz, def_level.data(), nullptr, \
reinterpret_cast<const NATIVE_TYPE*>( \
assert_cast<const COLUMN_TYPE&>(*col).get_data().data())); \
} else if (const auto* not_nullable_column = check_and_get_column<const COLUMN_TYPE>(col)) { \
col_writer->WriteBatch( \
sz, nullptr, nullptr, \
sz, nullable ? def_level.data() : nullptr, nullptr, \
reinterpret_cast<const NATIVE_TYPE*>(not_nullable_column->get_data().data())); \
} else { \
RETURN_WRONG_TYPE \
@@ -117,14 +114,17 @@ void VParquetWriterWrapper::parse_schema(const std::vector<TParquetSchema>& parq
check_and_get_data_type<DataTypeDecimal<DECIMAL_TYPE>>(remove_nullable(type).get()); \
DCHECK(decimal_type); \
if (null_map != nullptr) { \
auto& null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data(); \
for (size_t row_id = 0; row_id < sz; row_id++) { \
if ((*null_map)[row_id] != 0) { \
col_writer->WriteBatch(1, nullptr, nullptr, &value); \
if (null_data[row_id] != 0) { \
single_def_level = 0; \
col_writer->WriteBatch(1, &single_def_level, nullptr, &value); \
single_def_level = 1; \
} else { \
auto s = decimal_type->to_string(*col, row_id); \
value.ptr = reinterpret_cast<const uint8_t*>(s.data()); \
value.len = s.size(); \
col_writer->WriteBatch(1, nullptr, nullptr, &value); \
col_writer->WriteBatch(1, &single_def_level, nullptr, &value); \
} \
} \
} else { \
@@ -132,7 +132,7 @@ void VParquetWriterWrapper::parse_schema(const std::vector<TParquetSchema>& parq
auto s = decimal_type->to_string(*col, row_id); \
value.ptr = reinterpret_cast<const uint8_t*>(s.data()); \
value.len = s.size(); \
col_writer->WriteBatch(1, nullptr, nullptr, &value); \
col_writer->WriteBatch(1, nullable ? def_level.data() : nullptr, nullptr, &value); \
} \
}
@@ -141,16 +141,19 @@ void VParquetWriterWrapper::parse_schema(const std::vector<TParquetSchema>& parq
parquet::ByteArrayWriter* col_writer = \
static_cast<parquet::ByteArrayWriter*>(rgWriter->column(i)); \
if (null_map != nullptr) { \
auto& null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data(); \
for (size_t row_id = 0; row_id < sz; row_id++) { \
if ((*null_map)[row_id] != 0) { \
if (null_data[row_id] != 0) { \
single_def_level = 0; \
parquet::ByteArray value; \
col_writer->WriteBatch(1, nullptr, nullptr, &value); \
col_writer->WriteBatch(1, &single_def_level, nullptr, &value); \
single_def_level = 1; \
} else { \
const auto& tmp = col->get_data_at(row_id); \
parquet::ByteArray value; \
value.ptr = reinterpret_cast<const uint8_t*>(tmp.data); \
value.len = tmp.size; \
col_writer->WriteBatch(1, nullptr, nullptr, &value); \
col_writer->WriteBatch(1, &single_def_level, nullptr, &value); \
} \
} \
} else if (const auto* not_nullable_column = check_and_get_column<const COLUMN_TYPE>(col)) { \
@@ -159,7 +162,7 @@ void VParquetWriterWrapper::parse_schema(const std::vector<TParquetSchema>& parq
parquet::ByteArray value; \
value.ptr = reinterpret_cast<const uint8_t*>(tmp.data); \
value.len = tmp.size; \
col_writer->WriteBatch(1, nullptr, nullptr, &value); \
col_writer->WriteBatch(1, nullable ? &single_def_level : nullptr, nullptr, &value); \
} \
} else { \
RETURN_WRONG_TYPE \
@@ -173,22 +176,25 @@ Status VParquetWriterWrapper::write(const Block& block) {
try {
for (size_t i = 0; i < block.columns(); i++) {
auto& raw_column = block.get_by_position(i).column;
const auto col = raw_column->is_nullable()
? reinterpret_cast<const ColumnNullable*>(
block.get_by_position(i).column.get())
->get_nested_column_ptr()
.get()
: block.get_by_position(i).column.get();
auto null_map =
raw_column->is_nullable() && reinterpret_cast<const ColumnNullable*>(
block.get_by_position(i).column.get())
->get_null_map_column_ptr()
->has_null()
? reinterpret_cast<const ColumnNullable*>(
block.get_by_position(i).column.get())
->get_null_map_column_ptr()
: nullptr;
auto nullable = raw_column->is_nullable();
const auto col = nullable ? reinterpret_cast<const ColumnNullable*>(
block.get_by_position(i).column.get())
->get_nested_column_ptr()
.get()
: block.get_by_position(i).column.get();
auto null_map = nullable && reinterpret_cast<const ColumnNullable*>(
block.get_by_position(i).column.get())
->has_null()
? reinterpret_cast<const ColumnNullable*>(
block.get_by_position(i).column.get())
->get_null_map_column_ptr()
: nullptr;
auto& type = block.get_by_position(i).type;
std::vector<int16_t> def_level(sz);
// For scalar type, definition level == 1 means this value is not NULL.
std::fill(def_level.begin(), def_level.end(), 1);
int16_t single_def_level = 1;
switch (_output_vexpr_ctxs[i]->root()->type().type) {
case TYPE_BOOLEAN: {
DISPATCH_PARQUET_NUMERIC_WRITER(BoolWriter, ColumnVector<UInt8>, bool)
@@ -210,63 +216,49 @@ Status VParquetWriterWrapper::write(const Block& block) {
break;
}
case TYPE_TINYINT:
case TYPE_SMALLINT:
case TYPE_INT: {
case TYPE_SMALLINT: {
parquet::RowGroupWriter* rgWriter = get_rg_writer();
parquet::Int32Writer* col_writer =
static_cast<parquet::Int32Writer*>(rgWriter->column(i));
int32_t default_int32 = 0;
if (null_map != nullptr) {
if (const auto* nested_column =
check_and_get_column<const ColumnVector<Int32>>(col)) {
for (size_t row_id = 0; row_id < sz; row_id++) {
col_writer->WriteBatch(
1, nullptr, nullptr,
(*null_map)[row_id] != 0
? &default_int32
: reinterpret_cast<const int32_t*>(
nested_column->get_data_at(row_id).data));
}
} else if (const auto* int16_column =
check_and_get_column<const ColumnVector<Int16>>(col)) {
auto& null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data();
if (const auto* int16_column =
check_and_get_column<const ColumnVector<Int16>>(col)) {
for (size_t row_id = 0; row_id < sz; row_id++) {
if (null_data[row_id] != 0) {
single_def_level = 0;
}
const int32_t tmp = int16_column->get_data()[row_id];
col_writer->WriteBatch(
1, nullptr, nullptr,
(*null_map)[row_id] != 0
? &default_int32
: reinterpret_cast<const int32_t*>(&tmp));
col_writer->WriteBatch(1, &single_def_level, nullptr,
reinterpret_cast<const int32_t*>(&tmp));
single_def_level = 1;
}
} else if (const auto* int8_column =
check_and_get_column<const ColumnVector<Int8>>(col)) {
for (size_t row_id = 0; row_id < sz; row_id++) {
if (null_data[row_id] != 0) {
single_def_level = 0;
}
const int32_t tmp = int8_column->get_data()[row_id];
col_writer->WriteBatch(
1, nullptr, nullptr,
(*null_map)[row_id] != 0
? &default_int32
: reinterpret_cast<const int32_t*>(&tmp));
col_writer->WriteBatch(1, &single_def_level, nullptr,
reinterpret_cast<const int32_t*>(&tmp));
single_def_level = 1;
}
} else {
RETURN_WRONG_TYPE
}
} else if (const auto* not_nullable_column =
check_and_get_column<const ColumnVector<Int32>>(col)) {
col_writer->WriteBatch(sz, nullptr, nullptr,
reinterpret_cast<const int32_t*>(
not_nullable_column->get_data().data()));
} else if (const auto& int16_column =
check_and_get_column<const ColumnVector<Int16>>(col)) {
for (size_t row_id = 0; row_id < sz; row_id++) {
const int32_t tmp = int16_column->get_data()[row_id];
col_writer->WriteBatch(1, nullptr, nullptr,
col_writer->WriteBatch(1, nullable ? def_level.data() : nullptr, nullptr,
reinterpret_cast<const int32_t*>(&tmp));
}
} else if (const auto& int8_column =
check_and_get_column<const ColumnVector<Int8>>(col)) {
for (size_t row_id = 0; row_id < sz; row_id++) {
const int32_t tmp = int8_column->get_data()[row_id];
col_writer->WriteBatch(1, nullptr, nullptr,
col_writer->WriteBatch(1, nullable ? def_level.data() : nullptr, nullptr,
reinterpret_cast<const int32_t*>(&tmp));
}
} else {
@@ -274,25 +266,34 @@ Status VParquetWriterWrapper::write(const Block& block) {
}
break;
}
case TYPE_INT: {
DISPATCH_PARQUET_NUMERIC_WRITER(Int32Writer, ColumnVector<Int32>, Int32)
break;
}
case TYPE_DATETIME:
case TYPE_DATE: {
parquet::RowGroupWriter* rgWriter = get_rg_writer();
parquet::Int64Writer* col_writer =
static_cast<parquet::Int64Writer*>(rgWriter->column(i));
int64_t default_int64 = 0;
uint64_t default_int64 = 0;
if (null_map != nullptr) {
auto& null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data();
for (size_t row_id = 0; row_id < sz; row_id++) {
if ((*null_map)[row_id] != 0) {
col_writer->WriteBatch(1, nullptr, nullptr, &default_int64);
def_level[row_id] = null_data[row_id] == 0;
}
uint64_t tmp_data[sz];
for (size_t row_id = 0; row_id < sz; row_id++) {
if (null_data[row_id] != 0) {
tmp_data[row_id] = default_int64;
} else {
const auto tmp = binary_cast<Int64, VecDateTimeValue>(
assert_cast<const ColumnVector<Int64>&>(*col)
.get_data()[row_id])
.to_olap_datetime();
col_writer->WriteBatch(1, nullptr, nullptr,
reinterpret_cast<const int64_t*>(&tmp));
tmp_data[row_id] = binary_cast<Int64, VecDateTimeValue>(
assert_cast<const ColumnVector<Int64>&>(*col)
.get_data()[row_id])
.to_olap_datetime();
}
}
col_writer->WriteBatch(sz, def_level.data(), nullptr,
reinterpret_cast<const int64_t*>(tmp_data));
} else if (const auto* not_nullable_column =
check_and_get_column<const ColumnVector<Int64>>(col)) {
std::vector<uint64_t> res(sz);
@@ -301,7 +302,7 @@ Status VParquetWriterWrapper::write(const Block& block) {
not_nullable_column->get_data()[row_id])
.to_olap_datetime();
}
col_writer->WriteBatch(sz, nullptr, nullptr,
col_writer->WriteBatch(sz, nullable ? def_level.data() : nullptr, nullptr,
reinterpret_cast<const int64_t*>(res.data()));
} else {
RETURN_WRONG_TYPE
@@ -310,32 +311,39 @@ Status VParquetWriterWrapper::write(const Block& block) {
}
case TYPE_DATEV2: {
parquet::RowGroupWriter* rgWriter = get_rg_writer();
parquet::Int64Writer* col_writer =
static_cast<parquet::Int64Writer*>(rgWriter->column(i));
int64_t default_int64 = 0;
parquet::ByteArrayWriter* col_writer =
static_cast<parquet::ByteArrayWriter*>(rgWriter->column(i));
parquet::ByteArray value;
if (null_map != nullptr) {
auto& null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data();
for (size_t row_id = 0; row_id < sz; row_id++) {
if ((*null_map)[row_id] != 0) {
col_writer->WriteBatch(1, nullptr, nullptr, &default_int64);
if (null_data[row_id] != 0) {
single_def_level = 0;
col_writer->WriteBatch(1, &single_def_level, nullptr, &value);
single_def_level = 1;
} else {
uint64_t tmp = binary_cast<UInt32, DateV2Value<DateV2ValueType>>(
assert_cast<const ColumnVector<UInt32>&>(*col)
.get_data()[row_id])
.to_olap_datetime();
col_writer->WriteBatch(1, nullptr, nullptr,
reinterpret_cast<const int64_t*>(&tmp));
char buffer[30];
int output_scale = _output_vexpr_ctxs[i]->root()->type().scale;
value.ptr = reinterpret_cast<const uint8_t*>(buffer);
value.len = binary_cast<UInt32, DateV2Value<DateV2ValueType>>(
assert_cast<const ColumnVector<UInt32>&>(*col)
.get_data()[row_id])
.to_buffer(buffer, output_scale);
col_writer->WriteBatch(1, &single_def_level, nullptr, &value);
}
}
} else if (const auto* not_nullable_column =
check_and_get_column<const ColumnVector<UInt32>>(col)) {
std::vector<uint64_t> res(sz);
for (size_t row_id = 0; row_id < sz; row_id++) {
res[row_id] = binary_cast<UInt32, DateV2Value<DateV2ValueType>>(
not_nullable_column->get_data()[row_id])
.to_olap_datetime();
char buffer[30];
int output_scale = _output_vexpr_ctxs[i]->root()->type().scale;
value.ptr = reinterpret_cast<const uint8_t*>(buffer);
value.len = binary_cast<UInt32, DateV2Value<DateV2ValueType>>(
not_nullable_column->get_data()[row_id])
.to_buffer(buffer, output_scale);
col_writer->WriteBatch(1, nullable ? &single_def_level : nullptr, nullptr,
&value);
}
col_writer->WriteBatch(sz, nullptr, nullptr,
reinterpret_cast<const int64_t*>(res.data()));
} else {
RETURN_WRONG_TYPE
}
@@ -343,32 +351,39 @@ Status VParquetWriterWrapper::write(const Block& block) {
}
case TYPE_DATETIMEV2: {
parquet::RowGroupWriter* rgWriter = get_rg_writer();
parquet::Int64Writer* col_writer =
static_cast<parquet::Int64Writer*>(rgWriter->column(i));
int64_t default_int64 = 0;
parquet::ByteArrayWriter* col_writer =
static_cast<parquet::ByteArrayWriter*>(rgWriter->column(i));
parquet::ByteArray value;
if (null_map != nullptr) {
auto& null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data();
for (size_t row_id = 0; row_id < sz; row_id++) {
if ((*null_map)[row_id] != 0) {
col_writer->WriteBatch(1, nullptr, nullptr, &default_int64);
if (null_data[row_id] != 0) {
single_def_level = 0;
col_writer->WriteBatch(1, &single_def_level, nullptr, &value);
single_def_level = 1;
} else {
uint64_t tmp = binary_cast<UInt64, DateV2Value<DateTimeV2ValueType>>(
assert_cast<const ColumnVector<UInt64>&>(*col)
.get_data()[row_id])
.to_olap_datetime();
col_writer->WriteBatch(1, nullptr, nullptr,
reinterpret_cast<const int64_t*>(&tmp));
char buffer[30];
int output_scale = _output_vexpr_ctxs[i]->root()->type().scale;
value.ptr = reinterpret_cast<const uint8_t*>(buffer);
value.len = binary_cast<UInt64, DateV2Value<DateTimeV2ValueType>>(
assert_cast<const ColumnVector<UInt64>&>(*col)
.get_data()[row_id])
.to_buffer(buffer, output_scale);
col_writer->WriteBatch(1, &single_def_level, nullptr, &value);
}
}
} else if (const auto* not_nullable_column =
check_and_get_column<const ColumnVector<UInt64>>(col)) {
std::vector<uint64_t> res(sz);
for (size_t row_id = 0; row_id < sz; row_id++) {
res[row_id] = binary_cast<UInt64, DateV2Value<DateTimeV2ValueType>>(
not_nullable_column->get_data()[row_id])
.to_olap_datetime();
char buffer[30];
int output_scale = _output_vexpr_ctxs[i]->root()->type().scale;
value.ptr = reinterpret_cast<const uint8_t*>(buffer);
value.len = binary_cast<UInt64, DateV2Value<DateTimeV2ValueType>>(
not_nullable_column->get_data()[row_id])
.to_buffer(buffer, output_scale);
col_writer->WriteBatch(1, nullable ? &single_def_level : nullptr, nullptr,
&value);
}
col_writer->WriteBatch(sz, nullptr, nullptr,
reinterpret_cast<const int64_t*>(res.data()));
} else {
RETURN_WRONG_TYPE
}
@@ -402,9 +417,12 @@ Status VParquetWriterWrapper::write(const Block& block) {
static_cast<parquet::ByteArrayWriter*>(rgWriter->column(i));
parquet::ByteArray value;
if (null_map != nullptr) {
auto& null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data();
for (size_t row_id = 0; row_id < sz; row_id++) {
if ((*null_map)[row_id] != 0) {
col_writer->WriteBatch(1, nullptr, nullptr, &value);
if (null_data[row_id] != 0) {
single_def_level = 0;
col_writer->WriteBatch(1, &single_def_level, nullptr, &value);
single_def_level = 1;
} else {
const DecimalV2Value decimal_val(reinterpret_cast<const PackedInt128*>(
col->get_data_at(row_id).data)
@@ -413,7 +431,7 @@ Status VParquetWriterWrapper::write(const Block& block) {
int output_scale = _output_vexpr_ctxs[i]->root()->type().scale;
value.ptr = reinterpret_cast<const uint8_t*>(decimal_buffer);
value.len = decimal_val.to_buffer(decimal_buffer, output_scale);
col_writer->WriteBatch(1, nullptr, nullptr, &value);
col_writer->WriteBatch(1, &single_def_level, nullptr, &value);
}
}
} else if (const auto* not_nullable_column =
@@ -427,7 +445,8 @@ Status VParquetWriterWrapper::write(const Block& block) {
int output_scale = _output_vexpr_ctxs[i]->root()->type().scale;
value.ptr = reinterpret_cast<const uint8_t*>(decimal_buffer);
value.len = decimal_val.to_buffer(decimal_buffer, output_scale);
col_writer->WriteBatch(1, nullptr, nullptr, &value);
col_writer->WriteBatch(1, nullable ? &single_def_level : nullptr, nullptr,
&value);
}
} else {
RETURN_WRONG_TYPE
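The byte-array, decimal, and date paths in this file share one per-row pattern: a reusable int16_t single_def_level is dropped to 0 for a NULL row and restored to 1 afterwards, and a placeholder value is passed because WriteBatch reads zero values when the definition level is 0 (the numeric paths instead fill a def_level vector and write the whole batch at once). That per-row pattern, reduced to a sketch against the parquet-cpp API (the Doris column plumbing is omitted; the helper name is hypothetical):

    #include <parquet/api/writer.h>
    #include <cstddef>
    #include <cstdint>
    #include <string>
    #include <vector>

    void write_nullable_strings(parquet::ByteArrayWriter* writer,
                                const std::vector<std::string>& rows,
                                const std::vector<uint8_t>& null_map) {
        int16_t def_level = 1;
        for (std::size_t i = 0; i < rows.size(); ++i) {
            parquet::ByteArray value; // empty placeholder; unread at def level 0
            if (null_map[i] != 0) {
                def_level = 0;
                writer->WriteBatch(1, &def_level, nullptr, &value);
                def_level = 1;
            } else {
                value.ptr = reinterpret_cast<const uint8_t*>(rows[i].data());
                value.len = static_cast<uint32_t>(rows[i].size());
                writer->WriteBatch(1, &def_level, nullptr, &value);
            }
        }
    }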

View File

@@ -190,7 +190,7 @@ public class OutFileClause {
return parquetSchemas;
}
public void analyze(Analyzer analyzer, List<Expr> resultExprs) throws UserException {
public void analyze(Analyzer analyzer, List<Expr> resultExprs, List<String> colLabels) throws UserException {
if (isAnalyzed) {
// If the query stmt is rewritten, the whole stmt will be analyzed again.
// But some of fields in this OutfileClause has been changed,
@@ -229,13 +229,13 @@ public class OutFileClause {
isAnalyzed = true;
if (isParquetFormat()) {
analyzeForParquetFormat(resultExprs);
analyzeForParquetFormat(resultExprs, colLabels);
}
}
private void analyzeForParquetFormat(List<Expr> resultExprs) throws AnalysisException {
private void analyzeForParquetFormat(List<Expr> resultExprs, List<String> colLabels) throws AnalysisException {
if (this.parquetSchemas.isEmpty()) {
genParquetSchema(resultExprs);
genParquetSchema(resultExprs, colLabels);
}
// check schema number
@@ -265,10 +265,8 @@ public class OutFileClause {
case BIGINT:
case DATE:
case DATETIME:
case DATETIMEV2:
case DATEV2:
if (!PARQUET_DATA_TYPE_MAP.get("int64").equals(type)) {
throw new AnalysisException("project field type is BIGINT/DATE/DATETIME/DATEV2/DATETIMEV2,"
throw new AnalysisException("project field type is BIGINT/DATE/DATETIME,"
+ "should use int64, but the definition type of column " + i + " is " + type);
}
break;
@@ -291,9 +289,12 @@ public class OutFileClause {
case DECIMAL64:
case DECIMAL128:
case DECIMALV2:
case DATETIMEV2:
case DATEV2:
if (!PARQUET_DATA_TYPE_MAP.get("byte_array").equals(type)) {
throw new AnalysisException("project field type is CHAR/VARCHAR/STRING/DECIMAL,"
+ " should use byte_array, but the definition type of column " + i + " is " + type);
throw new AnalysisException("project field type is CHAR/VARCHAR/STRING/DECIMAL/DATEV2"
+ "/DATETIMEV2, should use byte_array, but the definition type of column "
+ i + " is " + type);
}
break;
case HLL:
@@ -316,12 +317,16 @@ public class OutFileClause {
}
}
private void genParquetSchema(List<Expr> resultExprs) throws AnalysisException {
private void genParquetSchema(List<Expr> resultExprs, List<String> colLabels) throws AnalysisException {
Preconditions.checkState(this.parquetSchemas.isEmpty());
for (int i = 0; i < resultExprs.size(); ++i) {
Expr expr = resultExprs.get(i);
TParquetSchema parquetSchema = new TParquetSchema();
parquetSchema.schema_repetition_type = PARQUET_REPETITION_TYPE_MAP.get("required");
if (resultExprs.get(i).isNullable()) {
parquetSchema.schema_repetition_type = PARQUET_REPETITION_TYPE_MAP.get("optional");
} else {
parquetSchema.schema_repetition_type = PARQUET_REPETITION_TYPE_MAP.get("required");
}
switch (expr.getType().getPrimitiveType()) {
case BOOLEAN:
parquetSchema.schema_data_type = PARQUET_DATA_TYPE_MAP.get("boolean");
@@ -334,8 +339,6 @@ public class OutFileClause {
case BIGINT:
case DATE:
case DATETIME:
case DATETIMEV2:
case DATEV2:
parquetSchema.schema_data_type = PARQUET_DATA_TYPE_MAP.get("int64");
break;
case FLOAT:
@@ -351,6 +354,8 @@ public class OutFileClause {
case DECIMAL32:
case DECIMAL64:
case DECIMAL128:
case DATETIMEV2:
case DATEV2:
parquetSchema.schema_data_type = PARQUET_DATA_TYPE_MAP.get("byte_array");
break;
case HLL:
@@ -364,7 +369,7 @@ public class OutFileClause {
throw new AnalysisException("currently parquet do not support column type: "
+ expr.getType().getPrimitiveType());
}
parquetSchema.schema_column_name = "col" + i;
parquetSchema.schema_column_name = colLabels.get(i);
parquetSchemas.add(parquetSchema);
}
}
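The FE half of the fix mirrors the writer: a nullable result expression now yields an OPTIONAL repetition instead of the hardcoded REQUIRED, because a REQUIRED Parquet field cannot carry definition level 0 and therefore cannot represent NULL. How the repetition type plays out on the write side, sketched with the parquet-cpp schema API (illustrative only; the Doris FE actually builds a Thrift TParquetSchema that the BE translates):

    #include <parquet/schema.h>
    #include <memory>

    std::shared_ptr<parquet::schema::GroupNode> make_one_column_schema(bool nullable) {
        using namespace parquet;
        // OPTIONAL admits definition level 0 (NULL);
        // REQUIRED rejects NULLs at write time.
        auto col = schema::PrimitiveNode::Make(
                "c0", nullable ? Repetition::OPTIONAL : Repetition::REQUIRED,
                Type::BYTE_ARRAY, ConvertedType::UTF8);
        return std::static_pointer_cast<schema::GroupNode>(
                schema::GroupNode::Make("schema", Repetition::REQUIRED, {col}));
    }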

View File

@@ -559,7 +559,7 @@ public class SelectStmt extends QueryStmt {
}
}
if (hasOutFileClause()) {
outFileClause.analyze(analyzer, resultExprs);
outFileClause.analyze(analyzer, resultExprs, colLabels);
}
}

View File

@@ -303,7 +303,7 @@ public class SetOperationStmt extends QueryStmt {
baseTblResultExprs = resultExprs;
if (hasOutFileClause()) {
outFileClause.analyze(analyzer, resultExprs);
outFileClause.analyze(analyzer, resultExprs, getColLabels());
}
}

View File

@@ -640,11 +640,11 @@ public class SelectStmtTest {
try {
SelectStmt stmt = (SelectStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, ctx);
Assert.assertEquals(1, stmt.getOutFileClause().getParquetSchemas().size());
Assert.assertEquals(stmt.getOutFileClause().PARQUET_REPETITION_TYPE_MAP.get("required"),
Assert.assertEquals(stmt.getOutFileClause().PARQUET_REPETITION_TYPE_MAP.get("optional"),
stmt.getOutFileClause().getParquetSchemas().get(0).schema_repetition_type);
Assert.assertEquals(stmt.getOutFileClause().PARQUET_DATA_TYPE_MAP.get("byte_array"),
stmt.getOutFileClause().getParquetSchemas().get(0).schema_data_type);
Assert.assertEquals("col0", stmt.getOutFileClause().getParquetSchemas().get(0).schema_column_name);
Assert.assertEquals("k1", stmt.getOutFileClause().getParquetSchemas().get(0).schema_column_name);
} catch (Exception e) {
Assert.fail(e.getMessage());
}

View File

@@ -0,0 +1,25 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select_default --
1 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 1 1 true 1 1 1 1.1 1.1 char1 1
2 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 2 2 true 2 2 2 2.2 2.2 char2 2
3 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 3 3 true 3 3 3 3.3 3.3 char3 3
4 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 4 4 true 4 4 4 4.4 4.4 char4 4
5 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 5 5 true 5 5 5 5.5 5.5 char5 5
6 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 6 6 true 6 6 6 6.6 6.6 char6 6
7 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 7 7 true 7 7 7 7.7 7.7 char7 7
8 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 8 8 true 8 8 8 8.8 8.8 char8 8
9 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 9 9 true 9 9 9 9.9 9.9 char9 9
10 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 \N \N \N \N \N \N \N \N \N \N \N
-- !select_default --
1 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 1 1 true 1 1 1 1.1 1.1 char1 1
2 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 2 2 true 2 2 2 2.2 2.2 char2 2
3 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 3 3 true 3 3 3 3.3 3.3 char3 3
4 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 4 4 true 4 4 4 4.4 4.4 char4 4
5 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 5 5 true 5 5 5 5.5 5.5 char5 5
6 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 6 6 true 6 6 6 6.6 6.6 char6 6
7 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 7 7 true 7 7 7 7.7 7.7 char7 7
8 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 8 8 true 8 8 8 8.8 8.8 char8 8
9 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 Beijing 9 9 true 9 9 9 9.9 9.9 char9 9
10 2017-10-01 2017-10-01T00:00 2017-10-01 2017-10-01T00:00 2017-10-01T00:00:00.111 2017-10-01T00:00:00.111111 \N \N \N \N \N \N \N \N \N \N \N

View File

@@ -0,0 +1,158 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
import org.codehaus.groovy.runtime.IOGroovyMethods
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import java.nio.file.Paths
suite("test_outfile_parquet") {
def dbName = "test_query_db"
sql "CREATE DATABASE IF NOT EXISTS ${dbName}"
sql "USE $dbName"
StringBuilder strBuilder = new StringBuilder()
strBuilder.append("curl --location-trusted -u " + context.config.jdbcUser + ":" + context.config.jdbcPassword)
strBuilder.append(" http://" + context.config.feHttpAddress + "/rest/v1/config/fe")
String command = strBuilder.toString()
def process = command.toString().execute()
def code = process.waitFor()
def err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())));
def out = process.getText()
logger.info("Request FE Config: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def response = parseJson(out.trim())
assertEquals(response.code, 0)
assertEquals(response.msg, "success")
def configJson = response.data.rows
boolean enableOutfileToLocal = false
for (Object conf: configJson) {
assert conf instanceof Map
if (((Map<String, String>) conf).get("Name").toLowerCase() == "enable_outfile_to_local") {
enableOutfileToLocal = ((Map<String, String>) conf).get("Value").toLowerCase() == "true"
}
}
if (!enableOutfileToLocal) {
logger.warn("Please set enable_outfile_to_local to true to run test_outfile")
return
}
def tableName = "outfile_parquet_test"
def tableName2 = "outfile_parquet_test2"
def outFilePath = """${context.file.parent}/tmp"""
try {
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE IF NOT EXISTS ${tableName} (
`user_id` INT NOT NULL COMMENT "user id",
`date` DATE NOT NULL COMMENT "data load datetime",
`datetime` DATETIME NOT NULL COMMENT "data load datetime",
`date_1` DATEV2 NOT NULL COMMENT "",
`datetime_1` DATETIMEV2 NOT NULL COMMENT "",
`datetime_2` DATETIMEV2(3) NOT NULL COMMENT "",
`datetime_3` DATETIMEV2(6) NOT NULL COMMENT "",
`city` VARCHAR(20) COMMENT "user city",
`age` SMALLINT COMMENT "user age",
`sex` TINYINT COMMENT "user gender",
`bool_col` boolean COMMENT "",
`int_col` int COMMENT "",
`bigint_col` bigint COMMENT "",
`largeint_col` int COMMENT "",
`float_col` float COMMENT "",
`double_col` double COMMENT "",
`char_col` CHAR(10) COMMENT "",
`decimal_col` decimal COMMENT ""
)
DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1");
"""
StringBuilder sb = new StringBuilder()
int i = 1
for (; i < 10; i ++) {
sb.append("""
(${i}, '2017-10-01', '2017-10-01 00:00:00', '2017-10-01', '2017-10-01 00:00:00.111111', '2017-10-01 00:00:00.111111', '2017-10-01 00:00:00.111111', 'Beijing', ${i}, ${i % 128}, true, ${i}, ${i}, ${i}, ${i}.${i}, ${i}.${i}, 'char${i}', ${i}),
""")
}
sb.append("""
(${i}, '2017-10-01', '2017-10-01 00:00:00', '2017-10-01', '2017-10-01 00:00:00.111111', '2017-10-01 00:00:00.111111', '2017-10-01 00:00:00.111111', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL)
""")
sql """ INSERT INTO ${tableName} VALUES
${sb.toString()}
"""
qt_select_default """ SELECT * FROM ${tableName} t ORDER BY user_id; """
// check outfile
File path = new File(outFilePath)
if (!path.exists()) {
assert path.mkdirs()
} else {
throw new IllegalStateException("""${outFilePath} already exists! """)
}
sql """
SELECT * FROM ${tableName} t ORDER BY user_id INTO OUTFILE "file://${outFilePath}/" FORMAT AS PARQUET;
"""
File[] files = path.listFiles()
assert files.length == 1
sql """ DROP TABLE IF EXISTS ${tableName2} """
sql """
CREATE TABLE IF NOT EXISTS ${tableName2} (
`user_id` INT NOT NULL COMMENT "user id",
`date` DATE NOT NULL COMMENT "data load datetime",
`datetime` DATETIME NOT NULL COMMENT "data load datetime",
`date_1` DATEV2 NOT NULL COMMENT "",
`datetime_1` DATETIMEV2 NOT NULL COMMENT "",
`datetime_2` DATETIMEV2(3) NOT NULL COMMENT "",
`datetime_3` DATETIMEV2(6) NOT NULL COMMENT "",
`city` VARCHAR(20) COMMENT "user city",
`age` SMALLINT COMMENT "user age",
`sex` TINYINT COMMENT "user gender",
`bool_col` boolean COMMENT "",
`int_col` int COMMENT "",
`bigint_col` bigint COMMENT "",
`largeint_col` int COMMENT "",
`float_col` float COMMENT "",
`double_col` double COMMENT "",
`char_col` CHAR(10) COMMENT "",
`decimal_col` decimal COMMENT ""
)
DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1");
"""
StringBuilder commandBuilder = new StringBuilder()
commandBuilder.append("""curl -v --location-trusted -u ${context.config.feHttpUser}:${context.config.feHttpPassword}""")
commandBuilder.append(""" -H format:parquet -T """ + files[0].getAbsolutePath() + """ http://${context.config.feHttpAddress}/api/""" + dbName + "/" + tableName2 + "/_stream_load")
command = commandBuilder.toString()
process = command.execute()
code = process.waitFor()
err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())))
out = process.getText()
logger.info("Run command: command=" + command + ",code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
qt_select_default """ SELECT * FROM ${tableName2} t ORDER BY user_id; """
} finally {
try_sql("DROP TABLE IF EXISTS ${tableName}")
try_sql("DROP TABLE IF EXISTS ${tableName2}")
File path = new File(outFilePath)
if (path.exists()) {
for (File f: path.listFiles()) {
f.delete();
}
path.delete();
}
}
}