Zookeeper使用实战

发布时间 2023-07-19 01:49:49作者: JiuYou2020

前置阅读:https://javaguide.cn/distributed-system/distributed-process-coordination/zookeeper/zookeeper-plus.html

1. zookeeper下载

ZooKeeper 在 Java 版本 1.8 或更高版本中运行(JDK 8 LTS、JDK 11 LTS、JDK 12 - 不支持 Java 9 和 10)。它作为 ZooKeeper 服务器的集合运行。三个 ZooKeeper 服务器是整体建议的最小规模,我们还建议它们在单独的机器上运行。ZooKeeper 通常部署在专用的 RHEL 机器上,配备双核处理器、2GB RAM 和 80GB IDE 硬盘驱动器。

  1. 从https://zookeeper.apache.org/releases.html下载最新稳定版本(3.7.1)

因为从外网下载太慢,我们可以去阿里云找到相同版本号的镜像下载。

wget https://mirrors.aliyun.com/apache/zookeeper/zookeeper-3.7.1/apache-zookeeper-3.7.1-bin.tar.gz
  1. 验证文件完整性(可选)

这里博主就不做了,大家可以自行参照官网

image-20230717214622517
  1. 解压
tar -xzf apache-zookeeper-3.7.1-bin.tar.gz
  1. 启动
# 进入到解压后文件下的conf文件夹中,将zoo_sample.cfg 复制一份,参考内容如下
# 每个时钟周期的毫秒数
tickTime=2000
# 初始同步阶段可以使用的时钟周期数
initLimit=10
# 在发送请求和接收确认之间可以经过的时钟周期数
syncLimit=5
# 快照存储的目录
# 不要将存储目录设置为 /tmp,这里只是一个示例
dataDir=/tmp/zookeeper
# 客户端连接的端口
clientPort=2181
# 最大客户端连接数
# 如果需要处理更多的客户端,请增加此值
#maxClientCnxns=60
#
# 在打开自动清理之前,请确保阅读维护部分的管理员指南。
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# 在 dataDir 中保留的快照数量
#autopurge.snapRetainCount=3
# 自动清理任务间隔(小时)
# 设置为 "0" 禁用自动清理功能
#autopurge.purgeInterval=1

## 指标提供程序
#
# https://prometheus.io 指标导出器
#metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
#metricsProvider.httpPort=7000
#metricsProvider.exportJvmInfo=true

这里我看很多其它博客都配了一个dataLogDir=压缩目录/logs,但是我发现zk这个目录下默认就是有这个目录的

再运行下面指令即可单机启动

bin/zkServer.sh start

如果没有下载jdksudo yum install java-1.8.0-openjdk

  1. 连接到zookeeper
bin/zkCli.sh -server 127.0.0.1:2181

2. zookeeper集群搭建

  1. 设置堆大小

这对于避免交换非常重要,交换会严重降低 ZooKeeper 的性能。要确定正确的值,请使用负载测试,并确保远低于导致交换的使用限制。保守一点 - 对于 4GB 机器,使用最大堆大小 3GB。 --官网

  1. 修改配置
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=5
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=2
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=/usr/local/zookeeper/snapshots
# the port at which the clients will connect
clientPort=2183
autopurge.snapRetainCount=500
autopurge.purgeInterval=24
server.1=0.0.0.0:2888:3888#请务必注意,如果这个配置文件放在本机上,这里用0.0.0.0,不要用本机ip,我就这里被坑了
server.2=ip:2888:3888#前者为仲裁端口,后者为选举端口
server.3=ip:2888:3888
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1

## Metrics Providers
#
# https://prometheus.io Metrics Exporter
#metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
#metricsProvider.httpPort=7000
#metricsProvider.exportJvmInfo=true
  1. 启动+指定堆大小
JVMFLAGS="-Xmx2g -Xms512m" ./bin/zkServer.sh start# 前者是最大堆,后者是初始堆

如果出现:QuorumCnxManager$Listener$ListenerHandler@1099] - Exception while listening java.net.BindException: Cannot assign requested address (Bind failed)

请将本机端口改为0.0.0.0而不是使用ip

image-20230718034001634

3. Java项目使用集群

博主手搓的RPC框架,项目地址:

也是为的这个才写这博客,哈哈

  1. 导入依赖
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>5.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>4.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-x-discovery</artifactId>
            <version>5.1.0</version>
        </dependency>
  1. 创建接口
package cn.jiuyou.serviceDiscovery;

