一、demo
public static void main(String[] args) throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(
1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient(
"localhost:2181",
5000,
3000,
retryPolicy);
client.start();
PathChildrenCache pathChildrenCache = new PathChildrenCache(
client, "/cluster", true);
pathChildrenCache.start();
// cache就是把zk里的数据缓存到了你的客户端里来
// 你可以针对这个缓存的数据加监听器,去观察zk里的数据的变化
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
}
});
}
我们来看一下这行代码,是如何创建的
PathChildrenCache pathChildrenCache = new PathChildrenCache(
client, "/cluster", true);
/**
*@param client客户端
*@param-path要监视的路径
*@param cacheData如果为true,则除了stat之外还会缓存节点内容
*@param dataIsCompressed如果为true,则压缩路径中的数据
*@param executorService Closeable executorService用于PathChildrenCache的后台线程
*/
注释:启动缓存。缓存不会自动启动。您必须调用此方法。
说明调用start方法时默认不会缓存数据
默认走NORMAL分支,我们点进去看看
这里就启动了一个线程,将传递进来的Operation放入operationsQuantizer,如果放成功则启动一个线程,将Operation从operationsQuantizer删除,调用invoke方法,我们在返过去看一下这个Operation
这个里面invoke方法会去调用PathChildrenCache的refresh方法,我们在点击进去看一下
方法注释:
/**
*第一次,同步并确保创建路径中的所有节点。后续通话
*在这个例子中,是NOP。
*/
这个方法的是确保监听的目录在zk中已经存在了
这里会根据构造器的模式先获取出一个GetChildrenBuilder,在调用GetChildrenBuilder的usingWatcher方法将watcher监听器保存进去,在通过inBackground方法将刚刚创建的BackgroundCallback回调保存
实际逻辑则在forPath方法里面,这个因为是后台请求,我们直接看
processBackgroundOperation方法,这里的话会创建一个OperationAndData类,将自己作为operation传递进去
这里的话实际就是调用OperationAndData类的
callPerformBackgroundOperation方法,而这个方法其实就是调用刚刚传递进来的GetChildrenBuilderImpl的performBackgroundOperation方法
简要说明就是forPath方法会调用
performBackgroundOperation方法,这个方法会调用zk原生的api给他一个Watcher监听器,zk如果有子节点发生事件会通知原生的Watcher反过来调用我们注册的listener;