Traitement parallèle de gros fichiers en Python

Traitement parallèle de gros fichiers en Python

Nœud source: 1970104

Traitement parallèle de gros fichiers en Python
Image par auteur
 

Pour le traitement parallèle, nous divisons notre tâche en sous-unités. Il augmente le nombre de travaux traités par le programme et réduit le temps de traitement global. 

Par exemple, si vous travaillez avec un gros fichier CSV et que vous souhaitez modifier une seule colonne. Nous alimenterons les données sous forme de tableau à la fonction, et elle traitera en parallèle plusieurs valeurs à la fois en fonction du nombre de disponibles  ouvriers. Ces travailleurs sont basés sur le nombre de cœurs de votre processeur. 
 

Remarque: l'utilisation du traitement parallèle sur un ensemble de données plus petit n'améliorera pas le temps de traitement.

 

Dans ce blog, nous allons apprendre à réduire le temps de traitement des fichiers volumineux en utilisant multitraitement, joblibet une tqdm Paquets Python. Il s'agit d'un didacticiel simple qui peut s'appliquer à n'importe quel fichier, base de données, image, vidéo et audio. 
 

Remarque: nous utilisons le cahier Kaggle pour les expériences. Le temps de traitement peut varier d'une machine à l'autre.  

 

Nous utiliserons le Accidents aux États-Unis (2016 - 2021) ensemble de données de Kaggle qui se compose de 2.8 millions d'enregistrements et de 47 colonnes. 

nous importerons multiprocessing, joblibet une tqdm en traitement parallèle, pandas en ingestions de donnéeset une re, nltket une string en traitement de texte

# Traitement en parallèle
importer multitraitement as mp
De joblib importer Parallèle, retardé
De tqdm.notebook importer tqdm # Ingestion de données 
importer pandas as pd # Traitement de texte 
importer re De nltk.corpus importer mots vides
importer un magnifique

Avant de nous lancer, préparons-nous n_workers en doublant cpu_count(). Comme vous pouvez le voir, nous avons 8 travailleurs.

n_workers = 2 * mp.cpu_count() print(f"{n_workers} les travailleurs sont disponibles") >>> 8 travailleurs sont disponibles

Dans l'étape suivante, nous ingérerons des fichiers CSV volumineux à l'aide du pandas read_csv fonction. Ensuite, imprimez la forme de la trame de données, le nom des colonnes et le temps de traitement. 

Remarque: La fonction magique de Jupyter %%time peut afficher Temps CPU ainsi que l'heure du mur à la fin du processus. 

 

%%temps
file_name="../input/us-accidents/US_Accidents_Dec21_updated.csv"
df = pd.read_csv(file_name) print(f"Forme :{df.shape}nnNoms de colonnes :n{df.columns}n")

Sortie

