Data Engineering Zoomcamp – Semana 6

Stream processing con Apache Kafka

Llegamos a la última semana del bootcamp para Data Engineers organizado por DataTalksClub. Si la semana pasada vimos el procesamiento de datos por lotes, en esta vamos a abordar el procesamiento en tiempo real con Apache Kafka. En primer lugar analizando cómo funciona internamente para luego practicar levantando un clúster Spark y Kafka en local con Docker o utilizando el SaaS de Confluence que ejecuta un clúster de Kafka en cloud.

Última actualización: 18/03/2023

Prerequisitos

En mi caso estoy trabajando en Windows, os recomiendo seguir la guía de instalación de los prerequesitos. Es importante tener en cuenta que si estamos usando Git bash o MINGW64 el catálogo de variables de entorno no se replica en Windows, por lo que debemos arrancar los cuadernos de jupyter desde el command shell de unix para que coja correctamente los path configurados.

  • Apache Kafka: podemos instalarlo de forma manual siguiendo las instrucciones de la documentación oficial o bien levantar un contenedor de Docker preconfigurado. Puedes consultar cómo configurarlo y desplegarlo más adelante.
  • Revisa las variables de entorno en Windows, además de tenerlas declaradas de forma independiente, es necesario añadirlas al PATH:
    • HADOOP_HOME=C:\tools\hadoop-3.2.0
    • JAVA_HOME=C:\Program Files\Java\jdk-11.0.17
    • SPARK_HOME=C:\tools\spark-3.3.2-bin-hadoop3
    • PYSPARK_PYTHON=python

Stream processing (procesamiento de datos en tiempo real)

El stream processing (procesamiento de datos en tiempo real) es un método que implica el procesamiento continuo de datos en tiempo real a medida que se generan y se transmiten a través de un flujo (stream) en lugar de procesarlos como un lote (batch) estático.

Los datos se procesan a medida que se generan y se transmiten, lo que significa que se pueden analizar y tomar decisiones en tiempo real. Esto es especialmente útil en aplicaciones donde la velocidad y la precisión son críticas, como el monitoreo de transacciones financieras, la detección de fraudes, la gestión de inventario, análisis de redes sociales y seguimiento de la cadena de suministro, entre otros.

La mayoría de los sistemas de stream processing están diseñados para ser escalables, tolerantes a fallos y distribuidos, lo que permite el procesamiento de grandes cantidades de datos en tiempo real en múltiples nodos o servidores. Algunas herramientas populares de stream processing incluyen Apache Kafka, Apache Flink, Apache Storm, Spark Streaming, y AWS Kinesis.

Stream processing
Stream processing
VentajasDesventajas
Su arquitectura distribuida permite que se pueda escalar horizontalmente según las necesidades de la empresa.Tecnología compleja y puede requerir conocimientos especializados para configurarlo correctamente.
Es capaz de procesar y entregar los datos en tiempo real, lo que significa que los usuarios pueden tomar decisiones más rápidamente.Puede requerir una cantidad significativa de recursos de hardware para funcionar correctamente, especialmente si se trata de grandes volúmenes de datos.
Altamente tolerante a fallos y ofrece una alta durabilidad de los datos. Los datos se almacenan en múltiples nodos, lo que significa que si un nodo falla, los datos se pueden recuperar fácilmente.Aunque Kafka es una tecnología de código abierto, puede haber costos asociados con su implementación y mantenimiento.
Compatible con múltiples lenguajes de programación y sistemas operativos, lo que lo hace fácilmente integrable en diferentes sistemas.Se puede usar en conjunto con otras tecnologías, lo que puede crear dependencias en la infraestructura tecnológica de la empresa.
Pros y contras de procesamiento en tiempo real (streaming)

Apache Kafka

Apache Kafka es una plataforma de streaming de datos de código abierto desarrollada por la Apache Software Foundation. Se utiliza para la transmisión de datos en tiempo real a través de diferentes aplicaciones y sistemas, lo que permite a los usuarios procesar, analizar y almacenar grandes cantidades de datos en tiempo real.

Kafka se basa en el modelo de publicación-suscripción, donde los datos son enviados por los productores (producers) y recibidos por los consumidores (consumers) en tiempo real. Está diseñado para ser escalable y tolerante a fallos, lo que significa que es capaz de manejar grandes volúmenes de datos y puede continuar operando incluso si algunos nodos fallan. Ofrece una arquitectura distribuida, lo que significa que se puede implementar en múltiples servidores y se puede escalar horizontalmente según las necesidades. Además, cuenta con una API, lo que facilita su integración con diferentes sistemas y lenguajes de programación.

Productores y Consumidores

Los productores y los consumidores son los componentes principales de Kafka. Los productores son los encargados de enviar los datos a Kafka, mientras que los consumidores los reciben y los procesan. Es como si los productores fueran los que hablan y los consumidores los que escuchan. Los consumidores se pueden agrupar en grupos de consumidores (consumer.group.id) para leer un mismo topic.

Apache Kafka Producers y Consumers
Apache Kafka Producers y Consumers

Acks (confirmaciones) es una configuración de Apache Kafka que se refiere a la cantidad de réplicas de un mensaje que deben confirmar la recepción antes de que se considere que el mensaje ha sido correctamente procesado y se envíe una respuesta de confirmación al productor.

Cuando un productor envía un mensaje a un topic en Kafka, se envía a los brokers que son responsables de ese topic que tienen una o más réplicas del mismo. La configuración acks indica cuántas réplicas deben confirmar la recepción del mensaje antes de que el productor reciba una respuesta de confirmación. Hay tres valores posibles:

  • 0: El productor no espera confirmación de recepción. Esto significa que el mensaje se envía al broker y se considera enviado.
  • 1: El productor espera la confirmación de recepción de al menos un broker. Si uno confirma la recepción del mensaje se envía una respuesta de confirmación al productor.
  • all» o -1: El productor espera la confirmación de recepción de todas las réplicas. Cuando han confirmado la recepción del mensaje, se envía una respuesta de confirmación al productor.

