Παράλληλη επεξεργασία μεγάλου αρχείου σε Python

Παράλληλη επεξεργασία μεγάλου αρχείου σε Python

Κόμβος πηγής: 1970104

Παράλληλη επεξεργασία μεγάλου αρχείου σε Python
Εικόνα από συγγραφέα
 

Για παράλληλη επεξεργασία, χωρίζουμε την εργασία μας σε υπομονάδες. Αυξάνει τον αριθμό των εργασιών που υποβάλλονται σε επεξεργασία από το πρόγραμμα και μειώνει τον συνολικό χρόνο επεξεργασίας. 

Για παράδειγμα, εάν εργάζεστε με ένα μεγάλο αρχείο CSV και θέλετε να τροποποιήσετε μια στήλη. Θα τροφοδοτήσουμε τα δεδομένα ως πίνακα στη συνάρτηση και θα επεξεργαστεί παράλληλα πολλαπλές τιμές ταυτόχρονα με βάση τον αριθμό των διαθέσιμων  εργαζομένων. Αυτοί οι εργαζόμενοι βασίζονται στον αριθμό των πυρήνων στον επεξεργαστή σας. 
 

Σημείωση: Η χρήση παράλληλης επεξεργασίας σε μικρότερο σύνολο δεδομένων δεν θα βελτιώσει τον χρόνο επεξεργασίας.

 

Σε αυτό το ιστολόγιο, θα μάθουμε πώς να μειώνουμε τον χρόνο επεξεργασίας μεγάλων αρχείων χρησιμοποιώντας πολυεπεξεργασία, δουλειά, να tqdm Πακέτα Python. Είναι ένα απλό σεμινάριο που μπορεί να εφαρμοστεί σε οποιοδήποτε αρχείο, βάση δεδομένων, εικόνα, βίντεο και ήχο. 
 

Σημείωση: χρησιμοποιούμε το σημειωματάριο Kaggle για τα πειράματα. Ο χρόνος επεξεργασίας μπορεί να διαφέρει από μηχανή σε μηχανή.  

 

Θα χρησιμοποιήσουμε το Ατυχήματα ΗΠΑ (2016 – 2021) σύνολο δεδομένων από το Kaggle που αποτελείται από 2.8 εκατομμύρια εγγραφές και 47 στήλες. 

Θα εισαγάγουμε multiprocessing, joblib, να tqdm for παράλληλη επεξεργασία, pandas for απορρόφηση δεδομένων, να re, nltk, να string for επεξεργασία κειμένου

# Παράλληλος Υπολογισμός
εισαγωγή πολυεπεξεργασία as mp
από δουλειά εισαγωγή Παράλληλη, καθυστερημένη
από tqdm.τετράδιο εισαγωγή tqdm # Απορρόφηση δεδομένων 
εισαγωγή Πάντα as pd # Επεξεργασία κειμένου 
εισαγωγή re από nltk.corpus εισαγωγή λέξεις-κλειδιά
εισαγωγή κορδόνι

Πριν πηδήξουμε αμέσως, ας ρυθμίσουμε n_workers με διπλασιασμό cpu_count(). Όπως μπορείτε να δείτε, έχουμε 8 εργάτες.

n_workers = 2 * mp.cpu_count() print(f"{n_workers} είναι διαθέσιμοι εργαζόμενοι") >>> Διατίθενται 8 εργαζόμενοι

Στο επόμενο βήμα, θα απορροφήσουμε μεγάλα αρχεία CSV χρησιμοποιώντας το Πάντα read_csv λειτουργία. Στη συνέχεια, εκτυπώστε το σχήμα του πλαισίου δεδομένων, το όνομα των στηλών και τον χρόνο επεξεργασίας. 

Σημείωση: Η μαγική συνάρτηση του Δία %%time μπορεί να εμφανίσει Χρόνοι CPU και ώρα τοίχου στο τέλος της διαδικασίας. 

 

%%time file_name="../input/us-accidents/US_Accidents_Dec21_updated.csv" df = pd.read_csv(file_name) print(f"Shape:{df.shape}nnΟνόματα στηλών:n{df.columns}n")

Παραγωγή

