Data Engineering Zoomcamp – Semana 5

Batch Processing con Spark

Entramos en la recta final del bootcamp para Data Engineers. Las dos semanas que quedan están dedicadas al procesamiento de datos, comenzando esta con el batch o procesamiento por lotes empleando tecnología spark y la semana que viene con el procesamiento de datos en tiempo real o streaming con Apache Kafka.

Última actualización: 07/03/2023

Prerequisitos

En mi caso estoy trabajando en Windows, os recomiendo seguir la guía de instalación de los prerequesitos. Es importante tener en cuenta que si estamos usando Git bash o MINGW64 el catálogo de variables de entorno no se replica en Windows, por lo que debemos arrancar los cuadernos de jupyter desde el command shell de unix para que coja correctamente los path configurados.

  • Java: Mínimo la versión 11 del JDK de java desde su página oficial para trabajar con Spark. Recuerda que además de instalarlo, es necesario añadir a la variable de entorno PATH el JAVA_HOME con la ubicación de Java.
  • Apache Hadoop: Descargamos los binarios de la versión 3.2 desde aquí. Es necesario añadir también en el PATH la ubicación del directorio donde extraigamos los archivos y crear la variable HADOOP_HOME.
  • Apache Spark: vamos a descargar la versión 3.3.2 de Spark desde su repositorio. Si estás usando Git bash en Windows y no tienes instalado wget para descargar paquetes, debes hacerlo desde este enlace y copiar el archivo .exe en la ruta bin donde tengas instalado git bash (C:\Program Files\Git\mingw64\bin). Puedes acceder al GUI de Spark publicado en el puerto 4040: http://localhost:4040/jobs/. Creamos la variable de entorno SPARK_HOME y la añadimos también al PATH de Windows.
  • PySpark: librería de Python para trabajar con Spark. Podemos seguir esta guía para su instalación. Si nos da problemas, específicamente el error Module not found cuando añadimos al PATH la ruta de PySpark, revisa la nota indicada en la repo del bootcamp (hay que cambiar el nombre del fichero en la ruta PATH si hemos descargado otra versión). Añadimos la variable de entorno PYTHONPATH según las instrucciones.

Procesamiento de datos

El procesamiento de datos es el conjunto de operaciones y técnicas utilizadas para transformar datos brutos o sin procesar en información útil y significativa. Esta información puede ser utilizada para tomar decisiones, realizar análisis, predicciones o automatizar procesos. Existen distintas técnicas o enfoques para procesar los datos donde destacan los procesos batch (por lotes) y streaming (en tiempo real), que a su vez pueden clasificarse en procesamiento distribuido, paralelo, online, transaccional o analítico. Esta semana del bootcamp está centrada en el batch, por lo que vamos a dejar por aquí una breve pincelada del procesamiento streaming donde profundizaremos la semana que viene.

Batch

El procesamiento por lotes (batch) se utiliza para realizar periódicamente trabajos de datos repetitivos y de gran tamaño. Las tareas de transformación, filtrado y clasificación pueden ser intensivas en cálculo e ineficientes si se ejecutan en transacciones individuales. En su lugar se procesan estas tareas en lotes, a menudo en momentos de menor actividad cuando los recursos informáticos están más disponibles, como al final del día o durante la noche.

Consideremos una tienda online que recibe pedidos durante todo el día. En lugar de procesar cada pedido a medida que ocurre, el sistema podría recopilar todos los pedidos al final de cada día y compartirlos en un bloque con el equipo de cumplimiento de pedidos.

Streaming processing
Batch processing

Streaming

El procesamiento de datos en tiempo real o streaming (streaming processing / real-time processing) es un método que se realiza en tiempo real, a medida que se generan o se reciben los datos. Cuando la cantidad de datos es desconocida o infinita es preferible aplicar el procesamiento por streaming en lugar de batch.

Este enfoque es adecuado para tareas que requieren una respuesta rápida, como la detección de fraudes, la monitorización de sistemas en línea, el análisis de datos de sensores (IoT) o de logs.

El procesamiento de datos en streaming permite tomar decisiones rápidas basadas en la información más reciente disponible.

Batch processing
Streaming processing

Batch processing (procesamiento de datos por lotes)

Como hemos indicado en la introducción, el procesamiento batch o por lotes trabaja con conjuntos de datos que procesa a intervalos de tiempo. Por ejemplo, si un banco o aseguradora debe cumplir sus compromisos regulatorios hacia entidades de control de forma mensual, existirá un proceso batch mensual que agrupe toda la información, la transforme, agregue y cocine para generar el modelo de datos del reporting. Habitualmente los procesos batch tienen intervalos:

  • Mensual (más habitual)
  • Semanal (más habitual)
  • Diario (más habitual)
  • Cada hora
  • 3 por hora
  • Cada 5 minutos

A nivel de tecnología podemos crear un proceso batch en cualquier lenguaje de programación (por ejemplo, Python, Java, Scala…), modelar los datos con dbt y orquestar los scripts con Apache Airflow, Prefect, Control-M, etc.

Flujo de procesamiento batch
Flujo de procesamiento batch
VentajasDesventajas
Muy eficiente para tareas repetitivas y de gran tamaño. En lugar de procesar cada transacción de datos individualmente se trabaja con grandes cantidades de una sola vez.Como los procesos batch se realizan en intervalos de tiempo específico, los datos tienen retraso en estar disponibles.
Consume menos recursos que el procesamiento en tiempo real, lo que implica menores costes de infraestructura y capacidad de cómputo. Si ocurre un error durante el procesamiento por lotes se puede perder información que obliga a reprocesar.
Se aprovechan los períodos de menor actividad de máquina (fines de semana u horas nocturnas) para procesar grandes cantidades de datos de manera más rápida.El procesamiento por lotes puede ser complejo de implementar y mantener, especialmente cuando se trata de programar y garantizar la disponibilidad de recursos adecuados.
Facilita la escalabilidad, en caso necesario se puede aprovisionar mayor capacidad de cómputo (clusters de spark)
Un batch job se puede relanzar tantas veces sean necesarias.
Existen en el mercado multitud de herramientas y tecnologías para facilitar la gestión una malla batch.
Pros y contras de procesamiento por lotes (batch)

Apache Spark

¿Qué es Apache Spark?

Apache Spark es un motor de procesamiento de datos de código abierto utilizado para realizar análisis y transformación de grandes volúmenes de datos en clústeres de servidores distribuidos (paralelizando el procesamiento en distintos nodos). Fue desarrollado originalmente en la Universidad de California, Berkeley, y ahora es mantenido por la Apache Software Foundation. Básicamente lo que hace Spark es dividir una carga de trabajo en varias porciones que distribuye entre distintos nodos o máquinas para que trabajen de forma paralela y cuando finalizan, agrupan los resultados y lo devuelven.

Spark es conocido por su velocidad, ya que puede procesar grandes conjuntos de datos mucho más rápido que otras herramientas con el mismo objetivo, como Hadoop MapReduce. Spark también admite múltiples lenguajes de programación, como Scala, Java, Python y R. Además, proporciona una variedad de bibliotecas y herramientas para diferentes tareas de procesamiento de datos, como batch, streaming, procesamiento de gráficos y aprendizaje automático (ML o machine learning).

Dentro de un ecosistema Data Lake utilizar Spark nos va a ayudar en el proceso de transformación de los datos. En un Data lake los datos se almacenan como ficheros, habitualmente csv o parquet, que podemos consultar como si fuera un modelo de datos SQL mediante herramientas tales como Hive, Presto o Athena (en cloud Amazon AWS), o BigQuery (en Google Cloud Platform). En el caso de que la lógica sea más compleja y no podamos resolverla mediante SQL, entra en juego Spark. En un mismo flujo de trabajo podemos combinar ambas opciones, cuando los datos puedan transformarse por SQL usaremos este camino, y cuando sean transformaciones complejas lo haremos con Spark.

