diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt index 50ab78eda6..5ff765f137 100644 --- a/be/src/runtime/CMakeLists.txt +++ b/be/src/runtime/CMakeLists.txt @@ -54,7 +54,6 @@ set(RUNTIME_FILES collection_value.cpp tuple.cpp tuple_row.cpp - dpp_writer.cpp qsorter.cpp fragment_mgr.cpp dpp_sink_internal.cpp diff --git a/be/src/runtime/dpp_writer.cpp b/be/src/runtime/dpp_writer.cpp deleted file mode 100644 index 56971c2bf4..0000000000 --- a/be/src/runtime/dpp_writer.cpp +++ /dev/null @@ -1,266 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "runtime/dpp_writer.h" - -#include - -#include - -#include "exprs/expr_context.h" -#include "olap/utils.h" -#include "runtime/primitive_type.h" -#include "runtime/row_batch.h" -#include "runtime/tuple_row.h" -#include "util/debug_util.h" -#include "util/types.h" - -namespace doris { - -DppWriter::DppWriter(int32_t schema_hash, const std::vector& output_expr_ctxs, - FileHandler* fp) - : _schema_hash(schema_hash), - _output_expr_ctxs(output_expr_ctxs), - _fp(fp), - _buf(nullptr), - _end(nullptr), - _pos(nullptr), - _write_len(0), - _content_adler32(1) { - _num_null_slots = 0; - for (int i = 0; i < _output_expr_ctxs.size(); ++i) { - if (true == _output_expr_ctxs[i]->is_nullable()) { - _num_null_slots += 1; - } - } - _num_null_bytes = (_num_null_slots + 7) / 8; -} - -DppWriter::~DppWriter() { - if (_buf) { - delete[] _buf; - } -} - -Status DppWriter::open() { - // Write header - _header.mutable_message()->set_schema_hash(_schema_hash); - _header.prepare(_fp); - _content_adler32 = 1; - // seek to size() - _fp->seek(_header.size(), SEEK_SET); - - // new buf - const int k_buf_len = 16 * 1024; - _buf = new char[k_buf_len]; - _pos = _buf; - _end = _buf + k_buf_len; - return Status::OK(); -} - -void DppWriter::reset_buf() { - _pos = _buf; -} - -void DppWriter::append_to_buf(const void* ptr, int len) { - if (_pos + len > _end) { - // enlarge - int cur_len = _pos - _buf; - int old_buf_len = _end - _buf; - int new_len = std::max(2 * old_buf_len, old_buf_len + len); - char* new_buf = new char[new_len]; - memcpy(new_buf, _buf, cur_len); - delete[] _buf; - _buf = new_buf; - _pos = _buf + cur_len; - _end = _buf + new_len; - } - - memcpy(_pos, ptr, len); - _pos += len; -} - -void DppWriter::increase_buf(int len) { - //increase buf to store nullptr bytes - //len is the bytes of nullptr - if (_pos + len > _end) { - int cur_len = _pos - _buf; - int old_buf_len = _end - _buf; - int new_len = std::max(2 * old_buf_len, old_buf_len + len); - char* new_buf = new char[new_len]; - memcpy(new_buf, _buf, cur_len); - delete[] _buf; - _buf = new_buf; - _pos = _buf + cur_len; - _end = _buf + new_len; - } - - memset(_pos, 0, len); - _pos += len; -} - -Status DppWriter::append_one_row(TupleRow* row) { - int num_columns = _output_expr_ctxs.size(); - int off = 0; - int pos = _pos - _buf; - increase_buf(_num_null_bytes); - for (int i = 0; i < num_columns; ++i) { - char* position = _buf + pos; - void* item = _output_expr_ctxs[i]->get_value(row); - // What happened failed??? - if (true == _output_expr_ctxs[i]->is_nullable()) { - int index = off % 8; - if (item == nullptr) { - //store nullptr bytes - position[off / 8] |= 1 << (7 - index); - off += 1; - continue; - } else { - position[off / 8] &= ~(1 << (7 - index)); - off += 1; - } - } - switch (_output_expr_ctxs[i]->root()->type().type) { - case TYPE_TINYINT: - append_to_buf(item, 1); - break; - case TYPE_SMALLINT: - append_to_buf(item, 2); - break; - case TYPE_INT: - append_to_buf(item, 4); - break; - case TYPE_BIGINT: - append_to_buf(item, 8); - break; - case TYPE_LARGEINT: - append_to_buf(item, 16); - break; - case TYPE_FLOAT: - append_to_buf(item, 4); - break; - case TYPE_DOUBLE: - append_to_buf(item, 8); - break; - case TYPE_DATE: { - const DateTimeValue* time_val = (const DateTimeValue*)(item); - uint64_t val = time_val->to_olap_date(); - uint8_t char_val = val & 0xff; - append_to_buf(&char_val, 1); - val >>= 8; - char_val = val & 0xff; - append_to_buf(&char_val, 1); - val >>= 8; - char_val = val & 0xff; - append_to_buf(&char_val, 1); - break; - } - case TYPE_DATETIME: { - const DateTimeValue* time_val = (const DateTimeValue*)(item); - uint64_t val = time_val->to_olap_datetime(); - append_to_buf(&val, 8); - break; - } - case TYPE_VARCHAR: { - case TYPE_HLL: - case TYPE_STRING: - const StringValue* str_val = (const StringValue*)(item); - if (UNLIKELY(str_val->ptr == nullptr && str_val->len != 0)) { - return Status::InternalError("String value ptr is null"); - } - - // write len first - uint16_t len = str_val->len; - if (len != str_val->len) { - return Status::InternalError("length of string is overflow.len={}", str_val->len); - } - append_to_buf(&len, 2); - // passing a nullptr pointer to memcpy may be core/ - if (len == 0) { - break; - } - append_to_buf(str_val->ptr, len); - break; - } - case TYPE_CHAR: { - const StringValue* str_val = (const StringValue*)(item); - if (UNLIKELY(str_val->ptr == nullptr || str_val->len == 0)) { - return Status::InternalError("String value ptr is null"); - } - append_to_buf(str_val->ptr, str_val->len); - break; - } - case TYPE_DECIMALV2: { - const DecimalV2Value decimal_val(reinterpret_cast(item)->value); - int64_t int_val = decimal_val.int_value(); - int32_t frac_val = decimal_val.frac_value(); - append_to_buf(&int_val, sizeof(int_val)); - append_to_buf(&frac_val, sizeof(frac_val)); - break; - } - default: { - std::stringstream ss; - ss << "Unknown column type " << _output_expr_ctxs[i]->root()->type(); - return Status::InternalError(ss.str()); - } - } - } - - return Status::OK(); -} - -Status DppWriter::add_batch(RowBatch* batch) { - int num_rows = batch->num_rows(); - if (num_rows <= 0) { - return Status::OK(); - } - - Status status; - for (int i = 0; i < num_rows; ++i) { - reset_buf(); - TupleRow* row = batch->get_row(i); - status = append_one_row(row); - if (!status.ok()) { - LOG(WARNING) << "convert row to dpp output failed. reason: " << status.get_error_msg(); - - return status; - } - int len = _pos - _buf; - Status olap_status = _fp->write(_buf, len); - if (!olap_status.ok()) { - return Status::InternalError("write to file failed."); - } - _content_adler32 = olap_adler32(_content_adler32, _buf, len); - _write_len += len; - } - - return status; -} - -Status DppWriter::write_header() { - _header.set_file_length(_header.size() + _write_len); - _header.set_checksum(_content_adler32); - _header.serialize(_fp); - return Status::OK(); -} - -Status DppWriter::close() { - // Write header - return write_header(); -} - -} // namespace doris diff --git a/be/src/runtime/dpp_writer.h b/be/src/runtime/dpp_writer.h deleted file mode 100644 index cdd0feb64d..0000000000 --- a/be/src/runtime/dpp_writer.h +++ /dev/null @@ -1,73 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include "common/status.h" -#include "gen_cpp/olap_file.pb.h" -#include "olap/file_helper.h" - -namespace doris { - -class ExprContext; -class TupleRow; -class RowBatch; - -// Class used to convert to DPP output -// this is used for don't change code in storage. -class DppWriter { -public: - DppWriter(int32_t schema_hash, const std::vector& output_expr, FileHandler* fp); - - ~DppWriter(); - - // Some prepare work complete here. - Status open(); - - // Add one batch to this writer - Status add_batch(RowBatch* row_batch); - - // Close this writer - // Write needed header to this file. - Status close(); - -private: - Status write_header(); - void reset_buf(); - void append_to_buf(const void* ptr, int len); - void increase_buf(int len); - Status append_one_row(TupleRow* row); - - // pass by client, data write to this file - // owned by client, doesn't care about this - int32_t _schema_hash; - int _num_null_slots; - int _num_null_bytes; - const std::vector& _output_expr_ctxs; - FileHandler* _fp; - - char* _buf; - char* _end; - char* _pos; - - int64_t _write_len; - uint32_t _content_adler32; - - FileHeader _header; -}; - -} // namespace doris diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto index 14bb5ecb5e..19bc56823a 100644 --- a/gensrc/proto/olap_file.proto +++ b/gensrc/proto/olap_file.proto @@ -287,21 +287,6 @@ message TabletMetaPB { optional bool enable_unique_key_merge_on_write = 24 [default = false]; } -message OLAPIndexHeaderMessage { - required int32 start_version = 1; - required int32 end_version = 2; - required int64 cumulative_version_hash = 3; - - required uint32 segment = 4; - required uint32 num_rows_per_block = 5; - optional bool null_supported = 6; - optional bool delete_flag = 7; -} - -message OLAPDataHeaderMessage { - required uint32 segment = 2; -} - message OLAPRawDeltaHeaderMessage { required int32 schema_hash = 2; }