[feature](journal) Add a method to write a set of journals in batch (#30582)

* [feature](journal) Add a method to write a set of journals in batch (#30380)

* [feature](journal) Add log and metric to improve the observability of journal batch (#30401)
This commit is contained in:
walter
2024-01-30 19:09:31 +08:00
committed by yiguolei
parent 59b79d47ca
commit 7838ba6d4e
8 changed files with 311 additions and 1 deletions

View File

@ -49,6 +49,11 @@ public interface Journal {
// Write a journal and sync to disk
public long write(short op, Writable writable) throws IOException;
// Write a set of journal to disk in batch.
//
// Return the first id of the batched journals.
public long write(JournalBatch batch) throws IOException;
// Get current journal number
public long getJournalNum();

View File

@ -0,0 +1,84 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.journal;
import org.apache.doris.common.io.DataOutputBuffer;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.OperationType;
import java.io.IOException;
import java.util.ArrayList;
public class JournalBatch {
private static final int OUTPUT_BUFFER_INIT_SIZE = 128;
private ArrayList<Entity> entities;
public JournalBatch() {
entities = new ArrayList<>();
}
public JournalBatch(int cap) {
entities = new ArrayList<>(cap);
}
// Add a writable data into journal batch.
//
// The writable data will be serialized and saved in the journal batch with an internal
// representation, so it is safety to update the data object once this function returned.
//
// If the batch is too large, it may cause a latency spike. Generally, we recommend controlling
// the number of batch entities to less than 32 and the batch data size to less than 640KB.
public void addJournal(short op, Writable data) throws IOException {
if (op == OperationType.OP_TIMESTAMP) {
// OP_TIMESTAMP is not supported, see `BDBJEJournal.write` for details.
throw new RuntimeException("JournalBatch.addJournal is not supported OP_TIMESTAMP");
}
JournalEntity entity = new JournalEntity();
entity.setOpCode(op);
entity.setData(data);
DataOutputBuffer buffer = new DataOutputBuffer(OUTPUT_BUFFER_INIT_SIZE);
entity.write(buffer);
entities.add(new Entity(op, buffer));
}
public ArrayList<Entity> getJournalEntities() {
return entities;
}
public static class Entity {
short op;
DataOutputBuffer data;
Entity(short op, DataOutputBuffer data) {
this.op = op;
this.data = data;
}
public short getOpCode() {
return op;
}
public byte[] getBinaryData() {
return data.getData();
}
}
}

View File

@ -24,6 +24,7 @@ import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.journal.Journal;
import org.apache.doris.journal.JournalBatch;
import org.apache.doris.journal.JournalCursor;
import org.apache.doris.journal.JournalEntity;
import org.apache.doris.metric.MetricRepo;
@ -48,6 +49,7 @@ import com.sleepycat.je.rep.ReplicaWriteException;
import com.sleepycat.je.rep.ReplicatedEnvironment;
import com.sleepycat.je.rep.RollbackException;
import com.sleepycat.je.rep.TimeConsistencyPolicy;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -59,6 +61,7 @@ import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/*
* This is the bdb implementation of Journal interface.
* First, we open() this journal, then read from or write to the bdb environment
@ -121,6 +124,102 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B
}
}
@Override
public synchronized long write(JournalBatch batch) throws IOException {
List<JournalBatch.Entity> entities = batch.getJournalEntities();
int entitySize = entities.size();
long dataSize = 0;
long firstId = nextJournalId.getAndAdd(entitySize);
// Write the journals to bdb.
for (int i = 0; i < RETRY_TIME; i++) {
Transaction txn = null;
StopWatch watch = StopWatch.createStarted();
try {
// The default config is constructed from the configs of environment.
txn = bdbEnvironment.getReplicatedEnvironment().beginTransaction(null, null);
dataSize = 0;
for (int j = 0; j < entitySize; ++j) {
JournalBatch.Entity entity = entities.get(j);
DatabaseEntry theKey = idToKey(firstId + j);
DatabaseEntry theData = new DatabaseEntry(entity.getBinaryData());
currentJournalDB.put(txn, theKey, theData); // Put with overwrite, it always success
dataSize += theData.getSize();
if (i == 0) {
LOG.debug("opCode = {}, journal size = {}", entity.getOpCode(), theData.getSize());
}
}
txn.commit();
txn = null;
if (MetricRepo.isInit) {
MetricRepo.COUNTER_EDIT_LOG_SIZE_BYTES.increase(dataSize);
MetricRepo.COUNTER_CURRENT_EDIT_LOG_SIZE_BYTES.increase(dataSize);
MetricRepo.HISTO_JOURNAL_BATCH_SIZE.update(entitySize);
MetricRepo.HISTO_JOURNAL_BATCH_DATA_SIZE.update(dataSize);
}
if (entitySize > 32) {
LOG.warn("write bdb journal batch is too large, batch size {}, the first journal id {}, "
+ "data size {}", entitySize, firstId, dataSize);
}
if (dataSize > 640 * 1024) { // 640KB
LOG.warn("write bdb journal batch data is too large, data size {}, the first journal id {}, "
+ "batch size {}", dataSize, firstId, entitySize);
}
return firstId;
} catch (ReplicaWriteException e) {
/**
* This exception indicates that an update operation or transaction commit
* or abort was attempted while in the
* {@link ReplicatedEnvironment.State#REPLICA} state. The transaction is marked
* as being invalid.
* <p>
* The exception is the result of either an error in the application logic or
* the result of a transition of the node from Master to Replica while a
* transaction was in progress.
* <p>
* The application must abort the current transaction and redirect all
* subsequent update operations to the Master.
*/
LOG.error("catch ReplicaWriteException when writing to database, will exit. the first journal id {}",
firstId, e);
String msg = "write bdb failed. will exit. the first journalId: " + firstId + ", bdb database Name: "
+ currentJournalDB.getDatabaseName();
LOG.error(msg);
Util.stdoutWithTime(msg);
System.exit(-1);
} catch (DatabaseException e) {
LOG.error("catch an exception when writing to database. sleep and retry. the first journal id {}",
firstId, e);
try {
Thread.sleep(5 * 1000);
} catch (InterruptedException e1) {
LOG.warn("", e1);
}
} finally {
if (txn != null) {
txn.abort();
}
watch.stop();
if (watch.getTime() > 100000) { // 100ms
LOG.warn("write bdb is too slow, cost {}ms, the first journal id, batch size {}, data size{}",
watch.getTime(), firstId, entitySize, dataSize);
}
}
}
String msg = "write bdb failed. will exit. the first journalId: " + firstId + ", bdb database Name: "
+ currentJournalDB.getDatabaseName();
LOG.error(msg);
Util.stdoutWithTime(msg);
System.exit(-1);
return 0; // unreachable!
}
@Override
public synchronized long write(short op, Writable writable) throws IOException {
JournalEntity entity = new JournalEntity();

View File

@ -19,6 +19,7 @@ package org.apache.doris.journal.local;
import org.apache.doris.common.io.Writable;
import org.apache.doris.journal.Journal;
import org.apache.doris.journal.JournalBatch;
import org.apache.doris.journal.JournalCursor;
import org.apache.doris.journal.JournalEntity;
import org.apache.doris.persist.EditLogFileOutputStream;
@ -139,6 +140,17 @@ public class LocalJournal implements Journal {
return cursor;
}
@Override
public synchronized long write(JournalBatch batch) throws IOException {
List<JournalBatch.Entity> entities = batch.getJournalEntities();
for (JournalBatch.Entity entity : entities) {
outputStream.write(entity.getOpCode(), entity.getBinaryData());
}
outputStream.setReadyToFlush();
outputStream.flush();
return journalId.getAndAdd(entities.size());
}
@Override
public synchronized long write(short op, Writable writable) throws IOException {
outputStream.write(op, writable);

View File

@ -96,6 +96,8 @@ public final class MetricRepo {
public static LongCounterMetric COUNTER_EDIT_LOG_CLEAN_SUCCESS;
public static LongCounterMetric COUNTER_EDIT_LOG_CLEAN_FAILED;
public static Histogram HISTO_EDIT_LOG_WRITE_LATENCY;
public static Histogram HISTO_JOURNAL_BATCH_SIZE;
public static Histogram HISTO_JOURNAL_BATCH_DATA_SIZE;
public static LongCounterMetric COUNTER_IMAGE_WRITE_SUCCESS;
public static LongCounterMetric COUNTER_IMAGE_WRITE_FAILED;
@ -380,7 +382,11 @@ public final class MetricRepo {
COUNTER_CURRENT_EDIT_LOG_SIZE_BYTES.addLabel(new MetricLabel("type", "current_bytes"));
DORIS_METRIC_REGISTER.addMetrics(COUNTER_CURRENT_EDIT_LOG_SIZE_BYTES);
HISTO_EDIT_LOG_WRITE_LATENCY = METRIC_REGISTER.histogram(
MetricRegistry.name("editlog", "write", "latency", "ms"));
MetricRegistry.name("editlog", "write", "latency", "ms"));
HISTO_JOURNAL_BATCH_SIZE = METRIC_REGISTER.histogram(
MetricRegistry.name("journal", "write", "batch_size"));
HISTO_JOURNAL_BATCH_DATA_SIZE = METRIC_REGISTER.histogram(
MetricRegistry.name("journal", "write", "batch_data_size"));
// edit log clean
COUNTER_EDIT_LOG_CLEAN_SUCCESS = new LongCounterMetric("edit_log_clean", MetricUnit.OPERATIONS,

View File

@ -65,6 +65,11 @@ public class EditLogFileOutputStream extends EditLogOutputStream {
writable.write(bufCurrent);
}
public void write(short op, byte[] data) throws IOException {
bufCurrent.writeShort(op);
bufCurrent.write(data);
}
// Create empty edits logs file.
void create() throws IOException {
fc.truncate(0);

View File

@ -51,6 +51,8 @@ public abstract class EditLogOutputStream extends OutputStream {
*/
public abstract void write(short op, Writable writable) throws IOException;
public abstract void write(short op, byte[] data) throws IOException;
abstract void create() throws IOException;
public abstract void close() throws IOException;