简化Amazon MWAA中的Airflow REST API交互
作者 Chandan Rupakheti 和 Hernan Garcia日期 2024年10月23日来源 Amazon Managed Workflows for Apache Airflow (Amazon MWAA)
关键要点
简化的集成 新的 InvokeRestApi 功能大大简化了与Airflow REST API的交互,无需管理访问令牌和会话cookie。改进的可用性 通过中介方式,增强后的API直接将Airflow API执行结果传递给客户端。自动管理 简化的REST API访问支持自动化管理任务。事件驱动架构 支持基于事件的触发,可以更灵活地处理数据流程。数据感知调度 利用数据集调度特性,提高对工作负载的管理和资源的调整。今天,我们很高兴地宣布,Amazon MWAA与Airflow REST API 的集成得到了增强。这一改进简化了访问和管理您的Airflow环境的能力,同时也便于程序化地与工作流交互。Airflow REST API支持多种用例,包括集中和自动化管理任务,以及构建基于事件的、数据感知的数据管道。
接下来的部分,我们将讨论这一增强功能并展示若干用例,展示其在您的Amazon MWAA环境中的应用。
Airflow REST API
Airflow REST API是一个程序接口,允许您与Airflow的核心功能进行交互。它是一组HTTP端点,可以执行诸如调用有向无环图DAGs、检查任务状态、检索工作流元数据、管理连接和变量,甚至启动与数据集相关的事件,而无需直接访问Airflow的网页界面或命令行工具。
在此之前,Amazon MWAA为与Airflow REST API的交互提供了基础,但管理访问令牌和会话cookie的过程增加了复杂性。现在,Amazon MWAA支持通过AWS凭证与Airflow REST API交互,简化了操作并提高了可用性。
增强概述
新的InvokeRestApi 功能允许您使用有效的SigV4签名和您的现有AWS凭证发起Airflow REST API请求。这个特性现在适用于所有Amazon MWAA环境243中的受支持的AWS区域。通过作为中介,该REST API代表用户处理请求,只需环境名称和API请求有效负载作为输入。
通过增强的Amazon MWAA API与Airflow REST API的集成带来了几个关键好处:
好处描述简化集成新的 InvokeRestApi 功能消除了管理访问令牌和会话cookies的复杂性。提升可用性增强后的API直接向客户端提供Airflow REST API执行结果。自动管理支持自动化各种行政和管理任务,比如管理Airflow变量、连接等。事件驱动架构支持根据外部事件触发Airflow DAG。数据感知调度通过数据集调度特性,增强了对于工作负载的管理和资源的动态扩展。在接下来的部分,我们将展示如何在各种用例中使用增强后的API。
如何使用增强的Amazon MWAA API
以下代码片段展示了增强REST API的一般请求格式:
httpPOST /restapi/Name HTTP/11Contenttype application/json
{ Name String Method String Path String QueryParameters Json Body Json}
所需的参数包括Amazon MWAA环境的Name、要调用的Airflow REST API端点的Path以及所使用的HTTP Method,而QueryParameters和Body为可选参数。
以下代码片段展示了一般的响应格式:
json{ RestApiStatusCode Number RestApiResponse Json}
RestApiStatusCode表示由Airflow REST API调用返回的HTTP状态代码,而RestApiResponse包含来自Airflow REST API的响应有效负载。
以下示例代码片段展示了如何使用增强集成更新Airflow变量的描述字段。该调用使用AWS Python SDK来调用Airflow REST API。
pythonimport boto3
创建一个boto3客户端
mwaaclient = boto3client(mwaa)
使用boto3客户端调用增强的REST API
response = mwaaclientinvokerestapi( Name= Method=PATCH Path=f/variables/ Body={ key value description } QueryParameters={ updatemask [description] })
访问REST调用的输出
statuscode = response[RestApiStatusCode]result = response[RestApiResponse]
要使用invokerestapi SDK调用,调用客户端需要有AWS身份和访问管理IAM权限,包括对所需环境的airflowInvokeRestAPI权限。该权限可以限定特定Airflow角色管理员、操作员、用户、查看器或公共的访问级别。
这一简单而强大的REST API支持您Amazon MWAA环境中的多种用例。接下来,我们将回顾一些重要用例。
自动化管理和管理任务
在此次发布之前,要自动化配置和资源设置如变量、连接、槽池等,您需要编写冗长的样板代码以向Amazon MWAA网络服务器发出API请求,并在此过程中处理cookie和会话管理。现在,您可以通过新的增强REST API简化这种自动化。
假设您想自动维护您的Amazon MWAA环境中的变量,您需要对Airflow变量执行创建、读取、更新和删除等API操作。以下是一个简单的Python客户端示例mwaavariablesclientpy:
pythonimport boto3
MWAA环境变量管理客户端
class MWAAVariablesClient # 初始化客户端,包含环境名称和可选的MWAA boto3客户端 def init(self envname mwaaclient=None) selfenvname = envname selfclient = mwaaclient or boto3client(mwaa)
# 列出MWAA环境中的所有变量def list(self) response = selfclientinvokerestapi( Name=selfenvname Method=GET Path=/variables ) output = response[RestApiResponse][variables] return output# 通过键获取特定变量def get(self key) response = selfclientinvokerestapi( Name=selfenvname Method=GET Path=f/variables/{key} ) return response[RestApiResponse]# 创建一个新变量,包含键、值和可选描述def create(self key value description=None) response = selfclientinvokerestapi( Name=selfenvname Method=POST Path=/variables Body={ key key value value description description } ) return response[RestApiResponse]# 更新现有变量的值和描述def update(self key value description queryparameters=None) response = selfclientinvokerestapi( Name=selfenvname Method=PATCH Path=f/variables/{key} Body={ key key value value description description } QueryParameters=queryparameters ) return response[RestApiResponse]# 根据键删除变量def delete(self key) response = selfclientinvokerestapi( Name=selfenvname Method=DELETE Path=f/variables/{key} ) return response[RestApiStatusCode]if name == main client = MWAAVariablesClient()
print(n正在创建测试变量 )response = clientcreate( key=test value=Test value description=Test description)print(response)print(n正在列出所有变量 )variables = clientlist()print(variables)print(n正在获取测试变量 )response = clientget(test)print(response)print(n正在更新测试变量的值和描述 )response = clientupdate( key=test value=Updated Value description=Updated description)print(response)print(n仅更新测试变量的描述 )response = clientupdate( key=test value=Updated Value description=Yet another updated description queryparameters={updatemask [description]})print(response)print(n正在删除测试变量 )responsecode = clientdelete(test)print(f响应码 {responsecode})print(n最终获取已删除的测试变量 )try response = clientget(test) print(response)except Exception as e print(eresponse[RestApiResponse])确保您已使用适当的AWS凭证配置您的终端,您可以运行上面的Python脚本来实现以下结果:
plaintext python mwaavariablesclientpy
正在创建测试变量 {description Test description key test value Test value}
正在列出所有变量 [{key test value Test value}]
正在获取测试变量 {key test value Test value}
正在更新测试变量的值和描述 {description Updated description key test value Updated Value}
仅更新测试变量的描述 {description Yet another updated description key test value Updated Value}
正在删除测试变量 响应码 204

