kunzai 2016-08-19
最近开发一个数据同步的小功能,需要从A主机的Oracle数据库中把数据同步到B主机的Oracle库中。当然能够用dmp脚本或者SQL脚本是最好,但是对于两边异构的表结构来说,直接导入不可行。然后在需要实时同步的情况下用存储过程也不可行了。写一个数据同步的小程序是个不错的选择。使用框架的封装和连接池是必须的,Spring是首选,这里我们同样需要Spring的多数据源连接配置方式。 其实再进行项目开发的时候,一个项目有可能不止用到一个数据源,为了提高数据库的水平伸缩性,需要对多个数据库实例进行管理,需要配置多数据源。(4种数据库,不同的URL)
1. 配置多个数据源
这里以两个c3p0数据库连接池的数据源作为实例。在Spring框架下使用c3p0的数据库需要加入c3p0-0.9.1.2.jar(现在最新的)这个支持包。这里以数据同步项目为例:
数据来源库的连接池数据源配置:
public class DynamicDataSource extends AbstractRoutingDataSource{ @Override protected Object determineCurrentLookupKey() { return DBContextHolder.getDBType(); } }
上下文DbContextHolder为一线程安全的ThreadLocal,具体代码如下:
public class DBContextHolder{ public static final String DATA_SOURCE_FROM = "dataSourceFrom"; public static final String DATA_SOURCE_TO = "dataSourceTo"; private static final ThreadLocal<String> contextHolder = new ThreadLocal<String>(); public static void setDBType(String dbType) { contextHolder.set(dbType); } public static String getDBType() { return contextHolder.get(); } public static void clearDBType() { contextHolder.remove(); } }
3.配置动态数据源
将DynamicDataSource Bean加入到Spring的上下文xml配置文件中去,同时配置DynamicDataSource的targetDataSources(多数据源目标)属性的Map映射。
ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml"); BaseDAO dao = (BaseDAO) context.getBean("sqlBaseDAO", BaseDAOImpl.class); try { DBContextHolder.setCustomerType(DBContextHolder.DATA_SOURCE_FROM); System.err.println(dao.select("select count(*) sum from TEST t ").get(0).get("SUM")); DBContextHolder.setCustomerType(DBContextHolder.DATA_SOURCE_TO); System.err.println(dao.select("select count(*) sum from TEST t ").get(0).get("SUM")); } catch (Exception e) { e.printStackTrace(); }
也可以采用AOP的控制方式:
@Aspect public class DynamicDataSourceAspect { @Pointcut("execution (public service.impl..*.*(..))") public void serviceExecution(){} @Before("serviceExecution()") public void setDynamicDataSource(JoinPoint jp) { for(Object o : jp.getArgs()) { //处理具体的逻辑 ,根据具体的境况CustomerContextHolder.setCustomerType()选取DataSource } } }
7.总结
通过扩展Spring的AbstractRoutingDataSource可以很好的实现多数据源的rout效果,而且对扩展更多的数据源有良好的伸缩性,只要增加数据源和修改DynamicDataSource的targetDataSources属性配置就好。在数据源选择控制上,可以采用手动控制(业务逻辑并不多的时候),也可以很好的用AOP的@Aspect在Service的入口加入一个切面@Pointcut,在@Before里判断JoinPoint的类容选定特定的数据源。
以上是参考别人的。下面贴自己的代码(主要是INSERT 后面VALUES值是动态的拼接,表名和字段都是动态的,网上没搜到,自己写的,还没试,逻辑上是这样的)
/** * @author wj * @date 2016-8-19 * 切换不同的数据源,同步远程最新数据到本地 * @param tableName 表名 * @param stcd 字段 * @param dbType 数据库类型 */ private void syncOne(String tableName,String stcd,String dbType){ if("oracle".equals(dbType)){ //如果远程数据库是ORACLE DBContextHolder.setDBType(DBContextHolder.dataSourceTo); String hql = "select MAX( TM ) from "+ tableName +" WHERE STCD = '"+stcd+"'"; List r = dao.find(hql); Date latestTime = (Date)r.get(0); DBContextHolder.setDBType(DBContextHolder.dataSourceOracle); //切换到远程ORACLE数据库,查询远程最新数据 String remoteTableName = tableName.replace("YZSQ", "ST"); String gtTimeResultSql = "select * from "+remoteTableName+" where STCD= '"+ stcd +"' and tm> :tm" ; Map<String,Object> params = Maps.newHashMap(); params.put("tm", latestTime); List<Map> gtTimeResult = dao.findBySql(gtTimeResultSql, params); DBContextHolder.setDBType(DBContextHolder.dataSourceTo); //切换到本地目标数据库,插入数据到本地 for(Map map: gtTimeResult){ StringBuilder inserSql = new StringBuilder(" insert into "+tableName+" values ("); int i=0; for (Object key : map.keySet()) { i++; System.out.println("key= "+ String.valueOf(key) + " and value= " + map.get(key)); Object value = map.get(key); if(i< map.keySet().size()){ if(value instanceof String){ inserSql.append("'"+(String)value+"',"); }else if(value instanceof Date){ String pattern = "yyyy-MM-dd HH24:mi:ss"; Date converted = (Date)value; inserSql.append("to_date('"+DateUtil.dateToString(converted, pattern) +"', '"+pattern+"'),"); }else if(value instanceof Integer){ inserSql.append((Integer)value+","); }else { inserSql.append(value+","); } }else{ if(value instanceof String){ inserSql.append("'"+(String)value+"')"); }else if(value instanceof Date){ String pattern = "yyyy-MM-dd HH24:mi:ss"; Date converted = (Date)value; inserSql.append("to_date('"+DateUtil.dateToString(converted, pattern) +"', '"+pattern+"') )"); }else if(value instanceof Integer){ inserSql.append((Integer)value+")"); }else { inserSql.append(value+")"); } } dao.executeSql(inserSql.toString()); } } //String inserSql = " insert into "+tableName+" values "+ }else if("mysql".equals(dbType)){//如果远程数据库是mysql } }
(或者,用存储过程代替,但是sqlserver的openquery只能返回一条记录,好像ODBC换成OLEDB可行,没试)
USE [SQB] GO /****** Object: StoredProcedure [dbo].[PROC_SYNC_ONE_LATEST_DATA] Script Date: 08/19/2016 18:06:32 ******/ SET ANSI_NULLS ON GO SET QUOTED_IDENTIFIER ON GO -- ============================================= -- Author: <Author,,Name> -- Create date: <Create Date,,> -- Description: 查询各个表的某闸的最大时间。远程表里,大于这个时间的结果返回过来。在本地表插入返回记录。-插入完成。视图再查就行 -- ============================================= ALTER PROCEDURE [dbo].[PROC_SYNC_ONE_LATEST_DATA] ---单表单闸的 @tableName varchar(50), @STCD varchar(50), @dbType varchar(50) AS BEGIN DECLARE @latestTime datetime DECLARE @remoteTableName varchar(50) DECLARE @sql nvarchar(1000); DECLARE @insertSql nvarchar(1000); --远程表copy到本地 IF (@dbType='oracle') BEGIN SET @sql='select @latestTime=MAX( TM ) from '+ @tableName +' WHERE STCD = '''+@STCD+''''; exec sp_executesql @sql,N'@latestTime datetime output',@latestTime out --本表最近时间 SET @remoteTableName = REPLACE (@tableName,'YZSQ','ST') SET @insertSql=' insert into '+@tableName+' select * from '+ ' openquery(yzsq,''select * from '+@remoteTableName+' where STCD= '''+ @STCD +''' and tm>'+@latestTime+''')' exec(@insertSql) END ELSE IF(@dbType='mysql') BEGIN SET @sql='select @latestTime=MAX( time ) from '+ @tableName ; exec sp_executesql @sql,N'@latestTime datetime output',@latestTime out --本表最近时间 SET @insertSql=' insert into '+@tableName +'select * from '+ ' openquery(mysql,select * from '+@tableName+' where time>'+@latestTime exec(@insertSql) END ELSE --sqlserver BEGIN SET @sql='select @latestTime=MAX( time ) from '+ @tableName ; exec sp_executesql @sql,N'@latestTime datetime output',@latestTime out --本表最近时间 SET @insertSql=' insert into '+@tableName +'select * from '+ ' openquery(192.168.100.101,select * from '+@tableName+' where datetime>'+@latestTime exec(@insertSql) END END