图片作者
对于并行处理,我们将任务划分为子单元。它增加了程序处理的作业数量并减少了总体处理时间。
例如,如果您正在使用大型 CSV 文件并且想要修改单个列。我们将数据作为数组提供给函数,它将根据可用的数量一次并行处理多个值 工人。这些工作线程基于处理器内的内核数量。
请注意: 在较小的数据集上使用并行处理不会缩短处理时间。
在本博客中,我们将学习如何使用以下方法减少大文件的处理时间 多处理, 作业库及 全面质量管理 Python 包。这是一个简单的教程,可以适用于任何文件、数据库、图像、视频和音频。
请注意: 我们使用 Kaggle 笔记本进行实验。处理时间因机器而异。
我们将使用 美国事故(2016 年 – 2021 年) 来自 Kaggle 的数据集,包含 2.8 万条记录和 47 列。
我们将导入 multiprocessing
, joblib
及 tqdm
并行处理, pandas
数据摄取及 re
, nltk
及 string
文本处理.
# 并行计算 进口 多处理 as mp 止 作业库 进口 并行、延迟 止 tqdm笔记本 进口 全面质量管理 # 数据摄取 进口 大熊猫 as pd # 文本处理 进口 re 止 语料库 进口 停用词 进口 绳子
在我们开始之前,让我们先设置一下 n_workers
通过加倍 cpu_count()
。如您所见,我们有 8 名工人。
n_workers = 2 * mp.cpu_count() print(f"{n_workers} 个工人可用") >>> 8名工人可用
在下一步中,我们将使用以下命令提取大型 CSV 文件 大熊猫 read_csv
功能。然后,打印出数据框的形状、列的名称和处理时间。
请注意: Jupyter 的神奇功能
%%time
可以显示 CPU时间 和 墙上时间 在该过程结束时。
%%时间 file_name =“../input/us-accidents/US_Accidents_Dec21_updated.csv” df = pd.read_csv(file_name) print(f"形状:{df.shape}nn列名称:n{df.columns}n")
输出
形状:(2845342, 47) 列名称:索引(['ID', 'Severity', 'Start_Time', 'End_Time', 'Start_Lat', 'Start_Lng', 'End_Lat', 'End_Lng', '距离(mi) ', '描述', '号码', '街道', '侧面', '城市', '县', '州', '邮政编码', '国家', '时区', '机场代码', '天气时间戳', '温度(F)', 'Wind_Chill(F)', '湿度(%)', '气压(in)', '能见度(mi)', 'Wind_Direction', 'Wind_Speed(mph)', '降水量(in) )', 'Weather_Condition', '便利设施', '颠簸', '十字路口', 'Give_Way', 'Junction', 'No_Exit', '铁路', '环岛', '车站', '停止', 'Traffic_Calming' , '交通信号', '转向循环', '日出_日落', '民用_暮光', '航海_暮光', '天文_暮光'], dtype='object') CPU 时间:用户 33.9 秒,系统:3.93 秒,总计:37.9 秒 挂壁时间:46.9 秒
clean_text
是一个用于处理和清理文本的简单函数。我们会得到英语 停用词 运用 nltk.copus
使用它从文本行中过滤掉停用词。之后,我们将从句子中删除特殊字符和多余的空格。它将作为确定处理时间的基线函数 串行, 并行及 批量 处理。
DEF 干净的文本(文本): # 删除停用词 stops = stopwords.words("english") text = " ".join([word 字 in 文本.split() if 字 不能 in 停止]) # 删除特殊字符 text = text.translate(str.maketrans('', '', string.punctuation)) # 删除多余的空格 文本 = re.sub(' +', ' ', 文本) 回报 文本
对于串行处理,我们可以使用 pandas .apply()
功能,但如果想看到进度条,需要激活 全面质量管理 大熊猫 然后使用 .progress_apply()
功能。
我们将处理这 2.8 万条记录并将结果保存回“Description”列。
%%时间 tqdm.pandas() df['描述'] = df['描述'].progress_apply(clean_text)
输出
耗时9分5秒 高端 处理器串行处理 2.8 万行。
100% 2845342/2845342 [09:05<00:00, 5724.25it/s] CPU 时间:用户 8 分钟 14 秒,系统:53.6 秒,总计:9 分钟 7 秒 挂墙时间:9分5秒
并行处理文件的方法有多种,我们将了解所有这些方法。这 multiprocessing
是一个内置的Python包,通常用于并行处理大文件。
我们将创建一个多处理 泳池 8工人 并使用 地图 函数来启动该过程。为了显示进度条,我们使用 全面质量管理.
地图功能由两部分组成。第一个需要函数,第二个需要参数或参数列表。
通过阅读了解更多信息 文件.
%%时间 p = mp.Pool(n_workers) df['描述'] = p.map(clean_text,tqdm(df['描述']))
输出
我们的处理时间缩短了近乎 3X。处理时间从 9分钟5秒 至 3分钟51秒.
100% 2845342/2845342 [02:58<00:00, 135646.12it/s] CPU 时间:用户 5.68 秒,系统:1.56 秒,总计:7.23 秒 挂壁时间:3分51秒
现在我们将了解另一个执行并行处理的 Python 包。在本节中,我们将使用 joblib 的 并行 和 延迟 来复制 地图 功能。
- 并行需要两个参数:n_jobs = 8 和 backend = 多处理。
- 然后,我们将添加 干净的文本 到 延迟 功能。
- 创建一个循环以一次提供一个值。
下面的过程非常通用,您可以根据需要修改函数和数组。我已经用它处理了数千个音频和视频文件,没有任何问题。
推荐: 使用添加异常处理 try:
和 except:
DEF 文本并行清理(数组):结果=并行(n_jobs = n_workers,backend =“multiprocessing”)(延迟(clean_text)(文本) 文本 in tqdm(数组) ) 回报 导致
添加“描述”栏 text_parallel_clean()
.
%%时间 df['描述'] = text_parallel_clean(df['描述'])
输出
我们的函数比多处理函数多花了 13 秒 游泳池 即使这样, 并行 比 快 4 分 59 秒 串行 处理。
100% 2845342/2845342 [04:03<00:00, 10514.98it/s] CPU 时间:用户 44.2 秒,系统:2.92 秒,总计:47.1 秒 挂壁时间:4分4秒
有一种更好的方法来处理大文件,将它们分成批并并行处理。让我们首先创建一个批处理函数来运行 clean_function
在单批值上。
批处理功能
DEF 过程批处理(批): 回报 [ 干净的文本(文本) 文本 in 批 ]
将文件分成批次
下面的函数将根据工作人员的数量将文件分成多个批次。在我们的例子中,我们得到 8 个批次。
DEF 批处理文件(数组,n_workers):file_len = len(数组)batch_size = round(file_len / n_workers)批次= [数组[ix:ix+batch_size] ix in tqdm(范围(0,file_len,batch_size))] 回报 批次 批次 = batch_file(df['描述'],n_workers) >>> 100% 8/8 [00:00<00:00, 280.01it/s]
运行并行批处理
最后,我们将使用 并行 和 延迟 来处理批次。
请注意: 要获取单个值数组,我们必须运行列表理解,如下所示。
%%时间 batch_output = 并行(n_jobs = n_workers,backend =“multiprocessing”)(延迟(proc_batch)(批处理) 批量 in tqdm(批次) ) df['描述'] = [j i in 批量输出 j in i]
输出
我们改进了处理时间。该技术因处理复杂数据和训练深度学习模型而闻名。
100% 8/8 [00:00<00:00, 2.19it/s] CPU 时间:用户 3.39 秒,系统:1.42 秒,总计:4.81 秒 挂壁时间:3分56秒
tqdm 将多处理提升到一个新的水平。它简单而强大。我会把它推荐给每一位数据科学家。
查询 文件 了解有关多处理的更多信息。
process_map
要求:
- 功能名称
- 数据框列
- 最大工人数
- 卡盘尺寸与批量尺寸相似。我们将使用工人数量来计算批量大小,或者您可以根据您的喜好添加数量。
%%时间 止 tqdm.contrib.并发 进口 流程图 批次 = round(len(df)/n_workers) df["描述"] = process_map( clean_text, df["描述"], max_workers=n_workers, chunksize=batch )
输出
只需一行代码,我们就可以获得最佳结果。
100% 2845342/2845342 [03:48<00:00, 1426320.93it/s] CPU 时间:用户 7.32 秒,系统:1.97 秒,总计:9.29 秒 挂壁时间:3分51秒
您需要找到平衡点并选择最适合您的情况的技术。它可以是串行处理、并行或批处理。如果您使用较小、不太复杂的数据集,并行处理可能会适得其反。
在这个迷你教程中,我们学习了各种 Python 包和技术,它们使我们能够并行处理数据函数。
如果您仅使用表格数据集并希望提高处理性能,那么我建议您尝试 达斯克, 数据表及 急流
参考文献
阿比德·阿里·阿万 (@1abidaliawan) 是一名经过认证的数据科学家专业人士,他热爱构建机器学习模型。 目前,他专注于内容创建和撰写有关机器学习和数据科学技术的技术博客。 Abid 拥有技术管理硕士学位和电信工程学士学位。 他的愿景是使用图形神经网络为患有精神疾病的学生构建一个人工智能产品。
- SEO 支持的内容和 PR 分发。 今天得到放大。
- 柏拉图区块链。 Web3 元宇宙智能。 知识放大。 访问这里。
- Sumber: https://www.kdnuggets.com/2022/07/parallel-processing-large-file-python.html?utm_source=rss&utm_medium=rss&utm_campaign=parallel-processing-large-file-in-python
- 1
- 10
- 11
- 2016
- 2021
- 39
- 7
- 9
- a
- 关于
- 事故
- 根据
- 后
- AI
- 所有类型
- 和
- 另一个
- 使用
- 论点
- 参数
- 排列
- 音频
- 可使用
- 背部
- 后端
- 当前余额
- 酒吧
- 酒吧
- 基于
- 底线
- 如下。
- 最佳
- 更好
- 博客
- 博客
- 建立
- 建筑物
- 内建的
- 计算
- 案件
- 认证
- 字符
- 城市
- 清洁
- 码
- 柱
- 列
- 常用
- 复杂
- 并发
- 内容
- 国家
- 县
- 中央处理器
- 创建信息图
- 创造
- 创建
- 目前
- data
- 数据科学
- 数据科学家
- 数据库
- 深
- 深入学习
- 学位
- 延迟
- 描述
- 确定
- 屏 显:
- 加倍
- 下降
- 工程师
- 英语
- 醚(ETH)
- 所有的
- 例子
- 例外
- 额外
- 著名
- 快
- 文件
- 档
- 过滤
- 找到最适合您的地方
- (名字)
- 聚焦
- 止
- 功能
- 功能
- 得到
- GitHub上
- 去
- 图形
- 图神经网络
- 处理
- 持有
- 创新中心
- How To
- HTML
- HTTPS
- 疾病
- 图片
- 进口
- 改善
- 改善
- in
- 增加
- 开始
- 问题
- IT
- 工作机会
- 跳
- 掘金队
- 大
- 学习用品
- 知道
- 学习
- Level
- Line
- 清单
- 机
- 机器学习
- 魔法
- 颠覆性技术
- 地图
- 主
- 心理
- 精神疾病
- 百万
- 分钟
- 模型
- 修改
- 更多
- 多
- 姓名
- 名称
- 需求
- 需要
- 网络
- 神经
- 神经网络
- 下页
- 笔记本
- 数
- 对象
- 最划算
- 包
- 包
- 大熊猫
- 并行
- 演出
- 性能
- 柏拉图
- 柏拉图数据智能
- 柏拉图数据
- 强大
- 打印
- 过程
- 处理
- 处理器
- 产品
- 所以专业
- 曲目
- 进展
- 蟒蛇
- 铁路
- RE
- 阅读
- 建议
- 记录
- 减少
- 减少
- 去掉
- 删除
- 需要
- 导致
- 运行
- 保存
- 科学
- 科学家
- 其次
- 秒
- 部分
- 部分
- 句子
- 串行
- 集
- 形状
- 如图
- 类似
- 简易
- 单
- 尺寸
- 小
- 剩余名额
- 特别
- 分裂
- 开始
- 州/领地
- 站
- 步
- Stop 停止
- 车站
- 简单的
- 街头
- 奋斗的
- 学生
- 需要
- 任务
- 文案
- 技术
- 技术
- 专业技术
- 电信
- 数千
- 次
- 时
- 时区
- 至
- 合计
- 产品培训
- 教程
- us
- 使用
- 用户
- 折扣值
- 价值观
- 各个
- 视频
- 愿景
- 方法
- 这
- WHO
- 将
- 中
- 也完全不需要
- Word
- 话
- 工人
- 加工
- 合作
- 写作
- 您一站式解决方案
- 和风网