Parallell bearbetning av stor fil i Python

Parallell bearbetning av stor fil i Python

Källnod: 1970104

Parallell bearbetning av stor fil i Python
Bild av författare
 

För parallell bearbetning delar vi upp vår uppgift i underenheter. Det ökar antalet jobb som bearbetas av programmet och minskar den totala handläggningstiden. 

Till exempel, om du arbetar med en stor CSV-fil och du vill ändra en enda kolumn. Vi kommer att mata data som en array till funktionen, och den kommer att parallellbehandla flera värden samtidigt baserat på antalet tillgängliga  arbetare. Dessa arbetare är baserade på antalet kärnor i din processor. 
 

Notera: att använda parallell bearbetning på en mindre datauppsättning kommer inte att förbättra bearbetningstiden.

 

I den här bloggen kommer vi att lära oss hur man minskar bearbetningstiden på stora filer med hjälp av multibehandlings, jobbliboch tqdm Python-paket. Det är en enkel handledning som kan tillämpas på vilken fil, databas, bild, video och ljud som helst. 
 

Notera: vi använder Kaggle-anteckningsboken för experimenten. Bearbetningstiden kan variera från maskin till maskin.  

 

Vi kommer att använda Olyckor i USA (2016–2021) dataset från Kaggle som består av 2.8 miljoner poster och 47 kolumner. 

Vi importerar multiprocessing, jobliboch tqdm för parallell behandling, pandas för dataintagoch re, nltkoch string för textbehandling

# Parallell beräkning
importera multibehandlings as mp
från jobblib importera Parallellt, försenat
från tqdm.notebook importera tqdm # Dataintag 
importera pandor as pd # Textbearbetning 
importera re från nltk.corpus importera stoppord
importera sträng

Innan vi hoppar in direkt, låt oss sätta igång n_workers genom att fördubbla cpu_count(). Som ni ser har vi 8 anställda.

n_workers = 2 * mp.cpu_count() print(f"{n_workers} arbetare är tillgängliga") >>> 8 arbetare finns tillgängliga

I nästa steg kommer vi att mata in stora CSV-filer med hjälp av pandor read_csv fungera. Skriv sedan ut formen på dataramen, namnet på kolumnerna och bearbetningstiden. 

Notera: Jupyters magiska funktion %%time kan visa CPU-tider och väggtid i slutet av processen. 

 

%%time file_name="../input/us-accidents/US_Accidents_Dec21_updated.csv" df = pd.read_csv(file_name) print(f"Shape:{df.shape}nnKolumnnamn:n{df.columns}n")

Produktion

Shape:(2845342, 47) Kolumnnamn: Index(['ID', 'Allvarlighet', 'Start_Time', 'End_Time', 'Start_Lat', 'Start_Lng', 'End_Lat', 'End_Lng', 'Distance(mi) ', 'Beskrivning', 'Nummer', 'Street', 'Side', 'Stad', 'County', 'State', 'Postnummer', 'Land', 'Tidszon', 'Airport_Code', 'Weather_Timestamp', 'Temperature(F)', 'Wind_Chill(F)', 'Fuktighet(%)', 'Tryck(in)', 'Synt(mi)', 'Vindriktning', 'Vindhastighet(mph)', 'Nerbörd(inom) )', 'Weather_Condition', 'Amenity', 'Bump', 'Crossing', 'Give_Way', 'Junction', 'No_Exit', 'Railway', 'Roundabout', 'Station', 'Stop', 'Traffic_Calming' , 'Traffic_Signal', 'Turning_Loop', 'Sunrise_Sunset', 'Civil_Twilight', 'Nautical_Twilight', 'Astronomical_Twilight'], dtype='object') CPU-tider: användare 33.9 s, sys: 3.93 s Vägg37.9 gånger, totalt: 46.9 s VäggXNUMX. : XNUMX s

Smakämnen clean_text är en enkel funktion för att bearbeta och rensa texten. Vi kommer att få engelska stoppord med hjälp av nltk.copus använd den för att filtrera bort stoppord från textraden. Efter det kommer vi att ta bort specialtecken och extra mellanslag från meningen. Det kommer att vara baslinjefunktionen att bestämma handläggningstid för seriell, parallelloch sats bearbetning. 

def ren_text(text): # Ta bort stoppord stops = stopwords.words("engelska") text = " ".join([ord för ord in text.split() if ord inte in stannar]) # Ta bort specialtecken text = text.translate(str.maketrans('', '', string.interpunctuation)) # tar bort de extra utrymmena text = re.sub(' +',' ', text) avkastning text

För seriell bearbetning kan vi använda pandorna .apply() funktion, men om du vill se förloppsindikatorn måste du aktivera tqdm för pandor och använd sedan .progress_apply() funktion. 

Vi kommer att behandla de 2.8 miljoner posterna och spara resultatet tillbaka till kolumnen "Beskrivning". 

%%time tqdm.pandas() df['Description'] = df['Description'].progress_apply(clean_text)

Produktion

Det tog 9 minuter och 5 sekunder för high-end processor till seriell process 2.8 miljoner rader. 

100% 2845342/2845342 [09:05<00:00, 5724.25it/s] CPU-tider: användare 8min 14s, sys: 53.6 s, totalt: 9min 7s Väggtid: 9min 5s

Det finns olika sätt att parallellbearbeta filen, och vi kommer att lära oss om dem alla. De multiprocessing är ett inbyggt python-paket som vanligtvis används för parallell bearbetning av stora filer. 

Vi kommer att skapa en multiprocessing Pool med 8 arbetare och använd karta funktion för att initiera processen. För att visa förloppsindikatorer använder vi tqdm.

Kartfunktionen består av två sektioner. Den första kräver funktionen och den andra kräver ett argument eller en lista med argument. 

