Pemrosesan Paralel File Besar dengan Python

Pemrosesan Paralel File Besar dengan Python

Node Sumber: 1970104

Pemrosesan Paralel File Besar dengan Python
Gambar oleh Penulis
 

Untuk pemrosesan paralel, kami membagi tugas kami menjadi sub-unit. Ini meningkatkan jumlah pekerjaan yang diproses oleh program dan mengurangi waktu pemrosesan secara keseluruhan. 

Misalnya, jika Anda bekerja dengan file CSV besar dan ingin mengubah satu kolom. Kami akan memberi makan data sebagai array ke fungsi, dan itu akan memproses beberapa nilai secara paralel sekaligus berdasarkan jumlah yang tersedia  pekerja. Pekerja ini didasarkan pada jumlah inti dalam prosesor Anda. 
 

Catatan: menggunakan pemrosesan paralel pada kumpulan data yang lebih kecil tidak akan meningkatkan waktu pemrosesan.

 

Di blog ini, kita akan belajar cara mengurangi waktu pemrosesan pada file besar menggunakan multiproses, pekerjaan, dan tqdm Paket Python. Ini adalah tutorial sederhana yang dapat diterapkan ke file, database, gambar, video, dan audio apa pun. 
 

Catatan: kami menggunakan buku catatan Kaggle untuk eksperimen. Waktu pemrosesan dapat bervariasi dari mesin ke mesin.  

 

Kami akan menggunakan Kecelakaan AS (2016 – 2021) dataset dari Kaggle yang terdiri dari 2.8 juta record dan 47 kolom. 

Kami akan mengimpor multiprocessing, joblib, dan tqdm untuk proses paralel, pandas untuk penyerapan data, dan re, nltk, dan string untuk pemrosesan teks

# Komputasi Paralel
mengimpor multiproses as mp
dari pekerjaan mengimpor Paralel, tertunda
dari tqdm.notebook mengimpor tqdm # Penyerapan Data 
mengimpor panda as pd # Pemrosesan Teks 
mengimpor re dari nltk.corpus mengimpor kata-kata penghenti
mengimpor tali

Sebelum kita langsung masuk, mari kita atur n_workers dengan menggandakan cpu_count(). Seperti yang Anda lihat, kami memiliki 8 pekerja.

n_workers = 2 * mp.cpu_count() print(f"{n_workers} pekerja tersedia") >>> 8 pekerja tersedia

Pada langkah selanjutnya, kami akan mencerna file CSV besar menggunakan panda read_csv fungsi. Kemudian, cetak bentuk kerangka data, nama kolom, dan waktu pemrosesan. 

Catatan: Fungsi ajaib Jupyter %%time dapat menampilkan waktu CPU dan waktu dinding di akhir proses. 

 

%%time file_name="../input/us-accidents/US_Accidents_Dec21_updated.csv" df = pd.read_csv(file_name) print(f"Bentuk:{df.shape}nnNama Kolom:n{df.columns}n")

Keluaran

Bentuk:(2845342, 47) Nama Kolom: Indeks(['ID', 'Severity', 'Start_Time', 'End_Time', 'Start_Lat', 'Start_Lng', 'End_Lat', 'End_Lng', 'Distance(mi) ', 'Deskripsi', 'Nomor', 'Jalan', 'Samping', 'Kota', 'Kabupaten', 'Negara Bagian', 'Kode Pos', 'Negara', 'Zona Waktu', 'Kode_Bandara', 'Cap_Waktu_Cuaca', 'Suhu(F)', 'Angin_Dingin(F)', 'Kelembaban(%)', 'Tekanan(dalam)', 'Visibilitas(mi)', 'Arah_Angin', 'Kecepatan_Angin(mph)', 'Curah Hujan(dalam )', 'Kondisi_Cuaca', 'Kemudahan', 'Bump', 'Penyeberangan', 'Beri_Jalan', 'Persimpangan', 'Tanpa Pintu Keluar', 'Kereta Api', 'Bundaran', 'Stasiun', 'Berhenti', 'Tenang_Lalu Lintas' , 'Traffic_Signal', 'Turning_Loop', 'Sunrise_Sunset', 'Civil_Twilight', 'Nautical_Twilight', 'Astronomical_Twilight'], dtype='object') Waktu CPU: pengguna 33.9 dtk, sistem: 3.93 dtk, total: 37.9 dtk Waktu dinding : 46.9 detik

