Parallel behandling af stor fil i Python

Parallel behandling af stor fil i Python

Kildeknude: 1970104

Parallel behandling af stor fil i Python
Billede af forfatter
 

Ved parallel bearbejdning opdeler vi vores opgave i underenheder. Det øger antallet af job, der behandles af programmet, og reducerer den samlede behandlingstid. 

For eksempel, hvis du arbejder med en stor CSV-fil, og du vil ændre en enkelt kolonne. Vi vil føre dataene som en matrix til funktionen, og den vil parallelbehandle flere værdier på én gang baseret på antallet af tilgængelige  arbejdere. Disse arbejdere er baseret på antallet af kerner i din processor. 
 

Bemærk: Brug af parallel behandling på et mindre datasæt vil ikke forbedre behandlingstiden.

 

I denne blog vil vi lære, hvordan man reducerer behandlingstiden på store filer ved hjælp af multibearbejdning, joblibog tqdm Python-pakker. Det er en simpel tutorial, der kan gælde for enhver fil, database, billede, video og lyd. 
 

Bemærk: vi bruger Kaggle-notesbogen til eksperimenterne. Behandlingstiden kan variere fra maskine til maskine.  

 

Vi vil bruge Ulykker i USA (2016 – 2021) datasæt fra Kaggle som består af 2.8 millioner poster og 47 kolonner. 

Vi importerer multiprocessing, joblibog tqdm forum parallel bearbejdning, pandas forum dataindtagelserog re, nltkog string forum tekstbehandling

# Parallel computing
importere multibearbejdning as mp
fra joblib importere Parallel, forsinket
fra tqdm.notesbog importere tqdm # Dataindtagelse 
importere pandaer as pd # Tekstbehandling 
importere re fra nltk.korpus importere stopord
importere streng

Inden vi springer lige ind, lad os sætte i gang n_workers ved at fordoble cpu_count(). Som du kan se, har vi 8 medarbejdere.

n_workers = 2 * mp.cpu_count() print(f"{n_workers} arbejdere er tilgængelige") >>> 8 arbejdere er til rådighed

I det næste trin indtager vi store CSV-filer ved hjælp af pandaer read_csv fungere. Udskriv derefter formen på datarammen, navnet på kolonnerne og behandlingstiden. 

Bemærk: Jupyters magiske funktion %%time kan vise CPU-tider , væg tid i slutningen af ​​processen. 

 

%%tid
file_name="../input/us-accidents/US_Accidents_Dec21_updated.csv"
df = pd.read_csv(filnavn) print(f"Shape:{df.shape}nnKolonnenavne:n{df.columns}n")

Produktion

Shape:(2845342, 47) Kolonnenavne: Indeks(['ID', 'Alvorlighed', 'Start_Time', 'End_Time', 'Start_Lat', 'Start_Lng', 'End_Lat', 'End_Lng', 'Distance(mi) ', 'Beskrivelse', 'Nummer', 'Gade', 'Side', 'By', 'Amt', 'Stat', 'Postnummer', 'Land', 'Tidszone', 'Airport_Code', 'Weather_Timestamp', 'Temperature(F)', 'Wind_Chill(F)', 'Fugtighed(%)', 'Pressure(in)', 'Sigibility(mi)', 'Wind_Direction', 'Wind_Speed(mph)', 'Nedbør(in) )', 'Weather_Condition', 'Amenity', 'Bump', 'Crossing', 'Give_Way', 'Junction', 'No_Exit', 'Railway', 'Roundabout', 'Station', 'Stop', 'Traffic_Calming' , 'Traffic_Signal', 'Turning_Loop', 'Sunrise_Sunset', 'Civil_Twilight', 'Nautical_Twilight', 'Astronomical_Twilight'],
dtype='object') CPU-tider: bruger 33.9 s, sys: 3.93 s, i alt: 37.9 s
Vægtid: 46.9 s

