पायथन में समानांतर प्रसंस्करण बड़ी फ़ाइल

पायथन में समानांतर प्रसंस्करण बड़ी फ़ाइल

स्रोत नोड: 1970104

पायथन में समानांतर प्रसंस्करण बड़ी फ़ाइल
लेखक द्वारा छवि
 

समानांतर प्रसंस्करण के लिए, हम अपने कार्य को उप-इकाइयों में विभाजित करते हैं। यह कार्यक्रम द्वारा संसाधित नौकरियों की संख्या बढ़ाता है और समग्र प्रसंस्करण समय को कम करता है। 

उदाहरण के लिए, यदि आप एक बड़ी CSV फ़ाइल के साथ काम कर रहे हैं और आप एक कॉलम को संशोधित करना चाहते हैं। हम डेटा को फ़ंक्शन में एक सरणी के रूप में फ़ीड करेंगे, और यह उपलब्ध संख्या के आधार पर एक साथ कई मानों को समानांतर रूप से संसाधित करेगा  श्रमिकों. ये कार्यकर्ता आपके प्रोसेसर के भीतर कोर की संख्या पर आधारित हैं। 
 

नोट: छोटे डेटासेट पर समानांतर प्रसंस्करण का उपयोग करने से प्रसंस्करण समय में सुधार नहीं होगा।

 

इस ब्लॉग में, हम सीखेंगे कि बड़ी फ़ाइलों का उपयोग करके प्रसंस्करण समय को कैसे कम किया जाए बहु, जॉबलीब, तथा टीक्यूडीएम पायथन पैकेज. यह एक सरल ट्यूटोरियल है जो किसी भी फ़ाइल, डेटाबेस, छवि, वीडियो और ऑडियो पर लागू हो सकता है। 
 

नोट: हम प्रयोगों के लिए कागल नोटबुक का उपयोग कर रहे हैं। प्रसंस्करण का समय हर मशीन में अलग-अलग हो सकता है।  

 

हम इसका उपयोग करेंगे US Accidents (2016 – 2021) कागल का डेटासेट जिसमें 2.8 मिलियन रिकॉर्ड और 47 कॉलम हैं। 

हम आयात करेंगे multiprocessing, joblib, तथा tqdm एसटी समानांतर प्रसंस्करण, pandas एसटी डेटा अंतर्ग्रहण, तथा re, nltk, तथा string एसटी पाठ प्रसंस्करण

#समानांतर कंप्यूटिंग
आयात बहु as mp
से जॉबलीब आयात समानांतर, विलंबित
से tqdm.नोटबुक आयात टीक्यूडीएम # डेटा अंतर्ग्रहण 
आयात पांडा as pd # पाठ प्रसंस्करण 
आयात re से nltk.corpus आयात रोक
आयात स्ट्रिंग

Before we jump right in, let’s set n_workers दोगुना करके cpu_count(). जैसा कि आप देख सकते हैं, हमारे पास 8 कर्मचारी हैं।

n_workers = 2 * mp.cpu_count() प्रिंट(f"{n_workers} कार्यकर्ता उपलब्ध हैं") >>> 8 कर्मचारी उपलब्ध हैं

अगले चरण में, हम इसका उपयोग करके बड़ी CSV फ़ाइलें अंतर्ग्रहण करेंगे पांडा read_csv समारोह। फिर, डेटाफ़्रेम का आकार, कॉलम का नाम और प्रोसेसिंग समय प्रिंट करें। 

नोट: ज्यूपिटर का जादुई कार्य %%time प्रदर्शित कर सकते हैं सीपीयू समय और दीवार का समय प्रक्रिया के अंत में. 

 

%)

उत्पादन

