אם אתה יכול לכתוב פונקציות, תוכל להשתמש ב- Dask

צומת המקור: 1094095

אם אתה יכול לכתוב פונקציות, תוכל להשתמש ב- Dask

מאמר זה הוא המאמר השני מתוך סדרה מתמשכת על השימוש ב-Dask בפועל. כל מאמר בסדרה זו יהיה פשוט מספיק למתחילים, אך יספק עצות שימושיות לעבודה אמיתית. המאמר הראשון בסדרה עוסק בשימוש ב-LocalCluster.


By הוגו שי, מייסד שבתאי ענן

שוחחתי עם מדעני נתונים רבים ששמעו עליהם לוּחַ מַחווָנִים, מסגרת Python למחשוב מבוזר, אבל לא יודע מאיפה להתחיל. הם יודעים ש-Dask כנראה יכול להאיץ הרבה מתזרימי העבודה שלהם על ידי כך שהם יפעלו במקביל על פני אשכול של מכונות, אבל המשימה של לימוד מתודולוגיה חדשה לגמרי יכולה להיראות מרתיעה. אני כאן כדי לומר לך שאתה יכול להתחיל לקבל ערך מ-Dask מבלי שתצטרך ללמוד את כל המסגרת. אם אתה להשקיע זמן בהמתנה לתאי מחברת לביצוע, יש סיכוי טוב ש-Dask יכול לחסוך לך זמן. גם אם אתה יודע רק לכתוב פונקציות של Python, אתה יכול לנצל זאת מבלי ללמוד שום דבר אחר! פוסט זה בבלוג הוא הדרכה "כיצד להשתמש ב-Dask מבלי ללמוד את כל העניין".

דסק, מסגרות נתונים, תיקים, מערכים, מתזמנים, עובדים, גרפים, RAPIDS, הו לא!

 
 
יש הרבה קטעי תוכן מסובכים בחוץ על Dask, שיכולים להיות מהמם. הסיבה לכך היא ש-Dask יכול להשתמש במקבץ של מכונות עובדים כדי לעשות הרבה דברים מגניבים! אבל תשכח מכל זה לעת עתה. מאמר זה מתמקד בטכניקות פשוטות שיכולות לחסוך לך זמן, מבלי שתצטרך לשנות הרבה באופן שבו אתה עובד.
 

עבור לולאות ופונקציות

 
 
כמעט כל מדען נתונים עשה משהו כזה, שבו יש לך קבוצה של מסגרות נתונים המאוחסנות בקבצים נפרדים ואתה משתמש בלולאת for כדי לקרוא את כולם, לעשות קצת היגיון ואז לשלב אותם:

results = []
for file in files: defer = pd.read_csv(file) ## begin genius algorithm brilliant_features = [] for feature in features: brilliant_features.append(compute_brilliant_feature(df, feature)) magical_business_insight = make_the_magic(brilliant_features) results.append(magical_business_insight)

עם הזמן, אתה מקבל יותר קבצים, או את genius_algorithm נהיה יותר מסובך ולוקח יותר זמן לרוץ. ובסוף אתה מחכה. ומחכה.


 

שלב 1 זה לכלול את הקוד שלך בפונקציה. אתה רוצה לכלול את החומר שנכנס לתוך לולאת for. זה מקל על ההבנה מה הקוד עושה (המרת קובץ למשהו שימושי באמצעות קסם). חשוב מכך, זה מקל על השימוש בקוד הזה בדרכים מלבד לולאות.

def make_all_the_magics(file): df = pd.read_csv(file) brilliant_features = [] for feature in features: brilliant_features.append(compute_brilliant_feature(df, feature)) magical_business_insight = make_the_magic(brilliant_features) return magical_business_insight results = [] for file in files: magical_business_insight = make_all_the_magics(file) results.append(magical_business_insight)

שלב 2 זה להקביל אותו עם Dask. כעת, במקום להשתמש בלולאת for, שבה כל איטרציה מתרחשת אחרי הקודמת, Dask יריץ אותן במקביל על אשכול. זה אמור לתת לנו תוצאות הרבה יותר מהירות, וזה רק שלוש שורות יותר מאשר קוד for-loop!

