Xử lý song song tệp lớn bằng Python

Xử lý song song tệp lớn bằng Python

Nút nguồn: 1970104

Xử lý song song tệp lớn bằng Python
Hình ảnh của Tác giả
 

Để xử lý song song, chúng tôi chia nhiệm vụ của mình thành các đơn vị con. Nó làm tăng số lượng công việc được chương trình xử lý và giảm thời gian xử lý tổng thể. 

Ví dụ: nếu bạn đang làm việc với một tệp CSV lớn và bạn muốn sửa đổi một cột. Chúng tôi sẽ cung cấp dữ liệu dưới dạng một mảng cho hàm và nó sẽ xử lý song song nhiều giá trị cùng một lúc dựa trên số lượng có sẵn  công nhân. Những công nhân này dựa trên số lượng lõi trong bộ xử lý của bạn. 
 

Lưu ý: sử dụng xử lý song song trên tập dữ liệu nhỏ hơn sẽ không cải thiện thời gian xử lý.

 

Trong blog này, chúng ta sẽ tìm hiểu cách giảm thời gian xử lý đối với các tệp lớn bằng cách sử dụng đa xử lý, việc làmtqdm Các gói Python. Đây là một hướng dẫn đơn giản có thể áp dụng cho bất kỳ tệp, cơ sở dữ liệu, hình ảnh, video và âm thanh nào. 
 

Lưu ý: chúng tôi đang sử dụng sổ tay Kaggle cho các thí nghiệm. Thời gian xử lý có thể khác nhau giữa các máy.  

 

Chúng tôi sẽ sử dụng US Accidents (2016 – 2021) tập dữ liệu từ Kaggle bao gồm 2.8 triệu bản ghi và 47 cột. 

Chúng tôi sẽ nhập khẩu multiprocessing, joblibtqdm cho tiến trình song song, pandas cho nhập dữ liệure, nltkstring cho xử lý văn bản

# Tính toán song song
nhập khẩu đa xử lý as mp
từ việc làm nhập khẩu Song song, trì hoãn
từ tqdm.notebook nhập khẩu tqdm # Nhập dữ liệu 
nhập khẩu gấu trúc as pd # Xử lý văn bản 
nhập khẩu re từ nltk.corpus nhập khẩu ngưng từ
nhập khẩu chuỗi

Before we jump right in, let’s set n_workers bằng cách nhân đôi cpu_count(). Như bạn có thể thấy, chúng tôi có 8 công nhân.

n_workers = 2 * mp.cpu_count() print(f"{n_workers} worker đang rảnh") >>> 8 công nhân có sẵn

Trong bước tiếp theo, chúng tôi sẽ nhập các tệp CSV lớn bằng cách sử dụng gấu trúc read_csv chức năng. Sau đó, in ra hình dạng của khung dữ liệu, tên của các cột và thời gian xử lý. 

Lưu ý: Chức năng ma thuật của Jupyter %%time có thể hiển thị thời gian CPUtường thời gian khi kết thúc quá trình. 

 

%%time file_name="../input/us-accidents/US_Accidents_Dec21_updated.csv" df = pd.read_csv(file_name) print(f"Shape:{df.shape}nnColumn Names:n{df.columns}n")

Đầu ra

Hình dạng:(2845342, 47) Tên cột: Chỉ mục(['ID', 'Mức độ nghiêm trọng', 'Thời gian bắt đầu', 'Thời gian kết thúc', 'Lat_bắt đầu', 'Bắt ​​đầu_Lng', 'Lat_End', 'End_Lng', 'Khoảng cách(mi) ', 'Mô tả', 'Số', 'Đường', 'Phía', 'Thành phố', 'Hạt', 'Tiểu bang', 'Mã zip', 'Quốc gia', 'Múi giờ', 'Mã sân bay', 'Dấu_thời tiết', 'Nhiệt độ(F)', 'Wind_Chill(F)', 'Độ ẩm(%)', 'Áp suất(in)', 'Tầm nhìn(mi)', 'Wind_Direction', 'Wind_Speed(mph)', 'Lượng mưa(in )', 'Weather_Condition', 'Amenity', 'Bump', 'Crossing', 'Give_Way', 'Junction', 'No_Exit', 'Railway', 'Bùng binh', 'Trạm', 'Dừng', 'Traffic_Calming' , 'Traffic_Signal', 'Turning_Loop', 'Sunrise_Sunset', 'Civil_Twilight', 'Nautical_Twilight', 'Astronomical_Twilight'], dtype='object') Thời gian CPU: người dùng 33.9 giây, sys: 3.93 giây, tổng cộng: 37.9 giây Thời gian treo tường : 46.9 giây

