Zookeeper

Paxos 算法


基于消息传递、高容错性的一致性算法。算法就某个决议达成一致,要解决的问题是在分布式系统中就某个决议达成一致性。
有名的 Paxos 工程实现有 Google Chubby、ZAB、微信的 PhxPaxos

拜占庭将军问题,该问题要说明的含义是:在不可靠信道上试图通过消息传递的方式达到一致性是不可能的。
所以,Paxos 算法的前提是不存在拜占庭将军问题,即信道是安全的、可靠的、集群节点间传递的消息是不会被篡改的。

一般情况下,分布式系统中各个节点间采用两种通讯模型,共享内存、消息传递。Paxos 是基于消息传递通讯模型。

算法描述

在 Paxos 算法中有三种角色,分别具有三种不同的行为,很多时候一个进程可能同时充当多种角色。

Proposer 提议者
Accepter 表决者,同时也能提交提议。
Learner 同步者

每个提议者在提出提议时都会先获取到一个具有全局唯一性的、递增的提议编号 N,整个集群中是唯一编号,然后将该编号与提议对应。
每个表决者在 accept 某提议后,将该提议的编号 N 记录在本地,这样每个表决者中保存的已经被 accept 的提议会存在一个编号最大的提议,编号假设为 maxN,每个表决者仅会 accept 编号大于自己本地 maxN 的提议。

一旦一个提议被选定,其他服务器会主动同步该提议到本地。

一个关于Paxos算法的故事

Paxos 算法执行过程分为两个阶段:准备阶段 prepare 与 接受阶段 accept

prepare 阶段

  • Proposer 准备提交一个编号为 N 的提议,于是其首先向所有 Accepter 发送 prepare(N) 请求,试探集群是否支持该编号提议
  • 每个 Accepter 保存自己曾经 accept 过的提议中的最大编号 maxN。当一个 Accepter 接受到其他主机发送的 prepare(N) 请求时,比较 N 与 maxN 的值
    • 若 N 小于 maxN,说明该提议已经过时,当前 Accepter 采取不回应或回应 Error 的方式拒绝 prepare 请求
    • 若 N 大于 maxN,则说明该提议时可以接受的,当前 Accepter 首先将 N 记录下来,并将其曾经已经 accept 的编号最大提议 proposal(myid, maxN, value) 反馈给 Proposer,表示自己支持该提议。
      • myid Accepter 标识id,
      • maxN 曾接受的提议最大编号
      • 提议的真正内容。
    • 若当前 Accepter 未曾 accept 过任何提议,返回 proposal(myid, null, null) 反馈给 Proposer
  • prepare 阶段 N 不可能等于 maxN。这是由 N 的生成机制决定,要获取 N 的值,必定会在原来数值基础上采用同步锁的方式增一。

accept 阶段

  1. 当 Proposer 发出 prepare(N) 后,收到了超过半数的 Accepter 的反馈,那么该 Proposer 就会将提议 proposal(myid, N, value) 发送给所有的表决者
  2. 当 Accepter 接受到 Proposer 发送的 proposal(myid, N, value),会再次拿出自己曾经 accept 过的 提议中最大的编号 maxN,或者曾记录下的 prepare 的最大编号,让 N 与它们比较,如果 N 大于等于 这两个编号,则当前 Accepter accept 该提议,并反馈给 Proposer。如果小于,则 Accepter 采取不回应或者回应 Error 的方式拒绝该提议。
  3. 如果 Proposer 没有接受到超过半数的 Accepter 的 accept 反馈,则重新进入 prepare 阶段,递增提议编号,重新提出 prepare 请求,或者放弃该提议,不再提出。
  4. 若 Proposer 接受到的 Accepter 反馈数量超过半数,则会向外广播两类信息:a) 向曾 accept 其提议的 Accepter 发送可执行数据同步信号,即让它们执行曾接收到的提议。b) 向未曾向其发送 accept 反馈的 Accepter 发送 提议 + 可执行数据同步信号,让它们接受到该提议后马上执行,执行时,所有 Accepter 状态发生改变,由 Accepter 变为 Learner,不再接受 proposal。

Paxos 算法存在活锁问题

死锁:由于线程A被阻塞,需要线程B才能执行,而线程B也被阻塞,需要线程A才能执行,无法自行解开。
活锁:没有阻塞,但由于某些条件不满足,导致一直重复尝试-失败-尝试-失败的过程,处于活锁的实体由于不断改变状态,有可能自行解开。

具体表现在,提议1 prepare(N1)后,新的提议2 prepare(N2) 刷新了每个 Accepter 的 maxN,导致 accept 阶段失败,于是提议1重新拿到编号 prepare,此时提议1的编号大于提议2,重新刷新了 Accepter 的 maxN 之后,导致提议2在 accept 阶段失败,于是如此循环往复。导致活锁。

解决方案:Fast Paxos 算法对 Paxos 算法进行了改进:只允许一个进程提交提议,即该进程具有对 N 的唯一操作权。该方式解决了活锁问题。

ZAB 协议