Workflow de transformación de datos con Spark o Hive / BigQuery
Workflow de transformación de datos con Spark o Hive / BigQuery

Arquitectura Spark

Spark se basa en una arquitectura de procesamiento distribuido, lo que significa que utiliza un clúster o grupo de ordenadores para procesar datos. Consta de varios componentes que se comunican entre sí para ejecutar las tareas.

Un clúster de Spark consiste en un proceso Driver que se ejecuta dentro de un nodo Master y procesos Executor que se ejecutan dentro de cada uno de los nodos Worker. Cuando se envía un trabajo a Spark, el Driver particiona y distribuye el trabajo en forma de tareas a los procesos Executor (en diferentes nodos Worker) para su procesamiento adicional. A medida que se ejecuta el trabajo de la aplicación, los Executor informan al Driver sobre el estado de la tareas, y así éste mantiene el estado general del trabajo de la aplicación. Cada Worker tiene su propia memoria y CPU, y está conectado a otros Workers a través de una red de alta velocidad. Pueden ser añadidos o eliminados del clúster según sea necesario para ajustar la capacidad de procesamiento

¿Cómo sabe el proceso Driver qué Executors están disponibles para el procesamiento y a quién distribuir las tareas? gracias al Administrador de Clúster (Cluster Manager). Realiza un seguimiento del estado de los recursos del clúster (qué procesos Executor en qué nodos Worker están disponibles, etc.). El Driver está conectado con el Cluster Manager a través de una SparkSession o un SparkContext (SparkSession estaría por encima del SparkContext).

Arquitectura Spark
Arquitectura Spark

La arquitectura de Apache Spark cosiste principalmente en dos capas de abstracción:

Resilient Distributed Datasets (RDD):
Es la célula del ecosistema Spark, el elemento básico para trabajar con los datos. Se caracterizan porque son inmutables (los datos no pueden cambiarse una vez se crean), distribuidos (siguiendo el patrón de Spark, se dividen en particiones entre los nodos del clúster) y resilientes (automáticamente es capaz de regenerar una partición que se haya perdido). Hay dos operaciones que pueden realizarse sobre los RDDs: transformaciones y acciones.

Directed Acyclic Graph (DAG):
El driver convierte cada tarea en un job de tipo DAG (grafo acíclico dirigido) formado por vértices (RDD) y aristas (sus transformaciones). En lenguaje coloquial, cada tarea es un trabajo dividido en etapas (vértices) que siguen un secuencia lineal (acíclico). Las etapas están construidas con alguno de los componentes de Spark (API Core, Spark SQL, Streaming, real-time processing, MLlIB o GraphX).

Ecosistema Spark

El ecosistema de Spark lo forman los siguientes elementos:

  • Spark Core: Es el componente principal de Spark y proporciona las funcionalidades básicas, como el procesamiento distribuido, la programación paralela y la tolerancia a fallos. Es la API para el procesamiento batch.
  • Spark SQL: Proporciona una API para trabajar con datos estructurados o semiestructurados usando SQL. Nos ofrece tres vías para ello:
    • DataFrames: estructura de datos distribuida que se organiza en columnas con nombres y tipos de datos (similar a una tabla relacional). Se pueden crear a partir de archivos de datos estructurados como CSV o JSON, o mediante la lectura de datos de una base de datos relacional utilizando Spark SQL. Los DataFrames también se pueden transformar mediante operaciones de filtrado, agregación y unión para realizar tareas de análisis de datos.
    • Datasets: es una API más segura y fuertemente tipada que se encuentra en la parte superior de los DataFrames. Los Datasets permiten trabajar con datos estructurados de manera más fácil y natural, ya que se definen los esquemas de los datos de forma estática. Se generan a partir de ficheros CSV, JSON, bases de datos relacionales, etc. También se pueden transformar mediante operaciones de filtrado, agregación y unión.
    • Lenguaje SQL a través de una API SQL pata trabajar sobre DataFrames y Datasets. Admite una amplia gama de funciones SQL como SELECT, FROM, WHERE, JOIN, GROUP BY, ORDER BY, etc.
  • Spark Streaming: Es un componente que permite procesar datos en tiempo real, como publicaciones de Twitter o Facebook. Procesa los datos en lotes y utiliza la misma API que Spark Core.
  • Spark MLlib: Proporciona algoritmos de aprendizaje automático para realizar tareas como clasificación, regresión y agrupamiento de datos en modo distribuido.
  • Spark GraphX: Proporciona herramientas para trabajar con datos gráficos y realizar análisis de redes y grafos.

PySpark (Python + Apache Spark)

PySpark es una librería Python para desarrollar aplicaciones que explotan todas las capacidades de Apache Spark (procesamiento distribuido paralelizando las cargas de trabajo entre nodos) ideal para proyectos de datos a gran escala y machine learning (ML). Necesitamos descargar la librería pyspark bien por pip o siguiendo las instrucciones del bootcamp.

import pyspark

SparkSession

SparkSession es una clase en PySpark que se utiliza para trabajar con Spark y que proporciona una interfaz única para interactuar con diferentes tipos de datos en Spark, como RDD, DataFrames y DataSet. SparkSession se utiliza para crear y configurar SparkContext, SQLContext y HiveContext en una sola sesión.

Para instanciar una SparkSession debemos invocar el constructor y pasarle varios parámetros, aunque en vamos a trabajar sólo con los dos primeros:

  • appName: Nombre de la aplicación de Spark, por ejemplo «test«
  • master: especifica la dirección del clúster de Spark en el que se ejecutará la aplicación. Puede ser una URL de un cluster de Spark independiente o ejecución local:
    • local: Especifica el modo de ejecución local, es decir, que se ejecutará en una única máquina como un proceso local.
    • local[N]: Especifica el modo de ejecución local con N hilos.
    • local[*]: Especifica el modo de ejecución local con tantos hilos como núcleos de CPU disponibles.
    • yarn: Especifica el modo de ejecución en un cluster de YARN.
    • mesos: Especifica el modo de ejecución en un cluster de Mesos.
    • spark://HOST:PORT: Especifica la URL de un cluster de Spark independiente.
    • k8s://https://HOST:PORT: Especifica la URL del API server de Kubernetes en el que se ejecutará la aplicación.
  • config: Configuraciones adicionales de Spark.
    • spark.executor.memory: Cantidad de memoria asignada a cada ejecutor.
    • spark.driver.memory: Cantidad de memoria asignada al driver.
    • spark.sql.shuffle.partitions: Número de particiones utilizadas por las operaciones de shuffle en SQL.
    • spark.serializer: Serializador utilizado para serializar/deserializar objetos.
    • spark.ui.port: Puerto utilizado por la interfaz de usuario web de Spark.
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("test") \
    .master("local[*]") \
    .getOrCreate()

Para acceder al Spark UI podemos consultar la URL http://localhost:4040 (si no hemos especificado otro puerto en la configuración). Si deseamos crear otra SparkSession para otro notebook podemos especificar un nuevo puerto distinto al de por defecto 4040 en el .config():

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .config("spark.ui.port", "4041") \
    .getOrCreate()

Spark DataFrames con CSV y Parquet

Como hemos mencionado previamente, un DataFrame es un set de datos con tipado y organizado en columnas. A continuación vamos a ver cómo cargar en un DataFrame de PySpark el contenido de un CSV o un Parquet. Como breve pincelada del formato Parquet indicar que es un tipo de archivo muy utilizado en el mundo de datos al tener un alto grado de compresión (1:10 respecto a CSV) gracias a que almacena los datos de forma columnar, no por filas, lo que permite ajustar el tipado y por tanto el espacio necesario. Otra de las ventajas es que el esquema de los datos está incluido en el propio fichero, por lo que no es necesario inferirlo o asignarlo.