Grafik clean_text adalah fungsi langsung untuk memproses dan membersihkan teks. Kami akan mendapatkan bahasa Inggris kata-kata penghenti menggunakan nltk.copus menggunakannya untuk menyaring kata-kata berhenti dari baris teks. Setelah itu, kami akan menghapus karakter khusus dan spasi tambahan dari kalimat. Ini akan menjadi fungsi dasar untuk menentukan waktu pemrosesan untuk serial, paralel, dan sekumpulan pengolahan. 

def teks_bersih(teks): # Hapus kata berhenti stop = stopwords.words("english") text = " ".join([word untuk kata in teks.split() if kata tidak in berhenti]) # Hapus Karakter Khusus teks = teks.translate(str.maketrans('', '', string.punctuation)) # menghapus spasi ekstra teks = re.sub(' +',' ', teks) kembali teks

Untuk pemrosesan serial, kita dapat menggunakan panda .apply() fungsi, tetapi jika Anda ingin melihat bilah kemajuan, Anda perlu mengaktifkan tqdm untuk panda dan kemudian gunakan .progress_apply() fungsi. 

Kami akan memproses 2.8 juta catatan dan menyimpan hasilnya kembali ke kolom "Deskripsi". 

%%waktu tqdm.pandas() df['Deskripsi'] = df['Deskripsi'].progress_apply(clean_text)

Keluaran

Butuh 9 menit dan 5 detik untuk high-end prosesor ke proses serial 2.8 juta baris. 

100% 2845342/2845342 [09:05<00:00, 5724.25it/s] Waktu CPU: pengguna 8 menit 14 detik, sistem: 53.6 detik, total: 9 menit 7 detik Waktu dinding: 9 menit 5 detik

Ada berbagai cara untuk memproses file secara paralel, dan kita akan mempelajari semuanya. Itu multiprocessing adalah paket python bawaan yang biasa digunakan untuk pemrosesan paralel file besar. 

Kami akan membuat multiprocessing Kolam dengan Pekerja 8 dan gunakan peta berfungsi untuk memulai proses. Untuk menampilkan bilah kemajuan, kami menggunakan tqdm.

Fungsi peta terdiri dari dua bagian. Yang pertama membutuhkan fungsi, dan yang kedua membutuhkan argumen atau daftar argumen. 

Pelajari lebih lanjut dengan membaca dokumentasi

%%waktu p = mp.Pool(n_workers) df['Deskripsi'] = p.map(clean_text,tqdm(df['Deskripsi']))

Keluaran

Kami telah meningkatkan waktu pemrosesan kami hampir 3X. Waktu pemrosesan turun dari 9 menit 5 detik untuk 3 menit 51 detik.   

100% 2845342/2845342 [02:58<00:00, 135646.12it/s] Waktu CPU: pengguna 5.68 dtk, sistem: 1.56 dtk, total: 7.23 dtk Waktu dinding: 3 menit 51 dtk

Sekarang kita akan belajar tentang paket Python lain untuk melakukan pemrosesan paralel. Di bagian ini, kita akan menggunakan joblib's Paralel dan terlambat untuk meniru peta fungsi. 

  • Paralel membutuhkan dua argumen: n_jobs = 8 dan backend = multiprocessing.
  • Kemudian, kami akan menambahkan teks_bersih  ke terlambat fungsi. 
  • Buat loop untuk memberi makan satu nilai pada satu waktu. 

Proses di bawah ini cukup umum, dan Anda dapat memodifikasi fungsi dan array Anda sesuai dengan kebutuhan Anda. Saya telah menggunakannya untuk memproses ribuan file audio dan video tanpa masalah. 

Rekomendasi: tambahkan penanganan pengecualian menggunakan try: dan except:

def teks_parallel_clean(array): hasil = Paralel(n_jobs=n_workers,backend="multiprocessing")( tertunda(clean_text) (teks) untuk teks in tqdm(array) ) kembali mengakibatkan

Tambahkan kolom “Deskripsi” ke text_parallel_clean()

%%waktu df['Deskripsi'] = text_parallel_clean(df['Deskripsi'])

Keluaran

Butuh fungsi kami 13 detik lebih banyak daripada multiprocessing Kolam. Bahkan kemudian, Paralel 4 menit 59 detik lebih cepat dari serial pengolahan. 

100% 2845342/2845342 [04:03<00:00, 10514.98it/s] Waktu CPU: pengguna 44.2 dtk, sistem: 2.92 dtk, total: 47.1 dtk Waktu dinding: 4 menit 4 dtk

