Spark高级玩法 2019-06-30
本文主要研究一下flink DataStream的split操作
SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() { @Override public Iterable<String> select(Integer value) { List<String> output = new ArrayList<String>(); if (value % 2 == 0) { output.add("even"); } else { output.add("odd"); } return output; } });
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java
@Public public class DataStream<T> { //...... public SplitStream<T> split(OutputSelector<T> outputSelector) { return new SplitStream<>(this, clean(outputSelector)); } //...... }
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/collector/selector/OutputSelector.java
@PublicEvolving public interface OutputSelector<OUT> extends Serializable { Iterable<String> select(OUT value); }
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/SplitStream.java
@PublicEvolving public class SplitStream<OUT> extends DataStream<OUT> { protected SplitStream(DataStream<OUT> dataStream, OutputSelector<OUT> outputSelector) { super(dataStream.getExecutionEnvironment(), new SplitTransformation<OUT>(dataStream.getTransformation(), outputSelector)); } public DataStream<OUT> select(String... outputNames) { return selectOutput(outputNames); } private DataStream<OUT> selectOutput(String[] outputNames) { for (String outName : outputNames) { if (outName == null) { throw new RuntimeException("Selected names must not be null"); } } SelectTransformation<OUT> selectTransform = new SelectTransformation<OUT>(this.getTransformation(), Lists.newArrayList(outputNames)); return new DataStream<OUT>(this.getExecutionEnvironment(), selectTransform); } }
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@Internal public class StreamGraphGenerator { //...... private Collection<Integer> transform(StreamTransformation<?> transform) { if (alreadyTransformed.containsKey(transform)) { return alreadyTransformed.get(transform); } LOG.debug("Transforming " + transform); if (transform.getMaxParallelism() <= 0) { // if the max parallelism hasn't been set, then first use the job wide max parallelism // from theExecutionConfig. int globalMaxParallelismFromConfig = env.getConfig().getMaxParallelism(); if (globalMaxParallelismFromConfig > 0) { transform.setMaxParallelism(globalMaxParallelismFromConfig); } } // call at least once to trigger exceptions about MissingTypeInfo transform.getOutputType(); Collection<Integer> transformedIds; if (transform instanceof OneInputTransformation<?, ?>) { transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform); } else if (transform instanceof TwoInputTransformation<?, ?, ?>) { transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform); } else if (transform instanceof SourceTransformation<?>) { transformedIds = transformSource((SourceTransformation<?>) transform); } else if (transform instanceof SinkTransformation<?>) { transformedIds = transformSink((SinkTransformation<?>) transform); } else if (transform instanceof UnionTransformation<?>) { transformedIds = transformUnion((UnionTransformation<?>) transform); } else if (transform instanceof SplitTransformation<?>) { transformedIds = transformSplit((SplitTransformation<?>) transform); } else if (transform instanceof SelectTransformation<?>) { transformedIds = transformSelect((SelectTransformation<?>) transform); } else if (transform instanceof FeedbackTransformation<?>) { transformedIds = transformFeedback((FeedbackTransformation<?>) transform); } else if (transform instanceof CoFeedbackTransformation<?>) { transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform); } else if (transform instanceof PartitionTransformation<?>) { transformedIds = transformPartition((PartitionTransformation<?>) transform); } else if (transform instanceof SideOutputTransformation<?>) { transformedIds = transformSideOutput((SideOutputTransformation<?>) transform); } else { throw new IllegalStateException("Unknown transformation: " + transform); } // need this check because the iterate transformation adds itself before // transforming the feedback edges if (!alreadyTransformed.containsKey(transform)) { alreadyTransformed.put(transform, transformedIds); } if (transform.getBufferTimeout() >= 0) { streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout()); } if (transform.getUid() != null) { streamGraph.setTransformationUID(transform.getId(), transform.getUid()); } if (transform.getUserProvidedNodeHash() != null) { streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash()); } if (transform.getMinResources() != null && transform.getPreferredResources() != null) { streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources()); } return transformedIds; } private <T> Collection<Integer> transformSelect(SelectTransformation<T> select) { StreamTransformation<T> input = select.getInput(); Collection<Integer> resultIds = transform(input); // the recursive transform might have already transformed this if (alreadyTransformed.containsKey(select)) { return alreadyTransformed.get(select); } List<Integer> virtualResultIds = new ArrayList<>(); for (int inputId : resultIds) { int virtualId = StreamTransformation.getNewNodeId(); streamGraph.addVirtualSelectNode(inputId, virtualId, select.getSelectedNames()); virtualResultIds.add(virtualId); } return virtualResultIds; } private <T> Collection<Integer> transformSplit(SplitTransformation<T> split) { StreamTransformation<T> input = split.getInput(); Collection<Integer> resultIds = transform(input); // the recursive transform call might have transformed this already if (alreadyTransformed.containsKey(split)) { return alreadyTransformed.get(split); } for (int inputId : resultIds) { streamGraph.addOutputSelector(inputId, split.getOutputSelector()); } return resultIds; } //...... }