Apache NiFi 2.x处理器:PublishKafka 2.2.0

Apache NiFi 2.x处理器:PublishKafka 2.2.0

经验文章nimo972025-03-16 15:50:2315A+A-

org.apache.nifi | nifi-kafka-nar

描述

使用Kafka Producer API将FlowFile的内容作为消息或单个记录发送到Apache Kafka。要发送的消息可以是单个FlowFile,也可以使用用户指定的分隔符(如换行符)进行分隔,或者是配置的记录读取器能够读取的面向记录的数据。用于获取消息的互补NiFi处理器是ConsumeKafka。在使用
PublishStrategy.USE_WRAPPER时,要生成Kafka墓碑消息,只需将记录的值设置为“null”即可。

标签

Apache、Kafka、消息、发布/订阅、放置、记录、发送、avro、csv、json、日志

输入要求

必填

支持敏感动态属性

属性

交付保证

指定确保消息发送到Kafka的要求。对应Kafka客户端的acks属性。

显示名称

交付保证

描述

指定确保消息发送到Kafka的要求。对应Kafka客户端的acks属性。

API名称

acks

默认值

all

允许值

保证复制交付
保证单节点交付
尽力而为

表达式语言作用域

不支持

敏感

必填

压缩类型

指定发送到Kafka的记录的压缩策略。对应Kafka客户端的compression.type属性。

显示名称

压缩类型

描述

指定发送到Kafka的记录的压缩策略。对应Kafka客户端的compression.type属性。

API名称

compression.type

默认值

none

允许值

none
gzip
snappy
lz4

表达式语言作用域

不支持

敏感

必填

失败策略

指定处理器在无法将数据发布到Kafka时如何处理FlowFile。

显示名称

失败策略

描述

指定处理器在无法将数据发布到Kafka时如何处理FlowFile。

API名称

Failure Strategy

默认值

Route to Failure

允许值

路由到失败
回滚

表达式语言作用域

不支持

敏感

必填

FlowFile属性头模式

一个正则表达式,用于与所有FlowFile属性名称进行匹配。任何名称与该模式匹配的属性都将作为消息头添加到Kafka消息中。如果未指定,则不会将FlowFile属性添加为消息头。

显示名称

FlowFile属性头模式

描述

一个正则表达式,用于与所有FlowFile属性名称进行匹配。任何名称与该模式匹配的属性都将作为消息头添加到Kafka消息中。如果未指定,则不会将FlowFile属性添加为消息头。

API名称

FlowFile Attribute Header Pattern

表达式语言作用域

不支持

敏感

必填

依赖项

发布策略设置为[USE_VALUE]中的任意一个。

头编码

对于作为Kafka记录头添加的任何属性,此属性指示用于序列化消息头的字符编码。

显示名称

头编码

描述

对于作为Kafka记录头添加的任何属性,此属性指示用于序列化消息头的字符编码。

API名称

Header Encoding

默认值

UTF-8

表达式语言作用域

不支持

敏感

必填

依赖项

FlowFile属性头模式设置为任何指定值。

Kafka连接服务

为发布Kafka记录提供到Kafka代理的连接。

显示名称

Kafka连接服务

描述

为发布Kafka记录提供到Kafka代理的连接。

API名称

Kafka Connection Service

服务接口

org.apache.nifi.kafka.service.api.KafkaConnectionService

服务实现

org.apache.nifi.kafka.service.Kafka3ConnectionService

表达式语言作用域

不支持

敏感

必填

Kafka键

用于消息的键。如果未指定,则在FlowFile属性“kafka.key”存在时,将其用作消息键。请注意,同时设置Kafka键和分隔符可能会导致许多Kafka消息具有相同的键。通常这不是问题,因为Kafka不强制或假设消息和键的唯一性。但是,同时设置分隔符和Kafka键会在Kafka上存在数据丢失的风险。在Kafka进行主题压缩时,消息将基于此键进行去重。

显示名称

Kafka键

描述

用于消息的键。如果未指定,则在FlowFile属性“kafka.key”存在时,将其用作消息键。请注意,同时设置Kafka键和分隔符可能会导致许多Kafka消息具有相同的键。通常这不是问题,因为Kafka不强制或假设消息和键的唯一性。但是,同时设置分隔符和Kafka键会在Kafka上存在数据丢失的风险。在Kafka进行主题压缩时,消息将基于此键进行去重。

API名称

Kafka Key

表达式语言作用域

