Rpc-实现Client对ZooKeeper的服务监听

发布时间 2023-03-22 19:12:31作者: zko0

1、前言

在上一篇文章中,完成了ZooKeeper注册中心,添加了一个简单的本地缓存

但是,存在一些问题:

  1. 当本地缓存OK,ZooKeeper对应服务有新的实例时,本地缓存不会自动更新
  2. 当ZooKeeper对应服务实例关闭,本地缓存不会监控到实例消失

2、编写

之前我们是将缓存直接放在ZooKeeperClientUtils中的,维护一个Map集合。我们将缓存部分移动到ZooKeeperClientCache中,缓存数据从这里获取:

我们监听树上所有节点的变化情况,对于包含实例的变化,每次获取对应的服务信息,然后通过Clinet查询现存的对应服务的实例,进行更新。

watchPathSet维护了Client调用过的服务集合,对于调用过的服务才开启本地的缓存,并且进行更新。

instances即为本地缓存集合

@Slf4j
public class ZookeeperClientCache {

    private static final Map<String, List<InetSocketAddress>> instances=new ConcurrentHashMap<>();

    private static final Set<String> watchPathSet=new ConcurrentHashSet<>();

    private static CuratorFramework zookeeperClient;

    private static boolean isListening=false;


    //将服务加入监听set中
    public static void addListenService(String service){
        //开启服务监听
        openListen();
        //path路径放入
        watchPathSet.add(ZookeeperUtil.serviceName2Path(service));
    }

    //添加本地缓存,同时开启监听服务
    public static void addLocalCache(String serviceName,List<InetSocketAddress> addressList){
        //直接替换原本的缓存
        instances.put(serviceName,addressList);
        //将服务加入监听set
        addListenService(serviceName);
    }

    public static void cleanLocalCache(String serviceName){
        log.info("服务调用失败,清除本地缓存,重新获取实例===>{}",serviceName);
        instances.remove(serviceName);
    }


    public static boolean containsKey(String serviceName){
        return instances.containsKey(serviceName);
    }

    public static List<InetSocketAddress> getOrDefault(String serviceName){
        return instances.getOrDefault(serviceName,null);
    }

    public static List<InetSocketAddress> getInstances(String serviceName){
        try {
            String path = ZookeeperUtil.serviceName2Path(serviceName);
            //获取路径下所有的实现
            List<String> instancePaths = zookeeperClient.getChildren().forPath(path);
            List<InetSocketAddress> addressList = new ArrayList<>();
            for (String instancePath : instancePaths) {
                byte[] bytes = zookeeperClient.getData().forPath(path+"/"+instancePath);
                String json = new String(bytes);
                InetSocketAddress instance = InetSocketAddressSerializerUtil.getInetSocketAddressByJson(json);
                addressList.add(instance);
            }
            return addressList;
        } catch (Exception e) {
            log.error("服务获取失败====>{}",e);
            throw new RpcException(RpcError.SERVICE_NONE_INSTANCE);
        }
    }

    private static synchronized void openListen(){
        //已初始化过
        if (isListening){
            return;
        }
        //注入client
        if (zookeeperClient==null) {
            zookeeperClient=ZookeeperUtil.getZookeeperClient();
        }
        TreeCache cache = TreeCache.newBuilder(zookeeperClient, "/cn/zko0/myRpc/api").setCacheData(true).build();
        cache.getListenable().addListener((c, event) -> {
            if ( event.getData() != null )
            {
                System.out.println("type=" + event.getType() + " path=" + event.getData().getPath());
                //可以通过event.type来进行节点的处理,我这里直接多节点每次行为做reload
                if (event.getData().getPath().contains("Service/")){
                    //是服务节点,做更新
                    String path = event.getData().getPath();
                    //去除尾部实例段
                    path=path.substring(0,path.lastIndexOf("/"));
                    String serviceName = ZookeeperUtil.path2ServiceName(path);
                    if (watchPathSet.contains(path)) {
                        log.info("更新本地缓存");
                        List<InetSocketAddress> addressList = getInstances(serviceName);
                        addLocalCache(serviceName,addressList);
                    }
                }
            }
            else
            {
                System.out.println("type=" + event.getType());
            }
        });
        try {
            cache.start();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        isListening=true;
    }
}

创建完Cache类,只需要修改之前ZooKeeperClientUtils中,从当前类改为Cache类获取即可:

image-20230220133343196

完整代码:

@Slf4j
public class ZookeeperClientUtils {

    private static CuratorFramework client = ZookeeperUtil.getZookeeperClient();

    public static InetSocketAddress searchService(String serviceName, LoadBalancer loadBalancer) {
        InetSocketAddress address;
        //本地缓存查询
        if (ZookeeperClientCache.containsKey(serviceName)){
            List<InetSocketAddress> addressList = ZookeeperClientCache.getOrDefault(serviceName);
            if (!addressList.isEmpty()){
                //使用lb进行负载均衡
                return loadBalancer.select(addressList);
            }
        }
        try {
            String path = ZookeeperUtil.serviceName2Path(serviceName);
            //获取路径下所有的实现
            List<String> instancePaths = client.getChildren().forPath(path);
            List<InetSocketAddress> addressList = new ArrayList<>();
            for (String instancePath : instancePaths) {
                byte[] bytes = client.getData().forPath(path+"/"+instancePath);
                String json = new String(bytes);
                InetSocketAddress instance = InetSocketAddressSerializerUtil.getInetSocketAddressByJson(json);
                addressList.add(instance);
            }
            ZookeeperClientCache.addLocalCache(serviceName,addressList);
            return loadBalancer.select(addressList);
        } catch (Exception e) {
            log.error("服务获取失败====>{}",e);
            throw new RpcException(RpcError.SERVICE_NONE_INSTANCE);
        }
    }
}

3、测试

实现上述代码,下面是服务监听的简单测试

开启Server,Client:

image-20230220134850987

关闭Server,Server自动进行服务的注销:

image-20230220135154126

Client服务监控:

image-20230220135320003