pick https://github.com/apache/doris/pull/42210
This commit is contained in:
@ -5579,8 +5579,9 @@ public class Env {
|
||||
ConfigBase.setMutableConfig(key, value);
|
||||
if (configtoThreads.get(key) != null) {
|
||||
try {
|
||||
// not atomic. maybe delay to aware. but acceptable.
|
||||
configtoThreads.get(key).get().setInterval(Config.getField(key).getLong(null) * 1000L);
|
||||
configtoThreads.get(key).get().interrupt();
|
||||
// shouldn't interrupt to keep possible bdbje writing safe.
|
||||
LOG.info("set config " + key + " to " + value);
|
||||
} catch (IllegalAccessException e) {
|
||||
LOG.warn("set config " + key + " failed: " + e.getMessage());
|
||||
|
||||
@ -50,6 +50,7 @@ import org.apache.doris.common.util.MasterDaemon;
|
||||
import org.apache.doris.common.util.PropertyAnalyzer;
|
||||
import org.apache.doris.common.util.RangeUtils;
|
||||
import org.apache.doris.common.util.TimeUtils;
|
||||
import org.apache.doris.meta.MetaContext;
|
||||
import org.apache.doris.thrift.TStorageMedium;
|
||||
|
||||
import com.google.common.base.Strings;
|
||||
@ -88,6 +89,8 @@ public class DynamicPartitionScheduler extends MasterDaemon {
|
||||
|
||||
private static final String DEFAULT_RUNTIME_VALUE = FeConstants.null_string;
|
||||
|
||||
private static final long SLEEP_PIECE = 5000L;
|
||||
|
||||
private Map<Long, Map<String, String>> runtimeInfos = Maps.newConcurrentMap();
|
||||
private Set<Pair<Long, Long>> dynamicPartitionTableInfo = Sets.newConcurrentHashSet();
|
||||
private boolean initialize;
|
||||
@ -663,6 +666,48 @@ public class DynamicPartitionScheduler extends MasterDaemon {
|
||||
initialize = true;
|
||||
}
|
||||
|
||||
// specialized schedule logic. split sleep to many small pieces. so if interval changed, it won't take too much
|
||||
// time to aware.
|
||||
@Override
|
||||
public void run() {
|
||||
if (metaContext != null) {
|
||||
metaContext.setThreadLocalInfo();
|
||||
}
|
||||
|
||||
while (!isStop.get()) {
|
||||
try {
|
||||
runOneCycle();
|
||||
} catch (Throwable e) {
|
||||
LOG.error("daemon thread got exception. name: {}", getName(), e);
|
||||
}
|
||||
|
||||
try {
|
||||
long oldInterval = intervalMs;
|
||||
long remainingInterval = oldInterval;
|
||||
while (remainingInterval > SLEEP_PIECE) {
|
||||
// if it changed. let it know at most 10 seconds. and 5 second per wakeup is acceptable.
|
||||
if (intervalMs != oldInterval) { // changed
|
||||
break;
|
||||
}
|
||||
|
||||
Thread.sleep(SLEEP_PIECE);
|
||||
remainingInterval -= SLEEP_PIECE;
|
||||
}
|
||||
if (remainingInterval <= SLEEP_PIECE) {
|
||||
Thread.sleep(remainingInterval);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
// This thread should NEVER be interrupted. or meet bdbje writing, it will be disaster.
|
||||
LOG.fatal("InterruptedException: ", e);
|
||||
}
|
||||
}
|
||||
|
||||
if (metaContext != null) {
|
||||
MetaContext.remove();
|
||||
}
|
||||
LOG.error("daemon thread exits. name=" + this.getName());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void runAfterCatalogReady() {
|
||||
if (!initialize) {
|
||||
|
||||
@ -28,12 +28,15 @@ public class Daemon extends Thread {
|
||||
private static final Logger LOG = LogManager.getLogger(Daemon.class);
|
||||
private static final int DEFAULT_INTERVAL_SECONDS = 30; // 30 seconds
|
||||
|
||||
private long intervalMs;
|
||||
private AtomicBoolean isStop;
|
||||
private Runnable runnable;
|
||||
private AtomicBoolean isStart = new AtomicBoolean(false);
|
||||
protected long intervalMs;
|
||||
|
||||
private MetaContext metaContext = null;
|
||||
protected AtomicBoolean isStop;
|
||||
|
||||
protected MetaContext metaContext = null;
|
||||
|
||||
private Runnable runnable;
|
||||
|
||||
private AtomicBoolean isStart = new AtomicBoolean(false);
|
||||
|
||||
{
|
||||
setDaemon(true);
|
||||
|
||||
Reference in New Issue
Block a user