使用Python实现将多表分批次从数据库导出到Excel

来自:互联网
时间:2020-05-16
阅读:
免费资源网 - https://freexyz.cn/

一、应用场景

为了避免反复的手手工从后台数据库导出某些数据表到Excel文件、高效率到多份离线数据。

二、功能事项

支持一次性导出多个数据源表、自动获取各表的字段名。

支持控制批次的写入速率。例如:每5000行一个批次写入到excel。

支持结构相同的表导入到同一个Excel文件。可适用于经过水平切分后的分布式表。

三、主要实现

1、概览

A[创建类] -->|方法1| B(创建数据库连接)
A[创建类] -->|方法2| C(取查询结果集)
A[创建类] -->|方法3| D(利用句柄写入Excel)
A[创建类] -->|方法4| E(读取多个源表)

B(创建数据库连接) -->U(调用示例)
C(取查询结果集) -->U(调用示例)
D(利用句柄写入Excel) -->U(调用示例)
E(读取多个源表) -->U(调用示例)

2、主要方法

首先需要安装第三方库pymssql实现对SQLServer的连接访问,自定义方法__getConn()需要指定如下五个参数:服务器host、登录用户名user、登录密码pwd、指定的数据库db、字符编码charset。连接成功后,通过cursor()获取游标对象,它将用来执行数据库脚本,并得到返回结果集和数据总量。

创建数据库连接和执行SQL的源码:

def __init__(self,host,user,pwd,db):
    self.host = host
    self.user = user
    self.pwd = pwd
    self.db = db

  def __getConn(self):
    if not self.db:
      rAIse(NameError,'没有设置数据库信息')
    self.conn = pymssql.connect(host=self.host, user=self.user, password=self.pwd, database=self.db, charset='utf8')
    cur = self.conn.cursor()
    if not cur:
      raise(NameError,'连接数据库失败')
    else:
      return cur

3、方法3中写入Excel时,注意一定要用到Pandas中的公共句柄ExcelWriter对象writer。当数据被分批多次写入同一个文件时,如果直接使用to_excel()方法,则前面批次的结果集将会被后续结果覆盖。增加了这个公共句柄限制后,后面的写入会累加到前面写入的数据尾部行,而不是全部覆盖。

writer = pd.ExcelWriter(file)
df_fetch_data[rs_startrow:i*N].to_excel(writer, header=isHeader, index=False, startrow=startRow)

分批次写入到目标Excel时的另一个要注意的参数是写入行startrow的设置。每次写入完成后需要重新指下一批次数据的初始位置值。每个批次的数据会记录各自的所属批次信息。

利用关键字参数**args 指定多个数据源表和数据库连接。

def exportToExcel(self, **args):
  for sourceTB in args['sourceTB']:    
    arc_dict = dict(
      sourceTB = sourceTB,
      path=args['path'],
      startRow=args['startRow'],
      isHeader=args['isHeader'],
      batch=args['batch']
    )
    print('n当前导出的数据表为:%s' %(sourceTB))
    self.writeToExcel(**arc_dict)
  return 'success'

四、先用类MSSQL创建对象,再定义关键字参数args,最终调用方法导出到文件即完成数据导出。

#!/usr/bin/env python
# coding: utf-8

# 主要功能:分批次导出大数据量、结构相同的数据表到excel 
# 导出多个表的数据到各自的文件, 
# 目前问题:to_excel 虽然设置了分批写入,但先前的数据会被下一次写入覆盖,
# 利用Pandas包中的ExcelWriter()方法增加一个公共句柄,在写入新的数据之时保留原来写入的数据,等到把所有的数据都写进去之后关闭这个句柄
import pymssql 
import pandas as pd 
import datetime 
import math
 
