Parallele Verarbeitung großer Dateien in Python

Parallele Verarbeitung großer Dateien in Python

Quellknoten: 1970104

Parallele Verarbeitung großer Dateien in Python
Bild vom Autor
 

Für die Parallelverarbeitung unterteilen wir unsere Aufgabe in Untereinheiten. Es erhöht die Anzahl der vom Programm verarbeiteten Jobs und verkürzt die Gesamtverarbeitungszeit. 

Wenn Sie beispielsweise mit einer großen CSV-Datei arbeiten und eine einzelne Spalte ändern möchten. Wir geben die Daten als Array an die Funktion weiter und sie verarbeitet parallel mehrere Werte gleichzeitig, basierend auf der Anzahl der verfügbaren Werte  Arbeiter. Diese Worker basieren auf der Anzahl der Kerne in Ihrem Prozessor. 
 

Hinweis: Die Verwendung der Parallelverarbeitung für einen kleineren Datensatz führt nicht zu einer Verbesserung der Verarbeitungszeit.

 

In diesem Blog erfahren Sie, wie Sie die Verarbeitungszeit großer Dateien reduzieren können Mehrfachverarbeitung, Joblib und tqdm Python-Pakete. Es handelt sich um ein einfaches Tutorial, das auf jede Datei, Datenbank, jedes Bild, Video und Audio angewendet werden kann. 
 

Hinweis: Für die Experimente verwenden wir das Kaggle-Notizbuch. Die Bearbeitungszeit kann von Maschine zu Maschine variieren.  

 

Wir werden das benutzen US-Unfälle (2016 – 2021) Datensatz von Kaggle, der aus 2.8 Millionen Datensätzen und 47 Spalten besteht. 

Wir werden importieren multiprocessing, joblib und tqdm für parallele Verarbeitung, pandas für Datenaufnahmen und re, nltk und string für Textverarbeitung

# Paralleles Rechnen
importieren Mehrfachverarbeitung as mp
für Joblib importieren Parallel, verzögert
für tqdm.notebook importieren tqdm # Datenaufnahme 
importieren Pandas as pd # Textverarbeitung 
importieren re für nltk.corpus importieren Stoppwörter
importieren Schnur

Bevor wir gleich loslegen, legen wir los n_workers durch Verdoppelung cpu_count(). Wie Sie sehen, haben wir 8 Mitarbeiter.

n_workers = 2 * mp.cpu_count() print(f"{n_workers} Arbeiter sind verfügbar") >>> 8 Arbeitskräfte stehen zur Verfügung

Im nächsten Schritt werden wir große CSV-Dateien mit einlesen Pandas read_csv Funktion. Drucken Sie dann die Form des Datenrahmens, die Namen der Spalten und die Verarbeitungszeit aus. 

Hinweis: Jupyters magische Funktion %%time können angezeigt werden CPU-Zeiten und Wandzeit am Ende des Prozesses. 

 

%%time file_name="../input/us-accidents/US_Accidents_Dec21_updated.csv" df = pd.read_csv(file_name) print(f"Shape:{df.shape}nnColumn Names:n{df.columns}n")

Output