from dask import delayed
from dask.distributed import Client # same function but with a Dask delayed decorator
@delayed
def make_all_the_magics(file): df = pd.read_csv(file) brilliant_features = [] for feature in features: brilliant_features.append(compute_brilliant_feature(df, feature)) magical_business_insight = make_the_magic(brilliant_features) return magical_business_insight results = []
for file in files: magical_business_insight = make_all_the_magics(file) results.append(magical_business_insight) # new Dask code
c = Client()
results = c.compute(results, sync=True)

איך זה עובד:

  • הדקורטור המושהה משנה את הפונקציה שלך. עכשיו, כשאתה קורא לזה, זה לא מוערך. במקום זאת, אתה מקבל בחזרה א delayed אובייקט, ש-Dask יכול לבצע מאוחר יותר.
  • Client().compute שולח את כל אותם אובייקטים מושהים לאשכול Dask, שם הם מוערכים במקביל! זהו, אתה מנצח!
  • מופע א Client הוראות אוטומטית א LocalCluster. המשמעות היא שהעובדים המקבילים של Dask הם כולם תהליכים באותו מחשב כמו זה שקורא ל-Dask. זה מהווה דוגמה תמציתית. לעבודה אמיתית, אני ממליץ ליצור אשכולות מקומיים בטרמינל.

נושאים מעשיים

 
 
האמור לעיל נעצר במקום שבו נעצרים רוב הדרכות של Dask. השתמשתי בגישה הזו עם העבודה שלי ועם לקוחות רבים, ותמיד עולות כמה בעיות מעשיות. הטיפים הבאים יעזרו לך לעבור מדוגמה זו של ספר הלימוד למעלה לשיטות שימושיות יותר בפועל על ידי כיסוי שני נושאים שעולים כל הזמן: חפצים גדולים וטיפול בשגיאות.
 

חפצים גדולים

 
על מנת לחשב פונקציות באשכול מבוזר, יש לשלוח לעובדים את האובייקטים שעליהם נקראות הפונקציות. זה יכול להוביל לבעיות ביצועים, שכן יש לבצע סדרה (לחבוש) במחשב שלך ולשלוח אותן דרך הרשת. תאר לעצמך שאתה עושה תהליכים על ג'יגה-בייט של נתונים - אתה לא רוצה להעביר את זה בכל פעם שפונקציה פועלת עליו. אם אתה שולח בטעות חפצים גדולים, ייתכן שתראה הודעה מ-Dask כך:

Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and
keep data on workers

ישנן שתי דרכים לעצור את זה: אתה יכול לשלוח חפצים קטנים יותר לעובדים כך שהעומס לא יהיה כל כך נורא, או שאתה יכול לנסות לשלוח כל חפץ לעובד פעם אחת בלבד, כדי שלא תצטרך כל הזמן לבצע העברות .
תיקון 1: שלח חפצים קטנים כשאפשר
הדוגמה הזו טובה, מכיוון שאנו שולחים נתיב קובץ (מחרוזת קטנה), במקום ה-dataframe.

# good, do this
results = []
for file in files: magical_business_insight = make_all_the_magics(file) results.append(magical_business_insight)

להלן מה לֹא לעשות. גם בגלל שהיית עושה קריאת CSV (יקרה ואיטית) בלולאה, שאינה מקבילה, אבל גם בגלל שאנחנו שולחים כעת מסגרות נתונים (שיכולות להיות גדולות).

# bad, do not do this
results = []
for file in files: df = pd.read_csv(file) magical_business_insight = make_all_the_magics(df) results.append(magical_business_insight)

לעתים קרובות, ניתן לשכתב את הקוד כדי לשנות את המקום שבו הנתונים מנוהלים - או על הלקוח או על העובדים. בהתאם למצב שלך, עשוי להיות חיסכון עצום בזמן על ידי חשיבה דרך אילו פונקציות לוקחות כקלט וכיצד ניתן למזער העברות נתונים.
תיקון 2: שלח אובייקטים פעם אחת בלבד
אם אתה צריך לשלוח חפץ גדול, אל תשלח אותו מספר פעמים. לדוגמה, אם אני צריך לשלוח אובייקט דגם גדול כדי לחשב, הוספת הפרמטר תעשה את המודל בסידרה מספר פעמים (פעם בקובץ)

