Zookeeper入门

简介

原理

分布式搭建

  • 安装jdk
  • 下载地址, 解压到指定目录
  • 修改配置
1
2
3
cd /opt/zk/conf
cp zoo_sample.cfg zoo.cfg
# 配置 cfg中的datadir, 将/temp 改为/var
  • 分发zk
1
scp -r zk node3:`pwd`
  • 创建数据目录
1
2
3
mkdir -p /var/xy/zk
# 设置服务器id
echo 1 > /var/xy/zk/myid
  • 启动集群
1
2
3
4
5
6
7
8
9
zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /opt/zk/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
# status
zkServer.sh status # 如果没有开启,是否防火墙没关
ZooKeeper JMX enabled by default
Using config: /opt/zk/bin/../conf/zoo.cfg
Mode: leader # 角色类型
  • 启动客户端帮助
1
2
3
4
5
6
7
8
zkCli.sh
...
WATCHER::

WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2181(CONNECTED) 0] ls
[zk: localhost:2181(CONNECTED) 1] ls /
[zookeeper]
  • 客户端命令原语
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
[zk: localhost:2181(CONNECTED) 4] help 
ZooKeeper -server host:port cmd args
stat path [watch]
set path data [version]
ls path [watch] # 显示
delquota [-n|-b] path
ls2 path [watch]
setAcl path acl
setquota -n|-b val path
history
redo cmdno
printwatches on|off
delete path [version] # 删除路径
sync path
listquota path
rmr path # 删除
get path [watch]
create [-s] [-e] path data acl #创建
addauth scheme auth
quit
getAcl path
close
connect host:port

# 状态说明
[zk: localhost:2181(CONNECTED) 13] stat /zk
cZxid = 0x200000009
ctime = Thu Jan 31 17:09:13 CST 2019
mZxid = 0x200000009
mtime = Thu Jan 31 17:09:13 CST 2019
pZxid = 0x200000009 # 事务id
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 9
numChildren = 0

