在 Python 中并行处理大文件

在 Python 中并行处理大文件

源节点: 1970104

在 Python 中并行处理大文件
图片作者
 

对于并行处理,我们将任务划分为子单元。它增加了程序处理的作业数量并减少了总体处理时间。 

例如,如果您正在使用大型 CSV 文件并且想要修改单个列。我们将数据作为数组提供给函数,它将根据可用的数量一次并行处理多个值  工人。这些工作线程基于处理器内的内核数量。 
 

请注意: 在较小的数据集上使用并行处理不会缩短处理时间。

 

在本博客中,我们将学习如何使用以下方法减少大文件的处理时间 多处理, 作业库全面质量管理 Python 包。这是一个简单的教程,可以适用于任何文件、数据库、图像、视频和音频。 
 

请注意: 我们使用 Kaggle 笔记本进行实验。处理时间因机器而异。  

 

我们将使用 美国事故(2016 年 – 2021 年) 来自 Kaggle 的数据集,包含 2.8 万条记录和 47 列。 

我们将导入 multiprocessing, joblibtqdm 并行处理, pandas 数据摄取re, nltkstring 文本处理

# 并行计算
进口 多处理 as mp
 作业库 进口 并行、延迟
 tqdm笔记本 进口 全面质量管理 # 数据摄取 
进口 大熊猫 as pd # 文本处理 
进口 re  语料库 进口 停用词
进口 绳子

在我们开始之前,让我们先设置一下 n_workers 通过加倍 cpu_count()。如您所见,我们有 8 名工人。

n_workers = 2 * mp.cpu_count() print(f"{n_workers} 个工人可用") >>> 8名工人可用

在下一步中,我们将使用以下命令提取大型 CSV 文件 大熊猫 read_csv 功能。然后,打印出数据框的形状、列的名称和处理时间。 

请注意: Jupyter 的神奇功能 %%time 可以显示 CPU时间墙上时间 在该过程结束时。 

 

%%时间
file_name =“../input/us-accidents/US_Accidents_Dec21_updated.csv”
df = pd.read_csv(file_name) print(f"形状:{df.shape}nn列名称:n{df.columns}n")

输出

形状:(2845342, 47) 列名称:索引(['ID', 'Severity', 'Start_Time', 'End_Time', 'Start_Lat', 'Start_Lng', 'End_Lat', 'End_Lng', '距离(mi) ', '描述', '号码', '街道', '侧面', '城市', '县', '州', '邮政编码', '国家', '时区', '机场代码', '天气时间戳', '温度(F)', 'Wind_Chill(F)', '湿度(%)', '气压(in)', '能见度(mi)', 'Wind_Direction', 'Wind_Speed(mph)', '降水量(in) )', 'Weather_Condition', '便利设施', '颠簸', '十字路口', 'Give_Way', 'Junction', 'No_Exit', '铁路', '环岛', '车站', '停止', 'Traffic_Calming' , '交通信号', '转向循环', '日出_日落', '民用_暮光', '航海_暮光', '天文_暮光'],
dtype='object') CPU 时间:用户 33.9 秒,系统:3.93 秒,总计:37.9 秒
挂壁时间:46.9 秒

clean_text 是一个用于处理和清理文本的简单函数。我们会得到英语 停用词 运用 nltk.copus 使用它从文本行中过滤掉停用词。之后,我们将从句子中删除特殊字符和多余的空格。它将作为确定处理时间的基线函数 串行, 并行批量 处理。 

DEF 干净的文本(文本): # 删除停用词 stops = stopwords.words("english") text = " ".join([word  in 文本.split() if不能 in 停止]) # 删除特殊字符 text = text.translate(str.maketrans('', '', string.punctuation)) # 删除多余的空格 文本 = re.sub(' +', ' ', 文本) 回报 文本

对于串行处理,我们可以使用 pandas .apply() 功能,但如果想看到进度条,需要激活 全面质量管理 大熊猫 然后使用 .progress_apply() 功能。 

我们将处理这 2.8 万条记录并将结果保存回“Description”列。 

%%时间
tqdm.pandas() df['描述'] = df['描述'].progress_apply(clean_text)

输出

耗时9分5秒 高端 处理器串行处理 2.8 万行。 

100% 2845342/2845342 [09:05<00:00, 5724.25it/s] CPU 时间:用户 8 分钟 14 秒,系统:53.6 秒,总计:9 分钟 7 秒
挂墙时间:9分5秒

并行处理文件的方法有多种,我们将了解所有这些方法。这 multiprocessing 是一个内置的Python包,通常用于并行处理大文件。 

我们将创建一个多处理 泳池 8工人 并使用 地图 函数来启动该过程。为了显示进度条,我们使用 全面质量管理.

地图功能由两部分组成。第一个需要函数,第二个需要参数或参数列表。 

