Python'da Büyük Dosyayı Paralel İşleme

Python'da Büyük Dosyayı Paralel İşleme

Kaynak Düğüm: 1970104

Python'da Büyük Dosyayı Paralel İşleme
Yazara göre resim
 

Paralel işleme için görevimizi alt birimlere ayırıyoruz. Program tarafından işlenen işlerin sayısını artırır ve genel işlem süresini azaltır. 

Örneğin, büyük bir CSV dosyasıyla çalışıyorsanız ve tek bir sütunu değiştirmek istiyorsanız. Verileri işleve bir dizi olarak besleyeceğiz ve mevcut sayıya bağlı olarak birden çok değeri aynı anda paralel olarak işleyecektir.  işçiler. Bu çalışanlar, işlemcinizdeki çekirdek sayısına bağlıdır. 
 

Not: daha küçük bir veri kümesinde paralel işleme kullanmak, işlem süresini iyileştirmez.

 

Bu blogda, kullanarak büyük dosyalar üzerinde işlem süresini nasıl azaltacağımızı öğreneceğiz. çoklu işlem, iş kütüphanesi, ve tqdm Python paketleri. Herhangi bir dosya, veritabanı, resim, video ve sese uygulanabilen basit bir öğreticidir. 
 

Not: deneyler için Kaggle defterini kullanıyoruz. İşlem süresi makineden makineye değişebilir.  

 

kullanacağız ABD Kazaları (2016 – 2021) 2.8 milyon kayıt ve 47 sütundan oluşan Kaggle'dan veri seti. 

İthal edeceğiz multiprocessing, joblib, ve tqdm için paralel işleme, pandas için veri girişleri, ve re, nltk, ve string için metin işleme

# Paralel Hesaplama
ithalat çoklu işlem as mp
itibaren iş kütüphanesi ithalat paralel, gecikmeli
itibaren tqdm.notebook ithalat tqdm # Veri Alma 
ithalat pandalar as pd # Metin İşleme 
ithalat re itibaren nltk.corpus ithalat engellenecek kelimeler
ithalat dizi

Hemen içeri atlamadan önce, hadi ayarlayalım n_workers ikiye katlayarak cpu_count(). Gördüğünüz gibi 8 işçimiz var.

n_workers = 2 * mp.cpu_count() print(f"{n_workers} işçi mevcut") >>> 8 işçi mevcuttur

Bir sonraki adımda, aşağıdakileri kullanarak büyük CSV dosyalarını alacağız. pandalar read_csv işlev. Ardından veri çerçevesinin şeklini, sütunların adını ve işlem süresini yazdırın. 

Not: Jüpyter'in sihirli işlevi %%time görüntüleyebilir CPU süreleri ve duvar zamanı sürecin sonunda. 

 

%%zaman
file_name="../input/us-accidents/US_Accidents_Dec21_updated.csv"
df = pd.read_csv(dosya_adı) print(f"Şekil:{df.shape}nnSütun Adları:n{df.columns}n")

Çıktı

Şekil:(2845342, 47) Sütun Adları: Dizin(['ID', 'Serity', 'Start_Time', 'Bitiş_Time', 'Start_Lat', 'Start_Lng', 'End_Lat', 'End_Lng', 'Distance(mi) ', 'Açıklama', 'Numara', 'Sokak', 'Yan', 'Şehir', 'İlçe', 'Eyalet', 'Posta kodu', 'Ülke', 'Saat Dilimi', 'Havaalanı_Kodu', 'Hava Durumu_Zaman Damgası', 'Sıcaklık(F)', 'Rüzgar_Soğukluk(F)', 'Nem(%)', 'Basınç(inç)', 'Görünürlük(mi)', 'Rüzgar_Yön', 'Rüzgar_Hızı(mph)', 'Yağış(inç) )', 'Hava Durumu', 'Tesis', 'Çarpışma', 'Geçiş', 'Yol Ver', 'Kavşak', 'Çıkış Yok', 'Demiryolu', 'Döner Kavşak', 'İstasyon', 'Durdur', 'Trafik_Sakinleştirici' , 'Trafik_Sinyali', 'Dönme_Döngüsü', 'Gündoğumu_Gün Batımı', 'Sivil_Alacakaranlık', 'Denizcilik_Alacakaranlık', 'Astronomik_Alacakaranlık'],
dtype='object') CPU süreleri: kullanıcı 33.9 sn, sistem: 3.93 sn, toplam: 37.9 sn
Duvar süresi: 46.9 sn