आकार:(2845342, 47) कॉलम नाम: इंडेक्स(['आईडी', 'गंभीरता', 'स्टार्ट_टाइम', 'एंड_टाइम', 'स्टार्ट_लैट', 'स्टार्ट_एलएनजी', 'एंड_लैट', 'एंड_एलएनजी', 'डिस्टेंस(मील) ', 'विवरण', 'संख्या', 'सड़क', 'पक्ष', 'शहर', 'काउंटी', 'राज्य', 'ज़िपकोड', 'देश', 'समयक्षेत्र', 'एयरपोर्ट_कोड', 'वेदर_टाइमस्टैंप', 'तापमान(F)', 'हवा_ठंडक(F)', 'आर्द्रता(%)', 'दबाव(में)', 'दृश्यता(मील)', 'हवा की दिशा', 'हवा की गति(मील ​​प्रति घंटे)', 'वर्षा(में) )', 'मौसम_स्थिति', 'सुविधा', 'टक्कर', 'क्रॉसिंग', 'गिव_वे', 'जंक्शन', 'नो_एग्जिट', 'रेलवे', 'राउंडअबाउट', 'स्टेशन', 'स्टॉप', 'ट्रैफिक_कैलमिंग' , 'ट्रैफिक_सिग्नल', 'टर्निंग_लूप', 'सनराइज_सनसेट', 'सिविल_ट्वाइलाइट', 'नॉटिकल_ट्वाइलाइट', 'एस्ट्रोनॉमिकल_ट्वाइलाइट'], डीटाइप='ऑब्जेक्ट') सीपीयू समय: उपयोगकर्ता 33.9 सेकेंड, सिस्टम: 3.93 सेकेंड, कुल: 37.9 सेकेंड वॉल टाइम : 46.9 से

RSI clean_text पाठ को संसाधित करने और साफ़ करने का एक सीधा कार्य है। हमें अंग्रेजी मिलेगी रोक का उपयोग nltk.copus इसका उपयोग टेक्स्ट लाइन से स्टॉप शब्दों को फ़िल्टर करने के लिए करें। उसके बाद, हम वाक्य से विशेष वर्ण और अतिरिक्त रिक्त स्थान हटा देंगे। प्रसंस्करण समय निर्धारित करने के लिए यह आधारभूत कार्य होगा धारावाहिक, समानांतर, तथा बैच प्रसंस्करण। 

डीईएफ़ क्लीन_टेक्स्ट(मूलपाठ): # विराम शब्द हटाएँ स्टॉप्स = स्टॉपवर्ड्स.वर्ड्स("अंग्रेजी") टेक्स्ट = " ".जॉइन([वर्ड एसटी   शब्द in पाठ.विभाजन() if शब्द नहीं in रुकता है]) # विशेष वर्ण हटाएँ टेक्स्ट = text.translate(str.maketrans('', '', string.punctuation)) #अतिरिक्त रिक्त स्थान हटाना टेक्स्ट = पुनः.उप('+','', टेक्स्ट) वापसी टेक्स्ट

क्रमिक प्रसंस्करण के लिए, हम पांडा का उपयोग कर सकते हैं .apply() फ़ंक्शन, लेकिन यदि आप प्रगति बार देखना चाहते हैं, तो आपको सक्रिय करना होगा टीक्यूडीएम एसटी पांडा और फिर उपयोग करें .progress_apply() समारोह. 

हम 2.8 मिलियन रिकॉर्ड संसाधित करने जा रहे हैं और परिणाम को वापस "विवरण" कॉलम कॉलम में सहेजेंगे। 

%%समय tqdm.pandas() df['विवरण'] = df['विवरण'].progress_apply(clean_text)

उत्पादन

इसमें 9 मिनट 5 सेकंड का समय लगा उच्च अंत प्रोसेसर से सीरियल प्रोसेस 2.8 मिलियन पंक्तियाँ। 

100% 2845342/2845342 [09:05<00:00, 5724.25आईटी/एस] सीपीयू समय: उपयोगकर्ता 8 मिनट 14 सेकंड, सिस्टम: 53.6 सेकंड, कुल: 9 मिनट 7 सेकंड दीवार का समय: 9 मिनट 5 सेकंड

फ़ाइल को समानांतर रूप से संसाधित करने के कई तरीके हैं, और हम उन सभी के बारे में जानने जा रहे हैं। multiprocessing एक अंतर्निर्मित पायथन पैकेज है जिसका उपयोग आमतौर पर बड़ी फ़ाइलों के समानांतर प्रसंस्करण के लिए किया जाता है। 

हम एक मल्टीप्रोसेसिंग बनाएंगे पूल साथ में 8 कार्यकर्ताओं और का उपयोग करें नक्शा प्रक्रिया आरंभ करने के लिए कार्य करें. प्रगति पट्टियों को प्रदर्शित करने के लिए, हम इसका उपयोग कर रहे हैं टीक्यूडीएम.

मानचित्र फ़ंक्शन में दो अनुभाग होते हैं. पहले को फ़ंक्शन की आवश्यकता होती है, और दूसरे को तर्क या तर्कों की सूची की आवश्यकता होती है। 

पढ़कर और जानें दस्तावेज़ीकरण