CSV
Para cargar un CSV utilizaremos el método read.csv(). Podemos especificar algunos parámetros de configuración, por ejemplo header=True indica que la primera fila es la cabecera y inferSchema=True va a inferir esquema en base a los datos de cada columna.

df = spark.read.csv('taxi+_zone_lookup.csv', header=True, inferSchema=True)

Parquet
En el caso de de Parquet usaremos el método read.parquet(). La ventaja de cargar un archivo Parquet en lugar de otro formato es que estos archivos suelen tener un tamaño más pequeño y una mayor eficiencia de procesamiento debido a su estructura columnar y compresión de datos.

df = spark.read.parquet('taxi+_zone_lookup.parquet')

Partitions

En PySpark, las particiones se utilizan para dividir un conjunto de datos en fragmentos más pequeños y distribuirlos a través de un clúster de Spark. Cada partición es procesada en paralelo por un executor en el clúster, lo que permite procesar grandes conjuntos de datos de manera más eficiente y escalable. Por ejemplo, si deseamos procesar un fichero CSV o parquet de 1GB, podríamos segmentarlo en 10 particiones para que cada una sea trabajada de forma paralela en 10 nodos del clúster.

Las particiones son una de las unidades fundamentales de procesamiento en Spark y se utilizan en varios tipos de objetos de datos, como RDD, DataFrames y DataSets. En general, se recomienda tener un número adecuado de particiones para un conjunto de datos dado, ya que demasiadas particiones pueden provocar un exceso de sobrecarga en la comunicación entre los ejecutores y demasiado pocas particiones pueden resultar en una utilización ineficiente de los recursos del clúster.

Las particiones se pueden especificar al crear un RDD o al leer un conjunto de datos en un DataFrame o DataSet. Por ejemplo, al leer un archivo CSV en un DataFrame, se puede especificar el número de particiones mediante el parámetro numPartitions:

df = spark.read.csv("path/to/csv/file.csv" \
        , header=True \
        , inferSchema=True \
        , numPartitions=8)

También es posible ajustar el número de particiones de un RDD o DataFrame existente utilizando el método repartition o coalesce. El método repartition redistribuirá los datos aleatoriamente a través del clúster y creará el número especificado de particiones, mientras que coalesce fusionará particiones adyacentes para crear el número especificado de particiones.

df = df.repartition(24)

Si queremos persistir el DataFrame en un fichero parquet, por ejemplo, usaremos el método write.parquet pasándole como parámetro el directorio para la salida:

df = df.repartition(24)
df.write.parquet('fhvhv/2021/01/')

