侧边栏壁纸
博主头像
coydone博主等级

记录学习,分享生活的个人站点

  • 累计撰写 306 篇文章
  • 累计创建 51 个标签
  • 累计收到 0 条评论

目 录CONTENT

文章目录

Java操作ZK

coydone
2022-05-19 / 0 评论 / 0 点赞 / 307 阅读 / 6,384 字 / 正在检测是否收录...
温馨提示:
本文最后更新于 2022-04-27,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

基本使用

1、创建普通的Maven工程。

2、pom文件导入依赖。

<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.11</version>
</dependency>

3、测试连接、基本CRUD操作。

public class TestZKServer {
    //普通连接、集群连接
	//private static final String SERVERSTRING = "116.62.44.5:2184";
    private static final String SERVERSTRING = "116.62.44.5:2181,116.62.44.5:2182,116.62.44.5:2183";
    private static ZkClient zkClient=null;
    static {
        zkClient=new ZkClient(SERVERSTRING,10000,10000);
    }
    public static void main(String[] args) {
        //create(路径,数据, 节点类型  持久的)
        zkClient.create("/java", "java", CreateMode.PERSISTENT);
        //修改
		zkClient.writeData("/java", "hello");
        //查询
        Object data = zkClient.readData("/java");
        System.out.println(data);
        //删除
        zkClient.delete("/java");
        System.out.println("操作成功zkClient");
    }
}

写String类型

1、创建CustomSerializer

public class CustomSerializer implements ZkSerializer {
    /** default utf 8*/
    private String charset = "UTF-8";

    public CustomSerializer(){

    }

    public CustomSerializer(String charset){
        this.charset = charset;
    }
    public byte[] serialize(Object data) throws ZkMarshallingError {
        try{
            byte[] bytes = String.valueOf(data).getBytes(charset);
            return bytes;
        }catch (UnsupportedEncodingException e){
            throw new ZkMarshallingError("Wrong Charset:" + charset);
        }
    }

    public Object deserialize(byte[] bytes) throws ZkMarshallingError {
        String result=null;
        try {
               result = new String(bytes,charset);
        } catch (UnsupportedEncodingException e) {
            throw new ZkMarshallingError("Wrong Charset:" + charset);
        }
        return result;
    }
}

2、测试

public class TestZKStringData {
//    private static final String SERVERSTRING = "116.62.44.5:2184";
    private static final String SERVERSTRING = "116.62.44.5:2181,116.62.44.5:2182,116.62.44.5:2183";
    private static ZkClient zkClient=null;
    static {
        zkClient=new ZkClient(SERVERSTRING,10000,10000,new CustomSerializer());
    }
    public static void main(String[] args) {
        zkClient.create("/date1", "data1", CreateMode.PERSISTENT);
        Object data = zkClient.readData("/date1");
        System.out.println(data);
        System.out.println("操作成功zkClient");
    }
}

写对象

1、pom文件引入fastjson

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.66</version>
</dependency>

2、创建User类

public class User implements Serializable {
    private Integer id;
    private String name;
    //...
}

3、创建序列化工具类UserSerializer

public class UserSerializer implements ZkSerializer {
    /** default utf 8*/
    private String charset = "UTF-8";
    public UserSerializer(){}
    public UserSerializer(String charset){
        this.charset = charset;
    }

    //序列化
    public byte[] serialize(Object data) throws ZkMarshallingError {
        String jsonString = JSON.toJSONString(data);
        System.out.println(jsonString);
        try{
            byte[] bytes = String.valueOf(jsonString).getBytes(charset);
            return bytes;
        }catch (UnsupportedEncodingException e){
            throw new ZkMarshallingError("Wrong Charset:" + charset);
        }
    }
    //反序列化
    public Object deserialize(byte[] bytes) throws ZkMarshallingError {
        User user=null;
        try {
            String   result = new String(bytes,charset);
            System.out.println(result);
            user = JSON.parseObject(result, User.class);
        } catch (UnsupportedEncodingException e) {
            throw new ZkMarshallingError("Wrong Charset:" + charset);
        }
        return user;
    }
}