Ada cara yang lebih baik untuk memproses file besar dengan membaginya menjadi beberapa kelompok dan memprosesnya secara paralel. Mari kita mulai dengan membuat fungsi batch yang akan berjalan clean_function pada satu kumpulan nilai. 

Fungsi Pemrosesan Batch

def proc_batch(kelompok): kembali [ teks_bersih(teks) untuk teks in kelompok ]

Memisahkan File menjadi Batch

Fungsi di bawah ini akan membagi file menjadi beberapa batch berdasarkan jumlah pekerja. Dalam kasus kami, kami mendapatkan 8 batch. 

def batch_file(array,n_workers): file_len = len(array) batch_size = bulat(file_len / n_workers) batches = [ array[ix:ix+batch_size] untuk ix in tqdm(rentang(0, file_len, ukuran_batch)) ] kembali batch batch = batch_file(df['Deskripsi'],n_workers) >>> 100% 8/8 [00:00<00:00, 280.01it/s]

Menjalankan Pemrosesan Batch Paralel

Akhirnya, kami akan menggunakan Paralel dan terlambat untuk memproses batch. 

Catatan: Untuk mendapatkan satu array nilai, kita harus menjalankan pemahaman daftar seperti yang ditunjukkan di bawah ini. 

 

%%waktu batch_output = Paralel(n_jobs=n_workers,backend="multiprosesing")( tertunda(proc_batch) (batch) untuk sekumpulan in tqdm(batch) ) df['Deskripsi'] = [j untuk i in batch_output untuk j in i]

Keluaran

Kami telah meningkatkan waktu pemrosesan. Teknik ini terkenal untuk memproses data yang kompleks dan melatih model pembelajaran yang mendalam. 

100% 8/8 [00:00<00:00, 2.19it/s] Waktu CPU: pengguna 3.39 dtk, sistem: 1.42 dtk, total: 4.81 dtk Waktu dinding: 3 menit 56 dtk

tqdm membawa multiprocessing ke tingkat berikutnya. Hal ini sederhana dan kuat. Saya akan merekomendasikannya kepada setiap ilmuwan data. 

Check out dokumentasi untuk mempelajari lebih lanjut tentang multiprosesor. 

Grafik process_map membutuhkan:

  1. Nama fungsi
  2. Kolom kerangka data
  3. max_workers
  4. ukuran chuck mirip dengan ukuran batch. Kami akan menghitung ukuran batch menggunakan jumlah pekerja atau Anda dapat menambahkan nomor berdasarkan preferensi Anda. 
%%waktu
dari tqdm.contrib.bersamaan mengimpor process_map batch = bulat(len(df)/n_workers) df["Deskripsi"] = process_map( clean_text, df["Deskripsi"], max_workers=n_workers, chunksize=batch )

Keluaran

Dengan satu baris kode, kami mendapatkan hasil terbaik. 

100% 2845342/2845342 [03:48<00:00, 1426320.93it/s] Waktu CPU: pengguna 7.32 dtk, sistem: 1.97 dtk, total: 9.29 dtk Waktu dinding: 3 menit 51 dtk

Anda perlu menemukan keseimbangan dan memilih teknik yang paling sesuai untuk kasus Anda. Ini bisa berupa pemrosesan serial, paralel, atau pemrosesan batch. Pemrosesan paralel dapat menjadi bumerang jika Anda bekerja dengan kumpulan data yang lebih kecil dan tidak terlalu rumit. 

Dalam tutorial mini ini, kita telah belajar tentang berbagai paket dan teknik Python yang memungkinkan kita untuk memproses fungsi data secara paralel. 

Jika Anda hanya bekerja dengan kumpulan data tabular dan ingin meningkatkan kinerja pemrosesan Anda, maka saya sarankan Anda mencoba Senja, tabel data, dan CEPAT 

Referensi 

 
 
Abi Ali Awan (@1abidaliawan) adalah ilmuwan data profesional bersertifikat yang suka membuat model pembelajaran mesin. Saat ini, ia berfokus pada pembuatan konten dan penulisan blog teknis tentang pembelajaran mesin dan teknologi ilmu data. Abid memiliki gelar Magister Manajemen Teknologi dan gelar Sarjana Teknik Telekomunikasi. Visinya adalah untuk membangun produk AI menggunakan jaringan saraf grafik untuk siswa yang berjuang dengan penyakit mental.
 

Stempel Waktu:

Lebih dari KDnugget