Apache NiFi 2.x处理器:ConsumeKafka 2.2.0
包
org.apache.nifi | nifi-kafka-nar
描述
通过Apache Kafka Consumer API消费消息。与之互补的用于发送消息的NiFi处理器是PublishKafka。该处理器支持消费Kafka消息,也可选择将其解析为NiFi记录。请注意,目前(在读取记录模式下),该处理器假定从给定分区检索到的所有记录具有相同的模式。在这种模式下,如果拉取的任何Kafka消息无法使用配置的记录读取器或记录写入器进行解析或写入,消息内容将被写入单独的FlowFile,并且该FlowFile将被传输到“parse.failure”关系中。否则,每个FlowFile将被发送到“success”关系,并且单个FlowFile中可能包含许多独立的消息。会添加一个“record.count”属性,以指示FlowFile中包含的消息数量。如果两条Kafka消息具有不同的模式,或者对于“作为属性添加的头部”属性中包含的消息头部具有不同的值,则它们不会被放入同一个FlowFile中。
标签
消费、获取、摄取、引入、Kafka、发布/订阅、记录、主题、avro、csv、json
输入要求
禁止
支持敏感动态属性
否
属性
自动偏移重置
当找不到与Kafka消费者偏移对应的先前偏移时应用的自动偏移配置,对应Kafka的auto.offset.reset属性。
显示名称 | 自动偏移重置 |
描述 | 当找不到与Kafka消费者偏移对应的先前偏移时应用的自动偏移配置,对应Kafka的auto.offset.reset属性 |
API名称 | auto.offset.reset |
默认值 | latest |
允许值 | earliest |
表达式语言范围 | 不支持 |
是否敏感 | 否 |
是否必需 | 是 |
提交偏移
指定此处理器在接收消息后是否应将偏移提交到Kafka。通常,此值应设置为true,以确保接收到的消息不会重复。但是,在某些情况下,我们可能希望避免提交偏移,以便数据可以被处理,然后由PublishKafka进行确认,以提供 Exactly Once(恰好一次)语义。
显示名称 | 提交偏移 |
描述 | 指定此处理器在接收消息后是否应将偏移提交到Kafka。通常,此值应设置为true,以确保接收到的消息不会重复。但是,在某些情况下,我们可能希望避免提交偏移,以便数据可以被处理,然后由PublishKafka进行确认,以提供 Exactly Once(恰好一次)语义 |
API名称 | Commit Offsets |
默认值 | true |
允许值 | true |
表达式语言范围 | 不支持 |
是否敏感 | 否 |
是否必需 | 是 |
组ID
与Kafka的group.id属性对应的Kafka消费者组标识符。
显示名称 | 组ID |
描述 | 与Kafka的group.id属性对应的Kafka消费者组标识符 |
API名称 | Group ID |
表达式语言范围 | 不支持 |
是否敏感 | 否 |
是否必需 | 是 |
头部编码
读取Kafka记录头部值和写入FlowFile属性时应用的字符编码。
显示名称 | 头部编码 |
描述 | 读取Kafka记录头部值和写入FlowFile属性时应用的字符编码 |
API名称 | Header Encoding |
默认值 | UTF8 |
表达式语言范围 | 不支持 |
是否敏感 | 否 |
是否必需 | 是 |
头部名称模式
应用于Kafka记录头部名称的正则表达式模式,用于选择要作为FlowFile属性写入的头部值。
显示名称 | 头部名称模式 |
描述 | 应用于Kafka记录头部名称的正则表达式模式,用于选择要作为FlowFile属性写入的头部值 |
API名称 | Header Name Pattern |
表达式语言范围 | 不支持 |
是否敏感 | 否 |
是否必需 | 否 |
Kafka连接服务
提供与Kafka代理的连接,用于发布Kafka记录。
显示名称 | Kafka连接服务 |
描述 | 提供与Kafka代理的连接,用于发布Kafka记录 |
API名称 | Kafka Connection Service |
服务接口 | org.apache.nifi.kafka.service.api.KafkaConnectionService |
服务实现 | org.apache.nifi.kafka.service.Kafka3ConnectionService |
表达式语言范围 | 不支持 |
是否敏感 | 否 |
是否必需 | 是 |
键属性编码
配置的包含Kafka记录键的FlowFile属性值的编码。
显示名称 | 键属性编码 |
描述 | 配置的包含Kafka记录键的FlowFile属性值的编码 |
API名称 | Key Attribute Encoding |
默认值 | utf-8 |
允许值 | UTF-8编码 |
表达式语言范围 | 不支持 |
是否敏感 | 否 |
是否必需 | 是 |
依赖项 | 输出策略设置为[USE_VALUE]中的任何一个 |
键格式
指定在输出的FlowFile中如何表示Kafka记录键。
显示名称 | 键格式 |
描述 | 指定在输出的FlowFile中如何表示Kafka记录键 |
API名称 | Key Format |
默认值 | bytearray |
允许值 | 字符串 |
表达式语言范围 | 不支持 |
是否敏感 | 否 |
是否必需 | 是 |
依赖项 | 输出策略设置为[USE_WRAPPER]中的任何一个 |
键记录读取器
用于将Kafka记录键解析为记录的记录读取器。
显示名称 | 键记录读取器 |
描述 | 用于将Kafka记录键解析为记录的记录读取器 |
API名称 | Key Record Reader |
服务接口 | org.apache.nifi.serialization.RecordReaderFactory |
服务实现 | org.apache.nifi.avro.AvroReader |
表达式语言范围 | 不支持 |
是否敏感 | 否 |
是否必需 | 是 |
依赖项 | 键格式设置为[record]中的任何一个 |
最大未提交时间
指定在必须提交偏移之前允许经过的最长时间。此值会影响偏移提交的频率。提交偏移的频率越低,吞吐量越高,但在重新平衡或JVM重启(提交之间)的情况下,潜在的数据重复窗口也会越大。此值还与最大拉取记录数和消息分隔符的使用相关。使用消息分隔符时,我们可以拥有比不使用时多得多的未提交消息,因为在内存中需要跟踪的内容要少得多。
显示名称 | 最大未提交时间 |
描述 | 指定在必须提交偏移之前允许经过的最长时间。此值会影响偏移提交的频率。提交偏移的频率越低,吞吐量越高,但在重新平衡或JVM重启(提交之间)的情况下,潜在的数据重复窗口也会越大。此值还与最大拉取记录数和消息分隔符的使用相关。使用消息分隔符时,我们可以拥有比不使用时多得多的未提交消息,因为在内存中需要跟踪的内容要少得多 |
API名称 | Max Uncommitted Time |
默认值 | 1秒 |
表达式语言范围 | 不支持 |
是否敏感 | 否 |
是否必需 | 是 |
依赖项 | 提交偏移设置为[true]中的任何一个 |
消息分隔符
由于KafkaConsumer以批次接收消息,此处理器可以选择输出包含给定主题和分区的单个批次中所有Kafka消息的FlowFile,此属性允许您提供一个字符串(解释为UTF-8)用于分隔多个Kafka消息。这是一个可选属性,如果未提供,则每次触发时,接收到的每个Kafka消息都将生成一个单独的FlowFile。要输入特殊字符(如“换行符”),请根据操作系统使用CTRL+Enter或Shift+Enter。
显示名称 | 消息分隔符 |
描述 | 由于KafkaConsumer以批次接收消息,此处理器可以选择输出包含给定主题和分区的单个批次中所有Kafka消息的FlowFile,此属性允许您提供一个字符串(解释为UTF-8)用于分隔多个Kafka消息。这是一个可选属性,如果未提供,则每次触发时,接收到的每个Kafka消息都将生成一个单独的FlowFile。要输入特殊字符(如“换行符”),请根据操作系统使用CTRL+Enter或Shift+Enter |
API名称 | Message Demarcator |
表达式语言范围 | 不支持 |
是否敏感 | 否 |
是否必需 | 是 |
依赖项 | 处理策略设置为[DEMARCATOR]中的任何一个 |
输出策略
用于将Kafka记录输出到FlowFile记录的格式。
显示名称 | 输出策略 |
描述 | 用于将Kafka记录输出到FlowFile记录的格式 |
API名称 | Output Strategy |
默认值 | USE_VALUE |
允许值 | 使用内容作为值 |
表达式语言范围 | 不支持 |
是否敏感 | 否 |
是否必需 | 是 |
依赖项 | 处理策略设置为[RECORD]中的任何一个 |
处理策略
处理Kafka记录并将序列化输出写入FlowFile的策略。
显示名称 | 处理策略 |
描述 | 处理Kafka记录并将序列化输出写入FlowFile的策略 |
API名称 | Processing Strategy |
默认值 | FLOW_FILE |
允许值 | FLOW_FILE |
表达式语言范围 | 不支持 |
是否敏感 | 否 |
是否必需 | 是 |
记录读取器
用于处理传入Kafka消息的记录读取器。
显示名称 | 记录读取器 |
描述 | 用于处理传入Kafka消息的记录读取器 |
API名称 | Record Reader |
服务接口 | org.apache.nifi.serialization.RecordReaderFactory |
服务实现 | org.apache.nifi.avro.AvroReader |
表达式语言范围 | 不支持 |
是否敏感 | 否 |
是否必需 | 是 |
依赖项 | 处理策略设置为[RECORD]中的任何一个 |
记录写入器
用于序列化输出的FlowFile的记录写入器。
显示名称 | 记录写入器 |
描述 | 用于序列化输出的FlowFile的记录写入器 |
API名称 | Record Writer |
服务接口 | org.apache.nifi.serialization.RecordSetWriterFactory |
服务实现 | org.apache.nifi.avro.AvroRecordSetWriter |
表达式语言范围 | 不支持 |
是否敏感 | 否 |
是否必需 | 是 |
依赖项 | 处理策略设置为[RECORD]中的任何一个 |
按键分隔
启用此属性后,只有当两条Kafka消息具有相同的键时,它们才会被添加到同一个FlowFile中。
显示名称 | 按键分隔 |
描述 | 启用此属性后,只有当两条Kafka消息具有相同的键时,它们才会被添加到同一个FlowFile中 |
API名称 | Separate By Key |
默认值 | false |
允许值 | true |
依赖项 | 消息分隔符设置为任何指定值 |
主题格式
指定提供的主题是逗号分隔的名称列表还是单个正则表达式。
显示名称 | 主题格式 |
描述 | 指定提供的主题是逗号分隔的名称列表还是单个正则表达式 |
API名称 | Topic Format |
默认值 | names |
允许值 | names |
表达式语言范围 | 不支持 |
是否敏感 | 否 |
是否必需 | 是 |
主题
处理器从中消费Kafka记录的Kafka主题的名称或模式。如果用逗号分隔,可以提供多个。
显示名称 | 主题 |
描述 | 处理器从中消费Kafka记录的Kafka主题的名称或模式。如果用逗号分隔,可以提供多个 |
API名称 | Topics |
表达式语言范围 | JVM级别定义的环境变量和系统属性 |
是否敏感 | 否 |
是否必需 | 是 |
关系
名称 | 描述 |
success | 包含一个或多个序列化Kafka记录的FlowFile |
写入属性
名称 | 描述 |
record.count | 接收到的记录数 |
mime.type | 由配置的记录写入器提供的MIME类型 |
kafka.count | 如果有多个消息,则为写入的消息数 |
kafka.key | 如果存在且为单个消息,则为消息的键。键的编码方式取决于“键属性编码”属性的值 |
kafka.offset | 消息在主题分区中的偏移量 |
kafka.timestamp | 消息在主题分区中的时间戳 |
kafka.partition | 消息或消息包来自的主题分区 |
kafka.topic | 消息或消息包来自的主题 |
kafka.tombstone | 如果消费的消息是墓碑消息,则设置为true |
另请参阅
org.apache.nifi.kafka.processors.PublishKafka