The clean_text metni işlemek ve temizlemek için basit bir işlevdir. ingilizce alacağız engellenecek kelimeler kullanma nltk.copus metin satırındaki durdurma sözcüklerini filtrelemek için kullanın. Bundan sonra, cümleden özel karakterleri ve fazladan boşlukları kaldıracağız. için işlem süresini belirlemek için temel işlev olacaktır. seri, paralel, ve yığın işleme. 

def temiz_metin(Metin): # Duran kelimeleri kaldır durur = stopwords.words("english") text = " ".join([word için sözcük in metin.split() if sözcük değil in durur]) # Özel Karakterleri Kaldır metin = text.translate(str.maketrans('', '', string.noktalama)) # fazladan boşlukları kaldırma metin = re.sub(' +',' ', metin) dönüş metin

Seri işleme için pandaları kullanabiliriz. .apply() ancak ilerleme çubuğunu görmek istiyorsanız, etkinleştirmeniz gerekir. tqdm için pandalar ve sonra .progress_apply() fonksiyonu. 

2.8 milyon kaydı işleyeceğiz ve sonucu tekrar “Açıklama” sütun sütununa kaydedeceğiz. 

%%zaman
tqdm.pandas() df['Açıklama'] = df['Açıklama'].progress_apply(clean_text)

Çıktı

9 dakika 5 saniye sürdü high-end 2.8 milyon satırı seri işlemek için işlemci. 

%100 2845342/2845342 [09:05<00:00, 5724.25it/s] CPU süreleri: kullanıcı 8 dk 14 sn, sistem: 53.6 sn, toplam: 9 dk 7 sn
Duvar süresi: 9 dakika 5 saniye

Dosyayı paralel olarak işlemenin çeşitli yolları vardır ve bunların hepsini öğreneceğiz. multiprocessing Büyük dosyaların paralel işlenmesi için yaygın olarak kullanılan yerleşik bir python paketidir. 

Çoklu işlem oluşturacağız havuz ile 8 işçiler ve kullan harita süreci başlatma işlevi görür. İlerleme çubuklarını görüntülemek için kullanıyoruz tqdm.

Harita işlevi iki bölümden oluşur. Birincisi işlevi gerektirir ve ikincisi bir argüman veya argüman listesi gerektirir. 

Okuyarak daha fazla bilgi edinin belgeleme

%%zaman
p = mp.Pool(n_workers) df['Açıklama'] = p.map(temiz_metin,tqdm(df['Açıklama']))

Çıktı

İşlem süremizi neredeyse 3X. İşlem süresi düştü 9 dakika 5 saniye için 3 dakika 51 saniye.   

%100 2845342/2845342 [02:58<00:00, 135646.12it/s] CPU süreleri: kullanıcı 5.68 sn, sistem: 1.56 sn, toplam: 7.23 sn
Duvar süresi: 3 dakika 51 saniye

Şimdi paralel işleme yapmak için başka bir Python paketini öğreneceğiz. Bu bölümde joblib'leri kullanacağız. Paralel ve gecikmeli çoğaltmak için harita fonksiyonu. 

  • Paralel iki bağımsız değişken gerektirir: n_jobs = 8 ve arka uç = çoklu işlem.
  • Sonra ekleyeceğiz temiz_metin  için gecikmeli fonksiyonu. 
  • Bir seferde tek bir değeri beslemek için bir döngü oluşturun. 

Aşağıdaki süreç oldukça geneldir ve işlevinizi ve dizinizi ihtiyaçlarınıza göre değiştirebilirsiniz. Binlerce ses ve video dosyasını sorunsuz bir şekilde işlemek için kullandım. 

Önerilen: kullanarak istisna işleme ekleyin try: ve except:

def text_parallel_clean(dizi): sonuç = Paralel(n_jobs=n_workers,backend='çoklu işlem")( gecikmeli(temiz_metin) (metin) için metin in tqdm(dizi)) dönüş sonuç

"Açıklama" sütununu şuraya ekleyin: text_parallel_clean()

%%zaman
df['Açıklama'] = text_parallel_clean(df['Açıklama'])

Çıktı

Fonksiyonumuzu çoklu işlemden 13 saniye daha fazla sürdü. Havuz. O zaman bile, Paralel göre 4 dakika 59 saniye daha hızlıdır. seri işleme. 

%100 2845342/2845342 [04:03<00:00, 10514.98it/s] CPU süreleri: kullanıcı 44.2 sn, sistem: 2.92 sn, toplam: 47.1 sn
Duvar süresi: 4 dakika 4 saniye

Büyük dosyaları toplu işlere bölerek ve paralel olarak işleyerek işlemenin daha iyi bir yolu vardır. Çalıştıracak bir toplu iş işlevi oluşturarak başlayalım. clean_function tek bir değer kümesinde. 

Toplu İşleme İşlevi

def proc_batch(grup): dönüş [ temiz_metin(metin) için metin in grup ]

Dosyayı Gruplara Bölme

Aşağıdaki işlev, dosyayı çalışan sayısına göre birden çok gruba böler. Bizim durumumuzda 8 parti alıyoruz. 

def toplu dosya(dizi,n_işçi): dosya_len = len(dizi) toplu iş_boyutu = yuvarlak(dosya_uzunluk / n_işçi) toplu iş = [ dizi[ix:ix+batch_size] için ix in tqdm(aralık(0, dosya_boyu, toplu_boyut)) ] dönüş toplu iş toplu iş = toplu_dosya(df['Açıklama'],n_işçi) >>> 100% 8/8 [00:00<00:00, 280.01it/s]

Paralel Toplu İşlemi Çalıştırma

Son olarak, kullanacağız Paralel ve gecikmeli toplu işlemek için. 

Not: Tek bir değer dizisi elde etmek için aşağıda gösterildiği gibi liste kavramayı çalıştırmalıyız. 

 

%%zaman
Batch_output = Paralel(n_jobs=n_workers,backend='çoklu işlem")( gecikmeli(proc_batch) (toplu iş) için yığın in tqdm(toplu gruplar) ) df['Açıklama'] = [j için i in toplu çıktı için j in i]

Çıktı

İşlem süresini iyileştirdik. Bu teknik, karmaşık verilerin işlenmesi ve derin öğrenme modellerinin eğitimi ile ünlüdür. 

%100 8/8 [00:00<00:00, 2.19it/s] CPU süreleri: kullanıcı 3.39 sn, sistem: 1.42 sn, toplam: 4.81 sn
Duvar süresi: 3 dakika 56 saniye

tqdm, çoklu işlemeyi bir sonraki seviyeye taşır. Basit ve güçlüdür. Her veri bilimcisine tavsiye edeceğim. 

Check out belgeleme çoklu işlem hakkında daha fazla bilgi edinmek için. 

The process_map gerektirir:

  1. Fonksiyon adı
  2. Veri çerçevesi sütunu
  3. max_workers
  4. chucksize parti boyutuna benzer. Parti büyüklüğünü işçi sayısını kullanarak hesaplayacağız veya siz tercihinize göre sayıyı ekleyebilirsiniz. 
%%zaman
itibaren tqdm.contrib.concurrent ithalat İşlem haritası
toplu iş = round(len(df)/n_workers) df["Açıklama"] = proses_map( temiz_metin, df["Açıklama"], max_workers=n_workers, chunksize=batch
)

Çıktı

Tek satır kod ile en iyi sonucu alıyoruz. 

%100 2845342/2845342 [03:48<00:00, 1426320.93it/s] CPU süreleri: kullanıcı 7.32 sn, sistem: 1.97 sn, toplam: 9.29 sn
Duvar süresi: 3 dakika 51 saniye

Bir denge bulmanız ve durumunuz için en uygun tekniği seçmeniz gerekir. Seri işleme, paralel veya toplu işleme olabilir. Daha küçük, daha az karmaşık bir veri kümesiyle çalışıyorsanız paralel işleme geri tepebilir. 

Bu mini eğitimde, veri fonksiyonlarımızı paralel olarak işlememize izin veren çeşitli Python paketleri ve teknikleri hakkında bilgi edindik. 

Yalnızca tablo halinde bir veri kümesiyle çalışıyorsanız ve işleme performansınızı geliştirmek istiyorsanız, denemenizi öneririm. dask, veri tablosu, ve HIZLI 

Referans 

 
 
Abid Ali Avan (@1abidaliwan), makine öğrenimi modelleri oluşturmayı seven sertifikalı bir veri bilimcisi uzmanıdır. Şu anda, makine öğrenimi ve veri bilimi teknolojileri üzerine içerik oluşturmaya ve teknik bloglar yazmaya odaklanıyor. Abid, Teknoloji Yönetimi alanında yüksek lisans ve Telekomünikasyon Mühendisliği alanında lisans derecesine sahiptir. Vizyonu, akıl hastalığı ile mücadele eden öğrenciler için bir grafik sinir ağı kullanarak bir AI ürünü oluşturmaktır.
 

Zaman Damgası:

Den fazla KDNuggets