اگر آپ فنکشن لکھ سکتے ہیں، تو آپ Dask استعمال کر سکتے ہیں۔

ماخذ نوڈ: 1094095

اگر آپ فنکشن لکھ سکتے ہیں، تو آپ Dask استعمال کر سکتے ہیں۔

یہ مضمون عملی طور پر Dask کے استعمال پر جاری سیریز کا دوسرا مضمون ہے۔ اس سیریز کا ہر مضمون ابتدائی افراد کے لیے کافی آسان ہوگا، لیکن حقیقی کام کے لیے مفید مشورے فراہم کرتا ہے۔ سیریز کا پہلا مضمون لوکل کلسٹر کے استعمال کے بارے میں ہے۔


By ہیوگو شی, Saturn Cloud کے بانی

میں بہت سے ڈیٹا سائنسدانوں کے ساتھ بات چیت کر رہا ہوں جنہوں نے سنا ہے۔ ڈسک، تقسیم شدہ کمپیوٹنگ کے لیے ازگر کا فریم ورک، لیکن معلوم نہیں کہاں سے شروع کرنا ہے۔ وہ جانتے ہیں کہ Dask شاید مشینوں کے ایک جھرمٹ میں متوازی طور پر چلانے کے ذریعے ان کے بہت سے ورک فلو کو تیز کر سکتا ہے، لیکن بالکل نیا طریقہ کار سیکھنے کا کام مشکل معلوم ہو سکتا ہے۔ میں آپ کو یہ بتانے کے لیے حاضر ہوں کہ آپ پورے فریم ورک کو سیکھے بغیر Dask سے قدر حاصل کرنا شروع کر سکتے ہیں۔ اگر آپ نوٹ بک سیلز کے مکمل ہونے کے انتظار میں وقت گزاریں، ایک اچھا موقع ہے کہ Dask آپ کا وقت بچا سکے۔ یہاں تک کہ اگر آپ صرف Python فنکشن لکھنا جانتے ہیں، تو آپ کچھ اور سیکھے بغیر اس کا فائدہ اٹھا سکتے ہیں! یہ بلاگ پوسٹ "سب کچھ سیکھے بغیر Dask کا استعمال کیسے کریں" ٹیوٹوریل ہے۔

Dask، ڈیٹا فریم، بیگ، arrays، شیڈولرز، کارکنان، گرافس، RAPIDS، اوہ نہیں!

 
 
Dask کے بارے میں بہت سارے پیچیدہ مواد موجود ہیں، جو بہت زیادہ ہو سکتے ہیں۔ اس کی وجہ یہ ہے کہ Dask کئی ٹھنڈی چیزیں کرنے کے لیے ورکر مشینوں کے کلسٹر کو استعمال کر سکتا ہے! لیکن فی الحال یہ سب بھول جاؤ۔ یہ مضمون سادہ تکنیکوں پر توجہ مرکوز کرتا ہے جو آپ کا وقت بچا سکتی ہیں، آپ کے کام کرنے کے طریقے کے بارے میں زیادہ تبدیلی کیے بغیر۔
 

لوپس اور فنکشنز کے لیے

 
 
تقریباً ہر ڈیٹا سائنسدان نے کچھ ایسا ہی کیا ہے، جہاں آپ کے پاس ڈیٹا فریم کا ایک سیٹ الگ الگ فائلوں میں محفوظ ہے اور آپ ان سب کو پڑھنے کے لیے ایک لوپ کا استعمال کرتے ہیں، کچھ منطق کریں، پھر ان کو یکجا کریں:

results = []
for file in files: defer = pd.read_csv(file) ## begin genius algorithm brilliant_features = [] for feature in features: brilliant_features.append(compute_brilliant_feature(df, feature)) magical_business_insight = make_the_magic(brilliant_features) results.append(magical_business_insight)

وقت گزرنے کے ساتھ، آپ کو مزید فائلیں، یا genius_algorithm زیادہ پیچیدہ ہو جاتا ہے اور چلانے میں زیادہ وقت لگتا ہے۔ اور آپ کا انتظار ختم ہو جاتا ہے۔ اور انتظار۔


 

