《Druid源码解析(1) Guice和Realtime流程》——图较精简,不错

齐天大圣数据候 2019-12-30

 
 
最近两年更新少

Druid源码解析(1) Guice和Realtime流程

Druid is a fast column-oriented distributed data store. http://druid.io/

当启动Druid的服务,会启动一个java进程,比如run_example_server.sh会启动io.druid.cli.Main example realtime.

Guice Inject

Main的buidler类包含了多种服务组, 比如server服务包括了Druid的大部分组件: 协调,历史,Broker,实时,Overlord等.

injectMembers和toInstance注入实例化好的对象

12345
final Injector injector = GuiceInjectors.makeStartupInjector();final Cli<Runnable> cli = builder.build();final Runnable command = cli.parse(args);injector.injectMembers(command);            //command已经是实例化好的线程类,直接注入command.run();

Guice是个DI框架.客户端使用对象的流程是: 创建Injector,从Injector中获取实例,调用实例的方法. 客户端解析出来的命令是一个Runnable.
CliRealtime继承了ServerRunnable(又继承了GuiceRunnable). 在makeInjector调用的Initialization初始化会添加很多Module.

《Druid源码解析(1) Guice和Realtime流程》——图较精简,不错

CliRealtime的getModules()主要是RealtimeModule. 每个节点都要注册自己职责范围内的Modules.
ReailtimeModule绑定了SegmentPublisher,ChatHandlerProvider,RealtimeManager等.

12345678
JsonConfigProvider.bind(binder, "druid.realtime", RealtimeManagerConfig.class);binder.bind(new TypeLiteral<List<FireDepartment>>(){}).toProvider(FireDepartmentsProvider.class).in(LazySingleton.class);   //①JsonConfigProvider.bind(binder, "druid.realtime.cache", CacheConfig.class);binder.install(new CacheModule());binder.bind(QuerySegmentWalker.class).to(RealtimeManager.class).in(ManageLifecycle.class);binder.bind(NodeTypeConfig.class).toInstance(new NodeTypeConfig("realtime"));

toInstance也绑定的也是一个实例化对象,而没有接口. 比如NodeTypeConfig并不是一个接口,而是一个正常的类.
https://github.com/google/guice/wiki/Injections#on-demand-injection

Provider的get方法返回值绑定实现类

重要的是RealtimeManager,它的构造函数有三个List,QueryRunnerFactoryConglomerate. 最后一个参数chiefs直接在构造函数中初始化.
前面两个需要通过@Inject注入. 其中①List是泛型类,所以通过上面的TypeLiteral使用FireDepartmentsProvider注入.

123456789101112
public class RealtimeManager implements QuerySegmentWalker {  private final List<FireDepartment> fireDepartments;           //①  private final QueryRunnerFactoryConglomerate conglomerate;    //②  private final Map<String, List<FireChief>> chiefs;  //key=data source name,value=FireChiefs of all partition of that data source  @Inject  public RealtimeManager(List<FireDepartment> fireDepartments, QueryRunnerFactoryConglomerate conglomerate) {    this.fireDepartments = fireDepartments;    this.conglomerate = conglomerate;    this.chiefs = Maps.newHashMap();  }}

对象的注入使用Provider:FireDepartmentsProvider,Provider的get方法返回值会作为List的实现类.
而FireDepartmentsProvider的构造方法需要注入 ObjectMapper 和 RealtimeManagerConfig.其中RealtimeManagerConfig在bind Provider前已经注入.

1234
public class FireDepartmentsProvider implements Provider<List<FireDepartment>>{  private final List<FireDepartment> fireDepartments = Lists.newArrayList();  public List<FireDepartment> get() { return fireDepartments; }}

注解和Provides绑定实现类

ObjectMapper是jackson的内部类,druid的实现类是DefaultObjectMapper. 绑定ObjectMapper也是在初始化的JacksonModule中.
这里的to使用了注解方式, 因为注解的类型是Json, 所以对应的是jsonMapper())创建的DefaultObjectMapper(这里是一个Provides方法,类似于Provider).

