Python에서 대용량 파일 병렬 처리

Python에서 대용량 파일 병렬 처리

소스 노드 : 1970104

Python에서 대용량 파일 병렬 처리
작성자 별 이미지
 

병렬 처리를 위해 작업을 하위 단위로 나눕니다. 프로그램에서 처리하는 작업 수를 늘리고 전체 처리 시간을 줄입니다. 

예를 들어 큰 CSV 파일로 작업 중이고 단일 열을 수정하려는 경우입니다. 데이터를 배열로 함수에 공급하고 사용 가능한 개수에 따라 한 번에 여러 값을 병렬 처리합니다.  근로자. 이러한 작업자는 프로세서 내의 코어 수를 기반으로 합니다. 
 

참고 : 더 작은 데이터 세트에서 병렬 처리를 사용해도 처리 시간이 개선되지 않습니다.

 

이 블로그에서는 다음을 사용하여 대용량 파일의 처리 시간을 줄이는 방법을 배웁니다. 멀티 프로세싱, 잡립tqdm 파이썬 패키지. 모든 파일, 데이터베이스, 이미지, 비디오 및 오디오에 적용할 수 있는 간단한 자습서입니다. 
 

참고 : 우리는 실험을 위해 Kaggle 노트북을 사용하고 있습니다. 처리 시간은 기계마다 다를 수 있습니다.  

 

우리는 미국 사고(2016 – 2021) 2.8만 개의 레코드와 47개의 열로 구성된 Kaggle의 데이터 세트입니다. 

수입합니다 multiprocessing, joblibtqdm for 병렬 처리, pandas for 데이터 수집re, nltkstring for 텍스트 처리

# 병렬 컴퓨팅
import 멀티 프로세싱 as mp
 잡립 import 병렬, 지연
 tqdm.노트북 import tqdm # 데이터 수집 
import 팬더 as pd # 텍스트 처리 
import re  nltk.corpus import 스톱 워드
import

바로 시작하기 전에 설정합시다 n_workers 두 배로 cpu_count(). 보시다시피 직원은 8명입니다.

n_workers = 2 * mp.cpu_count() print(f"{n_workers} 작업자를 사용할 수 있습니다.") >>> 8명의 노동자는 유효합니다

다음 단계에서는 다음을 사용하여 대용량 CSV 파일을 수집합니다. 팬더 read_csv 기능. 그런 다음 데이터 프레임의 모양, 열 이름 및 처리 시간을 인쇄합니다. 

참고 : Jupyter의 마법 함수 %%time 표시 할 수 있습니다 CPU 시간벽 시간 프로세스의 끝에. 

 

%%time file_name="../input/us-accidents/US_Accidents_Dec21_updated.csv" df = pd.read_csv(file_name) print(f"Shape:{df.shape}nn열 이름:n{df.columns}n")

산출

모양:(2845342, 47) 열 이름: Index(['ID', 'Severity', 'Start_Time', 'End_Time', 'Start_Lat', 'Start_Lng', 'End_Lat', 'End_Lng', 'Distance(mi) ', '설명', '번호', '거리', '측면', '시', '군', '주', '우편번호', '국가', '시간대', 'Airport_Code', 'Weather_Timestamp', '온도(화씨)', '바람_냉기(화씨)', '습도(%)', '기압(in)', '시정(mi)', '풍향', '풍속(mph)', '강수량(in) )', '날씨_상황', '편의시설', '범프', '교차로', '양보', '교차로', '출구 금지', '철도', '로터리', '역', '정지', '교통_진정' , 'Traffic_Signal', 'Turning_Loop', 'Sunrise_Sunset', 'Civil_Twilight', 'Nautical_Twilight', 'Astronomical_Twilight'], dtype='object') CPU 시간: 사용자 33.9초, 시스템: 3.93초, 총계: 37.9초 벽 시간 : 46.9초

XNUMXD덴탈의 clean_text 텍스트를 처리하고 정리하는 간단한 기능입니다. 우리는 영어를 얻을 것이다 스톱 워드 사용 nltk.copus 텍스트 줄에서 중지 단어를 필터링하는 데 사용합니다. 그런 다음 문장에서 특수 문자와 여분의 공백을 제거합니다. 처리 시간을 결정하는 기본 기능이 됩니다. 일련의, 병렬일괄 처리. 

def 깨끗한 텍스트(텍스트): # 불용어 제거 stops = stopwords.words("english") text = " ".join([단어 for
 워드 in 텍스트.분할() if 워드 지원 in 중지]) # 특수 문자 제거 텍스트 = text.translate(str.maketrans('', '', string.punctuation)) # 여분의 공백 제거 텍스트 = re.sub(' +',' ', 텍스트) return 본문

직렬 처리를 위해 pandas를 사용할 수 있습니다. .apply() 기능을 사용할 수 있지만 진행률 표시줄을 보려면 활성화해야 합니다. tqdm for 팬더 그런 다음 .progress_apply() 기능. 

2.8만 개의 레코드를 처리하고 결과를 "설명" 열 열에 다시 저장합니다. 

%%time tqdm.pandas() df['설명'] = df['설명'].progress_apply(clean_text)

산출

9분 5초가 걸렸다. 하이 엔드 프로세서에서 직렬로 2.8만 행을 처리합니다. 

100% 2845342/2845342 [09:05<00:00, 5724.25it/s] CPU 시간: 사용자 8분 14초, 시스템: 53.6초, 총계: 9분 7초 벽 시간: 9분 5초

파일을 병렬처리하는 방법은 여러가지가 있는데, 모두 알아보도록 하겠습니다. 그만큼 multiprocessing 대용량 파일을 병렬 처리하는 데 일반적으로 사용되는 내장 Python 패키지입니다. 

우리는 다중 처리를 만들 것입니다 8 노동자 그리고 지도 프로세스를 시작하는 기능. 진행률 표시줄을 표시하기 위해 다음을 사용하고 있습니다. tqdm.

지도 기능은 두 부분으로 구성됩니다. 첫 번째는 함수가 필요하고 두 번째는 인수 또는 인수 목록이 필요합니다. 

자세히 알아보기 선적 서류 비치

%%time p = mp.Pool(n_workers) df['설명'] = p.map(clean_text,tqdm(df['설명']))

산출

처리 시간이 거의 단축되었습니다. 3X. 처리 시간이 9 분 5 초3 분 51 초.   

100% 2845342/2845342 [02:58<00:00, 135646.12it/s] CPU 시간: 사용자 5.68초, 시스템: 1.56초, 총계: 7.23초 벽 시간: 3분 51초

이제 병렬 처리를 수행하는 또 다른 Python 패키지에 대해 알아봅니다. 이 섹션에서는 joblib의 평행 지연 를 복제하기 위해 지도 기능. 

  • Parallel에는 n_jobs = 8 및 backend = multiprocessing의 두 가지 인수가 필요합니다.
  • 그러면 추가하겠습니다. 깨끗한 텍스트  ~로 지연 기능. 
  • 한 번에 하나의 값을 공급하는 루프를 만듭니다. 

아래 프로세스는 매우 일반적이며 필요에 따라 함수와 배열을 수정할 수 있습니다. 문제없이 수천 개의 오디오 및 비디오 파일을 처리하는 데 사용했습니다. 

권장 사항 : 다음을 사용하여 예외 처리 추가 try:except:

def text_parallel_clean(배열): 결과 = 병렬(n_jobs=n_workers,backend="multiprocessing")( 지연(clean_text) (텍스트) for
 본문 in tqdm(배열) ) return 결과

에 "설명" 열을 추가합니다. text_parallel_clean()

%%time df['설명'] = text_parallel_clean(df['설명'])

산출

다중 처리보다 함수가 13초 더 걸렸습니다. 풀. 그렇다하더라도, 평행 보다 4분 59초 빠릅니다. 일련의 처리. 

100% 2845342/2845342 [04:03<00:00, 10514.98it/s] CPU 시간: 사용자 44.2초, 시스템: 2.92초, 총계: 47.1초 벽 시간: 4분 4초

대용량 파일을 배치로 분할하고 병렬로 처리하여 더 나은 처리 방법이 있습니다. 다음을 실행할 배치 함수를 생성하여 시작하겠습니다. clean_function 값의 단일 배치에 대해. 

일괄 처리 기능

def proc_batch(일괄): return [ clean_text(텍스트) for
 본문 in 배치 ]

파일을 배치로 분할

아래 기능은 작업자 수에 따라 파일을 여러 배치로 분할합니다. 우리의 경우 8개의 배치를 얻습니다. 

def 배치 파일(배열, n_workers): file_len = len(배열) batch_size = round(file_len / n_workers) 배치 = [ array[ix:ix+batch_size] for
 ix in tqdm(범위(0, 파일 길이, 배치 크기)) ] return 배치 배치 = 배치_파일(df['설명'],n_workers) >>> 100% 8/8 [00:00<00:00, 280.01it/s]

병렬 일괄 처리 실행

마지막으로, 우리는 사용할 것입니다 평행 지연 일괄 처리합니다. 

참고 : 값의 단일 배열을 얻으려면 아래와 같이 목록 이해를 실행해야 합니다. 

 

%%time batch_output = Parallel(n_jobs=n_workers,backend="multiprocessing")( 지연(proc_batch) (배치) for
 일괄 in tqdm(배치) ) df['설명'] = [j for
 i in 배치_출력 for
 j in i]

산출

처리 시간을 개선했습니다. 이 기술은 복잡한 데이터를 처리하고 딥 러닝 모델을 교육하는 것으로 유명합니다. 

100% 8/8 [00:00<00:00, 2.19it/s] CPU 시간: 사용자 3.39초, 시스템: 1.42초, 총계: 4.81초 벽 시간: 3분 56초

tqdm은 다중 처리를 다음 단계로 끌어 올립니다. 간단하고 강력합니다. 모든 데이터 과학자에게 추천합니다. 

확인 선적 서류 비치 다중 처리에 대해 자세히 알아보세요. 

XNUMXD덴탈의 process_map 요구 사항 :

  1. 기능 명
  2. 데이터 프레임 열
  3. max_workers
  4. chucksize는 배치 크기와 유사합니다. 작업자 수를 사용하여 배치 크기를 계산하거나 선호도에 따라 숫자를 추가할 수 있습니다. 
%%시각
 tqdm.contrib.concurrent import process_map 배치 = round(len(df)/n_workers) df["설명"] = process_map( clean_text, df["설명"], max_workers=n_workers, chunksize=batch )

산출

한 줄의 코드로 최상의 결과를 얻습니다. 

100% 2845342/2845342 [03:48<00:00, 1426320.93it/s] CPU 시간: 사용자 7.32초, 시스템: 1.97초, 총계: 9.29초 벽 시간: 3분 51초

균형을 찾고 상황에 가장 적합한 기법을 선택해야 합니다. 직렬 처리, 병렬 또는 일괄 처리가 될 수 있습니다. 더 작고 덜 복잡한 데이터 세트로 작업하는 경우 병렬 처리가 역효과를 낼 수 있습니다. 

이 미니 튜토리얼에서 우리는 데이터 함수를 병렬 처리할 수 있는 다양한 Python 패키지 및 기술에 대해 배웠습니다. 

테이블 형식의 데이터 세트로만 작업하고 있고 처리 성능을 개선하고 싶다면 다음을 시도해 보시기 바랍니다. 다 스크, 데이터 테이블여울 

참조 

 
 
아비드 알리 아완 (@1abidaliawan)은 기계 학습 모델 구축을 좋아하는 공인 데이터 과학자 전문가입니다. 현재 그는 콘텐츠 제작에 집중하고 있으며 머신 러닝 및 데이터 과학 기술에 대한 기술 블로그를 작성하고 있습니다. Abid는 기술 관리 석사 학위와 통신 공학 학사 학위를 보유하고 있습니다. 그의 비전은 정신 질환으로 고생하는 학생들을 위해 그래프 신경망을 사용하여 AI 제품을 만드는 것입니다.
 

타임 스탬프 :

더보기 너 겟츠