Druid is a fast column-oriented distributed data store. http://druid.io/
Starting a Druid service launches a Java process; run_example_server.sh, for example, runs io.druid.cli.Main example realtime.
The builder in Main registers several groups of services; the server group covers most of Druid's components: Coordinator, Historical, Broker, Realtime, Overlord, and so on.
```java
final Injector injector = GuiceInjectors.makeStartupInjector();
final Cli<Runnable> cli = builder.build();
final Runnable command = cli.parse(args);
injector.injectMembers(command);  // command is already an instantiated Runnable; only its members need injecting
command.run();
```
Guice is a DI framework. The usual client workflow is: create an Injector, obtain instances from it, and call methods on those instances. The command parsed from the command line is a Runnable.
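As a minimal, self-contained sketch of that workflow (the Greeter/Client names are invented for illustration and are not Druid classes):

```java
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;

public class GuiceDemo {
  interface Greeter { String greet(); }

  static class EnglishGreeter implements Greeter {
    public String greet() { return "hello"; }
  }

  // A field-injected client, playing the role of the parsed Runnable command
  static class Client implements Runnable {
    @Inject Greeter greeter;
    public void run() { System.out.println(greeter.greet()); }
  }

  public static void main(String[] args) {
    // 1. create the Injector from a Module that declares the bindings
    Injector injector = Guice.createInjector(new AbstractModule() {
      protected void configure() {
        bind(Greeter.class).to(EnglishGreeter.class);
      }
    });
    // 2. the client already exists (as command does after cli.parse), so only inject its members
    Client command = new Client();
    injector.injectMembers(command);
    // 3. call the instance
    command.run();  // prints "hello"
  }
}
```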
CliRealtime extends ServerRunnable (which in turn extends GuiceRunnable). The Initialization step invoked from makeInjector registers a large number of Modules.
CliRealtime's getModules() mainly contributes RealtimeModule; each node type registers the Modules that fall within its own responsibility.
RealtimeModule binds SegmentPublisher, ChatHandlerProvider, RealtimeManager, and more.
```java
JsonConfigProvider.bind(binder, "druid.realtime", RealtimeManagerConfig.class);
binder.bind(new TypeLiteral<List<FireDepartment>>(){})
      .toProvider(FireDepartmentsProvider.class)
      .in(LazySingleton.class);  // ①
JsonConfigProvider.bind(binder, "druid.realtime.cache", CacheConfig.class);
binder.install(new CacheModule());
binder.bind(QuerySegmentWalker.class).to(RealtimeManager.class).in(ManageLifecycle.class);
binder.bind(NodeTypeConfig.class).toInstance(new NodeTypeConfig("realtime"));
```
Note that toInstance binds an already-constructed object rather than an interface: NodeTypeConfig, for example, is an ordinary class, not an interface.
https://github.com/google/guice/wiki/Injections#on-demand-injection
The important one is RealtimeManager. It has three fields: the fireDepartments list, a QueryRunnerFactoryConglomerate, and a chiefs map; the last one, chiefs, is initialized directly inside the constructor.
The first two are injected via @Inject. Since ① the fireDepartments field is a generic List, it is injected through the TypeLiteral binding above, backed by FireDepartmentsProvider.
```java
public class RealtimeManager implements QuerySegmentWalker {
  private final List<FireDepartment> fireDepartments;          // ①
  private final QueryRunnerFactoryConglomerate conglomerate;   // ②
  // key = data source name, value = FireChiefs of all partitions of that data source
  private final Map<String, List<FireChief>> chiefs;

  @Inject
  public RealtimeManager(List<FireDepartment> fireDepartments, QueryRunnerFactoryConglomerate conglomerate) {
    this.fireDepartments = fireDepartments;
    this.conglomerate = conglomerate;
    this.chiefs = Maps.newHashMap();
  }
}
```
The list is injected through a Provider, FireDepartmentsProvider: whatever its get() method returns becomes the bound List instance.
FireDepartmentsProvider's constructor in turn needs an ObjectMapper and a RealtimeManagerConfig injected; RealtimeManagerConfig was already bound (by JsonConfigProvider) before the Provider binding.
```java
public class FireDepartmentsProvider implements Provider<List<FireDepartment>> {
  private final List<FireDepartment> fireDepartments = Lists.newArrayList();

  public List<FireDepartment> get() {
    return fireDepartments;
  }
}
```
ObjectMapper is a Jackson class; Druid's subclass is DefaultObjectMapper. The ObjectMapper binding also happens during initialization, in JacksonModule.
Here to() uses an annotation-qualified Key: because the qualifier is @Json, the binding resolves to the DefaultObjectMapper created by jsonMapper(), a @Provides method that plays the same role as a Provider.
```java
public class JacksonModule implements Module {
  public void configure(Binder binder) {
    binder.bind(ObjectMapper.class).to(Key.get(ObjectMapper.class, Json.class));
  }

  @Provides @LazySingleton @Json
  public ObjectMapper jsonMapper() {
    return new DefaultObjectMapper();
  }
}
```
The druid.realtime prefix bound earlier by JsonConfigProvider maps to RealtimeManagerConfig, which has a single property: @JsonProperty private File specFile.
In FireDepartmentsProvider's constructor, the DefaultObjectMapper reads the JSON file pointed to by the druid.realtime.specFile property supplied when the process was started.
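A sketch of what that constructor does, assuming RealtimeManagerConfig exposes the file via getSpecFile() and using Jackson's TypeReference; error handling is simplified and the real code may differ in detail:

```java
@Inject
public FireDepartmentsProvider(@Json ObjectMapper jsonMapper, RealtimeManagerConfig config) {
  try {
    // the spec file is a JSON array of FireDepartment definitions
    this.fireDepartments.addAll(
        jsonMapper.<List<FireDepartment>>readValue(
            config.getSpecFile(), new TypeReference<List<FireDepartment>>() {}
        )
    );
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
}
```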
FireDepartment's three fields (dataSchema, ioConfig, tuningConfig) correspond exactly to the JSON properties in the spec file. So the whole flow is: point druid.realtime.specFile at a file, create the DefaultObjectMapper (JacksonModule), let it read the JSON file, construct FireDepartmentsProvider, and return the List of FireDepartments.
```java
@JsonCreator
public FireDepartment(
    @JsonProperty("dataSchema") DataSchema dataSchema,
    @JsonProperty("ioConfig") RealtimeIOConfig ioConfig,
    @JsonProperty("tuningConfig") RealtimeTuningConfig tuningConfig
)
```
We still have not seen where ② QueryRunnerFactoryConglomerate gets bound. To find out, open the interface and look at its main implementation, DefaultQueryRunnerFactoryConglomerate.
Then Cmd+click to check its usages: the only binding is in the configure method of StorageNodeModule, yet another Module, and it too is registered during Initialization.
```java
public class StorageNodeModule implements Module {
  public void configure(Binder binder) {
    JsonConfigProvider.bind(binder, "druid.server", DruidServerConfig.class);
    JsonConfigProvider.bind(binder, "druid.segmentCache", SegmentLoaderConfig.class);
    binder.bind(NodeTypeConfig.class).toProvider(Providers.<NodeTypeConfig>of(null));
    binder.bind(QueryableIndexFactory.class).to(MMappedQueryableIndexFactory.class).in(LazySingleton.class);
    binder.bind(ColumnConfig.class).to(DruidProcessingConfig.class);
    binder.bind(QueryRunnerFactoryConglomerate.class).to(DefaultQueryRunnerFactoryConglomerate.class).in(LazySingleton.class);
  }
}
```
DefaultQueryRunnerFactoryConglomerate's constructor also needs an injection: Map<Class<? extends Query>, QueryRunnerFactory> factories.
Following usages again, open QueryRunnerFactory, pick one implementation such as TimeBoundaryQueryRunnerFactory, and its usages lead to QueryRunnerFactoryModule.
Notice the pattern: whenever an interface has to be bound to an implementation, a Guice Module ends up doing the work.
TimeBoundaryQueryRunnerFactory's constructor also depends on a QueryWatcher, which QueryRunnerFactoryModule conveniently binds as well.
Its mappings field lists every query type Druid supports together with the corresponding query factory class, and MapBinder is Guice's mechanism for injecting Map-valued bindings (again built on TypeLiteral):
```java
public class QueryRunnerFactoryModule extends QueryToolChestModule {
  private static final Map<Class<? extends Query>, Class<? extends QueryRunnerFactory>> mappings =
      ImmutableMap.<Class<? extends Query>, Class<? extends QueryRunnerFactory>>builder()
          .put(TimeseriesQuery.class, TimeseriesQueryRunnerFactory.class)
          .put(TimeBoundaryQuery.class, TimeBoundaryQueryRunnerFactory.class)
          ...
          .build();

  public void configure(Binder binder) {
    super.configure(binder);
    binder.bind(QueryWatcher.class).to(QueryManager.class).in(LazySingleton.class);
    binder.bind(QueryManager.class).in(LazySingleton.class);

    final MapBinder<Class<? extends Query>, QueryRunnerFactory> queryFactoryBinder =
        DruidBinders.queryRunnerFactoryBinder(binder);
    for (Map.Entry<Class<? extends Query>, Class<? extends QueryRunnerFactory>> entry : mappings.entrySet()) {
      queryFactoryBinder.addBinding(entry.getKey()).to(entry.getValue()); // add each key/value to the MapBinder; this feeds the injected factories Map
      binder.bind(entry.getValue()).in(LazySingleton.class);              // also bind the value class itself
    }
    binder.bind(GroupByQueryEngine.class).in(LazySingleton.class);
  }
}
```
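The injected Map then lands in DefaultQueryRunnerFactoryConglomerate, which does little more than look up the factory for a given query class. Roughly (a sketch consistent with the description above; generics and annotations are trimmed for readability):

```java
public class DefaultQueryRunnerFactoryConglomerate implements QueryRunnerFactoryConglomerate {
  private final Map<Class<? extends Query>, QueryRunnerFactory> factories;

  @Inject
  public DefaultQueryRunnerFactoryConglomerate(Map<Class<? extends Query>, QueryRunnerFactory> factories) {
    this.factories = factories;  // populated by the MapBinder entries added in QueryRunnerFactoryModule
  }

  @SuppressWarnings("unchecked")
  public <T, QueryType extends Query<T>> QueryRunnerFactory<T, QueryType> findFactory(QueryType query) {
    return (QueryRunnerFactory<T, QueryType>) factories.get(query.getClass());
  }
}
```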
The concrete QueryRunnerFactory classes hold the actual query implementations; around each query type there are XXXQuery, XXXQueryRunner, XXXQueryToolChest, XXXResultValue, and so on.
Take run_example_client.sh as an example: its query type is timeBoundary, which maps to TimeBoundaryQuery.
Once the fireDepartments list and the QueryRunnerFactoryConglomerate required by RealtimeManager's constructor have both been injected, the start method can get to work.
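A sketch of what start does, based on the description in the next paragraph (the real method differs only in details such as error handling):

```java
@LifecycleStart
public void start() throws IOException {
  for (final FireDepartment fireDepartment : fireDepartments) {
    final DataSchema schema = fireDepartment.getDataSchema();
    final FireChief chief = new FireChief(fireDepartment);   // FireChief extends Thread

    List<FireChief> chiefsOfDataSource = chiefs.get(schema.getDataSource());
    if (chiefsOfDataSource == null) {
      chiefsOfDataSource = Lists.newArrayList();
      chiefs.put(schema.getDataSource(), chiefsOfDataSource);
    }
    chiefsOfDataSource.add(chief);

    chief.setName(String.format("chief-%s", schema.getDataSource()));
    chief.setDaemon(true);
    chief.start();   // runs FireChief.run(), shown below
  }
}
```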
Each FireDepartment in fireDepartments is wrapped in a FireChief; every data source in a FireDepartment's DataSchema gets its own FireChief.
A FireChief holds the FireDepartment (the data source definition), a Firehose (how to read: an iterator over the source) and a Plumber (the sink).
The FireChief thread calls initPlumber to create the Plumber and lets it start the job, initFirehose to connect the Firehose to the data source, and finally runFirehose to read the data.
```java
public void run() {
  plumber = initPlumber();    // fireDepartment.findPlumber(): find the plumbing (the tap)
  plumber.startJob();         // preparation: hook up the pipes
  firehose = initFirehose();  // fireDepartment.connect(): get a fire hose and attach it to the tap
  runFirehose(firehose);      // start working; water keeps flowing out of the source
  plumber.finishJob();        // done: detach the hose and close the tap
}
```
The Firehose (fire hose) is attached to the water source: as data keeps arriving at the source (Kafka, for instance), rows keep spraying out of the hose, and what comes out is InputRow. A Firehose is essentially an iterator.
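For reference, the core of the Firehose interface really is just an iterator plus a commit hook (a sketch; the actual interface may declare more than this):

```java
public interface Firehose extends Closeable {
  boolean hasMore();    // is another event currently available?
  InputRow nextRow();   // the next event, already parsed into an InputRow
  Runnable commit();    // a callback that, when run, marks everything read so far as committed
}
```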
```java
private void runFirehose(Firehose firehose) {
  final Supplier<Committer> committerSupplier = Committers.supplierFromFirehose(firehose);
  while (firehose.hasMore()) {
    final InputRow inputRow = firehose.nextRow();
    lateEvent = plumber.add(inputRow, committerSupplier) == -1;
    if (indexLimitExceeded || lateEvent) {
      plumber.persist(committerSupplier.get());
    }
  }
}
```
The Firehose is built from the firehose property ① of the spec file's ioConfig: from it we obtain the FirehoseFactory, then pass it the firehoseParser derived from dataSchema's parser ② to create the Firehose.
Why does the Firehose need the dataSchema? Because the output depends on the format of the input; the parser describes how to parse the source data, and its parseSpec declares the input format plus the timestamp and dimension fields.
```java
public Firehose connect() throws IOException {
  return ioConfig.getFirehoseFactory().connect(dataSchema.getParser());
}
```
Take Kafka as the data source: the type of ① firehose resolves to KafkaEightFirehoseFactory, which has three properties: consumerProps, feed, and the type declared on FirehoseFactory.
1234567891011121314 | "ioConfig" : { "type" : "realtime", "firehose": { //① "type": "kafka-0.8", //对应KafkaEightFirehoseFactory "consumerProps": { "zookeeper.connect": "localhost:2181", ... }, "feed": "wikipedia" }, "plumber": { "type": "realtime" }}, |
DataSchema has four JSON properties; the ObjectMapper constructor parameter is supplied by dependency injection (@JacksonInject). dataSchema.getParser() yields the InputRowParser.
Because parser itself contains several nested properties, its JSON is first deserialized into a Map when the spec file is read.
```java
@JsonCreator
public DataSchema(
    @JsonProperty("dataSource") String dataSource,
    @JsonProperty("parser") Map<String, Object> parser,
    @JsonProperty("metricsSpec") AggregatorFactory[] aggregators,
    @JsonProperty("granularitySpec") GranularitySpec granularitySpec,
    @JacksonInject ObjectMapper jsonMapper
)
```
Below is the dataSchema section of the wikipedia spec file.
123456789101112131415161718192021222324252627 | "dataSchema" : { "dataSource" : "wikipedia", "parser" : { //② "type" : "string", //对应StringInputRowParser "parseSpec" : { "format" : "json", "timestampSpec" : { "column" : "timestamp", "format" : "auto" }, "dimensionsSpec" : { "dimensions": ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"], "dimensionExclusions" : [], "spatialDimensions" : [] } } }, "metricsSpec" : [{ "type" : "count", "name" : "count" }], "granularitySpec" : { "type" : "uniform", "segmentGranularity" : "DAY", "queryGranularity" : "NONE" }} |
How does the Map deserialized from JSON become an InputRowParser? Since the structure is fixed, the Map is converted to an InputRowParser, and its timestampSpec and dimensionsSpec are then pulled out. A spec is a set of instructions: take the medicine exactly as the instructions say and you can't go wrong.
```java
public InputRowParser getParser() {
  final InputRowParser inputRowParser = jsonMapper.convertValue(this.parser, InputRowParser.class);
  final DimensionsSpec dimensionsSpec = inputRowParser.getParseSpec().getDimensionsSpec();
  final TimestampSpec timestampSpec = inputRowParser.getParseSpec().getTimestampSpec();
  return inputRowParser.withParseSpec(          // rebuild the parser with a new parseSpec
      inputRowParser.getParseSpec()             // take the current parseSpec
          .withDimensionsSpec(                  // rebuild it with a new dimensionsSpec
              dimensionsSpec                    // take the current dimensionsSpec
                  .withDimensionExclusions(     // and filter out the dimensionExclusions
                      Sets.difference(dimensionExclusions, dimSet)
                  )
          )
  );
}
```
KafkaEightFirehoseFactory's connect method returns an anonymous Firehose whose nextRow parses Kafka's incoming bytes with the given parser.
Reading from Kafka only needs the configured consumerProps and feed to determine the ZooKeeper address and the topic; a consumer is then created and the data is exposed through a ConsumerIterator.
```java
final ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProps));
final Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(ImmutableMap.of(feed, 1));
final List<KafkaStream<byte[], byte[]>> streamList = streams.get(feed);
final KafkaStream<byte[], byte[]> stream = streamList.get(0);
final ConsumerIterator<byte[], byte[]> iter = stream.iterator();
```
stream is the Kafka message stream. Iterating over its messages and parsing each one with the InputRowParser produces InputRows.
Hooking back into RealtimeManager, runFirehose calls the Firehose's nextRow to read the data, and the whole pipeline is complete: DataSchema definition, InputRowParser parsing, InputRow out.
```java
return new Firehose() {
  public boolean hasMore() {
    return iter.hasNext();
  }

  public InputRow nextRow() {
    final byte[] message = iter.next().message();
    return theParser.parse(ByteBuffer.wrap(message));
  }
};
```
Every InputRow read from the Firehose is added to the Plumber. Each row carries a timestamp, and truncatedTime is that timestamp truncated by segmentGranularity.
Every record ultimately lives in a Segment, and Segments are stored per Interval; a one-hour Segment, for example, covers 20151011-100000~20151011-110000.
sinks maps each truncated timestamp to a Sink, and a Sink holds all events of that interval; once the Sink is located, the row is appended to it. Under the hood a Sink uses an IncrementalIndex.
If the Sink cannot accept another row (say the Segment has reached its size threshold) or the flush interval has elapsed (intermediatePersistPeriod, 10 minutes by default), the Sink's data is persisted.
```java
public int add(InputRow row, Supplier<Committer> committerSupplier) throws IndexSizeExceededException {
  final Sink sink = getSink(row.getTimestampFromEpoch());
  final int numRows = sink.add(row);
  if (!sink.canAppendRow() || System.currentTimeMillis() > nextFlush) {
    persist(committerSupplier.get());
  }
  return numRows;
}

private Sink getSink(long timestamp) {
  final Granularity segmentGranularity = schema.getGranularitySpec().getSegmentGranularity();
  final VersioningPolicy versioningPolicy = config.getVersioningPolicy();
  final long truncatedTime = segmentGranularity.truncate(new DateTime(timestamp)).getMillis();

  Sink retVal = sinks.get(truncatedTime);
  if (retVal == null) {
    final Interval sinkInterval = new Interval(new DateTime(truncatedTime), segmentGranularity.increment(new DateTime(truncatedTime)));
    retVal = new Sink(sinkInterval, schema, config, versioningPolicy.getVersion(sinkInterval));
    segmentAnnouncer.announceSegment(retVal.getSegment());
    sinks.put(truncatedTime, retVal);
    sinkTimeline.add(retVal.getInterval(), retVal.getVersion(), new SingleElementPartitionChunk<Sink>(retVal));
  }
  return retVal;
}
```
When a Sink is created for the first time, segmentAnnouncer announces a new Segment, which in practice means creating the corresponding ephemeral node in ZooKeeper.
The truncated timestamp and its Sink are then put into sinks, so later events whose truncated timestamps fall into the same interval (the same hour, say) reuse the existing Sink.
sinkTimeline is the timeline of Sinks; besides the Interval it records a version and a partition number. If one hour's data is too large, for instance, the Segment is split into several partitions.
A Sink is built from FireHydrants and an IncrementalIndex. Remember that Druid does not store raw rows but the result of roll-up.
When getSink creates a Sink for the first time, it also creates a FireHydrant, an OnheapIncrementalIndex (an on-heap incremental index) and a DataSegment.
Data written to a realtime node must be queryable immediately after indexing, so the rolled-up data stays in the realtime node's memory.
Adding an InputRow fetches the OnheapIncrementalIndex from the FireHydrant and appends one record to that incremental index.
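A sketch of that step, assuming the Sink keeps its current FireHydrant in a currHydrant field guarded by a lock (the names here are illustrative, not taken from the source):

```java
public int add(InputRow row) throws IndexSizeExceededException {
  synchronized (hydrantLock) {
    final IncrementalIndex index = currHydrant.getIndex();  // the on-heap incremental index of the current FireHydrant
    return index.add(row);                                  // roll-up happens inside the index, via addToFacts below
  }
}
```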
With all of this prepared, IncrementalIndex calls addToFacts to add the facts; the OnheapIncrementalIndex implementation then runs the pre-built aggregators to perform the aggregation.
```java
public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator> {
  private final ConcurrentHashMap<Integer, Aggregator[]> aggregators = new ConcurrentHashMap<>();
  private final ConcurrentNavigableMap<TimeAndDims, Integer> facts = new ConcurrentSkipListMap<>();

  protected Integer addToFacts(AggregatorFactory[] metrics, boolean deserializeComplexMetrics, InputRow row,
                               AtomicInteger numEntries, TimeAndDims key,
                               ThreadLocal<InputRow> rowContainer, Supplier<InputRow> rowSupplier) {
    Aggregator[] aggs = new Aggregator[metrics.length];
    for (int i = 0; i < metrics.length; i++) {
      final AggregatorFactory agg = metrics[i];
      aggs[i] = agg.factorize(makeColumnSelectorFactory(agg, rowSupplier, deserializeComplexMetrics));
    }
    rowContainer.set(row);      // thread-safe handoff: point at the current row before aggregating
    for (Aggregator agg : aggs) {
      synchronized (agg) {
        agg.aggregate();        // calling aggregate() on each aggregator performs the roll-up
      }
    }
    rowContainer.set(null);     // clear the container once this row's aggregation is done
    return numEntries.get();
  }
}
```
An InputRow added to the IncrementalIndex ends up in the index's facts; the TimeAndDims key carries the timestamp and the dimension values.
The roll-up aggregators then do their work, and the aggregated results can be read back through the IncrementalIndex getXXXValue accessors.
At the very beginning of persist, a swap happens: a new FireHydrant is created and the old one is handed back.
```java
public void persist(final Committer committer) {
  final List<Pair<FireHydrant, Interval>> indexesToPersist = Lists.newArrayList();
  for (Sink sink : sinks.values()) {    // every Sink swaps: the old hydrant gets persisted, a new one takes the fresh real-time data
    if (sink.swappable()) {
      indexesToPersist.add(Pair.of(sink.swap(), sink.getInterval()));
    }
  }
  for (Pair<FireHydrant, Interval> pair : indexesToPersist) {
    persistHydrant(pair.lhs, schema, pair.rhs, metadata);
  }
  committer.run();
  resetNextFlush();
}

protected int persistHydrant(FireHydrant indexToPersist, DataSchema schema, Interval interval, Map<String, Object> metaData) {
  int numRows = indexToPersist.getIndex().size();      // number of rows in the incremental index
  final IndexSpec indexSpec = config.getIndexSpec();   // index settings from tuningConfig, e.g. bitmap type, column compression
  final File persistedFile = indexMerger.persist(      // merge and persist the index
      indexToPersist.getIndex(),
      new File(computePersistDir(schema, interval), String.valueOf(indexToPersist.getCount())),
      metaData,
      indexSpec
  );
  indexToPersist.swapSegment(                          // swap the incremental index for a queryable index segment
      new QueryableIndexSegment(indexToPersist.getSegment().getIdentifier(), indexIO.loadIndex(persistedFile))
  );
  return numRows;
}
```
To persist the incremental index, IndexMerger.persist goes on to call merge, wrapping the index in an IncrementalIndexAdapter, since indexToPersist holds an OnheapIncrementalIndex:
```java
return merge(
    Arrays.<IndexableAdapter>asList(
        new IncrementalIndexAdapter(dataInterval, index, indexSpec.getBitmapSerdeFactory().getBitmapFactory())
    ),
    index.getMetricAggs(), outDir, segmentMetadata, indexSpec, progress
);
```
IndexMerger merges the dimensions and metrics into mergedDimensions and mergedMetrics and builds rowMergerFn, the per-row merge function; finally makeIndexFiles writes out the index files.