مرحلہ 1 آپ کے کوڈ کو فنکشن میں سمیٹنا ہے۔ آپ اس چیز کو سمیٹنا چاہتے ہیں جو لوپ کے اندر جاتا ہے۔ اس سے یہ سمجھنا آسان ہو جاتا ہے کہ کوڈ کیا کر رہا ہے (جادو کے ذریعے کسی مفید چیز میں فائل کو تبدیل کرنا)۔ زیادہ اہم بات یہ ہے کہ اس کوڈ کو لوپس کے علاوہ دیگر طریقوں سے استعمال کرنا آسان بناتا ہے۔

def make_all_the_magics(file): df = pd.read_csv(file) brilliant_features = [] for feature in features: brilliant_features.append(compute_brilliant_feature(df, feature)) magical_business_insight = make_the_magic(brilliant_features) return magical_business_insight results = [] for file in files: magical_business_insight = make_all_the_magics(file) results.append(magical_business_insight)

مرحلہ 2 اسے Dask کے ساتھ متوازی کرنا ہے۔ اب، لوپ کے لیے استعمال کرنے کے بجائے، جہاں ہر تکرار پچھلے ایک کے بعد ہوتی ہے، Dask انہیں ایک کلسٹر پر متوازی طور پر چلائے گا۔ یہ ہمیں بہت تیزی سے نتائج دینا چاہئے، اور صرف ہے تین لائنیں لمبی فار لوپ کوڈ سے زیادہ!

from dask import delayed
from dask.distributed import Client # same function but with a Dask delayed decorator
@delayed
def make_all_the_magics(file): df = pd.read_csv(file) brilliant_features = [] for feature in features: brilliant_features.append(compute_brilliant_feature(df, feature)) magical_business_insight = make_the_magic(brilliant_features) return magical_business_insight results = []
for file in files: magical_business_insight = make_all_the_magics(file) results.append(magical_business_insight) # new Dask code
c = Client()
results = c.compute(results, sync=True)

یہ کیسے کام کرتا ہے:

  • تاخیر سے ڈیکوریٹر آپ کے فنکشن کو بدل دیتا ہے۔ اب، جب آپ اسے کال کرتے ہیں، تو اس کا اندازہ نہیں ہوتا ہے۔ اس کے بجائے، آپ واپس حاصل کریں a delayed آبجیکٹ، جسے Dask بعد میں انجام دے سکتا ہے۔
  • Client().compute ان تمام تاخیر شدہ اشیاء کو Dask کلسٹر میں بھیجتا ہے، جہاں ان کا متوازی طور پر جائزہ لیا جاتا ہے! بس، تم جیتو!
  • فوری کرنا a Client خود بخود دفعات a LocalCluster. اس کا مطلب یہ ہے کہ Dask کے متوازی کارکن تمام عمل اسی مشین پر ہوتے ہیں جو Dask کو کال کرتی ہے۔ یہ ایک جامع مثال بناتا ہے۔ حقیقی کام کے لیے، میں تخلیق کرنے کی تجویز کرتا ہوں۔ ٹرمینل میں مقامی کلسٹرز.

عملی موضوعات

 
 
مذکورہ بالا رک جاتا ہے جہاں زیادہ تر Dask ٹیوٹوریل رکتے ہیں۔ میں نے اس نقطہ نظر کو اپنے کام کے ساتھ استعمال کیا ہے، اور متعدد گاہکوں کے ساتھ، اور کچھ عملی مسائل ہمیشہ پیدا ہوتے ہیں۔ یہ اگلی تجاویز آپ کو اوپر کی اس نصابی کتاب کی مثال سے عملی طور پر مزید مفید طریقوں تک جانے میں مدد کریں گی جس میں مسلسل سامنے آنے والے دو عنوانات کا احاطہ کیا جائے گا: بڑی اشیاء اور غلطی سے نمٹنے۔
 