Lär dig mer genom att läsa dokumentation

%%time p = mp.Pool(n_workers) df['Description'] = p.map(clean_text,tqdm(df['Description']))

Produktion

Vi har förbättrat vår handläggningstid med nästan 3X. Handläggningstiden sjönk från 9 minuter 5 sekunder till 3 minuter 51 sekunder.   

100% 2845342/2845342 [02:58<00:00, 135646.12it/s] CPU-tider: användare 5.68 s, sys: 1.56 s, totalt: 7.23 s Väggtid: 3min 51s

Vi kommer nu att lära oss om ett annat Python-paket för att utföra parallell bearbetning. I det här avsnittet kommer vi att använda joblib's Parallell och försenad att replikera karta funktion. 

  • Parallellen kräver två argument: n_jobs = 8 och backend = multiprocessing.
  • Sedan lägger vi till ren_text  till försenad funktion. 
  • Skapa en loop för att mata ett enda värde åt gången. 

Processen nedan är ganska generisk, och du kan ändra din funktion och array enligt dina behov. Jag har använt den för att bearbeta tusentals ljud- och videofiler utan problem. 

Rekommenderas: lägg till undantagshantering med hjälp av try: och except:

def text_parallel_clean(array): resultat = Parallell(n_jobs=n_workers,backend="multiprocessing")( delayed(clean_text) (text) för text in tqdm(array)) avkastning resultera

Lägg till kolumnen "Beskrivning" i text_parallel_clean()

%%time df['Description'] = text_parallel_clean(df['Description'])

Produktion

Det tog vår funktion 13 sekunder mer än att multibearbeta Slå samman. Även då, Parallell är 4 minuter och 59 sekunder snabbare än seriell bearbetning. 

100% 2845342/2845342 [04:03<00:00, 10514.98it/s] CPU-tider: användare 44.2 s, sys: 2.92 s, totalt: 47.1 s Väggtid: 4min 4s

Det finns ett bättre sätt att bearbeta stora filer genom att dela upp dem i batcher och bearbeta dem parallellt. Låt oss börja med att skapa en batchfunktion som kör en clean_function på en enda grupp värden. 

Batchbearbetningsfunktion

def proc_batch(omgång): avkastning [ ren_text(text) för text in omgång ]

Dela upp filen i batcher

Funktionen nedan kommer att dela upp filen i flera partier baserat på antalet arbetare. I vårt fall får vi 8 partier. 

def kommandofil(array,n_workers): file_len = len(array) batch_size = round(file_len / n_workers) batches = [ array[ix:ix+batch_size] för ix in tqdm(range(0, file_len, batch_size)) ] avkastning batches batches = batch_file(df['Description'],n_workers) >>> 100% 8/8 [00:00<00:00, 280.01it/s]

Kör parallell batchbearbetning

Slutligen kommer vi att använda Parallell och försenad att bearbeta partier. 

Notera: För att få en enda uppsättning värden måste vi köra listförståelse som visas nedan. 

 

%%time batch_output = Parallell(n_jobs=n_workers,backend="multiprocessing")( delayed(proc_batch) (batch) för sats in tqdm(batches) ) df['Beskrivning'] = [j för i in batch_output för j in i]

Produktion

Vi har förbättrat handläggningstiden. Denna teknik är känd för att bearbeta komplexa data och träna djupinlärningsmodeller. 

100% 8/8 [00:00<00:00, 2.19it/s] CPU-tider: användare 3.39 s, sys: 1.42 s, totalt: 4.81 s Väggtid: 3min 56s

tqdm tar multiprocessing till nästa nivå. Det är enkelt och kraftfullt. Jag kommer att rekommendera det till alla dataforskare. 

Kolla in dokumentation för att lära dig mer om multiprocessing. 

Smakämnen process_map kräver:

  1. Funktionsnamn
  2. Dataframe kolumn
  3. max_arbetare
  4. chuckstorlek liknar batchstorlek. Vi kommer att beräkna batchstorleken med hjälp av antalet arbetare eller så kan du lägga till antalet baserat på dina önskemål. 
%%tid
från tqdm.contrib.concurrent importera process_map batch = round(len(df)/n_workers) df["Description"] = process_map( clean_text, df["Description")], max_workers=n_workers, chunksize=batch )

Produktion

Med en enda kodrad får vi det bästa resultatet. 

100% 2845342/2845342 [03:48<00:00, 1426320.93it/s] CPU-tider: användare 7.32 s, sys: 1.97 s, totalt: 9.29 s Väggtid: 3min 51s

Du måste hitta en balans och välja den teknik som fungerar bäst för ditt fall. Det kan vara seriell bearbetning, parallell bearbetning eller batchbearbetning. Den parallella bearbetningen kan slå tillbaka om du arbetar med en mindre, mindre komplex datauppsättning. 

I denna minihandledning har vi lärt oss om olika Python-paket och tekniker som gör att vi kan parallellbearbeta våra datafunktioner. 

Om du bara arbetar med en tabelluppsättning och vill förbättra din bearbetningsprestanda, så föreslår jag att du försöker dask, datatabelloch FORS 

Hänvisning 

 
 
Abid Ali Awan (@1abidaliawan) är en certifierad datavetare som älskar att bygga modeller för maskininlärning. För närvarande fokuserar han på att skapa innehåll och skriva tekniska bloggar om maskininlärning och datavetenskap. Abid har en magisterexamen i Technology Management och en kandidatexamen i telekommunikationsteknik. Hans vision är att bygga en AI-produkt med hjälp av ett grafiskt neuralt nätverk för studenter som kämpar med psykisk ohälsa.
 

Tidsstämpel:

Mer från KDnuggets