%%समय पी = एमपी.पूल(एन_वर्कर्स) डीएफ['विवरण'] = पी.मैप(क्लीन_टेक्स्ट,टीक्यूडीएम(डीएफ['विवरण']))

उत्पादन

हमने अपने प्रसंस्करण समय में लगभग सुधार किया है 3X. प्रसंस्करण समय कम हो गया 9 मिनट 5 सेकंड सेवा मेरे 3 मिनट 51 सेकंड.   

100% 2845342/2845342 [02:58<00:00, 135646.12आईटी/एस] सीपीयू समय: उपयोगकर्ता 5.68 सेकेंड, सिस्टम: 1.56 सेकेंड, कुल: 7.23 सेकेंड दीवार का समय: 3 मिनट 51 सेकेंड

अब हम समानांतर प्रोसेसिंग करने के लिए एक और पायथन पैकेज के बारे में सीखेंगे। इस अनुभाग में, हम जॉबलिब का उपयोग करेंगे समानांतर और विलंबित को दोहराने के लिए नक्शा समारोह. 

  • समानांतर के लिए दो तर्कों की आवश्यकता है: n_jobs = 8 और बैकएंड = मल्टीप्रोसेसिंग।
  • फिर, हम जोड़ देंगे क्लीन_टेक्स्ट  को विलंबित समारोह. 
  • एक समय में एक ही मान फीड करने के लिए एक लूप बनाएं। 

नीचे दी गई प्रक्रिया काफी सामान्य है, और आप अपनी आवश्यकताओं के अनुसार अपने फ़ंक्शन और ऐरे को संशोधित कर सकते हैं। मैंने इसका उपयोग हजारों ऑडियो और वीडियो फ़ाइलों को बिना किसी समस्या के संसाधित करने के लिए किया है। 

अनुशंसित: का उपयोग करके अपवाद हैंडलिंग जोड़ें try: और except:

डीईएफ़ text_parallel_clean(सरणी): परिणाम = समानांतर (n_jobs = n_workers, बैकएंड = "मल्टीप्रोसेसिंग") (विलंबित (clean_text) (पाठ) एसटी   टेक्स्ट in tqdm(सरणी) ) वापसी परिणाम

इसमें "विवरण" कॉलम जोड़ें text_parallel_clean()

%%समय df['विवरण'] = text_parallel_clean(df['विवरण'])

उत्पादन

मल्टीप्रोसेसिंग में हमारे कार्य को 13 सेकंड अधिक समय लगा पूल। लेकिन फिर भी, समानांतर से 4 मिनट 59 सेकंड तेज है धारावाहिक प्रसंस्करण। 

100% 2845342/2845342 [04:03<00:00, 10514.98आईटी/एस] सीपीयू समय: उपयोगकर्ता 44.2 सेकेंड, सिस्टम: 2.92 सेकेंड, कुल: 47.1 सेकेंड दीवार का समय: 4 मिनट 4 सेकेंड

बड़ी फ़ाइलों को बैचों में विभाजित करके और उन्हें समानांतर में संसाधित करके संसाधित करने का एक बेहतर तरीका है। आइए एक बैच फ़ंक्शन बनाकर शुरुआत करें जो चलेगा clean_function मूल्यों के एक ही बैच पर. 

बैच प्रोसेसिंग फ़ंक्शन

डीईएफ़ proc_batch(बैच): वापसी [clean_text(पाठ) एसटी   टेक्स्ट in बैच ]

फ़ाइल को बैचों में विभाजित करना

नीचे दिया गया फ़ंक्शन श्रमिकों की संख्या के आधार पर फ़ाइल को कई बैचों में विभाजित करेगा। हमारे मामले में, हमें 8 बैच मिलते हैं। 

डीईएफ़ बैच फ़ाइल(सरणी, n_श्रमिक): फ़ाइल_लेन = लेन (सरणी) बैच_आकार = गोल (फ़ाइल_लेन / n_श्रमिक) बैच = [ सरणी[ix:ix+बैच_आकार] एसटी   ix in tqdm(रेंज(0, फाइल_लेन, बैच_साइज)) ] वापसी बैच बैच = बैच_फाइल(डीएफ['विवरण'],एन_वर्कर्स) >>> 100% 8/8 [00:00<00:00, 280.01आईटी/एस]

समानांतर बैच प्रोसेसिंग चलाना

अंत में, हम उपयोग करेंगे समानांतर और विलंबित बैचों को संसाधित करने के लिए. 

