Si vous pouvez écrire des fonctions, vous pouvez utiliser Dask

Nœud source: 1094095

Si vous pouvez écrire des fonctions, vous pouvez utiliser Dask

Cet article est le deuxième article d'une série en cours sur l'utilisation pratique de Dask. Chaque article de cette série sera assez simple pour les débutants, mais fournira des conseils utiles pour un vrai travail. Le premier article de la série concerne l'utilisation de LocalCluster.


By Hugo Shi, Fondateur de Saturn Cloud

J'ai discuté avec de nombreux data scientists qui ont entendu parler de Bureau, le framework Python pour l'informatique distribuée, mais vous ne savez pas par où commencer. Ils savent que Dask peut probablement accélérer bon nombre de leurs flux de travail en les exécutant en parallèle sur un groupe de machines, mais la tâche d'apprendre une toute nouvelle méthodologie peut sembler décourageante. Je suis ici pour vous dire que vous pouvez commencer à tirer parti de Dask sans avoir à apprendre l'intégralité du framework. Si you passer du temps à attendre l'exécution des cellules du bloc-notes, il y a de fortes chances que Dask puisse vous faire gagner du temps. Même si vous ne savez écrire que des fonctions Python, vous pouvez en profiter sans rien apprendre d'autre ! Ce billet de blog est un tutoriel « comment utiliser Dask sans tout apprendre ».

Dask, dataframes, bags, arrays, schedulers, workers, graphs, RAPIDS, oh non !

 
 
Il existe de nombreux éléments de contenu compliqués sur Dask, ce qui peut être accablant. En effet, Dask peut utiliser un groupe de machines de travail pour faire de nombreuses choses intéressantes ! Mais oubliez tout ça pour l'instant. Cet article se concentre sur des techniques simples qui peuvent vous faire gagner du temps, sans avoir à changer grand-chose à votre façon de travailler.
 

Pour les boucles et les fonctions

 
 
Presque tous les data scientists ont fait quelque chose comme ça, où vous avez un ensemble de dataframes stockés dans des fichiers séparés et vous utilisez une boucle for pour les lire tous, faites un peu de logique, puis combinez-les :

results = []
for file in files: defer = pd.read_csv(file) ## begin genius algorithm brilliant_features = [] for feature in features: brilliant_features.append(compute_brilliant_feature(df, feature)) magical_business_insight = make_the_magic(brilliant_features) results.append(magical_business_insight)

Au fil du temps, vous vous retrouvez avec plus de fichiers, ou le genius_algorithm devient plus compliqué et prend plus de temps à exécuter. Et vous finissez par attendre. Et attendre.


 