بڑے آبجیکٹ

 
تقسیم شدہ کلسٹر پر فنکشنز کی گنتی کرنے کے لیے، جن اشیاء پر فنکشنز کو بلایا جاتا ہے انہیں کارکنوں کو بھیجنے کی ضرورت ہوتی ہے۔ یہ کارکردگی کے مسائل کا باعث بن سکتا ہے، کیونکہ ان کو آپ کے کمپیوٹر پر سیریلائز (اچار) کرنے اور نیٹ ورک پر بھیجنے کی ضرورت ہے۔ تصور کریں کہ آپ گیگا بائٹس ڈیٹا پر عمل کر رہے ہیں – آپ ہر بار جب کوئی فنکشن اس پر چلتا ہے تو آپ اسے منتقل نہیں کرنا چاہتے ہیں۔ اگر آپ غلطی سے بڑی اشیاء بھیجتے ہیں، تو آپ کو Dask سے اس طرح کا پیغام نظر آ سکتا ہے:

Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and
keep data on workers

اسے ہونے سے روکنے کے دو طریقے ہیں: آپ کارکنوں کو چھوٹی اشیاء بھیج سکتے ہیں تاکہ بوجھ اتنا برا نہ ہو، یا آپ ہر چیز کو صرف ایک بار کارکن کو بھیجنے کی کوشش کر سکتے ہیں، تاکہ آپ کو ٹرانسفر کرنے کی ضرورت نہ پڑے۔ .
درست کریں 1: جب ممکن ہو چھوٹی اشیاء بھیجیں۔
یہ مثال اچھی ہے، کیونکہ ہم ڈیٹا فریم کے بجائے فائل پاتھ (چھوٹی تار) بھیج رہے ہیں۔

# good, do this
results = []
for file in files: magical_business_insight = make_all_the_magics(file) results.append(magical_business_insight)

ذیل میں کیا ہے نوٹ ایسا کرنے کے لئے. دونوں اس لیے کہ آپ لوپ میں CSV ریڈنگ (مہنگی اور سست) کر رہے ہوں گے، جو متوازی نہیں ہے، بلکہ اس لیے بھی کہ اب ہم ڈیٹا فریم بھیج رہے ہیں (جو بڑے ہو سکتے ہیں)۔

# bad, do not do this
results = []
for file in files: df = pd.read_csv(file) magical_business_insight = make_all_the_magics(df) results.append(magical_business_insight)

اکثر اوقات، کوڈ کو تبدیل کرنے کے لیے دوبارہ لکھا جا سکتا ہے جہاں ڈیٹا کا انتظام کیا جا رہا ہے—یا تو کلائنٹ پر یا کارکنوں پر۔ آپ کی صورتحال پر منحصر ہے، یہ سوچ کر وقت کی بہت زیادہ بچت ہو سکتی ہے کہ کون سے فنکشنز ان پٹ کے طور پر لیتے ہیں اور ڈیٹا کی منتقلی کو کیسے کم کیا جا سکتا ہے۔
درست کریں 2: اشیاء صرف ایک بار بھیجیں۔
اگر آپ کو کوئی بڑی چیز بھیجنی ہے تو اسے متعدد بار نہ بھیجیں۔ مثال کے طور پر، اگر مجھے شمار کرنے کے لیے ایک بڑا ماڈل آبجیکٹ بھیجنے کی ضرورت ہے، تو صرف پیرامیٹر کو شامل کرنے سے ماڈل کئی بار سیریلائز ہو جائے گا (ایک بار فی فائل)

# bad, do not do this
results = []
for file in files: # big model has to be sent to a worker each time the function is called magical_business_insight = make_all_the_magics(file, big_model) results.append(magical_business_insight)

میں ڈسک کو ایک تاخیری چیز میں لپیٹ کر ایسا نہ کرنے کو کہہ سکتا ہوں۔

# good, do this
results = []
big_model = client.scatter(big_model) #send the model to the workers first for file in files: magical_business_insight = make_all_the_magics(file, big_model) results.append(magical_business_insight)

ہینڈلنگ میں ناکامی۔

 
جیسے جیسے آپ کے کمپیوٹیشنل کام بڑھتے ہیں، اکثر اوقات آپ ناکامی کے ذریعے طاقت حاصل کرنا چاہیں گے۔ اس صورت میں، ہو سکتا ہے کہ میرے 5% CSVs کا ڈیٹا خراب ہو جسے میں سنبھال نہیں سکتا۔ میں 95% CSVs پر کامیابی سے کارروائی کرنا چاہوں گا، لیکن ناکامیوں پر نظر رکھیں تاکہ میں اپنے طریقوں کو ایڈجسٹ کر سکوں اور دوبارہ کوشش کر سکوں۔