4、测试

public class TestZKUserData {
//    private static final String SERVERSTRING = "116.62.44.5:2184";
    private static final String SERVERSTRING = "116.62.44.5:2181,116.62.44.5:2182,116.62.44.5:2183";
    private static ZkClient zkClient=null;
    static {
        zkClient=new ZkClient(SERVERSTRING,10000,10000,new UserSerializer());
    }
    public static void main(String[] args) {
        //zkClient.create("/date1", new Date(), CreateMode.PERSISTENT);
        //Object data = zkClient.readData("/date1");
        //System.out.println(data);

        //存放用户对象
        User user=new User(1,"张三");
       // String res = zkClient.create("/user", user, CreateMode.PERSISTENT);
       // System.out.println(res);

        Object readData = zkClient.readData("/user");
        System.out.println(readData.getClass().getSimpleName());
        System.out.println("操作成功zkClient");
    }
}

订阅监控

监听器原理

1、首先要有一个main()线程

2、在main线程中创建Zookeeper客户端,这时就会创建两个线程,一个负责网络连接通信(connet),一个负责监听(listener)。

3、通过connet线程将注册的监听事件发送给Zookeeper。

4、在Zookeeper的注册监听器列表中将注册的监听事件添加到列表中。

5、Zookeeper监听到有数据或路径变化,就会将这个消息发送给listener线程。

6、listener线程内部调用了process()方法。

常见的监听

1、监听节点数据的变化:get path [watch]

2、监听子节点增减的变化:ls path [watch]

public class TestZKSubData {
    //    private static final String SERVERSTRING = "116.62.44.5:2184";
    private static final String SERVERSTRING = "116.62.44.5:2181,116.62.44.5:2182,116.62.44.5:2183";
    private static ZkClient zkClient=null;
    static {
        zkClient=new ZkClient(SERVERSTRING,10000,10000,new CustomSerializer());
    }
    public static void main(String[] args) throws IOException {
        //创建java
        //zkClient.create("/java","java", CreateMode.PERSISTENT);
        //zkClient.create("/java/java1","java1", CreateMode.PERSISTENT);
        //zkClient.create("/java/java2","java2", CreateMode.PERSISTENT);
        //zkClient.create("/java/java3","java3", CreateMode.PERSISTENT);
        //zkClient.create("/java/java4","java4", CreateMode.PERSISTENT);

        //监听,参数1  监听的节点,参数2 方法回调
        /*zkClient.subscribeDataChanges("/java",new IZkDataListener(){

            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
                System.out.println("节点数据发生变更"+dataPath+"  "+data);
            }

            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                System.out.println("节点被删除"+dataPath);
            }
        });*/


        //监听子节点数据的添加 和删除
        zkClient.subscribeChildChanges("/java", new IZkChildListener() {
            @Override
            public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
                System.out.println("父节点路径:"+parentPath);
                System.out.println("变更之后的子节点的信息");
                for (String currentChild : currentChilds) {
                    System.out.println(currentChild);
                }
            }
        });

        //不能让程序退出
        System.in.read();
        System.out.println("操作成功zkClient");
    }
}

全局自增id生成器

public class ZkIdGenerator {
    private static final String SERVERSTRING = "116.62.44.5:2181,116.62.44.5:2182,116.62.44.5:2183";
    private static ZkClient zkClient=null;
    static {
        zkClient=new ZkClient(SERVERSTRING,10000,10000);
    }

    //得到下一个版本号
    public static Integer next(String path){
        if(!zkClient.exists(path)){
            zkClient.create(path,new byte[0], CreateMode.PERSISTENT);
        }
        Stat stat = zkClient.writeDataReturnStat(path, new byte[0], -1);
        int version = stat.getVersion();
        return version;
    }

    public static void main(String[] args) {
        for (int i = 0; i <1000 ; i++) {
            System.out.println(next("/id-gen"));
        }
    }
}
0

评论区