함수를 작성할 수 있다면 Dask를 사용할 수 있습니다

소스 노드 : 1094095

함수를 작성할 수 있다면 Dask를 사용할 수 있습니다

이 기사는 Dask를 실제로 사용하는 방법에 대한 진행 중인 시리즈의 두 번째 기사입니다. 이 시리즈의 각 기사는 초보자에게 충분히 간단하지만 실제 작업에 유용한 팁을 제공합니다. 시리즈의 첫 번째 기사는 LocalCluster 사용에 관한 것입니다.


By 휴고시, 새턴 클라우드 설립자

나는 들어본 많은 데이터 과학자들과 이야기를 나눴다. 다 스크, 분산 컴퓨팅을 위한 Python 프레임워크이지만 어디서부터 시작해야 할지 모릅니다. 그들은 Dask가 기계 클러스터에서 병렬로 실행함으로써 많은 워크플로의 속도를 높일 수 있다는 것을 알고 있지만 완전히 새로운 방법론을 배우는 작업은 벅차게 보일 수 있습니다. 저는 여러분이 전체 프레임워크를 배우지 않고도 Dask에서 가치를 얻을 수 있다는 것을 알려드리기 위해 왔습니다. 만약에 당신 노트북 셀이 실행되기를 기다리는 데 시간을 보내면 Dask가 시간을 절약할 수 있는 좋은 기회가 있습니다. Python 함수를 작성하는 방법만 알고 있더라도 다른 것을 배우지 않고도 이것을 활용할 수 있습니다! 이 블로그 게시물은 "전체를 배우지 않고 Dask를 사용하는 방법" 튜토리얼입니다.

Dask, 데이터 프레임, 가방, 배열, 스케줄러, 작업자, 그래프, RAPIDS, 오 안돼!

 
 
Dask에 대한 복잡한 콘텐츠가 많이 있으며 압도적일 수 있습니다. Dask가 작업자 머신 클러스터를 활용하여 많은 멋진 작업을 수행할 수 있기 때문입니다! 그러나 지금은 그 모든 것을 잊어버리십시오. 이 문서에서는 작업 방식을 크게 변경하지 않고도 시간을 절약할 수 있는 간단한 기술에 중점을 둡니다.
 

For 루프 및 함수

 
 
거의 모든 데이터 과학자는 다음과 같은 작업을 수행했습니다. 데이터 프레임 집합이 별도의 파일에 저장되어 있고 for 루프를 사용하여 모두 읽고 일부 논리를 수행한 다음 결합합니다.

results = []
for file in files: defer = pd.read_csv(file) ## begin genius algorithm brilliant_features = [] for feature in features: brilliant_features.append(compute_brilliant_feature(df, feature)) magical_business_insight = make_the_magic(brilliant_features) results.append(magical_business_insight)

시간이 지남에 따라 더 많은 파일이 생성되거나 genius_algorithm 더 복잡해지고 실행하는 데 더 오래 걸립니다. 그리고 기다리게 됩니다. 그리고 기다리고 있습니다.


 

1단계 코드를 함수로 캡슐화하는 것입니다. for 루프 내부에 들어가는 내용을 캡슐화하려고 합니다. 이렇게 하면 코드가 하는 일을 더 쉽게 이해할 수 있습니다(마법을 통해 파일을 유용한 것으로 변환). 더 중요한 것은 for 루프 외에 다른 방법으로 해당 코드를 더 쉽게 사용할 수 있다는 것입니다.

def make_all_the_magics(file): df = pd.read_csv(file) brilliant_features = [] for feature in features: brilliant_features.append(compute_brilliant_feature(df, feature)) magical_business_insight = make_the_magic(brilliant_features) return magical_business_insight results = [] for file in files: magical_business_insight = make_all_the_magics(file) results.append(magical_business_insight)

