[Improve](row-store) support row cache (#16263)

lihangyu
2023-02-06 11:16:39 +08:00
committed by GitHub
parent a390252893
commit f2fd47f238
15 changed files with 259 additions and 35 deletions

View File

@@ -246,6 +246,7 @@ CONF_mBool(row_nums_check, "true");
// modify them upon necessity
CONF_Int32(min_file_descriptor_number, "60000");
CONF_Int64(index_stream_cache_capacity, "10737418240");
// Memory limit for the row cache
CONF_String(row_cache_mem_limit, "20%");
// Cache for storage page size
CONF_String(storage_page_cache_limit, "20%");
@@ -257,6 +258,8 @@ CONF_Int32(storage_page_cache_shard_size, "16");
CONF_Int32(index_page_cache_percentage, "10");
// whether to disable page cache feature in storage
CONF_Bool(disable_storage_page_cache, "false");
// whether to disable row cache feature in storage
CONF_Bool(disable_storage_row_cache, "false");
CONF_Bool(enable_storage_vectorization, "true");
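Taken together, the two new knobs size and gate the feature: row_cache_mem_limit is resolved once at startup, while disable_storage_row_cache is consulted at every use site. A minimal sketch of how a percent-style limit resolves, assuming ParseUtil::parse_mem_spec behaves as it is used later in this diff (the 64 GiB figure is hypothetical):

bool is_percent = false;
int64_t cap = ParseUtil::parse_mem_spec(config::row_cache_mem_limit,
                                        MemInfo::mem_limit(),
                                        MemInfo::physical_mem(), &is_percent);
// With a hypothetical 64 GiB mem_limit, the default "20%" yields
// cap = 12.8 GiB and is_percent == true.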

View File

@@ -140,6 +140,7 @@ Status DeltaWriter::init() {
context.oldest_write_timestamp = UnixSeconds();
context.newest_write_timestamp = UnixSeconds();
context.tablet_id = _tablet->table_id();
context.is_direct_write = true;
RETURN_NOT_OK(_tablet->create_rowset_writer(context, &_rowset_writer));
_schema.reset(new Schema(_tablet_schema));
_reset_mem_table();

View File

@@ -839,6 +839,7 @@ Status BetaRowsetWriter::_do_create_segment_writer(
segment_v2::SegmentWriterOptions writer_options;
writer_options.enable_unique_key_merge_on_write = _context.enable_unique_key_merge_on_write;
writer_options.rowset_ctx = &_context;
writer_options.is_direct_write = _context.is_direct_write;
if (is_segcompaction) {
writer->reset(new segment_v2::SegmentWriter(file_writer.get(), _num_segcompacted,

View File

@@ -77,6 +77,9 @@ struct RowsetWriterContext {
int64_t newest_write_timestamp;
bool enable_unique_key_merge_on_write = false;
std::set<int32_t> skip_inverted_index;
// True if the rowset is written directly by the load procedure;
// otherwise the writer serves compaction, schema change, etc.
bool is_direct_write = false;
};
} // namespace doris
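This flag is threaded through the write path by the hunks in this commit; a propagation sketch (the assignments are quoted from the diffs above and below):

// DeltaWriter::init() -- the load path:
//     context.is_direct_write = true;
// BetaRowsetWriter::_do_create_segment_writer():
//     writer_options.is_direct_write = _context.is_direct_write;
// SegmentWriter::_maybe_invalid_row_cache() is gated on _opts.is_direct_write,
// so compaction and schema change, which never set the flag, keep the
// default false and leave the row cache untouched.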

View File

@@ -30,6 +30,7 @@
#include "olap/schema.h"
#include "olap/short_key_index.h"
#include "runtime/memory/mem_tracker.h"
#include "service/point_query_executor.h"
#include "util/crc32c.h"
#include "util/faststring.h"
#include "util/key_util.h"
@@ -187,6 +188,17 @@ Status SegmentWriter::init(const std::vector<uint32_t>& col_ids, bool has_key) {
return Status::OK();
}
void SegmentWriter::_maybe_invalid_row_cache(const std::string& key) {
// Just invalidate the row cache for simplicity, since the rowset is not yet visible.
// If we updated or inserted into the cache instead and the load failed, the rowset
// would never become visible while the cached data stayed visible, causing inconsistency.
if (!config::disable_storage_row_cache && _tablet_schema->store_row_column() &&
_opts.is_direct_write) {
// invalidate cache
RowCache::instance()->erase({_opts.rowset_ctx->tablet_id, key});
}
}
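The erase-not-insert choice in _maybe_invalid_row_cache closes a failure window. A sketch of the scenario the comment guards against (the timeline is hypothetical; the erase call is quoted from the function above):

// t0: the load writes key K into a new segment; the rowset is not yet visible.
// t1: had we insert()ed K's row here, point queries could already serve it.
// t2: the load fails, so the rowset never becomes visible -- but the cached
//     row for K would keep being served. erase() is safe in both outcomes:
RowCache::instance()->erase({_opts.rowset_ctx->tablet_id, key});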
Status SegmentWriter::append_block(const vectorized::Block* block, size_t row_pos,
size_t num_rows) {
CHECK(block->columns() == _column_writers.size())
@@ -233,8 +245,9 @@ Status SegmentWriter::append_block(const vectorized::Block* block, size_t row_po
if (_tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write) {
// create primary indexes
for (size_t pos = 0; pos < num_rows; pos++) {
RETURN_IF_ERROR(
_primary_key_index_builder->add_item(_full_encode_keys(key_columns, pos)));
const std::string& key = _full_encode_keys(key_columns, pos);
RETURN_IF_ERROR(_primary_key_index_builder->add_item(key));
_maybe_invalid_row_cache(key);
}
} else {
// create short key indexes

View File

@@ -61,6 +61,9 @@ struct SegmentWriterOptions {
bool enable_unique_key_merge_on_write = false;
RowsetWriterContext* rowset_ctx = nullptr;
// True if the rowset is written directly by the load procedure;
// otherwise the writer serves compaction, schema change, etc.
bool is_direct_write = false;
};
class SegmentWriter {
@@ -116,6 +119,7 @@ private:
Status _write_primary_key_index();
Status _write_footer();
Status _write_raw_data(const std::vector<Slice>& slices);
void _maybe_invalid_row_cache(const std::string& key);
std::string _encode_keys(const std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns,
size_t pos, bool null_first = true);
// for unique-key merge on write and segment min_max key

View File

@@ -62,6 +62,7 @@
#include "olap/tablet_meta_manager.h"
#include "olap/tablet_schema.h"
#include "segment_loader.h"
#include "service/point_query_executor.h"
#include "util/defer_op.h"
#include "util/path_util.h"
#include "util/pretty_printer.h"
@@ -70,6 +71,7 @@
#include "util/trace.h"
#include "vec/data_types/data_type_factory.hpp"
#include "vec/jsonb/serialize.h"
namespace doris {
using namespace ErrorCode;
@@ -1973,8 +1975,9 @@ TabletSchemaSPtr Tablet::get_max_version_schema(std::lock_guard<std::shared_mute
return _max_version_schema;
}
Status Tablet::lookup_row_data(const RowLocation& row_location, const TupleDescriptor* desc,
vectorized::Block* block) {
Status Tablet::lookup_row_data(const Slice& encoded_key, const RowLocation& row_location,
const TupleDescriptor* desc, vectorized::Block* block,
bool write_to_cache) {
// read row data
BetaRowsetSharedPtr rowset =
std::static_pointer_cast<BetaRowset>(get_rowset(row_location.rowset_id));
@@ -2005,7 +2008,6 @@ Status Tablet::lookup_row_data(const RowLocation& row_location, const TupleDescr
LOG_EVERY_N(INFO, 500) << "get a single_row, cost(us):" << watch.elapsed_time() / 1000
<< ", row_size:" << row_size;
});
// TODO(lhy) too long, refactor
if (tablet_schema->store_row_column()) {
// create _source column
segment_v2::ColumnIterator* column_iterator = nullptr;
@@ -2025,6 +2027,11 @@ Status Tablet::lookup_row_data(const RowLocation& row_location, const TupleDescr
RETURN_IF_ERROR(column_iterator->read_by_rowids(rowids.data(), 1, column_ptr));
assert(column_ptr->size() == 1);
auto string_column = static_cast<vectorized::ColumnString*>(column_ptr.get());
if (write_to_cache) {
StringRef value = string_column->get_data_at(0);
RowCache::instance()->insert({tablet_id(), encoded_key},
Slice {value.data, value.size});
}
vectorized::JsonbSerializeUtil::jsonb_to_block(*desc, *string_column, *block);
return Status::OK();
}
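The cache is therefore populated on read, not on write: only callers that pass write_to_cache = true install the jsonb row after a disk lookup. A sketch of the two call shapes, with the locals hypothetical:

// Point-query path (see PointQueryExecutor::_lookup_row_data below):
tablet->lookup_row_data(encoded_key, loc, desc, block,
                        !config::disable_storage_row_cache /*write_to_cache*/);
// Any other reader keeps the default (false) and never fills the cache:
tablet->lookup_row_data(encoded_key, loc, desc, block);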

View File

@@ -328,8 +328,9 @@ public:
RowLocation* row_location, uint32_t version);
// Lookup a row with TupleDescriptor and fill Block
Status lookup_row_data(const RowLocation& row_location, const TupleDescriptor* desc,
vectorized::Block* block);
Status lookup_row_data(const Slice& encoded_key, const RowLocation& row_location,
const TupleDescriptor* desc, vectorized::Block* block,
bool write_to_cache = false);
// calc delete bitmap when flush memtable, use a fake version to calc
// For example, cur max version is 5, and we use version 6 to calc but

View File

@@ -46,6 +46,7 @@
#include "runtime/stream_load/new_load_stream_mgr.h"
#include "runtime/stream_load/stream_load_executor.h"
#include "runtime/tmp_file_mgr.h"
#include "service/point_query_executor.h"
#include "util/bfd_parser.h"
#include "util/brpc_client_cache.h"
#include "util/doris_metrics.h"
@@ -192,6 +193,19 @@ Status ExecEnv::_init_mem_env() {
<< PrettyPrinter::print(storage_cache_limit, TUnit::BYTES)
<< ", origin config value: " << config::storage_page_cache_limit;
// Init row cache
int64_t row_cache_mem_limit =
ParseUtil::parse_mem_spec(config::row_cache_mem_limit, MemInfo::mem_limit(),
MemInfo::physical_mem(), &is_percent);
while (!is_percent && row_cache_mem_limit > MemInfo::mem_limit() / 2) {
// Reason same as buffer_pool_limit
row_cache_mem_limit = row_cache_mem_limit / 2;
}
RowCache::create_global_cache(row_cache_mem_limit);
LOG(INFO) << "Row cache memory limit: "
<< PrettyPrinter::print(row_cache_mem_limit, TUnit::BYTES)
<< ", origin config value: " << config::row_cache_mem_limit;
uint64_t fd_number = config::min_file_descriptor_number;
struct rlimit l;
int ret = getrlimit(RLIMIT_NOFILE, &l);
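Worked numbers for the halving guard in the row-cache initialization above (values hypothetical): with a 64 GiB process mem_limit, an absolute row_cache_mem_limit of 48 GiB exceeds the 32 GiB half-limit and is halved once to 24 GiB; percent specs skip the loop entirely because is_percent is true.

int64_t mem_limit = 64LL << 30;            // hypothetical process limit
int64_t row_cache_mem_limit = 48LL << 30;  // absolute (non-percent) config
bool is_percent = false;
while (!is_percent && row_cache_mem_limit > mem_limit / 2) {
    row_cache_mem_limit /= 2;              // 48 GiB -> 24 GiB, loop exits
}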

View File

@@ -17,6 +17,7 @@
#include "service/point_query_executor.h"
#include "olap/lru_cache.h"
#include "olap/row_cursor.h"
#include "olap/storage_engine.h"
#include "service/internal_service.h"
@@ -26,6 +27,7 @@
#include "util/thrift_util.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vliteral.h"
#include "vec/jsonb/serialize.h"
#include "vec/sink/vmysql_result_writer.cpp"
namespace doris {
@@ -74,6 +76,52 @@ void Reusable::return_block(std::unique_ptr<vectorized::Block>& block) {
_block_pool.push_back(std::move(block));
}
RowCache* RowCache::_s_instance = nullptr;
RowCache::RowCache(int64_t capacity, int num_shards) {
// Create Row Cache
_cache = std::unique_ptr<Cache>(
new_lru_cache("RowCache", capacity, LRUCacheType::SIZE, num_shards));
}
// Create global instance of this class
void RowCache::create_global_cache(int64_t capacity, uint32_t num_shards) {
DCHECK(_s_instance == nullptr);
static RowCache instance(capacity, num_shards);
_s_instance = &instance;
}
RowCache* RowCache::instance() {
return _s_instance;
}
bool RowCache::lookup(const RowCacheKey& key, CacheHandle* handle) {
const std::string& encoded_key = key.encode();
auto lru_handle = _cache->lookup(encoded_key);
if (!lru_handle) {
// cache miss
return false;
}
*handle = CacheHandle(_cache.get(), lru_handle);
return true;
}
void RowCache::insert(const RowCacheKey& key, const Slice& value) {
auto deleter = [](const doris::CacheKey& key, void* value) { free(value); };
char* cache_value = static_cast<char*>(malloc(value.size));
memcpy(cache_value, value.data, value.size);
const std::string& encoded_key = key.encode();
auto handle =
_cache->insert(encoded_key, cache_value, value.size, deleter, CachePriority::NORMAL);
// the temporary CacheHandle releases the handle when it goes out of scope
auto tmp = CacheHandle {_cache.get(), handle};
}
void RowCache::erase(const RowCacheKey& key) {
const std::string& encoded_key = key.encode();
_cache->erase(encoded_key);
}
Status PointQueryExecutor::init(const PTabletKeyLookupRequest* request,
PTabletKeyLookupResponse* response) {
SCOPED_TIMER(&_profile_metrics.init_ns);
@@ -142,10 +190,11 @@ std::string PointQueryExecutor::print_profile() {
"lookup_key:{}us, lookup_data:{}us, output_data:{}us, hit_lookup_cache:{}"
""
""
", is_binary_row:{}, output_columns:{}"
", is_binary_row:{}, output_columns:{}, total_keys:{}, row_cache_hits:{}"
"",
total_us, init_us, init_key_us, lookup_key_us, lookup_data_us, output_data_us,
_hit_lookup_cache, _binary_row_format, _reusable->output_exprs().size());
_hit_lookup_cache, _binary_row_format, _reusable->output_exprs().size(),
_primary_keys.size(), _row_cache_hits);
}
Status PointQueryExecutor::_init_keys(const PTabletKeyLookupRequest* request) {
@@ -173,18 +222,29 @@ Status PointQueryExecutor::_init_keys(const PTabletKeyLookupRequest* request) {
Status PointQueryExecutor::_lookup_row_key() {
SCOPED_TIMER(&_profile_metrics.lookup_key_ns);
_row_locations.reserve(_primary_keys.size());
_row_locations.resize(_primary_keys.size());
_cached_row_data.resize(_primary_keys.size());
// 2. lookup row location
Status st;
for (size_t i = 0; i < _primary_keys.size(); ++i) {
RowLocation location;
if (!config::disable_storage_row_cache) {
RowCache::CacheHandle cache_handle;
auto hit_cache = RowCache::instance()->lookup({_tablet->tablet_id(), _primary_keys[i]},
&cache_handle);
if (hit_cache) {
_cached_row_data[i] = std::move(cache_handle);
++_row_cache_hits;
continue;
}
}
st = (_tablet->lookup_row_key(_primary_keys[i], nullptr, &location,
INT32_MAX /*rethink?*/));
if (st.is_not_found()) {
continue;
}
RETURN_IF_ERROR(st);
_row_locations.push_back(location);
_row_locations[i] = location;
}
return Status::OK();
}
@@ -193,8 +253,19 @@ Status PointQueryExecutor::_lookup_row_data() {
// 3. get values
SCOPED_TIMER(&_profile_metrics.lookup_data_ns);
for (size_t i = 0; i < _row_locations.size(); ++i) {
RETURN_IF_ERROR(_tablet->lookup_row_data(_row_locations[i], _reusable->tuple_desc(),
_result_block.get()));
if (_cached_row_data[i].valid()) {
vectorized::JsonbSerializeUtil::jsonb_to_block(
*_reusable->tuple_desc(), _cached_row_data[i].data().data,
_cached_row_data[i].data().size, *_result_block);
continue;
}
if (!_row_locations[i].has_value()) {
continue;
}
RETURN_IF_ERROR(_tablet->lookup_row_data(
_primary_keys[i], _row_locations[i].value(), _reusable->tuple_desc(),
_result_block.get(),
!config::disable_storage_row_cache /*whether to write row cache*/));
}
return Status::OK();
}
@@ -234,4 +305,4 @@ Status PointQueryExecutor::_output_data() {
return Status::OK();
}
} // namespace doris
} // namespace doris

View File

@@ -29,6 +29,9 @@
namespace doris {
class RowCache;
class Cache;
// For caching pre-allocated blocks and exprs for point lookups
class Reusable {
public:
@@ -61,10 +64,93 @@ private:
int64_t _create_timestamp = 0;
};
// RowCache is an LRU cache for the row store
class RowCache {
public:
// The cache key for the row LRU cache
struct RowCacheKey {
RowCacheKey(int64_t tablet_id, const Slice& key) : tablet_id(tablet_id), key(key) {}
int64_t tablet_id;
Slice key;
// Encode to a flat binary which can be used as LRUCache's key
std::string encode() const {
std::string full_key;
full_key.resize(sizeof(int64_t) + key.size);
int8store(&full_key.front(), tablet_id);
memcpy((&full_key.front()) + sizeof(tablet_id), key.data, key.size);
return full_key;
}
};
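A layout example for encode(), assuming int8store writes the tablet_id little-endian as the MySQL-style helper does (values hypothetical): tablet_id 10045 (0x273D) with the two-byte key "pk" encodes to a 10-byte string:

// offset 0..7 : 3D 27 00 00 00 00 00 00   (tablet_id, little-endian)
// offset 8..9 : 70 6B                     ('p', 'k' -- raw key bytes)
// The fixed-width prefix keeps keys from different tablets disjoint even
// when their primary-key bytes collide.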
// A handle for a RowCache entry. This class makes it easy to manage a
// cache entry: users don't need to release it explicitly, since the
// handle releases the entry when it is destroyed.
class CacheHandle {
public:
CacheHandle() = default;
CacheHandle(Cache* cache, Cache::Handle* handle) : _cache(cache), _handle(handle) {}
~CacheHandle() {
if (_handle != nullptr) {
_cache->release(_handle);
}
}
CacheHandle(CacheHandle&& other) noexcept {
std::swap(_cache, other._cache);
std::swap(_handle, other._handle);
}
CacheHandle& operator=(CacheHandle&& other) noexcept {
std::swap(_cache, other._cache);
std::swap(_handle, other._handle);
return *this;
}
bool valid() { return _cache != nullptr && _handle != nullptr; }
Cache* cache() const { return _cache; }
Slice data() const { return _cache->value_slice(_handle); }
private:
Cache* _cache = nullptr;
Cache::Handle* _handle = nullptr;
// Don't allow copy and assign
DISALLOW_COPY_AND_ASSIGN(CacheHandle);
};
// Create global instance of this class
static void create_global_cache(int64_t capacity, uint32_t num_shards = kDefaultNumShards);
static RowCache* instance();
// Look up a row key in the cache.
// If the key is found, the cache entry is written into handle;
// CacheHandle releases the entry back to the cache when it destructs.
// Returns true if the entry is found, false otherwise.
bool lookup(const RowCacheKey& key, CacheHandle* handle);
// Insert a row with its key into this cache.
// This function is thread-safe: when two clients insert the same key
// concurrently, only one copy of the row ends up cached.
void insert(const RowCacheKey& key, const Slice& data);
// Erase a row key from the cache.
void erase(const RowCacheKey& key);
private:
static constexpr uint32_t kDefaultNumShards = 128;
RowCache(int64_t capacity, int num_shards = kDefaultNumShards);
static RowCache* _s_instance;
std::unique_ptr<Cache> _cache = nullptr;
};
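A minimal usage sketch of the class above; the tablet id and key bytes are hypothetical, and Slice is assumed constructible from a C string as elsewhere in this codebase. CacheHandle is move-only RAII: the entry stays pinned while the handle lives and is released in its destructor.

RowCache::RowCacheKey key(10045, Slice("encoded_pk"));
RowCache::CacheHandle handle;
if (RowCache::instance()->lookup(key, &handle)) {
    Slice row = handle.data();  // jsonb row bytes, valid while handle lives
} else {
    RowCache::instance()->insert(key, Slice("jsonb bytes"));  // value is copied
}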
// A cache used for prepared statements.
// One Reusable entry per prepared-statement uuid.
// Use DoublyBufferedData to wrap the cache for performance and thread safety,
// since it's not barely modified
// since it's barely modified
class LookupCache {
public:
// uuid to reusable
@@ -170,12 +256,14 @@ private:
PTabletKeyLookupResponse* _response;
TabletSharedPtr _tablet;
std::vector<std::string> _primary_keys;
std::vector<RowLocation> _row_locations;
std::vector<RowCache::CacheHandle> _cached_row_data;
std::vector<std::optional<RowLocation>> _row_locations;
std::shared_ptr<Reusable> _reusable;
std::unique_ptr<vectorized::Block> _result_block;
Metrics _profile_metrics;
size_t _row_cache_hits = 0;
bool _hit_lookup_cache = false;
bool _binary_row_format = false;
};
} // namespace doris
} // namespace doris

View File

@@ -303,23 +303,30 @@ void JsonbSerializeUtil::block_to_jsonb(const TabletSchema& schema, const Block&
}
}
// batch rows
void JsonbSerializeUtil::jsonb_to_block(const TupleDescriptor& desc,
const ColumnString& jsonb_column, Block& dst) {
for (int i = 0; i < jsonb_column.size(); ++i) {
StringRef jsonb_data = jsonb_column.get_data_at(i);
auto pdoc = JsonbDocument::createDocument(jsonb_data.data, jsonb_data.size);
JsonbDocument& doc = *pdoc;
for (int j = 0; j < desc.slots().size(); ++j) {
SlotDescriptor* slot = desc.slots()[j];
JsonbValue* slot_value = doc->find(slot->col_unique_id());
MutableColumnPtr dst_column = dst.get_by_position(j).column->assume_mutable();
if (!slot_value || slot_value->isNull()) {
// null or not exist
dst_column->insert_default();
continue;
}
deserialize_column(slot->type().type, slot_value, dst_column);
jsonb_to_block(desc, jsonb_data.data, jsonb_data.size, dst);
}
}
// single row
void JsonbSerializeUtil::jsonb_to_block(const TupleDescriptor& desc, const char* data, size_t size,
Block& dst) {
auto pdoc = JsonbDocument::createDocument(data, size);
JsonbDocument& doc = *pdoc;
for (int j = 0; j < desc.slots().size(); ++j) {
SlotDescriptor* slot = desc.slots()[j];
JsonbValue* slot_value = doc->find(slot->col_unique_id());
MutableColumnPtr dst_column = dst.get_by_position(j).column->assume_mutable();
if (!slot_value || slot_value->isNull()) {
// null or not exist
dst_column->insert_default();
continue;
}
deserialize_column(slot->type().type, slot_value, dst_column);
}
}
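The single-row overload is what lets the point-query path feed a cached jsonb value straight into a Block without first materializing a ColumnString; a sketch with a hypothetical cache handle, tuple descriptor, and output block:

RowCache::CacheHandle cached;  // obtained from RowCache::lookup()
Slice v = cached.data();
vectorized::JsonbSerializeUtil::jsonb_to_block(*desc, v.data, v.size, block);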

View File

@@ -25,7 +25,11 @@ class JsonbSerializeUtil {
public:
static void block_to_jsonb(const TabletSchema& schema, const Block& block, ColumnString& dst,
int num_cols);
// batch rows
static void jsonb_to_block(const TupleDescriptor& desc, const ColumnString& jsonb_column,
Block& dst);
// single row
static void jsonb_to_block(const TupleDescriptor& desc, const char* data, size_t size,
Block& dst);
};
} // namespace doris::vectorized

View File

@@ -21,22 +21,25 @@
1235 120939.111300000 a ddd laooq 2030-01-02 2020-01-01T12:36:38 22.822 7022-01-01 123
-- !point_select --
1235 120939.111300000 a ddd laooq 2030-01-02 2020-01-01T12:36:38 22.822 7022-01-01 123
1235 120939.111300000 a ddd xxxxxx 2030-01-02 2020-01-01T12:36:38 22.822 7022-01-01 123
-- !point_select --
1235 120939.111300000 a ddd laooq 2030-01-02 2020-01-01T12:36:38 22.822 7022-01-01 123
1235 120939.111300000 a ddd xxxxxx 2030-01-02 2020-01-01T12:36:38 22.822 7022-01-01 123
-- !point_select --
1235 120939.111300000 a ddd laooq 2030-01-02 2020-01-01T12:36:38 22.822 7022-01-01 123 \N
1235 120939.111300000 a ddd xxxxxx 2030-01-02 2020-01-01T12:36:38 22.822 7022-01-01 123
-- !point_select --
1235 120939.111300000 a ddd laooq 2030-01-02 2020-01-01T12:36:38 22.822 7022-01-01 123 \N
1235 120939.111300000 a ddd xxxxxx 2030-01-02 2020-01-01T12:36:38 22.822 7022-01-01 123 \N
-- !point_select --
1235 120939.111300000 a ddd laooq 2030-01-02 2020-01-01T12:36:38 22.822 7022-01-01 123
1235 120939.111300000 a ddd xxxxxx 2030-01-02 2020-01-01T12:36:38 22.822 7022-01-01 123 \N
-- !point_select --
1235 120939.111300000 a ddd laooq 2030-01-02 2020-01-01T12:36:38 22.822 7022-01-01 123
1235 120939.111300000 a ddd xxxxxx 2030-01-02 2020-01-01T12:36:38 22.822 7022-01-01 123
-- !point_select --
1235 120939.111300000 a ddd xxxxxx 2030-01-02 2020-01-01T12:36:38 22.822 7022-01-01 123
-- !sql --
1231 119291.110000000 ddd laooq \N 2020-01-01T12:36:38 \N 1022-01-01 \N

View File

@@ -22,6 +22,7 @@ suite("test_point_query") {
def user = context.config.jdbcUser
def password = context.config.jdbcPassword
def url = context.config.jdbcUrl + "&useServerPrepStmts=true"
// def url = context.config.jdbcUrl
def result1 = connect(user=user, password=password, url=url) {
sql """DROP TABLE IF EXISTS ${tableName}"""
test {
@@ -106,6 +107,9 @@
stmt.setString(2, "a ddd")
qe_point_select stmt
qe_point_select stmt
// invalidate cache
sql """ INSERT INTO ${tableName} VALUES(1235, 120939.11130, "a ddd", "xxxxxx", "2030-01-02", "2020-01-01 12:36:38", 22.822, "7022-01-01 11:30:38", 123) """
qe_point_select stmt
qe_point_select stmt
qe_point_select stmt
sql """