python常用的自动化脚本汇总_python 自动脚本
wptr33 2025-09-06 14:04 4 浏览
以下是python常用的自动化脚本,包括数据、网络、文件、性能等操作。
具体内容如下:
数据处理工具
网络检测工具
系统任务自动化工具
测试自动化工具
文件管理自动化工具
性能监控工具
日志分析工具
邮件自动化工具
数据库交互工具
OCR识别
PDF操作自动化
网络抓取自动化
EXCEL电子表格自动化
图像编辑自动化
import pandas as pd
def clean_data(input_file, output_file):
df = pd.read_csv(input_file)
df.dropna(inplace=True) # 删除空值
df.drop_duplicates(inplace=True) # 删除重复值
df.to_csv(output_file, index=False)
# 使用示例
clean_data("data.csv", "cleaned_data.csv")
1.2 数据对比
import pandas as pd
def compare_data(file1, file2):
df1 = pd.read_csv(file1)
df2 = pd.read_csv(file2)
diff = df1.compare(df2)
return diff
# 使用示例
result = compare_data("file1.csv", "file2.csv")
print(result)
2.1 检测端口是否开放
import socket
def check_port(host, port):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
result = sock.connect_ex((host, port))
sock.close()
return result == 0
# 使用示例
if check_port("example.com", 80):
print("Port 80 is open")
else:
print("Port 80 is closed")
2.2 批量ping测试
import os
def ping_hosts(hosts):
for host in hosts:
response = os.system(f"ping -c 1 {host}")
if response == 0:
print(f"{host} is up")
else:
print(f"{host} is down")
# 使用示例
hosts = ["google.com", "example.com", "localhost"]
ping_hosts(hosts)
3. 监控磁盘空间
import shutil
def check_disk_space(path, threshold):
total, used, free = shutil.disk_usage(path)
free_gb = free // (2**30)
if free_gb < threshold:
print(f"Warning: Free disk space is below {threshold} GB.")
else:
print(f"Free disk space: {free_gb} GB.")
# 使用示例
check_disk_space('/', 10)
4. 使用unittest进行单元测试
import unittest
class TestMyFunction (unittest.TestCase):
def test_addition(self):
result = add(1, 2)
self.assertEqual(result, 3)
def add(a, b):
return a + b
# 使用示例
if __name__ == '__main__':
unittest.main()
5.1 按扩展名排序文件
import os
from shutil import move
def sort_files(directory_path):
for filename in os.listdir(directory_path):
if os.path.isfile(os.path.join(directory_path, filename)):
file_extension = filename.split('.')[-1]
destination_directory = os.path.join(directory_path, file_extension)
ifnot os.path.exists(destination_directory):
os.makedirs(destination_directory)
move(os.path.join(directory_path, filename), os.path.join(destination_directory, filename))
# 使用示例
sort_files('/path/to/directory')
5.2 删除空文件夹
import os
def remove_empty_folders(directory_path):
for root, dirs, files in os.walk(directory_path, topdown=False):
for folder in dirs:
folder_path = os.path.join(root, folder)
if not os.listdir(folder_path):
os.rmdir(folder_path)
# 使用示例
remove_empty_folders('/path/to/directory')
5.3 批量重命名文件
import os
def batch_rename(directory, prefix):
for count, filename in enumerate(os.listdir(directory)):
new_name = f"{prefix}_{count}.txt"
os.rename(os.path.join(directory, filename), os.path.join(directory, new_name))
# 使用示例
batch_rename("/path/to/files", "file")
5.4 查找大文件
import os
def find_large_files(directory, size_limit_mb):
size_limit = size_limit_mb * 1024 * 1024# 转换为字节
large_files = []
for root, dirs, files in os.walk(directory):
for file in files:
file_path = os.path.join(root, file)
if os.path.getsize(file_path) > size_limit:
large_files.append(file_path)
return large_files
# 使用示例
large_files = find_large_files("/path/to/directory", 100)
# 查找大于100MB的文件
print (large_files)
6.1 监控cpu和内存使用情况
mport psutil
import time
def monitor_system(interval=1):
while True:
cpu_usage = psutil.cpu_percent(interval=interval)
memory_usage = psutil.virtual_memory().percent
print(f"CPU Usage: {cpu_usage}% | Memory Usage: {memory_usage}%")
time.sleep(interval)
# 使用示例
monitor_system(interval=2)
6.2 监控GPU使用情况
import pynvml
def monitor_gpu_usage():
pynvml.nvmlInit()
device_count = pynvml.nvmlDeviceGetCount()
for i in range(device_count):
handle = pynvml.nvmlDeviceGetHandleByIndex(i)
util = pynvml.nvmlDeviceGetUtilizationRates(handle)
memory_info = pynvml.nvmlDeviceGetMemoryInfo(handle)
print(f"GPU {i}: Usage={util.gpu}%, Memory Used={memory_info.used / 1024 ** 2} MB")
# 使用示例
monitor_gpu_usage()
6.3 监控网络带宽
import psutil
import time
def monitor_network_usage(interval=1):
old_value = psutil.net_io_counters().bytes_sent + psutil.net_io_counters().bytes_recv
while True:
new_value = psutil.net_io_counters().bytes_sent + psutil.net_io_counters().bytes_recv
bandwidth = (new_value - old_value) / interval # 计算带宽(字节/秒)
print(f"Network Bandwidth: {bandwidth} B/s")
old_value = new_value
time.sleep(interval)
# 使用示例
monitor_network_usage(interval=2)
6.4 监控磁盘IO
import psutil
import time
def monitor_disk_io(interval=1):
old_read = psutil.disk_io_counters().read_bytes
old_write = psutil.disk_io_counters().write_bytes
while True:
new_read = psutil.disk_io_counters().read_bytes
new_write = psutil.disk_io_counters().write_bytes
read_speed = (new_read - old_read) / interval
write_speed = (new_write - old_write) / interval
print(f"Read Speed: {read_speed / 1024} KB/s | Write Speed: {write_speed / 1024} KB/s")
old_read = new_read
old_write = new_write
time.sleep(interval)
# 使用示例
monitor_disk_io(interval=2)
6.5 监控进程资源占用
import psutil
def monitor_process(pid):
process = psutil.Process(pid)
while True:
cpu_usage = process.cpu_percent(interval=1)
memory_usage = process.memory_info().rss / 1024 ** 2# 转换为MB
print(f"PID {pid}: CPU={cpu_usage}%, Memory={memory_usage} MB")
# 使用示例
monitor_process(1234) # 替换为目标进程的PID
7.1 统计日志中高频错误
from collections import Counter
import re
def top_n_errors(log_file, n=5):
error_pattern = re.compile(r"ERROR: (.+)")
errors = []
with open(log_file, 'r') as f:
for line in f:
match = error_pattern.search(line)
if match:
errors.append(match.group(1))
return Counter(errors).most_common(n)
# 使用示例
top_errors = top_n_errors("app.log", n=3)
print(top_errors)
7.2 按时间范围过滤日志
from datetime import datetime
def filter_logs_by_time(log_file, start_time, end_time, output_file):
start = datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S")
end = datetime.strptime(end_time, "%Y-%m-%d %H:%M:%S")
withopen(log_file, 'r') as f:
logs = f.readlines()
filtered_logs = []
forloginlogs:
log_time_str = log.split()[0] + " " + log.split()[1] # 假设时间戳在日志的前两部分
log_time = datetime.strptime(log_time_str, "%Y-%m-%d %H:%M:%S")
ifstart <= log_time <= end:
filtered_logs.append(log)
withopen(output_file, 'w') as f:
f.writelines(filtered_logs)
# 使用示例
filter_logs_by_time("app.log", "2025-02-26 12:00:00", "2025-02-06 14:00:00", "filtered_logs.log")
7.3 提取日志中错误信息
def extract_errors(log_file, output_file):
with open(log_file, 'r') as f:
lines = f.readlines()
errors = [line for line in lines if"ERROR"in line]
with open(output_file, 'w') as f:
f.writelines(errors)
# 使用示例
extract_errors("app.log", "errors.log")
7.4 日志文件合并
def merge_log_files(log_files, output_file):
with open(output_file, 'w') as outfile:
for log_file in log_files:
with open(log_file, 'r') as infile:
outfile.write(infile.read())
# 使用示例
merge_log_files(["log1.log", "log2.log", "log3.log"], "merged_logs.log")
7.5 日志文件实时监控
import time
def tail_log_file(log_file):
with open(log_file, 'r') as f:
f.seek(0, 2) # 移动到文件末尾
while True:
line = f.readline()
if line:
print(line.strip())
else:
time.sleep(0.1)
# 使用示例
tail_log_file("app.log")
8. 发送个性化邮件
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
def send_personalized_email(sender_email, sender_password, recipients, subject, body):
server = smtplib.SMTP('smtp.gmail.com', 587)
server.starttls()
server.login(sender_email, sender_password)
for recipient_email in recipients:
message = MIMEMultipart()
message['From'] = sender_email
message['To'] = recipient_email
message['Subject'] = subject
message.attach(MIMEText(body, 'plain'))
server.send_message(message)
server.quit()
# 使用示例
sender_email = 'your_email@gmail.com'
sender_password = 'your_password'
recipients = ['recipient1@example.com', 'recipient2@example.com']
subject = 'Hello'
body = 'This is a test email.'
send_personalized_email(sender_email, sender_password, recipients, subject, body)
9. 连接到数据库
import sqlite3
def connect_to_database(db_path):
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
return conn, cursor
def execute_query(cursor, query):
cursor.execute(query)
results = cursor.fetchall()
return results
# 使用示例
conn, cursor = connect_to_database('/path/to/database.db')
query = 'SELECT * FROM table_name'
results = execute_query(cursor, query)
print(results)
conn.close()
10. 识别图像中的文本
import pytesseract
from PIL import Image
def recognize_text(image_path):
image = Image.open(image_path)
text = pytesseract.image_to_string(image,) # 使用简体中文
return text
# 使用示例
text = recognize_text('/path/to/image.jpg')
print(text)
11. 从PDF中提取文本
import PyPDF2
def extract_text_from_pdf(pdf_path):
with open(pdf_path, 'rb') asfile:
reader = PyPDF2.PdfFileReader(file)
text = ''
for page_num inrange(reader.numPages):
page = reader.getPage(page_num)
text += page.extractText()
return text
# 使用示例
text = extract_text_from_pdf('/path/to/document.pdf')
print(text)
12.1 从网站提取数据
import requests
from bs4 import BeautifulSoup
def scrape_data(url):
response = requests.get(url)
soup = BeautifulSoup(response.text, 'html.parser')
# 从网站提取相关数据的代码在此处
return soup
# 使用示例
url = 'https://example.com'
soup = scrape_data(url)
print(soup.title.string)
12.2 批量下载图片
import requests
def download_images(url, save_directory):
response = requests.get(url)
if response.status_code == 200:
images = response.json() # 假设API返回一个图片URL的JSON数组
for index, image_url in enumerate(images):
image_response = requests.get(image_url)
if image_response.status_code == 200:
with open(f"{save_directory}/image_{index}.jpg", "wb") as f:
f.write(image_response.content)
# 使用示例
download_images('https://api.example.com/images', '/path/to/save')
13. 读取和写入Excel
import pandas as pd
def read_excel (file_path):
df = pd.read_excel(file_path)
return df
def write_to_excel(data, file_path):
df = pd.DataFrame(data)
df.to_excel(file_path, index=False)
# 使用示例
data = {'Column1': [1, 2, 3], 'Column2': [4, 5, 6]}
write_to_excel(data, '/path/to/output.xlsx')
df = read_excel('/path/to/output.xlsx')
print(df)
14. 调整图像大小
from PIL import Image
def resize_image(input_path, output_path, width, height):
image = Image.open(input_path)
resized_image = image.resize((width, height), Image.ANTIALIAS)
resized_image.save(output_path)
# 使用示例
resize_image('/path/to/input.jpg', '/path/to/output.jpg', 800, 600)
相关推荐
- 栋察宇宙(二十一):Python 文件操作全解析
-
分享乐趣,传播快乐,增长见识,留下美好。亲爱的您,这里是LearingYard学苑!...
- python中12个文件处理高效技巧,不允许你还不知道
-
在Python中高效处理文件是日常开发中的核心技能,尤其是处理大文件或需要高性能的场景。以下是经过实战验证的高效文件处理技巧,涵盖多种常见场景:一、基础高效操作...
- Python内置模块bz2: 对 bzip2压缩算法的支持详解
-
目录简介知识讲解2.1bzip2压缩算法原理2.2bz2模块概述...
- Python文件及目录处理方法_python目录下所有文件名
-
Python可以用于处理文本文件和二进制文件,比如创建文件、读写文件等操作。本文介绍Python处理目录以及文件的相关方法。...
- The West mustn't write China out of WWII any longer
-
ByWarwickPowellLead:Foreightdecades,theWesthasrewrittenWorldWarIIasanAmericanandEuro...
- Python 的网络与互联网访问模块及应用实例(一)
-
Python提供了丰富的内置模块和第三方库来处理网络与互联网访问,使得从简单的HTTP请求到复杂的网络通信都变得相对简单。以下是常用的网络模块及其应用实例。...
- 高效办公:Python处理excel文件,摆脱无效办公
-
一、Python处理excel文件1.两个头文件importxlrdimportxlwt...
- Python进阶:文件读写操作详解_python对文件的读写操作方法有哪些
-
道友今天开始进阶练习,来吧文件读写是Python编程中非常重要的技能,掌握这些操作可以帮助你处理各种数据存储和交换任务。下面我将详细介绍Python中的文件读写操作。一、基本文件操作...
- [827]ScalersTalk成长会Python小组第11周学习笔记
-
Scalers点评:在2015年,ScalersTalk成长会完成Python小组完成了《Python核心编程》第1轮的学习。到2016年,我们开始第二轮的学习,并且将重点放在章节的习题上。Pytho...
- ScalersTalk 成长会 Python 小组第 9 周学习笔记
-
Scalers点评:在2015年,ScalersTalk成长会完成Python小组完成了《Python核心编程》第1轮的学习。到2016年,我们开始第二轮的学习,并且将重点放...
- 简析python 文件操作_python对文件的操作方法
-
一、打开并读文件1、file=open('打开文件的路径','打开文件的权限')#打开文件并赋值给file#默认权限为r及读权限str=read(num)读文件并放到字符串变量中,其中num表...
- Python 中 必须掌握的 20 个核心函数——open()函数
-
open()是Python中用于文件操作的核心函数,它提供了读写文件的能力,是处理文件输入输出的基础。一、open()的基本用法1.1方法签名...
- python常用的自动化脚本汇总_python 自动脚本
-
以下是python常用的自动化脚本,包括数据、网络、文件、性能等操作。具体内容如下:数据处理工具网络检测工具系统任务自动化工具测试自动化工具文件管理自动化工具性能监控工具日志分析工具邮件...
- Python自动化办公应用学习笔记37—文件读写方法1
-
一、文件读写方法1.读取内容:read(size):读取指定大小的数据,如果不指定size,则读取整个文件。...
- 大叔转行SAP:好好学习,好好工作,做一个幸福的SAP人
-
我是一个崇尚努力的人,坚定认为努力可以改变命运和现状,同时也对自己和未来抱有非常高的期待。随着期待的落空,更对现状滋生不满,结果陷入迷茫。开始比较,发现周围人一个个都比你有钱,而你的事业,永远看不到明...
- 一周热门
-
-
C# 13 和 .NET 9 全知道 :13 使用 ASP.NET Core 构建网站 (1)
-
因果推断Matching方式实现代码 因果推断模型
-
程序员的开源月刊《HelloGitHub》第 71 期
-
详细介绍一下Redis的Watch机制,可以利用Watch机制来做什么?
-
假如有100W个用户抢一张票,除了负载均衡办法,怎么支持高并发?
-
Java面试必考问题:什么是乐观锁与悲观锁
-
如何将AI助手接入微信(打开ai手机助手)
-
redission YYDS spring boot redission 使用
-
SparkSQL——DataFrame的创建与使用
-
一文带你了解Redis与Memcached? redis与memcached的区别
-
- 最近发表
-
- 栋察宇宙(二十一):Python 文件操作全解析
- python中12个文件处理高效技巧,不允许你还不知道
- Python内置模块bz2: 对 bzip2压缩算法的支持详解
- Python文件及目录处理方法_python目录下所有文件名
- The West mustn't write China out of WWII any longer
- Python 的网络与互联网访问模块及应用实例(一)
- 高效办公:Python处理excel文件,摆脱无效办公
- Python进阶:文件读写操作详解_python对文件的读写操作方法有哪些
- [827]ScalersTalk成长会Python小组第11周学习笔记
- ScalersTalk 成长会 Python 小组第 9 周学习笔记
- 标签列表
-
- git pull (33)
- git fetch (35)
- mysql insert (35)
- mysql distinct (37)
- concat_ws (36)
- java continue (36)
- jenkins官网 (37)
- mysql 子查询 (37)
- python元组 (33)
- mybatis 分页 (35)
- vba split (37)
- redis watch (34)
- python list sort (37)
- nvarchar2 (34)
- mysql not null (36)
- hmset (35)
- python telnet (35)
- python readlines() 方法 (36)
- munmap (35)
- docker network create (35)
- redis 集合 (37)
- python sftp (37)
- setpriority (34)
- c语言 switch (34)
- git commit (34)