开发实例

  • eclipse (idea还有些问题)
  • 测试连接
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
public class ZookeeperTest {

private static final int SESSION_TIMEOUT = 30000;

public static final Logger LOGGER = LoggerFactory.getLogger(ZookeeperTest.class);

private Watcher watcher = new Watcher() {

public void process(WatchedEvent event) {
LOGGER.info("process : " + event.getType());
}
};

private ZooKeeper zooKeeper;

/**
* 连接zookeeper
* @throws IOException
*/
@Before
public void connect() throws IOException {
zooKeeper = new ZooKeeper("node2:2181,node3:2181,node4:2181", SESSION_TIMEOUT, watcher);
}

/**
* 关闭连接
*/
@After
public void close() {
try {
zooKeeper.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

/**
* 创建一个znode
* 1.CreateMode 取值
* PERSISTENT:持久化,这个目录节点存储的数据不会丢失
* PERSISTENT_SEQUENTIAL:顺序自动编号的目录节点,这种目录节点会根据当前已近存在的节点数自动加 1,然后返回给客户端已经成功创建的目录节点名;
* EPHEMERAL:临时目录节点,一旦创建这个节点的客户端与服务器端口也就是 session过期超时,这种节点会被自动删除
* EPHEMERAL_SEQUENTIAL:临时自动编号节点
* <br>------------------------------<br>
*/
@Test
public void testCreate() {
String result = null;
try {
result = zooKeeper.create("/zk", "zk001data".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} catch (Exception e) {
LOGGER.error(e.getMessage());
Assert.fail();
}
LOGGER.info("create result : {}", result);
}

/**
* 删除节点 忽略版本
*/
@Test
public void testDelete() {
try {
zooKeeper.delete("/zk001", -1);
} catch (Exception e) {
LOGGER.error(e.getMessage());
Assert.fail();
}
}

/**
* 获取数据
*/
@Test
public void testGetData() {
String result = null;
try {
byte[] bytes = zooKeeper.getData("/zk", null, null);
result = new String(bytes);
} catch (Exception e) {
LOGGER.error(e.getMessage());
Assert.fail();
}
LOGGER.info("getdata result : {}", result);
}
@Test
public void testGetData01() throws Exception {
String result = null;
try {
byte[] bytes = zooKeeper.getData("/zk", null, null);
result = new String(bytes);
} catch (Exception e) {
LOGGER.error(e.getMessage());
Assert.fail();
}
LOGGER.info("getdata result : {}", result);

Thread.sleep(20000);

byte[] bytes;
try {
bytes = zooKeeper.getData("/zk", null, null);
result = new String(bytes);
} catch (KeeperException | InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
LOGGER.info("getdata result : {}", result);





}

/**
* 获取数据 设置watch
*/
@Test
public void testGetDataWatch() {
String result = null;
try {
System.out.println("get:");
byte[] bytes = zooKeeper.getData("/zk", new Watcher() {
public void process(WatchedEvent event) {
LOGGER.info("testGetDataWatch watch : {}", event.getType());
System.out.println("watcher ok");

}
}, null);
result = new String(bytes);
} catch (Exception e) {
LOGGER.error(e.getMessage());
Assert.fail();
}
LOGGER.info("getdata result : {}", result);

// 触发wacth NodeDataChanged
try {
System.out.println("set:");
zooKeeper.setData("/zk", "testSetData".getBytes(), -1);
System.out.println("set:");
zooKeeper.setData("/zk", "testSetData".getBytes(), -1);
} catch (Exception e) {
LOGGER.error(e.getMessage());
Assert.fail();
}
System.out.println("over");
}

/**
* 判断节点是否存在
* 设置是否监控这个目录节点,这里的 watcher 是在创建 ZooKeeper实例时指定的 watcher
*/
@Test
public void testExists() {
Stat stat = null;
try {
stat = zooKeeper.exists("/zk001", false);
} catch (Exception e) {
LOGGER.error(e.getMessage());
Assert.fail();
}
Assert.assertNotNull(stat);
LOGGER.info("exists result : {}", stat.getCzxid());
}

/**
* 设置对应znode下的数据 , -1表示匹配所有版本
*/
@Test
public void testSetData() {
Stat stat = null;
try {
stat = zooKeeper.setData("/zk001", "testSetData".getBytes(), -1);
} catch (Exception e) {
LOGGER.error(e.getMessage());
Assert.fail();
}
Assert.assertNotNull(stat);
LOGGER.info("exists result : {}", stat.getVersion());
}

/**
* 判断节点是否存在,
* 设置是否监控这个目录节点,这里的 watcher 是在创建 ZooKeeper实例时指定的 watcher
*/
@Test
public void testExistsWatch1() {
Stat stat = null;
try {
stat = zooKeeper.exists("/zk001", true);
} catch (Exception e) {
LOGGER.error(e.getMessage());
Assert.fail();
}
Assert.assertNotNull(stat);

try {
zooKeeper.delete("/zk001", -1);
} catch (Exception e) {
e.printStackTrace();
}
}

/**
* 判断节点是否存在,
* 设置监控这个目录节点的 Watcher
*/
@Test
public void testExistsWatch2() {
Stat stat = null;
try {
stat = zooKeeper.exists("/zk002", new Watcher() {
public void process(WatchedEvent event) {
LOGGER.info("testExistsWatch2 watch : {}", event.getType());
}
});
} catch (Exception e) {
LOGGER.error(e.getMessage());
Assert.fail();
}
Assert.assertNotNull(stat);

// 触发watch 中的process方法 NodeDataChanged
try {
zooKeeper.setData("/zk002", "testExistsWatch2".getBytes(), -1);
} catch (Exception e) {
e.printStackTrace();
}

// 不会触发watch 只会触发一次
try {
zooKeeper.delete("/zk002", -1);
} catch (Exception e) {
e.printStackTrace();
}
}

/**
* 获取指定节点下的子节点
*/
@Test
public void testGetChild() {
try {
zooKeeper.create("/zk/001", "001".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zooKeeper.create("/zk/002", "002".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

List<String> list = zooKeeper.getChildren("/zk", true);
for (String node : list) {
LOGGER.info("fffffff {}", node);
}
} catch (Exception e) {
LOGGER.error(e.getMessage());
Assert.fail();
}
}
}

全解析-Paxos

引用

先说Paxos,它是一个基于消息传递的一致性算法,Leslie Lamport在1990年提出,近几年被广泛应用于分布式计算中,Google的Chubby,Apache的Zookeeper都是基于它的理论来实现的,Paxos还被认为是到目前为止唯一的分布式一致性算法,其它的算法都是Paxos的改进或简化。有个问题要提一下,Paxos有一个前提:没有拜占庭将军问题。就是说Paxos只有在一个可信的计算环境中才能成立,这个环境是不会被入侵所破坏的。

关于Paxos的具体描述可以在Wiki中找到:http://zh.wikipedia.org/zh-cn/Paxos算法。网上关于Paxos分析的文章也很多。这里希望用最简单的方式加以描述并建立起Paxos和ZK Server的对应关系。

Paxos描述了这样一个场景,有一个叫做Paxos的小岛(Island)上面住了一批居民,岛上面所有的事情由一些特殊的人决定,他们叫做议员(Senator)。议员的总数(Senator Count)是确定的,不能更改。岛上每次环境事务的变更都需要通过一个提议(Proposal),每个提议都有一个编号(PID),这个编号是一直增长的,不能倒退。每个提议都需要超过半数((Senator Count)/2 +1)的议员同意才能生效。每个议员只会同意大于当前编号的提议,包括已生效的和未生效的。如果议员收到小于等于当前编号的提议,他会拒绝,并告知对方:你的提议已经有人提过了。这里的当前编号是每个议员在自己记事本上面记录的编号,他不断更新这个编号。整个议会不能保证所有议员记事本上的编号总是相同的。现在议会有一个目标:保证所有的议员对于提议都能达成一致的看法。

好,现在议会开始运作,所有议员一开始记事本上面记录的编号都是0。有一个议员发了一个提议:将电费设定为1元/度。他首先看了一下记事本,嗯,当前提议编号是0,那么我的这个提议的编号就是1,于是他给所有议员发消息:1号提议,设定电费1元/度。其他议员收到消息以后查了一下记事本,哦,当前提议编号是0,这个提议可接受,于是他记录下这个提议并回复:我接受你的1号提议,同时他在记事本上记录:当前提议编号为1。发起提议的议员收到了超过半数的回复,立即给所有人发通知:1号提议生效!收到的议员会修改他的记事本,将1好提议由记录改成正式的法令,当有人问他电费为多少时,他会查看法令并告诉对方:1元/度。

现在看冲突的解决:假设总共有三个议员S1-S3,S1和S2同时发起了一个提议:1号提议,设定电费。S1想设为1元/度, S2想设为2元/度。结果S3先收到了S1的提议,于是他做了和前面同样的操作。紧接着他又收到了S2的提议,结果他一查记事本,咦,这个提议的编号小于等于我的当前编号1,于是他拒绝了这个提议:对不起,这个提议先前提过了。于是S2的提议被拒绝,S1正式发布了提议: 1号提议生效。S2向S1或者S3打听并更新了1号法令的内容,然后他可以选择继续发起2号提议。

好,我觉得Paxos的精华就这么多内容。现在让我们来对号入座,看看在ZK Server里面Paxos是如何得以贯彻实施的。

小岛(Island)——ZK Server Cluster

议员(Senator)——ZK Server

提议(Proposal)——ZNode Change(Create/Delete/SetData…)

提议编号(PID)——Zxid(ZooKeeper Transaction Id)

正式法令——所有ZNode及其数据

貌似关键的概念都能一一对应上,但是等一下,Paxos岛上的议员应该是人人平等的吧,而ZK Server好像有一个Leader的概念。没错,其实Leader的概念也应该属于Paxos范畴的。如果议员人人平等,在某种情况下会由于提议的冲突而产生一个“活锁”(所谓活锁我的理解是大家都没有死,都在动,但是一直解决不了冲突问题)。Paxos的作者Lamport在他的文章”The Part-Time Parliament“中阐述了这个问题并给出了解决方案——在所有议员中设立一个总统,只有总统有权发出提议,如果议员有自己的提议,必须发给总统并由总统来提出。好,我们又多了一个角色:总统。

总统——ZK Server Leader

又一个问题产生了,总统怎么选出来的?oh, my god! It’s a long story. 在淘宝核心系统团队的Blog上面有一篇文章是介绍如何选出总统的,有兴趣的可以去看看:http://rdc.taobao.com/blog/cs/?p=162

现在我们假设总统已经选好了,下面看看ZK Server是怎么实施的。

  • 情况一:

屁民甲(Client)到某个议员(ZK Server)那里询问(Get)某条法令的情况(ZNode的数据),议员毫不犹豫的拿出他的记事本(local storage),查阅法令并告诉他结果,同时声明:我的数据不一定是最新的。你想要最新的数据?没问题,等着,等我找总统Sync一下再告诉你。

  • 情况二:

屁民乙(Client)到某个议员(ZK Server)那里要求政府归还欠他的一万元钱,议员让他在办公室等着,自己将问题反映给了总统,总统询问所有议员的意见,多数议员表示欠屁民的钱一定要还,于是总统发表声明,从国库中拿出一万元还债,国库总资产由100万变成99万。屁民乙拿到钱回去了(Client函数返回)。

  • 情况三:

总统突然挂了,议员接二连三的发现联系不上总统,于是各自发表声明,推选新的总统,总统大选期间政府停业,拒绝屁民的请求。

呵呵,到此为止吧,当然还有很多其他的情况,但这些情况总是能在Paxos的算法中找到原型并加以解决。这也正是我们认为Paxos是Zookeeper的灵魂的原因。当然ZK Server还有很多属于自己特性的东西:Session, Watcher,Version等等等等,需要我们花更多的时间去研究和学习。

Donate comment here