Apache NiFi 2.x处理器:ListFile 2.3.0

Apache NiFi 2.x处理器:ListFile 2.3.0

经验文章nimo972025-03-19 15:39:2615A+A-

org.apache.nifi | nifi-standard-nar

描述

从输入目录检索文件列表。对于列出的每个文件,都会创建一个代表该文件的FlowFile,以便可以结合FetchFile来获取文件内容。当“Input Directory Location”设置为“Remote”时,此处理器设计为仅在集群中的主节点上运行。如果主节点发生变化,新的主节点将从先前节点停止的地方继续,而不会重复处理所有数据。当“Input Directory Location”为“Local”时,“Execution”模式可以是任意的,并且不会发生同步。与GetFile不同,此处理器不会从本地文件系统删除任何数据。

标签

file、filesystem、get、ingest、list、source

输入要求

FORBIDDEN

是否支持敏感动态属性

false

ListFile 2.3.0补充细节

ListFile会列出在配置目录中遇到的所有文件。大致有两种常见的使用场景。

流处理场景

默认情况下,该处理器会为目录中的每个文件创建一个单独的FlowFile,并添加文件名、路径等属性。常见的使用场景是将ListFile连接到FetchFile处理器。这两个处理器结合使用,能够轻松监控目录,并以高效的流处理方式获取新到达文件的内容。

批处理场景

另一个常见的使用场景是希望处理给定目录中所有新到达的文件,并且仅在所有文件都完成处理后执行某些操作。上述流处理数据的方法很难实现这一点,因为NiFi本质上是一个流处理平台,没有“作业”的起止概念。数据只是在可用时被获取。

为了解决这个问题,可以选择为ListFile处理器配置一个Record Writer。配置Record Writer后,将创建一个单独的FlowFile,其中包含目录中每个文件的记录,而不是为每个文件创建一个单独的FlowFile。在这种模式下,为了获取每个文件的内容,必须将记录拆分为单独的FlowFile,然后进行获取。那么这对我们有什么帮助呢?

通过拆分FlowFile并在一个Process Group中处理所有数据,我们仍然可以实现等待目录中的所有文件都被处理的预期用例。将Process Group的“FlowFile Concurrency”配置为“Single FlowFile per Node”,意味着每次只有一个FlowFile会进入该Process Group。一旦发生这种情况,就可以拆分FlowFile并处理每个部分。将Process Group的“Outbound Policy”配置为“Batch Output”,意味着在所有FlowFile都完成处理之前,没有任何FlowFile会离开该Process Group。

在此流程中,我们使用ListFile列出目录。处理器配置了一个Record Writer(在本例中为CSV Writer,但也可以使用任何Record Writer),因此整个列表仅生成一个FlowFile。然后,该列表被发送到“Process Listing”Process Group(如下所示)。只有在整个目录的内容都被处理后,数据才会离开“Process Listing”Process Group。此时,当Process Group中的所有数据都准备好离开时,每个已处理的文件将被发送到“Post - Processing”Process Group。同时,原始列表将被发送到“Processing Complete Notification”Process Group。为了实现这一点,Process Group必须配置为“Single FlowFile per Node”的FlowFile Concurrency和“Batch Output”的Outbound Policy。

“Process Listing”Process Group通过“Listing”输入端口接收列表。然后,该列表直接发送到“Listing of Processed Data”输出端口,以便在所有处理完成后,原始列表也将被发送出去。

接下来,列表被拆分为每个记录一个单独的FlowFile。因为我们想使用FetchFile来获取数据,所以需要将文件的文件名和路径作为FlowFile的属性。这可以通过几种不同的方式完成,但最简单的机制是使用PartitionRecord处理器。此处理器配置了一个能够读取ListFile写入数据的Record Reader(在本例中为CSV Reader)。该处理器还配置了两个额外的用户定义属性:

  • absolute.path: /path
  • filename: /filename

结果,进入PartitionRecord处理器的每个记录都将被拆分为一个单独的FlowFile(因为“path”和“filename”字段的组合对于每个记录都是唯一的),并且“filename”和“path”记录字段将成为FlowFile的属性(使用“absolute.path”和“filename”作为属性名称)。FetchFile使用默认配置,该配置引用这些属性。

最后,我们处理数据 - 在本示例中,只是通过GZIP压缩进行压缩 - 并将输出发送到“Processed Data”输出端口。数据将在此处排队,直到所有数据都准备好离开Process Group,然后才会被释放。

记录模式

当处理器配置为使用Record Writer写入列表时,记录将使用以下模式(以Avro格式)写入:

{
    "type": "record",
    "name": "nifiRecord",
    "namespace": "org.apache.nifi",
    "fields": [
        {
            "name": "filename",
            "type": "string"
        },
        {
            "name": "path",
            "type": "string"
        },
        {
            "name": "directory",
            "type": "boolean"
        },
        {
            "name": "size",
            "type": "long"
        },
        {
            "name": "lastModified",
            "type": {
                "type": "long",
                "logicalType": "timestamp-millis"
            }
        },
        {
            "name": "permissions",
            "type": [
                "null",
                "string"
            ]
        },
        {
            "name": "owner",
            "type": [
                "null",
                "string"
            ]
        },
        {
            "name": "group",
            "type": [
                "null",
                "string"
            ]
        }
    ]
}

