Zookeeper入门

发布时间 2023-09-04 19:51:51作者: strongmore

简介

ZooKeeper 是 Apache 软件基金会的一个软件项目,它为大型分布式计算提供开源的分布式配置服务、同步服务和命名注册。可以理解为zookeeper是文件系统+监听通知机制。ZooKeeper 的架构通过冗余服务实现高可用性。

Zookeeper 的设计目标是将那些复杂且容易出错的分布式一致性服务封装起来,构成一个高效可靠的原语集,并以一系列简单易用的接口提供给用户使用。

是一个典型的分布式数据一致性的解决方案,分布式应用程序可以基于它实现诸如数据发布/订阅、负载均衡、命名服务、分布式协调/通知、集群管理、Master 选举、分布式锁和分布式队列等功能。

使用docker安装

docker pull zookeeper
docker run -d -p 2181:2181 --name zookeeper zookeeper

防火墙开启对端口2181的限制

客户端操作

下载地址

可以说 zookeeper 中的所有存储的数据都是由 znode 组成的,节点也称为 znode,并以 key/value 形式存储数据。整体结构类似于 linux 文件系统的模式以树形结构存储。其中根路径以 / 开头。

zkCli.cmd -server ip:2181
ls / #查看某个路径下目录列表
ls / -R #递归展示
create -s -e /name 'lisi' # -s代表顺序节点(/name会变成/name0000000000), -e代表临时节点(session关闭就会清除)
set /name0000000000 lisi2 #设置节点数据
get /name0000000000 #获取节点数据
stat /name0000000000 #查看节点状态信息
stat -w /name0000000000 #查看节点状态信息并监听数据变更
delete /name0000000000 #删除非空节点
deleteall /name0000000000 #删除任何节点,包括其子节点

java客户端操作

官方提供的客户端(原生API)

<dependency>
  <groupId>org.apache.zookeeper</groupId>
  <artifactId>zookeeper</artifactId>
  <version>3.5.5</version>
</dependency>
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * 官方提供的API
 */
public class TestZookeeper {

    public static void main(String[] args) throws Exception {
        testGetAndSetData();
    }

    private static void testGetAndSetData() throws Exception {
        ZooKeeper zookeeper = createZookeeper();
        //-1表示不关心版本
        zookeeper.setData("/test2", "123".getBytes(), -1);
        byte[] data = zookeeper.getData("/test2", false, new Stat());
        System.out.println(new String(data));
        //设置监听
        zookeeper.getData("/test2", new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println(event.getState());
                System.out.println(event.getType());
                System.out.println(event.getPath());
                //SyncConnected
                //NodeDataChanged
                //test2
            }
        },new Stat());
        TimeUnit.SECONDS.sleep(30);
    }

    private static void testCreateNode() throws Exception {
        ZooKeeper zookeeper = createZookeeper();
        zookeeper.create("/test2", "213".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }

    private static ZooKeeper createZookeeper() throws Exception {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        ZooKeeper zooKeeper = new ZooKeeper("ip:2181", 4000, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                if (Event.KeeperState.SyncConnected == watchedEvent.getState()) {
                    //如果收到了服务端的响应事件,连接成功
                    countDownLatch.countDown();
                }
            }
        });
        countDownLatch.await();
        return zooKeeper;
    }

}

Curator客户端

在原生API之上封装了一层,使用更方便。

<dependency>
  <groupId>org.apache.curator</groupId>
  <artifactId>curator-framework</artifactId>
  <version>4.0.0</version>
</dependency>
<dependency>
  <groupId>org.apache.curator</groupId>
  <artifactId>curator-recipes</artifactId>
  <version>4.0.0</version>
</dependency>
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;

import java.util.concurrent.TimeUnit;

/**
 * 也是一个zookeeper客户端,在官方库的基础上又封装了一层,更加好用
 */
public class CuratorDemo {

    public static void main(String[] args) throws Exception {
//        testCreateNode();
        testSetAndGetData();
    }

    private static void testSetAndGetData() throws Exception {
        CuratorFramework curator = createCurator();
        curator.create().withMode(CreateMode.PERSISTENT).forPath("/test2", "222".getBytes());
        curator.setData().forPath("/test2", "123".getBytes());
        curator.getData()
                .usingWatcher(new Watcher() {
                    @Override
                    public void process(WatchedEvent event) {
                        System.out.println(event);
                    }
                })
                .forPath("/test2");
        TimeUnit.SECONDS.sleep(20);
        curator.close();
    }

    private static void testCreateNode() throws Exception {
        CuratorFramework curator = createCurator();
        //临时节点,连接关闭就删除
        curator.create().withMode(CreateMode.EPHEMERAL).forPath("/test2", "222".getBytes());
        TimeUnit.SECONDS.sleep(10);
        curator.close();
    }

    private static CuratorFramework createCurator() {
        CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
                .connectString("ip:2181")
                .sessionTimeoutMs(4000)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .namespace("")
                .build();
        curatorFramework.start();
        return curatorFramework;
    }

}

参考

zookeeper教程:入门篇
Zookeeper框架Curator使用
Zookeeper入门实战(3)-Curator操作Zookeeper 原创