المعالجة المتوازية لملف كبير في بايثون

المعالجة المتوازية لملف كبير في بايثون

عقدة المصدر: 1970104

المعالجة المتوازية لملف كبير في بايثون
صورة المؤلف
 

للمعالجة المتوازية ، نقسم مهمتنا إلى وحدات فرعية. يزيد من عدد الوظائف التي يعالجها البرنامج ويقلل من وقت المعالجة الإجمالي. 

على سبيل المثال ، إذا كنت تعمل مع ملف CSV كبير وتريد تعديل عمود واحد. سنقوم بتغذية البيانات كمصفوفة للوظيفة ، وسوف تقوم بمعالجة قيم متعددة في وقت واحد على التوازي بناءً على العدد المتاح  العمال. يعتمد هؤلاء العمال على عدد النوى داخل معالجك. 
 

ملحوظة: لن يؤدي استخدام المعالجة المتوازية على مجموعة بيانات أصغر إلى تحسين وقت المعالجة.

 

في هذه المدونة ، سنتعلم كيفية تقليل وقت المعالجة باستخدام الملفات الكبيرة المعالجة المتعددة, الوظيفةو تقدم حزم بايثون. إنه برنامج تعليمي بسيط يمكن تطبيقه على أي ملف وقاعدة بيانات وصورة وفيديو وصوت. 
 

ملحوظة: نحن نستخدم دفتر Kaggle للتجارب. يمكن أن يختلف وقت المعالجة من آلة إلى أخرى.  

 

سنستخدم ملف حوادث الولايات المتحدة (2016 - 2021) مجموعة بيانات من Kaggle تتكون من 2.8 مليون سجل و 47 عمودًا. 

سوف نستورد multiprocessing, joblibو tqdm For المعالجة المتوازية, pandas For استيعاب البياناتو re, nltkو string For معالجة النصوص

# الحوسبة المتوازية
استيراد المعالجة المتعددة as mp
تبدأ من الوظيفة استيراد بالتوازي مع تأخير
تبدأ من tqdm.notebook استيراد تقدم # استيعاب البيانات 
استيراد الباندا as pd # معالجة النص 
استيراد re تبدأ من nltk.corpus استيراد كلمات التوقف
استيراد سلسلة

قبل أن ننتقل مباشرة ، دعنا نبدأ n_workers عن طريق المضاعفة cpu_count(). كما ترون ، لدينا 8 عمال.

n_workers = 2 * mp.cpu_count () طباعة (يتوفر عدد "{n_workers} من العمال") >>> يتوفر 8 عمال

في الخطوة التالية ، سنستوعب ملفات CSV الكبيرة باستخدام امتداد الباندا read_csv وظيفة. ثم اطبع شكل إطار البيانات واسم الأعمدة ووقت المعالجة. 

ملحوظة: وظيفة جوبيتر السحرية %%time يمكن عرض مرات وحدة المعالجة المركزية و ساعة الجدار في نهاية العملية. 

 

٪٪ time file_name = "../ input / us-Accidents / US_Accidents_Dec21_updated.csv" df = pd.read_csv (file_name) print (f "Shape: {df.shape} nnColumn Names: n {df.columns} n")

الناتج

