百度360必应搜狗淘宝本站头条
当前位置:网站首页 > IT技术 > 正文

python执行.sql语法和文件

wptr33 2025-01-06 15:47 28 浏览

前面我们执行了python文件对mysql数据库的单条语法操作,那么如果我们要对很多句的.sql文件如何操作呢?

用for循环来做,除去空格!但是这句没有清除sql的备注语言

try:

#db = pymysql.connect(host='localhost',user='root', password='12345678', port=3306, db='stocks', charset='utf8')

#c = db.cursor()

with open('000001_sz.sql',encoding='utf-8',mode='r') as f:

# 读取整个sql文件,以分号切割。[:-1]删除最后一个元素,也就是空字符串

sql_list = f.read().split(';')[:-1]

sql_listsure = ''

#print(sql_list)

#raise SystemExit

for x in sql_list:

# 判断包含空行的

if '\n' in x:

# 替换空行为1个空格

x = x.replace('\n', ' ')


# 判断多个空格时

if ' ' in x:

# 替换为空

x = x.replace(' ', '')


# sql语句添加分号结尾

sql_item = x+';'

c.execute(sql_item)

print("执行成功sql: %s"%sql_item)

except Exception as e:

print(e)

print('执行失败sql: %s'%sql_item)

finally:

那我们再添加上删除sql的备注的正则表达式

import re

def rehint_limit(sql_values):

write_tag = 0 # 用来控制是否写入新变量

write_limit = '' # 记录/或者*

sql_result = '' # 记录去除注释后的结果

for case in sql_values:

if (write_limit + case) == '/*':

sql_result = sql_result.strip('/') # 去除最后一个/

write_tag += 1

if write_tag == 0:

sql_result += case

if (write_limit + case) == '*/':

write_tag -= 1

write_limit = ''

if '/' == case or '*' == case:

write_limit = case

return sql_result

def rehint_line(sql_values):

rev = re.compile('--.*\\n?')

sql_values = re.sub(rev,'\n',sql_values)

return sql_values

if __name__ == '__main__':

sql = '''

--这是个sql

select '1' v1,'2' v2 from dual union all;

select '2' v1,'3' v2 from dual union all;

/* 这段select 1 v1,2 v2 from dual union all

select 2 v1,3 v2 from dual 写错了

*/

select /* 这个是select语句 */ 'a' v1, --v1列

'b' v2 --v2列

from dual; --dual是个伪表

'''

print(sql)

# 先去除段注释

sql = rehint_limit(sql)

print('rehint_limit: ' + sql)

# 去除行注释

sql = rehint_line(sql)

print('rehint_line: ' + sql)

好像很麻烦的样子!

那咱们封装一下,直接做一个封装的文件

例如import pymysql

from pathlib import Path

class ConnectMsql:

def __init__(self, host='localhost', port=3306, user='root',

password='12345678', database="stocks", filename: str = "test_sh.sql"):

"""

:param host: 域名

:param port: 端口

:param user: 用户名

:param password: 密码

:param database: 数据库名

:param filename: 文件名称

"""

self._host: str = host

self._port: int = port

self._user: str = user

self._password: str = password

self._database: str = database

self._file_path = Path(__file__).parent.joinpath(filename)

def _show_databases_and_create(self):

"""

查询数据库是否存在,不存在则进行新建操作

:return:

"""

connection = pymysql.connect(host=self._host, port=self._port, user=self._user, password=self._password,

cursorclass=pymysql.cursors.DictCursor)

with connection:

with connection.cursor() as cursor:

cursor.execute('show databases;')

result = cursor.fetchall()

results = self._database not in tuple(x["Database"] for x in result)

if results:

with connection.cursor() as cursor:

cursor.execute(f'create database {self._database};')

with connection.cursor() as cursor:

cursor.execute('show databases;')

result = cursor.fetchall()

results = self._database in tuple(x["Database"] for x in result)

return results if results else result

else:

return True

def _export_databases_data(self):

"""

读取.sql文件,解析处理后,执行sql语句

:return:

"""

if self._show_databases_and_create() is True:

connection = pymysql.connect(host=self._host, port=self._port, user=self._user, password=self._password,

database=self._database, charset='utf8')

# 读取sql文件,并提取出sql语句

results, results_list = "", []

with open(self._file_path, mode="r+", encoding="utf-8") as r:

for sql in r.readlines():

# 去除数据中的“\n”和“\r”字符