La configuración acks es importante porque afecta la disponibilidad y la durabilidad de los datos en Kafka. Si se establece en 0 se corre el riesgo de perder mensajes si se produce una incidencia en el broker antes de que se hayan replicado los datos. Si se establece en all se asegura la durabilidad de los datos, pero impacta en la entrega del mensaje debido a la necesidad de que todas las réplicas confirmen la recepción.

Brokers

Los brokers son los nodos de Kafka que se encargan de almacenar y distribuir los datos. Cada broker es capaz de almacenar uno o más topics y es responsable de asegurarse de que los datos sean entregados correctamente a los consumidores. Si un broker falla, los otros brokers pueden continuar operando sin interrupciones.

Apache Kafka Producers, Consumers y Brokers
Apache Kafka Producers, Consumers y Brokers

Clúster de Kafka

Un clúster de Kafka es un conjunto de servidores o brokers de Kafka que trabajan juntos para proporcionar una plataforma de streaming de datos distribuida y tolerante a fallos. Permite escalar la plataforma horizontalmente para procesar grandes cantidades de datos en tiempo real. Cada broker en el clúster de Kafka es responsable de almacenar y distribuir un subconjunto de los datos del clúster. Los productores envían mensajes a los brokers, que los almacenan en particiones y los distribuyen a los consumidores.

Uno de los broker se convierte en el controlador (controller) para coordinar la asignación de particiones a todos los brokers. El controlador supervisa el estado de los brokers y las particiones y se asegura de que las réplicas de las particiones estén distribuidas de manera uniforme entre los brokers. Por otro lado, utiliza grupos de consumidores (consumer groups) para equilibrar la carga de lectura de mensajes. Los consumidores de un grupo se asignan a diferentes particiones de un topic para leer los mensajes de forma paralela y procesarlos.

Apache Kafka Producers, Consumers, Brokers y Clúster
Apache Kafka Producers, Consumers, Brokers y Clúster

KRaft (sustituto de Zookeeper)

KRaft es un protocolo de consenso de Apache Kafka que fue introducido en la versión 2.4.0 (aunque se liberó oficialmente en la 3.1.1) y que tiene como objetivo reemplazar a ZooKeeper para la gestión de los metadatos del clúster de Kafka. ZooKeeper gestiona el estado del clúster y almacena los metadatos de los diferentes componentes, como los topics, particiones, brokers, etc. con la misión de gestionar su estado estado y coordinar los consumidores. Sin embargo, puede ser una fuente de problemas, ya que su arquitectura centralizada lo hace propenso a fallos y cuellos de botella.

Con el objetivo de mejorar la fiabilidad y escalabilidad del sistema, Kafka introdujo el protocolo KRaft, que permite almacenar los metadatos en los propios brokers de Kafka a través de un topic en lugar de depender de ZooKeeper. Kraft utiliza un sistema distribuido de almacenamiento de metadatos que permite a los brokers mantener una copia de los metadatos del clúster, lo que mejora la disponibilidad y elimina la necesidad de un sistema de gestión de estado centralizado como ZooKeeper.

Diferencia entre Zookeeper y KRaft en Apache Kafka

Históricamente, el plano de control de Kafka se gestionaba a través de un servicio de consenso externo llamado ZooKeeper. Uno de los brokers se designa como controlador y es responsable de comunicarse con ZooKeeper y los demás brokers del clúster. Los metadatos del clúster se almacenan en ZooKeeper. En KRaft se designa un grupo de brokers como controladores, que proporcionan los servicios de consenso que solían ser proporcionados por ZooKeeper. Todos los metadatos del clúster se almacenan en topics de Kafka y se gestionan internamente.

Apache Kafka Producers, Consumers, Brokers, Clúster y KRaft
Apache Kafka Producers, Consumers, Brokers, Clúster y KRaft

Topics

Un topic es una categoría o flujo de mensajes que se almacenan en un clúster de Kafka. Los productores envían mensajes a un topic específico y los consumidores pueden leerlos. Cada topic se compone de uno o más logs, que son la persistencia física de los mensajes en una secuencia ordenada. Cada log se almacena en uno o más brokers. Un topic divide los mensajes en múltiples particiones, lo que permite distribuir la carga de datos entre los nodos del clúster (brokers). Cada mensaje tiene asignado un par clave-valor, un timestamp y se identifica con un offset, número único que indica la posición del mensaje en la partición. Los consumidores utilizan los offsets para saber qué mensajes ya han sido procesados y cuáles están pendientes.

Arquitectura Apache Kafka
Arquitectura Apache Kafka

Particiones

Cada topic puede tener una o varias particiones, que son secuencias ordenadas y duraderas de mensajes que se almacenan en los brokers de Kafka. Cada partición se asigna a un único broker y se denomina líder de la partición, que es el responsable de recibir y servir las solicitudes de L/E de los productores y consumidores para esa partición. Una partición sólo se puede asignar a un consumidor, por lo que lo ideal es que haya tantas particiones como consumidores.

Los mensajes se asignan a las particiones en un topic mediante su clave: las claves de los mensajes se hashean y se dividen a partes iguales por las particiones. Las particiones mejoran el rendimiento y son una herramienta de escalabilidad de los productores y consumidores de Kafka. Al tener varias particiones, se pueden escribir varios mensajes de forma concurrente entre varios brokers.

Particiones en Apache Spark
Particiones en Apache Spark

El Consumer Group id es un parámetro clave en Kafka, ya que determina cómo se distribuyen los mensajes entre los consumidores. Si varios consumidores pertenecen al mismo grupo y comparten la misma suscripción al mismo topic, Kafka garantiza que cada mensaje se entregue a un solo consumidor en el grupo, lo que ayuda a distribuir la carga de procesamiento de mensajes entre los consumidores y asegura que cada mensaje se procese exactamente una vez.

