周健华 2019-06-28
This article takes a look at Storm's CheckpointSpout.
storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java
public StormTopology createTopology() {
    Map<String, Bolt> boltSpecs = new HashMap<>();
    Map<String, SpoutSpec> spoutSpecs = new HashMap<>();
    maybeAddCheckpointSpout();
    for (String boltId : _bolts.keySet()) {
        IRichBolt bolt = _bolts.get(boltId);
        bolt = maybeAddCheckpointTupleForwarder(bolt);
        ComponentCommon common = getComponentCommon(boltId, bolt);
        try {
            maybeAddCheckpointInputs(common);
            boltSpecs.put(boltId, new Bolt(ComponentObject.serialized_java(Utils.javaSerialize(bolt)), common));
        } catch (RuntimeException wrapperCause) {
            if (wrapperCause.getCause() != null && NotSerializableException.class.equals(wrapperCause.getCause().getClass())) {
                throw new IllegalStateException(
                    "Bolt '" + boltId + "' contains a non-serializable field of type " + wrapperCause.getCause().getMessage() + ", " +
                    "which was instantiated prior to topology creation. " + wrapperCause.getCause().getMessage() + " " +
                    "should be instantiated within the prepare method of '" + boltId + " at the earliest.", wrapperCause);
            }
            throw wrapperCause;
        }
    }
    for (String spoutId : _spouts.keySet()) {
        IRichSpout spout = _spouts.get(spoutId);
        ComponentCommon common = getComponentCommon(spoutId, spout);
        try {
            spoutSpecs.put(spoutId, new SpoutSpec(ComponentObject.serialized_java(Utils.javaSerialize(spout)), common));
        } catch (RuntimeException wrapperCause) {
            if (wrapperCause.getCause() != null && NotSerializableException.class.equals(wrapperCause.getCause().getClass())) {
                throw new IllegalStateException(
                    "Spout '" + spoutId + "' contains a non-serializable field of type " + wrapperCause.getCause().getMessage() + ", " +
                    "which was instantiated prior to topology creation. " + wrapperCause.getCause().getMessage() + " " +
                    "should be instantiated within the prepare method of '" + spoutId + " at the earliest.", wrapperCause);
            }
            throw wrapperCause;
        }
    }
    StormTopology stormTopology = new StormTopology(spoutSpecs, boltSpecs, new HashMap<>());
    stormTopology.set_worker_hooks(_workerHooks);
    if (!_componentToSharedMemory.isEmpty()) {
        stormTopology.set_component_to_shared_memory(_componentToSharedMemory);
        stormTopology.set_shared_memory(_sharedMemory);
    }
    return Utils.addVersions(stormTopology);
}

/**
 * If the topology has at least one stateful bolt add a {@link CheckpointSpout} component to the topology.
 */
private void maybeAddCheckpointSpout() {
    if (hasStatefulBolt) {
        setSpout(CHECKPOINT_COMPONENT_ID, new CheckpointSpout(), 1);
    }
}

private void maybeAddCheckpointInputs(ComponentCommon common) {
    if (hasStatefulBolt) {
        addCheckPointInputs(common);
    }
}

/**
 * If the topology has at least one stateful bolt all the non-stateful bolts are wrapped in {@link CheckpointTupleForwarder} so that the
 * checkpoint tuples can flow through the topology.
 */
private IRichBolt maybeAddCheckpointTupleForwarder(IRichBolt bolt) {
    if (hasStatefulBolt && !(bolt instanceof StatefulBoltExecutor)) {
        bolt = new CheckpointTupleForwarder(bolt);
    }
    return bolt;
}

/**
 * For bolts that has incoming streams from spouts (the root bolts), add checkpoint stream from checkpoint spout to its input. For other
 * bolts, add checkpoint stream from the previous bolt to its input.
 */
private void addCheckPointInputs(ComponentCommon component) {
    Set<GlobalStreamId> checkPointInputs = new HashSet<>();
    for (GlobalStreamId inputStream : component.get_inputs().keySet()) {
        String sourceId = inputStream.get_componentId();
        if (_spouts.containsKey(sourceId)) {
            checkPointInputs.add(new GlobalStreamId(CHECKPOINT_COMPONENT_ID, CHECKPOINT_STREAM_ID));
        } else {
            checkPointInputs.add(new GlobalStreamId(sourceId, CHECKPOINT_STREAM_ID));
        }
    }
    for (GlobalStreamId streamId : checkPointInputs) {
        component.put_to_inputs(streamId, Grouping.all(new NullStruct()));
    }
}
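To make the effect of maybeAddCheckpointSpout and maybeAddCheckpointTupleForwarder concrete, here is a minimal sketch of building a topology that contains a stateful bolt. The spout and bolt class names are hypothetical placeholders, not part of the Storm source quoted above (a sketch of such a stateful bolt appears later, after the StatefulBoltExecutor section).

import org.apache.storm.generated.StormTopology;
import org.apache.storm.topology.TopologyBuilder;

// Sketch (hypothetical spout/bolt classes): as soon as setBolt(...) is called with an
// IStatefulBolt, TopologyBuilder wraps it in a StatefulBoltExecutor and sets hasStatefulBolt = true;
// createTopology() then adds the $checkpointspout component and wraps the plain IRichBolts in
// CheckpointTupleForwarder so checkpoint tuples can traverse the whole DAG.
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("sentences", new RandomSentenceSpout(), 1);      // ordinary spout (hypothetical class)
builder.setBolt("counter", new WordCountStatefulBolt(), 2)        // IStatefulBolt (hypothetical class)
       .shuffleGrouping("sentences");
builder.setBolt("printer", new PrinterBolt(), 1)                  // ordinary IRichBolt (hypothetical class)
       .shuffleGrouping("counter");
StormTopology topology = builder.createTopology();                // $checkpointspout gets added here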
storm-2.0.0/storm-client/src/jvm/org/apache/storm/spout/CheckpointSpout.java
/**
 * Emits checkpoint tuples which is used to save the state of the {@link org.apache.storm.topology.IStatefulComponent} across the topology.
 * If a topology contains Stateful bolts, Checkpoint spouts are automatically added to the topology. There is only one Checkpoint task per
 * topology. Checkpoint spout stores its internal state in a {@link KeyValueState}.
 *
 * @see CheckPointState
 */
public class CheckpointSpout extends BaseRichSpout {
    public static final String CHECKPOINT_STREAM_ID = "$checkpoint";
    public static final String CHECKPOINT_COMPONENT_ID = "$checkpointspout";
    public static final String CHECKPOINT_FIELD_TXID = "txid";
    public static final String CHECKPOINT_FIELD_ACTION = "action";
    private static final Logger LOG = LoggerFactory.getLogger(CheckpointSpout.class);
    private static final String TX_STATE_KEY = "__state";
    private TopologyContext context;
    private SpoutOutputCollector collector;
    private long lastCheckpointTs;
    private int checkpointInterval;
    private int sleepInterval;
    private boolean recoveryStepInProgress;
    private boolean checkpointStepInProgress;
    private boolean recovering;
    private KeyValueState<String, CheckPointState> checkpointState;
    private CheckPointState curTxState;

    public static boolean isCheckpoint(Tuple input) {
        return CHECKPOINT_STREAM_ID.equals(input.getSourceStreamId());
    }

    @Override
    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
        open(context, collector, loadCheckpointInterval(conf), loadCheckpointState(conf, context));
    }

    // package access for unit test
    void open(TopologyContext context, SpoutOutputCollector collector,
              int checkpointInterval, KeyValueState<String, CheckPointState> checkpointState) {
        this.context = context;
        this.collector = collector;
        this.checkpointInterval = checkpointInterval;
        this.sleepInterval = checkpointInterval / 10;
        this.checkpointState = checkpointState;
        this.curTxState = checkpointState.get(TX_STATE_KEY);
        lastCheckpointTs = 0;
        recoveryStepInProgress = false;
        checkpointStepInProgress = false;
        recovering = true;
    }

    @Override
    public void nextTuple() {
        if (shouldRecover()) {
            handleRecovery();
            startProgress();
        } else if (shouldCheckpoint()) {
            doCheckpoint();
            startProgress();
        } else {
            Utils.sleep(sleepInterval);
        }
    }

    @Override
    public void ack(Object msgId) {
        LOG.debug("Got ack with txid {}, current txState {}", msgId, curTxState);
        if (curTxState.getTxid() == ((Number) msgId).longValue()) {
            if (recovering) {
                handleRecoveryAck();
            } else {
                handleCheckpointAck();
            }
        } else {
            LOG.warn("Ack msgid {}, txState.txid {} mismatch", msgId, curTxState.getTxid());
        }
        resetProgress();
    }

    @Override
    public void fail(Object msgId) {
        LOG.debug("Got fail with msgid {}", msgId);
        if (!recovering) {
            LOG.debug("Checkpoint failed, will trigger recovery");
            recovering = true;
        }
        resetProgress();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream(CHECKPOINT_STREAM_ID, new Fields(CHECKPOINT_FIELD_TXID, CHECKPOINT_FIELD_ACTION));
    }

    private int loadCheckpointInterval(Map<String, Object> topoConf) {
        int interval = 0;
        if (topoConf.containsKey(Config.TOPOLOGY_STATE_CHECKPOINT_INTERVAL)) {
            interval = ((Number) topoConf.get(Config.TOPOLOGY_STATE_CHECKPOINT_INTERVAL)).intValue();
        }
        // ensure checkpoint interval is not less than a sane low value.
        interval = Math.max(100, interval);
        LOG.info("Checkpoint interval is {} millis", interval);
        return interval;
    }

    private boolean shouldCheckpoint() {
        return !recovering && !checkpointStepInProgress
               && (curTxState.getState() != COMMITTED || checkpointIntervalElapsed());
    }

    private boolean checkpointIntervalElapsed() {
        return (System.currentTimeMillis() - lastCheckpointTs) > checkpointInterval;
    }

    private void doCheckpoint() {
        LOG.debug("In checkpoint");
        if (curTxState.getState() == COMMITTED) {
            saveTxState(curTxState.nextState(false));
            lastCheckpointTs = System.currentTimeMillis();
        }
        Action action = curTxState.nextAction(false);
        emit(curTxState.getTxid(), action);
    }

    private void emit(long txid, Action action) {
        LOG.debug("Current state {}, emitting txid {}, action {}", curTxState, txid, action);
        collector.emit(CHECKPOINT_STREAM_ID, new Values(txid, action), txid);
    }

    //......
}
CheckpointSpout's open method loads the checkpoint interval from Config.TOPOLOGY_STATE_CHECKPOINT_INTERVAL (topology.state.checkpoint.interval.ms); the default in defaults.yaml is 1000. If the value is not specified, 100 is used, and 100 is also the lower bound.
storm-2.0.0/storm-client/src/jvm/org/apache/storm/spout/CheckPointState.java
/**
 * Get the next state based on this checkpoint state.
 *
 * @param recovering if in recovering phase
 * @return the next checkpoint state based on this state.
 */
public CheckPointState nextState(boolean recovering) {
    CheckPointState nextState;
    switch (state) {
        case PREPARING:
            nextState = recovering ? new CheckPointState(txid - 1, COMMITTED) : new CheckPointState(txid, COMMITTING);
            break;
        case COMMITTING:
            nextState = new CheckPointState(txid, COMMITTED);
            break;
        case COMMITTED:
            nextState = recovering ? this : new CheckPointState(txid + 1, PREPARING);
            break;
        default:
            throw new IllegalStateException("Unknown state " + state);
    }
    return nextState;
}

/**
 * Get the next action to perform based on this checkpoint state.
 *
 * @param recovering if in recovering phase
 * @return the next action to perform based on this state
 */
public Action nextAction(boolean recovering) {
    Action action;
    switch (state) {
        case PREPARING:
            action = recovering ? Action.ROLLBACK : Action.PREPARE;
            break;
        case COMMITTING:
            action = Action.COMMIT;
            break;
        case COMMITTED:
            action = recovering ? Action.INITSTATE : Action.PREPARE;
            break;
        default:
            throw new IllegalStateException("Unknown state " + state);
    }
    return action;
}
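As a quick illustration of the transition table above, the following sketch walks one transaction through the normal (non-recovering) path and notes what the recovering path would do instead; it only exercises nextState/nextAction as quoted above and assumes the (txid, state) constructor used there is accessible.

import org.apache.storm.spout.CheckPointState;
import org.apache.storm.spout.CheckPointState.Action;
import org.apache.storm.spout.CheckPointState.State;

// Normal progression driven by nextState(false)/nextAction(false):
CheckPointState tx = new CheckPointState(0, State.COMMITTED);

tx = tx.nextState(false);              // (txid=1, PREPARING)
Action a1 = tx.nextAction(false);      // PREPARE  -> emitted on the $checkpoint stream

tx = tx.nextState(false);              // (txid=1, COMMITTING) after PREPARE is fully acked
Action a2 = tx.nextAction(false);      // COMMIT

tx = tx.nextState(false);              // (txid=1, COMMITTED)  after COMMIT is fully acked
Action a3 = tx.nextAction(false);      // PREPARE  -> the next doCheckpoint() moves on to txid=2

// Recovering path: from PREPARING, nextState(true) falls back to (txid-1, COMMITTED) and the
// action is ROLLBACK; from COMMITTED, nextState(true) stays put and the action is INITSTATE.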
storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/BaseStatefulBoltExecutor.java
public void execute(Tuple input) {
    if (CheckpointSpout.isCheckpoint(input)) {
        processCheckpoint(input);
    } else {
        handleTuple(input);
    }
}

/**
 * Invokes handleCheckpoint once checkpoint tuple is received on all input checkpoint streams to this component.
 */
private void processCheckpoint(Tuple input) {
    CheckPointState.Action action = (CheckPointState.Action) input.getValueByField(CHECKPOINT_FIELD_ACTION);
    long txid = input.getLongByField(CHECKPOINT_FIELD_TXID);
    if (shouldProcessTransaction(action, txid)) {
        LOG.debug("Processing action {}, txid {}", action, txid);
        try {
            if (txid >= lastTxid) {
                handleCheckpoint(input, action, txid);
                if (action == ROLLBACK) {
                    lastTxid = txid - 1;
                } else {
                    lastTxid = txid;
                }
            } else {
                LOG.debug("Ignoring old transaction. Action {}, txid {}", action, txid);
                collector.ack(input);
            }
        } catch (Throwable th) {
            LOG.error("Got error while processing checkpoint tuple", th);
            collector.fail(input);
            collector.reportError(th);
        }
    } else {
        LOG.debug("Waiting for action {}, txid {} from all input tasks. checkPointInputTaskCount {}, " +
                  "transactionRequestCount {}", action, txid, checkPointInputTaskCount, transactionRequestCount);
        collector.ack(input);
    }
}

/**
 * Checks if check points have been received from all tasks across all input streams to this component
 */
private boolean shouldProcessTransaction(CheckPointState.Action action, long txid) {
    TransactionRequest request = new TransactionRequest(action, txid);
    Integer count;
    if ((count = transactionRequestCount.get(request)) == null) {
        transactionRequestCount.put(request, 1);
        count = 1;
    } else {
        transactionRequestCount.put(request, ++count);
    }
    if (count == checkPointInputTaskCount) {
        transactionRequestCount.remove(request);
        return true;
    }
    return false;
}

protected void declareCheckpointStream(OutputFieldsDeclarer declarer) {
    declarer.declareStream(CHECKPOINT_STREAM_ID, new Fields(CHECKPOINT_FIELD_TXID, CHECKPOINT_FIELD_ACTION));
}
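The shouldProcessTransaction check above acts as a small barrier: handleCheckpoint only fires once the same (action, txid) pair has arrived from every upstream task on the checkpoint streams; earlier arrivals are simply acked. A standalone sketch of the same counting idea (plain Java, not Storm code):

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustration only: with 3 upstream checkpoint tasks, the 3rd arrival of ("PREPARE", txid=5)
// is the one that triggers processing.
Map<String, Integer> transactionRequestCount = new HashMap<>();
int checkPointInputTaskCount = 3;
for (String arrival : List.of("PREPARE/5", "PREPARE/5", "PREPARE/5")) {
    int count = transactionRequestCount.merge(arrival, 1, Integer::sum);
    if (count == checkPointInputTaskCount) {
        transactionRequestCount.remove(arrival);
        System.out.println(arrival + " seen from all tasks -> handleCheckpoint");
    } else {
        System.out.println(arrival + " seen from " + count + "/" + checkPointInputTaskCount + " tasks -> ack and wait");
    }
}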
storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/StatefulBoltExecutor.java
public class StatefulBoltExecutor<T extends State> extends BaseStatefulBoltExecutor {
    //......

    protected void handleCheckpoint(Tuple checkpointTuple, Action action, long txid) {
        LOG.debug("handleCheckPoint with tuple {}, action {}, txid {}", checkpointTuple, action, txid);
        if (action == PREPARE) {
            if (boltInitialized) {
                bolt.prePrepare(txid);
                state.prepareCommit(txid);
                preparedTuples.addAll(collector.ackedTuples());
            } else {
                /*
                 * May be the task restarted in the middle and the state needs be initialized.
                 * Fail fast and trigger recovery.
                 */
                LOG.debug("Failing checkpointTuple, PREPARE received when bolt state is not initialized.");
                collector.fail(checkpointTuple);
                return;
            }
        } else if (action == COMMIT) {
            bolt.preCommit(txid);
            state.commit(txid);
            ack(preparedTuples);
        } else if (action == ROLLBACK) {
            bolt.preRollback();
            state.rollback();
            fail(preparedTuples);
            fail(collector.ackedTuples());
        } else if (action == INITSTATE) {
            if (!boltInitialized) {
                bolt.initState((T) state);
                boltInitialized = true;
                LOG.debug("{} pending tuples to process", pendingTuples.size());
                for (Tuple tuple : pendingTuples) {
                    doExecute(tuple);
                }
                pendingTuples.clear();
            } else {
                /*
                 * If a worker crashes, the states of all workers are rolled back and an initState message is sent across
                 * the topology so that crashed workers can initialize their state.
                 * The bolts that have their state already initialized need not be re-initialized.
                 */
                LOG.debug("Bolt state is already initialized, ignoring tuple {}, action {}, txid {}",
                          checkpointTuple, action, txid);
            }
        }
        collector.emit(CheckpointSpout.CHECKPOINT_STREAM_ID, checkpointTuple, new Values(txid, action));
        collector.delegate.ack(checkpointTuple);
    }

    //......
}
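StatefulBoltExecutor drives the user's IStatefulBolt through initState/prePrepare/preCommit/preRollback. For reference, a minimal stateful bolt along the lines of the word-count example in Storm's state-checkpointing documentation could look like the sketch below; the class name and field names are illustrative.

import java.util.Map;
import org.apache.storm.state.KeyValueState;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseStatefulBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

// Minimal stateful word counter (sketch): initState is invoked by StatefulBoltExecutor on the
// INITSTATE action, and the state is prepared/committed on PREPARE/COMMIT as shown in
// handleCheckpoint above.
public class WordCountStatefulBolt extends BaseStatefulBolt<KeyValueState<String, Long>> {
    private KeyValueState<String, Long> counts;
    private OutputCollector collector;

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void initState(KeyValueState<String, Long> state) {
        // called once before regular tuples are processed
        this.counts = state;
    }

    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        long count = counts.get(word, 0L) + 1;
        counts.put(word, count);
        collector.emit(tuple, new Values(word, count));
        collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}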
storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/CheckpointTupleForwarder.java
/**
 * Wraps {@link IRichBolt} and forwards checkpoint tuples in a stateful topology.
 * <p>
 * When a storm topology contains one or more {@link IStatefulBolt} all non-stateful bolts are wrapped in {@link CheckpointTupleForwarder}
 * so that the checkpoint tuples can flow through the entire topology DAG.
 * </p>
 */
public class CheckpointTupleForwarder extends BaseStatefulBoltExecutor {
    //......

    /**
     * Forwards the checkpoint tuple downstream.
     *
     * @param checkpointTuple the checkpoint tuple
     * @param action          the action (prepare, commit, rollback or initstate)
     * @param txid            the transaction id.
     */
    protected void handleCheckpoint(Tuple checkpointTuple, Action action, long txid) {
        collector.emit(CHECKPOINT_STREAM_ID, checkpointTuple, new Values(txid, action));
        collector.ack(checkpointTuple);
    }

    //......
}
IStatefulBolt provides bolts with an abstraction for reading and writing state; the state is persisted through the checkpoint mechanism, and the ack mechanism is used to provide at-least-once semantics.
If the topology contains a stateful bolt, TopologyBuilder automatically adds a CheckpointSpout, and any bolt that is not a StatefulBoltExecutor is wrapped in a CheckpointTupleForwarder, so that checkpoint tuples can flow through the entire topology DAG.
The checkpoint mechanism works somewhat like a three phase commit protocol: once a bolt has received the checkpoint tuple on all of its input checkpoint streams, it handles the corresponding action, forwards the checkpoint tuple downstream, and acks it.
CheckpointSpout's checkpointInterval comes from topology.state.checkpoint.interval.ms and defaults to 1000, i.e. one second; this value should be smaller than Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS (topology.message.timeout.secs, default 30 seconds). If checkpointInterval is set too large and a worker crashes in the meantime, the state recovered afterwards will be far from current, which defeats the purpose of checkpointing.
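A short example of how these two settings relate when submitting a topology; the concrete values are only illustrative.

import org.apache.storm.Config;

// Keep the checkpoint interval comfortably below the message timeout: the checkpoint tuple is
// anchored with the txid as its message id, so the whole checkpoint round must be acked within
// topology.message.timeout.secs.
Config conf = new Config();
conf.put(Config.TOPOLOGY_STATE_CHECKPOINT_INTERVAL, 1000);    // topology.state.checkpoint.interval.ms
conf.setMessageTimeoutSecs(30);                               // topology.message.timeout.secs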