[refactor] remove BlockManager (#10913)

* remove BlockManager
* remove deprecated field in tablet meta
This commit is contained in:
plat1ko
2022-07-17 14:10:06 +08:00
committed by GitHub
parent 464830437a
commit 3bc6655069
52 changed files with 45 additions and 2018 deletions

View File

@ -603,7 +603,6 @@ set(DORIS_LINK_LIBS
IO
Olap
Rowset
OlapFs
Runtime
Service
Udf

View File

@ -51,7 +51,7 @@ std::string cast_to_string(T value, int scale) {
return std::to_string(static_cast<int>(value));
} else if constexpr (primitive_type == TYPE_LARGEINT) {
std::stringstream ss;
ss << value;
doris::operator<<(ss, value);
return ss.str();
} else {
return boost::lexical_cast<std::string>(value);

View File

@ -22,7 +22,6 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_DIR}/src/olap")
set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/olap")
add_subdirectory(rowset)
add_subdirectory(fs)
add_library(Olap STATIC
aggregate_func.cpp

View File

@ -21,7 +21,6 @@
#include "olap/data_dir.h"
#include "util/doris_metrics.h"
#include "util/path_util.h"
#include "util/storage_backend_mgr.h"
namespace doris {
@ -29,11 +28,9 @@ extern MetricPrototype METRIC_query_scan_bytes;
extern MetricPrototype METRIC_query_scan_rows;
extern MetricPrototype METRIC_query_scan_count;
BaseTablet::BaseTablet(TabletMetaSharedPtr tablet_meta, const StorageParamPB& storage_param,
DataDir* data_dir)
BaseTablet::BaseTablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir)
: _state(tablet_meta->tablet_state()),
_tablet_meta(tablet_meta),
_storage_param(storage_param),
_schema(tablet_meta->tablet_schema()),
_data_dir(data_dir) {
_gen_tablet_path();

View File

@ -33,8 +33,7 @@ class DataDir;
// storage engine evolves.
class BaseTablet : public std::enable_shared_from_this<BaseTablet> {
public:
BaseTablet(TabletMetaSharedPtr tablet_meta, const StorageParamPB& storage_param,
DataDir* data_dir);
BaseTablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir);
virtual ~BaseTablet();
DataDir* data_dir() const;
@ -76,7 +75,6 @@ protected:
protected:
TabletState _state;
TabletMetaSharedPtr _tablet_meta;
StorageParamPB _storage_param;
const TabletSchema& _schema;
DataDir* _data_dir;

View File

@ -48,8 +48,6 @@
#include "service/backend_options.h"
#include "util/errno.h"
#include "util/file_utils.h"
#include "util/storage_backend.h"
#include "util/storage_backend_mgr.h"
#include "util/string_util.h"
using strings::Substitute;

View File

@ -1,31 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# where to put generated libraries
set(LIBRARY_OUTPUT_PATH "${BUILD_DIR}/src/olap/fs")
# where to put generated binaries
set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/olap/fs")
add_library(OlapFs STATIC
block_id.cpp
block_manager_metrics.cpp
block_manager.cpp
fs_util.cpp
file_block_manager.cpp
remote_block_manager.cpp
)

View File

@ -1,42 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "olap/fs/block_id.h"
#include <string>
#include <vector>
#include "common/logging.h"
#include "gutil/strings/join.h"
using std::string;
using std::vector;
namespace doris {
const uint64_t BlockId::kInvalidId = 0;
std::string BlockId::join_strings(const std::vector<BlockId>& blocks) {
std::vector<string> strings;
strings.reserve(blocks.size());
for (const BlockId& block : blocks) {
strings.push_back(block.to_string());
}
return ::JoinStrings(strings, ",");
}
} // namespace doris

View File

@ -1,85 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <cinttypes>
#include <cstddef>
#include <cstdint>
#include <iosfwd>
#include <string>
#include <unordered_set>
#include <vector>
#include "gutil/stringprintf.h"
namespace doris {
// Block is the smallest unit of reading and writing.
// In the future, each BlockId should become a relatively simple structure,
// such as a uint64_t. But now, we don't have a mapping management structure
// from uint64_t to files, so we need to save the file name in BlockId.
class BlockId {
public:
BlockId() : _id(kInvalidId) {}
explicit BlockId(uint64_t id) : _id(id) {}
void set_id(uint64_t id) { _id = id; }
bool is_null() const { return _id == kInvalidId; }
std::string to_string() const { return StringPrintf("%016" PRIu64, _id); }
bool operator==(const BlockId& other) const { return _id == other._id; }
bool operator!=(const BlockId& other) const { return _id != other._id; }
bool operator<(const BlockId& other) const { return _id < other._id; }
// Returns the raw ID. Use with care; in most cases the BlockId should be
// treated as a completely opaque value.
uint64_t id() const { return _id; }
// Join the given block IDs with ','. Useful for debug printouts.
static std::string join_strings(const std::vector<BlockId>& blocks);
private:
static const uint64_t kInvalidId;
uint64_t _id;
};
std::ostream& operator<<(std::ostream& o, const BlockId& block_id);
struct BlockIdHash {
// size_t is same as uint64_t
size_t operator()(const BlockId& block_id) const { return block_id.id(); }
};
struct BlockIdCompare {
bool operator()(const BlockId& first, const BlockId& second) const { return first < second; }
};
struct BlockIdEqual {
bool operator()(const BlockId& first, const BlockId& second) const { return first == second; }
};
typedef std::unordered_set<BlockId, BlockIdHash, BlockIdEqual> BlockIdSet;
typedef std::vector<BlockId> BlockIdContainer;
} // end namespace doris

View File

@ -1,52 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "olap/fs/block_manager.h"
#include <mutex>
#include <ostream>
#include "common/logging.h"
namespace doris {
namespace fs {
// Controls when to pre-flush a block. Valid values are 'finalize',
// 'close', or 'never'.
// + If 'finalize', blocks will be pre-flushed when writing is finished.
// + If 'close', blocks will be pre-flushed when their transaction is committed.
// + If 'never', blocks will never be pre-flushed but still be flushed when closed.");
//
// The default value is optimized for throughput in the case that
// there are multiple drives backing the tablet. By asynchronously
// flushing each block before issuing any fsyncs, the IO across
// disks is done in parallel.
//
// This increases throughput but can harm latency in the case that
// there are few disks and the WAL is on the same disk as the
// data blocks. The default is chosen based on the assumptions that:
// - latency is leveled across machines by Raft
// - latency-sensitive applications can devote a disk to the WAL
// - super-sensitive applications can devote an SSD to the WAL.
// - users could always change this to "never", which slows down
// throughput but may improve write latency.
//
// TODO(lingbin): move it to conf later, to allow adjust dynamically.
const std::string BlockManager::block_manager_preflush_control = "finalize";
} // namespace fs
} // namespace doris

View File

@ -1,251 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <cstddef>
#include <cstdint>
#include <memory>
#include <string>
#include <vector>
#include "common/status.h"
#include "env/env.h"
namespace doris {
class BlockId;
class Env;
struct Slice;
namespace fs {
class BlockManager;
// The smallest unit of data that is backed by the filesystem.
//
// The block interface reflects Doris on-disk storage design principles:
// - Blocks are append only.
// - Blocks are immutable once written.
// - Blocks opened for reading are thread-safe and may be used by multiple
// concurrent readers.
// - Blocks opened for writing are not thread-safe.
class Block {
public:
virtual ~Block() {}
// Returns the identifier for this block.
// TODO: should we assign a block an identifier?
virtual const BlockId& id() const = 0;
// Currently, each block in Doris will correspond to a file, but it may not be
// in the future (that is, a block may correspond to multiple files, or multiple
// blocks correspond to a file).
// For convenience, the path interface is directly exposed. At that time, the path()
// method should be removed.
virtual const FilePathDesc& path_desc() const = 0;
};
// A block that has been opened for writing. There may only be a single
// writing thread, and data may only be appended to the block.
//
// close() is an expensive operation, as it must flush both dirty block data
// and metadata to disk. The block manager API provides two ways to improve
// close() performance:
// 1. finalize() before close(). When 'block_manager_preflush_control' is set
// to 'finalize', if there's enough work to be done between the two calls,
// there will be less outstanding I/O to wait for during close().
// 2. CloseBlocks() on a group of blocks. This ensures: 1) flushing of dirty
// blocks are grouped together if possible, resulting in less I/O.
// 2) when waiting on outstanding I/O, the waiting is done in parallel.
//
// NOTE: if a WritableBlock is not explicitly close()ed, it will be aborted
// (i.e. deleted).
class WritableBlock : public Block {
public:
enum State {
// There is no dirty data in the block.
CLEAN,
// There is some dirty data in the block.
DIRTY,
// No more data may be written to the block, but it is not yet guaranteed
// to be durably stored on disk.
FINALIZED,
// The block is closed. No more operations can be performed on it.
CLOSED
};
// Destroy the WritableBlock. If it was not explicitly closed using close(),
// this will Abort() the block.
virtual ~WritableBlock() {}
// Destroys the in-memory representation of the block and synchronizes
// dirty block data and metadata with the disk. On success, guarantees
// that the entire block is durable.
virtual Status close() = 0;
// Like close() but does not synchronize dirty data or metadata to disk.
// Meaning, after a successful Abort(), the block no longer exists.
virtual Status abort() = 0;
// Get a pointer back to this block's manager.
virtual BlockManager* block_manager() const = 0;
// Appends the chunk of data referenced by 'data' to the block.
//
// Does not guarantee durability of 'data'; close() must be called for all
// outstanding data to reach the disk.
virtual Status append(const Slice& data) = 0;
// Appends multiple chunks of data referenced by 'data' to the block.
//
// Does not guarantee durability of 'data'; close() must be called for all
// outstanding data to reach the disk.
virtual Status appendv(const Slice* data, size_t data_cnt) = 0;
// Signals that the block will no longer receive writes. Does not guarantee
// durability; close() must still be called for that.
//
// When 'block_manager_preflush_control' is set to 'finalize', it also begins an
// asynchronous flush of dirty block data to disk. If there is other work
// to be done between the final Append() and the future close(),
// finalize() will reduce the amount of time spent waiting for outstanding
// I/O to complete in close(). This is analogous to readahead or prefetching.
virtual Status finalize() = 0;
// Returns the number of bytes successfully appended via Append().
virtual size_t bytes_appended() const = 0;
virtual State state() const = 0;
};
// A block that has been opened for reading. Multiple in-memory blocks may
// be constructed for the same logical block, and the same in-memory block
// may be shared amongst threads for concurrent reading.
class ReadableBlock : public Block {
public:
virtual ~ReadableBlock() {}
// Destroys the in-memory representation of the block.
virtual Status close() = 0;
// Get a pointer back to this block's manager.
virtual BlockManager* block_manager() const = 0;
// Returns the on-disk size of a written block.
virtual Status size(uint64_t* sz) const = 0;
// Reads exactly 'result.size' bytes beginning from 'offset' in the block,
// returning an error if fewer bytes exist.
// Sets "result" to the data that was read.
// If an error was encountered, returns a non-OK status.
virtual Status read(uint64_t offset, Slice result) const = 0;
// Reads exactly the "results" aggregate bytes, based on each Slice's "size",
// beginning from 'offset' in the block, returning an error if fewer bytes exist.
// Sets each "result" to the data that was read.
// If an error was encountered, returns a non-OK status.
virtual Status readv(uint64_t offset, const Slice* res, size_t res_cnt) const = 0;
// Returns the memory usage of this object including the object itself.
// virtual size_t memory_footprint() const = 0;
};
// Provides options and hints for block placement. This is used for identifying
// the correct DataDirGroups to place blocks. In the future this may also be
// used to specify directories based on block type (e.g. to prefer bloom block
// placement into SSD-backed directories).
struct CreateBlockOptions {
CreateBlockOptions(const FilePathDesc& new_path_desc) { path_desc = new_path_desc; }
CreateBlockOptions(const std::string& path) { path_desc.filepath = path; }
// const std::string tablet_id;
FilePathDesc path_desc;
};
// Block manager creation options.
struct BlockManagerOptions {
BlockManagerOptions() = default;
// If false, metrics will not be produced.
bool enable_metric = false;
// Whether the block manager should only allow reading. Defaults to false.
bool read_only = false;
};
// Utilities for Block lifecycle management. All methods are thread-safe.
class BlockManager {
public:
// Lists the available block manager types.
static std::vector<std::string> block_manager_types() { return {"file"}; }
virtual ~BlockManager() {}
// Opens an existing on-disk representation of this block manager and
// checks it for inconsistencies. If found, and if the block manager was not
// constructed in read-only mode, an attempt will be made to repair them.
//
// If 'report' is not nullptr, it will be populated with the results of the
// check (and repair, if applicable); otherwise, the results of the check
// will be logged and the presence of fatal inconsistencies will manifest as
// a returned error.
//
// Returns an error if an on-disk representation does not exist or cannot be
// opened.
virtual Status open() = 0;
// Creates a new block using the provided options and opens it for
// writing. The block's ID will be generated.
//
// Does not guarantee the durability of the block; it must be closed to
// ensure that it reaches disk.
//
// Does not modify 'block' on error.
virtual Status create_block(const CreateBlockOptions& opts,
std::unique_ptr<WritableBlock>* block) = 0;
// Opens an existing block for reading.
//
// While it is safe to delete a block that has already been opened, it is
// not safe to do so concurrently with the OpenBlock() call itself. In some
// block manager implementations this may result in unusual behavior. For
// example, OpenBlock() may succeed but subsequent ReadableBlock operations
// may fail.
//
// Does not modify 'block' on error.
virtual Status open_block(const FilePathDesc& path_desc,
std::unique_ptr<ReadableBlock>* block) = 0;
// Retrieves the IDs of all blocks under management by this block manager.
// These include ReadableBlocks as well as WritableBlocks.
//
// Returned block IDs are not guaranteed to be in any particular order,
// nor is the order guaranteed to be deterministic. Furthermore, if
// concurrent operations are ongoing, some of the blocks themselves may not
// even exist after the call.
virtual Status get_all_block_ids(std::vector<BlockId>* block_ids) = 0;
virtual Status delete_block(const FilePathDesc& path_desc, bool is_dir = false) = 0;
virtual Status link_file(const FilePathDesc& src_path_desc,
const FilePathDesc& dest_path_desc) = 0;
static const std::string block_manager_preflush_control;
};
} // namespace fs
} // namespace doris

View File

@ -1,41 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "olap/fs/block_manager_metrics.h"
#include "util/doris_metrics.h"
namespace doris {
namespace fs {
namespace internal {
BlockManagerMetrics::BlockManagerMetrics() {
blocks_open_reading = DorisMetrics::instance()->blocks_open_reading;
blocks_open_writing = DorisMetrics::instance()->blocks_open_writing;
total_readable_blocks = DorisMetrics::instance()->readable_blocks_total;
total_writable_blocks = DorisMetrics::instance()->writable_blocks_total;
total_blocks_created = DorisMetrics::instance()->blocks_created_total;
total_blocks_deleted = DorisMetrics::instance()->blocks_deleted_total;
total_bytes_read = DorisMetrics::instance()->bytes_read_total;
total_bytes_written = DorisMetrics::instance()->bytes_written_total;
total_disk_sync = DorisMetrics::instance()->disk_sync_total;
}
} // namespace internal
} // namespace fs
} // namespace doris

View File

@ -1,54 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "util/metrics.h"
namespace doris {
namespace fs {
namespace internal {
// TODO(lingbin): we should add a registry mechanism to Metrics, so that for
// different BlockManager we can register different metrics.
struct BlockManagerMetrics {
explicit BlockManagerMetrics();
// Number of data blocks currently open for reading
IntGauge* blocks_open_reading;
// Number of data blocks currently open for writing
IntGauge* blocks_open_writing;
// Number of data blocks opened for writing since service start
IntCounter* total_readable_blocks;
// Number of data blocks opened for reading since service start
IntCounter* total_writable_blocks;
// Number of data blocks that were created since service start
IntCounter* total_blocks_created;
// Number of data blocks that were deleted since service start
IntCounter* total_blocks_deleted;
// Number of bytes of block data written since service start
IntCounter* total_bytes_read;
// Number of bytes of block data read since service start
IntCounter* total_bytes_written;
// Number of disk synchronizations of block data since service start
IntCounter* total_disk_sync;
};
} // namespace internal
} // namespace fs
} // namespace doris

View File

@ -1,464 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "olap/fs/file_block_manager.h"
#include <atomic>
#include <cstddef>
#include <memory>
#include <numeric>
#include <string>
#include <utility>
#include "common/config.h"
#include "common/logging.h"
#include "env/env.h"
#include "env/env_util.h"
#include "gutil/strings/substitute.h"
#include "olap/fs/block_id.h"
#include "olap/fs/block_manager_metrics.h"
#include "olap/storage_engine.h"
#include "util/doris_metrics.h"
#include "util/file_cache.h"
#include "util/metrics.h"
#include "util/path_util.h"
#include "util/slice.h"
using std::accumulate;
using std::shared_ptr;
using std::string;
using strings::Substitute;
namespace doris {
namespace fs {
namespace internal {
////////////////////////////////////////////////////////////
// FileWritableBlock
////////////////////////////////////////////////////////////
// A file-backed block that has been opened for writing.
//
// Contains a pointer to the block manager as well as file path
// so that dirty metadata can be synced via BlockManager::SyncMetadata()
// at Close() time. Embedding a file path (and not a simpler
// BlockId) consumes more memory, but the number of outstanding
// FileWritableBlock instances is expected to be low.
class FileWritableBlock : public WritableBlock {
public:
FileWritableBlock(FileBlockManager* block_manager, const FilePathDesc& path_desc,
shared_ptr<WritableFile> writer);
virtual ~FileWritableBlock();
virtual Status close() override;
virtual Status abort() override;
virtual BlockManager* block_manager() const override;
virtual const BlockId& id() const override;
virtual const FilePathDesc& path_desc() const override;
virtual Status append(const Slice& data) override;
virtual Status appendv(const Slice* data, size_t data_cnt) override;
virtual Status finalize() override;
virtual size_t bytes_appended() const override;
virtual State state() const override;
void handle_error(const Status& s) const;
// Starts an asynchronous flush of dirty block data to disk.
Status flush_data_async();
private:
DISALLOW_COPY_AND_ASSIGN(FileWritableBlock);
enum SyncMode { SYNC, NO_SYNC };
// Close the block, optionally synchronizing dirty data and metadata.
Status _close(SyncMode mode);
// Back pointer to the block manager.
//
// Should remain alive for the lifetime of this block.
FileBlockManager* _block_manager;
const BlockId _block_id;
FilePathDesc _path_desc;
// The underlying opened file backing this block.
shared_ptr<WritableFile> _writer;
State _state;
// The number of bytes successfully appended to the block.
size_t _bytes_appended;
};
FileWritableBlock::FileWritableBlock(FileBlockManager* block_manager, const FilePathDesc& path_desc,
shared_ptr<WritableFile> writer)
: _block_manager(block_manager),
_path_desc(path_desc),
_writer(writer),
_state(CLEAN),
_bytes_appended(0) {
if (_block_manager->_metrics) {
_block_manager->_metrics->blocks_open_writing->increment(1);
_block_manager->_metrics->total_writable_blocks->increment(1);
}
}
FileWritableBlock::~FileWritableBlock() {
if (_state != CLOSED) {
WARN_IF_ERROR(abort(),
strings::Substitute("Failed to close block $0", _path_desc.filepath));
}
}
Status FileWritableBlock::close() {
return _close(SYNC);
}
Status FileWritableBlock::abort() {
RETURN_IF_ERROR(_close(NO_SYNC));
return _block_manager->delete_block(_path_desc);
}
BlockManager* FileWritableBlock::block_manager() const {
return _block_manager;
}
const BlockId& FileWritableBlock::id() const {
CHECK(false) << "Not support Block.id(). (TODO)";
return _block_id;
}
const FilePathDesc& FileWritableBlock::path_desc() const {
return _path_desc;
}
Status FileWritableBlock::append(const Slice& data) {
return appendv(&data, 1);
}
Status FileWritableBlock::appendv(const Slice* data, size_t data_cnt) {
DCHECK(_state == CLEAN || _state == DIRTY)
<< "path=" << _path_desc.filepath << " invalid state=" << _state;
RETURN_IF_ERROR(_writer->appendv(data, data_cnt));
_state = DIRTY;
// Calculate the amount of data written
size_t bytes_written =
accumulate(data, data + data_cnt, static_cast<size_t>(0),
[](size_t sum, const Slice& curr) { return sum + curr.size; });
_bytes_appended += bytes_written;
return Status::OK();
}
Status FileWritableBlock::flush_data_async() {
VLOG_NOTICE << "Flushing block " << _path_desc.filepath;
RETURN_IF_ERROR(_writer->flush(WritableFile::FLUSH_ASYNC));
return Status::OK();
}
Status FileWritableBlock::finalize() {
DCHECK(_state == CLEAN || _state == DIRTY || _state == FINALIZED)
<< "path=" << _path_desc.filepath << "Invalid state: " << _state;
if (_state == FINALIZED) {
return Status::OK();
}
VLOG_NOTICE << "Finalizing block " << _path_desc.filepath;
if (_state == DIRTY && BlockManager::block_manager_preflush_control == "finalize") {
flush_data_async();
}
_state = FINALIZED;
return Status::OK();
}
size_t FileWritableBlock::bytes_appended() const {
return _bytes_appended;
}
WritableBlock::State FileWritableBlock::state() const {
return _state;
}
Status FileWritableBlock::_close(SyncMode mode) {
if (_state == CLOSED) {
return Status::OK();
}
Status sync;
if (mode == SYNC && (_state == CLEAN || _state == DIRTY || _state == FINALIZED)) {
// Safer to synchronize data first, then metadata.
VLOG_NOTICE << "Syncing block " << _path_desc.filepath;
if (_block_manager->_metrics) {
_block_manager->_metrics->total_disk_sync->increment(1);
}
sync = _writer->sync();
if (sync.ok()) {
sync = _block_manager->_sync_metadata(_path_desc.filepath);
}
WARN_IF_ERROR(sync, strings::Substitute("Failed to sync when closing block $0",
_path_desc.filepath));
}
Status close = _writer->close();
_state = CLOSED;
_writer.reset();
if (_block_manager->_metrics) {
_block_manager->_metrics->blocks_open_writing->increment(-1);
_block_manager->_metrics->total_bytes_written->increment(_bytes_appended);
_block_manager->_metrics->total_blocks_created->increment(1);
}
// Either Close() or Sync() could have run into an error.
RETURN_IF_ERROR(close);
RETURN_IF_ERROR(sync);
// Prefer the result of Close() to that of Sync().
return close.ok() ? close : sync;
}
////////////////////////////////////////////////////////////
// FileReadableBlock
////////////////////////////////////////////////////////////
// A file-backed block that has been opened for reading.
//
// There may be millions of instances of FileReadableBlock outstanding, so
// great care must be taken to reduce its size. To that end, it does _not_
// embed a FileBlockLocation, using the simpler BlockId instead.
class FileReadableBlock : public ReadableBlock {
public:
FileReadableBlock(FileBlockManager* block_manager, const FilePathDesc& path_desc,
std::shared_ptr<OpenedFileHandle<RandomAccessFile>> file_handle);
virtual ~FileReadableBlock();
virtual Status close() override;
virtual BlockManager* block_manager() const override;
virtual const BlockId& id() const override;
virtual const FilePathDesc& path_desc() const override;
virtual Status size(uint64_t* sz) const override;
virtual Status read(uint64_t offset, Slice result) const override;
virtual Status readv(uint64_t offset, const Slice* results, size_t res_cnt) const override;
void handle_error(const Status& s) const;
private:
// Back pointer to the owning block manager.
FileBlockManager* _block_manager;
// The block's identifier.
const BlockId _block_id;
const FilePathDesc _path_desc;
// The underlying opened file backing this block.
std::shared_ptr<OpenedFileHandle<RandomAccessFile>> _file_handle;
// the backing file of OpenedFileHandle, not owned.
RandomAccessFile* _file;
// Whether or not this block has been closed. Close() is thread-safe, so
// this must be an atomic primitive.
std::atomic_bool _closed;
DISALLOW_COPY_AND_ASSIGN(FileReadableBlock);
};
FileReadableBlock::FileReadableBlock(
FileBlockManager* block_manager, const FilePathDesc& path_desc,
std::shared_ptr<OpenedFileHandle<RandomAccessFile>> file_handle)
: _block_manager(block_manager),
_path_desc(path_desc),
_file_handle(std::move(file_handle)),
_closed(false) {
if (_block_manager->_metrics) {
_block_manager->_metrics->blocks_open_reading->increment(1);
_block_manager->_metrics->total_readable_blocks->increment(1);
}
_file = _file_handle->file();
}
FileReadableBlock::~FileReadableBlock() {
WARN_IF_ERROR(close(), strings::Substitute("Failed to close block $0", _path_desc.filepath));
}
Status FileReadableBlock::close() {
bool expected = false;
if (_closed.compare_exchange_strong(expected, true)) {
_file_handle.reset();
if (_block_manager->_metrics) {
_block_manager->_metrics->blocks_open_reading->increment(-1);
}
}
return Status::OK();
}
BlockManager* FileReadableBlock::block_manager() const {
return _block_manager;
}
const BlockId& FileReadableBlock::id() const {
CHECK(false) << "Not support Block.id(). (TODO)";
return _block_id;
}
const FilePathDesc& FileReadableBlock::path_desc() const {
return _path_desc;
}
Status FileReadableBlock::size(uint64_t* sz) const {
DCHECK(!_closed.load());
RETURN_IF_ERROR(_file->size(sz));
return Status::OK();
}
Status FileReadableBlock::read(uint64_t offset, Slice result) const {
return readv(offset, &result, 1);
}
Status FileReadableBlock::readv(uint64_t offset, const Slice* results, size_t res_cnt) const {
DCHECK(!_closed.load());
RETURN_IF_ERROR(_file->readv_at(offset, results, res_cnt));
if (_block_manager->_metrics) {
// Calculate the read amount of data
size_t bytes_read = accumulate(results, results + res_cnt, static_cast<size_t>(0),
[&](int sum, const Slice& curr) { return sum + curr.size; });
_block_manager->_metrics->total_bytes_read->increment(bytes_read);
}
return Status::OK();
}
} // namespace internal
////////////////////////////////////////////////////////////
// FileBlockManager
////////////////////////////////////////////////////////////
FileBlockManager::FileBlockManager(Env* env, BlockManagerOptions opts)
: _env(DCHECK_NOTNULL(env)), _opts(std::move(opts)) {
if (_opts.enable_metric) {
_metrics.reset(new internal::BlockManagerMetrics());
}
#ifdef BE_TEST
_file_cache.reset(new FileCache<RandomAccessFile>("Readable_file_cache",
config::file_descriptor_cache_capacity));
#else
_file_cache.reset(new FileCache<RandomAccessFile>("Readable_file_cache",
StorageEngine::instance()->file_cache()));
#endif
}
FileBlockManager::~FileBlockManager() {}
Status FileBlockManager::open() {
// TODO(lingbin)
return Status::NotSupported("to be implemented. (TODO)");
}
Status FileBlockManager::create_block(const CreateBlockOptions& opts,
std::unique_ptr<WritableBlock>* block) {
CHECK(!_opts.read_only);
shared_ptr<WritableFile> writer;
WritableFileOptions wr_opts;
wr_opts.mode = Env::MUST_CREATE;
RETURN_IF_ERROR(env_util::open_file_for_write(wr_opts, _env, opts.path_desc.filepath, &writer));
VLOG_CRITICAL << "Creating new block at " << opts.path_desc.filepath;
block->reset(new internal::FileWritableBlock(this, opts.path_desc, writer));
return Status::OK();
}
Status FileBlockManager::open_block(const FilePathDesc& path_desc,
std::unique_ptr<ReadableBlock>* block) {
VLOG_CRITICAL << "Opening block with path at " << path_desc.filepath;
std::shared_ptr<OpenedFileHandle<RandomAccessFile>> file_handle(
new OpenedFileHandle<RandomAccessFile>());
bool found = _file_cache->lookup(path_desc.filepath, file_handle.get());
if (!found) {
std::unique_ptr<RandomAccessFile> file;
RETURN_IF_ERROR(_env->new_random_access_file(path_desc.filepath, &file));
_file_cache->insert(path_desc.filepath, file.release(), file_handle.get());
}
block->reset(new internal::FileReadableBlock(this, path_desc, file_handle));
return Status::OK();
}
// TODO(lingbin): We should do something to ensure that deletion can only be done
// after the last reader or writer has finished
Status FileBlockManager::delete_block(const FilePathDesc& path_desc, bool is_dir) {
CHECK(!_opts.read_only);
RETURN_IF_ERROR(_env->delete_file(path_desc.filepath));
// We don't bother fsyncing the parent directory as there's nothing to be
// gained by ensuring that the deletion is made durable. Even if we did
// fsync it, we'd need to account for garbage at startup time (in the
// event that we crashed just before the fsync), and with such accounting
// fsync-as-you-delete is unnecessary.
//
// The block's directory hierarchy is left behind. We could prune it if
// it's empty, but that's racy and leaving it isn't much overhead.
return Status::OK();
}
Status FileBlockManager::link_file(const FilePathDesc& src_path_desc,
const FilePathDesc& dest_path_desc) {
if (link(src_path_desc.filepath.c_str(), dest_path_desc.filepath.c_str()) != 0) {
LOG(WARNING) << "fail to create hard link. from=" << src_path_desc.filepath << ", "
<< "to=" << dest_path_desc.filepath << ", "
<< "errno=" << Errno::no();
return Status::InternalError("link file failed");
}
return Status::OK();
}
// TODO(lingbin): only one level is enough?
Status FileBlockManager::_sync_metadata(const FilePathDesc& path_desc) {
string dir = path_util::dir_name(path_desc.filepath);
if (_metrics) {
_metrics->total_disk_sync->increment(1);
}
RETURN_IF_ERROR(_env->sync_dir(dir));
return Status::OK();
}
} // namespace fs
} // namespace doris

View File

@ -1,122 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <cstdint>
#include <memory>
#include <string>
#include <unordered_set>
#include <vector>
#include "common/status.h"
#include "olap/fs/block_manager.h"
#include "util/file_cache.h"
namespace doris {
class BlockId;
class Env;
class RandomAccessFile;
namespace fs {
namespace internal {
class FileReadableBlock;
class FileWritableBlock;
struct BlockManagerMetrics;
} // namespace internal
// TODO(lingbin): When we create a batch of blocks(blocks are created one by one),
// eg, when we do a compaction, multiple files will be generated in sequence.
// For this scenario, we should have a mechanism that can give the Operating System
// more opportunities to perform IO merge.
// A file-backed block storage implementation.
//
// This is a naive block implementation which maps each block to its own
// file on disk.
//
// The block manager can take advantage of multiple filesystem paths.
//
// When creating blocks, the block manager will place blocks based on the
// provided CreateBlockOptions.
// The file-backed block manager.
class FileBlockManager : public BlockManager {
public:
// Note: all objects passed as pointers should remain alive for the lifetime
// of the block manager.
FileBlockManager(Env* env, BlockManagerOptions opts);
virtual ~FileBlockManager();
Status open() override;
Status create_block(const CreateBlockOptions& opts,
std::unique_ptr<WritableBlock>* block) override;
Status open_block(const FilePathDesc& path_desc,
std::unique_ptr<ReadableBlock>* block) override;
Status get_all_block_ids(std::vector<BlockId>* block_ids) override {
// TODO(lingbin): to be implemented after we assign each block an id
return Status::OK();
};
// Deletes an existing block, allowing its space to be reclaimed by the
// filesystem. The change is immediately made durable.
//
// Blocks may be deleted while they are open for reading or writing;
// the actual deletion will take place after the last open reader or
// writer is closed.
// is_dir: whether this path is a dir or file. if it is true, delete all files in this path
Status delete_block(const FilePathDesc& path_desc, bool is_dir = false) override;
Status link_file(const FilePathDesc& src_path_desc,
const FilePathDesc& dest_path_desc) override;
private:
friend class internal::FileReadableBlock;
friend class internal::FileWritableBlock;
// Synchronizes the metadata for a block with the given location.
Status _sync_metadata(const FilePathDesc& path_desc);
Env* env() const { return _env; }
// For manipulating files.
Env* _env;
// The options that the FileBlockManager was created with.
const BlockManagerOptions _opts;
// Tracks the block directories which are dirty from block creation. This
// lets us perform some simple coalescing when synchronizing metadata.
std::unordered_set<std::string> _dirty_dirs;
// Metric container for the block manager.
// May be null if instantiated without metrics.
std::unique_ptr<internal::BlockManagerMetrics> _metrics;
// DISALLOW_COPY_AND_ASSIGN(FileBlockManager);
// Underlying cache instance. Caches opened files.
std::unique_ptr<FileCache<RandomAccessFile>> _file_cache;
};
} // namespace fs
} // namespace doris

View File

@ -1,102 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "olap/fs/fs_util.h"
#include "common/status.h"
#include "env/env.h"
#include "olap/fs/file_block_manager.h"
#include "olap/fs/remote_block_manager.h"
#include "olap/storage_engine.h"
#include "runtime/exec_env.h"
#include "util/storage_backend.h"
#include "util/storage_backend_mgr.h"
namespace doris {
namespace fs {
namespace fs_util {
BlockManager* block_manager(const FilePathDesc& path_desc) {
fs::BlockManagerOptions bm_opts;
bm_opts.read_only = false;
if (path_desc.is_remote()) {
bm_opts.read_only = true;
std::shared_ptr<StorageBackend> storage_backend =
StorageBackendMgr::instance()->get_storage_backend(path_desc.storage_name);
if (storage_backend == nullptr) {
LOG(WARNING) << "storage_backend is invalid: " << path_desc.debug_string();
return nullptr;
}
static RemoteBlockManager remote_block_mgr(Env::Default(), storage_backend, bm_opts);
return &remote_block_mgr;
} else {
static FileBlockManager block_mgr(Env::Default(), std::move(bm_opts));
return &block_mgr;
}
}
StorageMediumPB get_storage_medium_pb(TStorageMedium::type t_storage_medium) {
switch (t_storage_medium) {
case TStorageMedium::S3:
return StorageMediumPB::S3;
case TStorageMedium::SSD:
return StorageMediumPB::SSD;
case TStorageMedium::HDD:
default:
return StorageMediumPB::HDD;
}
}
TStorageMedium::type get_t_storage_medium(StorageMediumPB storage_medium) {
switch (storage_medium) {
case StorageMediumPB::S3:
return TStorageMedium::S3;
case StorageMediumPB::SSD:
return TStorageMedium::SSD;
case StorageMediumPB::HDD:
default:
return TStorageMedium::HDD;
}
}
StorageParamPB get_storage_param_pb(const TStorageParam& t_storage_param) {
StorageParamPB storage_param;
storage_param.set_storage_medium(get_storage_medium_pb(t_storage_param.storage_medium));
storage_param.set_storage_name(t_storage_param.storage_name);
switch (t_storage_param.storage_medium) {
case TStorageMedium::S3: {
S3StorageParamPB* s3_param = storage_param.mutable_s3_storage_param();
s3_param->set_s3_endpoint(t_storage_param.s3_storage_param.s3_endpoint);
s3_param->set_s3_region(t_storage_param.s3_storage_param.s3_region);
s3_param->set_s3_ak(t_storage_param.s3_storage_param.s3_ak);
s3_param->set_s3_sk(t_storage_param.s3_storage_param.s3_sk);
s3_param->set_s3_max_conn(t_storage_param.s3_storage_param.s3_max_conn);
s3_param->set_s3_request_timeout_ms(t_storage_param.s3_storage_param.s3_request_timeout_ms);
s3_param->set_s3_conn_timeout_ms(t_storage_param.s3_storage_param.s3_conn_timeout_ms);
s3_param->set_root_path(t_storage_param.s3_storage_param.root_path);
return storage_param;
}
case TStorageMedium::SSD:
case TStorageMedium::HDD:
default:
return storage_param;
}
}
} // namespace fs_util
} // namespace fs
} // namespace doris

View File

@ -1,42 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "common/status.h"
#include "gen_cpp/AgentService_types.h"
#include "gen_cpp/Types_types.h"
#include "gen_cpp/olap_file.pb.h"
#include "olap/fs/block_manager.h"
namespace doris {
namespace fs {
namespace fs_util {
// Each BlockManager type may have different params, so we provide a separate
// method for each type(instead of a factory method which require same params)
BlockManager* block_manager(const FilePathDesc& path_desc);
StorageMediumPB get_storage_medium_pb(TStorageMedium::type t_storage_medium);
TStorageMedium::type get_t_storage_medium(StorageMediumPB storage_medium);
StorageParamPB get_storage_param_pb(const TStorageParam& t_storage_param);
} // namespace fs_util
} // namespace fs
} // namespace doris

View File

@ -1,345 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "olap/fs/remote_block_manager.h"
#include <atomic>
#include <cstddef>
#include <memory>
#include <numeric>
#include <sstream>
#include <string>
#include <utility>
#include "common/config.h"
#include "common/logging.h"
#include "env/env.h"
#include "env/env_posix.h"
#include "env/env_util.h"
#include "gutil/strings/substitute.h"
#include "olap/fs/block_id.h"
#include "util/storage_backend.h"
using std::shared_ptr;
using std::string;
using strings::Substitute;
namespace doris {
namespace fs {
namespace internal {
////////////////////////////////////////////////////////////
// RemoteWritableBlock
////////////////////////////////////////////////////////////
// A remote-backed block that has been opened for writing.
//
// Contains a pointer to the block manager as well as file path
// so that dirty metadata can be synced via BlockManager::SyncMetadata()
// at Close() time. Embedding a file path (and not a simpler
// BlockId) consumes more memory, but the number of outstanding
// RemoteWritableBlock instances is expected to be low.
class RemoteWritableBlock : public WritableBlock {
public:
RemoteWritableBlock(RemoteBlockManager* block_manager, const FilePathDesc& path_desc,
shared_ptr<WritableFile> writer);
virtual ~RemoteWritableBlock();
virtual Status close() override;
virtual Status abort() override;
virtual BlockManager* block_manager() const override;
virtual const BlockId& id() const override;
virtual const FilePathDesc& path_desc() const override;
virtual Status append(const Slice& data) override;
virtual Status appendv(const Slice* data, size_t data_cnt) override;
virtual Status finalize() override;
virtual size_t bytes_appended() const override;
virtual State state() const override;
void handle_error(const Status& s) const;
// Starts an asynchronous flush of dirty block data to disk.
Status flush_data_async();
private:
DISALLOW_COPY_AND_ASSIGN(RemoteWritableBlock);
enum SyncMode { SYNC, NO_SYNC };
// Close the block, optionally synchronizing dirty data and metadata.
Status _close(SyncMode mode);
// Back pointer to the block manager.
//
// Should remain alive for the lifetime of this block.
RemoteBlockManager* _block_manager;
const BlockId _block_id;
FilePathDesc _path_desc;
// The underlying opened file backing this block.
shared_ptr<WritableFile> _local_writer;
State _state;
// The number of bytes successfully appended to the block.
size_t _bytes_appended;
};
RemoteWritableBlock::RemoteWritableBlock(RemoteBlockManager* block_manager,
const FilePathDesc& path_desc,
shared_ptr<WritableFile> local_writer)
: _block_manager(block_manager),
_path_desc(path_desc),
_local_writer(std::move(local_writer)) {}
RemoteWritableBlock::~RemoteWritableBlock() {}
Status RemoteWritableBlock::close() {
return Status::IOError("invalid function");
}
Status RemoteWritableBlock::abort() {
return Status::IOError("invalid function");
}
BlockManager* RemoteWritableBlock::block_manager() const {
return _block_manager;
}
const BlockId& RemoteWritableBlock::id() const {
CHECK(false) << "Not support Block.id(). (TODO)";
return _block_id;
}
const FilePathDesc& RemoteWritableBlock::path_desc() const {
return _path_desc;
}
Status RemoteWritableBlock::append(const Slice& data) {
return appendv(&data, 1);
}
Status RemoteWritableBlock::appendv(const Slice* data, size_t data_cnt) {
return Status::IOError("invalid function");
}
Status RemoteWritableBlock::flush_data_async() {
return Status::IOError("invalid function");
}
Status RemoteWritableBlock::finalize() {
return Status::IOError("invalid function");
}
size_t RemoteWritableBlock::bytes_appended() const {
return _bytes_appended;
}
WritableBlock::State RemoteWritableBlock::state() const {
return _state;
}
Status RemoteWritableBlock::_close(SyncMode mode) {
return Status::IOError("invalid function");
}
////////////////////////////////////////////////////////////
// RemoteReadableBlock
////////////////////////////////////////////////////////////
// A file-backed block that has been opened for reading.
//
// There may be millions of instances of RemoteReadableBlock outstanding, so
// great care must be taken to reduce its size. To that end, it does _not_
// embed a FileBlockLocation, using the simpler BlockId instead.
class RemoteReadableBlock : public ReadableBlock {
public:
RemoteReadableBlock(RemoteBlockManager* block_manager, const FilePathDesc& path_desc,
std::shared_ptr<OpenedFileHandle<RandomAccessFile>> file_handle);
virtual ~RemoteReadableBlock();
virtual Status close() override;
virtual BlockManager* block_manager() const override;
virtual const BlockId& id() const override;
virtual const FilePathDesc& path_desc() const override;
virtual Status size(uint64_t* sz) const override;
virtual Status read(uint64_t offset, Slice result) const override;
virtual Status readv(uint64_t offset, const Slice* results, size_t res_cnt) const override;
void handle_error(const Status& s) const;
private:
// Back pointer to the owning block manager.
RemoteBlockManager* _block_manager;
// The block's identifier.
const BlockId _block_id;
const FilePathDesc _path_desc;
// The underlying opened file backing this block.
std::shared_ptr<OpenedFileHandle<RandomAccessFile>> _file_handle;
// the backing file of OpenedFileHandle, not owned.
RandomAccessFile* _file = nullptr;
// Whether or not this block has been closed. Close() is thread-safe, so
// this must be an atomic primitive.
std::atomic_bool _closed;
DISALLOW_COPY_AND_ASSIGN(RemoteReadableBlock);
};
RemoteReadableBlock::RemoteReadableBlock(
RemoteBlockManager* block_manager, const FilePathDesc& path_desc,
std::shared_ptr<OpenedFileHandle<RandomAccessFile>> file_handle) {}
RemoteReadableBlock::~RemoteReadableBlock() {}
Status RemoteReadableBlock::close() {
return Status::IOError("invalid function");
}
BlockManager* RemoteReadableBlock::block_manager() const {
return _block_manager;
}
const BlockId& RemoteReadableBlock::id() const {
CHECK(false) << "Not support Block.id(). (TODO)";
return _block_id;
}
const FilePathDesc& RemoteReadableBlock::path_desc() const {
return _path_desc;
}
Status RemoteReadableBlock::size(uint64_t* sz) const {
return Status::IOError("invalid function");
}
Status RemoteReadableBlock::read(uint64_t offset, Slice result) const {
return readv(offset, &result, 1);
}
Status RemoteReadableBlock::readv(uint64_t offset, const Slice* results, size_t res_cnt) const {
return Status::IOError("invalid function");
}
} // namespace internal
////////////////////////////////////////////////////////////
// RemoteBlockManager
////////////////////////////////////////////////////////////
RemoteBlockManager::RemoteBlockManager(Env* local_env,
std::shared_ptr<StorageBackend> storage_backend,
const BlockManagerOptions& opts)
: _local_env(local_env), _storage_backend(storage_backend), _opts(opts) {}
RemoteBlockManager::~RemoteBlockManager() {}
Status RemoteBlockManager::open() {
return Status::NotSupported("to be implemented. (TODO)");
}
Status RemoteBlockManager::create_block(const CreateBlockOptions& opts,
std::unique_ptr<WritableBlock>* block) {
if (_opts.read_only) {
return Status::NotSupported("create_block failed. remote block is readonly: {}",
opts.path_desc.debug_string());
}
shared_ptr<WritableFile> local_writer;
WritableFileOptions wr_opts;
wr_opts.mode = Env::MUST_CREATE;
RETURN_IF_ERROR(env_util::open_file_for_write(wr_opts, Env::Default(), opts.path_desc.filepath,
&local_writer));
VLOG_CRITICAL << "Creating new remote block. local: " << opts.path_desc.filepath
<< ", remote: " << opts.path_desc.remote_path;
block->reset(new internal::RemoteWritableBlock(this, opts.path_desc, local_writer));
return Status::OK();
}
Status RemoteBlockManager::open_block(const FilePathDesc& path_desc,
std::unique_ptr<ReadableBlock>* block) {
VLOG_CRITICAL << "Opening remote block. local: " << path_desc.filepath
<< ", remote: " << path_desc.remote_path;
std::shared_ptr<OpenedFileHandle<RandomAccessFile>> file_handle;
if (Env::Default()->path_exists(path_desc.filepath).ok()) {
file_handle.reset(new OpenedFileHandle<RandomAccessFile>());
bool found = _file_cache->lookup(path_desc.filepath, file_handle.get());
if (!found) {
std::unique_ptr<RandomAccessFile> file;
RETURN_IF_ERROR(Env::Default()->new_random_access_file(path_desc.filepath, &file));
_file_cache->insert(path_desc.filepath, file.release(), file_handle.get());
}
}
block->reset(new internal::RemoteReadableBlock(this, path_desc, file_handle));
return Status::OK();
}
Status RemoteBlockManager::delete_block(const FilePathDesc& path_desc, bool is_dir) {
if (is_dir) {
if (_local_env->path_exists(path_desc.filepath).ok()) {
RETURN_IF_ERROR(_local_env->delete_dir(path_desc.filepath));
}
if (!path_desc.remote_path.empty()) {
RETURN_IF_ERROR(_storage_backend->rmdir(path_desc.remote_path));
}
} else {
if (_local_env->path_exists(path_desc.filepath).ok()) {
RETURN_IF_ERROR(_local_env->delete_file(path_desc.filepath));
}
if (_storage_backend->exist(path_desc.remote_path).ok()) {
RETURN_IF_ERROR(_storage_backend->rm(path_desc.remote_path));
}
}
return Status::OK();
}
Status RemoteBlockManager::link_file(const FilePathDesc& src_path_desc,
const FilePathDesc& dest_path_desc) {
if (_local_env->path_exists(src_path_desc.filepath).ok()) {
RETURN_IF_ERROR(_local_env->link_file(src_path_desc.filepath, dest_path_desc.filepath));
}
if (_storage_backend->exist(src_path_desc.remote_path).ok()) {
RETURN_IF_ERROR(
_storage_backend->copy(src_path_desc.remote_path, dest_path_desc.remote_path));
}
return Status::OK();
}
} // namespace fs
} // namespace doris

View File

@ -1,73 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <cstdint>
#include <memory>
#include <string>
#include <unordered_set>
#include <vector>
#include "common/status.h"
#include "olap/fs/block_manager.h"
#include "util/file_cache.h"
namespace doris {
class BlockId;
class Env;
class StorageBackend;
namespace fs {
// The remote-backed block manager.
class RemoteBlockManager : public BlockManager {
public:
// Note: all objects passed as pointers should remain alive for the lifetime
// of the block manager.
RemoteBlockManager(Env* local_env, std::shared_ptr<StorageBackend> storage_backend,
const BlockManagerOptions& opts);
virtual ~RemoteBlockManager();
Status open() override;
Status create_block(const CreateBlockOptions& opts,
std::unique_ptr<WritableBlock>* block) override;
Status open_block(const FilePathDesc& path_desc,
std::unique_ptr<ReadableBlock>* block) override;
Status get_all_block_ids(std::vector<BlockId>* block_ids) override {
// TODO(lingbin): to be implemented after we assign each block an id
return Status::OK();
};
Status delete_block(const FilePathDesc& path_desc, bool is_dir = false) override;
Status link_file(const FilePathDesc& src_path_desc,
const FilePathDesc& dest_path_desc) override;
private:
Env* _local_env;
std::shared_ptr<StorageBackend> _storage_backend;
const BlockManagerOptions _opts;
// Underlying cache instance. Caches opened files.
std::unique_ptr<FileCache<RandomAccessFile>> _file_cache;
};
} // namespace fs
} // namespace doris

View File

@ -24,7 +24,6 @@
#include "env/env.h"
#include "gutil/strings/substitute.h"
#include "io/fs/file_writer.h"
#include "olap/fs/fs_util.h"
#include "olap/memtable.h"
#include "olap/olap_define.h"
#include "olap/row.h" // ContiguousRow

View File

@ -26,7 +26,6 @@
#include "io/fs/file_system.h"
#include "io/fs/file_system_map.h"
#include "olap/column_block.h"
#include "olap/fs/fs_util.h"
#include "olap/rowset/segment_v2/common.h"
#include "olap/rowset/segment_v2/index_page.h"
#include "olap/rowset/segment_v2/page_handle.h"

View File

@ -22,7 +22,6 @@
#include "io/fs/file_system_map.h"
#include "io/fs/file_writer.h"
#include "io/fs/local_file_system.h"
#include "olap/fs/fs_util.h"
#include "olap/key_coder.h"
#include "olap/rowset/segment_v2/page_handle.h"
#include "olap/rowset/segment_v2/page_io.h"

View File

@ -20,10 +20,7 @@
#include <memory>
#include <utility>
#include "common/logging.h" // LOG
#include "gutil/strings/substitute.h"
#include "io/fs/file_reader.h"
#include "olap/fs/fs_util.h"
#include "common/logging.h" // LOG
#include "olap/rowset/segment_v2/column_reader.h" // ColumnReader
#include "olap/rowset/segment_v2/empty_segment_iterator.h"
#include "olap/rowset/segment_v2/page_io.h"
@ -37,8 +34,6 @@
namespace doris {
namespace segment_v2 {
using strings::Substitute;
Status Segment::open(io::FileSystem* fs, const std::string& path, uint32_t segment_id,
const TabletSchema* tablet_schema, std::shared_ptr<Segment>* output) {
std::shared_ptr<Segment> segment(new Segment(segment_id, tablet_schema));

View File

@ -24,7 +24,6 @@
#include "common/status.h"
#include "gutil/strings/substitute.h"
#include "olap/column_predicate.h"
#include "olap/fs/fs_util.h"
#include "olap/olap_common.h"
#include "olap/row_block2.h"
#include "olap/row_cursor.h"

View File

@ -41,7 +41,6 @@
#include "olap/base_compaction.h"
#include "olap/cumulative_compaction.h"
#include "olap/data_dir.h"
#include "olap/fs/file_block_manager.h"
#include "olap/lru_cache.h"
#include "olap/memtable_flush_executor.h"
#include "olap/push_handler.h"
@ -58,8 +57,6 @@
#include "util/file_utils.h"
#include "util/pretty_printer.h"
#include "util/scoped_cleanup.h"
#include "util/storage_backend.h"
#include "util/storage_backend_mgr.h"
#include "util/time.h"
#include "util/trace.h"

View File

@ -36,7 +36,6 @@
#include "gen_cpp/MasterService_types.h"
#include "gutil/ref_counted.h"
#include "olap/compaction_permit_limiter.h"
#include "olap/fs/fs_util.h"
#include "olap/olap_common.h"
#include "olap/olap_define.h"
#include "olap/olap_meta.h"

View File

@ -70,14 +70,13 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(flush_bytes, MetricUnit::BYTES);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(flush_count, MetricUnit::OPERATIONS);
TabletSharedPtr Tablet::create_tablet_from_meta(TabletMetaSharedPtr tablet_meta,
const StorageParamPB& storage_param,
DataDir* data_dir) {
return std::make_shared<Tablet>(tablet_meta, storage_param, data_dir);
return std::make_shared<Tablet>(tablet_meta, data_dir);
}
Tablet::Tablet(TabletMetaSharedPtr tablet_meta, const StorageParamPB& storage_param,
DataDir* data_dir, const std::string& cumulative_compaction_type)
: BaseTablet(tablet_meta, storage_param, data_dir),
Tablet::Tablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir,
const std::string& cumulative_compaction_type)
: BaseTablet(tablet_meta, data_dir),
_is_bad(false),
_last_cumu_compaction_failure_millis(0),
_last_base_compaction_failure_millis(0),

View File

@ -55,10 +55,9 @@ using TabletSharedPtr = std::shared_ptr<Tablet>;
class Tablet : public BaseTablet {
public:
static TabletSharedPtr create_tablet_from_meta(TabletMetaSharedPtr tablet_meta,
const StorageParamPB& storage_param,
DataDir* data_dir = nullptr);
Tablet(TabletMetaSharedPtr tablet_meta, const StorageParamPB& storage_param, DataDir* data_dir,
Tablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir,
const std::string& cumulative_compaction_type = "");
Status init();

View File

@ -57,7 +57,6 @@
#include "util/path_util.h"
#include "util/pretty_printer.h"
#include "util/scoped_cleanup.h"
#include "util/storage_backend_mgr.h"
#include "util/time.h"
#include "util/trace.h"
@ -420,16 +419,7 @@ TabletSharedPtr TabletManager::_create_tablet_meta_and_dir_unlocked(
}
}
StorageParamPB storage_param;
Status status =
_get_storage_param(data_dir, tablet_meta->remote_storage_name(), &storage_param);
if (!status.ok()) {
LOG(WARNING) << "fail to _get_storage_param. storage_name: "
<< tablet_meta->remote_storage_name();
return nullptr;
}
TabletSharedPtr new_tablet =
Tablet::create_tablet_from_meta(tablet_meta, storage_param, data_dir);
TabletSharedPtr new_tablet = Tablet::create_tablet_from_meta(tablet_meta, data_dir);
DCHECK(new_tablet != nullptr);
return new_tablet;
}
@ -739,12 +729,7 @@ Status TabletManager::load_tablet_from_meta(DataDir* data_dir, TTabletId tablet_
tablet_meta->set_tablet_state(TABLET_RUNNING);
}
StorageParamPB storage_param;
RETURN_NOT_OK_LOG(
_get_storage_param(data_dir, tablet_meta->remote_storage_name(), &storage_param),
"fail to _get_storage_param. storage_name: " + tablet_meta->remote_storage_name());
TabletSharedPtr tablet = Tablet::create_tablet_from_meta(tablet_meta, storage_param, data_dir);
TabletSharedPtr tablet = Tablet::create_tablet_from_meta(tablet_meta, data_dir);
if (tablet == nullptr) {
LOG(WARNING) << "fail to load tablet. tablet_id=" << tablet_id
<< ", schema_hash:" << schema_hash;
@ -1265,13 +1250,6 @@ void TabletManager::get_tablets_distribution_on_different_disks(
}
}
Status TabletManager::_get_storage_param(DataDir* data_dir, const std::string& storage_name,
StorageParamPB* storage_param) {
storage_param->set_storage_medium(
fs::fs_util::get_storage_medium_pb(data_dir->storage_medium()));
return Status::OK();
}
struct SortCtx {
SortCtx(TabletSharedPtr tablet, int64_t cooldown_timestamp, int64_t file_size)
: tablet(tablet), cooldown_timestamp(cooldown_timestamp), file_size(file_size) {}

View File

@ -181,9 +181,6 @@ private:
std::shared_mutex& _get_tablets_shard_lock(TTabletId tabletId);
Status _get_storage_param(DataDir* data_dir, const std::string& storage_name,
StorageParamPB* storage_param);
private:
DISALLOW_COPY_AND_ASSIGN(TabletManager);

View File

@ -44,8 +44,7 @@ Status TabletMeta::create(const TCreateTabletReq& request, const TabletUid& tabl
request.tablet_schema.schema_hash, shard_id, request.tablet_schema, next_unique_id,
col_ordinal_to_unique_id, tablet_uid,
request.__isset.tablet_type ? request.tablet_type : TTabletType::TABLET_TYPE_DISK,
request.storage_medium, request.storage_param.storage_name, request.compression_type,
request.storage_policy,
request.compression_type, request.storage_policy,
request.__isset.enable_unique_key_merge_on_write
? request.enable_unique_key_merge_on_write
: false));
@ -60,7 +59,6 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t partition_id, int64_t tablet_id
const TTabletSchema& tablet_schema, uint32_t next_unique_id,
const std::unordered_map<uint32_t, uint32_t>& col_ordinal_to_unique_id,
TabletUid tablet_uid, TTabletType::type tabletType,
TStorageMedium::type t_storage_medium, const std::string& storage_name,
TCompressionType::type compression_type, const std::string& storage_policy,
bool enable_unique_key_merge_on_write)
: _tablet_uid(0, 0), _schema(new TabletSchema), _delete_bitmap(new DeleteBitmap()) {
@ -79,8 +77,6 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t partition_id, int64_t tablet_id
tablet_meta_pb.set_tablet_type(tabletType == TTabletType::TABLET_TYPE_DISK
? TabletTypePB::TABLET_TYPE_DISK
: TabletTypePB::TABLET_TYPE_MEMORY);
tablet_meta_pb.set_storage_medium(fs::fs_util::get_storage_medium_pb(t_storage_medium));
tablet_meta_pb.set_remote_storage_name(storage_name);
tablet_meta_pb.set_enable_unique_key_merge_on_write(enable_unique_key_merge_on_write);
tablet_meta_pb.set_storage_policy(storage_policy);
TabletSchemaPB* schema = tablet_meta_pb.mutable_schema();
@ -207,8 +203,6 @@ TabletMeta::TabletMeta(const TabletMeta& b)
_del_predicates(b._del_predicates),
_in_restore_mode(b._in_restore_mode),
_preferred_rowset_type(b._preferred_rowset_type),
_remote_storage_name(b._remote_storage_name),
_storage_medium(b._storage_medium),
_cooldown_resource(b._cooldown_resource),
_delete_bitmap(new DeleteBitmap(*b._delete_bitmap)) {};
@ -465,8 +459,6 @@ void TabletMeta::init_from_pb(const TabletMetaPB& tablet_meta_pb) {
_preferred_rowset_type = tablet_meta_pb.preferred_rowset_type();
}
_remote_storage_name = tablet_meta_pb.remote_storage_name();
_storage_medium = tablet_meta_pb.storage_medium();
_cooldown_resource = tablet_meta_pb.storage_policy();
if (tablet_meta_pb.has_enable_unique_key_merge_on_write()) {
_enable_unique_key_merge_on_write = tablet_meta_pb.enable_unique_key_merge_on_write();
@ -534,8 +526,6 @@ void TabletMeta::to_meta_pb(TabletMetaPB* tablet_meta_pb) {
tablet_meta_pb->set_preferred_rowset_type(_preferred_rowset_type);
}
tablet_meta_pb->set_remote_storage_name(_remote_storage_name);
tablet_meta_pb->set_storage_medium(_storage_medium);
tablet_meta_pb->set_storage_policy(_cooldown_resource);
tablet_meta_pb->set_enable_unique_key_merge_on_write(_enable_unique_key_merge_on_write);
@ -761,8 +751,6 @@ bool operator==(const TabletMeta& a, const TabletMeta& b) {
}
if (a._in_restore_mode != b._in_restore_mode) return false;
if (a._preferred_rowset_type != b._preferred_rowset_type) return false;
if (a._storage_medium != b._storage_medium) return false;
if (a._remote_storage_name != b._remote_storage_name) return false;
if (a._cooldown_resource != b._cooldown_resource) return false;
return true;
}

View File

@ -87,7 +87,6 @@ public:
uint32_t next_unique_id,
const std::unordered_map<uint32_t, uint32_t>& col_ordinal_to_unique_id,
TabletUid tablet_uid, TTabletType::type tabletType,
TStorageMedium::type t_storage_medium, const std::string& remote_storage_name,
TCompressionType::type compression_type,
const std::string& storage_policy = std::string(),
bool enable_unique_key_merge_on_write = false);
@ -186,10 +185,6 @@ public:
bool all_beta() const;
std::string remote_storage_name() const { return _remote_storage_name; }
StorageMediumPB storage_medium() const { return _storage_medium; }
const io::ResourceId& cooldown_resource() const {
std::shared_lock<std::shared_mutex> rlock(_meta_lock);
return _cooldown_resource;
@ -241,8 +236,6 @@ private:
std::vector<DeletePredicatePB> _del_predicates;
bool _in_restore_mode = false;
RowsetTypePB _preferred_rowset_type = BETA_ROWSET;
std::string _remote_storage_name;
StorageMediumPB _storage_medium = StorageMediumPB::HDD;
// FIXME(cyx): Currently `cooldown_resource` is equivalent to `storage_policy`.
io::ResourceId _cooldown_resource;

View File

@ -51,7 +51,7 @@ set(UTIL_FILES
progress_updater.cpp
runtime_profile.cpp
static_asserts.cpp
storage_backend_mgr.cpp
# storage_backend_mgr.cpp
string_parser.cpp
thrift_util.cpp
thrift_client.cpp

View File

@ -287,16 +287,6 @@ DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) {
INT_GAUGE_METRIC_REGISTER(_server_metric_entity, max_network_send_bytes_rate);
INT_GAUGE_METRIC_REGISTER(_server_metric_entity, max_network_receive_bytes_rate);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, readable_blocks_total);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, writable_blocks_total);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, blocks_created_total);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, blocks_deleted_total);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, bytes_read_total);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, bytes_written_total);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, disk_sync_total);
INT_GAUGE_METRIC_REGISTER(_server_metric_entity, blocks_open_reading);
INT_GAUGE_METRIC_REGISTER(_server_metric_entity, blocks_open_writing);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, load_rows);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, load_bytes);

