包
org.apache.nifi | nifi-scripting-nar
描述
实验性的 - 根据传入的流文件和流程会话执行脚本。该脚本负责处理传入的流文件(例如转移到“成功”关系或删除),以及脚本创建的任何流文件。如果处理不完整或不正确,会话将回滚。实验性:长期使用的影响尚未得到验证。
标签
clojure、执行、groovy、脚本
输入要求
允许
支持敏感的动态属性
是
ExecuteScript 2.2.0的更多详细信息
ExecuteScript处理器提供了使用脚本语言的能力,以便利用NiFi API执行以下任务:
- 从传入的流文件中读取内容和 / 或属性。
- 创建一个新的流文件(可以有父流文件,也可以没有)。
- 向传出的流文件写入内容和 / 或属性。
- 与流程会话进行交互,将流文件转移到不同的关系中。
- 读取 / 写入状态管理器,以便在处理器的多次执行之间跟踪变量。
注意:
ExecuteScript使用JSR - 223脚本引擎API来评估脚本,因此有时会限制惯用语言结构的使用。例如,对于Groovy语言,有一个单独的ExecuteGroovyScript处理器,它允许你执行更多符合Groovy语言习惯的任务。例如,通过ExecuteGroovyScript与控制器服务进行交互比使用ExecuteScript更容易(更多详细信息,请参阅ExecuteGroovyScript的文档)。
变量绑定
处理器期望用户定义一个脚本,在处理器被触发时对其进行评估。脚本中可以使用以下变量:
变量名 | 描述 | 变量类型 |
session | 这是对分配给处理器的流程会话的引用。该会话允许你对流文件执行诸如create()、putAttribute()、transfer() 等操作,以及read() 和write() 操作。 | ProcessSession |
context | 这是对处理器的流程上下文的引用。它可用于检索处理器属性、关系、控制器服务和状态管理器。 | ProcessContext |
log | 这是对处理器的组件日志的引用。使用它向NiFi记录日志消息,例如log.info('Hello world!')。 | ComponentLog |
REL_SUCCESS | 这是对为处理器定义的“成功”关系的引用。也可以通过引用父类(ExecuteScript)的静态成员来继承它,这是一个便利变量。它还避免了使用关系的完全限定名。 | Relationship |
REL_FAILURE | 这是对为处理器定义的“失败”关系的引用。与REL_SUCCESS一样,也可以通过引用父类(ExecuteScript)的静态成员来继承它,这是一个便利变量。它还避免了使用关系的完全限定名。 | Relationship |
动态属性对应的PropertyValue对象 | 这允许你获取属性的字符串值,还可以根据NiFi表达式语言评估属性,将值转换为适当的数据类型(例如布尔型)等。由于动态属性名成为脚本的变量名,你必须注意所选脚本引擎的变量命名规则。例如,Groovy不允许变量名中出现句点(.),所以如果动态属性名为“my.property”,将会出错。与这些变量的交互是通过NiFi Java API进行的。下面的“动态属性”部分将在介绍相关API调用时进行讨论。 |
示例脚本
从会话中获取一个传入的流文件
用例
你有传入到ExecuteScript的连接,并且想要从队列中检索一个流文件进行处理。
方法
使用session对象的get()方法。此方法返回下一个要处理的优先级最高的流文件。如果没有要处理的流文件,该方法将返回null。注意:即使有稳定的流文件流入处理器,也有可能返回null。如果处理器有多个并发任务,并且其他任务已经检索了流文件,就会出现这种情况。如果脚本需要一个流文件才能继续处理,那么如果session.get()返回null,它应该立即返回。
Groovy示例
flowFile = session.get()
if (!flowFile) return
从会话中获取多个传入的流文件
用例
你有传入到ExecuteScript的连接,并且想要从队列中检索多个流文件进行处理。
方法
使用session对象的get(maxResults)方法。此方法从工作队列中返回最多maxResults个流文件。如果没有可用的流文件,将返回一个空列表(该方法不会返回null)。注意:如果存在多个传入队列,在一次调用中是轮询所有队列还是仅轮询单个队列,其行为是未指定的。话虽如此,这里描述了(对于NiFi 1.1.0及更高版本和之前版本)观察到的行为。
Groovy示例
flowFileList = session.get(100)
if (!flowFileList.isEmpty()) {
flowFileList.each { flowFile ->
// 在此处处理每个流文件
}
}
创建一个新的流文件
用例
你想要生成一个新的流文件并发送到下一个处理器。
方法
使用session对象的create()方法。此方法返回一个新的流文件对象,你可以对其进行进一步处理。
Groovy示例
flowFile = session.create()
// 在此处进行额外处理
从父流文件创建一个新的流文件
用例
你想要基于传入的流文件生成新的流文件。
方法
使用session对象的create(parentFlowFile)方法。此方法接受一个父流文件引用,并返回一个新的子流文件对象。新创建的流文件将继承父流文件的所有属性,但UUID除外。此方法将根据在提交流程会话之前是否从同一父流文件生成其他流文件,自动生成一个溯源分叉事件或一个溯源合并事件。
Groovy示例
flowFile = session.get()
if (!flowFile) return
newFlowFile = session.create(flowFile)
// 在此处进行额外处理
向流文件添加一个属性
用例
你有一个流文件,想要向其添加一个自定义属性。
方法
使用session对象的putAttribute(flowFile, attributeKey, attributeValue)方法。此方法使用给定的键值对更新给定流文件的属性。注意:流文件的“uuid”属性是固定的,不能修改;如果键名为“uuid”,它将被忽略。
另外,这里需要提到的是,流文件对象是不可变的;这意味着如果你通过API更新流文件的属性(或以其他方式更改它),你将获得新的流文件版本的新引用。在将流文件转移到关系中时,这一点非常重要。你必须保留对流文件最新版本的引用,并且必须转移或删除从会话中检索到或创建的所有流文件的最新版本,否则在执行时会出错。大多数情况下,用于存储流文件引用的变量将被修改流文件的方法返回的最新版本覆盖(中间的流文件引用将自动丢弃)。在这些示例中,你会看到在添加属性时重用流文件引用的技巧。注意,当前对流文件的引用被传递到putAttribute()方法中。结果流文件将有一个名为“myAttr”、值为“myValue”的属性。还请注意,该方法接受一个字符串作为值;如果你有一个对象,则必须将其序列化为字符串。最后,请注意,如果你要添加多个属性,最好创建一个Map并使用putAllAttributes()方法(详细信息请参阅下一个示例)。
Groovy示例
flowFile = session.get()
flowFile = session.putAttribute(flowFile,'myAttr','myValue')
if (!flowFile) return
向流文件添加多个属性
用例
你有一个流文件,想要向其添加自定义属性。
方法
使用session对象的putAllAttributes(flowFile, attributeMap)方法。此方法使用给定Map中的键值对更新给定流文件的属性。注意:流文件的“uuid”属性是固定的,不能修改;如果键名为“uuid”,它将被忽略。
Groovy示例
attrMap = ['myAttr1': '1','myAttr2': Integer.toString(2)]
flowFile = session.get()
if (!flowFile) return
flowFile = session.putAllAttributes(flowFile, attrMap)
从流文件获取一个属性
用例
你有一个流文件,想要检查其中的一个属性。
方法
使用FlowFile对象的getAttribute(attributeKey)方法。此方法返回给定属性键的字符串值,如果未找到属性键,则返回null。示例展示了如何检索“filename”属性的值。
Groovy示例
myAttr = flowFile.getAttribute('filename')
flowFile = session.get()
if (!flowFile) return
从流文件获取所有属性
用例
你有一个流文件,想要检索其所有属性。
方法
使用FlowFile对象的getAttributes()方法。此方法返回一个包含字符串键和字符串值的Map,代表流文件的属性键值对。示例展示了如何遍历流文件所有属性的Map。
Groovy示例
flowFile = session.get()
if (!flowFile) return
flowFile.getAttributes().each { key, value ->
// 对键值对进行处理
}
将流文件转移到一个关系中
用例
在处理完一个流文件(新的或传入的)后,你想要将该流文件转移到一个关系(“成功”或“失败”)中。在这个简单的例子中,假设存在一个名为“errorOccurred”的变量,它指示流文件应该转移到哪个关系中。本系列的第2部分将讨论其他错误处理技术。
方法
使用session对象的transfer(flowFile, relationship)方法。根据文档:此方法根据给定的关系将给定的流文件转移到适当的目标处理器工作队列中。如果关系指向多个目标,则流文件的状态将被复制,以便每个目标都收到流文件的精确副本,尽管每个副本都有自己唯一的标识。
注意:ExecuteScript将在每次执行结束时执行session.commit(),以确保操作已提交。你不需要(也不应该)在脚本中执行session.commit()。
Groovy示例
flowFile = session.get()
if (!flowFile) return
// 处理过程在此处
if (errorOccurred) {
session.transfer(flowFile, REL_FAILURE)
} else {
session.transfer(flowFile, REL_SUCCESS)
}
以指定的日志级别向日志发送消息
用例
你想要将处理过程中发生的某个事件报告给日志框架。
方法
使用log变量的warn()、trace()、debug()、info()或error()方法。这些方法可以接受一个字符串,或者一个字符串后跟一个对象数组,或者一个字符串后跟一个对象数组再后跟一个Throwable。第一种形式用于简单消息。第二种形式用于当你有一些动态对象 / 值想要记录时。在消息字符串中使用“{}”来引用这些对象。这些对象将按照出现的顺序与对象数组进行匹配,所以如果消息为“Found these things: {} {} {}”,对象数组为['Hello', 1, true],那么记录的消息将是“Found these things: Hello 1 true”。这些日志方法的第三种形式还接受一个Throwable参数,当捕获到异常并想要记录它时很有用。
Groovy示例
log.info('Found these things: {} {} {}', ['Hello', 1, true] as Object[])
使用回调读取传入流文件的内容
用例
你有传入到ExecuteScript的连接,并且想要从队列中检索流文件的内容进行处理。
方法
使用session对象的read(flowFile, inputStreamCallback)方法。需要一个InputStreamCallback对象传递给read()方法。注意,由于InputStreamCallback是一个对象,默认情况下内容仅对该对象可见。如果你需要在read()方法之外使用数据,请使用作用域更全局的变量。示例将使用Apache Commons的IOUtils类将传入流文件的完整内容存储到一个字符串中。注意:对于大的流文件,这不是最佳技术;你应该只读取所需的数据量,并进行适当的处理。对于像SplitText这样的操作,你可以一次读取一行并在InputStreamCallback中进行处理,或者使用前面提到的session.read(flowFile)方法获取一个InputStream引用,以便在回调之外使用。
Groovy示例
import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets
// 将带有inputStream参数的闭包转换为InputStreamCallback
flowFile = session.get()
if (!flowFile) return
def text = ''
session.read(flowFile, { inputStream ->
text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
// 在此处对text进行处理
} as InputStreamCallback)
使用回调向传出流文件写入内容
用例
你想要为传出的流文件生成内容。
方法
使用session对象的write(flowFile, outputStreamCallback)方法。需要一个OutputStreamCallback对象传递给write()方法。注意,由于OutputStreamCallback是一个对象,默认情况下内容仅对该对象可见。如果你需要在write()方法之外使用数据,请使用作用域更全局的变量。示例将向流文件写入一个示例字符串。
Groovy示例
import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets
if (!flowFile) return
flowFile = session.get()
// 将带有outputStream参数的闭包转换为OutputStreamCallback
def text = 'Hello world!'
flowFile = session.write(flowFile, { outputStream ->
outputStream.write(text.getBytes(StandardCharsets.UTF_8))
} as OutputStreamCallback)
使用回调用更新后的内容覆盖传入的流文件
用例
你想要重用传入的流文件,但想要为传出的流文件修改其内容。
方法
使用session对象的write(flowFile, streamCallback)方法。需要一个StreamCallback对象传递给write()方法。StreamCallback提供了一个InputStream(来自传入的流文件)和一个outputStream(用于该流文件的下一个版本),所以你可以使用InputStream获取流文件的当前内容,然后修改它们并写回到流文件中。这将覆盖流文件的内容,所以如果要追加内容,你必须通过追加读取的内容来处理,或者使用不同的方法(使用session.append()而不是session.write())。注意,由于StreamCallback是一个对象,默认情况下内容仅对该对象可见。如果你需要在write()方法之外使用数据,请使用作用域更全局的变量。示例将反转传入流文件的内容(假设是一个字符串),并将反转后的字符串写入流文件的新版本中。
Groovy示例
import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets
if (!flowFile) return
flowFile = session.get()
def text = 'Hello world!'
// 将带有inputStream和outputStream参数的闭包转换为StreamCallback
flowFile = session.write(flowFile, { inputStream, outputStream ->
text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
outputStream.write(text.reverse().getBytes(StandardCharsets.UTF_8))
session.transfer(flowFile, REL_SUCCESS)
} as StreamCallback)
处理脚本处理过程中的错误
用例
脚本中发生错误(无论是数据验证错误还是抛出的异常),你希望脚本能够优雅地处理它。
方法
对于异常,使用脚本语言的异常处理机制(通常是try / catch块)。对于数据验证,你可以使用类似的方法,但定义一个布尔变量(如“valid”)和一个if / else子句,而不是try / catch子句。ExecuteScript定义了“成功”和“失败”关系;通常你的try / catch子句中,处理过程会将“好”的流文件转移到成功关系,将“坏”的流文件转移到失败关系(在后一种情况下记录错误)。
Groovy示例
flowFile = session.get()
if (!flowFile) return
try {
// 此处可能会抛出异常的操作
// 最后一个操作是转移到成功关系(失败情况在catch块中处理)
} catch (e) {
log.error('Something went wrong', e)
session.transfer(flowFile, REL_FAILURE)
}
属性
名称 | 描述 | API名称 | 表达式语言作用域 | 敏感 | 是否必需 |
模块目录 | 包含脚本所需模块的文件和 / 或目录路径的逗号分隔列表。 | 模块目录 | JVM级别定义的环境变量和系统属性 | 否 | 否 |
脚本主体 | 要执行的脚本主体。只能使用脚本文件或脚本主体中的一个。 | 脚本主体 | 不支持 | 否 | 否 |
脚本引擎 | 用于执行脚本的语言引擎。 | 脚本引擎 | 不支持 | 否 | 是 |
脚本文件 | 要执行的脚本文件的路径。只能使用脚本文件或脚本主体中的一个。 | 脚本文件 | JVM级别定义的环境变量和系统属性 | 否 | 否 |
脚本引擎绑定属性 | 使用动态属性的值更新由动态属性的键指定的脚本引擎属性。 | 脚本引擎绑定属性 | 流文件属性 | 否 | 否 |
状态管理
作用域 | 描述 |
LOCAL、CLUSTER | 脚本可以使用状态管理API存储和检索状态。有关更多详细信息,请参阅开发者指南的状态管理器部分。 |