Dacă puteți scrie funcții, puteți utiliza Dask

Nodul sursă: 1094095

Dacă puteți scrie funcții, puteți utiliza Dask

Acest articol este al doilea articol dintr-o serie în curs de desfășurare despre utilizarea Dask în practică. Fiecare articol din această serie va fi suficient de simplu pentru începători, dar oferă sfaturi utile pentru munca reală. Primul articol din serie este despre utilizarea LocalCluster.


By Hugo Shi, fondatorul Saturn Cloud

Am vorbit cu mulți oameni de știință ai datelor despre care au auzit Bord, cadrul Python pentru calculul distribuit, dar nu știu de unde să încep. Ei știu că Dask poate accelera multe dintre fluxurile lor de lucru, punându-le să ruleze în paralel pe un grup de mașini, dar sarcina de a învăța o metodologie complet nouă poate părea descurajantă. Sunt aici pentru a vă spune că puteți începe să obțineți valoare de la Dask fără a fi nevoie să învățați întregul cadru. Dacă tu petreceți timp așteptând ca celulele notebook-ului să se execute, există șanse mari ca Dask să vă economisească timp. Chiar dacă știi doar să scrii funcții Python, poți profita de asta fără să înveți altceva! Această postare de blog este un tutorial „cum să utilizați Dask fără a învăța totul”.

Dask, cadre de date, pungi, matrice, programatori, lucrători, grafice, RAPIDS, oh, nu!

 
 
Există o mulțime de piese de conținut complicate despre Dask, care pot fi copleșitoare. Acest lucru se datorează faptului că Dask poate utiliza un grup de mașini de lucru pentru a face multe lucruri interesante! Dar uită de toate astea deocamdată. Acest articol se concentrează pe tehnici simple care vă pot economisi timp, fără a fi nevoie să schimbați prea multe despre modul în care lucrați.
 

Pentru bucle și funcții

 
 
Aproape fiecare cercetător de date a făcut ceva de genul acesta, în care aveți un set de cadre de date stocate în fișiere separate și folosiți o buclă for pentru a le citi pe toate, faceți ceva logică, apoi le combinați:

results = []
for file in files: defer = pd.read_csv(file) ## begin genius algorithm brilliant_features = [] for feature in features: brilliant_features.append(compute_brilliant_feature(df, feature)) magical_business_insight = make_the_magic(brilliant_features) results.append(magical_business_insight)

În timp, ajungi să ai mai multe fișiere sau genius_algorithm devine mai complicat și durează mai mult timp. Și ajungi să aștepți. Și așteptând.


 

Etapa 1 este să vă încapsulați codul într-o funcție. Vrei să încapsulezi lucrurile care intră în bucla for. Acest lucru face mai ușor de înțeles ce face codul (conversia unui fișier în ceva util prin magie). Mai important, este mai ușor de utilizat codul în moduri în afară de bucle pentru.

def make_all_the_magics(file): df = pd.read_csv(file) brilliant_features = [] for feature in features: brilliant_features.append(compute_brilliant_feature(df, feature)) magical_business_insight = make_the_magic(brilliant_features) return magical_business_insight results = [] for file in files: magical_business_insight = make_all_the_magics(file) results.append(magical_business_insight)

Etapa 2 este să-l paralelizezi cu Dask. Acum, în loc să folosească o buclă for, unde fiecare iterație are loc după cea anterioară, Dask le va rula în paralel pe un cluster. Acest lucru ar trebui să ne dea rezultate mult mai rapid și este doar trei rânduri mai lungi decât codul for-loop!

from dask import delayed
from dask.distributed import Client # same function but with a Dask delayed decorator
@delayed
def make_all_the_magics(file): df = pd.read_csv(file) brilliant_features = [] for feature in features: brilliant_features.append(compute_brilliant_feature(df, feature)) magical_business_insight = make_the_magic(brilliant_features) return magical_business_insight results = []
for file in files: magical_business_insight = make_all_the_magics(file) results.append(magical_business_insight) # new Dask code
c = Client()
results = c.compute(results, sync=True)

Cum funcționează:

  • Decoratorul întârziat vă transformă funcția. Acum, când îl numești, nu este evaluat. În schimb, te întorci a delayed obiect, pe care Dask îl poate executa ulterior.
  • Client().compute trimite toate acele obiecte întârziate către clusterul Dask, unde sunt evaluate în paralel! Asta e, ai câștigat!
  • Instanțierea a Client prevede automat a LocalCluster. Aceasta înseamnă că lucrătorii paraleli Dask sunt toate procese pe aceeași mașină ca și cea care apelează Dask. Acest lucru face un exemplu concis. Pentru munca reală, recomand să creați clustere locale în terminal.

Subiecte practice

 
 
Cele de mai sus se opresc acolo unde se opresc majoritatea tutorialelor Dask. Am folosit această abordare cu propria mea muncă și cu numeroși clienți și mereu apar câteva probleme practice. Următoarele sfaturi vă vor ajuta să treceți de la acel exemplu de manual de mai sus la metode mai utile în practică, acoperind două subiecte care apar în mod constant: obiectele mari și gestionarea erorilor.
 

