[Memory Engine] add core column related classes (#3508)

add core column related classes
This commit is contained in:
Binglin Chang
2020-05-13 16:30:32 +08:00
committed by GitHub
parent 54e38ecda2
commit a7cfafe076
22 changed files with 1137 additions and 11 deletions

View File

@ -22,6 +22,12 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_DIR}/src/olap/memory")
set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/olap/memory")
# Static library "Memory" built from the memory engine sources listed below.
add_library(Memory STATIC
buffer.cpp
column.cpp
column_block.cpp
column_delta.cpp
delta_index.cpp
hash_index.cpp
mem_tablet.cpp
schema.cpp
)

View File

@ -0,0 +1,55 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "olap/memory/buffer.h"
namespace doris {
namespace memory {
// Allocate bsize bytes for this buffer.
//
// Requests of 4096 bytes or more are aligned to 4096, smaller ones to 64.
// A zero-sized request is a no-op and leaves the buffer untouched.
// This function does not initialize the new memory; use set_zero() for that.
//
// Returns MemoryAllocFailed if the allocation fails, in which case the
// buffer keeps its previous state.
Status Buffer::alloc(size_t bsize) {
    if (bsize == 0) {
        return Status::OK();
    }
    const size_t alignment = bsize >= 4096 ? 4096 : 64;
    uint8_t* data = reinterpret_cast<uint8_t*>(aligned_malloc(bsize, alignment));
    if (!data) {
        return Status::MemoryAllocFailed(StringPrintf("alloc buffer size=%zu failed", bsize));
    }
    // Release any previous allocation (same way clear() does) so that calling
    // alloc() on a non-empty buffer does not leak the old memory.
    if (_data) {
        free(_data);
    }
    _data = data;
    _bsize = bsize;
    return Status::OK();
}
// Free the memory held by this buffer (if any) and reset it to the empty state.
void Buffer::clear() {
    if (_data == nullptr) {
        // Nothing allocated, nothing to release.
        return;
    }
    free(_data);
    _bsize = 0;
    _data = nullptr;
}
// Overwrite all _bsize bytes of the buffer with zero; no-op for an empty buffer.
void Buffer::set_zero() {
    if (_data == nullptr) {
        return;
    }
    memset(_data, 0, _bsize);
}
// Releases the owned memory (if any) by delegating to clear().
Buffer::~Buffer() {
clear();
}
} // namespace memory
} // namespace doris

View File

@ -0,0 +1,74 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "olap/memory/common.h"
namespace doris {
namespace memory {
// Buffer is an untyped byte array used to hold column base and delta data.
// It carries no compile-time element type information; use as<T>() to view
// the underlying bytes as an array of T.
class Buffer {
public:
    Buffer() = default;
    ~Buffer();

    // Allocate bsize bytes of memory for this buffer.
    Status alloc(size_t bsize);

    // Free the memory and reset the buffer to the empty state.
    void clear();

    // Fill the whole buffer with zero bytes.
    void set_zero();

    // True if this buffer currently holds memory.
    operator bool() const { return data() != nullptr; }

    // Direct read-only pointer to the underlying bytes (nullptr when empty).
    const uint8_t* data() const { return _data; }

    // Direct mutable pointer to the underlying bytes (nullptr when empty).
    uint8_t* data() { return _data; }

    // Size of the buffer in bytes.
    size_t bsize() const { return _bsize; }

    // Mutable view of the bytes as an array of T.
    template <class T>
    T* as() {
        return reinterpret_cast<T*>(data());
    }

    // Read-only view of the bytes as an array of T.
    template <class T>
    const T* as() const {
        return reinterpret_cast<const T*>(data());
    }

private:
    size_t _bsize = 0;
    uint8_t* _data = nullptr;
    DISALLOW_COPY_AND_ASSIGN(Buffer);
};
} // namespace memory
} // namespace doris

View File

