Apache NiFi 2.x处理器:ConsumeAMQP 2.2.0

Apache NiFi 2.x处理器:ConsumeAMQP 2.2.0

经验文章nimo972025-03-27 12:51:0910A+A-

摘要

AMQP属性、配置详情

org.apache.nifi | nifi-amqp-nar

描述

使用AMQP 0.9.1协议从AMQP代理服务器消费AMQP消息。从AMQP代理服务器接收到的每一条消息,都会作为独立的FlowFile发送到“success”关系。

标签

amqp、消费、获取、消息、rabbit、接收

输入要求

禁止

支持敏感动态属性

ConsumeAMQP 2.2.0的更多详细信息

ConsumeAMQP

摘要

该处理器从AMQP消息队列消费消息,并将其转换为FlowFile,以便路由到流中的下一个组件。撰写本文档时,支持的AMQP协议版本为v0.9.1。

该组件基于RabbitMQ客户端API。以下指南和教程可能有助于你复习一些AMQP基础知识。

该处理器执行两项操作。它从消费的AMQP消息(包括消息体和属性)中提取信息来构建FlowFile。一旦消息被消费,就会构建一个FlowFile。消息体被写入FlowFile,其
com.rabbitmq.client.AMQP.BasicProperties属性会作为FlowFile的属性进行传输。AMQP属性名称会加上amqp$前缀。

AMQP属性

以下是消息可能携带的标准AMQP属性列表:(“amqp

contentEncoding”、“amqp

deliveryMode”、“amqp

correlationId”、“amqp

expiration”、“amqp

timestamp”、“amqp

userId”、“amqp

clusterId”、“amqp$routingKey” )

配置详情

撰写本文档时,它仅定义了适用于大多数情况的基本配置属性。随着该组件的发展,以后会定义其他属性。配置PublishAMQP:

  1. 队列 - [必填] 用于检索消息的AMQP队列名称。通常由管理员提供(例如,‘amq.direct’) 。
  2. 主机名 - [必填] AMQP代理服务器运行所在的主机名。通常由管理员提供(例如,‘myhost.com’)。默认值为’localhost’。
  3. 端口 - [必填] AMQP代理服务器运行的端口号。通常由管理员提供(例如, '2453’)。默认值为‘5672’。
  4. 用户名 - [必填] 连接到AMQP代理服务器的用户名。通常由管理员提供(例如,‘me’)。默认值为‘guest’。
  5. 密码 - [必填] 与用户名一起用于连接AMQP代理服务器的密码。通常由管理员提供。默认值为‘guest’。
  6. 使用证书认证 - [可选] 使用SSL证书的通用名称进行认证,而不是使用用户名/密码。此选项只能与SSL结合使用。默认值为‘false’。
  7. 虚拟主机 - [可选] 虚拟主机名,用于隔离AMQP系统以增强安全性。有关虚拟主机的更多详细信息,请参阅此博客。

属性

AMQP版本

AMQP版本。目前仅支持AMQP v0.9.1。

  • 显示名称:AMQP版本
  • 描述:AMQP版本。目前仅支持AMQP v0.9.1
  • API名称:AMQP版本
  • 默认值:0.9.1
  • 允许值:0.9.1
  • 表达式语言作用域:不支持
  • 是否敏感:否
  • 是否必填:是

自动确认消息

如果为false(非自动确认),在将FlowFile传输到success并提交NiFi会话后,处理器将确认消息。非自动确认模式提供“至少一次”的交付语义。如果为true(自动确认),发送到AMQP客户端的消息在发出后将由AMQP代理服务器自动确认。这通常会提供更好的吞吐量,但在AMQP代理服务器、NiFi或处理器重启/崩溃时,也会导致消息丢失。自动确认模式提供“至多一次”的交付语义,仅在允许丢失消息的情况下推荐使用。

  • 显示名称:自动确认消息
  • 描述:如果为false(非自动确认),在将FlowFile传输到success并提交NiFi会话后,处理器将确认消息。非自动确认模式提供“至少一次”的交付语义。如果为true(自动确认),发送到AMQP客户端的消息在发出后将由AMQP代理服务器自动确认。这通常会提供更好的吞吐量,但在AMQP代理服务器、NiFi或处理器重启/崩溃时,也会导致消息丢失。自动确认模式提供“至多一次”的交付语义,仅在允许丢失消息的情况下推荐使用
  • API名称:auto.acknowledge
  • 默认值:false
  • 允许值:true、false
  • 表达式语言作用域:不支持
  • 是否敏感:否
  • 是否必填:是

批次大小

单个会话中应处理的最大消息数。一旦接收到这么多消息(或者没有更多可用消息),接收到的消息将被传输到“success”关系,并向AMQP代理服务器确认这些消息。将此值设置得较大可能会提高性能,特别是对于非常小的消息,但在NiFi突然重启时,也可能导致更多消息重复。

  • 显示名称:批次大小
  • 描述:单个会话中应处理的最大消息数。一旦接收到这么多消息(或者没有更多可用消息),接收到的消息将被传输到“success”关系,并向AMQP代理服务器确认这些消息。将此值设置得较大可能会提高性能,特别是对于非常小的消息,但在NiFi突然重启时,也可能导致更多消息重复
  • API名称:batch.size
  • 默认值:10
  • 表达式语言作用域:不支持
  • 是否敏感:否
  • 是否必填:是

代理服务器

:格式(例如,localhost:5672)列出的已知AMQP代理服务器的逗号分隔列表。如果设置了此属性,则忽略“主机名”和“端口”属性。列表中应仅包含来自同一AMQP集群的主机。

  • 显示名称:代理服务器
  • 描述:以:格式(例如,localhost:5672)列出的已知AMQP代理服务器的逗号分隔列表。如果设置了此属性,则忽略“主机名”和“端口”属性。列表中应仅包含来自同一AMQP集群的主机
  • API名称:Brokers
  • 表达式语言作用域:JVM级别定义的环境变量和系统属性
  • 是否敏感:否
  • 是否必填:否

使用客户端证书认证

使用SSL证书而不是用户名/密码进行认证。

  • 显示名称:使用客户端证书认证
  • 描述:使用SSL证书而不是用户名/密码进行认证
  • API名称:cert-authentication
  • 默认值:false
  • 允许值:true、false
  • 表达式语言作用域:不支持
  • 是否敏感:否
  • 是否必填:否

头部输出格式

定义如何输出接收到消息的头部信息。

  • 显示名称:头部输出格式
  • 描述:定义如何输出接收到消息的头部信息
  • API名称:header.format
  • 默认值:逗号分隔字符串
  • 允许值:逗号分隔字符串、JSON字符串、FlowFile属性
  • 表达式语言作用域:不支持
  • 是否敏感:否
  • 是否必填:是

头部键前缀

添加到FlowFile属性中的头部键的前缀文本。处理器会在该属性值后附加一个“.”。

  • 显示名称:头部键前缀
  • 描述:添加到FlowFile属性中的头部键的前缀文本。处理器会在该属性值后附加一个“.”
  • API名称:header.key.prefix
  • 默认值:consume.amqp
  • 表达式语言作用域:不支持
  • 是否敏感:否
  • 是否必填:是

头部分隔符

在字符串中用于分隔头部键值对的字符。该值必须为单个字符。

  • 显示名称:头部分隔符
  • 描述:在字符串中用于分隔头部键值对的字符。该值必须为单个字符
  • API名称:header.separator
  • 默认值:,
  • 表达式语言作用域:不支持
  • 是否敏感:否
  • 是否必填:否
  • 依赖项:头部输出格式设置为[逗号分隔字符串]中的任意一个

主机名

AMQP代理服务器的网络地址(例如,localhost)。如果设置了“代理服务器”属性,则忽略此属性。

  • 显示名称:主机名
  • 描述:AMQP代理服务器的网络地址(例如,localhost)。如果设置了“代理服务器”属性,则忽略此属性
  • API名称:Host Name
  • 默认值:localhost
  • 表达式语言作用域:JVM级别定义的环境变量和系统属性
  • 是否敏感:否
  • 是否必填:否

密码

用于认证和授权的密码。

  • 显示名称:密码
  • 描述:用于认证和授权的密码
  • API名称:Password
  • 表达式语言作用域:不支持
  • 是否敏感:是
  • 是否必填:否

端口

标识AMQP代理服务器端口的数字值(例如,5671)。如果设置了“代理服务器”属性,则忽略此属性。

  • 显示名称:端口
  • 描述:标识AMQP代理服务器端口的数字值(例如,5671)。如果设置了“代理服务器”属性,则忽略此属性
  • API名称:Port
  • 默认值:5672
  • 表达式语言作用域:JVM级别定义的环境变量和系统属性
  • 是否敏感:否
  • 是否必填:否

预取计数

消费者的最大未确认消息数。如果消费者有这么多未确认消息,AMQP代理服务器在消费者确认已交付的部分消息之前,将不再发送新消息。允许值范围:0到65535。0表示无限制。

  • 显示名称:预取计数
  • 描述:消费者的最大未确认消息数。如果消费者有这么多未确认消息,AMQP代理服务器在消费者确认已交付的部分消息之前,将不再发送新消息。允许值范围:0到65535。0表示无限制
  • API名称:prefetch.count
  • 默认值:0
  • 表达式语言作用域:不支持
  • 是否敏感:否
  • 是否必填:是

队列

要从中消费消息的现有AMQP队列名称。通常由AMQP管理员预先定义。

  • 显示名称:队列
  • 描述:要从中消费消息的现有AMQP队列名称。通常由AMQP管理员预先定义
  • API名称:Queue
  • 表达式语言作用域:不支持
  • 是否敏感:否
  • 是否必填:是

删除花括号

如果选择“true删除花括号”,则会自动删除头部中的花括号。

  • 显示名称:删除花括号
  • 描述:如果选择“true删除花括号”,则会自动删除头部中的花括号
  • API名称:remove.curly.braces
  • 默认值:False
  • 允许值:True、False
  • 表达式语言作用域:不支持
  • 是否敏感:否
  • 是否必填:否
  • 依赖项:头部输出格式设置为[逗号分隔字符串]中的任意一个

SSL上下文服务

用于为TLS/SSL连接提供客户端证书信息的SSL上下文服务。

  • 显示名称:SSL上下文服务
  • 描述:用于为TLS/SSL连接提供客户端证书信息的SSL上下文服务
  • API名称:ssl-context-service
  • 服务接口:org.apache.nifi.ssl.SSLContextProviderService
  • 服务实现:org.apache.nifi.ssl.PEMEncodedSSLContextProvider、org.apache.nifi.ssl.StandardRestrictedSSLContextService、org.apache.nifi.ssl.StandardSSLContextService
  • 表达式语言作用域:不支持
  • 是否敏感:否
  • 是否必填:否

用户名

用于认证和授权的用户名。

  • 显示名称:用户名
  • 描述:用于认证和授权的用户名
  • API名称:User Name
  • 表达式语言作用域:JVM级别定义的环境变量和系统属性
  • 是否敏感:否
  • 是否必填:否

虚拟主机

用于隔离AMQP系统以增强安全性的虚拟主机名。

  • 显示名称:虚拟主机
  • 描述:用于隔离AMQP系统以增强安全性的虚拟主机名
  • API名称:Virtual Host
  • 表达式语言作用域:JVM级别定义的环境变量和系统属性
  • 是否敏感:否
  • 是否必填:否

关系

名称

描述

success

从AMQP队列接收到的所有FlowFile都将路由到该关系

写入属性

名称

描述

amqp$appId

AMQP消息中的应用程序ID字段

amqp$contentEncoding

AMQP消息报告的内容编码

amqp$contentType

AMQP消息报告的内容类型

amqp$headers

AMQP消息中的头部信息。仅当处理器配置为输出此属性时才会添加

<头部键前缀>.<属性>

如果处理器配置为将头部作为属性输出,则每个消息头部都将以此属性名插入

amqp$deliveryMode

消息交付模式的数字指示符

amqp$priority

消息优先级

amqp$correlationId

消息的关联ID

amqp$replyTo

消息的回复地址字段的值

amqp$expiration

消息过期时间

amqp$messageId

消息的唯一ID

amqp$timestamp

消息的时间戳,为自epoch以来的毫秒数

amqp$type

消息类型

amqp$userId

用户ID

amqp$clusterId

AMQP集群的ID

amqp$routingKey

AMQP消息的路由键

amqp$exchange

接收AMQP消息的交换器

点击这里复制本文地址 以上内容由nimo97整理呈现,请务必在转载分享时注明本文地址!如对内容有疑问,请联系我们,谢谢!
qrcode

尼墨宝库 © All Rights Reserved.  蜀ICP备2024111239号-7