基于数据库锁实现springtask的集群

frank0 2017-06-12

目前项目使用大量spring-task,spring-task有个足的地方是缺失对集群的支持。quartz可以支持定时任务集群,我们项目没有用,所以就自己实现了。我们设计的定时任务有三类。1、节点间不允许并发,2、节点间允许并发,节点内不允许并发,3.节点间允许并发,节点内允许多线程并发。

首先实现任务接口

public interface Task {

	/**
	 * 定时任务被调用入口,此方法中异常应捕获,不应往外面抛出
	 */
    public void excut();
	
    /**
     * 定时业务任务实现方法
     * @throws Exception
     */
	public void doExcut() throws Exception;
	
	
	/**
	 * 是否允许多节点并发运行
	 * @return
	 */
	public boolean isConcurrent();
	
	
	/**
	 * 是否是单节中配置了多线程执行任务
	 * @return
	 */
	public boolean isMulitiThread();
	
	
	/**
	 * 设置任务数据分片信息,当多节点并发取数时,需通过此方法设置分片信息
	 * @param dataSliceInfo  每个节点的取数分片信息
	 */
	public void setDataSliceInfo(DataSliceInfo dataSliceInfo);
	
	/**
	 * 获取单节点下,任务的并发数目 。当节点下任务为多线程并发时,返回此值
	 * @return
	 */
	public int getLocalMulitiThreadNum();
}

创建一个任务基类

public abstract class ClusterBaseTask implements Task {

	protected Logger logger = Logger.getLogger(getClass());
	
	
	@Autowired
	ClusterTaskExcutor excutor;
	
	/**
	 * 暴露在外的方法
	 * @throws Exception 
	 */
	@Override
	public void excut() {
		try {
			excutor.excute(this);
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

	
	@Override
	public void setDataSliceInfo(DataSliceInfo dataSliceInfo) {

	}

	@Override
	public int getLocalMulitiThreadNum() {
		return 0;
	}

	
	

}

创建一个ClusterTaskExcutor进行任务控制

public interface ClusterTaskExcutor {

	
	void excute(Task task) throws Exception;
	
}

实现类

@Service
public class ClusterTaskExcutorImpl implements ClusterTaskExcutor{

	protected Logger logger = Logger.getLogger(getClass());
	
	@Autowired
    TaskLockerDAO taskLockerDAO;
	
	@Autowired
	TaskRuntimeInfoService taskRuntimeInfoService;
	
	//spring中具有线程池管理的调度
	@Autowired
	SchedulingTaskExecutor schedulingTaskExecutor;
	
	
	@Autowired
	HeartbeatService  heartbeatService;
	
	
	@Autowired
	private DataSourceTransactionManager txManager;
	
	

	@Override
	public void excute(final Task task) throws Exception {
		
		logger.info("ClusterTaskExcutorImpl begin ");
		
		if(task.isConcurrent()){//节点间可以并发执行
			
			if(task.isMulitiThread()){//一个节点下多线程执行任务
				doConcurrent(task);
			}else{//一个节点下单线程执行
				task.doExcut();
			}
			
		}else{//节点间互斥执行
	
			doSync(task);
		}
		
		logger.info("ClusterTaskExcutorImpl end ");
		
	}


	private void doConcurrent(final Task task) throws Exception {
		
	  
		DataSliceInfo dataSliceInfo=getDataSliceInfo();
		
		if(dataSliceInfo!=null){//获取到分片信息
			task.setDataSliceInfo(dataSliceInfo);
			
			int size=task.getLocalMulitiThreadNum();
			
			List<Future<?>> futures = new ArrayList<Future<?>>(size);
			
			for (int i = 0; i < size; i++) {
				
				RunableTask runableTask=new RunableTask(task);
				Future<?> f=schedulingTaskExecutor.submit(runableTask);
				futures.add(f);
			}
			
			//主线程等待,让所有线程执行完成后,主线程才执行
			for (Future<?> f : futures) {
				if (!f.isDone()) {
					try {
						f.get();
					} catch (CancellationException ignore) {
					} catch (ExecutionException ignore) {
					}
				}
			}
		}else{//未获取到分片信息
			logger.error("未获取到分片信息,任务退出执行");
		}
		
		
		
	}


	/**
	 * 获取本机的分片信息
	 * @return
	 * @throws UnknownHostException
	 * @throws Exception
	 */
	private DataSliceInfo getDataSliceInfo() {
		String nodeIp=null ;
		try {
			DataSliceInfo dataSliceInfo=new DataSliceInfo();
			List<String> ipList=heartbeatService.getAliveHostList();
			nodeIp= InetAddress.getLocalHost().getHostAddress();
			int index=MapUtil.getHashIndex(nodeIp, ipList);
			int size=ipList.size();
			dataSliceInfo.setIndex(index);
			dataSliceInfo.setSize(size);
			return dataSliceInfo;
		} catch (UnknownHostException e) {
			logger.error("getDataSliceInfo错误", e);
			return null;
		} catch (BusinessServiceException e) {
			logger.error("getDataSliceInfo错误;ip:"+nodeIp+"未激活");
			return null;
		}
	}


	/**
	 * 方法同步执行
	 * @param task
	 * @throws InterruptedException
	 */
	private void doSync(final Task task) throws InterruptedException {
		//手动提交事务
		DefaultTransactionDefinition def = new DefaultTransactionDefinition();
		def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);// 事物隔离级别,开启新事务
		TransactionStatus txStatus = txManager.getTransaction(def); // 获得事务状态
		
		try {
			String taskName=task.getClass().getName();
			if(taskLockerDAO.lockTask(taskName)){//获取任务锁
				
				final TaskRuntimeInfoDTO runtimeInfoDTO = getTaskRunTimeInfo(taskName);
				
				
				Thread thread=new Thread(new Runnable() {
					
					@Override
					public void run() {
						
						taskRuntimeInfoService.beginTask(runtimeInfoDTO);
						try {
							task.doExcut();
						} catch (Exception e) {
							logger.error(e.getMessage(), e);
						}finally{
							taskRuntimeInfoService.endTask(runtimeInfoDTO);
						}
					}
				});
				thread.start();
				thread.join();
			}else{
				logger.warn("任务:["+taskName+"],已经在执行,此线程退出执行");
			}
			
		} finally{
			//操作完成后手动提交事务
			txManager.commit(txStatus);
		}
	}


	private TaskRuntimeInfoDTO getTaskRunTimeInfo(String taskName)
			 {
		
		final TaskRuntimeInfoDTO   runtimeInfoDTO=new TaskRuntimeInfoDTO();
		runtimeInfoDTO.setcTaskName(taskName);
		String nodeIp;
		try {
			nodeIp = InetAddress.getLocalHost().getHostAddress();
		} catch (UnknownHostException e) {
			nodeIp="UnknownHost";
			logger.error("获取IP地址失败", e);
		}
		runtimeInfoDTO.setcRunNodeIp(nodeIp);
		runtimeInfoDTO.setcUpdCde(nodeIp);
		return runtimeInfoDTO;
	}

}

数据库表设计

create table TASK_LOCKER
(
  c_task_name VARCHAR2(256) not null
);
-- Add comments to the table 
comment on table TASK_LOCKER
  is '定时任务locker';
-- Add comments to the columns 
comment on column TASK_LOCKER.c_task_name
  is '任务名称';
-- Create/Recreate primary, unique and foreign key constraints 
alter table TASK_LOCKER
  add constraint TASK_LOCKER_PK_ID primary key (C_TASK_NAME);


-- Create table
create table TASK_RUNTIME_INFO
(
  c_task_name   VARCHAR2(256) not null,
  c_is_run      VARCHAR2(1) not null,
  c_run_node_ip VARCHAR2(256),
  c_crt_cde     VARCHAR2(128) not null,
  t_crt_date    DATE not null,
  c_upd_cde     VARCHAR2(128) not null,
  t_upd_date    DATE not null
);
-- Add comments to the table 
comment on table TASK_RUNTIME_INFO
  is '定时任务运行情况表';
-- Add comments to the columns 
comment on column TASK_RUNTIME_INFO.c_task_name
  is '任务名称';
comment on column TASK_RUNTIME_INFO.c_is_run
  is '是否正在运行';
comment on column TASK_RUNTIME_INFO.c_run_node_ip
  is '运行此任务的节点ip,此任务正在运行时有此值';
comment on column TASK_RUNTIME_INFO.c_crt_cde
  is '创建者';
comment on column TASK_RUNTIME_INFO.t_crt_date
  is '创建时间';
comment on column TASK_RUNTIME_INFO.c_upd_cde
  is '更新者';
comment on column TASK_RUNTIME_INFO.t_upd_date
  is '更新时间';
-- Create/Recreate primary, unique and foreign key constraints 
alter table TASK_RUNTIME_INFO
  add constraint TASK_RUNTIME_INFO_PK_ID primary key (C_TASK_NAME);

获取数据库sql

SELECT 1 FROM  task_locker t WHERE t.c_task_name=#taskName# FOR UPDATE NOWAIT

1、节点间不允许并发定时任务

@Service
public class ClusterTaskMock extends ClusterBaseTask implements TaskMock {

	
	