@ -0,0 +1,124 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "olap/memory/column.h"
namespace doris {
namespace memory {
// Create a Column for schema cs stored as storage_type. The version vector
// starts with a single entry for the given version, which is also the base
// (_base_idx == 0). Both vectors get their initial capacity reserved up front.
Column::Column(const ColumnSchema& cs, ColumnType storage_type, uint64_t version)
        : _cs(cs), _storage_type(storage_type), _base_idx(0) {
    _versions.reserve(VERSION_CAPACITY_STEP_SIZE);
    _versions.emplace_back(version);
    _base.reserve(BASE_CAPACITY_MIN_STEP_SIZE);
}
// Build a new Column holding the same blocks and versions as rhs, with
// vector capacities of at least new_base_capacity / new_version_capacity
// (never smaller than the capacities rhs already has).
Column::Column(const Column& rhs, size_t new_base_capacity, size_t new_version_capacity)
        : _cs(rhs._cs), _storage_type(rhs._storage_type), _base_idx(rhs._base_idx) {
    const size_t base_capacity = std::max(new_base_capacity, rhs._base.capacity());
    _base.reserve(base_capacity);
    // Capacity is already sufficient, so this only copies the block references.
    _base.insert(_base.end(), rhs._base.begin(), rhs._base.end());

    const size_t version_capacity = std::max(new_version_capacity, rhs._versions.capacity());
    _versions.reserve(version_capacity);
    _versions.insert(_versions.end(), rhs._versions.begin(), rhs._versions.end());
}
size_t Column::memory() const {
size_t bs = _base.size();
size_t ds = _versions.size();
size_t base_memory = 0;
for (size_t i = 0; i < bs; i++) {
base_memory += _base[i]->memory();
}
size_t delta_memory = 0;
for (size_t i = 0; i < ds; i++) {
if (_versions[i].delta) {
delta_memory += _versions[i].delta->memory();
}
}
return base_memory + delta_memory;
}
// Human-readable description: the column schema plus the latest version.
string Column::debug_string() const {
    // version is uint64_t, which is not size_t on every platform, so %zu is
    // not a portable specifier for it; print it through unsigned long long.
    return StringPrintf("Column(%s version=%llu)", _cs.debug_string().c_str(),
                        static_cast<unsigned long long>(_versions.back().version));
}
// Collect the deltas needed to view this column at `version`.
//
// - version == base version: no delta is appended, *real_version is the base
//   version.
// - version > base version: walks the versions after the base in increasing
//   order and appends each delta whose version is <= `version`.
// - version < base version: walks the versions before the base from newest to
//   oldest and appends each delta whose version is >= `version`.
//
// Deltas are appended to *deltas (existing content is kept). *real_version is
// set to the version of the last delta appended, or to the base version when
// none is appended.
//
// Returns NotFound if `version` is older than the oldest version still kept.
Status Column::capture_version(uint64_t version, vector<ColumnDelta*>* deltas,
                               uint64_t* real_version) const {
    uint64_t base_version = _versions[_base_idx].version;
    *real_version = base_version;
    if (version < base_version) {
        uint64_t oldest = _versions[0].version;
        if (version < oldest) {
            // uint64_t is not size_t on every platform, so %zu is not a
            // portable specifier; print through unsigned long long.
            return Status::NotFound(StringPrintf("version %llu(oldest=%llu) deleted",
                                                 static_cast<unsigned long long>(version),
                                                 static_cast<unsigned long long>(oldest)));
        }
        DCHECK_GT(_base_idx, 0);
        // Signed index so the loop can terminate after visiting position 0.
        for (ssize_t i = static_cast<ssize_t>(_base_idx) - 1; i >= 0; i--) {
            uint64_t v = _versions[i].version;
            if (v >= version) {
                DCHECK(_versions[i].delta);
                *real_version = v;
                deltas->emplace_back(_versions[i].delta.get());
                if (v == version) {
                    break;
                }
            } else {
                break;
            }
        }
    } else if (version > base_version) {
        size_t vsize = _versions.size();
        for (size_t i = _base_idx + 1; i < vsize; i++) {
            uint64_t v = _versions[i].version;
            if (v <= version) {
                DCHECK(_versions[i].delta);
                *real_version = v;
                deltas->emplace_back(_versions[i].delta.get());
                if (v == version) {
                    break;
                }
            } else {
                break;
            }
        }
    }
    return Status::OK();
}
// Append the delta of every version stored after the base to *deltas, in
// version-vector order.
void Column::capture_latest(vector<ColumnDelta*>* deltas) const {
    const size_t first_delta = _base_idx + 1;
    deltas->reserve(_versions.size() - first_delta);
    for (size_t pos = first_delta; pos < _versions.size(); ++pos) {
        ColumnDelta* delta = _versions[pos].delta.get();
        deltas->emplace_back(delta);
    }
}
// Placeholder: reading is not implemented yet. Both arguments are ignored and
// NotSupported is always returned.
Status Column::read(uint64_t version, std::unique_ptr<ColumnReader>* reader) {
return Status::NotSupported("not supported");
}
// Placeholder: writing is not implemented yet. The argument is ignored and
// NotSupported is always returned.
Status Column::write(std::unique_ptr<ColumnWriter>* writer) {
return Status::NotSupported("not supported");
}
} // namespace memory
} // namespace doris

107
be/src/olap/memory/column.h Normal file
View File

@ -0,0 +1,107 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "olap/memory/column_block.h"
#include "olap/memory/column_delta.h"
#include "olap/memory/common.h"
#include "olap/memory/schema.h"
namespace doris {
namespace memory {
class ColumnReader;
class ColumnWriter;
// Column stores all the data of a column, including base and deltas.
// It supports single-writer multi-reader concurrency.
// Its properties are all immutable except _base and _versions.
// _base and _versions use std::vector, which is basically thread-safe
// in practice for single-writer/multi-reader access, as long as there isn't
// any over-capacity realloc or delta compaction/GC caused data change.
// When these situations occur, we do a copy-on-write.
//
// TODO: add column reader & writer
class Column : public RefCountedThreadSafe<Column> {
public:
    static const uint32_t BLOCK_SIZE = 1 << 16;
    static const uint32_t BLOCK_MASK = 0xffff;
    // base vector capacity min grow step size
    static const uint32_t BASE_CAPACITY_MIN_STEP_SIZE = 8;
    // base vector capacity max grow step size
    static const uint32_t BASE_CAPACITY_MAX_STEP_SIZE = 8;
    // version vector capacity grow step size
    static const uint32_t VERSION_CAPACITY_STEP_SIZE = 8;

