flink中的Keyed State

发布时间 2023-05-09 22:53:26作者: aminor

Keyed state是指在Flink中与一个特定key相关联的状态。在Flink中,数据被分区并按key分组。当数据流被分区和分组后,每个key都有一个对应的状态,这就是Keyed state。它可以用于计算窗口、聚合操作和连续查询等。Keyed state通常用于在流处理中跟踪关键得分、计数或其他与特定数据点相关的值。

MapState、ListState和ValueState都属于Keyed state的不同类型,它们的作用如下:

  1. MapState:MapState是一种Key-Value集合状态,它允许用户存储和访问与特定key相关联的值。MapState通常用于数据聚合、初始化数据或将一个状态映射到另一个状态。

假设有一个订单流,其中每个订单都有一个唯一的订单号(orderId)和相应的商品数量(itemCount)。我们想要按订单号将订单总数聚合,每次收到一个新订单时更新相应的订单数。此时,我们可以使用MapState来实现:

// 创建一个 MapState 用于存储订单数量
MapState<String, Integer> orderCountMap = getRuntimeContext().getMapState(
    new MapStateDescriptor<>("orderCountMap", Types.STRING, Types.INT));

/**
 * 处理每个订单的方法,统计每个订单的商品数量
 * @param order 表示要被处理的订单对象
 * @throws Exception
 */
public void processElement(Order order) throws Exception {
    String orderId = order.getOrderId(); // 获取订单ID
    Integer itemCount = order.getItemCount(); // 获取订单商品数量
    Integer countSoFar = orderCountMap.get(orderId); // 获取该订单已有的商品数量
    if (countSoFar == null) { // 如果还未有该订单的记录,则初始化为0
        countSoFar = 0;
    }
    countSoFar += order.getItemCount(); // 把新的商品数量加入已有的记录里
    orderCountMap.put(orderId, countSoFar); // 更新该订单的记录
}

  1. ListState:ListState是一种列表状态,它允许用户将多个值存储在一个state单元里。ListState通常用于在某些场景中存储状态,例如窗口聚合中的中间结果。

假设有一个统计每个小时内用户访问网站的用户列表,我们可以使用ListState来实现:

// 创建一个 ListState 用于存储用户列表
ListState<String> userListState = getRuntimeContext().getListState(
    new ListStateDescriptor<String>("userListState", Types.STRING));

/**
 * 处理每个用户访问事件的方法,将用户添加到用户列表中
 * @param event 表示要被处理的用户访问事件对象
 * @throws Exception
 */
public void processElement(UserAccessEvent event) throws Exception {
    long currentTime = event.getTimestamp(); // 获取事件的时间戳
    DateTimeZone timeZone = DateTimeZone.forID("Asia/Shanghai"); // 设置时区
    DateTime dateTime = new DateTime(currentTime, timeZone); // 根据时间戳创建 DateTime 对象
    String hourKey = dateTime.toString("YYYYMMddHH"); // 根据时间戳创建按小时分组的 Key
    String userName = event.getUserName(); // 获取访问用户的用户名
    Iterable<String> userList = userListState.get(); // 获取当前存储的用户列表
    if (Iterables.size(userList) == 0) { // 如果列表为空,则直接添加用户
        userListState.add(userName);
    } else { // 否则,复制一份列表,并在其中查找是否已经存在该用户
        List<String> userListCopy = Lists.newArrayList(userList);
        if (!userListCopy.contains(userName)) { // 如果不存在,则添加该用户
            userListCopy.add(userName);
            userListState.update(userListCopy); // 更新存储的用户列表
        }
    }
}

  1. ValueState:ValueState是一种单值状态,允许用户存储和更新与特定key相关联的单个值。ValueState通常用于记录特定key的最新状态或跟踪特定key的计数器状态。

假设有一个网站流,记录了每次浏览的页面和用户ID,我们想要在特定的时间段内统计用户浏览不同页面的次数。在这种情况下,我们可以使用ValueState来实现:

// 创建一个 ValueState 用于存储当前页面的访问次数
ValueState<Integer> visitCountState = getRuntimeContext().getState(
    new ValueStateDescriptor<>("visitCount", Types.INT));

/**
 * 处理每个页面访问事件的方法,更新该页面的访问次数
 * @param event 表示要被处理的页面访问事件对象
 * @throws Exception
 */
public void processElement(PageVisitEvent event) throws Exception {
    int visitCountSoFar = visitCountState.value() == null ? 0 : visitCountState.value(); // 获取当前页面的访问次数
    visitCountSoFar += 1; // 增加访问次数
    visitCountState.update(visitCountSoFar); // 更新当前页面的访问次数
    output.collect(event); // 发送处理后的事件对象到下游算子
}