Параллельная обработка большого файла в Python

Параллельная обработка большого файла в Python

Исходный узел: 1970104

Параллельная обработка большого файла в Python
Изображение по автору
 

Для параллельной обработки мы делим нашу задачу на подблоки. Это увеличивает количество заданий, обрабатываемых программой, и сокращает общее время обработки. 

Например, если вы работаете с большим CSV-файлом и хотите изменить один столбец. Мы будем передавать данные в виде массива в функцию, и она будет параллельно обрабатывать несколько значений одновременно в зависимости от количества доступных  рабочие. Эти рабочие основаны на количестве ядер в вашем процессоре. 
 

Примечание: использование параллельной обработки для меньшего набора данных не улучшит время обработки.

 

В этом блоге мы узнаем, как сократить время обработки больших файлов с помощью многопроцессорная обработка, Joblibи тквдм Пакеты Python. Это простое руководство, которое можно применить к любому файлу, базе данных, изображению, видео и аудио. 
 

Примечание: мы используем блокнот Kaggle для экспериментов. Время обработки может варьироваться от машины к машине.  

 

Мы будем использовать Аварии в США (2016 – 2021) набор данных от Kaggle, который состоит из 2.8 миллионов записей и 47 столбцов. 

Мы будем импортировать multiprocessing, joblibи tqdm для параллельная обработка, pandas для прием данныхи re, nltkи string для обработка текста

# Параллельные вычисления
Импортировать многопроцессорная обработка as mp
от Joblib Импортировать Параллельно, с задержкой
от tqdm.ноутбук Импортировать тквдм # Прием данных 
Импортировать панд as pd # Обработка текста 
Импортировать re от nltk.corpus Импортировать игнорируемые слова
Импортировать string

Прежде чем мы начнем, давайте установим n_workers удваивая cpu_count(). Как видите, у нас 8 рабочих.

n_workers = 2 * mp.cpu_count() print(f"доступно {n_workers} рабочих") >>> 8 рабочих доступны

На следующем шаге мы будем принимать большие CSV-файлы, используя панд read_csv функция. Затем распечатайте форму фрейма данных, имя столбца и время обработки. 

Примечание: Магическая функция Юпитера %%time может отображать процессорное время и настенное время в конце процесса. 

 

%%time file_name="../input/us-accidents/US_Accidents_Dec21_updated.csv" df = pd.read_csv(file_name) print(f"Shape:{df.shape}nnИмена столбцов:n{df.columns}n")

Результат

