Fonksiyon Yazabiliyorsanız Dask Kullanabilirsiniz

Kaynak Düğüm: 1094095

Fonksiyon Yazabiliyorsanız Dask Kullanabilirsiniz

Bu makale, Dask'ın pratikte kullanımına ilişkin devam eden bir serinin ikinci makalesidir. Bu serideki her makale yeni başlayanlar için yeterince basit olacak, ancak gerçek çalışma için yararlı ipuçları sağlayacak. Serinin ilk makalesi LocalCluster kullanımıyla ilgili.


By Hugo Şi, Saturn Cloud'un Kurucusu

Bunu duyan birçok veri bilimciyle sohbet ediyorum. daskdağıtılmış hesaplama için Python çerçevesi, ancak nereden başlayacağımı bilmiyorum. Dask'ın iş akışlarının çoğunu bir makine kümesinde paralel olarak çalıştırarak muhtemelen hızlandırabileceğini biliyorlar, ancak tamamen yeni bir metodoloji öğrenme görevi göz korkutucu görünebilir. Çerçevenin tamamını öğrenmek zorunda kalmadan Dask'tan değer elde etmeye başlayabileceğinizi söylemek için buradayım. Eğer sen Dizüstü bilgisayar hücrelerinin çalışmasını bekleyerek zaman harcarsanız, Dask'ın size zaman kazandırma şansı yüksektir. Yalnızca Python işlevlerinin nasıl yazılacağını bilseniz bile, başka hiçbir şey öğrenmeden bundan yararlanabilirsiniz! Bu blog yazısı, "her şeyi öğrenmeden Dask'ın nasıl kullanılacağı" konulu bir eğitimdir.

Dask, veri çerçeveleri, çantalar, diziler, zamanlayıcılar, çalışanlar, grafikler, RAPIDS, ah hayır!

 
 
Dask hakkında çok fazla karmaşık olabilecek pek çok içerik var. Bunun nedeni, Dask'ın birçok harika şey yapmak için bir grup çalışan makineden yararlanabilmesidir! Ama şimdilik bunların hepsini unutun. Bu makale, çalışma şeklinizi çok fazla değiştirmenize gerek kalmadan size zaman kazandırabilecek basit tekniklere odaklanmaktadır.
 

Döngüler ve işlevler için

 
 
Hemen hemen her veri bilimci buna benzer bir şey yapmıştır; burada ayrı dosyalarda saklanan bir dizi veri çerçevesi vardır ve hepsini okumak için bir for döngüsü kullanırsınız, biraz mantık yürütür ve ardından bunları birleştirirsiniz:

results = []
for file in files: defer = pd.read_csv(file) ## begin genius algorithm brilliant_features = [] for feature in features: brilliant_features.append(compute_brilliant_feature(df, feature)) magical_business_insight = make_the_magic(brilliant_features) results.append(magical_business_insight)

Zamanla daha fazla dosya elde edersiniz veya genius_algorithm daha karmaşık hale gelir ve çalıştırılması daha uzun sürer. Ve sonunda beklemek zorunda kalıyorsun. Ve bekliyorum.


 

1. Adım kodunuzu bir fonksiyon içinde kapsüllemektir. For döngüsünün içine giren şeyleri kapsüllemek istiyorsunuz. Bu, kodun ne yaptığını (bir dosyayı sihir yoluyla yararlı bir şeye dönüştürmek) anlamayı kolaylaştırır. Daha da önemlisi, bu kodun for döngüleri dışında başka şekillerde de kullanılmasını kolaylaştırır.

def make_all_the_magics(file): df = pd.read_csv(file) brilliant_features = [] for feature in features: brilliant_features.append(compute_brilliant_feature(df, feature)) magical_business_insight = make_the_magic(brilliant_features) return magical_business_insight results = [] for file in files: magical_business_insight = make_all_the_magics(file) results.append(magical_business_insight)

2. Adım Dask ile paralel hale getirmektir. Artık her yinelemenin bir öncekinden sonra gerçekleştiği bir for döngüsü kullanmak yerine Dask bunları bir küme üzerinde paralel olarak çalıştıracak. Bu bize çok daha hızlı sonuç verecektir ve yalnızca üç satır daha uzun for döngüsü kodundan daha!

