MayMatrix 2012-08-01
刚把上一个项目的组的框架和技术栈理顺,突然又要转到新的项目组,而且现在已经不是刚入公司的新人了,没有那么多的时间慢慢适应,赶紧上手做事才是王道,在新的项目组的技术栈中,首当其冲的就是SpringBatch,赶紧上手练习之。
SpringBatch是什么?
SpringBatch是一个基于Spring的企业级批处理框架,按照我师父的说法,所有基于Spring的框架都是使用了spring的IoC特性,然后加上自己的一些处理规则。因此,要理解SpringBatch的设计和使用,首先需要理解批处理的机制和特点。
所谓企业批处理就是指在企业级应用中,不需要人工干预,定期读取数据,进行相应的业务处理之后,再进行归档的这类操作。从上面的描述中可以看出,批处理的整个流程可以明显的分为3个阶段:
1、读数据
2、业务处理
3、归档结果数据
另外,从定义中可以发现批处理的一个重要特色就是无需人工干预、定期执行,因此一个批处理框架,需要关注事务的粒度,日志监控,执行方式,资源管理,读数据,处理数据,写数据的解耦等方面。
SpringBatch为我们提供了什么呢?
1、统一的读写接口
2、丰富的任务处理方式、
3、灵活的事务管理及并发处理
4、日志、监控、任务重启与跳过等特性
注意,SpringBatch未提供关于批处理任务调度的功能,因此如何周期性的调用批处理任务需要自己想办法解决,就Java来说,Quartz是一个不错的解决方案,或者写脚本处理之。
SpringBatchFirstDemo
前面讲了很多SpringBatch的特性,接下来就通过一个小例子来看看SpringBatch是如何实现批处理的读数据-》处理数据-》归档结果这一过程的。
首先,搭建项目框架,推荐大家使用Maven或者Gradle结构的项目,不会的,赶紧学学,对于学习新技术省很多时间。一个Spring项目需要依赖的lib(可能有多,大家可以试探性的删掉一些不必要的包)如下:<dependency> <groupId>org.springframework</groupId> <artifactId>spring-beans</artifactId> <version>${springframework.core.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-aop</artifactId> <version>${springframework.core.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>${springframework.core.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-core</artifactId> <version>${springframework.core.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jdbc</artifactId> <version>${springframework.core.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>${springframework.core.version}</version> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-tx</artifactId> <version>${springframework.core.version}</version> </dependency> <dependency> <groupId>org.springframework.batch</groupId> <artifactId>spring-batch-core</artifactId> <version>${spring.batch.version}</version> </dependency> <dependency> <groupId>org.springframework.batch</groupId> <artifactId>spring-batch-infrastructure</artifactId> <version>${spring.batch.version}</version> </dependency> <dependency> <groupId>org.springframework.batch</groupId> <artifactId>spring-batch-test</artifactId> <version>${spring.batch.version}</version> <scope>test</scope> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.10</version> <scope>test</scope> </dependency>项目构建好以后,首先开始写读取数据的逻辑,Spring Batch针对读、写操作提供了很多实现方式,包括文件,数据库,对于数据库的操作还提供了很多ORM框架(Hibernate,iBatis,JPA)的支持,这儿为了简单,以读文件作为例子,假设我们需要读取一个文件中所有人的信息,大于16岁的需要发信息需要发信息通知它去公安局办理身份证。简化文件如下:
TWer1,15 TWer2,21 TWer3,13 TWer4,16 TWer5,25 TWer6,45 TWer7,16,这儿需要的Spring Batch的读文件功能就是把文件中的每一行都能转化为一个内存对象,其对应的类就是User.java
public class User { String name; int age; public String getName() { return name; } public void setName(String name) { this.name = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } }另外,需要在message_job.xml中配置如下内容
<bean id="messageReader" class="org.springframework.batch.item.file.FlatFileItemReader"> <property name="lineMapper" ref="lineMapper"/> <property name="resource" value="/message/user.txt"/> </bean> <bean id="lineMapper" class="org.springframework.batch.item.file.mapping.DefaultLineMapper"> <property name="lineTokenizer" ref="lineTokenizer"/> <property name="fieldSetMapper" ref="fieldSetMapper"/> </bean> <bean id="fieldSetMapper" class="com.ning.demo.UserMapper"/> <bean id="lineTokenizer" class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer"/>,该配置文件中除了UserMapper外,都是SpringBatch默认提供的。UserMapper.java代码如下:
public class UserMapper implements FieldSetMapper<User> { @Override public User mapFieldSet(FieldSet fieldSet) throws BindException { User user = new User(); user.setName(fieldSet.readString(0)); user.setAge(fieldSet.readInt(1)); return user; } }
这样,文件中的每一行数据都会变成一个User类的instance。
接下来,是处理数据的过程,判断每个user的年龄,如果大于16,就生成一条Message。public class MessageProcessor implements ItemProcessor<User, Message> { @Override public Message process(User item) throws Exception { Message message = null; if (item.getAge() > 16) { message = new Message(); message.setContent(item.getName() + ",Please come to police station!"); } return message; } }该类实现了SpringBatch的ItemProcessor接口, 最后,把处理后得到的所有Message打印到Console上,
public class MessageWriter implements ItemWriter<Message> { @Override public void write(List<? extends Message> items) throws Exception { System.out.println("Results:"); for (Message item : items) { System.out.println(item.getContent()); } } }该类实现了SpringBatch的ItemWriter接口。SpringBatch本身提供了多种Writer实现。 通过上面的几个步骤,把读数据,处理数据,写数据都构造出来了,那么那么是如何串联起来的呢?答案是配置文件,
<batch:job id="messageJob"> <batch:step id="messageStep"> <batch:tasklet> <batch:chunk reader="messageReader" processor="messageProcessor" writer="messageWriter" commit-interval="10" chunk-completion-policy=""> </batch:chunk> </batch:tasklet> </batch:step> </batch:job> <bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean"> <property name="transactionManager" ref="transactionManager"/> </bean> <bean id="messageReader" class="org.springframework.batch.item.file.FlatFileItemReader"> <property name="lineMapper" ref="lineMapper"/> <property name="resource" value="/message/user.txt"/> </bean> <bean id="lineMapper" class="org.springframework.batch.item.file.mapping.DefaultLineMapper"> <property name="lineTokenizer" ref="lineTokenizer"/> <property name="fieldSetMapper" ref="fieldSetMapper"/> </bean> <bean id="fieldSetMapper" class="com.ning.demo.UserMapper"/> <bean id="lineTokenizer" class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer"/> <bean id="messageProcessor" class="com.ning.demo.MessageProcessor"/> <bean id="messageWriter" class="com.ning.demo.MessageWriter"/> <bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager"/>Spring Batch 将批处理任务称为一个 Job,同时,Job 下分为多个 Step。Step 是一个独立的、顺序的处理步骤,包含该步骤批处理中需要的所有信息。多个批处理 Step 按照一定的流程组成一个 Job。通过这样的设计方式,我们可以灵活配置 Job 的处理过程。 接下来的问题是如何运行构建好的BatchJob呢?SpringBatch提供了JobLauncher接口用于运行Job,并提供了一个默认的SimpleJobLauncher实现。
public class Main { public static void main(String[] args) { ClassPathXmlApplicationContext c = new ClassPathXmlApplicationContext("message_job.xml"); SimpleJobLauncher launcher = new SimpleJobLauncher(); launcher.setJobRepository((JobRepository) c.getBean("jobRepository")); launcher.setTaskExecutor(new SimpleAsyncTaskExecutor()); try { launcher.run((Job) c.getBean("messageJob"), new JobParameters()); } catch (Exception e) { e.printStackTrace(); } } }运行BatchJob时需要为 JobLauncher 指定一个 JobRepository,该类负责创建一个 JobExecution 对象来执行 Job,其次,需要指定一个任务执行器,我们使用 Spring Batch 提供的 SimpleAsyncTaskExecutor。最后,通过 run 方法来执行指定的 Job。运行结果如下:
Results:
TWer2,Pleasecometopolicestation!
TWer5,Pleasecometopolicestation!
TWer6,Pleasecometopolicestation!