Data Engineering Zoomcamp – Semana 2

Continuando con el bootcamp para Data Engineers, en esta segunda semana hemos trabajado en la orquestación del flujo de datos con Prefect (similar a Apache Airflow pero basado en Python). Hemos creado dos ETLs en Python, la primera exporta un CSV de un repositorio de Github, lo transforma/limpia y genera un parquet que posteriormente lo ingesta en un Cloud Storage de Google Cloud Platform (GCP); la segunda ETL ingesta el parquet desde el Cloud Storage en una base de datos BigQuery.

Última actualización 04/02/2023

Prerequisitos

  • Es necesario descargar los módulos de Prefect en nuestro environment de Python. En los videos crean un nuevo entorno con Conda y cargan un requirements.txt con todas las librerías necesarias. En mi caso, he creado un nuevo entorno directamente con venv y descargado las librerías correspondientes con los siguientes comandos:
python -m venv pythonPre
pythonPre\Scripts\activate
python -m pip install -r C:\Users\Marcos\DataEngineeringBootcamp\Week2\requirements.txt

El fichero requirements.txt con todos los módulos necesarios es el siguiente:

pandas==1.5.2
prefect==2.7.7
prefect-sqlalchemy==0.2.2
prefect-gcp[cloud_storage]==0.2.3
protobuf==4.21.11
pyarrow==10.0.1
pandas-gbq==0.19.0
  • [Opcional] Configurar Visual Studio Code para ejecutar trabajar con la terminal y dividirla en dos ventanas (una para la terminal del sistema y la otra para la de python). Este paso es ajeno al curso, pero me ha gustado como trabajan en los videos y he querido replicarlo. Acordaros que si queréis ejecutar desde Code debéis cambiar el intérprete de Python por el nuevo. El problema que me he encontrado es al intentar activar el nuevo entorno de Python que acabo de crear:

No se puede cargar el archivo C:\Users\Marcos\pythonPre\Scripts\Activate.ps1 porque la ejecución de scripts está deshabilitada en
este sistema. Para obtener más información, consulta el tema about_Execution_Policies en https:/go.microsoft.com/fwlink/?LinkID=135170.

Es necesario definir la política de ejecución de scripts de Windows. Ejecutamos Power Shell como administrador y lanzamos este comando:

Set-ExecutionPolicy RemoteSigned -Scope CurrentUser 

Si ejecutamos este nuevo comando, veremos que ya existe política para RemoteSigned y podemos intentar activar de nuevo el entorno de Python.

Get-ExecutionPolicy -List 
Visual Studio Code con doble terminal
Visual Studio Code con doble terminal

Prefect

Se trata de una herramienta de código abierto de orquestación de flujos basado en Python. Nos permite crear flujos con tareas y subtareas para, en nuestro caso, construir una ETL. Dispone de un GUI desde el que podremos monitorizar y programar la ejecución de los workflows. Además, incluye una especie de security vault para almacenar conexiones de orígenes/destino, contando con conectores para los servicios de almacenamiento más utilizados de nube pública. Fundamentalmente, consta de los siguientes elementos: Flow, Task y Block que funcionan como decorators de Python.

Flow

Colección de Tasks organizadas que vamos a programar y ejecutar. Se consideran DAG (directed acyclic graphs), que es un tipo de estructura matemática que cumple las siguientes premisas:

  • Graph: estructura de datos formada por nodos y sus conexiones. Un Flow sería un graph y las Task serían los nodos.
  • Directed: siempre van en una dirección
  • Acyclic: no son cíclicos, nunca vuelve a pasar por una task ya ejecutada.
@flow("Ingesta de datos de taxis amarillos")
    def my_flow():

Task

