إذا كان بإمكانك كتابة وظائف ، يمكنك استخدام Dask

عقدة المصدر: 1094095

إذا كان بإمكانك كتابة وظائف ، يمكنك استخدام Dask

هذه المقالة هي المقالة الثانية من سلسلة مستمرة حول استخدام Dask عمليًا. ستكون كل مقالة في هذه السلسلة بسيطة بما يكفي للمبتدئين ، ولكنها تقدم نصائح مفيدة للعمل الحقيقي. المقالة الأولى في السلسلة تدور حول استخدام LocalCluster.


By هوغو شي، مؤسس Saturn Cloud

لقد كنت أتحدث مع العديد من علماء البيانات الذين سمعوا عنهم داسك، إطار عمل Python للحوسبة الموزعة ، لكن لا تعرف من أين تبدأ. إنهم يعرفون أن Dask يمكنه على الأرجح تسريع العديد من مهام سير العمل من خلال جعلهم يعملون بالتوازي عبر مجموعة من الآلات ، لكن مهمة تعلم منهجية جديدة بالكامل قد تبدو شاقة. أنا هنا لأخبرك أنه يمكنك البدء في الحصول على قيمة من Dask دون الحاجة إلى تعلم إطار العمل بأكمله. لو لصحتك! قضاء بعض الوقت في انتظار تنفيذ خلايا دفتر الملاحظات ، فهناك فرصة جيدة يمكن أن يوفر لك Dask الوقت. حتى لو كنت تعرف فقط كيفية كتابة وظائف Python ، يمكنك الاستفادة من ذلك دون تعلم أي شيء آخر! منشور المدونة هذا عبارة عن برنامج تعليمي حول "كيفية استخدام Dask دون تعلم كل شيء".

Dask ، dataframes ، الحقائب ، المصفوفات ، المجدولين ، العمال ، الرسوم البيانية ، RAPIDS ، أوه لا!

 
 
هناك الكثير من أجزاء المحتوى المعقدة حول Dask ، والتي يمكن أن تكون ساحقة. وذلك لأن Dask يمكنها استخدام مجموعة من آلات العمال للقيام بالعديد من الأشياء الرائعة! لكن انسَ كل ذلك الآن. تركز هذه المقالة على الأساليب البسيطة التي يمكن أن توفر لك الوقت ، دون الحاجة إلى تغيير الكثير حول طريقة عملك.
 

للحلقات والوظائف

 
 
لقد فعل كل عالم بيانات شيئًا كهذا تقريبًا ، حيث لديك مجموعة من إطارات البيانات مخزنة في ملفات منفصلة وتستخدم حلقة for لقراءتها جميعًا ، والقيام ببعض المنطق ، ثم دمجها:

results = []
for file in files: defer = pd.read_csv(file) ## begin genius algorithm brilliant_features = [] for feature in features: brilliant_features.append(compute_brilliant_feature(df, feature)) magical_business_insight = make_the_magic(brilliant_features) results.append(magical_business_insight)

بمرور الوقت ، ينتهي بك الأمر مع المزيد من الملفات ، أو ملف genius_algorithm يصبح أكثر تعقيدًا ويستغرق وقتًا أطول للتشغيل. وينتهي بك الأمر بالانتظار. والانتظار.


 

الخطوة1 هو تغليف الكود الخاص بك في دالة. تريد تغليف الأشياء التي تدخل داخل حلقة for. هذا يجعل من السهل فهم ما تفعله الشفرة (تحويل ملف إلى شيء مفيد عبر السحر). والأهم من ذلك ، أنه يسهل استخدام هذا الرمز بطرق غير حلقات for.

def make_all_the_magics(file): df = pd.read_csv(file) brilliant_features = [] for feature in features: brilliant_features.append(compute_brilliant_feature(df, feature)) magical_business_insight = make_the_magic(brilliant_features) return magical_business_insight results = [] for file in files: magical_business_insight = make_all_the_magics(file) results.append(magical_business_insight)

الخطوة2 هو موازاة ذلك مع داسك. الآن ، بدلاً من استخدام حلقة for ، حيث يحدث كل تكرار بعد التكرار السابق ، سيقوم Dask بتشغيلها بالتوازي على كتلة. هذا يجب أن يعطينا نتائج أسرع بكثير ، وهو فقط ثلاثة أسطر أطول من رمز الحلقة!

