Если вы умеете писать функции, вы можете использовать Dask

Исходный узел: 1094095

Если вы умеете писать функции, вы можете использовать Dask

Эта статья является второй из продолжающейся серии статей об использовании Dask на практике. Каждая статья из этой серии будет достаточно простой для новичков, но даст полезные советы для реальной работы. Первая статья в серии посвящена использованию LocalCluster.


By Хьюго Ши, основатель Saturn Cloud

Я общался со многими специалистами по данным, которые слышали о Даск, фреймворк Python для распределенных вычислений, но не знаю, с чего начать. Они знают, что Dask, вероятно, может ускорить многие из их рабочих процессов, запустив их параллельно в кластере машин, но задача изучения совершенно новой методологии может показаться сложной. Я здесь, чтобы сказать вам, что вы можете начать получать пользу от Dask, не изучая всю структуру. Если являетесь тратьте время на ожидание выполнения ячеек блокнота, есть хороший шанс, что Dask может сэкономить ваше время. Даже если вы умеете только писать функции Python, вы можете воспользоваться этим, не изучая ничего другого! Этот пост в блоге представляет собой руководство «Как использовать Dask, не изучая все это».

Dask, датафреймы, мешки, массивы, планировщики, воркеры, графики, RAPIDS, о нет!

 
 
О Dask есть много сложного контента, который может быть ошеломляющим. Это потому, что Dask может использовать кластер рабочих машин, чтобы делать много интересных вещей! Но пока забудьте обо всем этом. В этой статье основное внимание уделяется простым методам, которые могут сэкономить ваше время, не требуя значительных изменений в том, как вы работаете.
 

Для циклов и функций

 
 
Почти каждый специалист по данным делал что-то подобное, когда у вас есть набор фреймов данных, хранящихся в отдельных файлах, и вы используете цикл for, чтобы прочитать их все, выполнить некоторую логику, а затем объединить их:

results = []
for file in files: defer = pd.read_csv(file) ## begin genius algorithm brilliant_features = [] for feature in features: brilliant_features.append(compute_brilliant_feature(df, feature)) magical_business_insight = make_the_magic(brilliant_features) results.append(magical_business_insight)

Со временем у вас появляется больше файлов или genius_algorithm усложняется и требует больше времени для выполнения. И в итоге ждешь. И жду.


 

Шаг 1 заключается в том, чтобы инкапсулировать ваш код в функцию. Вы хотите инкапсулировать то, что происходит внутри цикла for. Это облегчает понимание того, что делает код (преобразование файла во что-то полезное с помощью магии). Что еще более важно, это упрощает использование этого кода помимо циклов for.

def make_all_the_magics(file): df = pd.read_csv(file) brilliant_features = [] for feature in features: brilliant_features.append(compute_brilliant_feature(df, feature)) magical_business_insight = make_the_magic(brilliant_features) return magical_business_insight results = [] for file in files: magical_business_insight = make_all_the_magics(file) results.append(magical_business_insight)

Шаг 2 заключается в том, чтобы распараллелить его с Dask. Теперь вместо цикла for, когда каждая итерация происходит после предыдущей, Dask будет запускать их параллельно в кластере. Это должно дать нам результаты гораздо быстрее, и только на три строки длиннее чем код цикла for!

from dask import delayed
from dask.distributed import Client # same function but with a Dask delayed decorator
@delayed
def make_all_the_magics(file): df = pd.read_csv(file) brilliant_features = [] for feature in features: brilliant_features.append(compute_brilliant_feature(df, feature)) magical_business_insight = make_the_magic(brilliant_features) return magical_business_insight results = []
for file in files: magical_business_insight = make_all_the_magics(file) results.append(magical_business_insight) # new Dask code
c = Client()
results = c.compute(results, sync=True)

Как это работает:

  • Отложенный декоратор преобразует вашу функцию. Теперь, когда вы вызываете его, он не оценивается. Вместо этого вы получаете обратно delayed объект, который Dask может выполнить позже.
  • Client().compute отправляет все эти задержанные объекты в кластер Dask, где они оцениваются параллельно! Вот и все, ты выиграл!
  • Создание экземпляра Client автоматически предоставляет LocalCluster. Это означает, что все параллельные рабочие процессы Dask — это процессы на той же машине, что и тот, который вызывает Dask. Это делает для краткого примера. Для реальной работы рекомендую создать локальные кластеры в терминале.

Практические темы

 
 
Приведенное выше останавливается там, где останавливается большинство руководств по Dask. Я использовал этот подход в своей работе и с многочисленными клиентами, и всегда возникает пара практических вопросов. Следующие советы помогут вам перейти от приведенного выше примера из учебника к более полезным методам на практике, охватив две постоянно возникающие темы: большие объекты и обработку ошибок.
 

