利用python实现mysql数据库向sqlserver的同步
话不多说,利用直接上代码。实现l数 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 #!/usr/bin/python # -*- coding:utf8 -*- # author: chenzhixin """ 一、据库安装环境: python3 pip install pymysql pip install pymssql 二、利用实现功能: 将mysql的实现l数oa_2016.formmain_5027(手机打卡记录)数据,高防服务器增量同步到sqlserver数据库的据库kaoqin.CHECKINOUT中 三、运行方法: a)定时任务 [root@oadb1 shell]# crontab -l * * * * * python -W ignore /usr/local/shell/sync_mobile_kaoqin.py >> /var/log/sync_mobile_kaoqin.log 2>&1 b) 日志位置 tail -f /var/log/sync_mobile_kaoqin.log 四、利用测试sql: mysql执行 select * from oa_2016.formmain_5027 sqlserver上执行 select * from CHECKINOUT where sn=手机端打卡 """ from contextlib import contextmanager import pymysql as mysqldb import pymssql as mssqldb import time @contextmanager def get_mysql_conn(**kwargs): """ 建立MySQL数据库连接 :param kwargs: :return: """ conn = mysqldb.connect(host=kwargs.get(host,实现l数 localhost), user=kwargs.get(user), password=kwargs.get(password), port=kwargs.get(port, 3306), database=kwargs.get(database) ) try: yield conn finally: if conn: conn.close() @contextmanager def get_mssql_conn(**kwargs): """ 建立sqlserver数据库连接 :param kwargs: :return: """ conn = mssqldb.connect(server=kwargs.get(host), user=kwargs.get(user), password=kwargs.get(password), database=kwargs.get(database) ) try: yield conn finally: if conn: conn.close def execute_mysql_select_sql(conn, sql): """ 执行mysql的select类型语句 :param conn: :param sql: :return: """ with conn.cursor() as cur: cur.execute(sql) rows = cur.fetchall() return rows def execute_mysql_sql(conn, sql): """ 执行mysql的亿华云dml和ddl语句,不包括select语句 :param conn: :param sql: :return: """ with conn.cursor() as cur: cur.execute(sql) def execute_mssql_sql(conn,据库 sql): """ 执行sqlserver的dml和ddl语句,不包含select语句 :param conn: :param sql: :return: """ with conn.cursor() as cur: cur.execute(sql) conn.commit() def get_mysql_kaoqin_data(conn): """ 获取mysql的利用考勤数据 :param conn: :return: """ sql = "select * from formmain_5027 where field0008 is null or field0008=" mysql_kaoqin_data_rows = execute_mysql_select_sql(conn, sql) return mysql_kaoqin_data_rows def mysql_sync_to_sqlserver(mysql_conn, mssql_conn, data): """ 把mysql的云服务器考勤数据同步到sqlserver数据库里面 :param mysql_conn: :param mssql_conn: :param data: :return: """ for index, row in enumerate(data, 1): ID=row[0] state=row[1] start_member_id=row[2] start_date=row[3] approve_member_id=row[4] approve_date=row[5] finishedflag=row[6] ratifyflag=row[7] ratify_member_id=row[8] ratify_date=row[9] sort=row[10] modify_member_id=row[11] modify_date=row[12] field0001=row[13] field0002=row[14] field0003=row[15] field0004=row[16] field0005=row[17] field0006=row[18] field0007=row[19] field0008=row[20] field0009=row[21] #向sqlserver插入数据 insert_data = """ INSERT INTO [kaoqin].[dbo].[CHECKINOUT] ([USERID] ,[CHECKTIME] ,[CHECKTYPE] ,[VERIFYCODE] ,[SENSORID] ,[Memoinfo] ,[WorkCode] ,[sn] ,[UserExtFmt] ,[Synced]) VALUES ((select userid from USERINFO where BADGENUMBER={ userid}), { CHECKTIME}, I, 1, 1, NULL, 0, 手机端打卡, 0, null )""".format(userid=field0002, CHECKTIME=start_date) execute_mssql_sql(mssql_conn, insert_data) print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) print(###############第{ }条手机打卡记录###############\n.format(index), insert_data) marked_sql = "update formmain_5027 set field0008=synced where id={ }".format(ID) execute_mysql_sql(mysql_conn, marked_sql) def main(): mysql_conn_args = dict(user=root, host=127.0.0.1, password=