12345678910
public class JacksonModule implements Module{  public void configure(Binder binder){    binder.bind(ObjectMapper.class).to(Key.get(ObjectMapper.class, Json.class));  }  @Provides @LazySingleton @Json  public ObjectMapper jsonMapper() {    return new DefaultObjectMapper();  }}

JSON Property

前面JsonConfigProvider绑定的druid.realtime,使用RealtimeManagerConfig,而它只有一个属性@JsonProperty private File specFile
在FireDepartmentsProvider的构造方法中会使用DefaultObjectMapper读取启动进程时druid.realtime.specFile指定的json文件.

FireDepartment的三个属性字段dataSchema,ioConfig,tuningConfig正好对应了specFile中的json属性. 所以整个流程是:
指定specFile文件,创建DefaultObjectMapper(JacksonModule),DefaultObjectMapper读取JSON文件,构造FireDepartmentsProvider,返回List

123456
@JsonCreatorpublic FireDepartment(    @JsonProperty("dataSchema") DataSchema dataSchema,    @JsonProperty("ioConfig") RealtimeIOConfig ioConfig,    @JsonProperty("tuningConfig") RealtimeTuningConfig tuningConfig)

别的Module注入同样可用

我们并没有看到②QueryRunnerFactoryConglomerate在这里被注入. 怎么办呢? 进入该接口,查看它比较重要的实现类DefaultQueryRunnerFactoryConglomerate.
然后CMD+单机查看它的Usages,只有StorageNodeModule的configure方法,它也是一个Module,被Usage的方法恰好在Initialization初始化的时候.

123456789101112
public class StorageNodeModule implements Module{  public void configure(Binder binder) {    JsonConfigProvider.bind(binder, "druid.server", DruidServerConfig.class);    JsonConfigProvider.bind(binder, "druid.segmentCache", SegmentLoaderConfig.class);    binder.bind(NodeTypeConfig.class).toProvider(Providers.<NodeTypeConfig>of(null));    binder.bind(QueryableIndexFactory.class).to(MMappedQueryableIndexFactory.class).in(LazySingleton.class);    binder.bind(ColumnConfig.class).to(DruidProcessingConfig.class);    binder.bind(QueryRunnerFactoryConglomerate.class).to(DefaultQueryRunnerFactoryConglomerate.class).in(LazySingleton.class);  }}

对于DefaultQueryRunnerFactoryConglomerate的构造函数也需要注入:Map<Class<? extends Query>, QueryRunnerFactory> factories
同样使用Usage进入QueryRunnerFactory,进入其中一个实现类TimeBoundaryQueryRunnerFactory,在进入其Usage是QueryRunnerFactoryModule
可以看到只要是接口要绑定到某个实现类上, 最后一定是使用Guice的Module来完成的.

MapBinder注入Map

TimeBoundaryQueryRunnerFactory的构造函数也依赖了QueryWatcher,正好也在QueryRunnerFactoryModule一并解决了:
其中mappings定义了Druid支持的各种查询类,对应的查询工厂类. MapBinder是Guice中一种支持Map对象的注入(也用到了TypeLiteral).