# bad, do not do this
results = []
for file in files: # big model has to be sent to a worker each time the function is called magical_business_insight = make_all_the_magics(file, big_model) results.append(magical_business_insight)

אני יכול להגיד ל-Dask לא לעשות את זה, על ידי לעטוף אותו בחפץ מושהה.

# good, do this
results = []
big_model = client.scatter(big_model) #send the model to the workers first for file in files: magical_business_insight = make_all_the_magics(file, big_model) results.append(magical_business_insight)

כשל בטיפול

 
ככל שהמשימות החישוביות שלך גדלות, פעמים רבות תרצה להיות מסוגל להפעיל כוח באמצעות כישלון. במקרה זה, אולי ל-5% מה-CSV שלי יש נתונים גרועים שאני לא יכול להתמודד איתם. אני רוצה לעבד 95% מה-CSV בהצלחה, אבל עקוב אחר הכשלים כדי שאוכל להתאים את השיטות שלי ולנסות שוב.

הלולאה הזו עושה זאת.

import traceback
from distributed.client import wait, FIRST_COMPLETED, ALL_COMPLETED queue = c.compute(results)
futures_to_index = {fut: i for i, fut in enumerate(queue)}
results = [None for x in range(len(queue))] while queue: result = wait(queue, return_when=FIRST_COMPLETED) for future in result.done: index = futures_to_index[future] if future.status == 'finished': print(f'finished computation #{index}') results[index] = future.result() else: print(f'errored #{index}') try: future.result() except Exception as e: results[index] = e traceback.print_exc() queue = result.not_done print(results)

מכיוון שהפונקציה הזו די מסובכת במבט ראשון, בואו נפרק אותה.

queue = c.compute(results)
futures_to_index = {fut: i for i, fut in enumerate(queue)}
results = [None for x in range(len(queue))]

אנו קוראים compute on results, אבל מאז אנחנו לא עוברים sync=True, אנו מקבלים מיד בחזרה חוזים עתידיים, המייצגים את החישוב, שעדיין לא הושלם. אנו גם יוצרים מיפוי מהעתיד עצמו, לארגומנט הקלט ה-_n_ה שיצר אותו. לבסוף, אנו ממלאים רשימה של תוצאות ב-Nones לעת עתה.

while queue: result = wait(queue, return_when=FIRST_COMPLETED)

לאחר מכן, אנו מחכים לתוצאות, ומעבדים אותן כשהן נכנסות. כאשר אנו מחכים לעתידים, הם מופרדים לעתידים שהם done, ואלה שכן not_done.

 if future.status == 'finished': print(f'finished computation #{index}') results[index] = future.result()

אם העתיד הוא finished, אז אנחנו מדפיסים שהצלחנו, ואנו מאחסנים את התוצאה.

 else: print(f'errored #{index}') try: future.result() except Exception as e: results[index] = e traceback.print_exc()

אחרת, אנו מאחסנים את החריגה ומדפיסים את עקבות הערימה.

 queue = result.not_done

לבסוף, הגדרנו את התור לאותם עתידים שעדיין לא הושלמו.
 

סיכום

 
 
Dask בהחלט יכול לחסוך לך זמן. אם אתה מבלה זמן בהמתנה להפעלת הקוד, עליך להשתמש בעצות הפשוטות האלה כדי להקביל את העבודה שלך. יש גם הרבה דברים מתקדמים שאתה יכול לעשות עם Dask, אבל זו נקודת התחלה טובה.

 
Bio you הוגו שי הוא המייסד של Saturn Cloud, סביבת העבודה שעומדת לענן להגדלת פייתון, שיתוף פעולה, פריסת משרות ועוד.

מְקוֹרִי. פורסם מחדש באישור.

מידע נוסף:

מקור: https://www.kdnuggets.com/2021/09/write-functions-use-dask.html

בול זמן:

עוד מ KDnuggets