Sản phẩm clean_text là một chức năng đơn giản để xử lý và làm sạch văn bản. Chúng tôi sẽ nhận được tiếng Anh ngưng từ sử dụng nltk.copus việc sử dụng nó để lọc ra các từ dừng từ dòng văn bản. Sau đó, chúng tôi sẽ loại bỏ các ký tự đặc biệt và khoảng trắng thừa khỏi câu. Nó sẽ là chức năng cơ sở để xác định thời gian xử lý cho nối tiếp, song song, tương đônghàng loạt Chế biến. 

def văn bản sạch(chữ): # Xóa từ dừng điểm dừng = stopwords.words("tiếng Anh") văn bản = " ".join([từ cho lời in văn bản.split() if lời không in dừng lại]) # Xóa ký tự đặc biệt văn bản = text.translate(str.maketrans('', '', string.punctuation)) # loại bỏ các không gian thừa văn bản = re.sub(' +',' ', văn bản) trở lại văn bản

Để xử lý nối tiếp, chúng ta có thể sử dụng pandas .apply() chức năng, nhưng nếu bạn muốn xem thanh tiến trình, bạn cần kích hoạt tqdm cho gấu trúc và sau đó sử dụng .progress_apply() chức năng. 

Chúng tôi sẽ xử lý 2.8 triệu bản ghi và lưu kết quả trở lại cột "Mô tả". 

%%time tqdm.pandas() df['Description'] = df['Description'].progress_apply(clean_text)

Đầu ra

Mất 9 phút 5 giây để cao cấp bộ xử lý để xử lý nối tiếp 2.8 triệu hàng. 

100% 2845342/2845342 [09:05<00:00, 5724.25it/s] Thời gian CPU: người dùng 8 phút 14 giây, sys: 53.6 giây, tổng cộng: 9 phút 7 giây Thời gian tường: 9 phút 5 giây

Có nhiều cách khác nhau để xử lý tệp song song và chúng ta sẽ tìm hiểu về tất cả các cách đó. Các multiprocessing là gói python tích hợp thường được sử dụng để xử lý song song các tệp lớn. 

Chúng tôi sẽ tạo ra một đa xử lý Hồ Bơi với công nhân 8 và sử dụng bản đồ chức năng để bắt đầu quá trình. Để hiển thị các thanh tiến trình, chúng tôi đang sử dụng tqdm.

Chức năng bản đồ bao gồm hai phần. Cái đầu tiên yêu cầu hàm và cái thứ hai yêu cầu một đối số hoặc danh sách các đối số. 

Tìm hiểu thêm bằng cách đọc tài liệu hướng dẫn

%%time p = mp.Pool(n_workers) df['Description'] = p.map(clean_text,tqdm(df['Description']))

Đầu ra

Chúng tôi đã cải thiện thời gian xử lý gần như 3X. Thời gian xử lý giảm từ Số phút 9 5 giây đến Số phút 3 51 giây.   

100% 2845342/2845342 [02:58<00:00, 135646.12it/s] Thời gian CPU: người dùng 5.68 giây, sys: 1.56 giây, tổng cộng: 7.23 giây Thời gian tường: 3 phút 51 giây

Bây giờ chúng ta sẽ tìm hiểu về một gói Python khác để thực hiện xử lý song song. Trong phần này, chúng ta sẽ sử dụng joblib's Song song bị trì hoãn để tái tạo bản đồ chức năng. 

  • Parallel yêu cầu hai đối số: n_jobs = 8 và backend = multiprocessing.
  • Sau đó, chúng tôi sẽ thêm văn bản sạch  đến bị trì hoãn chức năng. 
  • Tạo một vòng lặp để cung cấp một giá trị tại một thời điểm. 

Quy trình bên dưới khá chung chung và bạn có thể sửa đổi hàm và mảng theo nhu cầu của mình. Tôi đã sử dụng nó để xử lý hàng nghìn tệp âm thanh và video mà không gặp sự cố nào. 

Đề nghị: thêm xử lý ngoại lệ bằng cách sử dụng try:except:

def text_parallel_clean(mảng): kết quả = Parallel(n_jobs=n_workers,backend="multiprocessing")( bị trì hoãn(clean_text) (văn bản) cho văn bản in tqdm(mảng) ) trở lại kết quả

Thêm cột “Mô tả” vào text_parallel_clean()

%%time df['Description'] = text_parallel_clean(df['Description'])

Đầu ra

Chức năng của chúng tôi mất nhiều hơn 13 giây so với đa xử lý Hồ bơi. Ngay cả sau đó, Song song nhanh hơn 4 phút 59 giây so với nối tiếp Chế biến. 

100% 2845342/2845342 [04:03<00:00, 10514.98it/s] Thời gian CPU: người dùng 44.2 giây, sys: 2.92 giây, tổng cộng: 47.1 giây Thời gian tường: 4 phút 4 giây