Representa una acción dentro de un Flow. Por ejemplo, para una ETL como mínimo tendríamos tres tasks: extract, transformation y load. Se pueden configurar para que se reejecuten en caso de fallo (ideal para la ingesta). En realidad, una task es un método Python que puede realizar cualquier tipo de acción. Se puede configurar con los siguientes parámetros:

  • name: Se le asigna un nombre. name = "Ingest data"
  • log_prints: admite True o False; permite pintar las trazas por consola. log_prints = true
  • retries: configurar número de reintentos si falla. retries = 3
  • tags: podemos añadir etiquetas para catalogar las tasks. tags = ["Ingest","ETL"]
  • cache_key: Se especifica una función que debemos importar como librería imported from prefect.tasks. Almacena el resultado en la caché en caso de error, de forma que no necesita re-ejecutarse. cache_key_fn=task_input_hash
  • cache_expiration: configuración del tiempo durante el que se va a guardar la caché. cache_expiration=timedelta(days=1)
@task(retries=3, log_prints=True)
def ingest_data(source):
    return

Blocks

Almacén de conexiones con recursos externos (GCS Bucket, Azure, Docker, Databricks, etc) que pueden reutilizarse con distintas tasks. Se pueden crear por línea de comandos (CLI) o desde el GUI.

gcs_block = GcsBucket.load("zoom-gcs")

Construir workflow con Prefect

Configuración de Block (conector a Cloud Storage Bucket de la GCP)

Lo primero que tenemos que hacer es arrancar Prefect utilizando el siguiente comando dentro de la terminal del nuevo entorno de Python que hemos creado:

prefect orion start
Output al arrancar Prefect

Vamos a registrar el conector de Prefect para Google Cloud Platform. Desde línea de comandos ejecutamos la siguiente sentencia que disponibiliza este tipo de conector para poder utilizarlo en nuestros flows.

prefect block register -m prefect_gcp

Para crear el Block que almacena la conexión a nuestro Cloud Storage de GCP accedemos al GUI a través de la URL que nos devuelve el comando start, que tendría que ser algo como http://127.0.0.0:4200. Seguimos los siguientes pasos:

  1. En la GCP crear una Account Service específica que tenga acceso al Cloud Storage Bucket y otorgarle permisos Storage Admin.
  2. Generar la key (en formato JSON)
  3. Abrimos el GUI de Prefect http://127.0.0.0:4200 y nos dirigimos a Blocks, desde donde buscaremos GCS Bucket. Lo configuramos de la siguiente forma:
    • Block name: el que queramos, por ejemplo zoom-gcs
    • Bucket: nombre del Cloud Storage que hemos creado en nuestro proyecto.
    • GCP Credentials: crearemos un credencial nuevo copiando la Key del service account que acabamos de generar. Sería suficiente con copiar el contenido del JSON y asignar un nombre: zoom-gcp-creds
Autorización de conexión a servicios de Google Cloud Platform con la key del Account Service
Autorización de conexión a servicios de Google Cloud Platform con la key del Account Service
Creación de nuevo Block en Prefect
Creación de nuevo Block en Prefect utilizando los credenciales creados (zoom-gcp-creds)

Este proceso genera dos bloques, uno para los credenciales y otro para la conexión al Cloud Storage que utiliza dicha autorización:

Bloques creados en Prefect
Bloques creados en Prefect

Pipelines en Python

Vamos a crear los script Python que se encarguen de hacer los tres pasos de la ETL (Extract, transform y load) tanto para descargar el CSV del repositorio a local, como para ingestarlo en nuestra base de datos BigQuery. Como hemos comentado, son métodos clásicos Python a los que se les añade un @flow y @task para que cuando se ejecute el script, se interpreten como un workflow de Prefect. Recordad añadir en el import todas las librerías necesarias, especialmente las de Prefect.

etl_web_to_gcs.py | Descargar CSV, limpieza, exportar a Parquet y subir a Cloud Storage

Vamos a llamar a este pipeline etl_web_to_gcs.py. En el ejemplo del instructor se incluyen dos métodos de extract:

  • Extract_data_local: que descarga el CSV a un directorio local.
  • Extract_data_gcs: que carga el CSV transformado a parquet en el Cloud Storage.

Como hemos comentado, el método precedido por @flow() será el orquestador que marque las dependencias de las tasks.

