zookeeper之curator API

发布时间 2023-11-15 09:54:14作者: hasome

参考:https://www.jianshu.com/p/075f3262938c

概述

Apache Curator是一个比较完善的,由Netflix公司开源的一套ZooKeeper的JAVA客户端框架组件。解决了很多Zookeeper客户端非常底层的细节开发工作,包括连接重连、反复注册Watcher和NodeExistsException异常等等。Patrixck Hunt(Zookeeper)以一句“Guava is to Java that Curator to Zookeeper”给Curator予高度评价。

基本API

  • pom
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>5.1.0</version>
        </dependency>
  • 初始化客户端
curatorFramework = CuratorFrameworkFactory.builder() // 使用工厂类来建造客户端的实例对象
                .connectString(ipPort) // 配置zk服务器IP port
                .sessionTimeoutMs(4000)// 设定会话时间
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))// 设置及重连策略
                .namespace("curator")// 方便管理的命名空间,其实就是一级目录
                .build();// 建立管道
        curatorFramework.start();//启动
  • 创建节点
    public static String createZnode(String path, String value) throws Exception {
        return curatorFramework.create()// 创建Znode
                .creatingParentsIfNeeded()// 如果是多级结点,这里声明如果需要,自动创建父节点
                .withMode(CreateMode.PERSISTENT)// 声明结点类型
                .forPath(path, value.getBytes());// 声明结点路径和值
    }
  • 删除节点
public static boolean  checkExists(String path) throws Exception {
        Stat stat = curatorFramework.checkExists().forPath(path);
        return stat!=null;
    }
  • 节点是否存在
public static boolean  checkExists(String path) throws Exception {
        Stat stat = curatorFramework.checkExists().forPath(path);
        return stat!=null;
    }
  • 修改节点

    public static void updateZnode(String path, String value) throws Exception {
        Stat stat = curatorFramework.checkExists().forPath(path);
        if (stat == null) {
            System.out.println("Znode does not exists");
        } else {
            curatorFramework.setData().withVersion(stat.getVersion()).forPath(path, value.getBytes());
        }
    }

  • 查询节点

    public static String getZnodeData(String path) throws Exception {
        byte[] dataBytes = curatorFramework.getData().forPath(path);
        return new String(dataBytes);
    }
  • 查询子节点
    public static List<String> getnodeChildren(String path) throws Exception {
        List<String> dataList = curatorFramework.getChildren().forPath(path);
        return dataList;
    }

高级API

  • pom
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>5.1.0</version>
        </dependency>

CacheListener监听器

CacheListener用来监控ZNode的节点. 当节点进行增,删,改时,会触发响应事件。主要有:NodeCacheListener,PathChildrenCacheListener,TreeCacheListener三类监听器

  • NodeCacheListener:只是监听节点是否有变化,无法知道是什么事件
  • PathChildrenCacheListener:监听子节点的增删改事件
  • TreeCacheListener:监听增删改事件,包括自己跟子节点

拿PathChildrenCacheListener举例

        public static void addWatcherWithChildrenCache(String path) throws Exception {

        CuratorCache curatorCache = CuratorCache.builder(curatorFramework, path).build();
        // 缓存数据
        PathChildrenCacheListener pathChildrenCacheListener = new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent)
                    throws Exception {
                System.out.println("事件路径:" + pathChildrenCacheEvent.getData().getPath() + "事件类型"
                        + pathChildrenCacheEvent.getType());
            }
        };
        CuratorCacheListener listener = CuratorCacheListener.builder()
                .forPathChildrenCache(path, curatorFramework, pathChildrenCacheListener).build();
        curatorCache.listenable().addListener(listener);
        curatorCache.start();
    }

分布式锁

Curator的分布式锁有四个核心类,分别是:InterProcessMutex、InterProcessSemaphoreMutex、InterProcessReadWriteLock、InterProcessMultiLock。其作用:

  • InterProcessMutex:分布式可重入排它锁
  • InterProcessSemaphoreMutex:分布式排它锁
  • InterProcessReadWriteLock:分布式读写锁
  • InterProcessMultiLock:将多个锁作为单个实体管理的容器

API操作

   public void acquire() throws Exception;//获取锁,获取不到锁一直阻塞,zk连接中断则抛异常
   public boolean acquire(long time, TimeUnit unit) throws Exception;//获取锁,超过该时间后,直接返回false,zk连接中断则抛异常
   public void release() throws Exception;//释放锁

分布式计数器

分布式计数器有三个核心类:DistributedAtomicInteger,DistributedAtomicLong。这个两个除了计数范围不同外,没有任何不同。操作也非常简单,跟AtomicInteger大同小异。

increment() //加1
decrement() //减1
compareAndSet(Integer expectedValue, Integer newValue) //cas操作
get() //获取当前值