Jos voit kirjoittaa toimintoja, voit käyttää Daskia

Lähdesolmu: 1094095

Jos voit kirjoittaa toimintoja, voit käyttää Daskia

Tämä artikkeli on toinen artikkeli käynnissä olevasta sarjasta, jossa käsitellään Daskin käyttöä käytännössä. Jokainen tämän sarjan artikkeli on riittävän yksinkertainen aloittelijoille, mutta tarjoaa hyödyllisiä vinkkejä todelliseen työhön. Sarjan ensimmäinen artikkeli käsittelee LocalClusterin käyttöä.


By Hugo Shi, Saturn Cloudin perustaja

Olen keskustellut monien tietotieteilijöiden kanssa, jotka ovat kuulleet siitä dask, Python-kehys hajautettua tietojenkäsittelyä varten, mutta en tiedä mistä aloittaa. He tietävät, että Dask voi luultavasti nopeuttaa monia heidän työnkulkuaan ajamalla ne rinnakkain useiden koneiden yli, mutta kokonaan uuden menetelmän oppiminen voi tuntua pelottavalta. Haluan kertoa sinulle, että voit alkaa saada lisäarvoa Daskista ilman, että sinun tarvitsee oppia koko viitekehystä. Jos te viettää aikaa odottamalla muistikirjan solujen suorittamista, Dask voi todennäköisesti säästää aikaa. Vaikka osaat vain kirjoittaa Python-funktioita, voit hyödyntää tätä oppimatta mitään muuta! Tämä blogiviesti on "miten Daskia käytetään oppimatta koko asiaa" -opastus.

Dask, datakehykset, pussit, taulukot, aikataulut, työntekijät, kaaviot, RAPIDS, voi ei!

 
 
Daskista löytyy paljon monimutkaisia ​​sisältökappaleita, jotka voivat olla ylivoimaisia. Tämä johtuu siitä, että Dask voi hyödyntää työkoneiden klusteria tehdäkseen monia hienoja asioita! Mutta unohda tämä kaikki toistaiseksi. Tämä artikkeli keskittyy yksinkertaisiin tekniikoihin, jotka voivat säästää aikaa ilman, että sinun tarvitsee muuttaa paljon työskentelytapaasi.
 

Silmukoille ja funktioille

 
 
Melkein jokainen datatieteilijä on tehnyt jotain tällaista, jossa sinulla on joukko datakehyksiä, jotka on tallennettu erillisiin tiedostoihin ja käytät for-silmukkaa lukeaksesi ne kaikki, tekemällä logiikkaa ja yhdistämällä ne:

results = []
for file in files: defer = pd.read_csv(file) ## begin genius algorithm brilliant_features = [] for feature in features: brilliant_features.append(compute_brilliant_feature(df, feature)) magical_business_insight = make_the_magic(brilliant_features) results.append(magical_business_insight)

Ajan myötä saat enemmän tiedostoja tai genius_algorithm muuttuu monimutkaisemmaksi ja kestää kauemmin. Ja päädyt odottamaan. Ja odottaa.


 

Vaihe 1 on kapseloida koodisi funktioon. Haluat kapseloida for-silmukan sisällä olevat tavarat. Tämä helpottaa koodin tekemisen ymmärtämistä (tiedoston muuntaminen hyödylliseksi magian avulla). Vielä tärkeämpää on, että se helpottaa koodin käyttöä muilla tavoilla kuin silmukoille.

def make_all_the_magics(file): df = pd.read_csv(file) brilliant_features = [] for feature in features: brilliant_features.append(compute_brilliant_feature(df, feature)) magical_business_insight = make_the_magic(brilliant_features) return magical_business_insight results = [] for file in files: magical_business_insight = make_all_the_magics(file) results.append(magical_business_insight)

Vaihe 2 on rinnastaa se Daskin kanssa. Nyt sen sijaan, että käytettäisiin for-silmukkaa, jossa jokainen iteraatio tapahtuu edellisen jälkeen, Dask suorittaa ne rinnakkain klusterissa. Tämän pitäisi antaa meille tuloksia paljon nopeammin, ja se on vain kolme riviä pidempi kuin for-silmukan koodi!