属性

Entity Tracking Initial Listing Target

指定应如何处理初始列表。由“Tracking Entities”策略使用。

显示名称

Entity Tracking Initial Listing Target

描述

指定应如何处理初始列表。由“Tracking Entities”策略使用。

API名称

et-initial-listing-target

默认值

all

允许值

Tracking Time Window
All Available

表达式语言作用域

不支持

是否敏感

false

是否必填

false

Entity Tracking Node Identifier

配置的值将附加到缓存键,以便在跟踪状态作用域为LOCAL时,按NiFi节点而不是集群范围跟踪列表状态。由“Tracking Entities”策略使用。

显示名称

Entity Tracking Node Identifier

描述

配置的值将附加到缓存键,以便在跟踪状态作用域为LOCAL时,按NiFi节点而不是集群范围跟踪列表状态。由“Tracking Entities”策略使用。

API名称

et-node-identifier

默认值

${hostname()}

表达式语言作用域

JVM级别定义的环境变量和系统属性

是否敏感

false

是否必填

false

Entity Tracking State Cache

列出的实体存储在指定的缓存存储中,以便此处理器在NiFi重启或主节点更改的情况下可以继续列出文件。“Tracking Entities”策略要求跟踪过去“Tracking Time Window”内所有列出实体的信息。为了支持大量实体,该策略使用DistributedMapCache而不是管理状态。缓存键格式为“ListedEntities::{processorId}(::{nodeId})”。如果它按节点跟踪列出的实体,则会添加可选的“::{nodeId}”部分以单独管理状态。例如,集群范围的缓存键 = “ListedEntities::8dda2321 - 0164 - 1000 - 50fa - 3042fe7d6a7b”,按节点的缓存键 = “ListedEntities::8dda2321 - 0164 - 1000 - 50fa - 3042fe7d6a7b::nifi - node3”。存储的缓存内容是压缩的JSON字符串。当目标列表配置更改时,缓存键将被删除。由“Tracking Entities”策略使用。

显示名称

Entity Tracking State Cache

描述

列出的实体存储在指定的缓存存储中,以便此处理器在NiFi重启或主节点更改的情况下可以继续列出文件。“Tracking Entities”策略要求跟踪过去“Tracking Time Window”内所有列出实体的信息。为了支持大量实体,该策略使用DistributedMapCache而不是管理状态。缓存键格式为“ListedEntities::{processorId}(::{nodeId})”。如果它按节点跟踪列出的实体,则会添加可选的“::{nodeId}”部分以单独管理状态。例如,集群范围的缓存键 = “ListedEntities::8dda2321 - 0164 - 1000 - 50fa - 3042fe7d6a7b”,按节点的缓存键 = “ListedEntities::8dda2321 - 0164 - 1000 - 50fa - 3042fe7d6a7b::nifi - node3”。存储的缓存内容是压缩的JSON字符串。当目标列表配置更改时,缓存键将被删除。由“Tracking Entities”策略使用。

API名称

et-state-cache

服务接口

org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService

实现类

org.apache.nifi.hazelcast.services.cacheclient.HazelcastMapCacheClient
org.apache.nifi.distributed.cache.client.MapCacheClientService
org.apache.nifi.redis.service.RedisDistributedMapCacheClientService
org.apache.nifi.redis.service.SimpleRedisDistributedMapCacheClientService

表达式语言作用域

不支持

是否敏感

false

是否必填

false

Entity Tracking Time Window

指定此处理器应跟踪已列出实体的时长。“Tracking Entities”策略可以选择时间戳在指定时间窗口内的任何实体。例如,如果设置为“30 minutes”,则此处理器运行时,任何时间戳在最近30分钟内的实体都将成为列表目标。如果满足以下条件之一,列出的实体将被视为“new/updated”并发出一个FlowFile:1. 在已列出的实体中不存在;2. 时间戳比缓存的实体更新;3. 大小与缓存的实体不同。如果缓存实体的时间戳超过指定时间窗口,该实体将从缓存的已列出实体中删除。由“Tracking Entities”策略使用。

显示名称

Entity Tracking Time Window

描述

指定此处理器应跟踪已列出实体的时长。“Tracking Entities”策略可以选择时间戳在指定时间窗口内的任何实体。例如,如果设置为“30 minutes”,则此处理器运行时,任何时间戳在最近30分钟内的实体都将成为列表目标。如果满足以下条件之一,列出的实体将被视为“new/updated”并发出一个FlowFile:1. 在已列出的实体中不存在;2. 时间戳比缓存的实体更新;3. 大小与缓存的实体不同。如果缓存实体的时间戳超过指定时间窗口,该实体将从缓存的已列出实体中删除。由“Tracking Entities”策略使用。

API名称

et-time-window

默认值

3 hours

表达式语言作用域

JVM级别定义的环境变量和系统属性

是否敏感

false

是否必填

