پردازش موازی فایل بزرگ در پایتون

پردازش موازی فایل بزرگ در پایتون

گره منبع: 1970104

پردازش موازی فایل بزرگ در پایتون
تصویر توسط نویسنده
 

برای پردازش موازی، وظیفه خود را به واحدهای فرعی تقسیم می کنیم. تعداد کارهای پردازش شده توسط برنامه را افزایش می دهد و زمان کلی پردازش را کاهش می دهد. 

به عنوان مثال، اگر با یک فایل CSV بزرگ کار می کنید و می خواهید یک ستون را تغییر دهید. ما داده‌ها را به‌عنوان یک آرایه به تابع تغذیه می‌کنیم و چندین مقدار را همزمان بر اساس تعداد موجود پردازش می‌کند.  کارگران. این کارگران بر اساس تعداد هسته های پردازنده شما هستند. 
 

توجه داشته باشید: استفاده از پردازش موازی در یک مجموعه داده کوچکتر زمان پردازش را بهبود نمی بخشد.

 

در این وبلاگ یاد می گیریم که چگونه با استفاده از فایل های حجیم زمان پردازش را کاهش دهیم چند پردازش, joblibو tqdm بسته های پایتون این یک آموزش ساده است که می تواند برای هر فایل، پایگاه داده، تصویر، ویدئو و صدا اعمال شود. 
 

توجه داشته باشید: ما از نوت بوک Kaggle برای آزمایش ها استفاده می کنیم. زمان پردازش می تواند از ماشینی به ماشین دیگر متفاوت باشد.  

 

ما استفاده خواهیم کرد حوادث ایالات متحده (2016 - 2021) مجموعه داده از Kaggle که از 2.8 میلیون رکورد و 47 ستون تشکیل شده است. 

وارد خواهیم کرد multiprocessing, joblibو tqdm برای پردازش موازی, pandas برای مصرف داده هاو re, nltkو string برای پردازش متن

# محاسبات موازی
واردات چند پردازش as mp
از جانب joblib واردات موازی، با تاخیر
از جانب tqdm.notebook واردات tqdm # مصرف داده 
واردات پانداها as pd # پردازش متن 
واردات re از جانب nltk.corpus واردات کلمات توقف
واردات رشته

قبل از اینکه بپریم، بیایید راه بیفتیم n_workers با دو برابر شدن cpu_count(). همانطور که می بینید 8 کارگر داریم.

n_workers = 2 * mp.cpu_count() print(f"{n_workers} کارگر در دسترس هستند") «هفته | هفته» 8 کارگر موجود است

در مرحله بعد، فایل‌های CSV بزرگ را با استفاده از پانداها read_csv عملکرد. سپس شکل دیتافریم، نام ستون ها و زمان پردازش را چاپ کنید. 

توجه داشته باشید: عملکرد جادویی ژوپیتر %%time می توانید صفحه نمایش زمان های CPU و زمان دیوار در پایان فرآیند. 

 

٪٪زمان
file_name="../input/us-accidents/US_Accidents_Dec21_updated.csv"
df = pd.read_csv(نام_فایل) print(f"شکل:{df.shape}nnنام ستون:n{df.columns}n")

تولید