La elección de la clave de partición adecuada en Kafka es una tarea crítica para el rendimiento y la escalabilidad del sistema. A continuación, se presentan algunos factores a considerar:

  1. Cardinalidad: la cardinalidad de la clave de partición es importante porque afecta la capacidad de distribuir uniformemente los mensajes a través de los diferentes consumidores. Si la clave de partición tiene baja cardinalidad, los mensajes se agruparán en un conjunto reducido de particiones, lo que puede generar un desequilibrio en la carga y un bajo rendimiento del sistema. Si la clave de partición tiene alta cardinalidad, los mensajes se distribuirán uniformemente en diferentes particiones.
  2. Ordenación: si el sistema requiere que los mensajes se procesen en un orden específico, la clave de partición debe elegirse de tal manera que garantice que se entreguen en el orden correcto.
  3. Distribución de carga: si el sistema tiene muchos consumidores , es importante elegir una clave de partición que permita una distribución uniforme de la carga a través de éstos. Si la carga está desequilibrada, algunos consumidores pueden estar sobrecargados, mientras que otros pueden estar inactivos.
  4. Reutilización de particiones: es posible que se puedan reutilizar particiones para diferentes flujos de mensajes. En este caso, la clave de partición debe elegirse de tal manera que permita la reutilización de particiones en diferentes flujos de mensajes.

Replicación (Replication)

Kafka utiliza la replicación de particiones para garantizar la tolerancia a fallos y la alta disponibilidad de los datos. Cada partición tiene una o varias réplicas (seguidores o followers), que son copias exactas de la partición líder en otros brokers de Kafka. Las réplicas garantizan que los datos de la partición estén disponibles incluso si el broker donde está ubicado el líder de la partición falla, en tal caso, uno de los seguidores en otro broker se convierte en el nuevo líder.

El factor de replicación se configura a nivel de topic y configura la cantidad de copias de una partición que se deben almacenar en diferentes brokers para garantizar la tolerancia a fallos y la disponibilidad de los datos en caso de que falle un broker. Por ejemplo, si un topic tiene un factor de replicación de 2, cada partición tendrá un líder y dos réplicas. Si el líder falla, una de las réplicas se elegirá como el nuevo líder y continuará gestionando las L/E en la partición.

Un factor de replicación 1 implica que no habrá copias de las particiones, es decir, solo habrá una instancia de cada partición en el broker líder que la aloja. Si falla, esa partición no estará disponible hasta que se restaure el broker o se cree un nuevo líder para esa partición en otro. Esta casuística puede provocar una interrupción de servicio y pérdida de datos en caso de que la partición no esté respaldada en otra ubicación. En general, se recomienda utilizar un factor de replicación mayor que 1 para garantizar la tolerancia a fallos y la disponibilidad de los datos en caso de que falle un broker.

Es importante tener en cuenta que un factor de replicación muy alto significa que se necesita más espacio de almacenamiento y ancho de banda de red para replicar los datos en los diferentes brokers, lo que también impacta en el rendimiento y velocidad.

Replicación en Apache Kafka
Replicación en Apache Kafka

Política de retención (retention policy)

La política de retención en Kafka determina cómo se manejan los registros en un topic después de cierto tiempo o tamaño. En otras palabras, es la configuración que determina cuánto tiempo se deben mantener los mensajes antes de ser eliminados.

Hay dos tipos de políticas de retención en Kafka: basadas en tiempo y basadas en tamaño.

  • Retención basada en tiempo: Los logs se eliminan después de un período de tiempo especificado, independientemente de su tamaño. Para habilitar esta política, se establece la propiedad de configuración log.retention.ms en el archivo de configuración de Kafka.
  • Retención basada en tamaño: Los registros se eliminan del topic después de que se haya alcanzado un tamaño máximo especificado. Para habilitar esta política, se establece la propiedad de configuración log.retention.bytes.

Otros parámetros que podemos configurar de la política de retención son:

  • retention.ms: este parámetro de configuración controla el tiempo máximo de retención de un log antes de que sea eliminado para liberar espacio. Si se indica a -1 no se aplica límite de tiempo (no se borran nunca los logs).
  • cleanup.policy: determina la política de retención de los logs. Por defecto es delete y borra todos los logs una vez superado el tiempo designado en la política de retención. Otra opción es configurarlo como compact para comprimirlos. La última opción combinar ambas separadas por coma delete, compact, que elimina los logs antiguos y comprime los logs retenidos.

Offsets

__consumer_offsets es un topic interno que se utiliza para almacenar el estado de compensación de los consumidores de Kafka. Este topic almacena los offsets de los mensajes usados por cada consumidor de Kafka en un grupo de consumidores determinado.

Los offsets son números que indican la posición del último mensaje que un consumidor ha leído en un topic determinado. Almacenar los offsets en __consumer_offsets permite que los consumidores de Kafka puedan reanudar la lectura de los mensajes desde el último punto en que se detuvieron, incluso después de un reinicio del consumidor o un fallo del mismo. También se utiliza para mantener la coordinación entre los miembros de un grupo de consumidores de Kafka. Cuando un consumidor se une o abandona un grupo, se actualiza la información en __consumer_offsets para asegurar que los offsets de los mensajes se mantengan consistentes entre todos los miembros del grupo.

Offset en Apache Kafka
Offset en Apache Kafka

auto.offset.reset es una configuración en Apache Kafka que especifica qué sucede cuando un consumidor se une a un grupo de consumidores para leer mensajes de un topic y no tiene un offset válido para comenzar a leerlos (porque es la primera vez que se conecta o porque ha perdido su posición de lectura). Puede tener uno de los siguientes valores:

  • earliest: Si no hay offset válido para el consumidor, el consumidor comenzará a leer mensajes desde el comienzo del topic.
  • latest: el consumidor comenzará a leer mensajes desde el final del topic.
  • none: se lanzará una excepción.

Es importante tener en cuenta que la configuración auto.offset.reset solo se aplica cuando un consumidor se une a un grupo de consumidores. Si un consumidor está leyendo desde un topic sin unirse a un grupo de consumidores, debe especificar el offset inicial de forma explícita.

Timestamps

Los timestamps se utilizan para registrar la hora en que se produce un evento en un topic de Kafka. Son importantes porque permiten ordenar los eventos en función del tiempo en que se produjeron, fundamental para procesar flujos de datos en tiempo real, ya que es necesario garantizar que los eventos se procesen en el orden correcto para generar resultados precisos, por ejemplo, si usamos funciones de ventana (windowing).