1234567891011121314151617181920
public class QueryRunnerFactoryModule extends QueryToolChestModule {  private static final Map<Class<? extends Query>, Class<? extends QueryRunnerFactory>> mappings =      ImmutableMap.<Class<? extends Query>, Class<? extends QueryRunnerFactory>>builder()                  .put(TimeseriesQuery.class, TimeseriesQueryRunnerFactory.class)                  .put(TimeBoundaryQuery.class, TimeBoundaryQueryRunnerFactory.class)                  .....build();  public void configure(Binder binder) {    super.configure(binder);    binder.bind(QueryWatcher.class).to(QueryManager.class).in(LazySingleton.class);    binder.bind(QueryManager.class).in(LazySingleton.class);    final MapBinder<Class<? extends Query>, QueryRunnerFactory> queryFactoryBinder = DruidBinders.queryRunnerFactoryBinder(binder);    for (Map.Entry<Class<? extends Query>, Class<? extends QueryRunnerFactory>> entry : mappings.entrySet()) {      queryFactoryBinder.addBinding(entry.getKey()).to(entry.getValue());   //注入Map的key和value,对应factories      binder.bind(entry.getValue()).in(LazySingleton.class);                //最好还要注入一下value    }    binder.bind(GroupByQueryEngine.class).in(LazySingleton.class);  }}

QueryRunnerFactory具体实现类中是查询的具体实现,这里有XXXQuery,XXXQueryRunner,XXXQueryToolChest,XXXResultValue等.
run_example_client.sh为例,它的查询类型是timeBoundary,对应TimeBoundaryQuery.

RealtimeManager

RealtimeManager构造函数需要的List和QueryRunnerFactoryConglomerate都注入之后,在start方法就可以开工了.
fireDepartments的每个FireDepartment会被构造成FireChief,FireDepartment的DataSchema的DataSource都对应了一个FireChief.
FireChief包括FireDepartment(数据源),Firehose(怎么读取,迭代器,Source),Plumber(Sink).
FireChief线程会initPlumber初始化Plumber, 由Plumber启动作业, initFirehose初始化Firehose连接数据源,最后runFirehose读取数据.

1234567
public void run() {  plumber = initPlumber();    //fireDepartment.findPlumber() 先找到水管(水龙头)  plumber.startJob();         //准备工作,接上管子  firehose = initFirehose();  //fireDepartment.connect() (向消防局申请一条)消防带,给消防带接上水龙头  runFirehose(firehose);      //开始消防工作,水会从源头数据源不断流出来  plumber.finishJob();        //完成工作,卸掉消防带,关闭水龙头}

《Druid源码解析(1) Guice和Realtime流程》——图较精简,不错

Firehose

Firehose消防带连接的是水源,当数据不断注入数据源(比如Kafka),则从消防水管会源源不断喷射出水流,喷射出来的就是InputRow.Firehose类似一个迭代器.

12345678910
private void runFirehose(Firehose firehose) {  final Supplier<Committer> committerSupplier = Committers.supplierFromFirehose(firehose);  while (firehose.hasMore()) {    final InputRow inputRow = firehose.nextRow();    lateEvent = plumber.add(inputRow, committerSupplier) == -1;    if (indexLimitExceeded || lateEvent) {      plumber.persist(committerSupplier.get());    }  }}

Firehose通过spec文件的ioConfig的firehose属性①,获取到FirehoseFactory后,根据dataSchema的parser②得到firehoseParser,从而创建Firehose.
为什么Firehose需要dataSchema,因为输出的数据依赖于输入数据的格式,parser用来如何解析输入源数据.parseSpec会指定输入数据的格式,时间撮和维度字段.

123
public Firehose connect() throws IOException {  return ioConfig.getFirehoseFactory().connect(dataSchema.getParser());}

以Kafka数据源为例,①firehose的type得到KafkaEightFirehoseFactory. 有三个属性:consumerProps,feed和FirehoseFactory中的type.

1234567891011121314
"ioConfig" : {  "type" : "realtime",  "firehose": {             //①    "type": "kafka-0.8",    //对应KafkaEightFirehoseFactory    "consumerProps": {      "zookeeper.connect": "localhost:2181",      ...    },    "feed": "wikipedia"  },  "plumber": {    "type": "realtime"  }},

DataSchema

DataSchema有四个json属性,它的构造函数参数ObjectMapper是依赖注入进来的. dataSchema.getParser()获得InputRowParser.
parser里面又配置了多个属性,所以在读取spec文件的时候,会将parser的JSON信息转换为Map.

12345678
@JsonCreatorpublic DataSchema(    @JsonProperty("dataSource") String dataSource,    @JsonProperty("parser") Map<String, Object> parser,    @JsonProperty("metricsSpec") AggregatorFactory[] aggregators,    @JsonProperty("granularitySpec") GranularitySpec granularitySpec,    @JacksonInject ObjectMapper jsonMapper)

下面是wikipedia的DataSchema spec文件.

123456789101112131415161718192021222324252627
"dataSchema" : {  "dataSource" : "wikipedia",  "parser" : {              //②    "type" : "string",      //对应StringInputRowParser    "parseSpec" : {      "format" : "json",      "timestampSpec" : {        "column" : "timestamp",        "format" : "auto"      },      "dimensionsSpec" : {        "dimensions": ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"],        "dimensionExclusions" : [],        "spatialDimensions" : []      }    }  },  "metricsSpec" : [{    "type" : "count",    "name" : "count"  }],  "granularitySpec" : {    "type" : "uniform",    "segmentGranularity" : "DAY",    "queryGranularity" : "NONE"  }}

如何从JSON转换而来的Map得到InputRowParser,因为格式是固定的,所以在获取到parser后,分别获取timestampSpec和dimensionsSpec. spec是说明书的意思,按照说明书吃药,没错

1234567891011121314
public InputRowParser getParser(){  final InputRowParser inputRowParser = jsonMapper.convertValue(this.parser, InputRowParser.class);  final DimensionsSpec dimensionsSpec = inputRowParser.getParseSpec().getDimensionsSpec();  final TimestampSpec timestampSpec = inputRowParser.getParseSpec().getTimestampSpec();  return inputRowParser.withParseSpec(        //进入parseSpec      inputRowParser.getParseSpec()           //获取parseSpec            .withDimensionsSpec(              //进入dimensionsSpec                dimensionsSpec                //获取dimensionsSpec                    .withDimensionExclusions( //过滤dimensionExclusions                        Sets.difference(dimensionExclusions, dimSet)                    )            )  );}

KafkaEightFirehoseFactory

KafkaEightFirehoseFactory的connect方法会返回匿名的Firehose对象,它的nextRow方法会根据parser解析kafka的输入数据.
读取Kafka数据使用配置的consumerProps和feed,即可确定要连接的zk和topic. 然后创建一个消费者,数据保存在ConsumerIterator中.

12345
final ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProps));final Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(ImmutableMap.of(feed,1));final List<KafkaStream<byte[], byte[]>> streamList = streams.get(feed);final KafkaStream<byte[], byte[]> stream = streamList.get(0);final ConsumerIterator<byte[], byte[]> iter = stream.iterator();

stream就是kafka的消息流. 通过迭代消息流中的message, 使用InputRowParser解析数据, 返回的就是InputRow.
再接上RealtimeManager的runFirehose会调用Firehose的nextRow读取数据, 整个流程就完成了: DataSchema定义-InputRowParser解析-InputRow.

123456789
return new Firehose() {  public boolean hasMore() {    return iter.hasNext();  }  public InputRow nextRow(){    final byte[] message = iter.next().message();    return theParser.parse(ByteBuffer.wrap(message));  }}

《Druid源码解析(1) Guice和Realtime流程》——图较精简,不错

RealtimePlumber

从Firehose读取的每一行InputRow都会添加到Plumber中.每一行数据都有一个时间撮timestamp.truncatedTime是使用segmentGranularity对时间撮进行截断.
由于每条记录最终都要存在于一个Segment中,而Segment是以Interval指定的时间间隔存储.比如间隔为1h的Segment:20151011-100000~20151011-110000.
sinks保存的是截断的时间撮对应Sink.Sink保存的是这段时间内的所有事件.获取到Sink后往Sink中添加这一行记录. Sink底层使用了IncrementalIndex增量索引.
如果Sink不能再添加新的一行(比如Segment大小达到阈值)或者与达到刷新时间的间隔(IntermediatePersistPeriod,默认10分钟),就会将Sink中的数据进行持久化.

12345678910111213141516171819202122
public int add(InputRow row, Supplier<Committer> committerSupplier) throws IndexSizeExceededException {  final Sink sink = getSink(row.getTimestampFromEpoch());  final int numRows = sink.add(row);  if (!sink.canAppendRow() || System.currentTimeMillis() > nextFlush) {    persist(committerSupplier.get());  }  return numRows;}private Sink getSink(long timestamp) {  final Granularity segmentGranularity = schema.getGranularitySpec().getSegmentGranularity();  final VersioningPolicy versioningPolicy = config.getVersioningPolicy();  final long truncatedTime = segmentGranularity.truncate(new DateTime(timestamp)).getMillis();  Sink retVal = sinks.get(truncatedTime);  if (retVal == null) {    final Interval sinkInterval = new Interval(new DateTime(truncatedTime),segmentGranularity.increment(new DateTime(truncatedTime)));    retVal = new Sink(sinkInterval, schema, config, versioningPolicy.getVersion(sinkInterval));    segmentAnnouncer.announceSegment(retVal.getSegment());    sinks.put(truncatedTime, retVal);    sinkTimeline.add(retVal.getInterval(), retVal.getVersion(), new SingleElementPartitionChunk<Sink>(retVal));  }  return retVal;}

在初次创建一个Sink的时候,会通过segmentAnnouncer通知生成一个新的Segment. 实际上是通知ZooKeeper创建对应的临时节点.
然后往sinks中添加截断的时间撮和Sink的映射关系,假设后面事件的截断时间撮(比如都在同一个小时内),就直接使用创建好的Sink.
sinkTimeline是Sink的时间线,除了Interval,还有版本信息,分区编号. 比如一个Segment在同一个小时内数据量太大,会分成多个分区.

Sink使用了FireHydrant和IncrementalIndex增量索引. 我们知道Druid存储的并不是原始数据,而是Roll-up后的结果.
在前面getSink第一次创建Sink的时候, 也会顺带创建FireHydrant和OnheapIncrementalIndex(在堆中的增量索引)以及DataSegment!
因为实时数据写入到实时节点,经过索引后,这些数据要能够立即被查询到. 所以经过Roll-up后的数据是放在实时节点的内存中的.

《Druid源码解析(1) Guice和Realtime流程》——图较精简,不错

IncrementalIndex

添加一行InputRow会从FireHydrant中获取出OnheapIncrementalIndex,往增量索引中添加一条记录.

  • dimensions: 一行记录的所有列名称,从dataSchema的dimensionsSpec/dimensions指定
  • dimensionValues: 某个dimension的列值, 可以是数组有多个值
  • dimensionOrder: <dimension, order> 每个dimension列名的位置
  • dimValues: DimensionHolder<dimension, DimDim> 通过canonical可以快速判断列值是否存在
  • dims[][]: dims[index]中的index来自于dimensionOrder对应的order顺序, 值=getDimVals(DimDim, dimensionValues)
  • TimeAndDims: 时间撮和dims组成
  • metrics AggregatorFactory[]: 聚合算子的构造工厂,通过工厂类的factorize可以构造出算子
  • aggs AggregatorType[]: metrics指标列对应的每个Aggregator算子

在准备了上面的这些数据后, IncrementalIndex调用addToFacts添加facts, OnheapIncrementalIndex的实现会使用构造好的聚合算子,开始聚合操作.

12345678910111213141516171819202122
public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator> {  private final ConcurrentHashMap<Integer, Aggregator[]> aggregators = new ConcurrentHashMap<>();  private final ConcurrentNavigableMap<TimeAndDims, Integer> facts = new ConcurrentSkipListMap<>();  protected Integer addToFacts(AggregatorFactory[] metrics, boolean deserializeComplexMetrics, InputRow row,      AtomicInteger numEntries, TimeAndDims key, ThreadLocal<InputRow> rowContainer, Supplier<InputRow> rowSupplier){      Aggregator[] aggs = new Aggregator[metrics.length];      for (int i = 0; i < metrics.length; i++) {        final AggregatorFactory agg = metrics[i];        aggs[i] = agg.factorize(makeColumnSelectorFactory(agg, rowSupplier, deserializeComplexMetrics));      }      rowContainer.set(row);        //线程安全操作, 在开始聚合前设置为当前行      for (Aggregator agg : aggs) {        synchronized (agg) {          agg.aggregate();          //这里调用聚合算子的aggregate会发生聚合操作        }      }      rowContainer.set(null);       //结束当前行的聚合操作后, 设置为空      return numEntries.get();        }}

InputRow添加到IncrementalIndex, 会加入到增量索引的facts中. facts的TimeAndDims包含了时间撮和维度信息.
经过Roll-up的聚合算子会进行聚合操作,聚合结果也可以通过IncrementalIndex的相关getXXXValue获取.

persistHydrant

在persist持久化最开始, 会进行swap操作:创建一个新的FireHydrant,返回旧的FireHydrant.

1234567891011121314151617181920212223242526272829
public void persist(final Committer committer) {  final List<Pair<FireHydrant, Interval>> indexesToPersist = Lists.newArrayList();  for (Sink sink : sinks.values()) {    //每个Sink都要进行切换, 旧的保存, 新的存储最新的实时数据    if (sink.swappable()) {                 indexesToPersist.add(Pair.of(sink.swap(), sink.getInterval()));    }  }  for (Pair<FireHydrant, Interval> pair : indexesToPersist) {    persistHydrant(pair.lhs, schema, pair.rhs, metadata));  }  committer.run();  resetNextFlush();}protected int persistHydrant(FireHydrant indexToPersist, DataSchema schema, Interval interval, Map<String, Object> metaData){  int numRows = indexToPersist.getIndex().size();     //增量索引中的行数  final IndexSpec indexSpec = config.getIndexSpec();  //索引配置,在tuningConfig中,比如bitmap类型,列压缩格式  final File persistedFile = indexMerger.persist(     //合并索引      indexToPersist.getIndex(),      new File(computePersistDir(schema, interval), String.valueOf(indexToPersist.getCount())),      metaData,      indexSpec  );  indexToPersist.swapSegment(                         //增量索引转换为可查询的索引片      new QueryableIndexSegment(indexToPersist.getSegment().getIdentifier(), indexIO.loadIndex(persistedFile))  );  return numRows;    }

持久化增量索引,IndexMerge.persist会进一步调用merge,创建IncrementalIndexAdapter适配器.因为indexToPersist是OnHeapIncrementalIndex

1234
return merge(    Arrays.<IndexableAdapter>asList(new IncrementalIndexAdapter(dataInterval, index, indexSpec.getBitmapSerdeFactory().getBitmapFactory())),    index.getMetricAggs(), outDir, segmentMetadata, indexSpec, progress);

IndexMerge会对维度和指标合并成mergedDimensions,mergedMetrics,还有每一行的合并函数rowMergerFn. 最后makeIndexFiles创建索引文件.

本文标题:Druid源码解析(1) Guice和Realtime流程

文章作者:任何忧伤,都抵不过世界的美丽

发布时间:2015年12月08日 - 00时00分

最后更新:2019年02月14日 - 21时42分

原始链接:http://github.com/zqhxuyuan/2015/12/08/2015-12-08-Druid-source/ 

许可协议: "署名-非商用-相同方式共享 3.0" 转载请保留原文链接及作者。



招人广告:对蚂蚁金服中间件感兴趣的可以发邮件到:qihuang.zqh at antfin.com
 
 
 
 
© 2019 任何忧伤,都抵不过世界的美丽
Hexo Theme Yelee by MOxFIVE

相关推荐