Forme :(2845342, 47) Noms de colonnes : Index(['ID', 'Severity', 'Start_Time', 'End_Time', 'Start_Lat', 'Start_Lng', 'End_Lat', 'End_Lng', 'Distance(mi) ', 'Description', 'Numéro', 'Rue', 'Côté', 'Ville', 'Comté', 'État', 'Code postal', 'Pays', 'Fuseau horaire', 'Code_aéroport', 'Weather_Timestamp', « Température (F) », « Wind_Chill (F) », « Humidité (%) », « Pression (po) », « Visibilité (mi) », « Direction_du vent », « Vitesse_vent (mph) », « Précipitations (en )', 'Weather_Condition', 'Amenity', 'Bump', 'Crossing', 'Ced_Way', 'Junction', 'No_Exit', 'Railway', 'Roundabout', 'Station', 'Stop', 'Traffic_Calming' , 'Traffic_Signal', 'Turning_Loop', 'Sunrise_Sunset', 'Civil_Twilight', 'Nautical_Twilight', 'Astronomical_Twilight'],
dtype='object') Temps CPU : utilisateur 33.9 s, système : 3.93 s, total : 37.9 s
Temps de mur : 46.9 s

La clean_text est une fonction simple de traitement et de nettoyage du texte. Nous aurons l'anglais mots vides en utilisant nltk.copus l'utiliser pour filtrer les mots vides de la ligne de texte. Après cela, nous supprimerons les caractères spéciaux et les espaces supplémentaires de la phrase. Ce sera la fonction de référence pour déterminer le temps de traitement des en série, parallèleet une lot traitement. 

def texte_propre(texte): # Supprimer les mots vides arrêts = stopwords.words("anglais") text = " ".join([mot en mot in texte.split() if mot ne sauraient  in s'arrête]) # Supprimer les caractères spéciaux text = text.translate(str.maketrans('', '', string.punctuation)) # supprimer les espaces supplémentaires text = re.sub(' +',' ', text) retourner texte

Pour le traitement en série, nous pouvons utiliser les pandas .apply() fonction, mais si vous voulez voir la barre de progression, vous devez activer tqdm en pandas puis utilisez le .progress_apply() la fonction. 

Nous allons traiter les 2.8 millions d'enregistrements et enregistrer le résultat dans la colonne "Description". 

%%temps
tqdm.pandas() df['Description'] = df['Description'].progress_apply(clean_text)

Sortie

Il a fallu 9 minutes et 5 secondes pour le haut de gamme processeur pour traiter en série 2.8 millions de lignes. 

100 % 2845342/2845342 [09:05<00:00, 5724.25 it/s] Temps CPU : utilisateur 8 min 14 s, système : 53.6 s, total : 9 min 7 s
Temps du mur : 9min 5s

Il existe différentes manières de traiter le fichier en parallèle, et nous allons les découvrir toutes. Le multiprocessing est un package Python intégré couramment utilisé pour le traitement parallèle de fichiers volumineux. 

Nous allons créer un multitraitement Piscine comprenant travailleurs 8 et utiliser le Localisation fonction pour lancer le processus. Pour afficher les barres de progression, nous utilisons tqdm.

La fonction de carte se compose de deux sections. Le premier nécessite la fonction et le second un argument ou une liste d'arguments. 

En savoir plus en lisant Documentation

%%temps
p = mp.Pool(n_workers) df['Description'] = p.map(clean_text,tqdm(df['Description']))

Sortie

Nous avons amélioré notre temps de traitement de presque 3X. Le délai de traitement est passé de 9 minutes 5 secondes à 3 minutes 51 secondes.   

100 % 2845342/2845342 [02:58<00:00, 135646.12it/s] Temps CPU : utilisateur 5.68 s, système : 1.56 s, total : 7.23 s
Temps du mur : 3min 51s

Nous allons maintenant découvrir un autre package Python pour effectuer un traitement parallèle. Dans cette section, nous utiliserons les joblib's Parallèle ainsi que en retard pour reproduire le Localisation la fonction. 

  • Le Parallel nécessite deux arguments : n_jobs = 8 et backend = multiprocessing.
  • Ensuite, nous ajouterons texte_propre  à la en retard la fonction. 
  • Créez une boucle pour alimenter une seule valeur à la fois. 

Le processus ci-dessous est assez générique et vous pouvez modifier votre fonction et votre tableau en fonction de vos besoins. Je l'ai utilisé pour traiter des milliers de fichiers audio et vidéo sans aucun problème. 

Recommandée: ajouter la gestion des exceptions en utilisant try: ainsi que except:

def text_parallel_clean(tableau) : résultat = Parallèle (n_jobs = n_workers, backend = "multiprocessing") (retardé (clean_text) (texte) en texte in tqdm(tableau) ) retourner résultat

Ajoutez la colonne "Description" à text_parallel_clean()

%%temps
df['Description'] = text_parallel_clean(df['Description'])

Sortie

Il a fallu à notre fonction 13 secondes de plus que le multitraitement du Bassin. Même à ce moment là, Parallèle est 4 minutes et 59 secondes plus rapide que en série traitement. 

100 % 2845342/2845342 [04:03<00:00, 10514.98it/s] Temps CPU : utilisateur 44.2 s, système : 2.92 s, total : 47.1 s
Temps du mur : 4min 4s

Il existe un meilleur moyen de traiter des fichiers volumineux en les divisant en lots et en les traitant en parallèle. Commençons par créer une fonction batch qui exécutera un clean_function sur un seul lot de valeurs. 

Fonction de traitement par lots

def proc_batch(lot): retourner [ clean_text(texte) en texte in lot ]

Fractionner le fichier en lots

La fonction ci-dessous divisera le fichier en plusieurs lots en fonction du nombre de travailleurs. Dans notre cas, nous obtenons 8 lots. 

def fichier_lot(array, n_workers) : file_len = len (array) batch_size = round (file_len / n_workers) batches = [ array[ix:ix+batch_size] en ix in tqdm(range(0, file_len, batch_size)) ] retourner lots lots = batch_file(df['Description'],n_workers) >>> 100% 8/8 [00:00<00:00, 280.01it/s]

Exécution du traitement par lots parallèle

Enfin, nous utiliserons Parallèle ainsi que en retard pour traiter les lots. 

Remarque: Pour obtenir un seul tableau de valeurs, nous devons exécuter la compréhension de liste comme indiqué ci-dessous. 

 

%%temps
batch_output = Parallèle(n_jobs=n_workers,backend="multiprocessing"))( retardé(proc_batch) (batch) en lot in tqdm(batches) ) df['Description'] = [j en i in batch_output en j in i]