En Kafka, existen dos tipos de timestamps:

  • Timestamp del productor: se asigna al evento cuando se escribe en un topic. Por defecto, Kafka utiliza el timestamp del sistema en el momento en que se escribe el evento, pero se puede especificar uno personalizado si es necesario.
  • Timestamp del registro: se asigna al evento cuando se lee del topic. Por defecto, Kafka utiliza el del productor, pero se puede configurar para que utilice el del sistema en el momento en que se lee el evento.

Por último, existen tres conceptos de tiempo importantes:

  • Event time: momento en que un evento realmente ocurrió en el mundo real. Se puede incluir como un campo en el registro de Kafka o se puede inferir a partir de otros campos, como la marca de tiempo del sistema del dispositivo que generó el evento.
  • Processing time: momento en que un evento es procesado por una aplicación o un sistema de Kafka. Es decir, es el tiempo en que se procesó el evento. Es determinado por el sistema que está procesando el evento y se puede incluir como un campo en el registro de Kafka.
  • Ingestion time: momento en que un evento es recibido por un sistema de Kafka, es decir, es el tiempo en que el evento llega al broker y se escribe en un topic. Se puede incluir como un campo en el registro de Kafka o se puede inferir a partir de otros campos, como la marca de tiempo del sistema del broker que recibió el evento.


Schema Registry

El Schema Registry es un componente opcional de Kafka que se utiliza para gestionar los esquemas en formato JSON de los datos que se envían (productores) y reciben (consumidores). Usar esquemas en Kafka es como tener una plantilla para el tipo de datos. Esto ayuda a prevenir errores y garantiza que los datos sean consistentes y estén en el formato correcto. Además, permite agregar nuevos campos o cambiar los tipos de datos sin romper la compatibilidad con aplicaciones antiguas. Usar esquemas en Kafka hace que sea más fácil enviar y recibir datos de manera confiable entre diferentes tecnologías y aplicaciones.

Cuando un productor envía un mensaje a un topic, Kafka puede validar el mensaje utilizando el esquema asociado. Si el mensaje es válido según el esquema, se procesa normalmente. Si el mensaje no cumple con el esquema, se rechaza o envia una notificación de error. Cuando un consumidor lee un mensaje de un tema, Kafka puede recuperar el esquema asociado y utilizarlo para deserializar el mensaje. Esto ayuda a garantizar que el mensaje se procese correctamente y que los datos estén en el formato correcto.

El Schema Registry también puede manejar la evolución de los esquemas. Si se actualiza un esquema, Kafka puede almacenar la versión anterior y garantizar que los consumidores que utilizan el esquema anterior aún puedan procesar los mensajes. Esto permite una evolución más flexible de los datos a medida que cambian con el tiempo.

Avro

Avro es un formato de serialización de datos que se utiliza para estructurar y almacenar datos. La serialización es el proceso de convertir un objeto o estructura de datos en un formato que se pueda transmitir o almacenar, y que luego se pueda volver a convertir en el objeto original. Es similar a JSON en el sentido de que ambos son formatos de texto que se pueden leer y escribir en varios lenguajes de programación. Sin embargo, hay algunas diferencias clave entre los dos. En primer lugar, Avro es más compacto que JSON, lo que significa que utiliza menos espacio de almacenamiento y ancho de banda. Además, Avro admite la serialización de datos complejos, como registros y uniones, mientras que JSON solo admite tipos de datos simples, como cadenas y números.

Una de las mayores ventajas de Avro es que se puede utilizar en diferentes lenguajes de programación. Esto significa que un mensaje serializado en Avro en un lenguaje de programación se puede deserializar en otro lenguaje sin problemas de compatibilidad. En otras palabras, si estás usando Java para enviar un mensaje a través de Kafka y otro equipo está usando Python para recibir ese mensaje, ambos equipos pueden leer el mensaje en Avro sin ningún problema. Otra ventaja de Avro es que se integra bien con el Schema Registry para garantizar que todos los mensajes que se envían a través de Kafka se ajusten a un esquema específico.

Cuando se utiliza Avro para serializar datos, es posible que los esquemas cambien con el tiempo, lo que puede causar problemas si no se manejan correctamente. La evolución de Avro gestiona los cambios en los esquemas de datos. Básicamente, Avro tiene un sistema que permite manejar estos cambios de manera eficiente, sin afectar la capacidad de los clientes para leer registros más antiguos. Por ejemplo, cuando se serializa un registro en Avro este incluye su propio esquema. Si éste cambia en el futuro, Avro utilizará el esquema almacenado en el registro para interpretar los datos, lo que garantiza que los datos se lean correctamente.

Además, Avro permite agregar nuevos campos a los esquemas sin romper la compatibilidad hacia atrás. Esto significa que se pueden agregar nuevos campos a los registros sin afectar la capacidad de los clientes más antiguos para leer registros más antiguos. Sin embargo, si se eliminan campos, esto puede romper la compatibilidad hacia atrás y puede requerir una conversión de esquema.

  • La compatibilidad hacia atrás (backward compatibility) es la capacidad de nuevos consumidores de leer datos generados por productores antiguos. En otras palabras, si un consumidor más nuevo puede leer los datos que fueron producidos por un productor más antiguo sin problemas, entonces se dice que existe compatibilidad hacia atrás.
  • La compatibilidad hacia adelante (forward compatibility) se refiere a la capacidad de productores más nuevos para producir datos que pueden ser leídos por consumidores más antiguos. Es decir, si un productor más nuevo puede producir datos que un consumidor más antiguo puede leer sin problemas, entonces se dice que existe compatibilidad hacia adelante.
  • La compatibilidad mixta (mixed compatibility) se refiere a la capacidad de sistemas antiguos y nuevos para interactuar juntos de manera adecuada.

Kafka Connect

