qingmoucsdn 2019-11-17
本文主要研究一下Elasticsearch的TimedRunnable，以及QueueResizingEsThreadPoolExecutor如何利用它统计任务耗时。
elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/TimedRunnable.java
class TimedRunnable extends AbstractRunnable implements WrappedRunnable { private final Runnable original; private final long creationTimeNanos; private long startTimeNanos; private long finishTimeNanos = -1; TimedRunnable(final Runnable original) { this.original = original; this.creationTimeNanos = System.nanoTime(); } @Override public void doRun() { try { startTimeNanos = System.nanoTime(); original.run(); } finally { finishTimeNanos = System.nanoTime(); } } @Override public void onRejection(final Exception e) { if (original instanceof AbstractRunnable) { ((AbstractRunnable) original).onRejection(e); } } @Override public void onAfter() { if (original instanceof AbstractRunnable) { ((AbstractRunnable) original).onAfter(); } } @Override public void onFailure(final Exception e) { if (original instanceof AbstractRunnable) { ((AbstractRunnable) original).onFailure(e); } } @Override public boolean isForceExecution() { return original instanceof AbstractRunnable && ((AbstractRunnable) original).isForceExecution(); } /** * Return the time since this task was created until it finished running. * If the task is still running or has not yet been run, returns -1. */ long getTotalNanos() { if (finishTimeNanos == -1) { // There must have been an exception thrown, the total time is unknown (-1) return -1; } return Math.max(finishTimeNanos - creationTimeNanos, 1); } /** * Return the time this task spent being run. * If the task is still running or has not yet been run, returns -1. */ long getTotalExecutionNanos() { if (startTimeNanos == -1 || finishTimeNanos == -1) { // There must have been an exception thrown, the total time is unknown (-1) return -1; } return Math.max(finishTimeNanos - startTimeNanos, 1); } @Override public Runnable unwrap() { return original; } }
elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutor.java
public final class QueueResizingEsThreadPoolExecutor extends EsThreadPoolExecutor {

    //......

    /**
     * Called after each task completes. It does the following:
     * <ul>
     *   <li>adds the task's total time (TimedRunnable creation to finish) into {@code totalTaskNanos};</li>
     *   <li>feeds the task's execution time into {@code executionEWMA};</li>
     *   <li>once {@code tasksPerFrame} tasks have completed, computes a desired queue size with
     *       {@code calculateLambda}/{@code calculateL};</li>
     *   <li>moves the work queue's capacity toward that size by {@code QUEUE_ADJUSTMENT_AMOUNT},
     *       bounded by {@code minQueueSize} and {@code maxQueueSize}.</li>
     * </ul>
     *
     * @param r the runnable that just finished; the assertion below expects it to unwrap to a
     *          {@link TimedRunnable}
     * @param t the throwable that terminated execution, if any (not used by the measurement logic below)
     */
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        // A task has been completed, it has left the building. We should now be able to get the
        // total time as a combination of the time in the queue and time spent running the task. We
        // only want runnables that did not throw errors though, because they could be fast-failures
        // that throw off our timings, so only check when t is null.
        // NOTE(review): despite the sentence above, t is never consulted in this method. Every
        // completed task is measured. Confirm whether a `t == null` guard was intended.
        assert super.unwrap(r) instanceof TimedRunnable : "expected only TimedRunnables in queue";
        final TimedRunnable timedRunnable = (TimedRunnable) super.unwrap(r);
        final long taskNanos = timedRunnable.getTotalNanos();
        // Running sum of total task time for the current frame, including this task.
        final long totalNanos = totalTaskNanos.addAndGet(taskNanos);
        final long taskExecutionNanos = timedRunnable.getTotalExecutionNanos();
        // NOTE(review): the message says "longer than 0" but the check is >= 0. In practice it rejects
        // the -1 that getTotalExecutionNanos() returns for an unstarted or unfinished task.
        assert taskExecutionNanos >= 0 : "expected task to always take longer than 0 nanoseconds, got: " + taskExecutionNanos;
        executionEWMA.addValue(taskExecutionNanos);

        // Only the completion whose increment returns exactly tasksPerFrame runs the adjustment below.
        // This presumes taskCount is an atomic counter, as the memory-barrier comment below suggests.
        if (taskCount.incrementAndGet() == this.tasksPerFrame) {
            final long endTimeNs = System.nanoTime();
            // Elapsed time of this frame, measured from the previous reset of startNs.
            final long totalRuntime = endTimeNs - this.startNs;
            // Reset the start time for all tasks. At first glance this appears to need to be
            // volatile, since we are reading from a different thread when it is set, but it
            // is protected by the taskCount memory barrier.
            // See: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/atomic/package-summary.html
            startNs = endTimeNs;

            // Calculate the new desired queue size
            try {
                // Task arrival rate per nanosecond (the debug log scales it by one second to print "tasks/s").
                // totalNanos is clamped to >= 1, presumably to avoid a division by zero in calculateLambda.
                final double lambda = calculateLambda(tasksPerFrame, Math.max(totalNanos, 1L));
                final int desiredQueueSize = calculateL(lambda, targetedResponseTimeNanos);
                final int oldCapacity = workQueue.capacity();

                if (logger.isDebugEnabled()) {
                    final long avgTaskTime = totalNanos / tasksPerFrame;
                    logger.debug("[{}]: there were [{}] tasks in [{}], avg task time [{}], EWMA task execution [{}], " +
                            "[{} tasks/s], optimal queue is [{}], current capacity [{}]",
                        getName(), tasksPerFrame, TimeValue.timeValueNanos(totalRuntime), TimeValue.timeValueNanos(avgTaskTime),
                        TimeValue.timeValueNanos((long)executionEWMA.getAverage()),
                        String.format(Locale.ROOT, "%.2f", lambda * TimeValue.timeValueSeconds(1).nanos()),
                        desiredQueueSize, oldCapacity);
                }

                // Adjust the queue size towards the desired capacity using an adjust of
                // QUEUE_ADJUSTMENT_AMOUNT (either up or down), keeping in mind the min and max
                // values the queue size can have.
                final int newCapacity =
                    workQueue.adjustCapacity(desiredQueueSize, QUEUE_ADJUSTMENT_AMOUNT, minQueueSize, maxQueueSize);
                if (oldCapacity != newCapacity && logger.isDebugEnabled()) {
                    logger.debug("adjusted [{}] queue size by [{}], old capacity: [{}], new capacity: [{}]", getName(),
                        newCapacity > oldCapacity ? QUEUE_ADJUSTMENT_AMOUNT : -QUEUE_ADJUSTMENT_AMOUNT,
                        oldCapacity, newCapacity);
                }
            } catch (ArithmeticException e) {
                // There was an integer overflow, so just log about it, rather than adjust the queue size
                logger.warn(() -> new ParameterizedMessage(
                        "failed to calculate optimal queue size for [{}] thread pool, " +
                        "total frame time [{}ns], tasks [{}], task execution time [{}ns]",
                        getName(), totalRuntime, tasksPerFrame, totalNanos),
                    e);
            } finally {
                // Finally, decrement the task count and time back to their starting values. We
                // do this at the end so there are no concurrent adjustments happening. We also
                // decrement them instead of resetting them back to zero, as resetting them back
                // to zero causes operations that came in during the adjustment to be uncounted
                int tasks = taskCount.addAndGet(-this.tasksPerFrame);
                assert tasks >= 0 : "tasks should never be negative, got: " + tasks;

                if (tasks >= this.tasksPerFrame) {
                    // Start over, because we can potentially reach a "never adjusting" state,
                    //
                    // consider the following:
                    // - If the frame window is 10, and there are 10 tasks, then an adjustment will begin. (taskCount == 10)
                    // - Prior to the adjustment being done, 15 more tasks come in, the taskCount is now 25
                    // - Adjustment happens and we decrement the tasks by 10, taskCount is now 15
                    // - Since taskCount will now be incremented forever, it will never be 10 again,
                    //   so there will be no further adjustments
                    logger.debug(
                            "[{}]: too many incoming tasks while queue size adjustment occurs, resetting measurements to 0",
                            getName());
                    // NOTE(review): the log message says measurements are reset to 0, but totalTaskNanos is
                    // set to 1 (only taskCount goes to 0). Confirm that 1 is intentional.
                    totalTaskNanos.getAndSet(1);
                    taskCount.getAndSet(0);
                    startNs = System.nanoTime();
                } else {
                    // Do a regular adjustment
                    totalTaskNanos.addAndGet(-totalNanos);
                }
            }
        }
    }

    //......
}
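TimedRunnable继承了AbstractRunnable，同时实现了WrappedRunnable接口。它在构造时记录creationTimeNanos，并在doRun方法中记录原始Runnable执行的startTimeNanos及finishTimeNanos；finishTimeNanos在finally中赋值，因此即使任务抛出异常也会被记录。它同时提供了getTotalNanos（从创建到执行完成的总耗时，包含排队时间）和getTotalExecutionNanos（该task的实际执行耗时），任务尚未完成时二者均返回-1。QueueResizingEsThreadPoolExecutor在afterExecute中用getTotalNanos累加totalTaskNanos，用getTotalExecutionNanos更新executionEWMA。每完成tasksPerFrame个任务后，它据此计算期望的队列大小，并按QUEUE_ADJUSTMENT_AMOUNT调整队列容量。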
另外一部分则需要先做聚类、分类处理，将聚合出的分类结果存入ES集群的聚类索引中。数据处理层的聚合结果存入ES中的指定索引，同时将与每个聚合主题相关的数据存入每个document的某个field中。