简化Amazon MWAA中的Airflow REST API交互

作者 Chandan Rupakheti 和 Hernan Garcia日期 2024年10月23日来源 Amazon Managed Workflows for Apache Airflow (Amazon MWAA)

关键要点

简化的集成 新的 InvokeRestApi 功能大大简化了与Airflow REST API的交互,无需管理访问令牌和会话cookie。改进的可用性 通过中介方式,增强后的API直接将Airflow API执行结果传递给客户端。自动管理 简化的REST API访问支持自动化管理任务。事件驱动架构 支持基于事件的触发,可以更灵活地处理数据流程。数据感知调度 利用数据集调度特性,提高对工作负载的管理和资源的调整。

今天,我们很高兴地宣布,Amazon MWAA与Airflow REST API 的集成得到了增强。这一改进简化了访问和管理您的Airflow环境的能力,同时也便于程序化地与工作流交互。Airflow REST API支持多种用例,包括集中和自动化管理任务,以及构建基于事件的、数据感知的数据管道。

接下来的部分,我们将讨论这一增强功能并展示若干用例,展示其在您的Amazon MWAA环境中的应用。

Airflow REST API

Airflow REST API是一个程序接口,允许您与Airflow的核心功能进行交互。它是一组HTTP端点,可以执行诸如调用有向无环图DAGs、检查任务状态、检索工作流元数据、管理连接和变量,甚至启动与数据集相关的事件,而无需直接访问Airflow的网页界面或命令行工具。

在此之前,Amazon MWAA为与Airflow REST API的交互提供了基础,但管理访问令牌和会话cookie的过程增加了复杂性。现在,Amazon MWAA支持通过AWS凭证与Airflow REST API交互,简化了操作并提高了可用性。

增强概述

新的InvokeRestApi 功能允许您使用有效的SigV4签名和您的现有AWS凭证发起Airflow REST API请求。这个特性现在适用于所有Amazon MWAA环境243中的受支持的AWS区域。通过作为中介,该REST API代表用户处理请求,只需环境名称和API请求有效负载作为输入。

通过增强的Amazon MWAA API与Airflow REST API的集成带来了几个关键好处:

好处描述简化集成新的 InvokeRestApi 功能消除了管理访问令牌和会话cookies的复杂性。提升可用性增强后的API直接向客户端提供Airflow REST API执行结果。自动管理支持自动化各种行政和管理任务,比如管理Airflow变量、连接等。事件驱动架构支持根据外部事件触发Airflow DAG。数据感知调度通过数据集调度特性,增强了对于工作负载的管理和资源的动态扩展。

在接下来的部分,我们将展示如何在各种用例中使用增强后的API。

如何使用增强的Amazon MWAA API

以下代码片段展示了增强REST API的一般请求格式:

httpPOST /restapi/Name HTTP/11Contenttype application/json

{ Name String Method String Path String QueryParameters Json Body Json}

所需的参数包括Amazon MWAA环境的Name、要调用的Airflow REST API端点的Path以及所使用的HTTP Method,而QueryParameters和Body为可选参数。

以下代码片段展示了一般的响应格式:

json{ RestApiStatusCode Number RestApiResponse Json}

RestApiStatusCode表示由Airflow REST API调用返回的HTTP状态代码,而RestApiResponse包含来自Airflow REST API的响应有效负载。

以下示例代码片段展示了如何使用增强集成更新Airflow变量的描述字段。该调用使用AWS Python SDK来调用Airflow REST API。

pythonimport boto3

创建一个boto3客户端

mwaaclient = boto3client(mwaa)

使用boto3客户端调用增强的REST API

response = mwaaclientinvokerestapi( Name= Method=PATCH Path=f/variables/ Body={ key value description } QueryParameters={ updatemask [description] })

访问REST调用的输出

statuscode = response[RestApiStatusCode]result = response[RestApiResponse]

要使用invokerestapi SDK调用,调用客户端需要有AWS身份和访问管理IAM权限,包括对所需环境的airflowInvokeRestAPI权限。该权限可以限定特定Airflow角色管理员、操作员、用户、查看器或公共的访问级别。