    // create a Column with the provided column schema, underlying storage_type
    // and initial version
    Column(const ColumnSchema& cs, ColumnType storage_type, uint64_t version);

    // copy-on-write a new Column with new capacity
    Column(const Column& rhs, size_t new_base_capacity, size_t new_version_capacity);

    // get column schema; const-qualified so it is usable through const Column
    // references as well
    const ColumnSchema& schema() const { return _cs; }

    // get memory usage in bytes
    size_t memory() const;

    // short human-readable description (schema and latest version)
    string debug_string() const;

    // read this Column at a specific version, get a reader for this Column
    // support multiple concurrent readers
    Status read(uint64_t version, std::unique_ptr<ColumnReader>* reader);

    // write this Column, get a writer for this Column
    // caller needs to make sure there is only one or no writer exists at any time
    Status write(std::unique_ptr<ColumnWriter>* writer);

private:
    ColumnSchema _cs;
    // For some types the storage_type may be different from actual type from schema.
    // For example, string stored in dictionary, so column_block store a integer id,
    // and the storage type may change as the dictionary grows, e.g. from uint8 to uint16
    ColumnType _storage_type;
    // base's position at _versions vector
    size_t _base_idx;
    // base data, a vector of ColumnBlocks
    vector<scoped_refptr<ColumnBlock>> _base;

    struct VersionInfo {
        VersionInfo() = default;
        explicit VersionInfo(uint64_t version) : version(version) {}
        uint64_t version = 0;
        // null if it's base
        scoped_refptr<ColumnDelta> delta;
    };
    // version vector
    vector<VersionInfo> _versions;

    // get related deltas of a specified version, and its corresponding real_version
    // For example:
    // if we have [1,3,5,7,13,16,20,30] in versions array, and base is at version 13
    // capture version 24 will get the deltas of versions [16, 20] (the base itself
    // has no delta), and real_version 20
    Status capture_version(uint64_t version, vector<ColumnDelta*>* deltas,
                           uint64_t* real_version) const;

    // get latest version's related deltas (every delta stored after the base)
    void capture_latest(vector<ColumnDelta*>* deltas) const;

    DISALLOW_COPY_AND_ASSIGN(Column);
};
} // namespace memory
} // namespace doris

View File

@ -0,0 +1,71 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "olap/memory/column_block.h"
namespace doris {
namespace memory {
// Bytes held by this block: the data buffer plus the null-flag buffer.
size_t ColumnBlock::memory() const {
    const size_t data_bytes = _data.bsize();
    const size_t null_bytes = _nulls.bsize();
    return data_bytes + null_bytes;
}
// Allocate zero-filled storage for `size` elements of `esize` bytes each.
// Calling this on a block that already holds data or null flags is fatal.
Status ColumnBlock::alloc(size_t size, size_t esize) {
    const bool already_allocated = _data || _nulls;
    if (already_allocated) {
        LOG(FATAL) << "reinit column page";
    }
    const size_t total_bytes = size * esize;
    RETURN_IF_ERROR(_data.alloc(total_bytes));
    _data.set_zero();
    _size = size;
    return Status::OK();
}
// Mark element idx as null. The null-flag buffer (one bool per element) is
// allocated and zeroed on first use.
Status ColumnBlock::set_null(uint32_t idx) {
    const bool need_alloc = !_nulls;
    if (need_alloc) {
        RETURN_IF_ERROR(_nulls.alloc(_size));
        _nulls.set_zero();
    }
    bool* flags = _nulls.as<bool>();
    flags[idx] = true;
    return Status::OK();
}
// Mark element idx as not null. When no null-flag buffer exists nothing is
// stored, so there is nothing to clear.
Status ColumnBlock::set_not_null(uint32_t idx) {
    if (!_nulls) {
        return Status::OK();
    }
    bool* flags = _nulls.as<bool>();
    flags[idx] = false;
    return Status::OK();
}
// Copy the first `size` elements (each `esize` bytes) of this block, together
// with their null flags, into dest.
//
// Returns InvalidArgument if either dest or this block holds fewer than `size`
// elements, or the allocation error if dest needs a null-flag buffer and it
// cannot be allocated.
Status ColumnBlock::copy_to(ColumnBlock* dest, size_t size, size_t esize) {
    if (size > dest->size()) {
        return Status::InvalidArgument("ColumnBlock copy to a smaller ColumnBlock");
    }
    if (size > _size) {
        // Reading past the end of this block would be out of bounds.
        return Status::InvalidArgument("ColumnBlock copy from a smaller ColumnBlock");
    }
    if (dest->nulls()) {
        if (nulls()) {
            memcpy(dest->nulls().data(), nulls().data(), size);
        } else {
            // Source has no nulls at all: clear the flags of the copied range.
            memset(dest->nulls().data(), 0, size);
        }
    } else if (nulls()) {
        RETURN_IF_ERROR(dest->nulls().alloc(dest->size()));
        // Zero the new buffer first so flags beyond `size` are well defined.
        dest->nulls().set_zero();
        memcpy(dest->nulls().data(), nulls().data(), size);
    }
    // Copy the element data itself; previously only the null flags were copied.
    const size_t nbytes = size * esize;
    if (nbytes > 0) {
        memcpy(dest->data().data(), data().data(), nbytes);
    }
    return Status::OK();
}
} // namespace memory
} // namespace doris

