Se você pode escrever funções, você pode usar o Dask

Nó Fonte: 1094095

Se você pode escrever funções, você pode usar o Dask

Este artigo é o segundo de uma série contínua sobre o uso do Dask na prática. Cada artigo desta série será bastante simples para iniciantes, mas fornecerá dicas úteis para o trabalho real. O primeiro artigo da série é sobre o uso do LocalCluster.


By Hugo Shi, fundador da Saturn Cloud

Tenho conversado com muitos cientistas de dados que já ouviram falar Painel, a estrutura Python para computação distribuída, mas não sei por onde começar. Eles sabem que Dask provavelmente pode acelerar muitos de seus fluxos de trabalho fazendo-os rodar em paralelo em um cluster de máquinas, mas a tarefa de aprender uma metodologia totalmente nova pode parecer assustadora. Estou aqui para lhe dizer que você pode começar a obter valor do Dask sem precisar aprender todo o framework. Se Você gaste tempo esperando a execução das células do notebook, há uma boa chance de Dask economizar seu tempo. Mesmo que você saiba apenas escrever funções em Python, você pode tirar vantagem disso sem aprender mais nada! Esta postagem do blog é um tutorial “como usar o Dask sem aprender tudo”.

Dask, dataframes, bags, arrays, agendadores, trabalhadores, gráficos, RAPIDS, ah, não!

 
 
Existem muitos conteúdos complicados sobre Dask, o que pode ser opressor. Isso ocorre porque Dask pode utilizar um cluster de máquinas de trabalho para fazer muitas coisas legais! Mas esqueça tudo isso por enquanto. Este artigo se concentra em técnicas simples que podem economizar seu tempo, sem precisar mudar muito a forma como você trabalha.
 

Para loops e funções

 
 
Praticamente todo cientista de dados fez algo assim, onde você tem um conjunto de dataframes armazenados em arquivos separados e usa um loop for para ler todos eles, fazer alguma lógica e depois combiná-los:

results = []
for file in files: defer = pd.read_csv(file) ## begin genius algorithm brilliant_features = [] for feature in features: brilliant_features.append(compute_brilliant_feature(df, feature)) magical_business_insight = make_the_magic(brilliant_features) results.append(magical_business_insight)

Com o tempo, você acaba com mais arquivos, ou o genius_algorithm fica mais complicado e leva mais tempo para ser executado. E você acaba esperando. E esperando.


 

Passo 1 é encapsular seu código em uma função. Você deseja encapsular o que está dentro do loop for. Isso torna mais fácil entender o que o código está fazendo (convertendo um arquivo em algo útil via mágica). Mais importante ainda, torna mais fácil usar esse código de outras maneiras além dos loops for.

def make_all_the_magics(file): df = pd.read_csv(file) brilliant_features = [] for feature in features: brilliant_features.append(compute_brilliant_feature(df, feature)) magical_business_insight = make_the_magic(brilliant_features) return magical_business_insight results = [] for file in files: magical_business_insight = make_all_the_magics(file) results.append(magical_business_insight)

Passo 2 é paralelizá-lo com Dask. Agora, em vez de usar um loop for, onde cada iteração acontece após a anterior, Dask irá executá-las em paralelo em um cluster. Isto deverá dar-nos resultados muito mais rapidamente e só é três linhas a mais do que o código for-loop!

from dask import delayed
from dask.distributed import Client # same function but with a Dask delayed decorator
@delayed
def make_all_the_magics(file): df = pd.read_csv(file) brilliant_features = [] for feature in features: brilliant_features.append(compute_brilliant_feature(df, feature)) magical_business_insight = make_the_magic(brilliant_features) return magical_business_insight results = []
for file in files: magical_business_insight = make_all_the_magics(file) results.append(magical_business_insight) # new Dask code
c = Client()
results = c.compute(results, sync=True)

Como funciona:

  • O decorador atrasado transforma sua função. Agora, quando você chama, não é avaliado. Em vez disso, você recebe de volta um delayed objeto, que Dask pode executar mais tarde.
  • Client().compute envia todos esses objetos atrasados ​​para o cluster Dask, onde são avaliados em paralelo! É isso, você venceu!
  • Instanciando um Client provisiona automaticamente um LocalCluster. Isso significa que os trabalhadores paralelos do Dask são todos processos na mesma máquina que chama o Dask. Isto constitui um exemplo conciso. Para um trabalho real, recomendo criar clusters locais no terminal.

Tópicos Práticos

 
 
O acima para onde a maioria dos tutoriais do Dask para. Usei essa abordagem em meu próprio trabalho e com vários clientes, e sempre surgem algumas questões práticas. As próximas dicas irão ajudá-lo a passar do exemplo do livro acima para métodos mais úteis na prática, cobrindo dois tópicos que surgem constantemente: objetos grandes e tratamento de erros.
 

Objetos grandes

 
Para calcular funções em um cluster distribuído, os objetos nos quais as funções são chamadas precisam ser enviados aos trabalhadores. Isso pode levar a problemas de desempenho, pois eles precisam ser serializados (decapados) no seu computador e enviados pela rede. Imagine que você estivesse processando gigabytes de dados – você não quer ter que transferi-los toda vez que uma função for executada neles. Se você acidentalmente enviar objetos grandes, poderá ver uma mensagem do Dask como esta:

Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and
keep data on workers