ZAB Zookeeper Atomic Broadcast zk 原子消息广播协议,专为 Zookeeper 设计的一种支持崩溃恢复原子广播协议,在 Zookeeper 中,主要依赖 ZAB 协议实现分布式数据一致性

Zookeeper 使用一个单一主进程来接收并处理客户端所有事务请求,当服务器数据状态发生变更,采用 ZAB 原子广播协议,以事务提议 Proposal 的形式广播到所有副本进程。ZAB 协议能够保证一个全局的变更序列,每一个事务分配一个全局的递增编号 xid。

Zookeeper 客户端连接到 Zookeeper 集群的一个节点后,若客户端提交的是读请求,当前节点就直接根据自己保存的数据对其响应,如果是写请求且当前节点不是 Leader,那么节点就会将该写请求转发给 Leader,Leader 会以提议的方式广播该写操作,只要超过半数节点同一该写操作,写操作请求就会被提交。然后 Leader 会再次广播给所有订阅者,即 Learner 通知它们同步数据。

ZAB 协议是 Fast Paxos 算法的一种工业实现算法,两者设计目标不一样,ZAB协议用于构建一个高可用的分布式数据主从系统,Follower 是 Leader 的备用机,Leader 挂了,马上可以选举一个新的 Leader。而 Fast Paxos 算法则用于构建一个分布式一致性状态机系统,确保系统中各个节点的状态都一致。

ZAB 使用了 Google 的 Chubby 算法作为分布式锁的实现。

zk 集群的三类角色

  • Leader 接收和处理客户端的读请求;zk 集群中事务请求的唯一处理者,负责发起决议和投票,然后将通过的事务请求在本地进行处理后,将处理结果同步给集群中的其他主机
  • Follower 接收和处理客户端的读请求;将事务请求转给 Leader;同步 Leader 中的数据;当 Leader 挂了,参与 Leader 的选举
  • Observer 没有选取和被选举权,没有投票权的 Follower。如果 zk 集群中的读压力很大,则需要增加 Observer,最好不要增加 Follower。因为增加 Follower 会增大投票与统计选票的压力,降低写操作效率,及 Leader 选举的效率。
  • Learner 当 Follower 进行数据同步时,只能接收和处理读请求,此时 Follower 状态为 Learner
  • QuorumServer Follower + Leader

Observer 的数量
Observer 数量一般与 Follower 数量相同。并不是越多越好,因为 Observer 增多虽然不会增加事务操作压力,但由于其需要从 Leader 同步数据,Observer 同步数据的时间是小于等于 Follower 同步数据时间的,当 Follower 同步数据完成,Leader 的 Observer 列表中的 Observer 主机将结束同步。完成同步的 Observer 会进入另一个对外提供服务的列表。其余的 Observer 会通过心跳包的形式连接 Leader 进行后续的数据同步。对于事务操作发生频繁的系统,不建议过多的Observer

Leader 中有2个 Observer 列表,all(包含所有 Observer 主机)、service(每发生一次事务更新,service 列表就会发生一次变化)

在 Leader 中,Follower 也存在两个列表,all(包含所有 Follower 主机)、service(每发生一次事务更新,service 列表就会发生一次变化),如果service <= all/2 ,则说明同步失败,Leader 会重新广播,Follower 重新同步

zxid、epoch、xid

  • zxid 长度为64位的 Long 类型数据,高32位表示 epoch,低32位表示 xid
  • epoch zk 集群中每个 Leader 都会有一个不同的 epoch,区分不同的 Leader 时期
  • xid 事务id,是一个流水号(递增)

zk 集群中的模式和状态

ZAB 协议中对 zkServer 的状态描述有三种,

  • 恢复模式 当集群启动时,或者当 Leader 挂了进行选举时,系统需要进入恢复模式。Leader 的选举和初始化同步
  • 广播模式 初始化广播和更新广播
  • 同步模式 初始化同步与更新同步

zk 集群中的每一台主机,在不同的阶段会处于不同的状态。每一台主机具有四种状态。

  • Looking 选举状态
  • Following Follower 的正常工作状态,从 Leader 同步数据的状态
  • Observing Observer 的正常工作状态,从 Leader 同步数据
  • Leading Leader 的正常工作状态,Leader 广播数据更新的状态

恢复模式

恢复模式下有两个阶段,Leader 选举与初始化同步。当完成 Leader 选举后,此时 Leader 还是一个准 Leader,要经过初始化同步后才能变为真正的 Leader。

  1. 为了保证 Leader 向 Learner 发送提议的有序,Leader 会为每一个 Learner 服务器准备一个队列
  2. Leader 将没有被各个 Learner 同步的事务封装为一个 Proposal
  3. Leader 将这些 Proposal 逐条发给各个 Learner,并在每一个 Proposal 后都紧跟一个 Commit 消息,表示该事务已经被提交,Learner 可以直接接收并执行
  4. Learner 接收来自于 Learner 的 Proposal,并将其更新到本地
  5. 当 Learner 更新成功后,会向准 Learder 发送 ACK 信息
  6. Leader 服务器收到来自 Learner 的 ACK 后就会将该 Learner 加入到真正可用的 Follower 列表或 Observer 列表中,没有反馈 ACK 或者 反馈了但 Leader 没有收到的 Learner,Leader 不会将其加入到相应列表。

