Parallell behandling av stor fil i Python

Parallell behandling av stor fil i Python

Kilde node: 1970104

Parallell behandling av stor fil i Python
Bilde av forfatter
 

For parallell behandling deler vi opp oppgaven vår i underenheter. Det øker antallet jobber som behandles av programmet og reduserer den totale behandlingstiden. 

For eksempel, hvis du jobber med en stor CSV-fil og du ønsker å endre en enkelt kolonne. Vi vil mate dataene som en matrise til funksjonen, og den vil parallellbehandle flere verdier samtidig basert på antall tilgjengelige  arbeidere. Disse arbeiderne er basert på antall kjerner i prosessoren din. 
 

OBS: bruk av parallell behandling på et mindre datasett vil ikke forbedre behandlingstiden.

 

I denne bloggen vil vi lære hvordan du kan redusere behandlingstiden på store filer ved hjelp av multi, jobblibog tqdm Python-pakker. Det er en enkel opplæring som kan gjelde for hvilken som helst fil, database, bilde, video og lyd. 
 

OBS: vi bruker Kaggle-notisboken for eksperimentene. Behandlingstiden kan variere fra maskin til maskin.  

 

Vi skal bruke Ulykker i USA (2016–2021) datasett fra Kaggle som består av 2.8 millioner poster og 47 kolonner. 

Vi vil importere multiprocessing, joblibog tqdm forum parallell behandling, pandas forum datainntakog re, nltkog string forum tekstbehandling

# Parallell databehandling
importere multi as mp
fra jobblib importere Parallell, forsinket
fra tqdm.notebook importere tqdm # Datainntak 
importere pandaer as pd # Tekstbehandling 
importere re fra nltk.korpus importere stoppord
importere string

Før vi hopper rett inn, la oss sette i gang n_workers ved å doble cpu_count(). Som du ser har vi 8 arbeidere.

n_workers = 2 * mp.cpu_count() print(f"{n_workers} arbeidere er tilgjengelige") >>> 8 arbeidere er tilgjengelige

I neste trinn vil vi innta store CSV-filer ved å bruke pandaer read_csv funksjon. Skriv deretter ut formen på datarammen, navnet på kolonnene og behandlingstiden. 

OBS: Jupyters magiske funksjon %%time kan vise CPU-tider og veggtid på slutten av prosessen. 

 

%%time file_name="../input/us-accidents/US_Accidents_Dec21_updated.csv" df = pd.read_csv(filnavn) print(f"Shape:{df.shape}nnKolonnenavn:n{df.columns}n")

Produksjon

Shape:(2845342, 47) Kolonnenavn: Index(['ID', 'Alvorlighet', 'Start_Time', 'End_Time', 'Start_Lat', 'Start_Lng', 'End_Lat', 'End_Lng', 'Distance(mi) ', 'Beskrivelse', 'Nummer', 'Gate', 'Side', 'By', 'Fylke', 'Stat', 'Postnummer', 'Land', 'Tidssone', 'Airport_Code', 'Vær_tidsstempel', 'Temperature(F)', 'Wind_Chill(F)', 'Fuktighet(%)', 'Pressure(in)', 'Synt(mi)', 'Vind_Direction', 'Wind_Speed(mph)', 'Nedbør(in) )', 'Weather_Condition', 'Amenity', 'Bump', 'Crossing', 'Give_Way', 'Junction', 'No_Exit', 'Railway', 'Roundabout', 'Station', 'Stop', 'Traffic_Calming' , 'Traffic_Signal', 'Turning_Loop', 'Sunrise_Sunset', 'Civil_Twilight', 'Nautical_Twilight', 'Astronomical_Twilight'], dtype='object') CPU-tider: bruker 33.9 s, sys: 3.93 s. Vegg 37.9 s, totalt: 46.9 sek. : XNUMX s

