Apache NiFi 2.x处理器:ListFTP 2.3.0

Apache NiFi 2.x处理器:ListFTP 2.3.0

经验文章nimo972025-03-25 13:12:3110A+A-

org.apache.nifi | nifi-standard-nar

描述

列出FTP服务器上的文件。对于在远程服务器上找到的每个文件,都会创建一个新的FlowFile,并将文件名属性设置为远程服务器上的文件名。这可以与FetchFTP结合使用,以便获取这些文件。

标签

files、ftp、ingest、input、list、remote、source

输入要求

FORBIDDEN

是否支持敏感动态属性

false

ListFTP 2.3.0补充细节

ListFTP会列出其在FTP服务器配置目录中遇到的所有文件。大致有两种常见的使用场景。

流处理场景

默认情况下,该处理器会为目录中的每个文件创建一个单独的FlowFile,并添加文件名、路径等属性。常见的使用场景是将ListFTP连接到FetchFTP处理器。这两个处理器结合使用,能够轻松监控目录,并以高效的流处理方式获取FTP服务器上新到达文件的内容。

批处理场景

另一个常见的使用场景是希望处理给定目录中所有新到达的文件,并且仅在所有文件都完成处理后执行某些操作。上述流处理数据的方法很难实现这一点,因为NiFi本质上是一个流处理平台,没有“作业”的起止概念。数据只是在可用时被获取。

为了解决这个问题,可以选择为ListFTP处理器配置一个Record Writer。配置Record Writer后,将创建一个单独的FlowFile,其中包含目录中每个文件的记录,而不是为每个文件创建一个单独的FlowFile。在这种模式下,为了获取每个文件的内容,必须将记录拆分为单独的FlowFile,然后进行获取。那么这对我们有什么帮助呢?

通过拆分FlowFile并在一个Process Group中处理所有数据,我们仍然可以实现等待目录中的所有文件都被处理的预期用例。将Process Group的“FlowFile Concurrency”配置为“Single FlowFile per Node”,意味着每次只有一个FlowFile会进入该Process Group。一旦发生这种情况,就可以拆分FlowFile并处理每个部分。将Process Group的“Outbound Policy”配置为“Batch Output”,意味着在所有FlowFile都完成处理之前,没有任何FlowFile会离开该Process Group。

在此流程中,我们使用ListFTP列出目录。处理器配置了一个Record Writer(在本例中为CSV Writer,但也可以使用任何Record Writer),因此整个列表仅生成一个FlowFile。然后,该列表被发送到“Process Listing”Process Group(如下所示)。只有在整个目录的内容都被处理后,数据才会离开“Process Listing”Process Group。此时,当Process Group中的所有数据都准备好离开时,每个已处理的文件将被发送到“Post - Processing”Process Group。同时,原始列表将被发送到“Processing Complete Notification”Process Group。为了实现这一点,Process Group必须配置为“Single FlowFile per Node”的FlowFile Concurrency和“Batch Output”的Outbound Policy。

“Process Listing”Process Group通过“Listing”输入端口接收列表。然后,该列表直接发送到“Listing of Processed Data”输出端口,以便在所有处理完成后,原始列表也将被发送出去。

接下来,列表被拆分为每个记录一个单独的FlowFile。因为我们想使用FetchFTP来获取数据,所以需要将文件的文件名和路径作为FlowFile的属性。这可以通过几种不同的方式完成,但最简单的机制是使用PartitionRecord处理器。此处理器配置了一个能够读取ListFTP写入数据的Record Reader(在本例中为CSV Reader)。该处理器还配置了两个额外的用户定义属性:

  • path: /path
  • filename: /filename

结果,进入PartitionRecord处理器的每个记录都将被拆分为一个单独的FlowFile(因为“path”和“filename”字段的组合对于每个记录都是唯一的),并且“filename”和“path”记录字段将成为FlowFile的属性。FetchFTP配置为使用“

{filename}”作为“Remote File”属性的值,利用了这些属性。

最后,我们处理数据——在本示例中,只是通过GZIP压缩进行压缩——并将输出发送到“Processed Data”输出端口。数据将在此处排队,直到所有数据都准备好离开Process Group,然后才会被释放。

记录模式

当处理器配置为使用Record Writer写入列表时,记录将使用以下模式(以Avro格式)写入:

{
    "type": "record",
    "name": "nifiRecord",
    "namespace": "org.apache.nifi",
    "fields": [
        {
            "name": "filename",
            "type": "string"
        },
        {
            "name": "path",
            "type": "string"
        },
        {
            "name": "directory",
            "type": "boolean"
        },
        {
            "name": "size",
            "type": "long"
        },
        {
            "name": "lastModified",
            "type": {
                "type": "long",
                "logicalType": "timestamp-millis"
            }
        },
        {
            "name": "permissions",
            "type": [
                "null",
                "string"
            ]
        },
        {
            "name": "owner",
            "type": [
                "null",
                "string"
            ]
        },
        {
            "name": "group",
            "type": [
                "null",
                "string"
            ]
        }
    ]
}