Si consultamos en la Spark UI (http://localhost:4040) el progreso del trabajo podemos ver que está dividido en dos etapas: en primer lugar Spark crea tantas particiones como cores tenga nuestra CPU (recuerda que hemos creado el SparkSession con Local[*]), por lo que si tenemos 4, va a dividir el DataFrame en 4 particiones. Cada partición genera una task. Cuando hayan finalizado las 4 tareas de esta etapa, se genera el particionado que hayamos especificado. En el ejemplo, se generarán 10 tasks, cada una para cada partición. Como nuestra CPU sólo tiene 4 cores las tasks se van a ir encolando y ejecutando según se vayan quedando disponibles. Cada una de las particiones se almacena en un fichero con el sufijo _snappy.parquet, que es el formato de compresión de alta velocidad por defecto de parquet.

Para ilustrar el ejemplo, mi equipo tiene 12 cores y vamos a crear 24 particiones del fichero parquet.

CPU y cores para calcular tasks de Spark
CPU y cores para calcular tasks de Spark

Al lanzar el comando .write.parquet() Spark va a generar dos etapas: la primera de 12 tasks que se corresponden con los 12 cores de mi CPU y la segunda con las 24 tasks correspondientes a las 24 particiones que he especificado.

Spark UI y Spark partitions
Spark UI y Spark partitions

Si consultamos el directorio podremos observar como se han generado 24 ficheros de tipo .snappy.parquet:

Ficheros snappy.parquet de Spark partitions
Ficheros snappy.parquet de Spark partitions

Transformaciones y Acciones de PySpark

En PySpark, las operaciones se dividen en dos categorías: Transformaciones (Transformations) y Acciones (Actions).

Las transformaciones son operaciones que toman un DataFrame como entrada, aplican una transformación y generan un nuevo DataFrame como resultado. Son operaciones «perezosas» (lazy), lo que significa que no se ejecutan inmediatamente sino que se almacenan en el grafo de transformación hasta que se requiere una acción.

  • select(): selecciona columnas específicas.
  • filter(): filtra filas que satisfacen una condición específica.
  • groupBy(): agrupa filas por una o más columnas.
  • join(): une dos DataFrames en función de una o más columnas comunes.
  • distinct(): devuelve un nuevo DataFrame que contiene solo valores distintos.
  • orderBy(): ordena las filas en función de una o más columnas.
  • withColumn(): agrega una nueva columna o reemplaza una columna existente con una nueva.
  • drop(): elimina una o más columnas.

Por otro lado, las acciones son operaciones que toman un DataFrame como entrada y producen un resultado que se almacena o se muestra. Las acciones son operaciones que «activan» el grafo de transformación y hacen que se ejecuten las transformaciones almacenadas.

  • show(): muestra una vista previa de un número determinado de filas.
  • count(): cuenta el número de filas.
  • collect(): recopila todos los datos en la memoria del driver.
  • write(): escribe en un archivo o fuente de datos externa.
  • first(): devuelve la primera fila.
  • max() y min(): devuelve el valor máximo o mínimo en una columna numérica.
  • sum(): devuelve la suma de los valores en una columna numérica.
  • mean(): devuelve la media de los valores en una columna numérica.
  • pivot(): crea una tabla dinámica a partir.

En general, se recomienda minimizar el número de acciones en PySpark y maximizar el uso de transformaciones, ya que éstas son más eficientes y permiten una mejor optimización del flujo de procesamiento de datos.

Funciones de Spark

Dentro del ecosistema Spark podemos encontrar dos tipos de funciones: un grupo de funciones built-in incluidas por defecto para realizar distintas operaciones y las User-defined functions (UDF) que son funciones personalizadas que podemos desarrollar a nuestro gusto.

Funciones built-in
La forma de importarlas es la siguiente. Para ver todas las opciones disponibles basta con escribir en una celda de un cuaderno F. y pulsar el tabulador.

from pyspark.sql import functions as F

Siguiendo el ejemplo del curso, podemos convertir un DATETIME a DATE utilizando la función to_date():

df \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .withColumn('dropoff_date', F.to_date(df.dropoff_datetime)) \
    .select('pickup_date', 'dropoff_date', 'PULocationID', 'DOLocationID') \
    .show()

User-defined functions (UDF)
Una UDF (User-Defined Function) en Spark es una función definida por el usuario que se puede usar para realizar transformaciones de datos personalizadas en un DataFrame o RDD. Las UDF se definen en Python, Java, Scala o R, y se pueden aplicar en PySpark, Spark SQL y Spark Streaming. Para definir una UDF en PySpark, podemos usar la función udf() de la biblioteca pyspark.sql.functions. A continuación se muestra un ejemplo de cómo definir una UDF en PySpark para calcular el cuadrado de un número:

from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType

def square(x):
    return x*x

square_udf = F.udf(square, DoubleType())

En este ejemplo, se define una función square() que calcula el cuadrado de un número, y luego se usa la función F.udf() para convertirla en una UDF. La UDF se define para que tome un argumento de entrada de tipo double y devuelva un valor de tipo double.

Una vez que se ha definido una UDF, se puede aplicar a una columna de un DataFrame mediante la función withColumn() de PySpark:

df \
    .withColumn('square_trip_miles', square_udf(df.trip_miles)) \
    .select("hvfhs_license_num","trip_miles","square_trip_miles") \
    .show(10)

Trabajando con DataFrames y Spark SQL

Podemos trabajar con un DataFrame utilizando los métodos de PySpark o bien aprovechar uno de los componentes clave de Spark: API SQL. Gracias al cual podemos consultar DataFrames como si fueran tablas relacionales y lanzar queries en SQL estándar. Vamos a verlo con dos ejemplos del mismo caso de uso. Vamos a cargar un parquet con las ventas de una tienda online. El fichero tiene tres columnas (producto, cantidad y fecha):

Consulta de DataFrame con PySpark
Para explotar un DataFrame con PySpark tenemos a nuestra los operadores de transformación que comentamos más arriba. Todos ellos se apilan sobre el mismo DataFrame por puntos » . «:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ConsultaVentas").getOrCreate()

df = spark.read.parquet("ruta/ventas.parquet")

fecha = "2022-02-28"
df_productos_vendidos= df.filter(df.fecha == fecha).groupBy("producto").sum("cantidad")

df_productos_vendidos.show()

Consulta de DataFrame con Spark SQL
Al utilizar el API de Spark SQL vamos a poder escribir SQL estándar y consultar el DataFrame como si se tratase de una tabla relacional utilizando el método spark.sql(). Como paso previo es necesario crear una tabla temporal a partir del DataFrame utilizando la función createOrReplaceTempView() y pasándole como parámetro el nombre de la tabla, en nuestro caso ventas.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ConsultaVentas").getOrCreate()

df = spark.read.parquet("ruta/ventas.parquet")

df.createOrReplaceTempView("ventas")

fecha = "2022-02-28"
df_productos_vendidos = spark.sql(f"SELECT producto, SUM(cantidad) FROM ventas WHERE fecha = '{fecha}' GROUP BY producto")

df_productos_vendidos.show()

DataFrames de NY Taxis
En los ejemplos del bootcamp vamos a utilizar los datos de las carreras de taxis de NY green y yellow de 2020 y 2021. Podemos descargarlos de forma manual, desde un notebook de Jupyter y con el shell script que han preparado en el curso (download_data.sh) y lanzando los comandos:

bash download_data.sh yellow 2020
bash download_data.sh yellow 2021
bash download_data.sh green 2020
bash download_data.sh green 2021


1. Vamos a generar un DataFrame agrupando todos los ficheros mensuales por cada tipo de taxi (green y yellow).

df_green = spark.read.csv('data/raw/green/*/*', header=True, inferSchema=True)
df_yellow = spark.read.csv('data/raw/yellow/*/*', header=True, inferSchema=True)


2. Como el objetivo es unirlos en un único DataFrame debemos asegurarnos de que tienen el mismo esquema. Podemos consultarlo con df_yellow.schema() y df_green.schema(). Vamos a realizar varias acciones para crear el nuevo df con las columnas que tienen en común ambos y además, añadir una nueva que identifique el origen:

  • Añadir una nueva columna en cada df para identificar el origen usando el operador de transformación .withColumn(nombre_columna, valor) y la función F.lit() de las librería built-in de Spark que nos permite especificar un valor literal: .withColumn('service_type', F.lit('green'))
  • Columnas que no están en ambos DataFrames. La forma rápida es convirtiendo la lista de columnas a SET de python y combinarlos, pero perdemos el orden de las columnas, por lo que tenemos que usar un bucle FOR para recorrer comparar ambas listas y generar una nueva con las que tienen en común.
  • Columnas con las fechas de subida y bajada (xy_pickup _datetime y xy_dropoff_datetime) tienen distinto nombre. Vamos a renombrarlas en ambos df con el operador de transformación .withColumnRenamed(columna, nueva_columna).
from pyspark.sql import functions as F

# renombramos columnas de fecha:

df_green = df_green \
    .withColumnRenamed('lpep_pickup_datetime', 'pickup_datetime') \
    .withColumnRenamed('lpep_dropoff_datetime', 'dropoff_datetime')

df_yellow = df_yellow \
    .withColumnRenamed('tpep_pickup_datetime', 'pickup_datetime') \
    .withColumnRenamed('tpep_dropoff_datetime', 'dropoff_datetime')

# generamos lista con columnas en común entre ambos DataFrames
common_colums = []

yellow_columns = set(df_yellow.columns)

for col in df_green.columns:
    if col in yellow_columns:
        common_colums.append(col)

# generamos los nuevos df por cada tipo sólo con las columnas en común y añadiendo el service_type para identificar el tipo de taxi:
df_green_sel = df_green \
    .select(common_colums) \
    .withColumn('service_type', F.lit('green'))

df_yellow_sel = df_yellow \
    .select(common_colums) \
    .withColumn('service_type', F.lit('yellow'))


3. Por último vamos a combinar ambos DataFrames que ahora sí tienen el mismo esquema y una nueva columna para identificar el tipo de taxi.

df_trips_data = df_green_sel.unionAll(df_yellow_sel)

Podemos comprobar el número de registros por tipo de taxi usando PySpark:

df_trips_data.groupBy('service_type').count().show()
Comprobamos con PySpark la distribución de datos en el nuevo DataFrame
Comprobamos con PySpark la distribución de datos en el nuevo DataFrame

Si observamos el DAG generado por el job Spark vemos que se leen los dos CSV por separado y se unen en único RDD:

DAG de job de Spark
DAG de job de Spark

GroupBy en Spark

Vamos a analizar cómo gestiona internamente Spark el operador GroupBy. Podemos probar con Spark SQL o con PySpark. Para nuestro ejemplo vamos a calcular el beneficio y cantidad de viajes por hora y zona de los taxis.

Si lo queremos calcular con PySpark:

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test_groupby') \
    .config("spark.ui.port", "4042") \
    .getOrCreate()

df_green = spark.read.csv('data/raw/green/*/*', header=True, inferSchema=True)

df_green_revenue = df_green.filter("lpep_pickup_datetime >= '2020-01-01 00:00:00'") \
    .withColumn("hour", F.date_trunc("hour", "lpep_pickup_datetime")) \
    .groupBy("hour", "PULocationID") \
    .agg({"total_amount": "sum", "*": "count"}) \
    .withColumnRenamed("sum(total_amount)", "amount") \
    .withColumnRenamed("count(1)", "number_records") \
    .orderBy("hour","PULocationID")


O bien con Spark SQL:

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test_groupby') \
    .config("spark.ui.port", "4042") \
    .getOrCreate()

df_green = spark.read.csv('data/raw/green/*/*', header=True, inferSchema=True)

df_green.createOrReplaceTempView("green")

df_green_revenue = spark.sql("""
SELECT 
    date_trunc('hour', lpep_pickup_datetime) AS hour, 
    PULocationID AS zone,

    SUM(total_amount) AS amount,
    COUNT(1) AS number_records
FROM
    green
WHERE
    lpep_pickup_datetime >= '2020-01-01 00:00:00'
GROUP BY
    1, 2  
""")

df_green_revenue.show()
Resultado del group by sobre el DataFrame de Spark
Resultado del group by sobre el DataFrame de Spark

Vamos a persistir la salida en un parquet y analizar cómo realiza la tarea Spark:

df_green_revenue.write.parquet('data/report/revenue/green', mode="overwrite")


Al tratarse de procesamiento distribuido se dividen los datos en particiones que son enviadas a cada no de los executors. Para combinar los resultados de cada uno se realiza la operación de shuffle.

Si observamos en la Spark UI, vemos que la tarea se ha dividido en dos stages, el primero para preparar el groupBy (genera las agrupaciones intermedias de cada executor) y el segundo genera el resultado final combinándolas (shuffle). Spark no sólo ha generado dos stages para la tarea, si no que además ha ejecutado cada stage en un job independiente. El motivo es que la operación de shuffle es muy costosa. Para agilizar el procesamiento el primer job almacena los resultados intermedios en la caché que luego usará el shuffle, por eso en el segundo job aparece el primer stage como Skipped, que ya ha sido procesado en el primero. Más info en la documentación de Shuffle de Spark.

Tarea de groupBy en Spark se divide en dos jobs y stages
Tarea de groupBy en Spark se divide en dos jobs y stages

Si vemos el DAG de cada job:

DAG de groupBy en Spark
DAG de groupBy en Spark

Si analizamos cada uno de los stages en detalle, en el primero los datos se dividen en particiones y cada una es enviada a un executor, donde se realizan las agrupaciones y operaciones necesarias (filter y groupBy). Esto genera las agrupaciones o resultados intermedios, donde H es la hora, Z la zona y los dos KPIs con el beneficio y total de carreras.

Spark GroupBy

En el segundo stage se realiza la operación de shuffle para combinar todos los resultados intermedios y agrupar los datos con la misma clave en la misma partición. Spark identifica como clave las columnas del GroupBy (en nuestro ejemplo hour y zone). Por último, se realiza un nuevo GroupBy sobre las nuevas particiones para realizar reducir los datos agrupándolos por la clave. Es posible que en alguna de las particiones generadas por la operación shuffle haya datos de distintas claves, pero en la última operación de agrupación se reparten en las particiones correspondientes.

Spark GroupBy

Join en Spark

En Spark podemos combinar dos DataFrames como si se tratasen de dos tablas relacionales mediante el operador .join(df, on, how), donde:

  1. df: El DataFrame que se unirá con el DataFrame principal. Debe especificarse como un objeto DataFrame de Pyspark.
  2. on: Una o varias columnas comunes a ambas tablas que se utilizarán para unir los df. Puede especificarse como una cadena que contenga el nombre de la columna, o una lista de cadenas que contengan los nombres de las columnas (["id","nombre"]).
  3. how: El tipo de unión que se realizará. Puede tomar uno de los siguientes valores:
    • 'inner': Realiza una unión interna, es decir, devuelve sólo los registros que tienen coincidencias en ambas tablas.
    • 'outer' o 'full': Realiza una unión externa completa, es decir, devuelve todos los registros de ambas tablas, incluso si no tienen una coincidencia en la otra tabla.
    • 'left' o 'left_outer': Realiza una unión externa izquierda, es decir, devuelve todos los registros del DataFrame izquierdo y los registros coincidentes del DataFrame derecho. Si no hay coincidencias en el DataFrame derecho, los valores para las columnas del DataFrame derecho serán null.
    • 'right' o 'right_outer': Realiza una unión externa derecha, es decir, devuelve todos los registros del DataFrame derecho y los registros coincidentes del DataFrame izquierdo. Si no hay coincidencias en el DataFrame izquierdo, los valores para las columnas del DataFrame izquierdo serán null.
    • 'left_semi': Realiza una unión semijoin izquierda, es decir, devuelve sólo los registros del DataFrame izquierdo que tienen una coincidencia en el DataFrame derecho.
    • 'left_anti': Realiza una unión anti-izquierda, es decir, devuelve sólo los registros del DataFrame izquierdo que no tienen una coincidencia en el DataFrame derecho.
from pyspark.sql.functions import *

df = df1.join(df2, on='id', how='left')

En el bootcamp profundizan en cómo se comporta Spark internamente cuando debe realizar operaciones de join entre tablas muy grandes o cuando una de las tablas es muy grande y la otra pequeña. En el primer caso, el comportamiento es muy similar al del GroupBy. Spark particiona los DataFrames, hace un shuffle para reorganizarlos por la clave y finalmente los agrupa en particiones por clave.

Join Spark

En el ejemplo observamos como hacemos un outer join entre los DataFrames de carreras de taxis yellow y green para quedarnos todos los registros de ambas tablas. Esto a a generar un nuevo DataFrame con las columnas de la clave (hour y zone) y las columnas de amount y number_records de yellow y green. Para facilitar el trabajo se renombran. Podéis consultar el notebook completo en mi repo.

df_green_revenue_tmp = df_green_revenue \
    .withColumnRenamed('amount', 'green_amount') \
    .withColumnRenamed('number_records', 'green_number_records')

df_yellow_revenue_tmp = df_yellow_revenue \
    .withColumnRenamed('amount', 'yellow_amount') \
    .withColumnRenamed('number_records', 'yellow_number_records')

df_join = df_green_revenue_tmp.join(df_yellow_revenue_tmp, on=['hour', 'PULocationID'], how='outer')

df_join.write.parquet('data/report/revenue/total', mode='overwrite')

Si comprobamos en el Spark UI observamos que la operación se ha dividido en tres jobs que procesan un stage cada uno. Los dos primeros se encargan de la lectura de cada uno de los DataFrames y el último (con 12 tasks) es el encargado de realizar el join.

Stages de join Spark
Stages de join Spark

Si entramos a ver el DAG del último job observamos que los dos primeros stages están skipped (se han ejecutado previamente). En el Stage 87 se encarga de realizar el shuffle y por último agrupar el resultado (reduces).

DAG de join en Spark
DAG de join en Spark

En el segundo caso, vamos a cruzar una tabla lookup pequeña (zones) con una muy grande (el DataFrame que hemos acabamos de generar combinando las carreras de ambos tipos de taxi).

En Spark cuando se realiza una operación de join entre dos DataFrames el proceso de unión puede ser costoso en términos de recursos computacionales, especialmente si uno de ellos es mucho más grande que el otro. Una forma de optimizar este proceso de unión es utilizar el broadcasting.

Arquitectura de Broadcasting en Spark
Arquitectura de Broadcasting en Spark

En lugar de enviar el DataFrame pequeño a través de la red y unirlo con el DataFrame grande en los nodos de trabajo (wokers), Spark envía el DataFrame pequeño a todos los nodos de trabajo que procesan el DataFrame grande. De esta manera, cada worker puede realizar la unión localmente sin necesidad de realizar una operación de red costosa.

El broadcasting de Spark se puede utilizar en operaciones de join cuando se cumplen las siguientes condiciones:

  • Uno de los DataFrames es significativamente más pequeño que el otro, por lo general, cuando el tamaño del DataFrame pequeño es menor que el tamaño de la memoria disponible en cada nodo del clúster.
  • El tamaño del DataFrame pequeño es menor que el parámetro spark.sql.autoBroadcastJoinThreshold. Este parámetro define el tamaño máximo que un DataFrame puede tener para que Spark utilice la técnica de broadcasting.

Para el ejemplo los nombres de columna no son iguales en ambos df en lugar de especificar el parámetro on vamos a indicar directamente las columnas clave de cada uno df_join.PULocationID == df_zones.LocationID. La operación de join se lleva todas las columnas al nuevo DataFrame. Como en este caso la clave tiene un nombre distinto en cada df, en el nuevo estarán las dos columnas de Location. Como buena práctica de limpieza es recomendable borrar una de ellas .drop('LocationID').

!wget https://s3.amazonaws.com/nyc-tlc/misc/taxi+_zone_lookup.csv

df_zones = spark.read \
    .option("header", "true") \
    .csv('taxi+_zone_lookup.csv')

df_result = df_join.join(df_zones, df_join.PULocationID == df_zones.LocationID)

df_result.drop('LocationID').write.parquet('tmp/revenue-zones')

Vamos a volver a Spark UI a ver cómo ha generado la orden de trabajo. Observamos que se ha dividido en 4 jobs con una stage cada uno. Si prestamos atención, vemos que el job con id 69 con sólo 3″ de duración habría cargado el DataFrame pequeño de zonas, a continuación el job 70 realiza la operación de broadcast, el 71 carga el segundo DataFrame y por último el job que realiza el join.

Spark UI broadcasting DataFrames
Spark UI broadcasting DataFrames

Resilient Distributed Datasets (RDDs)

Imagina que Spark es como una gran caja de herramientas para trabajar con datos en un grupo de ordenadores (clúster). En esa caja, una de las herramientas más importantes es el RDD, que es un conjunto de objetos distribuidos entre todos los nodos del clúster. Los RDD son muy útiles porque se pueden procesar en paralelo y procesar datos de manera más rápida.

Pero para trabajar con datos estructurados, como si fueran una tabla de una base de datos, se necesita una herramienta un poco diferente. Esa herramienta es el DataFrame, que es como una tabla que se puede manipular con facilidad. La ventaja de los DataFrames es que son más fáciles de usar que los RDDs y se pueden procesar de manera más eficiente.

Aunque los DataFrames son más fáciles de usar, siguen utilizando los RDDs en segundo plano para hacer todo el trabajo duro. Cuando se convierte un RDD en un DataFrame, se están organizando los datos en una tabla fácil de usar, y cuando se convierte un DataFrame en un RDD, se están sacando los datos de esa tabla para que se puedan procesar de manera más eficiente en paralelo. En general, se recomienda utilizar DataFrames siempre que sea posible debido a que son más fáciles de usar y optimizados para consultas SQL y de tipo tabla. Sin embargo, hay algunas situaciones donde puede ser útil utilizar RDD, como por ejemplo, si se necesita realizar operaciones más complejas o cálculos de bajo nivel que no son posibles con DataFrames. Además, si estamos trabajando con tipos de datos no estructurados, como archivos de registro, los RDD pueden ser una opción más adecuada.

Los datos en un RDD están organizados en un tipo de objeto llamado Row, que sería un concepto similar al de fila o tupla de una base de datos relacional o estructurada. Podemos acceder a los valores de cada columna en esa fila, ya sea por su índice (como un número de posición) o por su nombre (como un título).

Rows de Spark RDD
Rows de Spark RDD

Dentro de RDD, se pueden utilizar tres operaciones principales: Map(), Filter() y ReduceByKey()

Map
Se utiliza para transformar cada elemento de un RDD en otro elemento. Por ejemplo, se puede aplicar una función a cada elemento del RDD para convertirlo en una cadena o para realizar una operación. La función map crea un nuevo RDD que contiene los elementos resultantes de aplicar la transformación a cada elemento del RDD original.

# Creamos un RDD con una lista de números
numeros = sc.parallelize([1, 2, 3, 4, 5])

# Utilizamos la función map para multiplicar cada número por 2
numeros_por_dos = numeros.map(lambda x: x * 2)

# Imprimimos el nuevo RDD con los elementos resultantes
print(numeros_por_dos.collect())  # Resultado: [2, 4, 6, 8, 10]


Filter
se utiliza para seleccionar un subconjunto de elementos de un RDD que cumple una determinada condición. Crea un nuevo RDD que contiene los elementos seleccionados.

# Creamos un RDD con una lista de palabras
palabras = sc.parallelize(['hola', 'adios', 'gato', 'perro', 'casa'])

# Utilizamos la función filter para seleccionar las palabras con más de 5 letras
palabras_largas = palabras.filter(lambda x: len(x) > 5)

# Imprimimos el nuevo RDD con los elementos seleccionados
print(palabras_largas.collect())  # Resultado: ['adios']


ReduceByKey
sirve para reducir los valores de un RDD que tienen la misma clave. Por ejemplo, se puede aplicar una función a cada valor de un RDD para agregarlos y, a continuación, utilizar la función reduceByKey para agregar los valores que tienen la misma clave. Crea un nuevo RDD que contiene las claves y los valores resultantes de aplicar la función de agregación.

# Creamos un RDD con una lista de tuplas que representan ventas de productos
ventas = sc.parallelize(
[('norte', 'producto1', 100), 
('sur', 'producto1', 200), 
('norte', 'producto2', 50), 
('sur', 'producto2', 150)])

# Utilizamos la función reduceByKey para sumar las ventas de cada producto por región
ventas_por_region = ventas.reduceByKey(lambda x, y: x + y)

# Imprimimos el nuevo RDD con los resultados de las ventas por región
print(ventas_por_region.collect())  # Resultado: [('norte', 150), ('sur', 350)]

MapPartition
El método mapPartitions es útil cuando necesitas aplicar una función a un conjunto de datos que está dividido en particiones, especialmente en proyectos ML. Digamos que tienes una gran cantidad de datos que Spark divide en múltiples particiones para procesarlos más rápido. Esta función es recomendable usarla cuando deseas aplicar una transformación a cada partición de un conjunto de datos en lugar de aplicarla a cada registro individualmente (que para eso usaríamos .map()).

Por ejemplo, imagina que tienes un conjunto de datos que contiene información de ventas de diferentes tiendas en todo el país. Cada partición contiene información de ventas de una región geográfica específica. Si deseas agregar la cantidad de ventas de cada región, en lugar de aplicar una función a cada registro, puedes usar mapPartitions para aplicar la función a cada partición de datos y luego sumar los resultados.

# Crear un RDD de números
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 3) # Dividir en 3 particiones