De clean_text er en enkel funksjon for å behandle og rense teksten. Vi skal få engelsk stoppord ved hjelp av nltk.copus bruk den til å filtrere ut stoppord fra tekstlinjen. Etter det vil vi fjerne spesialtegn og ekstra mellomrom fra setningen. Det vil være basisfunksjonen å bestemme behandlingstid for serie~~POS=TRUNC, paralleltog batch behandling. 

def ren_tekst(tekst): # Fjern stoppord stops = stopwords.words("engelsk") text = " ".join([ord forum ord in text.split() if ord ikke in stopper]) # Fjern spesialtegn text = text.translate(str.maketrans('', '', string.punctuation)) # fjerner de ekstra mellomrommene text = re.sub(' +',' ', tekst) retur tekst

For seriebehandling kan vi bruke pandaene .apply() funksjon, men hvis du vil se fremdriftslinjen, må du aktivere tqdm forum pandaer og bruk deretter .progress_apply() funksjon. 

Vi skal behandle de 2.8 millioner postene og lagre resultatet tilbake til kolonnen "Beskrivelse". 

%%time tqdm.pandas() df['Description'] = df['Description'].progress_apply(clean_text)

Produksjon

Det tok 9 minutter og 5 sekunder for high-end prosessor til seriell prosess 2.8 millioner rader. 

100 % 2845342/2845342 [09:05<00:00, 5724.25it/s] CPU-tider: bruker 8min 14s, sys: 53.6 s, totalt: 9min 7s Veggtid: 9min 5s

Det er ulike måter å parallellbehandle filen på, og vi skal lære om dem alle. De multiprocessing er en innebygd python-pakke som vanligvis brukes til parallell behandling av store filer. 

Vi skal lage en multiprosessering Basseng med 8 arbeidere og bruk kart funksjon for å starte prosessen. For å vise fremdriftslinjer bruker vi tqdm.

Kartfunksjonen består av to seksjoner. Den første krever funksjonen, og den andre krever et argument eller en liste med argumenter. 

Lær mer ved å lese dokumentasjon

%%time p = mp.Pool(n_workers) df['Description'] = p.map(clean_text,tqdm(df['Description']))

Produksjon

Vi har forbedret behandlingstiden med nesten 3X. Behandlingstiden falt fra 9 minutter 5 sekunder til 3 minutter 51 sekunder.   

100 % 2845342/2845342 [02:58<00:00, 135646.12it/s] CPU-tider: bruker 5.68 s, sys: 1.56 s, totalt: 7.23 s Veggtid: 3min 51s

Vi vil nå lære om en annen Python-pakke for å utføre parallell prosessering. I denne delen vil vi bruke joblib's Parallel og forsinket å gjenskape kart funksjon. 

  • Parallellen krever to argumenter: n_jobs = 8 og backend = multiprosessering.
  • Så legger vi til ren_tekst  til forsinket funksjon. 
  • Lag en løkke for å mate en enkelt verdi om gangen. 

Prosessen nedenfor er ganske generisk, og du kan endre funksjonen og matrisen din i henhold til dine behov. Jeg har brukt den til å behandle tusenvis av lyd- og videofiler uten problemer. 

Anbefalt: legg til unntakshåndtering ved hjelp av try: og except:

def text_parallel_clean(array): resultat = Parallell(n_jobs=n_workers,backend="multiprocessing")( delayed(clean_text) (tekst) forum tekst in tqdm(array)) retur resultere

Legg til «Beskrivelse»-kolonnen i text_parallel_clean()

%%time df['Description'] = text_parallel_clean(df['Description'])

Produksjon

Det tok vår funksjon 13 sekunder mer enn å multiprosessere Basseng. Selv da, Parallel er 4 minutter og 59 sekunder raskere enn serie~~POS=TRUNC behandling. 

100 % 2845342/2845342 [04:03<00:00, 10514.98it/s] CPU-tider: bruker 44.2 s, sys: 2.92 s, totalt: 47.1 s Veggtid: 4min 4s