最终获取已删除的测试变量 {detail Variable does not exist status 404 title Variable not found type https//airflowapacheorg/docs/apacheairflow/281/stablerestapirefhtml#section/Errors/NotFound}
接下来,让我们进一步探索其他有用的用例。
构建事件驱动的数据管道
Airflow社区一直在积极创新,以增强平台的数据感知能力,使您能够构建更动态和响应式的工作流程。当我们宣布在Amazon MWAA中支持版本292时,我们引入了使管道能对数据集变化做反应的新功能。这种与Airflow REST API的简化集成使得实现数据驱动的管道变得更为直接。
考虑一个用例,您需要运行一个使用外部事件输入的管道。以下示例DAG执行一个作为参数提供的bash命令anybashcommandpy:
python此DAG允许您执行作为参数提供的bash命令。命令作为参数传递,称为command。
from airflow import DAGfrom airflowoperatorsbashoperator import BashOperatorfrom airflowmodelsparam import Paramfrom datetime import datetime
with DAG( dagid=anybashcommand schedule=None startdate=datetime(2022 1 1) catchup=False params={ command Param(env type=string) }) as dag clicommand = BashOperator( taskid=triggeredbashcommand bashcommand={{ dagrunconf[command] }} )
借助增强的REST API,您可以创建一个客户端来调用这个DAG,并提供您选择的bash命令,如下所示mwaadagrunclientpy:
安易加速器下载官网pythonimport boto3
触发DAG运行的客户端
class MWAADagRunClient # 初始化客户端,包含MWAA环境名称和可选的MWAA boto3客户端 def init(self envname mwaaclient=None) selfenvname = envname selfclient = mwaaclient or boto3client(mwaa)
# 触发DAG运行,指定参数def triggerrun(self dagid dagrunid=None logicaldate=None dataintervalstart=None dataintervalend=None note=None conf=None) body = {} if dagrunid body[dagrunid] = dagrunid if logicaldate body[logicaldate] = logicaldate if dataintervalstart body[dataintervalstart] = dataintervalstart if dataintervalend body[dataintervalend] = dataintervalend if note body[note] = note body[conf] = conf or {} response = selfclientinvokerestapi( Name=selfenvname Method=POST Path=f/dags/{dagid}/dagRuns Body=body ) return response[RestApiResponse]if name == main client = MWAADagRunClient()
print(n正在触发DAG运行 )result = clienttriggerrun( dagid=anybashcommand conf={ command echo Hello from external trigger! })print(result)以下代码片段展示了该脚本的样本运行:
plaintext python mwaadagrunclientpy正在触发DAG运行 {conf {command echo Hello from external trigger!} dagid anybashcommand dagrunid manual20241021T1656098529080000 dataintervalend 20241021T1656098529080000 dataintervalstart 20241021T1656098529080000 executiondate 20241021T1656098529080000 externaltrigger True logicaldate 20241021T1656098529080000 runtype manual state queued}
在Airflow UI中,triggerbashcommand任务显示了以下执行日志:
plaintext[20241021 165612 UTC] {localtaskjobrunnerpy123} 任务执行前日志[20241021 165612 UTC] {subprocesspy63} INFO 临时目录根位置 /tmp[20241021 165612 UTC] {subprocesspy75} INFO 运行命令 [/usr/bin/bash c echo Hello from external trigger!][20241021 165612 UTC] {subprocesspy86} INFO 输出[20241021 165612 UTC] {subprocesspy93} INFO 外部触发的Hello![20241021 165612 UTC] {subprocesspy97} INFO 命令以返回代码0退出
您还可以进一步扩展此示例以创建更有用的事件驱动架构。让我们将用例扩展到在新文件进入Amazon S3数据湖时运行数据管道并执行提取、转换和加载ETL作业。以下图表展示了一种架构方法。
在通过外部输入调用DAG的上下文中,AWS Lambda函数对Amazon MWAA网络服务器的繁忙程度一无所知,这可能导致函数在短时间内处理大量文件,从而压倒Amazon MWAA网络服务器。
一种调节文件处理吞吐量的方法是在S3桶和Lambda函数之间引入一个Amazon Simple Queue ServiceAmazon SQS队列,这可以帮助限制对网络服务器的API请求速率。您可以通过配置最大并发性来实现Lambda的SQS事件源。然而,Lambda函数仍然不知道Amazon MWAA环境中可用的处理能力,以