Leanner-3 没有反馈,则不会被加入到 Learner 的同步队列中

当集群正在启动过程中,或者 Leader 与超过半数的主机断连后,集群进入了恢复模式,对于要恢复的数据状态需要遵循两个原则,此时会比较 zxid 大小,zxid 打的 Follower 被选举为 Leader。

  • 已被处理过的消息不能丢

当 Leader 收到超过半数 Follower 的 ACKs 后,广播 Commit 消息时,如果在非全部 Follower 收到 Commit 消息之前就挂了,导致一种后果:部分 Server 已经执行了该事务,而部分 Server 尚未收到 Commit 消息,没有执行。当新的 Leader 被选举出后,集群经过恢复模式后需要保证所有的 Server 上都执行了哪些已经被部分 Server 执行过的事务

  • 被丢弃的消息不能再现

如果所有的 Follower 都没有收到 Commit 之前 Leader 就宕机了,所有 Follower 根本不知道该 Proposal 存在,当新的 Leader 选举出来,整个集群进入正常服务状态后,之前挂了的 Leader 主机重新启动并注册成为了 Follower。如果那个别人根本不知道的 Proposal 还保留在主机上,那么其数据就会比其他主机多出了内容,导致整个系统状态的不一致。此时,该 Proposal 应该被丢弃,类似这样应该被丢弃的事务,是不能再次出现在集群中的,应该被清除。

当一台新的 Follower 想连入 zk 集群,则需要将其的 zxid 与 Leader 的 zxid 递归比较,只要找到两个 zxid 相同并且 Proposal 也相同的事务,然后同步该 zxid 后面所有的事务,之前比较过不相等的事务会被丢弃。

广播模式

如果集群中的 Learner 节点收到客户端的事务请求,那么这些 Learner 会将请求转发给 Leader 服务器,然后执行如下具体过程:

  1. Learder 接收到事务请求后,为事务赋予一个全局唯一的 64 位自增 id,即 zxid,通过 zxid 的大小比较可实现事务的有序性管理,然后将事务封装为一个 Proposal。
  2. Leader 根据 Follower 列表获取所有的 Follower,然后将 Proposal 通过这些 Follower 的队列将提议发送给各个 Follower
  3. Follower 接收到提议后,将提议的 zxid 与本地记录的事务日志中最大的 zxid 比较(防止前面的事务的 Commit 由于网络原因指令晚到达造成事务数据错乱),若当前提议的 zxid 大于最大 zxid,将当前提议记录到本地事务日志中,并向 Leader 返回一个 ACK。(很重要)
  4. 当 Leader 接收超过半数 ACKs 后,Leader 就会向所有 Follower 队列发送 Commit 消息,向所有 Observer 队列发送 Proposal
  5. 当 Follower 收到 Commit 消息后,会将日志中的事务正式更新到本地。当 Observer 收到 Proposal后,会直接将事务更新到本地
  6. 无论是 Follower 还是 Observer,在同步完成后都需要向 Leader 发送成功 ACK。

Leader 选举


  • myid zk 集群中服务器的唯一标识
  • logicalclock 逻辑时钟,选举时称为 logicalclock,选举结束后称为 epoch。

集群启动时选举 Leader

如果要进行 Leader 选举,至少需要两台主机,这里以三台主机组成的集群为例。集群初始化阶段,Server1 启动时,会给自己投票,然后发布自己的投票结果。投票包含推举服务器的 myid 和 zxid,使用(myid, zxid)表示。此时 Server1 的投票为 (1, 0)。由于其他机器还未启动,所以收不到反馈信息,Server1 的状态便一直处于 Looking,非服务状态。

当第二台服务器 Server2 启动时,此时两台服务器可以相互通信,每台机器试图找到 Leader,选举过程如下:

  1. 每个 Server 发出一个投票。此时 Server1 投票为 (1, 0) Server2 投票为 (2, 0),然后各自将这个投票发送给集群其他服务器。
  2. 接受到来自各个服务器的投票后,集群的每个服务器首先判断该投票的有效性,如检查是否是本轮投票,是否来自 Looking 状态的服务器等。
  3. 针对每一个投票,服务器需要将别人的投票和自己的投票进行 PK,规则如下
    • 有限检查 zxid,zxid 比较大的服务器优先作为 Leader
    • 如果 zxid 相同,比较 myid。myid 较大的服务器作为 Leader 服务器。
  4. 每次投票后,服务器会统计投票信息,判断是否已经有半数以上机器接收到相同投票。对于 Server1、Server2 而言,集群中已经有两台主机接受了 (2, 0) 的投票信息,此时就选出了新的 Leader,Server2
  5. 一旦确定了 Leader,每个服务器会更新自己的状态,如果是 Follower,则会变更为 Following,如果是 Leader,则会变更为 Leading
  6. 新的 Leader 选举出来后 Server3 启动,其向发出新一轮选举,但由于当前集群各个主机的状态并不是 Looking,而是各司其职的正常服务,所以只能以 Follower 的身份加入到集群中。

