Apache NiFi 2.x处理器:ConsumeKafka 2.2.0

Apache NiFi 2.x处理器:ConsumeKafka 2.2.0

经验文章nimo972025-03-18 22:34:1310A+A-

org.apache.nifi | nifi-kafka-nar

描述

通过Apache Kafka Consumer API消费消息。与之互补的用于发送消息的NiFi处理器是PublishKafka。该处理器支持消费Kafka消息,也可选择将其解析为NiFi记录。请注意,目前(在读取记录模式下),该处理器假定从给定分区检索到的所有记录具有相同的模式。在这种模式下,如果拉取的任何Kafka消息无法使用配置的记录读取器或记录写入器进行解析或写入,消息内容将被写入单独的FlowFile,并且该FlowFile将被传输到“parse.failure”关系中。否则,每个FlowFile将被发送到“success”关系,并且单个FlowFile中可能包含许多独立的消息。会添加一个“record.count”属性,以指示FlowFile中包含的消息数量。如果两条Kafka消息具有不同的模式,或者对于“作为属性添加的头部”属性中包含的消息头部具有不同的值,则它们不会被放入同一个FlowFile中。

标签

消费、获取、摄取、引入、Kafka、发布/订阅、记录、主题、avro、csv、json

输入要求

禁止

支持敏感动态属性

属性

自动偏移重置

当找不到与Kafka消费者偏移对应的先前偏移时应用的自动偏移配置,对应Kafka的auto.offset.reset属性。

显示名称

自动偏移重置

描述

当找不到与Kafka消费者偏移对应的先前偏移时应用的自动偏移配置,对应Kafka的auto.offset.reset属性

API名称

auto.offset.reset

默认值

latest

允许值

earliest
latest
none

表达式语言范围

不支持

是否敏感

是否必需

提交偏移

指定此处理器在接收消息后是否应将偏移提交到Kafka。通常,此值应设置为true,以确保接收到的消息不会重复。但是,在某些情况下,我们可能希望避免提交偏移,以便数据可以被处理,然后由PublishKafka进行确认,以提供 Exactly Once(恰好一次)语义。

显示名称

提交偏移

描述

指定此处理器在接收消息后是否应将偏移提交到Kafka。通常,此值应设置为true,以确保接收到的消息不会重复。但是,在某些情况下,我们可能希望避免提交偏移,以便数据可以被处理,然后由PublishKafka进行确认,以提供 Exactly Once(恰好一次)语义

API名称

Commit Offsets

默认值

true

允许值

true
false

表达式语言范围

不支持

是否敏感

是否必需

组ID

与Kafka的group.id属性对应的Kafka消费者组标识符。

显示名称

组ID

描述

与Kafka的group.id属性对应的Kafka消费者组标识符

API名称

Group ID

表达式语言范围

不支持

是否敏感

是否必需

头部编码

读取Kafka记录头部值和写入FlowFile属性时应用的字符编码。

显示名称

头部编码

描述

读取Kafka记录头部值和写入FlowFile属性时应用的字符编码

API名称

Header Encoding

默认值

UTF8

表达式语言范围

不支持

是否敏感

是否必需

头部名称模式

应用于Kafka记录头部名称的正则表达式模式,用于选择要作为FlowFile属性写入的头部值。

显示名称

头部名称模式

描述

应用于Kafka记录头部名称的正则表达式模式,用于选择要作为FlowFile属性写入的头部值

API名称

Header Name Pattern

表达式语言范围

不支持

是否敏感

是否必需

Kafka连接服务

提供与Kafka代理的连接,用于发布Kafka记录。

显示名称

Kafka连接服务

描述

提供与Kafka代理的连接,用于发布Kafka记录

API名称

Kafka Connection Service

服务接口

org.apache.nifi.kafka.service.api.KafkaConnectionService

服务实现

org.apache.nifi.kafka.service.Kafka3ConnectionService

表达式语言范围

不支持

是否敏感

是否必需

键属性编码

配置的包含Kafka记录键的FlowFile属性值的编码。

显示名称

键属性编码

描述

配置的包含Kafka记录键的FlowFile属性值的编码

API名称

Key Attribute Encoding

默认值

utf-8

允许值

UTF-8编码
十六进制编码
不作为属性添加键

表达式语言范围

不支持

是否敏感

是否必需

依赖项