sql = sql.replace("\n", "").replace("\r", "")

# 获取不是“--”开头且不是“--”结束的数据

if not sql.startswith("--") and not sql.endswith("--"):

# 获取不是“--”的数据

if not sql.startswith("--"):

results = results + sql

# 根据“;”分割数据,处理后插入列表中

for i in results.split(";"):

if i.startswith("/*"):

results_list.append(i.split("*/")[1] + ";")

else:

results_list.append(i + ";")

# 执行sql语句

with connection:

with connection.cursor() as cursor:

# 循环获取sql语句

for x in results_list[:-1]:

# 执行sql语句

cursor.execute(x)

# 提交事务

connection.commit()

else:

return "sql全部语句执行成功 !"

@property

def sql_run(self):

"""

执行方法

:return:

"""

return self._export_databases_data()

if __name__ == '__main__':

res = ConnectMsql().sql_run

print(res)

这样就可以直接用python对*.sql进行操作了

相关推荐

oracle数据导入导出_oracle数据导入导出工具

关于oracle的数据导入导出,这个功能的使用场景,一般是换服务环境,把原先的oracle数据导入到另外一台oracle数据库,或者导出备份使用。只不过oracle的导入导出命令不好记忆,稍稍有点复杂...

继续学习Python中的while true/break语句

上次讲到if语句的用法,大家在微信公众号问了小编很多问题,那么小编在这几种解决一下,1.else和elif是子模块,不能单独使用2.一个if语句中可以包括很多个elif语句,但结尾只能有一个...

python continue和break的区别_python中break语句和continue语句的区别

python中循环语句经常会使用continue和break,那么这2者的区别是?continue是跳出本次循环,进行下一次循环;break是跳出整个循环;例如:...

简单学Python——关键字6——break和continue

Python退出循环,有break语句和continue语句两种实现方式。break语句和continue语句的区别:break语句作用是终止循环。continue语句作用是跳出本轮循环,继续下一次循...

2-1,0基础学Python之 break退出循环、 continue继续循环 多重循

用for循环或者while循环时,如果要在循环体内直接退出循环,可以使用break语句。比如计算1至100的整数和,我们用while来实现:sum=0x=1whileTrue...

Python 中 break 和 continue 傻傻分不清

大家好啊,我是大田。...

python中的流程控制语句:continue、break 和 return使用方法

Python中,continue、break和return是控制流程的关键语句,用于在循环或函数中提前退出或跳过某些操作。它们的用途和区别如下:1.continue(跳过当前循环的剩余部分,进...

L017:continue和break - 教程文案

continue和break在Python中,continue和break是用于控制循环(如for和while)执行流程的关键字,它们的作用如下:1.continue:跳过当前迭代,...

作为前端开发者,你都经历过怎样的面试?

已经裸辞1个月了,最近开始投简历找工作,遇到各种各样的面试,今天分享一下。其实在职的时候也做过面试官,面试官时,感觉自己问的问题很难区分候选人的能力,最好的办法就是看看候选人的github上的代码仓库...

面试被问 const 是否不可变?这样回答才显功底

作为前端开发者,我在学习ES6特性时,总被const的"善变"搞得一头雾水——为什么用const声明的数组还能push元素?为什么基本类型赋值就会报错?直到翻遍MDN文档、对着内存图反...

2023金九银十必看前端面试题!2w字精品!

导文2023金九银十必看前端面试题!金九银十黄金期来了想要跳槽的小伙伴快来看啊CSS1.请解释CSS的盒模型是什么,并描述其组成部分。...

前端面试总结_前端面试题整理

记得当时大二的时候,看到实验室的学长学姐忙于各种春招,有些收获了大厂offer,有些还在苦苦面试,其实那时候的心里还蛮忐忑的,不知道自己大三的时候会是什么样的一个水平,所以从19年的寒假放完,大二下学...

由浅入深,66条JavaScript面试知识点(七)

作者:JakeZhang转发链接:https://juejin.im/post/5ef8377f6fb9a07e693a6061目录...

2024前端面试真题之—VUE篇_前端面试题vue2020及答案

添加图片注释,不超过140字(可选)...

今年最常见的前端面试题,你会做几道?

在面试或招聘前端开发人员时,期望、现实和需求之间总是存在着巨大差距。面试其实是一个交流想法的地方,挑战人们的思考方式,并客观地分析给定的问题。可以通过面试了解人们如何做出决策,了解一个人对技术和解决问...