基本使用
1、创建普通的Maven工程。
2、pom文件导入依赖。
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.11</version>
</dependency>
3、测试连接、基本CRUD操作。
public class TestZKServer {
//普通连接、集群连接
//private static final String SERVERSTRING = "116.62.44.5:2184";
private static final String SERVERSTRING = "116.62.44.5:2181,116.62.44.5:2182,116.62.44.5:2183";
private static ZkClient zkClient=null;
static {
zkClient=new ZkClient(SERVERSTRING,10000,10000);
}
public static void main(String[] args) {
//create(路径,数据, 节点类型 持久的)
zkClient.create("/java", "java", CreateMode.PERSISTENT);
//修改
zkClient.writeData("/java", "hello");
//查询
Object data = zkClient.readData("/java");
System.out.println(data);
//删除
zkClient.delete("/java");
System.out.println("操作成功zkClient");
}
}
写String类型
1、创建CustomSerializer
public class CustomSerializer implements ZkSerializer {
/** default utf 8*/
private String charset = "UTF-8";
public CustomSerializer(){
}
public CustomSerializer(String charset){
this.charset = charset;
}
public byte[] serialize(Object data) throws ZkMarshallingError {
try{
byte[] bytes = String.valueOf(data).getBytes(charset);
return bytes;
}catch (UnsupportedEncodingException e){
throw new ZkMarshallingError("Wrong Charset:" + charset);
}
}
public Object deserialize(byte[] bytes) throws ZkMarshallingError {
String result=null;
try {
result = new String(bytes,charset);
} catch (UnsupportedEncodingException e) {
throw new ZkMarshallingError("Wrong Charset:" + charset);
}
return result;
}
}
2、测试
public class TestZKStringData {
// private static final String SERVERSTRING = "116.62.44.5:2184";
private static final String SERVERSTRING = "116.62.44.5:2181,116.62.44.5:2182,116.62.44.5:2183";
private static ZkClient zkClient=null;
static {
zkClient=new ZkClient(SERVERSTRING,10000,10000,new CustomSerializer());
}
public static void main(String[] args) {
zkClient.create("/date1", "data1", CreateMode.PERSISTENT);
Object data = zkClient.readData("/date1");
System.out.println(data);
System.out.println("操作成功zkClient");
}
}
写对象
1、pom文件引入fastjson
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.66</version>
</dependency>
2、创建User类
public class User implements Serializable {
private Integer id;
private String name;
//...
}
3、创建序列化工具类UserSerializer
public class UserSerializer implements ZkSerializer {
/** default utf 8*/
private String charset = "UTF-8";
public UserSerializer(){}
public UserSerializer(String charset){
this.charset = charset;
}
//序列化
public byte[] serialize(Object data) throws ZkMarshallingError {
String jsonString = JSON.toJSONString(data);
System.out.println(jsonString);
try{
byte[] bytes = String.valueOf(jsonString).getBytes(charset);
return bytes;
}catch (UnsupportedEncodingException e){
throw new ZkMarshallingError("Wrong Charset:" + charset);
}
}
//反序列化
public Object deserialize(byte[] bytes) throws ZkMarshallingError {
User user=null;
try {
String result = new String(bytes,charset);
System.out.println(result);
user = JSON.parseObject(result, User.class);
} catch (UnsupportedEncodingException e) {
throw new ZkMarshallingError("Wrong Charset:" + charset);
}
return user;
}
}
4、测试
public class TestZKUserData {
// private static final String SERVERSTRING = "116.62.44.5:2184";
private static final String SERVERSTRING = "116.62.44.5:2181,116.62.44.5:2182,116.62.44.5:2183";
private static ZkClient zkClient=null;
static {
zkClient=new ZkClient(SERVERSTRING,10000,10000,new UserSerializer());
}
public static void main(String[] args) {
//zkClient.create("/date1", new Date(), CreateMode.PERSISTENT);
//Object data = zkClient.readData("/date1");
//System.out.println(data);
//存放用户对象
User user=new User(1,"张三");
// String res = zkClient.create("/user", user, CreateMode.PERSISTENT);
// System.out.println(res);
Object readData = zkClient.readData("/user");
System.out.println(readData.getClass().getSimpleName());
System.out.println("操作成功zkClient");
}
}
订阅监控
监听器原理
1、首先要有一个main()线程
2、在main线程中创建Zookeeper客户端,这时就会创建两个线程,一个负责网络连接通信(connet),一个负责监听(listener)。
3、通过connet线程将注册的监听事件发送给Zookeeper。
4、在Zookeeper的注册监听器列表中将注册的监听事件添加到列表中。
5、Zookeeper监听到有数据或路径变化,就会将这个消息发送给listener线程。
6、listener线程内部调用了process()方法。
常见的监听
1、监听节点数据的变化:get path [watch]
。
2、监听子节点增减的变化:ls path [watch]
。
public class TestZKSubData {
// private static final String SERVERSTRING = "116.62.44.5:2184";
private static final String SERVERSTRING = "116.62.44.5:2181,116.62.44.5:2182,116.62.44.5:2183";
private static ZkClient zkClient=null;
static {
zkClient=new ZkClient(SERVERSTRING,10000,10000,new CustomSerializer());
}
public static void main(String[] args) throws IOException {
//创建java
//zkClient.create("/java","java", CreateMode.PERSISTENT);
//zkClient.create("/java/java1","java1", CreateMode.PERSISTENT);
//zkClient.create("/java/java2","java2", CreateMode.PERSISTENT);
//zkClient.create("/java/java3","java3", CreateMode.PERSISTENT);
//zkClient.create("/java/java4","java4", CreateMode.PERSISTENT);
//监听,参数1 监听的节点,参数2 方法回调
/*zkClient.subscribeDataChanges("/java",new IZkDataListener(){
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
System.out.println("节点数据发生变更"+dataPath+" "+data);
}
@Override
public void handleDataDeleted(String dataPath) throws Exception {
System.out.println("节点被删除"+dataPath);
}
});*/
//监听子节点数据的添加 和删除
zkClient.subscribeChildChanges("/java", new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
System.out.println("父节点路径:"+parentPath);
System.out.println("变更之后的子节点的信息");
for (String currentChild : currentChilds) {
System.out.println(currentChild);
}
}
});
//不能让程序退出
System.in.read();
System.out.println("操作成功zkClient");
}
}
全局自增id生成器
public class ZkIdGenerator {
private static final String SERVERSTRING = "116.62.44.5:2181,116.62.44.5:2182,116.62.44.5:2183";
private static ZkClient zkClient=null;
static {
zkClient=new ZkClient(SERVERSTRING,10000,10000);
}
//得到下一个版本号
public static Integer next(String path){
if(!zkClient.exists(path)){
zkClient.create(path,new byte[0], CreateMode.PERSISTENT);
}
Stat stat = zkClient.writeDataReturnStat(path, new byte[0], -1);
int version = stat.getVersion();
return version;
}
public static void main(String[] args) {
for (int i = 0; i <1000 ; i++) {
System.out.println(next("/id-gen"));
}
}
}
评论区