	@Override
	public void doExcut() throws Exception {
		
		logger.info("doExcut() begin");
		
		Thread.currentThread().sleep(60*1000);
		
		logger.info("doExcut() end");
		
	}

	@Override
	public boolean isConcurrent() {
		return false;
	}

	@Override
	public boolean isMulitiThread() {
		return false;
	}

	
}

2、节点间可以并发定时任务

@Service
public class ConcurrentClusterTaskMock extends ClusterBaseTask implements ConcurrentTaskMock {

	
	
	@Override
	public void doExcut() throws Exception {
		
		logger.info("doExcut() begin");
		
		System.out.println(Thread.currentThread().isDaemon());
		Thread.currentThread().sleep(60*1000);
		
		logger.info("doExcut() end");
		
	}

	@Override
	public boolean isConcurrent() {
		return   true;
	}

	@Override
	public boolean isMulitiThread() {
		return true;
	}

	@Override
	public void setDataSliceInfo(DataSliceInfo dataSliceInfo) {
		
	}

	@Override
	public int getLocalMulitiThreadNum() {
		
		return 5;
	}

DataSliceInfo是多线程执行任务时,我们采用对任务id取模的方法分配每个线程所需要执行的任务,避免了在不加锁情况下,多个线程取到相同的任务

public class DataSliceInfo {

	/**
	 * 总分片数
	 */
	private Integer size;
	
	/**
	 * 分片下标
	 */
	private Integer index;
	

	public Integer getSize() {
		return size;
	}

	public void setSize(Integer size) {
		this.size = size;
	}

	public Integer getIndex() {
		return index;
	}

	public void setIndex(Integer index) {
		this.index = index;
	}
	
	
	
	
}

相关推荐