
Zookeeper方案
首先什么是zookeeper?参看在《System》当中的《ZK分布式》章节。
kazoo库简介
kazoo库使用纯Python实现Zookeeper协议,因此不需要安装任何额外的软件就可以直接连接Zookeeper。
安装Kazoo也很简单,直接通过 pip
安装:
pip install kazoo
要开始使用 Kazoo,必须创建一个 KazooClient
对象并建立连接:
from kazoo.client import KazooClient
zk = KazooClient(hosts='127.0.0.1:2181')
zk.start()
?> 默认情况下,客户端将通过默认端口(2181)连接到本地Zookeeper服务器。首先您应该确保Zookeeper在那里运行,否则start命令将一直等到其默认超时。
连接后,无论间歇性连接丢失或 Zookeeper 会话过期,客户端都将尝试保持连接。可以通过调用stop指示客户端断开连接:
zk.stop()
监听状态
**客户端的常见状态可以分为三种:丢失(LOST)、连接(CONNECTED)、暂停(SUSPENDED)。**可以通过查看 state
属性来确定:
from kazoo.client import KazooClient
zk = KazooClient(hosts='127.0.0.1:2181')
zk.start()
print(zk.state) # CONNECTED 注释:当前处于连接状态
# 注释:KazooClient首次创建实例,它是在LOST状态。建立连接后,它转换到 CONNECTED状态。
当客户端状态为SUSPENDED时,如果客户端正在执行需要与其他系统达成一致的操作(例如使用Lock),它就会暂停它正在执行的操作。重新建立连接后,客户端状态为CONNECTED时,客户端可以继续执行操作。
当客户端状态为LOST时,Zookeeper 将删除任何已创建的临时节点,这将会影响所有创建临时节点(例如使用Lock),当状态再次转换为CONNECTED后,需要重新获取锁。
常见的状态转换:
- LOST -> CONNECTED:新连接,或之前丢失的连接正在连接。
- CONNECTED -> SUSPENDED:与服务器的连接丢失发生在连接上。
- CONNECTED -> LOST:仅当在建立连接后提供无效的身份验证凭据时才会发生。
- SUSPENDED -> LOST:连接恢复到服务器,但由于会话过期而丢失。
- SUSPENDED -> CONNECTED:丢失的连接已恢复。
针对客户端状态,通过 KazooState
来进行判断可以写出如下状态监听器:
from kazoo.client import KazooState
def my_listener(state):
if state == KazooState.LOST:
# Register somewhere that the session was lost
elif state == KazooState.SUSPENDED:
# Handle being disconnected from Zookeeper
else:
# Handle being connected/reconnected to Zookeeper
zk.add_listener(my_listener)
?> 在使用 kazoo.recipe.lock.Lock
或创建临时节点时,强烈建议添加状态侦听器,以便您的程序可以正确处理连接中断或 Zookeeper 会话丢失。
节点CRUD
Zookeeper 有用于创建、读取、更新和删除 Zookeeper 节点(这里称为 znodes 或节点)的功能。针对的,Kazoo 添加了几个方便的方法和一个更 Pythonic 的 API。
首先需要明白的一点就是,节点是一个抽象的事物存在于会话当中,因此节点的路径也不是真实存在的。
创建节点
Kazoo创建节点的最常用两个方法:
ensure_path()
:递归创建节点和沿途所需路径中的任何节点,但不能为节点设置数据,只能设置 ACL。
create()
:创建一个节点,并可以在节点上设置数据以及一个 watch 函数。它要求它的路径首先存在,除非 makepath
选项设置为True。
# 创建一个节点
zk.ensure_path("/my/favorite")
# 创建一个带有数据的永久节点,前提是/my/favorite路径必须存在
zk.create("/my/favorite/node", b"a value")
# 创建一个带有数据的永久节点,/my/favorite路径可以不存在
zk.create("/my/favorite/node", b"a value", makepath=True)
# 创建一个带有数据的临时节点,/my/favorite路径可以不存在
zk.create("/my/favorite/node", b"a value", makepath=True, ephemeral=True)
?> 子节点可以被无限递归的创建。
检查节点
Kazoo检查节点的最常用三个方法:
exists()
:检查节点是否存在。
get()
:获取节点的数据以及 ZnodeStat
结构中的详细节点信息。
get_children()
:获取给定节点的子节点列表。
# 判断节点
if zk.exists("/my/favorite"):
# Do something
# 打印一个节点的版本和它的数据
data, stat = zk.get("/my/favorite")
print("Version: %s, data: %s" % (stat.version, data.decode("utf-8")))
# 列出子节点
children = zk.get_children("/my/favorite")
print("There are %s children with names %s" % (len(children), children))
更新节点
set()
:更新指定节点的数据,在更新数据之前需要匹配提供的节点版本,否则将引发BadVersionError
而不是更新。
zk.set("/my/favorite", b"some data")
删除节点
delete()
:删除一个节点,也可以选择递归删除该节点的所有子节点。在删除节点之前需要匹配节点的版本,否则将引发 BadVersionError
而不是删除。
zk.delete("/my/favorite/node", recursive=True)
重试连接
retry方法
如果 Zookeeper 服务器出现故障或无法访问,与 Zookeeper 的连接可能会中断。默认情况下,kazoo 不会重试命令,因此这些失败将导致引发异常。为了帮助解决失败,kazoo 附带了一个 retry()
助手,如果 Zookeeper 连接异常被触发,重试连接到Zookeeper。
result = zk.retry(zk.get, "/path/to/node")
重试函数
**某些命令可能具有独特的行为,不保证在每个命令的基础上自动重试。**例如,当使用临时和序列选项集创建一个节点,在命令成功返回之前可能会丢失连接,但该节点实际上已创建,再次运行时引发 kazoo.exceptions.NodeExistsError
。
由于 retry()
方法需要调用一个函数及其参数,因此可以将运行多个 Zookeeper 命令的函数传递给它,以便在连接丢失时重试整个函数。锁实现中的这个片段显示了它如何使用重试重新运行获取锁的函数,并检查它是否已经被创建来处理这种情况:
# kazoo.recipe.lock snippet
def acquire(self):
"""获取互斥锁,阻塞直到它被获取"""
try:
self.client.retry(self._inner_acquire)
self.is_acquired = True
except KazooException:
# if we did ultimately fail, attempt to clean up
self._best_effort_cleanup()
self.cancelled = False
raise
def _inner_acquire(self):
self.wake_event.clear()
# 确保我们指定的节点存在
if not self.assured_path:
self.client.ensure_path(self.path)
node = None
if self.create_tried:
node = self._find_node()
else:
self.create_tried = True
if not node:
node = self.client.create(self.create_path, self.data,
ephemeral=True, sequence=True)
# 去除节点的路径
node = node[len(self.path) + 1:]
自定义重试
有时,您可能希望为与 retry()
方法不同的命令或命令集设置特定的重试策略 。您可以使用您喜欢的特定重试策略手动创建 KazooRetry
实例:
from kazoo.retry import KazooRetry
kr = KazooRetry(max_tries=3, ignore_expire=False)
result = kr(client.get, "/some/path")
这将重试client.get
命令最多 3 次,并在发生时引发会话过期。您还可以使用默认行为创建一个实例,在重试期间忽略会话过期。
监视
Kazoo 可以在节点上设置监视功能,该功能可以在更改或删除节点或其子节点更改时触发。
Watchers 可以通过两种不同的方式设置:
**第一种是 Zookeeper 默认支持一次性 watch 事件样式。**这些 watch 函数会被 kazoo 调用一次,并且不接收 session 事件。使用此样式需要将 watch 函数传递给get()
、get_children()
、exists()
方法之一,当节点上的数据发生变化或节点本身被删除时,将调用传递给该方法的 watch 函数。它将传递一个 WatchedEvent
实例:
def my_func(event):
# 检查子节点现在情况
pass
# 当子节点改变调用watch事件对应的my_func函数
children = zk.get_children("/my/favorite/node", watch=my_func)
**第二种是使用 Kazoo 包含的一个用于监视数据和子节点修改的 API。**在每次触发事件时它不需要重新设置监视,即使用此 API 注册的 Watch 函数将在每次发生更改时立即调用,或者直到函数返回 False。如果 allow_session_lost
设置为 True
,则在会话丢失时将不再调用该函数。以下方法 ChildrenWatch
、DataWatch
可直接在 KazooClient
实例上使用,并且在以这种方式使用时不需要传入客户端对象,通过实例可以直接调用,允许它们用作装饰器:
@zk.ChildrenWatch("/my/favorite/node")
def watch_children(children):
print("Children are now: %s" % children)
# Above function called immediately, and from then on
@zk.DataWatch("/my/favorite")
def watch_node(data, stat):
print("Version: %s, data: %s" % (stat.version, data.decode("utf-8")))
命令流
Zookeeper 3.4 及更高版本支持一次发送多个命令,这些命令将作为单个原子单元提交。他们要么全部成功,要么全部失败。事务的结果将是事务中每个命令的成功/失败结果列表。
# transaction()方法返回一个TransactionRequest实例
transaction = zk.transaction()
# 调用它的方法来排队完成事务中的命令
transaction.check('/node/a', version=3)
transaction.create('/node/b', b"a value")
# 当事务准备好发送时,调用它的commit()方法。
results = transaction.commit()
在上面的示例中,只要有一个命令不可用,就会失败报错。这可以用来检查特定版本的节点,节点 /node/a
版本必须是3,如果节点与它应该处于的版本不匹配,将不会创建 /node/b
,则该版本事务失败。
异步使用
所有异步 Kazoo API 依赖于异步方法返回的 IAsyncResult
对象。可以使用 rawlink()
方法添加回调,无论使用线程还是使用 gevent 等异步框架,该方法都可以兼容。
连接处理创建连接:
from kazoo.client import KazooClient
from kazoo.handlers.gevent import SequentialGeventHandler
# 使用IHandler接口来抽象回调系统
zk = KazooClient(handler=SequentialGeventHandler())
# 立即返回
event = zk.start_async()
# 设置超时等待30秒
event.wait(timeout=30)
if not zk.connected:
# 未连接,停止尝试连接
zk.stop()
raise Exception("Unable to connect.")
?> Kazoo 不依赖 gevents/eventlet 猴子补丁,并要求传入适当的处理程序,默认处理程序是 SequentialThreadingHandler
。
除了 start_async()
之外的所有kazoo _async方法都返回一个 IAsyncResult
实例。回调函数将传递 IAsyncResult
实例,并应调用其上的 get()
方法以检索值。如果异步函数遇到错误,则此调用可能会导致引发异常。它应该被抓住并妥善处理。
例子:
import os
from kazoo.exceptions import ConnectionLossException
from kazoo.exceptions import NoAuthException
def my_callback(async_obj):
try:
children = async_obj.get()
do_something(children)
except (ConnectionLossException, NoAuthException):
os.exit(1)
# 这两个语句立即返回,第二个设置一个回调
# 当 get_children_async 有它的返回值
async_obj = zk.get_children_async("/some/node")
async_obj.rawlink(my_callback)
除了返回 IAsyncResult
对象之外,以下 CRUD 方法的工作方式都与它们的同步对应方法相同。
- 创建方法:
create_async()
- 检查方法:
exists_async()
、get_async()
、get_children_async()
- 更新方法:
set_async()
- 删除方法:
delete_async()
!> 注意: ensure_path()
具有目前没有异步版本也不能使用 delete_async()
方法来执行递归删除。