Якщо ви можете писати функції, ви можете використовувати Dask

Вихідний вузол: 1094095

Якщо ви можете писати функції, ви можете використовувати Dask

Ця стаття є другою статтею з серії про використання Dask на практиці. Кожна стаття з цієї серії буде досить простою для початківців, але дасть корисні поради для реальної роботи. Перша стаття серії присвячена використанню LocalCluster.


By Х'юго Ши, засновник Хмари Сатурна

Я спілкувався з багатьма дослідниками даних, які чули про них Панель приладів, фреймворк Python для розподілених обчислень, але не знаю, з чого почати. Вони знають, що Dask, ймовірно, може прискорити багато їхніх робочих процесів, запустивши їх паралельно на кластері машин, але завдання вивчення абсолютно нової методології може здатися складним. Я тут, щоб сказати вам, що ви можете почати отримувати користь від Dask, не вивчаючи всю структуру. Якщо ви проводите час, чекаючи виконання клітинок блокнота, є велика ймовірність, що Dask заощадить ваш час. Навіть якщо ви знаєте, як писати функції Python, ви можете скористатися цим, не вивчаючи нічого іншого! Ця публікація в блозі є підручником «як використовувати Dask, не вивчаючи всього».

Dask, фрейми даних, пакети, масиви, планувальники, працівники, графіки, RAPIDS, о ні!

 
 
Про Dask є багато складного контенту, який може бути приголомшливим. Це тому, що Dask може використовувати кластер робочих машин, щоб робити багато цікавих речей! Але поки що забудьте про все це. Ця стаття зосереджена на простих прийомах, які можуть заощадити ваш час, при цьому не потрібно сильно змінювати свою роботу.
 

Для циклів і функцій

 
 
Практично кожен науковець з даних робив щось подібне, коли у вас є набір фреймів даних, що зберігаються в окремих файлах, і ви використовуєте цикл for, щоб прочитати їх усі, виконати певну логіку, а потім об’єднати їх:

results = []
for file in files: defer = pd.read_csv(file) ## begin genius algorithm brilliant_features = [] for feature in features: brilliant_features.append(compute_brilliant_feature(df, feature)) magical_business_insight = make_the_magic(brilliant_features) results.append(magical_business_insight)

З часом у вас буде більше файлів або genius_algorithm стає складнішим і займає більше часу. І в кінцевому підсумку ви чекаєте. І чекаємо.


 

крок 1 полягає в тому, щоб інкапсулювати ваш код у функцію. Ви хочете інкапсулювати те, що входить усередину циклу for. Це полегшує розуміння того, що робить код (перетворення файлу в щось корисне за допомогою магії). Що ще важливіше, це полегшує використання цього коду іншими способами, окрім циклів for.

def make_all_the_magics(file): df = pd.read_csv(file) brilliant_features = [] for feature in features: brilliant_features.append(compute_brilliant_feature(df, feature)) magical_business_insight = make_the_magic(brilliant_features) return magical_business_insight results = [] for file in files: magical_business_insight = make_all_the_magics(file) results.append(magical_business_insight)

крок 2 полягає в тому, щоб паралелізувати його з Dask. Тепер замість використання циклу for, де кожна ітерація відбувається після попередньої, Dask запускатиме їх паралельно в кластері. Це повинно дати нам результати набагато швидше, і тільки на три рядки довше ніж код циклу for!

from dask import delayed
from dask.distributed import Client # same function but with a Dask delayed decorator
@delayed
def make_all_the_magics(file): df = pd.read_csv(file) brilliant_features = [] for feature in features: brilliant_features.append(compute_brilliant_feature(df, feature)) magical_business_insight = make_the_magic(brilliant_features) return magical_business_insight results = []
for file in files: magical_business_insight = make_all_the_magics(file) results.append(magical_business_insight) # new Dask code
c = Client()
results = c.compute(results, sync=True)

Як це працює:

  • Відкладений декоратор перетворює вашу функцію. Тепер, коли ви називаєте це, воно не оцінюється. Натомість ви отримуєте назад a delayed об'єкт, який Dask може виконати пізніше.
  • Client().compute надсилає всі ці затримані об’єкти в кластер Dask, де вони паралельно оцінюються! Ось і все, ви виграли!
  • Створення екземплярів a Client автоматично забезпечує а LocalCluster. Це означає, що паралельні працівники Dask — це всі процеси на тій же машині, що й той, що викликає Dask. Це дає стислий приклад. Для справжньої роботи рекомендую творити локальні кластери в терміналі.

Практичні теми

 
 
Наведене вище зупиняється там, де зупиняється більшість навчальних посібників Dask. Я використовував цей підхід у своїй роботі та з численними клієнтами, і завжди виникає пара практичних проблем. Ці наступні поради допоможуть вам перейти від наведеного вище прикладу з підручника до більш корисних методів на практиці, охоплюючи дві теми, які постійно виникають: великі об’єкти та обробка помилок.
 