View File

@ -159,17 +159,6 @@ public:
IntGauge* max_network_send_bytes_rate;
IntGauge* max_network_receive_bytes_rate;
// Metrics related with BlockManager
IntCounter* readable_blocks_total;
IntCounter* writable_blocks_total;
IntCounter* blocks_created_total;
IntCounter* blocks_deleted_total;
IntCounter* bytes_read_total;
IntCounter* bytes_written_total;
IntCounter* disk_sync_total;
IntGauge* blocks_open_reading;
IntGauge* blocks_open_writing;
// Metrics related with file reader/writer
IntCounter* local_file_reader_total;
IntCounter* s3_file_reader_total;

View File

@ -173,7 +173,7 @@ public:
bool is_date_type() const override { return get_nested_column().is_date_type(); }
bool is_date_v2_type() const override { return get_nested_column().is_date_v2_type(); }
bool is_datetime_type() const override { return get_nested_column().is_datetime_type(); }
bool is_decimalv2_type() const { return get_nested_column().is_decimalv2_type(); }
bool is_decimalv2_type() const override { return get_nested_column().is_decimalv2_type(); }
void set_date_type() override { get_nested_column().set_date_type(); }
void set_date_v2_type() override { get_nested_column().set_date_v2_type(); }
void set_datetime_type() override { get_nested_column().set_datetime_type(); }