View File

@ -0,0 +1,61 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "olap/memory/buffer.h"
#include "olap/memory/common.h"
namespace doris {
namespace memory {
// ColumnBlock holds one block of a Column's data: a data buffer plus an
// optional null-flag buffer (one bool per element).
class ColumnBlock : public RefCountedThreadSafe<ColumnBlock> {
public:
    ColumnBlock() = default;

    // Memory usage in bytes (data buffer + null-flag buffer).
    size_t memory() const;

    // Number of elements this block was allocated for.
    size_t size() const { return _size; }

    Buffer& data() { return _data; }
    Buffer& nulls() { return _nulls; }

    // Allocate memory for this block, with room for `size` elements of
    // `esize` bytes each.
    Status alloc(size_t size, size_t esize);

    // True if element idx is marked null; always false when no null-flag
    // buffer has been allocated.
    bool is_null(uint32_t idx) const {
        if (!_nulls) {
            return false;
        }
        return _nulls.as<bool>()[idx];
    }

    Status set_null(uint32_t idx);
    Status set_not_null(uint32_t idx);

    // Copy the first `size` elements to dest ColumnBlock, each element has
    // `esize` byte size.
    Status copy_to(ColumnBlock* dest, size_t size, size_t esize);

private:
    size_t _size = 0;
    Buffer _nulls;
    Buffer _data;
    DISALLOW_COPY_AND_ASSIGN(ColumnBlock);
};
} // namespace memory
} // namespace doris

View File

@ -0,0 +1,55 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "olap/memory/column_delta.h"
namespace doris {
namespace memory {
// Memory usage in bytes: the delta index plus the null-flag and data buffers.
size_t ColumnDelta::memory() const {
    // _index stays null until alloc() succeeds; treat a missing index as
    // zero bytes instead of dereferencing it.
    const size_t index_bytes = _index.get() != nullptr ? _index->memory() : 0;
    return index_bytes + _nulls.bsize() + _data.bsize();
}
// Allocate storage for a delta covering `nblock` blocks and holding `size`
// updated rows of `esize` bytes each; a null-flag buffer is added when
// has_null is set. The data and null buffers are zero-filled.
//
// Calling this on a delta that already holds data or null flags is fatal.
// On allocation failure the error is returned and the delta is left without
// an index, data or null flags.
Status ColumnDelta::alloc(size_t nblock, size_t size, size_t esize, bool has_null) {
    if (_data || _nulls) {
        LOG(FATAL) << "reinit column delta";
    }
    // Build the index on the side and install it only once every allocation
    // has succeeded, so a failure cannot leave a half-initialized index behind.
    scoped_refptr<DeltaIndex> index(new DeltaIndex());
    index->block_ends().resize(nblock, 0);
    // One uint16_t in-block row offset per updated row.
    Status ret = index->data().alloc(size * sizeof(uint16_t));
    if (!ret.ok()) {
        return ret;
    }
    ret = _data.alloc(size * esize);
    if (!ret.ok()) {
        return ret;
    }
    if (has_null) {
        ret = _nulls.alloc(size);
        if (!ret.ok()) {
            _data.clear();
            return Status::MemoryAllocFailed("init column delta nulls");
        }
        _nulls.set_zero();
    }
    _data.set_zero();
    _index = index;
    _size = size;
    return Status::OK();
}
} // namespace memory
} // namespace doris

View File

@ -0,0 +1,56 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "olap/memory/common.h"
#include "olap/memory/delta_index.h"
namespace doris {
namespace memory {
// ColumnDelta stores a column's updates of one commit (version): a DeltaIndex
// plus a data buffer and a null-flag buffer.
class ColumnDelta : public RefCountedThreadSafe<ColumnDelta> {
public:
ColumnDelta() = default;
// memory usage in bytes (index + nulls + data)
size_t memory() const;
// the size passed to the last successful alloc(), 0 before that
size_t size() const { return _size; }
Buffer& nulls() { return _nulls; }
Buffer& data() { return _data; }
// raw pointer to the delta index; null until alloc() has created it
DeltaIndex* index() { return _index.get(); }
// forwards to DeltaIndex::contains_block; dereferences _index, so alloc()
// must have been called first
bool contains_block(uint32_t bid) const { return _index->contains_block(bid); }
// forwards to DeltaIndex::find_idx; dereferences _index, so alloc() must
// have been called first
uint32_t find_idx(uint32_t rid) { return _index->find_idx(rid); }
// allocate the index (nblock blocks, size rowids), the data buffer
// (size * esize bytes) and, if has_null, the null-flag buffer (size bytes)
Status alloc(size_t nblock, size_t size, size_t esize, bool has_null);
private:
size_t _size = 0;
scoped_refptr<DeltaIndex> _index;
Buffer _nulls;
Buffer _data;
DISALLOW_COPY_AND_ASSIGN(ColumnDelta);
};
} // namespace memory
} // namespace doris