集群宕机后的 Leader 选举

如果 Leader 突然宕机,整个集群将暂停对外服务,进入新一轮 Leader 选举,其过程和启动时期的 Leader 选举过程基本一致。

假设正在运行的 Server1、Server2、Server3 三台服务器,当前 Leader 为 Server2,如果某一时间,Server2 挂了。选举过程如下:

  1. Leader 挂后,余下非 Observer 服务器都会将自己的服务器状态由 Following 变更为 Looking,然后进入 Leader 选举过程。
  2. 每个 Server 会发出一个投票,仍然会首先投自己。不过,运行期间每个服务器上的 zxid 可能是不同的,假定 Server1 产生投票 (1, 333),Server3 产生投票 (3, 111),然后各自将投票发送给集群中所有机器。
  3. 集群中的每个服务器收到投票后,首先判断该投票的有效性,如检查是否是本轮投票、是否来自 Looking 状态的服务器。
  4. 服务器都需要将别人的投票和自己的投票进行 PK,对于 Server1 而言,它的投票是 (1, 333),接受 Server3 的投票为 (3, 111)。首先会比较两者的 zxid,Server1 的 zxid 大于 Server3 的,然后 Server3 更新自己的投票 (1, 333) ,然后广播重新投票。对于 Server1 而言,无需更新自己的投票,只是再次向所有主机发送上一次投票信息。
  5. 对于 Server1、Server3 而言,有两台主机接受了 (1, 333),便认为已经选出了新的 Leader,即 Server3。
  6. 一旦确定了 Leader,每个服务器会更新状态。Server1 变为 Leading,Server3 变为 Following

CAP 定理


  • Consistency 一致性
  • Availability 可用性
  • Partition tolerance 分区容错性

C 分布式系统中过个主机之间是否能够保持数据一致的特性。当系统数据发生更新操作后,各个主机中的数据仍然处于一致的状态
A 系统提供的服务必须一直处于可用的状态,即对用户的每一个请求,系统总是可以在有限的时间内对用户做出响应
P 分布式系统在遇到任何网络分区故障时,仍能够保证对外提供满足一致性和可用性服务。

对于分布式系统来说,网络环境相对是不可控的,出现了网络分区是不可避免的,因此系统必须具备分区容错性。但其并不能同时保证一致性与可用性。 CAP 原则即对一个分布式系统来说只可能满足两项,要么 CP 要么 AP。

理解:由于需要满足分区一致性,假设3台机器,每台机器不能放置大于n/2台参与选举的服务器,此时假设某一写事务操作需要,在决议通过后,Leader Commit当前机器的数据库后,其余机器需要同步更新,而同步更新期间则是不可用的。如果要保证服务一直是可用的,则在异步更新期间,Leader 数据与 Observer 数据便不一致。直到更新完毕

BASE

Basically Available 基本可用、Soft state 软状态、Eventually consistent 最终一致性。BASE CP 与 AP 权衡的结果。

即使无法做到强一致性,但每个系统都可以根据自身的业务特点,采用适当的方式来使系统达到最终一致性。

zk 遵循的是 CP 原则,保证了一致性,但牺牲了可用性。当 Leader 宕机后,zk 集群会马上进行新的 Leader 选举,选举时长一般在200毫秒内,最长不超过60秒。整个选举期间 zk 集群不接受客户端的读写操作。处于瘫痪状态。不满足可用性。

脑裂状态

zk 可能会引发脑裂,在多机房部署中,如果出现了网络连接问题,形成多个分区,可能出现数据不一致。

机房B 感知不到 Leader,其中的服务器会进入 Looking 状态,发送选举消息到机房C,但机房C的服务器都是 Following 状态可以感知 Leader。所以机房B选举失败。

此时,如果B节点收到了数据更改请求,就会造成 A 与 BC 数据不一致的情况。脑裂现象。

容灾设计方案


设置多台主机部署一个集群避免单点问题,需要考虑集群部署在多个机房、多个楼宇。在多机房部署设计中,要充分考虑过半原则,尽量确保 zk 集群中有过半的机器能够正常运行。

三机房部署

在生产环境下,三机房部署是最常见的、容灾性最好的部署方案。
假定 zk 集群中机器总数为 N ,三个机房中部署的机器数量分别为 N1、N2、N3

N1 = (N - 1) / 2。保证第一季芳中具有刚不到半数主机。
N2 = 1 ~ (N - N1) / 2
N3 = N - N1 - N2

扩容与缩容

水平扩展对于提高系统服务能力,是一种非常重要的方式。但 zk 对于水平扩容和缩容做的不完美,主机数量的变化需要修改配置文件后整个集群进行重启。集群重启的方式:

整体重启,整个集群停止,然后更新所有主机的配置然后再重启集群。该方式会使集群停止对外服务,所以慎用。
部分重启,每次重启一小部分主机,不能多于半数,因为重启的主机过半,意味着剩余主机无法产生合法投票结果,如果宕机无法进行选举,不宕机无法提供写服务。

源码


判断 zk 集群的某台服务器不可用

每台服务器中都存在一个维护消息发送的 Map,Map 中的各个 Value(队列)是否为空,判断当前 Server 与与整个集群的连接状态:

规则:每个 Value 对应集群中的一台 Server,而队列中存放的是发送失败的消息。

如果所有队列均为空 说明当前 Server 与集群的连接是没有任何问题的
如果所有队列均不为空 说明当前 Server 与集群已经完全失联
如果某一队列不为空 说明当前 Server 与该队列对应的 Server 的连接出现问题
如果任意一个队列为空 说明当前 Server 与集群的连接是没有问题的。

在源码中在方法 QuorumCnxManager.haveDelivered() 中提现了上述规则

FastLeaderElection.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
/**
* Implementation of leader election using TCP. It uses an object of the class
* QuorumCnxManager to manage connections. Otherwise, the algorithm is push-based
* as with the other UDP implementations.
* 使用 TCP 实现了 Leader 的选举,这种方式使用 QuorumCnxManager 类的对象进行与其他 Server
* 间的连接管理。如果不使用 QuorumCnxManager 对象的话,将使用 UDP 的基于推送的算法实现。
*
* There are a few parameters that can be tuned to change its behavior. First,
* finalizeWait determines the amount of time to wait until deciding upon a leader.
* This is part of the leader election algorithm.
*/

public class FastLeaderElection implements Election {

private static final Logger LOG = LoggerFactory.getLogger(FastLeaderElection.class);

/**
* Determine how much time a process has to wait
* once it believes that it has reached the end of
* leader election.
* 决定一轮的选举过程需要等待的时间,如果超过该时间,当轮选举结束。等待其他 Server 发送通知的时间
*/
static final int finalizeWait = 200;

/**
* Upper bound on the amount of time between two consecutive
* notification checks. This impacts the amount of time to get
* the system up again after long partitions. Currently 60 seconds.
* finalizeWait 的最大值,默认60秒
*/

private static int maxNotificationInterval = 60000;

/**
* Lower bound for notification check. The observer don't need to use
* the same lower bound as participant members
*/
private static int minNotificationInterval = finalizeWait;

...
}

public static class Notification {
/*
* Format version, introduced in 3.4.6
*/

public static final int CURRENTVERSION = 0x2;
int version;

/*
* Proposed leader 当前推荐做 Leader 的 ServerId
*/ long leader;

/*
* zxid of the proposed leader 当前推荐做 Leader 的最大的 zxid
*/ long zxid;

/*
* Epoch 当前轮选举的 逻辑时钟
*/ long electionEpoch;

/*
* current state of sender 当前通知发送者的状态
*/ QuorumPeer.ServerState state;

/*
* Address of sender 当前通知发送者的 ServerId
*/ long sid;

QuorumVerifier qv;
/*
* epoch of the proposed leader 当前选票推荐做 Leader 的逻辑时钟
*/ long peerEpoch;

}

totalOrderPredicate()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* Check if a pair (server id, zxid) succeeds our
* current vote.
*
*/
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
LOG.debug(
"id: {}, proposed id: {}, zxid: 0x{}, proposed zxid: 0x{}",
newId,
curId,
Long.toHexString(newZxid),
Long.toHexString(curZxid));

// 对于 Observer 来说,其权重 weight 为 0,没资格选举
if (self.getQuorumVerifier().getWeight(newId) == 0) {
return false;
}

/*
* We return true if one of the following three cases hold:
* 1- New epoch is higher
* 2- New epoch is the same as current epoch, but new zxid is higher
* 3- New epoch is the same as current epoch, new zxid is the same
* as current zxid, but server id is higher.
*/
// Leader 的比较逻辑
return ((newEpoch > curEpoch)
|| ((newEpoch == curEpoch)
&& ((newZxid > curZxid)
|| ((newZxid == curZxid)
&& (newId > curId)))));
}

getVoteTracker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* Given a set of votes, return the SyncedLearnerTracker which is used to
* determines if have sufficient to declare the end of the election round.
*
* @param votes
* Set of votes
* @param vote
* Identifier of the vote received last
* @return the SyncedLearnerTracker with vote details
*/
protected SyncedLearnerTracker getVoteTracker(Map<Long, Vote> votes, Vote vote) {
SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
voteSet.addQuorumVerifier(self.getQuorumVerifier());
if (self.getLastSeenQuorumVerifier() != null
&& self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier().getVersion()) {
voteSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
}

/*
* First make the views consistent. Sometimes peers will have different
* zxids for a server depending on timing.
*/
for (Map.Entry<Long, Vote> entry : votes.entrySet()) {
if (vote.equals(entry.getValue())) {
voteSet.addAck(entry.getKey());
}
}

return voteSet;
}

