Continuando con el bootcamp para Data Engineers, en esta segunda semana hemos trabajado en la orquestación del flujo de datos con Prefect (similar a Apache Airflow pero basado en Python). Hemos creado dos ETLs en Python, la primera exporta un CSV de un repositorio de Github, lo transforma/limpia y genera un parquet que posteriormente lo ingesta en un Cloud Storage de Google Cloud Platform (GCP); la segunda ETL ingesta el parquet desde el Cloud Storage en una base de datos BigQuery.
- Repo Bootcamp: https://github.com/DataTalksClub/data-engineering-zoomcamp/tree/main/week_2_workflow_orchestration
- Repo personal con ejercicios: https://github.com/hegdehog/data-engineering-zoomcamp-2023
Última actualización 04/02/2023
Prerequisitos
- Es necesario descargar los módulos de Prefect en nuestro environment de Python. En los videos crean un nuevo entorno con Conda y cargan un requirements.txt con todas las librerías necesarias. En mi caso, he creado un nuevo entorno directamente con
venv
y descargado las librerías correspondientes con los siguientes comandos:
python -m venv pythonPre
pythonPre\Scripts\activate
python -m pip install -r C:\Users\Marcos\DataEngineeringBootcamp\Week2\requirements.txt
El fichero requirements.txt con todos los módulos necesarios es el siguiente:
pandas==1.5.2
prefect==2.7.7
prefect-sqlalchemy==0.2.2
prefect-gcp[cloud_storage]==0.2.3
protobuf==4.21.11
pyarrow==10.0.1
pandas-gbq==0.19.0
- [Opcional] Configurar Visual Studio Code para ejecutar trabajar con la terminal y dividirla en dos ventanas (una para la terminal del sistema y la otra para la de python). Este paso es ajeno al curso, pero me ha gustado como trabajan en los videos y he querido replicarlo. Acordaros que si queréis ejecutar desde Code debéis cambiar el intérprete de Python por el nuevo. El problema que me he encontrado es al intentar activar el nuevo entorno de Python que acabo de crear:
No se puede cargar el archivo C:\Users\Marcos\pythonPre\Scripts\Activate.ps1 porque la ejecución de scripts está deshabilitada en
este sistema. Para obtener más información, consulta el tema about_Execution_Policies en https:/go.microsoft.com/fwlink/?LinkID=135170.
Es necesario definir la política de ejecución de scripts de Windows. Ejecutamos Power Shell como administrador y lanzamos este comando:
Set-ExecutionPolicy RemoteSigned -Scope CurrentUser
Si ejecutamos este nuevo comando, veremos que ya existe política para RemoteSigned y podemos intentar activar de nuevo el entorno de Python.
Get-ExecutionPolicy -List
Prefect
Se trata de una herramienta de código abierto de orquestación de flujos basado en Python. Nos permite crear flujos con tareas y subtareas para, en nuestro caso, construir una ETL. Dispone de un GUI desde el que podremos monitorizar y programar la ejecución de los workflows. Además, incluye una especie de security vault para almacenar conexiones de orígenes/destino, contando con conectores para los servicios de almacenamiento más utilizados de nube pública. Fundamentalmente, consta de los siguientes elementos: Flow, Task y Block que funcionan como decorators de Python.
Flow
Colección de Tasks organizadas que vamos a programar y ejecutar. Se consideran DAG (directed acyclic graphs), que es un tipo de estructura matemática que cumple las siguientes premisas:
- Graph: estructura de datos formada por nodos y sus conexiones. Un Flow sería un graph y las Task serían los nodos.
- Directed: siempre van en una dirección
- Acyclic: no son cíclicos, nunca vuelve a pasar por una task ya ejecutada.
@flow("Ingesta de datos de taxis amarillos")
def my_flow():
Task
Representa una acción dentro de un Flow. Por ejemplo, para una ETL como mínimo tendríamos tres tasks: extract, transformation y load. Se pueden configurar para que se reejecuten en caso de fallo (ideal para la ingesta). En realidad, una task es un método Python que puede realizar cualquier tipo de acción. Se puede configurar con los siguientes parámetros:
- name: Se le asigna un nombre.
name = "Ingest data"
- log_prints: admite
True
oFalse
; permite pintar las trazas por consola.log_prints = true
- retries: configurar número de reintentos si falla.
retries = 3
- tags: podemos añadir etiquetas para catalogar las tasks.
tags = ["Ingest","ETL"]
- cache_key: Se especifica una función que debemos importar como librería
imported from prefect.tasks
. Almacena el resultado en la caché en caso de error, de forma que no necesita re-ejecutarse.cache_key_fn=task_input_hash
- cache_expiration: configuración del tiempo durante el que se va a guardar la caché.
cache_expiration=timedelta(days=1)
@task(retries=3, log_prints=True)
def ingest_data(source):
return
Blocks
Almacén de conexiones con recursos externos (GCS Bucket, Azure, Docker, Databricks, etc) que pueden reutilizarse con distintas tasks. Se pueden crear por línea de comandos (CLI) o desde el GUI.
gcs_block = GcsBucket.load("zoom-gcs")
Construir workflow con Prefect
Configuración de Block (conector a Cloud Storage Bucket de la GCP)
Lo primero que tenemos que hacer es arrancar Prefect utilizando el siguiente comando dentro de la terminal del nuevo entorno de Python que hemos creado:
prefect orion start
Vamos a registrar el conector de Prefect para Google Cloud Platform. Desde línea de comandos ejecutamos la siguiente sentencia que disponibiliza este tipo de conector para poder utilizarlo en nuestros flows.
prefect block register -m prefect_gcp
Para crear el Block que almacena la conexión a nuestro Cloud Storage de GCP accedemos al GUI a través de la URL que nos devuelve el comando start, que tendría que ser algo como http://127.0.0.0:4200. Seguimos los siguientes pasos:
- En la GCP crear una Account Service específica que tenga acceso al Cloud Storage Bucket y otorgarle permisos Storage Admin.
- Generar la key (en formato JSON)
- Abrimos el GUI de Prefect http://127.0.0.0:4200 y nos dirigimos a Blocks, desde donde buscaremos GCS Bucket. Lo configuramos de la siguiente forma:
- Block name: el que queramos, por ejemplo
zoom-gcs
- Bucket: nombre del Cloud Storage que hemos creado en nuestro proyecto.
- GCP Credentials: crearemos un credencial nuevo copiando la Key del service account que acabamos de generar. Sería suficiente con copiar el contenido del JSON y asignar un nombre:
zoom-gcp-creds
- Block name: el que queramos, por ejemplo
Este proceso genera dos bloques, uno para los credenciales y otro para la conexión al Cloud Storage que utiliza dicha autorización:
Pipelines en Python
Vamos a crear los script Python que se encarguen de hacer los tres pasos de la ETL (Extract, transform y load) tanto para descargar el CSV del repositorio a local, como para ingestarlo en nuestra base de datos BigQuery. Como hemos comentado, son métodos clásicos Python a los que se les añade un @flow y @task para que cuando se ejecute el script, se interpreten como un workflow de Prefect. Recordad añadir en el import todas las librerías necesarias, especialmente las de Prefect.
etl_web_to_gcs.py | Descargar CSV, limpieza, exportar a Parquet y subir a Cloud Storage
Vamos a llamar a este pipeline etl_web_to_gcs.py
. En el ejemplo del instructor se incluyen dos métodos de extract:
- Extract_data_local: que descarga el CSV a un directorio local.
- Extract_data_gcs: que carga el CSV transformado a parquet en el Cloud Storage.
Como hemos comentado, el método precedido por @flow() será el orquestador que marque las dependencias de las tasks.
from pathlib import Path
import pandas as pd
from prefect import flow, task
from prefect_gcp.cloud_storage import GcsBucket
@task(retries=3)
def ingest_data(dataset_url):
df = pd.read_csv(dataset_url)
return df
@task(log_prints=True)
def transform_data(df):
df['tpep_pickup_datetime'] = pd.to_datetime(df['tpep_pickup_datetime'])
df['tpep_dropoff_datetime'] = pd.to_datetime(df['tpep_dropoff_datetime'])
return df
@task(log_prints=True)
def export_data_local(df, color, dataset_file):
path = Path(f"data/{color}/{dataset_file}.parquet")
df.to_parquet(path, compression="gzip")
return path
@task(log_prints=True)
def export_data_gcs(path):
gcs_block = GcsBucket.load("zoom-gcs")
gcs_block.upload_from_path(from_path=path, to_path=path)
return path
@flow("Ingest file to GCS")
def etl_web_to_gcs():
color = "yellow"
year = 2021
month = 1
dataset_file = f"{color}_tripdata_{year}-{month:02}"
dataset_url = f"https://github.com/DataTalksClub/nyc-tlc-data/releases/download/{color}/{dataset_file}.csv.gz"
df = ingest_data(dataset_url)
df_clean = transform_data(df)
path = export_data_local(df_clean, color, dataset_file)
export_data_gcs(path)
if __name__ == "__main__":
etl_web_to_gcs()
Es importante revisar la línea 29 del código gcs_block = GcsBucket.load("zoom-gcs")
, ya que debemos especificar el nombre del Block que hemos configurado previamente.
La primera vez que ejecuté el pipeline falló en el método export_data_local
porque la ruta donde descarga el CSV a local debe existir. Es necesario crearla ./data/yellow
.
Una vez corregidas las incidencias, ejecutamos el pipeline con éxito y vemos la salida:
También podemos comprobarlo desde el GUI de Prefect (http://127.0.0.0:4200):
Si accedemos al Cloud Storage comprobamos que se ha cargado correctamente el fichero parquet.
etl_gcs_to_bq.py | Ingestar parquet en BigQuery desde Cloud Storage
Siguiendo los videos del instructor, este pipeline se denomina etl_gcs_to_bq.py
y tiene el siguiente código fuente.
from pathlib import Path
import pandas as pd
from prefect import flow, task
from prefect_gcp.cloud_storage import GcsBucket
from prefect_gcp import GcpCredentials
@task(retries=3)
def extract_from_gcs(color, year, month) -> Path:
"""Data Extract from Google Cloud Storage"""
gcs_path = f"{color}/{color}_tripdata_{year}-{month:02}.parquet"
gcs_block = GcsBucket.load("zoom-gcs")
gcs_block.get_directory(from_path=gcs_path, local_path=f"data/")
print(gcs_path)
return Path(f"data/{gcs_path}")
@task()
def transform(path) -> pd.DataFrame:
"""Data Cleaning"""
df = pd.read_parquet(path)
print(f"pre: missing passenger count: {df['passenger_count'].isna().sum()}")
df['passenger_count'].fillna(0, inplace=True)
print(f"post: missing passenger count: {df['passenger_count'].isna().sum()}")
return df
@task()
def write_bq(df) -> None:
"""Write DataFrame to BigQuery"""
gcp_credentials_block = GcpCredentials.load("zoom-gcp-creds")
df.to_gbq(
destination_table="dezoomcamp.rides",
project_id="digital-aloe-375022",
credentials=gcp_credentials_block.get_credentials_from_service_account(),
chunksize=500_000,
if_exists="append"
)
@flow()
def etl_gcs_to_bq():
color = "yellow"
year = 2021
month = 1
path = extract_from_gcs(color, year, month)
df = transform(path)
write_bq(df)
if __name__ == "__main__":
etl_gcs_to_bq()
Antes de poder ejecutarlo y realizar la ingesta de datos en GCP, debemos crear una tabla en BigQuery con el esquema del fichero parquet (que lo incorpora además de los propios datos). Para ello debemos acceder al portal de Google Cloud Platform y:
- Seleccionar nuestro proyecto
- En el blade lateral pulsar sobre BigQuery
- Pulsar en el botón
+ Add Data
- Seleccionamos como origen el Google Cloud Storage
- En la sección de Source debemos seleccionar el fichero parquet que hemos subido previamente. Dejamos el formato Parquet
- En el Destino elegimos un nombre para el dataset, por ejemplo:
dezoomcamp
- Definimos el nombre de la tabla:
rides
- Si accedemos desde el explorador de BigQuery podremos ver que se ha cargado correctamente. pero lo que queremos es cargarla desde nuestro workflow Prefect, así que debemos trunctarla.
TRUNCATE TABLE dezoomcamp.rides
Cuando tenemos todo listo, ¡ejecutamos!
python etl_gcs_to_bq.py
ERROR | Flow run 'xxxxxx' - Finished in state Failed('Flow run encountered an exception. google.api_core.exceptions.Forbidden: 403 GET: Access Denied: Table xxxxx: Permission bigquery.tables.get denied on table xxxxxx (or it may not exist).\n')
La primera arroja este error. El motivo es que estoy utilizando el Block que creamos en el paso anterior para conectarnos al Cloud Storage Bucket y cuyo Service Account solo tiene permisos de almacenamiento. Debemos añadirle permisos como administrador BigQuery para que tenga acceso a la base de datos.
Ejecutamos de nuevo el pipeline y observamos que funciona correctamente:
Y consultamos la tabla en la base de datos de BigQuery:
Parametrización de pipelines
Con el fin de automatizar las ETLs vamos a parametrizar los pipelines que acabamos de construir. Siguiendo los videos creamos un nuevo fichero parameterized_flows.py
donde copiamos el código de los dos pipelines. Lo que vamos a hacer es sustituir los valores puestos a mano year
, month
y color
por parámetros que se reciben desde un flow padre, encargado de invocar a los dos flows etl_web_to_gcs
y etl_gcs_to_bq
.
from pathlib import Path
import pandas as pd
from prefect import flow, task
from prefect_gcp.cloud_storage import GcsBucket
from prefect.tasks import task_input_hash
from datetime import timedelta
from prefect_gcp import GcpCredentials
##########################################################
######## etl_web_to_gcs ########
##########################################################
@task(retries=3)
def ingest_data(dataset_url):
df = pd.read_csv(dataset_url)
return df
@task(log_prints=True)
def transform_data(df):
df['tpep_pickup_datetime'] = pd.to_datetime(df['tpep_pickup_datetime'])
df['tpep_dropoff_datetime'] = pd.to_datetime(df['tpep_dropoff_datetime'])
return df
@task(log_prints=True)
def export_data_local(df, color, dataset_file):
path = Path(f"data/{color}/{dataset_file}.parquet")
df.to_parquet(path, compression="gzip")
return path
@task(log_prints=True)
def export_data_gcs(path):
gcs_block = GcsBucket.load("zoom-gcs")
gcs_block.upload_from_path(from_path=path, to_path=path)
@flow()
def etl_web_to_gcs(year, month, color):
dataset_file = f"{color}_tripdata_{year}-{month:02}"
dataset_url = f"https://github.com/DataTalksClub/nyc-tlc-data/releases/download/{color}/{dataset_file}.csv.gz"
df = ingest_data(dataset_url)
df_clean = transform_data(df)
path = export_data_local(df_clean, color, dataset_file)
export_data_gcs(path)
##########################################################
######## etl_gcs_to_bq ########
##########################################################
@task(retries=3)
def extract_from_gcs(color, year, month) -> Path:
"""Data Extract from Google Cloud Storage"""
gcs_path = f"{color}/{color}_tripdata_{year}-{month:02}.parquet"
gcs_block = GcsBucket.load("zoom-gcs")
gcs_block.get_directory(from_path=gcs_path, local_path=f"data/")
print(gcs_path)
return Path(f"data/{gcs_path}")
@task()
def transform(path) -> pd.DataFrame:
"""Data Cleaning"""
df = pd.read_parquet(path)
print(f"pre: missing passenger count: {df['passenger_count'].isna().sum()}")
df['passenger_count'].fillna(0, inplace=True)
print(f"post: missing passenger count: {df['passenger_count'].isna().sum()}")
return df
@task()
def write_bq(df) -> None:
"""Write DataFrame to BigQuery"""
gcp_credentials_block = GcpCredentials.load("zoom-gcp-creds")
df.to_gbq(
destination_table="dezoomcamp.rides",
project_id="digital-aloe-375022",
credentials=gcp_credentials_block.get_credentials_from_service_account(),
chunksize=500_000,
if_exists="append"
)
@flow()
def etl_gcs_to_bq(year, month, color):
path = extract_from_gcs(color, year, month)
df = transform(path)
write_bq(df)
##########################################################
######## parent flow ########
##########################################################
@flow()
def etl_parent_flow(months: list[int] = [1,2], year: int = 2021, color: str = "yellow"):
for month in months:
etl_web_to_gcs(year, month, color)
etl_gcs_to_bq(year, month, color)
if __name__ == "__main__":
color="yellow"
months=[1,2,3]
year=2021
etl_parent_flow(months, year, color)
Prefect Deployments
Un despliegue en Prefect es un artefacto de lado de servidor que encapsula un flujo y permite programarlo o lanzarlo vía API. Un flujo puede pertenecer a varios desployments. Podríamos decir que son contenedores con metadata que tienen todo lo que necesita un flow para ser programado. Se pueden crear por línea de comandos o por Python.
Crear Prefect Deployment por python
Una forma rápida de crear un deployment es construyendo un script en python donde se especifiquen las propiedades. Simplemente instanciamos la clase Prefect Deployment
para crear un nuevo deployment pasándole por parámetro el flujo que debe ejecutar, el nombre, el storage o la infraestructure (si queremos ejecutarlo sobre una imagen docker) y el entrypoint
si deseamos especificar la ubicación del fichero donde está el flujo.
En el siguiente ejemplo creamos un deployment para ejecutar un flow cuyo fichero está almacenado en una repo de Github:
from prefect.deployments import Deployment
from etl_web_to_gcs import etl_web_to_gcs
from prefect.filesystems import GitHub
storage = GitHub.load("zoomgithub")
deployment = Deployment.build_from_flow(
flow=etl_web_to_gcs,
name="github-exercise",
storage=storage,
entrypoint="week_2_workflow_orchestration/2_homework/04_question/etl_web_to_gcs.py:etl_web_to_gcs")
if __name__ == "__main__":
deployment.apply()
Para crear el deployment simplemente ejecutamos el script python:
python myDeployment.py
Crear Prefect Deployment por CLI (línea de comandos)
Si creamos un deployment por línea de comandos debemos usar deployment build
y pasándole por parámetro el fichero del pipeline (parameterized_flows.py
), separado por dos puntos el flujo dentro del pipeline que deseamos programar (etl_parent_flow
) y por último le asignamos un nombre con -n
.
prefect deployment build parameterized_flows.py:etl_parent_flow -n "Parameterized ETL"
Cuando ejecutamos con éxito el comando se crea un fichero etl_parent_flow-deployment.yaml
con toda la metadata necesaria para programar el flow. Podemos modificarlo y adaptarlo si fuera necesario. En nuestro caso, queremos programarlo para que ejecute los meses 1, 2 y 3 de 2021 para el color yellow, por lo que modificamos la línea 10 y sustituimos por parameters: { "color": "yellow", "months" :[1, 2, 3], "year": 2021}
###
### A complete description of a Prefect Deployment for flow 'etl-parent-flow'
###
name: Parameterized ETL
description: null
version: 069b00845abcb3eeeff506a311413184
# The work queue that will handle this deployment's runs
work_queue_name: default
tags: []
parameters: { "color": "yellow", "months" :[1, 2, 3], "year": 2021}
schedule: null
infra_overrides: {}
infrastructure:
type: process
env: {}
labels: {}
name: null
command: null
stream_output: true
working_dir: null
block_type_slug: process
_block_type_slug: process
###
### DO NOT EDIT BELOW THIS LINE
###
flow_name: etl-parent-flow
manifest_path: null
storage: null
path: C:\Users\Marcos\Downloads\DataEngineeringBootcamp\week2
entrypoint: parameterized_flows.py:etl_parent_flow
parameter_openapi_schema:
title: Parameters
type: object
properties:
months:
title: months
default:
- 1
- 2
position: 0
type: array
items:
type: integer
year:
title: year
default: 2021
position: 1
type: integer
color:
title: color
default: yellow
position: 2
type: string
required: null
definitions: null
Una vez modificado, debemos ejecutar el comando prefect deployment apply
prefect deployment apply etl_parent_flow-deployment.yaml
Nos dirigimos al GUI de Prefect (http://127.0.0.1:4200/deployments) y en la sección de Deployments accedemos al que acabamos de crear. A la derecha pulsamos sobre el botón Run y en el popup seleccionarmos Quick Run (la opción de Custom Run) permite especificar nuevos valores para los parámetros.
Cuando ejecutamos el Deployments se crea un Work Queue que será el demonio encargado de ejecutarlo. Pero se levanta dormido, para arrancarlo es necesario ejecutar el comando:
prefect agent start --work-queue "default"
El Work Queue que se genera tiene el nombre default y si accedemos podemos copiar el comando para arrancarlo.
Una vez arrancamos el agente por línea de comandos, observamos que empieza a ejecutar el Deployment.
Docker + Prefect
Terminamos con la contenerización de Prefect en Docker. Vamos a crear una imagen y publicarla en Docker Hub (tendrás que crearte una cuenta si aun no la tienes). Construimos el fichero dockerfile
donde indicamos la imagen de prefect que usaremos, instalamos la lista de módulos de requeriments.txt
y por último copiamos los directorios de flows y data.
En primer lugar creamos el fichero docker-requirementes.txt
más adelgazado:
pandas==1.5.2
prefect-gcp[cloud_storage]==0.2.3
protobuf==4.21.11
pyarrow==10.0.1
pandas-gbq==0.18.1
Y posteriormente pasamos a crear el dockerfile
:
FROM prefecthq/prefect:2.7.7-python3.11
COPY docker-requirements.txt .
RUN pip install -r docker-requirements.txt --trusted-host pypi.python.org --no-cache-dir
COPY flows /opt/prefect/flows
RUN mkdir -p /opt/prefect/data/yellow
Una vez guardado, nos ubicamos en el mismo directorio y ejecutamos el siguiente comando. Donde pone n4gash
debes cambiarlo por tu cuenta de Docker Hub.
=> ERROR [5/5] RUN mkdir /opt/prefect/data/yellow 0.3s ------ > [5/5] RUN mkdir /opt/prefect/data/yellow: #9 0.299 mkdir: cannot create directory ‘/opt/prefect/data/yellow’: No such file or directory ------ executor failed running [/bin/sh -c mkdir /opt/prefect/data/yellow]: exit code: 1'
Si cuando intentamos montar la imagen de docker nos arroja este error, debéis añadir en la última línea -p
para que cree todo el árbol de subdirectorios: RUN mkdir -p /opt/prefect/data/yellow
docker image build -t n4gash/prefect:zoom .
Después hacemos un push para publicar nuestra imagen en Docker Hub.
docker image push n4gash/prefect:zoom
Creamos un block de tipo Docker Container indicando el nombre zoom
y en image indicamos el nombre y tags de nuestra imagen que acabamos de publicar n4gash/prefect:zoom
.
Creamos un nuevo fichero docker_deploy.py
en el directorio flows/03_deployments. Pegamos el código generado al crear el block (import de la librería prefect.infrastructure
y la creación del bloque docker_container_block
) y completamos:
from prefect.deployments import Deployment
from prefect.infrastructure.docker import DockerContainer
from parameterized_flows import etl_parent_flow
docker_container_block = DockerContainer.load("zoom")
docker_dep = Deployment.build_from_flow(flow=etl_parent_flow,name='docker-flow',infrastructure=docker_container_block)
if __name__ == "__main__":
docker_dep.apply()
Si lo ejecutamos, podemos conectarnos al GUI para comprobar que se ha creado el deployment correctamente.
python flows/03_deployments/docker_deploy.py
Podemos comprobar que estamos conectados con el profile por defecto con prefect profiles ls
. Lo que vamos a hacer es reemplazar el API local efímera por el API que levanta el agente al arrancar. Configuramos qué API vamos a utilizar con el comando prefect config set
y la URL del API que nos muestra Prefect cuando lo arrancamos.
prefect config set PREFECT_API_URL="http://127.0.0.1:4200/api"
Arrancamos el agente de Prefect:
prefect agent start --work-queue "default"
Activamos el agente y ejecutamos el deployment
por línea de comandos
prefect deployment run etl-parent-flow/docker-flow -p "months=[1,2]"
ValueError: Path /root/.prefect/storage/44bf1741162a49e8a0a878cc4a87824e does not exist. 12:45:05.414 | ERROR | Flow run 'camouflaged-dragonfly' - Finished in state Failed('Flow run encountered an exception. ValueError: Path /root/.prefect/storage/44bf1741162a49e8a0a878cc4a87824e does not exist.\n')
Al ejecutar el comando prefect deployment run etl-parent-flow/docker-flow -p "months=[1,2]"/docker-flow -p "months=[1,2]"
nos arroja un error de que no encuentra el volume de Prefect. La solución es eliminar la función de caché de la tarea ingest_data. Para ello borramos los parámetros cache_key_fn=task_input_hash, cache_expiration=timedelta(days=1)
. Es necesario volver a generar y publicar la imagen en el Docker Hub.
Si todo ha ido bien, podemos ver por la consola del agente o desde el GUI de Prefect que tanto el flow maestro (etl-parent-flow
) como los hijos (etl-web-to-gc
s y etl-gcs-to-bq
) se han ejecutado correctamente (hay dos de cada porque porque al ejecutar el deployment en los parámetros hemos pasado un array con dos valores: 1,2):
Prefect Cloud
Hasta ahora hemos lanzado los pipelines desde el servidor local de Prefect que levantamos al ejecutar el comando prefect orion start
. Como alternativa, podemos desplegar y lanzar el flujo en el SaaS de Prefect Cloud. Además, tenemos a nuestra disposición los Automations que son como triggers que podemos utilizar por ejemplo, para notificar cuando un flujo se ha ejecutado correctamente o no. El registro es gratuito. La interfaz es prácticamente idéntica al GUI que levanta Prefect Orion en local. Seguiremos los siguientes pasos:
1. Creamos un workspace para empezar a trabajar.
2. Desde el perfil de usuario (profile settings) en Prefect Cloud creamos un API Key
3. Nos conectamos a Prefect Cloud por línea de comandos mediante el comando prefect cloud login
y la API Key que hemos generado.
prefect cloud login -k XXXXXXXXXX
4. Creamos los blocks que previamente habíamos creado en la instancia local de Prefect Orion. Podemos crearlos desde el GUI o por CLI (scripts python). En este punto debemos tener en cuenta que el catálogo de blocks disponible es distinto al que teníamos en local. Por ejemplo, sí tenemos a nuestra disposición los bloques de Github y GCS Credentials, pero no aparece el de GCS Bucket, en su lugar tenemos otro denominado GCS. Tras darle muchas vueltas, al final he lanzado la pregunta en slack y Jeff me ha dado la pista. Aunque no aparezca un bloque en el catálogo de Prefect Cloud, ¡podemos crearlo manualmente desde un script python! y voilá:
from prefect_gcp import GcpCredentials
from prefect_gcp.cloud_storage import GcsBucket
bucket_block = GcsBucket(
gcp_credentials=GcpCredentials.load("zoom-gcp-creds"),
bucket="dtc_data_lake_digital-aloe-375022",
)
bucket_block.save("zoom-gcs", overwrite=True)
Y ejecutamos el script:
python python gcp_bucket_block.py
Si accedemos a los blocks en Prefect Cloud deberíamos ver el de Github, GCP Credentials, GCS Bucket:
5. Creamos un script para el Deployment. En el ejemplo vamos a usar un flow que se descarga de una repo en Github, para lo que se conecta a través del block correspondiente:
from prefect.deployments import Deployment
from etl_web_to_gcs import etl_web_to_gcs
from prefect.filesystems import GitHub
storage = GitHub.load("zoomgithub")
deployment = Deployment.build_from_flow(
flow=etl_web_to_gcs,
name="github-exercise",
storage=storage,
entrypoint="week_2_workflow_orchestration/2_homework/04_question/etl_web_to_gcs.py:etl_web_to_gcs")
if __name__ == "__main__":
deployment.apply()
Prefect Cloud Automations
Como mencionaba antes, los automations de Prefect Cloud son como triggers que realizan una acción como consecuencia de otra. Podemos configurarlos para responder al ciclo de ejecución de un flujo o para comprobar el estado del agente de Prefect. Las acciones disponibles son:
- Cancelar flujo
- Parar programación
- Parar agente
- Continuar programación
- Continuar agente
- Ejecutar deployment
- Enviar notificación
Para nuestro ejemplo vamos a usar el último, enviar notificación. Vamos a crear una automation que envíe un correo cuando acabe con éxito cualuier flujo de nuestro workspace. Para ello en primer lugar debemos crear un block de tipo email:
Ahora crearemos la automation de tipo Flow run state (queremos que se active cuando se ejecute un flujo). Para todos los flows cuyo estado pase a Completed:
Seleccionamos el bloque de email creado previamente y podríamos modificar el body del email:
Si ejecutamos un deployment o flujo, si termina con éxito nos llegará un email desde Prefect con la comunicación.