周健华 2019-06-28
本文主要研究一下storm的window trigger
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/WindowTridentProcessor.java
/**
 * Prepares this window processor.
 *
 * Validates that there is exactly one parent tuple factory, creates the window store, builds the
 * task-specific key prefix (windowTaskId), then creates the trident window manager and calls
 * its prepare().
 *
 * @param stormConf topology configuration; used to read the tuple cache size and to create the window store
 * @param context topology context; used to obtain this task's id
 * @param tridentContext trident context providing the parent tuple factories and the delegate collector
 * @throws RuntimeException if the number of parent tuple factories is not exactly one
 */
public void prepare(Map stormConf, TopologyContext context, TridentContext tridentContext) {
    this.topologyContext = context;
    List<TridentTuple.Factory> parents = tridentContext.getParentTupleFactories();
    // Window aggregation is only supported over a single upstream parent.
    if (parents.size() != 1) {
        throw new RuntimeException("Aggregation related operation can only have one parent");
    }
    Long maxTuplesCacheSize = getWindowTuplesCacheSize(stormConf);
    this.tridentContext = tridentContext;
    collector = new FreshCollector(tridentContext);
    projection = new TridentTupleView.ProjectionFactory(parents.get(0), inputFields);
    windowStore = windowStoreFactory.create(stormConf);
    // Key prefix unique to this window and this task: windowId + SEP + taskId + SEP.
    windowTaskId = windowId + WindowsStore.KEY_SEPARATOR + topologyContext.getThisTaskId() + WindowsStore.KEY_SEPARATOR;
    windowTriggerInprocessId = getWindowTriggerInprocessIdPrefix(windowTaskId);
    // storeTuplesInStore selects the store-based manager (the only one that receives
    // maxTuplesCacheSize and inputFields); otherwise the in-memory manager is used.
    tridentWindowManager = storeTuplesInStore
            ? new StoreBasedTridentWindowManager(windowConfig, windowTaskId, windowStore, aggregator, tridentContext.getDelegateCollector(), maxTuplesCacheSize, inputFields)
            : new InMemoryTridentWindowManager(windowConfig, windowTaskId, windowStore, aggregator, tridentContext.getDelegateCollector());
    tridentWindowManager.prepare();
}
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
/**
 * Creates the window manager for one window task.
 *
 * Builds a {@code WindowManager} whose lifecycle listener is a {@code TridentWindowLifeCycleListener}.
 * It then installs the eviction policy and trigger policy obtained from the window config's strategy.
 *
 * @param windowConfig window configuration supplying the window strategy
 * @param windowTaskId task-specific key prefix for this window
 * @param windowStore store used to persist window state
 * @param aggregator aggregator applied to the window's tuples
 * @param delegateCollector collector that aggregation output is delegated to
 */
public AbstractTridentWindowManager(WindowConfig windowConfig, String windowTaskId, WindowsStore windowStore, Aggregator aggregator, BatchOutputCollector delegateCollector) {
    this.windowTaskId = windowTaskId;
    this.windowStore = windowStore;
    this.aggregator = aggregator;
    this.delegateCollector = delegateCollector;
    // Store key for this task's trigger count.
    windowTriggerCountId = WindowTridentProcessor.TRIGGER_COUNT_PREFIX + windowTaskId;
    windowManager = new WindowManager<>(new TridentWindowLifeCycleListener());
    WindowStrategy<T> windowStrategy = windowConfig.getWindowStrategy();
    EvictionPolicy<T> evictionPolicy = windowStrategy.getEvictionPolicy();
    windowManager.setEvictionPolicy(evictionPolicy);
    // windowManager is passed as the TriggerHandler, so each trigger calls back into windowManager.onTrigger().
    triggerPolicy = windowStrategy.getTriggerPolicy(windowManager, evictionPolicy);
    windowManager.setTriggerPolicy(triggerPolicy);
}

/**
 * Initializes the manager in three phases: preInitialize(), initialize(), postInitialize().
 * The trigger policy is only started in the last phase.
 */
public void prepare() {
    preInitialize();
    initialize();
    postInitialize();
}

private void postInitialize() {
    // start trigger once the initialization is done.
    triggerPolicy.start();
}
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/strategy/SlidingDurationWindowStrategy.java
/**
 * Returns a {@code TriggerPolicy} which triggers for every configured sliding window duration.
 *
 * @param triggerHandler handler whose {@code onTrigger()} is called each time the policy fires
 * @param evictionPolicy eviction policy handed to the created trigger policy
 * @return a {@code TimeTriggerPolicy} built from {@code windowConfig.getSlidingLength()},
 *         the trigger handler and the eviction policy
 */
@Override
public TriggerPolicy<T> getTriggerPolicy(TriggerHandler triggerHandler, EvictionPolicy<T> evictionPolicy) {
    return new TimeTriggerPolicy<>(windowConfig.getSlidingLength(), triggerHandler, evictionPolicy);
}
storm-core-1.2.2-sources.jar!/org/apache/storm/windowing/TimeTriggerPolicy.java
/**
 * Starts periodic triggering.
 *
 * Schedules the trigger task at a fixed rate: the first run is after {@code duration} ms, then
 * every {@code duration} ms.
 */
public void start() {
    executorFuture = executor.scheduleAtFixedRate(newTriggerTask(), duration, duration, TimeUnit.MILLISECONDS);
}

/**
 * Builds the task executed on every tick.
 *
 * If an eviction policy is configured, the task first sets its reference time. It then calls
 * {@code handler.onTrigger()}.
 *
 * @return the runnable handed to the scheduler
 */
private Runnable newTriggerTask() {
    return new Runnable() {
        @Override
        public void run() {
            // do not process current timestamp since tuples might arrive while the trigger is executing
            long now = System.currentTimeMillis() - 1;
            try {
                /*
                 * set the current timestamp as the reference time for the eviction policy
                 * to evict the events
                 */
                if (evictionPolicy != null) {
                    evictionPolicy.setContext(new DefaultEvictionContext(now, null, null, duration));
                }
                handler.onTrigger();
            } catch (Throwable th) {
                LOG.error("handler.onTrigger failed ", th);
                /*
                 * propagate it so that task gets canceled and the exception
                 * can be retrieved from executorFuture.get()
                 */
                // NOTE: with scheduleAtFixedRate, an exception thrown from a run suppresses all
                // subsequent executions, so a single failed trigger stops this policy for good.
                throw th;
            }
        }
    };
}
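TimeTriggerPolicy的start方法通过executor.scheduleAtFixedRate以固定频率调度newTriggerTask,周期为duration,即SlidingDurationWindowStrategy创建它时传入的windowConfig.getSlidingLength();而run方法会触发handler.onTrigger(),即WindowManager.onTrigger()(AbstractTridentWindowManager把windowManager作为TriggerHandler传给了getTriggerPolicy)。
storm-core-1.2.2-sources.jar!/org/apache/storm/windowing/WindowManager.java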
/**
 * The callback invoked by the trigger policy.
 *
 * While holding {@code lock}, it scans the whole window and snapshots and clears the pending
 * expired events. Outside the lock, it collects the window's events, marking as "new" those not
 * delivered on the previous trigger. If the window is non-empty, it calls
 * {@code windowLifecycleListener.onActivation}. It always calls {@code triggerPolicy.reset()} at the end.
 *
 * @return {@code true} if the window contained at least one event
 */
@Override
public boolean onTrigger() {
    List<Event<T>> windowEvents = null;
    List<T> expired = null;
    try {
        // NOTE(review): lock() is called inside the try; the usual idiom acquires the lock just
        // before the try so that finally never unlocks a lock that was not acquired.
        lock.lock();
        /*
         * scan the entire window to handle out of order events in
         * the case of time based windows.
         */
        windowEvents = scanEvents(true);
        // Snapshot and clear the expired events while still holding the lock.
        expired = new ArrayList<>(expiredEvents);
        expiredEvents.clear();
    } finally {
        lock.unlock();
    }
    List<T> events = new ArrayList<>();
    List<T> newEvents = new ArrayList<>();
    for (Event<T> event : windowEvents) {
        events.add(event.get());
        // An event is "new" if it was not part of the window delivered on the previous trigger.
        if (!prevWindowEvents.contains(event)) {
            newEvents.add(event.get());
        }
    }
    // Remember this window for the next trigger; an empty window leaves prevWindowEvents empty.
    prevWindowEvents.clear();
    if (!events.isEmpty()) {
        prevWindowEvents.addAll(windowEvents);
        LOG.debug("invoking windowLifecycleListener onActivation, [{}] events in window.", events.size());
        windowLifecycleListener.onActivation(events, newEvents, expired);
    } else {
        LOG.debug("No events in the window, skipping onActivation");
    }
    triggerPolicy.reset();
    return !events.isEmpty();
}
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java
/**
 * Listener to receive any activation/expiry of windowing events and take further action on them.
 */
class TridentWindowLifeCycleListener implements WindowLifecycleListener<T> {

    @Override
    public void onExpiry(List<T> expiredEvents) {
        LOG.debug("onExpiry is invoked");
        onTuplesExpired(expiredEvents);
    }

    @Override
    public void onActivation(List<T> events, List<T> newEvents, List<T> expired) {
        LOG.debug("onActivation is invoked with events size: [{}]", events.size());
        // trigger occurred, create an aggregation and keep them in store
        // Only the full window (events) is aggregated; newEvents and expired are not used here.
        int currentTriggerId = triggerId.incrementAndGet();
        execAggregatorAndStoreResult(currentTriggerId, events);
    }
}

/**
 * Runs the aggregator over all tuples of the current window and records the result.
 *
 * Aggregator call order:
 * <ol>
 *   <li>{@code init(currentTriggerId, collector)}</li>
 *   <li>{@code aggregate(state, tuple, collector)} once per tuple</li>
 *   <li>{@code complete(state, collector)}</li>
 * </ol>
 * The collected values are then written to the window store together with the next trigger
 * count, and a {@code TriggerResult} is appended to {@code pendingTriggers}.
 *
 * @param currentTriggerId id of this trigger, passed to {@code aggregator.init} and used in the store key
 * @param tupleEvents the events of the current window
 */
private void execAggregatorAndStoreResult(int currentTriggerId, List<T> tupleEvents) {
    List<TridentTuple> resultTuples = getTridentTuples(tupleEvents);

    // run aggregator to compute the result
    AccumulatedTuplesCollector collector = new AccumulatedTuplesCollector(delegateCollector);
    Object state = aggregator.init(currentTriggerId, collector);
    for (TridentTuple resultTuple : resultTuples) {
        aggregator.aggregate(state, resultTuple, collector);
    }
    aggregator.complete(state, collector);

    List<List<Object>> resultantAggregatedValue = collector.values;

    // One putAll call with two entries: the trigger count (currentTriggerId + 1) and this
    // trigger's aggregated values under its trigger key.
    ArrayList<WindowsStore.Entry> entries = Lists.newArrayList(new WindowsStore.Entry(windowTriggerCountId, currentTriggerId + 1),
            new WindowsStore.Entry(WindowTridentProcessor.generateWindowTriggerKey(windowTaskId, currentTriggerId), resultantAggregatedValue));
    windowStore.putAll(entries);
    pendingTriggers.add(new TriggerResult(currentTriggerId, resultantAggregatedValue));
}
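从这里可以清晰地看到Aggregator接口各个方法的调用逻辑及顺序:每次窗口触发时,先调用init(currentTriggerId, collector)得到state,再对窗口内的每个tuple调用aggregate(state, tuple, collector),最后调用complete(state, collector);聚合结果随后被写入windowStore并加入pendingTriggers。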