属性

Connection Mode

FTP连接模式。

显示名称

Connection Mode

描述

FTP连接模式

API名称

Connection Mode

默认值

Passive

允许值

Active
Passive

表达式语言作用域

不支持

是否敏感

false

是否必填

false

Connection Timeout

创建连接时等待超时的时间。

显示名称

Connection Timeout

描述

创建连接时等待超时的时间

API名称

Connection Timeout

默认值

30 sec

表达式语言作用域

不支持

是否敏感

false

是否必填

true

Data Timeout

在本地和远程系统之间传输文件时,此值指定在系统之间没有任何数据传输的情况下允许经过的时间。

显示名称

Data Timeout

描述

在本地和远程系统之间传输文件时,此值指定在系统之间没有任何数据传输的情况下允许经过的时间

API名称

Data Timeout

默认值

30 sec

表达式语言作用域

不支持

是否敏感

false

是否必填

true

Entity Tracking Initial Listing Target

指定应如何处理初始列表。由“Tracking Entities”策略使用。

显示名称

Entity Tracking Initial Listing Target

描述

指定应如何处理初始列表。由“Tracking Entities”策略使用

API名称

et-initial-listing-target

默认值

all

允许值

Tracking Time Window
All Available

表达式语言作用域

不支持

是否敏感

false

是否必填

false

Entity Tracking State Cache

列出的实体存储在指定的缓存存储中,以便此处理器在NiFi重启或主节点更改的情况下可以继续列出文件。“Tracking Entities”策略要求跟踪过去“Tracking Time Window”内所有列出实体的信息。为了支持大量实体,该策略使用DistributedMapCache而不是管理状态。缓存键格式为“ListedEntities::{processorId}(::{nodeId})”。如果它按节点跟踪列出的实体,则会添加可选的“::{nodeId}”部分以单独管理状态。例如,集群范围的缓存键 = “ListedEntities::8dda2321 - 0164 - 1000 - 50fa - 3042fe7d6a7b”,按节点的缓存键 = “ListedEntities::8dda2321 - 0164 - 1000 - 50fa - 3042fe7d6a7b::nifi - node3”。存储的缓存内容是压缩的JSON字符串。当目标列表配置更改时,缓存键将被删除。由“Tracking Entities”策略使用。

显示名称

Entity Tracking State Cache

描述

列出的实体存储在指定的缓存存储中,以便此处理器在NiFi重启或主节点更改的情况下可以继续列出文件。“Tracking Entities”策略要求跟踪过去“Tracking Time Window”内所有列出实体的信息。为了支持大量实体,该策略使用DistributedMapCache而不是管理状态。缓存键格式为“ListedEntities::{processorId}(::{nodeId})”。如果它按节点跟踪列出的实体,则会添加可选的“::{nodeId}”部分以单独管理状态。例如,集群范围的缓存键 = “ListedEntities::8dda2321 - 0164 - 1000 - 50fa - 3042fe7d6a7b”,按节点的缓存键 = “ListedEntities::8dda2321 - 0164 - 1000 - 50fa - 3042fe7d6a7b::nifi - node3”。存储的缓存内容是压缩的JSON字符串。当目标列表配置更改时,缓存键将被删除。由“Tracking Entities”策略使用

API名称

et-state-cache

服务接口

org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService

实现类

org.apache.nifi.hazelcast.services.cacheclient.HazelcastMapCacheClient
org.apache.nifi.distributed.cache.client.MapCacheClientService
org.apache.nifi.redis.service.RedisDistributedMapCacheClientService
org.apache.nifi.redis.service.SimpleRedisDistributedMapCacheClientService

表达式语言作用域

不支持

是否敏感

false

是否必填

false

Entity Tracking Time Window

指定此处理器应跟踪已列出实体的时长。“Tracking Entities”策略可以选择时间戳在指定时间窗口内的任何实体。例如,如果设置为“30 minutes”,则此处理器运行时,任何时间戳在最近30分钟内的实体都将成为列表目标。如果满足以下条件之一,列出的实体将被视为“new/updated”并发出一个FlowFile:1. 在已列出的实体中不存在;2. 时间戳比缓存的实体更新;3. 大小与缓存的实体不同。如果缓存实体的时间戳超过指定时间窗口,该实体将从缓存的已列出实体中删除。由“Tracking Entities”策略使用。

显示名称

Entity Tracking Time Window

描述

指定此处理器应跟踪已列出实体的时长。“Tracking Entities”策略可以选择时间戳在指定时间窗口内的任何实体。例如,如果设置为“30 minutes”,则此处理器运行时,任何时间戳在最近30分钟内的实体都将成为列表目标。如果满足以下条件之一,列出的实体将被视为“new/updated”并发出一个FlowFile:1. 在已列出的实体中不存在;2. 时间戳比缓存的实体更新;3. 大小与缓存的实体不同。如果缓存实体的时间戳超过指定时间窗口,该实体将从缓存的已列出实体中删除。由“Tracking Entities”策略使用

