Storm 的新消息传输机制

作者: xumingming | 可以转载, 但必须以超链接形式标明文章原始出处和作者信息及版权声明
网址: http://xumingming.sinaapp.com/1007/storm-new-messaging-merchanism/

Storm 0.9 里面引入了新的消息传输机制,在这之前消息传输机制是写死的用的 ZeroMQ, 而在 Storm 发展的过程中发现 ZeroMQ(JZMQ) 存在一些问题,比如 ZeroMQ 本身是 C 实现的,所使用的内存不受 Java 的控制,是的我们没有办法通过-Xmx 参数来调节 Storm 的内存使用;ZeroMQ 对于 Storm 来说有点黑盒的感觉,因此 Storm 想要获得的一些信息比如有多少消息被 buffer 了没有发送出去是获得不了的;而也因为 ZeroMQ 是非 Java 实现的,使得 Storm 的安装过程复杂了一些。

新引入的消息传输机制使得消息传输机制是可以被配置替换的,这意味着我们不必被捆绑于某种具体的传输机制比如 ZeroMQ,来自 Yahoo 的 Andy Feng 实现了一个基于 Netty 的纯 Java 的消息传输机制。虽然说消息传输机制是可以被替换的,但是要实现一个可用的消息传输层还是需要满足一些条件的,以满足 Storm 的语意:

  • 消息发送方可以在连接建立之前发送消息,而不需要等连接建立起来,因为这时候消息接收方可能还没有运行起来。因此这就需要在消息传输的 Client 端有个 buffer,在连接没有建立之前把要发送的消息 buffer 起来。
  • 在消息传输层,消息『最多只能发送一次』,因为在 Storm 层面有 ACK 机制来保证没有被发送成功的消息会被重发,如果传输层面自己再重发,会导致消息被发多次。

新的消息传输机制由两个接口来定义 backtype.storm.messaging.IContext 和 backtype.storm.messaging.IConnection。IContext 负责客户端和服务器端连接的建立,主要定义了四个方法:

  • prepare(Map stormConf) —  遵从 Storm 的风格定义的 prepare 方法,可以把 storm 的配置接收进来
  • term() — term 是 terminate 的意思,这个方法会在 worker 卸载这个消息传输插件的时候调用,我们实现的时候可以在这里释放占用的资源、链接之类的。
  • bind(String topologyId, int port) — 建立一个服务器端的连接
  • connect(String stormId, String host, int port) — 建立一个客户端的连接

IConnection 则定义了在其上发送、接受数据的接口:

  • recv(int flag) — 接收消息
  • send(int taskId, byte[] payload) —  发送消息
  • close() — 该连接关闭的时候调用以释放资源之类的

因为消息传输插件是可插拔的,不是核心代码的一部分,因此 使用 netty 实现的传输插件被放在了一个单独目录里面,Storm 整个的源代码结构也从原来的单一项目变成了多项目(你可以在 Storm 源码里面找到多个 project.clj)。因为是基于 netty 的,绝大部分关于消息传输的事情 netty 已经都搞定了,因此  storm-netty 的实现非常的简单,有兴趣的可以打开源码研究一下。

发表在 storm | 29 条评论

Clojure惰性序列的头保持问题

作者: xumingming | 可以转载, 但必须以超链接形式标明文章原始出处和作者信息及版权声明
网址: http://xumingming.sinaapp.com/977/clojure-lazyseq-head-retention/

 
《Clojure编程》一书中有一个例子:

(let [[t d] (split-with #(< % 12) (range 1e8))]
  [(count d) (count t)])
;= OutOfMemoryError Java heap space  clojure.lang.ChunkBuffer.<init> (ChunkBuffer.java:20);

(let [[t d] (split-with #(< % 12) (range 1e8))]
  [(count t) (count d)])
;= [12 99999988]

只是(count t) (count d)的顺序不同,前一段代码会抛OutOfMemoryError错误,后一个则完全没有问题,书里面解释的不是很详细,这里展开详细说下其中的缘由。这里面涉及到Clojure里面集合的数据结构共享机制。

Clojure数据结构共享

我们知道Clojure里面强调的是不可变数据结构,几乎任何操作都不会改变现有值,而是会产生新值,比如我们有这么一个惰性序列:

(let [h (range 1 6)])

它在内存中的表示大概是这样的:

头指针h指向第一个元素(还没有被实例化的元素), 这些格子是虚线的,表示这些格子还没有实例化。现在我们执行下列代码:

(let [h1 (next h))

(next h)并不会改变h本身,而是会产生一个新的h1序列,那么Clojure会把h里面所有的内存元素都拷贝一遍以产生这个新的h1么?当然不会,Clojure没那么傻,内存结构会变成下面这样:

一个元素都没有复制,只是产生了一个新的h1头指针,指向了h序列的第二个元素,原来的h头指针还是指向第一个元素,这样虽然从程序员的角度来看有两个独立的序列hh1, 但是内存里面只有一份数据。这就是Clojure里面的数据结构共享机制。

count的执行过程

要解释我们前面提到的问题,我们先看看count一个惰性序列是怎么样的一个过程,这个过程中的内存占用是怎么样的。

(let [t (range 1 6)]
  (count t))

我们来看看上面的代码是怎么执行的。(range 1 6)在内存里面的结构在上面介绍数据结构共享机制的时候已经展示过了,会有一个头指针指向这个数据结构的头,我们看看在count执行过程中,内存结构会发生怎样的变化。

Clojure里面的count函数最终是调用clojure.lang.RT.countFrom(Object obj)来实现的,下列代码是当要count的集合是惰性序列的时候执行的逻辑:

可以看出对于惰性序列(以及其它持久性的数据结构),count是通过for循环遍历集合里面的每个元素(从而实例化每个元素)来计算出惰性序列的数量的。遍历的时候调用的是s.next()方法,s.next()方法相当于调用(next s), 因此产生的也是一个新的持久性集合。遍历完第一个元素之后的内存结构是这样的:

上面的第一个元素(1)会被JVM回收掉的。也许有人会问了,上面(count t)在执行的时候,t还在有效作用域内,它下面的元素怎么会被垃圾回收呢?不是应该等(count t)全部都执行完等程序控制流出了这个作用域才能回收t所占用的内存吗?在Java里面是也许是这样的,但是Clojure里面对这方面做了优化,Clojure的编译器发现t在当前作用域后面没有再被用到了((count t)后面已经没有再用到t了),因此可以放心地把不再被引用的元素1回收掉,这种技术叫做locals clearing[1]。

以此类推,不管要count的惰性序列所含数据量有多大,count所占用的内存都是恒定的,因此下面的代码是不会导致OutOfMemoryError的:

(count (range 1e8))

从这里我们可以总结出来一个道理:我们讨论的这个头保持问题不是count本身导致的

头保持(head retention)

我们再来看看下面代码求头尾两个count的时候内存中的数据结构会怎么样:

(let [[t d] (split-with #(< % 4) (range 1 6))]
  [(count d) (count t)])

[(count d) (count t)]执行之前,整个序列是这样的:

t的头指向第一个元素,d的头指向第四个元素。现在先执行(count d)(Clojure代码是从左向右执行的), count过第一个元素之后整个序列是这样的:

注意,这里4这个元素是无法被垃圾回收掉的,因为整个数据的头还被t引用,因此整个数据结构上的任意节点都是不能被垃圾回收。想想如果d后面有很多数据,那么都得存在内存里面不能被回收,最后的结果就是OutOfMemoryError

如果我们稍微调换下两个count的顺序呢:

(let [[t d] (split-with #(< % 4) (range 1 6))]
  [(count t) (count d)])

那么这样Clojure会先执行(count t), count到第二个元素的时候内存结构是这样的:

这里已经实例化的元素1是可以被垃圾回收的,因为两个头指针t,d都在元素1的后面,已经没有人需要这个元素1了,因此它是可以被垃圾回收的。因此不管d后面有多少数据,只要我们先执行的是(count t), 整个序列的头不被保持,那么在我们count过程中内存会被不断的回收,不会有所有元素保持在内存的问题,因此也就不会有OutOfMemoryError的问题了。

参考资料

[1] Rich关于local clearing的简介: https://groups.google.com/forum/?fromgroups=#!msg/clojure/FLrtjyYJdRU/1gzChYsmTpsJ
[2] 邮件列表里面一大牛关于head retention的解释: https://groups.google.com/forum/?fromgroups=#!topic/clojure/bTAYeLXc25w

发表在 clojure | 3 条评论

在SAE上部署Clojure应用

作者: xumingming | 可以转载, 但必须以超链接形式标明文章原始出处和作者信息及版权声明
网址: http://xumingming.sinaapp.com/957/deploy-clojure-app-to-sae/



Clojure的一个优点是它的应用可以部署到任何Java应用可以部署的地方, Sina App Engine(SAE)现在已经支持Java应用了,最近尝试了把一个小应用部署到SAE上面,下面给大家介绍一下。

我写的是一个计算公积金贷款每月还款额度的一个小应用[1]。它主要用到了以下两个包:

  1. ring — web框架
  2. ring.velocity — velocity模板引擎的clojure版本

首先SAE支持的是标准的war包,而lein有个ring的插件支持直接将ring应用打包成war包:

lein ring uberwar <sae-appname>.war

这里有两个需要注意的地方:

  1. SAE要求你上传的war包的名字必须是<sae-appname>.war, 我的应用名是loan,因此我的war包叫loan.war
  2. 要使用lein ring uberwar, 而不是简单lein ring war。因为要把所有的依赖打到这个war包里面去。

因为用到ring.velocity, 在部署的时候还遇到了两个小问题:

一是通常我们是让velocity加载文件系统里模板文件,配置是这样的:

file.resource.loader.class =
org.apache.velocity.runtime.resource.loader.FileResourceLoader

而我们这里要加载的是war包里面的资源,因此要改成加载classpath里的资源:

file.resource.loader.class =
org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader

另一个问题是ring.velocity默认会向当前目录下写它的运行时日志, 文件名为velocity.log, 而由于SAE提供的是一个沙箱环境,它是不允许你去直接写文件系统的,因此我这里直接把运行时日志给禁掉了:

runtime.log.logsystem.class=
org.apache.velocity.runtime.log.NullLogSystem

搞定这些配置,把这个war包部署到SAE上就完全没问题了,还是很轻松方便的。

[1] http://loanv.sinaapp.com

发表在 clojure | 一条评论

《Clojure编程》译者序

作者: xumingming | 可以转载, 但必须以超链接形式标明文章原始出处和作者信息及版权声明
网址: http://xumingming.sinaapp.com/901/clojure-programming-translator-foreword/



还记得第一次接触Clojure是在JavaEye论坛上,那是庄晓丹同学的一篇名为《几行代码解决淘宝面试题之Clojure版》[0]的帖子,给了我很深的印象。淘宝那道面试题目是:

“有一个很大的整数链表,需要求这个链表中所有整数的和,写一个可以充分利用多核CPU的代码来计算结果。”

用Java来解答这道题需要七八十行的代码[1]:程序员要自己手动把这个大链表切割成一个个子链表;自己手动创建多个线程去分别计算切割出来的子链表;然后再等待所有线程做完之后对结果进行归约以计算出最后的结果。确实是个很难的面试题,涉及到的知识点包括线程、线程池、线程同步等等,一般人还真不一定能把它写出来。而在Clojure看来完全不能作为面试题 —— 因为利用Clojure来解决太简单了,稍微学过一点Clojure,掌握了map, reduce, pmap这几个基本函数的同学都能搞定。这个对比是很震撼的,Clojure解决起问题来是那么的简洁、优雅、自然,这算是我对Clojure的初体验吧。

在Clojure之前我也尝试过几种函数式语言,比如“最纯的函数式语言”Haskell、“天生擅长高并发的”Erlang,这些都是很棒的语言,但是学过之后一段时间就忘了,因为工作、生活中实在是用不到。而Clojure对我则有独特的吸引力,首先因为它是LISP —— 一门富有传奇色彩的语言,一直希望有机会可以学习一门LISP的方言;其次Clojure是一门接地气的语言,它运行在JVM这个最成功、应用最广泛平台之上,能够跟Java代码无缝互操作,JVM上所有资源都可以为Clojure所用。

Clojure是这样的有潜力、接地气,那么如果你要选择一门新语言来玩玩,不选它选什么?

对Clojure有了初次接触之后就有了深入学习的想法,在网络上到处寻找资料,找到的资料几乎都是英文的,中文资料是少之又少。最终发现一篇比较简单而又相对完整的教程[2], 当时就想如果把这个教程翻译成中文既可以加深自己对于Clojure概念的理解,也可以丰富Clojure的中文资料便于以后对这门语言感兴趣的同学查阅,于是边学边把这篇教程给翻译了一下[3]。目前用Google搜Clojure中文资料这篇文章还是排在第一位的,深感欣慰。

“空有一身的Clojure本事,没地方施展啊!”学了一段时间Clojure之后有了这样的感觉,相信这也是很多利用业余时间学习新语言的同学的困惑。这时恰逢Twitter开源了他们的实时计算框架Storm,我对实时计算很感兴趣而它又是用Clojure开发的,太好了,正愁没有项目练手呢,于是去读它的源代码。看到真正在生产环境使用的Clojure代码才发现自己还有很多东西是不会的,对于Clojure只是学会了形,没有学到神。比如Clojure对于函数作为头等公民的强调、Clojure对于值语义的注重,这些对于从Java世界过来的程序员来说,不是那么容易转换思维的(本书有专门一节强调要转换思维、讲解如何转换思维,读了之后获益良多,建议大家重点看一下)。而通过学习Storm源码学到很多这方面活生生的例子,对自己的水平有很大的提高,这里也建议每个程序员都应该参与到开源项目里面去,会收益良多的。

去年5月张春雨同学在Clojure中文邮件列表里面发帖给本书找译者,因为自己之前也看过一些,而且也算有一定的Clojure编码经验、有一定的翻译经验,于是就把这个活应了下来。这本书大体可以分为两部分,前半部分重点讲解Clojure的基本概念、原理;后半部分则介绍用Clojure来解决实际编程生活中的各种问题的实例。你既可以从最基本原理开始,“自底向上”的去读;也可以从实例入手,去看看怎么用Clojure去解决你感兴趣的问题,遇到不懂的概念再去前半部分找对应的章节深入读。

翻译这本书的过程也是一个学习的过程,之前零散地学了很多Clojure知识,但都不是很系统,《Clojure编程》给我最大的帮助是帮我把这些零碎的知识点都串起来了,形成了一个系统。它从最基本的原理讲起,在讲解各个知识点的同时由浅入深地把背后的原理都一点点娓娓道来,翻译的过程中不时地发出这样的感叹:“原来那么实现是因为这个原因啊”,每天都会发现自己又多懂了那么一点点,这算是翻译过程中最大的乐趣吧,相信大家去读这本书的时候也会有类似的体会。

翻译的过程中得到了很多人的帮助,首先感谢我的老爸老妈,是你们把我带到了这个世界,才有机会去探索这神奇的计算机世界;感谢我自己,这几个月你格外辛苦了,没想到你的名字也会出现在一本书的封面上,干得不错;感谢小废同学,如果没有你的捣乱,这本书可能去年就已经面市了;感谢我的合译者杨寿勋同学,没有你的通力合作,这本书不会这么快上市;感谢我们的审稿同学huangz、孙宁、庄晓丹,是你们使得这本书提高了一个档次,帮我挑出了那么多技术上的、语法上的错误; 感谢我们的策划编辑张春雨同学,没有你这本书的译者可能就是别人了;感谢我们的编辑刘舫,虽然从来没算正式的打过交道,但是从一次次反馈中感受到了你编辑工作的严谨;感谢所有帮助过我的人。

{:author           “徐明明”
:date: “2013年3月6日”
:city:                  ”杭州”}

[0] http://www.iteye.com/topic/713259
[1] http://www.iteye.com/topic/711162
[2] http://java.ociweb.com/mark/clojure/article.html
[3] http://xumingming.sinaapp.com/302/clojure-functional-programming-for-the-jvm-clojure-tutorial/

发表在 clojure | 11 条评论

Twitter Storm: How to develop a pluggable scheduler

作者: xumingming | 可以转载, 但必须以超链接形式标明文章原始出处和作者信息及版权声明
网址: http://xumingming.sinaapp.com/885/twitter-storm-how-to-develop-a-pluggable-scheduler/

 

Storm 0.8.0 will introduce a new amazing feature named pluggable scheduler, what scheduler does is schduling the task assignment: which task should run on which supervisor(machine). So it opens up the opportunity to do a lot of things, for example:

  • make sure a particular task run on a particular machine
  • Topology priority: when the resource is limited, always make sure TopologyA is scheduled first.
  • There are two CPU intensive bolts, we dont want them to be assigned to the same machine
  • and so on…

This blog post will walk through a simple scheduler demo(the first usecase we listed above) to show the steps to develop a scheduler.

Problem Description

Lets retell the usecase in more detail, what we are going to achieve is:

We have a topology named special-topology, and in this topology we have a topology named special-spout, this spout need to use some software which needs license, we only have the license for a purticular machine named special-machine. Our scheduler need to make sure our special-spout always assigned to supervisors on special-machine.

Two questions need to be clarified for this problem:

How do we specify topology name?

Topology name is specified in the command line when you submit the topology to storm cluster, e.g.:

    $STORM_HOME/bin/storm jar storm-starter-0.0.1-SNAPSHOT.jar storm.starter.ExclamationTopology special-topology

How to tell whether a topology is on the “special-machine”?

It’s simple. Scheduler provides a new config item: supervisor.scheduler.meta, you can config anything in it, storm itself will not use this config, for our case we can config like this:

    supervisor.scheduler.meta:
      name: "special-supervisor"

then, in the java code we can get this metadata using:

    supervisor.getSchedulerMeta()

So we can config this for all the supervisors on the special-machine, and in java code we check whether the supervisor’s “name” is special-supervisor.

Lets begin

What we need to implement is clear, now lets begin to implement. The steps will be:

  • Check whether our topology needs scheduling. Because the scheduler will be called periodically, so it is possible that our topology is already scheduled when the scheduler is called.
  • If the topology needs scheduling, then we need to check whether our special-spout needs scheduling, the reason is the same as above.
  • If our special-spout needs scheduling, we need to find out the available special-supervisor.
  • There are cases when the special-supervisor is occupied by other topologies or other spouts/bolts, we need to free some slots(supevisor), so our special-spout can use it.
  • We assign the special-spout to the special-supervisor.

Now, lets walk through each of the steps.

Check whether our topology needs scheduling

To implement a pluggable scheduler we need to implement backtype.storm.scheduler.IScheduler. It defines a single method which has the following signature:

    /**
     * Set assignments for the topologies which needs scheduling. The new assignments is available
     * through <code>cluster.getAssignments()</code>
     *
     *@param topologies all the topologies in the cluster, some of them need schedule. Topologies object here
     *       only contain static information about topologies. Information like assignments, slots are all in
     *       the <code>cluster</code>object.
     *@param cluster the cluster these topologies are running in. <code>cluster</code> contains everything user
     *       need to develop a new scheduling logic. e.g. supervisors information, available slots, current
     *       assignments for all the topologies etc. User can set the new assignment for topologies using
     *       <code>cluster.setAssignmentById</code>
     */
    public void schedule(Topologies topologies, Cluster cluster);

our special-topology is in the topologies object, lets get it out:

        // Gets the topology which we want to schedule
        TopologyDetails topology = topologies.getByName("special-topology");

cluster object contains all the information about the cluster, to check whether our topology needs scheduling we need to ask cluster:

    boolean needsScheduling = cluster.needsScheduling(topology);

Check whether our special-spout needs scheduling

Because it is possible that part of a topology is scheduled, and part of a topology is not, so we need to check whether our special-spout needs scheduling, again ask the cluster, first lets figure out all the components which needs scheduling:

    Map<String, List<ExecutorDetails>> componentToExecutors = cluster.getNeedsSchedulingComponentToExecutors(topology);

The returned map is a mapping from component-name to the corresponding executors, so to check, we just need to check whether the returned map contains our special-spout:

    componentToExecutors.containsKey("special-spout")

Find the special-supervisor

Now we know the special-spout needs scheduling, we need to find out the special-supervisor. First lets find out all the supervisors:

    Collection<SupervisorDetails> supervisors = cluster.getSupervisors().values();

Then to find out the special-supervisor, we need to filter it by name:

    SupervisorDetails specialSupervisor = null;
    for (SupervisorDetails supervisor : supervisors) {
        Map meta = (Map) supervisor.getSchedulerMeta();

        if (meta.get("name").equals("special-supervisor")) {
            specialSupervisor = supervisor;
            break;
        }
}

Get the available slots

Now we have the special-supervisor, but we dont know whether there are any available slots for us to use, lets check it:

    List<WorkerSlot> availableSlots = cluster.getAvailableSlots(specialSupervisor);

if there is indeed no available slots, lets free out some slots, because this topology is our first priority:

    if (availableSlots.isEmpty() && !executors.isEmpty()) {
        for (Integer port : cluster.getUsedPorts(specialSupervisor)) {
            cluster.freeSlot(new WorkerSlot(specialSupervisor.getId(), port));
        }
    }

Assign the special-spout to special-supervisor

Assign is simple:

    // re-get the aviableSlots
    availableSlots = cluster.getAvailableSlots(specialSupervisor);

    // since it is just a demo, to keep things simple, we assign all the
    // executors into one slot.
    cluster.assign(availableSlots.get(0), topology.getId(), executors);

Are we done? NO. Smart reader might already noticed that we have only assigned the special-spout, what about other “regular” spouts/bolts? You would say my scheduler dont care, please take care of them for me, fortunately storm does provide a merchanism: Your scheduler can call other scheduler to scheduling, Storm provides two system scheduler:

  • backtype.storm.scheduler.EvenScheduler
  • backtype.storm.scheduler.DefaultScheduler

DefaultScheduler is built on top of EvenScheduler, if you dont specify your custom scheduler, DefaultScheduler is used, you do not want to use it direclty, most time you will want to use EvenScheduler, it will schedule the remaining spouts/bolts to the remaining slots evenly:

    // let system's even scheduler handle the rest scheduling work
    // you can also use your own other scheduler here, this is what
    // makes storm's scheduler composable.
    new EvenScheduler().schedule(topologies, cluster);

And as the last step, we need to put the jar which contains the scheduler code in nimbus’s $STORM_HOME/lib, and add the following config to tell nimbus to use this scheduler:

    storm.scheduler: "storm.DemoScheduler"

That’s all. The full source code for this DemoScheduler is on github: DemoScheduler.

发表在 storm, 未分类 | 18 条评论

Twitter Storm邮件组

作者: xumingming | 可以转载, 但必须以超链接形式标明文章原始出处和作者信息及版权声明
网址: http://xumingming.sinaapp.com/867/twitter-storm-mailinglist-cn/

如果你使用Storm的过程中有问题,可以从以下几条途径寻求帮助:

发表在 storm, 未分类 | 15 条评论

Twitter Storm的新利器Pluggable Scheduler

作者: xumingming | 可以转载, 但必须以超链接形式标明文章原始出处和作者信息及版权声明
网址: http://xumingming.sinaapp.com/854/twitter-storm-pluggable-scheduler/

 

最近把Twitter Storm的新特性:可插拔式的任务分配器(Pluggable Scheduler)实现了,将在0.8.0版本里面跟大家见面。这篇文章先给大家尝尝鲜,介绍下这个新特性。

在Pluggable Scheduler之前,Twitter Storm里面对于用户提交的每个Topology进行任务分配是由nimbus来做的,nimbus的任务分配算法可是非常牛逼的哦,主要特点如下

  • 在slot充沛的情况下,能够保证所有topology的task被均匀的分配到整个机器的所有机器上
  • 在slot不足的情况下,它会把topology的所有的task分配到仅有的slot上去,这时候其实不是理想状态,所以。。
    • 在nimbus发现有多余slot的时候,它会重新分配topology的task分配到空余的slot上去以达到理想状态。
  • 在没有slot的时候,它什么也不做

一般情况下,用这种默认的task分配机制就已经足够了。但是也会有一些应用场景是默认的task分配机制所搞定不了的,比如

  • 如果你想你的spout分配到固定的机器上去 — 比如你的数据就在那上面
  • 如果你有两个topology都很耗CPU,你不想他们运行在同一台机器上

这些情况下我们默认的task分配机制虽然强大,却是搞不定的,因为它根本就不考虑这些。所以我们设计了新的Pluggable Scheduler机制,使得用户可以编写自己的task分配算法 — Scheduler来实现自己特定的需求。下面我们就来亲自动手来看看怎么才能实现上面提到的默认Scheduler搞不定的第一个场景,为了后面叙述的方便,我们来细化一下这个需求:让我们的名为special-spout的组件分配到名为special-supervisor的supervisor上去

要实现一个Scheduler其实很简单,只要实现IScheduler

public interface IScheduler {
    /**
     * Set assignments for the topologies which needs scheduling. The new assignments is available
     * through <code>cluster.getAssignments()</code>
     *
     *@param topologies all the topologies in the cluster, some of them need schedule. Topologies object here
     *       only contain static information about topologies. Information like assignments, slots are all in
     *       the <code>cluster</code>object.
     *@param cluster the cluster these topologies are running in. <code>cluster</code> contains everything user
     *       need to develop a new scheduling logic. e.g. supervisors information, available slots, current
     *       assignments for all the topologies etc. User can set the new assignment for topologies using
     *       <code>cluster.setAssignmentById</code>
     */
    public void schedule(Topologies topologies, Cluster cluster);
}

这个接口会提供两个参数,其中Topologies包含当前集群里面运行的所有Topology的信息:StormTopology对象,配置信息,以及从task到组件(bolt, spout)id的映射信息。Cluster对象则包含了当前集群的所有状态信息:对于系统所有Topology的task分配信息,所有的supervisor信息等等 — 已经足够我们实现上面的那个需求了,让我们动手吧

找出我们的目标Topology

首先我们要确定我们的topology是否已经提交到集群了,很简单,到topologies对象里面找找看,找到了的话就说明已经提交了。

// Gets the topology which we want to schedule
TopologyDetails topology = topologies.getByName("special-topology");

只要这个topology不为null的话就说明这个topology已经提交了。

目标Topology是否需要分配

紧接着我们要看看这个topology需不需要进行task分配 — 有可能之前分配过了。怎么弄呢?很简单,Cluster对象已经提供了api可以使用

boolean needsScheduling = cluster.needsScheduling(topology);

这里要说明的一点是,有关Scheduler编写的几乎所有api都是定义在Cluster类里面,大家只要把这个类搞熟悉,编写起Scheduler起来应该就得心应手了。如果这个topology需要进行task分配我们还要看下有那些task需要进行分配 — 因为可能有部分task已经被分配过了

// find out all the needs-scheduling components of this topology
Map<String, List<Integer>> componentToTasks = cluster.getNeedsSchedulingComponentToTasks(topology);

我们的目标spout是否需要分配?

因为我们的目标是让名为special-spout的组件运行在名为special-supervisor的supervisor上,所以我们要看看这些task里面有没有是属于special-spout的task,很简单,上面返回的componentToTasks就是从component-idtask-ids的一个映射。所以要找出special-spout就很简单了

List<Integer> tasks = componentToTasks.get("special-spout");

找出目标supervisor

找到我们要分配的task之后,我们还要把我们的special-supervisor找出来,Cluster同样提供了方便的方法:

// find out the our "special-supervisor" from the supervisor metadata
Collection<SupervisorDetails> supervisors = cluster.getSupervisors().values();
SupervisorDetails specialSupervisor = null;
for (SupervisorDetails supervisor : supervisors) {
    Map meta = (Map) supervisor.getSchedulerMeta();

    if (meta.get("name").equals("special-supervisor")) {
       specialSupervisor = supervisor;
       break;
    }
}

这里要特别说明一下Map meta = (Map) supervisor.getSchedulerMeta();, 我们前面说名为special-supervisor的supevisor,其实在storm里面supervisor是没有名字的,这里我们所谓的名字是从supervisor.getSchedulerMeta里面找出来的,这个schedulerMeta是supervisor上面配置的给scheduler使用的一些meta信息,你可以配置任意信息!比如在这个例子里面,我在storm.yaml里面配置了:

supervisor.scheduler.meta:
  name: "special-supervisor"

这样我们才能用meta.get("name").equals("special-supervisor")找到我们的special-supervisor到这里我们就找到了我们的special-supervisor,但是要记住一点的是,我们的集群里面有很多topology,这个supervisor的slot很可能已经被别的topology占用掉了。所以我们要检查下有没有slot了

List<WorkerSlot> availableSlots = cluster.getAvailableSlots(specialSupervisor);

判断上面的availableSlots是不是空就知道有没有空余的slot了,如果没有slot了怎么办?没别的topology占用掉了怎么办?很简单!把它赶走

// if there is no available slots on this supervisor, free some.
if (availableSlots.isEmpty() && !tasks.isEmpty()) {
    for (Integer task : specialSupervisor.getAllPorts()) {
        cluster.freeSlot(new WorkerSlot(specialSupervisor.getId(), task));
    }
}

最后一步:分配

到这里为止呢,我们要分配的tasks已经有了,要分配到的slot也搞定了,剩下的就分配下就好了(注意,这里因为为了保持例子简单,代码做了简化)

// re-get the aviableSlots
availableSlots = cluster.getAvailableSlots(specialSupervisor);

// since it is just a demo, to keep things simple, we assign all the
// tasks into one slot.
cluster.assign(availableSlots.get(0), topology.getId(), tasks);

我们的目标实现了! 随着cluster.assign的调用,我们已经把我们的special-spout分配到special-supervisor上去了。不难吧 :)

别的任务谁来分配?

不过有件事情别忘了,我们只给special-spout分配了task, 别的task谁来分配啊?你可能会说我不关心啊,没关系,把这个交给系统默认的分配器吧:我们已经把系统的默认分配器包装到backtype.storm.scheduler.EvenScheduler里面去了,所以你简单调用下就好了

new backtype.storm.scheduler.EvenScheduler().schedule(topologies, cluster);

让Storm知道我们的Scheduler

哦,有一件事情忘记说了,我们完成了我们的自定义Scheduler,怎么让storm知道并且使用我们的Scheduler呢?两件事情:

  • 把包含这个Scheduler的jar包放到$STORM_HOME/lib下面去
  • 在storm.yaml 里面作如下配置:
    storm.scheduler: "storm.DemoScheduler"
    

这样Storm在做任务分配的时候就会用你的storm.DemoScheduler, 而不会使用默认的系统Scheduler

这个例子的完整代码可以在这里看到。

发表在 storm | 29 条评论

clojure.core学习之collections

作者: xumingming | 可以转载, 但必须以超链接形式标明文章原始出处和作者信息及版权声明
网址: http://xumingming.sinaapp.com/850/clojure-core-collections/

 

今天学习的是clojure.core里面跟集合相关的一些函数

count

作用: 返回集合元素个数。

user=> (count (range 10))
10
user=> (count [1 2 3])
3
user=> (count '(1 2 3))
3
user=> (count {:name "james" :age 20})
2
user=> (count #{:james :jack :rose :bond})
4

empty

作用: 返回跟所给参数类型相同的空集合。

user=> (empty [1 2])
[]
user=> (type (empty [1 2]))
clojure.lang.PersistentVector
user=> (empty '(1 2))
()
user=> (type (empty '(1 2)))
clojure.lang.PersistentList$EmptyList
user=> (empty {:james :bond})
{}
user=> (type (empty {:james :bond}))
clojure.lang.PersistentArrayMap
user=> (empty #{1 2 "james"})
#{}
user=> (type (empty #{1 2 "james"}))
clojure.lang.PersistentHashSet

not-empty

作用:如果所给集合是空的后者是nil那么返回nil, 否则返回所给参数。

user=> (not-empty nil)
nil
user=> (not-empty [])
nil
user=> (not-empty '(1))
(1)

into

作用:返回两个集合conj的结果。

user=> (into [] [1 2 3])
[1 2 3]
user=> (into [1 2] [3 4])
[1 2 3 4]
user=> (into '(1 2) [3 4])
(4 3 1 2)

conj

作用: 把元素添加到给定集合,所加的位置则是所给集合效率最高的位置。

user=> (conj [1 2] 3 4)
[1 2 3 4]
user=> (conj '(1 2) 3 4)
(4 3 1 2)

contains?

作用: 检查所给集合是否包含某个元素。

user=> (contains? [100 200] 1)
true
user=> (contains? [100 200] 100)
false
user=> (contains? #{100 200} 100)
true
user=> (contains? #{100 200} 1)
false

distinct?

作用: 所给集合是否有相同元素。

user=> (distinct? [1 2 1])
true
user=> (distinct? '(1 2 3))
true

empty?

作用: 检查所给集合是否是空的。

user=> (empty? [])
true
user=> (empty? nil)
true
user=> (empty? {})
true

every?

user=> (every? even? [1 2])
false
user=> (not-every? even? [1 2])
true

not-every?

every?相反.

some

user=> (some even? [1 2])
true
user=> (not-any? even? [1 2])
false

not-any?

some相反。

sequential?

作用:检测集合是否实现了Sequential

user=> (sequential? [1 2])
true
user=> (sequential? #{1 2})
false

associative?

作用:检测集合是否实现了Sequential

user=> (associative? [1 2])
true
user=> (associative? #{1 2})
false
user=> (associative? {:james :bond})
true

sorted?

作用:检测集合是否实现了Sorted

user=> (sorted? [1 2])
false
user=> (sorted? #{1 2})
false
user=> (sorted-set)
#{}
user=> (sorted? (sorted-set [1 2]))
true

counted?

作用:检测集合是否实现了Counted

user=> (counted? [1 2])
true
user=> (counted? '(1 2))
true
user=> (counted? #{1 2})
true
user=> (counted? {1 2 3 4})
true

reversible?

作用:检测集合是否实现了Reversible

user=> (reversible? [1 2])
true
user=> (reversible? #{1 2})
false
user=> (reversible? (sorted-set [1 2]))
true

coll?

作用: 检查集合是否实现IPersistentCollection

user=>(map coll? [[] '() #{} {}])
(true true true true)

seq?

作用: 检查集合是否实现ISeq

user=> (map seq? [[] '() #{} {}])
(false true false false)

vector?

作用: 检查集合是否实现IPersistentVector

user=> (map vector? [[] '() #{} {}])
(true false false false)

list?

作用: 检查集合是否实现IPersistentList

user=> (map list? [[] '() #{} {}])
(false true false false)

map?

作用: 检查集合是否实现IPersistentMap

user=> (map map? [[] '() #{} {}])
(false false false true)

set?

作用: 检查集合是否实现IPersistentSet

user=>  (map set? [[] '() #{} {}])
(false false true false)
发表在 clojure | 3 条评论