Python 查询数据库并保存为 CSV

一次次尝试 2020-05-09

# coding: utf-8
 
import pymssql
import csv
import datetime
from file2zip import *
# from send_email import *
import plugs.FileHelper as fileHelper
from  file2zip import FileToZip #类名
from  send_email import SendMail  #类名


class TestCSVInfo(fileHelper.BaseCheck, SendMail, FileToZip):
    """Export a SQL Server query result to a CSV file, then zip and mail it.

    The output directory is named after the ``month`` value given to the
    constructor. ``getsql``, ``addZip`` and ``SendMethod`` are not defined in
    this class; presumably they come from the base classes -- confirm against
    ``plugs.FileHelper``, ``file2zip`` and ``send_email``.
    """

    report = ''

    def __init__(self, month):
        """Initialise the mail mixin and remember the target month.

        Args:
            month: Name of the output directory (the script passes a
                ``'%Y-%m'`` string).
        """
        # Only SendMail.__init__ is invoked, exactly as in the original code.
        SendMail.__init__(self, month)
        # Kept on the instance so the methods below no longer depend on a
        # module-level global that only exists when run as a script.
        self._export_month = month

    def get_sel_excel(self, date_from='2020-04-01 0:00:00',
                      date_to='2020-04-30 23:59:59'):
        """Run the query from ``test.sql``, write the CSV, zip it and mail it.

        Args:
            date_from: Text substituted for the ``v_from`` placeholder in the
                SQL file.
            date_to: Text substituted for the ``v_to`` placeholder in the
                SQL file.

        Any exception raised after the connection is opened is printed rather
        than propagated, matching the original best-effort behaviour.
        """
        import os  # local import: the file never imports os explicitly

        print(" 开始链接数据库!")
        # Open the database connection.
        conn = pymssql.connect(
            host='localhost',
            port=1433,
            user='test',
            password='test',
            database='Test',
            charset='utf8'
        )
        today = datetime.datetime.now().strftime('%Y-%m-%d')
        month = self._export_month
        try:
            try:
                print(os.path.exists(month))
                os.makedirs(month, exist_ok=True)

                cursor = conn.cursor()
                try:
                    title = today + '_' + '商品'
                    header = ['订单号', '金额', '日期']
                    print("开始查询表!")
                    # The placeholders are filled by plain text substitution;
                    # the values are caller-supplied constants, not user input.
                    template = self.getsql("test.sql")
                    sql = (template
                           .replace('v_from', str(date_from))
                           .replace('v_to', str(date_to)))
                    print(title + '——数据库查询中。。。')
                    cursor.execute(sql)
                    rows = cursor.fetchall()
                    self.w_excel(rows, header, title)
                finally:
                    cursor.close()
            finally:
                # The connection is released before zipping and mailing,
                # and also when the query or the CSV export fails.
                conn.close()

            # Compress the output directory into a zip archive.
            self.addZip(month)

            # Send the e-mail.
            self.SendMethod(month)
        except Exception as e:
            print("报错{}".format(e))

    def w_excel(self, res, header, titile):
        """Write ``header`` and the rows of ``res`` to ``<month>/<titile>.csv``.

        Args:
            res: Sequence of row sequences (the result of ``fetchall()``).
            header: List of column titles written as the first row.
            titile: File name without extension; also used in log output.
        """
        csv_path = f'{self._export_month}/' + titile + '.csv'
        # utf-8-sig writes a BOM at the start of the file.
        with open(csv_path, 'w', newline='', encoding='utf-8-sig') as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(header)
            writer.writerows(res)

        print(titile + f"导出成功!,共有条数:{len(res)}")
        print(titile + "结束-------------------------------------------------------------------------")

if __name__ == "__main__":
    # Script entry point: export the report for the previous calendar month.
    _today = datetime.date.today()
    _first = _today.replace(day=1)
    # The day before the first of this month always falls in last month.
    last_month = _first - datetime.timedelta(days=1)
    month = last_month.strftime('%Y-%m')  # previous month as 'YYYY-MM'
    # For the current month, use datetime.datetime.now().strftime('%Y-%m').
    TestCSVInfo(month).get_sel_excel()

相关推荐