Большие объекты

 
Чтобы вычислять функции в распределенном кластере, объекты, для которых вызываются функции, должны быть отправлены рабочим процессам. Это может привести к проблемам с производительностью, поскольку их необходимо сериализовать (выбрать) на вашем компьютере и отправить по сети. Представьте, что вы выполняете процессы с гигабайтами данных — вам не нужно передавать их каждый раз, когда над ними запускается функция. Если вы случайно отправили большие объекты, вы можете увидеть сообщение от Dask, подобное этому:

Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and
keep data on workers

Есть два способа предотвратить это: вы можете отправлять меньшие объекты рабочим, чтобы нагрузка не была такой большой, или вы можете попытаться отправить каждый объект рабочему только один раз, чтобы вам не приходилось делать передачи. .
Исправление 1: отправляйте небольшие объекты, когда это возможно
Этот пример хорош, потому что мы отправляем путь к файлу (небольшую строку) вместо фрейма данных.

# good, do this
results = []
for file in files: magical_business_insight = make_all_the_magics(file) results.append(magical_business_insight)

Ниже то, что не сделать. Как потому, что вы будете выполнять чтение CSV (дорого и медленно) в цикле, который не является параллельным, так и потому, что теперь мы отправляем кадры данных (которые могут быть большими).

# bad, do not do this
results = []
for file in files: df = pd.read_csv(file) magical_business_insight = make_all_the_magics(df) results.append(magical_business_insight)

Часто код можно переписать, чтобы изменить способ управления данными — либо на клиенте, либо на рабочих процессах. В зависимости от вашей ситуации, вы можете значительно сэкономить время, продумав, какие функции принимают в качестве входных данных и как можно свести к минимуму передачу данных.
Исправление 2: отправлять объекты только один раз
Если вам нужно отправить большой объект, не отправляйте его несколько раз. Например, если мне нужно отправить большой объект модели для вычисления, простое добавление параметра приведет к сериализации модели несколько раз (один раз для каждого файла).

# bad, do not do this
results = []
for file in files: # big model has to be sent to a worker each time the function is called magical_business_insight = make_all_the_magics(file, big_model) results.append(magical_business_insight)

Я могу сказать Даску не делать этого, завернув его в отложенный объект.

# good, do this
results = []
big_model = client.scatter(big_model) #send the model to the workers first for file in files: magical_business_insight = make_all_the_magics(file, big_model) results.append(magical_business_insight)

Обработка отказа

 
По мере того как ваши вычислительные задачи растут, часто вам может понадобиться иметь возможность преодолевать сбои. В этом случае, возможно, 5% моих CSV содержат неверные данные, с которыми я не могу справиться. Я хотел бы успешно обработать 95 % CSV-файлов, но отслеживать сбои, чтобы я мог скорректировать свои методы и повторить попытку.

Этот цикл делает это.

import traceback
from distributed.client import wait, FIRST_COMPLETED, ALL_COMPLETED queue = c.compute(results)
futures_to_index = {fut: i for i, fut in enumerate(queue)}
results = [None for x in range(len(queue))] while queue: result = wait(queue, return_when=FIRST_COMPLETED) for future in result.done: index = futures_to_index[future] if future.status == 'finished': print(f'finished computation #{index}') results[index] = future.result() else: print(f'errored #{index}') try: future.result() except Exception as e: results[index] = e traceback.print_exc() queue = result.not_done print(results)

Поскольку эта функция на первый взгляд довольно сложна, давайте разберем ее.

queue = c.compute(results)
futures_to_index = {fut: i for i, fut in enumerate(queue)}
results = [None for x in range(len(queue))]

Мы называем compute on results, но так как мы не проходим sync=True, мы сразу же получаем обратное будущее, которое представляет собой вычисление, которое еще не завершено. Мы также создаем отображение из самого будущего на _n_th входной аргумент, который его сгенерировал. Наконец, мы заполняем список результатов, заполненных Nones на данный момент.

while queue: result = wait(queue, return_when=FIRST_COMPLETED)

Затем мы ждем результатов и обрабатываем их по мере поступления. Когда мы ждем фьючерсы, они разделяются на фьючерсы, которые done, и те, что есть not_done.

 if future.status == 'finished': print(f'finished computation #{index}') results[index] = future.result()

Если будущее finished, затем мы печатаем, что нам это удалось, и сохраняем результат.

 else: print(f'errored #{index}') try: future.result() except Exception as e: results[index] = e traceback.print_exc()

В противном случае мы сохраняем исключение и печатаем трассировку стека.

 queue = result.not_done

Наконец, мы ставим в очередь те фьючерсы, которые еще не были завершены.
 

Заключение

 
 
Dask определенно может сэкономить ваше время. Если вы тратите время на ожидание запуска кода, вам следует воспользоваться этими простыми советами, чтобы распараллелить свою работу. Есть также много дополнительных вещей, которые вы можете сделать с Dask, но это хорошая отправная точка.

 
Bio: Хьюго Ши является основателем Saturn Cloud, облачной рабочей среды для масштабирования Python, совместной работы, развертывания заданий и многого другого.

Оригинал, Перемещено с разрешения.

Связанный:

Источник: https://www.kdnuggets.com/2021/09/write-functions-use-dask.html.

Отметка времени:

Больше от КДнаггетс