# Definir una función que devuelve el número máximo en cada partición
def max_partition(iterator):
    yield max(iterator)

# Aplicar la función a cada partición usando mapPartitions
max_per_partition = rdd.mapPartitions(max_partition).collect()

# Imprimir el número máximo en cada partición
print(max_per_partition)

En este ejemplo creamos un RDD con 10 números y lo dividimos en 3 particiones usando el argumento numSlices en la función parallelize. Luego, definimos una función llamada max_partition que toma un iterador de números y devuelve el número máximo en esa partición. Usamos yield en lugar de return en la función porque estamos generando un único valor para cada partición.

Finalmente, usamos mapPartitions para aplicar la función max_partition a cada partición y obtener el número máximo en cada una de ellas. El resultado es una lista con el número máximo de cada partición, que imprimimos usando la función print.

Trabajando con RDDs en Spark

Vamos a realizar el ejercicio de generar la consulta de Spark SQL que hemos visto previamente con RDD en lugar de DataFrames. Esta es la consulta original:

df_green_revenue = spark.sql("""
SELECT 
    date_trunc('hour', lpep_pickup_datetime) AS hour, 
    PULocationID AS zone,

    SUM(total_amount) AS amount,
    COUNT(1) AS number_records
FROM
    green
WHERE
    lpep_pickup_datetime >= '2020-01-01 00:00:00'
GROUP BY
    1, 2  
""")