View File

@ -0,0 +1,44 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <memory>
#include "common/logging.h"
#include "common/status.h"
#include "gutil/ref_counted.h"
#include "gutil/stringprintf.h"
#include "olap/olap_common.h"
#include "olap/olap_define.h"
#include "olap/types.h"
namespace doris {
namespace memory {
// Round v up to the nearest multiple of pad.
template <class T, class ST>
inline T padding(T v, ST pad) {
    const auto bumped = v + pad - 1;
    return bumped / pad * pad;
}
// Number of blocks of size bs needed to hold v items (ceiling division).
template <class T, class ST>
inline size_t num_block(T v, ST bs) {
    const auto bumped = v + bs - 1;
    return bumped / bs;
}
} // namespace memory
} // namespace doris

View File

@ -0,0 +1,53 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "olap/memory/delta_index.h"
namespace doris {
namespace memory {
// Memory usage in bytes: the rowid buffer plus the block-ends array.
size_t DeltaIndex::memory() const {
    const size_t block_ends_bytes = _block_ends.size() * sizeof(uint32_t);
    return _data.bsize() + block_ends_bytes;
}
// Look up rowid `rid` in the index.
// The upper bits (rid >> 16) select the block, the lower 16 bits are searched
// for inside that block's [start, end) slice of the uint16_t array.
// Returns the position in the array if found, npos otherwise.
uint32_t DeltaIndex::find_idx(uint32_t rid) {
    if (!_data) {
        return npos;
    }
    const uint32_t bid = rid >> 16;
    if (bid >= _block_ends.size()) {
        return npos;
    }
    // TODO: use SIMD
    const uint32_t start = (bid == 0) ? 0 : _block_ends[bid - 1];
    const uint32_t end = _block_ends[bid];
    if (start == end) {
        // This block has no entries.
        return npos;
    }
    uint16_t* base = _data.as<uint16_t>();
    uint16_t* first = base + start;
    uint16_t* last = base + end;
    const uint32_t bidx = rid & 0xffff;
    uint16_t* pos = std::lower_bound(first, last, bidx);
    if (pos == last || *pos != bidx) {
        return npos;
    }
    return pos - base;
}
} // namespace memory
} // namespace doris

View File

@ -0,0 +1,76 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <vector>
#include "olap/memory/buffer.h"
#include "olap/memory/common.h"
namespace doris {
namespace memory {
// DeltaIndex stores the ids of all updated rows (rowids) for a ColumnDelta.
// Rowids are sorted and divided into blocks, each block covering a 64K rowid
// space. Because a block only spans 64K ids, each entry can be stored as a
// uint16_t rather than a uint32_t to save memory.
class DeltaIndex : public RefCountedThreadSafe<DeltaIndex> {
public:
    static const uint32_t npos = 0xffffffffu;

    DeltaIndex() = default;

    // get memory consumption
    size_t memory() const;

    // find rowid(rid) in the index,
    // return index position if found, else return npos
    uint32_t find_idx(uint32_t rid);

    // get a block's index position range as [start, end);
    // an out-of-range bid yields the empty range [0, 0)
    void block_range(uint32_t bid, uint32_t* start, uint32_t* end) const {
        if (bid >= _block_ends.size()) {
            *start = 0;
            *end = 0;
            return;
        }
        *start = (bid == 0) ? 0 : _block_ends[bid - 1];
        *end = _block_ends[bid];
    }

    // Return true if this index has any rowid belonging to this block
    bool contains_block(uint32_t bid) const {
        if (bid >= _block_ends.size()) {
            return false;
        }
        const uint32_t start = (bid == 0) ? 0 : _block_ends[bid - 1];
        return start < _block_ends[bid];
    }

    Buffer& data() { return _data; }
    const Buffer& data() const { return _data; }
    vector<uint32_t>& block_ends() { return _block_ends; }
    const vector<uint32_t>& block_ends() const { return _block_ends; }

private:
    // _block_ends[b] is the end position (exclusive) of block b in _data
    vector<uint32_t> _block_ends;
    Buffer _data;
    DISALLOW_COPY_AND_ASSIGN(DeltaIndex);
};
} // namespace memory
} // namespace doris

View File

@ -28,6 +28,7 @@
#include "gutil/stringprintf.h"
namespace doris {
namespace memory {
struct alignas(64) HashChunk {
static const uint32_t CAPACITY = 12;
@ -166,4 +167,5 @@ const std::string HashIndex::dump() const {
size() / (_num_chunks * 12.0f));
}
} // namespace memory
} // namespace doris

View File

@ -15,8 +15,7 @@
// specific language governing permissions and limitations
// under the License.
#ifndef DORIS_BE_SRC_OLAP_MEMORY_HASH_INDEX_H_
#define DORIS_BE_SRC_OLAP_MEMORY_HASH_INDEX_H_
#pragma once
#include <stdint.h>
@ -27,6 +26,7 @@
#include "gutil/ref_counted.h"
namespace doris {
namespace memory {
struct HashChunk;
@ -103,6 +103,5 @@ private:
HashChunk* _chunks;
};
} /* namespace doris */
#endif /* DORIS_BE_SRC_OLAP_MEMORY_HASH_INDEX_H_ */
} // namespace memory
} // namespace doris