Великі об'єкти

 
Щоб обчислити функції в розподіленому кластері, об’єкти, до яких викликаються функції, потрібно надіслати працівникам. Це може призвести до проблем із продуктивністю, оскільки їх потрібно серіалізувати (запікати) на вашому комп’ютері та надіслати по мережі. Уявіть, що ви виконуєте процеси з гігабайтами даних – вам не потрібно передавати їх щоразу, коли на ньому виконується функція. Якщо ви випадково надсилаєте великі об’єкти, ви можете побачити таке повідомлення від Dask:

Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and
keep data on workers

Є два способи запобігти цьому: ви можете надіслати працівникам менші об’єкти, щоб навантаження не було таким великим, або ви можете спробувати надіслати кожен об’єкт працівнику лише один раз, щоб вам не доводилося робити передачі. .
Виправлення 1: надсилайте невеликі об’єкти, коли це можливо
Цей приклад хороший, тому що ми надсилаємо шлях до файлу (невеликий рядок), а не фрейм даних.

# good, do this
results = []
for file in files: magical_business_insight = make_all_the_magics(file) results.append(magical_business_insight)

Нижче що НЕ зробити. І тому, що ви будете виконувати читання CSV (дороге і повільне) у циклі, який не є паралельним, а також тому, що зараз ми надсилаємо кадри даних (які можуть бути великими).

# bad, do not do this
results = []
for file in files: df = pd.read_csv(file) magical_business_insight = make_all_the_magics(df) results.append(magical_business_insight)

Часто код можна переписати, щоб змінити місце керування даними – або на клієнті, або на працівниках. Залежно від вашої ситуації, ви можете значно заощадити час, якщо продумати, які функції беруть на вхідні дані та як можна мінімізувати передачу даних.
Виправлення 2: надсилати об’єкти лише один раз
Якщо вам потрібно надіслати великий предмет, не надсилайте його кілька разів. Наприклад, якщо мені потрібно надіслати великий об’єкт моделі для обчислення, просто додавання параметра серіалізує модель кілька разів (один раз на файл)

# bad, do not do this
results = []
for file in files: # big model has to be sent to a worker each time the function is called magical_business_insight = make_all_the_magics(file, big_model) results.append(magical_business_insight)

Я можу сказати Dask не робити цього, загорнувши його в об’єкт із затримкою.

# good, do this
results = []
big_model = client.scatter(big_model) #send the model to the workers first for file in files: magical_business_insight = make_all_the_magics(file, big_model) results.append(magical_business_insight)

Помилка обробки

 
У міру того, як ваші обчислювальні задачі зростають, часто вам захочеться мати можливість працювати в разі збою. У цьому випадку, можливо, 5% моїх файлів CSV мають погані дані, які я не можу обробити. Я хотів би успішно обробити 95% файлів CSV, але слідкуйте за помилками, щоб налаштувати свої методи та спробувати ще раз.

Цей цикл робить це.

import traceback
from distributed.client import wait, FIRST_COMPLETED, ALL_COMPLETED queue = c.compute(results)
futures_to_index = {fut: i for i, fut in enumerate(queue)}
results = [None for x in range(len(queue))] while queue: result = wait(queue, return_when=FIRST_COMPLETED) for future in result.done: index = futures_to_index[future] if future.status == 'finished': print(f'finished computation #{index}') results[index] = future.result() else: print(f'errored #{index}') try: future.result() except Exception as e: results[index] = e traceback.print_exc() queue = result.not_done print(results)

Оскільки ця функція на перший погляд досить складна, давайте розберемо її.

queue = c.compute(results)
futures_to_index = {fut: i for i, fut in enumerate(queue)}
results = [None for x in range(len(queue))]

Ми називаємо compute on results, але оскільки ми не проходимо sync=True, ми негайно отримуємо назад ф'ючерси, які представляють обчислення, яке ще не завершено. Ми також створюємо відображення від самого майбутнього до _n_-го вхідного аргументу, який його створив. Нарешті, ми заповнюємо список результатів, заповнений наразі Nones.

while queue: result = wait(queue, return_when=FIRST_COMPLETED)

Далі ми чекаємо результатів і обробляємо їх у міру їх надходження. Коли ми чекаємо на ф’ючерси, вони поділяються на ф’ючерси, які done, і ті, які є not_done.

 if future.status == 'finished': print(f'finished computation #{index}') results[index] = future.result()

Якщо майбутнє є finished, потім друкуємо, що нам вдалося, і зберігаємо результат.

 else: print(f'errored #{index}') try: future.result() except Exception as e: results[index] = e traceback.print_exc()

В іншому випадку ми зберігаємо виняток і друкуємо трасування стека.

 queue = result.not_done

Нарешті, ми ставимо чергу до тих ф’ючерсів, які ще не завершені.
 

Висновок

 
 
Dask безперечно може заощадити ваш час. Якщо ви витрачаєте час на запуск коду, вам слід скористатися цими простими порадами, щоб розподілити свою роботу. Існує також багато розширених речей, які ви можете зробити з Dask, але це хороша відправна точка.

 
Біо: Х'юго Ши є засновником Saturn Cloud, хмарного робочого простору для масштабування Python, співпраці, розгортання завдань тощо.

Оригінал. Повідомлено з дозволу.

За темою:

Джерело: https://www.kdnuggets.com/2021/09/write-functions-use-dask.html

Часова мітка:

Більше від KDnuggets