输出策略设置为[USE_VALUE]中的任何一个

键格式

指定在输出的FlowFile中如何表示Kafka记录键。

显示名称

键格式

描述

指定在输出的FlowFile中如何表示Kafka记录键

API名称

Key Format

默认值

bytearray

允许值

字符串
字节数组
记录

表达式语言范围

不支持

是否敏感

是否必需

依赖项

输出策略设置为[USE_WRAPPER]中的任何一个

键记录读取器

用于将Kafka记录键解析为记录的记录读取器。

显示名称

键记录读取器

描述

用于将Kafka记录键解析为记录的记录读取器

API名称

Key Record Reader

服务接口

org.apache.nifi.serialization.RecordReaderFactory

服务实现

org.apache.nifi.avro.AvroReader
org.apache.nifi.cef.CEFReader
org.apache.nifi.csv.CSVReader
org.apache.nifi.excel.ExcelReader
org.apache.nifi.grok.GrokReader
org.apache.nifi.json.JsonPathReader
org.apache.nifi.json.JsonTreeReader
org.apache.nifi.services.protobuf.ProtobufReader
org.apache.nifi.lookup.ReaderLookup
org.apache.nifi.record.script.ScriptedReader
org.apache.nifi.syslog.Syslog5424Reader
org.apache.nifi.syslog.SyslogReader
org.apache.nifi.windowsevent.WindowsEventLogReader
org.apache.nifi.xml.XMLReader
org.apache.nifi.yaml.YamlTreeReader

表达式语言范围

不支持

是否敏感

是否必需

依赖项

键格式设置为[record]中的任何一个

最大未提交时间

指定在必须提交偏移之前允许经过的最长时间。此值会影响偏移提交的频率。提交偏移的频率越低,吞吐量越高,但在重新平衡或JVM重启(提交之间)的情况下,潜在的数据重复窗口也会越大。此值还与最大拉取记录数和消息分隔符的使用相关。使用消息分隔符时,我们可以拥有比不使用时多得多的未提交消息,因为在内存中需要跟踪的内容要少得多。

显示名称

最大未提交时间

描述

指定在必须提交偏移之前允许经过的最长时间。此值会影响偏移提交的频率。提交偏移的频率越低,吞吐量越高,但在重新平衡或JVM重启(提交之间)的情况下,潜在的数据重复窗口也会越大。此值还与最大拉取记录数和消息分隔符的使用相关。使用消息分隔符时,我们可以拥有比不使用时多得多的未提交消息,因为在内存中需要跟踪的内容要少得多

API名称

Max Uncommitted Time

默认值

1秒

表达式语言范围

不支持

是否敏感

是否必需

依赖项

提交偏移设置为[true]中的任何一个

消息分隔符

由于KafkaConsumer以批次接收消息,此处理器可以选择输出包含给定主题和分区的单个批次中所有Kafka消息的FlowFile,此属性允许您提供一个字符串(解释为UTF-8)用于分隔多个Kafka消息。这是一个可选属性,如果未提供,则每次触发时,接收到的每个Kafka消息都将生成一个单独的FlowFile。要输入特殊字符(如“换行符”),请根据操作系统使用CTRL+Enter或Shift+Enter。

显示名称

消息分隔符

描述

由于KafkaConsumer以批次接收消息,此处理器可以选择输出包含给定主题和分区的单个批次中所有Kafka消息的FlowFile,此属性允许您提供一个字符串(解释为UTF-8)用于分隔多个Kafka消息。这是一个可选属性,如果未提供,则每次触发时,接收到的每个Kafka消息都将生成一个单独的FlowFile。要输入特殊字符(如“换行符”),请根据操作系统使用CTRL+Enter或Shift+Enter

API名称

Message Demarcator

表达式语言范围

不支持

是否敏感

是否必需

依赖项

处理策略设置为[DEMARCATOR]中的任何一个

输出策略

用于将Kafka记录输出到FlowFile记录的格式。

显示名称

输出策略

描述

用于将Kafka记录输出到FlowFile记录的格式

API名称

Output Strategy

默认值

USE_VALUE

允许值

使用内容作为值
使用包装器

表达式语言范围

不支持

是否敏感

是否必需

依赖项

处理策略设置为[RECORD]中的任何一个

处理策略

处理Kafka记录并将序列化输出写入FlowFile的策略。