الشكل: (2845342 ، 47) أسماء الأعمدة: الفهرس (['ID' ، 'الخطورة' ، 'Start_Time' ، 'End_Time' ، 'Start_Lat' ، 'Start_Lng' ، 'End_Lat' ، 'End_Lng' ، 'المسافة (ميل) '،' Description '،' Number '،' Street '،' Side '،' City '،' County '،' State '،' Zipcode '،' Country '،' Timezone '،' Airport_Code '،' Weather_Timestamp '، "درجة الحرارة (F)" ، "Wind_Chill (F)" ، "الرطوبة (٪)" ، "الضغط (في)" ، "الرؤية (ميل)" ، "اتجاه الرياح" ، "سرعة الرياح (ميل في الساعة)" ، "هطول الأمطار (في ) '،' Weather_Condition '،' Amenity '،' Bump '،' Crossing '،' Give_Way '،' Junction '،' No_Exit '،' Railway '،' Roundabout '،' Station '،' Stop '،' Traffic_Calming ' ، 'Traffic_Signal'، 'Turning_Loop'، 'Sunrise_Sunset'، 'Civil_Twilight'، 'Nautical_Twilight'، 'Astronomical_Twilight']، dtype = 'object') أوقات وحدة المعالجة المركزية: المستخدم 33.9 ثانية ، النظام: 3.93 ثانية ، الإجمالي: 37.9 ثانية : 46.9 ثانية

clean_text هي وظيفة مباشرة لمعالجة النص وتنظيفه. سوف نحصل على اللغة الإنجليزية كلمات التوقف استخدام nltk.copus استخدامه لتصفية كلمات التوقف من سطر النص. بعد ذلك ، سنزيل الأحرف الخاصة والمسافات الزائدة من الجملة. ستكون الوظيفة الأساسية لتحديد وقت المعالجة مسلسل, موازىو دفعة معالجة. 

صفر clean_text(نص): # إزالة كلمات التوقف توقف = stopwords.words ("english") text = "" .join ([word For كلمة in text.split () if كلمة ليس in توقف]) # إزالة الأحرف الخاصة text = text.translate (str.maketrans (''، ''، string.punctuation)) # إزالة المسافات الزائدة text = re.sub ('+'، '، text) عائد أعلى نص

للمعالجة التسلسلية ، يمكننا استخدام الباندا .apply() وظيفة ، ولكن إذا كنت تريد رؤية شريط التقدم ، فأنت بحاجة إلى التنشيط تقدم For الباندا ثم استخدم ملف .progress_apply() وظيفة. 

سنقوم بمعالجة 2.8 مليون سجل وحفظ النتيجة مرة أخرى في عمود "الوصف". 

٪٪ الوقت tqdm.pandas () df ['الوصف'] = df ['الوصف']. progress_apply (clean_text)

الناتج

استغرق الأمر 9 دقائق و 5 ثوانٍ لـ الراقية المعالج لمعالجة 2.8 مليون صف تسلسلي. 

100٪ 2845342/2845342 [09:05 <00:00، 5724.25it / s] أوقات وحدة المعالجة المركزية: المستخدم 8 دقائق و 14 ثانية ، النظام: 53.6 ثانية ، الإجمالي: 9 دقائق 7 ثوانٍ وقت الجدار: 9 دقائق 5 ثوان

هناك طرق مختلفة لمعالجة الملف بشكل متواز ، وسوف نتعرف عليها جميعًا. ال multiprocessing هي حزمة python مضمنة تُستخدم بشكل شائع للمعالجة المتوازية للملفات الكبيرة. 

سنقوم بإنشاء معالجة متعددة مسبح مع عمال 8 واستخدام رسم خريطة وظيفة لبدء العملية. لعرض أشرطة التقدم ، نحن نستخدم تقدم.

تتكون وظيفة الخريطة من قسمين. الأول يتطلب الوظيفة ، والثاني يتطلب وسيطة أو قائمة من الوسائط. 

تعلم المزيد من خلال القراءة توثيق

٪٪ الوقت p = mp.Pool (n_workers) df ['Description'] = p.map (clean_text، tqdm (df ['Description']))

الناتج

لقد قمنا بتحسين وقت المعالجة لدينا تقريبًا 3X. انخفض وقت المعالجة من دقائق 9 5 ثانية إلى دقائق 3 51 ثانية.   

100٪ 2845342/2845342 [02:58 <00:00، 135646.12it / s] أوقات وحدة المعالجة المركزية: 5.68 ثانية ، sys: 1.56 ثانية ، الإجمالي: 7.23 ثانية ، وقت الجدار: 3 دقائق و 51 ثانية

سنتعرف الآن على حزمة Python أخرى لإجراء معالجة متوازية. في هذا القسم ، سوف نستخدم كتابات العمل موازية و تأخر لتكرار رسم خريطة وظيفة. 

  • يتطلب التوازي وسيطتين: n_jobs = 8 والخلفية = معالجة متعددة.
  • ثم نضيف clean_text  إلى تأخر وظيفة. 
  • قم بإنشاء حلقة لتغذية قيمة واحدة في كل مرة. 

العملية أدناه عامة تمامًا ، ويمكنك تعديل وظيفتك والمصفوفة وفقًا لاحتياجاتك. لقد استخدمته لمعالجة الآلاف من ملفات الصوت والفيديو دون أي مشكلة. 

أوصى: إضافة معالجة الاستثناء باستخدام try: و except:

صفر text_parallel_clean(مجموعة): نتيجة = متوازية (n_jobs = n_workers ، backend = "multiprocessing") (تأخر (clean_text) (نص) For نص in tqdm (مجموعة)) عائد أعلى نتيجة

أضف عمود "الوصف" إلى text_parallel_clean()

٪٪ time df ['Description'] = text_parallel_clean (df ['الوصف'])

الناتج

لقد استغرقت الدالة 13 ثانية أكثر من المعالجة المتعددة لـ حوض السباحة. حتى ذلك الحين ، موازية 4 دقائق و 59 ثانية أسرع من مسلسل معالجة. 

100٪ 2845342/2845342 [04:03 <00:00، 10514.98it / s] أوقات وحدة المعالجة المركزية: 44.2 ثانية ، sys: 2.92 ثانية ، الإجمالي: 47.1 ثانية ، وقت الجدار: 4 دقائق و 4 ثانية

هناك طريقة أفضل لمعالجة الملفات الكبيرة عن طريق تقسيمها إلى مجموعات ومعالجتها بالتوازي. لنبدأ بإنشاء دالة دفعية تقوم بتشغيل ملف clean_function على دفعة واحدة من القيم. 

وظيفة معالجة الدُفعات

صفر proc_batch(حزمة): عائد أعلى [clean_text (نص) For نص in حزمة ]

تقسيم الملف إلى دفعات

ستقوم الوظيفة أدناه بتقسيم الملف إلى دفعات متعددة بناءً على عدد العمال. في حالتنا ، نحصل على 8 دفعات. 

صفر دفعة_ملف(المصفوفة ، n_workers): file_len = len (array) batch_size = round (file_len / n_workers) دفعات = [مجموعة [ix: ix + batch_size] For ix in tqdm (range (0، file_len، batch_size))] عائد أعلى دفعات دفعات = ملف_دفعات (df ['الوصف']، n_workers) >>> 100٪ 8/8 [00:00 <00:00، 280.01it / s]

تشغيل معالجة الدُفعات المتوازية

أخيرًا ، سوف نستخدم موازية و تأخر لمعالجة الدُفعات. 

ملحوظة: للحصول على مصفوفة واحدة من القيم ، يتعين علينا تشغيل قائمة الفهم كما هو موضح أدناه. 

 

٪٪ time batch_output = Parallel (n_jobs = n_workers، backend = "multiprocessing") (متأخر (proc_batch) (دفعة) For دفعة in tqdm (دفعات)) df ['الوصف'] = [j For i in دفعة_إخراج For j in i]

الناتج

لقد قمنا بتحسين وقت المعالجة. تشتهر هذه التقنية بمعالجة البيانات المعقدة وتدريب نماذج التعلم العميق. 

100٪ 8/8 [00:00 <00:00، 2.19it / s] أوقات وحدة المعالجة المركزية: 3.39 ثانية ، sys: 1.42 ثانية ، الإجمالي: 4.81 ثانية ، وقت الجدار: 3 دقائق و 56 ثانية

يأخذ tqdm المعالجة المتعددة إلى المستوى التالي. إنه بسيط وقوي. سأوصي به لكل عالم بيانات. 

افحص توثيق لمعرفة المزيد حول المعالجة المتعددة. 

process_map يتطلب:

  1. اسم وظيفة
  2. عمود Dataframe
  3. الحد الأقصى للعمال
  4. chucksize مشابه لحجم الدُفعة. سنحسب حجم الدفعة باستخدام عدد العمال أو يمكنك إضافة الرقم بناءً على تفضيلاتك. 
٪٪زمن
تبدأ من tqdm.contrib.concurrent استيراد process_map batch = round (len (df) / n_workers) df ["Description"] = process_map (clean_text، df ["الوصف"]، max_workers = n_workers، chunksize = دُفعة)

الناتج

مع سطر واحد من التعليمات البرمجية ، نحصل على أفضل نتيجة. 

100٪ 2845342/2845342 [03:48 <00:00، 1426320.93it / s] أوقات وحدة المعالجة المركزية: 7.32 ثانية ، sys: 1.97 ثانية ، الإجمالي: 9.29 ثانية ، وقت الجدار: 3 دقائق و 51 ثانية

تحتاج إلى إيجاد توازن واختيار التقنية التي تناسب حالتك بشكل أفضل. يمكن أن تكون معالجة تسلسلية أو متوازية أو معالجة مجمعة. يمكن أن تأتي المعالجة المتوازية بنتائج عكسية إذا كنت تعمل مع مجموعة بيانات أصغر وأقل تعقيدًا. 

في هذا البرنامج التعليمي المصغر ، تعرفنا على حزم وتقنيات Python المختلفة التي تسمح لنا بمعالجة وظائف البيانات الخاصة بنا بشكل متوازي. 

إذا كنت تعمل فقط مع مجموعة بيانات مجدولة وترغب في تحسين أداء المعالجة ، فسأقترح عليك المحاولة داسك, جدول البياناتو رابيدز 

الرقم المرجعي 

 
 
عابد علي عوان (@ 1abidaliawan) هو عالم بيانات متخصص محترف يحب بناء نماذج التعلم الآلي. يركز حاليًا على إنشاء المحتوى وكتابة مدونات تقنية حول تقنيات التعلم الآلي وعلوم البيانات. عابد حاصل على درجة الماجستير في إدارة التكنولوجيا ودرجة البكالوريوس في هندسة الاتصالات. تتمثل رؤيته في بناء منتج للذكاء الاصطناعي باستخدام شبكة عصبية بيانية للطلاب الذين يعانون من مرض عقلي.
 

الطابع الزمني:

اكثر من KD nuggets