本站消息

站长简介/公众号

  出租广告位,需要合作请联系站长


+关注
已关注

分类  

暂无分类

标签  

暂无标签

日期归档  

2024-11(1)

ratelimiter-starter 限流框架简化版

发布于2021-05-29 23:26     阅读(605)     评论(0)     点赞(25)     收藏(4)


一.背景分析

1.基于互联网背景,客户数剧增,服务稳定性受到严峻的挑战,机器CPU和内存秒秒钟打满,导致服务器宕机。

2.为了实现服务高可用,CTO下达的指令一般都是说服务全年需要达到99.99%可用,其实都是吹牛皮的,但是,我们应该朝着这个目标矢志不渝地奋斗终生,从而实现自己的人生价值,进而实现中华民族的伟大复兴。

3.限流可用从不同的角度出发,市面上也有比较有名的阿里的Sentinel,感兴趣的同学可用去深究,本篇文章主要是从Java拦截器的限制用户多次请求的角度来限制用户的访问量,大家也可以基于此框架来定制适用于自己项目的限流策略。

二代码框架

1.工程结构

限流框架代码目录
2.限流Interceptor类RateLimiterHandlerAdapter

@Component
public class RateLimiterHandlerAdapter extends HandlerInterceptorAdapter {

    private static Logger LOGGER = LoggerFactory.getLogger(RateLimiterHandlerAdapter.class);

    @Autowired
    private RateLimiter rateLimiter;

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
        long start = System.currentTimeMillis();
        try {
            if (!rateLimiter.rateLimitEnable()) {
                return true;
            }
            ExecuteResult executeResult = rateLimiter.run(request.getRequestURI(), buildParameterMap(request));
            if (executeResult == null) {
                return true;
            }
            if (executeResult.handlerStatus().isReject()) {
                response.setContentType("text/html;charset=UTF-8");
                JSONObject respJsonObj = new JSONObject();
                respJsonObj.put("code", "-99999");
                respJsonObj.put("comments", "服务繁忙,请稍后再试~");
                response.getWriter().print(respJsonObj.toJSONString());
                response.flushBuffer();
                return false;
            }
        } catch (Exception e) {
            LOGGER.error("rateLimit handler error", e);
        }
        return true;
    }

    private Map<String, String> buildParameterMap(HttpServletRequest request) {
        Map<String, String[]> parameterMap = request.getParameterMap();

        Map<String, String> reqParamMap = new HashMap<>();
        if (parameterMap != null && !parameterMap.isEmpty()) {
            Set<Map.Entry<String, String[]>> entrySet = parameterMap.entrySet();
            for (Map.Entry<String, String[]> entry : entrySet) {
                String val = entry.getValue() != null ? entry.getValue()[0] : "";
                reqParamMap.put(entry.getKey(), val);
            }
        }

        Map<String, String> attribute = (Map<String, String>) request.getAttribute(HandlerMapping.URI_TEMPLATE_VARIABLES_ATTRIBUTE);
        if (attribute != null) {
            reqParamMap.putAll(attribute);
        }

        return reqParamMap;
    }
}

3.限流组件配置类RateLimiterHandlerAdapterConfig

@Configuration
@Slf4j
public class RateLimiterHandlerAdapterConfig implements WebMvcConfigurer {

    /**
     * 此处配置接口限流地址,应该配置在配置中心
     */
    @Value("${rateLimiter.path:/test/query,/test/query2}")
    private String rateLimiterPath;

    @Bean
    public RateLimiterHandlerAdapter rateLimitHandlerAdapter() {
        return new RateLimiterHandlerAdapter();
    }

    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        if (!StringUtils.isEmpty(rateLimiterPath)) {
            String[] pathPatterns = rateLimiterPath.split(",");
            registry.addInterceptor(rateLimitHandlerAdapter()).addPathPatterns(pathPatterns);
            log.info("add rateLimiter path:{}", pathPatterns);
        }
    }

}

4.限流组件管理器LimiterManager


@Component
public class LimiterManager {

    private static Logger LOGGER = LoggerFactory.getLogger(LimiterManager.class);

    private ObjectMapper mapper = new ObjectMapper();

    private volatile Map<String, LimiterRule> LIMITER_RULES = new HashMap<>();

    @PostConstruct
    public void initRateLimiterRules() {
        Map<String, LimiterRule> rules = getRateLimiterRules();
        LIMITER_RULES = rules;
    }

