请选择 进入手机版 | 继续访问电脑版

登录  | 立即注册

游客您好!登录后享受更多精彩

查看: 3135|回复: 8

源码分析 Sentinel 实时数据采集实现原理

[复制链接]

17

主题

34

帖子

418

积分

中级会员

Rank: 3Rank: 3

积分
418
发表于 2020-1-21 13:45:00 | 显示全部楼层 |阅读模式
本篇将重点关注 Sentienl 实时数据收集,即 Sentienl 具体是如何收集调用信息,以此来判断是否需要触发限流或熔断。

本节目录


Sentienl 实时数据收集的入口类为 StatisticSlot。
我们先简单来看一下 StatisticSlot 该类的注释,来看一下该类的整体定位。
StatisticSlot,专用于实时统计的 slot。在进入一个资源时,在执行 Sentienl 的处理链条中会进入到该 slot 中,需要完成如下计算任务:

  • 集群维度计算资源的总统计信息,用于集群限流,后续文章将详细探讨。
  • 来自不同调用方/来源的群集节点的统计信息。
  • 特定调用上下文环境的统计信息。
  • 统计所有入口的统计信息。
接下来用源码分析的手段来详细分析 StatisticSlot 的实现原理。
1、源码分析 StatisticSlot

1.1 StatisticSlot entry 详解