Vamos a utilizar los datos de los taxis green que hemos descargado previamente generando el DataFrame df_green con todos los CSVs correspondientes. Para convertir un DataFrame a RDD simplemente utilizamos el operador .rdd. Vamos a comenzar convirtiendo el SELECT a RDD con el mismo operador de transformación .select() (líneas 18-20).

El WHERE lo vamos a generar utilizando el operador .filter(), para lo que se crea la función filter_outliers(row) que recibe como parámetro una Row que el propio RDD va iterando (como las funciones lambda cuando las usamos para transformar DataFrames). La función simplemente realiza la comprobación de que el la fecha de recogida (lpep_pickup_datetime ) sea a partir del 1.1.2022.

Para realizar las operaciones de agregación SUM y COUNT del GROUPBY necesitamos crear dos funciones para preparar los datos y agruparlos. En primer lugar vamos a generar un nuevo RDD con los resultados intermedios (con una Key y los valores) usando el operador .map(). Vamos a dividir cada Row en dos tuplas (clave(x,y) , valor(k,z)). La función prepare_for_grouping genera las tuplas key(hour, zone) y value (amount, count). El count es 1 por cada Row para que cuando se sumen todas nos de el número total de Rows, es decir, un count(1).

Por otro lado, vamos a crear otra función (calculate_revenue) para realizar el cálculo de agregación (sumatorio del amount y el conteo del nº de carreras) y vamos a invocarla desde reduceByKey(). Este operador infiere cuál es la clave del RDD original (columnas con los mismos valores), por lo que en cada iteración va a agrupar dos Rows con la misma Key sumando los valores amount y count.

