加入收藏 | 设为首页 | 会员中心 | 我要投稿 河北网 (https://www.hebeiwang.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 建站 > 正文

限流降级神器,带你解读阿里巴巴开源 Sentinel 实现原理

发布时间:2019-07-11 13:08:30 所属栏目:建站 来源:Java高级互联网架构
导读:Sentinel 是阿里中间件团队开源的,面向漫衍式处事架构的轻量级高可用流量节制组件,首要以流量为切入点,从流量节制、熔断降级、体系负载掩护等多个维度来辅佐用户掩护处事的不变性。 各人也许会问:Sentinel 和之前常用的熔断降级库 Netflix Hystrix 有
副问题[/!--empirenews.page--]

限流降级神器,带你解读阿里巴巴开源 Sentinel 实现道理

Sentinel 是阿里中间件团队开源的,面向漫衍式处事架构的轻量级高可用流量节制组件,首要以流量为切入点,从流量节制、熔断降级、体系负载掩护等多个维度来辅佐用户掩护处事的不变性。

各人也许会问:Sentinel 和之前常用的熔断降级库 Netflix Hystrix 有什么异同呢?Sentinel官网有一个比拟的文章,这里摘抄一个总结的表格,详细的比拟可以点此 链接 查察。

限流降级神器,带你解读阿里巴巴开源 Sentinel 实现道理

从比拟的表格可以看到,Sentinel比Hystrix在成果性上还要强盛一些,本文让我们一路来相识下Sentinel的源码,揭开Sentinel的隐秘面纱。

项目布局

将Sentinel的源码fork到本身的github库中,接着把源码clone到当地,然后开始源码阅读之旅吧。

起首我们看一下Sentinel项目标整个布局:

限流降级神器,带你解读阿里巴巴开源 Sentinel 实现道理
  • sentinel-core 焦点模块,限流、降级、体系掩护等都在这里实现
  • sentinel-dashboard 节制台模块,可以对毗连上的sentinel客户端实现可视化的打点
  • sentinel-transport 传输模块,提供了根基的监控处事端和客户端的API接口,以及一些基于差异库的实现
  • sentinel-extension 扩展模块,首要对DataSource举办了部门扩展实现
  • sentinel-adapter 适配器模块,首要实现了对一些常见框架的适配
  • sentinel-demo 样例模块,可参考怎么行使sentinel举办限流、降级等
  • sentinel-benchmark 基准测试模块,对焦点代码的准确性提供基准测试

运行样例

根基上每个框架城市带有样例模块,有的叫example,有的叫demo,sentinel也不破例。

那我们从sentinel的demo中找一个例子运行下看看大抵的环境吧,上面说过了sentinel首要的焦点成果是做限流、降级和体系掩护,那我们就从“限流”开始看sentinel的实现道理吧。

限流降级神器,带你解读阿里巴巴开源 Sentinel 实现道理

可以看到sentinel-demo模块中有许多差异的样例,我们找到basic模块下的flow包,这个包下面就是对应的限流的样例,可是限流也有许多种范例的限流,我们就找按照qps限流的类看吧,其他的限流方法道理上都大差不差。

  1. public class FlowQpsDemo { 
  2.  
  3. private static final String KEY = "abc"; 
  4.  
  5. private static AtomicInteger pass = new AtomicInteger(); 
  6.  
  7. private static AtomicInteger block = new AtomicInteger(); 
  8.  
  9. private static AtomicInteger total = new AtomicInteger(); 
  10.  
  11. private static volatile boolean stop = false; 
  12.  
  13. private static final int threadCount = 32; 
  14.  
  15. private static int seconds = 30; 
  16.  
  17. public static void main(String[] args) throws Exception { 
  18.  
  19. initFlowQpsRule(); 
  20. tick(); 
  21.  
  22. // first make the system run on a very low condition 
  23.  
  24. simulateTraffic(); 
  25.  
  26. System.out.println("===== begin to do flow control"); 
  27.  
  28. System.out.println("only 20 requests per second can pass"); 
  29.  
  30.  
  31. private static void initFlowQpsRule() { 
  32.  
  33. List<FlowRule> rules = new ArrayList<FlowRule>(); 
  34.  
  35. FlowRule rule1 = new FlowRule(); 
  36.  
  37. rule1.setResource(KEY); 
  38.  
  39. // set limit qps to 20 
  40.  
  41. rule1.setCount(20); 
  42.  
  43. // 配置限流范例:按照qps 
  44.  
  45. rule1.setGrade(RuleConstant.FLOW_GRADE_QPS); 
  46.  
  47. rule1.setLimitApp("default"); 
  48.  
  49. rules.add(rule1); 
  50.  
  51. // 加载限流的法则 
  52.  
  53. FlowRuleManager.loadRules(rules); 
  54.  
  55.  
  56. private static void simulateTraffic() { 
  57.  
  58. for (int i = 0; i < threadCount; i++) { 
  59.  
  60. Thread t = new Thread(new RunTask()); 
  61.  
  62. t.setName("simulate-traffic-Task"); 
  63.  
  64. t.start(); 
  65.  
  66.  
  67.  
  68. private static void tick() { 
  69.  
  70. Thread timer = new Thread(new TimerTask()); 
  71.  
  72. timer.setName("sentinel-timer-task"); 
  73.  
  74. timer.start(); 
  75.  
  76.  
  77. static class TimerTask implements Runnable { 
  78.  
  79. @Override 
  80.  
  81. public void run() { 
  82.  
  83. long start = System.currentTimeMillis(); 
  84.  
  85. System.out.println("begin to statistic!!!"); 
  86.  
  87. long oldTotal = 0; 
  88.  
  89. long oldPass = 0; 
  90.  
  91. long oldBlock = 0; 
  92.  
  93. while (!stop) { 
  94.  
  95. try { 
  96.  
  97. TimeUnit.SECONDS.sleep(1); 
  98.  
  99. } catch (InterruptedException e) { 
  100.  
  101.  
  102. long globalTotal = total.get(); 
  103.  
  104. long oneSecondTotal = globalTotal - oldTotal; 
  105.  
  106. oldTotal = globalTotal; 
  107.  
  108. long globalPass = pass.get(); 
  109.  
  110. long oneSecondPass = globalPass - oldPass; 
  111.  
  112. oldPass = globalPass; 
  113.  
  114. long globalBlock = block.get(); 
  115.  
  116. long oneSecondBlock = globalBlock - oldBlock; 
  117.  
  118. oldBlock = globalBlock; 
  119.  
  120. System.out.println(seconds + " send qps is: " + oneSecondTotal); 
  121.  
  122. System.out.println(TimeUtil.currentTimeMillis() + ", total:" + oneSecondTotal 
  123.  
  124. + ", pass:" + oneSecondPass 
  125.  
  126. + ", block:" + oneSecondBlock); 
  127.  
  128. if (seconds-- <= 0) { 
  129.  
  130. stop = true; 
  131.  
  132.  
  133.  
  134. long cost = System.currentTimeMillis() - start; 
  135.  
  136. System.out.println("time cost: " + cost + " ms"); 
  137.  
  138. System.out.println("total:" + total.get() + ", pass:" + pass.get() 
  139.  
  140. + ", block:" + block.get()); 
  141.  
  142. System.exit(0); 
  143.  
  144.  
  145.  
  146. static class RunTask implements Runnable { 
  147.  
  148. @Override 
  149.  
  150. public void run() { 
  151.  
  152. while (!stop) { 
  153.  
  154. Entry entry = null; 
  155.  
  156. try { 
  157.  
  158. entry = SphU.entry(KEY); 
  159.  
  160. // token acquired, means pass 
  161.  
  162. pass.addAndGet(1); 
  163.  
  164. } catch (BlockException e1) { 
  165.  
  166. block.incrementAndGet(); 
  167.  
  168. } catch (Exception e2) { 
  169.  
  170. // biz exception 
  171.  
  172. } finally { 
  173.  
  174. total.incrementAndGet(); 
  175.  
  176. if (entry != null) { 
  177.  
  178. entry.exit(); 
  179.  
  180.  
  181.  
  182. Random random2 = new Random(); 
  183.  
  184. try { 
  185.  
  186. TimeUnit.MILLISECONDS.sleep(random2.nextInt(50)); 
  187.  
  188. } catch (InterruptedException e) { 
  189.  
  190. // ignore 
  191.  
  192.  
  193.  
  194.  
  195.  

执行上面的代码后,打印出如下的功效:

限流降级神器,带你解读阿里巴巴开源 Sentinel 实现道理

可以看到,上面的功效中,pass的数目和我们的预期并不沟通,我们预期的是每秒应承pass的哀求数是20个,可是今朝有许多pass的哀求数是高出20个的。

缘故起因是,我们这里测试的代码行使了多线程,留意看 threadCount 的值,一共有32个线程来模仿,而在RunTask的run要领中执行资源掩护时,即在 SphU.entry 的内部是没有加锁的,以是就会导致在高并发下,pass的数目会高于20。

可以用下面这个模子来描写下,有一个TimeTicker线程在做统计,每1秒钟做一次。有N个RunTask线程在模仿哀求,被会见的business code被资源key掩护着,按照法则,每秒只应承20个哀求通过。

因为pass、block、total等计数器是全局共享的,而多个RunTask线程在执行SphU.entry申请获取entry时,内部没有锁掩护,以是会存在pass的个数高出设定的阈值。

限流降级神器,带你解读阿里巴巴开源 Sentinel 实现道理

那为了证明在单线程下限流的正确性与靠得住性,那我们的模子就应该酿成了这样:

限流降级神器,带你解读阿里巴巴开源 Sentinel 实现道理

那接下来我把 threadCount 的值改为1,只有一个线程来执行这个要领,看下详细的限流功效,执行上面的代码后打印的功效如下:

限流降级神器,带你解读阿里巴巴开源 Sentinel 实现道理

可以看到pass数根基上维持在20,可是第一次统计的pass置魅照旧高出了20。这又是什么缘故起因导致的呢?

着实细心看下Demo中的代码可以发明,模仿哀求是用的一个线程,统计功效是用的其它一个线程,统计线程每1秒钟统计一次功效,这两个线程之间是偶然刻上的偏差的。从TimeTicker线程打印出来的时刻戳可以看出来,固然每隔一秒举办统计,可是当前打印时的时刻和上一次的时刻照旧有偏差的,不完满是1000ms的隔断。

要真正验证每秒限定20个哀求,担保数据的精准性,必要做基准测试,这个不是本篇文章的重点,有乐趣的同窗可以去相识下jmh,sentinel中的基准测试也是通过jmh做的。

深入道理

通过一个简朴的示例措施,我们相识了sentinel可以对哀求举办限流,除了限流外,尚有降级和体系掩护等成果。那此刻我们就拨开云雾,深入源码内部去一窥sentinel的实现道理吧。

起首从进口开始: SphU.entry() 。这个要了解去申请一个entry,假如可以或许申请乐成,则声名没有被限流,不然会抛出BlockException,外貌已经被限流了。

从 SphU.entry() 要领往下执行会进入到 Sph.entry() ,Sph的默认实现类是 CtSph ,在CtSph中最终会执行到 entry(ResourceWrapperresourceWrapper,intcount,Object...args)throwsBlockException 这个要领。

我们来看一下这个要领的详细实现:

  1. public Entry entry(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException { 
  2.  
  3. Context context = ContextUtil.getContext(); 
  4.  
  5. if (context instanceof NullContext) { 
  6.  
  7. // Init the entry only. No rule checking will occur. 
  8.  
  9. return new CtEntry(resourceWrapper, null, context); 
  10.  
  11.  
  12. if (context == null) { 
  13.  
  14. context = MyContextUtil.myEnter(Constants.CONTEXT_DEFAULT_NAME, "", resourceWrapper.getType()); 
  15.  
  16.  
  17. // Global switch is close, no rule checking will do. 
  18.  
  19. if (!Constants.ON) { 
  20.  
  21. return new CtEntry(resourceWrapper, null, context); 
  22.  
  23.  
  24. // 获取该资源对应的SlotChain 
  25.  
  26. ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper); 
  27.  
  28. /* 
  29.  
  30. * Means processor cache size exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE}, so no 
  31.  
  32. * rule checking will be done. 
  33.  
  34. */ 
  35.  
  36. if (chain == null) { 
  37.  
  38. return new CtEntry(resourceWrapper, null, context); 
  39.  
  40.  
  41. Entry e = new CtEntry(resourceWrapper, chain, context); 
  42.  
  43. try { 
  44.  
  45. // 执行Slot的entry要领 
  46.  
  47. chain.entry(context, resourceWrapper, null, count, args); 
  48.  
  49. } catch (BlockException e1) { 
  50.  
  51. e.exit(count, args); 
  52.  
  53. // 抛出BlockExecption 
  54.  
  55. throw e1; 
  56.  
  57. } catch (Throwable e1) { 
  58.  
  59. RecordLog.info("Sentinel unexpected exception", e1); 
  60.  
  61.  
  62. return e; 
  63.  

这个要领可以分为以下几个部门:

  • 1.对参数和全局设置项做检测,假如不切合要求就直接返回了一个CtEntry工具,不会再举办后头的限流检测,不然进入下面的检测流程。
  • 2.按照包装过的资源工具获取对应的SlotChain
  • 3.执行SlotChain的entry要领
  • 3.1.假如SlotChain的entry要领抛出了BlockException,则将该非常继承向上抛出
  • 3.2.假如SlotChain的entry要领正常执行了,则最后会将该entry工具返回
  • 4.假如上层要领捕捉了BlockException,则声名哀求被限流了,不然哀求能正常执行

个中较量重要的是第2、3两个步调,我们来解析一下这两个步调。

建设SlotChain

起首看一下lookProcessChain的要领实现:

  1. private ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) { 
  2.  
  3. ProcessorSlotChain chain = chainMap.get(resourceWrapper); 
  4.  
  5. if (chain == null) { 
  6.  
  7. synchronized (LOCK) { 
  8.  
  9. chain = chainMap.get(resourceWrapper); 
  10.  
  11. if (chain == null) 
  12.  
  13. // Entry size limit. 
  14.  
  15. if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) { 
  16.  
  17. return null; 
  18.  
  19.  
  20. // 详细结构chain的要领 
  21.  
  22. chain = Env.slotsChainbuilder.build(); 
  23.  
  24. Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(chainMap.size() + 1); 
  25.  
  26. newMap.putAll(chainMap); 
  27.  
  28. newMap.put(resourceWrapper, chain); 
  29.  
  30. chainMap = newMap; 
  31.  
  32.  
  33.  
  34.  
  35. return chain; 
  36.  

(编辑:河北网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

热点阅读