rm sequential file (#8713)
[refactor]remove sequential file reader from env
This commit is contained in:
39
be/src/env/env.h
vendored
39
be/src/env/env.h
vendored
@ -5,6 +5,7 @@
|
||||
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style license that can be
|
||||
// found in the LICENSE file. See the AUTHORS file for names of contributors
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <list>
|
||||
@ -22,7 +23,6 @@ namespace doris {
|
||||
class RandomAccessFile;
|
||||
class RandomRWFile;
|
||||
class WritableFile;
|
||||
class SequentialFile;
|
||||
class PosixEnv;
|
||||
class StorageBackend;
|
||||
struct FilePathDesc;
|
||||
@ -50,15 +50,6 @@ public:
|
||||
// implementation instead of relying on this default environment.
|
||||
static Env* Default();
|
||||
|
||||
// Create a brand new sequentially-readable file with the specified name.
|
||||
// On success, stores a pointer to the new file in *result and returns OK.
|
||||
// On failure stores nullptr in *result and returns non-OK. If the file does
|
||||
// not exist, returns a non-OK status.
|
||||
//
|
||||
// The returned file will only be accessed by one thread at a time.
|
||||
virtual Status new_sequential_file(const std::string& fname,
|
||||
std::unique_ptr<SequentialFile>* result) = 0;
|
||||
|
||||
// Create a brand new random access read-only file with the
|
||||
// specified name. On success, stores a pointer to the new file in
|
||||
// *result and returns OK. On failure stores nullptr in *result and
|
||||
@ -301,34 +292,6 @@ struct RandomRWFileOptions {
|
||||
Env::OpenMode mode = Env::CREATE_OR_OPEN_WITH_TRUNCATE;
|
||||
};
|
||||
|
||||
// A file abstraction for reading sequentially through a file
|
||||
class SequentialFile {
|
||||
public:
|
||||
SequentialFile() {}
|
||||
virtual ~SequentialFile() {}
|
||||
|
||||
// Read up to "result.size" bytes from the file.
|
||||
// Sets "result.data" to the data that was read.
|
||||
//
|
||||
// If an error was encountered, returns a non-OK status
|
||||
// and the contents of "result" are invalid.
|
||||
//
|
||||
// REQUIRES: External synchronization
|
||||
virtual Status read(Slice* result) = 0;
|
||||
|
||||
// Skip "n" bytes from the file. This is guaranteed to be no
|
||||
// slower that reading the same data, but may be faster.
|
||||
//
|
||||
// If end of file is reached, skipping will stop at the end of the
|
||||
// file, and Skip will return OK.
|
||||
//
|
||||
// REQUIRES: External synchronization
|
||||
virtual Status skip(uint64_t n) = 0;
|
||||
|
||||
// Returns the filename provided when the SequentialFile was constructed.
|
||||
virtual const std::string& filename() const = 0;
|
||||
};
|
||||
|
||||
class RandomAccessFile {
|
||||
public:
|
||||
RandomAccessFile() {}
|
||||
|
||||
53
be/src/env/env_posix.cpp
vendored
53
be/src/env/env_posix.cpp
vendored
@ -223,49 +223,6 @@ static Status do_writev_at(int fd, const string& filename, uint64_t offset, cons
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
class PosixSequentialFile : public SequentialFile {
|
||||
public:
|
||||
PosixSequentialFile(string fname, FILE* f) : _filename(std::move(fname)), _file(f) {}
|
||||
|
||||
~PosixSequentialFile() override {
|
||||
int err;
|
||||
RETRY_ON_EINTR(err, fclose(_file));
|
||||
if (PREDICT_FALSE(err != 0)) {
|
||||
LOG(WARNING) << "Failed to close " << _filename
|
||||
<< ", msg=" << errno_to_string(ferror(_file));
|
||||
}
|
||||
}
|
||||
|
||||
Status read(Slice* result) override {
|
||||
size_t r;
|
||||
STREAM_RETRY_ON_EINTR(r, _file, fread_unlocked(result->data, 1, result->size, _file));
|
||||
if (r < result->size) {
|
||||
if (feof(_file)) {
|
||||
// We leave status as ok if we hit the end of the file.
|
||||
// We need to adjust the slice size.
|
||||
result->truncate(r);
|
||||
} else {
|
||||
// A partial read with an error: return a non-ok status.
|
||||
return io_error(_filename, ferror(_file));
|
||||
}
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status skip(uint64_t n) override {
|
||||
if (fseek(_file, n, SEEK_CUR)) {
|
||||
return io_error(_filename, errno);
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
const string& filename() const override { return _filename; }
|
||||
|
||||
private:
|
||||
const std::string _filename;
|
||||
FILE* const _file;
|
||||
};
|
||||
|
||||
class PosixRandomAccessFile : public RandomAccessFile {
|
||||
public:
|
||||
PosixRandomAccessFile(std::string filename, int fd) : _filename(std::move(filename)), _fd(fd) {}
|
||||
@ -528,16 +485,6 @@ private:
|
||||
bool _closed = false;
|
||||
};
|
||||
|
||||
Status PosixEnv::new_sequential_file(const string& fname, std::unique_ptr<SequentialFile>* result) {
|
||||
FILE* f;
|
||||
POINTER_RETRY_ON_EINTR(f, fopen(fname.c_str(), "r"));
|
||||
if (f == nullptr) {
|
||||
return io_error(fname, errno);
|
||||
}
|
||||
result->reset(new PosixSequentialFile(fname, f));
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
// get a RandomAccessFile pointer without file cache
|
||||
Status PosixEnv::new_random_access_file(const std::string& fname,
|
||||
std::unique_ptr<RandomAccessFile>* result) {
|
||||
|
||||
4
be/src/env/env_posix.h
vendored
4
be/src/env/env_posix.h
vendored
@ -24,7 +24,6 @@ namespace doris {
|
||||
class RandomAccessFile;
|
||||
class RandomRWFile;
|
||||
class WritableFile;
|
||||
class SequentialFile;
|
||||
struct WritableFileOptions;
|
||||
struct RandomAccessFileOptions;
|
||||
struct RandomRWFileOptions;
|
||||
@ -33,9 +32,6 @@ class PosixEnv : public Env {
|
||||
public:
|
||||
~PosixEnv() override {}
|
||||
|
||||
Status new_sequential_file(const std::string& fname,
|
||||
std::unique_ptr<SequentialFile>* result) override;
|
||||
|
||||
// get a RandomAccessFile pointer without file cache
|
||||
Status new_random_access_file(const std::string& fname,
|
||||
std::unique_ptr<RandomAccessFile>* result) override;
|
||||
|
||||
20
be/src/env/env_util.cpp
vendored
20
be/src/env/env_util.cpp
vendored
@ -75,26 +75,14 @@ Status write_string_to_file_sync(Env* env, const Slice& data, const std::string&
|
||||
return do_write_string_to_file(env, data, fname, true);
|
||||
}
|
||||
|
||||
Status read_file_to_string(Env* env, const std::string& fname, faststring* data) {
|
||||
Status read_file_to_string(Env* env, const std::string& fname, std::string* data) {
|
||||
data->clear();
|
||||
std::unique_ptr<SequentialFile> file;
|
||||
Status s = env->new_sequential_file(fname, &file);
|
||||
std::unique_ptr<RandomAccessFile> file;
|
||||
Status s = env->new_random_access_file(fname, &file);
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
static const int kBufferSize = 8192;
|
||||
std::unique_ptr<uint8_t[]> scratch(new uint8_t[kBufferSize]);
|
||||
while (true) {
|
||||
Slice fragment(scratch.get(), kBufferSize);
|
||||
s = file->read(&fragment);
|
||||
if (!s.ok()) {
|
||||
break;
|
||||
}
|
||||
data->append(fragment.get_data(), fragment.get_size());
|
||||
if (fragment.empty()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
s = file->read_all(data);
|
||||
return s;
|
||||
}
|
||||
|
||||
|
||||
3
be/src/env/env_util.h
vendored
3
be/src/env/env_util.h
vendored
@ -14,6 +14,7 @@
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <memory>
|
||||
@ -45,7 +46,7 @@ Status write_string_to_file(Env* env, const Slice& data, const std::string& fnam
|
||||
Status write_string_to_file_sync(Env* env, const Slice& data, const std::string& fname);
|
||||
|
||||
// A utility routine: read contents of named file into *data
|
||||
Status read_file_to_string(Env* env, const std::string& fname, faststring* data);
|
||||
Status read_file_to_string(Env* env, const std::string& fname, std::string* data);
|
||||
|
||||
} // namespace env_util
|
||||
} // namespace doris
|
||||
|
||||
@ -104,11 +104,11 @@ Status get_thread_stats(int64_t tid, ThreadStats* stats) {
|
||||
if (kTicksPerSec <= 0) {
|
||||
return Status::NotSupported("ThreadStats not supported");
|
||||
}
|
||||
faststring buf;
|
||||
std::string buf;
|
||||
RETURN_IF_ERROR(env_util::read_file_to_string(
|
||||
Env::Default(), strings::Substitute("/proc/self/task/$0/stat", tid), &buf));
|
||||
|
||||
return parse_stat(buf.ToString(), nullptr, stats);
|
||||
return parse_stat(buf, nullptr, stats);
|
||||
}
|
||||
void disable_core_dumps() {
|
||||
struct rlimit lim;
|
||||
@ -138,13 +138,13 @@ bool is_being_debugged() {
|
||||
// Look for the TracerPid line in /proc/self/status.
|
||||
// If this is non-zero, we are being ptraced, which is indicative of gdb or strace
|
||||
// being attached.
|
||||
faststring buf;
|
||||
std::string buf;
|
||||
Status s = env_util::read_file_to_string(Env::Default(), "/proc/self/status", &buf);
|
||||
if (!s.ok()) {
|
||||
LOG(WARNING) << "could not read /proc/self/status: " << s.to_string();
|
||||
return false;
|
||||
}
|
||||
StringPiece buf_sp(reinterpret_cast<const char*>(buf.data()), buf.size());
|
||||
StringPiece buf_sp(buf.data(), buf.size());
|
||||
std::vector<StringPiece> lines = Split(buf_sp, "\n");
|
||||
for (const auto& l : lines) {
|
||||
if (!HasPrefixString(l, "TracerPid:")) continue;
|
||||
|
||||
@ -15,17 +15,17 @@
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
#include "util/storage_backend_mgr.h"
|
||||
|
||||
#include "common/config.h"
|
||||
#include "common/status.h"
|
||||
#include "env/env_util.h"
|
||||
#include "env/env.h"
|
||||
#include "env/env_util.h"
|
||||
#include "gutil/strings/substitute.h"
|
||||
#include "util/faststring.h"
|
||||
#include "util/file_utils.h"
|
||||
#include "util/storage_backend.h"
|
||||
#include "util/storage_backend_mgr.h"
|
||||
#include "util/s3_util.h"
|
||||
#include "util/s3_storage_backend.h"
|
||||
#include "util/s3_util.h"
|
||||
#include "util/storage_backend.h"
|
||||
|
||||
namespace doris {
|
||||
|
||||
@ -36,20 +36,22 @@ Status StorageBackendMgr::init(const std::string& storage_param_dir) {
|
||||
Status exist_status = Env::Default()->path_exists(storage_param_dir);
|
||||
if (!exist_status.ok() &&
|
||||
(!exist_status.is_not_found() || !Env::Default()->create_dirs(storage_param_dir).ok())) {
|
||||
RETURN_NOT_OK_STATUS_WITH_WARN(Status::IOError(strings::Substitute(
|
||||
"failed to create remote storage_param root path $0", storage_param_dir)),
|
||||
"create_dirs failed");
|
||||
RETURN_NOT_OK_STATUS_WITH_WARN(
|
||||
Status::IOError(strings::Substitute(
|
||||
"failed to create remote storage_param root path $0", storage_param_dir)),
|
||||
"create_dirs failed");
|
||||
}
|
||||
|
||||
std::vector<std::string> file_names;
|
||||
RETURN_IF_ERROR(FileUtils::list_files(Env::Default(), storage_param_dir, &file_names));
|
||||
for (auto& file_name : file_names) {
|
||||
faststring buf;
|
||||
RETURN_NOT_OK_STATUS_WITH_WARN(env_util::read_file_to_string(
|
||||
Env::Default(), storage_param_dir + "/" + file_name, &buf),
|
||||
strings::Substitute("load storage_name failed. $0", file_name));
|
||||
std::string buf;
|
||||
RETURN_NOT_OK_STATUS_WITH_WARN(
|
||||
env_util::read_file_to_string(Env::Default(), storage_param_dir + "/" + file_name,
|
||||
&buf),
|
||||
strings::Substitute("load storage_name failed. $0", file_name));
|
||||
StorageParamPB storage_param_pb;
|
||||
RETURN_IF_ERROR(_deserialize_param(buf.ToString(), &storage_param_pb));
|
||||
RETURN_IF_ERROR(_deserialize_param(buf, &storage_param_pb));
|
||||
RETURN_IF_ERROR(_create_remote_storage_internal(storage_param_pb));
|
||||
LOG(INFO) << "init remote_storage_param successfully. storage_name: " << file_name;
|
||||
}
|
||||
@ -67,18 +69,20 @@ Status StorageBackendMgr::create_remote_storage(const StorageParamPB& storage_pa
|
||||
std::string storage_name = storage_param_pb.storage_name();
|
||||
|
||||
string storage_param_path = _storage_param_dir + "/" + storage_name;
|
||||
RETURN_NOT_OK_STATUS_WITH_WARN(FileUtils::remove(storage_param_path),
|
||||
strings::Substitute("rm storage_param_pb file failed: $0", storage_param_path));
|
||||
RETURN_NOT_OK_STATUS_WITH_WARN(
|
||||
FileUtils::remove(storage_param_path),
|
||||
strings::Substitute("rm storage_param_pb file failed: $0", storage_param_path));
|
||||
std::string param_binary;
|
||||
RETURN_NOT_OK_STATUS_WITH_WARN(_serialize_param(storage_param_pb, ¶m_binary),
|
||||
"_serialize_param storage_param_pb failed.");
|
||||
RETURN_NOT_OK_STATUS_WITH_WARN(
|
||||
env_util::write_string_to_file(Env::Default(), Slice(param_binary), storage_param_path),
|
||||
strings::Substitute("write_string_to_file failed: $0", storage_param_path));
|
||||
faststring buf;
|
||||
RETURN_NOT_OK_STATUS_WITH_WARN(env_util::read_file_to_string(Env::Default(), storage_param_path, &buf),
|
||||
strings::Substitute("read storage_name failed. $0", storage_param_path));
|
||||
if (buf.ToString() != param_binary) {
|
||||
std::string buf;
|
||||
RETURN_NOT_OK_STATUS_WITH_WARN(
|
||||
env_util::read_file_to_string(Env::Default(), storage_param_path, &buf),
|
||||
strings::Substitute("read storage_name failed. $0", storage_param_path));
|
||||
if (buf != param_binary) {
|
||||
LOG(ERROR) << "storage_param written failed. storage_name: ("
|
||||
<< storage_param_pb.storage_name() << "<->" << storage_name << ")";
|
||||
return Status::InternalError("storage_param written failed");
|
||||
@ -89,7 +93,7 @@ Status StorageBackendMgr::create_remote_storage(const StorageParamPB& storage_pa
|
||||
|
||||
Status StorageBackendMgr::_create_remote_storage_internal(const StorageParamPB& storage_param_pb) {
|
||||
std::string storage_name = storage_param_pb.storage_name();
|
||||
WriteLock wrlock(_storage_backend_lock);
|
||||
std::unique_lock wrlock(_storage_backend_lock);
|
||||
if (_storage_backend_map.size() >= doris::config::max_remote_storage_count) {
|
||||
std::map<std::string, time_t>::iterator itr = _storage_backend_active_time.begin();
|
||||
std::string timeout_key = itr->first;
|
||||
@ -107,25 +111,25 @@ Status StorageBackendMgr::_create_remote_storage_internal(const StorageParamPB&
|
||||
}
|
||||
std::map<std::string, std::string> storage_prop;
|
||||
switch (storage_param_pb.storage_medium()) {
|
||||
case StorageMediumPB::S3:
|
||||
default:
|
||||
S3StorageParamPB s3_storage_param = storage_param_pb.s3_storage_param();
|
||||
if (s3_storage_param.s3_ak().empty() || s3_storage_param.s3_sk().empty()
|
||||
|| s3_storage_param.s3_endpoint().empty() || s3_storage_param.s3_region().empty()) {
|
||||
return Status::InternalError("s3_storage_param param is invalid");
|
||||
}
|
||||
storage_prop[S3_AK] = s3_storage_param.s3_ak();
|
||||
storage_prop[S3_SK] = s3_storage_param.s3_sk();
|
||||
storage_prop[S3_ENDPOINT] = s3_storage_param.s3_endpoint();
|
||||
storage_prop[S3_REGION] = s3_storage_param.s3_region();
|
||||
storage_prop[S3_MAX_CONN_SIZE] = s3_storage_param.s3_max_conn();
|
||||
storage_prop[S3_REQUEST_TIMEOUT_MS] = s3_storage_param.s3_request_timeout_ms();
|
||||
storage_prop[S3_CONN_TIMEOUT_MS] = s3_storage_param.s3_conn_timeout_ms();
|
||||
case StorageMediumPB::S3:
|
||||
default:
|
||||
S3StorageParamPB s3_storage_param = storage_param_pb.s3_storage_param();
|
||||
if (s3_storage_param.s3_ak().empty() || s3_storage_param.s3_sk().empty() ||
|
||||
s3_storage_param.s3_endpoint().empty() || s3_storage_param.s3_region().empty()) {
|
||||
return Status::InternalError("s3_storage_param param is invalid");
|
||||
}
|
||||
storage_prop[S3_AK] = s3_storage_param.s3_ak();
|
||||
storage_prop[S3_SK] = s3_storage_param.s3_sk();
|
||||
storage_prop[S3_ENDPOINT] = s3_storage_param.s3_endpoint();
|
||||
storage_prop[S3_REGION] = s3_storage_param.s3_region();
|
||||
storage_prop[S3_MAX_CONN_SIZE] = s3_storage_param.s3_max_conn();
|
||||
storage_prop[S3_REQUEST_TIMEOUT_MS] = s3_storage_param.s3_request_timeout_ms();
|
||||
storage_prop[S3_CONN_TIMEOUT_MS] = s3_storage_param.s3_conn_timeout_ms();
|
||||
|
||||
if (!ClientFactory::is_s3_conf_valid(storage_prop)) {
|
||||
return Status::InternalError("s3_storage_param is invalid");
|
||||
}
|
||||
_storage_backend_map[storage_name] = std::make_shared<S3StorageBackend>(storage_prop);
|
||||
if (!ClientFactory::is_s3_conf_valid(storage_prop)) {
|
||||
return Status::InternalError("s3_storage_param is invalid");
|
||||
}
|
||||
_storage_backend_map[storage_name] = std::make_shared<S3StorageBackend>(storage_prop);
|
||||
}
|
||||
_storage_param_map[storage_name] = storage_param_pb;
|
||||
_storage_backend_active_time[storage_name] = time(nullptr);
|
||||
@ -133,8 +137,9 @@ Status StorageBackendMgr::_create_remote_storage_internal(const StorageParamPB&
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
std::shared_ptr<StorageBackend> StorageBackendMgr::get_storage_backend(const std::string& storage_name) {
|
||||
ReadLock rdlock(_storage_backend_lock);
|
||||
std::shared_ptr<StorageBackend> StorageBackendMgr::get_storage_backend(
|
||||
const std::string& storage_name) {
|
||||
std::shared_lock rdlock(_storage_backend_lock);
|
||||
if (_storage_backend_map.find(storage_name) == _storage_backend_map.end()) {
|
||||
return nullptr;
|
||||
}
|
||||
@ -142,8 +147,9 @@ std::shared_ptr<StorageBackend> StorageBackendMgr::get_storage_backend(const std
|
||||
return _storage_backend_map[storage_name];
|
||||
}
|
||||
|
||||
Status StorageBackendMgr::get_storage_param(const std::string& storage_name, StorageParamPB* storage_param) {
|
||||
ReadLock rdlock(_storage_backend_lock);
|
||||
Status StorageBackendMgr::get_storage_param(const std::string& storage_name,
|
||||
StorageParamPB* storage_param) {
|
||||
std::shared_lock rdlock(_storage_backend_lock);
|
||||
if (_storage_backend_map.find(storage_name) == _storage_backend_map.end()) {
|
||||
return Status::InternalError("storage_name not exist: " + storage_name);
|
||||
}
|
||||
@ -152,7 +158,7 @@ Status StorageBackendMgr::get_storage_param(const std::string& storage_name, Sto
|
||||
}
|
||||
|
||||
Status StorageBackendMgr::get_root_path(const std::string& storage_name, std::string* root_path) {
|
||||
ReadLock rdlock(_storage_backend_lock);
|
||||
std::shared_lock rdlock(_storage_backend_lock);
|
||||
if (_storage_backend_map.find(storage_name) == _storage_backend_map.end()) {
|
||||
return Status::InternalError("storage_name not exist: " + storage_name);
|
||||
}
|
||||
@ -162,18 +168,17 @@ Status StorageBackendMgr::get_root_path(const std::string& storage_name, std::st
|
||||
|
||||
std::string StorageBackendMgr::get_root_path_from_param(const StorageParamPB& storage_param) {
|
||||
switch (storage_param.storage_medium()) {
|
||||
case StorageMediumPB::S3:
|
||||
default:
|
||||
{
|
||||
return storage_param.s3_storage_param().root_path();
|
||||
}
|
||||
case StorageMediumPB::S3:
|
||||
default: {
|
||||
return storage_param.s3_storage_param().root_path();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Status StorageBackendMgr::_check_exist(const StorageParamPB& storage_param_pb) {
|
||||
StorageParamPB old_storage_param;
|
||||
RETURN_IF_ERROR(get_storage_param(storage_param_pb.storage_name(), &old_storage_param));
|
||||
ReadLock rdlock(_storage_backend_lock);
|
||||
std::shared_lock rdlock(_storage_backend_lock);
|
||||
std::string old_param_binary;
|
||||
RETURN_NOT_OK_STATUS_WITH_WARN(_serialize_param(old_storage_param, &old_param_binary),
|
||||
"_serialize_param old_storage_param_pb failed.");
|
||||
@ -187,16 +192,19 @@ Status StorageBackendMgr::_check_exist(const StorageParamPB& storage_param_pb) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status StorageBackendMgr::_serialize_param(const StorageParamPB& storage_param_pb, std::string* param_binary) {
|
||||
Status StorageBackendMgr::_serialize_param(const StorageParamPB& storage_param_pb,
|
||||
std::string* param_binary) {
|
||||
bool serialize_success = storage_param_pb.SerializeToString(param_binary);
|
||||
if (!serialize_success) {
|
||||
LOG(WARNING) << "failed to serialize storage_param " << storage_param_pb.storage_name();
|
||||
return Status::InternalError("failed to serialize storage_param: " + storage_param_pb.storage_name());
|
||||
return Status::InternalError("failed to serialize storage_param: " +
|
||||
storage_param_pb.storage_name());
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status StorageBackendMgr::_deserialize_param(const std::string& param_binary, StorageParamPB* storage_param_pb) {
|
||||
Status StorageBackendMgr::_deserialize_param(const std::string& param_binary,
|
||||
StorageParamPB* storage_param_pb) {
|
||||
bool parsed = storage_param_pb->ParseFromString(param_binary);
|
||||
if (!parsed) {
|
||||
LOG(WARNING) << "parse storage_param failed";
|
||||
|
||||
@ -17,15 +17,18 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <string>
|
||||
#include <map>
|
||||
#include <memory>
|
||||
#include <shared_mutex>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
#include "util/mutex.h"
|
||||
#include "common/status.h"
|
||||
|
||||
namespace doris {
|
||||
|
||||
class StorageBackend;
|
||||
class StorageParamPB;
|
||||
|
||||
// StorageBackendMgr is used to manage StorageBackend, it has (key -> StorageBackend) map used to connect remote storage
|
||||
class StorageBackendMgr {
|
||||
@ -55,6 +58,7 @@ public:
|
||||
|
||||
// get root_path of remote storage from storage_param
|
||||
static std::string get_root_path_from_param(const StorageParamPB& storage_param);
|
||||
|
||||
private:
|
||||
Status _create_remote_storage_internal(const StorageParamPB& storage_param);
|
||||
Status _check_exist(const StorageParamPB& storage_param_pb);
|
||||
|
||||
31
be/test/env/env_posix_test.cpp
vendored
31
be/test/env/env_posix_test.cpp
vendored
@ -178,37 +178,6 @@ TEST_F(EnvPosixTest, random_rw) {
|
||||
ASSERT_EQ(TStatusCode::END_OF_FILE, st.code());
|
||||
LOG(INFO) << "st=" << st.to_string();
|
||||
}
|
||||
// SequentialFile
|
||||
{
|
||||
char mem[1024];
|
||||
std::unique_ptr<SequentialFile> rfile;
|
||||
st = env->new_sequential_file(fname, &rfile);
|
||||
ASSERT_TRUE(st.ok());
|
||||
|
||||
Slice slice1(mem, 3);
|
||||
st = rfile->read(&slice1);
|
||||
ASSERT_TRUE(st.ok());
|
||||
ASSERT_STREQ("abc", std::string(slice1.data, slice1.size).c_str());
|
||||
|
||||
st = rfile->skip(3);
|
||||
ASSERT_TRUE(st.ok());
|
||||
|
||||
Slice slice3(mem, 3);
|
||||
st = rfile->read(&slice3);
|
||||
ASSERT_STREQ("789", std::string(slice3.data, slice3.size).c_str());
|
||||
|
||||
st = rfile->skip(90);
|
||||
ASSERT_TRUE(st.ok());
|
||||
|
||||
Slice slice4(mem, 15);
|
||||
st = rfile->read(&slice4);
|
||||
ASSERT_TRUE(st.ok());
|
||||
ASSERT_EQ(10, slice4.size);
|
||||
|
||||
st = rfile->read(&slice4);
|
||||
ASSERT_TRUE(st.ok());
|
||||
ASSERT_EQ(0, slice4.size);
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(EnvPosixTest, iterate_dir) {
|
||||
|
||||
@ -40,8 +40,6 @@ public:
|
||||
std::string path = GetCurrentRunningDir();
|
||||
ASSERT_FALSE(path.empty());
|
||||
|
||||
std::unique_ptr<SequentialFile> file;
|
||||
|
||||
auto& file_name = req->param("FILE");
|
||||
|
||||
FILE* fp = fopen((path + "/plugin_test/source/" + file_name).c_str(), "r");
|
||||
|
||||
Reference in New Issue
Block a user