显示名称

处理策略

描述

处理Kafka记录并将序列化输出写入FlowFile的策略

API名称

Processing Strategy

默认值

FLOW_FILE

允许值

FLOW_FILE
DEMARCATOR
RECORD

表达式语言范围

不支持

是否敏感

是否必需

记录读取器

用于处理传入Kafka消息的记录读取器。

显示名称

记录读取器

描述

用于处理传入Kafka消息的记录读取器

API名称

Record Reader

服务接口

org.apache.nifi.serialization.RecordReaderFactory

服务实现

org.apache.nifi.avro.AvroReader
org.apache.nifi.cef.CEFReader
org.apache.nifi.csv.CSVReader
org.apache.nifi.excel.ExcelReader
org.apache.nifi.grok.GrokReader
org.apache.nifi.json.JsonPathReader
org.apache.nifi.json.JsonTreeReader
org.apache.nifi.services.protobuf.ProtobufReader
org.apache.nifi.lookup.ReaderLookup
org.apache.nifi.record.script.ScriptedReader
org.apache.nifi.syslog.Syslog5424Reader
org.apache.nifi.syslog.SyslogReader
org.apache.nifi.windowsevent.WindowsEventLogReader
org.apache.nifi.xml.XMLReader
org.apache.nifi.yaml.YamlTreeReader

表达式语言范围

不支持

是否敏感

是否必需

依赖项

处理策略设置为[RECORD]中的任何一个

记录写入器

用于序列化输出的FlowFile的记录写入器。

显示名称

记录写入器

描述

用于序列化输出的FlowFile的记录写入器

API名称

Record Writer

服务接口

org.apache.nifi.serialization.RecordSetWriterFactory

服务实现

org.apache.nifi.avro.AvroRecordSetWriter
org.apache.nifi.csv.CSVRecordSetWriter
org.apache.nifi.text.FreeFormTextRecordSetWriter
org.apache.nifi.json.JsonRecordSetWriter
org.apache.nifi.lookup.RecordSetWriterLookup
org.apache.nifi.record.script.ScriptedRecordSetWriter
org.apache.nifi.xml.XMLRecordSetWriter

表达式语言范围

不支持

是否敏感

是否必需

依赖项

处理策略设置为[RECORD]中的任何一个

按键分隔

启用此属性后,只有当两条Kafka消息具有相同的键时,它们才会被添加到同一个FlowFile中。

显示名称

按键分隔

描述

启用此属性后,只有当两条Kafka消息具有相同的键时,它们才会被添加到同一个FlowFile中

API名称

Separate By Key

默认值

false

允许值

true
false

依赖项

消息分隔符设置为任何指定值

主题格式

指定提供的主题是逗号分隔的名称列表还是单个正则表达式。

显示名称

主题格式

描述

指定提供的主题是逗号分隔的名称列表还是单个正则表达式

API名称

Topic Format

默认值

names

允许值

names
pattern

表达式语言范围

不支持

是否敏感

是否必需

主题

处理器从中消费Kafka记录的Kafka主题的名称或模式。如果用逗号分隔,可以提供多个。

显示名称

主题

描述

处理器从中消费Kafka记录的Kafka主题的名称或模式。如果用逗号分隔,可以提供多个

API名称

Topics

表达式语言范围

JVM级别定义的环境变量和系统属性

是否敏感

是否必需

关系

名称

描述

success

包含一个或多个序列化Kafka记录的FlowFile

写入属性

名称

描述

record.count

接收到的记录数

mime.type

由配置的记录写入器提供的MIME类型

kafka.count

如果有多个消息,则为写入的消息数

kafka.key

如果存在且为单个消息,则为消息的键。键的编码方式取决于“键属性编码”属性的值

kafka.offset

消息在主题分区中的偏移量

kafka.timestamp

消息在主题分区中的时间戳

kafka.partition

消息或消息包来自的主题分区

kafka.topic

消息或消息包来自的主题

kafka.tombstone

如果消费的消息是墓碑消息,则设置为true

另请参阅

org.apache.nifi.kafka.processors.PublishKafka

点击这里复制本文地址 以上内容由nimo97整理呈现,请务必在转载分享时注明本文地址!如对内容有疑问,请联系我们,谢谢!
qrcode

尼墨宝库 © All Rights Reserved.  蜀ICP备2024111239号-7