Om du kan skriva funktioner kan du använda Dask

Källnod: 1094095

Om du kan skriva funktioner kan du använda Dask

Den här artikeln är den andra artikeln i en pågående serie om att använda Dask i praktiken. Varje artikel i den här serien kommer att vara enkel nog för nybörjare, men ge användbara tips för riktigt arbete. Den första artikeln i serien handlar om att använda LocalCluster.


By Hugo Shi, grundare av Saturn Cloud

Jag har chattat med många dataforskare som har hört talas om dask, Python-ramverket för distribuerad datoranvändning, men vet inte var jag ska börja. De vet att Dask förmodligen kan påskynda många av deras arbetsflöden genom att de körs parallellt över ett kluster av maskiner, men uppgiften att lära sig en helt ny metod kan verka skrämmande. Jag är här för att berätta att du kan börja få värde från Dask utan att behöva lära dig hela ramverket. Om dig spendera tid på att vänta på att bärbara celler ska köras, det finns en god chans att Dask kan spara tid. Även om du bara vet hur man skriver Python-funktioner kan du dra nytta av detta utan att lära dig något annat! Det här blogginlägget är en handledning om hur du använder Dask utan att lära dig allt.

Dask, dataramar, väskor, arrayer, schemaläggare, arbetare, grafer, RAPIDS, åh nej!

 
 
Det finns många komplicerade innehåll där ute om Dask, vilket kan vara överväldigande. Detta beror på att Dask kan använda ett kluster av arbetsmaskiner för att göra många coola saker! Men glöm allt det nu. Den här artikeln fokuserar på enkla tekniker som kan spara tid, utan att behöva ändra mycket på hur du arbetar.
 

För slingor och funktioner

 
 
I stort sett alla dataforskare har gjort något sånt här, där du har en uppsättning dataramar lagrade i separata filer och du använder en for-loop för att läsa dem alla, göra lite logik och sedan kombinera dem:

results = []
for file in files: defer = pd.read_csv(file) ## begin genius algorithm brilliant_features = [] for feature in features: brilliant_features.append(compute_brilliant_feature(df, feature)) magical_business_insight = make_the_magic(brilliant_features) results.append(magical_business_insight)

Med tiden får du fler filer, eller genius_algorithm blir mer komplicerat och tar längre tid att köra. Och det slutar med att du väntar. Och väntar.


 

steg 1 är att kapsla in din kod i en funktion. Du vill kapsla in det som går in i for-slingan. Detta gör det lättare att förstå vad koden gör (konvertera en fil till något användbart via magi). Ännu viktigare, det gör det lättare att använda den koden på andra sätt än för loopar.

def make_all_the_magics(file): df = pd.read_csv(file) brilliant_features = [] for feature in features: brilliant_features.append(compute_brilliant_feature(df, feature)) magical_business_insight = make_the_magic(brilliant_features) return magical_business_insight results = [] for file in files: magical_business_insight = make_all_the_magics(file) results.append(magical_business_insight)

steg 2 är att parallellisera den med Dask. Nu, istället för att använda en for-loop, där varje iteration sker efter den föregående, kommer Dask att köra dem parallellt på ett kluster. Detta borde ge oss resultat mycket snabbare, och är bara tre rader längre än for-loop-koden!

from dask import delayed
from dask.distributed import Client # same function but with a Dask delayed decorator
@delayed
def make_all_the_magics(file): df = pd.read_csv(file) brilliant_features = [] for feature in features: brilliant_features.append(compute_brilliant_feature(df, feature)) magical_business_insight = make_the_magic(brilliant_features) return magical_business_insight results = []
for file in files: magical_business_insight = make_all_the_magics(file) results.append(magical_business_insight) # new Dask code
c = Client()
results = c.compute(results, sync=True)

Så här fungerar det:

  • Den fördröjda dekoratören förvandlar din funktion. Nu, när du kallar det, utvärderas det inte. Istället får du tillbaka en delayed objekt, som Dask kan köra senare.
  • Client().compute skickar alla dessa försenade objekt till Dask-klustret, där de utvärderas parallellt! Det är det, du vinner!
  • Instantiera en Client automatiskt bestämmelser a LocalCluster. Detta innebär att Dasks parallella arbetare är alla processer på samma maskin som den som anropar Dask. Detta ger ett kortfattat exempel. För riktigt arbete rekommenderar jag att skapa lokala kluster i terminalen.

Praktiska ämnen

 
 
Ovanstående slutar där de flesta Dask-tutorials slutar. Jag har använt detta tillvägagångssätt med mitt eget arbete och med många kunder, och ett par praktiska problem dyker alltid upp. Dessa nästa tips hjälper dig att gå från det läroboksexemplet ovan till mer användbara metoder i praktiken genom att täcka två ämnen som ständigt dyker upp: stora objekt och felhantering.
 