这一简单而强大的REST API支持您Amazon MWAA环境中的多种用例。接下来,我们将回顾一些重要用例。

自动化管理和管理任务

在此次发布之前,要自动化配置和资源设置如变量、连接、槽池等,您需要编写冗长的样板代码以向Amazon MWAA网络服务器发出API请求,并在此过程中处理cookie和会话管理。现在,您可以通过新的增强REST API简化这种自动化。

假设您想自动维护您的Amazon MWAA环境中的变量,您需要对Airflow变量执行创建、读取、更新和删除等API操作。以下是一个简单的Python客户端示例mwaavariablesclientpy:

pythonimport boto3

MWAA环境变量管理客户端

class MWAAVariablesClient # 初始化客户端,包含环境名称和可选的MWAA boto3客户端 def init(self envname mwaaclient=None) selfenvname = envname selfclient = mwaaclient or boto3client(mwaa)

# 列出MWAA环境中的所有变量def list(self)    response = selfclientinvokerestapi(        Name=selfenvname        Method=GET        Path=/variables    )    output = response[RestApiResponse][variables]    return output# 通过键获取特定变量def get(self key)    response = selfclientinvokerestapi(        Name=selfenvname        Method=GET        Path=f/variables/{key}    )    return response[RestApiResponse]# 创建一个新变量,包含键、值和可选描述def create(self key value description=None)    response = selfclientinvokerestapi(        Name=selfenvname        Method=POST        Path=/variables        Body={            key key            value value            description description        }    )    return response[RestApiResponse]# 更新现有变量的值和描述def update(self key value description queryparameters=None)    response = selfclientinvokerestapi(        Name=selfenvname        Method=PATCH        Path=f/variables/{key}        Body={            key key            value value            description description        }        QueryParameters=queryparameters    )    return response[RestApiResponse]# 根据键删除变量def delete(self key)    response = selfclientinvokerestapi(        Name=selfenvname        Method=DELETE        Path=f/variables/{key}    )    return response[RestApiStatusCode]

if name == main client = MWAAVariablesClient()

print(n正在创建测试变量 )response = clientcreate(    key=test    value=Test value    description=Test description)print(response)print(n正在列出所有变量 )variables = clientlist()print(variables)print(n正在获取测试变量 )response = clientget(test)print(response)print(n正在更新测试变量的值和描述 )response = clientupdate(    key=test    value=Updated Value    description=Updated description)print(response)print(n仅更新测试变量的描述 )response = clientupdate(    key=test     value=Updated Value     description=Yet another updated description     queryparameters={updatemask [description]})print(response)print(n正在删除测试变量 )responsecode = clientdelete(test)print(f响应码 {responsecode})print(n最终获取已删除的测试变量 )try    response = clientget(test)    print(response)except Exception as e    print(eresponse[RestApiResponse])

确保您已使用适当的AWS凭证配置您的终端,您可以运行上面的Python脚本来实现以下结果:

plaintext python mwaavariablesclientpy

正在创建测试变量 {description Test description key test value Test value}

正在列出所有变量 [{key test value Test value}]

正在获取测试变量 {key test value Test value}

正在更新测试变量的值和描述 {description Updated description key test value Updated Value}

仅更新测试变量的描述 {description Yet another updated description key test value Updated Value}

正在删除测试变量 响应码 204

在 Amazon MWAA 中简化与 Airflow REST API 的交互 大数据博客