from dask import delayed
from dask.distributed import Client # same function but with a Dask delayed decorator
@delayed
def make_all_the_magics(file): df = pd.read_csv(file) brilliant_features = [] for feature in features: brilliant_features.append(compute_brilliant_feature(df, feature)) magical_business_insight = make_the_magic(brilliant_features) return magical_business_insight results = []
for file in files: magical_business_insight = make_all_the_magics(file) results.append(magical_business_insight) # new Dask code
c = Client()
results = c.compute(results, sync=True)

كيف يعمل:

  • يحول المصمم المتأخر وظيفتك. الآن ، عندما تسميها ، لا يتم تقييمها. بدلاً من ذلك ، تحصل على ملف delayed الذي يمكن لـ Dask تنفيذه لاحقًا.
  • Client().compute يرسل كل تلك الكائنات المتأخرة إلى مجموعة Dask ، حيث يتم تقييمها بالتوازي! هذا كل شيء ، لقد فزت!
  • تجسيد أ Client تلقائيا الأحكام أ LocalCluster. هذا يعني أن عمال Dask المتوازيين جميعهم عمليات على نفس الجهاز مثل الشخص الذي يستدعي Dask. هذا يجعل للحصول على مثال موجز. للعمل الحقيقي ، أوصي بإنشاء ملفات المجموعات المحلية في المحطة.

موضوعات عملية

 
 
توقف ما سبق حيث تتوقف معظم دروس Dask. لقد استخدمت هذا النهج مع عملي الخاص ، ومع العديد من العملاء ، وتظهر دائمًا بعض المشكلات العملية. ستساعدك هذه النصائح التالية على الانتقال من مثال الكتاب المدرسي أعلاه إلى طرق أكثر فائدة في الممارسة من خلال تغطية موضوعين يظهران باستمرار: الأشياء الكبيرة ومعالجة الأخطاء.
 

كائنات كبيرة

 
من أجل حساب الوظائف على الكتلة الموزعة ، يجب إرسال الكائنات التي يتم استدعاء الوظائف إليها إلى العمال. يمكن أن يؤدي ذلك إلى مشكلات في الأداء ، حيث يلزم إجراء تسلسل (مخلل) على جهاز الكمبيوتر الخاص بك وإرساله عبر الشبكة. تخيل أنك كنت تجري عمليات على غيغابايت من البيانات - لا تريد أن تضطر إلى نقل ذلك في كل مرة تعمل فيها إحدى الوظائف عليها. إذا أرسلت كائنات كبيرة عن طريق الخطأ ، فقد ترى رسالة من Dask مثل هذه:

Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and
keep data on workers

هناك طريقتان لمنع حدوث ذلك: يمكنك إرسال أشياء أصغر إلى العمال حتى لا يكون العبء سيئًا للغاية ، أو يمكنك محاولة إرسال كل عنصر إلى العامل مرة واحدة فقط ، حتى لا تضطر إلى إجراء عمليات نقل. .
الإصلاح 1: أرسل أشياء صغيرة عندما يكون ذلك ممكنًا
هذا المثال جيد ، لأننا نرسل مسار ملف (سلسلة صغيرة) ، بدلاً من إطار البيانات.

# good, do this
results = []
for file in files: magical_business_insight = make_all_the_magics(file) results.append(magical_business_insight)

أدناه ما ليس لكى يفعل. كلاهما لأنك ستقوم بقراءة CSV (باهظة الثمن وبطيئة) في الحلقة ، وهذا ليس متوازيًا ، ولكن أيضًا لأننا نرسل الآن إطارات بيانات (والتي يمكن أن تكون كبيرة).

# bad, do not do this
results = []
for file in files: df = pd.read_csv(file) magical_business_insight = make_all_the_magics(df) results.append(magical_business_insight)

في كثير من الأحيان ، يمكن إعادة كتابة التعليمات البرمجية لتغيير مكان إدارة البيانات - إما على العميل أو على العاملين. اعتمادًا على موقفك ، قد يكون هناك توفير كبير للوقت من خلال التفكير في الوظائف التي تتخذها كمدخلات وكيف يمكن تقليل عمليات نقل البيانات.
الإصلاح 2: إرسال الأشياء مرة واحدة فقط
إذا كان عليك إرسال كائن كبير ، فلا ترسله عدة مرات. على سبيل المثال ، إذا كنت بحاجة إلى إرسال كائن نموذج كبير من أجل الحساب ، فإن إضافة المعلمة ببساطة ستؤدي إلى إجراء تسلسل للنموذج عدة مرات (مرة واحدة لكل ملف)

