chenjieit 2019-06-28
本文主要研究一下storm worker的executor与task
storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
public static void main(String[] args) throws Exception { Preconditions.checkArgument(args.length == 5, "Illegal number of arguments. Expected: 5, Actual: " + args.length); String stormId = args[0]; String assignmentId = args[1]; String supervisorPort = args[2]; String portStr = args[3]; String workerId = args[4]; Map<String, Object> conf = ConfigUtils.readStormConfig(); Utils.setupDefaultUncaughtExceptionHandler(); StormCommon.validateDistributedMode(conf); Worker worker = new Worker(conf, null, stormId, assignmentId, Integer.parseInt(supervisorPort), Integer.parseInt(portStr), workerId); worker.start(); Utils.addShutdownHookWithForceKillIn1Sec(worker::shutdown); }
storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
public void start() throws Exception { LOG.info("Launching worker for {} on {}:{} with id {} and conf {}", topologyId, assignmentId, port, workerId, ConfigUtils.maskPasswords(conf)); // because in local mode, its not a separate // process. supervisor will register it in this case // if ConfigUtils.isLocalMode(conf) returns false then it is in distributed mode. if (!ConfigUtils.isLocalMode(conf)) { // Distributed mode SysOutOverSLF4J.sendSystemOutAndErrToSLF4J(); String pid = Utils.processPid(); FileUtils.touch(new File(ConfigUtils.workerPidPath(conf, workerId, pid))); FileUtils.writeStringToFile(new File(ConfigUtils.workerArtifactsPidPath(conf, topologyId, port)), pid, Charset.forName("UTF-8")); } final Map<String, Object> topologyConf = ConfigUtils.overrideLoginConfigWithSystemProperty(ConfigUtils.readSupervisorStormConf(conf, topologyId)); ClusterStateContext csContext = new ClusterStateContext(DaemonType.WORKER, topologyConf); IStateStorage stateStorage = ClusterUtils.mkStateStorage(conf, topologyConf, csContext); IStormClusterState stormClusterState = ClusterUtils.mkStormClusterState(stateStorage, null, csContext); StormMetricRegistry.start(conf, DaemonType.WORKER); Credentials initialCredentials = stormClusterState.credentials(topologyId, null); Map<String, String> initCreds = new HashMap<>(); if (initialCredentials != null) { initCreds.putAll(initialCredentials.get_creds()); } autoCreds = ClientAuthUtils.getAutoCredentials(topologyConf); subject = ClientAuthUtils.populateSubject(null, autoCreds, initCreds); Subject.doAs(subject, (PrivilegedExceptionAction<Object>) () -> loadWorker(topologyConf, stateStorage, stormClusterState, initCreds, initialCredentials) ); }
storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
private AtomicReference<List<IRunningExecutor>> executorsAtom; private Object loadWorker(Map<String, Object> topologyConf, IStateStorage stateStorage, IStormClusterState stormClusterState, Map<String, String> initCreds, Credentials initialCredentials) throws Exception { workerState = new WorkerState(conf, context, topologyId, assignmentId, supervisorPort, port, workerId, topologyConf, stateStorage, stormClusterState, autoCreds); // Heartbeat here so that worker process dies if this fails // it's important that worker heartbeat to supervisor ASAP so that supervisor knows // that worker is running and moves on doHeartBeat(); executorsAtom = new AtomicReference<>(null); // launch heartbeat threads immediately so that slow-loading tasks don't cause the worker to timeout // to the supervisor workerState.heartbeatTimer .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), () -> { try { doHeartBeat(); } catch (IOException e) { throw new RuntimeException(e); } }); workerState.executorHeartbeatTimer .scheduleRecurring(0, (Integer) conf.get(Config.EXECUTOR_METRICS_FREQUENCY_SECS), Worker.this::doExecutorHeartbeats); workerState.registerCallbacks(); workerState.refreshConnections(null); workerState.activateWorkerWhenAllConnectionsReady(); workerState.refreshStormActive(null); workerState.runWorkerStartHooks(); List<Executor> execs = new ArrayList<>(); for (List<Long> e : workerState.getLocalExecutors()) { if (ConfigUtils.isLocalMode(topologyConf)) { Executor executor = LocalExecutor.mkExecutor(workerState, e, initCreds); execs.add(executor); for (int i = 0; i < executor.getTaskIds().size(); ++i) { workerState.localReceiveQueues.put(executor.getTaskIds().get(i), executor.getReceiveQueue()); } } else { Executor executor = Executor.mkExecutor(workerState, e, initCreds); for (int i = 0; i < executor.getTaskIds().size(); ++i) { workerState.localReceiveQueues.put(executor.getTaskIds().get(i), executor.getReceiveQueue()); } execs.add(executor); } } List<IRunningExecutor> newExecutors = new ArrayList<IRunningExecutor>(); for (Executor executor : execs) { newExecutors.add(executor.execute()); } executorsAtom.set(newExecutors); //...... setupFlushTupleTimer(topologyConf, newExecutors); setupBackPressureCheckTimer(topologyConf); LOG.info("Worker has topology config {}", ConfigUtils.maskPasswords(topologyConf)); LOG.info("Worker {} for storm {} on {}:{} has finished loading", workerId, topologyId, assignmentId, port); return this; }
storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
// local executors and localTaskIds running in this worker final Set<List<Long>> localExecutors; public Set<List<Long>> getLocalExecutors() { return localExecutors; } public WorkerState(Map<String, Object> conf, IContext mqContext, String topologyId, String assignmentId, int supervisorPort, int port, String workerId, Map<String, Object> topologyConf, IStateStorage stateStorage, IStormClusterState stormClusterState, Collection<IAutoCredentials> autoCredentials) throws IOException, InvalidTopologyException { this.autoCredentials = autoCredentials; this.conf = conf; this.localExecutors = new HashSet<>(readWorkerExecutors(stormClusterState, topologyId, assignmentId, port)); //...... } private List<List<Long>> readWorkerExecutors(IStormClusterState stormClusterState, String topologyId, String assignmentId, int port) { LOG.info("Reading assignments"); List<List<Long>> executorsAssignedToThisWorker = new ArrayList<>(); executorsAssignedToThisWorker.add(Constants.SYSTEM_EXECUTOR_ID); Map<List<Long>, NodeInfo> executorToNodePort = getLocalAssignment(conf, stormClusterState, topologyId).get_executor_node_port(); for (Map.Entry<List<Long>, NodeInfo> entry : executorToNodePort.entrySet()) { NodeInfo nodeInfo = entry.getValue(); if (nodeInfo.get_node().equals(assignmentId) && nodeInfo.get_port().iterator().next() == port) { executorsAssignedToThisWorker.add(entry.getKey()); } } return executorsAssignedToThisWorker; } private Assignment getLocalAssignment(Map<String, Object> conf, IStormClusterState stormClusterState, String topologyId) { if (!ConfigUtils.isLocalMode(conf)) { try (SupervisorClient supervisorClient = SupervisorClient.getConfiguredClient(conf, Utils.hostname(), supervisorPort)) { Assignment assignment = supervisorClient.getClient().getLocalAssignmentForStorm(topologyId); return assignment; } catch (Throwable tr1) { //if any error/exception thrown, fetch it from zookeeper return stormClusterState.remoteAssignmentInfo(topologyId, null); } } else { return stormClusterState.remoteAssignmentInfo(topologyId, null); } }
storm-2.0.0/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
public Assignment remoteAssignmentInfo(String stormId, Runnable callback) { if (callback != null) { assignmentInfoCallback.put(stormId, callback); } byte[] serialized = stateStorage.get_data(ClusterUtils.assignmentPath(stormId), callback != null); return ClusterUtils.maybeDeserialize(serialized, Assignment.class); }
storm-2.0.0/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java
public static final String ZK_SEPERATOR = "/"; public static final String ASSIGNMENTS_ROOT = "assignments"; public static final String ASSIGNMENTS_SUBTREE = ZK_SEPERATOR + ASSIGNMENTS_ROOT; public static String assignmentPath(String id) { return ASSIGNMENTS_SUBTREE + ZK_SEPERATOR + id; }
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/Executor.java
public static Executor mkExecutor(WorkerState workerState, List<Long> executorId, Map<String, String> credentials) { Executor executor; WorkerTopologyContext workerTopologyContext = workerState.getWorkerTopologyContext(); List<Integer> taskIds = StormCommon.executorIdToTasks(executorId); String componentId = workerTopologyContext.getComponentId(taskIds.get(0)); String type = getExecutorType(workerTopologyContext, componentId); if (ClientStatsUtil.SPOUT.equals(type)) { executor = new SpoutExecutor(workerState, executorId, credentials); } else { executor = new BoltExecutor(workerState, executorId, credentials); } int minId = Integer.MAX_VALUE; Map<Integer, Task> idToTask = new HashMap<>(); for (Integer taskId : taskIds) { minId = Math.min(minId, taskId); try { Task task = new Task(executor, taskId); idToTask.put(taskId, task); } catch (IOException ex) { throw Utils.wrapInRuntime(ex); } } executor.idToTaskBase = minId; executor.idToTask = Utils.convertToArray(idToTask, minId); return executor; }
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/Executor.java
/** * separated from mkExecutor in order to replace executor transfer in executor data for testing. */ public ExecutorShutdown execute() throws Exception { LOG.info("Loading executor tasks " + componentId + ":" + executorId); String handlerName = componentId + "-executor" + executorId; Utils.SmartThread handler = Utils.asyncLoop(this, false, reportErrorDie, Thread.NORM_PRIORITY, true, true, handlerName); LOG.info("Finished loading executor " + componentId + ":" + executorId); return new ExecutorShutdown(this, Lists.newArrayList(handler), idToTask, receiveQueue); }
storm-2.0.0/storm-client/src/jvm/org/apache/storm/utils/Utils.java
/** * Creates a thread that calls the given code repeatedly, sleeping for an interval of seconds equal to the return value of the previous * call. * * The given afn may be a callable that returns the number of seconds to sleep, or it may be a Callable that returns another Callable * that in turn returns the number of seconds to sleep. In the latter case isFactory. * * @param afn the code to call on each iteration * @param isDaemon whether the new thread should be a daemon thread * @param eh code to call when afn throws an exception * @param priority the new thread's priority * @param isFactory whether afn returns a callable instead of sleep seconds * @param startImmediately whether to start the thread before returning * @param threadName a suffix to be appended to the thread name * @return the newly created thread * * @see Thread */ public static SmartThread asyncLoop(final Callable afn, boolean isDaemon, final Thread.UncaughtExceptionHandler eh, int priority, final boolean isFactory, boolean startImmediately, String threadName) { SmartThread thread = new SmartThread(new Runnable() { public void run() { try { final Callable<Long> fn = isFactory ? (Callable<Long>) afn.call() : afn; while (true) { if (Thread.interrupted()) { throw new InterruptedException(); } final Long s = fn.call(); if (s == null) { // then stop running it break; } if (s > 0) { Time.sleep(s); } } } catch (Throwable t) { if (Utils.exceptionCauseIsInstanceOf( InterruptedException.class, t)) { LOG.info("Async loop interrupted!"); return; } LOG.error("Async loop died!", t); throw new RuntimeException(t); } } }); if (eh != null) { thread.setUncaughtExceptionHandler(eh); } else { thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { public void uncaughtException(Thread t, Throwable e) { LOG.error("Async loop died!", e); Utils.exitProcess(1, "Async loop died!"); } }); } thread.setDaemon(isDaemon); thread.setPriority(priority); if (threadName != null && !threadName.isEmpty()) { thread.setName(thread.getName() + "-" + threadName); } if (startImmediately) { thread.start(); } return thread; }
storm-2.0.0/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java
/** * Non blocking. Returns immediately if Q is empty. Returns number of elements consumed from Q */ public int consume(JCQueue.Consumer consumer) { return consume(consumer, continueRunning); } /** * Non blocking. Returns immediately if Q is empty. Runs till Q is empty OR exitCond.keepRunning() return false. Returns number of * elements consumed from Q */ public int consume(JCQueue.Consumer consumer, ExitCondition exitCond) { try { return consumeImpl(consumer, exitCond); } catch (InterruptedException e) { throw new RuntimeException(e); } } /** * Non blocking. Returns immediately if Q is empty. Returns number of elements consumed from Q * * @param consumer * @param exitCond */ private int consumeImpl(Consumer consumer, ExitCondition exitCond) throws InterruptedException { int drainCount = 0; while (exitCond.keepRunning()) { Object tuple = recvQueue.poll(); if (tuple == null) { break; } consumer.accept(tuple); ++drainCount; } int overflowDrainCount = 0; int limit = overflowQ.size(); while (exitCond.keepRunning() && (overflowDrainCount < limit)) { // 2nd cond prevents staying stuck with consuming overflow Object tuple = overflowQ.poll(); ++overflowDrainCount; consumer.accept(tuple); } int total = drainCount + overflowDrainCount; if (total > 0) { consumer.flush(); } return total; }
storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/Task.java
public class Task { private static final Logger LOG = LoggerFactory.getLogger(Task.class); private final TaskMetrics taskMetrics; private Executor executor; private WorkerState workerData; private TopologyContext systemTopologyContext; private TopologyContext userTopologyContext; private WorkerTopologyContext workerTopologyContext; private Integer taskId; private String componentId; private Object taskObject; // Spout/Bolt object private Map<String, Object> topoConf; private BooleanSupplier emitSampler; private CommonStats executorStats; private Map<String, Map<String, LoadAwareCustomStreamGrouping>> streamComponentToGrouper; private HashMap<String, ArrayList<LoadAwareCustomStreamGrouping>> streamToGroupers; private boolean debug; public Task(Executor executor, Integer taskId) throws IOException { this.taskId = taskId; this.executor = executor; this.workerData = executor.getWorkerData(); this.topoConf = executor.getTopoConf(); this.componentId = executor.getComponentId(); this.streamComponentToGrouper = executor.getStreamToComponentToGrouper(); this.streamToGroupers = getGroupersPerStream(streamComponentToGrouper); this.executorStats = executor.getStats(); this.workerTopologyContext = executor.getWorkerTopologyContext(); this.emitSampler = ConfigUtils.mkStatsSampler(topoConf); this.systemTopologyContext = mkTopologyContext(workerData.getSystemTopology()); this.userTopologyContext = mkTopologyContext(workerData.getTopology()); this.taskObject = mkTaskObject(); this.debug = topoConf.containsKey(Config.TOPOLOGY_DEBUG) && (Boolean) topoConf.get(Config.TOPOLOGY_DEBUG); this.addTaskHooks(); this.taskMetrics = new TaskMetrics(this.workerTopologyContext, this.componentId, this.taskId); } //...... }
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/Executor.java
@Override public void accept(Object event) { AddressedTuple addressedTuple = (AddressedTuple) event; int taskId = addressedTuple.getDest(); TupleImpl tuple = (TupleImpl) addressedTuple.getTuple(); if (isDebug) { LOG.info("Processing received message FOR {} TUPLE: {}", taskId, tuple); } try { if (taskId != AddressedTuple.BROADCAST_DEST) { tupleActionFn(taskId, tuple); } else { for (Integer t : taskIds) { tupleActionFn(t, tuple); } } } catch (Exception e) { throw new RuntimeException(e); } }
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
public class ExecutorShutdown implements Shutdownable, IRunningExecutor { private static final Logger LOG = LoggerFactory.getLogger(ExecutorShutdown.class); private final Executor executor; private final List<Utils.SmartThread> threads; private final ArrayList<Task> taskDatas; private final JCQueue receiveQueue; //...... @Override public void credentialsChanged(Credentials credentials) { TupleImpl tuple = new TupleImpl(executor.getWorkerTopologyContext(), new Values(credentials), Constants.SYSTEM_COMPONENT_ID, (int) Constants.SYSTEM_TASK_ID, Constants.CREDENTIALS_CHANGED_STREAM_ID); AddressedTuple addressedTuple = new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple); try { executor.getReceiveQueue().publish(addressedTuple); executor.getReceiveQueue().flush(); } catch (InterruptedException e) { throw new RuntimeException(e); } } public void loadChanged(LoadMapping loadMapping) { executor.reflectNewLoadMapping(loadMapping); } @Override public JCQueue getReceiveQueue() { return receiveQueue; } @Override public boolean publishFlushTuple() { return executor.publishFlushTuple(); } @Override public void shutdown() { try { LOG.info("Shutting down executor " + executor.getComponentId() + ":" + executor.getExecutorId()); executor.getReceiveQueue().close(); for (Utils.SmartThread t : threads) { t.interrupt(); } for (Utils.SmartThread t : threads) { LOG.debug("Executor " + executor.getComponentId() + ":" + executor.getExecutorId() + " joining thread " + t.getName()); t.join(); } executor.getStats().cleanupStats(); for (Task task : taskDatas) { if (task == null) { continue; } TopologyContext userContext = task.getUserContext(); for (ITaskHook hook : userContext.getHooks()) { hook.cleanup(); } } executor.getStormClusterState().disconnect(); if (executor.getOpenOrPrepareWasCalled().get()) { for (Task task : taskDatas) { if (task == null) { continue; } Object object = task.getTaskObject(); if (object instanceof ISpout) { ((ISpout) object).close(); } else if (object instanceof IBolt) { ((IBolt) object).cleanup(); } else { LOG.error("unknown component object"); } } } LOG.info("Shut down executor " + executor.getComponentId() + ":" + executor.getExecutorId()); } catch (Exception e) { throw Utils.wrapInRuntime(e); } } }
BoltExecutor.call主要是调用receiveQueue.consume方法;SpoutExecutor.call除了调用receiveQueue.consume方法,还调用了spouts.get(j).nextTuple()
BoltExecutor.tupleActionFn主要是从task获取boltObject,然后调用boltObject.execute(tuple);SpoutExecutor.tupleActionFn主要是从RotatingMap<Long, TupleInfo> pending取出TupleInfo,然后进行成功或失败的ack
rebalance命令只能重新调整worker、executor数量,无法改变task数量
)