Shape:(2845342, 47) Ονόματα στηλών: Ευρετήριο(['ID', 'Severity', 'Start_Time', 'End_Time', 'Start_Lat', 'Start_Lng', 'End_Lat', 'End_Lng', 'Distance(mi) ', 'Περιγραφή', 'Αριθμός', 'Οδός', 'Πλάγια', 'Πόλη', 'Επαρχία', 'Πολιτεία', 'Ταχυδρομικός κώδικας', 'Χώρα', 'Ζώνη ώρας', 'Αεροδρόμιο_Κωδικός', 'Σφραγίδα_καιρού, 'Θερμοκρασία(F)', 'Wind_Chill(F)', 'Υγρασία(%)', 'Pressure(in)', 'Ορατότητα(mi)', 'Wind_Direction', 'Wind_Speed(mph)', 'Κατακρήμνιση(σε )", "Weather_Condition", "Amenity", "Bump", "Crossing", "Give_Way", "Junction", "No_Exit", "Railway", "Roundabout", "Station", "Stop", "Traffic_Calming" , 'Traffic_Signal', 'Turning_Loop', 'Sunrise_Sunset', 'Civil_Twilight', 'Nautical_Twilight', 'Astronomical_Twilight'], dtype='object') Χρόνοι CPU: χρήστης 33.9 s, sys: 3.93 s, συνολικά 37.9 s. : 46.9 δευτ

Η clean_text είναι μια απλή λειτουργία για την επεξεργασία και τον καθαρισμό του κειμένου. Θα πάρουμε αγγλικά λέξεις-κλειδιά χρησιμοποιώντας nltk.copus χρησιμοποιήστε το για να φιλτράρετε λέξεις τερματισμού από τη γραμμή κειμένου. Μετά από αυτό, θα αφαιρέσουμε ειδικούς χαρακτήρες και επιπλέον κενά από την πρόταση. Θα είναι η βασική συνάρτηση για τον προσδιορισμό του χρόνου επεξεργασίας σειράς, παράλληλα, να σύνολο παραγωγής επεξεργασία. 

def καθαρό_κείμενο(κείμενο): # Αφαιρέστε τις λέξεις διακοπής stops = stopwords.words("αγγλικά") text = " ".join([λέξη for λέξη in text.split() if λέξη δεν in στάσεις]) # Αφαιρέστε ειδικούς χαρακτήρες text = text.translate(str.maketrans('', '', string.punctuation)) # αφαιρώντας τα επιπλέον κενά text = re.sub(' +',' ', κείμενο) απόδοση κείμενο

Για σειριακή επεξεργασία, μπορούμε να χρησιμοποιήσουμε τα πάντα .apply() λειτουργία, αλλά αν θέλετε να δείτε τη γραμμή προόδου, πρέπει να την ενεργοποιήσετε tqdm for Πάντα και μετά χρησιμοποιήστε το .progress_apply() λειτουργία. 

Θα επεξεργαστούμε τις 2.8 εκατομμύρια εγγραφές και θα αποθηκεύσουμε το αποτέλεσμα πίσω στη στήλη "Περιγραφή". 

%%time tqdm.pandas() df['Description'] = df['Description'].progress_apply(clean_text)

Παραγωγή

Χρειάστηκαν 9 λεπτά και 5 δευτερόλεπτα για το high-end επεξεργαστής σε σειριακή επεξεργασία 2.8 εκατομμυρίων σειρών. 

100% 2845342/2845342 [09:05<00:00, 5724.25it/s] Χρόνοι CPU: χρήστης 8 λεπτά 14 δευτ., σύστημα: 53.6 δευτ., σύνολο: 9 λεπτά 7 δευτ. Χρόνος τοίχου: 9 λεπτά 5 δευτ.

Υπάρχουν διάφοροι τρόποι για την παράλληλη επεξεργασία του αρχείου και θα μάθουμε για όλους. ο multiprocessing είναι ένα ενσωματωμένο πακέτο python που χρησιμοποιείται συνήθως για παράλληλη επεξεργασία μεγάλων αρχείων. 

Θα δημιουργήσουμε μια πολυεπεξεργασία Πισίνα με εργαζομένων 8 και χρησιμοποιήστε το χάρτη λειτουργία για την έναρξη της διαδικασίας. Για να εμφανίσουμε τις γραμμές προόδου, χρησιμοποιούμε tqdm.

Η συνάρτηση χάρτη αποτελείται από δύο ενότητες. Το πρώτο απαιτεί τη συνάρτηση και το δεύτερο απαιτεί ένα όρισμα ή λίστα ορισμάτων. 

Μάθετε περισσότερα διαβάζοντας τεκμηρίωση

%%time p = mp.Pool(n_workers) df['Description'] = p.map(clean_text,tqdm(df['Description']))

Παραγωγή

Έχουμε βελτιώσει σχεδόν τον χρόνο επεξεργασίας 3X. Ο χρόνος επεξεργασίας μειώθηκε από 9 λεπτά 5 δευτερόλεπτα προς την 3 λεπτά 51 δευτερόλεπτα.   

100% 2845342/2845342 [02:58<00:00, 135646.12it/s] Χρόνοι CPU: χρήστη 5.68 s, sys: 1.56 s, σύνολο: 7.23 s Χρόνος τοίχου: 3 λεπτά 51 δευτερόλεπτα

Τώρα θα μάθουμε για ένα άλλο πακέτο Python για την εκτέλεση παράλληλης επεξεργασίας. Σε αυτήν την ενότητα, θα χρησιμοποιήσουμε joblib's Παράλληλο και καθυστέρηση για να αντιγράψετε το χάρτη λειτουργία. 

  • Το Parallel απαιτεί δύο ορίσματα: n_jobs = 8 και backend = πολυεπεξεργασία.
  • Στη συνέχεια, θα προσθέσουμε καθαρό_κείμενο  στο καθυστέρηση λειτουργία. 
  • Δημιουργήστε έναν βρόχο για να τροφοδοτήσετε μια μεμονωμένη τιμή κάθε φορά. 

Η παρακάτω διαδικασία είναι αρκετά γενική και μπορείτε να τροποποιήσετε τη λειτουργία και τον πίνακα σύμφωνα με τις ανάγκες σας. Το έχω χρησιμοποιήσει για την επεξεργασία χιλιάδων αρχείων ήχου και βίντεο χωρίς κανένα πρόβλημα. 

Συνιστάται: προσθήκη χειρισμού εξαιρέσεων χρησιμοποιώντας try: και except:

def κείμενο_παράλληλο_καθαρό(πίνακας): αποτέλεσμα = Parallel(n_jobs=n_workers,backend="multiprocessing")( delayed(clean_text) (text) for κείμενο in tqdm (πίνακας) ) απόδοση αποτέλεσμα

Προσθέστε τη στήλη "Περιγραφή" σε text_parallel_clean()

%%time df['Description'] = text_parallel_clean(df['Description'])

Παραγωγή

Η λειτουργία μας χρειάστηκε 13 δευτερόλεπτα περισσότερο από την πολυεπεξεργασία του Πισίνα. Ακόμα και τότε, Παράλληλο είναι 4 λεπτά και 59 δευτερόλεπτα ταχύτερη από σειράς επεξεργασία. 

100% 2845342/2845342 [04:03<00:00, 10514.98it/s] Χρόνοι CPU: χρήστη 44.2 s, sys: 2.92 s, σύνολο: 47.1 s Χρόνος τοίχου: 4 λεπτά 4 δευτερόλεπτα

Υπάρχει καλύτερος τρόπος για να επεξεργαστείτε μεγάλα αρχεία χωρίζοντάς τα σε παρτίδες και επεξεργάζοντάς τα παράλληλα. Ας ξεκινήσουμε δημιουργώντας μια συνάρτηση δέσμης που θα εκτελεί α clean_function σε μία μόνο παρτίδα τιμών. 

Λειτουργία επεξεργασίας παρτίδων

def proc_batch(σύνολο παραγωγής): απόδοση [ καθαρό_κείμενο(κείμενο) for κείμενο in παρτίδα]

Διαχωρισμός του αρχείου σε παρτίδες

Η παρακάτω συνάρτηση θα χωρίσει το αρχείο σε πολλές παρτίδες με βάση τον αριθμό των εργαζομένων. Στην περίπτωσή μας, παίρνουμε 8 παρτίδες. 

def batch_file(array,n_workers): file_len = len(array) batch_size = round(file_len / n_workers) batches = [ array[ix:ix+batch_size] for ix in tqdm(εύρος (0, file_len, batch_size)) ] απόδοση batches batches = batch_file(df['Description'],n_workers) >>> 100% 8/8 [00:00<00:00, 280.01it/s]

Εκτέλεση παράλληλης επεξεργασίας παρτίδων

Τέλος, θα χρησιμοποιήσουμε Παράλληλο και καθυστέρηση για την επεξεργασία παρτίδων. 

Σημείωση: Για να λάβουμε έναν ενιαίο πίνακα τιμών, πρέπει να εκτελέσουμε την κατανόηση λίστας όπως φαίνεται παρακάτω. 

 

%%time batch_output = Parallel(n_jobs=n_workers,backend="multiprocessing")( delayed(proc_batch) (batch) for σύνολο παραγωγής in tqdm(παρτίδες) ) df['Περιγραφή'] = [j for i in παρτίδα_εξόδου for j in i]

Παραγωγή

Έχουμε βελτιώσει τον χρόνο επεξεργασίας. Αυτή η τεχνική είναι διάσημη για την επεξεργασία πολύπλοκων δεδομένων και την εκπαίδευση μοντέλων βαθιάς μάθησης. 

100% 8/8 [00:00<00:00, 2.19it/s] Χρόνοι CPU: χρήστη 3.39 s, sys: 1.42 s, σύνολο: 4.81 s Χρόνος τοίχου: 3 λεπτά 56 δευτερόλεπτα

Το tqdm οδηγεί την πολυεπεξεργασία στο επόμενο επίπεδο. Είναι απλό και δυνατό. Θα το προτείνω σε κάθε επιστήμονα δεδομένων. 

Ελέγξτε το τεκμηρίωση για να μάθετε περισσότερα σχετικά με την πολυεπεξεργασία. 

Η process_map απαιτεί:

  1. Όνομα λειτουργίας
  2. Στήλη πλαισίου δεδομένων
  3. max_workers
  4. Το chucksize είναι παρόμοιο με το μέγεθος της παρτίδας. Θα υπολογίσουμε το μέγεθος της παρτίδας χρησιμοποιώντας τον αριθμό των εργαζομένων ή μπορείτε να προσθέσετε τον αριθμό με βάση την προτίμησή σας. 
%%χρόνος
από tqdm.contrib.concurrent εισαγωγή process_map batch = round(len(df)/n_workers) df["Description"] = process_map(clear_text, df["Description"], max_workers=n_workers, chunksize=batch )

Παραγωγή

Με μία μόνο γραμμή κώδικα, έχουμε το καλύτερο αποτέλεσμα. 

100% 2845342/2845342 [03:48<00:00, 1426320.93it/s] Χρόνοι CPU: χρήστη 7.32 s, sys: 1.97 s, σύνολο: 9.29 s Χρόνος τοίχου: 3 λεπτά 51 δευτερόλεπτα

Πρέπει να βρείτε μια ισορροπία και να επιλέξετε την τεχνική που λειτουργεί καλύτερα για την περίπτωσή σας. Μπορεί να είναι σειριακή, παράλληλη ή ομαδική επεξεργασία. Η παράλληλη επεξεργασία μπορεί να αποτύχει εάν εργάζεστε με ένα μικρότερο, λιγότερο περίπλοκο σύνολο δεδομένων. 

Σε αυτό το mini-tutorial, μάθαμε για διάφορα πακέτα Python και τεχνικές που μας επιτρέπουν να επεξεργαζόμαστε παράλληλα τις λειτουργίες δεδομένων μας. 

Εάν εργάζεστε μόνο με ένα πίνακα δεδομένων και θέλετε να βελτιώσετε την απόδοση επεξεργασίας σας, τότε θα σας προτείνω να δοκιμάσετε Ντάσκ, πίνακας δεδομένων, να ΚΑΤΑΡΡΑΚΤΗΣ 

Αναφορά 

 
 
Αμπίντ Αλί Αουάν (@1abidaliawan) είναι πιστοποιημένος επαγγελματίας επιστήμονας δεδομένων που λατρεύει την κατασκευή μοντέλων μηχανικής μάθησης. Επί του παρόντος, εστιάζει στη δημιουργία περιεχομένου και στη σύνταξη τεχνικών ιστολογίων για τη μηχανική μάθηση και τις τεχνολογίες επιστήμης δεδομένων. Ο Abid είναι κάτοχος μεταπτυχιακού τίτλου στη Διοίκηση Τεχνολογίας και πτυχίου στη Μηχανική Τηλεπικοινωνιών. Το όραμά του είναι να δημιουργήσει ένα προϊόν τεχνητής νοημοσύνης χρησιμοποιώντας ένα νευρωνικό δίκτυο γραφημάτων για μαθητές που παλεύουν με ψυχικές ασθένειες.
 

Σφραγίδα ώρας:

Περισσότερα από KDnuggets

Κορυφαίες ιστορίες, 2-8 Αυγούστου: 3 λόγοι για τους οποίους θα πρέπει να χρησιμοποιείτε μοντέλα γραμμικής παλινδρόμησης αντί για νευρωνικά δίκτυα. Εκκινήστε μια σύγχρονη στοίβα δεδομένων σε 5 λεπτά με το Terraform

Κόμβος πηγής: 1860956
Σφραγίδα ώρας: 9 Αυγούστου 2021