# bad, do not do this
results = []
for file in files: # big model has to be sent to a worker each time the function is called magical_business_insight = make_all_the_magics(file, big_model) results.append(magical_business_insight)

يمكنني إخبار Dask بعدم القيام بذلك ، عن طريق لفه في كائن متأخر.

# good, do this
results = []
big_model = client.scatter(big_model) #send the model to the workers first for file in files: magical_business_insight = make_all_the_magics(file, big_model) results.append(magical_business_insight)

معالجة الفشل

 
مع نمو مهامك الحسابية ، غالبًا ما تريد أن تكون قادرًا على تجاوز الفشل. في هذه الحالة ، ربما يحتوي 5٪ من ملفات CSV على بيانات سيئة لا يمكنني التعامل معها. أرغب في معالجة 95٪ من ملفات CSV بنجاح ، ولكن مع تتبع حالات الفشل حتى أتمكن من ضبط أسالي والمحاولة مرة أخرى.

هذه الحلقة تفعل هذا.

import traceback
from distributed.client import wait, FIRST_COMPLETED, ALL_COMPLETED queue = c.compute(results)
futures_to_index = {fut: i for i, fut in enumerate(queue)}
results = [None for x in range(len(queue))] while queue: result = wait(queue, return_when=FIRST_COMPLETED) for future in result.done: index = futures_to_index[future] if future.status == 'finished': print(f'finished computation #{index}') results[index] = future.result() else: print(f'errored #{index}') try: future.result() except Exception as e: results[index] = e traceback.print_exc() queue = result.not_done print(results)

نظرًا لأن هذه الوظيفة معقدة إلى حد ما للوهلة الأولى ، فلنقم بتفصيلها.

queue = c.compute(results)
futures_to_index = {fut: i for i, fut in enumerate(queue)}
results = [None for x in range(len(queue))]

نحن نتصل compute on results، ولكن بما أننا لم نمر sync=True، نعود فورًا إلى العقود الآجلة ، والتي تمثل الحساب الذي لم يكتمل بعد. نقوم أيضًا بإنشاء تعيين من المستقبل نفسه ، إلى وسيطة الإدخال _n_th التي أنشأتها. أخيرًا ، نقوم بملء قائمة النتائج المملوءة بـ Nones في الوقت الحالي.

while queue: result = wait(queue, return_when=FIRST_COMPLETED)

بعد ذلك ، ننتظر النتائج ، ونعالجها فور ظهورها. وعندما ننتظر العقود الآجلة ، يتم فصلها إلى عقود مستقبلية done، وتلك not_done.

 if future.status == 'finished': print(f'finished computation #{index}') results[index] = future.result()

إذا كان المستقبل finished، ثم نطبع بأننا نجحنا ، ونخزن النتيجة.

 else: print(f'errored #{index}') try: future.result() except Exception as e: results[index] = e traceback.print_exc()

خلاف ذلك ، نقوم بتخزين الاستثناء وطباعة تتبع المكدس.

 queue = result.not_done

أخيرًا ، قمنا بتعيين قائمة الانتظار لتلك العقود الآجلة التي لم تكتمل بعد.
 

وفي الختام

 
 
يمكن لـ Dask بالتأكيد توفير الوقت لك. إذا كنت تقضي وقتًا في انتظار تشغيل التعليمات البرمجية ، فيجب عليك استخدام هذه النصائح البسيطة لموازنة عملك. هناك أيضًا العديد من الأشياء المتقدمة التي يمكنك القيام بها باستخدام Dask ، ولكن هذه نقطة بداية جيدة.

 
السيرة الذاتية: هوغو شي هو مؤسس Saturn Cloud ، مساحة العمل السحابية للانتقال لتوسيع نطاق Python والتعاون ونشر الوظائف والمزيد.

أصلي. تم إعادة النشر بإذن.

هذا الموضوع ذو علاقة بـ:

المصدر: https://www.kdnuggets.com/2021/09/write-functions-use-dask.html

الطابع الزمني:

اكثر من KD nuggets