[fix](audit-plugin) Fix audit load plugin may stopped when throw unexpected exceptions (#7607)
Fix audit load may stopped when throw unexpected exceptions
This commit is contained in:
@ -29,6 +29,11 @@ import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.CharBuffer;
|
||||
import java.nio.charset.Charset;
|
||||
import java.nio.charset.CharsetDecoder;
|
||||
import java.nio.charset.CodingErrorAction;
|
||||
import java.nio.file.FileSystems;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
@ -36,7 +41,6 @@ import java.text.SimpleDateFormat;
|
||||
import java.util.Date;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.TimeZone;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingDeque;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
@ -62,9 +66,6 @@ public class AuditLoaderPlugin extends Plugin implements AuditPlugin {
|
||||
private volatile boolean isClosed = false;
|
||||
private volatile boolean isInit = false;
|
||||
|
||||
// the max auditEventQueue size to store audit_event
|
||||
private static final int MAX_AUDIT_EVENT_SIZE = 4096;
|
||||
|
||||
@Override
|
||||
public void init(PluginInfo info, PluginContext ctx) throws PluginException {
|
||||
super.init(info, ctx);
|
||||
@ -161,12 +162,27 @@ public class AuditLoaderPlugin extends Plugin implements AuditPlugin {
|
||||
auditBuffer.append(event.peakMemoryBytes).append("\t");
|
||||
// trim the query to avoid too long
|
||||
// use `getBytes().length` to get real byte length
|
||||
int maxLen = Math.min(conf.max_stmt_length, event.stmt.getBytes().length);
|
||||
String stmt = new String(event.stmt.getBytes(), 0, maxLen).replace("\t", " ");
|
||||
String stmt = truncateByBytes(event.stmt).replace("\t", " ");
|
||||
LOG.debug("receive audit event with stmt: {}", stmt);
|
||||
auditBuffer.append(stmt).append("\n");
|
||||
}
|
||||
|
||||
private String truncateByBytes(String str) {
|
||||
int maxLen = Math.min(conf.max_stmt_length, str.getBytes().length);
|
||||
if (maxLen >= str.getBytes().length) {
|
||||
return str;
|
||||
}
|
||||
Charset utf8Charset = Charset.forName("UTF-8");
|
||||
CharsetDecoder decoder = utf8Charset.newDecoder();
|
||||
byte[] sb = str.getBytes();
|
||||
ByteBuffer buffer = ByteBuffer.wrap(sb, 0, maxLen);
|
||||
CharBuffer charBuffer = CharBuffer.allocate(maxLen);
|
||||
decoder.onMalformedInput(CodingErrorAction.IGNORE);
|
||||
decoder.decode(buffer, charBuffer, true);
|
||||
decoder.flush(charBuffer);
|
||||
return new String(charBuffer.array(), 0, charBuffer.position());
|
||||
}
|
||||
|
||||
private void loadIfNecessary(DorisStreamLoader loader) {
|
||||
if (auditBuffer.length() < conf.maxBatchSize && System.currentTimeMillis() - lastLoadTime < conf.maxBatchIntervalSec * 1000) {
|
||||
return;
|
||||
@ -256,8 +272,10 @@ public class AuditLoaderPlugin extends Plugin implements AuditPlugin {
|
||||
assembleAudit(event);
|
||||
loadIfNecessary(loader);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
LOG.debug("encounter exception when loading current audit batch", e);
|
||||
} catch (InterruptedException ie) {
|
||||
LOG.debug("encounter exception when loading current audit batch", ie);
|
||||
} catch (Exception e) {
|
||||
LOG.error("run audit logger error:", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user