Druid源码解析 - overlord的TaskMaster

XGQ 2019-09-06

TaskMaster是Druid overlord服务中最重要的一个类,它封装了索引服务的leadership生命周期。

TaskMaster被LifeCycle类管理启动和停止,LifeCycle启动时会调用TaskMaster的start()方法。TaskMaster在CliOverlord类的getModules方法中被绑定到LifeCycle:

binder.bind(TaskMaster.class).in(ManageLifecycle.class);

在TaskMaster的start()方法中,主要有如下代码:

overlordLeaderSelector.registerListener(leadershipListener);

这里先看一下overlordLeaderSelector这个TaskMaster的成员。overlordLeaderSelector的类型是DruidLeaderSelector,通过TaskMaster的构造函数注入:

@Inject TaskMaster(
  ...
  @IndexingService final DruidLeaderSelector overlordLeaderSelector
)

DiscoveryModule的configure方法中有:

PolyBind.optionBinder(binder, Key.get(DruidLeaderSelector.class, IndexingService.class))
        .addBinding(CURATOR_KEY)
        .toProvider(
            new DruidLeaderSelectorProvider(
                (zkPathsConfig) -> ZKPaths.makePath(zkPathsConfig.getOverlordPath(), "_OVERLORD")
            )
        )
        .in(LazySingleton.class);

DruidLeaderSelectorProvider的get()方法定义如下:

public DruidLeaderSelector get() 
{
  return new CuratorDruidLeaderSelector(
      curatorFramework,
      druidNode,
      latchPathFn.apply(zkPathsConfig)
  )
}

又有CuratorDruidLeaderSelector的构造函数:

public CuratorDruidLeaderSelector(CuratorFramework curator, @Self DruidNode self, String latchPath)
{
  this.curator = curator;
  this.self = self;
  this.latchPath = latchPath;

  ...
}

可见overlordLeaderSelector的实际类型是CuratorDruidLeaderSelector,并且它的latchPath成员的值为/druid/overlord/_OVERLORD

CuratorDruidLeaderSelector的registerListener方法

listener成员变量赋值

this.listener = listener;

这里传入的listener类型为DruidLeaderSelector.Listener,在TaskMaster中定义,这个下面会详细说到。

listenerExecutor成员变量赋值

this.listenerExecutor = Execs.singleThreaded(StringUtils.format("LeaderSelector[%s]", latchPath))

创建LeaderLatch

createNewLeaderLatchWithListener方法中,首先调用createNewLeaderLatch()方法创建一个LeaderLatch实例:

final LeaderLatch newLeaderLatch = createNewLeaderLatch();
private LeaderLatch createNewLeaderLatch() 
{
  return new LeaderLatch(curator, latchPath, self.getServiceScheme() + "://" + self.getHostAndPortToUse());
}

稍后调用LeaderLatch的start()方法,会在latchPath(这里是/druid/overlord/_OVERLORD)下创建一个EPHEMERAL_SEQUENTIAL类型的znode,这个znode的data就是LeaderLatch构造函数的第三个参数,这里可能是http://localhost:8090

然后对这个LeaderLatch实例调用addListener设置listener,LeaderLatch的listener必须是LeaderLatchListener接口的实现:

public interface LeaderLatchListener {
  void isLeader();
  void notLeader();
}

需要实现isLeader()和notLeader()两个接口,分别在当前服务成为leader以及当前服务失去leader角色时调用。

这里设置的LeaderLatchListener的实现,对于isLeader()方法,主要的逻辑就是调用在前面赋值的DruidLeaderSelector.Listener类型的listener的becomeLeader()方法。后面我会详细分析这个方法。

调用LeaderLatch实例的start()方法

下面就是调用刚刚在createNewLeaderLatchWithListener方法中创建的LeaderLatch(这个LeaderLatch被设置在了成员变量leaderLatch这个AtomicReference中)的start()方法进行leader的选举,并在选举为leader时在LeaderLatchListener的isLeader()方法中调用DruidLeaderSelector.Listener的becomeLeader方法

如何利用LeaderLatch进行leader选举?