View File

@ -18,5 +18,12 @@
#include "olap/memory/mem_tablet.h"
namespace doris {
namespace memory {
} /* namespace doris */
// Constructs a MemTablet by forwarding tablet_meta and data_dir to BaseTablet;
// no additional state is initialized here.
MemTablet::MemTablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir)
: BaseTablet(tablet_meta, data_dir) {}
// Empty destructor body: MemTablet itself has nothing to release here.
MemTablet::~MemTablet() {}
} // namespace memory
} // namespace doris

View File

@ -15,12 +15,12 @@
// specific language governing permissions and limitations
// under the License.
#ifndef DORIS_BE_SRC_MEMORY_MEM_TABLET_H_
#define DORIS_BE_SRC_MEMORY_MEM_TABLET_H_
#pragma once
#include "olap/base_tablet.h"
namespace doris {
namespace memory {
// Tablet class for memory-optimized storage engine.
//
@ -29,11 +29,13 @@ namespace doris {
//
// TODO: This is just a skeleton, will add implementation in the future.
// Skeleton tablet type derived from BaseTablet; only construction and
// destruction are declared so far, and copying is disallowed.
class MemTablet : public BaseTablet {
public:
// forwards both arguments to the BaseTablet constructor
MemTablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir);
virtual ~MemTablet();
private:
DISALLOW_COPY_AND_ASSIGN(MemTablet);
};
} /* namespace doris */
#endif /* DORIS_BE_SRC_MEMORY_MEM_TABLET_H_ */
} // namespace memory
} // namespace doris

View File

@ -0,0 +1,102 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "olap/memory/schema.h"
namespace doris {
namespace memory {
// Wrap an existing TabletColumn; the column is copied into _tcolumn.
ColumnSchema::ColumnSchema(const TabletColumn& tcolumn) : _tcolumn(tcolumn) {}
// Build a ColumnSchema from individual properties by filling a ColumnPB
// message and initializing the wrapped TabletColumn from it.
ColumnSchema::ColumnSchema(uint32_t cid, const string& name, ColumnType type, bool nullable,
                           bool is_key) {
    ColumnPB column_pb;
    column_pb.set_name(name);
    column_pb.set_unique_id(cid);
    // The protobuf message carries the type as its string name.
    column_pb.set_type(TabletColumn::get_string_by_field_type(type));
    column_pb.set_is_key(is_key);
    column_pb.set_is_nullable(nullable);
    _tcolumn.init_from_pb(column_pb);
}
// String name of this column's type, as produced by
// TabletColumn::get_string_by_field_type.
std::string ColumnSchema::type_name() const {
    const auto field_type = _tcolumn.type();
    return TabletColumn::get_string_by_field_type(field_type);
}
// Human-readable description: "cid=<id> <name> <type>[ nullable][ key]".
std::string ColumnSchema::debug_string() const {
    // cid() returns uint32_t, so it must be printed with %u; %d would treat
    // it as signed.
    return StringPrintf("cid=%u %s %s%s%s", cid(), name().c_str(), type_name().c_str(),
                        is_nullable() ? " nullable" : "", is_key() ? " key" : "");
}
//////////////////////////////////////////////////////////////////////////////
// Copy the tablet schema and build the lookup tables: cid -> column and
// name -> column. _cid_size ends up as max(1, largest cid + 1).
Schema::Schema(const TabletSchema& tschema) : _tschema(tschema) {
    const size_t ncol = num_columns();

    // First pass: determine the size of the cid space.
    _cid_size = 1;
    for (size_t pos = 0; pos < ncol; ++pos) {
        _cid_size = std::max(_cid_size, get(pos)->cid() + 1);
    }
    _cid_to_col.resize(_cid_size, nullptr);

    // Second pass: register every column under its cid and its name.
    for (size_t pos = 0; pos < ncol; ++pos) {
        const ColumnSchema* col = get(pos);
        _cid_to_col[col->cid()] = col;
        _name_to_col[col->name()] = col;
    }
}
std::string Schema::debug_string() const {
std::string ret("(");
for (size_t i = 0; i < num_columns(); i++) {
const ColumnSchema* cs = get(i);
if (i > 0) {
ret.append(", ");
}
ret.append(cs->debug_string());
}
ret.append(")");
return ret;
}
// Size of the cid space as computed by the constructor: max(1, largest column
// cid + 1).
uint32_t Schema::cid_size() const {
return _cid_size;
}
// Column at position idx in the tablet schema, or nullptr if idx is out of
// range.
const ColumnSchema* Schema::get(size_t idx) const {
    if (idx >= num_columns()) {
        return nullptr;
    }
    // TODO: this is a hack, improve this in the future
    // The stored TabletColumn is reinterpreted in place as a ColumnSchema.
    const auto* tcolumn = &_tschema.columns()[idx];
    return reinterpret_cast<const ColumnSchema*>(tcolumn);
}
// Column registered under `name`, or nullptr if there is none.
const ColumnSchema* Schema::get_by_name(const string& name) const {
    const auto found = _name_to_col.find(name);
    return found == _name_to_col.end() ? nullptr : found->second;
}
// Column registered under `cid`, or nullptr if cid is outside the cid table
// or no column uses it.
const ColumnSchema* Schema::get_by_cid(uint32_t cid) const {
    if (cid >= _cid_to_col.size()) {
        return nullptr;
    }
    return _cid_to_col[cid];
}
} // namespace memory
} // namespace doris