Kafka Connect es un framework que se utiliza para conectar Kafka con otros sistemas de forma que los usuarios pueden enviar y recibir datos desde y hacia Kafka utilizando conectores preconstruidos. Utiliza una arquitectura de plugin que permite a los usuarios conectar diferentes sistemas de origen y destino. Los plugins de origen son responsables de extraer los datos de la fuente y enviarlos a Kafka, mientras que los plugins de destino son responsables de recibir los datos y enviarlos al sistema de destino.

Proporciona una amplia gama de características que mejoran la usabilidad, escalabilidad y flexibilidad de la integración de datos:

  1. Conectores preconstruidos: tiene una variedad de conectores preconstruidos para integrar diferentes sistemas, se incluyen conectores para bases de datos, sistemas de archivos, servicios web y más.
  2. Configuración sencilla: utiliza una configuración simple en formato JSON para los conectores, lo que facilita el proceso y la personalización de los conectores.
  3. Escalabilidad: altamente escalable y puede manejar grandes volúmenes de datos de manera eficiente. Además, admite la configuración de múltiples conectores para la misma tarea, lo que permite una mayor escalabilidad.
  4. Tolerancia a fallos: está diseñado para ser tolerante a fallos. Si un proceso de Kafka Connect falla, otro proceso lo reemplaza automáticamente sin interrupción del flujo de datos.
  5. Integración con Kafka: está diseñado para integrarse sin problemas con Kafka y utiliza el mismo sistema de particiones y replicación que Kafka para garantizar la tolerancia a fallos y la alta disponibilidad de los datos.

Kafka Streams

Kafka Streams es una herramienta open source para procesar y analizar datos en tiempo real mientras almacenados en Apache Kafka, donde se consumen datos de un topic de Kafka y son enviados a otro topic para su procesamiento. Permite realizar realizar operaciones de transformación, filtrado, combinación (join) y hacer cálculos en los flujos de datos en tiempo real para detectar patrones y tendencias. Es muy útil para aplicaciones que requieren análisis en tiempo real de grandes volúmenes de datos, como el análisis de datos de IoT, la detección de fraude, migración de datos o monitoreo y alertas.

Imagina Kafka Streams es como un cocinero que recibe datos de uno o varios lugares (producers), los mezcla y los prepara de una manera especial para después enviarlos a otro lugar (consumer). En este caso, los datos son como los ingredientes, los topics de Kafka son los lugares donde se encuentran estos ingredientes, y Kafka Streams es el chef que usa los ingredientes, los mezcla y transforma en algo diferente y sabroso para luego enviarlo a otro lugar donde pueda ser utilizado.

En Kafka Streams un thread es como un trabajador que se encarga de procesar datos de Kafka en tiempo real. Cada thread puede procesar varias tareas llamadas tasks» que son como paquetes de trabajo que contienen datos de una parte del topic de Kafka. Para procesar grandes cantidades de datos en tiempo real, se dividen las tareas en subconjuntos más pequeños y se distribuyen en varios threads. De esta manera, se pueden procesar varias tareas al mismo tiempo y de manera eficiente.

El número de threads y tasks se puede ajustar según las necesidades de procesamiento de la aplicación y los recursos disponibles. De esta manera, se puede asegurar que Kafka Streams tenga suficientes workers para procesar grandes cantidades de datos en tiempo real de manera rápida y eficiente.

Kafka Streams
Kafka Streams

Algunas de las principales funciones de Kafka Streams incluyen:

  • Transformación de datos: transformar, filtrar y enriquecer flujos de datos en tiempo real.
  • Procesamiento de ventanas de tiempo: analizar datos en ventanas de tiempo definidas, lo que facilita la detección de patrones y tendencias en los flujos de datos.
  • Join: combinar flujos de datos de múltiples fuentes para obtener información más completa y precisa.
  • Agregación: realizar cálculos en los flujos de datos en tiempo real, como el recuento de eventos o la sumatoria de valores.

Stream

Un stream es una corriente continua de datos que se actualiza constantemente y cuyo tamaño es desconocido o ilimitado. Es como un río que fluye y nunca se detiene. En Kafka Streams, una stream puede estar compuesto por una o más particiones que son secuencias de datos ordenados, inmutables, y que se pueden reproducir y recuperar en caso de fallos.

Cada registro de datos en un Stream es un par de valores llamado (key-value) donde key es una etiqueta que se utiliza para identificar el registro y value es el valor real que contiene el registro.

Processor Topology

El processor topology es un grafo formado por Streams (nodos) conectados por Streams processors (aristas) o la State Store que representa las distintas etapas de transformación de los datos en tiempo real de un topic de Kafka.

Los stream processors procesan un flujo de datos de entrada y producen uno de salida. Pueden realizar diversas operaciones en los datos, como filtrar, transformar, combinar o agregar. Cada stream processor puede recibir y generr uno o varios streams, lo que le permite realizar múltiples operaciones de procesamiento en los datos. Hay dos tipos de stream processors:

  • Source processor: Es un stream processor especial que no tiene ningún nodo anterior, es el primer procesador que lee datos directamente de uno o varios topics.
  • Sink processor: por contra, este tipo de stream processor sería el último de la topología. Se encarga de enviar todos los datos recibidos de los stream processor previos a un topic de Kafka.
Kafka Stream - Processor topology
Kafka Stream – Processor topology

Para trabajar con una topología de procesamiento de datos podemos utilizar la Processor API para trabajar a bajo nivel con mayor flexibilidad con los streams e interactuar con las state stores. Sin embargo, la opción más recomendada es utilizar la API Kafka Streams DSL (Domain Specific Language), que construida sobre el Processor API permite trabajar con los streams de una forma mucho más sencilla en muy pocas líneas de código, aunque menos personalizada.

KStreams vs KTables

La principal diferencia entre un kStream y una KTable (o State) es que el primero representa un flujo continuo de datos en tiempo real, mientras que una tabla es una vista instantánea de los datos almacenados en un momento dado. En otras palabras, los streams permiten procesar datos a medida que fluyen, mientras que las tablas permiten realizar consultas y análisis sobre los datos almacenados en un momento específico en el tiempo. Los streams pueden ser infinitos, lo que significa que pueden procesar y transmitir datos continuamente, mientras que las tablas tienen un tamaño finito y se actualizan en tiempo real a medida que fluyen los datos.