通过阅读了解更多信息 文件

%%时间
p = mp.Pool(n_workers) df['描述'] = p.map(clean_text,tqdm(df['描述']))

输出

我们的处理时间缩短了近乎 3X。处理时间从 9分钟5秒3分钟51秒.   

100% 2845342/2845342 [02:58<00:00, 135646.12it/s] CPU 时间:用户 5.68 秒,系统:1.56 秒,总计:7.23 秒
挂壁时间:3分51秒

现在我们将了解另一个执行并行处理的 Python 包。在本节中,我们将使用 joblib 的 并行 延迟 来复制 地图 功能。 

  • 并行需要两个参数:n_jobs = 8 和 backend = 多处理。
  • 然后,我们将添加 干净的文本  到 延迟 功能。 
  • 创建一个循环以一次提供一个值。 

下面的过程非常通用,您可以根据需要修改函数和数组。我已经用它处理了数千个音频和视频文件,没有任何问题。 

推荐: 使用添加异常处理 try:except:

DEF 文本并行清理(数组):结果=并行(n_jobs = n_workers,backend =“multiprocessing”)(延迟(clean_text)(文本)   文本 in tqdm(数组) ) 回报 导致

添加“描述”栏 text_parallel_clean()

%%时间
df['描述'] = text_parallel_clean(df['描述'])

输出

我们的函数比多处理函数多花了 13 秒 游泳池 即使这样, 并行 比 快 4 分 59 秒 串行 处理。 

100% 2845342/2845342 [04:03<00:00, 10514.98it/s] CPU 时间:用户 44.2 秒,系统:2.92 秒,总计:47.1 秒
挂壁时间:4分4秒

有一种更好的方法来处理大文件,将它们分成批并并行处理。让我们首先创建一个批处理函数来运行 clean_function 在单批值上。 

批处理功能

DEF 过程批处理(批): 回报 [ 干净的文本(文本)   文本 in 批 ]

将文件分成批次

下面的函数将根据工作人员的数量将文件分成多个批次。在我们的例子中,我们得到 8 个批次。 

DEF 批处理文件(数组,n_workers):file_len = len(数组)batch_size = round(file_len / n_workers)批次= [数组[ix:ix+batch_size]   ix in tqdm(范围(0,file_len,batch_size))] 回报 批次 批次 = batch_file(df['描述'],n_workers) >>> 100% 8/8 [00:00<00:00, 280.01it/s]

运行并行批处理

最后,我们将使用 并行 延迟 来处理批次。 

请注意: 要获取单个值数组,我们必须运行列表理解,如下所示。 

 

%%时间
batch_output = 并行(n_jobs = n_workers,backend =“multiprocessing”)(延迟(proc_batch)(批处理)   批量 in tqdm(批次) ) df['描述'] = [j   i in 批量输出   j in i]

输出

我们改进了处理时间。该技术因处理复杂数据和训练深度学习模型而闻名。 

100% 8/8 [00:00<00:00, 2.19it/s] CPU 时间:用户 3.39 秒,系统:1.42 秒,总计:4.81 秒
挂壁时间:3分56秒

tqdm 将多处理提升到一个新的水平。它简单而强大。我会把它推荐给每一位数据科学家。 

查询 文件 了解有关多处理的更多信息。 

process_map 要求:

  1. 功能名称
  2. 数据框列
  3. 最大工人数
  4. 卡盘尺寸与批量尺寸相似。我们将使用工人数量来计算批量大小,或者您可以根据您的喜好添加数量。 
%%时间
 tqdm.contrib.并发 进口 流程图
批次 = round(len(df)/n_workers) df["描述"] = process_map( clean_text, df["描述"], max_workers=n_workers, chunksize=batch
)

输出

只需一行代码,我们就可以获得最佳结果。 

100% 2845342/2845342 [03:48<00:00, 1426320.93it/s] CPU 时间:用户 7.32 秒,系统:1.97 秒,总计:9.29 秒
挂壁时间:3分51秒

您需要找到平衡点并选择最适合您的情况的技术。它可以是串行处理、并行或批处理。如果您使用较小、不太复杂的数据集,并行处理可能会适得其反。 

在这个迷你教程中,我们学习了各种 Python 包和技术,它们使我们能够并行处理数据函数。 

如果您仅使用表格数据集并希望提高处理性能,那么我建议您尝试 达斯克, 数据表急流 

参考文献 

 
 
阿比德·阿里·阿万 (@1abidaliawan) 是一名经过认证的数据科学家专业人士,他热爱构建机器学习模型。 目前,他专注于内容创建和撰写有关机器学习和数据科学技术的技术博客。 Abid 拥有技术管理硕士学位和电信工程学士学位。 他的愿景是使用图形神经网络为患有精神疾病的学生构建一个人工智能产品。
 

时间戳记:

更多来自 掘金队