from dask import delayed
from dask.distributed import Client # same function but with a Dask delayed decorator
@delayed
def make_all_the_magics(file): df = pd.read_csv(file) brilliant_features = [] for feature in features: brilliant_features.append(compute_brilliant_feature(df, feature)) magical_business_insight = make_the_magic(brilliant_features) return magical_business_insight results = []
for file in files: magical_business_insight = make_all_the_magics(file) results.append(magical_business_insight) # new Dask code
c = Client()
results = c.compute(results, sync=True)

Miten se toimii:

  • Viivästetty sisustaja muuttaa toimintasi. Nyt kun kutsut sitä, sitä ei arvioida. Sen sijaan saat takaisin a delayed objekti, jonka Dask voi suorittaa myöhemmin.
  • Client().compute lähettää kaikki viivästyneet objektit Dask-klusteriin, jossa ne arvioidaan rinnakkain! Siinä se, sinä voitat!
  • Instantointi a Client automaattisesti määräykset a LocalCluster. Tämä tarkoittaa, että Daskin rinnakkaistyöntekijät ovat kaikki prosessit samassa koneessa kuin Daskia kutsuva. Tästä saadaan ytimekäs esimerkki. Oikeaa työtä varten suosittelen luomista paikalliset klusterit terminaalissa.

Käytännön aiheita

 
 
Yllä olevat pysähtyvät siihen, missä useimmat Daskin opetusohjelmat pysähtyvät. Olen käyttänyt tätä lähestymistapaa omassa työssäni ja useiden asiakkaiden kanssa, ja pari käytännön ongelmaa tulee aina esille. Nämä seuraavat vinkit auttavat sinua siirtymään yllä olevasta oppikirjaesimerkistä hyödyllisempiin menetelmiin käytännössä kattamalla kaksi jatkuvasti esiin nousevaa aihetta: suuret objektit ja virheiden käsittely.
 

Suuret esineet

 
Funktioiden laskemiseksi hajautetussa klusterissa objektit, joita funktioita kutsutaan, on lähetettävä työntekijöille. Tämä voi johtaa suorituskykyongelmiin, koska ne on sarjoitettava (marinoitava) tietokoneellasi ja lähetettävä verkon kautta. Kuvittele, että suoritit prosesseja gigatavuilla dataa – sinun ei tarvitse siirtää sitä joka kerta, kun toiminto suoritetaan sillä. Jos lähetät vahingossa suuria esineitä, saatat nähdä Daskilta seuraavanlaisen viestin:

Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and
keep data on workers

On kaksi tapaa estää tämän tapahtuman: voit lähettää pienempiä esineitä työntekijöille, jotta taakka ei ole niin paha, tai voit yrittää lähettää jokaisen esineen työntekijälle vain kerran, jotta sinun ei tarvitse tehdä siirtoja jatkuvasti. .
Korjaus 1: Lähetä pieniä esineitä, kun mahdollista
Tämä esimerkki on hyvä, koska lähetämme tiedostopolun (pienen merkkijonon) datakehyksen sijaan.

# good, do this
results = []
for file in files: magical_business_insight = make_all_the_magics(file) results.append(magical_business_insight)

Alla mitä emme tehdä. Sekä siksi, että lukisit CSV:tä (kallista ja hidasta) silmukassa, joka ei ole rinnakkainen, mutta myös siksi, että lähetämme nyt datakehyksiä (jotka voivat olla suuria).

# bad, do not do this
results = []
for file in files: df = pd.read_csv(file) magical_business_insight = make_all_the_magics(df) results.append(magical_business_insight)

Usein koodia voidaan kirjoittaa uudelleen tietojen hallintapaikan muuttamiseksi – joko asiakkaan tai työntekijöiden kohdalla. Tilanteesta riippuen voi olla suuriakin ajansäästöjä pohtimalla, mitä toimintoja syötetään ja miten tiedonsiirrot voidaan minimoida.
Korjaus 2: Lähetä objektit vain kerran
Jos sinun on lähetettävä suuri esine, älä lähetä sitä useita kertoja. Jos minun on esimerkiksi lähetettävä suuri malliobjekti laskentaa varten, vain parametrin lisääminen sarjoittaa mallin useita kertoja (kerran per tiedosto).