Supongamos que tienes una aplicación de chat en tiempo real y deseas proporcionar una funcionalidad de búsqueda de mensajes. En primer lugar crearemos un stream que recibe todos los mensajes enviados en el chat y los procesa en tiempo real. Por otro lado, podemos crear una tabla que almacene los mensajes en un formato estructurado y les asigne una marca de tiempo. Esta tabla se actualiza automáticamente a medida que llegan nuevos mensajes, lo que te permite buscar mensajes antiguos con facilidad.

En este caso, el stream representa el flujo continuo de mensajes del chat en tiempo real, mientras que la tabla es una vista materializada de los datos almacenados en un momento dado (los mensajes antiguos). Ambos se utilizan juntos para proporcionar una funcionalidad de búsqueda de mensajes en la aplicación de chat.

Las KTables se almacenan en la state store, una caché en memoria que se utiliza para proporcionar un acceso más rápido y eficiente a los datos. Cada instancia de la aplicación de Kafka Streams tiene su propia copia del state store en la memoria, lo que permite que cada instancia procese su propia partición de los datos. También admite la persistencia de KTables en un almacenamiento externo como un sistema de archivos o una base de datos, con lo que incluso podríamos acceder a los datos aunque nuestra aplicación ya no esté corriendo.

Transformaciones Stateful y Stateless

Algunas aplicaciones de stream processing no requieren estado (stateless), lo que significa que el procesamiento de un mensaje es independiente del resto mensajes y no necesitan almacenar datos adicionales en Kafka. Un ejemplo sería cuando sólo necesitas transformar un único mensaje a la vez o filtrar mensajes en función de alguna condición.

Por otro lado, nuestra aplicación necesitará estado (stateful) cuando realiza operaciones como unir, agregar o dividir registros que necesitan información de los registros previos, de forma que los datos del Stream son almacenados en kafka.

Uniones (join)

Los joins de streams son muy similares a los joins de tablas en bases de datos relacionales. Sin embargo, hay algunas diferencias importantes debido a la naturaleza de los flujos de datos en Kafka. En Kafka Streams, los joins de streams se realizan mediante la unión de dos o más streams que se producen en tiempo real a través de las claves de los eventos.

Los joins en Kafka Streams utilizan un modelo de tiempo de evento, es decir, los eventos se procesan en orden de llegada, en lugar de utilizar un modelo de tiempo de procesamiento. Esto garantiza que los joins se realicen de manera precisa y coherente a medida que llegan los eventos. Los joins de streams también pueden ser realizados con ventanas de tiempo (windowed joins), lo que permite unir streams en ventanas de tiempo específicas en lugar de en todo el flujo. Esto es útil en situaciones donde se quiere hacer uniones con datos históricos o en eventos que ocurrieron en un período de tiempo específico.

En general, hay dos tipos de joins de streams en Kafka Streams:

  1. Stream-Stream Join: Este tipo de join se utiliza para combinar dos o más streams en uno nuevo. Para realizar esta unión los streams deben estar particionados por las mismas claves. Se puede utilizar una ventana de tiempo para limitar el alcance del join y unir solamente los eventos que caen dentro ésta. Puede ser útil para limitar la cantidad de eventos que se procesan y reducir el costo de procesamiento.
  2. Table-Stream Join: se utiliza para combinar un stream con una tabla utilizando una clave común. El resultado es un nuevo stream enriquecido con la información de la tabla. La ventana de tiempo se puede utilizar para definir el período de tiempo durante el cual se debe realizar el join entre la tabla y el stream. Esto permite realizar uniones en la tabla basados en un período de tiempo específico, en lugar de en la tabla completa.
  3. Table-table join: unión similar a la de dos tablas relacionales que se combinan cruzando por la misma clave. Este tipo de unión no admite ventanas de tiempo.
from pyspark.sql.functions import from_json, col, struct
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

# Configuración de Kafka
kafka_bootstrap_servers = "localhost:9092"
stream1_topic = "stream1"
stream2_topic = "stream2"
stream1_starting_offsets = "earliest"
stream2_starting_offsets = "earliest"

# Definición del esquema de los datos
stream1_schema = StructType([
    StructField("id", IntegerType()),
    StructField("producto", StringType()),
    StructField("cantidad", IntegerType())
])

stream2_schema = StructType([
    StructField("producto", StringType()),
    StructField("precio", DoubleType())
])

# Creación de los DataFrames que representan los streams
stream1_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", stream1_topic) \
    .option("startingOffsets", stream1_starting_offsets) \
    .load() \
    .selectExpr("CAST(value AS STRING)") \
    .select(from_json("value", stream1_schema).alias("data")) \
    .select("data.*")

stream2_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", stream2_topic) \
    .option("startingOffsets", stream2_starting_offsets) \
    .load() \
    .selectExpr("CAST(value AS STRING)") \
    .select(from_json("value", stream2_schema).alias("data")) \
    .select("data.*")

Una vez que tenemos los DataFrames, podemos hacer el join utilizando la función join de PySpark:

# Realizamos el join entre los dos streams
joined_df = stream1_df.join(stream2_df, "producto")

# Escribimos el resultado en la consola para visualizarlo
query = joined_df \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

query.awaitTermination()

Funciones de ventana (windowing)

Las funciones de ventana (windowing) en Kafka Streams son una herramienta poderosa que permiten procesar flujos de datos en ventanas de tiempo discretas. Básicamente, una función de ventana divide el flujo de datos en segmentos de tiempo llamados ventanas, y aplica una operación de agregación (como sumar, contar, promediar) sobre los eventos que caen dentro de cada ventana.

Las funciones de ventana son muy útiles para realizar análisis en tiempo real de flujos de datos, como calcular promedios móviles, contar eventos por hora o sumar valores por día. Al procesar datos en ventanas de tiempo, se pueden obtener resultados más precisos y oportunos que al procesar datos en tiempo real sin una ventana de tiempo definida. Utilizan cinco unidades de tiempo: DD (día), HH (hora), MI (minuto), SS (segundo) y MS (milisegundo).