true

File Filter

只有文件名与给定正则表达式匹配的文件才会被选中。

显示名称

File Filter

描述

只有文件名与给定正则表达式匹配的文件才会被选中。

API名称

File Filter

默认值

[^.].*

表达式语言作用域

不支持

是否敏感

false

是否必填

true

Ignore Hidden Files

指示是否应忽略隐藏文件。

显示名称

Ignore Hidden Files

描述

指示是否应忽略隐藏文件。

API名称

Ignore Hidden Files

默认值

true

允许值

true
false

表达式语言作用域

不支持

是否敏感

false

是否必填

true

Include File Attributes

指示是否将文件的最后修改时间和所有者等信息作为FlowFile属性包含在内。根据所使用的文件系统,收集这些信息可能成本较高,因此应禁用此选项。对于远程文件共享尤其如此。

显示名称

Include File Attributes

描述

指示是否将文件的最后修改时间和所有者等信息作为FlowFile属性包含在内。根据所使用的文件系统,收集这些信息可能成本较高,因此应禁用此选项。对于远程文件共享尤其如此。

API名称

Include File Attributes

默认值

true

允许值

true
false

表达式语言作用域

不支持

是否敏感

false

是否必填

true

Input Directory

从中提取文件的输入目录。

显示名称

Input Directory

描述

从中提取文件的输入目录。

API名称

Input Directory

表达式语言作用域

JVM级别定义的环境变量和系统属性

是否敏感

false

是否必填

true

Input Directory Location

指定输入目录的位置。这用于确定状态应存储在本地还是集群范围内。

显示名称

Input Directory Location

描述

指定输入目录的位置。这用于确定状态应存储在本地还是集群范围内。

API名称

Input Directory Location

默认值

Local

允许值

Local
Remote

表达式语言作用域

不支持

是否敏感

false

是否必填

true

Listing Strategy

指定如何确定新的/更新的实体。详见每个策略的描述。

显示名称

Listing Strategy

描述

指定如何确定新的/更新的实体。详见每个策略的描述。

API名称

listing-strategy

默认值

timestamps

允许值

Tracking Timestamps
Tracking Entities
No Tracking

表达式语言作用域

不支持

是否敏感

false

是否必填

true

Max Directory Listing Time

列出任何单个目录预计花费的最长时间。如果“Input Directory”属性指定的目录或任何子目录(如果“Recurse”设置为true)的列出时间超过此时间,将为每个超出此时间的目录列出操作生成一个警告公告。

显示名称

Max Directory Listing Time

描述

列出任何单个目录预计花费的最长时间。如果“Input Directory”属性指定的目录或任何子目录(如果“Recurse”设置为true)的列出时间超过此时间,将为每个超出此时间的目录列出操作生成一个警告公告。

API名称

max-listing-time

默认值

3 mins

表达式语言作用域

JVM级别定义的环境变量和系统属性

是否敏感

false

是否必填

false

Max Disk Operation Time

任何单个磁盘操作预计花费的最长时间。如果任何磁盘操作花费的时间超过此时间,将为每个超出此时间的操作生成一个警告公告。

显示名称

Max Disk Operation Time

描述

任何单个磁盘操作预计花费的最长时间。如果任何磁盘操作花费的时间超过此时间,将为每个超出此时间的操作生成一个警告公告。

API名称

max-operation-time

默认值

10 secs

表达式语言作用域

JVM级别定义的环境变量和系统属性

是否敏感

false

是否必填

false

Minimum File Age

文件必须达到的最小年龄才能被提取;任何比此时间年轻(根据最后修改日期)的文件将被忽略。

显示名称

Minimum File Age

描述

文件必须达到的最小年龄才能被提取;任何比此时间年轻(根据最后修改日期)的文件将被忽略。

API名称

Minimum File Age

默认值

0 sec

表达式语言作用域

不支持

是否敏感

false

是否必填

true

Maximum Number of Files to Track

如果“Track Performance”属性设置为“true”,此属性指示应保留其性能指标的最大文件数。此属性的值较小将导致较少的堆使用,而较大的值可能会更准确地洞察磁盘访问操作的性能。

显示名称

Maximum Number of Files to Track

描述

如果“Track Performance”属性设置为“true”,此属性指示应保留其性能指标的最大文件数。此属性的值较小将导致较少的堆使用,而较大的值可能会更准确地洞察磁盘访问操作的性能。

API名称

max-performance-metrics

默认值

100000

表达式语言作用域

JVM级别定义的环境变量和系统属性

是否敏感

false

是否必填

true

Maximum File Age

文件必须达到的最大年龄才能被提取;任何比此时间老(根据最后修改日期)的文件将被忽略。

显示名称

Maximum File Age

描述

文件必须达到的最大年龄才能被提取;任何比此时间老(根据最后修改日期)的文件将被忽略。

API名称

点击这里复制本文地址 以上内容由nimo97整理呈现,请务必在转载分享时注明本文地址!如对内容有疑问,请联系我们,谢谢!
qrcode

尼墨宝库 © All Rights Reserved.  蜀ICP备2024111239号-7