Форма: (2845342, 47) Имена столбцов: Index(['ID', 'Severity', 'Start_Time', 'End_Time', 'Start_Lat', 'Start_Lng', 'End_Lat', 'End_Lng', 'Расстояние (мили) ', 'Описание', 'Номер', 'Улица', 'Сторона', 'Город', 'Округ', 'Штат', 'Почтовый индекс', 'Страна', 'Часовой пояс', 'Airport_Code', 'Weather_Timestamp', «Температура (F)», «Ветер_Холод (F)», «Влажность (%)», «Давление (в)», «Видимость (мили)», «Направление ветра», «Скорость ветра (м/ч)», «Осадки (в )', 'Weather_Condition', 'Amenity', 'Hump', 'Перекрёсток', 'Give_Way', 'Junction', 'No_Exit', 'Железная дорога', 'Круговая развязка', 'Станция', 'Остановка', 'Traffic_Calming' , 'Traffic_Signal', 'Turning_Loop', 'Sunrise_Sunset', 'Civil_Twilight', 'Nautical_Twilight', 'Astronomical_Twilight'], dtype='object') Время ЦП: пользователь 33.9 с, система: 3.93 с, всего: 37.9 с Время стены : 46.9 с

Ассоциация clean_text это простая функция для обработки и очистки текста. Мы получим английский игнорируемые слова через nltk.copus используйте его, чтобы отфильтровать стоп-слова из текстовой строки. После этого мы удалим из предложения специальные символы и лишние пробелы. Это будет базовая функция для определения времени обработки последовательный, параллельнои партия обработка. 

защиту чистый_текст(текст): # Удалить стоп-слова стопы = стоп-слова.слова("английский") текст = " ".join([слово для слово in текст.split() if слово не in останавливается]) # Удалить специальные символы текст = текст.перевод(str.maketrans('', '', строка.пунктуация)) # убираем лишние пробелы текст = re.sub(' +',' ', текст) возвращают текст

Для последовательной обработки мы можем использовать pandas .apply() функцию, но если вы хотите видеть индикатор выполнения, вам нужно активировать тквдм для панд а затем использовать .progress_apply() функции. 

Мы собираемся обработать 2.8 миллиона записей и сохранить результат обратно в столбец «Описание». 

%%time tqdm.pandas() df['Описание'] = df['Описание'].progress_apply(clean_text)

Результат

Потребовалось 9 минут и 5 секунд для высококачественный процессор для последовательной обработки 2.8 миллиона строк. 

100% 2845342/2845342 [09:05<00:00, 5724.25 ит/с] Время ЦП: пользователь 8 мин 14 с, система: 53.6 с, всего: 9 мин 7 с Время стены: 9 мин 5 с

Существуют различные способы параллельной обработки файла, и мы собираемся изучить их все. multiprocessing — это встроенный пакет Python, который обычно используется для параллельной обработки больших файлов. 

Мы создадим мультипроцессор Бассейн работники 8 и использовать карта функцию запуска процесса. Для отображения индикаторов выполнения мы используем тквдм.

Функция карты состоит из двух разделов. Для первого требуется функция, а для второго — аргумент или список аргументов. 

Узнайте больше, прочитав документации

%%time p = mp.Pool(n_workers) df['Description'] = p.map(clean_text,tqdm(df['Description']))

Результат

Мы сократили время обработки почти на 3X. Время обработки сократилось с 9 минуты 5 секунды в 3 минуты 51 секунды.   

100% 2845342/2845342 [02:58<00:00, 135646.12 ит/с] Процессорное время: пользователь 5.68 с, система: 1.56 с, всего: 7.23 с Время стены: 3 мин 51 с

Теперь мы узнаем о другом пакете Python для выполнения параллельной обработки. В этом разделе мы будем использовать joblib Параллельные и задерживается воспроизвести карта функции. 

  • Parallel требует два аргумента: n_jobs = 8 и backend = multiprocessing.
  • Затем мы добавим чистый_текст  до задерживается функции. 
  • Создайте цикл для подачи одного значения за раз. 

Приведенный ниже процесс довольно общий, и вы можете изменить свою функцию и массив в соответствии с вашими потребностями. Я использовал его для обработки тысяч аудио и видео файлов без каких-либо проблем. 

Рекомендуется: добавить обработку исключений с помощью try: и except:

защиту text_parallel_clean(массив): результат = Parallel(n_jobs=n_workers,backend="multiprocessing")(delayed(clean_text) (text) для текст in tqdm(массив) ) возвращают результат

Добавьте столбец «Описание» в text_parallel_clean()

%%time df['Описание'] = text_parallel_clean(df['Описание'])

Результат

Это заняло у нашей функции на 13 секунд больше, чем многопроцессорность Бассейн. Даже тогда, Параллельные на 4 минуты 59 секунд быстрее, чем последовательный обработка. 

100% 2845342/2845342 [04:03<00:00, 10514.98 ит/с] Процессорное время: пользователь 44.2 с, система: 2.92 с, всего: 47.1 с Время стены: 4 мин 4 с

Есть лучший способ обрабатывать большие файлы, разбивая их на пакеты и обрабатывая их параллельно. Начнем с создания пакетной функции, которая будет запускать clean_function на одном пакете значений. 

Функция пакетной обработки

защиту proc_batch(партия): возвращают [ чистый_текст (текст) для текст in партия ]

Разделение файла на пакеты

Приведенная ниже функция разделит файл на несколько пакетов в зависимости от количества рабочих процессов. В нашем случае мы получаем 8 партий. 

защиту пакетный_файл(массив,n_workers): file_len = len(массив) batch_size = round(file_len / n_workers) batches = [ array[ix:ix+batch_size] для ix in tqdm(диапазон(0, длина_файла, размер_пакета))] возвращают пакеты пакеты = пакетный_файл(df['Описание'],n_workers) >>> 100% 8/8 [00:00<00:00, 280.01 ит/с]

Запуск параллельной пакетной обработки

Наконец, мы будем использовать Параллельные и задерживается для обработки пакетов. 

Примечание: Чтобы получить единый массив значений, мы должны запустить понимание списка, как показано ниже. 

 

%%time batch_output = Parallel (n_jobs = n_workers, backend = "multiprocessing") (отложенный (proc_batch) (пакетный) для партия in tqdm(партии)) df['Описание'] = [j для i in пакетный_вывод для j in i]

Результат

Мы улучшили время обработки. Этот метод известен обработкой сложных данных и обучением моделей глубокого обучения. 

100% 8/8 [00:00<00:00, 2.19 ит/с] Процессорное время: пользователь 3.39 с, система: 1.42 с, всего: 4.81 с Время стены: 3 мин 56 с

tqdm выводит многопроцессорность на новый уровень. Это просто и мощно. Я рекомендую его каждому исследователю данных. 

Попробуйте документации чтобы узнать больше о многопроцессорной обработке. 

Ассоциация process_map требует:

  1. Имя функции
  2. Столбец фрейма данных
  3. max_workers
  4. размер патрона аналогичен размеру партии. Мы рассчитаем размер партии, используя количество рабочих, или вы можете добавить число в зависимости от ваших предпочтений. 
%%время
от tqdm.contrib.concurrent Импортировать process_map batch = round(len(df)/n_workers) df["Описание"] = process_map(clean_text, df["Описание"], max_workers=n_workers, chunksize=batch)

Результат

С помощью одной строки кода мы получаем лучший результат. 

100% 2845342/2845342 [03:48<00:00, 1426320.93 ит/с] Процессорное время: пользователь 7.32 с, система: 1.97 с, всего: 9.29 с Время стены: 3 мин 51 с

Вам нужно найти баланс и выбрать технику, которая лучше всего подходит для вашего случая. Это может быть последовательная обработка, параллельная или пакетная обработка. Параллельная обработка может иметь неприятные последствия, если вы работаете с меньшим и менее сложным набором данных. 

В этом мини-учебнике мы узнали о различных пакетах и ​​методах Python, которые позволяют нам параллельно обрабатывать наши функции данных. 

Если вы работаете только с табличным набором данных и хотите повысить производительность обработки, я предлагаю вам попробовать Даск, Таблица данныхи БЫСТРЫЕ 

Справка 

 
 
Абид Али Аван (@ 1abidaliawan) — сертифицированный специалист по анализу данных, который любит создавать модели машинного обучения. В настоящее время он занимается созданием контента и ведением технических блогов по технологиям машинного обучения и обработки данных. Абид имеет степень магистра в области управления технологиями и степень бакалавра в области телекоммуникаций. Его видение состоит в том, чтобы создать продукт искусственного интеллекта с использованием графовой нейронной сети для студентов, борющихся с психическими заболеваниями.
 

Отметка времени:

Больше от КДнаггетс