storm操作zookeeper源码分析-cluster.clj

时间:2022-09-07 19:04:36

storm操作zookeeper的主要函数都定义在命名空间backtype.storm.cluster中(即cluster.clj文件中)。backtype.storm.cluster定义了两个重要protocol:ClusterState和StormClusterState。clojure中的protocol可以看成java中的接口,封装了一组方法。ClusterState协议中封装了一组与zookeeper进行交互的基础函数,如获取子节点函数,获取子节点数据函数等,ClusterState协议定义如下:

ClusterState协议
(defprotocol ClusterState
 (set-ephemeral-node [this path data])
 (delete-node [this path])
 (create-sequential [this path data])
 ;; if node does not exist, create persistent with this data
 (set-data [this path data])
 (get-data [this path watch?])
 (get-version [this path watch?])
 (get-data-with-version [this path watch?])
 (get-children [this path watch?])
 (mkdirs [this path])
 (close [this])
 (register [this callback])
 (unregister [this id]))

StormClusterState协议封装了一组storm与zookeeper进行交互的函数,可以将StormClusterState协议中的函数看成ClusterState协议中函数的"组合"。StormClusterState协议定义如下:

StormClusterState协议
(defprotocol StormClusterState
 (assignments [this callback])
 (assignment-info [this storm-id callback])
 (assignment-info-with-version [this storm-id callback])
 (assignment-version [this storm-id callback])
 (active-storms [this])
 (storm-base [this storm-id callback])
 (get-worker-heartbeat [this storm-id node port])
 (executor-beats [this storm-id executor->node+port])
 (supervisors [this callback])
 (supervisor-info [this supervisor-id]) ;; returns nil if doesn't exist
 (setup-heartbeats! [this storm-id])
 (teardown-heartbeats! [this storm-id])
 (teardown-topology-errors! [this storm-id])
 (heartbeat-storms [this])
 (error-topologies [this])
 (worker-heartbeat! [this storm-id node port info])
 (remove-worker-heartbeat! [this storm-id node port])
 (supervisor-heartbeat! [this supervisor-id info])
 (activate-storm! [this storm-id storm-base])
 (update-storm! [this storm-id new-elems])
 (remove-storm-base! [this storm-id])
 (set-assignment! [this storm-id info])
 (remove-storm! [this storm-id])
 (report-error [this storm-id task-id node port error])
 (errors [this storm-id task-id])
 (disconnect [this]))

命名空间backtype.storm.cluster除了定义ClusterState和StormClusterState这两个重要协议外,还定义了两个重要函数:mk-distributed-cluster-state和mk-storm-cluster-state。
mk-distributed-cluster-state函数如下:
该函数返回一个实现了ClusterState协议的对象,通过这个对象就可以与zookeeper进行交互了。

mk-distributed-cluster-state函数
(defn mk-distributed-cluster-state
 ;; conf绑定了storm.yaml中的配置信息,是一个map对象
 [conf]
 ;; zk绑定一个zk client,Storm使用CuratorFramework与Zookeeper进行交互
 (let [zk (zk/mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf STORM-ZOOKEEPER-PORT) :auth-conf conf)]
   ;; 创建storm集群在zookeeper上的根目录,默认值为/storm
   (zk/mkdirs zk (conf STORM-ZOOKEEPER-ROOT))
   (.close zk))
 ;; callbacks绑定回调函数集合,是一个map对象
 (let [callbacks (atom {})
       ;; active标示zookeeper集群状态
       active (atom true)
       ;; zk重新绑定新的zk client,该zk client设置了watcher,这样当zookeeper集群的状态发生变化时,zk server会给zk client发送相应的event,zk client设置的watcher会调用callbacks中相应回调函数来处理event
       ;; 启动nimbus时,callbacks是一个空集合,所以nimbus端收到event后不会调用任何回调函数;但是启动supervisor时,callbacks中注册了回调函数,所以当supervisor收到zk server发送的event后,会调用相应的回调函数
       ;; mk-client函数定义在zookeeper.clj文件中,请参见其定义部分
     zk (zk/mk-client conf
                        (conf STORM-ZOOKEEPER-SERVERS)
                        (conf STORM-ZOOKEEPER-PORT)
                        :auth-conf conf
                        :root (conf STORM-ZOOKEEPER-ROOT)
                        ;; :watcher绑定一个函数,指定zk client的默认watcher函数,state标示当前zk client的状态;type标示事件类型;path标示zookeeper上产生该事件的znode
                        ;; 该watcher函数主要功能就是执行callbacks集合中的函数,callbacks集合中的函数是在mk-storm-cluster-state函数中通过调用ClusterState的register函数添加的
                        :watcher (fn [state type path]
                                   (when @active
                                     (when-not (= :connected state)
                                       (log-warn "Received event " state ":" type ":" path " with disconnected Zookeeper."))
                                     (when-not (= :none type)
                                       (doseq [callback (vals @callbacks)]
                                         (callback type path))))))]
   ;; reify相当于java中的implements,这里表示实现一个协议
   (reify
    ClusterState
    ;; register函数用于将回调函数加入callbacks中,key是一个32位的标识
    (register
      [this callback]
      (let [id (uuid)]
        (swap! callbacks assoc id callback)
        id))
    ;; unregister函数用于将指定key的回调函数从callbacks中删除
    (unregister
      [this id]
      (swap! callbacks dissoc id))
    ;; 在zookeeper上添加一个临时节点
    (set-ephemeral-node
      [this path data]
      (zk/mkdirs zk (parent-path path))
      (if (zk/exists zk path false)
        (try-cause
          (zk/set-data zk path data) ; should verify that it's ephemeral
          (catch KeeperException$NoNodeException e
            (log-warn-error e "Ephemeral node disappeared between checking for existing and setting data")
            (zk/create-node zk path data :ephemeral)
            ))
        (zk/create-node zk path data :ephemeral)))
    ;; 在zookeeper上添加一个顺序节点
    (create-sequential
      [this path data]
      (zk/create-node zk path data :sequential))
    ;; 修改某个节点数据
    (set-data
      [this path data]
      ;; note: this does not turn off any existing watches
      (if (zk/exists zk path false)
        (zk/set-data zk path data)
        (do
          (zk/mkdirs zk (parent-path path))
          (zk/create-node zk path data :persistent))))
    ;; 删除指定节点
    (delete-node
      [this path]
      (zk/delete-recursive zk path))
    ;; 获取指定节点数据。path标示节点路径;watch?是一个布尔类型值,表示是否需要对该节点进行"观察",如果watch?=true,当调用set-data函数修改该节点数据后,
    ;; 会给zk client发送一个事件,zk client接收事件后,会调用创建zk client时指定的默认watcher函数(即:watcher绑定的函数)
    (get-data
      [this path watch?]
      (zk/get-data zk path watch?))
    ;; 与get-data函数的区别就是获取指定节点数据的同时,获取节点数据的version,version表示节点数据修改的次数
    (get-data-with-version
      [this path watch?]
      (zk/get-data-with-version zk path watch?))
    ;; 获取指定节点的version,watch?的含义与get-data函数中的watch?相同
    (get-version
      [this path watch?]
      (zk/get-version zk path watch?))
    ;; 获取指定节点的子节点列表,watch?的含义与get-data函数中的watch?相同
    (get-children
      [this path watch?]
      (zk/get-children zk path watch?))
    ;; 在zookeeper上创建一个节点
    (mkdirs
      [this path]
      (zk/mkdirs zk path))
    ;; 关闭zk client
    (close
      [this]
      (reset! active false)
      (.close zk)))))

mk-storm-cluster-state函数定义如下:
mk-storm-cluster-state函数非常重要,该函数返回一个实现了StormClusterState协议的实例,通过该实例storm就可以更加方便与zookeeper进行交互在启动nimbus和supervisor的函数中均调用了

mk-storm-cluster-state函数。关于nimbus和supervisor的启动将在之后的文章中介绍。

mk-storm-cluster-state函数
( ))]
         ;; 删除to-kill中包含的节点
         (doseq [k to-kill]
           (delete-node cluster-state (str path "/" k)))))
     ;; 得到给定的storm-id component-id下的异常信息
     (errors
       [this storm-id component-id]
       (let [path (error-path storm-id component-id)
             _ (mkdirs cluster-state path)
             children (get-children cluster-state path false)
             errors (dofor [c children]
                           (let [data (-> (get-data cluster-state (str path "/" c) false)
                                          maybe-deserialize)]
                             (when data
                               (struct TaskError (:error data) (:time-secs data) (:host data) (:port data))
                               )))
             ]
         (->> (filter not-nil? errors)
              (sort-by (comp - :time-secs)))))
     ;; 关闭连接,在关闭连接前,将回调函数从cluster-state的callbacks中删除
     (disconnect
       [this]
       (unregister cluster-state state-id)
       (when solo?
         (close cluster-state))))))

zookeeper.clj中mk-client函数定义如下:
mk-client函数创建一个CuratorFramework实例,为该实例注册了CuratorListener,当一个后台操作完成或者指定的watch被触发时将会执行CuratorListener中的eventReceived()。eventReceived中调用的wacher函数就是mk-distributed-cluster-state中:watcher绑定的函数。

mk-client函数
(defnk mk-client
 [conf servers port
  :root ""
  :watcher default-watcher
  :auth-conf nil]
 (let [fk (Utils/newCurator conf servers port root (when auth-conf (ZookeeperAuthInfo. auth-conf)))]
   (.. fk
       (getCuratorListenable)
       (addListener
         (reify CuratorListener
           (^void eventReceived [this ^CuratorFramework _fk ^CuratorEvent e]
                  (when (= (.getType e) CuratorEventType/WATCHED)
                    (let [^WatchedEvent event (.getWatchedEvent e)]
                      (watcher (zk-keeper-states (.getState event))
                               (zk-event-types (.getType event))
                               (.getPath event))))))))
   (.start fk)
   fk))

以上就是storm与zookeeper进行交互的源码分析,我觉得最重要的部分就是如何给zk client添加"wacher",storm的很多功能都是通过zookeeper的wacher机制实现的,如"分配信息领取"。添加"wacher"大概分为以下几个步骤:

  1. mk-distributed-cluster-state函数创建了一个zk client,并通过:watcher给该zk client指定了"wacher"函数,这个"wacher"函数只是简单调用ClusterState的callbacks集合中的函数,这样这个"wacher"函数执行哪些函数将由ClusterState实例决定

  2. ClusterState实例提供register函数来更新callbacks集合,ClusterState实例被传递给了mk-storm-cluster-state函数,在mk-storm-cluster-state中调用register添加了一个函数(fn [type path] ... ),这个函数实现了"watcher"函数的全部逻辑
  3. mk-storm-cluster-state中注册的函数执行的具体内容由StormClusterState实例决定,对zookeeper节点添加"观察"也是通过StormClusterState实例实现的,这样我们就可以通过StormClusterState实例对我们感兴趣的节点添加"观察"和"回调函数",当节点或节点数据发生变化后,zk server就会给zk client发送"通知",zk client中的"wather"函数将被调用,进而我们注册的"回到函数"将被执行。

这部分源码与zookeeper联系十分紧密,涉及了很多zookeeper中的概念和特性,如"数据观察"和"节点观察"等,有关zookeeper的wacher机制请参考
http://www.cnblogs.com/ggjucheng/p/3369946.html
http://www.cnblogs.com/zhangchaoyang/articles/3813217.html
storm并没有直接使用zookeeper的api,而是使用Curator框架,Curator框架简化了访问zookeeper的操作。关于Curator框架请参考
http://f.dataguru.cn/thread-120125-1-1.html

storm操作zookeeper源码分析-cluster.clj的更多相关文章

  1. Nimbus<二>storm启动nimbus源码分析-nimbus.clj

    nimbus是storm集群的"控制器",是storm集群的重要组成部分.我们可以通用执行bin/storm nimbus >/dev/null 2>&1 &a ...

  2. storm启动nimbus源码分析-nimbus.clj

    nimbus是storm集群的"控制器",是storm集群的重要组成部分.我们可以通用执行bin/storm nimbus >/dev/null 2>&1 &a ...

  3. storm启动supervisor源码分析-supervisor.clj

    supervisor是storm集群重要组成部分,supervisor主要负责管理各个"工作节点".supervisor与zookeeper进行通信,通过zookeeper的&qu ...

  4. storm shell命令源码分析-shell_submission.clj

    当我们在shell里执行storm shell命令时会调用shell_submission.clj里的main函数.shell_submission.clj如下: shell_submission.c ...

  5. storm定时器timer源码分析-timer.clj

    storm定时器与java.util.Timer定时器比较相似.java.util.Timer定时器实际上是个线程,定时调度所拥有的TimerTasks:storm定时器也有一个线程负责调度所拥有的& ...

  6. supervisor启动worker源码分析-worker.clj

    supervisor通过调用sync-processes函数来启动worker,关于sync-processes函数的详细分析请参见"storm启动supervisor源码分析-superv ...

  7. worker启动executor源码分析-executor.clj

    在"supervisor启动worker源码分析-worker.clj"一文中,我们详细讲解了worker是如何初始化的.主要通过调用mk-worker函数实现的.在启动worke ...

  8. zookeeper源码分析之五服务端(集群leader)处理请求流程

    leader的实现类为LeaderZooKeeperServer,它间接继承自标准ZookeeperServer.它规定了请求到达leader时需要经历的路径: PrepRequestProcesso ...

  9. zookeeper源码分析之四服务端(单机)处理请求流程

    上文: zookeeper源码分析之一服务端启动过程 中,我们介绍了zookeeper服务器的启动过程,其中单机是ZookeeperServer启动,集群使用QuorumPeer启动,那么这次我们分析 ...

随机推荐

  1. VS2013常用快捷键你敢不会?

    F1 帮助文档 F5 运行 F12 跳转到定义 F11 单步调试 Shift+F5 停止调试 Ctrl+滚轮 放大缩小当前视图 Ctrl+L 删除当前行 Ctrl+K,Ctrl+C 注释选中代码 Ct ...

  2. hdu 4738 2013杭州赛区网络赛 桥+重边+连通判断 ***

    题意:有n座岛和m条桥,每条桥上有w个兵守着,现在要派不少于守桥的士兵数的人去炸桥,只能炸一条桥,使得这n座岛不连通,求最少要派多少人去. 处理重边 边在遍历的时候,第一个返回的一定是之前去的边,所以 ...

  3. JBOSS最大连接数配置和jvm内存配置

    一.调整JBOSS最大连接数. 配置deploy/jboss-web.deployer/server.xml文件 .       <Connector         port="80 ...

  4. JAVA中的NIO&lpar;一&rpar;

    1.IO与NIO IO就是普通的IO,或者说原生的IO.特点:阻塞式.内部无缓冲,面向流. NIO就是NEW IO,比原生的IO要高效.特点:非阻塞.内部有缓存,面向缓冲. 要实现高效的IO操作,尤其 ...

  5. Beta版本——冲刺计划及安排

    我说的都队 031402304 陈燊 031402342 许玲玲 031402337 胡心颖 03140241 王婷婷 031402203 陈齐民 031402209 黄伟炜 031402233 郑扬 ...

  6. 去掉网址中的 html编码

    修改 web\urlManager  createUrl函数,去掉 urlEncode函数

  7. Surround the Trees(凸包)

    Surround the Trees Time Limit: 2000/1000 MS (Java/Others)    Memory Limit: 65536/32768 K (Java/Other ...

  8. 关于Mybatis的SQL映射文件中in关键字的用法

    有一个需求是可以选择多个设备进行删除,于是想到将多个设备id拼成字符串作为参数,以逗号隔开,如:"123,234,456". SQL如下: <delete id=" ...

  9. ORACLE中的字符串替换 replce、regexp&lowbar;replace 和 translate

    一.语法 replace(str_source,str1,str2)  把 str_source 中 str1 字符串替换为 str2 字符串,当 str2 为 null 或'' 时,与下个作用相同 ...

  10. CCNA实验3&period;单臂路由器

    拓扑图: 一.交换机配置 通过路由器子接口的方式实现vlan之间的路由. conf t vlan 10 vlan 20 int f0/1 switchport access vlan 10 int f ...