[Plugin] Making FE audit module pluggable (#3219)

Currently we have implemented the plugin framework in FE. 
This CL make the original audit log logic pluggable.
The following classes are mainly implemented:

1. AuditPlugin
    The interface of audit plugin

2. AuditEvent
    An AuditEvent contains all information about an audit event, such as a query, or a connection.

3. AuditEventProcessor
    Audit event processor receive all audit events and deliver them to all installed audit plugins.

This CL implements two audit module plugins:

1. The builtin plugin `AuditLogBuilder`, which act same as the previous logic, to save the 
    audit log to the `fe.audit.log`

2. An optional plugin `AuditLoader`, which will periodically inserts the audit log into a Doris table
    specified by the user. In this way, users can conveniently use SQL to query and analyze this
    audit log table.

Some documents are added:

1. HELP docs of install/uninstall/show plugin.
2. Rename the `README.md` in `fe_plugins/` dir to `plugin-development-manual.md` and move
    it to the `docs/` dir
3. `audit-plugin.md` to introduce the usage of `AuditLoader` plugin.

ISSUE: #3226
This commit is contained in:
Mingyu Chen
2020-04-03 09:53:50 +08:00
committed by GitHub
parent c9ff6f68d1
commit fcb651329c
49 changed files with 2378 additions and 631 deletions

View File

@ -55,6 +55,35 @@ public class QueryPlanTest {
String createDbStmtStr = "create database test;";
CreateDbStmt createDbStmt = (CreateDbStmt) UtFrameUtils.parseAndAnalyzeStmt(createDbStmtStr, connectContext);
Catalog.getCurrentCatalog().createDb(createDbStmt);
createTable("create table test.test1\n" +
"(\n" +
" query_id varchar(48) comment \"Unique query id\",\n" +
" time datetime not null comment \"Query start time\",\n" +
" client_ip varchar(32) comment \"Client IP\",\n" +
" user varchar(64) comment \"User name\",\n" +
" db varchar(96) comment \"Database of this query\",\n" +
" state varchar(8) comment \"Query result state. EOF, ERR, OK\",\n" +
" query_time bigint comment \"Query execution time in millisecond\",\n" +
" scan_bytes bigint comment \"Total scan bytes of this query\",\n" +
" scan_rows bigint comment \"Total scan rows of this query\",\n" +
" return_rows bigint comment \"Returned rows of this query\",\n" +
" stmt_id int comment \"An incremental id of statement\",\n" +
" is_query tinyint comment \"Is this statemt a query. 1 or 0\",\n" +
" frontend_ip varchar(32) comment \"Frontend ip of executing this statement\",\n" +
" stmt varchar(2048) comment \"The original statement, trimed if longer than 2048 bytes\"\n" +
")\n" +
"partition by range(time) ()\n" +
"distributed by hash(query_id) buckets 1\n" +
"properties(\n" +
" \"dynamic_partition.time_unit\" = \"DAY\",\n" +
" \"dynamic_partition.start\" = \"-30\",\n" +
" \"dynamic_partition.end\" = \"3\",\n" +
" \"dynamic_partition.prefix\" = \"p\",\n" +
" \"dynamic_partition.buckets\" = \"1\",\n" +
" \"dynamic_partition.enable\" = \"true\",\n" +
" \"replication_num\" = \"1\"\n" +
");");
createTable("CREATE TABLE test.bitmap_table (\n" +
" `id` int(11) NULL COMMENT \"\",\n" +

View File

@ -21,17 +21,18 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DigitalVersion;
import org.apache.doris.plugin.PluginInfo.PluginType;
import org.apache.commons.io.FileUtils;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Map;
public class PluginLoaderTest {
@Before
@ -71,7 +72,7 @@ public class PluginLoaderTest {
DigitalVersion.JDK_1_8_0, "plugin.PluginTest", "libtest.so", "plugin_test.jar");
DynamicPluginLoader util = new DynamicPluginLoader(PluginTestUtil.getTestPathString(""), info);
Plugin p = util.dynamicLoadPlugin(PluginTestUtil.getTestPath(""));
Plugin p = util.dynamicLoadPlugin();
p.init(null, null);
p.close();

View File

@ -20,71 +20,51 @@ package org.apache.doris.plugin;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import org.apache.doris.analysis.InstallPluginStmt;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.Config;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.DataOutputBuffer;
import org.apache.doris.utframe.UtFrameUtils;
import org.apache.commons.io.FileUtils;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import org.apache.commons.io.FileUtils;
import org.apache.doris.analysis.InstallPluginStmt;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.FakeCatalog;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.DataOutputBuffer;
import org.apache.doris.common.jmockit.Deencapsulation;
import org.junit.Before;
import org.junit.Test;
import java.util.UUID;
public class PluginMgrTest {
class TestPlugin extends Plugin implements AuditPlugin {
private static String runningDir = "fe/mocked/PluginMgrTest/" + UUID.randomUUID().toString() + "/";
@Override
public boolean eventFilter(short type, short masks) {
return type == AuditEvent.AUDIT_CONNECTION;
}
@Override
public void exec(AuditEvent event) {
}
@BeforeClass
public static void beforeClass() throws Exception {
UtFrameUtils.createMinDorisCluster(runningDir);
}
private Catalog catalog;
private FakeCatalog fakeCatalog;
private PluginMgr mgr;
@AfterClass
public static void tearDown() {
File file = new File(runningDir);
file.delete();
}
@Before
public void setUp() {
try {
fakeCatalog = new FakeCatalog();
catalog = Deencapsulation.newInstance(Catalog.class);
FakeCatalog.setCatalog(catalog);
FakeCatalog.setMetaVersion(FeConstants.meta_version);
FileUtils.deleteQuietly(PluginTestUtil.getTestFile("target"));
assertFalse(Files.exists(PluginTestUtil.getTestPath("target")));
Files.createDirectory(PluginTestUtil.getTestPath("target"));
assertTrue(Files.exists(PluginTestUtil.getTestPath("target")));
Config.plugin_dir = PluginTestUtil.getTestPathString("target");
mgr = new PluginMgr();
PluginInfo info = new PluginInfo("TestPlugin", PluginInfo.PluginType.AUDIT, "use for test");
assertTrue(mgr.registerPlugin(info, new TestPlugin()));
} catch (IOException e) {
e.printStackTrace();
}
public void setUp() throws IOException {
FileUtils.deleteQuietly(PluginTestUtil.getTestFile("target"));
assertFalse(Files.exists(PluginTestUtil.getTestPath("target")));
Files.createDirectory(PluginTestUtil.getTestPath("target"));
assertTrue(Files.exists(PluginTestUtil.getTestPath("target")));
Config.plugin_dir = PluginTestUtil.getTestPathString("target");
}
@Test
@ -94,30 +74,31 @@ public class PluginMgrTest {
assertFalse(Files.exists(PluginTestUtil.getTestPath("target/audit_plugin_demo/auditdemo.jar")));
InstallPluginStmt stmt = new InstallPluginStmt(PluginTestUtil.getTestPathString("auditdemo.zip"));
mgr.installPlugin(stmt);
Catalog.getCurrentCatalog().installPlugin(stmt);
assertEquals(2, mgr.getActivePluginList(PluginInfo.PluginType.AUDIT).size());
PluginMgr pluginMgr = Catalog.getCurrentPluginMgr();
Plugin p = mgr.getActivePlugin("audit_plugin_demo", PluginInfo.PluginType.AUDIT);
assertEquals(2, pluginMgr.getActivePluginList(PluginInfo.PluginType.AUDIT).size());
Plugin p = pluginMgr.getActivePlugin("audit_plugin_demo", PluginInfo.PluginType.AUDIT);
assertNotNull(p);
assertTrue(p instanceof AuditPlugin);
assertTrue(((AuditPlugin) p).eventFilter(AuditEvent.AUDIT_QUERY, AuditEvent.AUDIT_QUERY_START));
assertTrue(((AuditPlugin) p).eventFilter(AuditEvent.AUDIT_QUERY, AuditEvent.AUDIT_QUERY_END));
assertFalse(((AuditPlugin) p).eventFilter(AuditEvent.AUDIT_CONNECTION, AuditEvent.AUDIT_QUERY_END));
assertTrue(((AuditPlugin) p).eventFilter(AuditEvent.EventType.AFTER_QUERY));
assertFalse(((AuditPlugin) p).eventFilter(AuditEvent.EventType.BEFORE_QUERY));
assertTrue(Files.exists(PluginTestUtil.getTestPath("target/audit_plugin_demo")));
assertTrue(Files.exists(PluginTestUtil.getTestPath("target/audit_plugin_demo/auditdemo.jar")));
assertEquals(1, mgr.getAllDynamicPluginInfo().size());
PluginInfo info = mgr.getAllDynamicPluginInfo().get(0);
assertEquals(1, pluginMgr.getAllDynamicPluginInfo().size());
PluginInfo info = pluginMgr.getAllDynamicPluginInfo().get(0);
assertEquals("audit_plugin_demo", info.getName());
assertEquals(PluginInfo.PluginType.AUDIT, info.getType());
assertEquals("just for test", info.getDescription());
assertEquals("plugin.AuditPluginDemo", info.getClassName());
mgr.uninstallPlugin("audit_plugin_demo");
pluginMgr.uninstallPlugin("audit_plugin_demo");
assertFalse(Files.exists(PluginTestUtil.getTestPath("target/audit_plugin_demo")));
assertFalse(Files.exists(PluginTestUtil.getTestPath("target/audit_plugin_demo/auditdemo.jar")));
@ -139,25 +120,27 @@ public class PluginMgrTest {
PluginTestUtil.getTestPath("test_plugin").toFile());
InstallPluginStmt stmt = new InstallPluginStmt(PluginTestUtil.getTestPathString("test_plugin"));
mgr.installPlugin(stmt);
Catalog.getCurrentCatalog().installPlugin(stmt);
PluginMgr pluginMgr = Catalog.getCurrentPluginMgr();
assertFalse(Files.exists(PluginTestUtil.getTestPath("test_plugin")));
assertFalse(Files.exists(PluginTestUtil.getTestPath("test_plugin/auditdemo.jar")));
Plugin p = mgr.getActivePlugin("audit_plugin_demo", PluginInfo.PluginType.AUDIT);
Plugin p = pluginMgr.getActivePlugin("audit_plugin_demo", PluginInfo.PluginType.AUDIT);
assertEquals(2, mgr.getActivePluginList(PluginInfo.PluginType.AUDIT).size());
assertEquals(2, pluginMgr.getActivePluginList(PluginInfo.PluginType.AUDIT).size());
assertNotNull(p);
assertTrue(p instanceof AuditPlugin);
assertTrue(((AuditPlugin) p).eventFilter(AuditEvent.AUDIT_QUERY, AuditEvent.AUDIT_QUERY_START));
assertTrue(((AuditPlugin) p).eventFilter(AuditEvent.AUDIT_QUERY, AuditEvent.AUDIT_QUERY_END));
assertFalse(((AuditPlugin) p).eventFilter(AuditEvent.AUDIT_CONNECTION, AuditEvent.AUDIT_QUERY_END));
assertTrue(((AuditPlugin) p).eventFilter(AuditEvent.EventType.AFTER_QUERY));
assertFalse(((AuditPlugin) p).eventFilter(AuditEvent.EventType.BEFORE_QUERY));
assertTrue(Files.exists(PluginTestUtil.getTestPath("target/audit_plugin_demo")));
assertTrue(Files.exists(PluginTestUtil.getTestPath("target/audit_plugin_demo/auditdemo.jar")));
mgr.uninstallPlugin("audit_plugin_demo");
testSerializeBuiltinPlugin(pluginMgr);
pluginMgr.uninstallPlugin("audit_plugin_demo");
assertFalse(Files.exists(PluginTestUtil.getTestPath("target/audit_plugin_demo")));
assertFalse(Files.exists(PluginTestUtil.getTestPath("target/audit_plugin_demo/auditdemo.jar")));
@ -168,10 +151,7 @@ public class PluginMgrTest {
}
}
@Test
public void testSerializeBuiltinPlugin() {
private void testSerializeBuiltinPlugin(PluginMgr mgr) {
try {
DataOutputBuffer dob = new DataOutputBuffer();
DataOutputStream dos = new DataOutputStream(dob);
@ -180,47 +160,10 @@ public class PluginMgrTest {
PluginMgr test = new PluginMgr();
test.readFields(new DataInputStream(new ByteArrayInputStream(dob.getData())));
assertEquals(0, test.getAllDynamicPluginInfo().size());
assertEquals(1, test.getAllDynamicPluginInfo().size());
} catch (IOException e) {
e.printStackTrace();
}
}
@Test
public void testSerializeDynamicPlugin() {
try {
assertFalse(Files.exists(PluginTestUtil.getTestPath("target/audit_plugin_demo")));
assertFalse(Files.exists(PluginTestUtil.getTestPath("target/audit_plugin_demo/auditdemo.jar")));
InstallPluginStmt stmt = new InstallPluginStmt(PluginTestUtil.getTestPathString("auditdemo.zip"));
mgr.installPlugin(stmt);
assertTrue(Files.exists(PluginTestUtil.getTestPath("target/audit_plugin_demo")));
assertTrue(Files.exists(PluginTestUtil.getTestPath("target/audit_plugin_demo/auditdemo.jar")));
assertEquals(1, mgr.getAllDynamicPluginInfo().size());
DataOutputBuffer dob = new DataOutputBuffer();
DataOutputStream dos = new DataOutputStream(dob);
mgr.write(dos);
PluginMgr test = new PluginMgr();
assertNotNull(dob);
assertNotNull(test);
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(dob.getData()));
test.readFields(dis);
assertEquals(1, test.getAllDynamicPluginInfo().size());
mgr.uninstallPlugin("audit_plugin_demo");
assertFalse(Files.exists(PluginTestUtil.getTestPath("target/audit_plugin_demo")));
assertFalse(Files.exists(PluginTestUtil.getTestPath("target/audit_plugin_demo/auditdemo.jar")));
} catch (IOException | UserException e) {
e.printStackTrace();
}
}
}

View File

@ -0,0 +1,124 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.qe;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.util.DigitalVersion;
import org.apache.doris.plugin.AuditEvent;
import org.apache.doris.plugin.PluginInfo;
import org.apache.doris.plugin.AuditEvent.AuditEventBuilder;
import org.apache.doris.plugin.AuditEvent.EventType;
import org.apache.doris.qe.AuditEventProcessor;
import org.apache.doris.qe.AuditLogBuilder;
import org.apache.doris.utframe.UtFrameUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.UUID;
public class AuditEventProcessorTest {
private static String runningDir = "fe/mocked/AuditProcessorTest/" + UUID.randomUUID().toString() + "/";
@BeforeClass
public static void beforeClass() throws Exception {
UtFrameUtils.createMinDorisCluster(runningDir);
}
@AfterClass
public static void tearDown() {
File file = new File(runningDir);
file.delete();
}
@Test
public void testAuditEvent() {
AuditEvent event = new AuditEvent.AuditEventBuilder().setEventType(EventType.AFTER_QUERY)
.setTimestamp(System.currentTimeMillis())
.setClientIp("127.0.0.1")
.setUser("user1")
.setDb("db1")
.setState("EOF")
.setQueryTime(2000)
.setScanBytes(100000)
.setScanRows(200000)
.setReturnRows(1)
.setStmtId(1234)
.setStmt("select * from tbl1").build();
Assert.assertEquals("127.0.0.1", event.clientIp);
Assert.assertEquals(200000, event.scanRows);
}
@Test
public void testAuditLogBuilder() throws IOException {
try (AuditLogBuilder auditLogBuilder = new AuditLogBuilder()) {
PluginInfo pluginInfo = auditLogBuilder.getPluginInfo();
Assert.assertEquals(DigitalVersion.fromString("0.12.0"), pluginInfo.getVersion());
Assert.assertEquals(DigitalVersion.fromString("1.8.31"), pluginInfo.getJavaVersion());
long start = System.currentTimeMillis();
for (int i = 0; i < 10000; i++) {
AuditEvent event = new AuditEvent.AuditEventBuilder().setEventType(EventType.AFTER_QUERY)
.setTimestamp(System.currentTimeMillis())
.setClientIp("127.0.0.1")
.setUser("user1")
.setDb("db1")
.setState("EOF")
.setQueryTime(2000)
.setScanBytes(100000)
.setScanRows(200000)
.setReturnRows(i)
.setStmtId(1234)
.setStmt("select * from tbl1").build();
if (auditLogBuilder.eventFilter(event.type)) {
auditLogBuilder.exec(event);
}
}
long total = System.currentTimeMillis() - start;
System.out.println("total(ms): " + total + ", avg: " + total / 10000.0);
}
}
@Test
public void testAuditEventProcessor() throws IOException {
AuditEventProcessor processor = Catalog.getCurrentAuditEventProcessor();
long start = System.currentTimeMillis();
for (int i = 0; i < 10000; i++) {
AuditEvent event = new AuditEvent.AuditEventBuilder().setEventType(EventType.AFTER_QUERY)
.setTimestamp(System.currentTimeMillis())
.setClientIp("127.0.0.1")
.setUser("user1")
.setDb("db1")
.setState("EOF")
.setQueryTime(2000)
.setScanBytes(100000)
.setScanRows(200000)
.setReturnRows(i)
.setStmtId(1234)
.setStmt("select * from tbl1").build();
processor.handleAuditEvent(event);
}
long total = System.currentTimeMillis() - start;
System.out.println("total(ms): " + total + ", avg: " + total / 10000.0);
}
}

View File

@ -17,8 +17,6 @@
package org.apache.doris.qe;
import mockit.Expectations;
import mockit.Mocked;
import org.apache.doris.analysis.AccessTestUtil;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.jmockit.Deencapsulation;
@ -29,6 +27,7 @@ import org.apache.doris.mysql.MysqlEofPacket;
import org.apache.doris.mysql.MysqlErrPacket;
import org.apache.doris.mysql.MysqlOkPacket;
import org.apache.doris.mysql.MysqlSerializer;
import org.apache.doris.plugin.AuditEvent.AuditEventBuilder;
import org.apache.doris.proto.PQueryStatistics;
import org.apache.doris.thrift.TUniqueId;
@ -41,13 +40,16 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import mockit.Expectations;
import mockit.Mocked;
public class ConnectProcessorTest {
private static ByteBuffer initDbPacket;
private static ByteBuffer pingPacket;
private static ByteBuffer quitPacket;
private static ByteBuffer queryPacket;
private static ByteBuffer fieldListPacket;
private static AuditBuilder auditBuilder = new AuditBuilder();
private static AuditEventBuilder auditBuilder = new AuditEventBuilder();
private static ConnectContext myContext;
@Mocked
@ -202,7 +204,7 @@ public class ConnectProcessorTest {
minTimes = 0;
result = catalog;
context.getAuditBuilder();
context.getAuditEventBuilder();
minTimes = 0;
result = auditBuilder;

View File

@ -24,6 +24,7 @@ import org.apache.doris.analysis.StatementBase;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.SqlParserUtils;
@ -126,6 +127,7 @@ public class UtFrameUtils {
if (Strings.isNullOrEmpty(dorisHome)) {
dorisHome = Files.createTempDirectory("DORIS_HOME").toAbsolutePath().toString();
}
Config.plugin_dir = dorisHome + "/plugins";
int fe_http_port = findValidPort();
int fe_rpc_port = findValidPort();