lookForLeader()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
/**
* Starts a new round of leader election. Whenever our QuorumPeer
* changes its state to LOOKING, this method is invoked, and it
* sends notifications to all other peers.
* 开启新一轮的 Leader 选举,无论何时,只要 QuorumPeer 的状态变为 looking,这个方法
* 就会被调用,发送 notifications 给所有其他同级服务器
*/
public Vote lookForLeader() throws InterruptedException {
try {
// 1.创建选举对象,选举前的初始化
// jmx Java Management Extensions Oracle 提供的分布式应用程序监控技术
self.jmxLeaderElectionBean = new LeaderElectionBean();
MBeanRegistry.getInstance().register(self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
self.jmxLeaderElectionBean = null;
}

// 获取与当前 JVM 高分辨率时间源相对应的时间,单位毫秒,
// 防止启动 JVM 之后有人改动时间
self.start_fle = Time.currentElapsedTime();
try {
/*
* The votes from the current leader election are stored in recvset. In other words, a vote v is in recvset
* if v.electionEpoch == logicalclock. The current participant uses recvset to deduce on whether a majority
* of participants has voted for it.
* 记录当前 Server 收到来自其他 Server 的本轮投票信息
* key 为投票者的 ServerId value 为选票
* recvset 集合就相当于票箱
*/
Map<Long, Vote> recvset = new HashMap<Long, Vote>();

/*
* The votes from previous leader elections, as well as the votes from the current leader election are
* stored in outofelection. Note that notifications in a LOOKING state are not stored in outofelection.
* Only FOLLOWING or LEADING notifications are stored in outofelection. The current participant could use
* outofelection to learn which participant is the leader if it arrives late (i.e., higher logicalclock than
* the electionEpoch of the received notifications) in a leader election.
* 用于记录所有不合法的选票
*/
Map<Long, Vote> outofelection = new HashMap<Long, Vote>();

int notTimeout = minNotificationInterval;

// 2.将自己作为新的 Leader 广播出去
synchronized (this) {
// 新一轮选举,逻辑时钟+1
logicalclock.incrementAndGet();
// 选举自己,修改当前 Server 的推荐信息
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}

LOG.info(
"New election. My id = {}, proposed zxid=0x{}",
self.getId(),
Long.toHexString(proposedZxid));
sendNotifications();

SyncedLearnerTracker voteSet;

/*
* Loop in which we exchange notifications until we find a leader
*/
// 3. 验证当前自己的选票与大家的选票谁更适合做 Leader
while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
/*
* Remove next notification from queue, times out after 2 times
* the termination time
*/
Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);

/*
* Sends more notifications if haven't received enough.
* Otherwise processes new notification.
*/
if (n == null) {
// 判断当前服务器在集群中的连接是否正常
if (manager.haveDelivered()) {
// 重新发送通知。目的是为了接受其他 Server 再次发送通知
sendNotifications();
} else {
// 说明当前 Server 与集群已经失联
// 连接所有其他 Server
// 当前 Server 如果与集群失联,则其他 Server 一定不可能收到当前 Server 发送的通知
// 所以就会执行 sendNotifications(),只需要连接上所有其他 Server 即可,不用重新发送通知
// 也会收到其他所有 Server 所发送的通知
manager.connectAll();
}

/*
* Exponential backoff
* 延长选举时长,但是最长不会超过60秒
*/
int tmpTimeOut = notTimeout * 2;
notTimeout = Math.min(tmpTimeOut, maxNotificationInterval);
LOG.info("Notification time out: {}", notTimeout);
} else if (validVoter(n.sid) && validVoter(n.leader)) {
/*
* Only proceed if the vote comes from a replica in the current or next
* voting view for a replica in the current or next voting view.
*/
switch (n.state) {
case LOOKING:
if (getInitLastLoggedZxid() == -1) {
LOG.debug("Ignoring notification as our zxid is -1");
break;
}
if (n.zxid == -1) {
LOG.debug("Ignoring notification from member with -1 zxid {}", n.sid);
break;
}
// If notification > current, replace and send messages out
// 外来通知 n 的 epoch 大于 本轮选举的逻辑时钟,说明本轮选举已经过时
if (n.electionEpoch > logicalclock.get()) {
// 更新本轮选举的逻辑时钟,使已经过时的本轮选举变为了当下选举
logicalclock.set(n.electionEpoch);
// 清空票箱
recvset.clear();
// 比较当前提议,谁更适合做 Leader
if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
// 无论是谁更适合做 Leader 都要更新提议
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
sendNotifications();
} else if (n.electionEpoch < logicalclock.get()) {
LOG.debug(
"Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x{}, logicalclock=0x{}",
Long.toHexString(n.electionEpoch),
Long.toHexString(logicalclock.get()));
break;
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
sendNotifications();
}

LOG.debug(
"Adding vote: from={}, proposed leader={}, proposed zxid=0x{}, proposed election epoch=0x{}",
n.sid,
n.leader,
Long.toHexString(n.zxid),
Long.toHexString(n.electionEpoch));

// don't care about the version if it's in LOOKING state
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

voteSet = getVoteTracker(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));

// 4. 判断本轮选举是否所有服务器都投票了
if (voteSet.hasAllQuorums()) {

// Verify if there is any change in the proposed leader
/**
* 当前 while 有两个出口
* 1:循环条件结束,这个出口说明剩余通知中没有找到比任何当前(过半的选票)更适合的通知,出去时 n = null
* 2:break 这里出去,说明剩余的通知中找到了一个比当前(过半选票)更适合的通知,n 不为 null
*/
while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
recvqueue.put(n);
break;
}
}

/*
* This predicate is true once we don't read any new
* relevant message from the reception queue
* 结束选举、发送结束选举的通知,当前的选票过半的服务器就是最终的 Leader 了
*/
if (n == null) {
// 判断自己是不是 leader,如果是就修改状态
setPeerState(proposedLeader, voteSet);
Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
leaveInstance(endVote);
return endVote;
}
}
break;
// 5. 无需选举的情况
case OBSERVING:
LOG.debug("Notification from observer: {}", n.sid);
break;
// 1.新的 Server(非 Observer) 加入到正常运行的集群
// 2.当 Leader 挂了,并不是所有 Follower 都同时能够感知到 Leader 挂了。
// 先感知到的 Server 会发送通知发送给其他 Server,但由于其他 Server 还未感知到
// 所以发送给这个 Server 的通知状态就是 Following
// 3.本轮选举中,其他 Server 已经选举出了新的 Leader,但还没有通知到当前 Server
// 这些已经知道了 Leader 选举完毕的 Server 向该 Server 发送的通知就是 Leading 或
// Following
case FOLLOWING:
case LEADING:
/*
* Consider all notifications from the same epoch
* 外来通知的逻辑时钟与本轮选举的逻辑时钟相同,上述第三种情况
*/
if (n.electionEpoch == logicalclock.get()) {
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
voteSet = getVoteTracker(recvset, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
if (voteSet.hasAllQuorums() && checkLeader(recvset, n.leader, n.electionEpoch)) {
setPeerState(n.leader, voteSet);
Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
}

/*
* Before joining an established ensemble, verify that
* a majority are following the same leader.
* Note that the outofelection map also stores votes from the current leader election.
* See ZOOKEEPER-1732 for more information.
* 以下代码用于处理一个 Server 加入到一个已经选举出 Leader 的集群中的 情况
*/
outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
voteSet = getVoteTracker(outofelection, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));

if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
synchronized (this) {
logicalclock.set(n.electionEpoch);
setPeerState(n.leader, voteSet);
}
Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
break;
default:
LOG.warn("Notification state unrecoginized: {} (n.state), {}(n.sid)", n.state, n.sid);
break;
}
} else {
if (!validVoter(n.leader)) {
LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
}
if (!validVoter(n.sid)) {
LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
}
}
}
return null;
} finally {
try {
if (self.jmxLeaderElectionBean != null) {
MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
}
} catch (Exception e) {
LOG.warn("Failed to unregister with JMX", e);
}
self.jmxLeaderElectionBean = null;
LOG.debug("Number of connection processing threads: {}", manager.getConnectionThreadCount());
}
}

Zookeeper 技术内幕


节点

zk 数据存储结构,与 Unix 文件系统非常相似,都是在根节点下挂很多子节点。zk 中没有引入传统文件系统中目录与文件的概念,而是使用 znode 的数据节点概念。znode 是 zk 中数据的最小单元,每个 znode 上都可以保存数据,同时还可以挂载子节点,形成一个树形化命名空间。

节点分为:持久节点,持久顺序节点、临时节点(生命周期与客户端的会话绑定在一起,不能有子节点)、临时顺序节点
节点状态:

  • cZxid 当前 znode 被创建时的事务id
  • ctime 当前 znode 被创建的时间
  • mZxid 当前 znode 最后一次被修改时的事务id
  • mtime 当前 znode 最后一次被修改时的时间
  • pZxid 表示当前 znode 的子节点列表最后一次被修改时的事务 id,只能是其子节点列表变更了才会引起 pZxid 的变更,子节点内容的修改不会影响 pZxid
  • cversion 子节点的版本号,该版本号用于充当乐观锁
  • dataVersion 当前 znode 数据的版本号。该版本号用于充当乐观锁
  • aclVertsion 当前 znode 的权限 ACL 的版本号。该版本号用于充当乐观锁
  • ephemeralOwner 若当前 znode 是持久节点,则其值为0,若为临时节点,其值为创建该节点的会话的 sessionId,会话消失后,会根据 sessionId 来查找与会话相关的临时节点进行删除
  • dataLength 当前 znode 中存放的数据长度
  • numChildren 当前 znode 所包含的子节点的个数

会话

会话状态

常见的会话状态三种:

Connecting 连接中,客户端要创建连接,其首先会在客户端创建一个 zk 对象,代表服务器。采用轮询的方式对服务器列表逐个尝试连接,知道成功。为了对 Server 进行负载均衡,会首先对服务器列表进行打散操作,然后再轮询

Connected 已经连接

Closed 连接关闭

会话连接事件

客户端与服务端的长连接失效后,客户端将进行重连。重连过程中,客户端会产生三种会话连接事件

  • Connection_loss 连接丢失,因为网络抖动等原因导致连接中断,在客户端会引发连接丢失,该事件会触发客户端逐个尝试连接服务器,直到连接成功
  • session_moved 会话转移,当连接丢失后,在 SessionTimeout 内重连成功,则 SessionId 是不变的,若两次连接上的 Server 不是同一个,则会引发会话转移事件。会引发客户端更新本地 zk 对象的相关信息。
  • session_expired 会话失效。若客户端在 SessionTimeout 时间内没有连接成功,则服务器会将该会话清除,并向 Client 发送通知,但在 Client 收到通知之前,又连上了 Server,此时这个会话是失效的,会引发会话失效事件。该事件会触发客户端重新生成新的 SessionId 重新连接 Server

会话连接的超时管理-分桶策略

将时间相近的会话放在一个块中管理,减少管理的复杂度。表示,只需要在某一个时间点,检查当前桶中剩下的会话即可,因为没有超时的会话已经被移出了桶,桶中存在的会话就是超时会话。

分桶计算依据:
ExpirationTime = CurrentTime(连接时间点) + SessionTimeout
Bucket = ExpirationTime / ExpirationInterval + 1 –> 第几个桶

一个桶的时长为 ExpirationInterval,只要 ExpirationTime 落入到同一个桶中,系统就会对其中的超时会话进行统一管理。

ACL 访问控制列表

Access Control List,细粒度的权限管理策略。可以对任意用户与组进行细粒度的权限控制。zk 利用 ACL 控制 znode 节点的访问权限,如 节点数据读写、节点创建、删除、读取子节点列表、设置节点权限等。

UGO,粗粒度权限管理,Linux 2.4 以后支持 ACL。User Group Other。无法单独管理 Other 下的用户

Unix/Linux 系统的 ACL 分为两个维度:组与权限,且目录的子目录或文件能够继承父目录的 ACL。
Zookeeper 的 ACL 分为三个维度:授权策略scheme、授权对象id、用户权限 permission,子znode 不会继承父 znode 的权限

授权策略 scheme

通过什么来验证权限,即一个用户要访问某个 znode,如何验证其身份,在 zk 中最常用的有四种

  • ip 根据ip进行验证
  • digest 根据用户名密码进行验证
  • world 对所有用户不做任何验证
  • super 超级用户对任意节点具有任意操作权限

授权对象id

权限赋予的用户

  • ip 将权限授予某个指定 ip
  • digest 将权限授予具有指定用户名与密码的用户
  • world 将权限授予某一个用户 anyone
  • super 将权限授予具有指定用户名密码的用户

权限 Permission

通过验证的用户可以对 znode 做出的操作行为,一共五种权限,zk 支持自定义权限

  • c Create,允许授权对象在当前节点创建子节点
  • d Delete,允许授权对象删除当前节点
  • r Read 允许授权对象读取当前节点的数据内容及子节点列表
  • w Write 允许授权对象修改当前节点数据内容及子节点列表(可以为当前节点增/删子节点)
  • a ACL,允许授权对象对当前节点进行 ACL 设置

Watcher 机制

zk 的 watcher 机制有以下特性

  • watcher 不适合监听变化非常频繁的场景
  • 只有当当前的 watcher 回调执行完毕了,才会向 Server 注册新的 watcher(这里的新的 watcher 是对同一个节点相同事件类型的监听)
  • Client 向 Server 发送的 watcher 不是一个完整的,是一个简易版 watcher,回调逻辑不是 Server 端的,而是 Client。所以服务端较轻量。

客户端命令


启动客户端

  1. 连接本机 zk 服务器 zkcli.sh
  2. 连接其他 zk 服务器 zkcli.sh -server 192.168.59.117:2181

查看子节点 -ls

查看根节点及 /brokers 节点下所包含的所有子节点列表

创建节点

  • 创建永久节点

创建一个名称为 china 的 znode,值为 999

  • 创建顺序节点

/china 节点下创建了顺序子节点 beijing、shanghai、guangzhou,他们数据分别为 bj、sh、gz。

  • 创建临时节点

临时节点与持久节点的区别,在后面 get 命令中可以看到。

获取节点信息

  • 获取持久节点数据

  • 获取顺序节点信息

  • 获取临时节点信息

更新节点数据内容

更新前:

更新后:

删除节点

若要删除具有子节点的节点会报错

ACL 操作

  • 查看权限

  • 设置权限

下面的命令是,首先增加了一个认证用户 zs,密码为123,然后为 /china 节点指定只有 zs 用户才可访问该节点,而访问权限为所有权限。

文章作者: Ammar
文章链接: http://lizhaoloveit.cn/2020/09/07/Zookeeper/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 Ammar's Blog
打赏
  • 微信
  • 支付宝

评论