最终获取已删除的测试变量 {detail Variable does not exist status 404 title Variable not found type https//airflowapacheorg/docs/apacheairflow/281/stablerestapirefhtml#section/Errors/NotFound}

接下来,让我们进一步探索其他有用的用例。

构建事件驱动的数据管道

Airflow社区一直在积极创新,以增强平台的数据感知能力,使您能够构建更动态和响应式的工作流程。当我们宣布在Amazon MWAA中支持版本292时,我们引入了使管道能对数据集变化做反应的新功能。这种与Airflow REST API的简化集成使得实现数据驱动的管道变得更为直接。

考虑一个用例,您需要运行一个使用外部事件输入的管道。以下示例DAG执行一个作为参数提供的bash命令anybashcommandpy:

python此DAG允许您执行作为参数提供的bash命令。命令作为参数传递,称为command。

from airflow import DAGfrom airflowoperatorsbashoperator import BashOperatorfrom airflowmodelsparam import Paramfrom datetime import datetime

with DAG( dagid=anybashcommand schedule=None startdate=datetime(2022 1 1) catchup=False params={ command Param(env type=string) }) as dag clicommand = BashOperator( taskid=triggeredbashcommand bashcommand={{ dagrunconf[command] }} )

借助增强的REST API,您可以创建一个客户端来调用这个DAG,并提供您选择的bash命令,如下所示mwaadagrunclientpy:

安易加速器下载官网

pythonimport boto3

触发DAG运行的客户端

class MWAADagRunClient # 初始化客户端,包含MWAA环境名称和可选的MWAA boto3客户端 def init(self envname mwaaclient=None) selfenvname = envname selfclient = mwaaclient or boto3client(mwaa)

# 触发DAG运行,指定参数def triggerrun(self         dagid         dagrunid=None        logicaldate=None        dataintervalstart=None        dataintervalend=None        note=None        conf=None)    body = {}    if dagrunid        body[dagrunid] = dagrunid    if logicaldate        body[logicaldate] = logicaldate    if dataintervalstart        body[dataintervalstart] = dataintervalstart    if dataintervalend        body[dataintervalend] = dataintervalend    if note        body[note] = note    body[conf] = conf or {}    response = selfclientinvokerestapi(        Name=selfenvname        Method=POST        Path=f/dags/{dagid}/dagRuns        Body=body    )    return response[RestApiResponse]

if name == main client = MWAADagRunClient()

print(n正在触发DAG运行 )result = clienttriggerrun(    dagid=anybashcommand     conf={        command echo Hello from external trigger!    })print(result)

以下代码片段展示了该脚本的样本运行:

plaintext python mwaadagrunclientpy正在触发DAG运行 {conf {command echo Hello from external trigger!} dagid anybashcommand dagrunid manual20241021T1656098529080000 dataintervalend 20241021T1656098529080000 dataintervalstart 20241021T1656098529080000 executiondate 20241021T1656098529080000 externaltrigger True logicaldate 20241021T1656098529080000 runtype manual state queued}

在Airflow UI中,triggerbashcommand任务显示了以下执行日志:

plaintext[20241021 165612 UTC] {localtaskjobrunnerpy123} 任务执行前日志[20241021 165612 UTC] {subprocesspy63} INFO 临时目录根位置 /tmp[20241021 165612 UTC] {subprocesspy75} INFO 运行命令 [/usr/bin/bash c echo Hello from external trigger!][20241021 165612 UTC] {subprocesspy86} INFO 输出[20241021 165612 UTC] {subprocesspy93} INFO 外部触发的Hello![20241021 165612 UTC] {subprocesspy97} INFO 命令以返回代码0退出

您还可以进一步扩展此示例以创建更有用的事件驱动架构。让我们将用例扩展到在新文件进入Amazon S3数据湖时运行数据管道并执行提取、转换和加载ETL作业。以下图表展示了一种架构方法。

在通过外部输入调用DAG的上下文中,AWS Lambda函数对Amazon MWAA网络服务器的繁忙程度一无所知,这可能导致函数在短时间内处理大量文件,从而压倒Amazon MWAA网络服务器。

一种调节文件处理吞吐量的方法是在S3桶和Lambda函数之间引入一个Amazon Simple Queue ServiceAmazon SQS队列,这可以帮助限制对网络服务器的API请求速率。您可以通过配置最大并发性来实现Lambda的SQS事件源。然而,Lambda函数仍然不知道Amazon MWAA环境中可用的处理能力,以