from dask import delayed
from dask.distributed import Client # same function but with a Dask delayed decorator
@delayed
def make_all_the_magics(file): df = pd.read_csv(file) brilliant_features = [] for feature in features: brilliant_features.append(compute_brilliant_feature(df, feature)) magical_business_insight = make_the_magic(brilliant_features) return magical_business_insight results = []
for file in files: magical_business_insight = make_all_the_magics(file) results.append(magical_business_insight) # new Dask code
c = Client()
results = c.compute(results, sync=True)

Nasıl çalışır:

  • Gecikmeli dekoratör işlevinizi dönüştürür. Artık aradığınızda değerlendirmeye alınmıyor. Bunun yerine, bir delayed Dask'ın daha sonra çalıştırabileceği nesne.
  • Client().compute tüm bu gecikmiş nesneleri paralel olarak değerlendirilecekleri Dask kümesine gönderir! İşte bu, sen kazandın!
  • Bir örneği oluşturma Client otomatik olarak provizyon LocalCluster. Bu, Dask paralel çalışanlarının hepsinin, Dask'ı çağıran makineyle aynı makinedeki işlemler olduğu anlamına gelir. Bu kısa bir örnek sağlar. Gerçek iş için oluşturmanızı öneririm terminaldeki yerel kümeler.

Pratik Konular

 
 
Yukarıdakiler çoğu Dask öğreticisinin durduğu yerde durur. Bu yaklaşımı kendi işimde ve çok sayıda müşterimde kullandım ve her zaman birkaç pratik sorun ortaya çıktı. Sonraki ipuçları, sürekli gündeme gelen iki konuyu ele alarak yukarıdaki ders kitabı örneğinden pratikte daha yararlı yöntemlere geçmenize yardımcı olacaktır: büyük nesneler ve hata yönetimi.
 

Büyük Nesneler

 
Dağıtılmış bir kümede işlevlerin hesaplanabilmesi için, işlevlerin çağrıldığı nesnelerin işçilere gönderilmesi gerekir. Bu, performans sorunlarına yol açabilir, çünkü bunların bilgisayarınızda serileştirilmesi (salamura edilmesi) ve ağ üzerinden gönderilmesi gerekir. Gigabaytlarca veri üzerinde işlemler yaptığınızı hayal edin; üzerinde bir işlev her çalıştırıldığında bunu aktarmak zorunda kalmak istemezsiniz. Yanlışlıkla büyük nesneler gönderirseniz Dask'tan şöyle bir mesaj görebilirsiniz:

Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and
keep data on workers

Bunun olmasını engellemenin iki yolu vardır: Yükün o kadar da kötü olmaması için çalışanlara daha küçük nesneler gönderebilirsiniz veya her nesneyi bir çalışana yalnızca bir kez göndermeyi deneyebilirsiniz, böylece transfer yapmak zorunda kalmazsınız. .
Düzeltme 1: Mümkün olduğunda küçük nesneler gönderin
Bu örnek iyidir çünkü veri çerçevesi yerine bir dosya yolu (küçük dize) gönderiyoruz.

# good, do this
results = []
for file in files: magical_business_insight = make_all_the_magics(file) results.append(magical_business_insight)

Aşağıda ne var değil yapmak. Hem döngüde paralel olmayan CSV okuması (pahalı ve yavaş) yapacağınız için, hem de artık veri çerçeveleri (büyük olabilir) gönderdiğimiz için.

# bad, do not do this
results = []
for file in files: df = pd.read_csv(file) magical_business_insight = make_all_the_magics(df) results.append(magical_business_insight)

Çoğu zaman kod, verilerin yönetildiği yeri (istemcide veya çalışanlarda) değiştirmek için yeniden yazılabilir. Durumunuza bağlı olarak hangi işlevlerin girdi olarak alınacağını ve veri aktarımlarının nasıl en aza indirilebileceğini düşünerek büyük zaman tasarrufu sağlayabilirsiniz.
Düzeltme 2: nesneleri yalnızca bir kez gönderin
Büyük bir nesne göndermeniz gerekiyorsa onu birden çok kez göndermeyin. Örneğin, hesaplamak için büyük bir model nesnesi göndermem gerekiyorsa, yalnızca parametreyi eklemek, modeli birden çok kez seri hale getirecektir (dosya başına bir kez)