Étape 1 est d'encapsuler votre code dans une fonction. Vous voulez encapsuler les choses qui vont à l'intérieur de la boucle for. Cela permet de comprendre plus facilement ce que fait le code (convertir un fichier en quelque chose d'utile via la magie). Plus important encore, cela facilite l'utilisation de ce code d'autres manières que les boucles for.

def make_all_the_magics(file): df = pd.read_csv(file) brilliant_features = [] for feature in features: brilliant_features.append(compute_brilliant_feature(df, feature)) magical_business_insight = make_the_magic(brilliant_features) return magical_business_insight results = [] for file in files: magical_business_insight = make_all_the_magics(file) results.append(magical_business_insight)

Étape 2 est de le paralléliser avec Dask. Désormais, au lieu d'utiliser une boucle for, où chaque itération se produit après la précédente, Dask les exécutera en parallèle sur un cluster. Cela devrait nous donner des résultats beaucoup plus rapidement, et ce n'est que trois lignes de plus que le code de la boucle for !

from dask import delayed
from dask.distributed import Client # same function but with a Dask delayed decorator
@delayed
def make_all_the_magics(file): df = pd.read_csv(file) brilliant_features = [] for feature in features: brilliant_features.append(compute_brilliant_feature(df, feature)) magical_business_insight = make_the_magic(brilliant_features) return magical_business_insight results = []
for file in files: magical_business_insight = make_all_the_magics(file) results.append(magical_business_insight) # new Dask code
c = Client()
results = c.compute(results, sync=True)

Comment ça marche:

  • Le décorateur retardé transforme votre fonction. Maintenant, quand vous l'appelez, il n'est pas évalué. Au lieu de cela, vous récupérez un delayed objet, que Dask peut exécuter plus tard.
  • Client().compute envoie tous ces objets retardés au cluster Dask, où ils sont évalués en parallèle ! Ça y est, vous avez gagné !
  • Instanciation d'un Client provisionne automatiquement un LocalCluster. Cela signifie que les travailleurs parallèles Dask sont tous des processus sur la même machine que celle qui appelle Dask. Cela donne un exemple concis. Pour un vrai travail, je recommande de créer clusters locaux dans l'aérogare.

Sujets pratiques

 
 
Ce qui précède s'arrête là où la plupart des tutoriels Dask s'arrêtent. J'ai utilisé cette approche avec mon propre travail et avec de nombreux clients, et quelques problèmes pratiques se posent toujours. Ces prochains conseils vous aideront à passer de cet exemple de manuel ci-dessus à des méthodes plus utiles dans la pratique en couvrant deux sujets qui reviennent constamment : les gros objets et la gestion des erreurs.
 

Grands objets

 
Afin de calculer des fonctions sur un cluster distribué, les objets sur lesquels les fonctions sont appelées doivent être envoyés aux travailleurs. Cela peut entraîner des problèmes de performances, car ceux-ci doivent être sérialisés (séparés) sur votre ordinateur et envoyés sur le réseau. Imaginez que vous effectuiez des processus sur des gigaoctets de données - vous ne voulez pas avoir à transférer cela chaque fois qu'une fonction s'exécute dessus. Si vous envoyez accidentellement de gros objets, vous pouvez voir un message de Dask comme celui-ci :

Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and
keep data on workers

Il y a deux façons d'empêcher que cela se produise : vous pouvez envoyer des objets plus petits aux travailleurs afin que la charge ne soit pas si lourde, ou vous pouvez essayer d'envoyer chaque objet à un travailleur une seule fois, afin de ne pas avoir à effectuer de transferts. .
Correction 1 : envoyez de petits objets lorsque cela est possible
Cet exemple est bon, car nous envoyons un chemin de fichier (petite chaîne) au lieu de la trame de données.

# good, do this
results = []
for file in files: magical_business_insight = make_all_the_magics(file) results.append(magical_business_insight)

Ci-dessous ce que ne sauraient  à faire. À la fois parce que vous feriez de la lecture CSV (coûteuse et lente) dans la boucle, qui n'est pas parallèle, mais aussi parce que nous envoyons maintenant des trames de données (qui peuvent être volumineuses).

# bad, do not do this
results = []
for file in files: df = pd.read_csv(file) magical_business_insight = make_all_the_magics(df) results.append(magical_business_insight)

Souvent, le code peut être réécrit pour modifier l'endroit où les données sont gérées, soit sur le client, soit sur les travailleurs. Selon votre situation, il peut y avoir d'énormes gains de temps en réfléchissant à ce que les fonctions prennent en entrée et à la façon dont les transferts de données peuvent être minimisés.
Correctif 2 : n'envoyer les objets qu'une seule fois
Si vous devez envoyer un gros objet, ne l'envoyez pas plusieurs fois. Par exemple, si j'ai besoin d'envoyer un gros objet de modèle afin de calculer, le simple fait d'ajouter le paramètre sérialisera le modèle plusieurs fois (une fois par fichier)

# bad, do not do this
results = []
for file in files: # big model has to be sent to a worker each time the function is called magical_business_insight = make_all_the_magics(file, big_model) results.append(magical_business_insight)

Je peux dire à Dask de ne pas le faire, en l'enveloppant dans un objet retardé.

# good, do this
results = []
big_model = client.scatter(big_model) #send the model to the workers first for file in files: magical_business_insight = make_all_the_magics(file, big_model) results.append(magical_business_insight)

Échec de la gestion

 
Au fur et à mesure que vos tâches de calcul augmentent, vous souhaiterez souvent être en mesure de surmonter les échecs. Dans ce cas, peut-être 5 % de mes CSV contiennent des données erronées que je ne peux pas gérer. J'aimerais traiter 95 % des CSV avec succès, mais garder une trace des échecs afin de pouvoir ajuster mes méthodes et réessayer.

Cette boucle fait cela.

import traceback
from distributed.client import wait, FIRST_COMPLETED, ALL_COMPLETED queue = c.compute(results)
futures_to_index = {fut: i for i, fut in enumerate(queue)}
results = [None for x in range(len(queue))] while queue: result = wait(queue, return_when=FIRST_COMPLETED) for future in result.done: index = futures_to_index[future] if future.status == 'finished': print(f'finished computation #{index}') results[index] = future.result() else: print(f'errored #{index}') try: future.result() except Exception as e: results[index] = e traceback.print_exc() queue = result.not_done print(results)

Comme cette fonction est assez compliquée à première vue, décomposons-la.

queue = c.compute(results)
futures_to_index = {fut: i for i, fut in enumerate(queue)}
results = [None for x in range(len(queue))]

Nous appelons compute on results, mais comme on ne passe pas sync=True, nous récupérons immédiatement les futures, qui représentent le calcul, qui n'est pas encore terminé. Nous créons également un mappage du futur lui-même vers le _n_ième argument d'entrée qui l'a généré. Enfin, nous remplissons une liste de résultats remplis de Nones pour l'instant.

while queue: result = wait(queue, return_when=FIRST_COMPLETED)

Ensuite, nous attendons les résultats et nous les traitons au fur et à mesure qu'ils arrivent. Lorsque nous attendons les contrats à terme, ils sont séparés en contrats à terme qui sont done, et ceux qui sont not_done.

 if future.status == 'finished': print(f'finished computation #{index}') results[index] = future.result()

Si l'avenir est finished, puis on imprime que l'on a réussi, et on stocke le résultat.

 else: print(f'errored #{index}') try: future.result() except Exception as e: results[index] = e traceback.print_exc()

Sinon, nous stockons l'exception et imprimons la trace de la pile.

 queue = result.not_done

Enfin, nous définissons la file d'attente pour les contrats à terme qui ne sont pas encore terminés.
 

Conclusion

 
 
Dask peut certainement vous faire gagner du temps. Si vous passez du temps à attendre que le code s'exécute, vous devez utiliser ces conseils simples pour paralléliser votre travail. Il y a aussi beaucoup de choses avancées que vous pouvez faire avec Dask, mais c'est un bon point de départ.

 
Bio: Hugo Shi est le fondateur de Saturn Cloud, l'espace de travail cloud incontournable pour faire évoluer Python, collaborer, déployer des tâches, etc.

ORIGINALE. Republié avec permission.

Connexe:

Source : https://www.kdnuggets.com/2021/09/write-functions-use-dask.html

Horodatage:

Plus de KDnuggetsGenericName