Há duas maneiras de impedir que isso aconteça: você pode enviar objetos menores aos trabalhadores para que a carga não seja tão grande, ou você pode tentar enviar cada objeto para um trabalhador apenas uma vez, para não ter que fazer transferências continuamente. .
Correção 1: envie objetos pequenos quando possível
Este exemplo é bom porque estamos enviando um caminho de arquivo (string pequena), em vez do dataframe.

# good, do this
results = []
for file in files: magical_business_insight = make_all_the_magics(file) results.append(magical_business_insight)

Abaixo está o que não pendência. Tanto porque você estaria fazendo leitura de CSV (caro e lento) no loop, que não é paralelo, mas também porque agora estamos enviando dataframes (que podem ser grandes).

# bad, do not do this
results = []
for file in files: df = pd.read_csv(file) magical_business_insight = make_all_the_magics(df) results.append(magical_business_insight)

Muitas vezes, o código pode ser reescrito para alterar onde os dados estão sendo gerenciados – no cliente ou nos trabalhadores. Dependendo da sua situação, pode haver uma enorme economia de tempo ao pensar em quais funções são utilizadas como entrada e como as transferências de dados podem ser minimizadas.
Correção 2: envie objetos apenas uma vez
Se você tiver que enviar um objeto grande, não o envie várias vezes. Por exemplo, se eu precisar enviar um objeto de modelo grande para calcular, basta adicionar o parâmetro para serializar o modelo várias vezes (uma vez por arquivo)

# bad, do not do this
results = []
for file in files: # big model has to be sent to a worker each time the function is called magical_business_insight = make_all_the_magics(file, big_model) results.append(magical_business_insight)

Posso dizer a Dask para não fazer isso, envolvendo-o em um objeto atrasado.

# good, do this
results = []
big_model = client.scatter(big_model) #send the model to the workers first for file in files: magical_business_insight = make_all_the_magics(file, big_model) results.append(magical_business_insight)

Lidando com falhas

 
À medida que suas tarefas computacionais aumentam, muitas vezes você desejará ser capaz de superar falhas. Nesse caso, talvez 5% dos meus CSVs contenham dados incorretos com os quais não consigo lidar. Gostaria de processar 95% dos CSVs com êxito, mas acompanhar as falhas para poder ajustar meus métodos e tentar novamente.

Este loop faz isso.

import traceback
from distributed.client import wait, FIRST_COMPLETED, ALL_COMPLETED queue = c.compute(results)
futures_to_index = {fut: i for i, fut in enumerate(queue)}
results = [None for x in range(len(queue))] while queue: result = wait(queue, return_when=FIRST_COMPLETED) for future in result.done: index = futures_to_index[future] if future.status == 'finished': print(f'finished computation #{index}') results[index] = future.result() else: print(f'errored #{index}') try: future.result() except Exception as e: results[index] = e traceback.print_exc() queue = result.not_done print(results)

Como esta função é bastante complicada à primeira vista, vamos decompô-la.

queue = c.compute(results)
futures_to_index = {fut: i for i, fut in enumerate(queue)}
results = [None for x in range(len(queue))]

Nós chamamos compute on results, mas como não estamos passando sync=True, recebemos imediatamente os futuros, que representam o cálculo, que ainda não foi concluído. Também criamos um mapeamento do próprio futuro para o _n_ésimo argumento de entrada que o gerou. Finalmente, preenchemos uma lista de resultados preenchida com Nones por enquanto.

while queue: result = wait(queue, return_when=FIRST_COMPLETED)

Em seguida, esperamos pelos resultados e os processamos à medida que chegam. Quando esperamos pelos futuros, eles são separados em futuros que são done, e os que são not_done.

 if future.status == 'finished': print(f'finished computation #{index}') results[index] = future.result()

Se o futuro for finished, então imprimimos que obtivemos sucesso e armazenamos o resultado.

 else: print(f'errored #{index}') try: future.result() except Exception as e: results[index] = e traceback.print_exc()

Caso contrário, armazenamos a exceção e imprimimos o rastreamento de pilha.

 queue = result.not_done

Por fim, definimos a fila para os futuros que ainda não foram concluídos.
 

Conclusão

 
 
Dask definitivamente pode economizar seu tempo. Se você gasta tempo esperando a execução do código, você deve usar estas dicas simples para paralelizar seu trabalho. Também há muitas coisas avançadas que você pode fazer com o Dask, mas este é um bom ponto de partida.

 
Bio: Hugo Shi é fundador do Saturn Cloud, o espaço de trabalho em nuvem ideal para dimensionar Python, colaborar, implantar trabalhos e muito mais.

Óptimo estado. Original. Republicado com permissão.

Relacionado:

Fonte: https://www.kdnuggets.com/2021/09/write-functions-use-dask.html

Carimbo de hora:

Mais de KDnuggetsGenericName