Note: 可查询状态的客户端API当前处于不断变化的状态,没有提供关于所提供接口的稳定性的保证。在即将到来的FLink版本中,可能会在客户端上中断API更改。
简而言之,这个特性公开了Flink的托管键控(分区)状态(请参见与State)一起工作到外部世界,并允许用户从外部Flink查询作业的状态)。对于某些场景,Queryable状态消除了对具有外部系统(例如键值存储)的分布式操作/事务的需求,而这往往是实际中的瓶颈。此外,此特性对于调试目的可能特别有用。
注意: 当查询状态对象时,可以从并发线程访问该对象,而不需要进行任何同步或复制。这是一个设计选择,因为上面的任何一个都会导致工作延迟的增加,这是我们想要避免的。由于使用Java堆空间的任何状态后端,例如_MemoryStateBackend
或FsStateBackend
,在检索值时都不会与副本一起工作,而是直接引用存储的值,因此读-修改-写入模式是不安全的,并且可能导致可查询的状态服务器由于并发修改而失败。RocksDBStateBackend
不受这些问题的影响。
Architecture 建筑学
在演示如何使用“可查询状态”之前,简要描述组成该状态的实体非常有用。“可查询状态”功能由三个主要实体组成:
QueryableStateClient
(可能)在FLink集群外部运行并提交用户查询,QueryableStateClientProxy
,它运行在每个TaskManager
(_在Flink集群内)上,负责接收客户端的查询,代表他从Responsible Task Manager获取所请求的状态,并将其返回给客户端,以及QueryableStateServer
运行在每个TaskManager
上,负责为本地存储的状态提供服务。
客户端连接到代理之一,并发送与特定密钥“k”关联的状态请求。正如使用State,键控状态组织在_KEY组中]中所述,每个TaskManager
都被分配了许多这些关键组。为了发现哪个TaskManager
负责持有k
的密钥组,代理将询问 JobManager
。根据答案,代理将查询在该 TaskManager
上运行的 QueryableStateServer
,以获得与 k
关联的状态,并将响应转发回客户端。
Activating Queryable State 激活可查询状态
要在Flink集群上启用可查询状态,只需将flink-queryable-state-runtime_2.11-1.7.1.jar
从您Flink distribution,文件夹中复制到lib/
文件夹。否则,无法启用可查询状态功能。
要验证您的群集是否已启用Queryable状态,请检查任何任务管理器的日志:"Started the Queryable State Proxy Server @ ..."
。
Making State Queryable 使状态Queryable
现在您已经激活了集群上的Queryable状态,现在是看看如何使用它的时候了。为了使状态对外部世界可见,需要使用以下方法显式地使其可查询:
QueryableStateStream
是一个方便的对象,充当接收器,并以可查询状态提供传入值,或者- stateDescriptor.setQueryable(String queryableStateName)`方法,它使由状态描述符表示的键状态是可查询的。
以下各节介绍了这两种方法的使用情况。
Queryable State Stream 可查询状态流
在KeyedStream
上调用.asQueryableState(stateName, stateDescriptor)
返回一个 QueryableStateStream
,该 QueryableStateStream
提供其值为Queryable状态。根据状态的类型,asQueryableState()
方法有以下变体:
// ValueState
QueryableStateStream asQueryableState(
String queryableStateName,
ValueStateDescriptor stateDescriptor)
// Shortcut for explicit ValueStateDescriptor variant
QueryableStateStream asQueryableState(String queryableStateName)
// FoldingState
QueryableStateStream asQueryableState(
String queryableStateName,
FoldingStateDescriptor stateDescriptor)
// ReducingState
QueryableStateStream asQueryableState(
String queryableStateName,
ReducingStateDescriptor stateDescriptor)
Note: 没有可查询的ListState
接收器,因为它会导致一个不断增长的列表,该列表可能不会被清理,因此最终会消耗过多的内存。
返回的QueryableStateStream
可视为sink,**不能进一步转换。在内部, QueryableStateStream
被转换为使用所有传入记录来更新可查询状态实例的运算符。 asQueryableState
调用中提供的StateDescriptor
类型暗示了更新逻辑。在如下程序中,键控流的所有记录都将用于通过ValueState.update(value)
更新状态实例:
stream.keyBy(0).asQueryableState("query-name")
这类似于ScalaAPI的flatMapWithState
。
Managed Keyed State 受管理的键控状态
操作符的托管键控状态(请参见使用托管键控State)可以通过StateDescriptor.setQueryable(String queryableStateName)
使适当的状态描述符可查询),如下例所示:
ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
new ValueStateDescriptor<>(
"average", // the state name
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})); // type information
descriptor.setQueryable("query-name"); // queryable state name
Note: queryableStateName
参数可以任意选择,仅用于查询。它不一定要和国家的名字相同。
该变型没有限制可以使哪种类型的状态是可查询的。这意味着,这可用于任何ValueState
, ReduceState
, ListState
, MapState
, AggregatingState
和当前已过时的 FoldingState
。
Querying State 查询状态
到目前为止,您已经将集群设置为使用Queryable状态运行,并且声明(有些)您的状态为queryable。现在是了解如何查询此状态的时候了。
为此,您可以使用 QueryableStateClient
辅助类。这可以在flink-queryable-state-client
jar中提供,它必须明确包含在项目 pom.xml
和 flink-core
之间的依赖关系中,如下所示:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.7.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-queryable-state-client-java_2.11</artifactId>
<version>1.7.1</version>
</dependency>
有关此问题的详细信息,您可以检查如何设置FLink程序。
QueryableStateClient
将将您的查询提交给内部代理,然后内部代理将处理您的查询并返回最终结果。初始化客户机的唯一要求是提供一个有效的TaskManager
主机名(请记住,每个任务管理器上都运行着一个可查询的状态代理)和代理侦听的端口。更多关于如何配置Configuration节中的代理和状态服务器端口的信息。
QueryableStateClient client = new QueryableStateClient(tmHostname, proxyPort);
在客户端准备好的情况下,要查询与类型K
的密钥相关联的类型 V
的状态,您可以使用以下方法:
CompletableFuture<S> getKvState(
JobID jobId,
String queryableStateName,
K key,
TypeInformation<K> keyTypeInfo,
StateDescriptor<S, V> stateDescriptor)
以上返回CompletableFuture
,最终保存了ID为jobID
的作业的 queryableStateName
标识的可查询状态实例的状态值。key
是您感兴趣的状态,而keyTypeInfo
将告诉flink如何序列化/反序列化它。最后,stateDescriptor
包含有关所请求状态的必要信息,即它的类型(Value
, Reduce
等)以及有关如何序列化/反序列化它的必要信息。
仔细的读者会注意到,返回的未来包含一个类型为 S
、a State
对象的值,其中包含实际值。这可以是Flink支持的任何状态类型:ValueState
, ReduceState
, ListState
, MapState
, AggregatingState
,以及当前废弃的FoldingState
。
Note: 这些状态对象不允许对包含的状态进行修改。您可以使用它们获得状态的实际值,例如使用valueState.get()
,或者迭代包含的 <K, V>
条目,__使用 mapState.entries()
,但是不能修改它们。例如,在返回的列表状态上调用‘add()’方法将抛出一个 UnsupportedOperationException
. 注意: 客户端是异步的,可以由多个线程共享。它需要在未使用时通过 QueryableStateClient.shutdown()
关闭,以便释放资源。
Example 例子
下面的示例扩展了“Counterdinaverage”示例(参见[使用受管理的键状态](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/state/state.html#using-managed-keyed-state)),通过它可以查询并显示如何查询此值:
public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
private transient ValueState<Tuple2<Long, Long>> sum; // a tuple containing the count and the sum
@Override
public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
Tuple2<Long, Long> currentSum = sum.value();
currentSum.f0 += 1;
currentSum.f1 += input.f1;
sum.update(currentSum);
if (currentSum.f0 >= 2) {
out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
sum.clear();
}
}
@Override
public void open(Configuration config) {
ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
new ValueStateDescriptor<>(
"average", // the state name
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})); // type information
descriptor.setQueryable("query-name");
sum = getRuntimeContext().getState(descriptor);
}
}
一旦在作业中使用,就可以检索作业ID,然后从此运算符查询任何键的当前状态:
QueryableStateClient client = new QueryableStateClient(tmHostname, proxyPort);
// the state descriptor of the state to be fetched.
ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
new ValueStateDescriptor<>(
"average",
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));
CompletableFuture<ValueState<Tuple2<Long, Long>>> resultFuture =
client.getKvState(jobId, "query-name", key, BasicTypeInfo.LONG_TYPE_INFO, descriptor);
// now handle the returned value
resultFuture.thenAccept(response -> {
try {
Tuple2<Long, Long> res = response.get();
} catch (Exception e) {
e.printStackTrace();
}
});
Configuration 布局,构造
以下配置参数影响可查询状态服务器和客户端的行为。它们在“QueryableStateOptions”中定义。
State Server 状态服务器
query.server.ports
: 可查询状态服务器的服务器端口范围。如果超过1个任务管理器在同一台计算机上运行,则这可用于避免端口冲突。指定范围可以是:端口:“9123”、一系列端口:“50100-50200”或范围和/或点列表:“50100-50200,50300-50400,51234”。默认端口为9067。query.server.network-threads
: 接收状态服务器传入请求的网络(事件循环)线程数(0=>;#时隙)query.server.query-threads
: 处理/服务状态服务器传入请求的线程数(0=>;#槽)。
Proxy 代理服务器
query.proxy.ports
: 可查询状态代理的服务器端口范围。如果超过1个任务管理器在同一台计算机上运行,则这可用于避免端口冲突。指定范围可以是:端口:“9123”、一系列端口:“50100-50200”或范围和/或点列表:“50100-50200,50300-50400,51234”。默认端口为9069。query.proxy.network-threads
: 接收客户端代理传入请求的网络(事件循环)线程数(0=>;#时隙)query.proxy.query-threads
: 处理/服务客户端代理传入请求的线程数(0=>;#槽)。
Limitations 限制,边界
- 可查询状态生命周期被绑定到作业的生命周期,_例如_TATES在启动时注册Queryable状态,并在处理时注销它。在未来的版本中,需要将其解耦,以便在任务完成后允许查询,并通过状态复制加快恢复速度。
- 关于可用的KvState的通知是通过一个简单的Tell进行的。在未来,这一点应该得到改进,以便在请求和确认方面更加健壮。
- 服务器和客户端跟踪查询的统计信息。它们当前在默认情况下被禁用,因为它们不会在任何地方被曝光。只要有更好的支持通过度量系统发布这些数字,我们就应该启用统计信息。