近期公司要做一个报表系统。为了让报表系统中复杂的查询不影响线上业务系统的使用,研究了一下MySQL数据库同步。下面用Java代码实现MySQL数据库同步,记录下来以便日后查阅!
数据库同步实现功能点:
1.支持跨服务器跨库的多线程同步
2.每张表的同步有日志记录
3.每次同步记录数可配置
源码和具体的使用细则,可以到“下载源码及使用说明”页面获取。
一、数据同步核心代码
package com.zrscsoft.synchtool.db;import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;import cn.com.dd.connector.db.ConnectionPoolManager;import com.synch.connector.DataBaseUtil;
import com.synch.connector.DataSourceConnectDBVO;
import com.synch.connector.TableFieldVO;
import com.zrscsoft.synchtool.util.PropertyUtil;
import com.zrscsoft.synchtool.util.StringUtil;public class SynchMain {public void addSynchLog(Connection bakconn, String synchtable,String synchsql, String syncherr, String synchtime) {PreparedStatement bakps = null;try {String bakInsertSQL = "insert into sys_synchlog (synchtable,synchsql,syncherr,synchtime,createtime) values ('"+ synchtable+ "','"+ synchsql+ "',?,'"+ synchtime + "',now())";bakps = bakconn.prepareStatement(bakInsertSQL);bakps.setString(1, syncherr);bakps.execute();bakconn.commit();} catch (Exception e) {e.printStackTrace();} finally {// 备库连接释放if (bakps != null) {try {bakps.close();} catch (SQLException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}}public void synch(String synchtablenames) throws Exception {HashMap<String, String> synProperty = PropertyUtil.getAllMessage();// 数据库连接参数DataSourceConnectDBVO maindb = new DataSourceConnectDBVO();maindb.setDataSourceType(0);maindb.setDataSourceName("maindb"+synchtablenames);maindb.setAccessMethod(0);maindb.setDataSourceAddress(synProperty.get("maindb.address"));maindb.setDataSourcePort(Integer.parseInt(synProperty.get("maindb.port")));maindb.setDatabaseName(synProperty.get("maindb.databasename"));maindb.setAccountName(synProperty.get("maindb.accountname"));maindb.setAccountPwd(synProperty.get("maindb.accountpwd"));DataSourceConnectDBVO bakdb = new DataSourceConnectDBVO();bakdb.setDataSourceType(0);bakdb.setDataSourceName("bakdb"+synchtablenames);bakdb.setAccessMethod(0);bakdb.setDataSourceAddress(synProperty.get("bakdb.address"));bakdb.setDataSourcePort(Integer.parseInt(synProperty.get("bakdb.port")));bakdb.setDatabaseName(synProperty.get("bakdb.databasename"));bakdb.setAccountName(synProperty.get("bakdb.accountname"));bakdb.setAccountPwd(synProperty.get("bakdb.accountpwd"));int currows = Integer.parseInt(synProperty.get("synchrows"));// 每次同步的记录数// 同步表名.String synchtablenamestmp=synProperty.get(synchtablenames);if(synchtablenamestmp==null||"".equals(synchtablenamestmp)){return;}String[] 
tableNames = synchtablenamestmp.split(",");for (int t = 0; t < tableNames.length; t++) {String tableName = tableNames[t];// 获取同步字段名List<TableFieldVO> list = DataBaseUtil.getFieldsForTable(tableName,maindb);String fields = "`"+list.get(0).getColumnName()+"`";String values = "?";String idfield="";int paraCount = list.size();for (int i = 1; i < paraCount; i++) {String columnName=list.get(i).getColumnName();fields = fields + ",`" + columnName+"`";values = values + ",?";if("id".equals(columnName.toLowerCase())){idfield=columnName;}}Connection mainconn = null;PreparedStatement mainps = null;ResultSet mainrs = null;Connection bakconn = null;PreparedStatement bakps = null;int curpage = 1;// 第几页,默认显示第1页long startTime = System.currentTimeMillis();String mainQuerySQL = "";String bakInsertSQL ="";while (true) {try {String orderby="".equals(idfield)?"":" order by "+idfield+" ";String limit = orderby+" limit " + (curpage - 1) * currows + ","+ currows;mainQuerySQL = "select * from " + tableName + limit;bakInsertSQL = "insert ignore into " + tableName + " ("+ fields + ") values (" + values + ")";// 主库mainconn = DataBaseUtil.getMySQLConnect(maindb);mainps = mainconn.prepareStatement(mainQuerySQL);mainrs = mainps.executeQuery();// 被库bakconn = DataBaseUtil.getMySQLConnect(bakdb);bakps = bakconn.prepareStatement(bakInsertSQL);int querySize = 0;while (mainrs.next()) {querySize++;for (int i = 0; i < paraCount; i++) {bakps.setString(i + 1, mainrs.getString(list.get(i).getColumnName()));}bakps.addBatch();}if (querySize == 0) {break;}curpage++;bakps.executeBatch();bakconn.commit();this.addSynchLog(bakconn, tableName, mainQuerySQL, "",(System.currentTimeMillis() - startTime) + "");} catch (Exception e) {e.printStackTrace();this.addSynchLog(bakconn, tableName, mainQuerySQL,StringUtil.getExceptionAllinformation(e),(System.currentTimeMillis() - startTime) + "");break;} finally {// 主库连接释放if (mainrs != null) {try {mainrs.close();} catch (SQLException e) {// TODO Auto-generated catch 
blocke.printStackTrace();}}if (mainps != null) {try {mainps.close();} catch (SQLException e) {// TODO Auto-generated catch blocke.printStackTrace();}}if (mainconn != null) {ConnectionPoolManager.getInstance().release(mainconn);}// 备库连接释放if (bakps != null) {try {bakps.close();} catch (SQLException e) {// TODO Auto-generated catch blocke.printStackTrace();}}if (bakconn != null) {ConnectionPoolManager.getInstance().release(bakconn);}}}System.out.println(tableNames.length + "/" + t + "表" + tableName+ "耗时:" + (System.currentTimeMillis() - startTime) + "毫秒");}}public static void main(String[] args) throws Exception {new SynchThread("synchtablenames1").start();new SynchThread("synchtablenames2").start();new SynchThread("synchtablenames3").start();}}
二、数据库同步多线程实现
package com.zrscsoft.synchtool.db;public class SynchThread extends Thread {private String synchtablenames;public SynchThread(String synchtablenames) {this.synchtablenames = synchtablenames;}public void run() {if(synchtablenames==null||"".equals(synchtablenames)){return;}SynchMain main = new SynchMain();try {main.synch(synchtablenames);} catch (Exception e) {// TODO Auto-generated catch blocke.printStackTrace();}}}
三、配置文件及读取配置文件代码
配置文件内容为:
#主库
maindb.address=localhost
maindb.port=3306
maindb.databasename=mysqlmain
maindb.accountname=root
maindb.accountpwd=root
#备库
bakdb.address=localhost
bakdb.port=3306
bakdb.databasename=mysqlbak
bakdb.accountname=root
bakdb.accountpwd=root
#同步表名,多张表以,分开
synchtablenames1=sys_user1
synchtablenames2=sys_user2
synchtablenames3=sys_user3
#每次同步记录数
synchrows=10000
读取配置文件的Java类为:
package com.zrscsoft.synchtool.util;import java.util.Enumeration;
import java.util.HashMap;
import java.util.ResourceBundle;

/**
 * Reads the synchronisation settings from the {@code synch} resource bundle.
 */
public class PropertyUtil {

    /**
     * Loads the {@code synch} resource bundle and copies every entry into a map.
     *
     * @return a new map holding each bundle key with its string value
     */
    public static HashMap<String, String> getAllMessage() {
        // Look the bundle up by its base name.
        ResourceBundle bundle = ResourceBundle.getBundle("synch");

        // Walk every key and copy its value across.
        HashMap<String, String> settings = new HashMap<String, String>();
        for (Enumeration<String> names = bundle.getKeys(); names.hasMoreElements();) {
            String name = names.nextElement();
            settings.put(name, bundle.getString(name));
        }
        return settings;
    }
}
运行方式为:执行SynchMain.java类中的main()方法。
源码和具体的使用细则,可以到“下载源码及使用说明”页面获取。
转载于:https://blog.51cto.com/1681229/2370801