环境变量和FlowFile属性

敏感

必填

Kafka键属性编码

发出的FlowFile具有名为“kafka.key”的属性。此属性规定了该属性的值应如何编码。

显示名称

Kafka键属性编码

描述

发出的FlowFile具有名为“kafka.key”的属性。此属性规定了该属性的值应如何编码。

API名称

Kafka Key Attribute Encoding

默认值

utf-8

允许值

UTF-8编码
十六进制编码
不作为属性添加键

表达式语言作用域

不支持

敏感

必填

依赖项

发布策略设置为[USE_WRAPPER]中的任意一个。

最大请求大小

请求的最大字节数。对应Kafka客户端的max.request.size属性。

显示名称

最大请求大小

描述

请求的最大字节数。对应Kafka客户端的max.request.size属性。

API名称

max.request.size

默认值

1 MB

表达式语言作用域

不支持

敏感

必填

消息分隔符

指定用于在单个FlowFile中分隔多个消息的字符串(解释为UTF-8)。如果未指定,则FlowFile的整个内容将作为单个消息使用。如果指定,则FlowFile的内容将根据此分隔符进行拆分,并将每个部分作为单独的Kafka消息发送。要输入特殊字符(如换行符),请根据操作系统使用CTRL+Enter或Shift+Enter。

显示名称

消息分隔符

描述

指定用于在单个FlowFile中分隔多个消息的字符串(解释为UTF-8)。如果未指定,则FlowFile的整个内容将作为单个消息使用。如果指定,则FlowFile的内容将根据此分隔符进行拆分,并将每个部分作为单独的Kafka消息发送。要输入特殊字符(如换行符),请根据操作系统使用CTRL+Enter或Shift+Enter。

API名称

Message Demarcator

表达式语言作用域

环境变量和FlowFile属性

敏感

必填

消息键字段

输入记录中应作为Kafka消息键使用的字段名称。

显示名称

消息键字段

描述

输入记录中应作为Kafka消息键使用的字段名称。

API名称

Message Key Field

表达式语言作用域

环境变量和FlowFile属性

敏感

必填

依赖项

发布策略设置为[USE_VALUE]中的任意一个。

分区

指定记录的Kafka分区目标。

显示名称

分区

描述

指定记录的Kafka分区目标。

API名称

partition

表达式语言作用域

环境变量和FlowFile属性

敏感

必填

分区器类

指定用于计算消息分区ID的类。对应Kafka客户端的partitioner.class属性。

显示名称

分区器类

描述

指定用于计算消息分区ID的类。对应Kafka客户端的partitioner.class属性。

API名称

partitioner.class

默认值

org.apache.kafka.clients.producer.internals.DefaultPartitioner

允许值

RoundRobinPartitioner
DefaultPartitioner
Expression Language Partitioner

表达式语言作用域

不支持

敏感

必填

发布策略

用于将传入的FlowFile记录发布到Kafka的格式。

显示名称

发布策略

描述

用于将传入的FlowFile记录发布到Kafka的格式。

API名称

Publish Strategy

默认值

USE_VALUE

允许值

使用内容作为记录值
使用包装器

表达式语言作用域

不支持

敏感

必填

依赖项

记录读取器设置为任何指定值。

记录键写入器

用于输出FlowFile的记录键写入器。

显示名称

记录键写入器

描述

用于输出FlowFile的记录键写入器。

API名称

Record Key Writer

服务接口

org.apache.nifi.serialization.RecordSetWriterFactoryService

服务实现

org.apache.nifi.avro.AvroRecordSetWriter
org.apache.nifi.csv.CSVRecordSetWriter
org.apache.nifi.text.FreeFormTextRecordSetWriter
org.apache.nifi.json.JsonRecordSetWriter
org.apache.nifi.lookup.RecordSetWriterLookup
org.apache.nifi.record.script.ScriptedRecordSetWriter
org.apache.nifi.xml.XMLRecordSetWriter

表达式语言作用域

不支持

敏感

必填

依赖项

发布策略设置为[USE_WRAPPER]中的任意一个。

记录元数据策略

指定记录的元数据(主题和分区)应来自记录的元数据字段,还是来自配置的主题名称和分区/分区器类属性。

显示名称

记录元数据策略

描述

指定记录的元数据(主题和分区)应来自记录的元数据字段,还是来自配置的主题名称和分区/分区器类属性。

API名称

Record Metadata Strategy

默认值

FROM_PROPERTIES

允许值

