Keyed state是指在Flink中与一个特定key相关联的状态。在Flink中,数据被分区并按key分组。当数据流被分区和分组后,每个key都有一个对应的状态,这就是Keyed state。它可以用于计算窗口、聚合操作和连续查询等。Keyed state通常用于在流处理中跟踪关键得分、计数或其他与特定数据点相关的值。
MapState、ListState和ValueState都属于Keyed state的不同类型,它们的作用如下:
- MapState:MapState是一种Key-Value集合状态,它允许用户存储和访问与特定key相关联的值。MapState通常用于数据聚合、初始化数据或将一个状态映射到另一个状态。
假设有一个订单流,其中每个订单都有一个唯一的订单号(orderId)和相应的商品数量(itemCount)。我们想要按订单号将订单总数聚合,每次收到一个新订单时更新相应的订单数。此时,我们可以使用MapState来实现:
// 创建一个 MapState 用于存储订单数量
MapState<String, Integer> orderCountMap = getRuntimeContext().getMapState(
new MapStateDescriptor<>("orderCountMap", Types.STRING, Types.INT));
/**
* 处理每个订单的方法,统计每个订单的商品数量
* @param order 表示要被处理的订单对象
* @throws Exception
*/
public void processElement(Order order) throws Exception {
String orderId = order.getOrderId(); // 获取订单ID
Integer itemCount = order.getItemCount(); // 获取订单商品数量
Integer countSoFar = orderCountMap.get(orderId); // 获取该订单已有的商品数量
if (countSoFar == null) { // 如果还未有该订单的记录,则初始化为0
countSoFar = 0;
}
countSoFar += order.getItemCount(); // 把新的商品数量加入已有的记录里
orderCountMap.put(orderId, countSoFar); // 更新该订单的记录
}
- ListState:ListState是一种列表状态,它允许用户将多个值存储在一个state单元里。ListState通常用于在某些场景中存储状态,例如窗口聚合中的中间结果。
假设有一个统计每个小时内用户访问网站的用户列表,我们可以使用ListState来实现:
// 创建一个 ListState 用于存储用户列表
ListState<String> userListState = getRuntimeContext().getListState(
new ListStateDescriptor<String>("userListState", Types.STRING));
/**
* 处理每个用户访问事件的方法,将用户添加到用户列表中
* @param event 表示要被处理的用户访问事件对象
* @throws Exception
*/
public void processElement(UserAccessEvent event) throws Exception {
long currentTime = event.getTimestamp(); // 获取事件的时间戳
DateTimeZone timeZone = DateTimeZone.forID("Asia/Shanghai"); // 设置时区
DateTime dateTime = new DateTime(currentTime, timeZone); // 根据时间戳创建 DateTime 对象
String hourKey = dateTime.toString("YYYYMMddHH"); // 根据时间戳创建按小时分组的 Key
String userName = event.getUserName(); // 获取访问用户的用户名
Iterable<String> userList = userListState.get(); // 获取当前存储的用户列表
if (Iterables.size(userList) == 0) { // 如果列表为空,则直接添加用户
userListState.add(userName);
} else { // 否则,复制一份列表,并在其中查找是否已经存在该用户
List<String> userListCopy = Lists.newArrayList(userList);
if (!userListCopy.contains(userName)) { // 如果不存在,则添加该用户
userListCopy.add(userName);
userListState.update(userListCopy); // 更新存储的用户列表
}
}
}
- ValueState:ValueState是一种单值状态,允许用户存储和更新与特定key相关联的单个值。ValueState通常用于记录特定key的最新状态或跟踪特定key的计数器状态。
假设有一个网站流,记录了每次浏览的页面和用户ID,我们想要在特定的时间段内统计用户浏览不同页面的次数。在这种情况下,我们可以使用ValueState来实现:
// 创建一个 ValueState 用于存储当前页面的访问次数
ValueState<Integer> visitCountState = getRuntimeContext().getState(
new ValueStateDescriptor<>("visitCount", Types.INT));
/**
* 处理每个页面访问事件的方法,更新该页面的访问次数
* @param event 表示要被处理的页面访问事件对象
* @throws Exception
*/
public void processElement(PageVisitEvent event) throws Exception {
int visitCountSoFar = visitCountState.value() == null ? 0 : visitCountState.value(); // 获取当前页面的访问次数
visitCountSoFar += 1; // 增加访问次数
visitCountState.update(visitCountSoFar); // 更新当前页面的访问次数
output.collect(event); // 发送处理后的事件对象到下游算子
}