2단계 Dask와 병렬화하는 것입니다. 이제 각 반복이 이전 반복 다음에 발생하는 for 루프를 사용하는 대신 Dask는 클러스터에서 병렬로 실행합니다. 이것은 우리에게 훨씬 더 빠른 결과를 제공해야 하며, 세 줄 더 길게 for 루프 코드보다!

from dask import delayed
from dask.distributed import Client # same function but with a Dask delayed decorator
@delayed
def make_all_the_magics(file): df = pd.read_csv(file) brilliant_features = [] for feature in features: brilliant_features.append(compute_brilliant_feature(df, feature)) magical_business_insight = make_the_magic(brilliant_features) return magical_business_insight results = []
for file in files: magical_business_insight = make_all_the_magics(file) results.append(magical_business_insight) # new Dask code
c = Client()
results = c.compute(results, sync=True)

작동 원리 :

  • 지연된 데코레이터는 기능을 변환합니다. 이제 호출하면 평가되지 않습니다. 대신, 당신은 다시 delayed Dask가 나중에 실행할 수 있는 개체입니다.
  • Client().compute 지연된 모든 객체를 Dask 클러스터로 보내고 병렬로 평가됩니다! 그게 다야, 당신이 이겼어!
  • 인스턴스화 Client 자동으로 프로비저닝 LocalCluster. 이것은 Dask 병렬 작업자가 Dask를 호출하는 것과 동일한 시스템의 모든 프로세스임을 의미합니다. 이것은 간결한 예를 만듭니다. 실제 작업을 위해 생성하는 것이 좋습니다. 터미널의 로컬 클러스터.

실용 주제

 
 
위의 내용은 대부분의 Dask 자습서가 중지되는 곳에서 중지됩니다. 저는 이 접근 방식을 제 작업과 수많은 고객과 함께 사용해 왔으며 몇 가지 실용적인 문제가 항상 발생합니다. 이 다음 팁은 큰 개체와 오류 처리라는 두 가지 주제를 지속적으로 다루면서 위의 교과서 예제에서 실제로 더 유용한 방법으로 이동하는 데 도움이 될 것입니다.
 

큰 물체

 
분산 클러스터에서 함수를 계산하려면 함수가 호출되는 객체를 작업자에게 보내야 합니다. 컴퓨터에서 직렬화(피클)하고 네트워크를 통해 전송해야 하기 때문에 성능 문제가 발생할 수 있습니다. 기가바이트의 데이터에 대해 프로세스를 수행하고 있다고 상상해 보십시오. 함수가 실행될 때마다 데이터를 전송할 필요가 없습니다. 실수로 큰 개체를 보낸 경우 Dask에서 다음과 같은 메시지를 볼 수 있습니다.

Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and
keep data on workers

이를 방지하는 방법에는 두 가지가 있습니다. 작업자에게 더 작은 개체를 보내 부담을 덜 수 있도록 하거나 각 개체를 작업자에게 한 번만 보내서 계속 전송하지 않아도 되도록 할 수 있습니다. .
수정 1: 가능한 경우 작은 개체 보내기
이 예제는 데이터 프레임 대신 파일 경로(작은 문자열)를 보내기 때문에 좋습니다.

# good, do this
results = []
for file in files: magical_business_insight = make_all_the_magics(file) results.append(magical_business_insight)

아래는 무엇입니까 지원 할 것. 둘 다 병렬이 아닌 루프에서 CSV 읽기(비싸고 느림)를 수행하기 때문일 뿐만 아니라 이제 데이터 프레임(크기가 클 수 있음)을 보내고 있기 때문입니다.

# bad, do not do this
results = []
for file in files: df = pd.read_csv(file) magical_business_insight = make_all_the_magics(df) results.append(magical_business_insight)