View File

@ -191,7 +191,6 @@ set(OLAP_TEST_FILES
olap/selection_vector_test.cpp
olap/block_column_predicate_test.cpp
olap/options_test.cpp
olap/fs/file_block_manager_test.cpp
olap/common_test.cpp
olap/primary_key_index_test.cpp
# olap/memtable_flush_executor_test.cpp

View File

@ -33,7 +33,7 @@ public:
void SetUp() {
_tablet_meta = static_cast<TabletMetaSharedPtr>(new TabletMeta(
1, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}}, UniqueId(9, 10),
TTabletType::TABLET_TYPE_DISK, TStorageMedium::HDD, "", TCompressionType::LZ4F));
TTabletType::TABLET_TYPE_DISK, TCompressionType::LZ4F));
_json_rowset_meta = R"({
"rowset_id": 540081,
@ -207,10 +207,7 @@ TEST_F(TestNumBasedCumulativeCompactionPolicy, calc_cumulative_compaction_score)
_tablet_meta->add_rs_meta(rowset);
}
StorageParamPB storage_param;
storage_param.set_storage_medium(StorageMediumPB::HDD);
TabletSharedPtr _tablet(
new Tablet(_tablet_meta, storage_param, nullptr, CUMULATIVE_NUM_BASED_POLICY));
TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_NUM_BASED_POLICY));
_tablet->init();
std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy =
CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(
@ -230,10 +227,7 @@ TEST_F(TestNumBasedCumulativeCompactionPolicy, calculate_cumulative_point) {
_tablet_meta->add_rs_meta(rowset);
}
StorageParamPB storage_param;
storage_param.set_storage_medium(StorageMediumPB::HDD);
TabletSharedPtr _tablet(
new Tablet(_tablet_meta, storage_param, nullptr, CUMULATIVE_NUM_BASED_POLICY));
TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_NUM_BASED_POLICY));
_tablet->init();
_tablet->calculate_cumulative_point();
@ -248,10 +242,7 @@ TEST_F(TestNumBasedCumulativeCompactionPolicy, pick_candidate_rowsets) {
_tablet_meta->add_rs_meta(rowset);
}
StorageParamPB storage_param;
storage_param.set_storage_medium(StorageMediumPB::HDD);
TabletSharedPtr _tablet(
new Tablet(_tablet_meta, storage_param, nullptr, CUMULATIVE_NUM_BASED_POLICY));
TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_NUM_BASED_POLICY));
_tablet->init();
_tablet->calculate_cumulative_point();
@ -269,10 +260,7 @@ TEST_F(TestNumBasedCumulativeCompactionPolicy, pick_input_rowsets_normal) {
_tablet_meta->add_rs_meta(rowset);
}
StorageParamPB storage_param;
storage_param.set_storage_medium(StorageMediumPB::HDD);
TabletSharedPtr _tablet(
new Tablet(_tablet_meta, storage_param, nullptr, CUMULATIVE_NUM_BASED_POLICY));
TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_NUM_BASED_POLICY));
_tablet->init();
_tablet->calculate_cumulative_point();
@ -301,10 +289,7 @@ TEST_F(TestNumBasedCumulativeCompactionPolicy, pick_input_rowsets_delete) {
_tablet_meta->add_rs_meta(rowset);
}
StorageParamPB storage_param;
storage_param.set_storage_medium(StorageMediumPB::HDD);
TabletSharedPtr _tablet(
new Tablet(_tablet_meta, storage_param, nullptr, CUMULATIVE_NUM_BASED_POLICY));
TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_NUM_BASED_POLICY));
_tablet->init();
_tablet->calculate_cumulative_point();
@ -337,7 +322,7 @@ public:
_tablet_meta = static_cast<TabletMetaSharedPtr>(new TabletMeta(
1, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}}, UniqueId(9, 10),
TTabletType::TABLET_TYPE_DISK, TStorageMedium::HDD, "", TCompressionType::LZ4F));
TTabletType::TABLET_TYPE_DISK, TCompressionType::LZ4F));
_json_rowset_meta = R"({
"rowset_id": 540081,
@ -678,10 +663,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, calc_cumulative_compaction_score
_tablet_meta->add_rs_meta(rowset);
}
StorageParamPB storage_param;
storage_param.set_storage_medium(StorageMediumPB::HDD);
TabletSharedPtr _tablet(
new Tablet(_tablet_meta, storage_param, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
_tablet->init();
_tablet->calculate_cumulative_point();
@ -702,10 +684,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, calc_cumulative_compaction_score
_tablet_meta->add_rs_meta(rowset);
}
StorageParamPB storage_param;
storage_param.set_storage_medium(StorageMediumPB::HDD);
TabletSharedPtr _tablet(
new Tablet(_tablet_meta, storage_param, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
_tablet->init();
_tablet->calculate_cumulative_point();
std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy =
@ -725,10 +704,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, calculate_cumulative_point_big_b
_tablet_meta->add_rs_meta(rowset);
}
StorageParamPB storage_param;
storage_param.set_storage_medium(StorageMediumPB::HDD);
TabletSharedPtr _tablet(
new Tablet(_tablet_meta, storage_param, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
_tablet->init();
_tablet->calculate_cumulative_point();
@ -743,10 +719,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, calculate_cumulative_point_overl
_tablet_meta->add_rs_meta(rowset);
}
StorageParamPB storage_param;
storage_param.set_storage_medium(StorageMediumPB::HDD);
TabletSharedPtr _tablet(
new Tablet(_tablet_meta, storage_param, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
_tablet->init();
_tablet->calculate_cumulative_point();
@ -761,10 +734,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_candidate_rowsets) {
_tablet_meta->add_rs_meta(rowset);
}
StorageParamPB storage_param;
storage_param.set_storage_medium(StorageMediumPB::HDD);
TabletSharedPtr _tablet(
new Tablet(_tablet_meta, storage_param, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
_tablet->init();
_tablet->calculate_cumulative_point();
@ -782,10 +752,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_candidate_rowsets_big_base)
_tablet_meta->add_rs_meta(rowset);
}
StorageParamPB storage_param;
storage_param.set_storage_medium(StorageMediumPB::HDD);
TabletSharedPtr _tablet(
new Tablet(_tablet_meta, storage_param, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
_tablet->init();
_tablet->calculate_cumulative_point();
@ -803,10 +770,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_normal) {
_tablet_meta->add_rs_meta(rowset);
}
StorageParamPB storage_param;
storage_param.set_storage_medium(StorageMediumPB::HDD);
TabletSharedPtr _tablet(
new Tablet(_tablet_meta, storage_param, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
_tablet->init();
_tablet->calculate_cumulative_point();
@ -835,10 +799,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_big_base) {
_tablet_meta->add_rs_meta(rowset);
}
StorageParamPB storage_param;
storage_param.set_storage_medium(StorageMediumPB::HDD);
TabletSharedPtr _tablet(
new Tablet(_tablet_meta, storage_param, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
_tablet->init();
_tablet->calculate_cumulative_point();
@ -867,10 +828,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_promotion) {
_tablet_meta->add_rs_meta(rowset);
}
StorageParamPB storage_param;
storage_param.set_storage_medium(StorageMediumPB::HDD);
TabletSharedPtr _tablet(
new Tablet(_tablet_meta, storage_param, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
_tablet->init();
_tablet->calculate_cumulative_point();
@ -899,10 +857,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_not_same_leve
_tablet_meta->add_rs_meta(rowset);
}
StorageParamPB storage_param;
storage_param.set_storage_medium(StorageMediumPB::HDD);
TabletSharedPtr _tablet(
new Tablet(_tablet_meta, storage_param, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
_tablet->init();
_tablet->calculate_cumulative_point();
@ -931,10 +886,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_empty) {
_tablet_meta->add_rs_meta(rowset);
}
StorageParamPB storage_param;
storage_param.set_storage_medium(StorageMediumPB::HDD);
TabletSharedPtr _tablet(
new Tablet(_tablet_meta, storage_param, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
_tablet->init();
_tablet->calculate_cumulative_point();
@ -963,10 +915,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_not_reach_min
_tablet_meta->add_rs_meta(rowset);
}
StorageParamPB storage_param;
storage_param.set_storage_medium(StorageMediumPB::HDD);
TabletSharedPtr _tablet(
new Tablet(_tablet_meta, storage_param, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
_tablet->init();
_tablet->calculate_cumulative_point();
@ -995,10 +944,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_delete) {
_tablet_meta->add_rs_meta(rowset);
}
StorageParamPB storage_param;
storage_param.set_storage_medium(StorageMediumPB::HDD);
TabletSharedPtr _tablet(
new Tablet(_tablet_meta, storage_param, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
_tablet->init();
_tablet->calculate_cumulative_point();
@ -1028,10 +974,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, _calc_promotion_size_big) {
_tablet_meta->add_rs_meta(rowset);
}
StorageParamPB storage_param;
storage_param.set_storage_medium(StorageMediumPB::HDD);
TabletSharedPtr _tablet(
new Tablet(_tablet_meta, storage_param, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
_tablet->init();
_tablet->calculate_cumulative_point();
@ -1050,10 +993,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, _calc_promotion_size_small) {
_tablet_meta->add_rs_meta(rowset);
}
StorageParamPB storage_param;
storage_param.set_storage_medium(StorageMediumPB::HDD);
TabletSharedPtr _tablet(
new Tablet(_tablet_meta, storage_param, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
_tablet->init();
_tablet->calculate_cumulative_point();
@ -1071,10 +1011,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, _level_size) {
_tablet_meta->add_rs_meta(rowset);
}
StorageParamPB storage_param;
storage_param.set_storage_medium(StorageMediumPB::HDD);
TabletSharedPtr _tablet(
new Tablet(_tablet_meta, storage_param, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
_tablet->init();
SizeBasedCumulativeCompactionPolicy* policy =
@ -1095,10 +1032,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, _pick_missing_version_cumulative
_tablet_meta->add_rs_meta(rowset);
}
StorageParamPB storage_param;
storage_param.set_storage_medium(StorageMediumPB::HDD);
TabletSharedPtr _tablet(
new Tablet(_tablet_meta, storage_param, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
_tablet->init();
// has miss version

View File

@ -1,81 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "olap/fs/file_block_manager.h"
#include <gtest/gtest.h>
#include <string>
#include "env/env.h"
#include "util/file_utils.h"
#include "util/slice.h"
using std::string;
namespace doris {
class FileBlockManagerTest : public testing::Test {
protected:
const std::string kBlockManagerDir = "./ut_dir/file_block_manager";
void SetUp() override {
if (FileUtils::check_exist(kBlockManagerDir)) {
EXPECT_TRUE(FileUtils::remove_all(kBlockManagerDir).ok());
}
EXPECT_TRUE(FileUtils::create_dir(kBlockManagerDir).ok());
}
void TearDown() override {
if (FileUtils::check_exist(kBlockManagerDir)) {
EXPECT_TRUE(FileUtils::remove_all(kBlockManagerDir).ok());
}
}
};
TEST_F(FileBlockManagerTest, NormalTest) {
fs::BlockManagerOptions bm_opts;
bm_opts.read_only = false;
bm_opts.enable_metric = false;
Env* env = Env::Default();
std::unique_ptr<fs::FileBlockManager> fbm(new fs::FileBlockManager(env, std::move(bm_opts)));
std::unique_ptr<fs::WritableBlock> wblock;
std::string fname = kBlockManagerDir + "/test_file";
fs::CreateBlockOptions wblock_opts(fname);
Status st = fbm->create_block(wblock_opts, &wblock);
EXPECT_TRUE(st.ok()) << st.get_error_msg();
std::string data = "abcdefghijklmnopqrstuvwxyz";
wblock->append(data);
wblock->close();
FilePathDesc path_desc;
path_desc.filepath = fname;
std::unique_ptr<fs::ReadableBlock> rblock;
st = fbm->open_block(path_desc, &rblock);
uint64_t file_size = 0;
EXPECT_TRUE(rblock->size(&file_size).ok());
EXPECT_EQ(data.size(), file_size);
std::string read_buff(data.size(), 'a');
Slice read_slice(read_buff);
rblock->read(0, read_slice);
EXPECT_EQ(data, read_buff);
rblock->close();
}
} // namespace doris

View File

@ -21,7 +21,6 @@
#include "io/fs/file_writer.h"
#include "io/fs/local_file_system.h"
#include "olap/fs/fs_util.h"
#include "olap/row_cursor.h"
#include "olap/tablet_schema_helper.h"
#include "util/debug_util.h"

View File

@ -22,8 +22,6 @@
#include "io/fs/file_system.h"
#include "io/fs/file_writer.h"
#include "io/fs/local_file_system.h"
#include "olap/fs/block_manager.h"
#include "olap/fs/fs_util.h"
#include "olap/key_coder.h"
#include "olap/olap_common.h"
#include "olap/rowset/segment_v2/bloom_filter.h"

View File

@ -25,7 +25,6 @@
#include "io/fs/local_file_system.h"
#include "olap/column_block.h"
#include "olap/decimal12.h"
#include "olap/fs/fs_util.h"
#include "olap/olap_common.h"
#include "olap/rowset/segment_v2/column_reader.h"
#include "olap/rowset/segment_v2/column_writer.h"

View File

@ -29,7 +29,6 @@
#include "io/fs/file_system.h"
#include "io/fs/file_writer.h"
#include "io/fs/local_file_system.h"
#include "olap/fs/fs_util.h"
#include "olap/page_cache.h"
#include "util/file_utils.h"

View File

@ -29,8 +29,6 @@
#include "io/fs/local_file_system.h"
#include "olap/comparison_predicate.h"
#include "olap/data_dir.h"
#include "olap/fs/block_manager.h"
#include "olap/fs/fs_util.h"
#include "olap/in_list_predicate.h"
#include "olap/olap_common.h"
#include "olap/row_block.h"

View File

@ -27,8 +27,6 @@
#include "io/fs/file_system.h"
#include "io/fs/file_writer.h"
#include "io/fs/local_file_system.h"
#include "olap/fs/block_manager.h"
#include "olap/fs/fs_util.h"
#include "olap/page_cache.h"
#include "olap/tablet_schema_helper.h"
#include "util/file_utils.h"

View File

@ -27,8 +27,7 @@ TEST(TabletMetaTest, SaveAndParse) {
std::string meta_path = "./be/test/olap/test_data/tablet_meta_test.hdr";
TabletMeta old_tablet_meta(1, 2, 3, 3, 4, 5, TTabletSchema(), 6, {{7, 8}}, UniqueId(9, 10),
TTabletType::TABLET_TYPE_DISK, TStorageMedium::HDD, "",
TCompressionType::LZ4F);
TTabletType::TABLET_TYPE_DISK, TCompressionType::LZ4F);
EXPECT_EQ(Status::OK(), old_tablet_meta.save(meta_path));
{

View File

@ -40,7 +40,7 @@ public:
virtual void SetUp() {
_tablet_meta = static_cast<TabletMetaSharedPtr>(new TabletMeta(
1, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}}, UniqueId(9, 10),
TTabletType::TABLET_TYPE_DISK, TStorageMedium::HDD, "", TCompressionType::LZ4F));
TTabletType::TABLET_TYPE_DISK, TCompressionType::LZ4F));
_json_rowset_meta = R"({
"rowset_id": 540081,
"tablet_id": 15673,
@ -192,9 +192,7 @@ TEST_F(TestTablet, delete_expired_stale_rowset) {
_tablet_meta->add_rs_meta(rowset);
}
StorageParamPB storage_param;
storage_param.set_storage_medium(StorageMediumPB::HDD);
TabletSharedPtr _tablet(new Tablet(_tablet_meta, storage_param, nullptr));
TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr));
_tablet->init();
for (auto ptr : expired_rs_metas) {
@ -240,9 +238,7 @@ TEST_F(TestTablet, cooldown_policy) {
_tablet_meta->add_rs_meta(rowset);
}
StorageParamPB storage_param;
storage_param.set_storage_medium(StorageMediumPB::HDD);
TabletSharedPtr _tablet(new Tablet(_tablet_meta, storage_param, nullptr));
TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr));
_tablet->init();
_tablet->set_cooldown_resource("test_policy_name");

View File

@ -147,8 +147,6 @@
},
"preferred_rowset_type": "BETA_ROWSET",
"tablet_type": "TABLET_TYPE_DISK",
"storage_medium": "HDD",
"remote_storage_name": "",
"replica_id": 0,
"storage_policy": "",
"delete_bitmap": {},

View File

@ -31,8 +31,6 @@
#include "io/fs/file_writer.h"
#include "io/fs/local_file_system.h"
#include "olap/field.h"
#include "olap/fs/block_manager.h"
#include "olap/fs/fs_util.h"
#include "olap/row_block2.h"
#include "olap/rowset/segment_v2/column_reader.h"
#include "olap/rowset/segment_v2/column_writer.h"

View File

@ -38,8 +38,6 @@
#include "io/fs/local_file_system.h"
#include "olap/comparison_predicate.h"
#include "olap/data_dir.h"
#include "olap/fs/block_manager.h"
#include "olap/fs/fs_util.h"
#include "olap/in_list_predicate.h"
#include "olap/olap_common.h"
#include "olap/row_block2.h"

View File

@ -248,12 +248,6 @@ message S3StorageParamPB {
optional string root_path = 8;
}
message StorageParamPB {
optional StorageMediumPB storage_medium = 1 [default = HDD];
optional string storage_name = 2 [default = "local"];
optional S3StorageParamPB s3_storage_param = 3;
}
message TabletMetaPB {
optional int64 table_id = 1; // ?
optional int64 partition_id = 2; // ?
@ -278,8 +272,8 @@ message TabletMetaPB {
optional RowsetTypePB preferred_rowset_type = 16;
optional TabletTypePB tablet_type = 17;
repeated RowsetMetaPB stale_rs_metas = 18;
optional StorageMediumPB storage_medium = 19 [default = HDD];
optional string remote_storage_name = 20;
// optional StorageMediumPB storage_medium = 19 [default = HDD];
// optional string remote_storage_name = 20;
optional int64 replica_id = 21 [default = 0];
optional string storage_policy = 22;
optional DeleteBitmapPB delete_bitmap = 23;

View File

@ -79,12 +79,6 @@ struct TGetStoragePolicyResult {
2: required list<TGetStoragePolicy> result_entrys
}
struct TStorageParam {
1: required Types.TStorageMedium storage_medium = TStorageMedium.HDD
2: required string storage_name = "";
3: optional TS3StorageParam s3_storage_param
}
enum TCompressionType {
UNKNOWN_COMPRESSION = 0,
DEFAULT_COMPRESSION = 1,
@ -117,7 +111,7 @@ struct TCreateTabletReq {
12: optional bool is_eco_mode
13: optional TStorageFormat storage_format
14: optional TTabletType tablet_type
15: optional TStorageParam storage_param
// 15: optional TStorageParam storage_param
16: optional TCompressionType compression_type = TCompressionType.LZ4F
17: optional Types.TReplicaId replica_id = 0
18: optional string storage_policy