Shape:(2845342, 47) Column Names: Index(['ID', 'Severity', 'Start_Time', 'End_Time', 'Start_Lat', 'Start_Lng', 'End_Lat', 'End_Lng', 'Distance(mi) ', 'Beschreibung', 'Nummer', 'Straße', 'Seite', 'Stadt', 'Landkreis', 'Staat', 'Postleitzahl', 'Land', 'Zeitzone', 'Flughafencode', 'Wetter_Zeitstempel', 'Temperature(F)', 'Wind_Chill(F)', 'Humidity(%)', 'Pressure(in)', 'Sichtweite(mi)', 'Wind_Direction', 'Wind_Speed(mph)', 'Precipitation(in )“, „Weather_Condition“, „Amenity“, „Bump“, „Crossing“, „Give_Way“, „Junction“, „No_Exit“, „Railway“, „Roundabout“, „Station“, „Stop“, „Traffic_Calming“ , 'Traffic_Signal', 'Turning_Loop', 'Sunrise_Sunset', 'Civil_Twilight', 'Nautical_Twilight', 'Astronomical_Twilight'], dtype='object') CPU-Zeiten: Benutzer 33.9 s, System: 3.93 s, Gesamt: 37.9 s Wandzeit : 46.9 Sek

Das clean_text ist eine einfache Funktion zum Bearbeiten und Bereinigen des Textes. Wir werden Englisch lernen Stoppwörter Verwendung von nltk.copus Verwenden Sie es, um Stoppwörter aus der Textzeile herauszufiltern. Danach entfernen wir Sonderzeichen und zusätzliche Leerzeichen aus dem Satz. Dies ist die Basisfunktion zur Bestimmung der Verarbeitungszeit seriell, Parallel und Portion Verarbeitung. 

def sauberer_text(Text): # Stoppwörter entfernen stops = stopwords.words("english") text = " ".join([word für Wort in text.split() if Wort nicht in stoppt]) # Sonderzeichen entfernen text = text.translate(str.maketrans('', '', string.punctuation)) # Entfernen der zusätzlichen Leerzeichen text = re.sub(' +',' ', text) Rückkehr Text

Für die serielle Verarbeitung können wir die Pandas verwenden .apply() Funktion, aber wenn Sie den Fortschrittsbalken sehen möchten, müssen Sie ihn aktivieren tqdm für Pandas und dann verwenden Sie die .progress_apply() Funktion. 

Wir werden die 2.8 Millionen Datensätze verarbeiten und das Ergebnis wieder in der Spalte „Beschreibung“ speichern. 

%%time tqdm.pandas() df['Beschreibung'] = df['Beschreibung'].progress_apply(clean_text)

Output

Es dauerte 9 Minuten und 5 Sekunden High-End Prozessor zur seriellen Verarbeitung von 2.8 Millionen Zeilen. 

100% 2845342/2845342 [09:05<00:00, 5724.25it/s] CPU-Zeiten: Benutzer 8min 14s, System: 53.6 s, Gesamt: 9min 7s Wandzeit: 9min 5s

Es gibt verschiedene Möglichkeiten, die Datei parallel zu verarbeiten, und wir werden sie alle kennenlernen. Der multiprocessing ist ein integriertes Python-Paket, das häufig für die parallele Verarbeitung großer Dateien verwendet wird. 

Wir werden eine Multiverarbeitung erstellen Pool mit 8 Arbeiter und benutze die Karte Funktion, um den Prozess einzuleiten. Um Fortschrittsbalken anzuzeigen, verwenden wir tqdm.

Die Kartenfunktion besteht aus zwei Abschnitten. Die erste erfordert die Funktion und die zweite erfordert ein Argument oder eine Liste von Argumenten. 

Erfahren Sie mehr durch Lesen Dokumentation

%%time p = mp.Pool(n_workers) df['Beschreibung'] = p.map(clean_text,tqdm(df['Beschreibung']))

Output

Wir haben unsere Bearbeitungszeit um fast verkürzt 3X. Die Bearbeitungszeit sank ab 9 Minuten 5 Sekunden zu 3 Minuten 51 Sekunden.   

100% 2845342/2845342 [02:58<00:00, 135646.12it/s] CPU-Zeiten: Benutzer 5.68 s, System: 1.56 s, Gesamt: 7.23 s Wandzeit: 3min 51s

Wir lernen nun ein weiteres Python-Paket zur Durchführung der Parallelverarbeitung kennen. In diesem Abschnitt verwenden wir Joblibs Parallel und verzögert um die zu replizieren Karte Funktion. 

  • Die Parallele erfordert zwei Argumente: n_jobs = 8 und backend = multiprocessing.
  • Dann werden wir hinzufügen sauberer_text  zu den verzögert Funktion. 
  • Erstellen Sie eine Schleife, um jeweils einen einzelnen Wert einzugeben. 

Der folgende Prozess ist recht allgemein gehalten und Sie können Ihre Funktion und Ihr Array entsprechend Ihren Anforderungen ändern. Ich habe damit Tausende von Audio- und Videodateien problemlos verarbeitet. 

Empfohlen: Ausnahmebehandlung hinzufügen mit try: und except:

def text_parallel_clean(array): result = Parallel(n_jobs=n_workers,backend="multiprocessing")( delayed(clean_text) (text) für Text in tqdm(array) ) Rückkehr Folge

Fügen Sie die Spalte „Beschreibung“ hinzu text_parallel_clean()

%%time df['Beschreibung'] = text_parallel_clean(df['Beschreibung'])

Output

Unsere Funktion benötigte 13 Sekunden mehr als die Multiverarbeitung Schwimmbad. Sogar dann, Parallel ist 4 Minuten und 59 Sekunden schneller als seriell Verarbeitung. 

100% 2845342/2845342 [04:03<00:00, 10514.98it/s] CPU-Zeiten: Benutzer 44.2 s, System: 2.92 s, Gesamt: 47.1 s Wandzeit: 4min 4s

Es gibt eine bessere Möglichkeit, große Dateien zu verarbeiten, indem man sie in Stapel aufteilt und parallel verarbeitet. Beginnen wir mit der Erstellung einer Batch-Funktion, die a ausführt clean_function auf einem einzelnen Wertestapel. 

Stapelverarbeitungsfunktion

def proc_batch(Charge): Rückkehr [ clean_text(text) für Text in Charge ]

Aufteilen der Datei in Stapel

Die folgende Funktion teilt die Datei basierend auf der Anzahl der Arbeiter in mehrere Stapel auf. In unserem Fall erhalten wir 8 Chargen. 

def Batch_Datei(array,n_workers): file_len = len(array) batch_size = Round(file_len / n_workers)batches = [ array[ix:ix+batch_size] für ix in tqdm(range(0, file_len, batch_size)) ] Rückkehr batches batches = batch_file(df['Description'],n_workers) >>> 100% 8/8 [00:00<00:00, 280.01it/s]

Ausführen einer parallelen Stapelverarbeitung

Schließlich werden wir verwenden Parallel und verzögert um Chargen zu verarbeiten. 

Hinweis: Um ein einzelnes Array von Werten zu erhalten, müssen wir das Listenverständnis wie unten gezeigt ausführen. 

 

%%time batch_output = Parallel(n_jobs=n_workers,backend="multiprocessing")( delayed(proc_batch) (Batch) für Portion in tqdm(Batches) ) df['Beschreibung'] = [j für i in Batch_Ausgabe für j in i]

Output

Wir haben die Bearbeitungszeit verbessert. Diese Technik ist für die Verarbeitung komplexer Daten und das Training von Deep-Learning-Modellen bekannt. 

100% 8/8 [00:00<00:00, 2.19it/s] CPU-Zeiten: Benutzer 3.39 s, System: 1.42 s, Gesamt: 4.81 s Wandzeit: 3min 56s

tqdm bringt Multiprocessing auf die nächste Ebene. Es ist einfach und kraftvoll. Ich werde es jedem Datenwissenschaftler empfehlen. 

Probieren Sie auch die Dokumentation um mehr über Multiprocessing zu erfahren. 

Das process_map erfordert:

  1. Funktionsname
  2. Datenrahmenspalte
  3. max_workers
  4. chucksize ist ähnlich wie Batch-Größe. Wir berechnen die Chargengröße anhand der Anzahl der Mitarbeiter oder Sie können die Anzahl nach Ihren Wünschen hinzufügen. 
%%Zeit
für tqdm.contrib.concurrent importieren process_map batch = round(len(df)/n_workers) df["Description"] = process_map( clean_text, df["Description"], max_workers=n_workers, chunksize=batch )

Output

Mit einer einzigen Codezeile erzielen wir das beste Ergebnis. 

100% 2845342/2845342 [03:48<00:00, 1426320.93it/s] CPU-Zeiten: Benutzer 7.32 s, System: 1.97 s, Gesamt: 9.29 s Wandzeit: 3min 51s

Sie müssen ein Gleichgewicht finden und die Technik auswählen, die für Ihren Fall am besten geeignet ist. Dabei kann es sich um serielle, parallele oder Stapelverarbeitung handeln. Die Parallelverarbeitung kann nach hinten losgehen, wenn Sie mit einem kleineren, weniger komplexen Datensatz arbeiten. 

In diesem Mini-Tutorial haben wir verschiedene Python-Pakete und -Techniken kennengelernt, die es uns ermöglichen, unsere Datenfunktionen parallel zu verarbeiten. 

Wenn Sie nur mit einem tabellarischen Datensatz arbeiten und Ihre Verarbeitungsleistung verbessern möchten, empfehle ich Ihnen, es zu versuchen Instrumententafel, Datentabelle und SCHNELLE 

Referenz 

 
 
Abid Ali Awan (@1abidaliawan) ist ein zertifizierter Datenwissenschaftler, der es liebt, Modelle für maschinelles Lernen zu erstellen. Derzeit konzentriert er sich auf die Erstellung von Inhalten und schreibt technische Blogs zu maschinellem Lernen und Data-Science-Technologien. Abid hat einen Master-Abschluss in Technologiemanagement und einen Bachelor-Abschluss in Telekommunikationstechnik. Seine Vision ist es, ein KI-Produkt mit einem grafisch-neuronalen Netzwerk für Schüler zu entwickeln, die mit psychischen Erkrankungen zu kämpfen haben.
 

Zeitstempel:

Mehr von KDnuggets