Apache NiFi 2.x处理器:ListFile 2.3.0
包
org.apache.nifi | nifi-standard-nar
描述
从输入目录检索文件列表。对于列出的每个文件,都会创建一个代表该文件的FlowFile,以便可以结合FetchFile来获取文件内容。当“Input Directory Location”设置为“Remote”时,此处理器设计为仅在集群中的主节点上运行。如果主节点发生变化,新的主节点将从先前节点停止的地方继续,而不会重复处理所有数据。当“Input Directory Location”为“Local”时,“Execution”模式可以是任意的,并且不会发生同步。与GetFile不同,此处理器不会从本地文件系统删除任何数据。
标签
file、filesystem、get、ingest、list、source
输入要求
FORBIDDEN
是否支持敏感动态属性
false
ListFile 2.3.0补充细节
ListFile会列出在配置目录中遇到的所有文件。大致有两种常见的使用场景。
流处理场景
默认情况下,该处理器会为目录中的每个文件创建一个单独的FlowFile,并添加文件名、路径等属性。常见的使用场景是将ListFile连接到FetchFile处理器。这两个处理器结合使用,能够轻松监控目录,并以高效的流处理方式获取新到达文件的内容。
批处理场景
另一个常见的使用场景是希望处理给定目录中所有新到达的文件,并且仅在所有文件都完成处理后执行某些操作。上述流处理数据的方法很难实现这一点,因为NiFi本质上是一个流处理平台,没有“作业”的起止概念。数据只是在可用时被获取。
为了解决这个问题,可以选择为ListFile处理器配置一个Record Writer。配置Record Writer后,将创建一个单独的FlowFile,其中包含目录中每个文件的记录,而不是为每个文件创建一个单独的FlowFile。在这种模式下,为了获取每个文件的内容,必须将记录拆分为单独的FlowFile,然后进行获取。那么这对我们有什么帮助呢?
通过拆分FlowFile并在一个Process Group中处理所有数据,我们仍然可以实现等待目录中的所有文件都被处理的预期用例。将Process Group的“FlowFile Concurrency”配置为“Single FlowFile per Node”,意味着每次只有一个FlowFile会进入该Process Group。一旦发生这种情况,就可以拆分FlowFile并处理每个部分。将Process Group的“Outbound Policy”配置为“Batch Output”,意味着在所有FlowFile都完成处理之前,没有任何FlowFile会离开该Process Group。
在此流程中,我们使用ListFile列出目录。处理器配置了一个Record Writer(在本例中为CSV Writer,但也可以使用任何Record Writer),因此整个列表仅生成一个FlowFile。然后,该列表被发送到“Process Listing”Process Group(如下所示)。只有在整个目录的内容都被处理后,数据才会离开“Process Listing”Process Group。此时,当Process Group中的所有数据都准备好离开时,每个已处理的文件将被发送到“Post - Processing”Process Group。同时,原始列表将被发送到“Processing Complete Notification”Process Group。为了实现这一点,Process Group必须配置为“Single FlowFile per Node”的FlowFile Concurrency和“Batch Output”的Outbound Policy。
“Process Listing”Process Group通过“Listing”输入端口接收列表。然后,该列表直接发送到“Listing of Processed Data”输出端口,以便在所有处理完成后,原始列表也将被发送出去。
接下来,列表被拆分为每个记录一个单独的FlowFile。因为我们想使用FetchFile来获取数据,所以需要将文件的文件名和路径作为FlowFile的属性。这可以通过几种不同的方式完成,但最简单的机制是使用PartitionRecord处理器。此处理器配置了一个能够读取ListFile写入数据的Record Reader(在本例中为CSV Reader)。该处理器还配置了两个额外的用户定义属性:
- absolute.path: /path
- filename: /filename
结果,进入PartitionRecord处理器的每个记录都将被拆分为一个单独的FlowFile(因为“path”和“filename”字段的组合对于每个记录都是唯一的),并且“filename”和“path”记录字段将成为FlowFile的属性(使用“absolute.path”和“filename”作为属性名称)。FetchFile使用默认配置,该配置引用这些属性。
最后,我们处理数据 - 在本示例中,只是通过GZIP压缩进行压缩 - 并将输出发送到“Processed Data”输出端口。数据将在此处排队,直到所有数据都准备好离开Process Group,然后才会被释放。
记录模式
当处理器配置为使用Record Writer写入列表时,记录将使用以下模式(以Avro格式)写入:
{
"type": "record",
"name": "nifiRecord",
"namespace": "org.apache.nifi",
"fields": [
{
"name": "filename",
"type": "string"
},
{
"name": "path",
"type": "string"
},
{
"name": "directory",
"type": "boolean"
},
{
"name": "size",
"type": "long"
},
{
"name": "lastModified",
"type": {
"type": "long",
"logicalType": "timestamp-millis"
}
},
{
"name": "permissions",
"type": [
"null",
"string"
]
},
{
"name": "owner",
"type": [
"null",
"string"
]
},
{
"name": "group",
"type": [
"null",
"string"
]
}
]
}
属性
Entity Tracking Initial Listing Target
指定应如何处理初始列表。由“Tracking Entities”策略使用。
显示名称 | Entity Tracking Initial Listing Target |
描述 | 指定应如何处理初始列表。由“Tracking Entities”策略使用。 |
API名称 | et-initial-listing-target |
默认值 | all |
允许值 | Tracking Time Window |
表达式语言作用域 | 不支持 |
是否敏感 | false |
是否必填 | false |
Entity Tracking Node Identifier
配置的值将附加到缓存键,以便在跟踪状态作用域为LOCAL时,按NiFi节点而不是集群范围跟踪列表状态。由“Tracking Entities”策略使用。
显示名称 | Entity Tracking Node Identifier |
描述 | 配置的值将附加到缓存键,以便在跟踪状态作用域为LOCAL时,按NiFi节点而不是集群范围跟踪列表状态。由“Tracking Entities”策略使用。 |
API名称 | et-node-identifier |
默认值 | ${hostname()} |
表达式语言作用域 | JVM级别定义的环境变量和系统属性 |
是否敏感 | false |
是否必填 | false |
Entity Tracking State Cache
列出的实体存储在指定的缓存存储中,以便此处理器在NiFi重启或主节点更改的情况下可以继续列出文件。“Tracking Entities”策略要求跟踪过去“Tracking Time Window”内所有列出实体的信息。为了支持大量实体,该策略使用DistributedMapCache而不是管理状态。缓存键格式为“ListedEntities::{processorId}(::{nodeId})”。如果它按节点跟踪列出的实体,则会添加可选的“::{nodeId}”部分以单独管理状态。例如,集群范围的缓存键 = “ListedEntities::8dda2321 - 0164 - 1000 - 50fa - 3042fe7d6a7b”,按节点的缓存键 = “ListedEntities::8dda2321 - 0164 - 1000 - 50fa - 3042fe7d6a7b::nifi - node3”。存储的缓存内容是压缩的JSON字符串。当目标列表配置更改时,缓存键将被删除。由“Tracking Entities”策略使用。
显示名称 | Entity Tracking State Cache |
描述 | 列出的实体存储在指定的缓存存储中,以便此处理器在NiFi重启或主节点更改的情况下可以继续列出文件。“Tracking Entities”策略要求跟踪过去“Tracking Time Window”内所有列出实体的信息。为了支持大量实体,该策略使用DistributedMapCache而不是管理状态。缓存键格式为“ListedEntities::{processorId}(::{nodeId})”。如果它按节点跟踪列出的实体,则会添加可选的“::{nodeId}”部分以单独管理状态。例如,集群范围的缓存键 = “ListedEntities::8dda2321 - 0164 - 1000 - 50fa - 3042fe7d6a7b”,按节点的缓存键 = “ListedEntities::8dda2321 - 0164 - 1000 - 50fa - 3042fe7d6a7b::nifi - node3”。存储的缓存内容是压缩的JSON字符串。当目标列表配置更改时,缓存键将被删除。由“Tracking Entities”策略使用。 |
API名称 | et-state-cache |
服务接口 | org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService |
实现类 | org.apache.nifi.hazelcast.services.cacheclient.HazelcastMapCacheClient |
表达式语言作用域 | 不支持 |
是否敏感 | false |
是否必填 | false |
Entity Tracking Time Window
指定此处理器应跟踪已列出实体的时长。“Tracking Entities”策略可以选择时间戳在指定时间窗口内的任何实体。例如,如果设置为“30 minutes”,则此处理器运行时,任何时间戳在最近30分钟内的实体都将成为列表目标。如果满足以下条件之一,列出的实体将被视为“new/updated”并发出一个FlowFile:1. 在已列出的实体中不存在;2. 时间戳比缓存的实体更新;3. 大小与缓存的实体不同。如果缓存实体的时间戳超过指定时间窗口,该实体将从缓存的已列出实体中删除。由“Tracking Entities”策略使用。
显示名称 | Entity Tracking Time Window |
描述 | 指定此处理器应跟踪已列出实体的时长。“Tracking Entities”策略可以选择时间戳在指定时间窗口内的任何实体。例如,如果设置为“30 minutes”,则此处理器运行时,任何时间戳在最近30分钟内的实体都将成为列表目标。如果满足以下条件之一,列出的实体将被视为“new/updated”并发出一个FlowFile:1. 在已列出的实体中不存在;2. 时间戳比缓存的实体更新;3. 大小与缓存的实体不同。如果缓存实体的时间戳超过指定时间窗口,该实体将从缓存的已列出实体中删除。由“Tracking Entities”策略使用。 |
API名称 | et-time-window |
默认值 | 3 hours |
表达式语言作用域 | JVM级别定义的环境变量和系统属性 |
是否敏感 | false |
是否必填 | true |
File Filter
只有文件名与给定正则表达式匹配的文件才会被选中。
显示名称 | File Filter |
描述 | 只有文件名与给定正则表达式匹配的文件才会被选中。 |
API名称 | File Filter |
默认值 | [^.].* |
表达式语言作用域 | 不支持 |
是否敏感 | false |
是否必填 | true |
Ignore Hidden Files
指示是否应忽略隐藏文件。
显示名称 | Ignore Hidden Files |
描述 | 指示是否应忽略隐藏文件。 |
API名称 | Ignore Hidden Files |
默认值 | true |
允许值 | true |
表达式语言作用域 | 不支持 |
是否敏感 | false |
是否必填 | true |
Include File Attributes
指示是否将文件的最后修改时间和所有者等信息作为FlowFile属性包含在内。根据所使用的文件系统,收集这些信息可能成本较高,因此应禁用此选项。对于远程文件共享尤其如此。
显示名称 | Include File Attributes |
描述 | 指示是否将文件的最后修改时间和所有者等信息作为FlowFile属性包含在内。根据所使用的文件系统,收集这些信息可能成本较高,因此应禁用此选项。对于远程文件共享尤其如此。 |
API名称 | Include File Attributes |
默认值 | true |
允许值 | true |
表达式语言作用域 | 不支持 |
是否敏感 | false |
是否必填 | true |
Input Directory
从中提取文件的输入目录。
显示名称 | Input Directory |
描述 | 从中提取文件的输入目录。 |
API名称 | Input Directory |
表达式语言作用域 | JVM级别定义的环境变量和系统属性 |
是否敏感 | false |
是否必填 | true |
Input Directory Location
指定输入目录的位置。这用于确定状态应存储在本地还是集群范围内。
显示名称 | Input Directory Location |
描述 | 指定输入目录的位置。这用于确定状态应存储在本地还是集群范围内。 |
API名称 | Input Directory Location |
默认值 | Local |
允许值 | Local |
表达式语言作用域 | 不支持 |
是否敏感 | false |
是否必填 | true |
Listing Strategy
指定如何确定新的/更新的实体。详见每个策略的描述。
显示名称 | Listing Strategy |
描述 | 指定如何确定新的/更新的实体。详见每个策略的描述。 |
API名称 | listing-strategy |
默认值 | timestamps |
允许值 | Tracking Timestamps |
表达式语言作用域 | 不支持 |
是否敏感 | false |
是否必填 | true |
Max Directory Listing Time
列出任何单个目录预计花费的最长时间。如果“Input Directory”属性指定的目录或任何子目录(如果“Recurse”设置为true)的列出时间超过此时间,将为每个超出此时间的目录列出操作生成一个警告公告。
显示名称 | Max Directory Listing Time |
描述 | 列出任何单个目录预计花费的最长时间。如果“Input Directory”属性指定的目录或任何子目录(如果“Recurse”设置为true)的列出时间超过此时间,将为每个超出此时间的目录列出操作生成一个警告公告。 |
API名称 | max-listing-time |
默认值 | 3 mins |
表达式语言作用域 | JVM级别定义的环境变量和系统属性 |
是否敏感 | false |
是否必填 | false |
Max Disk Operation Time
任何单个磁盘操作预计花费的最长时间。如果任何磁盘操作花费的时间超过此时间,将为每个超出此时间的操作生成一个警告公告。
显示名称 | Max Disk Operation Time |
描述 | 任何单个磁盘操作预计花费的最长时间。如果任何磁盘操作花费的时间超过此时间,将为每个超出此时间的操作生成一个警告公告。 |
API名称 | max-operation-time |
默认值 | 10 secs |
表达式语言作用域 | JVM级别定义的环境变量和系统属性 |
是否敏感 | false |
是否必填 | false |
Minimum File Age
文件必须达到的最小年龄才能被提取;任何比此时间年轻(根据最后修改日期)的文件将被忽略。
显示名称 | Minimum File Age |
描述 | 文件必须达到的最小年龄才能被提取;任何比此时间年轻(根据最后修改日期)的文件将被忽略。 |
API名称 | Minimum File Age |
默认值 | 0 sec |
表达式语言作用域 | 不支持 |
是否敏感 | false |
是否必填 | true |
Maximum Number of Files to Track
如果“Track Performance”属性设置为“true”,此属性指示应保留其性能指标的最大文件数。此属性的值较小将导致较少的堆使用,而较大的值可能会更准确地洞察磁盘访问操作的性能。
显示名称 | Maximum Number of Files to Track |
描述 | 如果“Track Performance”属性设置为“true”,此属性指示应保留其性能指标的最大文件数。此属性的值较小将导致较少的堆使用,而较大的值可能会更准确地洞察磁盘访问操作的性能。 |
API名称 | max-performance-metrics |
默认值 | 100000 |
表达式语言作用域 | JVM级别定义的环境变量和系统属性 |
是否敏感 | false |
是否必填 | true |
Maximum File Age
文件必须达到的最大年龄才能被提取;任何比此时间老(根据最后修改日期)的文件将被忽略。
显示名称 | Maximum File Age |
描述 | 文件必须达到的最大年龄才能被提取;任何比此时间老(根据最后修改日期)的文件将被忽略。 |
API名称 |