关于storm的一些记录

echosilly 2014-08-12

Storm分享文档

一.    快速开始

1.开发环境:
安装jdk、zookeeper、python。对于0.9版本发布后,引入新的消息传输机制,不需要再依赖ZeroMQ(后续将详细说明)。

2.安装:
下载release版本,解压缩即可。配置文件在./storm/conf/storm.yaml。配置项相当清晰,见名知意
storm.zookeeper.servers:zookeeper集群主机
nimbus.host:nimbus节点主机
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
这是默认的worker port,当在建立topology结构时,需要设置conf.setNumWorkers(?);如果当前storm集群中的worker总数小于你这里设置的worker数量时,需要对上述配置项进行修改,添加到大于或等于,才能成功启动相应的worker进程数量。

3.启动:
启动zookeeper集群;
storm nimbus (启动nimbus节点进程)-- 相当于Map/Reduce的Jobtracker
storm supervisor (启动supervisor节点进程) -- 相当于Tasktracker
storm jar storm_demo.jar com.cmdb.storm.topo.DemoTopology (启动storm demo)
//----- 辅助   
storm ui(启动strom web控制台进程)   
storm logviewer (启动后,可在web控制台上查看工作节点的日志)

4.建立storm项目
<repository>
<id>clojars.org</id>
<url>http://clojars.org/repo</url>
</repository>

<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>0.9.1-incubating</version>
<scope>provided</scope>  <!—这里必须为provided
</dependency>

二.    结构
1.    storm有两种操作模式: 本地模式和远程模式。
使用本地模式的时候,你可以在你的本地机器上开发测试你的topology, 一切都在你的本地机器上模拟出来; 用远端模式的时候你提交的topology会在一个集群的机器上执行。
2.    组件:
 
storm的集群表面上看和hadoop的集群非常像。但是在Hadoop上面你运行的是MapReduce的Job, 而在Storm上面你运行的是Topology。它们是非常不一样的 — 一个关键的区别是: 一个MapReduce 的Job最终会结束, 而一个Topology将永远运行(除非你显式的杀掉他)。两种节点: 控制节点(master node)和工作节点(worker node)。
控制节点上面运行一个后台程序:Nimbus, 它的作用类似Hadoop里面的JobTracker。Nimbus负责在集群里面分布代码,分配工作给机器, 并且监控状态。
每一个工作节点上面运行一个叫做Supervisor的进程。Supervisor会监听分配给它那台机器的工作,根据需要 启动/关闭工作进程。每一个工作进程执行一个Topology的一个子集;一个运行的Topology由运行在很多机器上的很多工作进程组成。
Nimbus和Supervisor之间的所有协调工作都是通过一个Zookeeper集群来完成。并且,nimbus进程和supervisor都是快速失败(fail-fast)和无状态的。所有的状态要么在Zookeeper里面, 要么在本地磁盘上。这也就意味着你可以用kill -9来杀死nimbus和supervisor进程, 然后再重启它们,它们可以继续工作,就好像什么都没有发生过似的。这个设计使得storm不可思议的稳定。

三.    关键概念。
 
1.    Topologies:
一个实时计算应用程序的逻辑在storm里面被封装到topology对象里面。一个Topology是Spouts和Bolts组成的图状结构, 而链接Spouts和Bolts的则是Stream groupings。
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new SearchCiSpout(), 1);
builder.setBolt("collector", new CollectorBolt(), collect).shuffleGrouping("spout");
builder.setBolt("insert", new InsertTsdbBolt(), insert).shuffleGrouping("collector");
builder.setBolt("alarm", new AlarmMetricBolt(), alarm).shuffleGrouping("collector");

Config conf = new Config();
//conf.setDebug(true);
conf.setNumWorkers(worker);

2.    数据模型:Data Modal
storm使用tuple来作为它的数据模型。每个tuple是一堆值,每个值有一个名字,并且每个值可以是任何类型,一个tuple可以看作一个没有方法的java对象。总体来看,storm支持所有的基本类型、字符串以及字节数组作为tuple的值类型。也可以使用自己定义的类型来作为值类型, 只要实现对应的序列化器(serializer)。

declarer.declare(new Fields("points", "metrics", "ci"));

3.    消息源:Spouts
消息源Spouts是storm里面一个topology里面的消息生产者。一般来说消息源会从一个外部源读取数据并且向topology里面发出 消息: tuple。消息源Spouts可以是可靠的也可以是不可靠的。一个可靠的消息源可以重新发射一个tuple如果这个tuple没有被storm成功的处理, 但是一个不可靠的消息源Spouts一旦发出一个tuple就把它彻底忘了 — 也就不可能再发了。
消息源可以发射多条消息流stream。使用OutFieldsDeclarer.declareStream来定义多个stream, 然后使用SpoutOutputCollector来发射指定的sream。
Spout类里面最重要的方法是nextTuple要么发射一个新的tuple到topology里面或者简单的返回如果已经没有新的tuple了。要注意的是nextTuple方法不能block Spout的实现, 因为storm在同一个线程上面调用所有消息源Spout的方法。即是说不能在spout的方法内使用任何block线程的方法,这样会影响spout的性能。
另外两个比较重要的Spout方法是ack和fail。storm在检测到一个tuple被整个topology成功处理的时候调用ack, 否则调用fail。storm只对可靠的spout调用ack和fail。
如果不关心数据是否丢失(例如数据统计分析的典型场景),最好不要启用ack机制。

4.    消息处理:Blot
所有的消息处理逻辑被封装在bolts里面。
Bolts可以简单的做消息流的传递。复杂的消息流处理往往需要很多步骤,从而也就需要经过很多Bolts。
Bolts可以发射多条消息流, 使用OutputFieldsDeclarer.declareStream定义stream, 使用OutputCollector.emit来选择要发射的stream。
Bolts的主要方法是execute, 它以一个tuple作为输入,Bolts使用OutputCollector来发射tuple。
对于可靠性stream,Bolts必须要为它处理的每一个tuple调用OutputCollector的ack方法,以通知storm这个tuple被处理完成了。一般的流程是:Bolts处理一个输入tuple, 发射0个或者多个tuple, 然后调用ack通知storm自己已经处理过这个tuple了。storm提供了一个IBasicBolt会自动调用ack。(可靠性api)

5.    消息分发策略: Stream groupings
定义一个Topology的其中一步是定义每个bolt接受什么样的流作为输入。stream grouping就是用来定义一个stream应该如果分配给Bolts上面的多个Tasks。
storm里面有6种类型的stream grouping:
1.    Shuffle Grouping: 随机分组, 随机派发stream里面的tuple,保证每个bolt接收到的tuple数目相同。
2.    Fields Grouping:按字段分组, 比如按userid来分组, 具有同样userid的tuple会被分到相同的Bolts, 而不同的userid则会被分配到不同的Bolts。
3.    All Grouping: 广播发送, 对于每一个tuple, 所有的Bolts都会收到。
4.    Global Grouping: 全局分组, 这个tuple被分配到storm中的一个bolt的其中一个task。再具体一点就是分配给id值最低的那个task。
5.    Non Grouping: 不分组, 这个分组的意思是说stream不关心到底谁会收到它的tuple。目前这种分组和Shuffle grouping是一样的效果。
6.    Direct Grouping: 直接分组,  这是一种比较特别的分组方法,用这种分组意味着消息的发送者指定由消息接收者的哪个task处理这个消息。 只有被声明为Direct Stream的消息流可以声明这种分组方法。而且这种消息tuple必须使用emitDirect方法来发射。消息处理者可以通过 TopologyContext来获取处理它的消息的taskid (OutputCollector.emit方法也会返回taskid)
7.    自定义数据流组:可以通过实现backtype.storm.grouping.CustormStreamGrouping接口创建自定义数据流组,自己决定哪些bolt接收哪些元组。
6.    可靠性
storm保证每个tuple会被topology完整的执行。storm会追踪由每个spout tuple所产生的tuple树(一个bolt处理一个tuple之后可能会发射别的tuple从而可以形成树状结构), 并且跟踪这棵tuple树什么时候成功处理完。每个topology都有一个消息超时的设置, 如果storm在这个超时的时间内检测不到某个tuple树到底有没有执行成功, 那么topology会把这个tuple标记为执行失败,并且过一会会重新发射这个tuple。
为了利用storm的可靠性特性,在你发出一个新的tuple以及你完成处理一个tuple的时候你必须要通知storm。这一切是由 OutputCollector来完成的。通过它的emit方法来通知一个新的tuple产生了, 通过它的ack方法通知一个tuple处理完成了。
7.    任务:Tasks
每一个Spout和Bolt会被当作很多task在整个集群里面执行。每一个task对应到一个线程,而stream grouping则是定义怎么从一堆task发射tuple到另外一堆task。你可以调用TopologyBuilder.setSpout()和 TopBuilder.setBolt来设置并行度 — 也就是有多少个task。
8. 工作进程:workders
一个topology可能会在一个或者多个工作进程里面执行,每个工作进程执行整个topology的一部分。比如对于并行度是300的 topology来说,如果我们使用50个工作进程来执行,那么每个工作进程会处理其中的6个tasks(其实就是每个工作进程里面分配6个线程)。 storm会尽量均匀的工作分配给所有的工作进程。

四.    新的消息传输机制
在以前的版本里,Storm是依赖ZeroMQ做消息的传输,而0.9版本引入Netty作为传输层。
在其发展过程中发现 ZeroMQ(JZMQ) 存在一些问题,比如 ZeroMQ 本身是 C 实现的,所使用的内存不受 Java 的控制,我们没有办法通过-Xmx 参数来调节 Storm 的内存使用;ZeroMQ 对于 Storm 来说有点黑盒的感觉,因此 Storm无法获得一些需要的信息;因为 ZeroMQ 是非 Java 实现的,这使得 Storm 的安装过程复杂了一些。
相较于ZeroMQ,Netty本身为Java实现,对跨平台来说更方便;另外,Netty的性能也比ZeroMQ快;此外,这也是为将来要做的 worker 进程之间的认证授权机制铺路。
storm.messaging.transport: "backtype.storm.messaging.netty.Context"
如果你不喜欢ZeroMQ或者Netty,你也可以通过实现backtype.storm.messaging.IContext interface来使用自己的消息传输层。但必须满足一些条件:
•  消息发送方可以在连接建立之前发送消息,而不需要等连接建立起来,因为这时候消息接收方可能还没有运行起来。因此这就需要在消息传输的 Client 端有个 buffer,在连接没有建立之前把要发送的消息 buffer 起来。
•  在消息传输层,消息『最多只能发送一次』,因为在 Storm 层面有 ACK 机制来保证没有被发送成功的消息会被重发,如果传输层面自己再重发,会导致消息被发多次。
IContext 负责客户端和服务器端连接的建立,主要定义了四个方法:
•    prepare(Map stormConf) —  遵从 Storm 的风格定义的 prepare 方法,可以把 storm 的配置接收进来
•    term() — term 是 terminate 的意思,这个方法会在 worker 卸载这个消息传输插件的时候调用,我们实现的时候可以在这里释放占用的资源、链接之类的。
•    bind(String topologyId, int port) — 建立一个服务器端的连接
•    connect(String stormId, String host, int port) — 建立一个客户端的连接


六.其它:
在官方API中对以上Storm的工作只认为是基本功能,它具有强大的Transactional topologies和Distributed RPC,他们是从Storm的组件中抽象出来的特性。
Storm里面引入DRPC主要是利用storm的实时计算能力来并行化CPU intensive的计算。DRPC的storm topology以函数的参数流作为输入,而把这些函数调用的返回值作为topology的输出流。
DRPCClient client = new DRPCClient("drpc-host", 3772);
String result = client.execute("reach","http://twitter.com");
客户端给DRPC服务器发送要执行的方法的名字,以及这个方法的参数。实现了这个函数的topology使用DRPCSpout从DRPC服务器接收函数调用流。每个函数调用被DRPC服务器标记了一个唯一的id。 这个topology然后计算结果,在topology的最后一个叫做ReturnResults的bolt会连接到DRPC服务器,并且把这个调用的结果发送给DRPC服务器(通过那个唯一的id标识)。DRPC服务器用那个唯一id来跟等待的客户端匹配上,唤醒这个客户端并且把结果发送给它。
 

Storm通过保证每个tuple至少被处理一次来提供可靠的数据处理。关于这一点就存在一个问题:“既然tuple可能会被重写发射(replay), 那么我们怎么在storm上面做统计个数之类的事情呢?storm有可能会重复计数吧?”
Storm 0.7.0引入了Transactional Topology, 它可以保证每个tuple”被且仅被处理一次”, 这样就可以实现一种非常准确,非常可扩展,并且高度容错方式来实现计数类应用。
像在我们这个应用中因为不考虑数据是否可靠,所以也就不担心会出现Tuple重发,也就是某一CI项指标在一个周期内被重复采集或计算告警。
当使用Transactional Topologies的时候, storm为你做下面这些事情:
1) 管理状态: Storm把所有实现Transactional Topologies所必须的状态保存在zookeeper里面。 这包括当前transaction id以及定义每个batch的一些元数据。
2) 协调事务: Storm帮你管理所有事情, 以帮你决定在任何一个时间点是该proccessing还是该committing。
3) 错误检测: Storm利用acking框架来高效地检测什么时候一个batch被成功处理了,被成功提交了,或者失败了。Storm然后会相应地replay对应的 batch。你不需要自己手动做任何acking或者anchoring。
4) 内置的批处理API: Storm在普通bolt之上包装了一层API来提供对tuple的批处理支持。Storm管理所有的协调工作,包括决定什么时候一个bolt接收到一个 特定transaction的所有tuple。Storm同时也会自动清理每个transaction所产生的中间数据。
5) 最后,需要注意的一点是Transactional Topologies需要一个可以完全重发(replay)一个特定batch的消息的队列系统(Message Queue)。
一个简单例子:
MemoryTransactionalSpout spout = new MemoryTransactionalSpout(
           DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);
TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder(
           "global-count", "spout", spout, 3);
builder.setBolt("partial-count", new BatchCount(), 5).shuffleGrouping("spout");
builder.setBolt("sum", new UpdateGlobalCount()).globalGrouping("partial-count");
它的作用是计算输入流里面的tuple的个数。这段代码来自storm-starter里面的TransactionalGlobalCount

相关推荐

pfjia / 0评论 2013-08-02