یہ لوپ ایسا کرتا ہے۔

import traceback
from distributed.client import wait, FIRST_COMPLETED, ALL_COMPLETED queue = c.compute(results)
futures_to_index = {fut: i for i, fut in enumerate(queue)}
results = [None for x in range(len(queue))] while queue: result = wait(queue, return_when=FIRST_COMPLETED) for future in result.done: index = futures_to_index[future] if future.status == 'finished': print(f'finished computation #{index}') results[index] = future.result() else: print(f'errored #{index}') try: future.result() except Exception as e: results[index] = e traceback.print_exc() queue = result.not_done print(results)

چونکہ یہ فنکشن پہلی نظر میں کافی پیچیدہ ہے، آئیے اسے توڑ دیتے ہیں۔

queue = c.compute(results)
futures_to_index = {fut: i for i, fut in enumerate(queue)}
results = [None for x in range(len(queue))]

ہم فون کرتے ہیں compute on resultsلیکن چونکہ ہم گزر نہیں رہے ہیں۔ sync=True، ہمیں فوری طور پر فیوچر واپس مل جاتے ہیں، جو کہ حساب کی نمائندگی کرتے ہیں، جو ابھی مکمل نہیں ہوئے ہیں۔ ہم مستقبل سے ہی _n_th ان پٹ آرگیومینٹ تک ایک میپنگ بھی بناتے ہیں جس نے اسے بنایا۔ آخر میں، ہم ابھی کے لیے Nones سے بھرے نتائج کی فہرست تیار کرتے ہیں۔

while queue: result = wait(queue, return_when=FIRST_COMPLETED)

اس کے بعد، ہم نتائج کا انتظار کرتے ہیں، اور جیسے ہی وہ آتے ہیں ہم ان پر کارروائی کرتے ہیں۔ جب ہم مستقبل کا انتظار کرتے ہیں، تو وہ مستقبل میں الگ ہو جاتے ہیں جو کہ done، اور وہ جو ہیں not_done.

 if future.status == 'finished': print(f'finished computation #{index}') results[index] = future.result()

اگر مستقبل ہے۔ finished، پھر ہم پرنٹ کرتے ہیں کہ ہم کامیاب ہوئے، اور ہم نتیجہ ذخیرہ کرتے ہیں۔

 else: print(f'errored #{index}') try: future.result() except Exception as e: results[index] = e traceback.print_exc()

بصورت دیگر، ہم استثناء کو ذخیرہ کرتے ہیں اور اسٹیک ٹریس پرنٹ کرتے ہیں۔

 queue = result.not_done

آخر میں، ہم نے ان فیوچرز کے لیے قطار لگائی جو ابھی تک مکمل نہیں ہوئے ہیں۔
 

نتیجہ

 
 
Dask یقینی طور پر آپ کا وقت بچا سکتا ہے۔ اگر آپ کوڈ کے چلنے کے انتظار میں وقت گزارتے ہیں، تو آپ کو اپنے کام کو متوازی بنانے کے لیے ان آسان تجاویز کا استعمال کرنا چاہیے۔ Dask کے ساتھ آپ بہت ساری جدید چیزیں بھی کر سکتے ہیں، لیکن یہ ایک اچھا نقطہ آغاز ہے۔

 
بیو: ہیوگو شی Saturn Cloud کے بانی ہیں، Python کو پیمانہ کرنے، تعاون کرنے، نوکریوں کی تعیناتی اور مزید بہت کچھ کرنے کے لیے کلاؤڈ ورک اسپیس۔

حقیقی. اجازت کے ساتھ دوبارہ پوسٹ کیا۔

متعلقہ:

ماخذ: https://www.kdnuggets.com/2021/09/write-functions-use-dask.html

ٹائم اسٹیمپ:

سے زیادہ KDnuggets