API名称

et-time-window

默认值

3 hours

表达式语言作用域

JVM级别定义的环境变量和系统属性

是否敏感

false

是否必填

false

File Filter Regex

提供用于过滤文件名的Java正则表达式;如果提供了过滤器,只有文件名与该正则表达式匹配的文件才会被获取。

显示名称

File Filter Regex

描述

提供用于过滤文件名的Java正则表达式;如果提供了过滤器,只有文件名与该正则表达式匹配的文件才会被获取

API名称

File Filter Regex

表达式语言作用域

不支持

是否敏感

false

是否必填

false

Follow symlink

如果为true,将提取符号链接文件以及嵌套的符号链接子目录;否则,将不读取符号链接文件,也不会遍历符号链接子目录。

显示名称

Follow symlink

描述

如果为true,将提取符号链接文件以及嵌套的符号链接子目录;否则,将不读取符号链接文件,也不会遍历符号链接子目录

API名称

follow-symlink

默认值

false

允许值

true
false

表达式语言作用域

不支持

是否敏感

false

是否必填

true

Use UTF-8 Encoding

告诉客户端在处理文件和文件名时使用UTF - 8编码。如果设置为true,服务器也必须支持UTF - 8编码。

显示名称

Use UTF-8 Encoding

描述

告诉客户端在处理文件和文件名时使用UTF - 8编码。如果设置为true,服务器也必须支持UTF - 8编码

API名称

ftp-use-utf8

默认值

false

允许值

true
false

表达式语言作用域

不支持

是否敏感

false

是否必填

true

Hostname

远程系统的完全限定主机名或IP地址。

显示名称

Hostname

描述

远程系统的完全限定主机名或IP地址

API名称

Hostname

表达式语言作用域

JVM级别定义的环境变量和系统属性

是否敏感

false

是否必填

true

Ignore Dotted Files

如果为true,名称以点(“.”)开头的文件将被忽略。

显示名称

Ignore Dotted Files

描述

如果为true,名称以点(“.”)开头的文件将被忽略

API名称

Ignore Dotted Files

默认值

true

允许值

true
false

表达式语言作用域

不支持

是否敏感

false

是否必填

true

Internal Buffer Size

设置缓冲数据流的内部缓冲区大小。

显示名称

Internal Buffer Size

描述

设置缓冲数据流的内部缓冲区大小

API名称

Internal Buffer Size

默认值

16KB

表达式语言作用域

不支持

是否敏感

false

是否必填

false

Listing Strategy

指定如何确定新的/更新的实体。详见每个策略的描述。

显示名称

Listing Strategy

描述

指定如何确定新的/更新的实体。详见每个策略的描述

API名称

listing-strategy

默认值

timestamps

允许值

Tracking Timestamps
Tracking Entities
No Tracking
Time Window

表达式语言作用域

不支持

是否敏感

false

是否必填

true

Password

用户账户的密码。

显示名称

Password

描述

用户账户的密码

API名称

Password

表达式语言作用域

环境变量和FlowFile属性

是否敏感

true

是否必填

false

Path Filter Regex

当“Search Recursively”为true时,只有路径与给定正则表达式匹配的子目录才会被扫描。

显示名称

Path Filter Regex

描述

当“Search Recursively”为true时,只有路径与给定正则表达式匹配的子目录才会被扫描

API名称

Path Filter Regex

表达式语言作用域

不支持

是否敏感

false

是否必填

false

Port

连接到远程主机以获取数据的端口。

显示名称

Port

描述

连接到远程主机以获取数据的端口

API名称

Port

默认值

21

表达式语言作用域

JVM级别定义的环境变量和系统属性

是否敏感

false

是否必填

true

Proxy Configuration Service

指定用于代理网络请求的代理配置控制器服务。支持的代理:SOCKS + AuthN、HTTP + AuthN。

显示名称

Proxy Configuration Service

描述

指定用于代理网络请求的代理配置控制器服务。支持的代理:SOCKS + AuthN、HTTP + AuthN

API名称

proxy-configuration-service

服务接口

org.apache.nifi.proxy.ProxyConfigurationService

实现类

org.apache.nifi.proxy.StandardProxyConfigurationService

表达式语言作用域

不支持

是否敏感

false

是否必填

false

Record Writer

指定用于创建列表的Record Writer。如果未指定,将为列出的每个实体创建一个FlowFile。如果指定了Record Writer,所有实体将被写入单个FlowFile,而不是为单个FlowFile添加属性。

显示名称

Record Writer

描述

指定用于创建列表的Record Writer。如果未指定,将为列出的每个实体创建一个FlowFile。如果指定了Record Writer,所有实体将被

点击这里复制本文地址 以上内容由nimo97整理呈现,请务必在转载分享时注明本文地址!如对内容有疑问,请联系我们,谢谢!
qrcode

尼墨宝库 © All Rights Reserved.  蜀ICP备2024111239号-7