नोट: मानों की एक एकल सरणी प्राप्त करने के लिए, हमें सूची समझ को चलाना होगा जैसा कि नीचे दिखाया गया है। 

 

%% समय बैच_आउटपुट = समानांतर (n_jobs = n_workers, बैकएंड = "मल्टीप्रोसेसिंग") (विलंबित (proc_batch) (बैच) एसटी   बैच in tqdm(बैच) ) df['विवरण'] = [j एसटी   i in बैच_आउटपुट एसटी   j in i]

उत्पादन

हमने प्रसंस्करण समय में सुधार किया है। यह तकनीक जटिल डेटा को संसाधित करने और गहन शिक्षण मॉडल के प्रशिक्षण के लिए प्रसिद्ध है। 

100% 8/8 [00:00<00:00, 2.19आईटी/एस] सीपीयू समय: उपयोगकर्ता 3.39 सेकेंड, सिस्टम: 1.42 सेकेंड, कुल: 4.81 सेकेंड दीवार का समय: 3 मिनट 56 सेकेंड

tqdm मल्टीप्रोसेसिंग को अगले स्तर पर ले जाता है। यह आसान और शक्तिशाली है। मैं प्रत्येक डेटा वैज्ञानिक को इसकी अनुशंसा करूंगा। 

चेक आउट दस्तावेज़ीकरण मल्टीप्रोसेसिंग के बारे में अधिक जानने के लिए। 

RSI process_map आवश्यकता है:

  1. कार्य का नाम
  2. डेटाफ़्रेम स्तंभ
  3. max_workers
  4. चक का आकार बैच के आकार के समान है। हम श्रमिकों की संख्या का उपयोग करके बैच आकार की गणना करेंगे या आप अपनी पसंद के आधार पर संख्या जोड़ सकते हैं। 
%%समय
से tqdm.contrib.concurrent आयात प्रोसेस_मैप बैच = राउंड(लेन(डीएफ)/एन_वर्कर्स) डीएफ["विवरण"] = प्रोसेस_मैप(क्लीन_टेक्स्ट, डीएफ["विवरण"], मैक्स_वर्कर्स=एन_वर्कर्स, चंकसाइज=बैच)

उत्पादन

कोड की एक पंक्ति से, हमें सर्वोत्तम परिणाम मिलता है। 

100% 2845342/2845342 [03:48<00:00, 1426320.93आईटी/एस] सीपीयू समय: उपयोगकर्ता 7.32 सेकेंड, सिस्टम: 1.97 सेकेंड, कुल: 9.29 सेकेंड दीवार का समय: 3 मिनट 51 सेकेंड

आपको एक संतुलन खोजने और उस तकनीक का चयन करने की ज़रूरत है जो आपके मामले के लिए सबसे अच्छा काम करती है। यह सीरियल प्रोसेसिंग, समानांतर या बैच प्रोसेसिंग हो सकता है। यदि आप छोटे, कम जटिल डेटासेट के साथ काम कर रहे हैं तो समानांतर प्रसंस्करण प्रतिकूल प्रभाव डाल सकता है। 

इस मिनी-ट्यूटोरियल में, हमने विभिन्न पायथन पैकेजों और तकनीकों के बारे में सीखा है जो हमें अपने डेटा कार्यों को समानांतर रूप से संसाधित करने की अनुमति देते हैं। 

यदि आप केवल सारणीबद्ध डेटासेट के साथ काम कर रहे हैं और अपने प्रसंस्करण प्रदर्शन में सुधार करना चाहते हैं, तो मैं आपको प्रयास करने का सुझाव दूंगा नकाब, डेटा तालिका, तथा उतार 

संदर्भ 

 
 
आबिद अली अवनी (@1अबिदलियावान) एक प्रमाणित डेटा वैज्ञानिक पेशेवर है जो मशीन लर्निंग मॉडल बनाना पसंद करता है। वर्तमान में, वह सामग्री निर्माण और मशीन लर्निंग और डेटा विज्ञान प्रौद्योगिकियों पर तकनीकी ब्लॉग लिखने पर ध्यान केंद्रित कर रहा है। आबिद के पास प्रौद्योगिकी प्रबंधन में मास्टर डिग्री और दूरसंचार इंजीनियरिंग में स्नातक की डिग्री है। उनका दृष्टिकोण मानसिक बीमारी से जूझ रहे छात्रों के लिए ग्राफ न्यूरल नेटवर्क का उपयोग करके एआई उत्पाद बनाना है।
 

समय टिकट:

से अधिक केडनगेट्स