    /**
     * 此处的配置规则应该从配置中心获取
     *
     * @return
     */
    private Map<String, LimiterRule> getRateLimiterRules() {
        Map<String, LimiterRule> rules = new HashMap<>();
        // 此处自己定义一个规则,实际情况需要从配置中心动态获取
        String limiterRuleValue = "{\"id\":\"1\",\"entrance\":\"/test/query\",\"strategies\":[{\"active\":\"true\",\"ttl\":\"1\",\"type\":\"frequency\"}]}";
        String ruleKey = "testQuery";
        LimiterRule limiterRule = initRateLimiterRule(ruleKey, limiterRuleValue);
        rules.put(ruleKey, limiterRule);
        return rules;
    }

    private LimiterRule initRateLimiterRule(String ruleKey, String value) {
        try {
            LimiterRule rule = mapper.readValue(value, new TypeReference<LimiterRule>() {
            });
            rule.init();
            return rule;
        } catch (Exception e) {
            LOGGER.error(String.format("initRateLimiterRule %s error.", ruleKey), e);
            return null;
        }
    }

    /**
     * 根据具体的入口匹配限流规则列表
     * <p>
     * 待扩展:集群总限流、单机限流
     *
     * @param entrance
     * @param reqParams
     * @return
     */
    public List<LimiterRuleStrategy> findLimitedStrategies(String entrance, Map<String, String> reqParams) {
        List<LimiterRuleStrategy> strategies = Lists.newArrayList();
        Map<String, LimiterRule> rules = LIMITER_RULES;
        if (MapUtils.isNotEmpty(rules) && CollectionUtils.isNotEmpty(rules.values())) {
            for (LimiterRule rule : rules.values()) {
                if (rule != null) {
                    List<LimiterRuleStrategy> findRuleStrategies = rule.findMatchedStrategies(entrance, reqParams);
                    if (CollectionUtils.isNotEmpty(findRuleStrategies)) {
                        strategies.addAll(findRuleStrategies);
                    }
                }
            }
        }

        return strategies;
    }


    /**
     * 此处是服务限流开关,为了测试方便写死打开,实际可以配置在配置中心动态获取
     *
     * @return
     */
    public boolean rateLimitEnable() {
        return true;
    }

}

5.限流规则类LimiterRule


@JsonIgnoreProperties(ignoreUnknown = true)
@Getter
@Setter
public class LimiterRule implements Serializable {

    private static final long serialVersionUID = 1049243098391717657L;

    private static Logger LOGGER = LoggerFactory.getLogger(LimiterRule.class);

    private String id;

    private String entrance;

    private boolean regex;

    private List<LimiterRuleStrategy> strategies;

    private Pattern entrancePattern;

    public void init() {
        if (StringUtils.isEmpty(entrance)) {
            throw new IllegalArgumentException("rule entrance can't be empty.");
        }
        this.entrance = entrance.trim();

        if (regex) {
            entrancePattern = Pattern.compile(entrance);
        }
        if (CollectionUtils.isEmpty(this.strategies)) {
            return;
        }
        Iterator<LimiterRuleStrategy> itr = this.strategies.iterator();
        while (itr.hasNext()) {
            LimiterRuleStrategy strategy = itr.next();
            try {
                strategy.init(this);
            } catch (Exception e) {
                LOGGER.error("init rateLimiter rule strategy error", e);
            }
        }
    }

    public boolean matched(String entrance) {
        if (this.regex) {
            return entrancePattern.matcher(entrance).matches();
        }
        return this.entrance.equals(entrance);
    }

    public List<LimiterRuleStrategy> findMatchedStrategies(String entrance, Map<String, String> reqParams) {
        List<LimiterRuleStrategy> matchedStrategies = new ArrayList<>();

        if (matched(entrance) && CollectionUtils.isNotEmpty(strategies)) {
            for (LimiterRuleStrategy strategy : strategies) {
                if (strategy.isActive()) {
                    matchedStrategies.add(strategy);
                }
            }
        }

        return matchedStrategies;
    }

}

6.限流策略类LimiterRuleStrategy


@JsonIgnoreProperties(ignoreUnknown = true)
@Getter
@Setter
public class LimiterRuleStrategy implements Serializable {

    private static final long serialVersionUID = 1049243098391717689L;

    private static Logger LOG = LoggerFactory.getLogger(LimiterRuleStrategy.class);

