`
woodding2008
  • 浏览: 285345 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Storm消息可靠处理

阅读更多

        一个消息(tuple)从spout发送出来,可能导致成百上千消息基于此消息创建,这些消息构成一个树状结构,称之为“tuple tree”,如下图:

      

      同时满足了下面两个条件,storm会认为消息被完整的处理了,如果在指定的时间内,一个消息衍生出来的tuple tree未被完全处理成功,则认为消息未被完整处理。这个超时时间可以通过任务参数Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS进行配置,默认是30秒。

  • tuple tree 不再生长
  • tuple tree 中的任何消息被标记为"已处理"

消息的生命周期

     ISpout接口

public interface ISpout extends Serializable {
    void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
    void close();
    void activate(); 
    void deactivate();
    void nextTuple();
    void ack(Object msgId);
    void fail(Object msgId);
}

 

       首先,使用spout的nextTuple()方法从spout请求一个tuple。收到请求后,spout使用open方法中提供的SpoutOutputCollector向它的输出流发送一个或多个消息。每发送一个消息,spout会给这个消息提供一个id,它将被用来标识这个消息。

       假设我们从kestrel队列中读取消息,spout会将kestrel队列为这个消息设置的id作为此消息的id,向SpoutOutputCollector中发送的格式如下:

_collector.emit(new Values("field1","field2",3),msgId);

        接下来,这个消息会被发送到后续业务处理的bolts,并且strom会跟踪由此消息产生出来的新消息。当检测到一个消息衍生出来的tuple tree被完整处理后,storm会调用spout中的ack方法,并将此消息的id作为参数传入。同理,如果某消息处理超时,则此消息对应的spout的fail方法会被调用,调用时此消息的id会被作为参数传入。一个消息只会由发送它的哪个spout任务调用ack或fail,如果系统中的某个spout由多个任务运行,消息也只会由穿件它的spout任务来应答(ack或fail)。

        当KestrelSpout从kestrel队列中读取一个消息,表示它“打开”了队列中某个消息。这意味着,此消息并为从队列中真正删除,而是被设置为“pending”状态,它等待来自客户端的应答,被应答以后,此消息才会被真正的从队列中删除。处于pending状态的消息不会被其他客户端看到。另外,如果一个客户端意外断开连接,则由此客户端“打开”的所有消息都会被重新加入到队列中。当消息被“打开”的时候,kestrel队列同时会为这个消息提供一个唯一表示。

       KestrelSpout使用这个唯一的标识作为这个tuple的id,当ack或fail被调用时,KestrelSpout会把ack或者fail连同id一起发送给kestrel队列,kestrel会将消息从队列中真正删除或者将它重新放回队列中。

 

可靠相关的API

      为了使用Storm提供的可靠处理特性,我们需要做两件事情。

  • 只要在tuple tree中创建了一个新节点,就要明确地通知storm
  • 当处理完一个单独的消息时,告知storm这棵tuple tree的变化状态

       为tuple tree中指定的节点增加一个新的节点,我们称之为锚定(anchoring)。锚定是在我们发送消息的同时进行的。bolt将包含整句话的消息分解为一系列的消息,每个消息包含一个单词。

public class WordBolt extends BaseRichBolt {
	OutputCollector _collector;

	@Override
	public void prepare(Map stormConf, TopologyContext context,
			OutputCollector collector) {
		_collector = collector;
	}

	@Override
	public void execute(Tuple tuple) {
		String value = tuple.getString(0);
		for(String word : value.split(" ")){
			//锚定方式发送消息
			_collector.emit(tuple,new Values(word));
		}
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("word"));
	}
}

        每个消息都通过这种方式被锚定,把输入消息作为emit方法的第一个参数。因为word消息被锚定在了输入消息上,这个输入消息时发送过来的tuple tree的跟节点,如果任意一个word消息处理失败,派生这个tuple tree的哪个消息将会重新发送。

        与此相反,如果以这种方式发送消息,将会导致这个消息不会被锚定。如果tuple tree中的消息处理失败,派生此tuple tree的消息不会被重新发送,根据任务的容错级别,有时候适合发送一个非锚定的消息。_collector.emit(new Values(word))

       很多Bolt遵循特定的处理流程,读取一个消息,发送它派生出来的子消息,在execute结尾处应答此消息,storm有一个BasicBolt接口封装了上述流程,使用这种方式比之前稍微简单一些,实现功能是一样的。发送到BasicOutputCollector的消息会被自动锚定到输入消息中,并且,当execute执行完毕时,会自动应答输入消息。

//BaseRichBolt与BaseBasicBolt
public class WordBolt extends BaseBasicBolt {

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("word"));
	}

	@Override
	public void execute(Tuple tuple, BasicOutputCollector collector) {
		String value = tuple.getString(0);
		for(String word : value.split(" ")){
			//默认会锚定到tuple
			collector.emit(new Values(word));
		}
	}
}

 

高效地实现tuple tree

        Tuple是如何被跟踪的呢?系统中有成千上万的消息,如果为每个spout发送的消息都构建一棵树的话,很快内存就会耗尽。所以必须采用不同的策略跟踪每个消息。由于使用了新的跟踪算法,storm只需要固定的内存(大约20字节)就可以跟踪一棵树。这个算法是storm正确运行的核心,也是storm最大的突破。

         Acker任务记录了spout消息id到一对值的映射。第一个值就是spout的任务id,通过这个id,acker就知道消息处理完成是该通知哪个spout的任务。第二个值是一个64bit的数字,称之为ack val,他是树中所有消息的随机id的异或结果。ack val表示了整棵树的状态,无论这棵树多大,只要固定大小的数字就可以跟踪整棵树。当消息被创建和被应答是都会有相同的消息id发送过来做异或。

        每当acker发现一棵树的ack val值为0是,它就知道这棵树已经被完全处理了。

 

选择合适的可靠性级别

          Acker任务是轻量级的,所以在拓扑中并不需要太多的acker存在。可以通过Storm UI观察acker任务的吞吐量,如果吞吐量不够的话,说明需要增加额外的acker。如果你并不要求每个消息必须被处理,那么可以关闭消息的可靠处理机制,从而获取较好的性能。关闭消息的可靠处理机制意味着系统的消息会减半。

     有三种方法可以调整消息的可靠性处理机制。

  • 将参数Config.TOPOLOGY_ACKERS设置为0,通过此方法,当spout发送一个消息时,它的ack方法将立刻被调用。
  • Spout发送一个消息时,不指定此消息的id,当需要关闭特定消息可靠性是,可以使用此方法。
  • 如果你不在意派生出来的自消息的可靠性,则子消息不进行锚定,子消息失败不会引起任何spout重发消息。

 

 

      

  • 大小: 239.1 KB
分享到:
评论

相关推荐

    大数据-Storm实时数据处理

    此外,《大数据技术丛书:Storm实时数据处理》旨在围绕Storm技术促进DevOps实践,使读者能够开发Storm解决方案,同时可靠地交付有价值的产品。  《大数据技术丛书:Storm实时数据处理》适合想学习实时处理技术或者...

    《Storm实时数据处理》PDF.zip

    《storm实时数据处理》通过丰富的实例,系统讲解Storm的基础知识和实时数据处理的最佳实践...此外,Storm实时数据处理旨在围绕Storm技术促进DevOps实践,使读者能够开发Storm解决方案,同时可靠地交付有价值的产品。

    Storm如何保证可靠的消息处理

    Storm可以保证从Spout发出的每个消息都能被完全处理。Storm的可靠性机制是完全分布式的(distributed),可伸缩的(scalable),容错的(fault-tolerant)。本文介绍了Storm如何保证可靠性以及作为Storm使用者,我们需要...

    storm利用ack保证数据的可靠性源码

    storm利用ack保证数据的可靠性,发送失败时进行重发,保证数据不丢失。

    Storm实时数据处理.[澳]Quinton Anderson(带详细书签)

    本书中的每一个步骤都应用了成熟的开发和操作实践,确保你能够可靠地交付产品。 通过阅读本书,你将能够: 搭建你的开发环境并测试Strom集群。 处理数据流,包括基于规则的处理流程。 构建分布式远程过程调用。 交付...

    Apache Storm(apache-storm-2.3.0.tar.gz)

    Apache Storm 可以轻松可靠地处理无限制的数据流,实时处理就像 Hadoop 进行批处理一样。Apache Storm 很简单,可以与任何编程语言一起使用,而且使用起来非常有趣! Apache Storm 有很多用例:实时分析、在线机器...

    Storm的可靠性保证测试

    Storm是一个分布式的实时计算框架,可以很方便地对流式数据进行实时处理和分析,能运用在实时分析、在线数据挖掘、持续计算以及分布式RPC等场景下。...本文将通过实验验证Storm的消息可靠性保证机制,文章分为消息保

    storm讲义总结

    Storm是一个开源的分布式实时计算系统,可以简单、可靠的处理大量的数据流。 Storm有很多使用场景:如实时分析,在线机器学习,持续计算,分布式RPC,ETL等等。 Storm支持水平扩展,具有高容错性,保证每个消息...

    Apache Storm(apache-storm-2.3.0-src.tar.gz 源码)

    Apache Storm 可以轻松可靠地处理无限制的数据流,实时处理就像 Hadoop 进行批处理一样。Apache Storm 很简单,可以与任何编程语言一起使用,而且使用起来非常有趣! Apache Storm 有很多用例:实时分析、在线机器...

    Storm实战构建大数据实时计算

    Storm官方网站有段简介 Storm是一个免费并开源的分布式实时计算系统。利用Storm可以很容易做到可靠地处理无限的数据流,像Hadoop批量处理大数据一样,Storm可以实时处理数据。Storm简单,可以使用任何编程语言。

    storm-可靠机制

    Storm的可靠性是指Storm会告知用户每一个消息单元是否在一个指定的时间(timeout)内被完全处理。完全处理的意思是该MessageId绑定的源Tuple以及由该源Tuple衍生的所有Tuple都经过了Topology中每一个应该到达的Bolt的...

    storm自学文档

    storm概念、基本概念、构建Topology、安装部署、消息的可靠处理

    Apache Storm(apache-storm-2.3.0.zip)

    Apache Storm 可以轻松可靠地处理无限制的数据流,实时处理就像 Hadoop 进行批处理一样。Apache Storm 很简单,可以与任何编程语言一起使用,而且使用起来非常有趣! Apache Storm 有很多用例:实时分析、在线机器...

    Apache Storm(apache-storm-2.3.0-src.zip 源码)

    Apache Storm 可以轻松可靠地处理无限制的数据流,实时处理就像 Hadoop 进行批处理一样。Apache Storm 很简单,可以与任何编程语言一起使用,而且使用起来非常有趣! Apache Storm 有很多用例:实时分析、在线机器...

    Storm实战:构建大数据实时计算

    第4章和第5章阐述了Storm的并发度、可靠处理的特性;第6章~第8章详细而系统地讲解了几个高级特性:事务、DRPC和Trident;第9章以实例的方式讲解了Storm在实际业务场景中的应用;第10章总结了几个在大数据场景应用...

    漫谈大数据第四期-storm

    Storm为分布式实时计算提供了一组通用原语,可被用于“流处理”之中,实时处理消息并更新数据库。这是管理队列及工作者集群的另一种方式。 Storm也可被用于“连续计算”(continuous computation),对数据流做连续...

    longhai3395#BigData-Notes#Storm和流处理简介1

    1.1简介Storm 是一个开源的分布式实时计算框架,可以以简单、可靠的方式进行大数据流的处理。通常用于实时分析,在线机器学习、持续计算、分布式 RPC、ETL

    论文研究-基于Storm的流数据KNN分类算法的研究与实现.pdf

    考虑到流式数据流量大,连续且快速,不易存储和恢复等特性,以及流处理系统Storm对流数据处理具有实时性、可靠性的特点,提出了基于Storm的流数据KNN分类算法,该算法首先对整个样本集进行划分,形成多个片集,然后...

    实时可靠的开源分布式实时计算系统——Storm

    Storm是一个开源分布式实时计算系统,它可以实时可靠地处理流数据。在Storm出现之前,进行实时处理是非常痛苦的事情,我们主要的时间都花在关注往哪里发消息,从哪里接收消息,消息如何序列化,真正的业务逻辑只占了...

Global site tag (gtag.js) - Google Analytics