# bad, do not do this
results = []
for file in files: # big model has to be sent to a worker each time the function is called magical_business_insight = make_all_the_magics(file, big_model) results.append(magical_business_insight)

Dask'a onu gecikmeli bir nesneye sararak bunu yapmamasını söyleyebilirim.

# good, do this
results = []
big_model = client.scatter(big_model) #send the model to the workers first for file in files: magical_business_insight = make_all_the_magics(file, big_model) results.append(magical_business_insight)

Başarısızlıkla Mücadele

 
Hesaplamalı görevleriniz büyüdükçe çoğu zaman başarısızlıkların üstesinden gelmeyi isteyeceksiniz. Bu durumda CSV'lerimin belki %5'inde başa çıkamayacağım kötü veriler var. CSV'lerin %95'ini başarılı bir şekilde işlemek istiyorum ancak yöntemlerimi ayarlayıp tekrar deneyebilmek için hataları takip etmek istiyorum.

Bu döngü bunu yapar.

import traceback
from distributed.client import wait, FIRST_COMPLETED, ALL_COMPLETED queue = c.compute(results)
futures_to_index = {fut: i for i, fut in enumerate(queue)}
results = [None for x in range(len(queue))] while queue: result = wait(queue, return_when=FIRST_COMPLETED) for future in result.done: index = futures_to_index[future] if future.status == 'finished': print(f'finished computation #{index}') results[index] = future.result() else: print(f'errored #{index}') try: future.result() except Exception as e: results[index] = e traceback.print_exc() queue = result.not_done print(results)

Bu işlev ilk bakışta oldukça karmaşık olduğundan, onu parçalara ayıralım.

queue = c.compute(results)
futures_to_index = {fut: i for i, fut in enumerate(queue)}
results = [None for x in range(len(queue))]

Biz ararız compute on results, ama geçemediğimiz için sync=True, henüz tamamlanmayan hesaplamayı temsil eden vadeli işlemleri hemen geri alıyoruz. Ayrıca geleceğin kendisinden, onu oluşturan _n_th giriş argümanına kadar bir eşleme yaratırız. Son olarak, şimdilik Hiçbiri ile doldurulmuş sonuçların bir listesini dolduruyoruz.

while queue: result = wait(queue, return_when=FIRST_COMPLETED)

Daha sonra sonuçları bekleriz ve onları geldikleri gibi işleriz. Gelecekleri beklediğimizde, bunlar geleceklere ayrılır. done, ve olanlar not_done.

 if future.status == 'finished': print(f'finished computation #{index}') results[index] = future.result()

Gelecek ise finishedsonra başarılı olduğumuzu yazdırır ve sonucu saklarız.

 else: print(f'errored #{index}') try: future.result() except Exception as e: results[index] = e traceback.print_exc()

Aksi takdirde istisnayı saklar ve yığın izlemesini yazdırırız.

 queue = result.not_done

Son olarak henüz tamamlanmamış vadeli işlemler için sırayı belirliyoruz.
 

Sonuç

 
 
Dask kesinlikle size zaman kazandırabilir. Kodun çalışmasını bekleyerek zaman harcıyorsanız, çalışmanızı paralel hale getirmek için bu basit ipuçlarını kullanmalısınız. Ayrıca Dask ile yapabileceğiniz birçok gelişmiş şey var ancak bu iyi bir başlangıç ​​noktası.

 
Bio: Hugo Şi Python'u ölçeklendirmek, işbirliği yapmak, işleri dağıtmak ve daha fazlasını yapmak için kullanılan bulut çalışma alanı Saturn Cloud'un kurucusudur.

orijinal. İzinle yeniden yayınlandı.

İlgili:

Kaynak: https://www.kdnuggets.com/2021/09/write-functions-use-dask.html

Zaman Damgası:

Den fazla KDNuggets