Apache NiFi 2.x处理器:ListFTP 2.3.0
包
org.apache.nifi | nifi-standard-nar
描述
列出FTP服务器上的文件。对于在远程服务器上找到的每个文件,都会创建一个新的FlowFile,并将文件名属性设置为远程服务器上的文件名。这可以与FetchFTP结合使用,以便获取这些文件。
标签
files、ftp、ingest、input、list、remote、source
输入要求
FORBIDDEN
是否支持敏感动态属性
false
ListFTP 2.3.0补充细节
ListFTP会列出其在FTP服务器配置目录中遇到的所有文件。大致有两种常见的使用场景。
流处理场景
默认情况下,该处理器会为目录中的每个文件创建一个单独的FlowFile,并添加文件名、路径等属性。常见的使用场景是将ListFTP连接到FetchFTP处理器。这两个处理器结合使用,能够轻松监控目录,并以高效的流处理方式获取FTP服务器上新到达文件的内容。
批处理场景
另一个常见的使用场景是希望处理给定目录中所有新到达的文件,并且仅在所有文件都完成处理后执行某些操作。上述流处理数据的方法很难实现这一点,因为NiFi本质上是一个流处理平台,没有“作业”的起止概念。数据只是在可用时被获取。
为了解决这个问题,可以选择为ListFTP处理器配置一个Record Writer。配置Record Writer后,将创建一个单独的FlowFile,其中包含目录中每个文件的记录,而不是为每个文件创建一个单独的FlowFile。在这种模式下,为了获取每个文件的内容,必须将记录拆分为单独的FlowFile,然后进行获取。那么这对我们有什么帮助呢?
通过拆分FlowFile并在一个Process Group中处理所有数据,我们仍然可以实现等待目录中的所有文件都被处理的预期用例。将Process Group的“FlowFile Concurrency”配置为“Single FlowFile per Node”,意味着每次只有一个FlowFile会进入该Process Group。一旦发生这种情况,就可以拆分FlowFile并处理每个部分。将Process Group的“Outbound Policy”配置为“Batch Output”,意味着在所有FlowFile都完成处理之前,没有任何FlowFile会离开该Process Group。
在此流程中,我们使用ListFTP列出目录。处理器配置了一个Record Writer(在本例中为CSV Writer,但也可以使用任何Record Writer),因此整个列表仅生成一个FlowFile。然后,该列表被发送到“Process Listing”Process Group(如下所示)。只有在整个目录的内容都被处理后,数据才会离开“Process Listing”Process Group。此时,当Process Group中的所有数据都准备好离开时,每个已处理的文件将被发送到“Post - Processing”Process Group。同时,原始列表将被发送到“Processing Complete Notification”Process Group。为了实现这一点,Process Group必须配置为“Single FlowFile per Node”的FlowFile Concurrency和“Batch Output”的Outbound Policy。
“Process Listing”Process Group通过“Listing”输入端口接收列表。然后,该列表直接发送到“Listing of Processed Data”输出端口,以便在所有处理完成后,原始列表也将被发送出去。
接下来,列表被拆分为每个记录一个单独的FlowFile。因为我们想使用FetchFTP来获取数据,所以需要将文件的文件名和路径作为FlowFile的属性。这可以通过几种不同的方式完成,但最简单的机制是使用PartitionRecord处理器。此处理器配置了一个能够读取ListFTP写入数据的Record Reader(在本例中为CSV Reader)。该处理器还配置了两个额外的用户定义属性:
- path: /path
- filename: /filename
结果,进入PartitionRecord处理器的每个记录都将被拆分为一个单独的FlowFile(因为“path”和“filename”字段的组合对于每个记录都是唯一的),并且“filename”和“path”记录字段将成为FlowFile的属性。FetchFTP配置为使用“
{filename}”作为“Remote File”属性的值,利用了这些属性。
最后,我们处理数据——在本示例中,只是通过GZIP压缩进行压缩——并将输出发送到“Processed Data”输出端口。数据将在此处排队,直到所有数据都准备好离开Process Group,然后才会被释放。
记录模式
当处理器配置为使用Record Writer写入列表时,记录将使用以下模式(以Avro格式)写入:
{
"type": "record",
"name": "nifiRecord",
"namespace": "org.apache.nifi",
"fields": [
{
"name": "filename",
"type": "string"
},
{
"name": "path",
"type": "string"
},
{
"name": "directory",
"type": "boolean"
},
{
"name": "size",
"type": "long"
},
{
"name": "lastModified",
"type": {
"type": "long",
"logicalType": "timestamp-millis"
}
},
{
"name": "permissions",
"type": [
"null",
"string"
]
},
{
"name": "owner",
"type": [
"null",
"string"
]
},
{
"name": "group",
"type": [
"null",
"string"
]
}
]
}
属性
Connection Mode
FTP连接模式。
显示名称 | Connection Mode |
描述 | FTP连接模式 |
API名称 | Connection Mode |
默认值 | Passive |
允许值 | Active |
表达式语言作用域 | 不支持 |
是否敏感 | false |
是否必填 | false |
Connection Timeout
创建连接时等待超时的时间。
显示名称 | Connection Timeout |
描述 | 创建连接时等待超时的时间 |
API名称 | Connection Timeout |
默认值 | 30 sec |
表达式语言作用域 | 不支持 |
是否敏感 | false |
是否必填 | true |
Data Timeout
在本地和远程系统之间传输文件时,此值指定在系统之间没有任何数据传输的情况下允许经过的时间。
显示名称 | Data Timeout |
描述 | 在本地和远程系统之间传输文件时,此值指定在系统之间没有任何数据传输的情况下允许经过的时间 |
API名称 | Data Timeout |
默认值 | 30 sec |
表达式语言作用域 | 不支持 |
是否敏感 | false |
是否必填 | true |
Entity Tracking Initial Listing Target
指定应如何处理初始列表。由“Tracking Entities”策略使用。
显示名称 | Entity Tracking Initial Listing Target |
描述 | 指定应如何处理初始列表。由“Tracking Entities”策略使用 |
API名称 | et-initial-listing-target |
默认值 | all |
允许值 | Tracking Time Window |
表达式语言作用域 | 不支持 |
是否敏感 | false |
是否必填 | false |
Entity Tracking State Cache
列出的实体存储在指定的缓存存储中,以便此处理器在NiFi重启或主节点更改的情况下可以继续列出文件。“Tracking Entities”策略要求跟踪过去“Tracking Time Window”内所有列出实体的信息。为了支持大量实体,该策略使用DistributedMapCache而不是管理状态。缓存键格式为“ListedEntities::{processorId}(::{nodeId})”。如果它按节点跟踪列出的实体,则会添加可选的“::{nodeId}”部分以单独管理状态。例如,集群范围的缓存键 = “ListedEntities::8dda2321 - 0164 - 1000 - 50fa - 3042fe7d6a7b”,按节点的缓存键 = “ListedEntities::8dda2321 - 0164 - 1000 - 50fa - 3042fe7d6a7b::nifi - node3”。存储的缓存内容是压缩的JSON字符串。当目标列表配置更改时,缓存键将被删除。由“Tracking Entities”策略使用。
显示名称 | Entity Tracking State Cache |
描述 | 列出的实体存储在指定的缓存存储中,以便此处理器在NiFi重启或主节点更改的情况下可以继续列出文件。“Tracking Entities”策略要求跟踪过去“Tracking Time Window”内所有列出实体的信息。为了支持大量实体,该策略使用DistributedMapCache而不是管理状态。缓存键格式为“ListedEntities::{processorId}(::{nodeId})”。如果它按节点跟踪列出的实体,则会添加可选的“::{nodeId}”部分以单独管理状态。例如,集群范围的缓存键 = “ListedEntities::8dda2321 - 0164 - 1000 - 50fa - 3042fe7d6a7b”,按节点的缓存键 = “ListedEntities::8dda2321 - 0164 - 1000 - 50fa - 3042fe7d6a7b::nifi - node3”。存储的缓存内容是压缩的JSON字符串。当目标列表配置更改时,缓存键将被删除。由“Tracking Entities”策略使用 |
API名称 | et-state-cache |
服务接口 | org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService |
实现类 | org.apache.nifi.hazelcast.services.cacheclient.HazelcastMapCacheClient |
表达式语言作用域 | 不支持 |
是否敏感 | false |
是否必填 | false |
Entity Tracking Time Window
指定此处理器应跟踪已列出实体的时长。“Tracking Entities”策略可以选择时间戳在指定时间窗口内的任何实体。例如,如果设置为“30 minutes”,则此处理器运行时,任何时间戳在最近30分钟内的实体都将成为列表目标。如果满足以下条件之一,列出的实体将被视为“new/updated”并发出一个FlowFile:1. 在已列出的实体中不存在;2. 时间戳比缓存的实体更新;3. 大小与缓存的实体不同。如果缓存实体的时间戳超过指定时间窗口,该实体将从缓存的已列出实体中删除。由“Tracking Entities”策略使用。
显示名称 | Entity Tracking Time Window |
描述 | 指定此处理器应跟踪已列出实体的时长。“Tracking Entities”策略可以选择时间戳在指定时间窗口内的任何实体。例如,如果设置为“30 minutes”,则此处理器运行时,任何时间戳在最近30分钟内的实体都将成为列表目标。如果满足以下条件之一,列出的实体将被视为“new/updated”并发出一个FlowFile:1. 在已列出的实体中不存在;2. 时间戳比缓存的实体更新;3. 大小与缓存的实体不同。如果缓存实体的时间戳超过指定时间窗口,该实体将从缓存的已列出实体中删除。由“Tracking Entities”策略使用 |
API名称 | et-time-window |
默认值 | 3 hours |
表达式语言作用域 | JVM级别定义的环境变量和系统属性 |
是否敏感 | false |
是否必填 | false |
File Filter Regex
提供用于过滤文件名的Java正则表达式;如果提供了过滤器,只有文件名与该正则表达式匹配的文件才会被获取。
显示名称 | File Filter Regex |
描述 | 提供用于过滤文件名的Java正则表达式;如果提供了过滤器,只有文件名与该正则表达式匹配的文件才会被获取 |
API名称 | File Filter Regex |
表达式语言作用域 | 不支持 |
是否敏感 | false |
是否必填 | false |
Follow symlink
如果为true,将提取符号链接文件以及嵌套的符号链接子目录;否则,将不读取符号链接文件,也不会遍历符号链接子目录。
显示名称 | Follow symlink |
描述 | 如果为true,将提取符号链接文件以及嵌套的符号链接子目录;否则,将不读取符号链接文件,也不会遍历符号链接子目录 |
API名称 | follow-symlink |
默认值 | false |
允许值 | true |
表达式语言作用域 | 不支持 |
是否敏感 | false |
是否必填 | true |
Use UTF-8 Encoding
告诉客户端在处理文件和文件名时使用UTF - 8编码。如果设置为true,服务器也必须支持UTF - 8编码。
显示名称 | Use UTF-8 Encoding |
描述 | 告诉客户端在处理文件和文件名时使用UTF - 8编码。如果设置为true,服务器也必须支持UTF - 8编码 |
API名称 | ftp-use-utf8 |
默认值 | false |
允许值 | true |
表达式语言作用域 | 不支持 |
是否敏感 | false |
是否必填 | true |
Hostname
远程系统的完全限定主机名或IP地址。
显示名称 | Hostname |
描述 | 远程系统的完全限定主机名或IP地址 |
API名称 | Hostname |
表达式语言作用域 | JVM级别定义的环境变量和系统属性 |
是否敏感 | false |
是否必填 | true |
Ignore Dotted Files
如果为true,名称以点(“.”)开头的文件将被忽略。
显示名称 | Ignore Dotted Files |
描述 | 如果为true,名称以点(“.”)开头的文件将被忽略 |
API名称 | Ignore Dotted Files |
默认值 | true |
允许值 | true |
表达式语言作用域 | 不支持 |
是否敏感 | false |
是否必填 | true |
Internal Buffer Size
设置缓冲数据流的内部缓冲区大小。
显示名称 | Internal Buffer Size |
描述 | 设置缓冲数据流的内部缓冲区大小 |
API名称 | Internal Buffer Size |
默认值 | 16KB |
表达式语言作用域 | 不支持 |
是否敏感 | false |
是否必填 | false |
Listing Strategy
指定如何确定新的/更新的实体。详见每个策略的描述。
显示名称 | Listing Strategy |
描述 | 指定如何确定新的/更新的实体。详见每个策略的描述 |
API名称 | listing-strategy |
默认值 | timestamps |
允许值 | Tracking Timestamps |
表达式语言作用域 | 不支持 |
是否敏感 | false |
是否必填 | true |
Password
用户账户的密码。
显示名称 | Password |
描述 | 用户账户的密码 |
API名称 | Password |
表达式语言作用域 | 环境变量和FlowFile属性 |
是否敏感 | true |
是否必填 | false |
Path Filter Regex
当“Search Recursively”为true时,只有路径与给定正则表达式匹配的子目录才会被扫描。
显示名称 | Path Filter Regex |
描述 | 当“Search Recursively”为true时,只有路径与给定正则表达式匹配的子目录才会被扫描 |
API名称 | Path Filter Regex |
表达式语言作用域 | 不支持 |
是否敏感 | false |
是否必填 | false |
Port
连接到远程主机以获取数据的端口。
显示名称 | Port |
描述 | 连接到远程主机以获取数据的端口 |
API名称 | Port |
默认值 | 21 |
表达式语言作用域 | JVM级别定义的环境变量和系统属性 |
是否敏感 | false |
是否必填 | true |
Proxy Configuration Service
指定用于代理网络请求的代理配置控制器服务。支持的代理:SOCKS + AuthN、HTTP + AuthN。
显示名称 | Proxy Configuration Service |
描述 | 指定用于代理网络请求的代理配置控制器服务。支持的代理:SOCKS + AuthN、HTTP + AuthN |
API名称 | proxy-configuration-service |
服务接口 | org.apache.nifi.proxy.ProxyConfigurationService |
实现类 | org.apache.nifi.proxy.StandardProxyConfigurationService |
表达式语言作用域 | 不支持 |
是否敏感 | false |
是否必填 | false |
Record Writer
指定用于创建列表的Record Writer。如果未指定,将为列出的每个实体创建一个FlowFile。如果指定了Record Writer,所有实体将被写入单个FlowFile,而不是为单个FlowFile添加属性。
显示名称 | Record Writer |
描述 | 指定用于创建列表的Record Writer。如果未指定,将为列出的每个实体创建一个FlowFile。如果指定了Record Writer,所有实体将被 |