`
woodding2008
  • 浏览: 284916 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Storm 反压机制

 
阅读更多

反压机制

       Storm的反压机制不成熟直接带来的后果是洪峰流量或者流量预估不准确导致任务的worker OOM,频繁漂移。Storm1.0版本已经使用新的反压机制,社区解决方案:https://issues.apache.org/jira/browse/STORM-886

https://github.com/apache/storm/pull/700 

 

 

 

 

反压过程

  • worker executor的接收队列大于高水位,通知反压线程 
  • worker反压线程通知zookeeper,executor繁忙事件 
  • 所有worker监听zookeeper executor繁忙的事件 
  • worker spouts降低发送tuple速度 

 

storm 1.0以前的反压

 

   Spout tuples 不使用message id, TOPOLOGY_MAX_SPOUT_PENDING是不生效的。

public static final String TOPOLOGY_MAX_SPOUT_PENDING="topology.max.spout.pending"; 
public static final Object TOPOLOGY_MAX_SPOUT_PENDING_SCHEMA = ConfigValidation.IntegerValidator;

The maximum number of tuples that can be pending on a spout task at any given time. 
This config applies to individual tasks, not to spouts or topologies as a whole. 

A pending tuple is one that has been emitted from a spout but has not been acked or failed yet.
Note that this config parameter has no effect for unreliable spouts that don't tag their tuples with a message id.



   spout执行nextTupe逻辑

 

(fn []
          ;; This design requires that spouts be non-blocking
          (disruptor/consume-batch receive-queue event-handler) ;;从recieve-queue取出batch tuples, 并使用tuple-action-fn处理
          
          ;; try to clear the overflow-buffer, 将overflow-buffer里面的数据放到发送的缓存queue里面
          (try-cause
            (while (not (.isEmpty overflow-buffer))
              (let [[out-task out-tuple] (.peek overflow-buffer)]
                (transfer-fn out-task out-tuple false nil)
                (.removeFirst overflow-buffer)))
          (catch InsufficientCapacityException e
            ))
          
          (let [active? @(:storm-active-atom executor-data)
                curr-count (.get emitted-count)]
            (if (and (.isEmpty overflow-buffer)  ;;只有当overflow-buffer为空, 并且pending没有达到上限的时候, spout可以继续emit tuple
                     (or (not max-spout-pending)
                         (< (.size pending) max-spout-pending)))
              (if active?  ;;storm集群是否active
                (do  ;;storm active
                  (when-not @last-active  ;;如果当前spout出于unactive状态
                    (reset! last-active true)
                    (log-message "Activating spout " component-id ":" (keys task-datas))
                    (fast-list-iter [^ISpout spout spouts] (.activate spout))) ;;先active spout
               
                  (fast-list-iter [^ISpout spout spouts] (.nextTuple spout))) ;;调用nextTuple,产生新的tuple
                (do ;;storm unactive
                  (when @last-active ;;如果spout出于active状态
                    (reset! last-active false)
                    (log-message "Deactivating spout " component-id ":" (keys task-datas))
                    (fast-list-iter [^ISpout spout spouts] (.deactivate spout))) ;;deactive spout并休眠
                  ;; TODO: log that it's getting throttled
                  (Time/sleep 100))))
            (if (and (= curr-count (.get emitted-count)) active?) ;;没有能够emit新的tuple(前后emitted-count没有变化)
              (do (.increment empty-emit-streak)
                  (.emptyEmit spout-wait-strategy (.get empty-emit-streak))) ;;调用spout-wait-strategy进行sleep
              (.set empty-emit-streak 0)
              ))           
          0)) ;;返回0, 表示async-loop的sleep时间为0
      :kill-fn (:report-error-and-die executor-data)
      :factory? true
      :thread-name component-id)]))
 tuple pending的个数是有限制
p*num-tasks 
p是TOPOLOGY-MAX-SPOUT-PENDING, num-tasks是spout的task数

max-spout-pending (executor-max-spout-pending storm-conf (count task-datas))
(defn executor-max-spout-pending [storm-conf num-tasks]
  (let [p (storm-conf TOPOLOGY-MAX-SPOUT-PENDING)]
    (if p (* p num-tasks))))
 

 

反压不成熟带来的问题

fieldsGrouping不合理或者洪峰流量,bolt接收队列暴涨导致OOM,完善反压后可以解决这个问题。

 

 

 

 

扩展阅读 http://www.cnblogs.com/fxjwind/p/3238648.html

  • 大小: 199.4 KB
  • 大小: 92.5 KB
  • 大小: 112.7 KB
分享到:
评论

相关推荐

    论文研究-Storm流式计算框架反压机制研究.pdf

    针对该问题,提出了一种能够灵活调节Topology中各环节数据负载的反压机制,该机制采用可变队列,并根据当前Tuple负载动态调整队列大小,以适应数据负载的动态变化,并提升系统吞吐量。实验结果表明,该反压机制能够...

    Storm Acker机制

    Storm,是于90年代突然崛起,颇受年轻消费者追崇的,一个专注于时尚生活的品牌,主要产品集中在时尚腕表、珠宝首饰、箱包、雨伞以及香水等。她是由其创始人Steve Sun于1989年创立。 自诞生之日起,Storm就以独特的...

    Apache Storm Buffer内部机制简介Prezi幻灯片

    根据Storm官方文档以及LMAX的介绍,使用Prezi工具制作了Storm内部Buffer的简要介绍,没有过多的解释,需要配合网上的其他文档及资源来理解Storm内部机制。

    storm on yarn概念架构消息机制概述

    storm on yarn概念架构消息机制概述 包括storm job跟mapreduce job对比 storm on yarn架构图 storm关键概念描述 storm消息机制介绍

    Storm入门教程 之Storm原理和概念详解

    Storm入门教程 之Storm原理和概念详解,出自Storm流计算从入门到精通之技术篇,Storm入门视频教程用到技术:Storm集群、Zookeeper集群等,涉及项目:网站PV、UV案例实战、其他案例; Storm视频教程亮点: 1、Storm...

    storm-ui:Apache Storm 的用户界面

    主分支: ##包裹包战 mvn clean package -DskipTests=true -Dwarcp ./target/storm-ui.war $TOMCAT_HOME/webapps/包装罐 mvn clean package -DskipTests=truecp ./target/storm-ui-*.jar $STORM_HOME/external/...

    细细品味Storm_Storm简介及安装

    第8章探讨Lambda体系结构的实现方法,讲解如何 将批处理机制和实时处理引擎结合起来构建一个可 纠错的分析系统;第9章讲解如何将Pig脚本转化为 topology,并且使用Storm-YARN部署topology,从 而将批处理系统转化为...

    Storm.Applied.Strategies.for.real-time.event.processing

    Storm Applied is a practical guide to using Apache Storm for the real-world tasks associated with processing and analyzing real-time data streams. This immediately useful book starts by building a ...

    storm入门.pdf

    storm的入门,东西很不错!看完就算是基本入门啦!!还等什么?

    Apache Storm(apache-storm-2.3.0.tar.gz)

    Apache Storm(apache-storm-2.3.0.tar.gz) 是一个免费的开源分布式实时计算系统。Apache Storm 可以轻松可靠地处理无限制的数据流,实时处理就像 Hadoop 进行批处理一样。Apache Storm 很简单,可以与任何编程语言...

    storm利用ack保证数据的可靠性源码

    storm利用ack保证数据的可靠性,发送失败时进行重发,保证数据不丢失。

    传智播客Storm项目实战课程 Storm的集群搭建实战 Storm项目学习视频教程

    01-storm简介 02-storm部署-1 03-storm部署-2 04-storm部署概念 05-streamgrouping 06-storm组件生命周期 07-storm可靠性1 08-storm可靠性2

    03_storm.zip

    【Storm篇】--Storm中的同步服务DRPC 【Storm篇】--Storm从初始到分布式搭建 ...【Storm篇】--Storm 容错机制 【Storm篇】--Storm并发机制 【Storm篇】--Storm分组策略 【Storm篇】--Storm基础概念

    Apache Storm(apache-storm-2.3.0-src.tar.gz 源码)

    Apache Storm(apache-storm-2.3.0-src.tar.gz 源码) 是一个免费的开源分布式实时计算系统。Apache Storm 可以轻松可靠地处理无限制的数据流,实时处理就像 Hadoop 进行批处理一样。Apache Storm 很简单,可以与...

    storm.学习资料和代码

    获取到文件名称 : apache-storm-0.9.2-incubating.tar.gz 获取到文件名称 : Learning Storm [eBook].pdf 获取到文件名称 : Storm Blueprints.Patterns.pdf 获取到文件名称 : storm01.rar 获取到文件名称 : storm...

    Storm实战构建大数据实时计算

    Storm官方网站有段简介 Storm是一个免费并开源的分布式实时计算系统。利用Storm可以很容易做到可靠地处理无限的数据流,像Hadoop批量处理大数据一样,Storm可以实时处理数据。Storm简单,可以使用任何编程语言。

    Storm实时数据处理

    Storm实时数据处理

    IP-Storm!0[1].03IP-Storm!0[1].03

    IP-Storm!0[1].03IP-Storm!0[1].03IP-Storm!0[1].03IP-Storm!0[1].03IP-Storm!0[1].03IP-Storm!0[1].03IP-Storm!0[1].03IP-Storm!0[1].03IP-Storm!0[1].03IP-Storm!0[1].03IP-Storm!0[1].03

Global site tag (gtag.js) - Google Analytics