    private boolean active;

    private int ttl;

    /**
     * 阈值
     */
    private int threshold = 10;

    /**
     * 告警值 %
     */
    private int warningRate = 60;

    private Map<String, String> params;

    private ClusterOneLimiter limiter;

    private LimiterRule limiterRule;

    public LimiterRuleStrategy() {
    }

    public ExecuteResult execute(String path, Map<String, String> reqParams) {
        try {
            return this.limiter.execute(path, reqParams);
        } catch (Exception e) {
            LOG.error("execute limit error; entrance: {},e:{}", path, e);
        }
        return ExecuteResult.CONTINUE;
    }

    public int getWarningLine() {
        return (int) (this.threshold * ((double) this.warningRate / 100));
    }

    public void init(LimiterRule limiterRule) {
        this.limiterRule = limiterRule;
        limiter = new RealClusterOneLimiter(this);
        LOG.info(this.limiterRule.getEntrance() + " init RealClusterOneLimiter success");
    }
}

8.限流组件

public interface ClusterOneLimiter {

	ExecuteResult execute(String entrance, Map<String, String> reqParams) throws Exception;
	
}


public class RealClusterOneLimiter implements ClusterOneLimiter {

    protected Logger LOG = LoggerFactory.getLogger(this.getClass());

    private static final String SEPARATOR_CHAR = ":";

    private LimiterRuleStrategy strategy;

    public RealClusterOneLimiter(LimiterRuleStrategy strategy) {
        this.strategy = strategy;
    }

    public LimiterRuleStrategy getLimiterRuleStrategy() {
        return strategy;
    }

    @Override
    public ExecuteResult execute(String path, Map<String, String> reqParams) throws Exception {
        LimiterRuleStrategy strategy = getLimiterRuleStrategy();
        String userUniqueKey = genUniqueKey(reqParams);
        if (StringUtils.isEmpty(userUniqueKey)) {
            return ExecuteResult.CONTINUE;
        }

        long num = this.requestTimeInTtl(userUniqueKey, strategy.getTtl());
        if (num < strategy.getWarningLine()) {
            LOG.warn("接口:{},目标阈值:{},当前请求量:{},正常执行", strategy.getLimiterRule().getEntrance(), strategy.getThreshold(), num);
            return ExecuteResult.CONTINUE.setCount(num);
        }
        if (num <= strategy.getThreshold()) {
            LOG.warn("接口:{},目标阈值:{},当前请求量:{},已到达告警阈值", strategy.getLimiterRule().getEntrance(), strategy.getThreshold(), num);
            return ExecuteResult.CONTINUE_AND_WARN.setCount(num);
        }
        LOG.warn("接口:{},目标阈值:{},当前请求量:{},已被拦截:{\"code\":\"-99999\",\"comments\":\"服务繁忙,请稍后再试~\"}", strategy.getLimiterRule().getEntrance(), strategy.getThreshold(), num);
        return ExecuteResult.REJECT.setCount(num);
    }

    protected String genUniqueKey(Map<String, String> reqParams) throws Exception {
        LimiterRuleStrategy strategy = getLimiterRuleStrategy();
        StringBuilder uniqueKey = new StringBuilder();
        String userId = reqParams.get("USER_ID");
        if (StringUtils.isEmpty(userId)) {
            throw new Exception("请登录");
        }
        uniqueKey.append(userId).append(SEPARATOR_CHAR).append(strategy.getLimiterRule().getEntrance());
        return uniqueKey.toString();
    }

    /***
     * 统计同一个用户在单位时间内请求的次数
     * @param userUniqueKey
     * @param expire
     * @return
     */
    protected long requestTimeInTtl(String userUniqueKey, int expire) {
        RedisTemplate redisTemplate = ApplicationContextUtil.getBean("redisTemplate");
        Long increment = RedisUtil.incrByExpire(redisTemplate, userUniqueKey, expire);
        return increment != null ? increment.longValue() : 1;
    }


9.pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.6.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>ratelimiter-starter</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>ratelimiter-starter</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>io.lettuce</groupId>
                    <artifactId>lettuce-core</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.1</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.16</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.69</version>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-collections4</artifactId>
            <version>4.1</version>
        </dependency>

        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>20.0</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

三.测试验证

1.说明
为了测试方便直接在starter中 执行测试代码,实际可以剔除这部分代码,同理也可以将redis封装为starter,rateLimiter依赖,从而不在限流框架项目中耦合redis 业务代码,测试时使用Jmeter模拟高频词请求,Jmeter 简单使用之前有做过讲解:Jmeter相关教程

2.测试类

@RestController
public class TestController {

