副问题[/!--empirenews.page--]
Sentinel 是阿里中间件团队开源的,面向漫衍式处事架构的轻量级高可用流量节制组件,首要以流量为切入点,从流量节制、熔断降级、体系负载掩护等多个维度来辅佐用户掩护处事的不变性。
各人也许会问:Sentinel 和之前常用的熔断降级库 Netflix Hystrix 有什么异同呢?Sentinel官网有一个比拟的文章,这里摘抄一个总结的表格,详细的比拟可以点此 链接 查察。
从比拟的表格可以看到,Sentinel比Hystrix在成果性上还要强盛一些,本文让我们一路来相识下Sentinel的源码,揭开Sentinel的隐秘面纱。
项目布局
将Sentinel的源码fork到本身的github库中,接着把源码clone到当地,然后开始源码阅读之旅吧。
起首我们看一下Sentinel项目标整个布局:
- sentinel-core 焦点模块,限流、降级、体系掩护等都在这里实现
- sentinel-dashboard 节制台模块,可以对毗连上的sentinel客户端实现可视化的打点
- sentinel-transport 传输模块,提供了根基的监控处事端和客户端的API接口,以及一些基于差异库的实现
- sentinel-extension 扩展模块,首要对DataSource举办了部门扩展实现
- sentinel-adapter 适配器模块,首要实现了对一些常见框架的适配
- sentinel-demo 样例模块,可参考怎么行使sentinel举办限流、降级等
- sentinel-benchmark 基准测试模块,对焦点代码的准确性提供基准测试
运行样例
根基上每个框架城市带有样例模块,有的叫example,有的叫demo,sentinel也不破例。
那我们从sentinel的demo中找一个例子运行下看看大抵的环境吧,上面说过了sentinel首要的焦点成果是做限流、降级和体系掩护,那我们就从“限流”开始看sentinel的实现道理吧。
可以看到sentinel-demo模块中有许多差异的样例,我们找到basic模块下的flow包,这个包下面就是对应的限流的样例,可是限流也有许多种范例的限流,我们就找按照qps限流的类看吧,其他的限流方法道理上都大差不差。
- public class FlowQpsDemo {
-
- private static final String KEY = "abc";
-
- private static AtomicInteger pass = new AtomicInteger();
-
- private static AtomicInteger block = new AtomicInteger();
-
- private static AtomicInteger total = new AtomicInteger();
-
- private static volatile boolean stop = false;
-
- private static final int threadCount = 32;
-
- private static int seconds = 30;
-
- public static void main(String[] args) throws Exception {
-
- initFlowQpsRule();
- tick();
-
- // first make the system run on a very low condition
-
- simulateTraffic();
-
- System.out.println("===== begin to do flow control");
-
- System.out.println("only 20 requests per second can pass");
-
- }
-
- private static void initFlowQpsRule() {
-
- List<FlowRule> rules = new ArrayList<FlowRule>();
-
- FlowRule rule1 = new FlowRule();
-
- rule1.setResource(KEY);
-
- // set limit qps to 20
-
- rule1.setCount(20);
-
- // 配置限流范例:按照qps
-
- rule1.setGrade(RuleConstant.FLOW_GRADE_QPS);
-
- rule1.setLimitApp("default");
-
- rules.add(rule1);
-
- // 加载限流的法则
-
- FlowRuleManager.loadRules(rules);
-
- }
-
- private static void simulateTraffic() {
-
- for (int i = 0; i < threadCount; i++) {
-
- Thread t = new Thread(new RunTask());
-
- t.setName("simulate-traffic-Task");
-
- t.start();
-
- }
-
- }
-
- private static void tick() {
-
- Thread timer = new Thread(new TimerTask());
-
- timer.setName("sentinel-timer-task");
-
- timer.start();
-
- }
-
- static class TimerTask implements Runnable {
-
- @Override
-
- public void run() {
-
- long start = System.currentTimeMillis();
-
- System.out.println("begin to statistic!!!");
-
- long oldTotal = 0;
-
- long oldPass = 0;
-
- long oldBlock = 0;
-
- while (!stop) {
-
- try {
-
- TimeUnit.SECONDS.sleep(1);
-
- } catch (InterruptedException e) {
-
- }
-
- long globalTotal = total.get();
-
- long oneSecondTotal = globalTotal - oldTotal;
-
- oldTotal = globalTotal;
-
- long globalPass = pass.get();
-
- long oneSecondPass = globalPass - oldPass;
-
- oldPass = globalPass;
-
- long globalBlock = block.get();
-
- long oneSecondBlock = globalBlock - oldBlock;
-
- oldBlock = globalBlock;
-
- System.out.println(seconds + " send qps is: " + oneSecondTotal);
-
- System.out.println(TimeUtil.currentTimeMillis() + ", total:" + oneSecondTotal
-
- + ", pass:" + oneSecondPass
-
- + ", block:" + oneSecondBlock);
-
- if (seconds-- <= 0) {
-
- stop = true;
-
- }
-
- }
-
- long cost = System.currentTimeMillis() - start;
-
- System.out.println("time cost: " + cost + " ms");
-
- System.out.println("total:" + total.get() + ", pass:" + pass.get()
-
- + ", block:" + block.get());
-
- System.exit(0);
-
- }
-
- }
-
- static class RunTask implements Runnable {
-
- @Override
-
- public void run() {
-
- while (!stop) {
-
- Entry entry = null;
-
- try {
-
- entry = SphU.entry(KEY);
-
- // token acquired, means pass
-
- pass.addAndGet(1);
-
- } catch (BlockException e1) {
-
- block.incrementAndGet();
-
- } catch (Exception e2) {
-
- // biz exception
-
- } finally {
-
- total.incrementAndGet();
-
- if (entry != null) {
-
- entry.exit();
-
- }
-
- }
-
- Random random2 = new Random();
-
- try {
-
- TimeUnit.MILLISECONDS.sleep(random2.nextInt(50));
-
- } catch (InterruptedException e) {
-
- // ignore
-
- }
-
- }
-
- }
-
- }
-
- }
执行上面的代码后,打印出如下的功效:
可以看到,上面的功效中,pass的数目和我们的预期并不沟通,我们预期的是每秒应承pass的哀求数是20个,可是今朝有许多pass的哀求数是高出20个的。
缘故起因是,我们这里测试的代码行使了多线程,留意看 threadCount 的值,一共有32个线程来模仿,而在RunTask的run要领中执行资源掩护时,即在 SphU.entry 的内部是没有加锁的,以是就会导致在高并发下,pass的数目会高于20。
可以用下面这个模子来描写下,有一个TimeTicker线程在做统计,每1秒钟做一次。有N个RunTask线程在模仿哀求,被会见的business code被资源key掩护着,按照法则,每秒只应承20个哀求通过。
因为pass、block、total等计数器是全局共享的,而多个RunTask线程在执行SphU.entry申请获取entry时,内部没有锁掩护,以是会存在pass的个数高出设定的阈值。
那为了证明在单线程下限流的正确性与靠得住性,那我们的模子就应该酿成了这样:
那接下来我把 threadCount 的值改为1,只有一个线程来执行这个要领,看下详细的限流功效,执行上面的代码后打印的功效如下:
可以看到pass数根基上维持在20,可是第一次统计的pass置魅照旧高出了20。这又是什么缘故起因导致的呢?
着实细心看下Demo中的代码可以发明,模仿哀求是用的一个线程,统计功效是用的其它一个线程,统计线程每1秒钟统计一次功效,这两个线程之间是偶然刻上的偏差的。从TimeTicker线程打印出来的时刻戳可以看出来,固然每隔一秒举办统计,可是当前打印时的时刻和上一次的时刻照旧有偏差的,不完满是1000ms的隔断。
要真正验证每秒限定20个哀求,担保数据的精准性,必要做基准测试,这个不是本篇文章的重点,有乐趣的同窗可以去相识下jmh,sentinel中的基准测试也是通过jmh做的。
深入道理
通过一个简朴的示例措施,我们相识了sentinel可以对哀求举办限流,除了限流外,尚有降级和体系掩护等成果。那此刻我们就拨开云雾,深入源码内部去一窥sentinel的实现道理吧。
起首从进口开始: SphU.entry() 。这个要了解去申请一个entry,假如可以或许申请乐成,则声名没有被限流,不然会抛出BlockException,外貌已经被限流了。
从 SphU.entry() 要领往下执行会进入到 Sph.entry() ,Sph的默认实现类是 CtSph ,在CtSph中最终会执行到 entry(ResourceWrapperresourceWrapper,intcount,Object...args)throwsBlockException 这个要领。
我们来看一下这个要领的详细实现:
- public Entry entry(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
-
- Context context = ContextUtil.getContext();
-
- if (context instanceof NullContext) {
-
- // Init the entry only. No rule checking will occur.
-
- return new CtEntry(resourceWrapper, null, context);
-
- }
-
- if (context == null) {
-
- context = MyContextUtil.myEnter(Constants.CONTEXT_DEFAULT_NAME, "", resourceWrapper.getType());
-
- }
-
- // Global switch is close, no rule checking will do.
-
- if (!Constants.ON) {
-
- return new CtEntry(resourceWrapper, null, context);
-
- }
-
- // 获取该资源对应的SlotChain
-
- ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
-
- /*
-
- * Means processor cache size exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE}, so no
-
- * rule checking will be done.
-
- */
-
- if (chain == null) {
-
- return new CtEntry(resourceWrapper, null, context);
-
- }
-
- Entry e = new CtEntry(resourceWrapper, chain, context);
-
- try {
-
- // 执行Slot的entry要领
-
- chain.entry(context, resourceWrapper, null, count, args);
-
- } catch (BlockException e1) {
-
- e.exit(count, args);
-
- // 抛出BlockExecption
-
- throw e1;
-
- } catch (Throwable e1) {
-
- RecordLog.info("Sentinel unexpected exception", e1);
-
- }
-
- return e;
-
- }
这个要领可以分为以下几个部门:
- 1.对参数和全局设置项做检测,假如不切合要求就直接返回了一个CtEntry工具,不会再举办后头的限流检测,不然进入下面的检测流程。
- 2.按照包装过的资源工具获取对应的SlotChain
- 3.执行SlotChain的entry要领
- 3.1.假如SlotChain的entry要领抛出了BlockException,则将该非常继承向上抛出
- 3.2.假如SlotChain的entry要领正常执行了,则最后会将该entry工具返回
- 4.假如上层要领捕捉了BlockException,则声名哀求被限流了,不然哀求能正常执行
个中较量重要的是第2、3两个步调,我们来解析一下这两个步调。
建设SlotChain
起首看一下lookProcessChain的要领实现:
- private ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
-
- ProcessorSlotChain chain = chainMap.get(resourceWrapper);
-
- if (chain == null) {
-
- synchronized (LOCK) {
-
- chain = chainMap.get(resourceWrapper);
-
- if (chain == null)
-
- // Entry size limit.
-
- if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
-
- return null;
-
- }
-
- // 详细结构chain的要领
-
- chain = Env.slotsChainbuilder.build();
-
- Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(chainMap.size() + 1);
-
- newMap.putAll(chainMap);
-
- newMap.put(resourceWrapper, chain);
-
- chainMap = newMap;
-
- }
-
- }
-
- }
-
- return chain;
-
- }
(编辑:河北网)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|