一、概述
Curator是 Netflix 開源的一套 zookeeper 客户端框架,解決原生 Api 的好多問題。
二、添加依賴
<!-- 對zookeeper的底層api的一些封裝 -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
<!-- 封裝了一些高級特性,如:Cache事件監聽、選舉、分佈式鎖、分佈式Barrier -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
三、創建連接
package com.snails.curator;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.retry.RetryUntilElapsed;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class CuratorConection {
private String IP = "127.0.0.1:2181";
CuratorFramework curatorFramework = null;
@Before
public void brfore() {
ExponentialBackoffRetry exponentialBackoffRetry = new ExponentialBackoffRetry(1000, 3);
curatorFramework = CuratorFrameworkFactory.builder()
//IP地址端口號
.connectString(IP)
//會話超時時間
.sessionTimeoutMs(5000)
//重連機制
.retryPolicy(exponentialBackoffRetry)
//命名空間
.namespace("create")
//構建連接對象
.build();
curatorFramework.start();
System.out.println(curatorFramework.isStarted());
}
@After
public void after() {
curatorFramework.close();
}
@Test
public void test() {}
}
四、session重連策略
1、3秒後重連一次,只重連一次
new RetryOneTime(3000)
2、每3秒重連一次,重試3次
new RetryNTimes(3, 3000)
3、每3秒重連一次,總等待時間超過10秒後停止重連
new RetryUntilElapsed(1000, 3000)
4、根據公式獲取重試時間 baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount +1)))
new ExponentialBackoffRetry(1000, 3)
五、創建節點
@Test
public void create01() throws Exception {
curatorFramework.create()
//節點類型
.withMode(CreateMode.PERSISTENT)
//節點權限列表
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
//會加上命名空間指定的名稱
.forPath("/tsing", "測試數據".getBytes());
System.out.println("end");
}
//自定義權限列表
@Test
public void create02() throws Exception {
ArrayList<ACL> list = new ArrayList<>();
Id id = new Id("ip", "127.0.0.1");
list.add(new ACL(ZooDefs.Perms.ALL, id));
curatorFramework
.create()
.withMode(CreateMode.PERSISTENT)
.withACL(list)
.forPath("/tsing1", "tsing1".getBytes());
}
//遞歸創建節點
@Test
public void create03() throws Exception {
curatorFramework
.create()
.creatingParentContainersIfNeeded()
.withMode(CreateMode.PERSISTENT)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath("/tsing2/child1", "tsing2".getBytes());
}
六、修改節點
@Test
public void set01() throws Exception {
curatorFramework
.setData()
.forPath("/nodo", "set-node-update".getBytes());
System.out.println("end");
}
七、刪除節點
@Test
public void delete01() throws Exception {
curatorFramework
.delete()
.forPath("/node");
}
//刪除包含子節點的節點
@Test
public void delete02() throws Exception {
curatorFramework
.delete()
.deletingChildrenIfNeeded()
.forPath("/node");
}
八、查看節點
@Test
public void get01() throws Exception {
byte[] bytes = curatorFramework
.getData()
.forPath("/node");
System.out.println(new String(bytes));
}
//查看子節點數據
@Test
public void get02() throws Exception {
List<String> list = curatorFramework
.getChildren()
.forPath("/node");
for (String str : list) {
System.out.println(str);
}
}
九、查看節點是否存在
@Test
public void exists01() throws Exception {
Stat stat = curatorFramework
.checkExists()
.forPath("/node");
System.out.println(stat.getVersion());
}
十、事務
@Test
public void set01() throws Exception {
curatorFramework
//開啓事務
.inTransaction()
.create().forPath("/node3", "node3".getBytes())
.and()
.setData().forPath("/node4", "node4".getBytes()).and()
//提交事務
.commit();
System.out.println("end");
}