heyeqingquan 2013-12-09
The JobTracker receives heartbeats from each TaskTracker and processes them. Without further ado, here is the source code.
public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status, boolean restarted, boolean initialContact, boolean acceptNewTasks, short responseId)
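1. First, check that the heartbeat comes from a host in the JobTracker's host list; otherwise an exception is thrown.
If the host is not in the host list, or is in the excluded-host list, heartbeat processing stops.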
return (inHostsList(status) && !inExcludedHostsList(status));
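The include/exclude check can be sketched in isolation. This is a minimal illustration, not the JobTracker's code: the class `HostFilter` and its fields are hypothetical, and treating an empty include list as "allow every host" is an assumption about how such lists are commonly handled.

```java
import java.util.Set;

// Hypothetical stand-in for the host-list check; not Hadoop's actual class.
public class HostFilter {
    private final Set<String> includes; // allowed hosts (empty = allow all, an assumption)
    private final Set<String> excludes; // hosts explicitly shut out

    public HostFilter(Set<String> includes, Set<String> excludes) {
        this.includes = includes;
        this.excludes = excludes;
    }

    boolean inHostsList(String host) {
        return includes.isEmpty() || includes.contains(host);
    }

    boolean inExcludedHostsList(String host) {
        return excludes.contains(host);
    }

    // Same shape as the return statement above: listed and not excluded.
    public boolean accept(String host) {
        return inHostsList(host) && !inExcludedHostsList(host);
    }
}
```

Note that the exclude list wins: a host present in both lists is rejected.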
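2. Check whether the tracker is on the blacklist, graylist, or default list, and remove it from those lists.
The blacklist and graylist are part of Hadoop's fault-tolerance mechanism; they are not covered here and could fill an article of their own.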
faultyTrackers.markTrackerHealthy(status.getHost());
3. Look up the previous HeartbeatResponse by trackerName.
HeartbeatResponse prevHeartbeatResponse = trackerToHeartbeatResponseMap.get(trackerName);
4. If there is no previous heartbeat response: when the JobTracker itself has restarted, this is the tracker's first heartbeat to the new JobTracker, so the tracker is unmarked in the recovery manager; otherwise the TaskTracker is told to reinitialize.
if (prevHeartbeatResponse == null) {
  // This is the first heartbeat from the old tracker to the newly
  // started JobTracker
  if (hasRestarted()) {
    addRestartInfo = true;
    // inform the recovery manager about this tracker joining back
    recoveryManager.unMarkTracker(trackerName);
  } else {
    // Jobtracker might have restarted but no recovery is needed
    // otherwise this code should not be reached
    LOG.warn("Serious problem, cannot find record of 'previous' " +
             "heartbeat for '" + trackerName +
             "'; reinitializing the tasktracker");
    return new HeartbeatResponse(responseId,
        new TaskTrackerAction[] {new ReinitTrackerAction()});
  }
}
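If the responseId does not match the ID of the previous response, the heartbeat is a retransmission: it is not processed again, and the previous response is resent.
if (prevHeartbeatResponse.getResponseId() != responseId) {
  LOG.info("Ignoring 'duplicate' heartbeat from '" + trackerName +
           "'; resending the previous 'lost' response");
  return prevHeartbeatResponse;
}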
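The duplicate check relies on the tracker echoing the ID of the last response it received. A minimal sketch of that idea follows. The class `HeartbeatDedup`, its `Response` type, and the "increment by one" ID scheme are assumptions made for illustration, and a missing previous response is simply treated as a first contact here rather than handled as in step 4.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of response-id based duplicate detection.
public class HeartbeatDedup {
    public static final class Response {
        public final short responseId;
        public final String payload;

        Response(short responseId, String payload) {
            this.responseId = responseId;
            this.payload = payload;
        }
    }

    // Last response sent to each tracker, keyed by tracker name.
    private final Map<String, Response> lastResponse = new HashMap<>();

    // echoedId is the id of the last response the tracker actually received.
    public Response heartbeat(String tracker, short echoedId) {
        Response prev = lastResponse.get(tracker);
        if (prev != null && prev.responseId != echoedId) {
            // The tracker never saw prev, so this heartbeat is a retransmission:
            // skip processing and resend the lost response unchanged.
            return prev;
        }
        Response next = new Response((short) (echoedId + 1), "actions for " + tracker);
        lastResponse.put(tracker, next);
        return next;
    }
}
```

Because the lost response is returned as-is, the actions it carried are delivered exactly once from the server's point of view, even though the heartbeat arrived twice.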
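5. Process the heartbeat. processHeartbeat first calls updateTaskTrackerStatus; if the tracker had previously been lost, it is added back to the queue; task statuses and node health status are then updated.
private synchronized boolean processHeartbeat(
    TaskTrackerStatus trackerStatus,
    boolean initialContact,
    long timeStamp) throws UnknownHostException {
  // Most of the work happens here; it is not analyzed in detail in this post.
}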
6. Check whether new tasks can be assigned to this TaskTracker; if so, add a launch action for each assigned task.
if (recoveryManager.shouldSchedule() && acceptNewTasks && !isBlacklisted) {
  TaskTrackerStatus taskTrackerStatus = getTaskTrackerStatus(trackerName);
  if (taskTrackerStatus == null) {
    LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);
  } else {
    List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);
    if (tasks == null) {
      tasks = taskScheduler.assignTasks(taskTrackers.get(trackerName));
    }
    // Add the tasks
    if (tasks != null) {
      for (Task task : tasks) {
        expireLaunchingTasks.addNewTask(task.getTaskID());
        if (LOG.isDebugEnabled()) {
          LOG.debug(trackerName + " -> LaunchTask: " + task.getTaskID());
        }
        actions.add(new LaunchTaskAction(task));
      }
    }
  }
}
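The selection order in this step (setup and cleanup tasks first, the scheduler only as a fallback) can be shown with a small stand-alone sketch. Everything here is hypothetical: tasks are plain strings, the scheduler is a `Supplier`, and the class `TaskAssignment` does not exist in Hadoop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical sketch of the selection order in step 6; not Hadoop's types.
public class TaskAssignment {
    // Returns launch actions: setup/cleanup tasks win; otherwise ask the scheduler.
    public static List<String> launchActions(boolean canSchedule,
                                             boolean acceptNewTasks,
                                             boolean blacklisted,
                                             List<String> setupAndCleanup,
                                             Supplier<List<String>> scheduler) {
        List<String> actions = new ArrayList<>();
        if (!canSchedule || !acceptNewTasks || blacklisted) {
            return actions; // this tracker gets no new work in this heartbeat
        }
        List<String> tasks = setupAndCleanup;
        if (tasks == null) {
            tasks = scheduler.get(); // fall back to the pluggable scheduler
        }
        if (tasks != null) {
            for (String task : tasks) {
                actions.add("LaunchTask:" + task);
            }
        }
        return actions;
    }
}
```

Passing the scheduler as a `Supplier` keeps it from being consulted at all when setup or cleanup work is pending, which matches the null check in the code above.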
7. Check for tasks that need to be killed.
List<TaskTrackerAction> killTasksList = getTasksToKill(trackerName);
if (killTasksList != null) {
  actions.addAll(killTasksList);
}
8. Check for jobs that need to be cleaned up.
List<TaskTrackerAction> killJobsList = getJobsForCleanup(trackerName);
if (killJobsList != null) {
  actions.addAll(killJobsList);
}
9. Check for tasks whose output can be saved (committed).
List<TaskTrackerAction> commitTasksList = getTasksToSave(status);
if (commitTasksList != null) {
  actions.addAll(commitTasksList);
}
10. Calculate the interval until the next heartbeat.
int nextInterval = getNextHeartbeatInterval();
response.setHeartbeatInterval(nextInterval);
response.setActions(
    actions.toArray(new TaskTrackerAction[actions.size()]));
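The body of getNextHeartbeatInterval() is not shown in this post. One plausible policy, and it is an assumption here, is to lengthen the interval as the cluster grows so that the JobTracker handles a bounded number of heartbeats per second. The class name and both constants below are illustrative and are not taken from a Hadoop release.

```java
// Hypothetical sketch: scale the heartbeat interval with cluster size.
public class HeartbeatInterval {
    // Illustrative constants (assumptions, not Hadoop's configured values).
    static final int MIN_INTERVAL_MS = 3000;
    static final int HEARTBEATS_PER_SECOND = 100;

    public static int nextIntervalMs(int clusterSize) {
        // One extra second for every HEARTBEATS_PER_SECOND trackers, rounded up.
        int scaled = (int) (1000 * Math.ceil((double) clusterSize / HEARTBEATS_PER_SECOND));
        // Never go below the floor, however small the cluster is.
        return Math.max(scaled, MIN_INTERVAL_MS);
    }
}
```

With these constants a small cluster stays at the 3-second floor, while a 1000-tracker cluster would be asked to report every 10 seconds.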
11. Update the heartbeat response map, and remove the tasks that were marked for removal on this tracker.
// Update the map
trackerToHeartbeatResponseMap.put(trackerName, response);
// Remove the tasks marked for removal
removeMarkedTasks(trackerName);
Corrections and discussion are welcome.
================= References ====
Hadoop source code.