View File

@ -0,0 +1,80 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "olap/memory/common.h"
#include "olap/tablet_schema.h"
namespace doris {
namespace memory {
// This file contains type and schema adaptors
// from olap's type and schema to memory engine's type and schema
// Column type used by the memory engine; currently an alias of olap's FieldType.
using ColumnType = FieldType;
// Memory engine's column schema, a simple wrapper of TabletColumn.
// TODO: Add more properties and methods later
class ColumnSchema {
public:
    // Wrap a copy of an existing TabletColumn.
    explicit ColumnSchema(const TabletColumn& tcolumn);

    // Build a column schema from individual properties.
    ColumnSchema(uint32_t cid, const string& name, ColumnType type, bool nullable, bool is_key);

    // Column id: the wrapped column's unique_id converted to uint32_t.
    uint32_t cid() const { return static_cast<uint32_t>(_tcolumn.unique_id()); }

    std::string name() const { return _tcolumn.name(); }

    ColumnType type() const { return _tcolumn.type(); }

    bool is_nullable() const { return _tcolumn.is_nullable(); }

    bool is_key() const { return _tcolumn.is_key(); }

    // String name of type().
    std::string type_name() const;

    std::string debug_string() const;

private:
    TabletColumn _tcolumn;
};
// Memory engine's tablet schema, a simple wrapper of TabletSchema.
// Schema has some differences compared to the original TabletSchema:
// 1. there is a hidden delete_flag column (with special cid=0) to mark
//    deleted rows
// 2. in the future, there may be a special compound primary key column
//    if the primary key has multiple columns
// TODO: Add more properties and methods later
class Schema {
public:
    explicit Schema(const TabletSchema& tschema);

    std::string debug_string() const;

    size_t num_columns() const { return _tschema.num_columns(); }

    size_t num_key_columns() const { return _tschema.num_key_columns(); }

    // Column at position idx, or nullptr if idx is out of range.
    const ColumnSchema* get(size_t idx) const;

    // Column with the given name, or nullptr if there is none.
    const ColumnSchema* get_by_name(const string& name) const;

    // Size of the cid space.
    uint32_t cid_size() const;

    // Column with the given cid, or nullptr if there is none.
    const ColumnSchema* get_by_cid(uint32_t cid) const;

private:
    TabletSchema _tschema;
    uint32_t _cid_size;
    std::unordered_map<string, const ColumnSchema*> _name_to_col;
    vector<const ColumnSchema*> _cid_to_col;
};
} // namespace memory
} // namespace doris

View File

@ -83,3 +83,5 @@ ADD_BE_TEST(selection_vector_test)
ADD_BE_TEST(options_test)
ADD_BE_TEST(fs/file_block_manager_test)
ADD_BE_TEST(memory/hash_index_test)
ADD_BE_TEST(memory/column_delta_test)
ADD_BE_TEST(memory/schema_test)

View File

@ -0,0 +1,81 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "olap/memory/column_delta.h"
#include <gtest/gtest.h>
#include <map>
#include <vector>
#include "olap/memory/column.h"
namespace doris {
namespace memory {
// Fills a ColumnDelta by hand from a random set of row updates, then checks
// that find_idx() returns an entry for exactly the updated rows and that the
// entry holds the value written for that row.
TEST(ColumnDelta, Index) {
    const int BaseSize = 256001;
    const int NumUpdate = 10000;
    // Fixed seed so the generated update set is the same on every run.
    srand(1);
    scoped_refptr<ColumnDelta> delta(new ColumnDelta());
    // row id -> new value. std::map iterates in ascending row id order, which
    // is the order the index entries are written in below.
    std::map<uint32_t, uint32_t> updates;
    for (int i = 0; i < NumUpdate; i++) {
        uint32_t idx = rand() % BaseSize;
        updates[idx] = rand();
    }
    size_t nblock = num_block(BaseSize, Column::BLOCK_SIZE);
    ASSERT_TRUE(delta->alloc(nblock, updates.size(), sizeof(uint32_t), false).ok());
    DeltaIndex* index = delta->index();
    std::vector<uint32_t>& block_ends = index->block_ends();
    Buffer& idxdata = index->_data;
    Buffer& data = delta->data();
    uint32_t cidx = 0;    // next free slot in idxdata / data
    uint32_t curbid = 0;  // first block whose end offset is not recorded yet
    for (const auto& e : updates) {
        const uint32_t rid = e.first;
        // Upper bits select the block, the low 16 bits are the in-block offset.
        // NOTE(review): this assumes Column::BLOCK_SIZE == 65536 -- confirm.
        const uint32_t bid = rid >> 16;
        // Close every block that precedes the one this row belongs to.
        while (curbid < bid) {
            block_ends[curbid] = cidx;
            curbid++;
        }
        idxdata.as<uint16_t>()[cidx] = rid & 0xffff;
        data.as<uint32_t>()[cidx] = e.second;
        cidx++;
    }
    // Close the remaining blocks, including the one holding the last update.
    while (curbid < nblock) {
        block_ends[curbid] = cidx;
        curbid++;
    }
    for (int i = 0; i < BaseSize; i++) {
        uint32_t idx = delta->find_idx(i);
        auto itr = updates.find(i);
        if (itr == updates.end()) {
            EXPECT_TRUE(idx == DeltaIndex::npos) << "unexpected delta entry for rid=" << i;
        } else {
            // Abort before reading: indexing the value buffer with npos or any
            // slot that was never written would be an out-of-bounds read, not
            // a test failure.
            ASSERT_TRUE(idx != DeltaIndex::npos) << "missing delta entry for rid=" << i;
            ASSERT_LT(idx, updates.size()) << "delta entry out of range for rid=" << i;
            uint32_t v = data.as<uint32_t>()[idx];
            EXPECT_EQ(v, itr->second) << "wrong value for rid=" << i;
        }
    }
}
} // namespace memory
} // namespace doris
int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

