Airuio 2018-03-20
本文主要内容python MySQLdb数据库批量插入insert,更新update的:
1.python MySQLdb的使用,写了一个基类让其他的sqldb继承这样比较方便,数据库的ip, port等信息使用json配置文件
2.常见的查找,批量插入更新
下面贴出基类代码:
# _*_ coding:utf-8 _*_ import MySQLdb import json import codecs # 这个自己改一下啊 from utils.JsonUtil import get_json_from_file def byteify(input): """ the string of json typed unicode to str in python This function coming from stack overflow :param input: {u'first_name': u'Guido', u'last_name': u'jack'} :return: {'first_name': 'Guido', 'last_name': 'jack'} """ if isinstance(input, dict): return {byteify(key): byteify(value) for key, value in input.iteritems()} elif isinstance(input, list): return [byteify(element) for element in input] elif isinstance(input, unicode): return input.encode('utf-8') else: return input def get_json_from_file(filename): with open(filename) as jf: jsondata = json.load(jf) return byteify(jsondata) class DbBase(object): def __init__(self, **kwargs): self.db_config_file = kwargs['db_config_file'] self.config_db(self.db_config_file) def config_db(self, db_config_file): data = get_json_from_file(db_config_file) host = data['host'] user = data['user'] pwd = data['pwd'] db = data['db'] port = data['port'] self.tb_audit_mobile = data['tb_audit_mobile'] self.conn = MySQLdb.connect(host=host, port=port, user=user, passwd=pwd, db=db, charset="utf8", use_unicode=True) self.cursor = self.conn.cursor()
子类的示例:
class DbAuditTestService(DbBase): def __init__(self, **kwargs): super(DbAuditTestService, self).__init__(**kwargs) def getAdTestURl(self, beg, end): sql = """select url, source from tb_name where create_date BETWEEN '%s' and '%s' """ % (beg, end) self.cursor.execute(sql) res = [row for row in self.cursor] return res def insert(self, lst_row): """batch insert, use ignore 避免索引唯一问题""" try: insert_sql = 'INSERT ignore INTO tb_ms_mobile_report_test (appid, source) VALUES (%s, %s)' self.cursor.executemany(insert_sql, lst_row) self.conn.commit() except MySQLdb.OperationalError as e: logger.info('%s' % e) self.cursor.close() self.conn.close() self.config_db(self.db_config_file) def update_ip_info(self, ip_info): """ batch update [[voilate_right_rate, ip]] :param ip_info: :return: """ query = """ update tb_ms_audit_ip_info set ip_right_rate=%s where submit_ip=%s """ self.cursor.executemany(query, ip_info) self.conn.commit() def insert_all(): """批量操作的示例""" db_audit = DbAuditService(db_config_file='../config/mysql_police_audit.json') size = db_audit.count() db_audit_test = DbAuditTestService(db_config_file='../config/mysql_local_audit.json') batch_size = 2000 for k in xrange(100000, size, batch_size): logger.info('query limit %s ~ %s' % (k, batch_size)) lst_row = db_audit.query_limit(k, batch_size) logger.info('convert_rows ') lst_row = convert_rows(lst_row) db_audit_test.insert(lst_row)
总结