branch-2.1: [Fix](field) Fix potential memory leak and wrong binary reading about JsonbField (#50174) (#52693)
pick https://github.com/apache/doris/pull/50174
This commit is contained in:
@ -42,8 +42,8 @@ struct JsonBinaryValue {
|
||||
size_t len = 0;
|
||||
JsonbParser parser;
|
||||
|
||||
JsonBinaryValue() : ptr(nullptr), len(0) {}
|
||||
JsonBinaryValue(char* ptr, int len) {
|
||||
JsonBinaryValue() = default;
|
||||
JsonBinaryValue(char* ptr, size_t len) {
|
||||
static_cast<void>(from_json_string(const_cast<const char*>(ptr), len));
|
||||
}
|
||||
JsonBinaryValue(const std::string& s) {
|
||||
@ -51,11 +51,11 @@ struct JsonBinaryValue {
|
||||
}
|
||||
JsonBinaryValue(const char* ptr, int len) { static_cast<void>(from_json_string(ptr, len)); }
|
||||
|
||||
const char* value() { return ptr; }
|
||||
const char* value() const { return ptr; }
|
||||
|
||||
size_t size() { return len; }
|
||||
size_t size() const { return len; }
|
||||
|
||||
void replace(char* ptr, int len) {
|
||||
void replace(const char* ptr, int len) {
|
||||
this->ptr = ptr;
|
||||
this->len = len;
|
||||
}
|
||||
|
||||
@ -167,6 +167,8 @@ public:
|
||||
check_chars_length(new_size, old_size + 1);
|
||||
|
||||
chars.resize(new_size);
|
||||
DCHECK(s.data != nullptr);
|
||||
DCHECK(chars.data() != nullptr);
|
||||
memcpy(chars.data() + old_size, s.data, size_to_append);
|
||||
offsets.push_back(new_size);
|
||||
sanity_check_simple();
|
||||
|
||||
@ -25,6 +25,8 @@
|
||||
|
||||
namespace doris::vectorized {
|
||||
|
||||
// store and commit data. only after commit the data is effective on its' base(ColumnString)
|
||||
// everytime commit, the _data add one row.
|
||||
class BufferWritable final {
|
||||
public:
|
||||
explicit BufferWritable(ColumnString& vector)
|
||||
@ -63,6 +65,7 @@ private:
|
||||
using VectorBufferWriter = BufferWritable;
|
||||
using BufferWriter = BufferWritable;
|
||||
|
||||
// There is consumption of the buffer in the read method.
|
||||
class BufferReadable {
|
||||
public:
|
||||
explicit BufferReadable(StringRef& ref) : _data(ref.data) {}
|
||||
|
||||
@ -26,14 +26,9 @@
|
||||
#include "vec/io/io_helper.h"
|
||||
#include "vec/io/var_int.h"
|
||||
|
||||
namespace doris {
|
||||
namespace vectorized {
|
||||
namespace doris::vectorized {
|
||||
class BufferReadable;
|
||||
class BufferWritable;
|
||||
} // namespace vectorized
|
||||
} // namespace doris
|
||||
|
||||
namespace doris::vectorized {
|
||||
|
||||
void read_binary(Array& x, BufferReadable& buf) {
|
||||
size_t size;
|
||||
|
||||
@ -156,59 +156,61 @@ DEFINE_FIELD_VECTOR(Map);
|
||||
|
||||
using VariantMap = std::map<PathInData, Field>;
|
||||
|
||||
//TODO: rethink if we really need this? it only save one pointer from std::string
|
||||
// not POD type so could only use read/write_json_binary instead of read/write_binary
|
||||
class JsonbField {
|
||||
public:
|
||||
JsonbField() = default;
|
||||
~JsonbField() = default; // unique_ptr will handle cleanup automatically
|
||||
|
||||
JsonbField(const char* ptr, uint32_t len) : size(len) {
|
||||
data = new char[size];
|
||||
JsonbField(const char* ptr, size_t len) : size(len) {
|
||||
data = std::make_unique<char[]>(size);
|
||||
if (!data) {
|
||||
LOG(FATAL) << "new data buffer failed, size: " << size;
|
||||
}
|
||||
memcpy(data, ptr, size);
|
||||
if (size > 0) {
|
||||
memcpy(data.get(), ptr, size);
|
||||
}
|
||||
}
|
||||
|
||||
JsonbField(const JsonbField& x) : size(x.size) {
|
||||
data = new char[size];
|
||||
data = std::make_unique<char[]>(size);
|
||||
if (!data) {
|
||||
LOG(FATAL) << "new data buffer failed, size: " << size;
|
||||
}
|
||||
memcpy(data, x.data, size);
|
||||
if (size > 0) {
|
||||
memcpy(data.get(), x.data.get(), size);
|
||||
}
|
||||
}
|
||||
|
||||
JsonbField(JsonbField&& x) : data(x.data), size(x.size) {
|
||||
x.data = nullptr;
|
||||
x.size = 0;
|
||||
}
|
||||
JsonbField(JsonbField&& x) noexcept : data(std::move(x.data)), size(x.size) { x.size = 0; }
|
||||
|
||||
// dispatch for all type of storage. so need this. but not really used now.
|
||||
JsonbField& operator=(const JsonbField& x) {
|
||||
data = new char[size];
|
||||
if (!data) {
|
||||
LOG(FATAL) << "new data buffer failed, size: " << size;
|
||||
if (this != &x) {
|
||||
data = std::make_unique<char[]>(x.size);
|
||||
if (!data) {
|
||||
LOG(FATAL) << "new data buffer failed, size: " << x.size;
|
||||
}
|
||||
if (x.size > 0) {
|
||||
memcpy(data.get(), x.data.get(), x.size);
|
||||
}
|
||||
size = x.size;
|
||||
}
|
||||
memcpy(data, x.data, size);
|
||||
return *this;
|
||||
}
|
||||
|
||||
JsonbField& operator=(JsonbField&& x) {
|
||||
if (data) {
|
||||
delete[] data;
|
||||
JsonbField& operator=(JsonbField&& x) noexcept {
|
||||
if (this != &x) {
|
||||
data = std::move(x.data);
|
||||
size = x.size;
|
||||
x.size = 0;
|
||||
}
|
||||
data = x.data;
|
||||
size = x.size;
|
||||
x.data = nullptr;
|
||||
x.size = 0;
|
||||
return *this;
|
||||
}
|
||||
|
||||
~JsonbField() {
|
||||
if (data) {
|
||||
delete[] data;
|
||||
}
|
||||
}
|
||||
|
||||
const char* get_value() const { return data; }
|
||||
uint32_t get_size() const { return size; }
|
||||
const char* get_value() const { return data.get(); }
|
||||
size_t get_size() const { return size; }
|
||||
|
||||
bool operator<(const JsonbField& r) const {
|
||||
LOG(FATAL) << "comparing between JsonbField is not supported";
|
||||
@ -246,8 +248,8 @@ public:
|
||||
}
|
||||
|
||||
private:
|
||||
char* data = nullptr;
|
||||
uint32_t size = 0;
|
||||
std::unique_ptr<char[]> data = nullptr;
|
||||
size_t size = 0;
|
||||
};
|
||||
|
||||
template <typename T>
|
||||
|
||||
@ -18,9 +18,9 @@
|
||||
#pragma once
|
||||
|
||||
#include <gen_cpp/Types_types.h>
|
||||
#include <stddef.h>
|
||||
#include <stdint.h>
|
||||
|
||||
#include <cstddef>
|
||||
#include <cstdint>
|
||||
#include <memory>
|
||||
#include <string>
|
||||
|
||||
@ -36,15 +36,11 @@
|
||||
#include "vec/data_types/serde/data_type_serde.h"
|
||||
#include "vec/data_types/serde/data_type_string_serde.h"
|
||||
|
||||
namespace doris {
|
||||
namespace vectorized {
|
||||
namespace doris::vectorized {
|
||||
class BufferWritable;
|
||||
class IColumn;
|
||||
class ReadBuffer;
|
||||
} // namespace vectorized
|
||||
} // namespace doris
|
||||
|
||||
namespace doris::vectorized {
|
||||
class DataTypeJsonb final : public IDataType {
|
||||
public:
|
||||
using ColumnType = ColumnString;
|
||||
@ -67,10 +63,13 @@ public:
|
||||
|
||||
MutableColumnPtr create_column() const override;
|
||||
|
||||
virtual Field get_default() const override {
|
||||
Field get_default() const override {
|
||||
std::string default_json = "{}";
|
||||
JsonBinaryValue binary_val(default_json.c_str(), default_json.size());
|
||||
return JsonbField(binary_val.value(), binary_val.size());
|
||||
// convert default_json to binary
|
||||
JsonBinaryValue binary_val(default_json.c_str(), static_cast<Int32>(default_json.size()));
|
||||
// Throw exception if default_json.size() is large than INT32_MAX
|
||||
// JsonbField keeps its own memory
|
||||
return JsonbField(binary_val.value(), static_cast<UInt32>(binary_val.size()));
|
||||
}
|
||||
|
||||
Field get_field(const TExprNode& node) const override {
|
||||
@ -99,4 +98,5 @@ public:
|
||||
private:
|
||||
DataTypeString data_type_string;
|
||||
};
|
||||
} // namespace doris::vectorized
|
||||
|
||||
} // namespace doris::vectorized
|
||||
|
||||
@ -30,6 +30,7 @@
|
||||
#include "vec/common/string_buffer.hpp"
|
||||
#include "vec/common/string_ref.h"
|
||||
#include "vec/common/uint128.h"
|
||||
#include "vec/core/field.h"
|
||||
#include "vec/core/types.h"
|
||||
#include "vec/io/reader_buffer.h"
|
||||
#include "vec/io/var_int.h"
|
||||
@ -126,7 +127,7 @@ inline void write_string_binary(const char* s, BufferWritable& buf) {
|
||||
write_string_binary(StringRef {std::string(s)}, buf);
|
||||
}
|
||||
|
||||
inline void write_json_binary(JsonbField s, BufferWritable& buf) {
|
||||
inline void write_json_binary(const JsonbField& s, BufferWritable& buf) {
|
||||
write_string_binary(StringRef {s.get_value(), s.get_size()}, buf);
|
||||
}
|
||||
|
||||
@ -200,13 +201,14 @@ inline StringRef read_string_binary_into(Arena& arena, BufferReadable& buf) {
|
||||
char* data = arena.alloc(size);
|
||||
buf.read(data, size);
|
||||
|
||||
return StringRef(data, size);
|
||||
return {data, size};
|
||||
}
|
||||
|
||||
inline void read_json_binary(JsonbField val, BufferReadable& buf,
|
||||
inline void read_json_binary(JsonbField& val, BufferReadable& buf,
|
||||
size_t MAX_JSON_SIZE = DEFAULT_MAX_JSON_SIZE) {
|
||||
StringRef jrf = StringRef {val.get_value(), val.get_size()};
|
||||
read_string_binary(jrf, buf);
|
||||
StringRef result;
|
||||
read_string_binary(result, buf);
|
||||
val = JsonbField(result.data, result.size);
|
||||
}
|
||||
|
||||
template <typename Type>
|
||||
|
||||
@ -93,35 +93,44 @@ inline void read_var_uint(UInt64& x, std::istream& istr) {
|
||||
for (size_t i = 0; i < 9; ++i) {
|
||||
UInt64 byte = istr.get();
|
||||
x |= (byte & 0x7F) << (7 * i);
|
||||
if (!(byte & 0x80)) return;
|
||||
if (!(byte & 0x80)) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
inline void write_var_uint(UInt64 x, std::ostream& ostr) {
|
||||
for (size_t i = 0; i < 9; ++i) {
|
||||
uint8_t byte = x & 0x7F;
|
||||
if (x > 0x7F) byte |= 0x80;
|
||||
if (x > 0x7F) {
|
||||
byte |= 0x80;
|
||||
}
|
||||
|
||||
ostr.put(byte);
|
||||
|
||||
x >>= 7;
|
||||
if (!x) return;
|
||||
if (!x) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: do real implement in the future
|
||||
inline void read_var_uint(UInt64& x, BufferReadable& buf) {
|
||||
x = 0;
|
||||
// get length from first byte firstly
|
||||
uint8_t len = 0;
|
||||
buf.read((char*)&len, 1);
|
||||
auto ref = buf.read(len);
|
||||
|
||||
// read data and set it to x per byte.
|
||||
char* bytes = const_cast<char*>(ref.data);
|
||||
for (size_t i = 0; i < 9; ++i) {
|
||||
UInt64 byte = bytes[i];
|
||||
x |= (byte & 0x7F) << (7 * i);
|
||||
|
||||
if (!(byte & 0x80)) return;
|
||||
if (!(byte & 0x80)) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -130,12 +139,16 @@ inline void write_var_uint(UInt64 x, BufferWritable& ostr) {
|
||||
uint8_t i = 0;
|
||||
while (i < 9) {
|
||||
uint8_t byte = x & 0x7F;
|
||||
if (x > 0x7F) byte |= 0x80;
|
||||
if (x > 0x7F) {
|
||||
byte |= 0x80;
|
||||
}
|
||||
|
||||
bytes[i++] = byte;
|
||||
|
||||
x >>= 7;
|
||||
if (!x) break;
|
||||
if (!x) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
ostr.write((char*)&i, 1);
|
||||
ostr.write(bytes, i);
|
||||
@ -144,13 +157,17 @@ inline void write_var_uint(UInt64 x, BufferWritable& ostr) {
|
||||
inline char* write_var_uint(UInt64 x, char* ostr) {
|
||||
for (size_t i = 0; i < 9; ++i) {
|
||||
uint8_t byte = x & 0x7F;
|
||||
if (x > 0x7F) byte |= 0x80;
|
||||
if (x > 0x7F) {
|
||||
byte |= 0x80;
|
||||
}
|
||||
|
||||
*ostr = byte;
|
||||
++ostr;
|
||||
|
||||
x >>= 7;
|
||||
if (!x) return ostr;
|
||||
if (!x) {
|
||||
return ostr;
|
||||
}
|
||||
}
|
||||
|
||||
return ostr;
|
||||
|
||||
166
be/test/vec/core/field_test.cpp
Normal file
166
be/test/vec/core/field_test.cpp
Normal file
@ -0,0 +1,166 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
#include "vec/core/field.h"
|
||||
|
||||
#include <gtest/gtest-message.h>
|
||||
#include <gtest/gtest-test-part.h>
|
||||
|
||||
#include <string>
|
||||
|
||||
#include "gtest/gtest_pred_impl.h" // IWYU pragma: keep
|
||||
#include "runtime/define_primitive_type.h"
|
||||
#include "vec/columns/column_string.h"
|
||||
#include "vec/common/string_buffer.hpp"
|
||||
#include "vec/common/string_ref.h"
|
||||
#include "vec/core/types.h"
|
||||
#include "vec/io/io_helper.h"
|
||||
|
||||
namespace doris::vectorized {
|
||||
|
||||
TEST(VFieldTest, jsonb_field_unique_ptr) {
|
||||
// Test default constructor
|
||||
JsonbField empty;
|
||||
ASSERT_EQ(empty.get_value(), nullptr);
|
||||
ASSERT_EQ(empty.get_size(), 0);
|
||||
|
||||
// Test constructor with data
|
||||
const char* test_data = R"({ "key": "value" })";
|
||||
size_t test_size = strlen(test_data);
|
||||
JsonbField jf1(test_data, test_size);
|
||||
ASSERT_NE(jf1.get_value(), nullptr);
|
||||
ASSERT_EQ(jf1.get_size(), test_size);
|
||||
ASSERT_EQ(std::string(jf1.get_value(), jf1.get_size()), std::string(test_data));
|
||||
|
||||
// Test copy constructor
|
||||
JsonbField jf2(jf1);
|
||||
ASSERT_NE(jf2.get_value(), nullptr);
|
||||
ASSERT_NE(jf2.get_value(), jf1.get_value()); // Different memory locations
|
||||
ASSERT_EQ(jf2.get_size(), jf1.get_size());
|
||||
ASSERT_EQ(std::string(jf2.get_value(), jf2.get_size()),
|
||||
std::string(jf1.get_value(), jf1.get_size()));
|
||||
|
||||
// Test move constructor
|
||||
JsonbField jf3(std::move(jf2));
|
||||
ASSERT_NE(jf3.get_value(), nullptr);
|
||||
ASSERT_EQ(jf2.get_value(), nullptr); // jf2 should be empty after move
|
||||
ASSERT_EQ(jf2.get_size(), 0); // jf2 size should be 0 after move
|
||||
ASSERT_EQ(jf3.get_size(), test_size);
|
||||
ASSERT_EQ(std::string(jf3.get_value(), jf3.get_size()), std::string(test_data));
|
||||
|
||||
// Test copy assignment
|
||||
JsonbField jf4;
|
||||
jf4 = jf1;
|
||||
ASSERT_NE(jf4.get_value(), nullptr);
|
||||
ASSERT_NE(jf4.get_value(), jf1.get_value()); // Different memory locations
|
||||
ASSERT_EQ(jf4.get_size(), jf1.get_size());
|
||||
ASSERT_EQ(std::string(jf4.get_value(), jf4.get_size()),
|
||||
std::string(jf1.get_value(), jf1.get_size()));
|
||||
|
||||
// Test move assignment
|
||||
JsonbField jf5;
|
||||
jf5 = std::move(jf4);
|
||||
ASSERT_NE(jf5.get_value(), nullptr);
|
||||
ASSERT_EQ(jf4.get_value(), nullptr); // jf4 should be empty after move
|
||||
ASSERT_EQ(jf4.get_size(), 0); // jf4 size should be 0 after move
|
||||
ASSERT_EQ(jf5.get_size(), test_size);
|
||||
ASSERT_EQ(std::string(jf5.get_value(), jf5.get_size()), std::string(test_data));
|
||||
|
||||
// Test JsonbField with Field
|
||||
Field field_jf = jf1;
|
||||
ASSERT_EQ(field_jf.get_type(), Field::Types::JSONB);
|
||||
ASSERT_NE(field_jf.get<JsonbField>().get_value(), nullptr);
|
||||
ASSERT_EQ(field_jf.get<JsonbField>().get_size(), test_size);
|
||||
ASSERT_EQ(std::string(field_jf.get<JsonbField>().get_value(),
|
||||
field_jf.get<JsonbField>().get_size()),
|
||||
std::string(test_data));
|
||||
}
|
||||
|
||||
// Test for JsonbField I/O operations
|
||||
TEST(VFieldTest, jsonb_field_io) {
|
||||
// Prepare a JsonbField
|
||||
const char* test_data = R"({ "key": "value" })";
|
||||
size_t test_size = strlen(test_data);
|
||||
JsonbField original(test_data, test_size);
|
||||
|
||||
// TEST 1: write_json_binary - From JsonbField to buffer
|
||||
// Create a ColumnString to use with BufferWritable
|
||||
ColumnString column_str;
|
||||
|
||||
// Write the JsonbField to the buffer
|
||||
{
|
||||
BufferWritable buf(column_str);
|
||||
write_json_binary(original, buf);
|
||||
buf.commit(); // Important: commit the write operation
|
||||
}
|
||||
|
||||
// Verify data was written
|
||||
ASSERT_GT(column_str.size(), 0);
|
||||
|
||||
// Read the JsonbField back using BufferReadable
|
||||
{
|
||||
// Get the StringRef from ColumnString
|
||||
StringRef str_ref = column_str.get_data_at(0);
|
||||
|
||||
// Create a BufferReadable from StringRef
|
||||
BufferReadable read_buf(str_ref);
|
||||
|
||||
// Read the data back into a new JsonbField
|
||||
JsonbField read_field;
|
||||
read_json_binary(read_field, read_buf);
|
||||
|
||||
// Verify the data
|
||||
ASSERT_NE(read_field.get_value(), nullptr);
|
||||
ASSERT_EQ(read_field.get_size(), original.get_size());
|
||||
ASSERT_EQ(std::string(read_field.get_value(), read_field.get_size()),
|
||||
std::string(original.get_value(), original.get_size()));
|
||||
}
|
||||
|
||||
// Test with JsonbField as a Field and serde it
|
||||
{
|
||||
ColumnString field_column;
|
||||
|
||||
// ser
|
||||
{
|
||||
BufferWritable field_buf(field_column);
|
||||
write_json_binary(original, field_buf);
|
||||
field_buf.commit();
|
||||
}
|
||||
|
||||
// Verify field was written
|
||||
ASSERT_GT(field_column.size(), 0);
|
||||
|
||||
// de
|
||||
{
|
||||
StringRef field_str_ref = field_column.get_data_at(0);
|
||||
BufferReadable read_field_buf(field_str_ref);
|
||||
|
||||
// we can't use read_binary because of the JsonbField is not POD type
|
||||
JsonbField jsonb_from_field;
|
||||
read_json_binary(jsonb_from_field, read_field_buf);
|
||||
Field f2 = jsonb_from_field;
|
||||
|
||||
ASSERT_EQ(f2.get_type(), Field::Types::JSONB);
|
||||
ASSERT_NE(f2.get<JsonbField>().get_value(), nullptr);
|
||||
ASSERT_EQ(
|
||||
std::string(f2.get<JsonbField>().get_value(), f2.get<JsonbField>().get_size()),
|
||||
std::string(test_data));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace doris::vectorized
|
||||
Reference in New Issue
Block a user