# bad, do not do this
results = []
for file in files: # big model has to be sent to a worker each time the function is called magical_business_insight = make_all_the_magics(file, big_model) results.append(magical_business_insight)

Voin kertoa Daskille, ettei se tee niin, käärimällä sen viivästettyyn esineeseen.

# good, do this
results = []
big_model = client.scatter(big_model) #send the model to the workers first for file in files: magical_business_insight = make_all_the_magics(file, big_model) results.append(magical_business_insight)

Käsittelyvirhe

 
Laskennallisten tehtäviesi kasvaessa haluat usein selviytyä epäonnistumisista. Tässä tapauksessa ehkä 5 prosentissa CSV-tiedostoistani on huonoja tietoja, joita en voi käsitellä. Haluaisin käsitellä 95 % CSV-tiedostoista onnistuneesti, mutta pidän kirjaa virheistä, jotta voin säätää menetelmiäni ja yrittää uudelleen.

Tämä silmukka tekee tämän.

import traceback
from distributed.client import wait, FIRST_COMPLETED, ALL_COMPLETED queue = c.compute(results)
futures_to_index = {fut: i for i, fut in enumerate(queue)}
results = [None for x in range(len(queue))] while queue: result = wait(queue, return_when=FIRST_COMPLETED) for future in result.done: index = futures_to_index[future] if future.status == 'finished': print(f'finished computation #{index}') results[index] = future.result() else: print(f'errored #{index}') try: future.result() except Exception as e: results[index] = e traceback.print_exc() queue = result.not_done print(results)

Koska tämä toiminto on ensi silmäyksellä melko monimutkainen, puretaan se.

queue = c.compute(results)
futures_to_index = {fut: i for i, fut in enumerate(queue)}
results = [None for x in range(len(queue))]

Me kutsumme compute on results, mutta koska emme ole ohitse sync=True, saamme heti takaisin futuurit, jotka edustavat laskentaa, joka ei ole vielä valmis. Luomme myös kartoituksen tulevaisuudesta itsestään sen luoneeseen _n_nteen syöttöargumenttiin. Lopuksi täytämme luettelon tuloksista, jotka on toistaiseksi täynnä Ei mitään.

while queue: result = wait(queue, return_when=FIRST_COMPLETED)

Seuraavaksi odotamme tuloksia ja käsittelemme niitä niiden saapuessa. Kun odotamme futuureja, ne erotetaan futuureiksi, jotka ovat done, ja ne, jotka ovat not_done.

 if future.status == 'finished': print(f'finished computation #{index}') results[index] = future.result()

Jos tulevaisuus on finished, sitten tulostamme onnistumisen ja tallennamme tuloksen.

 else: print(f'errored #{index}') try: future.result() except Exception as e: results[index] = e traceback.print_exc()

Muussa tapauksessa tallennamme poikkeuksen ja tulostamme pinon jäljen.

 queue = result.not_done

Lopuksi asetimme jonoon niille futuureille, joita ei ole vielä tehty.
 

Yhteenveto

 
 
Dask voi varmasti säästää aikaa. Jos vietät aikaa odottaessasi koodin suorittamista, sinun tulee rinnastaa työsi näiden yksinkertaisten vinkkien avulla. Daskilla voi myös tehdä monia edistyneitä asioita, mutta tämä on hyvä lähtökohta.

 
Bio: Hugo Shi on Saturn Cloudin perustaja, pilvityötila Pythonin skaalaamiseen, yhteistyöhön, töiden käyttöönottoon ja paljon muuta.

Alkuperäinen. Postitettu luvalla.

Related:

Lähde: https://www.kdnuggets.com/2021/09/write-functions-use-dask.html

Aikaleima:

Lisää aiheesta KDnuggets