从记录获取元数据
使用配置值

表达式语言作用域

不支持

敏感

必填

依赖项

发布策略设置为[USE_WRAPPER]中的任意一个。

记录读取器

用于传入FlowFile的记录读取器。

显示名称

记录读取器

描述

用于传入FlowFile的记录读取器。

API名称

Record Reader

服务接口

org.apache.nifi.serialization.RecordReaderFactoryService

服务实现

org.apache.nifi.avro.AvroReader
org.apache.nifi.cef.CEFReader
org.apache.nifi.csv.CSVReader
org.apache.nifi.excel.ExcelReader
org.apache.nifi.grok.GrokReader
org.apache.nifi.json.JsonPathReader
org.apache.nifi.json.JsonTreeReader
org.apache.nifi.services.protobuf.ProtobufReader
org.apache.nifi.lookup.ReaderLookup
org.apache.nifi.record.script.ScriptedReader
org.apache.nifi.syslog.Syslog5424Reader
org.apache.nifi.syslog.SyslogReader
org.apache.nifi.windowsevent.WindowsEventLogReader
org.apache.nifi.xml.XMLReader
org.apache.nifi.yaml.YamlTreeReader

表达式语言作用域

不支持

敏感

必填

记录写入器

用于在发送到Kafka之前序列化数据的记录写入器。

显示名称

记录写入器

描述

用于在发送到Kafka之前序列化数据的记录写入器。

API名称

Record Writer

服务接口

org.apache.nifi.serialization.RecordSetWriterFactoryService

服务实现

org.apache.nifi.avro.AvroRecordSetWriter
org.apache.nifi.csv.CSVRecordSetWriter
org.apache.nifi.text.FreeFormTextRecordSetWriter
org.apache.nifi.json.JsonRecordSetWriter
org.apache.nifi.lookup.RecordSetWriterLookup
org.apache.nifi.record.script.ScriptedRecordSetWriter
org.apache.nifi.xml.XMLRecordSetWriter

表达式语言作用域

不支持

敏感

必填

主题名称

处理器向其发布Kafka记录的Kafka主题的名称。

显示名称

主题名称

描述

处理器向其发布Kafka记录的Kafka主题的名称。

API名称

Topic Name

表达式语言作用域

环境变量和FlowFile属性

敏感

必填

事务ID前缀

指定KafkaProducer配置的transactional.id将是一个生成的UUID,并将使用配置的字符串作为前缀。

显示名称

事务ID前缀

描述

指定KafkaProducer配置的transactional.id将是一个生成的UUID,并将使用配置的字符串作为前缀。

API名称

Transactional ID Prefix

表达式语言作用域

在JVM级别定义的环境变量和系统属性

敏感

必填

依赖项

事务启用设置为[true]中的任意一个。

事务启用

指定在与Kafka通信时是否提供事务保证。如果在发送数据到Kafka时出现问题,并且此属性设置为false,则已发送到Kafka的消息将继续传递给消费者。如果设置为true,则Kafka事务将回滚,以便这些消息不会提供给消费者。将此属性设置为true要求将[交付保证]属性设置为[保证复制交付]。

显示名称

事务启用

描述

指定在与Kafka通信时是否提供事务保证。如果在发送数据到Kafka时出现问题,并且此属性设置为false,则已发送到Kafka的消息将继续传递给消费者。如果设置为true,则Kafka事务将回滚,以便这些消息不会提供给消费者。将此属性设置为true要求将[交付保证]属性设置为[保证复制交付]。

API名称

Transactions Enabled

默认值

true

允许值

true
false

表达式语言作用域

不支持

敏感

必填

关系

名称

描述

success

所有内容都已发送到Kafka的FlowFile。

failure

任何无法发送到Kafka的FlowFile都将被路由到此关系。

读取属性

名称

描述

kafka.tombstone

如果此属性设置为“true”,并且处理器未配置分隔符,且FlowFile的内容为空,则将向Kafka发送一个零字节的墓碑消息。

写入属性

名称

描述

msg.count

为该FlowFile发送到Kafka的消息数量。此属性仅添加到路由到success的FlowFile中。

另请参阅

org.apache.nifi.kafka.processors.ConsumeKafka

点击这里复制本文地址 以上内容由nimo97整理呈现,请务必在转载分享时注明本文地址!如对内容有疑问,请联系我们,谢谢!
qrcode

尼墨宝库 © All Rights Reserved.  蜀ICP备2024111239号-7