Sortie

Nous avons amélioré le temps de traitement. Cette technique est réputée pour le traitement de données complexes et la formation de modèles d'apprentissage en profondeur. 

100 % 8/8 [00:00<00:00, 2.19it/s] Temps CPU : utilisateur 3.39 s, système : 1.42 s, total : 4.81 s
Temps du mur : 3min 56s

tqdm fait passer le multitraitement au niveau supérieur. C'est simple et puissant. Je le recommanderai à tous les data scientists. 

Jetez un coup d'œil au Documentation pour en savoir plus sur le multitraitement. 

La process_map a besoin:

  1. Nom de la fonction
  2. Colonne de trame de données
  3. max_workers
  4. la taille du mandrin est similaire à la taille du lot. Nous calculerons la taille du lot en utilisant le nombre de travailleurs ou vous pourrez ajouter le nombre en fonction de vos préférences. 
%%temps
De tqdm.contrib.concurrent importer carte_processus
batch = round(len(df)/n_workers) df["Description"] = process_map( clean_text, df["Description"], max_workers=n_workers, chunksize=batch
)

Sortie

Avec une seule ligne de code, nous obtenons le meilleur résultat. 

100 % 2845342/2845342 [03:48<00:00, 1426320.93it/s] Temps CPU : utilisateur 7.32 s, système : 1.97 s, total : 9.29 s
Temps du mur : 3min 51s

Vous devez trouver un équilibre et sélectionner la technique qui convient le mieux à votre cas. Il peut s'agir d'un traitement en série, d'un traitement en parallèle ou d'un traitement par lots. Le traitement parallèle peut se retourner contre vous si vous travaillez avec un jeu de données plus petit et moins complexe. 

Dans ce mini-tutoriel, nous avons découvert divers packages et techniques Python qui nous permettent de traiter en parallèle nos fonctions de données. 

Si vous ne travaillez qu'avec un ensemble de données tabulaires et que vous souhaitez améliorer vos performances de traitement, je vous suggérerai d'essayer Bureau, datableet une RAPIDES 

Référence 

 
 
Abid Ali Awan (@1abidaliawan) est un spécialiste des données certifié qui aime créer des modèles d'apprentissage automatique. Actuellement, il se concentre sur la création de contenu et la rédaction de blogs techniques sur les technologies d'apprentissage automatique et de science des données. Abid est titulaire d'une maîtrise en gestion de la technologie et d'un baccalauréat en génie des télécommunications. Sa vision est de créer un produit d'IA utilisant un réseau de neurones graphiques pour les étudiants aux prises avec une maladie mentale.
 

Horodatage:

Plus de KDnuggetsGenericName