网上学习资料一大堆,但如果学到的知识不成体系,遇到问题时只是浅尝辄止,不再深入研究,那么很难做到真正的技术提升。
需要这份系统化资料的朋友,可以戳这里获取
一个人可以走的很快,但一群人才能走的更远!不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!
使用WireShark嗅探GetDataRequest产生的TCP包(十六进制字节数组)
十六进制位 | 协议部分 | 数值或字符串 |
---|---|---|
00,00,00,1d | 0-3位:len 整个数据包长度 | 长度29 |
00,00,00,01 | 4-7位:xid 客户端请求的发起序号 | 1 |
00,00,00,04 | 8-11位:type 客户端请求类型 | 4 OpCode.getData |
00,00,00,10 | 12-15位:len 节点路径的长度 | 16 节点路径长度转换成十六进制是16位 |
2f,24,37,5f, 32,5f,34,2f, 67,65,74,5f, 64,61,74,61 | 16-31位:path 节点路径 | Hex编码 |
01 | 32位:是否注册Watcher | 1-是 |
响应
GetDataResponse响应完整协议定义
响应头 ReplyHeader
public class ReplyHeader implements Record { private int xid; // 请求时传过来的xid会在响应中原样返回 private long zxid; // zxid 代表ZK服务器上当前最新事务ID private int err; // 错误码:Code.OK-0,NONODE-101,NOAUTH-102,定义在KeeperException.Code中
响应体Response
//会话创建public class ConnectResponse implements Record { private int protocolVersion; private int timeOut; private long sessionId; private byte[] passwd;// 获取节点数据public class GetDataResponse implements Record { private byte[] data; private org.apache.zookeeper.data.Stat stat;// 更新节点数据public class SetDataResponse implements Record { private org.apache.zookeeper.data.Stat stat;
GetDataResponse 协议定义
十六进制位 | 协议解释 | 当前值 |
---|---|---|
00,00,00,63 | 0-3位:len 整个响应的数据包长度 | 99 |
00,00,00,05 | 4-7位:xid 客户端请求序号 | 5 本次请求所属会话创建后的第5次请求 |
00,00,00,00, 00,00,00,04 | 8-15位: zxid 当前服务器处理过的最大ZXID | 4 |
00,00,00,00 | 16-19位:err 错误码 | 0-Codes.OK |
00,00,00,0b | 20-23位:len 节点数据内容的长度 | 11 后面11位是数据内容的字节数组 |
xxx | 24-34位:data 节点数据内容 | Hex编码 |
00,00,00,00, 00,00,00,04 | 35-42位:czxid 创建该节点时的ZXID | 4 |
00,00,00,00, 00,00,00,04 | 43-50位:mzxid 最后一次访问该数据节点时的ZXID | 4 |
00,00,01,43,67,bd,0e,08 | 51-58位:ctime 数据节点的创建时间 | unix_timestamp 1389014879752 |
00,00,01,43,67,bd,0e,08 | 59-66位:mtime 数据节点最后一次变更的时间 | |
00,00,00,00 | 67-70位:version 数据节点内容的版本号 | 0 |
00,00,00,00 | 71-74位:cversion 数据节点的子版本号 | 0 |
00,00,00,00 | 75-78位:aversion 数据节点的ACL变更版本号 | 0 |
00,00,00,00,00,00,00,00 | 79-86位:ephemeralOwner 如果是临时节点,则记录创建该节点的sessionID,否则置0 | 0 (该节点是永久节点) |
00,00,00,0b | 87-90位:dataLength 数据节点的数据内容长度 | 11 |
00,00,00,00 | 91-94位:numChildren 数据节点的子节点个数 | 0 |
00,00,00,00,00,00,00,04 | 95-102位:pzxid 最后一次对子节点列表变更的ZXID | 4 |
ZK客户端
ZK客户端的组成:ZooKeeper实例-客户端入口,HostProvider - 客户端地址列表管理器,ClientCnxn-客户端核心线程,内部包含SendThread和EventThread两个线程。前者是一个IO线程,负责ZooKeeper客户端和服务器端间的网络IO通信,后者是一个事件线程,负责对服务端事件进行处理。
ZK会话的创建过程
初始化阶段
- 初始化ZK对象,通过调用ZooKeeper的构造方法实例化,在此过程中会创建客户端Watcher管理器 ClientWatcherManager
- 设置会话默认Watcher:如果在构造方法中传入了一个Watcher对象,客户端会将这个对象作为默认Watcher保存在ClientWatcherManager中
- 构造ZooKeeper服务器地址列表管理器 HostProvider:对于构造函数传入的服务器地址,客户端会将其存放在服务器地址列表管理器HostProvider中
- 创建并初始化客户端网络连接器 ClientCnxn:ClientCnxn连接器的底层IO处理器是ClientCnxnSocket。另外还会初始化客户端两个核心队列 outgoingQueue 和 pendingQueue 分别作为客户端的请求发送队列和服务端响应的等待队列。
- 初始化SendThread和EventThread:前者管理客户端与服务端之间的所有网络IO,后者用于客户端的事件处理
会话创建阶段
- 启动SendThread和EventThread
- 获取一个服务器地址:开始创建TCP连接前,SendThread从HostProvider中随机选择一个地址,调用ClientCnxnSocket 创建与ZK服务器之间的TCP连接
- 创建TCP长连接
- 构造ConnectRequest请求:SendThread根据当前客户端的实际设置,构造出一个ConnectRequest请求,代表了客户端视图与服务端创建一个会话。同时ZK客户端会将请求包装成IO层的Packet对象放入请求发送队列outgoingQueue中
- 发送请求:ClientCnxnSocket从outgoingQueue中取出一个待发送的Pocket对象序列化成ByteBuffer发送到服务端
响应处理阶段
- 接收并处理服务端响应:ClientCnxnSocket接收到服务端的响应后,会首先判断当前客户端状态是否是已初始化,才进行反序列化,得到ConnectResponse对象,从中获取ZK服务端分配的sessionID
- 连接成功:通知SendThread进一步对客户端进行会话参数的设置:readTimeout/connectTimeout,更新客户端状态。通知HostProvider当前成功连接的服务器地址
- 生成事件 SyncConnected - None:为了让上层应用感知到会话的成功创建,SendThread会生成该事件传递给EventThread,通知会话创建成功
- 查询Watcher:EventThread线程收到事件后,会从ClientWatchManager中获取对应Watcher,针对SyncConnected-None事件找到默认的Wathcer,放入EventThread的waitingEvents队列中
- 处理事件:EventThread不断从waitingEvents队列中取出待处理的Watcher对象,调用process方法触发Watcher
connectString解析
connectString 形如 192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181,ZK客户端允许将服务器所有地址配置在字符上,ZK客户端在连接服务器的过程中是如何从服务器列表中选择机器的?是顺序?还是随机?
org.apache.zookeeper.client.ConnectStringParser 中的构造方法对connectString进行的处理有:解析chrootPath + 保存服务器地址列表到 ArrayList serverAddresses
chroot 客户端命名空间
ZK3.2.0 之后的版本中添加了该特性,connectString 可 设置为 192.168.0.1:2181,192.168.0.2:2181/apps/domainName,将解析出chroot=/apps/domainName,这样客户端的所有操作都会限制在这个命名空间下
ZooKeeper.java
private static HostProvider createDefaultHostProvider(String connectString) { return new StaticHostProvider(new ConnectStringParser(connectString).getServerAddresses());}
解析的结果会返回 地址列表管理器 StaticHostProvider 的构造方法中
HostProvider
HostProvider 提供了客户端连接所需的host,每一个实现该接口的类需要确保下述几点:
- next() 方法必须有效的InetSocketAddress,这样迭代器能一直运行下去。必须返回解析过的InetSocketAddress实例
- size() 方法不能返回0
public interface HostProvider { //当前服务器地址列表的个数,不能返回0 int size(); // 获取下一个将要连接的InetSocketAddress,spinDelay 表示所有地址都尝试过后的等待时间 InetSocketAddress next(long spinDelay); //连接成功后的回调方法 void onConnected(); //更新服务器列表,返回是否需要改变连接用于负载均衡 boolean updateServerList(Collection<InetSocketAddress> serverAddresses, InetSocketAddress currentHost);}
解析服务器地址:StaticHostProvider会解析服务器地址放入serverAddress 集合中,同时使用Collections#shuffle方法将服务器地址列表进行随机打散。
获取可用的服务器地址:StaticHostProvider#next() 方法中将随机排序后的服务器地址列表拼成一个环形循环队列,该过程是一次性的。
HostProvider的实现:自动从配置文件中读取服务器地址列表、动态变更的地址列表管理器(定时从配置管理中心上解析ZK服务器地址)、实现服务调用时同机房优先的策略
ClientCnxn 网络IO
ClientCnxn维护客户端与服务器之间的网络连接并进行通信
Packet是ClientCnxn的内部类,定义:
static class Packet { RequestHeader requestHeader; ReplyHeader replyHeader; Record request; Record response; ByteBuffer bb; String clientPath; //server视角下的path,chroot不同 String serverPath; boolean finished; AsyncCallback cb; Object ctx; WatchRegistration watchRegistration; public boolean readOnly; WatchDeregistration watchDeregistration; //并不是Packet中的所有字段都进行网络传输,在createBB方法中定义了用于网络传输的ByteBuffer bb字段的生成逻辑 //里面只用到了RequestHeader requestHeader,Record request,boolean readOnly 3个字段 public void createBB() {}}
ClientCnxn的两个核心队列(都是Packet队列):
- outgoingQueue:客户端的请求发送队列,存储要发送到服务端的Packet集合
- pendingQueue:服务端响应的等待队列,存储已经从客户端发送到服务端但需要等待服务端响应的Packer集合
ClientCnxnSocket
ZK3.4之后ClientCnxnSocket从ClientCnxn中提取了出来,便于对底层Socket进行扩展(如使用Netty实现)
通过系统变量配合ClientCnxnSocket实现类的全类名:-Dzookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNIO
ClientCnxnSocketNIO是ClientCnxnSocket的Java NIO原生实现
会话Session
【分布式】Zookeeper会话 - leesf - 博客园
会话状态有:CONNECTING CONNECTED RECONNECTING RECONNECTED CLOSE
Session是ZK中的会话实体,代表一个客户端会话,包含以下4个基本属性:
- sessionID 唯一标识一个会话,每次客户端创建新会话时,ZK会为其分配一个全局唯一的sessionID
- timeout 会话超时时间,客户端构造ZK实例时会传入sessionTImeout指定会话的超时时间,客户端向服务器发送这个超时时间后,服务器会根据自己的超时限定确定会话的超时时间
- tickTime 下次会话超时时间点,这个参数用于会话管理的分桶策略执行。TickTIme是一个13位的long型(unix_timestamp)
- isClosing 服务端检测到一个会话失效后会标记其isClosing=true,这样就不再处理来自该会话的新请求了
sessionID的生成原理
代码位于 SessionTrackerImpl#initializeNextSession//最终返回的sessionID:高8位是传入的id,剩下的56位最后16位被置零了,前面的40位是最高位截掉的timestamp(去掉数字1)public static long initializeNextSessionId(long id) { long nextSid; // nanoTime/10^6 就是 currentTimeMillis 13位long型,long型占空间8B,共64位 //如 1657349408123 对应 44 位的二进制是 00011000000111110001101110010000010101111011 //左移24位后再右移8位后的结果:00000000(-8位)1000000111110001101110010000010101111011(16位-)0000000000000000 //注意这个右移8位是无符号右移,防止unixtimes第5位是1带来的负数问题 nextSid = (System.nanoTime() / 1000000 << 24) >>> 8; //添加机器标识 sid 正好补在前面腾出的8位中 nextSid = nextSid | (id << 56); if (nextSid == EphemeralType.CONTAINER_EPHEMERAL_OWNER) { ++nextSid; // this is an unlikely edge case, but check it just in case } return nextSid;}
左移24位可以将高位的1去掉(unixTimestamp转二进制的44位数字开头总是0001),防止负数(负数右移8位后最高位的1不变),sid不能明确得出
SessionTracker
ZK服务端的会话管理器,负责会话的创建、管理和清理,使用3个数据结构管理Session:
- sessionsById:ConcurrentHashMap<Long, SessionImpl>类型,根据sessionID管理Session实体
- sessionsWithTimeout:ConcurrentMap<Long, Integer> 根据sessionID管理会话的超时时间,定期被持久化到快照文件中
- sessionSets:ExpiryQueue sessionExpiryQueue 服务于会话管理和超时检查,分桶策略会用到
Session管理 - 分桶策略
ZK的会话管理主要由SessionTracker负责,其采用了分桶策略:将理论上可以在同一时间点超时的会话放在同一区块中,便于进行会话的隔离处理和同一区块的统一管理。
对于一个会话的超时时间理论上就是客户端设置的超时时间之后,即图中的 ExpirationTime = CurrentTime + sessionTimeout(客户端进行设置),这样到达这个ExpirationTime检查各会话是否真的需要置超时状态
但是ZK服务端检查各区块的会话是否超时是有周期的,如每隔 ExpirationInterval 进行检查,这样实际的 ExpirationTime 是在原数值之后的最近一个周期上进行检查,这样
ExpirationTime_Adjust = ((CurrentTime + sessionTimeout) / ExpirationInterval + 1) * ExpirationInterval (单位均是ms)
如对于当前时间为4,,10 超时,检查周期为3,在15的时候才是第一个可能的超时时间。这样 ExpirationTime_Adjust 总是 ExpirationInterval 的整数倍。这样SessionTracker中的会话超时检查线程就可以在 ExpirationInterval 的整数倍的时间点上对会话进行批量清理(未及时移走的会话都是要被清理掉的,没有客户端触发会话激活)
会话激活
Leader服务器收到客户端的心跳消息PING后:
- 检查改会话是否是isClose
- 如果会话尚未关闭,则激活会话,计算出会话的下一次超时时间点 ExpirationTime_NEW
- 根据会话的旧超时时间点 ExpirationTime_Old 定位到会话所在的区块
- 迁移会话,将会话放入 ExpirationTime_NEW 对应的新区块中
触发会话激活的两种场景:
- 只要客户端向服务器发送请求(不论读/写)就会触发一次会话激活
- 客户端在sessionTimeout / 3 的时间间隔内没有向服务器发出任何请求,就会主动发起一次PING请求触发会话激活
会话清理的步骤
- 先将该会话的isClosing置为true,这样在会话清理期间再收到客户端的新请求就返回 Session_Expire,再标记会话状态为已关闭 - CLOSE
- 发起会话关闭 请求给 PrepRequestProcessor处理器进行处理
- 根据sessionID从内存数据库中找到对应的临时节点列表
- 将这些临时节点转换成 节点删除 请求,放入事务变更队列 outstandingChanges 中
- FinalRequestProcessor触发内存数据库,删除该会话对应的所有临时节点
- 节点删除后从SessionTracker中移除session(从sessionById sessionWithTimeout sessionExpiryQueue中移除对应session的信息)
- 从NIOServerCnxnFactory中找到会话对应的NIOServerCnxn进行关闭
重连机制
客户端与服务端网络连接断开时,ZK客户端会进行反复的重连
客户端经常看到的两种连接异常是:CONNECTION_LOSS 连接断开,SESSION_EXPIRE 会话过期;服务端可能看到的连接异常是SESSION_MOVED 会话转移
- CONNECTION_LOSS:客户端在发现连接断开时会逐个尝试连接 connectString 解析出的服务器地址,同时此时收到连接事件 None-Disconnected,同时抛出异常 KeeperException$ConnectionLossException,应用层应捕获住此异常并等待重连成功(收到None-SyncConnected事件)后进行重试
- SESSION_EXPIRE:通常发生在CONNECTION_LOSS,客户端重连成功后会话在服务端已过期被清理。应用层此时需要重新创建一个ZooKeeper实例进行初始化
- SESSION_MOVED:ZooKeeper在3.2.0版本后明确提出的概念,客户端 C 向服务端 S1发出的请求R1因网络抖动导致重连到S2,并重试请求R11,但后面R1成功到达S1,导致S1 S2 都执行了相同的请求。针对这一罕见场景,ZooKeeper提出的处理方案: 在处理客户端请求时检查此会话Owner是不是当前服务器,不是的话会抛出 SessionMovedException 异常,但C1因为已断开与S1的连接,看不到S1上的这个异常。在多个客户端使用相同的sessionId/pass连接不同服务端时才会看到这种异常
ZK服务端
ZK服务端架构
zookeeper学习笔记Sky_的博客-CSDN博客
单机版ZK服务器的启动流程
预启动
- 不论是单机还是集群模式,zkServer.cmd和zkServer.sh两个脚本中都配置了使用QuorumPeerMain 作为启动入口类
ZOOMAIN="org.apache.zookeeper.server.quorum.QuorumPeerMain"
- 解析配置文件 zoo.cfg
- 在QuorumPeerMain#initializeAndRun方法中创建并启动了文件清理器 DatadirCleanupManager,包括对事物日志和快照数据文件的定时清理
- 根据zoo.cfg配置文件的解析判断当前是单机还是集群模式启动,单机模式使用ZooKeeperServerMain启动
- 创建ZooKeeperServer实例并进行初始化,包括连接器、内存数据库和请求处理器等组件的初始化
初始化
-
创建服务器统计器ServerStats,包含下述基本运行时信息:
- packetsSent: 从服务启动或重置以来,服务端向客户端发送的响应包次数
- packetsReceived: … 服务端接收到的来自客户端的请求包次数
- maxLatency/minLatency/totalLatency: 服务端请求处理的最大延时、最小延时、总延时
- count: 服务端处理的客户端请求总次数
-
创建ZK数据管理器FileTxnSnapLog:FileTxnSnapLog是ZK上层服务器和底层数据存储之间的对接层,提供了一些列操作数据文件的接口,包括事务日志文件(TxnLog接口)和快照数据文件(SnapShot接口)。ZK根据zoo.cfg文件中解析出的快照数据目录dataDir和事务日志目录dataLogDir来创建FileTxnSnapLog。
-
设置服务端 tickTime 和 会话超时时间 限制
-
创建并初始化 ServerCnxnFactory , 通过属性 zookeeper.serverCnxnFactory 指定zookeeper使用 Java原生NIO还是Netty框架作为ZooKeeper服务端网络连接工厂
-
启动ServerCnxnFactory主线程(执行主逻辑所在的run方法)此时ZK的NIO服务器已经对外开放了端口,客户端可以访问到2181端口,但此时zk服务器还无法正常处理客户端请求
-
恢复本地数据:ZK启动时都会从本地快照文件和事务日志文件中进行数据恢复
-
创建并启动会话管理器SessionTracker,同时会设置 expirationInterval 计算 nextExpirationTime、sessionID ,初始化本地数据结构 sessionsWithTimeout(保存每个会话的超时时间)。之后ZK就会开始会话管理器的会话超时检查
-
初始化ZK的请求处理链,ZK服务端对于请求的初始方式是典型的责任链模式,单机版服务器的处理链主要包括:PrepRequestProcessor -> SyncRequestProcessor ->FinalRequestProcessor
-
注册JMX服务:ZK会将服务器运行时的一些状态信息以JMX的方式暴露出来
-
注册ZK服务器实例:此时ZK服务器初始化完毕,注册到ServerCnxnFactory之后就可以对外提供服务了,至此单机版的ZK服务器启动完毕
集群版ZK服务器的启动过程
zk源码阅读26:集群版服务器启动概述 - 简书
预启动过程与单机版一致
初始化
- 创建并初始化 ServerCnxnFactory
- 创建ZooKeeper数据管理器 FileTxnSnapLog
- 创建QuorumPeer 实例:Quorum是集群模式下特有的对象,是ZooKeeper服务器实例ZooKeeperServer的托管者。从集群层面看QuorumPeer代表了ZooKeeper集群中的一台机器。在运行期间,Quorum会不断检测当前服务器实例的运行状态,同时根据情况发起Leader选举
- 创建内存数据库 ZKDatabase,管理ZooKeeper的所有会话记录以及DataTree 和事务日志的存储
- 初始化 QuorumPeer,将一些核心组件注册到QuorumPeer,包括 FileTxnSnapLog、ServerCnxnFactory、ZKDatabase,同时配置一些参数,包括服务器地址列表、Leader选举算法和会话超时时间限制等
- 恢复本地数据
- 启动 ServerCnxnFactory 主线程
Leader选举
- Leader选举初始化阶段:Leader选举是集群版启动流程与单机版最大的不同,ZK会根据SID(服务器分配的ID)、lastLoggedZxid(最新的ZXID)和当前的服务器epoch(currentEpoch)生成一个初始化的投票,初始化过程中每个服务器会为自己投票。 ZooKeeper会根据zoo.cfg中的配置(electionAlg),创建响应的Leader选举算法实现,3.4.0之前支持 LeaderElection/AuthFastLeaderElection/FastLeaderElection 三种算法实现,3.4.0之后只支持FastLeaderElection。 在初始化阶段,ZooKeeper会首先创建Leader选举所需的网络IO层 QuorumCnxManager,同时启动对Leader选举端口的监听,等待集群中的其他服务器创建连接
- 注册JMX服务
- 检测当前服务器状态:QuorumPeer不断检测当前服务器的状态做出相应的处理,正常情况下,ZK服务器的状态在LOOKING、LEADING和FOLLOWING/OBSERVING之间进行切换,。启动阶段QuorumPeer的状态是LOOKING,因此开始进行Leader选举
- Leader选举:投票选举产生Leader服务器,其他机器成为Follower或是Observer; Leader选举算法的原则:集群中的数据越新(根据每个服务器处理过的最大ZXID来确定数据是否比较新)越有可能成为Leader,ZXID相同时SID越大越有可能成为Leader。
Leader和Follower服务器启动期交互过程
-
PrepRequestProcessor是Leader服务器的请求预处理器,在ZK中,将创建删除节点/更新数据/创建会话等会改变服务器状态的请求称为事务请求,对于事务请求,预处理器会进行一系列预处理,如创建请求事务头、事务体、会话检查、ACL检查和版本检查
-
ProposalRequestProcessor Leader的事务投票处理器,也是Leader服务器事务处理流程的发起者。
- 对于非事务请求:直接将请求流转到CommitProcessor,不作其他处理
- 对于事务请求:除了交给CommitProcessor,还会根据对应请求类型创建对应的Proposal,并发送给所有Follower服务器发起一次集群内的事务投票。ProposalRequestProcessor还会将事务请求交给SyncRequestProcessor进行事务日志的记录
-
SyncRequestProcessor 事务日志处理器,将事务请求记录到事务日志文件中,触发ZooKeeper进行数据快照
-
AckRequestProcessor 是Leader特有的处理器,负责在SyncRequestProcessor处理器完成事务日志记录后向Proposal的投票收集器发送ACK反馈,通知投票收集器当前服务器已完成对该Proposal的事务日志记录
-
CommitProcessor 事务提交处理器
-
ToBeCommitProcessor 该处理类中有一个toBeApplied队列(ConcurrentLinkedQueue toBeApplied)存储被CommitProcessor处理过的可被提交的Proposal,等待FinalRequestProcessor处理完提交的请求后从队列中移除
-
FinalRequestProcessor 进行客户端请求返回前的收尾工作:创建客户端请求的响应、将事务应用到内存数据库
LearnerHandler:Leader服务器会与每一个Follower/Observer服务器建立一个TCP长链接,同时为每个Follower/Observer服务器创建LearnerHandler。LearnerHandler是ZK集群中的Learner服务器的管理器,负责Follower/Observer服务器和Leader服务器之间的网络通信:数据同步、请求转发、Proposal提议的投票。
Follower
Follower的职责:处理客户端非事务请求,转发事务请求给Leader服务器;参与事务请求Proposal的投票;参与Leader选举投票;
Follower不需要负责事务请求的投票处理(所以不需要ProposalRequestProcessor),所以其请求处理链简单一些
- FollowerRequestProcessor 识别出当前请求是否是事务请求,如果是事务请求,Follower就会将请求转发给Leader服务器,Leader服务器收到请求后提交给请求处理器链,按正常事务请求进行处理
- SendAckRequestProcessor Follower服务器上另一个和Leader服务器有差异的请求处理器,与Leader服务器上的AckRequestProcessor类似,SendAckRequestProcessor同样承担了事务日志记录反馈的角色,在完成事务日志记录后,会向Leader服务器发送ACK消息表明自身完成了事务日志的记录工作。两者的一个区别是:AckRequestProcessor在Leader服务器上,因此ACK反馈是一个本地操作,而SendAckRequestProcessor在Follower上,需要通过ACK消息的形式向Leader服务器进行反馈。
Observer
观察ZooKeeper集群的最新状态并将这些状态变更同步过来,Observer服务器在工作原理上与Follower基本一致,对于非事务请求可以进行独立的处理,对于事务请求同样需要转发到Leader服。与Follower的一大区别是:Observer不参与任何形式的投票,包括Leader选举和事务请求Proposal的投票。
集群内消息通信
ZK集群各服务器间消息类型分为4类:数据同步型、服务器初始化型、请求处理型、会话管理型
数据同步消息
Learner与Leader进行数据同步使用的消息,分为4种(消息类型定义在Leader.java中,使用常量数字标记):
- DIFF, 13 Leader发送给Learner,通知Learner进行DIFF方式的数据同步
- TRUNC, 14 Leader --> Learner 触发Learner服务器进行内存数据库的回滚操作
- SNAP, 15 Leader --> Learner 通知Learner,Leader即将与其进行全量数据同步
- UPTODATE, 12 Leader --> Learner 通知Learner完成了数据同步,可以对外提供服务
服务器初始化型消息
整个集群或某些机器初始化时,Leader与Learner之间相互通信所使用的消息类型:
- OBSERVERINFO,16: Observer在启动时发送消息给Leader,用于向Leader注册Observer身份,消息中包含当前Observer服务器的SID和已经处理的最新ZXID
- FOLLOWERINFO,11:Follower启动时发送包含SID和已处理的最新ZXID的注册消息到Leader
- LEADERINFO,17:上述两种情形下,Leader服务器会返回包含最新EPOCH值的LeaderInfo返回给Observer或Follower
- ACKEPOCH,18:Learner在收到LEADERINFO消息时会将自己的最新ZXID和EPOCH以ACKEPOCH消息的形式发送给Leader
- NEWLEADER,10:足够多的Follower连接上Leader服务器,或是Leader服务器完成数据同步后,Leader向Learner发送的阶段性标识信息,包含当前最新ZXID
请求处理型
请求处理过程中Leader和Learner之间互相通信所使用的消息:
- REQUEST,1:Learner收到事务请求时需要将请求转发给Leader,该请求使用REQUEST消息的形式进行转发
- PROPOSAL,2:在处理事务请求时,Leader服务器会将事务请求以PROPOSAL消息的形式创建投票发送给集群中的所有的Follower进行事务日志的记录
- ACK,3:Follower完成事务日志的记录后会以ACK消息的形式反馈给Leader
- COMMIT,4:Leader通知集群中的所有Follower,可以进行事务请求的提交了,Leader在收到过半Follower发来的ACK消息后,进入事务请求的最终提交流程——生成COMMIT消息,告知所有Follower进行事务请求的提交,这是一个2PC的过程
- INFORM,8:Leader发起事务投票并通知提交事务,只需要PROPOSAL和COMMIT消息给Follower就可以了,而Observer不参与事务投票,无法接收COMMIT消息,但需要知道事务提交的内容,所以ZK设计了INFORM消息发给Observer,消息中会携带事务请求的内容
- SYNC,7:Leader通知Learner服务器已完成Sync操作
会话管理型
ZK服务器在进行会话管理过程中,与Learner服务器之间通信所使用的消息:
- PING,5:ZK客户端随机选择一个服务器进行连接,所以Leader服务器无法直接收到所有客户端的心跳检测,所以需要委托Learner维护所有客户端的心跳检测。Leader定时向Learner发送PING消息就是要求Learner将一段时间内保持心跳检测的客户端列表同样以PING消息的形式返回给Leader,这样Leader就能获取到全部客户端的活跃状态并进行会话激活了。
- REVALIDATE,6:客户端发生重连后(可能切换了服务器)新连接的服务器需要向Leader发送REVALIDATE消息以确定客户端会话是否已经超时。
客户端请求的处理
会话创建请求
ZK服务端对于会话创建的处理,可以分为请求接收、会话创建、预处理、事务处理、事务应用和会话响应。
zookeeper源码分析(3)— 一次会话的创建过程 - 简书— 一次会话的创建过程 - 简书")
既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,涵盖了95%以上大数据知识点,真正体系化!
由于文件比较多,这里只是将部分目录截图出来,全套包含大厂面经、学习笔记、源码讲义、实战项目、大纲路线、讲解视频,并且后续会持续更新
需要这份系统化资料的朋友,可以戳这里获取
用于确保事务请求的顺序性,便于CommitProcessor检测当前集群中是否正在进行事务请求的投票
+ 等待Proposal投票:Commit流程处理时,Leader根据当前事务请求生成Proposal广播给所有Follower,此时Commit流程需要等待
+ 投票通过,提议获得过半机器认可,ZK会将请求放入committedRequests队列中,同时唤醒Commit流程
+ 提交请求:将请求放入toProcess队列中,交给FinalRequestProcessor处理
事务应用
- FinalRequestProcessor检查outstandingChanges队列中请求的有效性,如果队列中的请求落后于当前正在处理的请求,则从队列中移除
- 之前的请求处理逻辑中仅仅是将事务请求记录到了事务日志中,内存数据库中的状态尚未变更。因此需要将事务变更应用到内存数据库中。对于会话创建这种“事务请求”,只需向SessionTracker进行会话注册
- 完成内容应用后将事务请求放入队列 commitProposal,这个队列保存最近被提交的事务请求,以便集群间机器进行数据的快速同步
[外链图片转存中…(img-YBQD3BIG-1715341903538)]
[外链图片转存中…(img-p0dH29Hs-1715341903539)]
[外链图片转存中…(img-7Ar2803B-1715341903539)]
既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,涵盖了95%以上大数据知识点,真正体系化!
由于文件比较多,这里只是将部分目录截图出来,全套包含大厂面经、学习笔记、源码讲义、实战项目、大纲路线、讲解视频,并且后续会持续更新
需要这份系统化资料的朋友,可以戳这里获取