利用LeaderLatch进行选举的过程如下:

  1. 在/druid/overlord/_OVERLORD下创建一个PHEMERAL_SEQUENTIAL节点,这种类型的znode会在节点名称后加入sequence_number,比如这里创建的zonde名称可能是_c_ba96d70d-b0f6-4df7-8b62-af078aa3b4c5-latch-0000000006_c_40d06620-8ae7-4b44-987a-f8222b17847e-latch-0000000007等等。
  2. 对LeaderLatch实例调用getChildren()方法,监听/druid/overlord/_OVERLORD节点,如果节点下创建了子节点,也就是上面说的PHEMERAL_SEQUENTIAL节点,则在回调中调用checkLeadership检查当前LeaderLatch所在进程是否为leader。
  3. 调用checkLeadership(event.getChildren())方法检查当前LeaderLatch所在进程或服务是否为leader。这个检查的过程很简单,就是对/druid/overlord/_OVERLORD下的子节点按名称进行排序,排在第一位(id最小)的节点如果是当前LeaderLatch创建的znode,则当前LeaderLatch所在服务成为leader。比如/druid/overlord/_OVERLORD下有两个子节点latch-0000000007和latch-0000000006,那么创建latch-0000000006的LeaderLatch所在的服务或进程就成为leader;如果当前LeaderLatch创建的znode不是排在第一位的,则监听比当前节点排名靠前的第一个节点,在监听的回调中,如果监听的节点被删除了,则重新运行getChildren方法,从而进行重新选举。例如在/druid/overlord/_OVERLORD下有三个孩子节点latch-0000000006,latch-0000000007和latch-0000000008,当前LeaderLatch创建的节点为latch-0000000007,则这个LeaderLatch所在的进程就没有成为leader,这时监听latch-0000000006节点,当latch-0000000006节点被删除则重新运行getChildren方法,这时latch-0000000007节点排在第一位,这时这个LeaderLatch所在的进程就成为了新的leader。

至此对CuratorDruidLeaderSelectorregisterListener方法的调用就结束了。

DruidLeaderSelector.Listener的becomeLeader()方法

leadershipListener是TaskMaster的成员,类型为DruidLeaderSelector.Listener,我们调用CuratorDruidLeaderSelectorregisterListener方法传入的就是leadershipListener这个listener。leadershipListener在TaskMaster的构造函数中被定义为实现DruidLeaderSelector.Listener接口的匿名类。

interface Listener {
  void becomeLeader();
  void stopBeingLeader();
}

在当前overlord服务成为leader时,会调用LeaderLatch注册的listener中的isLeader()方法,进而调用DruidLeaderSelector.Listener的becomeLeader()方法。

在becomeLeader()方法中,首先调用了:

taskLockbox.syncFromStorage();

接着定义了taskRunner:

taskRunner = runnerFactory.build();

然后定义了taskQueue:

taskQueue = new TaskQueue(
    taskQueueConfig,
    taskStorage,
    taskRunner,
    taskActionClientFactory,
    taskLockbox,
    emitter
);

接着声明了一个本地的Lifecycle实例吗,并加入一些被LifeCyle管理的类:

final Lifecycle leaderLifecycle = new Lifecycle("task-master");
...
leaderLifecycle.addManagedInstance(taskRunner);
leaderLifecycle.addManagedInstance(taskQueue);
leaderLifecycle.addManagedInstance(supervisorManager);
leaderLifecycle.addManagedInstance(overlordHelperManager);

leaderLifecycle.addHandler(
  new Lifecycle.Handler() 
  {
    @Override
    public void start()
    {
      initialized = true;
      serviceAnnouncer.announce(node);
    }
    @Override
    public void stop() { serviceAnnouncer.unannounce(node); }
  }
)

最后调用这个Lifecycle的start()方法启动这个本地Lifecycle:

leaderLifecycle.start();

调用Lifecycle的start()方法,也会顺序调用它所管理的Handler的start方法,具体说来,会依次调用:

  • taskRunner的start()方法;
  • taskQueue的start()方法;
  • supervisorManager的start()方法;
  • overlordHelperManager的start()方法;
  • serviceAnnouncer的announce方法;

下面我们详细来看一下这些方法。

TaskLockbox的syncFromStorage方法

从druid_tasks表中同步status(类型为TaskStatus)为TaskState.RUNNING的taskId到activeTasks中。

暂略

taskRunner的start()方法

初始化TaskRunner

TaskRunner实例的初始化是在DruidLeaderSelector.Listener的becomeLeader方法中进行的:

taskRunner = runnerFactory.build();

taskRunner是TaskMaster的成员变量。