Stora objekt

 
För att kunna beräkna funktioner på ett distribuerat kluster måste objekten som funktionerna anropas på skickas till arbetarna. Detta kan leda till prestandaproblem, eftersom de måste serialiseras (inlagda) på din dator och skickas över nätverket. Föreställ dig att du gjorde processer på gigabyte med data – du vill inte behöva överföra det varje gång en funktion körs på den. Om du av misstag skickar stora objekt kan du se ett meddelande från Dask så här:

Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and
keep data on workers

Det finns två sätt att stoppa detta från att hända: du kan skicka mindre föremål till arbetare så att bördan inte är så illa, eller så kan du försöka skicka varje objekt till en arbetare bara en gång, så att du inte behöver göra överföringar .
Fix 1: skicka små föremål när det är möjligt
Det här exemplet är bra, eftersom vi skickar en filsökväg (liten sträng), istället för dataramen.

# good, do this
results = []
for file in files: magical_business_insight = make_all_the_magics(file) results.append(magical_business_insight)

Nedan är vad inte att göra. Både för att du skulle göra CSV-läsning (dyrt och långsamt) i loopen, vilket inte är parallellt, men också för att vi nu skickar dataramar (som kan vara stora).

# bad, do not do this
results = []
for file in files: df = pd.read_csv(file) magical_business_insight = make_all_the_magics(df) results.append(magical_business_insight)

Ofta kan koden skrivas om för att ändra var data hanteras – antingen på klienten eller på arbetarna. Beroende på din situation kan det bli enorma tidsbesparingar genom att tänka igenom vilka funktioner som används och hur dataöverföringar kan minimeras.
Fix 2: skicka objekt endast en gång
Om du måste skicka ett stort objekt, skicka det inte flera gånger. Till exempel, om jag behöver skicka ett stort modellobjekt för att beräkna, om jag bara lägger till parametern kommer modellen att serialiseras flera gånger (en gång per fil)

# bad, do not do this
results = []
for file in files: # big model has to be sent to a worker each time the function is called magical_business_insight = make_all_the_magics(file, big_model) results.append(magical_business_insight)

Jag kan säga åt Dask att inte göra det genom att linda in det i ett försenat föremål.

# good, do this
results = []
big_model = client.scatter(big_model) #send the model to the workers first for file in files: magical_business_insight = make_all_the_magics(file, big_model) results.append(magical_business_insight)

Hantering misslyckande

 
När dina beräkningsuppgifter växer, vill du ofta kunna driva genom fel. I det här fallet har kanske 5 % av mina CSV:er dålig data som jag inte kan hantera. Jag skulle vilja bearbeta 95 % av CSV:erna, men håll reda på felen så att jag kan justera mina metoder och försöka igen.

Den här slingan gör detta.

import traceback
from distributed.client import wait, FIRST_COMPLETED, ALL_COMPLETED queue = c.compute(results)
futures_to_index = {fut: i for i, fut in enumerate(queue)}
results = [None for x in range(len(queue))] while queue: result = wait(queue, return_when=FIRST_COMPLETED) for future in result.done: index = futures_to_index[future] if future.status == 'finished': print(f'finished computation #{index}') results[index] = future.result() else: print(f'errored #{index}') try: future.result() except Exception as e: results[index] = e traceback.print_exc() queue = result.not_done print(results)

Eftersom den här funktionen är ganska komplicerad vid första anblicken, låt oss dela upp den.

queue = c.compute(results)
futures_to_index = {fut: i for i, fut in enumerate(queue)}
results = [None for x in range(len(queue))]

Vi ringer compute on results, men eftersom vi inte passerar sync=True, får vi omedelbart tillbaka terminer, som representerar beräkningen, som inte har slutförts ännu. Vi skapar också en mappning från själva framtiden till det _n_:e ingångsargumentet som genererade den. Slutligen fyller vi i en lista med resultat fylld med Nones för tillfället.

while queue: result = wait(queue, return_when=FIRST_COMPLETED)

Därefter väntar vi på resultat, och vi bearbetar dem när de kommer in. När vi väntar på terminer separeras de i terminer som är done, och de som är not_done.

 if future.status == 'finished': print(f'finished computation #{index}') results[index] = future.result()

Om framtiden är finished, sedan skriver vi ut att vi lyckats, och vi lagrar resultatet.

 else: print(f'errored #{index}') try: future.result() except Exception as e: results[index] = e traceback.print_exc()

Annars lagrar vi undantaget och skriver ut stacktrace.

 queue = result.not_done

Slutligen ställer vi kön till de terminer som ännu inte är klara.
 

Slutsats

 
 
Dask kan definitivt spara tid. Om du lägger tid på att vänta på att koden ska köras bör du använda dessa enkla tips för att parallellisera ditt arbete. Det finns också många avancerade saker du kan göra med Dask, men det här är en bra utgångspunkt.

 
Bio: Hugo Shi är grundare av Saturn Cloud, moln-arbetsytan för att skala Python, samarbeta, distribuera jobb och mer.

Ursprungliga. Skickas om med tillstånd.

Relaterat:

Källa: https://www.kdnuggets.com/2021/09/write-functions-use-dask.html

Tidsstämpel:

Mer från KDnuggets