View File

@ -24,6 +24,7 @@
#include "gutil/hash/builtin_type_hash.h"
namespace doris {
namespace memory {
inline uint64_t HashCode(size_t v) {
return Hash64NumWithSeed(v, 0);
@ -103,6 +104,7 @@ TEST(HashIndex, add) {
LOG(INFO) << hi.dump();
}
} // namespace memory
} // namespace doris
int main(int argc, char** argv) {

View File

@ -0,0 +1,67 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "olap/memory/schema.h"
#include <gtest/gtest.h>
#include <vector>
namespace doris {
namespace memory {
// Checks that a ColumnSchema built from individual properties reports each of
// those properties back through its accessors.
TEST(ColumnSchema, create) {
    ColumnSchema cs(1, "uid", ColumnType::OLAP_FIELD_TYPE_TINYINT, false, true);
    // cid() returns uint32_t; an unsigned literal avoids a signed/unsigned
    // comparison inside EXPECT_EQ.
    EXPECT_EQ(1u, cs.cid());
    EXPECT_EQ(std::string("uid"), cs.name());
    // The type argument must round-trip as well.
    EXPECT_TRUE(cs.type() == ColumnType::OLAP_FIELD_TYPE_TINYINT);
    EXPECT_FALSE(cs.is_nullable());
    EXPECT_TRUE(cs.is_key());
}
// Builds a two-column TabletSchema ("uid" as key, "city" as nullable value),
// wraps it in a memory-engine Schema and checks lookups by name and by cid.
TEST(Schema, create) {
    TabletSchemaPB tspb;
    auto cpb = tspb.add_column();
    cpb->set_unique_id(1);
    cpb->set_name("uid");
    cpb->set_type(TabletColumn::get_string_by_field_type(FieldType::OLAP_FIELD_TYPE_INT));
    cpb->set_is_nullable(false);
    cpb->set_is_key(true);
    auto cpb2 = tspb.add_column();
    cpb2->set_unique_id(2);
    cpb2->set_type(TabletColumn::get_string_by_field_type(FieldType::OLAP_FIELD_TYPE_INT));
    cpb2->set_name("city");
    cpb2->set_is_nullable(true);
    cpb2->set_is_key(false);
    tspb.set_keys_type(KeysType::UNIQUE_KEYS);
    tspb.set_next_column_unique_id(3);
    tspb.set_num_short_key_columns(1);
    tspb.set_is_in_memory(false);
    TabletSchema ts;
    ts.init_from_pb(tspb);
    Schema schema(ts);
    // cid_size() returns uint32_t; compare against an unsigned literal.
    EXPECT_EQ(3u, schema.cid_size());

    // Each lookup returns a pointer: assert it is non-null before
    // dereferencing so a failed lookup is reported as a test failure rather
    // than crashing the test binary.
    const ColumnSchema* uid_by_name = schema.get_by_name("uid");
    ASSERT_TRUE(uid_by_name != nullptr);
    EXPECT_EQ(std::string("uid"), uid_by_name->name());

    const ColumnSchema* uid_by_cid = schema.get_by_cid(1);
    ASSERT_TRUE(uid_by_cid != nullptr);
    EXPECT_EQ(std::string("uid"), uid_by_cid->name());

    // The second column must be reachable the same way.
    const ColumnSchema* city_by_name = schema.get_by_name("city");
    ASSERT_TRUE(city_by_name != nullptr);
    EXPECT_EQ(std::string("city"), city_by_name->name());

    const ColumnSchema* city_by_cid = schema.get_by_cid(2);
    ASSERT_TRUE(city_by_cid != nullptr);
    EXPECT_EQ(std::string("city"), city_by_cid->name());
}
} // namespace memory
} // namespace doris
int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}