from pathlib import Path
import pandas as pd
from prefect import flow, task
from prefect_gcp.cloud_storage import GcsBucket


@task(retries=3)
def ingest_data(dataset_url):
    df = pd.read_csv(dataset_url)

    return df

@task(log_prints=True)
def transform_data(df):
    df['tpep_pickup_datetime']  = pd.to_datetime(df['tpep_pickup_datetime'])
    df['tpep_dropoff_datetime'] = pd.to_datetime(df['tpep_dropoff_datetime'])

    return df

@task(log_prints=True)
def export_data_local(df, color, dataset_file):
    path = Path(f"data/{color}/{dataset_file}.parquet")
    df.to_parquet(path, compression="gzip")
    return path


@task(log_prints=True)
def export_data_gcs(path):
    gcs_block = GcsBucket.load("zoom-gcs")
    gcs_block.upload_from_path(from_path=path, to_path=path)
    
    return path


@flow("Ingest file to GCS")
def etl_web_to_gcs():
    color = "yellow"
    year  = 2021
    month = 1
    dataset_file = f"{color}_tripdata_{year}-{month:02}"
    dataset_url  = f"https://github.com/DataTalksClub/nyc-tlc-data/releases/download/{color}/{dataset_file}.csv.gz"

    df = ingest_data(dataset_url)
    df_clean = transform_data(df)
    path = export_data_local(df_clean, color, dataset_file)
    export_data_gcs(path)

if __name__ == "__main__":
    etl_web_to_gcs()

Es importante revisar la línea 29 del código gcs_block = GcsBucket.load("zoom-gcs"), ya que debemos especificar el nombre del Block que hemos configurado previamente.

La primera vez que ejecuté el pipeline falló en el método export_data_local porque la ruta donde descarga el CSV a local debe existir. Es necesario crearla ./data/yellow.

Error ejecución pipeline Python con Prefect
Error ejecución pipeline Python con Prefect

Una vez corregidas las incidencias, ejecutamos el pipeline con éxito y vemos la salida:

Output ejecución exitosa pipeline Python con Prefect
Output ejecución exitosa pipeline Python con Prefect