clean_text er en ligetil funktion til at behandle og rense teksten. Vi får engelsk stopord ved brug af nltk.copus brug den til at filtrere stopord fra tekstlinjen. Derefter fjerner vi specialtegn og ekstra mellemrum fra sætningen. Det vil være basisfunktionen at bestemme behandlingstiden for seriel, parallelog parti forarbejdning. 

def ren_tekst(tekst): # Fjern stopord stops = stopwords.words("engelsk") text = " ".join([ord forum ord in text.split() if ord ikke in stopper]) # Fjern specialtegn text = text.translate(str.maketrans('', '', string.punctuation)) # fjerner de ekstra mellemrum text = re.sub(' +',' ', tekst) afkast tekst

Til seriel behandling kan vi bruge pandaerne .apply() funktion, men hvis du vil se statuslinjen, skal du aktivere tqdm forum pandaer og brug derefter .progress_apply() funktion. 

Vi skal behandle de 2.8 millioner poster og gemme resultatet tilbage i kolonnen "Beskrivelse". 

%%tid
tqdm.pandas() df['Description'] = df['Description'].progress_apply(clean_text)

Produktion

Det tog 9 minutter og 5 sekunder for high-end processor til seriel proces 2.8 millioner rækker. 

100% 2845342/2845342 [09:05<00:00, 5724.25it/s] CPU-tider: bruger 8min 14s, sys: 53.6 s, i alt: 9min 7s
Vægtid: 9min 5s

Der er forskellige måder at parallelbehandle filen på, og vi skal lære om dem alle. Det multiprocessing er en indbygget python-pakke, der almindeligvis bruges til parallel behandling af store filer. 

Vi vil skabe en multiprocessing pool med arbejdstagere 8 og brug kort funktion til at starte processen. For at vise statuslinjer bruger vi tqdm.

Kortfunktionen består af to sektioner. Den første kræver funktionen, og den anden kræver et argument eller en liste over argumenter. 

Lær mere ved at læse dokumentation

%%tid
p = mp.Pool(n_workers) df['Description'] = p.map(clean_text,tqdm(df['Description']))

Produktion

Vi har forbedret vores behandlingstid med næsten 3X. Behandlingstiden faldt fra 9 minutter 5 sekunder til 3 minutter 51 sekunder.   

100% 2845342/2845342 [02:58<00:00, 135646.12it/s] CPU-tider: bruger 5.68 s, sys: 1.56 s, i alt: 7.23 s
Vægtid: 3min 51s

Vi vil nu lære om en anden Python-pakke til at udføre parallel behandling. I dette afsnit vil vi bruge joblib's Parallel , forsinket at replikere kort funktion. 

  • Parallellen kræver to argumenter: n_jobs = 8 og backend = multiprocessing.
  • Så vil vi tilføje ren_tekst  til forsinket funktion. 
  • Opret en løkke for at fremføre en enkelt værdi ad gangen. 

Processen nedenfor er ret generisk, og du kan ændre din funktion og array efter dine behov. Jeg har brugt det til at behandle tusindvis af lyd- og videofiler uden problemer. 

Anbefalet: tilføje undtagelseshåndtering vha try: , except:

def tekst_parallel_ren(array): resultat = Parallel(n_jobs=n_workers,backend="multiprocessing")( delayed(clean_text) (tekst) forum tekst in tqdm(array)) afkast resultere

Tilføj kolonnen "Beskrivelse" til text_parallel_clean()

%%tid
df['Description'] = text_parallel_clean(df['Description'])

Produktion

Det tog vores funktion 13 sekunder mere end at multibehandle Pool. Selv da, Parallel er 4 minutter og 59 sekunder hurtigere end seriel forarbejdning. 

100% 2845342/2845342 [04:03<00:00, 10514.98it/s] CPU-tider: bruger 44.2 s, sys: 2.92 s, i alt: 47.1 s
Vægtid: 4min 4s

Der er en bedre måde at behandle store filer på ved at opdele dem i batches og behandle dem parallelt. Lad os starte med at oprette en batch-funktion, der kører en clean_function på et enkelt parti værdier. 

Batchbehandlingsfunktion

def proc_batch(parti): afkast [ ren_tekst(tekst) forum tekst in parti ]

Opdeling af filen i batches

Funktionen nedenfor vil opdele filen i flere batches baseret på antallet af arbejdere. I vores tilfælde får vi 8 partier. 

def batch_fil(array,n_workers): file_len = len(array) batch_size = round(file_len / n_workers) batches = [ array[ix:ix+batch_size] forum ix in tqdm(range(0, file_len, batch_size)) ] afkast batches batches = batch_file(df['Description'],n_workers) >>> 100% 8/8 [00:00<00:00, 280.01it/s]

Kører parallel batchbehandling

Til sidst vil vi bruge Parallel , forsinket at behandle batches. 

Bemærk: For at få et enkelt array af værdier skal vi køre listeforståelse som vist nedenfor. 

 

%%tid
batch_output = Parallel(n_jobs=n_workers,backend="multiprocessing")( delayed(proc_batch) (batch) forum parti in tqdm(batches) ) df['Beskrivelse'] = [j forum i in batch_output forum j in i]

Produktion

Vi har forbedret behandlingstiden. Denne teknik er berømt for at behandle komplekse data og træne deep learning-modeller. 

100% 8/8 [00:00<00:00, 2.19it/s] CPU-tider: bruger 3.39 s, sys: 1.42 s, i alt: 4.81 s
Vægtid: 3min 56s

tqdm tager multiprocessing til det næste niveau. Den er enkel og kraftfuld. Jeg vil anbefale det til enhver dataforsker. 

Tjek den dokumentation for at lære mere om multiprocessing. 

process_map kræver:

  1. Funktionsnavn
  2. Dataramme kolonne
  3. max_arbejdere
  4. chucksize svarer til batchstørrelse. Vi beregner batchstørrelsen ud fra antallet af arbejdere, eller du kan tilføje antallet baseret på dine præferencer. 
%%tid
fra tqdm.contrib.concurrent importere proceskort
batch = rund(len(df)/n_arbejdere) df["Beskrivelse"] = proceskort( ren_tekst, df["Beskrivelse"], max_arbejdere=n_arbejdere, chunksize=batch
)

Produktion

Med en enkelt kodelinje får vi det bedste resultat. 

100% 2845342/2845342 [03:48<00:00, 1426320.93it/s] CPU-tider: bruger 7.32 s, sys: 1.97 s, i alt: 9.29 s
Vægtid: 3min 51s

Du skal finde en balance og vælge den teknik, der passer bedst til din sag. Det kan være seriel behandling, parallel eller batchbehandling. Den parallelle behandling kan give bagslag, hvis du arbejder med et mindre, mindre komplekst datasæt. 

I denne mini-tutorial har vi lært om forskellige Python-pakker og -teknikker, der giver os mulighed for at parallelbehandle vores datafunktioner. 

Hvis du kun arbejder med et tabeldatasæt og ønsker at forbedre din behandlingsydelse, så vil jeg foreslå, at du prøver Dashboard, datatabelog HURTIGE 

Henvisning 

 
 
Abid Ali Awan (@1abidaliawan) er en certificeret dataforsker, der elsker at bygge maskinlæringsmodeller. I øjeblikket fokuserer han på indholdsskabelse og skriver tekniske blogs om maskinlæring og datavidenskabsteknologier. Abid har en kandidatgrad i teknologiledelse og en bachelorgrad i telekommunikationsingeniør. Hans vision er at bygge et AI-produkt ved hjælp af et grafisk neuralt netværk til studerende, der kæmper med psykisk sygdom.
 

Tidsstempel:

Mere fra KDnuggets