ZooKeeper Series (1): Basics

@author stormma
@date 2018/03/21


Life never stops, and neither does the struggle!


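Introduction to ZooKeeper

ZooKeeper is a distributed, open-source coordination service for distributed applications. It is an open-source implementation of Google's Chubby and an important component of Hadoop and HBase. It provides consistency services to distributed applications, including configuration maintenance, naming, distributed synchronization, and group services.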

Setting up a standalone ZooKeeper environment

Get ZooKeeper

cd /usr/local
wget http://mirror.bit.edu.cn/apache/zookeeper/stable/zookeeper-3.4.10.tar.gz
tar -zxvf zookeeper-3.4.10.tar.gz
mv zookeeper-3.4.10 zookeeper

Edit the configuration. By default ZooKeeper reads the zoo.cfg file in its conf directory.

cd /usr/local/zookeeper/conf
mv zoo_sample.cfg zoo.cfg

Change it as follows:

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/usr/local/zookeeper/data
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
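
initLimit and syncLimit are counted in ticks, so the effective timeouts come from multiplying them by tickTime. The following is a minimal sketch of that arithmetic using the three values from the file above. The class ZooCfgTimings and its methods are hypothetical helpers written for this illustration, not part of ZooKeeper.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class ZooCfgTimings {
    /** Parses zoo.cfg-style text, which follows the java.util.Properties format. */
    public static Properties parse(String text) {
        Properties cfg = new Properties();
        try {
            cfg.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return cfg;
    }

    /** Converts a limit that is expressed in ticks into milliseconds. */
    public static long toMillis(Properties cfg, String limitKey) {
        long tickTime = Long.parseLong(cfg.getProperty("tickTime").trim());
        long ticks = Long.parseLong(cfg.getProperty(limitKey).trim());
        return tickTime * ticks;
    }

    public static void main(String[] args) {
        Properties cfg = parse("tickTime=2000\ninitLimit=10\nsyncLimit=5\n");
        System.out.println("initLimit = " + toMillis(cfg, "initLimit") + " ms");
        System.out.println("syncLimit = " + toMillis(cfg, "syncLimit") + " ms");
    }
}
```

With the values above, initLimit works out to 20000 ms and syncLimit to 10000 ms.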

Create the /usr/local/zookeeper/data directory

cd /usr/local/zookeeper
mkdir data

Start ZooKeeper

cd /usr/local/zookeeper/bin
./zkServer.sh start

Test

telnet stormma.me 2181
Trying 139.199.27.243...
Connected to stormma.me.
Escape character is '^]'.
stat
Zookeeper version: 3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f, built on 03/23/2017 10:13 GMT
Clients:
/113.140.11.4:52857[0](queued=0,recved=1,sent=0)
Latency min/avg/max: 0/0/43
Received: 165153
Sent: 165159
Connections: 1
Outstanding: 0
Zxid: 0x6b
Mode: standalone
Node count: 10
Connection closed by foreign host.
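
The same check can be made from code: open a TCP connection to the client port, write the four letters stat, and read until the server closes the connection. The following is a minimal sketch using only the JDK. The class FourLetterWord and its methods are hypothetical names chosen for this illustration, and the host and port are the ones from the telnet session above.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

public class FourLetterWord {
    /** Sends a four-letter command such as "stat" and returns the raw reply. */
    public static String send(String host, int port, String word) throws IOException {
        try (Socket socket = new Socket(host, port)) {
            OutputStream out = socket.getOutputStream();
            out.write(word.getBytes(StandardCharsets.US_ASCII));
            out.flush();
            // The server closes the connection after replying, so read until EOF
            InputStream in = socket.getInputStream();
            ByteArrayOutputStream reply = new ByteArrayOutputStream();
            byte[] buf = new byte[4096];
            int n;
            while ((n = in.read(buf)) != -1) {
                reply.write(buf, 0, n);
            }
            return new String(reply.toByteArray(), StandardCharsets.UTF_8);
        }
    }

    /** Extracts the value of the "Mode:" line, or returns null if there is none. */
    public static String parseMode(String statReply) {
        for (String line : statReply.split("\\r?\\n")) {
            if (line.startsWith("Mode:")) {
                return line.substring("Mode:".length()).trim();
            }
        }
        return null;
    }

    public static void main(String[] args) throws IOException {
        String reply = send("stormma.me", 2181, "stat");
        System.out.println(reply);
        System.out.println("mode = " + parseMode(reply));
    }
}
```

For a single server, the Mode line should read standalone, as in the telnet output above.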

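Setting up a ZooKeeper cluster

Setting up a ZooKeeper cluster is much like the standalone setup. Prepare n servers (n >= 3 && (n & 1) == 1). A ZooKeeper cluster can serve requests only while more than half of its machines are available, so the number of machines should preferably be odd: an even-sized cluster tolerates no more failures than the odd-sized cluster one machine smaller.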
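
The majority rule is easy to check with a little arithmetic. The following is a minimal sketch; the class QuorumMath and its method names are hypothetical and exist only for this illustration.

```java
public class QuorumMath {
    /** Smallest number of servers that forms a majority of n. */
    public static int quorumSize(int n) {
        return n / 2 + 1;
    }

    /** Number of servers that may fail while a majority is still available. */
    public static int toleratedFailures(int n) {
        return n - quorumSize(n);
    }

    public static void main(String[] args) {
        for (int n = 3; n <= 6; n++) {
            System.out.println("n=" + n
                    + " quorum=" + quorumSize(n)
                    + " tolerated failures=" + toleratedFailures(n));
        }
    }
}
```

Three and four servers both tolerate one failure, and five and six both tolerate two, so the extra even-numbered machine buys nothing.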

n1: 192.168.1.101
n2: 192.168.1.102
n3: 192.168.1.103

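Getting ZooKeeper and its sample configuration: omitted (same as the standalone setup).

Edit the configuration file
Run the following commands on all three machines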

cd /usr/local/zookeeper/conf
mv zoo_sample.cfg zoo.cfg
vi zoo.cfg

Add the following to zoo.cfg

server.1=192.168.1.101:2888:3888
server.2=192.168.1.102:2888:3888
server.3=192.168.1.103:2888:3888
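
Each line has the form server.N=host:port:port. N is the server id, the first port is the one followers use to connect to the leader, and the second port is used for leader election. The following is a minimal sketch that splits such a line into its parts. The class ServerEntry and its field names are hypothetical and are not ZooKeeper's own parser.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ServerEntry {
    private static final Pattern LINE =
            Pattern.compile("server\\.(\\d+)=([^:]+):(\\d+):(\\d+)");

    public final int id;            // N in server.N, must match the machine's myid
    public final String host;
    public final int quorumPort;    // followers connect to the leader on this port
    public final int electionPort;  // used for leader election

    private ServerEntry(int id, String host, int quorumPort, int electionPort) {
        this.id = id;
        this.host = host;
        this.quorumPort = quorumPort;
        this.electionPort = electionPort;
    }

    /** Parses a line such as "server.1=192.168.1.101:2888:3888". */
    public static ServerEntry parse(String line) {
        Matcher m = LINE.matcher(line.trim());
        if (!m.matches()) {
            throw new IllegalArgumentException("not a server line: " + line);
        }
        return new ServerEntry(
                Integer.parseInt(m.group(1)),
                m.group(2),
                Integer.parseInt(m.group(3)),
                Integer.parseInt(m.group(4)));
    }

    public static void main(String[] args) {
        ServerEntry e = parse("server.1=192.168.1.101:2888:3888");
        System.out.println("id=" + e.id + " host=" + e.host
                + " quorumPort=" + e.quorumPort + " electionPort=" + e.electionPort);
    }
}
```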

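Create the data directory and the myid file

mkdir /usr/local/zookeeper/data
cd /usr/local/zookeeper/data
# Replace the placeholder with this machine's id: the N of its server.N line in zoo.cfg (1, 2 or 3 here)
echo "<id of this machine>" > myid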

Start the three machines one after another. Once two of the three are up, the cluster can serve requests.

Using ZkClient

Add the dependencies

<dependencies>
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.10</version>
    </dependency>
    <dependency>
        <groupId>com.101tec</groupId>
        <artifactId>zkclient</artifactId>
        <version>0.10</version>
    </dependency>
</dependencies>

Create a session

import org.I0Itec.zkclient.ZkClient;

public class CreateSession {
    public static void main(String[] args) {
        String conn = "stormma.me:2181";
        // Connects to the server and establishes the session
        ZkClient client = new ZkClient(conn);
        client.close();
    }
}

Create nodes

Persistent node

import org.I0Itec.zkclient.ZkClient;
import org.apache.zookeeper.CreateMode;

public class CreateNode {
    public static void main(String[] args) {
        String serverString = "stormma.me:2181";
        ZkClient client = new ZkClient(serverString);
        // A persistent node stays in place after the session ends
        String path = client.create("/stormma", "blog.stormma.me", CreateMode.PERSISTENT);
        System.out.println("created path: " + path);
        client.close();
    }
}

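Ephemeral node

An ephemeral node is deleted by ZooKeeper once the session that created it is closed.

import org.I0Itec.zkclient.ZkClient;
import org.apache.zookeeper.CreateMode;

public class CreateEphemeralNode {
    public static void main(String[] args) {
        String serverString = "stormma.me:2181";
        ZkClient client = new ZkClient(serverString);
        String path = client.create("/stormma", "blog.stormma.me", CreateMode.EPHEMERAL);
        System.out.println("created path: " + path);
        // Closing the client ends the session, which removes the ephemeral node
        client.close();
    }
}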

Read a node

import org.I0Itec.zkclient.ZkClient;
import java.util.List;

public class GetNode {
    public static void main(String[] args) {
        String serverString = "stormma.me:2181";
        ZkClient client = new ZkClient(serverString);
        // Read the node's data
        Object obj = client.readData("/dubbo");
        System.out.println(obj);
        // Get the node's children
        List<String> list = client.getChildren("/dubbo");
        for (String child : list) {
            System.out.println(child);
        }
        client.close();
    }
}

Check whether a node exists

import org.I0Itec.zkclient.ZkClient;

public class NodeExists {
    public static void main(String[] args) {
        String serverString = "stormma.me:2181";
        ZkClient client = new ZkClient(serverString);
        String path = "/dubbo";
        boolean exists = client.exists(path);
        System.out.println(path + " exists: " + exists);
        client.close();
    }
}

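Delete a node

import org.I0Itec.zkclient.ZkClient;

public class DeleteNode {
    public static void main(String[] args) {
        String serverString = "stormma.me:2181";
        ZkClient client = new ZkClient(serverString);
        // delete removes a single node
        client.delete("/dubbo");
        // deleteRecursive removes a node together with all of its children
        client.deleteRecursive("/stormma");
        client.close();
    }
}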

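Subscribe to events

import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import java.util.List;

public class SubscribeEvent {
    /**
     * Listener for changes to a node's children
     */
    static class SubscribeChildChanges implements IZkChildListener {
        public void handleChildChange(String s, List<String> list) throws Exception {
            // s is the parent path; list holds the current children (null if the parent was deleted)
            System.out.println(s);
            System.out.println(list);
        }
    }

    /**
     * Listener for changes to a node's data
     */
    static class SubscribeDataChanges implements IZkDataListener {
        public void handleDataChange(String s, Object o) throws Exception {
            System.out.println(s + ":" + o);
        }

        public void handleDataDeleted(String s) throws Exception {
            System.out.println(s);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ZkClient client = new ZkClient("stormma.me:2181");
        // Register both listeners on /stormma, then keep the process alive to receive events
        client.subscribeChildChanges("/stormma", new SubscribeChildChanges());
        client.subscribeDataChanges("/stormma", new SubscribeDataChanges());
        Thread.sleep(Long.MAX_VALUE);
    }
}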

Using Curator

Add the dependencies

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>2.8.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.8.0</version>
</dependency>

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.RetryUntilElapsed;
import org.apache.zookeeper.CreateMode;
import java.util.List;

public class CuratorDemo {
    private static final String connectionString = "stormma.me:2181";
    private static final int sessionTimeoutMs = 5000;
    private static final int connectionTimeoutMs = 5000;

    public CuratorFramework createSession() {
        // Retry for at most 5000 ms in total, sleeping 1000 ms between retries
        RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);
        return CuratorFrameworkFactory
                .builder()
                .connectString(connectionString)
                .sessionTimeoutMs(sessionTimeoutMs)
                .connectionTimeoutMs(connectionTimeoutMs)
                .retryPolicy(retryPolicy)
                .build();
    }

    public String createNode(String path) throws Exception {
        CuratorFramework client = createSession();
        client.start();
        String res = client.create()
                .creatingParentsIfNeeded() // create missing parent nodes first
                .withMode(CreateMode.PERSISTENT)
                .forPath(path, "blog.stormma.me".getBytes());
        client.close();
        return res;
    }

    public void deleteNode(String path) throws Exception {
        CuratorFramework client = createSession();
        client.start();
        client.delete()
                .guaranteed()
                .deletingChildrenIfNeeded()
                .withVersion(-1) // -1 matches any version
                .forPath(path);
        client.close();
    }

    public List<String> getChildren(String path) throws Exception {
        CuratorFramework client = createSession();
        client.start();
        List<String> nodes = client.getChildren()
                .forPath(path);
        client.close();
        return nodes;
    }

    public byte[] getData(String path) throws Exception {
        CuratorFramework client = createSession();
        client.start();
        byte[] bytes = client.getData()
                .forPath(path);
        client.close();
        return bytes;
    }

    // The client is left open on purpose so that the listener keeps receiving events
    public void nodeListener(String path) throws Exception {
        CuratorFramework client = createSession();
        client.start();
        final NodeCache cache = new NodeCache(client, path);
        cache.getListenable().addListener(new NodeCacheListener() {
            public void nodeChanged() throws Exception {
                ChildData current = cache.getCurrentData();
                if (current == null) { // the node no longer exists
                    System.out.println("node deleted");
                    return;
                }
                System.out.println("new data:" + new String(current.getData()));
            }
        });
        cache.start();
    }

    // The client is left open on purpose so that the listener keeps receiving events
    public void nodeChildrenListener(String parentPath) throws Exception {
        CuratorFramework client = createSession();
        client.start();
        final PathChildrenCache cache = new PathChildrenCache(client, parentPath, true);
        cache.getListenable().addListener(new PathChildrenCacheListener() {
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
                    throws Exception {
                switch (event.getType()) {
                    case CHILD_ADDED:
                        System.out.println("CHILD_ADDED:" + event.getData());
                        break;
                    case CHILD_UPDATED:
                        System.out.println("CHILD_UPDATED:" + event.getData());
                        break;
                    case CHILD_REMOVED:
                        System.out.println("CHILD_REMOVED:" + event.getData());
                        break;
                    default:
                        break;
                }
            }
        });
        cache.start();
    }
}
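
The RetryUntilElapsed(5000, 1000) policy used in createSession takes a maximum elapsed time and a sleep between retries, both in milliseconds. The idea behind such a policy can be sketched in plain Java as follows. This is only an illustration of the concept, not Curator's implementation, and the class RetryUntilElapsedSketch and its call method are hypothetical names.

```java
import java.util.concurrent.Callable;

public class RetryUntilElapsedSketch {
    /**
     * Runs the task, retrying after each failure until it succeeds or
     * maxElapsedMs has passed since the first attempt.
     */
    public static <T> T call(Callable<T> task, long maxElapsedMs, long sleepMsBetweenRetries) {
        long start = System.nanoTime();
        while (true) {
            try {
                return task.call();
            } catch (Exception e) {
                long elapsedMs = (System.nanoTime() - start) / 1_000_000;
                if (elapsedMs >= maxElapsedMs) {
                    // Time budget used up: give up and report the last failure
                    throw new IllegalStateException("retries exhausted", e);
                }
                try {
                    Thread.sleep(sleepMsBetweenRetries);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting to retry", ie);
                }
            }
        }
    }

    public static void main(String[] args) {
        final int[] attempts = {0};
        // The task fails twice and succeeds on the third attempt
        String result = call(() -> {
            if (++attempts[0] < 3) {
                throw new IllegalStateException("not yet");
            }
            return "ok after " + attempts[0] + " attempts";
        }, 5000, 10);
        System.out.println(result);
    }
}
```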