StatisticSlot#entry
  1. public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,boolean prioritized, Object... args) throws Throwable {         try {                // Do some checking.                fireEntry(context, resourceWrapper, node, count, prioritized, args);  // @1                // Request passed, add thread count and pass count.                node.increaseThreadNum();                                                             // @2                       node.addPassRequest(count);                if (context.getCurEntry().getOriginNode() != null) {                           // @3                        // Add count for origin node.                            context.getCurEntry().getOriginNode().increaseThreadNum();                            context.getCurEntry().getOriginNode().addPassRequest(count);                        }                if (resourceWrapper.getEntryType() == EntryType.IN) {                // @4                        // Add count for global inbound entry node for global statistics.                            Constants.ENTRY_NODE.increaseThreadNum();                            Constants.ENTRY_NODE.addPassRequest(count);                }                // Handle pass event with registered entry callback handlers.                for (ProcessorSlotEntryCallback handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {   // @5                            handler.onPass(context, resourceWrapper, node, count, args);                }            } catch (PriorityWaitException ex) {                                                                                                                                // @6                node.increaseThreadNum();                if (context.getCurEntry().getOriginNode() != null) {                        // Add count for origin node.                            context.getCurEntry().getOriginNode().increaseThreadNum();                        }                if (resourceWrapper.getEntryType() == EntryType.IN) {                        // Add count for global inbound entry node for global statistics.                            Constants.ENTRY_NODE.increaseThreadNum();                }                // Handle pass event with registered entry callback handlers.                for (ProcessorSlotEntryCallback handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {                            handler.onPass(context, resourceWrapper, node, count, args);                }            } catch (BlockException e) {     // @7                                                                                                                              // Blocked, set block exception to current entry.                context.getCurEntry().setError(e);                // Add block count.                node.increaseBlockQps(count);                if (context.getCurEntry().getOriginNode() != null) {                            context.getCurEntry().getOriginNode().increaseBlockQps(count);                }                if (resourceWrapper.getEntryType() == EntryType.IN) {                            // Add count for global inbound entry node for global statistics.                            Constants.ENTRY_NODE.increaseBlockQps(count);                }                // Handle block event with registered entry callback handlers.                for (ProcessorSlotEntryCallback handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {                            handler.onBlocked(e, context, resourceWrapper, node, count, args);                }                throw e;            } catch (Throwable e) {   // @8                // Unexpected error, set error to current entry.                context.getCurEntry().setError(e);                // This should not happen.                node.increaseExceptionQps(count);                if (context.getCurEntry().getOriginNode() != null) {                            context.getCurEntry().getOriginNode().increaseExceptionQps(count);                }                if (resourceWrapper.getEntryType() == EntryType.IN) {                            Constants.ENTRY_NODE.increaseExceptionQps(count);                }                throw e;            }}
复制代码
代码@1:首先调用 fireEntry,先调用 Sentinel Slot Chain 中其他的处理器,执行完其他处理器的逻辑,例如 FlowSlot、DegradeSlot,因为 StatisticSlot 的职责是收集统计信息。
代码@2:如果后续处理器成功执行,则将正在执行线程数统计指标加一,并将通过的请求数量指标增加对应的值。下文会对 Sentinel Node 体系进行详细的介绍,在 Sentinel 中使用 Node 来表示调用链中的某一个节点,每个节点关联一个资源,资源的实时统计信息就存储在 Node 中,故该部分也是调用 DefaultNode 的相关方法来改变线程数等,将在下文会向详细介绍。
代码@3:如果上下文环境中保存了调用的源头(调用方)的节点信息不为空,则更新该节点的统计数据:线程数与通过数量。
代码@4:如果资源的进入类型为 EntryType.IN,表示入站流量,更新入站全局统计数据(集群范围 ClusterNode)。
代码@5:执行注册的进入Handler,可以通过 StatisticSlotCallbackRegistry 的 addEntryCallback 注册相关监听器。
代码@6:如果捕获到 PriorityWaitException ,则认为是等待过一定时间,但最终还是算通过,只需增加线程的个数,但无需增加节点通过的数量,具体原因我们在详细分析限流部分时会重点讨论,也会再次阐述 PriorityWaitException 的含义。
代码@7:如果捕获到 BlockException,则主要增加阻塞的数量。
代码@8:如果是系统异常,则增加异常数量。
我想上面的代码应该不难理解,但涉及到统计指标数据的变化,都是调用 DefaultNode node 相关的方法,从这里也可以看出,Node 将是实时统计数据的直接持有者,那毋容置疑接下来将重点来学习 Node,为了知识体系的完备性,我们先来看一下 StatisticSlot 的 exit 方法。
1.2 StatisticSlot exit 详解

StatisticSlot#exit
  1. public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {        DefaultNode node = (DefaultNode)context.getCurNode();        if (context.getCurEntry().getError() == null) {         // @1                // Calculate response time (max RT is TIME_DROP_VALVE).                long rt = TimeUtil.currentTimeMillis() - context.getCurEntry().getCreateTime();                if (rt > Constants.TIME_DROP_VALVE) {                        rt = Constants.TIME_DROP_VALVE;                }                // Record response time and success count.                node.addRtAndSuccess(rt, count);                if (context.getCurEntry().getOriginNode() != null) {                        context.getCurEntry().getOriginNode().addRtAndSuccess(rt, count);                       }        node.decreaseThreadNum();               if (context.getCurEntry().getOriginNode() != null) {                    context.getCurEntry().getOriginNode().decreaseThreadNum();                }               if (resourceWrapper.getEntryType() == EntryType.IN) {                   Constants.ENTRY_NODE.addRtAndSuccess(rt, count);                   Constants.ENTRY_NODE.decreaseThreadNum();               }        } else {            // Error may happen.        }        // Handle exit event with registered exit callback handlers.        Collection exitCallbacks = StatisticSlotCallbackRegistry.getExitCallbacks();        for (ProcessorSlotExitCallback handler : exitCallbacks) {      // @2                handler.onExit(context, resourceWrapper, count, args);         }        fireExit(context, resourceWrapper, count);     // @3}
复制代码
代码@1:成功执行,则重点关注响应时间,其实现亮点如下:
计算本次响应时间,将本次响应时间收集到 Node 中。
将当前活跃线程数减一。
代码@2:执行退出时的 callback。可以通过 StatisticSlotCallbackRegistry 的 addExitCallback 方法添加退出回调函数。
代码@3:传播 exit 事件。
接下来我们将重点介绍 DefaultNode,即 Sentinel 的 Node 体系,持有资源的实时调用信息。
2、Sentienl Node 体系

2.1 Node 类体系图

我们先简单介绍一下上述核心类的作用与核心接口或核心属性的含义。

  • OccupySupport
    支持抢占未来的时间窗口,有点类似借用“未来”的令牌。其核心方法如下:

    • long tryOccupyNext(long currentTime, int acquireCount, double threshold)
      尝试抢占未来的令牌,返回值为调用该方法的线程应该 sleep 的时间。
      1、long currentTime
      当前时间。
      2、int acquireCount
      本次需要申请的令牌个数。
      3、double threshold
      设置的阔值。

  • long waiting()
    获取当前已申请的未来的令牌的个数。
  • void addWaitingRequest(long futureTime, int acquireCount)
    申请未来时间窗口中的令牌。
  • void addOccupiedPass(int acquireCount)
    增加申请未来令牌通过的个数。
  • double occupiedPassQps()
    当前抢占未来令牌的QPS。
  • Node
    持有实时统计信息的节点。定义了收集统计信息与获取统计信息的接口,上面方法根据方法名称即可得知其含义,故这里就不一一罗列了。
  • StatisticNode
    实现统计信息的默认实现类。
  • DefaultNode
    用于在特定上下文环境中保存某一个资源的实时统计信息。
  • ClusterNode
    实现基于集群限流模式的节点,将在集群限流模式部分详细介绍。
  • EntranceNode
    用来表示调用链入口的节点信息。
本文将详细介绍 DefaultNode 与  StatisticNode,重点阐述调用树与实时统计信息。DefaultNode 是 StatisticNode 的子类,我们先从 StatisticNode 开始 Node 体系的探究。
2、StatisticNode 详解

2.1 核心类图


我们对其核心属性进行一一解读:

  • Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL)
    每秒的实时统计信息,使用 ArrayMetric 实现,即基于滑动窗口实现,正是上篇文章详细介绍的,默认1s 采样 2次。即一个统计周期中包含两个滑动窗口。
  • Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false)
    每分钟实时统计信息,同样使用 ArrayMetric  实现,即基于滑动窗口实现。每1分钟,抽样60次,即包含60个滑动窗口,每一个窗口的时间间隔为 1s 。
  • LongAdder curThreadNum = new LongAdder()
    当前线程计数器。
  • long lastFetchTime = -1
    上一次获取资源的有效统计数据的时间,即调用 Node 的 metrics() 方法的时间。
关于 ArrayMetric 滑动窗口设计与实现原理,请参考笔者的另一篇博文:Alibaba Seninel 滑动窗口实现原理(文末附原理图)
接下来我们挑选几个具有代表性的方法进行探究。
2.2 addPassRequest
  1. public void addPassRequest(int count) {        rollingCounterInSecond.addPass(count);        rollingCounterInMinute.addPass(count);}
复制代码
增加通过请求数量。即将实时调用信息向滑动窗口中进行统计。addPassRequest 即报告成功的通过数量。就是分别调用 秒级、分钟即对应的滑动窗口中添加数量,然后限流规则、熔断规则将基于滑动窗口中的值进行计算。
2.3 totalRequest
  1. public long totalRequest() {        return rollingCounterInMinute.pass() + rollingCounterInMinute.block();}
复制代码
获取当前时间戳的总请求数,获取分钟级时间窗口中的统计信息。
2.4 successQps
  1. public double successQps() {        return rollingCounterInSecond.success() / rollingCounterInSecond.getWindowIntervalInSec();}
复制代码
成功TPS,用秒级统计滑动窗口中统计的个数 除以 窗口的间隔得出其 tps,即抽样个数越大,其统计越精确。
温馨提示:上面的方法在学习了上文的滑动窗口设计原理后将显得非常简单,大家在学习的过程中,可以总结出一个规律,什么时候时候使用秒级滑动窗口,什么时候使用分钟级滑动窗口。
2.5 metrics

由于 Sentienl 基于滑动窗口来实时收集统计信息,并存储在内存中,并随着时间的推移,旧的滑动窗口将失效,故需要提供一个方法,及时将所有的统计信息进行汇总输出,供监控客户端定时拉取,转储都其他客户端,例如数据库,方便监控数据的可视化,这也通常是中间件用于监控指标的监控与采集的通用设计方法。
  1. public Map metrics() {    long currentTime = TimeUtil.currentTimeMillis();    currentTime = currentTime - currentTime % 1000;   // @1    Map metrics = new ConcurrentHashMap();    List nodesOfEverySecond = rollingCounterInMinute.details();   // @2    long newLastFetchTime = lastFetchTime;    // Iterate metrics of all resources, filter valid metrics (not-empty and up-to-date).    for (MetricNode node : nodesOfEverySecond) {         if (isNodeInTime(node, currentTime) && isValidMetricNode(node)) {    // @3            metrics.put(node.getTimestamp(), node);            newLastFetchTime = Math.max(newLastFetchTime, node.getTimestamp());        }    }    lastFetchTime = newLastFetchTime;    return metrics;}
复制代码
代码@1:获取当前时间对应的滑动窗口的开始时间,可以对比上文计算滑动窗口的算法。
代码@2:获取一分钟内的所有滑动窗口中的统计数据,使用 MetricNode 表示。
代码@3:遍历所有节点,刷选出不是当前滑动窗口外的所有数据。这里的重点是方法:isNodeInTime。
  1. private boolean isNodeInTime(MetricNode node, long currentTime) {    return node.getTimestamp() > lastFetchTime && node.getTimestamp() < currentTime;}
复制代码
这里只刷选出不是当前窗口的数据,即 metrics 方法返回的是“过去”的统计数据。
接下来我们再来看看 DefaultNode 相关的几个特性方法。
3、DefaultNode  详解

3.1 类图


DefaultNode 是 StatisticNode 的子类,其额外增加的属性如下:

  • private ResourceWrapper id
    资源id,即 DefaultNode 才真正与资源挂钩,可以将 DefaultNode 看出是调用链中的一个节点,并且与资源关联。
  • private volatile Set< Node > childList
    子节点结合。以此来维持其调用链。
  • private ClusterNode clusterNode
    集群节点,同样为 StatisticNode 的子类,表示与资源集群相关的环境。
接下来我们将来看一下 DefaultNode 的核心方法。
3.2 increaseBlockQps
  1. public void increaseBlockQps(int count) {    super.increaseBlockQps(count);    this.clusterNode.increaseBlockQps(count);}
复制代码
DefaultNode 的此类方法,通常是先调用 StatisticNode 的方法,然后再调用 clusterNode 的相关方法,最终就是使用在对应的滑动窗口中增加或减少计量值。
其他方法也比较简单,就不再细看了,我们可以通过 DefaultNode 的 printDefaultNode 方法来打印该节点的调用链。
本文就介绍到这里了,本文详细介绍了 Sentinel 实时数据收集的统一入口 StatisticSlot,并且介绍了 Seninel Node 体系,即调用链中的每一个节点,每一个节点对一个资源的实时统计信息。下一篇将开始重点限流是如何实现的,即 FlowSlot 的实现技巧。
作者信息:丁威,《RocketMQ技术内幕》作者,目前担任中通科技技术平台部资深架构师,维护 中间件兴趣圈公众号,目前主要发表了源码阅读java集合、JUC(java并发包)、Netty、ElasticJob、Mycat、Dubbo、RocketMQ、mybaits等系列源码。点击链接:加入笔者的知识星球,一起探讨高并发、分布式服务架构,分享阅读源码心得。
                                                                                                                                       
                                                    
  • 点赞                        
  • 收藏                        
  • 分享                                                                                                                        
  •                                                         
                                      
    • 文章举报                           
                                                
                                                                        
                                            
                                                        唯有坚持不懈                                                                                    博客专家                                                                                            发布了198 篇原创文章 · 获赞 332 · 访问量 43万+                                                                                            他的留言板                                                            关注
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有帐号?立即注册

x




上一篇:分库分表理论篇&mdash;&mdash; 以MySQL为例
下一篇:一个秒杀系统的结构设计
回复

使用道具 举报

0

主题

34

帖子

724

积分

高级会员

Rank: 4

积分
724
发表于 2020-1-24 17:21:13 | 显示全部楼层
楼主发贴辛苦了,谢谢楼主分享![www.12360.co]
回复

使用道具 举报

0

主题

16

帖子

346

积分

中级会员

Rank: 3Rank: 3

积分
346
发表于 2020-1-25 04:37:51 | 显示全部楼层
其实我一直觉得楼主的品味不错!呵呵![www.12360.co]
回复

使用道具 举报

0

主题

28

帖子

598

积分

高级会员

Rank: 4

积分
598
发表于 2020-1-25 13:17:36 | 显示全部楼层
楼主太厉害了!楼主,I*老*虎*U![www.12360.co]
回复

使用道具 举报

0

主题

12

帖子

262

积分

中级会员

Rank: 3Rank: 3

积分
262
发表于 2020-2-19 15:18:45 | 显示全部楼层
楼主,我太崇拜你了![www.12360.co]
社区不能没有像楼主这样的人才啊!
回复

使用道具 举报

12

主题

33

帖子

487

积分

中级会员

Rank: 3Rank: 3

积分
487
发表于 2020-2-21 19:29:52 | 显示全部楼层
感谢楼主的无私分享![www.12360.co]
回复

使用道具 举报

0

主题

37

帖子

787

积分

高级会员

Rank: 4

积分
787
发表于 2020-3-2 16:18:38 | 显示全部楼层
楼主,大恩不言谢了![www.12360.co]
回复

使用道具 举报

0

主题

26

帖子

556

积分

高级会员

Rank: 4

积分
556
发表于 2020-3-26 11:37:53 | 显示全部楼层
这个帖子不回对不起自己![www.12360.co]
回复

使用道具 举报

0

主题

31

帖子

661

积分

高级会员

Rank: 4

积分
661
发表于 7 天前 | 显示全部楼层
我看不错噢 谢谢楼主![www.12360.co]
回复

使用道具 举报

懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

动物之森

GMT+8, 2020-4-6 00:11 , Processed in 0.109513 second(s), 27 queries .

www.12360.co 集合吧!动物之森

Copyright © 2019-2020.

快速回复 返回顶部 返回列表