    private static final Logger LOG = LoggerFactory.getLogger(TestController.class);

    @GetMapping("/test/query")
    public void testQuery(@RequestParam("id") String id) {
        LOG.info("请求 /test/query 正常返回,id:{}", id);
    }

    @GetMapping("/test/query2")
    public void testQuery2(@PathVariable("id") String id) {
        LOG.info("请求 /test/query2 正常返回,id:{}", id);
    }
}

3.配置文件
application.yml

spring:
  redis:
    host: 127.0.0.1
    port: 6379
    timeout: 10000
    block-when-exhausted : true
    jedis:
      pool:
        max-idle: 200
        min-idle: 50
        max-active: 1024
        max-wait: 10000

4.测试结果

配置了1秒内同一个用户只能访问十次,用Jmeter启动20个线程后,拦截了10次请求

2021-05-28 14:38:57.851  WARN 5420 --- [io-8080-exec-20] c.e.r.s.limiter.RealClusterOneLimiter    : 接口:/test/query,目标阈值:10,当前请求量:1,正常执行
2021-05-28 14:38:57.852  INFO 5420 --- [io-8080-exec-20] c.e.r.starter.test.TestController        : 请求 /test/query 正常返回,id:12345
2021-05-28 14:38:57.869  WARN 5420 --- [io-8080-exec-25] c.e.r.s.limiter.RealClusterOneLimiter    : 接口:/test/query,目标阈值:10,当前请求量:2,正常执行
2021-05-28 14:38:57.870  INFO 5420 --- [io-8080-exec-25] c.e.r.starter.test.TestController        : 请求 /test/query 正常返回,id:12345
2021-05-28 14:38:57.871  WARN 5420 --- [nio-8080-exec-4] c.e.r.s.limiter.RealClusterOneLimiter    : 接口:/test/query,目标阈值:10,当前请求量:3,正常执行
2021-05-28 14:38:57.871  INFO 5420 --- [nio-8080-exec-4] c.e.r.starter.test.TestController        : 请求 /test/query 正常返回,id:12345
2021-05-28 14:38:57.871  WARN 5420 --- [io-8080-exec-21] c.e.r.s.limiter.RealClusterOneLimiter    : 接口:/test/query,目标阈值:10,当前请求量:4,正常执行
2021-05-28 14:38:57.871  INFO 5420 --- [io-8080-exec-21] c.e.r.starter.test.TestController        : 请求 /test/query 正常返回,id:12345
2021-05-28 14:38:57.873  WARN 5420 --- [io-8080-exec-21] c.e.r.s.limiter.RealClusterOneLimiter    : 接口:/test/query,目标阈值:10,当前请求量:5,正常执行
2021-05-28 14:38:57.873  INFO 5420 --- [io-8080-exec-21] c.e.r.starter.test.TestController        : 请求 /test/query 正常返回,id:12345
2021-05-28 14:38:57.876  WARN 5420 --- [io-8080-exec-21] c.e.r.s.limiter.RealClusterOneLimiter    : 接口:/test/query,目标阈值:10,当前请求量:6,已到达告警阈值
2021-05-28 14:38:57.876  WARN 5420 --- [io-8080-exec-18] c.e.r.s.limiter.RealClusterOneLimiter    : 接口:/test/query,目标阈值:10,当前请求量:7,已到达告警阈值
2021-05-28 14:38:57.876  INFO 5420 --- [io-8080-exec-21] c.e.r.starter.test.TestController        : 请求 /test/query 正常返回,id:12345
2021-05-28 14:38:57.877  INFO 5420 --- [io-8080-exec-18] c.e.r.starter.test.TestController        : 请求 /test/query 正常返回,id:12345
2021-05-28 14:38:57.881  WARN 5420 --- [io-8080-exec-24] c.e.r.s.limiter.RealClusterOneLimiter    : 接口:/test/query,目标阈值:10,当前请求量:8,已到达告警阈值
2021-05-28 14:38:57.882  INFO 5420 --- [io-8080-exec-24] c.e.r.starter.test.TestController        : 请求 /test/query 正常返回,id:12345
2021-05-28 14:38:57.882  WARN 5420 --- [io-8080-exec-26] c.e.r.s.limiter.RealClusterOneLimiter    : 接口:/test/query,目标阈值:10,当前请求量:9,已到达告警阈值
2021-05-28 14:38:57.882  INFO 5420 --- [io-8080-exec-26] c.e.r.starter.test.TestController        : 请求 /test/query 正常返回,id:12345
2021-05-28 14:38:57.883  WARN 5420 --- [io-8080-exec-15] c.e.r.s.limiter.RealClusterOneLimiter    : 接口:/test/query,目标阈值:10,当前请求量:10,已到达告警阈值
2021-05-28 14:38:57.883  INFO 5420 --- [io-8080-exec-15] c.e.r.starter.test.TestController        : 请求 /test/query 正常返回,id:12345
2021-05-28 14:38:57.886  WARN 5420 --- [io-8080-exec-13] c.e.r.s.limiter.RealClusterOneLimiter    : 接口:/test/query,目标阈值:10,当前请求量:11,已被拦截:{"code":"-99999","comments":"服务繁忙,请稍后再试~"}
2021-05-28 14:38:57.889  WARN 5420 --- [nio-8080-exec-9] c.e.r.s.limiter.RealClusterOneLimiter    : 接口:/test/query,目标阈值:10,当前请求量:12,已被拦截:{"code":"-99999","comments":"服务繁忙,请稍后再试~"}
2021-05-28 14:38:57.889  WARN 5420 --- [io-8080-exec-23] c.e.r.s.limiter.RealClusterOneLimiter    : 接口:/test/query,目标阈值:10,当前请求量:13,已被拦截:{"code":"-99999","comments":"服务繁忙,请稍后再试~"}
2021-05-28 14:38:57.890  WARN 5420 --- [nio-8080-exec-4] c.e.r.s.limiter.RealClusterOneLimiter    : 接口:/test/query,目标阈值:10,当前请求量:14,已被拦截:{"code":"-99999","comments":"服务繁忙,请稍后再试~"}
2021-05-28 14:38:57.890  WARN 5420 --- [io-8080-exec-27] c.e.r.s.limiter.RealClusterOneLimiter    : 接口:/test/query,目标阈值:10,当前请求量:15,已被拦截:{"code":"-99999","comments":"服务繁忙,请稍后再试~"}
2021-05-28 14:38:57.891  WARN 5420 --- [io-8080-exec-19] c.e.r.s.limiter.RealClusterOneLimiter    : 接口:/test/query,目标阈值:10,当前请求量:16,已被拦截:{"code":"-99999","comments":"服务繁忙,请稍后再试~"}
2021-05-28 14:38:57.895  WARN 5420 --- [io-8080-exec-29] c.e.r.s.limiter.RealClusterOneLimiter    : 接口:/test/query,目标阈值:10,当前请求量:17,已被拦截:{"code":"-99999","comments":"服务繁忙,请稍后再试~"}
2021-05-28 14:38:57.897  WARN 5420 --- [nio-8080-exec-8] c.e.r.s.limiter.RealClusterOneLimiter    : 接口:/test/query,目标阈值:10,当前请求量:18,已被拦截:{"code":"-99999","comments":"服务繁忙,请稍后再试~"}
2021-05-28 14:38:57.897  WARN 5420 --- [io-8080-exec-28] c.e.r.s.limiter.RealClusterOneLimiter    : 接口:/test/query,目标阈值:10,当前请求量:19,已被拦截:{"code":"-99999","comments":"服务繁忙,请稍后再试~"}
2021-05-28 14:38:57.899  WARN 5420 --- [io-8080-exec-22] c.e.r.s.limiter.RealClusterOneLimiter    : 接口:/test/query,目标阈值:10,当前请求量:20,已被拦截:{"code":"-99999","comments":"服务繁忙,请稍后再试~"}

四.源码地址

限流框架ratelimiter-starter源码下载地址

原文链接:https://blog.csdn.net/weixin_47626220/article/details/117364111



所属网站分类: 技术文章 > 博客

作者:飞向远方

链接:http://www.javaheidong.com/blog/article/207883/dacee49f17f532feb537/

来源:java黑洞网

任何形式的转载都请注明出处,如有侵权 一经发现 必将追究其法律责任

25 0
收藏该文
已收藏

评论内容:(最多支持255个字符)