En Kafka Streams, existen varios tipos de funciones de ventana, entre las que se incluyen:

Tumbling windows
Son ventanas de tiempo fijas y no solapadas. Por ejemplo, una ventana de 10 segundos que inicia en los segundos 0, 10, 20, etc.

Tumbling window Kafka
Tumbling window Kafka
SELECT count(*) FROM demo GROUP BY ID, TUMBLINGWINDOW(ss, 10);


Hopping windows
Son ventanas de tiempo que se solapan. Por ejemplo, una ventana de 10 segundos que se mueve cada 5 segundos, lo que resulta en ventanas que cubren los segundos 0-10, 5-15, 10-20, etc. Un evento puede estar en dos ventanas de tiempo.

Hoping window Kafka
Hoping window Kafka
SELECT count(*) FROM demo GROUP BY ID, HOPPINGWINDOW(ss, 10, 5);


Sliding windows
Se desplazan a lo largo del tiempo. Por ejemplo, una ventana de 10 segundos que se desplaza cada segundo, lo que resulta en ventanas que cubren los segundos 0-10, 1-11, 2-12, etc. Un evento puede pertenecer a más de una ventana. Solo generan un output si se produce un evento y cada ventana tiene al menos uno. Os recomiendo esta lectura de Amazon AWS.

Sliding window Kafka
Sliding window Kafka
SELECT count(*) FROM demo GROUP BY ID, SLIDINGWINDOW(mi, 1);


Session windows
Se definen en función de una brecha de tiempo entre eventos. Si no se reciben eventos durante un período de tiempo determinado, se cierra la ventana actual y se abre una nueva ventana. Una ventana de sesión comienza cuando ocurre el primer evento. Si otro evento ocurre dentro del timeout especificado desde la ingesta del último evento, la ventana se amplía para incluirlo.

Session window Kafka
Session window Kafka
SELECT count(*) FROM demo GROUP BY ID, SESSIONWINDOW(mi, 2, 1);

Cada tipo de ventana tiene sus propias características y se utiliza en diferentes situaciones. Además, en Kafka Streams se pueden configurar ventanas con diferentes duraciones y períodos de desplazamiento según las necesidades del caso de uso. Os recomiendo esta lectura de la certificación Data Engineer Associate de Azure.

Ejecutar Spark y Cluster Kafka en docker (PySpark Streaming)

Vamos a seguir las instrucciones del bootcamp para levantar un contenedor Docker con Kafka.

1. Si aun no lo has hecho, clona la repo del bootcamp en local.

2. Arrancamos Docker desktop

3. Ejecutamos el shell week_6_streaming_processing\docker\spark\build.sh para descargar las imágenes de docker que vamos a necesitar para construir el contendor de Spark (spark-master, spark-worker y jupyterlab). Tardará un ratito ;).

bash build.sh


4. Creamos la red para que los contenedores Docker de Kafka y Spark que vamos a crear tengan conectividad entre sí:

docker network  create kafka-spark-network

docker volume create --name=hadoop-distributed-file-system

5. Arrancamos los contenedores de Kafka y Spark (están ubicados en las carpetas con este nombre de la repo que hemos clonado en el primer paso).

docker compose up -d

6. Comprobamos si se han levantado todos los servicios accediendo a los frontales web:

7. Para nuestra primera demo de PySpark y Kafka vamos a usar la repo del bootcamp, si la clonas, realmente usaremos los ficheros ubicados en los directorios streams-example y resources. Creamos un nuevo entorno de Python y nos ayudamos de un requirements.txt para instalar las librerías necesarias:

Creamos el nuevo entorno de Python:

virtualenv venv-kafka
virtualenv/scripts/activate

El fichero requirements.txt lo puedes copiar en la carpeta raíz del proyecto:

kafka-python
confluent_kafka
requests
avro
pyspark

Instalamos todas las librerías:

pip install -r requirements.txt

Iniciamos las pruebas lanzando el script del productor producer.py:

python producer.py

Y observamos como se empiezan a generar topics!!

Kafka producer en Python

Si lanzamos el script del consumer.py observamos cómo se cargan:

python consumer.py
Kafka consumer en Python

