Mejora del procesamiento de datos con Spark 3.0 y Delta Lake

Nodo de origen: 1013539

Recopilar, procesar y realizar análisis de transmisión de datos, en industrias como la tecnología publicitaria implica una intensa ingeniería de datos. Los datos generados diariamente son enormes (cientos de GB de datos) y requieren un tiempo de procesamiento significativo para procesar los datos para los pasos posteriores.

Otro desafío es la unión de conjuntos de datos para obtener conocimientos. Cada proceso tiene en promedio más de 10 conjuntos de datos y un número igual de uniones con múltiples claves. El tamaño de la partición para cada clave es impredecible en cada ejecución.

Y, finalmente, si la cantidad de datos supera en determinadas ocasiones, el almacenamiento puede quedarse sin memoria. Esto significa que el proceso moriría en medio de las escrituras finales, lo que haría que los consumidores leyeran claramente los marcos de datos de entrada.

En este blog, cubriremos una descripción general de Lagos delta, sus ventajas y cómo se pueden superar los desafíos anteriores moviéndose a Delta Lake y migrando a Spark 3.0 desde Spark 2.4. 

¿Qué es Delta Lake?

Desarrollado en Databricks, “Delta Lake es una capa de almacenamiento de datos de código abierto que se ejecuta en el Data Lake existente y coopera plenamente con Apache Spark API. Junto con la capacidad de implementar transacciones ACID y el manejo escalable de metadatos, Delta Lakes también puede unificar el procesamiento de datos por lotes y la transmisión por secuencias ”. 

Delta Lake utiliza archivos de Parquet versionados para almacenar datos en la nube. Una vez que se configura la ubicación en la nube, Delta Lake realiza un seguimiento de todos los cambios realizados en la tabla o el directorio de la tienda de blobs para proporcionar transacciones ACID. 

Ventajas de usar Delta Lakes 

Delta Lake permite que miles de datos se ejecuten en paralelo, aborde los desafíos de optimización y partición, operaciones de metadatos más rápidas, mantiene un registro de transacciones y actualiza continuamente los datos. A continuación, analizamos algunas ventajas importantes: 

Registro de transacciones de Delta Lake

Los registros de transacciones de Delta Lake son un archivo de solo anexo y contienen un registro ordenado de todas las transacciones realizadas en la tabla de Delta Lake. El registro de transacciones permite a varios usuarios leer y escribir en la tabla dada en paralelo. Actúa como una única fuente de información o el repositorio central que registra todos los cambios realizados por el usuario en la tabla. Mantiene la atomicidad y vigila continuamente las transacciones realizadas en Delta Lake.

Como se mencionó anteriormente, Spark verifica el registro delta en busca de nuevas transacciones, luego de lo cual Delta Lake se asegura de que la versión del usuario esté siempre sincronizada con el registro maestro. También garantiza que no se realicen cambios conflictivos en la tabla. Si el proceso falla antes de actualizar el registro delta, los archivos no estarán disponibles para ningún proceso de lectura, ya que las lecturas siempre pasan por el registro de transacciones.

Compromisos atómicos y de trabajo del registro de transacciones

Delta Lake hace un punto de control cada diez confirmaciones. El archivo con puntos de control contiene el estado actual de los datos en formato Parquet, que se puede leer rápidamente. Cuando varios usuarios intentan modificar la tabla al mismo tiempo, Delta Lake resuelve los conflictos utilizando un control de concurrencia optimista.

El esquema de los metadatos es el siguiente: 

ColumnaTipo de PropiedadDescripción
formatocadenaFormato de la tabla, es decir, “delta”.
idcadenaID único de la tabla
nombre cadenaNombre de la tabla tal como se define en la tienda de metadatos
descripcióncadenaDescripción de la tabla.
UbicacióncadenaUbicación de la mesa
Creado enfecha y horaCuando se creó la mesa
última modificaciónfecha y horaCuándo se modificó la tabla por última vez
particiónColumnasconjunto de cadenasNombres de las columnas de partición si la tabla está particionada
numArchivosCortoNúmero de archivos en la última versión de la tabla
propiedadesMapa de cadena-cadenaTodas las propiedades establecidas para esta tabla
minReaderVersiónintVersión mínima de lectores (según protocolo de registro) que pueden leer la tabla.
minWriterVersionintVersión mínima de lectores (según el protocolo de registro) que pueden escribir en la tabla.
Fuente: GitHub

Agregar y quitar archivo

Siempre que se agrega un archivo o se elimina un archivo existente, estas acciones se registran. La ruta del archivo es única y se considera la clave principal para el conjunto de archivos que contiene. Cuando se agrega un nuevo archivo en una ruta que ya está presente en la tabla, las estadísticas y otros metadatos de la ruta se actualizan desde la versión anterior. Del mismo modo, la acción de eliminación se indica mediante una marca de tiempo. Una acción de eliminación permanece en la tabla como lápida hasta que expire. Una lápida expira cuando se excede el TTL (tiempo de vida).

Dado que no se garantiza que las acciones dentro de un archivo Delta determinado se apliquen en orden, no es válido que existan varias operaciones de archivo con la misma ruta en una única versión.

El indicador dataChange en 'agregar' o 'eliminar' se puede establecer en falso para minimizar los conflictos de operaciones concurrentes.

El esquema de la acción de agregar es el siguiente:

Nombre del campoTipo de datosDescripción
caminoCordónUna ruta relativa, desde la raíz de la tabla, hasta un archivo que debe agregarse a la tabla.
PartitionValuesMapa [Cadena, Cadena]Un mapa de la columna de partición al valor de este archivo. 
tamañoLargoEl tamaño de este archivo en bytes
tiempo de modificaciónLargoLa hora en que se creó este archivo, expresada en milisegundos desde la época
cambio de datosBooleanCuando es falso, el archivo ya debe estar presente en la tabla o los registros en el archivo agregado deben estar contenidos en una o más acciones de eliminación en la misma versión.
estadísticasEstructura estadísticaContiene estadísticas (p. Ej., Recuento, valores mínimos / máximos para las columnas) sobre los datos de este archivo
etiquetasMapa [Cadena, Cadena]Mapa que contiene metadatos sobre este archivo

El esquema de la acción de eliminación es el siguiente:

Nombre del campoDatos Tipo de PropiedadDescripción
caminocadenaUna ruta absoluta o relativa a un archivo que debe eliminarse de la tabla.
eliminaciónMarca de tiempoCortoLa hora en que ocurrió la eliminación, representada en milisegundos desde la época.
cambio de datosBooleanCuando es falso, los registros del archivo eliminado deben estar incluidos en una o más acciones de agregar archivo en la misma versión
metadatos de archivo extendidoBooleanCuando es verdadero, los campos partitionValues, size y etiquetas están presentes
PartitionValuesMapa [Cadena, Cadena]Un mapa de la columna de partición al valor de este archivo. Consulte también Serialización de valores de partición
tamañoLargoEl tamaño de este archivo en bytes
etiquetasMapa [Cadena, Cadena]Mapa que contiene metadatos sobre este archivo
Fuente: GitHub

El esquema de los metadatos contiene la ruta del archivo en cada acción de agregar / quitar y el proceso de lectura de Spark no necesita hacer un escaneo completo para obtener las listas de archivos.

Si una escritura falla sin actualizar el registro de transacciones, dado que la lectura del consumidor siempre pasará por los metadatos, esos archivos se ignorarán. 

Ventajas de migrar a Spark 3.0

Además de aprovechar los beneficios de Delta Lake, la migración a Spark 3.0 mejoró el procesamiento de datos de las siguientes maneras:

Optimización de unión sesgada

La desviación de datos es una condición en la que los datos de una tabla se distribuyen de manera desigual entre las particiones del clúster y pueden degradar gravemente el rendimiento de las consultas, especialmente aquellas con combinaciones. La asimetría puede conducir a un desequilibrio extremo en el clúster, lo que aumenta el tiempo de procesamiento de datos.

La condición de sesgo de datos se puede manejar principalmente mediante tres enfoques.

  1. Usando la configuración "spark.sql.shuffle.partitions" para un mayor paralelismo en datos distribuidos de manera más uniforme.
  2. Aumentar el umbral de unión de hash de transmisión mediante la configuración spark.sql.autoBroadcastJoinThreshold al tamaño máximo en bytes de la tabla que debe transmitirse a todos los nodos trabajadores durante la realización de una unión.
  3. Salazón de claves (agregue un prefijo a las claves sesgadas para hacer que la misma clave sea diferente y luego ajuste la distribución de datos).

Spark 3.0 ha agregado una optimización al manejo automático de la combinación sesgada basada en las estadísticas de tiempo de ejecución con el nuevo marco de ejecución adaptable.

Condición de partición sesgada

El desafío de las particiones sesgadas que existían en la versión anterior de Spark 2.4 tuvo un gran impacto en el tiempo de red y el tiempo de ejecución de una tarea en particular. Además, los métodos para tratarlo eran en su mayoría manuales. Spark 3.0 supera estos desafíos.

La partición sesgada tendrá un impacto en el tráfico de la red y en el tiempo de ejecución de la tarea, ya que esta tarea en particular tendrá muchos más datos para procesar. También necesita saber cómo afecta esto a la ciberseguridad, ya que el volumen de tráfico de la red es algo que aprovechan los piratas informáticos.

La partición de unión sesgada se calcula mediante el tamaño de los datos y los recuentos de filas de las estadísticas del mapa de tiempo de ejecución.

Optimización

Adaptado de: Apache Spark Jira

De la tabla anterior, las Campañas de Dataframe se unen a las Organizaciones de Dataframe. Una de las particiones (Partición 0) de Organizaciones es grande y está sesgada. La partición 0 es el resultado de 9 mapas de la etapa anterior (Mapa-0 a Mapa-8). La regla OptimizeSkewedJoin de Spark dividirá la partición en 3 y luego creará 3 tareas separadas, cada una de las cuales será una partición parcial de la partición 0 (Map-0 to Map-2, Map-3 to Map-5 y Map-6 to Map-9) y se une con la Partición 0 de Campañas. Este enfoque genera un costo adicional al leer la Partición 0 de la tabla Campañas igual al número de particiones parciales de la tabla Organizaciones.

Resultado final

Usando Delta Lake y Spark 3.0, habilitamos los siguientes resultados para la empresa de tecnología publicitaria:

  • El tiempo de procesamiento de datos se redujo de 15 horas a 5-6 horas.
  • 50% de reducción en el costo de AWS EMR
  • Prevención de la pérdida de datos y la muerte de procesos, lo que ocurría con frecuencia cuando el sistema se quedaba sin memoria o el procesamiento se detenía debido a una falla en el sistema.
  • Se instalaron funciones de monitoreo y alerta para notificar en caso de que el proceso fallara
  • Orquestación completa con Airflow para lograr una automatización completa y una gestión de la dependencia entre procesos.
Fuente: https://www.smartdatacollective.com/improving-data-processing-with-spark-3-delta-lake/

Sello de tiempo:

Mas de Colectivo SmartData