1. zookeeper下载
ZooKeeper 在 Java 版本 1.8 或更高版本中运行(JDK 8 LTS、JDK 11 LTS、JDK 12 - 不支持 Java 9 和 10)。它作为 ZooKeeper 服务器的集合运行。三个 ZooKeeper 服务器是整体建议的最小规模,我们还建议它们在单独的机器上运行。ZooKeeper 通常部署在专用的 RHEL 机器上,配备双核处理器、2GB RAM 和 80GB IDE 硬盘驱动器。
- 从https://zookeeper.apache.org/releases.html下载最新稳定版本(3.7.1)
因为从外网下载太慢,我们可以去阿里云找到相同版本号的镜像下载。
wget https://mirrors.aliyun.com/apache/zookeeper/zookeeper-3.7.1/apache-zookeeper-3.7.1-bin.tar.gz
- 验证文件完整性(可选)
这里博主就不做了,大家可以自行参照官网
- 解压
tar -xzf apache-zookeeper-3.7.1-bin.tar.gz
- 启动
# 进入到解压后文件下的conf文件夹中,将zoo_sample.cfg 复制一份,参考内容如下
# 每个时钟周期的毫秒数
tickTime=2000
# 初始同步阶段可以使用的时钟周期数
initLimit=10
# 在发送请求和接收确认之间可以经过的时钟周期数
syncLimit=5
# 快照存储的目录
# 不要将存储目录设置为 /tmp,这里只是一个示例
dataDir=/tmp/zookeeper
# 客户端连接的端口
clientPort=2181
# 最大客户端连接数
# 如果需要处理更多的客户端,请增加此值
#maxClientCnxns=60
#
# 在打开自动清理之前,请确保阅读维护部分的管理员指南。
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# 在 dataDir 中保留的快照数量
#autopurge.snapRetainCount=3
# 自动清理任务间隔(小时)
# 设置为 "0" 禁用自动清理功能
#autopurge.purgeInterval=1
## 指标提供程序
#
# https://prometheus.io 指标导出器
#metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
#metricsProvider.httpPort=7000
#metricsProvider.exportJvmInfo=true
这里我看很多其它博客都配了一个dataLogDir=压缩目录/logs,但是我发现zk这个目录下默认就是有这个目录的
再运行下面指令即可单机启动
bin/zkServer.sh start
如果没有下载jdk
sudo yum install java-1.8.0-openjdk
- 连接到zookeeper
bin/zkCli.sh -server 127.0.0.1:2181
2. zookeeper集群搭建
- 设置堆大小
这对于避免交换非常重要,交换会严重降低 ZooKeeper 的性能。要确定正确的值,请使用负载测试,并确保远低于导致交换的使用限制。保守一点 - 对于 4GB 机器,使用最大堆大小 3GB。 --官网
- 修改配置
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=5
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=2
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/usr/local/zookeeper/snapshots
# the port at which the clients will connect
clientPort=2183
autopurge.snapRetainCount=500
autopurge.purgeInterval=24
server.1=0.0.0.0:2888:3888#请务必注意,如果这个配置文件放在本机上,这里用0.0.0.0,不要用本机ip,我就这里被坑了
server.2=ip:2888:3888#前者为仲裁端口,后者为选举端口
server.3=ip:2888:3888
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
## Metrics Providers
#
# https://prometheus.io Metrics Exporter
#metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
#metricsProvider.httpPort=7000
#metricsProvider.exportJvmInfo=true
- 启动+指定堆大小
JVMFLAGS="-Xmx2g -Xms512m" ./bin/zkServer.sh start# 前者是最大堆,后者是初始堆
如果出现:
QuorumCnxManager$Listener$ListenerHandler@1099] - Exception while listening java.net.BindException: Cannot assign requested address (Bind failed)
请将本机端口改为0.0.0.0而不是使用ip
3. Java项目使用集群
博主手搓的RPC框架,项目地址:
也是为的这个才写这博客,哈哈
- 导入依赖
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-x-discovery</artifactId>
<version>5.1.0</version>
</dependency>
- 创建接口
package cn.jiuyou.serviceDiscovery;
import org.apache.curator.x.discovery.ServiceInstance;
import java.util.Collection;
/**
* {@code @Author: } JiuYou
* {@code @Date: } 2023/7/18
* {@code @Description: }服务注册与发现,应该有注册,取消注册和服务发现的功能
*/
public interface Discovery {
/**
* 注册服务
*
* @param serviceInstance 服务实例
* @throws Exception 注册失败抛出异常
*/
void registerService(ServiceInstance<String> serviceInstance) throws Exception;
/**
* 取消注册服务
*
* @param serviceInstance 服务实例
* @throws Exception 取消注册失败抛出异常
*/
void unregisterService(ServiceInstance<String> serviceInstance) throws Exception;
/**
* 查询服务
*
* @param serviceName 服务名称
* @return 服务实例集合
* @throws Exception 查询失败抛出异常
*/
Collection<ServiceInstance<String>> queryForInstances(String serviceName) throws Exception;
}
- 接口实现
package cn.jiuyou.serviceDiscovery.impl;
import cn.jiuyou.serviceDiscovery.Discovery;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.details.JsonInstanceSerializer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Random;
import static cn.jiuyou.constant.Constants.*;
/**
* {@code @Author: } JiuYou
* {@code @Date: } 2023/7/18
* {@code @Description: }
*/
//单例模式,
public class ZookeeperServiceDiscovery implements Discovery {
private final CuratorFramework client;
private final ServiceDiscovery<String> serviceDiscovery;
private volatile static ZookeeperServiceDiscovery INSTANCE;
public static ZookeeperServiceDiscovery getInstance() throws Exception {
if (INSTANCE == null) {
synchronized (ZookeeperServiceDiscovery.class) {
if (INSTANCE == null) {
INSTANCE = new ZookeeperServiceDiscovery();
}
}
}
return INSTANCE;
}
private ZookeeperServiceDiscovery() throws Exception {
client = CuratorFrameworkFactory.newClient(CONNECTION_STRING, SESSION_TIMEOUT_MS, CONNECTION_TIMEOUT_MS, new ExponentialBackoffRetry(1000, 3));
client.start();
//创建 ServiceDiscovery实例
serviceDiscovery = ServiceDiscoveryBuilder.builder(String.class)
.client(client)
.basePath(BASE_PATH)
.serializer(new JsonInstanceSerializer<>(String.class))
.build();
serviceDiscovery.start();
}
@Override
public void registerService(ServiceInstance<String> serviceInstance) throws Exception {
serviceDiscovery.registerService(serviceInstance);
}
@Override
public void unregisterService(ServiceInstance<String> serviceInstance) throws Exception {
serviceDiscovery.unregisterService(serviceInstance);
}
@Override
public Collection<ServiceInstance<String>> queryForInstances(String serviceName) throws Exception {
return serviceDiscovery.queryForInstances(serviceName);
}
public void close() throws Exception {
serviceDiscovery.close();
client.close();
}
/**
* 负载均衡策略,随机,后续会有补充权重,轮询以及hash
*/
public ServiceInstance<String> randomChooseInstance(String serviceName) throws Exception {
Collection<ServiceInstance<String>> instances = queryForInstances(serviceName);
if (instances.isEmpty()) {
throw new RuntimeException("No instances available for service: " + serviceName);
}
List<ServiceInstance<String>> instanceList = new ArrayList<>(instances);
int randomIndex = new Random().nextInt(instanceList.size());
return instanceList.get(randomIndex);
}
}
- 编写测试
private ZookeeperServiceDiscovery discovery;
@Before
public void setup() throws Exception {
discovery = ZookeeperServiceDiscovery.getInstance();
}
@Test
public void addService() throws Exception {
// 构建 ServiceInstance 对象
ServiceInstance<String> serviceInstance = ServiceInstance.<String>builder()
.name("userService")
.address(HOST)
.port(PORT)
.payload("10")
.build();
try {
// 注册服务
discovery.registerService(serviceInstance);
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void findService() throws Exception {
// 构建 ServiceInstance 对象
Collection<ServiceInstance<String>> serviceInstances = discovery.queryForInstances("userService");
System.out.println(serviceInstances);
}
- 测试结果