程序员快速提升:精通Zookeeper的经典应用场景(2)
顺序节点的应用,类似于我们在用zookeeper实现分布式锁的时候如何去处理惊群效应的做法。 且根据队列的特点:FIFO(先进先出),入队时我们创建顺序节点(ps:为什么上面我们是用了顺序节点而不是说是临时顺序节点,是因为我们根本不考虑客户端挂掉的情况)并把元素传入队列,出队时我们取出最小的节点。使用watch机制来监听队列的状态,在队列满时进行阻塞,在队列空时进行写入即可。 入队操作 如上图,我们生产者需要对资源进行访问时,会申请获取一个分布式锁,如果未成功抢占锁,就会进行阻塞,抢到锁的生产者会尝试把任务提交到消息队列,此时又会进行判断,如果队列满了,就监听队列中的消费事件,当有消费队列存在空位时进行入队,未消费时阻塞。入队时它会进行释放锁的操作,唤醒之前抢占锁的请求,并让之后的生产者来获取。 出队操作 出队和入队的机制是十分相似的。 ② JDK阻塞队列操作 阻塞队列:BlockingQueue---线程安全的阻塞队列 它以4种形式出现,对于不能立即满足但是在将来某一时刻可能满足的操作,4种形式的处理方式皆不同 1.抛出一个异常2.返回一个特殊值,true or false3.在操作可以成功前,无限阻塞当前线程4.放弃前只在给定的最大时间限制内阻塞复制代码 我们将会实现这个阻塞队列接口来实现我们的分布式队列 内容三:分布式队列的代码实现 public class ZkDistributeQueue extends AbstractQueue 继承了AbstractQueue,可以省略部分基础实现 ① 基本的配置信息及使用到的参数 首先我们需要一个zkClient的客户端,然后queueRootNode是分布式队列的存放元素的位置,指定了一个默认的根目录default_queueRootNode,把队列中的元素存放于/distributeQueue下,写锁节点代表往队列中存放元素,读锁节点代表从队列中去取元素,这个设计简单点来说就是,queueRootNode作为最大的目录,其下有3个子目录,分别是queueWriteLockNode,queueReadLockNode和queueElementNode,其他的就是一些需要使用到的配置信息 ② 构造器 提供两个构造方法,一个为使用默认参数实现,另外一个是自定义实现 此时在我们分布式锁的构造器中,createPersistent()的参数true是指如果我父目录queueRootNode并没有事先创建完成,这个方法会自动创建出父目录,这样就不怕我们在跑程序之前遗漏掉一些创建文件结构的工作 ③ 初始化队列信息的init()方法 重新定义好读锁写写锁和任务存放路径,然后把zkClient连接上,创建queueElementNode作为任务元素目录,参数true上文作用已经提到了 ④ 使用put()方法进行队列元素入队操作 checkElement()方法是一个简单的参数检查,我们也可以定义有关于znode的命名规范的一些检查,不过一般情况下只要是String类型的参数都是没有问题的 size()方法也很简单,就是先取得父目录然后调用zkClient自带的countChildren()方法得出结果返回即可 主要就是通过subscribeChildChanges()监听子节点的数据变化,在size() < capacity条件成立时,就会唤醒等待队列,而当size() >= capacity,就会判断队列已经被填满,从而进行阻塞 在waitForRemove()方法执行后,我们的等待线程被唤醒,这时重新执行put(e),尝试重新入队 入队操作由enqueue(e)来完成,就是创建顺序节点的步骤 ⑤ 消费操作take 附:生产者和消费者的模拟 ① 生产者 模拟了两台服务器,两个并发,每睡3秒钟就往消息队列put (编辑:ASP站长网) |