Zookeeper概述

发布时间 2023-07-18 01:00:51作者: 突破铁皮

Zookeeper的功能

1.统一命名服务:对应用和服务进行统一命名

2.统一配置服务:在分布式系统下,让所有节点的配置信息一致,便于修改配置信息

3.统一管理服务:可以监控每个节点的状态变换,同时获取信息并作出调整

4.服务器动态上下线:客户端可以实时洞察服务器上下线变化

5.软负载均衡:可以根据每个节点的访问情况把任务优先交给访问数量较少的节点

Zookeeper工作特点

zookeeper集群有一个leader多个follower,只有leader有写权限,folower只有读权限

客户端可以访问集群中的任意一个节点,如果有写操作的话需要上报给leader节点,leader节点会先写然后把一部分任务分配给follower

Zookeeper客户端命令行

打开命令行

一些基本操作

create:创建节点,带-e就是临时节点,会话结束就会自动清除,-s是带序号的节点

delete:删除节点,deleteall可以递归删除

下面是创建监听的

监听主要分为两类:一个是数据的监听,一个是节点下子节点数量的监听

监听是注册一次,监听一次,不会重复监听

get -w:对节点值进行监听

ls -w:对节点的子节点数量进行监听

可以看到上面的监听只执行了一次,如果需要多次执行则需要多次注册

Zookeeper API

zookeeper锁机制:每次对于每个客户端都注册一个序号,每次都有最小的序号获取锁,不是最小的序号就创建监听上一个序号,如果上一个序号用完释放锁,就马上拿过来

package org.example.zookeeper;

import lombok.SneakyThrows;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class ZookeeperService {
    private final int sessionTimeout=2000;
    private ZooKeeper zk;
    private CountDownLatch connectionDownLatch=new CountDownLatch(1);
    private CountDownLatch waitDownLatch=new CountDownLatch(1);
    private String lastPath;
    private String myNode;
    //获取连接
    @SneakyThrows
    public void getConnection(String url){
        zk=new ZooKeeper(url, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                if(watchedEvent.getState()== Event.KeeperState.SyncConnected){
                    System.out.println("连接成功");
                    connectionDownLatch.countDown();
                }
                if(watchedEvent.getType()== Event.EventType.NodeDeleted && watchedEvent.getPath().equals(lastPath)){
                    waitDownLatch.countDown();
                }
                watchChildren("/scp");
            }
        });
        System.out.println(connectionDownLatch.getCount());
        connectionDownLatch.await();
        if(!check("/locks")){
            zk.create("/locks","locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
        }
    }
    //创建永久节点
    @SneakyThrows
    public void createPerpetualNode(String hostname,boolean flag){
        if(!flag) zk.create("/scp/"+hostname,hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        else zk.create("/scp/"+hostname,hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
    }
    //创建临时节点
    @SneakyThrows
    public void createTemporaryNode(String hostname,boolean flag){
        if(!flag) zk.create("/scp/"+hostname,hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        else zk.create("/scp/"+hostname,hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    }
    //创建监听
    @SneakyThrows
    public void watchChildren(String path){
        List<String> children=zk.getChildren(path,true);
        List<String> data=new ArrayList<>();
        for (String i:children){
            data.add(new String(i));
        }
        System.out.println(data);
    }
    //判断节点
    @SneakyThrows
    public boolean check(String path){
        if(zk.exists(path,false)==null){
            System.out.println("该节点不存在");
            return false;
        }
        else {
            System.out.println("该节点存在");
            return true;
        }
    }
    //创建锁
    @SneakyThrows
    public void getLock(){
        myNode=zk.create("/locks/user-",null, ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL).substring("/locks/".length());
        List<String> children=zk.getChildren("/locks",false);
        Collections.sort(children);
        if(children.size()==0) System.out.println("没有节点");
        else if(children.get(0).equals(myNode)){
            System.out.println("本节点最小,可获取?");
        }
        else {
            int index=children.indexOf(myNode);
            lastPath="/locks/"+children.get(index-1);
            zk.getData(lastPath,true,null);
            waitDownLatch.await();
        }
    }
    //释放锁
    @SneakyThrows
    public void delLock(){
        zk.delete("/locks/"+myNode,-1);
    }
}
package org.example.zookeeper;

import lombok.SneakyThrows;

import java.sql.Time;

public class ZookeeperTest1 {
    @SneakyThrows
    public static void main(String[] args) {
        ZookeeperService zookeeperService=new ZookeeperService();
        zookeeperService.getConnection("billsaifu:2181");
        for(int i=1;i<11;i++){
        zookeeperService.createTemporaryNode(String.format("scp-%03d",i),false);
        }
        new Thread(new Runnable() {
            @Override
            @SneakyThrows
            public void run() {
                ZookeeperService zookeeperService=new ZookeeperService();
                zookeeperService.getConnection("billsaifu:2181");
                zookeeperService.getLock();
                System.out.println("线程1获取?");
                Thread.sleep(5000);
                zookeeperService.delLock();
                System.out.println("线程1释放?");
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            @SneakyThrows
            public void run() {
                ZookeeperService zookeeperService=new ZookeeperService();
                zookeeperService.getConnection("billsaifu:2181");
                zookeeperService.getLock();
                System.out.println("线程2获取?");
                Thread.sleep(5000);
                zookeeperService.delLock();
                System.out.println("线程2释放?");
            }
        }).start();

    }
}