import org.apache.curator.x.discovery.ServiceInstance;

import java.util.Collection;

/**
 * {@code @Author: } JiuYou
 * {@code @Date: } 2023/7/18
 * {@code @Description: }服务注册与发现,应该有注册,取消注册和服务发现的功能
 */
public interface Discovery {
    /**
     * 注册服务
     *
     * @param serviceInstance 服务实例
     * @throws Exception 注册失败抛出异常
     */
    void registerService(ServiceInstance<String> serviceInstance) throws Exception;

    /**
     * 取消注册服务
     *
     * @param serviceInstance 服务实例
     * @throws Exception 取消注册失败抛出异常
     */

    void unregisterService(ServiceInstance<String> serviceInstance) throws Exception;

    /**
     * 查询服务
     *
     * @param serviceName 服务名称
     * @return 服务实例集合
     * @throws Exception 查询失败抛出异常
     */
    Collection<ServiceInstance<String>> queryForInstances(String serviceName) throws Exception;
}
  1. 接口实现
package cn.jiuyou.serviceDiscovery.impl;

import cn.jiuyou.serviceDiscovery.Discovery;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.details.JsonInstanceSerializer;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Random;

import static cn.jiuyou.constant.Constants.*;

/**
 * {@code @Author: } JiuYou
 * {@code @Date: } 2023/7/18
 * {@code @Description: }
 */
//单例模式,
public class ZookeeperServiceDiscovery implements Discovery {
    private final CuratorFramework client;
    private final ServiceDiscovery<String> serviceDiscovery;
    private volatile static ZookeeperServiceDiscovery INSTANCE;

    public static ZookeeperServiceDiscovery getInstance() throws Exception {
        if (INSTANCE == null) {
            synchronized (ZookeeperServiceDiscovery.class) {
                if (INSTANCE == null) {
                    INSTANCE = new ZookeeperServiceDiscovery();
                }
            }
        }
        return INSTANCE;
    }

    private ZookeeperServiceDiscovery() throws Exception {
        client = CuratorFrameworkFactory.newClient(CONNECTION_STRING, SESSION_TIMEOUT_MS, CONNECTION_TIMEOUT_MS, new ExponentialBackoffRetry(1000, 3));
        client.start();
        //创建 ServiceDiscovery实例
        serviceDiscovery = ServiceDiscoveryBuilder.builder(String.class)
                .client(client)
                .basePath(BASE_PATH)
                .serializer(new JsonInstanceSerializer<>(String.class))
                .build();

        serviceDiscovery.start();
    }

    @Override
    public void registerService(ServiceInstance<String> serviceInstance) throws Exception {
        serviceDiscovery.registerService(serviceInstance);
    }

    @Override
    public void unregisterService(ServiceInstance<String> serviceInstance) throws Exception {
        serviceDiscovery.unregisterService(serviceInstance);
    }

    @Override
    public Collection<ServiceInstance<String>> queryForInstances(String serviceName) throws Exception {
        return serviceDiscovery.queryForInstances(serviceName);
    }

    public void close() throws Exception {
        serviceDiscovery.close();
        client.close();
    }

    /**
     * 负载均衡策略,随机,后续会有补充权重,轮询以及hash
     */
    public ServiceInstance<String> randomChooseInstance(String serviceName) throws Exception {
        Collection<ServiceInstance<String>> instances = queryForInstances(serviceName);

        if (instances.isEmpty()) {
            throw new RuntimeException("No instances available for service: " + serviceName);
        }

        List<ServiceInstance<String>> instanceList = new ArrayList<>(instances);
        int randomIndex = new Random().nextInt(instanceList.size());

        return instanceList.get(randomIndex);
    }

}
  1. 编写测试
    private ZookeeperServiceDiscovery discovery;

    @Before
    public void setup() throws Exception {
        discovery = ZookeeperServiceDiscovery.getInstance();
    }

    @Test
    public void addService() throws Exception {
        // 构建 ServiceInstance 对象
        ServiceInstance<String> serviceInstance = ServiceInstance.<String>builder()
                .name("userService")
                .address(HOST)
                .port(PORT)
                .payload("10")
                .build();

        try {
            // 注册服务
            discovery.registerService(serviceInstance);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Test
    public void findService() throws Exception {
        // 构建 ServiceInstance 对象
        Collection<ServiceInstance<String>> serviceInstances = discovery.queryForInstances("userService");
        System.out.println(serviceInstances);
    }
  1. 测试结果

image-20230718033800519