El método unwrap lo vamos a usar para mostrar los resultados como si fueran una fila y no un conjunto de tuplas, para después generar el DataFrame de una forma más higiénica con el operador .toDF(). Si lo deseamos, podemos pasarle como parámetro el esquema de los datos.

from datetime import datetime
from collections import namedtuple
from pyspark.sql import types

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()


start = datetime(year=2020, month=1, day=1)

df_green = spark.read.csv('data/raw/green/*/*', header=True, inferSchema=True)

rdd = df_green \
    .select('lpep_pickup_datetime', 'PULocationID', 'total_amount') \
    .rdd

def filter_outliers(row):
    return row.lpep_pickup_datetime >= start

def prepare_for_grouping(row): 
    hour = row.lpep_pickup_datetime.replace(minute=0, second=0, microsecond=0)
    zone = row.PULocationID
    key = (hour, zone)
    
    amount = row.total_amount
    count = 1
    value = (amount, count)

    return (key, value)

def calculate_revenue(left_value, right_value):
    left_amount, left_count = left_value
    right_amount, right_count = right_value
    
    output_amount = left_amount + right_amount
    output_count = left_count + right_count
    
    return (output_amount, output_count)

def unwrap(row):
    return RevenueRow(
        hour=row[0][0], 
        zone=row[0][1],
        revenue=row[1][0],
        count=row[1][1]
    )



result_schema = types.StructType([
    types.StructField('hour', types.TimestampType(), True),
    types.StructField('zone', types.IntegerType(), True),
    types.StructField('revenue', types.DoubleType(), True),
    types.StructField('count', types.IntegerType(), True)
])

df_result = rdd \
    .filter(filter_outliers) \
    .map(prepare_for_grouping) \
    .reduceByKey(calculate_revenue) \
    .map(unwrap) \
    .toDF(result_schema) 

df_result.write.parquet('tmp/green-revenue')
PicklingError: Could not serialize object: IndexError: tuple index out of range

Si intentamos realizar una acción con el RDD (por ejemplo .take(), .filter(), etc) y nos arroja este error, es necesario utilizar una versión de Python inferior a la 3.11. Podemos crear un nuevo entorno de Python con Anaconda, a mano, o si resulta más sencillo, utilizar un colab de Google que por defecto instala Python 3.8.

Spark Standalone Mode

Spark Standalone Mode es un modo de ejecución de Apache Spark que permite ejecutar aplicaciones en un clúster dedicado. En este modo, Spark actúa como un administrador de clúster y controla la asignación de recursos y la planificación de tareas. El modo Standalone es una de las opciones de implementación de Spark, junto con YARN (Yet Another Resource Negotiator) y Mesos. Podemos consultar la documentación de Spark Standalone Mode para más información.

En el modo Standalone, se puede iniciar un clúster Spark simplemente ejecutando una serie de comandos para arrancar el nodo maestro (master) y varios nodos de trabajo (workers). El nodo maestro coordina el clúster y administra los recursos disponibles, mientras que los nodos de trabajo ejecutan tareas.

Para arrancar el Spark Standalone mode en Windows 10 por línea de comandos CMD debemos seguir los siguientes pasos. Antes de empezar, recuerda que tienes que tener definida la variable de entorno SPARK_HOME (si lo has hecho por Git Bash, no está en Windows). Para declararla desde línea de comandos:

setx SPARK_HOME "c:\tools\spark-3.3.2-bin-hadoop3"


1. Abrimos un prompt de CMD y nos dirigimos a la ruta donde hemos descargado Spark.

cd %SPARK_HOME%\bin


2. Ejecutamos el comando el siguiente comando para arrancar el nodo master. Nos va a generar una URL tipo spark://192.168.0.38:7077 que usaremos para arrancar el worker. Por otro lado, también arranca el Spark UI que por defecto se despliega en el puerto 8080 (http//localhost:8080).

spark-class org.apache.spark.deploy.master.Master 


3. Abrimos otro prompt de CMD y arrancamos un worker con el siguiente comando. Fíjate en que se le pasa como parámetro la URL del master que hemos iniciado en el paso preivo:

spark-class org.apache.spark.deploy.worker.Worker spark://192.168.0.38:7077


4. Creamos un nuevo cuaderno de jupyter y conectamos la SparkSession al nodo master que acabamos de levantar indicando la URL spark://192.168.0.38:7077:


spark = SparkSession.builder \
    .master("spark://192.168.0.38:7077") \
    .appName('test') \
    .getOrCreate()


5. Nos dirigimos a la Spark UI para ver cómo se han arrancado los nodos master y worker, y la app:

Spark UI de Spark Standalone Mode
Spark UI de Spark Standalone Mode
failed to launch: nice -n 0 /c/tools/spark-3.3.2-bin-hadoop3/bin/spark-class org.apache.spark.deploy.master.Master --host --port 7077 --webui-port 8080
  ps: unknown option -- o
  Try `ps --help' for more information.
  Error: Could not find or load main class org.apache.spark.launcher.Main
  Caused by: java.lang.ClassNotFoundException: org.apache.spark.launcher.Main
  /c/tools/spark-3.3.2-bin-hadoop3/bin/spark-class: line 96: CMD: bad array subscript
full log in /c/tools/spark-3.3.2-bin-hadoop3/logs/spark--org.apache.spark.deploy.master.Master-1-FORGEWORLD.out

Este error lo lanza Git bash al intentar arrancar Spark con el comando start-master.sh. Al parecer, el lanzador %SPARK_HOME%\sbin no está soportado en Windows.

¡Bien! ya tenemos arrancado nuestro propio clúster Spark en local, vamos a probar a crear scripts python que corran sobre el mismo. Para nuestra prueba vamos a aprovechar uno de los notebooks que hemos realizado previamente y vamos a convertirlo en un script python con jupyter nbconvert desde Git Bash. Una vez generado el fichero debemos limpiarlo de código autogenerado por Jupyter.

jupyter nbconvert --to=script 06_spark_sql.ipynb

Vamos a sustituir los nombres de los ficheros y los directorios que están puestos a fuego por parámetros, para lo que usaremos la librería argparse. Podéis consultar el fichero en la repo de Github:

import argparse

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

parser.add_argument('--input_green', required=True)
parser.add_argument('--input_yellow', required=True)
parser.add_argument('--output', required=True)

input_green = args.input_green
input_yellow = args.input_yellow
output = args.output

Para lanzar el script Python en el clúster local de Spark que hemos levantado, desde la terminal ejecutamos:

python my_script.py \
    --input_green=data/pq/green/2020/*/ \
    --input_yellow=data/pq/yellow/2020/*/ \
    --output=data/report-2020

Spark submit

Spark submit es una herramienta de línea de comandos en Apache Spark que permite enviar y ejecutar aplicaciones Spark en un cluster de Spark. Para enviar una aplicación Spark con spark-submit simplemente indicamos el script python y los parámetros necesarios. También proporciona opciones para especificar el número de cores y memoria que se asignarán a la aplicación en el clúster de Spark. Además, puede ser utilizado para enviar aplicaciones a diferentes modos de ejecución de Spark, incluyendo modo local, modo cluster y modo cliente.

Para utilizarlo con nuestros ejemplos, dentro del mismo script Python en lugar de conectarnos al SparkSession indicando el master, vamos a eliminar este parámetro dejándolo así:

spark = SparkSession.builder \
    .appName('test') \
    .getOrCreate()


Lo que vamos a hacer es ejecutarlo desde spark-submit y pasarle como parámetro la URL del master de Spark que hemos levantado:

spark-submit \
    --master="spark://192.168.0.38:7077" \
    my_script.py \
        --input_green=data/pq/green/2020/*/ \
        --input_yellow=data/pq/yellow/2020/*/ \
        --output=data/report-2020

Spark en Google Cloud Platform: Dataproc

Dataproc es un servicio de Google Cloud que te permite ejecutar tareas de procesamiento de datos a gran escala de manera fácil y rápida. En otras palabras, es una herramienta que te permite procesar grandes cantidades de datos en paralelo usando el poder de la nube.

Con Dataproc, puedes crear un clúster de computación en la nube y ejecutar tareas de procesamiento de datos como procesamiento por lotes, análisisy aprendizaje automático. El servicio está diseñado para ser fácil de usar, escalable y rentable, lo que lo hace ideal para empresas y organizaciones que necesitan procesar grandes cantidades de datos de manera eficiente. Además, Dataproc está integrado con otras herramientas de Google Cloud, como BigQuery, Cloud Storage y Cloud Dataprep, lo que te permite integrar y analizar datos de diferentes fuentes de manera eficiente.

Vamos a seguir los siguientes pasos para ejecutar nuestro script Python en Dataproc:

1. En primer lugar vamos a otorgar permisos sobre Dataproc al service account que estamos usando en el bootcamp. Desde el IAM lo seleccionamos y añadimos privilegios sobre el rol Dataproc Administrator.

Dataproc role en IAM
Dataproc role en IAM

2. Para crear una instancia de Dataproc simplemente buscamos desde en la barra de búsqueda por Dataproc. La primera vez que accedamos nos solicitará habilitar la API.

Dataproc API
Dataproc API

3. Creamos un nuevo cluster de Dataproc en modo Compute Engine:

Cluster Dataproc Compute Engine
Cluster Dataproc Compute Engine

3. Vamos a definir un nombre, región (debe ser la misma que nuestro Bucket (GCS)). En los siguientes pasos nos permite especificar el número de nodos (la versión Single Node es ideal para hacer pruebas, sólo incluye un nodo y sale más económico). Por último, podemos especificar que el clúster de Dataproc incluya ciertos componentes (Docker, Anaconda, Jupyter Notebook…).

Dataproc
Dataproc

Ejecutar job Spark en Dataproc

Hay varias formas de ejecutar un job de Spark en Dataproc. En el curso vemos desde el UI de la GCP y por línea de comandos con la SDK gcloud, pero también podríamos lanzarlo a través del API, node.js, etc.. Más info en la documentación de Google.

Ejecutar job Spark desde Google Cloud Patform UI

Ejecutamos un job desde el UI simplemente entrando en el Dataproc Cluster y pulsando en el botón Submit job en el menú superior. Se abrirá un blade donde podemos configurar las propiedades del mismo:

Nuevo job en Cluster Dataproc
Nuevo job en Cluster Dataproc
  • Job ID: identificador del job
  • Job Type: lenguaje de programación del script, en nuestro caso, PySpark.
  • Main python file: ubicación física del script Python (porque hemos seleccionado PySpark como type). Vamos a utilizar nuestro Bucket para alojar el script, para subirlo podemos hacerlo a mano desde el UI o utilizando el SDK gcloud que instalamos en la primera semana del curso. Recuerda que no debemos especificar en el SparkSession el clúster master, debemos utilizar el script del ejemplo de spark-submit. Por tanto, introducimos la ruta física del script en nuestro bucket: gs://dtc_data_lake_digital-aloe-375022/code/06_spark_sql_big.py. Para copiarlo de local al Bucket:
gsutil cp 06_spark_sql.py gs://dtc_data_lake_digital-aloe-375022/code/06_spark_sql_big.py
  • Jar files: para este ejemplo no es necesario, ya que sólo ejecutamos un script Python.
  • Arguments: aquí especificamos los parámetros que espera recibir el script, recuerda que son tres. Se añaden uno por uno (como si fueran tags) incluyendo el prefijo de «–«. Revisa la ubicación de los ficheros en tu bucket.
--input_green=data/green/2020/*/
--input_yellow=data/yellow/2020/*/ 
--output=data/report-2020

3. Pulsamos sobre el botón submit y comienza a ejecutarse de inmediato.

Ejecutar job Spark desde gcloud SDK

Otra forma de lanzar un job de Spark en Dataproc es usando el gcloud SDK. Abrimos una terminal y lanzamos el siguiente comando. Como nuestro script es un Python con PySpark, indicamos pyspark en job-command. Especificamos el nombre del clúster Dataproc que hemos instanciado, la región y por último

gcloud dataproc jobs submit pyspark \
    --cluster=de-zoomcamp-cluster \
    --region=europe-west6 \
    gs://dtc_data_lake_de-zoomcamp-nytaxi/code/06_spark_sql.py \
    -- \
        --input_green=gs://dtc_data_lake_de-zoomcamp-nytaxi/pq/green/2020/*/ \
        --input_yellow=gs://dtc_data_lake_de-zoomcamp-nytaxi/pq/yellow/2020/*/ \
        --output=gs://dtc_data_lake_de-zoomcamp-nytaxi/report-2020


Ejemplo:

gcloud dataproc jobs submit pyspark \
    --cluster=<your-cluster-name> \
    --region=europe-west6 \
    gs://<url-of-your-script> \
    -- \
        --param1=<your-param-value> \
        --param2=<your-param-value>

Job Spark + Dataproc + BigQuery

Por último, vemos cómo ejecutar un job en Dataproc para que ingeste y transforme los datos para finalmente guardarlos en una tabla de BigQuery en lugar de ficheros parquet.

1. Vamos a crear otro script de Python copia de 06_spark_sql.py donde modificaremos dónde queremos persistir el output. En lugar de un fichero parquet, vamos a guardarlos en una tabla de BigQuery. Modificamos la última línea del .write(). En .option() indicamos que es una tabla y la variable output es uno de los parámetros que se recibe al ejecutar el job.

df_result.write.format('bigquery') \
    .option('table', output) \
    .save()


2. Subimos el nuevo script al Bucket de Google:

gsutil cp 06_spark_sql_big_query.py gs://dtc_bucket/code/06_spark_sql_big_query.py


3. Vamos a crear un nuevo job de Spark donde además de seguir la misma configuración del ejemplo anterior, vamos a añadir la librería gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar en el apartado de jar files. Para más información, consulta la documentación.

gcloud dataproc jobs submit pyspark \
    --cluster=de-zoomcamp-cluster \
    --region=europe-west6 \
    --jars=gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar \
    gs://dtc_data_lake_de-zoomcamp-nytaxi/code/06_spark_sql_big_query.py \
    -- \
        --input_green=gs://dtc_data_lake_de-zoomcamp-nytaxi/pq/green/2020/*/ \
        --input_yellow=gs://dtc_data_lake_de-zoomcamp-nytaxi/pq/yellow/2020/*/ \
        --output=trips_data_all.reports-2020

4. Lanzamos el job y observamos cómo se ha creado la tabla en nuestro esquema de BigQuery:

Deja un comentario

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *

What is 8 + 6 ?
Please leave these two fields as-is: