Apache NiFi 2.x处理器:ConsumeAMQP 2.2.0
摘要
AMQP属性、配置详情
包
org.apache.nifi | nifi-amqp-nar
描述
使用AMQP 0.9.1协议从AMQP代理服务器消费AMQP消息。从AMQP代理服务器接收到的每一条消息,都会作为独立的FlowFile发送到“success”关系。
标签
amqp、消费、获取、消息、rabbit、接收
输入要求
禁止
支持敏感动态属性
否
ConsumeAMQP 2.2.0的更多详细信息
ConsumeAMQP
摘要
该处理器从AMQP消息队列消费消息,并将其转换为FlowFile,以便路由到流中的下一个组件。撰写本文档时,支持的AMQP协议版本为v0.9.1。
该组件基于RabbitMQ客户端API。以下指南和教程可能有助于你复习一些AMQP基础知识。
该处理器执行两项操作。它从消费的AMQP消息(包括消息体和属性)中提取信息来构建FlowFile。一旦消息被消费,就会构建一个FlowFile。消息体被写入FlowFile,其
com.rabbitmq.client.AMQP.BasicProperties属性会作为FlowFile的属性进行传输。AMQP属性名称会加上amqp$前缀。
AMQP属性
以下是消息可能携带的标准AMQP属性列表:(“amqp
contentEncoding”、“amqp
deliveryMode”、“amqp
correlationId”、“amqp
expiration”、“amqp
timestamp”、“amqp
userId”、“amqp
clusterId”、“amqp$routingKey” )
配置详情
撰写本文档时,它仅定义了适用于大多数情况的基本配置属性。随着该组件的发展,以后会定义其他属性。配置PublishAMQP:
- 队列 - [必填] 用于检索消息的AMQP队列名称。通常由管理员提供(例如,‘amq.direct’) 。
- 主机名 - [必填] AMQP代理服务器运行所在的主机名。通常由管理员提供(例如,‘myhost.com’)。默认值为’localhost’。
- 端口 - [必填] AMQP代理服务器运行的端口号。通常由管理员提供(例如, '2453’)。默认值为‘5672’。
- 用户名 - [必填] 连接到AMQP代理服务器的用户名。通常由管理员提供(例如,‘me’)。默认值为‘guest’。
- 密码 - [必填] 与用户名一起用于连接AMQP代理服务器的密码。通常由管理员提供。默认值为‘guest’。
- 使用证书认证 - [可选] 使用SSL证书的通用名称进行认证,而不是使用用户名/密码。此选项只能与SSL结合使用。默认值为‘false’。
- 虚拟主机 - [可选] 虚拟主机名,用于隔离AMQP系统以增强安全性。有关虚拟主机的更多详细信息,请参阅此博客。
属性
AMQP版本
AMQP版本。目前仅支持AMQP v0.9.1。
- 显示名称:AMQP版本
- 描述:AMQP版本。目前仅支持AMQP v0.9.1
- API名称:AMQP版本
- 默认值:0.9.1
- 允许值:0.9.1
- 表达式语言作用域:不支持
- 是否敏感:否
- 是否必填:是
自动确认消息
如果为false(非自动确认),在将FlowFile传输到success并提交NiFi会话后,处理器将确认消息。非自动确认模式提供“至少一次”的交付语义。如果为true(自动确认),发送到AMQP客户端的消息在发出后将由AMQP代理服务器自动确认。这通常会提供更好的吞吐量,但在AMQP代理服务器、NiFi或处理器重启/崩溃时,也会导致消息丢失。自动确认模式提供“至多一次”的交付语义,仅在允许丢失消息的情况下推荐使用。
- 显示名称:自动确认消息
- 描述:如果为false(非自动确认),在将FlowFile传输到success并提交NiFi会话后,处理器将确认消息。非自动确认模式提供“至少一次”的交付语义。如果为true(自动确认),发送到AMQP客户端的消息在发出后将由AMQP代理服务器自动确认。这通常会提供更好的吞吐量,但在AMQP代理服务器、NiFi或处理器重启/崩溃时,也会导致消息丢失。自动确认模式提供“至多一次”的交付语义,仅在允许丢失消息的情况下推荐使用
- API名称:auto.acknowledge
- 默认值:false
- 允许值:true、false
- 表达式语言作用域:不支持
- 是否敏感:否
- 是否必填:是
批次大小
单个会话中应处理的最大消息数。一旦接收到这么多消息(或者没有更多可用消息),接收到的消息将被传输到“success”关系,并向AMQP代理服务器确认这些消息。将此值设置得较大可能会提高性能,特别是对于非常小的消息,但在NiFi突然重启时,也可能导致更多消息重复。
- 显示名称:批次大小
- 描述:单个会话中应处理的最大消息数。一旦接收到这么多消息(或者没有更多可用消息),接收到的消息将被传输到“success”关系,并向AMQP代理服务器确认这些消息。将此值设置得较大可能会提高性能,特别是对于非常小的消息,但在NiFi突然重启时,也可能导致更多消息重复
- API名称:batch.size
- 默认值:10
- 表达式语言作用域:不支持
- 是否敏感:否
- 是否必填:是
代理服务器
以
- 显示名称:代理服务器
- 描述:以
: 格式(例如,localhost:5672)列出的已知AMQP代理服务器的逗号分隔列表。如果设置了此属性,则忽略“主机名”和“端口”属性。列表中应仅包含来自同一AMQP集群的主机 - API名称:Brokers
- 表达式语言作用域:JVM级别定义的环境变量和系统属性
- 是否敏感:否
- 是否必填:否
使用客户端证书认证
使用SSL证书而不是用户名/密码进行认证。
- 显示名称:使用客户端证书认证
- 描述:使用SSL证书而不是用户名/密码进行认证
- API名称:cert-authentication
- 默认值:false
- 允许值:true、false
- 表达式语言作用域:不支持
- 是否敏感:否
- 是否必填:否
头部输出格式
定义如何输出接收到消息的头部信息。
- 显示名称:头部输出格式
- 描述:定义如何输出接收到消息的头部信息
- API名称:header.format
- 默认值:逗号分隔字符串
- 允许值:逗号分隔字符串、JSON字符串、FlowFile属性
- 表达式语言作用域:不支持
- 是否敏感:否
- 是否必填:是
头部键前缀
添加到FlowFile属性中的头部键的前缀文本。处理器会在该属性值后附加一个“.”。
- 显示名称:头部键前缀
- 描述:添加到FlowFile属性中的头部键的前缀文本。处理器会在该属性值后附加一个“.”
- API名称:header.key.prefix
- 默认值:consume.amqp
- 表达式语言作用域:不支持
- 是否敏感:否
- 是否必填:是
头部分隔符
在字符串中用于分隔头部键值对的字符。该值必须为单个字符。
- 显示名称:头部分隔符
- 描述:在字符串中用于分隔头部键值对的字符。该值必须为单个字符
- API名称:header.separator
- 默认值:,
- 表达式语言作用域:不支持
- 是否敏感:否
- 是否必填:否
- 依赖项:头部输出格式设置为[逗号分隔字符串]中的任意一个
主机名
AMQP代理服务器的网络地址(例如,localhost)。如果设置了“代理服务器”属性,则忽略此属性。
- 显示名称:主机名
- 描述:AMQP代理服务器的网络地址(例如,localhost)。如果设置了“代理服务器”属性,则忽略此属性
- API名称:Host Name
- 默认值:localhost
- 表达式语言作用域:JVM级别定义的环境变量和系统属性
- 是否敏感:否
- 是否必填:否
密码
用于认证和授权的密码。
- 显示名称:密码
- 描述:用于认证和授权的密码
- API名称:Password
- 表达式语言作用域:不支持
- 是否敏感:是
- 是否必填:否
端口
标识AMQP代理服务器端口的数字值(例如,5671)。如果设置了“代理服务器”属性,则忽略此属性。
- 显示名称:端口
- 描述:标识AMQP代理服务器端口的数字值(例如,5671)。如果设置了“代理服务器”属性,则忽略此属性
- API名称:Port
- 默认值:5672
- 表达式语言作用域:JVM级别定义的环境变量和系统属性
- 是否敏感:否
- 是否必填:否
预取计数
消费者的最大未确认消息数。如果消费者有这么多未确认消息,AMQP代理服务器在消费者确认已交付的部分消息之前,将不再发送新消息。允许值范围:0到65535。0表示无限制。
- 显示名称:预取计数
- 描述:消费者的最大未确认消息数。如果消费者有这么多未确认消息,AMQP代理服务器在消费者确认已交付的部分消息之前,将不再发送新消息。允许值范围:0到65535。0表示无限制
- API名称:prefetch.count
- 默认值:0
- 表达式语言作用域:不支持
- 是否敏感:否
- 是否必填:是
队列
要从中消费消息的现有AMQP队列名称。通常由AMQP管理员预先定义。
- 显示名称:队列
- 描述:要从中消费消息的现有AMQP队列名称。通常由AMQP管理员预先定义
- API名称:Queue
- 表达式语言作用域:不支持
- 是否敏感:否
- 是否必填:是
删除花括号
如果选择“true删除花括号”,则会自动删除头部中的花括号。
- 显示名称:删除花括号
- 描述:如果选择“true删除花括号”,则会自动删除头部中的花括号
- API名称:remove.curly.braces
- 默认值:False
- 允许值:True、False
- 表达式语言作用域:不支持
- 是否敏感:否
- 是否必填:否
- 依赖项:头部输出格式设置为[逗号分隔字符串]中的任意一个
SSL上下文服务
用于为TLS/SSL连接提供客户端证书信息的SSL上下文服务。
- 显示名称:SSL上下文服务
- 描述:用于为TLS/SSL连接提供客户端证书信息的SSL上下文服务
- API名称:ssl-context-service
- 服务接口:org.apache.nifi.ssl.SSLContextProviderService
- 服务实现:org.apache.nifi.ssl.PEMEncodedSSLContextProvider、org.apache.nifi.ssl.StandardRestrictedSSLContextService、org.apache.nifi.ssl.StandardSSLContextService
- 表达式语言作用域:不支持
- 是否敏感:否
- 是否必填:否
用户名
用于认证和授权的用户名。
- 显示名称:用户名
- 描述:用于认证和授权的用户名
- API名称:User Name
- 表达式语言作用域:JVM级别定义的环境变量和系统属性
- 是否敏感:否
- 是否必填:否
虚拟主机
用于隔离AMQP系统以增强安全性的虚拟主机名。
- 显示名称:虚拟主机
- 描述:用于隔离AMQP系统以增强安全性的虚拟主机名
- API名称:Virtual Host
- 表达式语言作用域:JVM级别定义的环境变量和系统属性
- 是否敏感:否
- 是否必填:否
关系
名称 | 描述 |
success | 从AMQP队列接收到的所有FlowFile都将路由到该关系 |
写入属性
名称 | 描述 |
amqp$appId | AMQP消息中的应用程序ID字段 |
amqp$contentEncoding | AMQP消息报告的内容编码 |
amqp$contentType | AMQP消息报告的内容类型 |
amqp$headers | AMQP消息中的头部信息。仅当处理器配置为输出此属性时才会添加 |
<头部键前缀>.<属性> | 如果处理器配置为将头部作为属性输出,则每个消息头部都将以此属性名插入 |
amqp$deliveryMode | 消息交付模式的数字指示符 |
amqp$priority | 消息优先级 |
amqp$correlationId | 消息的关联ID |
amqp$replyTo | 消息的回复地址字段的值 |
amqp$expiration | 消息过期时间 |
amqp$messageId | 消息的唯一ID |
amqp$timestamp | 消息的时间戳,为自epoch以来的毫秒数 |
amqp$type | 消息类型 |
amqp$userId | 用户ID |
amqp$clusterId | AMQP集群的ID |
amqp$routingKey | AMQP消息的路由键 |
amqp$exchange | 接收AMQP消息的交换器 |