runnerFactory是在实例化TaskMaster时通过Guice注入的TaskRunnerFactory接口类型的实现。由于我们在overlord的配置文件中指定了druid.indexer.runner.type=remote,因此这里Guide实例化的TaskRunnerFactory的实际类型是RemoteTaskRunnerFactory

在overlord的getModules方法中调用了configureRunners方法,在这个方法中通过druid.indexer.runner.type的值来绑定实例化TaskRunnerFactory接口时的实际类型。
在configureRunners方法中,有如下代码:

final MapBinder<String, TaskRunnerFactory> biddy = PolyBind.optionBinder(
  binder,
  Key.get(TaskRunnerFactory.class)
);

biddy.addBinding("local").to(ForkingTaskRunnerFactory.class);
binder.bind(ForkingTaskRunnerFactory.class).in(LazySingleton.class);

biddy.addBinding("remote").to(RemoteTaskRunnerFactory.class);
binder.bind(RemoteTaskRunnerFactory.class).in(LazySingleton.class);

biddy.addBinding("http").to(HttpRemoteTaskRunnerFactory.class);
binder.bin(HttpRemoteTaskRunnerFactory.class).in(LazySingleton.class);

之后如果调用injector.getInstance(Key.get(TaskRunnerFactory.class))会返回一个类型为Map<String, Provider<TaskRunnerFactory>>的实例。这里的key为什么是Provider<TaskRunnerFactory>而不是TaskRunnerFactory?MapBinder的接口文档中有说明:In addition to binding Map<K, V>, a mapbinder will also bind Map<K, Provider<V>> for lazy value provision

在configureRunners中又有:

PolyBind.createChoice(
  binder,
  "druid.indexer.runner.type",
  Key.get(TaskRunnerFactory.class),
  Key.get(ForkingTaskRunnerFactory.class)
);

这个方法主要是将TaskRunnerFactory绑定到ConfiggedProvider上的实例上。在调用injector.getInstance(TaskRunnerFactory.class)时,会调用ConfiggedProvider的get()方法返回TaskRunnerFactory的实例,在ConfiggedProvider的get()方法中,首先获取"druid.indexer.runner.type"的值(在配置文件中有配置,默认为local,我指定了remote)。然后调用injector.getInstance(Key.get(TaskRunnerFactory.class))返回Map<String, Provider<TaskRunnerFactory>>的实例,然后找到'remote'对应的TaskRunnerFactory,也就是RemoteTaskRunnerFactory并返回。

调用RemoteTaskRunnerFactory的build()方法返回RemoteTaskRunner实例:

return new RemoteTaskRunner(
  jsonMapper,
  remoteTaskRunnerConfig,
  zkPaths,
  curator,
  new PathChildrenCacheFactory.Builder().withCompressed(true),
  httpClient,
  workerConfigRef,
  provisioningSchedulerConfig.isDoAutoScale() ? provisioningStrategy : new NoopProvisioningStrategy<>()
);

这里的zkPaths的类型是IndexerZKConfig,这个类中保存了所有zookeeper中druid相关组件要用到的路径。

调用TaskRunner的start()方法

start()方法主要是在zk的path cache上注册listener监听worker的创建和移除,这里的worker指的是middleManager。

在PathChildrenCache上注册listener

在RemoteTaskRunner的构造函数中初始化了workerPathCache:

this.workerPathCache = pathChildrenCacheFactory.build().make(cf, indexerZKConfig.getAnnouncementsPath());

这里的workerPathCache被绑定在了/druid/indexer/announcements路径上(路径通过调用indexerZKConfig.getAnnouncementsPath方法获取)。

在workerPathCache上加入一个PathChildernCacheListener实例并实现此实例的childEvent方法监听middleManager节点的创建和移除。

workerPathCache.getListenable().addListener(
  new PathChildrenCacheListener()
  {
    public void childEvent(CuratorFramework client, final PathChildrenCacheEvent event) 
    {
      final Worker worker
      switch (event.getType()) {
        case CHILD_ADDED:
          ...
        case CHILD_REMOVED:
          ...
        case INITIALIZED:
          ...    
      }
    }
  }
);
CHILD_ADDED

在/druid/indexer/announcements下有新节点加入会出差childEvent方法,并且event类型为CHILD_ADDED。

首先会读取这个event下的json数据并反序列化为Worker对象:

worker = jsonMapper.readValue(
  event.getData().getData(),
  Worker.class
);

Worker中有如下几个成员参数:

String scheme;    // http or https
String host;
String ip;
int capacity;
String version;