[Plugin] Add BE plugin framework (#2348) (#2618)

Support BE plugin framework, include:

* update Plugin Manager, support Plugin find method
* support Builtin-Plugin register method

* plugin install/uninstall process
	* PluginLoader:
		* dynamic install and check Plugin .so file
		* dynamic uninstall and check Plugin status
	* PluginZip:
		* support plugin remote/local .zip file download and extract

TODO:

* We should support a PluginContext to transmit necessary system variable when the plugin's init/close method invoke

* Add the entry which is BE dynamic Plugin install/uninstall process, include:
	* The FE send install/uninstall Plugin statement (RPC way)
	* The FE meta update request with Plugin list information
	* The FE operation request(update/query) with Plugin (maybe don't need)

* Add the plugin status upload way
* Load already install Plugin when BE start
This commit is contained in:
Seaven
2020-03-25 21:55:44 +08:00
committed by GitHub
parent 8fa328c344
commit 8426669472
24 changed files with 1291 additions and 30 deletions

View File

@ -371,6 +371,7 @@ set(DORIS_LINK_LIBS
Webserver
TestUtil
Geo
Plugin
${WL_END_GROUP}
)
@ -494,6 +495,7 @@ add_subdirectory(${SRC_DIR}/udf)
add_subdirectory(${SRC_DIR}/tools)
add_subdirectory(${SRC_DIR}/udf_samples)
add_subdirectory(${SRC_DIR}/util)
add_subdirectory(${SRC_DIR}/plugin)
# Utility CMake function to make specifying tests and benchmarks less verbose
FUNCTION(ADD_BE_TEST TEST_NAME)
@ -512,6 +514,24 @@ FUNCTION(ADD_BE_TEST TEST_NAME)
ADD_TEST(${TEST_FILE_NAME} "${BUILD_OUTPUT_ROOT_DIRECTORY}/${TEST_NAME}")
ENDFUNCTION()
FUNCTION(ADD_BE_PLUGIN PLUGIN_NAME)
set(BUILD_OUTPUT_ROOT_DIRECTORY "${CMAKE_CURRENT_BINARY_DIR}/")
get_filename_component(DIR_NAME ${CMAKE_CURRENT_SOURCE_DIR} NAME)
get_filename_component(PLUGIN_DIR_NAME ${PLUGIN_NAME} PATH)
get_filename_component(PLUGIN_FILE_NAME ${PLUGIN_NAME} NAME)
ADD_LIBRARY(${PLUGIN_FILE_NAME} SHARED ${PLUGIN_NAME}.cpp)
TARGET_LINK_LIBRARIES(${PLUGIN_FILE_NAME} ${DORIS_LINK_LIBS})
SET_TARGET_PROPERTIES(${PLUGIN_FILE_NAME} PROPERTIES COMPILE_FLAGS "-fno-access-control")
if (NOT "${PLUGIN_DIR_NAME}" STREQUAL "")
SET_TARGET_PROPERTIES(${PLUGIN_FILE_NAME} PROPERTIES RUNTIME_OUTPUT_DIRECTORY "${BUILD_OUTPUT_ROOT_DIRECTORY}/${PLUGIN_DIR_NAME}")
endif ()
ENDFUNCTION()
if (${MAKE_TEST} STREQUAL "ON")
add_subdirectory(${TEST_DIR}/agent)
add_subdirectory(${TEST_DIR}/common)
@ -524,6 +544,8 @@ if (${MAKE_TEST} STREQUAL "ON")
add_subdirectory(${TEST_DIR}/runtime)
add_subdirectory(${TEST_DIR}/udf)
add_subdirectory(${TEST_DIR}/util)
add_subdirectory(${TEST_DIR}/plugin)
add_subdirectory(${TEST_DIR}/plugin/example)
endif ()
# Install be

View File

@ -92,6 +92,8 @@ namespace config {
CONF_Int32(report_disk_state_interval_seconds, "60");
// the interval time(seconds) for agent report olap table to FE
CONF_Int32(report_tablet_interval_seconds, "60");
// the interval time(seconds) for agent report plugin status to FE
// CONF_Int32(report_plugin_interval_seconds, "120");
// the timeout(seconds) for alter table
CONF_Int32(alter_tablet_timeout_seconds, "86400");
// the timeout(seconds) for make snapshot
@ -488,6 +490,8 @@ namespace config {
// tablet_map_lock shard size, the value is 2^n, n=0,1,2,3,4
// this is a an enhancement for better performance to manage tablet
CONF_Int32(tablet_map_shard_size, "1");
CONF_String(plugin_path, "${DORIS_HOME}/plugin")
} // namespace config

View File

@ -0,0 +1,28 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# where to put generated libraries
set(LIBRARY_OUTPUT_PATH "${BUILD_DIR}/src/plugin")
# where to put generated binaries
set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/plugin")
add_library(Plugin STATIC
plugin_loader.cpp
plugin_mgr.cpp
plugin_zip.cpp
)

View File

@ -15,11 +15,19 @@
// specific language governing permissions and limitations
// under the License.
#ifndef DORIS_BE_PLUGIN_PLUGIN_H
#define DORIS_BE_PLUGIN_PLUGIN_H
namespace doris {
#define PLUGIN_NOT_DYNAMIC_INSTALL 1UL
#define PLUGIN_TYPE_AUDIT 1
#define PLUGIN_TYPE_IMPORT 2
#define PLUGIN_TYPE_STORAGE 3
#define PLUGIN_TYPE_MAX 4
#define PLUGIN_DEFAULT_FLAG 0UL
#define PLUGIN_INSTALL_EARLY 1UL
#define PLUGIN_NOT_DYNAMIC_UNINSTALL 2UL
#define PLUGIN_INSTALL_EARLY 4UL
#define DORIS_PLUGIN_VERSION 001100UL
@ -41,10 +49,10 @@ struct Plugin {
void* handler;
// invoke when plugin install
int (*init)(void *);
int (* init)(void*);
// invoke when plugin uninstall
int (*close)(void *);
int (* close)(void*);
// flag for plugin
uint64_t flags;
@ -56,20 +64,21 @@ struct Plugin {
void* status;
};
#define __DECLARE_PLUGIN(VERSION, PSIZE, DECLS) \
int VERSION = DORIS_PLUGIN_VERSION; \
int PSIZE = sizeof(struct Plugin); \
Plugin DECLS[] = {
#define declare_plugin(NAME) \
__DECLARE_PLUGIN(NAME, ##NAME##_plugin_interface_version, \
##NAME##_sizeof_struct_st_plugin, \
##NAME##_plugin)
// Plugin Name must be same with plugin's description file
#define declare_plugin(NAME) \
__DECLARE_PLUGIN(NAME##_plugin_interface_version, \
NAME##_sizeof_plugin, \
NAME##_plugin)
#define __DECLARE_PLUGIN(NAME, VERSION, PSIZE, DECLS) \
int VERSION = DORIS_PLUGIN_VERSION; \
int PSIZE = sizeof(struct st_plugin); \
struct st_plugin DECLS[] = {
#define declare_plugin_end \
, { 0, 0, 0, 0, 0, 0 } \
}
};
}
#endif //DORIS_BE_PLUGIN_PLUGIN_H

View File

@ -0,0 +1,194 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <boost/algorithm/string/predicate.hpp>
#include <cstring>
#include "plugin/plugin_loader.h"
#include "plugin/plugin_zip.h"
#include "gutil/strings/substitute.h"
#include "gutil/strings/util.h"
#include "util/dynamic_util.h"
#include "util/file_utils.h"
#include "http/http_client.h"
#include "util/time.h"
#include "util/md5.h"
#include "env/env.h"
namespace doris {
using namespace strings;
static const std::string PLUGIN_VERSION_SYMBOL = "_plugin_interface_version";
static const std::string PLUGIN_SIZE_SYMBOL = "_sizeof_plugin";
static const std::string PLUGIN_STRUCT_SYMBOL = "_plugin";
Status PluginLoader::open_valid() {
return Status::OK();
}
Status PluginLoader::close_valid() {
if (_plugin.get() != nullptr && (_plugin->flags & PLUGIN_NOT_DYNAMIC_UNINSTALL)) {
return Status::InternalError(Substitute("plugin $0 not allow dynamic uninstall", _name));
}
return Status::OK();
}
Status DynamicPluginLoader::install() {
// check already install
std::string so_path = _install_path + "/" + _name + "/" + _so_name;
if (!FileUtils::check_exist(so_path)) {
// no, need download zip install
PluginZip zip(_source);
RETURN_IF_ERROR(zip.extract(_install_path, _name));
}
// open plugin
RETURN_IF_ERROR(open_plugin());
RETURN_IF_ERROR(open_valid());
// plugin init
// todo: what should be send?
if (_plugin->init != nullptr) {
_plugin->init(&_plugin->handler);
}
return Status::OK();
}
/**
* open & valid Plugin:
* 1. check .so file exists
* 2. check .so version symbol
* 3. check .so plugin symbol
*/
Status DynamicPluginLoader::open_plugin() {
// check .so file
std::string so_path = _install_path + "/" + _name + "/" + _so_name;
if (!FileUtils::check_exist(so_path)) {
return Status::InternalError("plugin install not found " + _so_name);
}
RETURN_IF_ERROR(dynamic_open(so_path.c_str(), &_plugin_handler));
void* symbol;
// check version symbol
RETURN_IF_ERROR(dynamic_lookup(_plugin_handler, (_name + PLUGIN_VERSION_SYMBOL).c_str(), &symbol));
if (DORIS_PLUGIN_VERSION > *(int*) symbol) {
return Status::InternalError("plugin compile version too old");
}
RETURN_IF_ERROR(dynamic_lookup(_plugin_handler, (_name + PLUGIN_SIZE_SYMBOL).c_str(), &symbol));
int plugin_size = *(int*) symbol;
if (plugin_size != sizeof(Plugin)) {
return Status::InternalError("plugin struct error");
}
// check Plugin declaration
RETURN_IF_ERROR(dynamic_lookup(_plugin_handler, (_name + PLUGIN_STRUCT_SYMBOL).c_str(), &symbol));
Plugin* end_plugin = (Plugin*) ((char*) symbol + plugin_size);
if (end_plugin->handler != nullptr || end_plugin->init != nullptr || end_plugin->close != nullptr) {
return Status::InternalError("plugin struct error");
}
_plugin = std::make_shared<Plugin>();
std::memcpy(_plugin.get(), symbol, plugin_size);
return Status::OK();
}
Status DynamicPluginLoader::uninstall() {
// close plugin
RETURN_IF_ERROR(close_plugin());
// remove plugin install path
RETURN_IF_ERROR(FileUtils::remove_all(_install_path + "/" + _name));
return Status::OK();
}
Status DynamicPluginLoader::close_plugin() {
if (_close) {
return Status::OK();
}
if (_plugin.get() != nullptr) {
RETURN_IF_ERROR(close_valid());
if (_plugin->close != nullptr) {
// todo: what should be send?
_plugin->close(&_plugin->handler);
}
}
// builtin plugin don't need dynamic uninstall
if (_plugin_handler != nullptr) {
dynamic_close(_plugin_handler);
}
_close = true;
return Status::OK();
}
BuiltinPluginLoader::BuiltinPluginLoader(const std::string& name, int type, const doris::Plugin* plugin) :
PluginLoader(name, type) {
_plugin = std::make_shared<Plugin>();
std::memcpy(_plugin.get(), plugin, sizeof(Plugin));
}
Status BuiltinPluginLoader::install() {
RETURN_IF_ERROR(open_valid());
LOG(INFO) << "plugin: " << _plugin.get();
if (_plugin->init != nullptr) {
_plugin->init(&_plugin->handler);
}
return Status::OK();
}
Status BuiltinPluginLoader::uninstall() {
if (_close) {
return Status::OK();
}
if (_plugin.get() != nullptr) {
RETURN_IF_ERROR(close_valid());
if (_plugin->close != nullptr) {
// todo: what should be send?
_plugin->close(&_plugin->handler);
}
_plugin.reset();
}
_close = true;
return Status::OK();
}
}

View File

@ -0,0 +1,117 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef DORIS_BE_PLUGIN_PLUGIN_LOADER_H
#define DORIS_BE_PLUGIN_PLUGIN_LOADER_H
#include <string>
#include <memory>
#include <vector>
#include "gen_cpp/Types_types.h"
#include "common/status.h"
#include "plugin/plugin.h"
namespace doris {
class PluginLoader {
public:
PluginLoader(const std::string& name, int type): _name(name), _type(type), _close(false) {}
~PluginLoader() {};
virtual Status install() = 0;
virtual Status uninstall() = 0;
virtual std::shared_ptr<Plugin>& plugin() {
return _plugin;
};
const std::string& name() {
return _name;
}
int type() {
return _type;
}
protected:
virtual Status open_valid();
virtual Status close_valid();
protected:
std::string _name;
int _type;
std::shared_ptr<Plugin> _plugin;
bool _close;
};
class DynamicPluginLoader: public PluginLoader {
public:
DynamicPluginLoader(const std::string& name, int type, const std::string& source, const std::string& so_name,
const std::string& install_path) : PluginLoader(name, type), _source(source), _so_name(so_name),
_install_path(install_path), _plugin_handler(nullptr) {
};
~DynamicPluginLoader() {
// just close plugin, but don't clean install path (maybe other plugin has used)
WARN_IF_ERROR(close_plugin(), "close plugin failed.");
};
virtual Status install();
virtual Status uninstall();
private:
Status open_plugin();
Status close_plugin();
private:
std::string _source;
std::string _so_name;
std::string _install_path;
void* _plugin_handler;
};
class BuiltinPluginLoader : public PluginLoader {
public:
BuiltinPluginLoader(const std::string& name, int type, const Plugin* plugin);
~BuiltinPluginLoader() {
WARN_IF_ERROR(uninstall(), "close plugin failed.");
}
virtual Status install();
virtual Status uninstall();
};
}
#endif //DORIS_BE_PLUGIN_PLUGIN_LOADER_H

View File

@ -0,0 +1,162 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <boost/foreach.hpp>
#include "plugin/plugin_mgr.h"
#include "gutil/strings/substitute.h"
namespace doris {
using namespace strings;
#define PLUGIN_TYPE_CHECK(_type) { \
if (_type >= PLUGIN_TYPE_MAX) { \
return Status::InvalidArgument(Substitute("error plugin type: $0", _type)); \
} \
}
Status PluginMgr::install_plugin(const TPluginMetaInfo& info) {
{
std::lock_guard<std::mutex> l(_lock);
auto iter = _plugins[info.type].find(info.name);
if (iter != _plugins[info.type].end()) {
return Status::AlreadyExist("plugin " + info.name + " is already installed");
}
}
DCHECK(info.__isset.so_name);
DCHECK(info.__isset.source);
std::unique_ptr<PluginLoader> loader = std::unique_ptr<PluginLoader>(
new DynamicPluginLoader(info.name, info.type, info.source, info.so_name, config::plugin_path));
Status st = loader->install();
if (!st.ok() && !st.is_already_exist()) {
RETURN_IF_ERROR(loader->uninstall());
return st;
}
{
std::lock_guard<std::mutex> l(_lock);
auto iter = _plugins[info.type].find(info.name);
if (iter != _plugins[info.type].end()) {
return Status::AlreadyExist("plugin " + info.name + " is already installed");
} else {
_plugins[info.type][info.name] = std::move(loader);
};
}
return Status::OK();
}
Status PluginMgr::uninstall_plugin(const TPluginMetaInfo& info) {
std::lock_guard<std::mutex> l(_lock);
auto iter = _plugins[info.type].find(info.name);
if (iter != _plugins[info.type].end()) {
_plugins[info.type].erase(iter);
}
return Status::OK();
}
Status PluginMgr::get_plugin(const std::string& name, int type, std::shared_ptr<Plugin>* plugin) {
PLUGIN_TYPE_CHECK(type);
std::lock_guard<std::mutex> l(_lock);
auto iter = _plugins[type].find(name);
if (iter != _plugins[type].end()) {
*plugin = iter->second->plugin();
return Status::OK();
}
return Status::NotFound(Substitute("not found type $0 plugin $1", type, name));
}
Status PluginMgr::get_plugin(const std::string& name, std::shared_ptr<Plugin>* plugin) {
for (int i = 0; i < PLUGIN_TYPE_MAX; ++i) {
std::lock_guard<std::mutex> l(_lock);
auto iter = _plugins[i].find(name);
if (iter != _plugins[i].end()) {
*plugin = iter->second->plugin();
return Status::OK();
}
}
return Status::NotFound(Substitute("not found plugin $0", name));
}
Status PluginMgr::get_plugin_list(int type, std::vector<std::shared_ptr<Plugin>>* plugin_list) {
PLUGIN_TYPE_CHECK(type);
std::lock_guard<std::mutex> l(_lock);
BOOST_FOREACH(const PluginLoaderMap::value_type& iter, _plugins[type]){
plugin_list->push_back(iter.second->plugin());
}
return Status::OK();
}
Status PluginMgr::register_builtin_plugin(const std::string& name, int type, const doris::Plugin* plugin) {
PLUGIN_TYPE_CHECK(type);
std::lock_guard<std::mutex> l(_lock);
auto iter = _plugins[type].find(name);
if (iter != _plugins[type].end()) {
return Status::AlreadyExist(Substitute("the type $0 plugin $1 already register", type, name));
}
std::unique_ptr<PluginLoader> loader = std::unique_ptr<PluginLoader>(new BuiltinPluginLoader(name, type, plugin));
Status st = loader->install();
if(!st.ok()) {
RETURN_IF_ERROR(loader->uninstall());
return st;
}
_plugins[type][name] = std::move(loader);
return Status::OK();
}
Status PluginMgr::get_all_plugin_info(std::vector<TPluginInfo>* plugin_info_list) {
for (int i = 0; i < PLUGIN_TYPE_MAX; ++i) {
BOOST_FOREACH(const PluginLoaderMap::value_type& iter, _plugins[i]) {
TPluginInfo info;
info.__set_plugin_name(iter.second->name());
info.__set_type(iter.second->type());
plugin_info_list->push_back(info);
}
}
return Status::OK();
}
}

View File

@ -0,0 +1,69 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef DORIS_BE_PLUGIN_PLUGIN_MGR_H
#define DORIS_BE_PLUGIN_PLUGIN_MGR_H
#include <string>
#include <unordered_map>
#include <memory>
#include <mutex>
#include "gen_cpp/MasterService_types.h"
#include "gen_cpp/AgentService_types.h"
#include "common/status.h"
#include "plugin/plugin_loader.h"
#include "plugin/plugin.h"
namespace doris {
typedef std::unordered_map<std::string, std::unique_ptr<PluginLoader>> PluginLoaderMap;
class PluginMgr {
public:
PluginMgr() {}
~PluginMgr() {}
Status install_plugin(const TPluginMetaInfo& info);
Status uninstall_plugin(const TPluginMetaInfo& info);
Status register_builtin_plugin(const std::string& name, int type, const Plugin* plugin);
Status get_plugin(const std::string& name, int type, std::shared_ptr<Plugin>* plugin);
Status get_plugin(const std::string& name, std::shared_ptr<Plugin>* plugin);
Status get_plugin_list(int type, std::vector<std::shared_ptr<Plugin>>* plugin_list);
Status get_all_plugin_info(std::vector<TPluginInfo>* plugin_info_list);
private:
PluginLoaderMap _plugins[PLUGIN_TYPE_MAX];
std::mutex _lock;
};
}
#endif // DORIS_BE_PLUGIN_PLUGIN_LOADER_H

View File

@ -0,0 +1,131 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <boost/algorithm/string/predicate.hpp>
#include <string.h>
#include "plugin/plugin_zip.h"
#include "http/http_client.h"
#include "gutil/strings/util.h"
#include "gutil/strings/substitute.h"
#include "util/file_utils.h"
#include "util/md5.h"
#include "util/time.h"
#include "util/zip_util.h"
#include "util/slice.h"
#include "env/env.h"
namespace doris {
using namespace strings;
bool is_local_source(const std::string& source) {
if (HasPrefixString(source, "http") || HasPrefixString(source, "https")) {
return false;
}
return true;
}
PluginZip::~PluginZip() {
for (auto& p : _clean_paths) {
WARN_IF_ERROR(FileUtils::remove_all(p), "clean plugin_zip temp path failed: " + p);
}
}
Status PluginZip::extract(const std::string& target_dir, const std::string& plugin_name) {
// check plugin install path
std::string plugin_install_path = Substitute("$0/$1", target_dir, plugin_name);
if (FileUtils::check_exist(plugin_install_path)) {
return Status::AlreadyExist(Substitute("plugin $0 already install!", plugin_name));
}
if (!FileUtils::check_exist(target_dir)) {
RETURN_IF_ERROR(FileUtils::create_dir(target_dir));
}
std::string zip_path = _source;
if (!is_local_source(_source)) {
zip_path = Substitute("$0/.temp_$1_$2.zip", target_dir, GetCurrentTimeMicros(), plugin_name);
_clean_paths.push_back(zip_path);
RETURN_IF_ERROR(download(zip_path));
}
// zip extract
ZipFile zip_file(zip_path);
RETURN_IF_ERROR(zip_file.extract(target_dir, plugin_name));
return Status::OK();
}
Status PluginZip::download(const std::string& zip_path) {
// download .zip
Status status;
HttpClient client;
Md5Digest digest;
std::unique_ptr<WritableFile> file;
RETURN_IF_ERROR(Env::Default()->new_writable_file(zip_path, &file));
RETURN_IF_ERROR(client.init(_source));
auto download_cb = [&status, &digest, &file](const void* data, size_t length) {
digest.update(data, length);
Slice slice((const char *)data, length);
status = file->append(slice);
if (!status.ok()) {
LOG(WARNING) << "fail to download data, file: " << file->filename()
<< ", error: " << status.to_string();
return false;
}
return true;
};
RETURN_IF_ERROR(client.execute(download_cb));
RETURN_IF_ERROR(status);
RETURN_IF_ERROR(file->flush(WritableFile::FLUSH_ASYNC));
RETURN_IF_ERROR(file->sync());
RETURN_IF_ERROR(file->close());
// md5 check
HttpClient md5_client;
RETURN_IF_ERROR(md5_client.init(_source + ".md5"));
std::string expect;
auto download_md5_cb = [&status, &expect](const void* data, size_t length) {
expect = std::string((const char*) data, length);
return true;
};
RETURN_IF_ERROR(md5_client.execute(download_md5_cb));
digest.digest();
if (0 != strncmp(digest.hex().c_str(), expect.c_str(), 32)) {
return Status::InternalError(
Substitute("plugin install checksum failed. expect: $0, actual:$1", expect, digest.hex()));
}
return Status::OK();
}
}

View File

@ -0,0 +1,43 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef DORIS_BE_PLUGIN_PLUGIN_ZIP_H
#define DORIS_BE_PLUGIN_PLUGIN_ZIP_H
#include "common/status.h"
namespace doris {
class PluginZip {
public:
PluginZip(std::string source) : _source(source) {}
~PluginZip();
Status extract(const std::string& target_path, const std::string& plugin_name);
private:
Status download(const std::string& zip_path);
private:
std::string _source;
std::vector<std::string> _clean_paths;
};
}
#endif //DORIS_BE_PLUGIN_PLUGIN_ZIP_H

View File

@ -54,6 +54,7 @@ class StreamLoadExecutor;
class RoutineLoadTaskExecutor;
class SmallFileMgr;
class FileBlockManager;
class PluginMgr;
class BackendServiceClient;
class FrontendServiceClient;
@ -132,6 +133,8 @@ public:
RoutineLoadTaskExecutor* routine_load_task_executor() { return _routine_load_task_executor; }
HeartbeatFlags* heartbeat_flags() { return _heartbeat_flags; }
PluginMgr* plugin_mgr() { return _plugin_mgr; }
private:
Status _init(const std::vector<StorePath>& store_paths);
void _destory();
@ -181,6 +184,8 @@ private:
RoutineLoadTaskExecutor* _routine_load_task_executor = nullptr;
SmallFileMgr* _small_file_mgr = nullptr;
HeartbeatFlags* _heartbeat_flags = nullptr;
PluginMgr* _plugin_mgr = nullptr;
};

View File

@ -61,6 +61,7 @@
#include "gen_cpp/TExtDataSourceService.h"
#include "gen_cpp/HeartbeatService_types.h"
#include "runtime/heartbeat_flags.h"
#include "plugin/plugin_mgr.h"
namespace doris {
@ -103,6 +104,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {
_stream_load_executor = new StreamLoadExecutor(this);
_routine_load_task_executor = new RoutineLoadTaskExecutor(this);
_small_file_mgr = new SmallFileMgr(this, config::small_file_dir);
_plugin_mgr = new PluginMgr();
_backend_client_cache->init_metrics(DorisMetrics::metrics(), "backend");
_frontend_client_cache->init_metrics(DorisMetrics::metrics(), "frontend");

26
be/test/plugin/CMakeLists.txt Executable file
View File

@ -0,0 +1,26 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# where to put generated libraries
set(LIBRARY_OUTPUT_PATH "${BUILD_DIR}/test/plugin")
# where to put generated binaries
set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/test/plugin")
ADD_BE_TEST(plugin_zip_test)
ADD_BE_TEST(plugin_loader_test)
ADD_BE_TEST(plugin_mgr_test)

View File

@ -0,0 +1,24 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# where to put generated libraries
set(LIBRARY_OUTPUT_PATH "${BUILD_DIR}/test/plugin/example")
# where to put generated binaries
set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/test/plugin/example")
ADD_BE_PLUGIN(plugin_example)

View File

@ -15,25 +15,37 @@
// specific language governing permissions and limitations
// under the License.
#include <string>
#include <unordered_map>
#include <iostream>
#include "common/status.h"
#include "plugin/plugin.h"
namespace doris {
class PluginManager {
int init(void *) {
std::cout << "this is init" << std::endl;
return 1;
}
public:
Status load_plugin(Plugin* plugin);
int close(void *) {
std::cout << "this is close" << std::endl;
return 2;
}
#ifdef __cplusplus
extern "C" {
#endif
Status unload_plugin(Plugin* plugin);
Status get_plugin(std::string name);
declare_plugin(PluginExample) {
nullptr,
&init,
&close,
3,
nullptr,
nullptr
} declare_plugin_end
private:
std::unordered_map<std::string, Plugin*> _plugins;
};
#ifdef __cplusplus
}
#endif
}
}

View File

@ -0,0 +1,123 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <gtest/gtest.h>
#include <libgen.h>
#include "plugin/plugin_loader.h"
#include "plugin/plugin.h"
#include "util/file_utils.h"
namespace doris {
class DemoPluginHandler {
public:
const std::string& hello(const std::string& msg) {
_msg = msg;
return _msg;
}
private:
std::string _msg;
};
int init_plugin(void* ptr) {
// handler
void** p = (void**) ptr;
*p = new DemoPluginHandler();
return 0;
}
int close_plugin(void* ptr) {
void** p = (void**) ptr;
delete (DemoPluginHandler*) (*p);
LOG(INFO) << "close demo plugin";
return 1;
}
class PluginLoaderTest : public testing::Test {
public:
PluginLoaderTest() {
char buf[1024];
readlink("/proc/self/exe", buf, 1023);
char* dir_path = dirname(buf);
_path = std::string(dir_path);
}
~PluginLoaderTest() { }
public:
std::string _path;
};
TEST_F(PluginLoaderTest, normal) {
FileUtils::remove_all(_path + "/plugin_test/target");
DynamicPluginLoader plugin_loader("PluginExample", PLUGIN_TYPE_STORAGE, _path + "/plugin_test/source/test.zip",
"libplugin_example.so", _path + "/plugin_test/target");
ASSERT_TRUE(plugin_loader.install().ok());
ASSERT_TRUE(FileUtils::is_dir(_path + "/plugin_test/target/PluginExample"));
ASSERT_TRUE(FileUtils::check_exist(_path + "/plugin_test/target/PluginExample/"));
std::shared_ptr<Plugin> plugin = plugin_loader.plugin();
ASSERT_EQ(3, plugin->flags);
ASSERT_EQ(1, plugin->init(nullptr));
ASSERT_EQ(2, plugin->close(nullptr));
ASSERT_TRUE(plugin->flags & PLUGIN_NOT_DYNAMIC_UNINSTALL);
ASSERT_FALSE(plugin_loader.uninstall().ok());
ASSERT_TRUE(FileUtils::is_dir(_path + "/plugin_test/target/PluginExample"));
ASSERT_TRUE(FileUtils::check_exist(_path + "/plugin_test/target/PluginExample/"));
FileUtils::remove_all(_path + "/plugin_test/target");
}
TEST_F(PluginLoaderTest, builtin) {
Plugin demoPlugin = {
nullptr,
&init_plugin,
&close_plugin,
PLUGIN_DEFAULT_FLAG,
nullptr,
nullptr,
};
BuiltinPluginLoader plugin_loader("test", PLUGIN_TYPE_AUDIT, &demoPlugin);
ASSERT_TRUE(plugin_loader.install().ok());
std::shared_ptr<Plugin> plugin = plugin_loader.plugin();
ASSERT_EQ(PLUGIN_DEFAULT_FLAG, plugin->flags);
ASSERT_NE(nullptr,plugin->handler);
ASSERT_EQ("test",((DemoPluginHandler *)plugin->handler)->hello("test"));
ASSERT_TRUE(plugin_loader.uninstall().ok());
}
}
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

View File

@ -0,0 +1,105 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "string"
#include <gtest/gtest.h>
#include <libgen.h>
#include "plugin/plugin.h"
#include "plugin/plugin_mgr.h"
#include "plugin/plugin_loader.h"
namespace doris {
class DemoPluginHandler {
public:
const std::string& hello(const std::string& msg) {
_msg = msg;
return _msg;
}
private:
std::string _msg;
};
int init_plugin(void* ptr) {
// handler
void** p = (void**) ptr;
*p = new DemoPluginHandler();
return 0;
}
int close_plugin(void* ptr) {
void** p = (void**) ptr;
delete (DemoPluginHandler*) (*p);
LOG(INFO) << "close demo plugin";
return 1;
}
//declare_builtin_plugin(DemoPlugin) {
Plugin DemoPlugin = {
nullptr,
&init_plugin,
&close_plugin,
PLUGIN_DEFAULT_FLAG,
nullptr,
nullptr,
};
class PluginMgrTest : public testing::Test {
public:
PluginMgrTest() {
char buf[1024];
readlink("/proc/self/exe", buf, 1023);
char* dir_path = dirname(buf);
_path = std::string(dir_path);
}
~PluginMgrTest() { }
public:
std::string _path;
};
TEST_F(PluginMgrTest, normal) {
PluginMgr mgr;
mgr.register_builtin_plugin("demo", PLUGIN_TYPE_AUDIT, &DemoPlugin);
std::shared_ptr<Plugin> re;
ASSERT_TRUE(mgr.get_plugin("demo", PLUGIN_TYPE_AUDIT, &re).ok());
ASSERT_NE(nullptr, re.get());
ASSERT_EQ("test", ((DemoPluginHandler*) re->handler)->hello("test"));
ASSERT_TRUE(mgr.get_plugin("demo", &re).ok());
ASSERT_EQ("test", ((DemoPluginHandler*) re->handler)->hello("test"));
std::vector<std::shared_ptr<Plugin>> list;
ASSERT_TRUE(mgr.get_plugin_list(PLUGIN_TYPE_AUDIT, &list).ok());
ASSERT_EQ(1, list.size());
}
}
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

Binary file not shown.

View File

@ -0,0 +1 @@
c68ae6cce50da90a5fd8a818d4e85c96 test.zip

View File

@ -0,0 +1,164 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "plugin/plugin_zip.h"
#include <memory>
#include <gtest/gtest.h>
#include <set>
#include <libgen.h>
#include <cstdio>
#include <cstdlib>
#include "util/file_utils.h"
#include "util/slice.h"
#include "env/env.h"
#include "http/ev_http_server.h"
#include "http/http_handler.h"
#include "http/http_request.h"
#include "http/http_channel.h"
namespace doris {
class HttpTestHandler : public HttpHandler {
public:
void handle(HttpRequest* req) override {
char buf[1024];
readlink("/proc/self/exe", buf, 1023);
char* dir_path = dirname(buf);
std::string path = std::string(dir_path);
std::unique_ptr<SequentialFile> file;
auto& file_name = req->param("FILE");
FILE* fp = fopen((path + "/plugin_test/source/" + file_name).c_str(), "r");
std::string response;
char f[1024];
while (true) {
auto size = fread(f, 1, 1024, fp);
response.append(f, size);
if (size < 1024) {
break;
}
}
fclose(fp);
HttpChannel::send_reply(req, response);
}
};
class PluginZipTest : public testing::Test {
public:
PluginZipTest() {
char buf[1024];
readlink("/proc/self/exe", buf, 1023);
char* dir_path = dirname(buf);
_path = std::string(dir_path);
_server.reset(new EvHttpServer(29191));
_server->register_handler(GET, "/{FILE}", &_handler);
_server->start();
std::cout << "the path: " << _path << std::endl;
}
~PluginZipTest() {
_server->stop();
};
public:
std::string _path;
std::unique_ptr<EvHttpServer> _server;
HttpTestHandler _handler;
};
TEST_F(PluginZipTest, local_normal) {
FileUtils::remove_all(_path + "/plugin_test/target");
PluginZip zip(_path + "/plugin_test/source/test.zip");
ASSERT_TRUE(zip.extract(_path + "/plugin_test/target/", "test").ok());
ASSERT_TRUE(FileUtils::check_exist(_path + "/plugin_test/target/test"));
ASSERT_TRUE(FileUtils::check_exist(_path + "/plugin_test/target/test/test.txt"));
std::unique_ptr<RandomAccessFile> file;
Env::Default()->new_random_access_file(_path + "/plugin_test/target/test/test.txt", &file);
char f[11];
Slice s(f, 11);
file->read_at(0, s);
ASSERT_EQ("hello world", s.to_string());
FileUtils::remove_all(_path + "/plugin_test/target/");
}
TEST_F(PluginZipTest, http_normal) {
FileUtils::remove_all(_path + "/plugin_test/target");
PluginZip zip("http://127.0.0.1:29191/test.zip");
// ASSERT_TRUE(zip.extract(_path + "/plugin_test/target/", "test").ok());
Status st = (zip.extract(_path + "/plugin_test/target/", "test"));
std::cout << st.to_string() << std::endl;
ASSERT_TRUE(FileUtils::check_exist(_path + "/plugin_test/target/test"));
ASSERT_TRUE(FileUtils::check_exist(_path + "/plugin_test/target/test/test.txt"));
std::unique_ptr<RandomAccessFile> file;
Env::Default()->new_random_access_file(_path + "/plugin_test/target/test/test.txt", &file);
char f[11];
Slice s(f, 11);
file->read_at(0, s);
ASSERT_EQ("hello world", s.to_string());
std::set<std::string> dirs;
std::set<std::string> files;
ASSERT_TRUE(FileUtils::list_dirs_files(_path + "/plugin_test/target", &dirs, &files, Env::Default()).ok());
ASSERT_EQ(1, dirs.size());
ASSERT_EQ(1, files.size());
FileUtils::remove_all(_path + "/plugin_test/target/");
}
TEST_F(PluginZipTest, already_install) {
FileUtils::remove_all(_path + "/plugin_test/target");
PluginZip zip("http://127.0.0.1:29191/test.zip");
ASSERT_FALSE(zip.extract(_path + "/plugin_test/", "source").ok());
}
}
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

View File

@ -251,6 +251,13 @@ struct TUpdateTabletMetaInfoReq {
1: optional list<TTabletMetaInfo> tabletMetaInfos
}
struct TPluginMetaInfo {
1: required string name
2: required i32 type
3: optional string so_name
4: optional string source
}
struct TAgentTaskRequest {
1: required TAgentServiceVersion protocol_version
2: required Types.TTaskType task_type

View File

@ -73,6 +73,11 @@ struct TDisk {
7: optional Types.TStorageMedium storage_medium
}
struct TPluginInfo {
1: required string plugin_name
2: required i32 type
}
struct TReportRequest {
1: required Types.TBackend backend
2: optional i64 report_version

View File

@ -165,7 +165,9 @@ enum TTaskType {
STREAM_LOAD,
UPDATE_TABLET_META_INFO,
// this type of task will replace both ROLLUP and SCHEMA_CHANGE
ALTER
ALTER,
INSTALL_PLUGIN,
UNINSTALL_PLUGIN
}
enum TStmtType {

View File

@ -130,6 +130,7 @@ if [ -d ${DORIS_TEST_BINARY_DIR}/util/test_data ]; then
rm -rf ${DORIS_TEST_BINARY_DIR}/util/test_data
fi
cp -r ${DORIS_HOME}/be/test/util/test_data ${DORIS_TEST_BINARY_DIR}/util/
cp -r ${DORIS_HOME}/be/test/plugin/plugin_test ${DORIS_TEST_BINARY_DIR}/plugin/
# Running Util Unittest
${DORIS_TEST_BINARY_DIR}/util/bit_util_test
@ -307,6 +308,11 @@ ${DORIS_TEST_BINARY_DIR}/runtime/kafka_consumer_pipe_test
${DORIS_TEST_BINARY_DIR}/runtime/routine_load_task_executor_test
${DORIS_TEST_BINARY_DIR}/runtime/heartbeat_flags_test
# Runing plugin test
${DORIS_TEST_BINARY_DIR}/Plugin/plugin/plugin_loader_test
${DORIS_TEST_BINARY_DIR}/Plugin/plugin/plugin_mgr_test
${DORIS_TEST_BINARY_DIR}/Plugin/plugin/plugin_zip_test
# Running agent unittest
# Prepare agent testdata
if [ -d ${DORIS_TEST_BINARY_DIR}/agent/test_data ]; then