Obiecte mari

 
Pentru a calcula funcții pe un cluster distribuit, obiectele la care sunt apelate funcțiile trebuie să fie trimise lucrătorilor. Acest lucru poate duce la probleme de performanță, deoarece acestea trebuie să fie serializate (murate) pe computerul dvs. și trimise prin rețea. Imaginează-ți că faci procese pe gigaocteți de date – nu vrei să fii nevoit să le transferi de fiecare dată când rulează o funcție pe ea. Dacă trimiteți accidental obiecte mari, este posibil să vedeți un mesaj de la Dask ca acesta:

Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and
keep data on workers

Există două modalități de a preveni acest lucru: puteți trimite obiecte mai mici lucrătorilor, astfel încât povara să nu fie atât de rea, sau puteți încerca să trimiteți fiecare obiect unui lucrător o singură dată, astfel încât să nu fiți nevoit să faceți transferuri în continuare. .
Remedierea 1: trimiteți obiecte mici atunci când este posibil
Acest exemplu este bun, deoarece trimitem o cale de fișier (șir mic), în loc de cadrul de date.

# good, do this
results = []
for file in files: magical_business_insight = make_all_the_magics(file) results.append(magical_business_insight)

Mai jos este ce nu a face. Atât pentru că ați face citire CSV (costisitoare și lentă) în buclă, care nu este paralelă, dar și pentru că acum trimitem cadre de date (care pot fi mari).

# bad, do not do this
results = []
for file in files: df = pd.read_csv(file) magical_business_insight = make_all_the_magics(df) results.append(magical_business_insight)

De multe ori, codul poate fi rescris pentru a schimba locul în care datele sunt gestionate – fie asupra clientului, fie asupra lucrătorilor. În funcție de situația dvs., pot fi economii uriașe de timp, gândindu-vă la ce funcții iau drept intrare și la modul în care transferurile de date pot fi minimizate.
Remedierea 2: trimiteți obiecte o singură dată
Dacă trebuie să trimiteți un obiect mare, nu-l trimiteți de mai multe ori. De exemplu, dacă trebuie să trimit un obiect model mare pentru a calcula, pur și simplu adăugarea parametrului va serializa modelul de mai multe ori (o dată pe fișier)

# bad, do not do this
results = []
for file in files: # big model has to be sent to a worker each time the function is called magical_business_insight = make_all_the_magics(file, big_model) results.append(magical_business_insight)

Îi pot spune lui Dask să nu facă asta, învelindu-l într-un obiect întârziat.

# good, do this
results = []
big_model = client.scatter(big_model) #send the model to the workers first for file in files: magical_business_insight = make_all_the_magics(file, big_model) results.append(magical_business_insight)

Eșecul de manipulare

 
Pe măsură ce sarcinile dvs. de calcul cresc, de multe ori veți dori să puteți alimenta prin eșec. În acest caz, poate 5% dintre CSV-urile mele au date proaste pe care nu le pot gestiona. Aș dori să procesez cu succes 95% din fișierele CSV, dar să țin evidența eșecurilor, astfel încât să îmi pot ajusta metodele și să încerc din nou.

Această buclă face acest lucru.

import traceback
from distributed.client import wait, FIRST_COMPLETED, ALL_COMPLETED queue = c.compute(results)
futures_to_index = {fut: i for i, fut in enumerate(queue)}
results = [None for x in range(len(queue))] while queue: result = wait(queue, return_when=FIRST_COMPLETED) for future in result.done: index = futures_to_index[future] if future.status == 'finished': print(f'finished computation #{index}') results[index] = future.result() else: print(f'errored #{index}') try: future.result() except Exception as e: results[index] = e traceback.print_exc() queue = result.not_done print(results)

Deoarece această funcție este destul de complicată la prima vedere, să o descompunem.

queue = c.compute(results)
futures_to_index = {fut: i for i, fut in enumerate(queue)}
results = [None for x in range(len(queue))]

Noi sunam compute on results, dar din moment ce nu trecem sync=True, primim imediat înapoi futures, care reprezintă calculul, care nu s-a finalizat încă. De asemenea, creăm o mapare din viitor însuși, la al _n_lea argument de intrare care l-a generat. În cele din urmă, completăm o listă de rezultate completată cu None pentru moment.

while queue: result = wait(queue, return_when=FIRST_COMPLETED)

Apoi, așteptăm rezultatele și le procesăm pe măsură ce apar. Când așteptăm futures, acestea sunt separate în futures care sunt done, și cele care sunt not_done.

 if future.status == 'finished': print(f'finished computation #{index}') results[index] = future.result()

Dacă viitorul este finished, apoi tipărim că am reușit și stocăm rezultatul.

 else: print(f'errored #{index}') try: future.result() except Exception as e: results[index] = e traceback.print_exc()

În caz contrar, stocăm excepția și imprimăm urma stivei.

 queue = result.not_done

În cele din urmă, punem coada acelor futures care nu au fost încă finalizate.
 

Concluzie

 
 
Dask vă poate economisi timp cu siguranță. Dacă petreceți timp așteptând rularea codului, ar trebui să utilizați aceste sfaturi simple pentru a vă paraleliza munca. Există, de asemenea, multe lucruri avansate pe care le puteți face cu Dask, dar acesta este un bun punct de plecare.

 
Bio: Hugo Shi este fondatorul Saturn Cloud, spațiul de lucru în cloud pentru a scala Python, a colabora, a implementa joburi și multe altele.

Original. Repostat cu permisiunea.

Related:

Sursa: https://www.kdnuggets.com/2021/09/write-functions-use-dask.html

Timestamp-ul:

Mai mult de la KDnuggets