Có một cách tốt hơn để xử lý các tệp lớn bằng cách chia nhỏ chúng thành các lô và xử lý chúng song song. Hãy bắt đầu bằng cách tạo một hàm hàng loạt sẽ chạy một clean_function trên một loạt các giá trị. 

Chức năng xử lý hàng loạt

def proc_batch(lô hàng): trở lại [ clean_text(văn bản) cho văn bản in lô hàng ]

Chia nhỏ tệp thành các đợt

Hàm dưới đây sẽ chia tệp thành nhiều lô dựa trên số lượng công nhân. Trong trường hợp của chúng tôi, chúng tôi nhận được 8 lô. 

def batch_file(mảng,n_workers): file_len = len(mảng) batch_size = round(file_len / n_workers) đợt = [ mảng[ix:ix+batch_size] cho ix in tqdm(phạm vi(0, file_len, batch_size))] trở lại lô hàng loạt = batch_file(df['Description'],n_workers) >>> 100% 8/8 [00:00<00:00, 280.01it/s]

Chạy xử lý hàng loạt song song

Cuối cùng, chúng ta sẽ sử dụng Song song bị trì hoãn để xử lý lô. 

Lưu ý: Để có được một mảng giá trị duy nhất, chúng ta phải chạy hiểu danh sách như hình bên dưới. 

 

%%time batch_output = Parallel(n_jobs=n_workers,backend="multiprocessing")( bị trì hoãn(proc_batch) (lô) cho hàng loạt in tqdm(lô) ) df['Mô tả'] = [j cho i in batch_output cho j in i]

Đầu ra

Chúng tôi đã cải thiện thời gian xử lý. Kỹ thuật này nổi tiếng để xử lý dữ liệu phức tạp và đào tạo các mô hình học sâu. 

100% 8/8 [00:00<00:00, 2.19it/s] Thời gian CPU: người dùng 3.39 giây, sys: 1.42 giây, tổng cộng: 4.81 giây Thời gian tường: 3 phút 56 giây

tqdm đưa đa xử lý lên cấp độ tiếp theo. Nó đơn giản và mạnh mẽ. Tôi sẽ giới thiệu nó cho mọi nhà khoa học dữ liệu. 

Kiểm tra các tài liệu hướng dẫn để tìm hiểu thêm về đa xử lý. 

Sản phẩm process_map đòi hỏi:

  1. Tên chức năng
  2. Cột khung dữ liệu
  3. max_worker
  4. chucksize tương tự như kích thước lô. Chúng tôi sẽ tính toán kích thước lô bằng cách sử dụng số lượng công nhân hoặc bạn có thể thêm số lượng tùy theo sở thích của mình. 
%%thời gian
từ tqdm.contrib.concurrent nhập khẩu lô process_map = round(len(df)/n_workers) df["Description"] = process_map( clean_text, df["Description"], max_workers=n_workers, chunksize=batch )

Đầu ra

Với một dòng mã, chúng tôi nhận được kết quả tốt nhất. 

100% 2845342/2845342 [03:48<00:00, 1426320.93it/s] Thời gian CPU: người dùng 7.32 giây, sys: 1.97 giây, tổng cộng: 9.29 giây Thời gian tường: 3 phút 51 giây

Bạn cần cân bằng và chọn kỹ thuật phù hợp nhất với trường hợp của mình. Nó có thể là xử lý nối tiếp, song song hoặc xử lý hàng loạt. Quá trình xử lý song song có thể phản tác dụng nếu bạn đang làm việc với tập dữ liệu nhỏ hơn, ít phức tạp hơn. 

Trong hướng dẫn nhỏ này, chúng ta đã tìm hiểu về các gói và kỹ thuật Python khác nhau cho phép chúng ta xử lý song song các hàm dữ liệu của mình. 

Nếu bạn chỉ làm việc với tập dữ liệu dạng bảng và muốn cải thiện hiệu suất xử lý của mình, thì tôi khuyên bạn nên thử bảng điều khiển, bảng dữ liệuNHANH CHÓNG 

Tài liệu tham khảo 

 
 
Abid Ali Awan (@ 1abidaliawan) là một nhà khoa học dữ liệu chuyên nghiệp được chứng nhận, người yêu thích việc xây dựng các mô hình học máy. Hiện tại, anh đang tập trung sáng tạo nội dung và viết blog kỹ thuật về công nghệ máy học và khoa học dữ liệu. Abid có bằng Thạc sĩ về Quản lý Công nghệ và bằng cử nhân về Kỹ thuật Viễn thông. Tầm nhìn của ông là xây dựng một sản phẩm AI bằng cách sử dụng mạng nơ-ron đồ thị cho những sinh viên đang chống chọi với bệnh tâm thần.
 

Dấu thời gian:

Thêm từ Xe đẩy