11、源码解析 EurekaServer 集群间注册表同步使机制
EurekaServer集群间注册表同步使用的3层队列任务批处理机制
1、EurekaServer同步任务批处理机制流程图
三个队列
第一个队列,就是纯写入;
private final BlockingQueue<TaskHolder<ID, T>> acceptorQueue = new LinkedBlockingQueue<>();
第二个队列,是用来根据时间和大小,来拆分队列;
private final Deque processingOrder = new LinkedList<>();
第三个队列,用来放批处理任务 ==》 异步批处理机制
private final BlockingQueue<List<TaskHolder<ID, T>>> batchWorkQueue = new LinkedBlockingQueue<>();
2、涉及到的类 类图
1、源码入口分析
因为都是由 batchDispatcher.process() 放入一个任务
比如心跳的:
com.netflix.eureka.cluster.PeerEurekaNode#heartbeat 中的代码:
batchingDispatcher.process(taskId(“heartbeat”, info), replicationTask, expiryTime);
1.1 BatchingDispatcher 创建的源码 TaskDispatchers.createBatchingTaskDispatcher
this.batchingDispatcher = TaskDispatchers.createBatchingTaskDispatcher(
batcherName,
config.getMaxElementsInPeerReplicationPool(),
batchSize,
config.getMaxThreadsForPeerReplication(),
maxBatchingDelayMs,
serverUnavailableSleepTimeMs,
retrySleepTimeMs,
taskProcessor
);
public static <ID, T> TaskDispatcher<ID, T> createBatchingTaskDispatcher(String id,
int maxBufferSize,
int workloadSize,
int workerCount,
long maxBatchingDelay,
long congestionRetryDelayMs,
long networkFailureRetryMs,
TaskProcessor<T> taskProcessor) {
// 1、创建一个 AcceptorExecutor
final AcceptorExecutor<ID, T> acceptorExecutor = new AcceptorExecutor<>(
id, maxBufferSize, workloadSize, maxBatchingDelay, congestionRetryDelayMs, networkFailureRetryMs
);
// 2、创建一个 TaskExecutors
final TaskExecutors<ID, T> taskExecutor = TaskExecutors.batchExecutors(id, workerCount, taskProcessor, acceptorExecutor);
// 3、创建 TaskDispatcher
return new TaskDispatcher<ID, T>() {
// 3.1 所以从这里可以看到 PeerEurekaNode 中执行 batchingTaskDispatcher.process() 的时候其实是由 acceptorExecutor 来执行的
@Override
public void process(ID id, T task, long expiryTime) {
acceptorExecutor.process(id, task, expiryTime);
}
@Override
public void shutdown() {
acceptorExecutor.shutdown();
taskExecutor.shutdown();
}
};
}
1.2 new AcceptorExecutor()
AcceptorExecutor(String id,
int maxBufferSize,
int maxBatchingSize,
long maxBatchingDelay,
long congestionRetryDelayMs,
long networkFailureRetryMs) {
// 1、设置一下 bufferSize、batchingSize、batchingDelay
this.maxBufferSize = maxBufferSize;
this.maxBatchingSize = maxBatchingSize;
this.maxBatchingDelay = maxBatchingDelay;
// 2、设置网络相关的参数
this.trafficShaper = new TrafficShaper(congestionRetryDelayMs, networkFailureRetryMs);
// 3、这个是重点,这个会以 Daemon 的方式运行一个线程
// 具体的逻辑在 AcceptorRunner(实现 Runnable 接口的类) 中实现
ThreadGroup threadGroup = new ThreadGroup("eurekaTaskExecutors");
this.acceptorThread = new Thread(threadGroup, new AcceptorRunner(), "TaskAcceptor-" + id);
this.acceptorThread.setDaemon(true);
this.acceptorThread.start();
// 4、这里就是统计相关的,不用太过关注
final double[] percentiles = {
50.0, 95.0, 99.0, 99.5};
final StatsConfig statsConfig = new StatsConfig.Builder()
.withSampleSize(1000)
.withPercentiles(percentiles)
.withPublishStdDev(true)
.build();
final MonitorConfig config = MonitorConfig.builder(METRIC_REPLICATION_PREFIX + "batchSize").build();
this.batchSizeMetric = new StatsTimer(config, statsConfig);
try {
Monitors.registerObject(id, this);
} catch (Throwable e) {
logger.warn("Cannot register servo monitor for this object", e);
}
}
1.2.1 com.netflix.eureka.util.batcher.AcceptorExecutor#process
// 其实很简单,就是往 acceptorQueue 中添加数据
void process(ID id, T task, long expiryTime) {
acceptorQueue.add(new TaskHolder<ID, T>(id, task, expiryTime));
acceptedTasks++;
}
1.2.2 new AcceptorRunner(重点看一下这个后台线程在干啥)
class AcceptorRunner implements Runnable {
@Override
public void run() {
long scheduleTime = 0;
// 1、其实就是一个 while(true) 死循环
while (!isShutdown.get()) {
try {
// 2、处理输入队列
drainInputQueues();
// 3、总数 = 处理中的大小
int totalItems = processingOrder.size();
// 4、重置 scheduleTime
long now = System.currentTimeMillis();
if (scheduleTime < now) {
scheduleTime = now + trafficShaper.transmissionDelay();
}
// 5、分配任务
if (scheduleTime <= now) {
assignBatchWork();
assignSingleItemWork();
}
// 6、当没有任务可处理的时候, sleep 一会,避免太过紧密的循环
// If no worker is requesting data or there is a delay injected by the traffic shaper,
// sleep for some time to avoid tight loop.
if (totalItems == processingOrder.size()) {
Thread.sleep(10);
}
} catch (InterruptedException ex) {
// Ignore
} catch (Throwable e) {
// Safe-guard, so we never exit this loop in an uncontrolled way.
logger.warn("Discovery AcceptorThread error", e);
}
}
}
private boolean isFull() {
return pendingTasks.size() >= maxBufferSize;
}
private void drainInputQueues() throws InterruptedException {
do {
drainReprocessQueue();
drainAcceptorQueue();
if (!isShutdown.get()) {
// If all queues are empty, block for a while on the acceptor queue
if (reprocessQueue.isEmpty() && acceptorQueue.isEmpty() && pendingTasks.isEmpty()) {
TaskHolder<ID, T> taskHolder = acceptorQueue.poll(10, TimeUnit.MILLISECONDS);
if (taskHolder != null) {
appendTaskHolder(taskHolder);
}
}
}
} while (!reprocessQueue.isEmpty() || !acceptorQueue.isEmpty() || pendingTasks.isEmpty());
}
private void drainAcceptorQueue() {
// 1、如果 acceptorQueue 不为空则返回任务取出放到 pendingTasks 中
while (!acceptorQueue.isEmpty()) {
appendTaskHolder(acceptorQueue.poll());
}
}
private void drainReprocessQueue() {
long now = System.currentTimeMillis();
// 1、reprocessQueue 不为空且 pendingTasks < 10000
while (!reprocessQueue.isEmpty() && !isFull()) {
TaskHolder<ID, T> taskHolder = reprocessQueue.pollLast();
ID id = taskHolder.getId();
// 2、如果过期了,则把 expiredTawsks + 1
if (taskHolder.getExpiryTime() <= now) {
expiredTasks++;
} else if (pendingTasks.containsKey(id)) {
// 3、如果处理中的任务包括该任务,则把 overriddentTasks + 1
overriddenTasks++;
} else {
// 4、添加任务到 pendingTasks、添加到 processingOrder 头部
pendingTasks.put(id, taskHolder);
processingOrder.addFirst(id);
}
}
// 5、如果满了,则把 queueOverflows + reprocessQueue 的大小,并清空 reprocessQueue
if (isFull()) {
queueOverflows += reprocessQueue.size();
reprocessQueue.clear();
}
}
private void appendTaskHolder(TaskHolder<ID, T> taskHolder) {
// 1、如果待处理的任务满了,则从 processingOrder 中拿出一个任务,然后从待处理任务中移除。队列溢出的值 +1
if (isFull()) {
pendingTasks.remove(processingOrder.poll());
queueOverflows++;
}
// 2、然后放入待处理任务的 Map 中
TaskHolder<ID, T> previousTask = pendingTasks.put(taskHolder.getId(), taskHolder);
// 3、如果没有放入成功,则把其加入 processingOrder 中
if (previousTask == null) {
processingOrder.add(taskHolder.getId());
} else {
// 4、加入成功则把覆盖任务数 +1
overriddenTasks++;
}
}
void assignSingleItemWork() {
// 1、如果 processingOrder 不为空,则使用信号量去控制一次一个
if (!processingOrder.isEmpty()) {
if (singleItemWorkRequests.tryAcquire(1)) {
long now = System.currentTimeMillis();
// 2、如果 processingOrder 不为空
while (!processingOrder.isEmpty()) {
// 2.1 则从中取任务从 pendingTasks 中移除,
ID id = processingOrder.poll();
TaskHolder<ID, T> holder = pendingTasks.remove(id);
// 2.2 然后判断如果过期时间大于现在,则添加到 singleItemWorkQueue 中
if (holder.getExpiryTime() > now) {
singleItemWorkQueue.add(holder);
return;
}
// 2.3 否则过期任务数 +1
expiredTasks++;
}
// 3、释放信号
singleItemWorkRequests.release();
}
}
}
void assignBatchWork() {
// 1、有足够的任务进入
if (hasEnoughTasksForNextBatch()) {
// 2、信号量控制进入
if (batchWorkRequests.tryAcquire(1)) {
long now = System.currentTimeMillis();
// 3、取 processingOrder 和 maxBatchingSize 中的最小值。如果任务过多按照 maxBatchingSize 分隔任务
int len = Math.min(maxBatchingSize, processingOrder.size());
List<TaskHolder<ID, T>> holders = new ArrayList<>(len);
//4、往 holders 中添加数据
while (holders.size() < len && !processingOrder.isEmpty()) {
// 4.1 从 processingOrder 中获取数据
ID id = processingOrder.poll();
// 4.2 从 pendingTasks 中移除
TaskHolder<ID, T> holder = pendingTasks.remove(id);
// 4.3 判断是否过期,不过期加入
if (holder.getExpiryTime() > now) {
holders.add(holder);
} else {
// 4.4 过期了,expiredTasks + 1
expiredTasks++;
}
}
// 5、如果 holders 为空,说明没有任务,则释放信号
if (holders.isEmpty()) {
batchWorkRequests.release();
} else {
// 6、否则记录一下数据,添加到 batchWorkQueue 中
batchSizeMetric.record(holders.size(), TimeUnit.MILLISECONDS);
batchWorkQueue.add(holders);
}
}
}
}
private boolean hasEnoughTasksForNextBatch() {
// 1、如果 processingOrder 为空,返回 false
if (processingOrder.isEmpty()) {
return false;
}
// 2、如果 pendingTasks 大小大于 maxBufferSize(10000) 返回 true
if (pendingTasks.size() >= maxBufferSize) {
return true;
}
// 3、从 processingOrder 中取出在 pendingTasks 中获取
TaskHolder<ID, T> nextHolder = pendingTasks.get(processingOrder.peek());
long delay = System.currentTimeMillis() - nextHolder.getSubmitTimestamp();
// 4、判断如果 delay >= maxBatchingDelay(500) 返回 true
return delay >= maxBatchingDelay;
}
}
}
版权声明:「DDKK.COM 弟弟快看,程序员编程资料站」本站文章,版权归原作者所有