종종 데이터가 관리되는 위치(클라이언트 또는 작업자)를 변경하기 위해 코드를 다시 작성할 수 있습니다. 상황에 따라 어떤 함수를 입력으로 사용하고 데이터 전송을 최소화할 수 있는지 생각하면 엄청난 시간 절약이 있을 수 있습니다.
수정 2: 개체를 한 번만 보내기
큰 물건을 보내야 한다면 여러 번 보내지 마십시오. 예를 들어, 계산하기 위해 큰 모델 객체를 보내야 하는 경우 단순히 매개변수를 추가하면 모델을 여러 번 직렬화합니다(파일당 한 번).

# bad, do not do this
results = []
for file in files: # big model has to be sent to a worker each time the function is called magical_business_insight = make_all_the_magics(file, big_model) results.append(magical_business_insight)

나는 Dask에게 그렇게 하지 말라고 지시할 수 있습니다.

# good, do this
results = []
big_model = client.scatter(big_model) #send the model to the workers first for file in files: magical_business_insight = make_all_the_magics(file, big_model) results.append(magical_business_insight)

처리 실패

 
계산 작업이 늘어남에 따라 종종 실패를 극복할 수 있기를 원할 것입니다. 이 경우 내 CSV의 5%에 내가 처리할 수 없는 잘못된 데이터가 있을 수 있습니다. CSV의 95%를 성공적으로 처리하고 싶지만 실패를 추적하여 방법을 조정하고 다시 시도할 수 있습니다.

이 루프는 이를 수행합니다.

import traceback
from distributed.client import wait, FIRST_COMPLETED, ALL_COMPLETED queue = c.compute(results)
futures_to_index = {fut: i for i, fut in enumerate(queue)}
results = [None for x in range(len(queue))] while queue: result = wait(queue, return_when=FIRST_COMPLETED) for future in result.done: index = futures_to_index[future] if future.status == 'finished': print(f'finished computation #{index}') results[index] = future.result() else: print(f'errored #{index}') try: future.result() except Exception as e: results[index] = e traceback.print_exc() queue = result.not_done print(results)

이 기능은 언뜻 보기에 상당히 복잡하므로 분해해 보겠습니다.

queue = c.compute(results)
futures_to_index = {fut: i for i, fut in enumerate(queue)}
results = [None for x in range(len(queue))]

우리는 전화 compute on results, 하지만 우리는 통과하지 않기 때문에 sync=True, 아직 완료되지 않은 계산을 나타내는 futures를 즉시 반환합니다. 또한 미래 자체에서 생성한 _n_번째 입력 인수로의 매핑을 생성합니다. 마지막으로 현재로서는 None으로 채워진 결과 목록을 채웁니다.

while queue: result = wait(queue, return_when=FIRST_COMPLETED)

다음으로, 우리는 결과를 기다리고, 그들이 들어오는 대로 처리합니다. 우리가 미래를 기다릴 때, 그것들은 다음과 같은 미래로 분리됩니다. done, 그리고 그것들은 not_done.

 if future.status == 'finished': print(f'finished computation #{index}') results[index] = future.result()

만약 미래가 finished, 우리는 우리가 성공했다는 것을 인쇄하고 결과를 저장합니다.

 else: print(f'errored #{index}') try: future.result() except Exception as e: results[index] = e traceback.print_exc()

그렇지 않으면 예외를 저장하고 스택 추적을 인쇄합니다.

 queue = result.not_done

마지막으로 아직 완료되지 않은 미래에 대한 대기열을 설정합니다.
 

결론

 
 
Dask는 확실히 시간을 절약할 수 있습니다. 코드가 실행될 때까지 기다리는 데 시간을 보낸다면 다음과 같은 간단한 팁을 사용하여 작업을 병렬화해야 합니다. Dask로 할 수 있는 고급 작업도 많이 있지만 이것은 좋은 출발점입니다.

 
바이오 : 휴고시 Python 확장, 협업, 작업 배포 등을 위한 클라우드 작업 공간인 Saturn Cloud의 설립자입니다.

실물. 허가를 받아 다시 게시했습니다.

관련 :

출처: https://www.kdnuggets.com/2021/09/write-functions-use-dask.html

타임 스탬프 :

더보기 너 겟츠