1291 lines
52 KiB
C++
1291 lines
52 KiB
C++
// Licensed to the Apache Software Foundation (ASF) under one
|
|
// or more contributor license agreements. See the NOTICE file
|
|
// distributed with this work for additional information
|
|
// regarding copyright ownership. The ASF licenses this file
|
|
// to you under the Apache License, Version 2.0 (the
|
|
// "License"); you may not use this file except in compliance
|
|
// with the License. You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing,
|
|
// software distributed under the License is distributed on an
|
|
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
// KIND, either express or implied. See the License for the
|
|
// specific language governing permissions and limitations
|
|
// under the License.
|
|
|
|
#include "runtime/buffered_block_mgr2.h"
|
|
|
|
#include <gtest/gtest.h>
|
|
#include <sys/stat.h>
|
|
|
|
#include <boost/bind.hpp>
|
|
#include <boost/date_time/posix_time/posix_time.hpp>
|
|
#include <boost/filesystem.hpp>
|
|
#include <boost/scoped_ptr.hpp>
|
|
#include <boost/thread/thread.hpp>
|
|
|
|
#include "runtime/disk_io_mgr.h"
|
|
#include "runtime/exec_env.h"
|
|
#include "runtime/runtime_state.h"
|
|
#include "runtime/test_env.h"
|
|
#include "runtime/tmp_file_mgr.h"
|
|
#include "util/cpu_info.h"
|
|
#include "util/disk_info.h"
|
|
#include "util/filesystem_util.h"
|
|
#include "util/logging.h"
|
|
#include "util/monotime.h"
|
|
|
|
using boost::filesystem::directory_iterator;
|
|
using boost::filesystem::remove;
|
|
using boost::scoped_ptr;
|
|
using boost::unordered_map;
|
|
using boost::thread;
|
|
|
|
using std::string;
|
|
using std::stringstream;
|
|
using std::vector;
|
|
|
|
// Note: This is the default scratch dir created by doris.
|
|
// config::query_scratch_dirs + TmpFileMgr::_s_tmp_sub_dir_name.
|
|
const static string SCRATCH_DIR = "/tmp/doris-scratch";
|
|
|
|
// This suffix is appended to a tmp dir
|
|
const static string SCRATCH_SUFFIX = "/doris-scratch";
|
|
|
|
// Number of millieconds to wait to ensure write completes
|
|
const static int WRITE_WAIT_MILLIS = 500;
|
|
|
|
// How often to check for write completion
|
|
const static int WRITE_CHECK_INTERVAL_MILLIS = 10;
|
|
|
|
namespace doris {
|
|
|
|
class BufferedBlockMgrTest : public ::testing::Test {
|
|
protected:
|
|
const static int _block_size = 1024;
|
|
|
|
virtual void SetUp() {
|
|
_test_env.reset(new TestEnv());
|
|
_client_tracker.reset(new MemTracker(-1));
|
|
}
|
|
|
|
virtual void TearDown() {
|
|
TearDownMgrs();
|
|
_test_env.reset();
|
|
_client_tracker.reset();
|
|
|
|
// Tests modify permissions, so make sure we can delete if they didn't clean up.
|
|
for (int i = 0; i < _created_tmp_dirs.size(); ++i) {
|
|
chmod((_created_tmp_dirs[i] + SCRATCH_SUFFIX).c_str(), S_IRWXU);
|
|
}
|
|
FileSystemUtil::remove_paths(_created_tmp_dirs);
|
|
_created_tmp_dirs.clear();
|
|
}
|
|
|
|
// Reinitialize _test_env to have multiple temporary directories.
|
|
vector<string> InitMultipleTmpDirs(int num_dirs) {
|
|
vector<string> tmp_dirs;
|
|
for (int i = 0; i < num_dirs; ++i) {
|
|
stringstream dir_str;
|
|
dir_str << "/tmp/buffered-block-mgr-test." << i;
|
|
const string& dir = dir_str.str();
|
|
// Fix permissions in case old directories were left from previous runs of test.
|
|
chmod((dir + SCRATCH_SUFFIX).c_str(), S_IRWXU);
|
|
EXPECT_TRUE(FileSystemUtil::create_directory(dir).ok());
|
|
tmp_dirs.push_back(dir);
|
|
_created_tmp_dirs.push_back(dir);
|
|
}
|
|
_test_env->init_tmp_file_mgr(tmp_dirs, false);
|
|
EXPECT_EQ(num_dirs, _test_env->tmp_file_mgr()->num_active_tmp_devices());
|
|
return tmp_dirs;
|
|
}
|
|
|
|
static void ValidateBlock(BufferedBlockMgr2::Block* block, int32_t data) {
|
|
EXPECT_TRUE(block->valid_data_len() == sizeof(int32_t));
|
|
EXPECT_TRUE(*reinterpret_cast<int32_t*>(block->buffer()) == data);
|
|
}
|
|
|
|
static int32_t* MakeRandomSizeData(BufferedBlockMgr2::Block* block) {
|
|
// Format is int32_t size, followed by size bytes of data
|
|
int32_t size = (rand() % 252) + 4; // So blocks have 4-256 bytes of data
|
|
uint8_t* data = block->allocate<uint8_t>(size);
|
|
*(reinterpret_cast<int32_t*>(data)) = size;
|
|
int i = 0;
|
|
for (i = 4; i < size - 5; ++i) {
|
|
data[i] = i;
|
|
}
|
|
for (; i < size; ++i) { // End marker of at least 5 0xff's
|
|
data[i] = 0xff;
|
|
}
|
|
return reinterpret_cast<int32_t*>(data); // Really returns a pointer to size
|
|
}
|
|
|
|
static void ValidateRandomSizeData(BufferedBlockMgr2::Block* block, int32_t size) {
|
|
int32_t bsize = *(reinterpret_cast<int32_t*>(block->buffer()));
|
|
uint8_t* data = reinterpret_cast<uint8_t*>(block->buffer());
|
|
int i = 0;
|
|
EXPECT_EQ(block->valid_data_len(), size);
|
|
EXPECT_EQ(size, bsize);
|
|
for (i = 4; i < size - 5; ++i) {
|
|
EXPECT_EQ(data[i], i);
|
|
}
|
|
for (; i < size; ++i) {
|
|
EXPECT_EQ(data[i], 0xff);
|
|
}
|
|
}
|
|
|
|
/// Helper to create a simple block manager.
|
|
BufferedBlockMgr2* CreateMgr(int64_t query_id, int max_buffers, int block_size,
|
|
RuntimeState** query_state = NULL) {
|
|
RuntimeState* state = NULL;
|
|
EXPECT_TRUE(_test_env->create_query_state(query_id, max_buffers, block_size, &state).ok());
|
|
if (query_state != NULL) {
|
|
*query_state = state;
|
|
}
|
|
return state->block_mgr2();
|
|
}
|
|
|
|
BufferedBlockMgr2* CreateMgrAndClient(int64_t query_id, int max_buffers, int block_size,
|
|
int reserved_blocks, MemTracker* tracker,
|
|
BufferedBlockMgr2::Client** client) {
|
|
RuntimeState* state = NULL;
|
|
BufferedBlockMgr2* mgr = CreateMgr(query_id, max_buffers, block_size, &state);
|
|
EXPECT_TRUE(mgr->register_client(reserved_blocks, tracker, state, client).ok());
|
|
EXPECT_TRUE(client != NULL);
|
|
return mgr;
|
|
}
|
|
|
|
void CreateMgrsAndClients(int64_t start_query_id, int num_mgrs, int buffers_per_mgr,
|
|
int block_size, int reserved_blocks_per_client, MemTracker* tracker,
|
|
vector<BufferedBlockMgr2*>* mgrs,
|
|
vector<BufferedBlockMgr2::Client*>* clients) {
|
|
for (int i = 0; i < num_mgrs; ++i) {
|
|
BufferedBlockMgr2::Client* client;
|
|
BufferedBlockMgr2* mgr =
|
|
CreateMgrAndClient(start_query_id + i, buffers_per_mgr, _block_size,
|
|
reserved_blocks_per_client, tracker, &client);
|
|
mgrs->push_back(mgr);
|
|
clients->push_back(client);
|
|
}
|
|
}
|
|
|
|
// Destroy all created query states and associated block managers.
|
|
void TearDownMgrs() {
|
|
// Freeing all block managers should clean up all consumed memory.
|
|
_test_env->tear_down_query_states();
|
|
EXPECT_EQ(_test_env->block_mgr_parent_tracker()->consumption(), 0);
|
|
}
|
|
|
|
void AllocateBlocks(BufferedBlockMgr2* block_mgr, BufferedBlockMgr2::Client* client,
|
|
int num_blocks, vector<BufferedBlockMgr2::Block*>* blocks) {
|
|
int32_t* data = NULL;
|
|
Status status;
|
|
BufferedBlockMgr2::Block* new_block;
|
|
for (int i = 0; i < num_blocks; ++i) {
|
|
status = block_mgr->get_new_block(client, NULL, &new_block);
|
|
ASSERT_TRUE(status.ok());
|
|
ASSERT_TRUE(new_block != NULL);
|
|
data = new_block->allocate<int32_t>(sizeof(int32_t));
|
|
*data = blocks->size();
|
|
blocks->push_back(new_block);
|
|
}
|
|
}
|
|
|
|
// Pin all blocks, expecting they are pinned successfully.
|
|
void PinBlocks(const vector<BufferedBlockMgr2::Block*>& blocks) {
|
|
for (int i = 0; i < blocks.size(); ++i) {
|
|
bool pinned = false;
|
|
EXPECT_TRUE(blocks[i]->pin(&pinned).ok());
|
|
EXPECT_TRUE(pinned);
|
|
}
|
|
}
|
|
|
|
// Pin all blocks, expecting no errors from unpin() calls.
|
|
void UnpinBlocks(const vector<BufferedBlockMgr2::Block*>& blocks) {
|
|
for (int i = 0; i < blocks.size(); ++i) {
|
|
EXPECT_TRUE(blocks[i]->unpin().ok());
|
|
}
|
|
}
|
|
|
|
static void WaitForWrites(BufferedBlockMgr2* block_mgr) {
|
|
vector<BufferedBlockMgr2*> block_mgrs;
|
|
block_mgrs.push_back(block_mgr);
|
|
WaitForWrites(block_mgrs);
|
|
}
|
|
|
|
// Wait for writes issued through block managers to complete.
|
|
static void WaitForWrites(const vector<BufferedBlockMgr2*>& block_mgrs) {
|
|
int max_attempts = WRITE_WAIT_MILLIS / WRITE_CHECK_INTERVAL_MILLIS;
|
|
for (int i = 0; i < max_attempts; ++i) {
|
|
SleepFor(MonoDelta::FromMilliseconds(WRITE_CHECK_INTERVAL_MILLIS));
|
|
if (AllWritesComplete(block_mgrs)) {
|
|
return;
|
|
}
|
|
}
|
|
EXPECT_TRUE(false) << "Writes did not complete after " << WRITE_WAIT_MILLIS << "ms";
|
|
}
|
|
|
|
static bool AllWritesComplete(const vector<BufferedBlockMgr2*>& block_mgrs) {
|
|
for (int i = 0; i < block_mgrs.size(); ++i) {
|
|
RuntimeProfile::Counter* writes_outstanding =
|
|
block_mgrs[i]->profile()->get_counter("BlockWritesOutstanding");
|
|
if (writes_outstanding->value() != 0) {
|
|
return false;
|
|
}
|
|
}
|
|
return true;
|
|
}
|
|
|
|
// Delete the temporary file backing a block - all subsequent writes to the file
|
|
// should fail. Expects backing file has already been allocated.
|
|
static void DeleteBackingFile(BufferedBlockMgr2::Block* block) {
|
|
const string& path = block->tmp_file_path();
|
|
EXPECT_GT(path.size(), 0);
|
|
EXPECT_TRUE(remove(path));
|
|
LOG(INFO) << "Injected fault by deleting file " << path;
|
|
}
|
|
|
|
// Check that the file backing the block has dir as a prefix of its path.
|
|
static bool BlockInDir(BufferedBlockMgr2::Block* block, const string& dir) {
|
|
return block->tmp_file_path().find(dir) == 0;
|
|
}
|
|
|
|
// Find a block in the list that is backed by a file with the given directory as prefix
|
|
// of its path.
|
|
static BufferedBlockMgr2::Block* FindBlockForDir(
|
|
const vector<BufferedBlockMgr2::Block*>& blocks, const string& dir) {
|
|
for (int i = 0; i < blocks.size(); ++i) {
|
|
if (BlockInDir(blocks[i], dir)) {
|
|
return blocks[i];
|
|
}
|
|
}
|
|
return NULL;
|
|
}
|
|
|
|
void TestGetNewBlockImpl(int block_size) {
|
|
Status status;
|
|
int max_num_blocks = 5;
|
|
BufferedBlockMgr2* block_mgr = NULL;
|
|
BufferedBlockMgr2::Client* client;
|
|
block_mgr = CreateMgrAndClient(0, max_num_blocks, block_size, 0, _client_tracker.get(),
|
|
&client);
|
|
EXPECT_EQ(_test_env->block_mgr_parent_tracker()->consumption(), 0);
|
|
|
|
// Allocate blocks until max_num_blocks, they should all succeed and memory
|
|
// usage should go up.
|
|
BufferedBlockMgr2::Block* new_block;
|
|
BufferedBlockMgr2::Block* first_block = NULL;
|
|
for (int i = 0; i < max_num_blocks; ++i) {
|
|
status = block_mgr->get_new_block(client, NULL, &new_block);
|
|
EXPECT_TRUE(new_block != NULL);
|
|
EXPECT_EQ(block_mgr->bytes_allocated(), (i + 1) * block_size);
|
|
if (first_block == NULL) {
|
|
first_block = new_block;
|
|
}
|
|
}
|
|
|
|
// Trying to allocate a new one should fail.
|
|
status = block_mgr->get_new_block(client, NULL, &new_block);
|
|
EXPECT_TRUE(new_block == NULL);
|
|
EXPECT_EQ(block_mgr->bytes_allocated(), max_num_blocks * block_size);
|
|
|
|
// We can allocate a new block by transferring an already allocated one.
|
|
uint8_t* old_buffer = first_block->buffer();
|
|
status = block_mgr->get_new_block(client, first_block, &new_block);
|
|
EXPECT_TRUE(new_block != NULL);
|
|
EXPECT_TRUE(old_buffer == new_block->buffer());
|
|
EXPECT_EQ(block_mgr->bytes_allocated(), max_num_blocks * block_size);
|
|
EXPECT_TRUE(!first_block->is_pinned());
|
|
|
|
// Trying to allocate a new one should still fail.
|
|
status = block_mgr->get_new_block(client, NULL, &new_block);
|
|
EXPECT_TRUE(new_block == NULL);
|
|
EXPECT_EQ(block_mgr->bytes_allocated(), max_num_blocks * block_size);
|
|
|
|
EXPECT_EQ(block_mgr->writes_issued(), 1);
|
|
TearDownMgrs();
|
|
}
|
|
|
|
void TestEvictionImpl(int block_size) {
|
|
Status status;
|
|
DCHECK_GT(block_size, 0);
|
|
int max_num_buffers = 5;
|
|
BufferedBlockMgr2* block_mgr = NULL;
|
|
BufferedBlockMgr2::Client* client = NULL;
|
|
block_mgr = CreateMgrAndClient(0, max_num_buffers, block_size, 0, _client_tracker.get(),
|
|
&client);
|
|
|
|
// Check counters.
|
|
RuntimeProfile* profile = block_mgr->profile();
|
|
RuntimeProfile::Counter* buffered_pin = profile->get_counter("BufferedPins");
|
|
|
|
vector<BufferedBlockMgr2::Block*> blocks;
|
|
AllocateBlocks(block_mgr, client, max_num_buffers, &blocks);
|
|
|
|
EXPECT_EQ(block_mgr->bytes_allocated(), max_num_buffers * block_size);
|
|
BOOST_FOREACH (BufferedBlockMgr2::Block* block, blocks) { block->unpin(); }
|
|
|
|
// Re-pinning all blocks
|
|
for (int i = 0; i < blocks.size(); ++i) {
|
|
bool pinned = false;
|
|
status = blocks[i]->pin(&pinned);
|
|
EXPECT_TRUE(status.ok());
|
|
EXPECT_TRUE(pinned);
|
|
ValidateBlock(blocks[i], i);
|
|
}
|
|
int buffered_pins_expected = blocks.size();
|
|
EXPECT_EQ(buffered_pin->value(), buffered_pins_expected);
|
|
|
|
// Unpin all blocks
|
|
BOOST_FOREACH (BufferedBlockMgr2::Block* block, blocks) { block->unpin(); }
|
|
// Get two new blocks.
|
|
AllocateBlocks(block_mgr, client, 2, &blocks);
|
|
// At least two writes must be issued. The first (num_blocks - 2) must be in memory.
|
|
EXPECT_GE(block_mgr->writes_issued(), 2);
|
|
for (int i = 0; i < (max_num_buffers - 2); ++i) {
|
|
bool pinned = false;
|
|
status = blocks[i]->pin(&pinned);
|
|
EXPECT_TRUE(status.ok());
|
|
EXPECT_TRUE(pinned);
|
|
ValidateBlock(blocks[i], i);
|
|
}
|
|
EXPECT_GE(buffered_pin->value(), buffered_pins_expected);
|
|
|
|
// can not pin any more
|
|
for (int i = (max_num_buffers - 2); i < max_num_buffers; ++i) {
|
|
bool pinned = true;
|
|
status = blocks[i]->pin(&pinned);
|
|
EXPECT_TRUE(status.ok());
|
|
EXPECT_FALSE(pinned);
|
|
}
|
|
|
|
// the last 2 block has already been pinned
|
|
for (int i = max_num_buffers; i < blocks.size(); ++i) {
|
|
bool pinned = false;
|
|
status = blocks[i]->pin(&pinned);
|
|
EXPECT_TRUE(status.ok());
|
|
EXPECT_TRUE(pinned);
|
|
ValidateBlock(blocks[i], i);
|
|
}
|
|
|
|
TearDownMgrs();
|
|
}
|
|
|
|
// Test that randomly issues GetFreeBlock(), pin(), unpin(), del() and Close()
|
|
// calls. All calls made are legal - error conditions are not expected until the first
|
|
// call to Close(). This is called 2 times with encryption+integrity on/off.
|
|
// When executed in single-threaded mode 'tid' should be SINGLE_THREADED_TID.
|
|
static const int SINGLE_THREADED_TID = -1;
|
|
void TestRandomInternalImpl(RuntimeState* state, BufferedBlockMgr2* block_mgr, int num_buffers,
|
|
int tid) {
|
|
DCHECK(block_mgr != NULL);
|
|
const int num_iterations = 100000;
|
|
const int iters_before_close = num_iterations - 5000;
|
|
bool close_called = false;
|
|
unordered_map<BufferedBlockMgr2::Block*, int> pinned_block_map;
|
|
vector<std::pair<BufferedBlockMgr2::Block*, int32_t> > pinned_blocks;
|
|
unordered_map<BufferedBlockMgr2::Block*, int> unpinned_block_map;
|
|
vector<std::pair<BufferedBlockMgr2::Block*, int32_t> > unpinned_blocks;
|
|
|
|
typedef enum { Pin, New, Unpin, Delete, Close } ApiFunction;
|
|
ApiFunction api_function;
|
|
|
|
BufferedBlockMgr2::Client* client;
|
|
Status status = block_mgr->register_client(0, _client_tracker.get(), state, &client);
|
|
EXPECT_TRUE(status.ok());
|
|
EXPECT_TRUE(client != NULL);
|
|
|
|
pinned_blocks.reserve(num_buffers);
|
|
BufferedBlockMgr2::Block* new_block;
|
|
for (int i = 0; i < num_iterations; ++i) {
|
|
if ((i % 20000) == 0) {
|
|
LOG(ERROR) << " Iteration " << i << std::endl;
|
|
}
|
|
if (i > iters_before_close && (rand() % 5 == 0)) {
|
|
api_function = Close;
|
|
} else if (pinned_blocks.size() == 0 && unpinned_blocks.size() == 0) {
|
|
api_function = New;
|
|
} else if (pinned_blocks.size() == 0) {
|
|
// Pin or New. Can't unpin or delete.
|
|
api_function = static_cast<ApiFunction>(rand() % 2);
|
|
} else if (pinned_blocks.size() >= num_buffers) {
|
|
// Unpin or delete. Can't pin or get new.
|
|
api_function = static_cast<ApiFunction>(2 + (rand() % 2));
|
|
} else if (unpinned_blocks.size() == 0) {
|
|
// Can't pin. Unpin, new or delete.
|
|
api_function = static_cast<ApiFunction>(1 + (rand() % 3));
|
|
} else {
|
|
// Any api function.
|
|
api_function = static_cast<ApiFunction>(rand() % 4);
|
|
}
|
|
|
|
std::pair<BufferedBlockMgr2::Block*, int32_t> block_data;
|
|
int rand_pick = 0;
|
|
int32_t* data = NULL;
|
|
bool pinned = false;
|
|
switch (api_function) {
|
|
case New:
|
|
status = block_mgr->get_new_block(client, NULL, &new_block);
|
|
if (close_called || (tid != SINGLE_THREADED_TID && status.is_cancelled())) {
|
|
EXPECT_TRUE(new_block == NULL);
|
|
EXPECT_TRUE(status.is_cancelled());
|
|
continue;
|
|
}
|
|
EXPECT_TRUE(status.ok());
|
|
EXPECT_TRUE(new_block != NULL);
|
|
data = MakeRandomSizeData(new_block);
|
|
block_data = std::make_pair(new_block, *data);
|
|
|
|
pinned_blocks.push_back(block_data);
|
|
pinned_block_map.insert(std::make_pair(block_data.first, pinned_blocks.size() - 1));
|
|
break;
|
|
case Pin:
|
|
rand_pick = rand() % unpinned_blocks.size();
|
|
block_data = unpinned_blocks[rand_pick];
|
|
status = block_data.first->pin(&pinned);
|
|
if (close_called || (tid != SINGLE_THREADED_TID && status.is_cancelled())) {
|
|
EXPECT_TRUE(status.is_cancelled());
|
|
// In single-threaded runs the block should not have been pinned.
|
|
// In multi-threaded runs pin() may return the block pinned but the status to
|
|
// be cancelled. In this case we could move the block from unpinned_blocks
|
|
// to pinned_blocks. We do not do that because after is_cancelled() no actual
|
|
// block operations should take place.
|
|
// reason: when block_mgr is cancelled in one thread, the same block_mgr
|
|
// is waiting for scan-range to be ready.
|
|
if (tid == SINGLE_THREADED_TID) {
|
|
EXPECT_FALSE(pinned);
|
|
}
|
|
continue;
|
|
}
|
|
EXPECT_TRUE(status.ok());
|
|
EXPECT_TRUE(pinned);
|
|
ValidateRandomSizeData(block_data.first, block_data.second);
|
|
unpinned_blocks[rand_pick] = unpinned_blocks.back();
|
|
unpinned_blocks.pop_back();
|
|
unpinned_block_map[unpinned_blocks[rand_pick].first] = rand_pick;
|
|
|
|
pinned_blocks.push_back(block_data);
|
|
pinned_block_map.insert(std::make_pair(block_data.first, pinned_blocks.size() - 1));
|
|
break;
|
|
case Unpin:
|
|
rand_pick = rand() % pinned_blocks.size();
|
|
block_data = pinned_blocks[rand_pick];
|
|
status = block_data.first->unpin();
|
|
if (close_called || (tid != SINGLE_THREADED_TID && status.is_cancelled())) {
|
|
EXPECT_TRUE(status.is_cancelled());
|
|
continue;
|
|
}
|
|
EXPECT_TRUE(status.ok());
|
|
pinned_blocks[rand_pick] = pinned_blocks.back();
|
|
pinned_blocks.pop_back();
|
|
pinned_block_map[pinned_blocks[rand_pick].first] = rand_pick;
|
|
|
|
unpinned_blocks.push_back(block_data);
|
|
unpinned_block_map.insert(
|
|
std::make_pair(block_data.first, unpinned_blocks.size() - 1));
|
|
break;
|
|
case Delete:
|
|
rand_pick = rand() % pinned_blocks.size();
|
|
block_data = pinned_blocks[rand_pick];
|
|
block_data.first->del();
|
|
pinned_blocks[rand_pick] = pinned_blocks.back();
|
|
pinned_blocks.pop_back();
|
|
pinned_block_map[pinned_blocks[rand_pick].first] = rand_pick;
|
|
break;
|
|
case Close:
|
|
block_mgr->cancel();
|
|
close_called = true;
|
|
break;
|
|
} // end switch (apiFunction)
|
|
} // end for ()
|
|
}
|
|
|
|
// Single-threaded execution of the TestRandomInternalImpl.
|
|
void TestRandomInternalSingle(int block_size) {
|
|
DCHECK_GT(block_size, 0);
|
|
DCHECK(_test_env.get() != NULL);
|
|
const int max_num_buffers = 100;
|
|
RuntimeState* state = NULL;
|
|
BufferedBlockMgr2* block_mgr = CreateMgr(0, max_num_buffers, block_size, &state);
|
|
TestRandomInternalImpl(state, block_mgr, max_num_buffers, SINGLE_THREADED_TID);
|
|
TearDownMgrs();
|
|
}
|
|
|
|
// Multi-threaded execution of the TestRandomInternalImpl.
|
|
void TestRandomInternalMulti(int num_threads, int block_size) {
|
|
DCHECK_GT(num_threads, 0);
|
|
DCHECK_GT(block_size, 0);
|
|
DCHECK(_test_env.get() != NULL);
|
|
const int max_num_buffers = 100;
|
|
RuntimeState* state = NULL;
|
|
BufferedBlockMgr2* block_mgr =
|
|
CreateMgr(0, num_threads * max_num_buffers, block_size, &state);
|
|
|
|
boost::thread_group workers;
|
|
for (int i = 0; i < num_threads; ++i) {
|
|
thread* t = new boost::thread(boost::bind(&BufferedBlockMgrTest::TestRandomInternalImpl,
|
|
this, state, block_mgr, max_num_buffers, i));
|
|
workers.add_thread(t);
|
|
}
|
|
workers.join_all();
|
|
TearDownMgrs();
|
|
}
|
|
|
|
// Repeatedly call BufferedBlockMgr2::Create() and BufferedBlockMgr2::~BufferedBlockMgr2().
|
|
void CreateDestroyThread(int index, RuntimeState* state) {
|
|
const int num_buffers = 10;
|
|
const int iters = 100;
|
|
for (int i = 0; i < iters; ++i) {
|
|
LOG(WARNING) << "CreateDestroyThread thread " << index << " begin " << i << std::endl;
|
|
boost::shared_ptr<BufferedBlockMgr2> mgr;
|
|
Status status = BufferedBlockMgr2::create(
|
|
state, _test_env->block_mgr_parent_tracker(), state->runtime_profile(),
|
|
_test_env->tmp_file_mgr(), _block_size * num_buffers, _block_size, &mgr);
|
|
LOG(WARNING) << "CreateDestroyThread thread " << index << " end " << i << std::endl;
|
|
}
|
|
}
|
|
|
|
// IMPALA-2286: Test for races between BufferedBlockMgr2::Create() and
|
|
// BufferedBlockMgr2::~BufferedBlockMgr2().
|
|
void CreateDestroyMulti() {
|
|
const int num_threads = 4;
|
|
boost::thread_group workers;
|
|
// Create a shared RuntimeState with no BufferedBlockMgr2.
|
|
// RuntimeState* shared_state = new RuntimeState(TExecPlanFragmentParams(), "",
|
|
// _test_env->exec_env());
|
|
RuntimeState* shared_state = new RuntimeState(TUniqueId(), TQueryOptions(), TQueryGlobals(),
|
|
_test_env->exec_env());
|
|
for (int i = 0; i < num_threads; ++i) {
|
|
thread* t = new boost::thread(
|
|
boost::bind(&BufferedBlockMgrTest::CreateDestroyThread, this, i, shared_state));
|
|
workers.add_thread(t);
|
|
}
|
|
workers.join_all();
|
|
}
|
|
|
|
scoped_ptr<TestEnv> _test_env;
|
|
// scoped_ptr<MemTracker> _client_tracker;
|
|
scoped_ptr<MemTracker> _client_tracker;
|
|
vector<string> _created_tmp_dirs;
|
|
};
|
|
|
|
TEST_F(BufferedBlockMgrTest, get_new_block) {
|
|
TestGetNewBlockImpl(1024);
|
|
TestGetNewBlockImpl(8 * 1024);
|
|
TestGetNewBlockImpl(8 * 1024 * 1024);
|
|
LOG(WARNING) << "finish test get_new_block." << std::endl;
|
|
}
|
|
|
|
TEST_F(BufferedBlockMgrTest, GetNewBlockSmallBlocks) {
|
|
const int block_size = 1024;
|
|
int max_num_blocks = 3;
|
|
BufferedBlockMgr2* block_mgr;
|
|
BufferedBlockMgr2::Client* client;
|
|
block_mgr =
|
|
CreateMgrAndClient(0, max_num_blocks, block_size, 0, _client_tracker.get(), &client);
|
|
EXPECT_EQ(0, _test_env->block_mgr_parent_tracker()->consumption());
|
|
|
|
vector<BufferedBlockMgr2::Block*> blocks;
|
|
|
|
// Allocate a small block.
|
|
BufferedBlockMgr2::Block* new_block = NULL;
|
|
EXPECT_TRUE(block_mgr->get_new_block(client, NULL, &new_block, 128).ok());
|
|
EXPECT_TRUE(new_block != NULL);
|
|
EXPECT_EQ(block_mgr->bytes_allocated(), 0);
|
|
EXPECT_EQ(_test_env->block_mgr_parent_tracker()->consumption(), 0);
|
|
EXPECT_EQ(_client_tracker->consumption(), 128);
|
|
EXPECT_TRUE(new_block->is_pinned());
|
|
EXPECT_EQ(new_block->bytes_remaining(), 128);
|
|
EXPECT_TRUE(new_block->buffer() != NULL);
|
|
blocks.push_back(new_block);
|
|
|
|
// Allocate a normal block
|
|
EXPECT_TRUE(block_mgr->get_new_block(client, NULL, &new_block).ok());
|
|
EXPECT_TRUE(new_block != NULL);
|
|
EXPECT_EQ(block_mgr->bytes_allocated(), block_mgr->max_block_size());
|
|
EXPECT_EQ(_test_env->block_mgr_parent_tracker()->consumption(), block_mgr->max_block_size());
|
|
EXPECT_EQ(_client_tracker->consumption(), 128 + block_mgr->max_block_size());
|
|
EXPECT_TRUE(new_block->is_pinned());
|
|
EXPECT_EQ(new_block->bytes_remaining(), block_mgr->max_block_size());
|
|
EXPECT_TRUE(new_block->buffer() != NULL);
|
|
blocks.push_back(new_block);
|
|
|
|
// Allocate another small block.
|
|
EXPECT_TRUE(block_mgr->get_new_block(client, NULL, &new_block, 512).ok());
|
|
EXPECT_TRUE(new_block != NULL);
|
|
EXPECT_EQ(block_mgr->bytes_allocated(), block_mgr->max_block_size());
|
|
EXPECT_EQ(_test_env->block_mgr_parent_tracker()->consumption(), block_mgr->max_block_size());
|
|
EXPECT_EQ(_client_tracker->consumption(), 128 + 512 + block_mgr->max_block_size());
|
|
EXPECT_TRUE(new_block->is_pinned());
|
|
EXPECT_EQ(new_block->bytes_remaining(), 512);
|
|
EXPECT_TRUE(new_block->buffer() != NULL);
|
|
blocks.push_back(new_block);
|
|
|
|
// Should be able to unpin and pin the middle block
|
|
EXPECT_TRUE(blocks[1]->unpin().ok());
|
|
|
|
bool pinned;
|
|
EXPECT_TRUE(blocks[1]->pin(&pinned).ok());
|
|
EXPECT_TRUE(pinned);
|
|
|
|
for (int i = 0; i < blocks.size(); ++i) {
|
|
blocks[i]->del();
|
|
}
|
|
|
|
TearDownMgrs();
|
|
}
|
|
|
|
// Test that pinning more blocks than the max available buffers.
|
|
TEST_F(BufferedBlockMgrTest, Pin) {
|
|
Status status;
|
|
int max_num_blocks = 5;
|
|
const int block_size = 1024;
|
|
BufferedBlockMgr2* block_mgr;
|
|
BufferedBlockMgr2::Client* client;
|
|
block_mgr =
|
|
CreateMgrAndClient(0, max_num_blocks, block_size, 0, _client_tracker.get(), &client);
|
|
|
|
vector<BufferedBlockMgr2::Block*> blocks;
|
|
AllocateBlocks(block_mgr, client, max_num_blocks, &blocks);
|
|
|
|
// Unpin them all.
|
|
for (int i = 0; i < blocks.size(); ++i) {
|
|
status = blocks[i]->unpin();
|
|
EXPECT_TRUE(status.ok());
|
|
}
|
|
|
|
// Allocate more, this should work since we just unpinned some blocks.
|
|
AllocateBlocks(block_mgr, client, max_num_blocks, &blocks);
|
|
|
|
// Try to pin a unpinned block, this should not be possible.
|
|
bool pinned;
|
|
status = blocks[0]->pin(&pinned);
|
|
EXPECT_TRUE(status.ok());
|
|
EXPECT_FALSE(pinned);
|
|
|
|
// Unpin all blocks.
|
|
for (int i = 0; i < blocks.size(); ++i) {
|
|
status = blocks[i]->unpin();
|
|
EXPECT_TRUE(status.ok());
|
|
}
|
|
|
|
// Should be able to pin max_num_blocks blocks.
|
|
for (int i = 0; i < max_num_blocks; ++i) {
|
|
status = blocks[i]->pin(&pinned);
|
|
EXPECT_TRUE(status.ok());
|
|
EXPECT_TRUE(pinned);
|
|
}
|
|
|
|
// Can't pin any more though.
|
|
status = blocks[max_num_blocks]->pin(&pinned);
|
|
EXPECT_TRUE(status.ok());
|
|
EXPECT_FALSE(pinned);
|
|
|
|
TearDownMgrs();
|
|
}
|
|
|
|
// Test the eviction policy of the block mgr. No writes issued until more than
|
|
// the max available buffers are allocated. Writes must be issued in LIFO order.
|
|
TEST_F(BufferedBlockMgrTest, Eviction) {
|
|
TestEvictionImpl(1024);
|
|
TestEvictionImpl(8 * 1024 * 1024);
|
|
}
|
|
|
|
// Test deletion and reuse of blocks.
|
|
TEST_F(BufferedBlockMgrTest, Deletion) {
|
|
int max_num_buffers = 5;
|
|
const int block_size = 1024;
|
|
BufferedBlockMgr2* block_mgr;
|
|
BufferedBlockMgr2::Client* client;
|
|
block_mgr =
|
|
CreateMgrAndClient(0, max_num_buffers, block_size, 0, _client_tracker.get(), &client);
|
|
|
|
// Check counters.
|
|
RuntimeProfile* profile = block_mgr->profile();
|
|
RuntimeProfile::Counter* recycled_cnt = profile->get_counter("BlocksRecycled");
|
|
RuntimeProfile::Counter* created_cnt = profile->get_counter("BlocksCreated");
|
|
|
|
vector<BufferedBlockMgr2::Block*> blocks;
|
|
AllocateBlocks(block_mgr, client, max_num_buffers, &blocks);
|
|
EXPECT_TRUE(created_cnt->value() == max_num_buffers);
|
|
|
|
BOOST_FOREACH (BufferedBlockMgr2::Block* block, blocks) { block->del(); }
|
|
AllocateBlocks(block_mgr, client, max_num_buffers, &blocks);
|
|
EXPECT_TRUE(created_cnt->value() == max_num_buffers);
|
|
EXPECT_TRUE(recycled_cnt->value() == max_num_buffers);
|
|
|
|
TearDownMgrs();
|
|
}
|
|
|
|
// Delete blocks of various sizes and statuses to exercise the different code paths.
|
|
// This relies on internal validation in block manager to detect many errors.
|
|
TEST_F(BufferedBlockMgrTest, DeleteSingleBlocks) {
|
|
int max_num_buffers = 16;
|
|
BufferedBlockMgr2::Client* client;
|
|
BufferedBlockMgr2* block_mgr =
|
|
CreateMgrAndClient(0, max_num_buffers, _block_size, 0, _client_tracker.get(), &client);
|
|
|
|
// Pinned I/O block.
|
|
BufferedBlockMgr2::Block* new_block;
|
|
EXPECT_TRUE(block_mgr->get_new_block(client, NULL, &new_block).ok());
|
|
EXPECT_TRUE(new_block != NULL);
|
|
EXPECT_TRUE(new_block->is_pinned());
|
|
EXPECT_TRUE(new_block->is_max_size());
|
|
new_block->del();
|
|
EXPECT_TRUE(_client_tracker->consumption() == 0);
|
|
|
|
// Pinned non-I/O block.
|
|
int small_block_size = 128;
|
|
EXPECT_TRUE(block_mgr->get_new_block(client, NULL, &new_block, small_block_size).ok());
|
|
EXPECT_TRUE(new_block != NULL);
|
|
EXPECT_TRUE(new_block->is_pinned());
|
|
EXPECT_EQ(small_block_size, _client_tracker->consumption());
|
|
new_block->del();
|
|
EXPECT_EQ(0, _client_tracker->consumption());
|
|
|
|
// Unpinned I/O block - delete after written to disk.
|
|
EXPECT_TRUE(block_mgr->get_new_block(client, NULL, &new_block).ok());
|
|
EXPECT_TRUE(new_block != NULL);
|
|
EXPECT_TRUE(new_block->is_pinned());
|
|
EXPECT_TRUE(new_block->is_max_size());
|
|
new_block->unpin();
|
|
EXPECT_FALSE(new_block->is_pinned());
|
|
WaitForWrites(block_mgr);
|
|
new_block->del();
|
|
EXPECT_TRUE(_client_tracker->consumption() == 0);
|
|
|
|
// Unpinned I/O block - delete before written to disk.
|
|
EXPECT_TRUE(block_mgr->get_new_block(client, NULL, &new_block).ok());
|
|
EXPECT_TRUE(new_block != NULL);
|
|
EXPECT_TRUE(new_block->is_pinned());
|
|
EXPECT_TRUE(new_block->is_max_size());
|
|
new_block->unpin();
|
|
EXPECT_FALSE(new_block->is_pinned());
|
|
new_block->del();
|
|
WaitForWrites(block_mgr);
|
|
EXPECT_TRUE(_client_tracker->consumption() == 0);
|
|
|
|
TearDownMgrs();
|
|
}
|
|
|
|
// Test that all APIs return cancelled after close.
|
|
TEST_F(BufferedBlockMgrTest, Close) {
|
|
int max_num_buffers = 5;
|
|
const int block_size = 1024;
|
|
BufferedBlockMgr2* block_mgr;
|
|
BufferedBlockMgr2::Client* client;
|
|
block_mgr =
|
|
CreateMgrAndClient(0, max_num_buffers, block_size, 0, _client_tracker.get(), &client);
|
|
|
|
vector<BufferedBlockMgr2::Block*> blocks;
|
|
AllocateBlocks(block_mgr, client, max_num_buffers, &blocks);
|
|
|
|
block_mgr->cancel();
|
|
|
|
BufferedBlockMgr2::Block* new_block;
|
|
Status status = block_mgr->get_new_block(client, NULL, &new_block);
|
|
EXPECT_TRUE(status.is_cancelled());
|
|
EXPECT_TRUE(new_block == NULL);
|
|
status = blocks[0]->unpin();
|
|
EXPECT_TRUE(status.is_cancelled());
|
|
bool pinned;
|
|
status = blocks[0]->pin(&pinned);
|
|
EXPECT_TRUE(status.is_cancelled());
|
|
blocks[1]->del();
|
|
|
|
TearDownMgrs();
|
|
}
|
|
|
|
// Clear scratch directory. Return # of files deleted.
|
|
static int clear_scratch_dir() {
|
|
int num_files = 0;
|
|
directory_iterator dir_it(SCRATCH_DIR);
|
|
for (; dir_it != directory_iterator(); ++dir_it) {
|
|
++num_files;
|
|
remove_all(dir_it->path());
|
|
}
|
|
return num_files;
|
|
}
|
|
|
|
// Test that the block manager behaves correctly after a write error. Delete the scratch
|
|
// directory before an operation that would cause a write and test that subsequent API
|
|
// calls return 'CANCELLED' correctly.
|
|
TEST_F(BufferedBlockMgrTest, WriteError) {
|
|
Status status;
|
|
int max_num_buffers = 2;
|
|
const int block_size = 1024;
|
|
BufferedBlockMgr2* block_mgr;
|
|
BufferedBlockMgr2::Client* client;
|
|
block_mgr =
|
|
CreateMgrAndClient(0, max_num_buffers, block_size, 0, _client_tracker.get(), &client);
|
|
|
|
vector<BufferedBlockMgr2::Block*> blocks;
|
|
AllocateBlocks(block_mgr, client, max_num_buffers, &blocks);
|
|
// Unpin two blocks here, to ensure that backing storage is allocated in tmp file.
|
|
for (int i = 0; i < 2; ++i) {
|
|
status = blocks[i]->unpin();
|
|
EXPECT_TRUE(status.ok());
|
|
}
|
|
WaitForWrites(block_mgr);
|
|
// Repin the blocks
|
|
for (int i = 0; i < 2; ++i) {
|
|
bool pinned;
|
|
status = blocks[i]->pin(&pinned);
|
|
EXPECT_TRUE(status.ok());
|
|
EXPECT_TRUE(pinned);
|
|
}
|
|
// Remove the backing storage so that future writes will fail
|
|
int num_files = clear_scratch_dir();
|
|
EXPECT_TRUE(num_files > 0);
|
|
for (int i = 0; i < 2; ++i) {
|
|
status = blocks[i]->unpin();
|
|
EXPECT_TRUE(status.ok());
|
|
}
|
|
WaitForWrites(block_mgr);
|
|
// Subsequent calls should fail.
|
|
for (int i = 0; i < 2; ++i) {
|
|
blocks[i]->del();
|
|
}
|
|
BufferedBlockMgr2::Block* new_block;
|
|
status = block_mgr->get_new_block(client, NULL, &new_block);
|
|
EXPECT_TRUE(status.is_cancelled());
|
|
EXPECT_TRUE(new_block == NULL);
|
|
|
|
TearDownMgrs();
|
|
}
|
|
|
|
// Test block manager error handling when temporary file space cannot be allocated to
|
|
// back an unpinned buffer.
|
|
TEST_F(BufferedBlockMgrTest, TmpFileAllocateError) {
|
|
Status status;
|
|
int max_num_buffers = 2;
|
|
BufferedBlockMgr2::Client* client;
|
|
BufferedBlockMgr2* block_mgr =
|
|
CreateMgrAndClient(0, max_num_buffers, _block_size, 0, _client_tracker.get(), &client);
|
|
|
|
vector<BufferedBlockMgr2::Block*> blocks;
|
|
AllocateBlocks(block_mgr, client, max_num_buffers, &blocks);
|
|
// Unpin a block, forcing a write.
|
|
status = blocks[0]->unpin();
|
|
EXPECT_TRUE(status.ok());
|
|
WaitForWrites(block_mgr);
|
|
// Remove temporary files - subsequent operations will fail.
|
|
int num_files = clear_scratch_dir();
|
|
EXPECT_TRUE(num_files > 0);
|
|
// Current implementation will fail here because it tries to expand the tmp file
|
|
// immediately. This behavior is not contractual but we want to know if it changes
|
|
// accidentally.
|
|
status = blocks[1]->unpin();
|
|
EXPECT_FALSE(status.ok());
|
|
|
|
TearDownMgrs();
|
|
}
|
|
|
|
// Test that the block manager is able to blacklist a temporary device correctly after a
|
|
// write error. We should not allocate more blocks on that device, but existing blocks
|
|
// on the device will remain in use.
|
|
/// Disabled because blacklisting was disabled as workaround for IMPALA-2305.
|
|
TEST_F(BufferedBlockMgrTest, DISABLED_WriteErrorBlacklist) {
|
|
// TEST_F(BufferedBlockMgrTest, WriteErrorBlacklist) {
|
|
// Set up two buffered block managers with two temporary dirs.
|
|
vector<string> tmp_dirs = InitMultipleTmpDirs(2);
|
|
// Simulate two concurrent queries.
|
|
const int NUM_BLOCK_MGRS = 2;
|
|
const int MAX_NUM_BLOCKS = 4;
|
|
int blocks_per_mgr = MAX_NUM_BLOCKS / NUM_BLOCK_MGRS;
|
|
vector<BufferedBlockMgr2*> block_mgrs;
|
|
vector<BufferedBlockMgr2::Client*> clients;
|
|
CreateMgrsAndClients(0, NUM_BLOCK_MGRS, blocks_per_mgr, _block_size, 0, _client_tracker.get(),
|
|
&block_mgrs, &clients);
|
|
|
|
// Allocate files for all 2x2 combinations by unpinning blocks.
|
|
vector<vector<BufferedBlockMgr2::Block*> > blocks;
|
|
vector<BufferedBlockMgr2::Block*> all_blocks;
|
|
for (int i = 0; i < NUM_BLOCK_MGRS; ++i) {
|
|
vector<BufferedBlockMgr2::Block*> mgr_blocks;
|
|
AllocateBlocks(block_mgrs[i], clients[i], blocks_per_mgr, &mgr_blocks);
|
|
UnpinBlocks(mgr_blocks);
|
|
for (int j = 0; j < blocks_per_mgr; ++j) {
|
|
LOG(INFO) << "Manager " << i << " Block " << j << " backed by file "
|
|
<< mgr_blocks[j]->tmp_file_path();
|
|
}
|
|
blocks.push_back(mgr_blocks);
|
|
all_blocks.insert(all_blocks.end(), mgr_blocks.begin(), mgr_blocks.end());
|
|
}
|
|
WaitForWrites(block_mgrs);
|
|
int error_mgr = 0;
|
|
int no_error_mgr = 1;
|
|
const string& error_dir = tmp_dirs[0];
|
|
const string& good_dir = tmp_dirs[1];
|
|
// Delete one file from first scratch dir for first block manager.
|
|
BufferedBlockMgr2::Block* error_block = FindBlockForDir(blocks[error_mgr], error_dir);
|
|
ASSERT_TRUE(error_block != NULL) << "Expected a tmp file in dir " << error_dir;
|
|
PinBlocks(all_blocks);
|
|
DeleteBackingFile(error_block);
|
|
UnpinBlocks(all_blocks); // Should succeed since tmp file space was already allocated.
|
|
WaitForWrites(block_mgrs);
|
|
EXPECT_TRUE(block_mgrs[error_mgr]->is_cancelled());
|
|
EXPECT_FALSE(block_mgrs[no_error_mgr]->is_cancelled());
|
|
// Temporary device with error should no longer be active.
|
|
vector<TmpFileMgr::DeviceId> active_tmp_devices =
|
|
_test_env->tmp_file_mgr()->active_tmp_devices();
|
|
EXPECT_EQ(tmp_dirs.size() - 1, active_tmp_devices.size());
|
|
for (int i = 0; i < active_tmp_devices.size(); ++i) {
|
|
const string& device_path =
|
|
_test_env->tmp_file_mgr()->get_tmp_dir_path(active_tmp_devices[i]);
|
|
EXPECT_EQ(string::npos, error_dir.find(device_path));
|
|
}
|
|
// The second block manager should continue using allocated scratch space, since it
|
|
// didn't encounter a write error itself. In future this could change but for now it is
|
|
// the intended behaviour.
|
|
PinBlocks(blocks[no_error_mgr]);
|
|
UnpinBlocks(blocks[no_error_mgr]);
|
|
EXPECT_TRUE(FindBlockForDir(blocks[no_error_mgr], good_dir) != NULL);
|
|
EXPECT_TRUE(FindBlockForDir(blocks[no_error_mgr], error_dir) != NULL);
|
|
// The second block manager should avoid using bad directory for new blocks.
|
|
vector<BufferedBlockMgr2::Block*> no_error_new_blocks;
|
|
AllocateBlocks(block_mgrs[no_error_mgr], clients[no_error_mgr], blocks_per_mgr,
|
|
&no_error_new_blocks);
|
|
UnpinBlocks(no_error_new_blocks);
|
|
for (int i = 0; i < no_error_new_blocks.size(); ++i) {
|
|
LOG(INFO) << "Newly created block backed by file "
|
|
<< no_error_new_blocks[i]->tmp_file_path();
|
|
EXPECT_TRUE(BlockInDir(no_error_new_blocks[i], good_dir));
|
|
}
|
|
// A new block manager should only use the good dir for backing storage.
|
|
BufferedBlockMgr2::Client* new_client;
|
|
BufferedBlockMgr2* new_block_mgr = CreateMgrAndClient(9999, blocks_per_mgr, _block_size, 0,
|
|
_client_tracker.get(), &new_client);
|
|
vector<BufferedBlockMgr2::Block*> new_mgr_blocks;
|
|
AllocateBlocks(new_block_mgr, new_client, blocks_per_mgr, &new_mgr_blocks);
|
|
UnpinBlocks(new_mgr_blocks);
|
|
for (int i = 0; i < blocks_per_mgr; ++i) {
|
|
LOG(INFO) << "New manager Block " << i << " backed by file "
|
|
<< new_mgr_blocks[i]->tmp_file_path();
|
|
EXPECT_TRUE(BlockInDir(new_mgr_blocks[i], good_dir));
|
|
}
|
|
}
|
|
|
|
// Check that allocation error resulting from removal of directory results in blocks
|
|
/// being allocated in other directories.
|
|
TEST_F(BufferedBlockMgrTest, AllocationErrorHandling) {
|
|
// Set up two buffered block managers with two temporary dirs.
|
|
vector<string> tmp_dirs = InitMultipleTmpDirs(2);
|
|
// Simulate two concurrent queries.
|
|
int num_block_mgrs = 2;
|
|
int max_num_blocks = 4;
|
|
int blocks_per_mgr = max_num_blocks / num_block_mgrs;
|
|
// vector<RuntimeState*> runtime_states;
|
|
vector<BufferedBlockMgr2*> block_mgrs;
|
|
vector<BufferedBlockMgr2::Client*> clients;
|
|
CreateMgrsAndClients(0, num_block_mgrs, blocks_per_mgr, _block_size, 0, _client_tracker.get(),
|
|
&block_mgrs, &clients);
|
|
|
|
// Allocate files for all 2x2 combinations by unpinning blocks.
|
|
vector<vector<BufferedBlockMgr2::Block*> > blocks;
|
|
for (int i = 0; i < num_block_mgrs; ++i) {
|
|
vector<BufferedBlockMgr2::Block*> mgr_blocks;
|
|
LOG(INFO) << "Iter " << i;
|
|
AllocateBlocks(block_mgrs[i], clients[i], blocks_per_mgr, &mgr_blocks);
|
|
blocks.push_back(mgr_blocks);
|
|
}
|
|
const string& bad_dir = tmp_dirs[0];
|
|
const string& bad_scratch_subdir = bad_dir + SCRATCH_SUFFIX;
|
|
// const string& good_dir = tmp_dirs[1];
|
|
// const string& good_scratch_subdir = good_dir + SCRATCH_SUFFIX;
|
|
chmod(bad_scratch_subdir.c_str(), 0);
|
|
// The block mgr should attempt to allocate space in bad dir for one block, which will
|
|
// cause an error when it tries to create/expand the file. It should recover and just
|
|
// use the good dir.
|
|
UnpinBlocks(blocks[0]);
|
|
// Directories remain on active list even when they experience errors.
|
|
EXPECT_EQ(2, _test_env->tmp_file_mgr()->num_active_tmp_devices());
|
|
// Blocks should not be written to bad dir even if it remains non-writable.
|
|
UnpinBlocks(blocks[1]);
|
|
// All writes should succeed.
|
|
WaitForWrites(block_mgrs);
|
|
for (int i = 0; i < blocks.size(); ++i) {
|
|
for (int j = 0; j < blocks[i].size(); ++j) {
|
|
blocks[i][j]->del();
|
|
}
|
|
}
|
|
}
|
|
|
|
// Test that block manager fails cleanly when all directories are inaccessible at runtime.
|
|
TEST_F(BufferedBlockMgrTest, NoDirsAllocationError) {
|
|
vector<string> tmp_dirs = InitMultipleTmpDirs(2);
|
|
int max_num_buffers = 2;
|
|
BufferedBlockMgr2::Client* client;
|
|
BufferedBlockMgr2* block_mgr =
|
|
CreateMgrAndClient(0, max_num_buffers, _block_size, 0, _client_tracker.get(), &client);
|
|
vector<BufferedBlockMgr2::Block*> blocks;
|
|
AllocateBlocks(block_mgr, client, max_num_buffers, &blocks);
|
|
for (int i = 0; i < tmp_dirs.size(); ++i) {
|
|
const string& tmp_scratch_subdir = tmp_dirs[i] + SCRATCH_SUFFIX;
|
|
chmod(tmp_scratch_subdir.c_str(), 0);
|
|
}
|
|
for (int i = 0; i < blocks.size(); ++i) {
|
|
EXPECT_FALSE(blocks[i]->unpin().ok());
|
|
}
|
|
}
|
|
|
|
// Create two clients with different number of reserved buffers.
|
|
TEST_F(BufferedBlockMgrTest, MultipleClients) {
|
|
Status status;
|
|
int client1_buffers = 3;
|
|
int client2_buffers = 5;
|
|
int max_num_buffers = client1_buffers + client2_buffers;
|
|
const int block_size = 1024;
|
|
RuntimeState* runtime_state;
|
|
BufferedBlockMgr2* block_mgr = CreateMgr(0, max_num_buffers, block_size, &runtime_state);
|
|
|
|
BufferedBlockMgr2::Client* client1 = NULL;
|
|
BufferedBlockMgr2::Client* client2 = NULL;
|
|
status = block_mgr->register_client(client1_buffers, _client_tracker.get(), runtime_state,
|
|
&client1);
|
|
EXPECT_TRUE(status.ok());
|
|
EXPECT_TRUE(client1 != NULL);
|
|
status = block_mgr->register_client(client2_buffers, _client_tracker.get(), runtime_state,
|
|
&client2);
|
|
EXPECT_TRUE(status.ok());
|
|
EXPECT_TRUE(client2 != NULL);
|
|
|
|
// Reserve client 1's and 2's buffers. They should succeed.
|
|
bool reserved = block_mgr->try_acquire_tmp_reservation(client1, 1);
|
|
EXPECT_TRUE(reserved);
|
|
reserved = block_mgr->try_acquire_tmp_reservation(client2, 1);
|
|
EXPECT_TRUE(reserved);
|
|
|
|
vector<BufferedBlockMgr2::Block*> client1_blocks;
|
|
// Allocate all of client1's reserved blocks, they should all succeed.
|
|
AllocateBlocks(block_mgr, client1, client1_buffers, &client1_blocks);
|
|
|
|
// Try allocating one more, that should fail.
|
|
BufferedBlockMgr2::Block* block;
|
|
status = block_mgr->get_new_block(client1, NULL, &block);
|
|
EXPECT_TRUE(status.ok());
|
|
EXPECT_TRUE(block == NULL);
|
|
|
|
// Trying to reserve should also fail.
|
|
reserved = block_mgr->try_acquire_tmp_reservation(client1, 1);
|
|
EXPECT_FALSE(reserved);
|
|
|
|
// Allocate all of client2's reserved blocks, these should succeed.
|
|
vector<BufferedBlockMgr2::Block*> client2_blocks;
|
|
AllocateBlocks(block_mgr, client2, client2_buffers, &client2_blocks);
|
|
|
|
// Try allocating one more from client 2, that should fail.
|
|
status = block_mgr->get_new_block(client2, NULL, &block);
|
|
EXPECT_TRUE(status.ok());
|
|
EXPECT_TRUE(block == NULL);
|
|
|
|
// Unpin one block from client 1.
|
|
status = client1_blocks[0]->unpin();
|
|
EXPECT_TRUE(status.ok());
|
|
|
|
// Client 2 should still not be able to allocate.
|
|
status = block_mgr->get_new_block(client2, NULL, &block);
|
|
EXPECT_TRUE(status.ok());
|
|
EXPECT_TRUE(block == NULL);
|
|
|
|
// Client 2 should still not be able to reserve.
|
|
reserved = block_mgr->try_acquire_tmp_reservation(client2, 1);
|
|
EXPECT_FALSE(reserved);
|
|
|
|
// Client 1 should be able to though.
|
|
status = block_mgr->get_new_block(client1, NULL, &block);
|
|
EXPECT_TRUE(status.ok());
|
|
EXPECT_TRUE(block != NULL);
|
|
|
|
// Unpin two of client 1's blocks (client 1 should have 3 unpinned blocks now).
|
|
status = client1_blocks[1]->unpin();
|
|
EXPECT_TRUE(status.ok());
|
|
status = client1_blocks[2]->unpin();
|
|
EXPECT_TRUE(status.ok());
|
|
|
|
// Clear client 1's reservation
|
|
block_mgr->clear_reservations(client1);
|
|
|
|
// Client 2 should be able to reserve 1 buffers now (there are 2 left);
|
|
reserved = block_mgr->try_acquire_tmp_reservation(client2, 1);
|
|
EXPECT_TRUE(reserved);
|
|
|
|
// Client one can only pin 1.
|
|
bool pinned;
|
|
status = client1_blocks[0]->pin(&pinned);
|
|
EXPECT_TRUE(status.ok());
|
|
EXPECT_TRUE(pinned);
|
|
// Can't get this one.
|
|
status = client1_blocks[1]->pin(&pinned);
|
|
EXPECT_TRUE(status.ok());
|
|
EXPECT_FALSE(pinned);
|
|
|
|
// Client 2 can pick up the one reserved buffer
|
|
status = block_mgr->get_new_block(client2, NULL, &block);
|
|
EXPECT_TRUE(status.ok());
|
|
EXPECT_TRUE(block != NULL);
|
|
// But not a second
|
|
BufferedBlockMgr2::Block* block2;
|
|
status = block_mgr->get_new_block(client2, NULL, &block2);
|
|
EXPECT_TRUE(status.ok());
|
|
EXPECT_TRUE(block2 == NULL);
|
|
|
|
// Unpin client 2's block it got from the reservation. Sine this is a tmp
|
|
// reservation, client 1 can pick it up again (it is not longer reserved).
|
|
status = block->unpin();
|
|
EXPECT_TRUE(status.ok());
|
|
status = client1_blocks[1]->pin(&pinned);
|
|
EXPECT_TRUE(status.ok());
|
|
EXPECT_TRUE(pinned);
|
|
|
|
TearDownMgrs();
|
|
}
|
|
|
|
// Create two clients with different number of reserved buffers and some additional.
|
|
TEST_F(BufferedBlockMgrTest, MultipleClientsExtraBuffers) {
|
|
Status status;
|
|
int client1_buffers = 1;
|
|
int client2_buffers = 1;
|
|
int max_num_buffers = client1_buffers + client2_buffers + 2;
|
|
const int block_size = 1024;
|
|
RuntimeState* runtime_state;
|
|
BufferedBlockMgr2* block_mgr = CreateMgr(0, max_num_buffers, block_size, &runtime_state);
|
|
|
|
BufferedBlockMgr2::Client* client1 = NULL;
|
|
BufferedBlockMgr2::Client* client2 = NULL;
|
|
BufferedBlockMgr2::Block* block = NULL;
|
|
status = block_mgr->register_client(client1_buffers, _client_tracker.get(), runtime_state,
|
|
&client1);
|
|
EXPECT_TRUE(status.ok());
|
|
EXPECT_TRUE(client1 != NULL);
|
|
status = block_mgr->register_client(client2_buffers, _client_tracker.get(), runtime_state,
|
|
&client2);
|
|
EXPECT_TRUE(status.ok());
|
|
EXPECT_TRUE(client2 != NULL);
|
|
|
|
vector<BufferedBlockMgr2::Block*> client1_blocks;
|
|
// Allocate all of client1's reserved blocks, they should all succeed.
|
|
AllocateBlocks(block_mgr, client1, client1_buffers, &client1_blocks);
|
|
|
|
// Allocate all of client2's reserved blocks, these should succeed.
|
|
vector<BufferedBlockMgr2::Block*> client2_blocks;
|
|
AllocateBlocks(block_mgr, client2, client2_buffers, &client2_blocks);
|
|
|
|
// We have two spare buffers now. Each client should be able to allocate it.
|
|
status = block_mgr->get_new_block(client1, NULL, &block);
|
|
EXPECT_TRUE(status.ok());
|
|
EXPECT_TRUE(block != NULL);
|
|
status = block_mgr->get_new_block(client2, NULL, &block);
|
|
EXPECT_TRUE(status.ok());
|
|
EXPECT_TRUE(block != NULL);
|
|
|
|
// Now we are completely full, no one should be able to allocate a new block.
|
|
status = block_mgr->get_new_block(client1, NULL, &block);
|
|
EXPECT_TRUE(status.ok());
|
|
EXPECT_TRUE(block == NULL);
|
|
status = block_mgr->get_new_block(client2, NULL, &block);
|
|
EXPECT_TRUE(status.ok());
|
|
EXPECT_TRUE(block == NULL);
|
|
|
|
TearDownMgrs();
|
|
}
|
|
|
|
// Create two clients causing oversubscription.
|
|
TEST_F(BufferedBlockMgrTest, ClientOversubscription) {
|
|
Status status;
|
|
int client1_buffers = 1;
|
|
int client2_buffers = 2;
|
|
int max_num_buffers = 2;
|
|
const int block_size = 1024;
|
|
RuntimeState* runtime_state;
|
|
BufferedBlockMgr2* block_mgr = CreateMgr(0, max_num_buffers, block_size, &runtime_state);
|
|
|
|
BufferedBlockMgr2::Client* client1 = NULL;
|
|
BufferedBlockMgr2::Client* client2 = NULL;
|
|
BufferedBlockMgr2::Block* block = NULL;
|
|
status = block_mgr->register_client(client1_buffers, _client_tracker.get(), runtime_state,
|
|
&client1);
|
|
EXPECT_TRUE(status.ok());
|
|
EXPECT_TRUE(client1 != NULL);
|
|
status = block_mgr->register_client(client2_buffers, _client_tracker.get(), runtime_state,
|
|
&client2);
|
|
EXPECT_TRUE(status.ok());
|
|
EXPECT_TRUE(client2 != NULL);
|
|
|
|
// Client one allocates first block, should work.
|
|
status = block_mgr->get_new_block(client1, NULL, &block);
|
|
EXPECT_TRUE(status.ok());
|
|
EXPECT_TRUE(block != NULL);
|
|
|
|
// Client two allocates first block, should work.
|
|
status = block_mgr->get_new_block(client2, NULL, &block);
|
|
EXPECT_TRUE(status.ok());
|
|
EXPECT_TRUE(block != NULL);
|
|
|
|
// At this point we've used both buffers. Client one reserved one so subsequent
|
|
// calls should fail with no error (but returns no block).
|
|
status = block_mgr->get_new_block(client1, NULL, &block);
|
|
EXPECT_TRUE(status.ok());
|
|
EXPECT_TRUE(block == NULL);
|
|
|
|
// Allocate with client two. Since client two reserved 2 buffers, this should fail
|
|
// with MEM_LIMIT_EXCEEDED.
|
|
status = block_mgr->get_new_block(client2, NULL, &block);
|
|
EXPECT_TRUE(status.is_mem_limit_exceeded());
|
|
|
|
TearDownMgrs();
|
|
}
|
|
|
|
TEST_F(BufferedBlockMgrTest, SingleRandom_plain) {
|
|
TestRandomInternalSingle(1024);
|
|
TestRandomInternalSingle(8 * 1024);
|
|
TestRandomInternalSingle(8 * 1024 * 1024);
|
|
}
|
|
|
|
TEST_F(BufferedBlockMgrTest, Multi2Random_plain) {
|
|
TestRandomInternalMulti(2, 1024);
|
|
TestRandomInternalMulti(2, 8 * 1024);
|
|
TestRandomInternalMulti(2, 8 * 1024 * 1024);
|
|
}
|
|
|
|
TEST_F(BufferedBlockMgrTest, Multi4Random_plain) {
|
|
TestRandomInternalMulti(4, 1024);
|
|
TestRandomInternalMulti(4, 8 * 1024);
|
|
TestRandomInternalMulti(4, 8 * 1024 * 1024);
|
|
}
|
|
|
|
// TODO: Enable when we improve concurrency/scalability of block mgr.
|
|
TEST_F(BufferedBlockMgrTest, DISABLED_Multi8Random_plain) {
|
|
TestRandomInternalMulti(8, 1024);
|
|
}
|
|
|
|
TEST_F(BufferedBlockMgrTest, CreateDestroyMulti) {
|
|
CreateDestroyMulti();
|
|
}
|
|
|
|
} // end namespace doris
|
|
|
|
int main(int argc, char** argv) {
|
|
// std::string conffile = std::string(getenv("DORIS_HOME")) + "/conf/be.conf";
|
|
// if (!doris::config::init(conffile.c_str(), false)) {
|
|
// fprintf(stderr, "error read config file. \n");
|
|
// return -1;
|
|
// }
|
|
doris::config::query_scratch_dirs = "/tmp";
|
|
// doris::config::max_free_io_buffers = 128;
|
|
doris::config::read_size = 8388608;
|
|
doris::config::min_buffer_size = 1024;
|
|
|
|
doris::config::disable_mem_pools = false;
|
|
|
|
doris::init_glog("be-test");
|
|
::testing::InitGoogleTest(&argc, argv);
|
|
|
|
doris::CpuInfo::init();
|
|
doris::DiskInfo::init();
|
|
|
|
return RUN_ALL_TESTS();
|
|
}
|