diff --git a/be/src/common/config.h b/be/src/common/config.h index f868d78dc8..6d245d9950 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -576,6 +576,16 @@ namespace config { // Soft memory limit as a fraction of hard memory limit. CONF_Double(soft_mem_limit_frac, "0.9"); + + // Set max cache's size of query results, the unit is M byte + CONF_Int32(query_cache_max_size_mb, "256"); + + // Cache memory is pruned when reach query_cache_max_size_mb + query_cache_elasticity_size_mb + CONF_Int32(query_cache_elasticity_size_mb, "128"); + + // Maximum number of cache partitions corresponding to a SQL + CONF_Int32(query_cache_max_partition_count, "1024"); + } // namespace config } // namespace doris diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt index e2ea260163..0176d74c41 100644 --- a/be/src/runtime/CMakeLists.txt +++ b/be/src/runtime/CMakeLists.txt @@ -106,6 +106,8 @@ set(RUNTIME_FILES mysql_result_writer.cpp memory/system_allocator.cpp memory/chunk_allocator.cpp + cache/result_node.cpp + cache/result_cache.cpp ) if (WITH_MYSQL) diff --git a/be/src/runtime/cache/cache_utils.h b/be/src/runtime/cache/cache_utils.h new file mode 100644 index 0000000000..192289f9b1 --- /dev/null +++ b/be/src/runtime/cache/cache_utils.h @@ -0,0 +1,87 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef DORIS_BE_SRC_RUNTIME_CACHE_UTILS_H +#define DORIS_BE_SRC_RUNTIME_CACHE_UTILS_H + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace doris { + +typedef boost::shared_lock CacheReadLock; +typedef boost::unique_lock CacheWriteLock; + +//#ifndef PARTITION_CACHE_DEV +//#define PARTITION_CACHE_DEV +//#endif + +struct CacheStat { + static const uint32 DAY_SECONDS = 86400; + long cache_time; + long last_update_time; + long last_read_time; + uint32 read_count; + CacheStat() { init(); } + + inline long cache_time_second() { + struct timeval tv; + gettimeofday(&tv, NULL); + return tv.tv_sec; + } + + void init() { + cache_time = 0; + last_update_time = 0; + last_read_time = 0; + read_count = 0; + } + + void update() { + last_update_time = cache_time_second(); + if (cache_time == 0) { + cache_time = last_update_time; + } + last_read_time = last_update_time; + read_count++; + } + + void query() { + last_read_time = cache_time_second(); + read_count++; + } + + double last_query_day() { return (cache_time_second() - last_read_time) * 1.0 / DAY_SECONDS; } + + double avg_query_count() { + return read_count * DAY_SECONDS * 1.0 / (cache_time_second() - last_read_time + 1); + } +}; + +} // namespace doris +#endif diff --git a/be/src/runtime/cache/result_cache.cpp b/be/src/runtime/cache/result_cache.cpp new file mode 100644 index 0000000000..96326646dd --- /dev/null +++ b/be/src/runtime/cache/result_cache.cpp @@ -0,0 +1,270 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#include "gen_cpp/internal_service.pb.h" +#include "runtime/cache/result_cache.h" +#include "util/doris_metrics.h" + +namespace doris { + +/** +* Remove the tail node of link +*/ +ResultNode* ResultNodeList::pop() { + remove(_head); + return _head; +} + +void ResultNodeList::remove(ResultNode* node) { + if (!node) return; + if (node == _head) _head = node->get_next(); + if (node == _tail) _tail = node->get_prev(); + node->unlink(); + _node_count--; +} + +void ResultNodeList::push_back(ResultNode* node) { + if (!node) return; + if (!_head) _head = node; + node->append(_tail); + _tail = node; + _node_count++; +} + +void ResultNodeList::move_tail(ResultNode* node) { + if (!node || node == _tail) return; + if (!_head) + _head = node; + else if (node == _head) + _head = node->get_next(); + node->unlink(); + node->append(_tail); + _tail = node; +} + +void ResultNodeList::delete_node(ResultNode** node) { + (*node)->clear(); + SAFE_DELETE(*node); +} + +void ResultNodeList::clear() { + LOG(INFO) << "clear result node list."; + while (_head) { + ResultNode* tmp_node = _head->get_next(); + _head->clear(); + SAFE_DELETE(_head); + _head = tmp_node; + } + _node_count = 0; +} +/** + * Find the node and update partition data + * New node, the node updated in the first partition will move to the tail of the list + */ +void ResultCache::update(const PUpdateCacheRequest* request, PCacheResponse* 
response) { + ResultNode* node; + PCacheStatus status; + bool update_first = false; + UniqueId sql_key = request->sql_key(); + LOG(INFO) << "update cache, sql key:" << sql_key; + + CacheWriteLock write_lock(_cache_mtx); + auto it = _node_map.find(sql_key); + if (it != _node_map.end()) { + node = it->second; + _cache_size -= node->get_data_size(); + _partition_count -= node->get_partition_count(); + status = node->update_partition(request, update_first); + } else { + node = _node_list.new_node(sql_key); + status = node->update_partition(request, update_first); + _node_list.push_back(node); + _node_map[sql_key] = node; + _node_count += 1; + } + if (update_first) { + _node_list.move_tail(node); + } + _cache_size += node->get_data_size(); + _partition_count += node->get_partition_count(); + response->set_status(status); + + prune(); + update_monitor(); +} + +/** + * Fetch cache through sql key, partition key, version and time + */ +void ResultCache::fetch(const PFetchCacheRequest* request, PFetchCacheResult* result) { + bool hit_first = false; + ResultNodeMap::iterator node_it; + const UniqueId sql_key = request->sql_key(); + LOG(INFO) << "fetch cache, sql key:" << sql_key; + { + CacheReadLock read_lock(_cache_mtx); + node_it = _node_map.find(sql_key); + if (node_it == _node_map.end()) { + result->set_status(PCacheStatus::NO_SQL_KEY); + LOG(INFO) << "no such sql key:" << sql_key; + return; + } + ResultNode* node = node_it->second; + PartitionRowBatchList part_rowbatch_list; + PCacheStatus status = node->fetch_partition(request, part_rowbatch_list, hit_first); + + for (auto part_it = part_rowbatch_list.begin(); part_it != part_rowbatch_list.end(); part_it++) { + PCacheValue* srcValue = (*part_it)->get_value(); + if (srcValue != NULL) { + PCacheValue* value = result->add_values(); + value->CopyFrom(*srcValue); + LOG(INFO) << "fetch cache partition key:" << srcValue->param().partition_key(); + } else { + LOG(WARNING) << "prowbatch of cache is null"; + status = 
PCacheStatus::EMPTY_DATA; + break; + } + } + result->set_status(status); + } + + if (hit_first) { + CacheWriteLock write_lock(_cache_mtx); + _node_list.move_tail(node_it->second); + } +} + +bool ResultCache::contains(const UniqueId& sql_key) { + CacheReadLock read_lock(_cache_mtx); + return _node_map.find(sql_key) != _node_map.end(); +} + +/** + * enum PClearType { + * CLEAR_ALL = 0, + * PRUNE_CACHE = 1, + * CLEAR_BEFORE_TIME = 2, + * CLEAR_SQL_KEY = 3 + * }; + */ +void ResultCache::clear(const PClearCacheRequest* request, PCacheResponse* response) { + LOG(INFO) << "clear cache type" << request->clear_type() + << ", node size:" << _node_list.get_node_count() << ", map size:" << _node_map.size(); + CacheWriteLock write_lock(_cache_mtx); + //0 clear, 1 prune, 2 before_time,3 sql_key + switch (request->clear_type()) { + case PClearType::CLEAR_ALL: + _node_list.clear(); + _node_map.clear(); + _cache_size = 0; + _node_count = 0; + _partition_count = 0; + break; + case PClearType::PRUNE_CACHE: + prune(); + break; + default: + break; + } + update_monitor(); + response->set_status(PCacheStatus::CACHE_OK); +} + +//private method +ResultNode* find_min_time_node(ResultNode* result_node) { + if (result_node->get_prev()) { + if (result_node->get_prev()->first_partition_last_time() <= + result_node->first_partition_last_time()) { + return result_node->get_prev(); + } + } + + if (result_node->get_next()) { + if (result_node->get_next()->first_partition_last_time() < + result_node->first_partition_last_time()) { + return result_node->get_next(); + } + } + return result_node; +} + +/* +* Two-dimensional array, prune the min last_read_time PartitionRowBatch. +* The following example is the last read time array. 
+* 1 and 2 is the read time, nodes with pruning read time < 3 +* Before: +* 1,2 //_head ResultNode* +* 1,2,3,4,5 +* 2,4,3,6,8 +* 5,7,9,11,13 //_tail ResultNode* +* After: +* 4,5 //_head +* 4,3,6,8 +* 5,7,9,11,13 //_tail +*/ +void ResultCache::prune() { + if (_cache_size <= (_max_size + _elasticity_size)) { + return; + } + LOG(INFO) << "begin prune cache, cache_size : " << _cache_size << ", max_size : " << _max_size + << ", elasticity_size : " << _elasticity_size; + ResultNode* result_node = _node_list.get_head(); + while (_cache_size > _max_size) { + if (result_node == NULL) { + break; + } + result_node = find_min_time_node(result_node); + _cache_size -= result_node->prune_first(); + if (result_node->get_data_size() == 0) { + ResultNode* next_node; + if (result_node->get_next()) { + next_node = result_node->get_next(); + } else if (result_node->get_prev()) { + next_node = result_node->get_prev(); + } else { + next_node = _node_list.get_head(); + } + remove(result_node); + result_node = next_node; + } + } + LOG(INFO) << "finish prune, cache_size : " << _cache_size; + _node_count = _node_map.size(); + _cache_size = 0; + _partition_count = 0; + for (auto node_it = _node_map.begin(); node_it != _node_map.end(); node_it++) { + _partition_count += node_it->second->get_partition_count(); + _cache_size += node_it->second->get_data_size(); + } +} + +void ResultCache::remove(ResultNode* result_node) { + auto node_it = _node_map.find(result_node->get_sql_key()); + if (node_it != _node_map.end()) { + _node_map.erase(node_it); + _node_list.remove(result_node); + _node_list.delete_node(&result_node); + } +} + +void ResultCache::update_monitor() { + DorisMetrics::instance()->query_cache_memory_total_byte->set_value(_cache_size); + DorisMetrics::instance()->query_cache_sql_total_count->set_value(_node_count); + DorisMetrics::instance()->query_cache_partition_total_count->set_value(_partition_count); +} + +} // namespace doris + diff --git a/be/src/runtime/cache/result_cache.h 
b/be/src/runtime/cache/result_cache.h new file mode 100644 index 0000000000..c05e253e7a --- /dev/null +++ b/be/src/runtime/cache/result_cache.h @@ -0,0 +1,122 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef DORIS_BE_SRC_RUNTIME_RESULT_CACHE_H +#define DORIS_BE_SRC_RUNTIME_RESULT_CACHE_H + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "common/config.h" +#include "runtime/cache/cache_utils.h" +#include "runtime/cache/result_node.h" +#include "runtime/mem_pool.h" +#include "runtime/mem_tracker.h" +#include "runtime/row_batch.h" +#include "runtime/tuple_row.h" + +namespace doris { + +typedef std::unordered_map ResultNodeMap; + +// a doubly linked list class, point to result node +class ResultNodeList { +public: + ResultNodeList() : _head(NULL), _tail(NULL), _node_count(0) {} + virtual ~ResultNodeList() {} + + ResultNode* new_node(const UniqueId& sql_key) { return new ResultNode(sql_key); } + + void delete_node(ResultNode** node); + + ResultNode* pop(); + void move_tail(ResultNode* node); + //Just remove node from link, do not delete node + void remove(ResultNode* node); + void push_back(ResultNode* node); + void clear(); + + 
ResultNode* get_head() const { return _head; } + + ResultNode* get_tail() const { return _tail; } + + size_t get_node_count() const { return _node_count; } + +private: + ResultNode* _head; + ResultNode* _tail; + size_t _node_count; +}; + +/** + * Cache results of query, including the entire result set or the result set of divided partitions. + * Two data structures, one is unordered_map and the other is a doubly linked list, corresponding to a result node. + * If the cache is hit, the node will be moved to the end of the linked list. + * If the cache is cleared, nodes that are expired or have not been accessed for a long time will be cleared. + */ +class ResultCache { +public: + ResultCache(int32 max_size, int32 elasticity_size) { + _max_size = max_size * 1024 * 1024; + _elasticity_size = elasticity_size * 1024 * 1024; + _cache_size = 0; + _node_count = 0; + _partition_count = 0; + } + + virtual ~ResultCache() {} + void update(const PUpdateCacheRequest* request, PCacheResponse* response); + void fetch(const PFetchCacheRequest* request, PFetchCacheResult* result); + bool contains(const UniqueId& sql_key); + void clear(const PClearCacheRequest* request, PCacheResponse* response); + + size_t get_cache_size() { return _cache_size; } + +private: + void prune(); + void remove(ResultNode* result_node); + void update_monitor(); + + //At the same time, multithreaded reading + //Single thread updating and cleaning(only single be, Fe is not affected) + mutable boost::shared_mutex _cache_mtx; + ResultNodeMap _node_map; + //List of result nodes corresponding to SqlKey,last recently useed at the tail + ResultNodeList _node_list; + size_t _cache_size; + size_t _max_size; + double _elasticity_size; + size_t _node_count; + size_t _partition_count; + +private: + ResultCache(); + ResultCache(const ResultCache&); + const ResultCache& operator=(const ResultCache&); +}; + +} // namespace doris +#endif diff --git a/be/src/runtime/cache/result_node.cpp 
b/be/src/runtime/cache/result_node.cpp new file mode 100644 index 0000000000..7813458e85 --- /dev/null +++ b/be/src/runtime/cache/result_node.cpp @@ -0,0 +1,272 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#include "gen_cpp/internal_service.pb.h" +#include "runtime/cache/result_node.h" +#include "runtime/cache/cache_utils.h" + +namespace doris { + +bool compare_partition(const PartitionRowBatch* left_node, const PartitionRowBatch* right_node) { + return left_node->get_partition_key() < right_node->get_partition_key(); +} + +//return new batch size,only include the size of PRowBatch +void PartitionRowBatch::set_row_batch(const PCacheValue& value) { + if (_cache_value != NULL && !check_newer(value.param())) { + LOG(WARNING) << "set old version data, cache ver:" << _cache_value->param().last_version() + << ",cache time:" << _cache_value->param().last_version_time() + << ", setdata ver:" << value.param().last_version() + << ",setdata time:" << value.param().last_version_time(); + return; + } + SAFE_DELETE(_cache_value); + _cache_value = new PCacheValue(value); + _data_size += _cache_value->data_size(); + _cache_stat.update(); + LOG(INFO) << "finish set row batch, row num:" << _cache_value->rows_size() + << ", data 
size:" << _data_size; +} + +bool PartitionRowBatch::is_hit_cache(const PCacheParam& param) { + if (!check_match(param)) { + return false; + } + _cache_stat.query(); + return true; +} + +void PartitionRowBatch::clear() { + LOG(INFO) << "clear partition rowbatch."; + SAFE_DELETE(_cache_value); + _partition_key = 0; + _data_size = 0; + _cache_stat.init(); +} + +/** + * Update partition cache data, find RowBatch from partition map by partition key, + * the partition rowbatch are stored in the order of partition keys + */ +PCacheStatus ResultNode::update_partition(const PUpdateCacheRequest* request, bool& is_update_firstkey) { + is_update_firstkey = false; + if (_sql_key != request->sql_key()) { + LOG(INFO) << "no match sql_key " << request->sql_key().hi() << request->sql_key().lo(); + return PCacheStatus::PARAM_ERROR; + } + + if (request->values_size() > config::query_cache_max_partition_count) { + LOG(WARNING) << "too many partitions size:" << request->values_size(); + return PCacheStatus::PARAM_ERROR; + } + + //Only one thread per SQL key can update the cache + CacheWriteLock write_lock(_node_mtx); + + PartitionKey first_key = kint64max; + if (_partition_list.size() == 0) { + is_update_firstkey = true; + } else { + first_key = (*(_partition_list.begin()))->get_partition_key(); + } + PartitionRowBatch* partition = NULL; + for (int i = 0; i < request->values_size(); i++) { + const PCacheValue& value = request->values(i); + PartitionKey partition_key = value.param().partition_key(); + if (!is_update_firstkey && partition_key <= first_key) { + is_update_firstkey = true; + } + auto it = _partition_map.find(partition_key); + if (it == _partition_map.end()) { + partition = new PartitionRowBatch(partition_key); + partition->set_row_batch(value); + _partition_map[partition_key] = partition; + _partition_list.push_back(partition); +#ifdef PARTITION_CACHE_DEV + LOG(INFO) << "add index:" << i << ", pkey:" << partition->get_partition_key() + << ", list size:" << 
_partition_list.size() + << ", map size:" << _partition_map.size(); +#endif + } else { + partition = it->second; + _data_size -= partition->get_data_size(); + partition->set_row_batch(value); +#ifdef PARTITION_CACHE_DEV + LOG(INFO) << "update index:" << i << ", pkey:" << partition->get_partition_key() + << ", list size:" << _partition_list.size() + << ", map size:" << _partition_map.size(); +#endif + } + _data_size += partition->get_data_size(); + } + _partition_list.sort(compare_partition); + LOG(INFO) << "finish update batches:" << _partition_list.size(); + while (config::query_cache_max_partition_count > 0 && + _partition_list.size() > config::query_cache_max_partition_count) { + if (prune_first() == 0) { + break; + } + } + return PCacheStatus::CACHE_OK; +} + +/** +* Only the range query of the key of the partition is supported, and the separated partition key query is not supported. +* Because a query can only be divided into two parts, part1 get data from cache, part2 fetch_data by scan node from BE. 
+* Partion cache : 20191211-20191215 +* Hit cache parameter : [20191211 - 20191215], [20191212 - 20191214], [20191212 - 20191216],[20191210 - 20191215] +* Miss cache parameter: [20191210 - 20191216] +*/ +PCacheStatus ResultNode::fetch_partition(const PFetchCacheRequest* request, + PartitionRowBatchList& row_batch_list, bool& is_hit_firstkey) { + is_hit_firstkey = false; + if (request->params_size() == 0) { + return PCacheStatus::PARAM_ERROR; + } + + CacheReadLock read_lock(_node_mtx); + + if (_partition_list.size() == 0) { + return PCacheStatus::NO_PARTITION_KEY; + } + + if (request->params(0).partition_key() > (*_partition_list.rbegin())->get_partition_key() || + request->params(request->params_size() - 1).partition_key() < + (*_partition_list.begin())->get_partition_key()) { + return PCacheStatus::NO_PARTITION_KEY; + } + + bool find = false; + int begin_idx = -1, end_idx = -1, param_idx = 0; + auto begin_it = _partition_list.end(); + auto end_it = _partition_list.end(); + auto part_it = _partition_list.begin(); + + PCacheStatus status = PCacheStatus::CACHE_OK; + while (param_idx < request->params_size() && part_it != _partition_list.end()) { +#ifdef PARTITION_CACHE_DEV + LOG(INFO) << "Param index : " << param_idx + << ", param part Key : " << request->params(param_idx).partition_key() + << ", batch part key : " << (*part_it)->get_partition_key(); +#endif + if (!find) { + while (part_it != _partition_list.end() && + request->params(param_idx).partition_key() > (*part_it)->get_partition_key()) { + part_it++; + } + while (param_idx < request->params_size() && + request->params(param_idx).partition_key() < (*part_it)->get_partition_key()) { + param_idx++; + } + if (request->params(param_idx).partition_key() == (*part_it)->get_partition_key()) { + find = true; + } + } + if (find) { +#ifdef PARTITION_CACHE_DEV + LOG(INFO) << "Find! 
Param index : " << param_idx + << ", param part Key : " << request->params(param_idx).partition_key() + << ", batch part key : " << (*part_it)->get_partition_key() + << ", param part version : " << request->params(param_idx).last_version() + << ", batch part version : " << (*part_it)->get_value()->param().last_version() + << ", param part version time : " << request->params(param_idx).last_version_time() + << ", batch part version time : " << (*part_it)->get_value()->param().last_version_time(); +#endif + if ((*part_it)->is_hit_cache(request->params(param_idx))) { + if (begin_idx < 0) { + begin_idx = param_idx; + begin_it = part_it; + } + end_idx = param_idx; + end_it = part_it; + param_idx++; + part_it++; + } else { + status = PCacheStatus::DATA_OVERDUE; + break; + } + } + } + + if (begin_it == _partition_list.end() && end_it == _partition_list.end()) { + return status; + } + + //[20191210 - 20191216] hit partition range [20191212-20191214],the sql will be splited to 3 part! + if (begin_idx != 0 && end_idx != request->params_size() - 1) { + return PCacheStatus::INVALID_KEY_RANGE; + } + if (begin_it == _partition_list.begin()) { + is_hit_firstkey = true; + } + + while (true) { + row_batch_list.push_back(*begin_it); + if (begin_it == end_it) { + break; + } + begin_it++; + } + return status; +} + +/* +* prune first partition result +*/ +size_t ResultNode::prune_first() { + if (_partition_list.size() == 0) { + return 0; + } + PartitionRowBatch* part_node = *_partition_list.begin(); + size_t prune_size = part_node->get_data_size(); + _partition_list.erase(_partition_list.begin()); + part_node->clear(); + SAFE_DELETE(part_node); + _data_size -= prune_size; + return prune_size; +} + +void ResultNode::clear() { + CacheWriteLock write_lock(_node_mtx); + LOG(INFO) << "clear result node:" << _sql_key; + _sql_key.hi = 0; + _sql_key.lo = 0; + for (auto it = _partition_list.begin(); it != _partition_list.end();) { + (*it)->clear(); + SAFE_DELETE(*it); + it = 
_partition_list.erase(it); + } + _data_size = 0; +} + +void ResultNode::append(ResultNode* tail) { + _prev = tail; + if (tail) tail->set_next(this); +} + +void ResultNode::unlink() { + if (_next) { + _next->set_prev(_prev); + } + if (_prev) { + _prev->set_next(_next); + } + _next = NULL; + _prev = NULL; +} + +} // namespace doris + diff --git a/be/src/runtime/cache/result_node.h b/be/src/runtime/cache/result_node.h new file mode 100644 index 0000000000..722509de4f --- /dev/null +++ b/be/src/runtime/cache/result_node.h @@ -0,0 +1,190 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#ifndef DORIS_BE_SRC_RUNTIME_RESULT_NODE_H +#define DORIS_BE_SRC_RUNTIME_RESULT_NODE_H + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "common/config.h" +#include "olap/olap_define.h" +#include "runtime/cache/cache_utils.h" +#include "runtime/mem_pool.h" +#include "runtime/row_batch.h" +#include "runtime/tuple_row.h" +#include "util/uid_util.h" + +namespace doris { + +enum PCacheStatus; +class PCacheParam; +class PCacheValue; +class PCacheResponse; +class PFetchCacheRequest; +class PFetchCacheResult; +class PUpdateCacheRequest; +class PClearCacheRequest; + +/** +* Cache one partition data, request param must match version and time of cache +*/ +class PartitionRowBatch { +public: + PartitionRowBatch(int64 partition_key) + : _partition_key(partition_key), _cache_value(NULL), _data_size(0) {} + + ~PartitionRowBatch() {} + + void set_row_batch(const PCacheValue& value); + bool is_hit_cache(const PCacheParam& param); + void clear(); + + int64 get_partition_key() const { return _partition_key; } + + PCacheValue* get_value() { return _cache_value; } + + size_t get_data_size() { return _data_size; } + + const CacheStat* get_stat() const { return &_cache_stat; } + +private: + bool check_match(const PCacheParam& req_param) { + if (req_param.partition_key() != _partition_key) { + return false; + } + if (req_param.last_version() > _cache_value->param().last_version()) { + return false; + } + if (req_param.last_version_time() > _cache_value->param().last_version_time()) { + return false; + } + return true; + } + + bool check_newer(const PCacheParam& up_param) { + //for init data of sql cache + if (up_param.last_version() == 0 || up_param.last_version_time() == 0) { + return true; + } + if (up_param.last_version_time() > _cache_value->param().last_version_time()) { + return true; + } + if (up_param.last_version() > _cache_value->param().last_version()) { + return true; + } + return false; + } + +private: + 
int64 _partition_key; + PCacheValue* _cache_value; + size_t _data_size; + CacheStat _cache_stat; +}; + +typedef int64 PartitionKey; +typedef std::list PartitionRowBatchList; +typedef boost::unordered_map PartitionRowBatchMap; + +/** +* Cache the result of one SQL, include many partition rowsets. +* Sql Cache: The partiton ID comes from the partition lastest updated. +* Partition Cache: The partition ID comes from the partition scanned by query. +* The above two modes use the same cache structure. +*/ +class ResultNode { +public: + ResultNode() : _sql_key(0, 0), _prev(NULL), _next(NULL), _data_size(0) {} + + ResultNode(const UniqueId& sql_key) + : _sql_key(sql_key), _prev(NULL), _next(NULL), _data_size(0) {} + + virtual ~ResultNode() {} + + PCacheStatus update_partition(const PUpdateCacheRequest* request, bool& is_update_firstkey); + PCacheStatus fetch_partition(const PFetchCacheRequest* request, + PartitionRowBatchList& rowBatchList, bool& is_hit_firstkey); + + size_t prune_first(); + void clear(); + + ResultNode* get_prev() { return _prev; } + + void set_prev(ResultNode* prev) { _prev = prev; } + + ResultNode* get_next() { return _next; } + + void set_next(ResultNode* next) { _next = next; } + + void append(ResultNode* tail); + + void unlink(); + + size_t get_partition_count() const { return _partition_list.size(); } + + size_t get_data_size() const { return _data_size; } + + UniqueId get_sql_key() { return _sql_key; } + + bool sql_key_null() { return _sql_key.hi == 0 && _sql_key.lo == 0; } + + void set_sql_key(const UniqueId& sql_key) { _sql_key = sql_key; } + + long first_partition_last_time() const { + if (_partition_list.size() == 0) { + return 0; + } + const PartitionRowBatch* first = *(_partition_list.begin()); + return first->get_stat()->last_read_time; + } + + const CacheStat* get_first_stat() const { + if (_partition_list.size() == 0) { + return NULL; + } + return (*(_partition_list.begin()))->get_stat(); + } + + const CacheStat* get_last_stat() const { + 
if (_partition_list.size() == 0) { + return NULL; + } + return (*(_partition_list.end()--))->get_stat(); + } + +private: + mutable boost::shared_mutex _node_mtx; + UniqueId _sql_key; + ResultNode* _prev; + ResultNode* _next; + size_t _data_size; + PartitionRowBatchList _partition_list; + PartitionRowBatchMap _partition_map; +}; + +} // namespace doris +#endif diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/exec_env.cpp index d22150da47..6bd8a80926 100644 --- a/be/src/runtime/exec_env.cpp +++ b/be/src/runtime/exec_env.cpp @@ -21,7 +21,8 @@ namespace doris { -ExecEnv::ExecEnv() {} +ExecEnv::ExecEnv() : _is_init(false) { +} ExecEnv::~ExecEnv() {} diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index a880419921..1f973489e1 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -34,6 +34,7 @@ class EtlJobMgr; class EvHttpServer; class ExternalScanContextMgr; class FragmentMgr; +class ResultCache; class LoadPathMgr; class LoadStreamMgr; class MemTracker; @@ -55,6 +56,7 @@ class SmallFileMgr; class FileBlockManager; class PluginMgr; + class BackendServiceClient; class FrontendServiceClient; class TPaloBrokerServiceClient; @@ -113,6 +115,7 @@ public: PriorityThreadPool* etl_thread_pool() { return _etl_thread_pool; } CgroupsMgr* cgroups_mgr() { return _cgroups_mgr; } FragmentMgr* fragment_mgr() { return _fragment_mgr; } + ResultCache* result_cache() { return _result_cache; } TMasterInfo* master_info() { return _master_info; } EtlJobMgr* etl_job_mgr() { return _etl_job_mgr; } LoadPathMgr* load_path_mgr() { return _load_path_mgr; } @@ -147,6 +150,7 @@ private: void _init_buffer_pool(int64_t min_page_len, int64_t capacity, int64_t clean_pages_limit); private: + bool _is_init; std::vector _store_paths; // Leave protected so that subclasses can override ExternalScanContextMgr* _external_scan_context_mgr = nullptr; @@ -164,6 +168,7 @@ private: PriorityThreadPool* _etl_thread_pool = nullptr; CgroupsMgr* _cgroups_mgr = nullptr; 
FragmentMgr* _fragment_mgr = nullptr; + ResultCache* _result_cache = nullptr; TMasterInfo* _master_info = nullptr; EtlJobMgr* _etl_job_mgr = nullptr; LoadPathMgr* _load_path_mgr = nullptr; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index fb84bb114a..36bac1433a 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -35,6 +35,7 @@ #include "runtime/load_channel_mgr.h" #include "runtime/tmp_file_mgr.h" #include "runtime/bufferpool/reservation_tracker.h" +#include "runtime/cache/result_cache.h" #include "util/metrics.h" #include "util/network_util.h" #include "util/parse_util.h" @@ -65,11 +66,15 @@ namespace doris { -Status ExecEnv::init(ExecEnv* env, const std::vector& store_paths) { +Status ExecEnv::init(ExecEnv* env, const std::vector& store_paths) { return env->_init(store_paths); } Status ExecEnv::_init(const std::vector& store_paths) { + //Only init once before be destroyed + if (_is_init) { + return Status::OK(); + } _store_paths = store_paths; _external_scan_context_mgr = new ExternalScanContextMgr(this); _stream_mgr = new DataStreamMgr(); @@ -89,6 +94,7 @@ Status ExecEnv::_init(const std::vector& store_paths) { config::etl_thread_pool_queue_size); _cgroups_mgr = new CgroupsMgr(this, config::doris_cgroups); _fragment_mgr = new FragmentMgr(this); + _result_cache = new ResultCache(config::query_cache_max_size_mb, config::query_cache_elasticity_size_mb); _master_info = new TMasterInfo(); _etl_job_mgr = new EtlJobMgr(this); _load_path_mgr = new LoadPathMgr(this); @@ -122,6 +128,7 @@ Status ExecEnv::_init(const std::vector& store_paths) { RETURN_IF_ERROR(_load_channel_mgr->init(_mem_tracker->limit())); _heartbeat_flags = new HeartbeatFlags(); + _is_init = true; return Status::OK(); } @@ -207,6 +214,10 @@ void ExecEnv::_init_buffer_pool(int64_t min_page_size, } void ExecEnv::_destory() { + //Only destroy once after init + if (!_is_init) { + return; + } SAFE_DELETE(_brpc_stub_cache); 
SAFE_DELETE(_load_stream_mgr); SAFE_DELETE(_load_channel_mgr); @@ -234,6 +245,7 @@ void ExecEnv::_destory() { SAFE_DELETE(_routine_load_task_executor); SAFE_DELETE(_external_scan_context_mgr); SAFE_DELETE(_heartbeat_flags); + _is_init = false; } void ExecEnv::destroy(ExecEnv* env) { diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 68dfdfd638..99395a36fb 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -19,6 +19,7 @@ #include "common/config.h" #include "gen_cpp/BackendService.h" +#include "gen_cpp/internal_service.pb.h" #include "runtime/exec_env.h" #include "runtime/data_stream_mgr.h" #include "runtime/fragment_mgr.h" @@ -222,6 +223,32 @@ void PInternalServiceImpl::get_info( Status::OK().to_protobuf(response->mutable_status()); } +template +void PInternalServiceImpl::update_cache(google::protobuf::RpcController* controller, + const PUpdateCacheRequest* request, + PCacheResponse* response, + google::protobuf::Closure* done) { + brpc::ClosureGuard closure_guard(done); + _exec_env->result_cache()->update(request, response); +} + +template +void PInternalServiceImpl::fetch_cache(google::protobuf::RpcController* controller, + const PFetchCacheRequest* request, + PFetchCacheResult* result, + google::protobuf::Closure* done) { + brpc::ClosureGuard closure_guard(done); + _exec_env->result_cache()->fetch(request, result); +} + +template +void PInternalServiceImpl::clear_cache(google::protobuf::RpcController* controller, + const PClearCacheRequest* request, + PCacheResponse* response, + google::protobuf::Closure* done) { + brpc::ClosureGuard closure_guard(done); + _exec_env->result_cache()->clear(request, response); +} template class PInternalServiceImpl; template class PInternalServiceImpl; diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index e27c2c3a40..03fbcfedea 100644 --- a/be/src/service/internal_service.h +++ b/be/src/service/internal_service.h 
@@ -21,6 +21,7 @@ #include "gen_cpp/internal_service.pb.h" #include "gen_cpp/palo_internal_service.pb.h" #include "util/priority_thread_pool.hpp" +#include "runtime/cache/result_cache.h" namespace brpc { class Controller; @@ -86,6 +87,20 @@ public: PProxyResult* response, google::protobuf::Closure* done) override; + void update_cache(google::protobuf::RpcController* controller, + const PUpdateCacheRequest* request, + PCacheResponse* response, + google::protobuf::Closure* done) override; + + void fetch_cache(google::protobuf::RpcController* controller, + const PFetchCacheRequest* request, + PFetchCacheResult* result, + google::protobuf::Closure* done) override; + + void clear_cache(google::protobuf::RpcController* controller, + const PClearCacheRequest* request, + PCacheResponse* response, + google::protobuf::Closure* done) override; private: Status _exec_plan_fragment(brpc::Controller* cntl); private: diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp index 023074a66e..39ec078b12 100644 --- a/be/src/util/doris_metrics.cpp +++ b/be/src/util/doris_metrics.cpp @@ -19,7 +19,6 @@ #include #include - #include "env/env.h" #include "util/debug_util.h" #include "util/file_utils.h" @@ -126,6 +125,10 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(disk_sync_total, MetricUnit::OPERATIONS); DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(blocks_open_reading, MetricUnit::BLOCKS); DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(blocks_open_writing, MetricUnit::BLOCKS); + +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(query_cache_memory_total_byte, MetricUnit::BYTES); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(query_cache_sql_total_count, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(query_cache_partition_total_count, MetricUnit::NOUNIT); const std::string DorisMetrics::_s_registry_name = "doris_be"; const std::string DorisMetrics::_s_hook_name = "doris_metrics"; @@ -230,6 +233,10 @@ DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) { 
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, load_bytes); _server_metric_entity->register_hook(_s_hook_name, std::bind(&DorisMetrics::_update, this)); + + INT_UGAUGE_METRIC_REGISTER(_server_metric_entity, query_cache_memory_total_byte); + INT_UGAUGE_METRIC_REGISTER(_server_metric_entity, query_cache_sql_total_count); + INT_UGAUGE_METRIC_REGISTER(_server_metric_entity, query_cache_partition_total_count); } void DorisMetrics::initialize( diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h index 3ffbe746d8..924f365688 100644 --- a/be/src/util/doris_metrics.h +++ b/be/src/util/doris_metrics.h @@ -170,6 +170,11 @@ public: UIntGauge* tablet_writer_count; UIntGauge* compaction_mem_current_consumption; + + // Cache metrics + UIntGauge* query_cache_memory_total_byte; + UIntGauge* query_cache_sql_total_count; + UIntGauge* query_cache_partition_total_count; static DorisMetrics* instance() { static DorisMetrics instance; diff --git a/be/src/util/uid_util.h b/be/src/util/uid_util.h index dc2d938392..c5c085906f 100644 --- a/be/src/util/uid_util.h +++ b/be/src/util/uid_util.h @@ -62,6 +62,7 @@ struct UniqueId { int64_t lo = 0; UniqueId(int64_t hi_, int64_t lo_) : hi(hi_), lo(lo_) { } + UniqueId(const UniqueId& uid) : hi(uid.hi), lo(uid.lo) { } UniqueId(const TUniqueId& tuid) : hi(tuid.hi), lo(tuid.lo) { } UniqueId(const PUniqueId& puid) : hi(puid.hi()), lo(puid.lo()) { } UniqueId(const std::string& hi_str, const std::string& lo_str) { @@ -87,6 +88,32 @@ struct UniqueId { to_hex(lo, buf + 17); return {buf, 33}; } + + UniqueId& operator=(const UniqueId uid) { + hi = uid.hi; + lo = uid.lo; + return *this; + } + + UniqueId& operator=(const PUniqueId puid) { + hi = puid.hi(); + lo = puid.lo(); + return *this; + } + + UniqueId& operator=(const TUniqueId tuid) { + hi = tuid.hi; + lo = tuid.lo; + return *this; + } + //compare PUniqueId and UniqueId + bool operator==(const PUniqueId& rhs) const { + return hi == rhs.hi() && lo == rhs.lo(); + } + + bool 
operator!=(const PUniqueId& rhs) const { + return hi != rhs.hi() || lo != rhs.lo(); + } // std::map std::set needs this operator bool operator<(const UniqueId& right) const { diff --git a/be/test/runtime/CMakeLists.txt b/be/test/runtime/CMakeLists.txt index b360534569..c3bab67a23 100644 --- a/be/test/runtime/CMakeLists.txt +++ b/be/test/runtime/CMakeLists.txt @@ -62,3 +62,4 @@ ADD_BE_TEST(external_scan_context_mgr_test) ADD_BE_TEST(memory/chunk_allocator_test) ADD_BE_TEST(memory/system_allocator_test) +ADD_BE_TEST(cache/partition_cache_test) diff --git a/be/test/runtime/cache/partition_cache_test.cpp b/be/test/runtime/cache/partition_cache_test.cpp new file mode 100644 index 0000000000..e370a786a7 --- /dev/null +++ b/be/test/runtime/cache/partition_cache_test.cpp @@ -0,0 +1,260 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include +#include +#include "util/logging.h" +#include "util/cpu_info.h" +#include "gen_cpp/internal_service.pb.h" +#include "gen_cpp/PaloInternalService_types.h" +#include "runtime/cache/result_cache.h" +#include "runtime/buffer_control_block.h" + +namespace doris { + +class PartitionCacheTest : public testing::Test { +public: + PartitionCacheTest() { + + } + virtual ~PartitionCacheTest() { +// clear(); + } +protected: + virtual void SetUp() { + } + +private: + void init_default(){ + LOG(WARNING) << "init test default\n"; + init(16,4); + } + void init(int max_size, int ela_size); + void clear(); + PCacheStatus init_batch_data(int sql_num, int part_begin, int part_num); + ResultCache* _cache; + PUpdateCacheRequest* _update_request; + PCacheResponse* _update_response; + PFetchCacheRequest* _fetch_request; + PFetchCacheResult* _fetch_result; + PClearCacheRequest* _clear_request; + PCacheResponse* _clear_response; +}; + +void PartitionCacheTest::init(int max_size, int ela_size){ + LOG(WARNING) << "init test\n"; + _cache = new ResultCache(max_size, ela_size); + _update_request = new PUpdateCacheRequest(); + _update_response = new PCacheResponse(); + _fetch_request = new PFetchCacheRequest(); + _fetch_result = new PFetchCacheResult(); + _clear_request = new PClearCacheRequest(); + _clear_response = new PCacheResponse(); +} + +void PartitionCacheTest::clear(){ + _clear_request->set_clear_type(PClearType::CLEAR_ALL); + _cache->clear(_clear_request, _clear_response); + SAFE_DELETE(_cache); + SAFE_DELETE(_update_request); + SAFE_DELETE(_update_response); + SAFE_DELETE(_fetch_request); + SAFE_DELETE(_fetch_result); + SAFE_DELETE(_clear_request); + SAFE_DELETE(_clear_response); +} + +void set_sql_key(PUniqueId* sql_key, int64 hi, int64 lo){ + sql_key->set_hi(hi); + sql_key->set_lo(lo); +} + +PCacheStatus PartitionCacheTest::init_batch_data(int sql_num, int part_begin, int part_num) { + LOG(WARNING) << "init data, sql_num:" << sql_num << ",part_num:" << part_num; + 
PUpdateCacheRequest* up_req = NULL; + PCacheResponse* up_res = NULL; + PCacheStatus st = PCacheStatus::DEFAULT; + for (int i = 1; i < sql_num + 1; i++) { + LOG(WARNING) << "Sql:" << i; + up_req = new PUpdateCacheRequest(); + up_res = new PCacheResponse(); + set_sql_key(up_req->mutable_sql_key(), i, i); + //partition + for (int j = part_begin; j < part_begin + part_num; j++) { + PCacheValue* value = up_req->add_values(); + value->mutable_param()->set_partition_key(j); + value->mutable_param()->set_last_version(j); + value->mutable_param()->set_last_version_time(j); + value->set_data_size(16); + value->add_rows("0123456789abcdef"); //16 byte + } + _cache->update(up_req, up_res); + LOG(WARNING) << "finish update data"; + st = up_res->status(); + SAFE_DELETE(up_req); + SAFE_DELETE(up_res); + } + return st; +} + +TEST_F(PartitionCacheTest, update_data) { + init_default(); + PCacheStatus st = init_batch_data(1, 1, 1); + ASSERT_TRUE(st == PCacheStatus::CACHE_OK); + LOG(WARNING) << "clear cache"; + clear(); +} + +TEST_F(PartitionCacheTest, update_over_partition) { + init_default(); + PCacheStatus st = init_batch_data(1, 1, config::query_cache_max_partition_count+1); + ASSERT_TRUE(st == PCacheStatus::PARAM_ERROR); + clear(); +} + +TEST_F(PartitionCacheTest, cache_clear) { + init_default(); + init_batch_data(1, 1, 1); + _cache->clear(_clear_request, _clear_response); + ASSERT_EQ(_cache->get_cache_size(),0); + clear(); +} + +TEST_F(PartitionCacheTest, fetch_simple_data) { + init_default(); + init_batch_data(1, 1, 1); + + LOG(WARNING) << "finish init\n"; + set_sql_key(_fetch_request->mutable_sql_key(), 1, 1); + PCacheParam* p1 = _fetch_request->add_params(); + p1->set_partition_key(1); + p1->set_last_version(1); + p1->set_last_version_time(1); + LOG(WARNING) << "begin fetch\n"; + _cache->fetch(_fetch_request, _fetch_result); + LOG(WARNING) << "finish fetch1\n"; + ASSERT_TRUE(_fetch_result->status() == PCacheStatus::CACHE_OK); + ASSERT_EQ(_fetch_result->values_size(), 1); + 
ASSERT_EQ(_fetch_result->values(0).rows(0), "0123456789abcdef"); + + LOG(WARNING) << "finish fetch2\n"; + clear(); + LOG(WARNING) << "finish fetch3\n"; +} + +TEST_F(PartitionCacheTest, fetch_not_sqlid) { + init_default(); + init_batch_data(1, 1, 1); + + set_sql_key(_fetch_request->mutable_sql_key(), 2, 2); + PCacheParam* p1 = _fetch_request->add_params(); + p1->set_partition_key(1); + p1->set_last_version(1); + p1->set_last_version_time(1); + _cache->fetch(_fetch_request, _fetch_result); + ASSERT_TRUE(_fetch_result->status() == PCacheStatus::NO_SQL_KEY); + + clear(); +} + +TEST_F(PartitionCacheTest, fetch_range_data) { + init_default(); + init_batch_data(1, 1, 3); + + set_sql_key(_fetch_request->mutable_sql_key(), 1, 1); + PCacheParam* p1 = _fetch_request->add_params(); + p1->set_partition_key(2); + p1->set_last_version(2); + p1->set_last_version_time(2); + PCacheParam* p2 = _fetch_request->add_params(); + p2->set_partition_key(3); + p2->set_last_version(3); + p2->set_last_version_time(3); + _cache->fetch(_fetch_request, _fetch_result); + + ASSERT_TRUE(_fetch_result->status() == PCacheStatus::CACHE_OK); + ASSERT_EQ(_fetch_result->values_size(), 2); + + clear(); +} + +TEST_F(PartitionCacheTest, fetch_invalid_key_range) { + init_default(); + init_batch_data(1, 2, 1); + + set_sql_key(_fetch_request->mutable_sql_key(), 1, 1); + PCacheParam* p1 = _fetch_request->add_params(); + p1->set_partition_key(1); + p1->set_last_version(1); + p1->set_last_version_time(1); + + PCacheParam* p2 = _fetch_request->add_params(); + p2->set_partition_key(2); + p2->set_last_version(2); + p2->set_last_version_time(2); + + PCacheParam* p3 = _fetch_request->add_params(); + p3->set_partition_key(3); + p3->set_last_version(3); + p3->set_last_version_time(3); + _cache->fetch(_fetch_request, _fetch_result); + ASSERT_TRUE(_fetch_result->status() == PCacheStatus::INVALID_KEY_RANGE); + ASSERT_EQ(_fetch_result->values_size(), 0); + clear(); +} + +TEST_F(PartitionCacheTest, fetch_data_overdue) { + 
init_default(); + init_batch_data(1, 1, 1); + + set_sql_key(_fetch_request->mutable_sql_key(), 1, 1); + PCacheParam* p1 = _fetch_request->add_params(); + p1->set_partition_key(1); + //cache version is 1, request version is 2 + p1->set_last_version(2); + p1->set_last_version_time(2); + _cache->fetch(_fetch_request, _fetch_result); + + LOG(WARNING) << "fetch_data_overdue:" << _fetch_result->status(); + + ASSERT_TRUE(_fetch_result->status() == PCacheStatus::DATA_OVERDUE); + ASSERT_EQ(_fetch_result->values_size(), 0); + + clear(); +} + +TEST_F(PartitionCacheTest, prune_data) { + init(1,1); + init_batch_data(129, 1, 1024); // 16*1024*128=2M + ASSERT_LE(_cache->get_cache_size(), 1*1024*1024); //cache_size <= 1M + clear(); +} + +} + +int main(int argc, char** argv) { + std::string conffile = std::string(getenv("DORIS_HOME")) + "/conf/be.conf"; + if (!doris::config::init(conffile.c_str(), false)) { + fprintf(stderr, "error read config file. \n"); + return -1; + } + doris::init_glog("be-test"); + ::testing::InitGoogleTest(&argc, argv); + doris::CpuInfo::init(); + return RUN_ALL_TESTS(); +} +/* vim: set ts=4 sw=4 sts=4 tw=100 */ diff --git a/docs/zh-CN/administrator-guide/partition_cache.md b/docs/zh-CN/administrator-guide/partition_cache.md new file mode 100644 index 0000000000..a14ac5a837 --- /dev/null +++ b/docs/zh-CN/administrator-guide/partition_cache.md @@ -0,0 +1,197 @@ +# 分区缓存 + +## 需求场景 +大部分数据分析场景是写少读多,数据写入一次,多次频繁读取,比如一张报表涉及的维度和指标,数据在凌晨一次性计算好,但每天有数百甚至数千次的页面访问,因此非常适合把结果集缓存起来。在数据分析或BI应用中,存在下面的业务场景: +* **高并发场景**,Doris可以较好的支持高并发,但单台服务器无法承载太高的QPS +* **复杂图表的看板**,复杂的Dashboard或者大屏类应用,数据来自多张表,每个页面有数十个查询,虽然每个查询只有数十毫秒,但是总体查询时间会在数秒 +* **趋势分析**,给定日期范围的查询,指标按日显示,比如查询最近7天内的用户数的趋势,这类查询数据量大,查询范围广,查询时间往往需要数十秒 +* **用户重复查询**,如果产品没有防重刷机制,用户因手误或其他原因重复刷新页面,导致提交大量的重复的SQL + +以上四种场景,在应用层的解决方案,把查询结果放到Redis中,周期性的更新缓存或者用户手工刷新缓存,但是这个方案有如下问题: +* **数据不一致**,无法感知数据的更新,导致用户经常看到旧的数据 +* **命中率低**,缓存整个查询结果,如果数据实时写入,缓存频繁失效,命中率低且系统负载较重 +* **额外成本**,引入外部缓存组件,会带来系统复杂度,增加额外成本 + +## 解决方案 
+本分区缓存策略可以解决上面的问题,优先保证数据一致性,在此基础上细化缓存粒度,提升命中率,因此有如下特点: +* 用户无需担心数据一致性,通过版本来控制缓存失效,缓存的数据和从BE中查询的数据是一致的 +* 没有额外的组件和成本,缓存结果存储在BE的内存中,用户可以根据需要调整缓存内存大小 +* 实现了两种缓存策略,SQLCache和PartitionCache,后者缓存粒度更细 +* 用一致性哈希解决BE节点上下线的问题,BE中的缓存算法是改进的LRU + +## SQLCache +SQLCache按SQL的签名、查询的表的分区ID、分区最新版本来存储和获取缓存。三者组合确定一个缓存数据集,任何一个变化了,如SQL有变化,如查询字段或条件不一样,或数据更新后版本变化了,会导致命中不了缓存。 + +如果多张表Join,使用最近更新的分区ID和最新的版本号,如果其中一张表更新了,会导致分区ID或版本号不一样,也一样命中不了缓存。 + +SQLCache,更适合T+1更新的场景,凌晨数据更新,首次查询从BE中获取结果放入到缓存中,后续相同查询从缓存中获取。实时更新数据也可以使用,但是可能存在命中率低的问题,可以参考如下PartitionCache。 + +## PartitionCache + +### 设计原理 +1. SQL可以并行拆分,Q = Q1 ∪ Q2 ... ∪ Qn,R= R1 ∪ R2 ... ∪ Rn,Q为查询语句,R为结果集 +2. 拆分为只读分区和可更新分区,只读分区缓存,更新分区不缓存 + +如上,查询最近7天的每天用户数,如按日期分区,数据只写当天分区,当天之外的其他分区的数据,都是固定不变的,在相同的查询SQL下,查询某个不更新分区的指标都是固定的。如下,在2020-03-09当天查询前7天的用户数,2020-03-03至2020-03-07的数据来自缓存,2020-03-08第一次查询来自分区,后续的查询来自缓存,2020-03-09因为当天在不停写入,所以来自分区。 + +因此,查询N天的数据,数据更新最近的D天,每天只是日期范围不一样相似的查询,只需要查询D个分区即可,其他部分都来自缓存,可以有效降低集群负载,减少查询时间。 + +``` +MySQL [(none)]> SELECT eventdate,count(userid) FROM testdb.appevent WHERE eventdate>="2020-03-03" AND eventdate<="2020-03-09" GROUP BY eventdate ORDER BY eventdate; ++------------+-----------------+ +| eventdate | count(`userid`) | ++------------+-----------------+ +| 2020-03-03 | 15 | +| 2020-03-04 | 20 | +| 2020-03-05 | 25 | +| 2020-03-06 | 30 | +| 2020-03-07 | 35 | +| 2020-03-08 | 40 | //第一次来自分区,后续来自缓存 +| 2020-03-09 | 25 | //来自分区 ++------------+-----------------+ +7 rows in set (0.02 sec) +``` + +在PartitionCache中,缓存第一级Key是去掉了分区条件后的SQL的128位MD5签名,下面是改写后的待签名的SQL: +``` +SELECT eventdate,count(userid) FROM testdb.appevent GROUP BY eventdate ORDER BY eventdate; +``` +缓存的第二级Key是查询结果集的分区字段的内容,比如上面查询结果的eventdate列的内容,二级Key的附属信息是分区的版本号和版本更新时间。 + +下面演示上面SQL在2020-03-09当天第一次执行的流程: +1. 从缓存中获取数据 +``` ++------------+-----------------+ +| 2020-03-03 | 15 | +| 2020-03-04 | 20 | +| 2020-03-05 | 25 | +| 2020-03-06 | 30 | +| 2020-03-07 | 35 | ++------------+-----------------+ +``` +2. 
从BE中获取数据的SQL和数据 +``` +SELECT eventdate,count(userid) FROM testdb.appevent WHERE eventdate>="2020-03-08" AND eventdate<="2020-03-09" GROUP BY eventdate ORDER BY eventdate; + ++------------+-----------------+ +| 2020-03-08 | 40 | ++------------+-----------------+ +| 2020-03-09 | 25 | ++------------+-----------------+ +``` +3. 最后发送给终端的数据 +``` ++------------+-----------------+ +| eventdate | count(`userid`) | ++------------+-----------------+ +| 2020-03-03 | 15 | +| 2020-03-04 | 20 | +| 2020-03-05 | 25 | +| 2020-03-06 | 30 | +| 2020-03-07 | 35 | +| 2020-03-08 | 40 | +| 2020-03-09 | 25 | ++------------+-----------------+ +``` +4. 发送给缓存的数据 +``` ++------------+-----------------+ +| 2020-03-08 | 40 | ++------------+-----------------+ +``` + +Partition缓存,适合按日期分区,部分分区实时更新,查询SQL较为固定。 + +分区字段也可以是其他字段,但是需要保证只有少量分区更新。 + +### 一些限制 +* 只支持OlapTable,其他存储如MySQL的表没有版本信息,无法感知数据是否更新 +* 只支持按分区字段分组,不支持按其他字段分组,按其他字段分组,该分组数据都有可能被更新,会导致缓存都失效 +* 只支持结果集的前半部分、后半部分以及全部命中缓存,不支持结果集被缓存数据分割成几个部分 + +## 使用方式 +### 开启SQLCache +确保fe.conf的cache_enable_sql_mode=true(默认是true) +``` +vim fe/conf/fe.conf +cache_enable_sql_mode=true +``` +在MySQL命令行中设置变量 +``` +MySQL [(none)]> set [global] enable_sql_cache=true; +``` +注:global是全局变量,不加指当前会话变量 + +### 开启PartitionCache +确保fe.conf的cache_enable_partition_mode=true(默认是true) +``` +vim fe/conf/fe.conf +cache_enable_partition_mode=true +``` +在MySQL命令行中设置变量 +``` +MySQL [(none)]> set [global] enable_partition_cache=true; +``` + +如果同时开启了两个缓存策略,下面的参数,需要注意一下: +``` +cache_last_version_interval_second=900 +``` +如果分区的最新版本的时间离现在的间隔,大于cache_last_version_interval_second,则会优先把整个查询结果缓存。如果小于这个间隔,如果符合PartitionCache的条件,则按PartitionCache数据。 + +### 监控 +FE的监控项: +``` +query_table //Query中有表的数量 +query_olap_table //Query中有Olap表的数量 +cache_mode_sql //识别缓存模式为sql的Query数量 +cache_hit_sql //模式为sql的Query命中Cache的数量 +query_mode_partition //识别缓存模式为Partition的Query数量 +cache_hit_partition //通过Partition命中的Query数量 +partition_all //Query中扫描的所有分区 +partition_hit //通过Cache命中的分区数量 + +Cache命中率 = (cache_hit_sql + 
cache_hit_partition) / query_olap_table +Partition命中率 = partition_hit / partition_all +``` + +BE的监控项: +``` +query_cache_memory_total_byte //Cache内存大小 +query_cache_sql_total_count //Cache的SQL的数量 +query_cache_partition_total_count //Cache分区数量 + +SQL平均数据大小 = query_cache_memory_total_byte / query_cache_sql_total_count +Partition平均数据大小 = query_cache_memory_total_byte / query_cache_partition_total_count +``` + +其他监控: +可以从Grafana中查看BE节点的CPU和内存指标,Query统计中的Query Percentile等指标,配合Cache参数的调整来达成业务目标。 + + +### 优化参数 +FE的配置项cache_result_max_row_count,查询结果集放入缓存的最大行数,可以根据实际情况调整,但建议不要设置过大,避免过多占用内存,超过这个大小的结果集不会被缓存。 +``` +vim fe/conf/fe.conf +cache_result_max_row_count=3000 +``` + +BE最大分区数量query_cache_max_partition_count,指每个SQL对应的最大分区数,如果是按日期分区,能缓存2年多的数据,假如想保留更长时间的缓存,请把这个参数设置得更大,同时修改cache_result_max_row_count的参数。 +``` +vim be/conf/be.conf +query_cache_max_partition_count=1024 +``` + +BE中缓存内存设置,有两个参数query_cache_max_size_mb和query_cache_elasticity_size_mb两部分组成(单位MB),内存超过query_cache_max_size_mb + query_cache_elasticity_size_mb会开始清理,并把内存控制到query_cache_max_size_mb以下。可以根据BE节点数量,节点内存大小,和缓存命中率来设置这两个参数。 +``` +query_cache_max_size_mb=256 +query_cache_elasticity_size_mb=128 +``` +计算方法: + +假如缓存10K个Query,每个Query缓存1000行,每行是128个字节,分布在10台BE上,则每个BE需要128M内存(10K*1000*128/10)。 +
+## 未尽事项 +* T+1的数据,是否也可以用Partition缓存? 目前不支持 +* 类似的SQL,之前查询了2个指标,现在查询3个指标,是否可以利用2个指标的缓存? 目前不支持 +* 按日期分区,但是需要按周维度汇总数据,是否可用PartitionCache? 目前不支持