Det er en bedre måte å behandle store filer ved å dele dem opp i batcher og behandle dem parallelt. La oss starte med å lage en batch-funksjon som vil kjøre en clean_function på en enkelt gruppe med verdier. 

Batchbehandlingsfunksjon

def proc_batch(parti): retur [ ren_tekst(tekst) forum tekst in parti ]

Deler opp filen i grupper

Funksjonen nedenfor deler filen i flere grupper basert på antall arbeidere. I vårt tilfelle får vi 8 partier. 

def batch_fil(array,n_workers): file_len = len(array) batch_size = round(file_len / n_workers) batcher = [ array[ix:ix+batch_size] forum ix in tqdm(range(0, file_len, batch_size)) ] retur batches batcher = batch_file(df['Description'],n_workers) >>> 100 % 8/8 [00:00<00:00, 280.01it/s]

Kjører parallell batchbehandling

Til slutt vil vi bruke Parallel og forsinket å behandle batcher. 

OBS: For å få en enkelt matrise med verdier, må vi kjøre listeforståelse som vist nedenfor. 

 

%%time batch_output = Parallell(n_jobs=n_workers,backend="multiprocessing")( delayed(proc_batch) (batch) forum batch in tqdm(batcher) ) df['Beskrivelse'] = [j forum i in batch_output forum j in i]

Produksjon

Vi har forbedret behandlingstiden. Denne teknikken er kjent for å behandle komplekse data og trene dyplæringsmodeller. 

100 % 8/8 [00:00<00:00, 2.19it/s] CPU-tider: bruker 3.39 s, sys: 1.42 s, totalt: 4.81 s Veggtid: 3min 56s

tqdm tar multiprosessering til neste nivå. Den er enkel og kraftig. Jeg vil anbefale det til enhver dataforsker. 

Sjekk ut dokumentasjon for å lære mer om multiprosessering. 

De process_map krever:

  1. Funksjonsnavn
  2. Dataramme kolonne
  3. max_arbeidere
  4. Chucksize er lik batchstørrelse. Vi vil beregne batchstørrelsen ved å bruke antall arbeidere, eller du kan legge til antallet basert på dine preferanser. 
%%tid
fra tqdm.contrib.concurrent importere process_map batch = round(len(df)/n_workers) df["Description"] = process_map( clean_text, df["Description"], max_workers=n_workers, chunksize=batch )

Produksjon

Med en enkelt kodelinje får vi det beste resultatet. 

100 % 2845342/2845342 [03:48<00:00, 1426320.93it/s] CPU-tider: bruker 7.32 s, sys: 1.97 s, totalt: 9.29 s Veggtid: 3min 51s

Du må finne en balanse og velge den teknikken som fungerer best for ditt tilfelle. Det kan være seriebehandling, parallell- eller batchbehandling. Den parallelle behandlingen kan slå tilbake hvis du arbeider med et mindre, mindre komplekst datasett. 

I denne miniveiledningen har vi lært om ulike Python-pakker og teknikker som lar oss parallellbehandle datafunksjonene våre. 

Hvis du bare jobber med et tabelldatasett og ønsker å forbedre behandlingsytelsen din, vil jeg foreslå at du prøver CBE, data bordog RAPIDS 

Referanse 

 
 
Abid Ali Awan (@1abidaliawan) er en sertifisert dataforsker som elsker å bygge maskinlæringsmodeller. For tiden fokuserer han på innholdsskaping og skriver tekniske blogger om maskinlæring og datavitenskapsteknologier. Abid har en mastergrad i teknologiledelse og en bachelorgrad i telekommunikasjonsteknikk. Hans visjon er å bygge et AI-produkt ved å bruke et grafisk nevralt nettverk for studenter som sliter med psykiske lidelser.
 

Tidstempel:

Mer fra KDnuggets