Podemos comprobar desde el Confluent Control Center (http://localhost:9021/)el comportamiento de nuestro topic:

Confluent Control Center

Producer.py

Este script se encarga de conectarse al fichero de datos de origen (rides.csv) con los datos y alimentar el topic con mensajes:

import csv
from time import sleep
from typing import Dict
from kafka import KafkaProducer

from settings import BOOTSTRAP_SERVERS, INPUT_DATA_PATH, PRODUCE_TOPIC_RIDES_CSV


def delivery_report(err, msg):
    if err is not None:
        print("Delivery failed for record {}: {}".format(msg.key(), err))
        return
    print('Record {} successfully produced to {} [{}] at offset {}'.format(
        msg.key(), msg.topic(), msg.partition(), msg.offset()))


class RideCSVProducer:
    def __init__(self, props: Dict):
        self.producer = KafkaProducer(**props)
        # self.producer = Producer(producer_props)

    @staticmethod
    def read_records(resource_path: str):
        records, ride_keys = [], []
        i = 0
        with open(resource_path, 'r') as f:
            reader = csv.reader(f)
            header = next(reader)  # skip the header
            for row in reader:
                # vendor_id, passenger_count, trip_distance, payment_type, total_amount
                records.append(f'{row[0]}, {row[1]}, {row[2]}, {row[3]}, {row[4]}, {row[9]}, {row[16]}')
                ride_keys.append(str(row[0]))
                i += 1
                if i == 5:
                    break
        return zip(ride_keys, records)

    def publish(self, topic: str, records: [str, str]):
        for key_value in records:
            key, value = key_value
            try:
                self.producer.send(topic=topic, key=key, value=value)
                print(f"Producing record for <key: {key}, value:{value}>")
            except KeyboardInterrupt:
                break
            except Exception as e:
                print(f"Exception while producing record - {value}: {e}")

        self.producer.flush()
        sleep(1)


if __name__ == "__main__":
    config = {
        'bootstrap_servers': [BOOTSTRAP_SERVERS],
        'key_serializer': lambda x: x.encode('utf-8'),
        'value_serializer': lambda x: x.encode('utf-8')
    }
    producer = RideCSVProducer(props=config)
    ride_records = producer.read_records(resource_path=INPUT_DATA_PATH)
    print(ride_records)
    producer.publish(topic=PRODUCE_TOPIC_RIDES_CSV, records=ride_records)

En este caso, se ha generado un settings.py para guardar toda la configuración:

import pyspark.sql.types as T

INPUT_DATA_PATH = 'resources/fhv_tripdata_2019-01.csv'
BOOTSTRAP_SERVERS = 'localhost:9092'

TOPIC_WINDOWED_VENDOR_ID_COUNT = 'vendor_counts_windowed'

PRODUCE_TOPIC_RIDES_CSV = CONSUME_TOPIC_RIDES_CSV = 'fhv_csv'

RIDE_SCHEMA = T.StructType(
    [T.StructField("dispatching_base_num", T.StringType()),
     T.StructField('pickup_datetime', T.TimestampType()),
     T.StructField('dropOff_datetime', T.TimestampType()),
     T.StructField("PUlocationID", T.IntegerType()),
     T.StructField("DOlocationID", T.FloatType()),
     T.StructField("SR_Flag", T.IntegerType()),
     T.StructField("Affiliated_base_number", T.FloatType()),
     ])

Consumer.py

Por otro lado, desde este script vamos a ir recibiendo los mensajes del topic:

import argparse
from typing import Dict, List
from kafka import KafkaConsumer

from settings import BOOTSTRAP_SERVERS, CONSUME_TOPIC_RIDES_CSV


class RideCSVConsumer:
    def __init__(self, props: Dict):
        self.consumer = KafkaConsumer(**props)

    def consume_from_kafka(self, topics: List[str]):
        self.consumer.subscribe(topics=topics)
        print('Consuming from Kafka started')
        print('Available topics to consume: ', self.consumer.subscription())
        while True:
            try:
                # SIGINT can't be handled when polling, limit timeout to 1 second.
                msg = self.consumer.poll(1.0)
                if msg is None or msg == {}:
                    continue
                for msg_key, msg_values in msg.items():
                    for msg_val in msg_values:
                        print(f'Key:{msg_val.key}-type({type(msg_val.key)}), '
                              f'Value:{msg_val.value}-type({type(msg_val.value)})')
            except KeyboardInterrupt:
                break

        self.consumer.close()


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Kafka Consumer')
    parser.add_argument('--topic', type=str, default=CONSUME_TOPIC_RIDES_CSV)
    args = parser.parse_args()

    topic = args.topic
    config = {
        'bootstrap_servers': [BOOTSTRAP_SERVERS],
        'auto_offset_reset': 'earliest',
        'enable_auto_commit': True,
        'key_deserializer': lambda key: int(key.decode('utf-8')),
        'value_deserializer': lambda value: value.decode('utf-8'),
        'group_id': 'consumer.group.id.csv-example.1',
    }
    csv_consumer = RideCSVConsumer(props=config)
    csv_consumer.consume_from_kafka(topics=[topic])

PySpark Structured Streaming

La última sección de la semana es sobre PySpark structured streaming, módulo de la biblioteca PySpark que permite el procesamiento en tiempo real de grandes conjuntos de datos estructurados. Utiliza una estructura de datos en forma de tabla, lo que significa que puedes procesar y analizar los datos con mucha más facilidad y profundidad. Además, es muy resistente a fallos y escalable, por lo que puede manejar grandes volúmenes de datos sin problemas.

Ha sido la parte en la que más he sufrido para poder lanzar el script streaming.py con spark-submit. Tras reinstalar Python y reconfigurar las variables de entorno, he logrado ejecutarlo en Windows 10 con Powershell. A continuación indico las instrucciones:

1. Revisa las variables de entorno en Windows, además de tenerlas declaradas de forma independiente, es necesario añadirlas al PATH:

  • HADOOP_HOME=C:\tools\hadoop-3.2.0
  • JAVA_HOME=C:\Program Files\Java\jdk-11.0.17
  • SPARK_HOME=C:\tools\spark-3.3.2-bin-hadoop3
  • PYSPARK_PYTHON=python

2. Arranca los contenedores de Docker que hemos comentado previametne con el clúster de Kafka y el Standalone Spark (ubicados en las carpetas correspondietnes).

docker compose up -d

3. Ahora sí, dirígete a la carpeta donde está ubicado el script streaming.py (\week_6_streaming_processing\streams-example\pyspark) y ejecuta el spark-submit:

spark-submit --master spark://localhost:7077 --num-executors 2 --executor-memory 512M --executor-cores 1  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1,org.apache.spark:spark-avro_2.12:3.3.1,org.apache.spark:spark-streaming-kafka-0-10_2.12:3.3.1 streaming.py
PySpark Structured Streaming
PySpark Structured Streaming
WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources

Tras pelearme mucho con este error finalmente lo he solucionado añadiendo un nuevo worker al clúster de Spark. Como no tengo claro por qué con los dos workers que creamos en el contenedor no es suficiente, estoy pendiente de la respuesta en Slack. En este artículo indican dos opciones para resolverlo, pero ninguna me ha funcionado. El error indica que no hay memoria o cores suficientes ejecutar un nuevo job en el clúster, aunque si revisamos el Spark UI vemos que sí.

Solución

Crear un nuevo worker desde línea de comandos. Nos ubicamos en la carpeta donde hemos descomprimido spark (C:\tools\spark-3.3.2-bin-hadoop3) y ejecutamos:

./bin/spark-class org.apache.spark.deploy.worker.Worker spark://localhost:7077

Vemos en Spark UI cómo se ha creado el nuevo worker:

Spark Worker
Spark Worker

Y como devuelve los resultados de las dos operaciones de streaming:

PySpark structured streaming
PySpark structured streaming

Deja un comentario

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *

What is 6 + 15 ?
Please leave these two fields as-is: