`
woodding2008
  • 浏览: 285212 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Storm Spout nextTuple策略

 
阅读更多

 

      Storm从0.8.1之后,在Spout调用nextTuple方法时,如果没有emit tuple,那么默认需要休眠1ms,这个具体的策略是可配置的,因此可以根据自己的具体场景,进行设置,以达到合理利用cpu资源。

      

       ISpoutWaitStrategy是Spout没有emit时等待策略的接口,目的是合理利用Cpu,默认提供了2个实现,一个什么也没做,一个是sleep 1毫秒,我们可以自己来实现这个接口。

 

storm策略配置

topology.spout.wait.strategy "backtype.storm.spout.SleepSpoutWaitStrategy"

topology.sleep.spout.wait.strategy.time.ms       1   

 

 

ISpoutWaitStrategy接口

 

/**
 * The strategy a spout needs to use when its waiting. Waiting is
 * triggered in one of two conditions:
 * 
 * 1. nextTuple emits no tuples
 * 2. The spout has hit maxSpoutPending and can't emit any more tuples
 * 
 * The default strategy sleeps for one millisecond.
 */
public interface ISpoutWaitStrategy {
    void prepare(Map conf);
    void emptyEmit(long streak);
}
 

 

SleepSpoutWaitStrategy实现

 

public class SleepSpoutWaitStrategy implements ISpoutWaitStrategy {

    long sleepMillis;
    
    @Override
    public void prepare(Map conf) {
        sleepMillis = ((Number) conf.get(Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS)).longValue();
    }

    @Override
    public void emptyEmit(long streak) {
        try {
            Thread.sleep(sleepMillis);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}
 

 

 

 NothingEmptyEmitStrategy实现

public class NothingEmptyEmitStrategy implements ISpoutWaitStrategy {
    @Override
    public void emptyEmit(long streak) {        
    }

    @Override
    public void prepare(Map conf) {
        throw new UnsupportedOperationException("Not supported yet.");
    }
}

 

 扩展阅读 http://www.cnblogs.com/fxjwind/p/3238648.html

分享到:
评论

相关推荐

    Storm中spout和bolt之间发送和接收数据的java源代码实例

    Storm中spout和bolt之间发送和接收数据的java源代码实例

    Spout/Blot编程实例实例详解

    我们知道hadoop有mapreduce编程模型,那么与之对应的storm的编程模型是什么那,...Spout是Storm的Topoloy的入口,数据流都是从Spout进入topology来进行处理。他负责从外部接收数据,处理,然后发射出去。每个Topolog

    vortex-storm:一个Apache Storm Spout和Bolt,用于使用和生成Vortex数据

    涡旋风暴 一个 和用于使用和生成数据 建筑 mvn install -P cafe-build 构建 为了构建Vortex OpenSplice,必须定义OSPL_HOME环境变量。 mvn install -P ospl-build 涡流概述 PrismTech的Vortex智能数据共享平台提供...

    Flume+kafka+Storm整合

    Flume+kafka+Storm整合 示例简介: 以下为三个组建整合,这里只做操作也演示结果,原理性方面大家...流程顺序是flume获取telnet数据,将接收到的数据发送至kafak,kafka作为Storm的spout,Storm进行有向无环分析数据。

    netty-storm:使用 SSL (TLS) 在 Netty 客户端和 Apache Storm 之间进行集成。 Netty Spout 和 Netty Producer

    此存储库中的两个 Maven 项目: [netty-spout] 能够处理来自 Netty 客户端的直接连接的 Storm Spout 的实现。 通信使用 TLS。 在此项目的 test 文件夹中,您可以找到用于测试目的的示例 Netty 拓扑。 运行方法:将...

    第一个Storm应用

    写第一个Storm应用--数单词数量(一个spout读取文本,第一个bolt用来标准化单词,第二个bolt为单词计数) 一、Storm运行模式: 1.本地模式(Local Mode): 即Topology(相当于一个任务,后续会详细讲解) 运行在本地...

    storm demo

    代码参考传智播客课程编写,演示了如何使用storm的spout,bolt,Topology

    Storm Real-time Processing Cookbook实例代码

    Storm is a distributed, reliable, fault-tolerant system for processing streams of data. The work is delegated to different types of components that are each responsible for a simple specific ...

    Getting Started with Storm

    The input stream of a Storm cluster is handled by a component called a spout. The spout passes the data to a component called a bolt, which transforms it in some way. A bolt either persists the data ...

    Storm Real-time Processing Cookbook

    Storm is a distributed, reliable, fault-tolerant system for processing streams of data. The work is delegated to different types of components that are each responsible for a simple specific ...

    漫谈大数据第四期-storm

    Storm的术语包括Stream、Spout、Bolt、Task、Worker、Stream Grouping和Topology。Stream是被处理的数据。Sprout是数据源。Bolt处理数据。Task是运行于Spout或Bolt中的 线程。Worker是运行这些线程的进程。Stream ...

    kinesis-storm-spout:风暴的Kinesis壶嘴

    Amazon Kinesis Storm喷口可帮助Java开发人员将与集成。 要求 或更高版本 3.0或更高版本 13.0或更高版本 当然,还有和 概述 Amazon Kinesis Storm喷口从Amazon Kinesis获取数据记录,并将其作为元组发出。 喷口将...

    storm记录级容错.docx

    storm允许用户在spout中发射一个新的源tuple时为其指定一个message id, 这个message id可以是任意的object对象。多个源tuple可以共用一个message id,表示这多个源 tuple对用户来说是同一个消息单元。storm中记录级...

    spout, 以一种快速且可以扩展的方式,读取和写入电子表格文件( CSV,XLSX和 ODS ),.zip

    spout, 以一种快速且可以扩展的方式,读取和写入电子表格文件( CSV,XLSX和 ODS ), 喷嘴 ,是一个PHP库,用于读取和写入电子表格文件( 。CSV,XLSX和 ODS ),以快速而可以伸缩的方式。 其他文件阅读器或者编写器...

    Storm Kafka Integration架包

    Storm Kafka Integration架包,包含storm.kafka.KafkaSpout、import、storm.kafka.SpoutConfig、import storm.kafka.StringScheme、import storm.kafka.ZkHosts等

    storm-nginx-log:基于Kafka、Storm的nginx日志监控,采用Apache Flume收集日志

    集群版(cluster branch) : 项目基于Kafka storm的实时nginx日志监控,将nginx的日志文件access.log读取并放入Kafka队列中,Storm的Spout来对接Kafka消息队列,来收集nginx服务器的状态,并在一定时间内,统计访问ip...

    Spout是一个PHP库可以快速可扩展的方式读写电子表格文件CSVXLSX和ODS

    Spout是一个PHP库,可以快速,可扩展的方式读写电子表格文件(CSV,XLSX和ODS)。 与其他文件读写器相反,它能够处理非常大的文件,同时保持内存使用率非常低(小于3MB)。

Global site tag (gtag.js) - Google Analytics