class MSSQL(object):
  def __init__(self,host,user,pwd,db):
    self.host = host
    self.user = user
    self.pwd = pwd
    self.db = db
 
  def __getConn(self):
    if not self.db:
      raise(NameError,'没有设置数据库信息')
    self.conn = pymssql.connect(host=self.host, user=self.user, password=self.pwd, database=self.db, charset='utf8')
    cur = self.conn.cursor()
    if not cur:
      raise(NameError,'连接数据库失败')
    else:
      return cur
   
  def executeQuery(self,sql):
    cur = self.__getConn()
    cur.execute(sql)
    # 获取所有数据集
    # fetchall()获取结果集中的剩下的所有行
    # 如果数据量太大,是否需要分批插入 
    resList, rowcount = cur.fetchall(),cur.rowcount
    self.conn.close()
    return (resList, rowcount)
 
  # 导出单个数据表到excel 
  def writeToExcel(self,**args):
    sourceTB = args['sourceTB']
    columns = args.get('columns')
    path=args['path']
    fname=args.get('fname')
    startRow=args['startRow']
    isHeader=args['isHeader']
    N=args['batch']
     
    # 获取指定源数据列
    if columns is None:
      columns_select = ' * '
    else:
      columns_select = ','.join(columns)
     
    if fname is None:
      fname=sourceTB+'_exportData.xlsx'
     
    file = path + fname
    # 增加一个公共句柄,写入新数据时,保留原数据 
    writer = pd.ExcelWriter(file)
     
    sql_select = 'select '+ columns_select + ' from '+ sourceTB
    fetch_data, rowcount = self.executeQuery(sql_select)
    # print(rowcount)
     
    df_fetch_data = pd.DataFrame(fetch_data)
    # 一共有roucount行数据,每N行一个batch提交写入到excel 
    times = math.floor(rowcount/N)
    i = 1
    rs_startrow = 0
    # 当总数据量 > 每批插入的数据量时 
    print(i, times)
    is_while=0
    while i <= times:
      is_while = 1
      # 如果是首次,且指定输入标题,则有标题
      if i==1:
        # isHeader = True
        startRow = 1
      else:
        # isHeader = False
        startRow+=N
      # 切片取指定的每个批次的数据行 ,前闭后开 
      # startrow: 写入到目标文件的起始行。0表示第1行,1表示第2行。。。
      df_fetch_data['batch'] = 'batch'+str(i)
      df_fetch_data[rs_startrow:i*N].to_excel(writer, header=isHeader, index=False, startrow=startRow)
      print('第',str(i),'次循环,取源数据第',rs_startrow,'行至',i*N,'行','写入到第',startRow,'行')
      print('第',str(i),'次写入数据为:',df_fetch_data[rs_startrow:i*N])
      # 重新指定源数据的读取起始行
      rs_startrow =i * N
      i+=1
 
    # 写入文件的开始行数
    # 当没有做任何循环时,仍然从第一行开始写入
    if is_while == 0:
      startRow = startRow
    else:
      startRow+=N
    df_fetch_data['batch'] = 'batch'+str(i)
    print('第{0}次读取数据,从第{1}行开始,写入到第{2}行!'.format(str(i), str(rs_startrow), str(startRow)))
    print('第',str(i),'写入数据为:',df_fetch_data[rs_startrow:i*N])
    df_fetch_data[rs_startrow:i*N].to_excel(writer, header=isHeader, index=False, startrow=startRow)
     
    # 注: 这里一定要saver()将数据从缓存写入磁盘!!!!!!!!!!!!!!!!!!!!!1
    writer.save()
     
    start_time=datetime.datetime.now()
  # 导出结构相同的多个表到同一样excel
  def exportToExcel(self, **args):
    for sourceTB in args['sourceTB']:    
      arc_dict = dict(
        sourceTB = sourceTB,
        path=args['path'],
        startRow=args['startRow'],
        isHeader=args['isHeader'],
        batch=args['batch']
      )
      print('n当前导出的数据表为:%s' %(sourceTB))
      self.writeToExcel(**arc_dict)
       
    return 'success'
    start_time=datetime.datetime.now()
 
if __name__ == "__main__":
  ms = MSSQL(host="localhost",user="test",pwd="test",db="db_jun")
   
  args = dict(
   sourceTB = ['tb2', 'tb1'],# 待导出的表
   path='D:\myPC\Python\',# 导出到指定路径
   startRow=1,#设定写入文件的首行,第2行为数据首行
   isHeader=False,# 是否包含源数据的标题
   batch=5
  )
  # 导出多个文件
  ms.exportToExcel(**args)
免费资源网 - https://freexyz.cn/
返回顶部
顶部