Parallel Processing Large File in Python

Image by Author

For parallel processing, we divide our task into sub-units. This increases the number of jobs the program can process at the same time and reduces the overall processing time.

For example, suppose you are working with a large CSV file and want to modify a single column. We feed the data as an array to the function, and it processes multiple values in parallel based on the number of available workers. The number of workers depends on the number of cores in your processor.
 

Note: using parallel processing on a smaller dataset will not improve processing time.
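To see why, here is a minimal sketch (my own toy example with a hypothetical square function, not part of the tutorial) that times a serial loop against Pool.map on a small list. On a dataset this small, the cost of spawning worker processes and pickling data usually outweighs the work itself, so the parallel version tends to be slower.

import multiprocessing as mp
import time

def square(x):
    return x * x

if __name__ == "__main__":
    small_data = list(range(1_000))

    start = time.perf_counter()
    serial = [square(x) for x in small_data]
    print(f"serial:   {time.perf_counter() - start:.4f} s")

    start = time.perf_counter()
    with mp.Pool(mp.cpu_count()) as pool:  # spawning workers dominates the runtime here
        parallel = pool.map(square, small_data)
    print(f"parallel: {time.perf_counter() - start:.4f} s")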

 

In this blog, we will learn how to reduce processing time on large files using the multiprocessing, joblib, and tqdm Python packages. It is a simple tutorial that can apply to any file, database, image, video, and audio.
 

Note: we are using the Kaggle notebook for the experiments. The processing time can vary from machine to machine.

 

We will use the US Accidents (2016 – 2021) dataset from Kaggle, which consists of 2.8 million records and 47 columns.

We will import multiprocessing, joblib, and tqdm for parallel processing, pandas for data ingestion, and re, nltk, and string for text processing.

# Parallel Computing
import multiprocessing as mp
from joblib import Parallel, delayed
from tqdm.notebook import tqdm

# Data Ingestion
import pandas as pd

# Text Processing
import re
from nltk.corpus import stopwords
import string

Before we jump right in, let’s set n_workers by doubling cpu_count(). As you can see, we have 8 workers.

n_workers = 2 * mp.cpu_count()

print(f"{n_workers} workers are available")

>>> 8 workers are available

In the next step, we will ingest the large CSV file using the pandas read_csv function. Then, we print out the shape of the dataframe, the names of the columns, and the processing time.

Note: Jupyter's magic function %%time can display CPU times and wall time at the end of the process.

 

%%time
file_name = "../input/us-accidents/US_Accidents_Dec21_updated.csv"
df = pd.read_csv(file_name)

print(f"Shape:{df.shape}\n\nColumn Names:\n{df.columns}\n")

Output

Shape:(2845342, 47)

Column Names:
Index(['ID', 'Severity', 'Start_Time', 'End_Time', 'Start_Lat', 'Start_Lng',
       'End_Lat', 'End_Lng', 'Distance(mi)', 'Description', 'Number', 'Street',
       'Side', 'City', 'County', 'State', 'Zipcode', 'Country', 'Timezone',
       'Airport_Code', 'Weather_Timestamp', 'Temperature(F)', 'Wind_Chill(F)',
       'Humidity(%)', 'Pressure(in)', 'Visibility(mi)', 'Wind_Direction',
       'Wind_Speed(mph)', 'Precipitation(in)', 'Weather_Condition', 'Amenity',
       'Bump', 'Crossing', 'Give_Way', 'Junction', 'No_Exit', 'Railway',
       'Roundabout', 'Station', 'Stop', 'Traffic_Calming', 'Traffic_Signal',
       'Turning_Loop', 'Sunrise_Sunset', 'Civil_Twilight', 'Nautical_Twilight',
       'Astronomical_Twilight'],
      dtype='object')

CPU times: user 33.9 s, sys: 3.93 s, total: 37.9 s
Wall time: 46.9 s

clean_text is a straightforward function for processing and cleaning the text. We get the English stop words using nltk.corpus and then use them to filter out stop words from the text line. After that, we remove special characters and extra spaces from the sentence. It will be the baseline function to determine processing time for serial, parallel, and batch processing.

def clean_text(text):
    # Remove stop words
    stops = stopwords.words("english")
    text = " ".join([word for word in text.split() if word not in stops])
    # Remove Special Characters
    text = text.translate(str.maketrans('', '', string.punctuation))
    # Removing the extra spaces
    text = re.sub(' +', ' ', text)
    return text

For serial processing, we can use the pandas .apply() function, but if you want to see the progress bar, you need to activate tqdm for pandas and then use the .progress_apply() function.

We are going to process the 2.8 million records and save the result back to the “Description” column.

%%time
tqdm.pandas()

df['Description'] = df['Description'].progress_apply(clean_text)

Output

It took the high-end processor 9 minutes and 5 seconds to serially process 2.8 million rows.

100% 2845342/2845342 [09:05<00:00, 5724.25it/s]

CPU times: user 8min 14s, sys: 53.6 s, total: 9min 7s
Wall time: 9min 5s

There are various ways to parallel process a file, and we are going to learn about all of them. multiprocessing is a built-in Python package that is commonly used for parallel processing of large files.

We will create a multiprocessing Pool with 8 workers and use the map function to initiate the process. To display progress bars, we are using tqdm.

The map function takes two arguments: the first is the function to apply, and the second is an argument or list of arguments.

Learn more by reading the documentation.

%%time
p = mp.Pool(n_workers)

df['Description'] = p.map(clean_text, tqdm(df['Description']))

Output

We have improved our processing time by roughly 2.4X. The processing time dropped from 9 minutes 5 seconds to 3 minutes 51 seconds.

100% 2845342/2845342 [02:58<00:00, 135646.12it/s]

CPU times: user 5.68 s, sys: 1.56 s, total: 7.23 s
Wall time: 3min 51s
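One small caveat: the snippet above creates the pool but never shuts it down. A minimal sketch of the same call (my assumption, not part of the original code) that uses Pool as a context manager, so the worker processes are terminated automatically when the block exits:

with mp.Pool(n_workers) as p:
    df['Description'] = p.map(clean_text, tqdm(df['Description']))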

We will now learn about another Python package to perform parallel processing. In this section, we will use joblib's Parallel and delayed to replicate the map function.

  • The Parallel requires two arguments: n_jobs = 8 and backend = multiprocessing.
  • Then, we will add clean_text to the delayed function (a minimal toy sketch of this pattern follows the list).
  • Create a loop to feed a single value at a time.
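Before applying it to the dataframe, here is a minimal toy sketch of the Parallel/delayed pattern (my own example with a hypothetical square function, not from the article): delayed(square)(i) records the call without running it, and Parallel consumes the generator and executes the calls across the workers, returning the results in input order.

from joblib import Parallel, delayed

def square(x):
    return x * x

# Runs the ten calls across two worker processes and collects the results.
squares = Parallel(n_jobs=2, backend="multiprocessing")(
    delayed(square)(i) for i in range(10)
)
print(squares)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]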

The process below is quite generic, and you can modify your function and array according to your needs. I have used it to process thousands of audio and video files without any issue. 

Recommended: add exception handling using try: and except: (a hedged example follows the function below).

def text_parallel_clean(array):
    result = Parallel(n_jobs=n_workers, backend="multiprocessing")(
        delayed(clean_text)(text)
        for text in tqdm(array)
    )
    return result
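Following that recommendation, here is a hedged sketch of a wrapper with exception handling (my addition, not the article's code): if cleaning a single record fails, the original text is kept and the parallel job keeps running.

def safe_clean_text(text):
    try:
        return clean_text(text)
    except Exception:
        return text  # keep the raw value so one bad record does not stop the run

# Drop-in usage inside text_parallel_clean:
#   delayed(safe_clean_text)(text) for text in tqdm(array)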

Now pass the “Description” column to text_parallel_clean().

%%time
df['Description'] = text_parallel_clean(df['Description'])

Output

It took our function 13 seconds more than multiprocessing Pool. Even then, Parallel is about 5 minutes faster than serial processing.

100% 2845342/2845342 [04:03<00:00, 10514.98it/s]

CPU times: user 44.2 s, sys: 2.92 s, total: 47.1 s
Wall time: 4min 4s

There is a better way to process large files: split them into batches and process the batches in parallel. Let's start by creating a batch function that will run the clean_text function on a single batch of values.

Batch Processing Function

def proc_batch(batch):
    return [
        clean_text(text)
        for text in batch
    ]

Splitting the File into Batches

The function below will split the file into multiple batches based on the number of workers. In our case, we get 8 batches. 

def batch_file(array, n_workers):
    file_len = len(array)
    batch_size = round(file_len / n_workers)
    batches = [
        array[ix:ix+batch_size]
        for ix in tqdm(range(0, file_len, batch_size))
    ]
    return batches

batches = batch_file(df['Description'], n_workers)

>>> 100% 8/8 [00:00<00:00, 280.01it/s]
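As a quick sanity check (my addition, not in the original), you can confirm that we got the expected number of batches and that no rows were lost during the split:

print(len(batches))                    # expected: 8
print(sum(len(b) for b in batches))    # expected: 2845342, the same as len(df)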

Running Parallel Batch Processing

Finally, we will use Parallel and delayed to process the batches.

Note: To get a single array of values, we have to run a list comprehension as shown below.

 

%%time
batch_output = Parallel(n_jobs=n_workers, backend="multiprocessing")(
    delayed(proc_batch)(batch)
    for batch in tqdm(batches)
)

df['Description'] = [j for i in batch_output for j in i]

Output

We have improved the processing time. This technique is famous for processing complex data and training deep learning models. 

100% 8/8 [00:00<00:00, 2.19it/s]

CPU times: user 3.39 s, sys: 1.42 s, total: 4.81 s
Wall time: 3min 56s

tqdm takes multiprocessing to the next level. It is simple and powerful. I recommend it to every data scientist.

Check out the documentation to learn more about multiprocessing.

process_map requires:

  1. Function name
  2. Dataframe column
  3. max_workers
  4. chunksize is similar to batch size. We will calculate the batch size using the number of workers, or you can add the number based on your preference.
%%time
from tqdm.contrib.concurrent import process_map

batch = round(len(df) / n_workers)

df["Description"] = process_map(
    clean_text, df["Description"], max_workers=n_workers, chunksize=batch
)

Output

With a single line of code, we get the best result. 

100% 2845342/2845342 [03:48<00:00, 1426320.93it/s]

CPU times: user 7.32 s, sys: 1.97 s, total: 9.29 s
Wall time: 3min 51s

You need to find a balance and select the technique that works best for your case: serial, parallel, or batch processing. Parallel processing can backfire if you are working with a smaller, less complex dataset.

In this mini-tutorial, we have learned about various Python packages and techniques that allow us to parallel process our data functions. 

If you are only working with a tabular dataset and want to improve your processing performance, then I suggest you try Dask, datatable, and RAPIDS.

Abid Ali Awan (@1abidaliawan) is a certified data scientist professional who loves building machine learning models. Currently, he is focusing on content creation and writing technical blogs on machine learning and data science technologies. Abid holds a Master's degree in technology management and a bachelor's degree in telecommunication engineering. His vision is to build an AI product using a graph neural network for students struggling with mental illness.
 