شکل:(2845342، 47) نام ستون ها: Index(['ID'، 'Severity'، 'Start_Time'، 'End_Time'، 'Start_Lat'، 'Start_Lng'، 'End_Lat'، 'End_Lng'، 'Distance(mi) "، "توضیحات"، "شماره"، "خیابان"، "سمت"، "شهر"، "شهرستان"، "ایالت"، "کد پستی"، "کشور"، "منطقه زمانی"، "کد_فرودگاه"، "مهر_زمان_آب و هوا"، 'دما (F)'، 'Wind_Chill(F)'، 'رطوبت(%)'، 'Pressure(in)'، 'Visibility(mi)'، 'Wind_Direction'، 'Wind_Speed(mph)'، 'Recipitation(in )'، 'وضعیت_آب و هوا'، 'امکانات'، 'برآمدگی'، 'عبور'، 'مسیر دادن'، 'تقاطع'، 'بدون_خروج'، 'راه آهن'، 'دورگرد'، 'ایستگاه'، 'ایست'، 'ترافیک_آرام کننده' , 'Traffic_Signal', 'Turning_Loop', 'Sunrise_Sunset', 'Civil_Twilight', 'Nautical_Twilight', 'Astronomical_Twilight'],
dtype='object') زمان‌های CPU: کاربر 33.9 ثانیه، sys: 3.93 ثانیه، مجموع: 37.9 ثانیه
زمان دیوار: 46.9 ثانیه

La clean_text یک تابع ساده برای پردازش و تمیز کردن متن است. انگلیسی خواهیم گرفت کلمات توقف با استفاده از nltk.copus از آن برای فیلتر کردن کلمات توقف از خط متن استفاده کنید. پس از آن کاراکترهای خاص و فاصله های اضافی را از جمله حذف می کنیم. این تابع خط پایه برای تعیین زمان پردازش خواهد بود سریال, موازیو دسته در حال پردازش. 

دف متن پاک(متن): # کلمات توقف را حذف کنید stops = stopwords.words("english") text = " ".join([word برای کلمه in text.split() if کلمه نه in متوقف می شود]) # کاراکترهای خاص را حذف کنید text = text.translate(str.maketrans('', '', string.punctuation)) # حذف فضاهای اضافی text = re.sub(' +',' ', text) برگشت متن

برای پردازش سریال می توانیم از پانداها استفاده کنیم .apply() عملکرد، اما اگر می خواهید نوار پیشرفت را ببینید، باید آن را فعال کنید tqdm برای پانداها و سپس از .progress_apply() تابع. 

ما 2.8 میلیون رکورد را پردازش می کنیم و نتیجه را به ستون ستون "توضیحات" ذخیره می کنیم. 

٪٪زمان
tqdm.pandas() df['Description'] = df['Description'].progress_apply(clean_text)

تولید

9 دقیقه و 5 ثانیه طول کشید بالا پایان پردازشگر به سریال 2.8 میلیون ردیف. 

100% 2845342/2845342 [09:05<00:00، 5724.25it/s] زمان پردازنده: کاربر 8 دقیقه و 14 ثانیه، سیستم: 53.6 ثانیه، کل: 9 دقیقه و 7 ثانیه
زمان دیوار: 9 دقیقه و 5 ثانیه

روش های مختلفی برای پردازش موازی فایل وجود دارد و ما قصد داریم با همه آنها آشنا شویم. را multiprocessing یک بسته داخلی پایتون است که معمولا برای پردازش موازی فایل های بزرگ استفاده می شود. 

ما یک پردازش چندگانه ایجاد خواهیم کرد استخر با کارگران 8 و استفاده از نقشه عملکرد برای شروع فرآیند برای نمایش نوارهای پیشرفت، از آن استفاده می کنیم tqdm.

تابع نقشه از دو بخش تشکیل شده است. اولی به تابع نیاز دارد و دومی به آرگومان یا فهرستی از آرگومان ها نیاز دارد. 

با مطالعه بیشتر بیاموزید مستندات

٪٪زمان
p = mp.Pool(n_workers) df['Description'] = p.map(clean_text,tqdm(df['Description']))

تولید

ما زمان پردازش خود را تقریباً بهبود بخشیده ایم 3X. زمان پردازش کاهش یافت 9 دقیقه 5 ثانیه به 3 دقیقه 51 ثانیه.   

100% 2845342/2845342 [02:58<00:00, 135646.12it/s] زمان‌های CPU: کاربر 5.68 ثانیه، sys: 1.56 ثانیه، کل: 7.23 ثانیه
زمان دیوار: 3 دقیقه و 51 ثانیه

اکنون با یک بسته پایتون دیگر برای انجام پردازش موازی آشنا خواهیم شد. در این قسمت از joblib's استفاده خواهیم کرد موازی و به تاخیر افتاده برای تکرار نقشه تابع. 

  • Parallel به دو آرگومان نیاز دارد: n_jobs = 8 و backend = multiprocessing.
  • سپس، اضافه می کنیم متن پاک  به به تاخیر افتاده تابع. 
  • یک حلقه برای تغذیه یک مقدار در یک زمان ایجاد کنید. 

فرآیند زیر کاملاً عمومی است و شما می توانید تابع و آرایه خود را بر اساس نیاز خود تغییر دهید. من از آن برای پردازش هزاران فایل صوتی و تصویری بدون هیچ مشکلی استفاده کرده ام. 

توصیه می شود: اضافه کردن کنترل استثنا با استفاده از try: و except:

دف text_parallel_clean(آرایه): نتیجه = موازی(n_jobs=n_workers,backend="multiprocessing")( delayed(clean_text) (متن) برای متن in tqdm (آرایه)) برگشت نتیجه

ستون "Description" را به آن اضافه کنید text_parallel_clean()

٪٪زمان
df['Description'] = text_parallel_clean(df['Description'])

تولید

عملکرد ما 13 ثانیه بیشتر از چند پردازش طول کشید استخر. حتی پس از آن ، موازی 4 دقیقه و 59 ثانیه سریعتر از سریال در حال پردازش. 

100% 2845342/2845342 [04:03<00:00, 10514.98it/s] زمان‌های CPU: کاربر 44.2 ثانیه، sys: 2.92 ثانیه، کل: 47.1 ثانیه
زمان دیوار: 4 دقیقه و 4 ثانیه

راه بهتری برای پردازش فایل های بزرگ با تقسیم آنها به دسته و پردازش موازی آنها وجود دارد. بیایید با ایجاد یک تابع دسته ای شروع کنیم که a را اجرا می کند clean_function روی یک دسته از مقادیر 

تابع پردازش دسته ای

دف proc_batch(دسته ای): برگشت [متن_پاک (متن) برای متن in دسته ای ]

تقسیم فایل به دسته ای

تابع زیر فایل را بر اساس تعداد کارگران به چند دسته تقسیم می کند. در مورد ما، ما 8 دسته دریافت می کنیم. 

دف دسته_فایل(آرایه،n_workers): file_len = len (آرایه) batch_size = دور (file_len / n_workers) دسته = [ آرایه[ix:ix+batch_size] برای ix in tqdm (محدوده (0، file_len، batch_size)) ] برگشت دسته دسته = batch_file(df['Description'],n_workers) >>> 100% 8/8 [00:00<00:00, 280.01it/s]

اجرای پردازش دسته ای موازی

در نهایت استفاده خواهیم کرد موازی و به تاخیر افتاده برای پردازش دسته ها 

توجه داشته باشید: برای به دست آوردن یک آرایه واحد از مقادیر، باید درک لیست را مطابق شکل زیر اجرا کنیم. 

 

٪٪زمان
batch_output = موازی(n_jobs=n_workers,backend="multiprocessing")( delayed(proc_batch) (batch) برای دسته in tqdm(دسته‌ها) df['توضیح'] = [j برای i in Batch_output برای j in i]

تولید

ما زمان پردازش را بهبود بخشیده ایم. این تکنیک برای پردازش داده های پیچیده و آموزش مدل های یادگیری عمیق مشهور است. 

100% 8/8 [00:00<00:00, 2.19it/s] زمان‌های CPU: کاربر 3.39 ثانیه، sys: 1.42 ثانیه، کل: 4.81 ثانیه
زمان دیوار: 3 دقیقه و 56 ثانیه

tqdm چند پردازش را به سطح بعدی می برد. ساده و قدرتمند است. من آن را به هر دانشمند داده توصیه می کنم. 

اتمام مستندات برای کسب اطلاعات بیشتر در مورد چند پردازش 

La process_map نیاز به:

  1. نام عملکرد
  2. ستون Dataframe
  3. max_workers
  4. chucksize شبیه اندازه دسته است. ما اندازه دسته را با استفاده از تعداد کارگران محاسبه می کنیم یا می توانید بر اساس ترجیح خود تعداد را اضافه کنید. 
٪٪زمان
از جانب tqdm.contrib.concurrent واردات process_map
batch = round(len(df)/n_workers) df["Description"] = process_map(clean_text, df["Description"], max_workers=n_workers, chunksize=batch
)

تولید

با یک خط کد، بهترین نتیجه را می گیریم. 

100% 2845342/2845342 [03:48<00:00, 1426320.93it/s] زمان‌های CPU: کاربر 7.32 ثانیه، sys: 1.97 ثانیه، کل: 9.29 ثانیه
زمان دیوار: 3 دقیقه و 51 ثانیه

باید تعادلی پیدا کنید و تکنیکی را انتخاب کنید که برای مورد شما بهترین کار را دارد. این می تواند پردازش سریال، موازی یا پردازش دسته ای باشد. اگر با مجموعه داده کوچکتر و پیچیده تر کار می کنید، پردازش موازی می تواند نتیجه معکوس داشته باشد. 

در این آموزش کوچک، ما با بسته‌ها و تکنیک‌های مختلف پایتون آشنا شده‌ایم که به ما امکان پردازش موازی توابع داده‌هایمان را می‌دهد. 

اگر فقط با یک مجموعه داده جدولی کار می کنید و می خواهید عملکرد پردازش خود را بهبود ببخشید، پیشنهاد می کنم امتحان کنید داسک, جدول داده هاو سریع 

ارجاع 

 
 
عابد علی اعوان (@1abidaliawan) یک متخصص دانشمند داده معتبر است که عاشق ساخت مدل های یادگیری ماشینی است. در حال حاضر، او بر تولید محتوا و نوشتن وبلاگ های فنی در زمینه یادگیری ماشین و فناوری های علم داده تمرکز دارد. عابد دارای مدرک کارشناسی ارشد در رشته مدیریت فناوری و مدرک کارشناسی در رشته مهندسی مخابرات است. چشم انداز او ساخت یک محصول هوش مصنوعی با استفاده از یک شبکه عصبی نمودار برای دانش آموزانی است که با بیماری های روانی دست و پنجه نرم می کنند.
 

تمبر زمان:

بیشتر از kdnuggets