También podemos comprobarlo desde el GUI de Prefect (http://127.0.0.0:4200):

Ejecuciones de flows en Prefect
Ejecuciones de flows en Prefect

Si accedemos al Cloud Storage comprobamos que se ha cargado correctamente el fichero parquet.

Parquet almacenado en Cloud Storage
Parquet almacenado en Cloud Storage

etl_gcs_to_bq.py | Ingestar parquet en BigQuery desde Cloud Storage

Siguiendo los videos del instructor, este pipeline se denomina etl_gcs_to_bq.py y tiene el siguiente código fuente.

from pathlib import Path
import pandas as pd
from prefect import flow, task
from prefect_gcp.cloud_storage import GcsBucket
from prefect_gcp import GcpCredentials

@task(retries=3)
def extract_from_gcs(color, year, month) -> Path:
    """Data Extract from Google Cloud Storage"""
    gcs_path = f"{color}/{color}_tripdata_{year}-{month:02}.parquet"
    gcs_block = GcsBucket.load("zoom-gcs")
    gcs_block.get_directory(from_path=gcs_path, local_path=f"data/")
    print(gcs_path)
    return Path(f"data/{gcs_path}")

@task()
def transform(path) -> pd.DataFrame:
    """Data Cleaning"""
    df = pd.read_parquet(path)
    print(f"pre: missing passenger count: {df['passenger_count'].isna().sum()}")
    df['passenger_count'].fillna(0, inplace=True)
    print(f"post: missing passenger count: {df['passenger_count'].isna().sum()}")
    return df

@task()
def write_bq(df) -> None:
    """Write DataFrame to BigQuery"""

    gcp_credentials_block = GcpCredentials.load("zoom-gcp-creds")

    df.to_gbq(
        destination_table="dezoomcamp.rides",
        project_id="digital-aloe-375022",
        credentials=gcp_credentials_block.get_credentials_from_service_account(),
        chunksize=500_000, 
        if_exists="append"
    )

@flow()
def etl_gcs_to_bq():
    color = "yellow"
    year  = 2021
    month = 1

    path = extract_from_gcs(color, year, month)
    df = transform(path)
    write_bq(df)


if __name__ == "__main__":
    etl_gcs_to_bq()

Antes de poder ejecutarlo y realizar la ingesta de datos en GCP, debemos crear una tabla en BigQuery con el esquema del fichero parquet (que lo incorpora además de los propios datos). Para ello debemos acceder al portal de Google Cloud Platform y:

  1. Seleccionar nuestro proyecto
  2. En el blade lateral pulsar sobre BigQuery
  3. Pulsar en el botón + Add Data
  4. Seleccionamos como origen el Google Cloud Storage
  5. En la sección de Source debemos seleccionar el fichero parquet que hemos subido previamente. Dejamos el formato Parquet
  6. En el Destino elegimos un nombre para el dataset, por ejemplo: dezoomcamp
  7. Definimos el nombre de la tabla: rides
  8. Si accedemos desde el explorador de BigQuery podremos ver que se ha cargado correctamente. pero lo que queremos es cargarla desde nuestro workflow Prefect, así que debemos trunctarla.
TRUNCATE TABLE dezoomcamp.rides

Cuando tenemos todo listo, ¡ejecutamos!

python etl_gcs_to_bq.py
ERROR   | Flow run 'xxxxxx' - Finished in state Failed('Flow run encountered an exception. google.api_core.exceptions.Forbidden: 403 GET: Access Denied: Table xxxxx: Permission bigquery.tables.get denied on table xxxxxx (or it may not exist).\n')

La primera arroja este error. El motivo es que estoy utilizando el Block que creamos en el paso anterior para conectarnos al Cloud Storage Bucket y cuyo Service Account solo tiene permisos de almacenamiento. Debemos añadirle permisos como administrador BigQuery para que tenga acceso a la base de datos.

IAM de Google Cloud Platform

Ejecutamos de nuevo el pipeline y observamos que funciona correctamente:

Output ejecución pipeline para ingestar parquet desde Cloud Storage en BigQuery

Y consultamos la tabla en la base de datos de BigQuery:

Comprobamos tabla en base de datos BigQuery

Parametrización de pipelines

Con el fin de automatizar las ETLs vamos a parametrizar los pipelines que acabamos de construir. Siguiendo los videos creamos un nuevo fichero parameterized_flows.py donde copiamos el código de los dos pipelines. Lo que vamos a hacer es sustituir los valores puestos a mano year, month y color por parámetros que se reciben desde un flow padre, encargado de invocar a los dos flows etl_web_to_gcs y etl_gcs_to_bq.

from pathlib import Path
import pandas as pd
from prefect import flow, task
from prefect_gcp.cloud_storage import GcsBucket
from prefect.tasks import task_input_hash
from datetime import timedelta
from prefect_gcp import GcpCredentials


##########################################################
######## etl_web_to_gcs ########
##########################################################
@task(retries=3)
def ingest_data(dataset_url):
    df = pd.read_csv(dataset_url)

    return df

@task(log_prints=True)
def transform_data(df):
    df['tpep_pickup_datetime']  = pd.to_datetime(df['tpep_pickup_datetime'])
    df['tpep_dropoff_datetime'] = pd.to_datetime(df['tpep_dropoff_datetime'])

    return df

@task(log_prints=True)
def export_data_local(df, color, dataset_file):
    path = Path(f"data/{color}/{dataset_file}.parquet")
    df.to_parquet(path, compression="gzip")
    return path


@task(log_prints=True)
def export_data_gcs(path):
        gcs_block = GcsBucket.load("zoom-gcs")
        gcs_block.upload_from_path(from_path=path, to_path=path)
  


@flow()
def etl_web_to_gcs(year, month, color):
    dataset_file = f"{color}_tripdata_{year}-{month:02}"
    dataset_url = f"https://github.com/DataTalksClub/nyc-tlc-data/releases/download/{color}/{dataset_file}.csv.gz"

    df = ingest_data(dataset_url)
    df_clean = transform_data(df)
    path = export_data_local(df_clean, color, dataset_file)
    export_data_gcs(path)


##########################################################
######## etl_gcs_to_bq ########
##########################################################

@task(retries=3)
def extract_from_gcs(color, year, month) -> Path:
    """Data Extract from Google Cloud Storage"""
    gcs_path = f"{color}/{color}_tripdata_{year}-{month:02}.parquet"
    gcs_block = GcsBucket.load("zoom-gcs")
    gcs_block.get_directory(from_path=gcs_path, local_path=f"data/")
    print(gcs_path)
    return Path(f"data/{gcs_path}")

@task()
def transform(path) -> pd.DataFrame:
    """Data Cleaning"""
    df = pd.read_parquet(path)
    print(f"pre: missing passenger count: {df['passenger_count'].isna().sum()}")
    df['passenger_count'].fillna(0, inplace=True)
    print(f"post: missing passenger count: {df['passenger_count'].isna().sum()}")
    return df

@task()
def write_bq(df) -> None:
    """Write DataFrame to BigQuery"""

    gcp_credentials_block = GcpCredentials.load("zoom-gcp-creds")

    df.to_gbq(
        destination_table="dezoomcamp.rides",
        project_id="digital-aloe-375022",
        credentials=gcp_credentials_block.get_credentials_from_service_account(),
        chunksize=500_000, 
        if_exists="append"
    )

@flow()
def etl_gcs_to_bq(year, month, color):
    path = extract_from_gcs(color, year, month)
    df = transform(path)
    write_bq(df)


##########################################################
######## parent flow ########
##########################################################
@flow()
def etl_parent_flow(months: list[int] = [1,2], year: int = 2021, color: str = "yellow"):
    for month in months:
        etl_web_to_gcs(year, month, color)
        etl_gcs_to_bq(year, month, color)
    

if __name__ == "__main__":
    color="yellow"
    months=[1,2,3]
    year=2021
    etl_parent_flow(months, year, color)

Prefect Deployments

Un despliegue en Prefect es un artefacto de lado de servidor que encapsula un flujo y permite programarlo o lanzarlo vía API. Un flujo puede pertenecer a varios desployments. Podríamos decir que son contenedores con metadata que tienen todo lo que necesita un flow para ser programado. Se pueden crear por línea de comandos o por Python.

Flujo de un Prefect Deployment
Flujo de un Prefect Deployment

Crear Prefect Deployment por python

Una forma rápida de crear un deployment es construyendo un script en python donde se especifiquen las propiedades. Simplemente instanciamos la clase Prefect Deployment para crear un nuevo deployment pasándole por parámetro el flujo que debe ejecutar, el nombre, el storage o la infraestructure (si queremos ejecutarlo sobre una imagen docker) y el entrypoint si deseamos especificar la ubicación del fichero donde está el flujo.

En el siguiente ejemplo creamos un deployment para ejecutar un flow cuyo fichero está almacenado en una repo de Github:

from prefect.deployments import Deployment
from etl_web_to_gcs import etl_web_to_gcs
from prefect.filesystems import GitHub 

storage = GitHub.load("zoomgithub")

deployment = Deployment.build_from_flow(
     flow=etl_web_to_gcs,
     name="github-exercise",
     storage=storage,
     entrypoint="week_2_workflow_orchestration/2_homework/04_question/etl_web_to_gcs.py:etl_web_to_gcs")

if __name__ == "__main__":
    deployment.apply()

Para crear el deployment simplemente ejecutamos el script python:

python myDeployment.py

Crear Prefect Deployment por CLI (línea de comandos)

Si creamos un deployment por línea de comandos debemos usar deployment build y pasándole por parámetro el fichero del pipeline (parameterized_flows.py), separado por dos puntos el flujo dentro del pipeline que deseamos programar (etl_parent_flow) y por último le asignamos un nombre con -n.

prefect deployment build parameterized_flows.py:etl_parent_flow -n "Parameterized ETL"
Output de prefect deployment build

Cuando ejecutamos con éxito el comando se crea un fichero etl_parent_flow-deployment.yaml con toda la metadata necesaria para programar el flow. Podemos modificarlo y adaptarlo si fuera necesario. En nuestro caso, queremos programarlo para que ejecute los meses 1, 2 y 3 de 2021 para el color yellow, por lo que modificamos la línea 10 y sustituimos por parameters: { "color": "yellow", "months" :[1, 2, 3], "year": 2021}

###
### A complete description of a Prefect Deployment for flow 'etl-parent-flow'
###
name: Parameterized ETL
description: null
version: 069b00845abcb3eeeff506a311413184
# The work queue that will handle this deployment's runs
work_queue_name: default
tags: []
parameters: { "color": "yellow", "months" :[1, 2, 3], "year": 2021}
schedule: null
infra_overrides: {}
infrastructure:
  type: process
  env: {}
  labels: {}
  name: null
  command: null
  stream_output: true
  working_dir: null
  block_type_slug: process
  _block_type_slug: process

###
### DO NOT EDIT BELOW THIS LINE
###
flow_name: etl-parent-flow
manifest_path: null
storage: null
path: C:\Users\Marcos\Downloads\DataEngineeringBootcamp\week2
entrypoint: parameterized_flows.py:etl_parent_flow
parameter_openapi_schema:
  title: Parameters
  type: object
  properties:
    months:
      title: months
      default:
      - 1
      - 2
      position: 0
      type: array
      items:
        type: integer
    year:
      title: year
      default: 2021
      position: 1
      type: integer
    color:
      title: color
      default: yellow
      position: 2
      type: string
  required: null
  definitions: null

Una vez modificado, debemos ejecutar el comando prefect deployment apply

prefect deployment apply etl_parent_flow-deployment.yaml
Output prefect deployment apply

Nos dirigimos al GUI de Prefect (http://127.0.0.1:4200/deployments) y en la sección de Deployments accedemos al que acabamos de crear. A la derecha pulsamos sobre el botón Run y en el popup seleccionarmos Quick Run (la opción de Custom Run) permite especificar nuevos valores para los parámetros.

Prefect Deployments

Cuando ejecutamos el Deployments se crea un Work Queue que será el demonio encargado de ejecutarlo. Pero se levanta dormido, para arrancarlo es necesario ejecutar el comando:

prefect agent start --work-queue "default" 

El Work Queue que se genera tiene el nombre default y si accedemos podemos copiar el comando para arrancarlo.

Work Queues de Prefect

Una vez arrancamos el agente por línea de comandos, observamos que empieza a ejecutar el Deployment.

Output prefect agent start

Docker + Prefect

Terminamos con la contenerización de Prefect en Docker. Vamos a crear una imagen y publicarla en Docker Hub (tendrás que crearte una cuenta si aun no la tienes). Construimos el fichero dockerfile donde indicamos la imagen de prefect que usaremos, instalamos la lista de módulos de requeriments.txt y por último copiamos los directorios de flows y data.

En primer lugar creamos el fichero docker-requirementes.txt más adelgazado:

pandas==1.5.2
prefect-gcp[cloud_storage]==0.2.3
protobuf==4.21.11
pyarrow==10.0.1
pandas-gbq==0.18.1

Y posteriormente pasamos a crear el dockerfile:

FROM prefecthq/prefect:2.7.7-python3.11

COPY docker-requirements.txt .

RUN pip install -r docker-requirements.txt --trusted-host pypi.python.org --no-cache-dir

COPY flows /opt/prefect/flows
RUN mkdir -p /opt/prefect/data/yellow

Una vez guardado, nos ubicamos en el mismo directorio y ejecutamos el siguiente comando. Donde pone n4gash debes cambiarlo por tu cuenta de Docker Hub.

=> ERROR [5/5] RUN mkdir /opt/prefect/data/yellow 0.3s ------ > [5/5] RUN mkdir /opt/prefect/data/yellow: #9 0.299 mkdir: cannot create directory ‘/opt/prefect/data/yellow’: No such file or directory ------ executor failed running [/bin/sh -c mkdir /opt/prefect/data/yellow]: exit code: 1'

Si cuando intentamos montar la imagen de docker nos arroja este error, debéis añadir en la última línea -p para que cree todo el árbol de subdirectorios: RUN mkdir -p /opt/prefect/data/yellow

docker image build -t n4gash/prefect:zoom .

Después hacemos un push para publicar nuestra imagen en Docker Hub.

docker image push n4gash/prefect:zoom

Creamos un block de tipo Docker Container indicando el nombre zoom y en image indicamos el nombre y tags de nuestra imagen que acabamos de publicar n4gash/prefect:zoom.

Creamos un nuevo fichero docker_deploy.py en el directorio flows/03_deployments. Pegamos el código generado al crear el block (import de la librería prefect.infrastructure y la creación del bloque docker_container_block) y completamos:

from prefect.deployments import Deployment
from prefect.infrastructure.docker import DockerContainer
from parameterized_flows import etl_parent_flow

docker_container_block = DockerContainer.load("zoom")
docker_dep = Deployment.build_from_flow(flow=etl_parent_flow,name='docker-flow',infrastructure=docker_container_block)

if __name__ == "__main__":
    docker_dep.apply()

Si lo ejecutamos, podemos conectarnos al GUI para comprobar que se ha creado el deployment correctamente.

python flows/03_deployments/docker_deploy.py

Podemos comprobar que estamos conectados con el profile por defecto con prefect profiles ls. Lo que vamos a hacer es reemplazar el API local efímera por el API que levanta el agente al arrancar. Configuramos qué API vamos a utilizar con el comando prefect config set y la URL del API que nos muestra Prefect cuando lo arrancamos.

prefect config set PREFECT_API_URL="http://127.0.0.1:4200/api"

Arrancamos el agente de Prefect:

prefect agent start --work-queue "default" 

Activamos el agente y ejecutamos el deployment por línea de comandos

prefect deployment run etl-parent-flow/docker-flow -p "months=[1,2]" 
Output prefect deployment run
ValueError: Path /root/.prefect/storage/44bf1741162a49e8a0a878cc4a87824e does not exist.
12:45:05.414 | ERROR   | Flow run 'camouflaged-dragonfly' - Finished in state Failed('Flow run encountered an exception. ValueError: Path /root/.prefect/storage/44bf1741162a49e8a0a878cc4a87824e does not exist.\n')

Al ejecutar el comando prefect deployment run etl-parent-flow/docker-flow -p "months=[1,2]"/docker-flow -p "months=[1,2]" nos arroja un error de que no encuentra el volume de Prefect. La solución es eliminar la función de caché de la tarea ingest_data. Para ello borramos los parámetros cache_key_fn=task_input_hash, cache_expiration=timedelta(days=1). Es necesario volver a generar y publicar la imagen en el Docker Hub.

Si todo ha ido bien, podemos ver por la consola del agente o desde el GUI de Prefect que tanto el flow maestro (etl-parent-flow) como los hijos (etl-web-to-gcs y etl-gcs-to-bq) se han ejecutado correctamente (hay dos de cada porque porque al ejecutar el deployment en los parámetros hemos pasado un array con dos valores: 1,2):

Ejecuciones de flows en Prefect
Ejecuciones de flows en Prefect

Prefect Cloud

Hasta ahora hemos lanzado los pipelines desde el servidor local de Prefect que levantamos al ejecutar el comando prefect orion start. Como alternativa, podemos desplegar y lanzar el flujo en el SaaS de Prefect Cloud. Además, tenemos a nuestra disposición los Automations que son como triggers que podemos utilizar por ejemplo, para notificar cuando un flujo se ha ejecutado correctamente o no. El registro es gratuito. La interfaz es prácticamente idéntica al GUI que levanta Prefect Orion en local. Seguiremos los siguientes pasos:

1. Creamos un workspace para empezar a trabajar.

2. Desde el perfil de usuario (profile settings) en Prefect Cloud creamos un API Key

API key de Prefect Cloud
API key de Prefect Cloud

3. Nos conectamos a Prefect Cloud por línea de comandos mediante el comando prefect cloud login y la API Key que hemos generado.

 prefect cloud login -k XXXXXXXXXX

4. Creamos los blocks que previamente habíamos creado en la instancia local de Prefect Orion. Podemos crearlos desde el GUI o por CLI (scripts python). En este punto debemos tener en cuenta que el catálogo de blocks disponible es distinto al que teníamos en local. Por ejemplo, sí tenemos a nuestra disposición los bloques de Github y GCS Credentials, pero no aparece el de GCS Bucket, en su lugar tenemos otro denominado GCS. Tras darle muchas vueltas, al final he lanzado la pregunta en slack y Jeff me ha dado la pista. Aunque no aparezca un bloque en el catálogo de Prefect Cloud, ¡podemos crearlo manualmente desde un script python! y voilá:

from prefect_gcp import GcpCredentials
        from prefect_gcp.cloud_storage import GcsBucket


        bucket_block = GcsBucket(
        gcp_credentials=GcpCredentials.load("zoom-gcp-creds"),
        bucket="dtc_data_lake_digital-aloe-375022",
        )

        bucket_block.save("zoom-gcs", overwrite=True)

Y ejecutamos el script:

python python gcp_bucket_block.py

Si accedemos a los blocks en Prefect Cloud deberíamos ver el de Github, GCP Credentials, GCS Bucket:

Block Prefect Cloud

5. Creamos un script para el Deployment. En el ejemplo vamos a usar un flow que se descarga de una repo en Github, para lo que se conecta a través del block correspondiente:

from prefect.deployments import Deployment
from etl_web_to_gcs import etl_web_to_gcs
from prefect.filesystems import GitHub 

storage = GitHub.load("zoomgithub")

deployment = Deployment.build_from_flow(
     flow=etl_web_to_gcs,
     name="github-exercise",
     storage=storage,
     entrypoint="week_2_workflow_orchestration/2_homework/04_question/etl_web_to_gcs.py:etl_web_to_gcs")

if __name__ == "__main__":
    deployment.apply()

Prefect Cloud Automations

Como mencionaba antes, los automations de Prefect Cloud son como triggers que realizan una acción como consecuencia de otra. Podemos configurarlos para responder al ciclo de ejecución de un flujo o para comprobar el estado del agente de Prefect. Las acciones disponibles son:

  • Cancelar flujo
  • Parar programación
  • Parar agente
  • Continuar programación
  • Continuar agente
  • Ejecutar deployment
  • Enviar notificación

Para nuestro ejemplo vamos a usar el último, enviar notificación. Vamos a crear una automation que envíe un correo cuando acabe con éxito cualuier flujo de nuestro workspace. Para ello en primer lugar debemos crear un block de tipo email:

Creación de block Email en Prefect cloud
Creación de block Email en Prefect cloud

Ahora crearemos la automation de tipo Flow run state (queremos que se active cuando se ejecute un flujo). Para todos los flows cuyo estado pase a Completed:

Creación de automation en Prefect cloud
Creación de automation en Prefect cloud

Seleccionamos el bloque de email creado previamente y podríamos modificar el body del email:

Creación de automation en Prefect cloud
Creación de automation en Prefect cloud

Si ejecutamos un deployment o flujo, si termina con éxito nos llegará un email desde Prefect con la comunicación.

Ejecución con éxito de un flow en Prefect Cloud
Correo de confirmación enviado desde el Automation de Prefect

Deja un comentario

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *

What is 6 + 6 ?
Please leave these two fields as-is: