• Home

使用Micrometer进行业务指标上报

[TOC]

什么是指标

指标,metric,是指对事物的度量。通常是对事物的属性进行采样测量,记录下属性的值。采样结果记录为:<指标名,时间戳,值>,例如对当前大气温度的测量<weather.temperature, 2018-07-25T00:00:00Z, 30>。但上面的记录很笼统,无法反应是哪个地区的气温,无法满足按省市区进行汇总、细化的统计分析(即多维分析)的需求。因此一般地,会把记录格式拓展为<指标名,标签集,时间戳,值>,标签集是形如city: shenzhen, district: nanshan这样的代表维度的标签名值对。值可以是整数、浮点数、字符串。大多数的指标存储系统(时序数据库)只支持数值类型的值,因此本文也只讨论数值类型。

指标上报/表示格式

时序数据库是以<指标名,标签集,时间戳,值>作为一行存储一次采样的数据。类似以下:

sys.cpu.user host=webserver01        1356998400  50
sys.cpu.user host=webserver01,cpu=0  1356998400  1
sys.cpu.user host=webserver01,cpu=1  1356998400  0
sys.cpu.user host=webserver01,cpu=2  1356998400  2
sys.cpu.user host=webserver01,cpu=3  1356998400  0
...
sys.cpu.user host=webserver01,cpu=63 1356998400  1

实际时序数据库的存储并非以上述文本格式,比上面要复杂得多。

指标定义

指标定义是指指标的元数据,用于帮助对指标数据进行解读。一般包括:
* metric_name,指标名,如web.response.size
* type,指标类型, 如Gauge/Counter等
* interval,上报(采样)间隔,
* unit_name,字符串形式单位,如Byte/Bit
* description,指标的描述
* tags,指标支持的标签(及取值范围),如app.web.request支持的tag可能包括area,host,instance分表机房、主机和实例

指标类型

根据指标监控的对象个数、监视目的和含义的不同,指标类型大体可以分为Gauge、Counter、DistributionSummary、Timer四种。

Gauge 刻度

1a7f4299-8026-4197-835f-52ad109277b1
Gauge是监视单个对象属性的当前状态值,时序数据库存储Guage指标的每次采样记录(时间,值)。

sys.load host=webserver01,cpu=0  1356998400  11
sys.load host=webserver01,cpu=0  1356998401  21
sys.load host=webserver01,cpu=0  1356998402  15
...

sys.cpu.user host=webserver01,cpu=0就是webserver01的0号cpu的负载属性。

Gauge类型的指标值是随时间连续变化的。Gauge指标的时间序列图(非采样)类似下图,可以看出数值随时间上下波动,而且是连续的。
gague_unsmapled_ts
像CPU当前负载、消息队列中当前的消息数、JVM当前使用内存大小,这些都是典型的Gauge指标。Gauge是对单个对象属性的度量。

聚合指标

聚合指标是对多个对象的度量进行过统计处理的后产生的指标。

Counter 计数器

计数就是监视对象、事件发生的次数。其中一种办法就是每次检测到一个对象就上报一个记录,那么就是如下

web.request.accept host=host1 1356998400 1
web.request.accept host=host1 1356998401 1
web.request.accept host=host1 1356998401 1
web.request.accept host=host1 1356998402 1

指标值1代表一次请求,可以看出同一时间(戳)可能有多次请求发生。通过sum()就可以计算出总数。
另一种办法是利用一个计数器,记下所有请求的累计数,再对这个计数器采样上报,这就是Counter类型。
419SoKslhU

如果把Counter的值进行上报(不清零),展示出来会得到下图的效果。
counter_ts
Counter类型的指标值是随时间单调递增,而且是跳跃式增加的。像nginx启动以来一共处理的请求数、用户认证服务中认证不通过的次数,这些都是Counter指标。

Counter指标本质上是对象个数聚合后产生的Gauge指标。Gauge数值有升有降,而Counter只数值是一直上升的。

所有聚合指标都是有意义的Gauge指标。

Counter指标又可以分为两种:每次上报后清零,每次上报后不清零。

复合指标

复合指标是对一组对象\事件的属性的复合统计。在上报和存储时是以多个聚合指标联合构成的。
以web服务处理请求为例,一个请求响应就是一个事件,响应时间就是监控的指标。如果把所有请求的响应时间上报后进行展示,会得到下图这样的散点图。
QQ20180725-134113@2x
这样的散点图无法看出某一时间段的请求数量,无法看出平均响应时间,无法知晓大于1s和小于1s的响应的数量。也就是散点图无法以一个数值来说明事件发展的整体趋势。因此我们需要对这些响应先进行预处理(统计),生产成多个新的指标,逻辑上新指标都是原指标的组成部分。

DistributionSummary

DistributionSummary是用于跟踪事件的分布情况,有多个指标组成:
* count,事件的个数,聚合指标,如响应的个数
* sum,综合,聚合指标,如响应大小的综合
* histogram,分布,聚合指标,包含le标签用于区分bucket,例如web.response.size.historgram{le=512} = 99,表示响应大小不超过512(Byte)的响应个数是99个。一般有多个bucket,如le=128,le=256,le=512,le=1024,le=+Inf等。
每个bucket展示为一条时间序列,会得到类似下面的图。
QQ20180725-184823@2x

  • percentile(quantile),百分位数,聚合指标,包含percentile标签用于区分不同的百分位,例如web.response.size.percentile{p=90) = 512,表示90%的响应大小都小于512。一般有多个percentile,如p50,p75,p90,p99。
    每个百分位展示为一条时间序列,会得到类似下面的图。
    QQ20180725-191921@2x

Timer

Timer是DistributionSummary的特化,专门用于计时类的指标,可以对记录的时间值(duration)进行单位换算。

指标怎么展示?

采样什么的图标进行展示指标要根据指标的实际意义和使用场景决定。

展示最新

即只展示最新的采样点的数值。如下图展示进程的累计运行时长
QQ20180725-161758@2x

展示历史

时间序列,是指唯一指标名+唯一标签名值组合的所有采样数据点组成的序列。一般采用折线图,横坐标表示时间,纵坐标表示数值,一个图上可以展示多条时间序列,方便对比。
key-pg-classi
通过time shift,可以再同一个图展示一个时间序列的两个时间段。下图展示今日(红色)和昨日(蓝色)的接口访问量。
QQ20180725-163952@2x

Counter与Rate

Counter类型(不清零型)的时间序列,如果不做处理,展示出来就是一个单调增长的折线,如下图所示。
counter_ts
实际生产中我们希望知道某个时间段的请求速率,因此需要做上述时间序列做转换。通常是对时间段(period)第一个数据点和最后一个数据点进行delta计算,delta值作为该时间段的rate。转换后的时间序列类似下图,可以看出每天早上7点左右请求数有明显的增加。
QQ20180725-163952@2x
时序数据库提供rate()聚合函数用户上述需求的计算,rate()实现比delta要复杂很多,要处理计数器清零、反转、数据点丢失等情况。rate可以看做是原时间序列函数的一阶求导。

什么是业务指标?

按指标的来源进行分类。纵向实现了自底向上各层级的监控,包括网络、服务器、系统层、应用层、业务层,如下图所示:
836418C4-0D04-4A6F-AD92-21DF75E819A5
* 网络监控,包括机房出口VIP是否存活,流量是否正常,机房间专线流量和质量是否正常,以及网络设备及流量是否正常等
* 服务器监控,包括服务器是否宕机,服务器硬件是否有异常等。
* 系统监控,包括CPU、内存、磁盘、网卡等各项指标。
* 应用监控,包括:端口连通性,进程存活,QPS,连接数等指标。常用的开源中间件如nginx、redis、mysql等的监控数据也算是这一类的。
* 业务监控,包括业务关心的各项指标,例如登录验证成功/失败次数、某类型业务错误的次数、订单数等,业务指标通常与业务逻辑需求有关。

怎么上报业务指标?

通过在项目中使用micrometer库进行打点,由micrometer异步上报到监控系统。下面以使用spring boot 1.5的api-gatway项目为例。

micrometer简介

  • spring boot 2的默认指标库
  • 支持指标的多维,即Tag
  • 支持上报到Atlas、Graphite、Influx、Prometheus、Datadog等监控系统,可以看做Metric版的SL4J,屏蔽了各监控系统命名差异、Counter上报方式差异、percentile计算差异。
  • 支持Gauge、Counter、DistributionSummary、Timer(LongTimer)等Metric
  • 内置了一些指标,如JVM指标
  • 提供大量针对后端监控系统的spring boot starter,配置简单,文档齐全

引入micrometer包

    <properties>
        <micrometer.version>1.0.5</micrometer.version>
    </properties>

    <dependencies>
        ...
        <dependency>
            <groupId>io.micrometer</groupId>
            <artifactId>micrometer-spring-legacy</artifactId>
            <version>${micrometer.version}</version>
        </dependency>
        <dependency>
            <groupId>com.yunzhijia.devops</groupId>
            <artifactId>micrometer-registry-falcon</artifactId>
            <version>1.0.5</version>
        </dependency>
        ...
    </dependencies>

    <repositories>
        ...
        <repository>
            <id>nexus-release</id>
            <url>http://192.168.0.22/nexus/content/repositories/cloudhub-release</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
            <releases>
                <enabled>true</enabled>
            </releases>
        </repository>
        ...
    </repositories>

io.micrometer:micrometer-spring-legacy是micrometer官方提供的spring boot 1.5的stater。com.yunzhijia.devops:micrometer-registry-falcon是自定义的micrometer插件,专门用于按格式要求上报到云之家的监控系统。

配置

application.properties里配置云之家的监控系统的上报接口的地址、上报时间间隔。

management.metrics.export.falcon.url=http://localhost:6060/api/push
management.metrics.export.falcon.hostTag=instance
management.metrics.export.falcon.step=30s

新建MicrometerConfig.java

@Configuration
public class MicrometerConfig {

    private static Logger logger = LoggerFactory.getLogger(MicrometerConfig.class);

    @Value("${spring.application.name}")
    private String appName;

    private String getInstanceIp() {
        String addr = null;
        try {
            InetAddress ip = InetAddress.getLocalHost();
            addr = ip.getHostAddress();
        } catch (UnknownHostException e) {

        }
        logger.info("ip addr: {}", addr);
        return addr;
    }

    @Bean
    MeterRegistryCustomizer<MeterRegistry> metricsCommonTags() {
        return registry -> {
            registry.config().commonTags("app", appName);
            String addr = getInstanceIp();
            {
                registry.config().commonTags("instance", addr);
            }
            registry.config().meterFilter(
                    new MeterFilter() {
                        @Override
                        public DistributionStatisticConfig configure(Meter.Id id, DistributionStatisticConfig config) {

                            if (id.getType() == Meter.Type.TIMER) {
                                return DistributionStatisticConfig.builder()
                                        .percentilesHistogram(true)
                                        .percentiles(0.5, 0.75, 0.90)
                                        .sla(Duration.ofMillis(50).toNanos(),
                                                Duration.ofMillis(100).toNanos(),
                                                Duration.ofMillis(200).toNanos(),
                                                Duration.ofMillis(500).toNanos(),
                                                Duration.ofSeconds(1).toNanos(),
                                                Duration.ofSeconds(2).toNanos(),
                                                Duration.ofSeconds(5).toNanos(),
                                                Duration.ofSeconds(10).toNanos())
                                        .minimumExpectedValue(Duration.ofMillis(1).toNanos())
                                        .maximumExpectedValue(Duration.ofSeconds(10).toNanos())
                                        .build()
                                        .merge(config);
                            } else {
                                return config;
                            }
                        }
                    });
        };
    }

    @Bean(name = "requestCounter")
    Counter requestCounter() {
        return Counter.builder(MetricDef.C_REQUEST)
                .register(Metrics.globalRegistry);
    }

    @Bean(name = "methodTypeRpcCounter")
    Counter methodTypeRpcCounter() {
        return Counter.builder(MetricDef.C_METHOD_TYPE).tags("type", "rpc")
                .register(Metrics.globalRegistry);
    }

    @Bean(name = "methodTypeHttpCounter")
    Counter methodTypeHttpCounter() {
        return Counter.builder(MetricDef.C_METHOD_TYPE).tags("type", "http")
                .register(Metrics.globalRegistry);
    }

上述代码配置了
1. 所有Metric的公共标签app和instance,app表示来自哪个服务,instance表示来自服务的实例(所有服务端口不重叠,故用IP表示实例)。
2. 对Timer进行设置,percentilesHistogram(true)表示上报分布的所有bucket;统计和上报的百分位数包括0.5,0.75,0.90;sla()额外添加了[50ms,100ms,200ms,500ms,1s,2s,5s,10s]这些bucket;并限定统计的耗时范围在0至10s之间。
3. 定义了3个不同的Counter,总请求数,rpc和http类型接口的请求数。定义为bean,之后利用Spring注入的方式可以减少Counter对象的查找耗时。

打点统计

在相应的业务代码文件引入counter,并打点。

@Component
public class MethodExistedPreFilter extends ZuulFilter{
    ...

    @Autowired
    @Qualifier("methodTypeRpcCounter")
    Counter methodTypeRpcCounter;

    @Autowired
    @Qualifier("methodTypeHttpCounter")
    Counter methodTypeHttpCounter;

    ...

    @Override
    public Object run() {

        ...
        if (StringUtils.equalsIgnoreCase(routeVo.getProtocol(), "rpc")) {
            methodTypeRpcCounter.increment();

        } else {
            methodTypeHttpCounter.increment();
        }
        ...
    }

    ...

}

QQ20180725-194449@2x

什么不该/不需要上报?

  • 内部服务之间的相互调用的次数、耗时不需要上报。因为已经由PinPoint统一进行采集和上报了,指标监控系统回去PinPoint拉取这些数据。
  • 如果tag值是无限的,不能上报。比如你可能想按appId统计接口调用次数,don’t do that.

mac使用virtualbox和cloudera manager搭建大数据集群

装备集群

利用virtualbox,创建4个guest虚拟机,一个作为管理机,其他3个作为worker。虚拟机网络设置为NAT + HostOnly。NAT保证虚拟机能访问WAN,但是虚拟机之间无法通信。Host Only保证Mac和4个虚拟机之间处在一个网络192.168.56.0/24,可以相互通信。

  • mac 192.168.56.1 安装virtualbox
  • master 192.168.56.10 虚拟机,安装cm server
  • app1 192.168.56.11 虚拟机,安装cm agent
  • app2 192.168.56.12 虚拟机,安装cm agent
  • app3 192.168.56.13 虚拟机,安装cm agent

安装cloudera manager

在master上运行

# wget http://archive.cloudera.com/cm5/installer/5.15.0/cloudera-manager-installer.bin
# chmod u+x cloudera-manager-installer.bin
# ./cloudera-manager-installer.bin

有些包很大,可能会卡很久,或者下载很慢。可以手动下载到/var/cache/yum/x86_64/6/cloudera-manager/packages,然后重新运行./cloudera-manager-installer.bin

最好先手动下载
– oracle-j2sdk1.7-1.7.0+update67-1.x86_64.rpm
– cloudera-manager-agent-5.15.0-1.cm5150.p0.62.el6.x86_64.rpm
– cloudera-manager-server-5.15.0-1.cm5150.p0.62.el6.x86_64.rpm
– cloudera-manager-daemons-5.15.0-1.cm5150.p0.62.el6.x86_64.rpm
然后cp/scp到四个虚拟机的/var/cache/yum/x86_64/6/cloudera-manager/packages下。

下载
– CDH-5.15.0-1.cdh5.15.0.p0.21-el6.parcel
– CDH-5.15.0-1.cdh5.15.0.p0.21-el6.parcel.sha1

扔到master的/opt/cloudera/parcel-repo

配置集群

访问http://192.168.56.10:7180/ ,账号admin,密码admin,选择一些配置选项,加入机器,填写ssh用户名密码(所有机器使用同样账号密码,关闭iptables和selinux)。然后就开始安装jdk、cm agent。

使用maven解决微服务开发中的一些日常问题

1.多模块

微服务是按业务边界划分的,通常包含一个可供调用的服务和一些异步后台进程、定时任务,这些运行实际不同但又紧密的关联的进程一起完成这个业务的数据处理。因此可以把一个微服务拆分为多个模块,如api定义模块(以dubbo这类rpc微服务为例,通常会把服务接口构建为一个jar包)、服务实现模块、定时任务模块。而服务实现和定时任务会共用一些工具类代码,又可以提取为一个模块。
在工作经历中,见过一些人把服务api放在一个项目里,服务实现放在一个项目里,自动任务放在一个独立的项目里,且三个项目都是独立的git项目。每个模块使用独立的pom.xml,则很多共同的依赖和插件配置就需要写多次,在修改版本时难免出现不一致。
其实利用maven的「继承与聚合」功能,是一种更优雅的多模块组织方式。既可以把多个模块放在一个目录里管理(一个git项目,方便查看完整的变更历史),又解决多个模块依赖和插件管理问题,同时还能解决一条命令同时构建多个模块的问题。

以用户服务为例,项目结构可以组织成如下形式:

user
├── pom.xml
├── user-api
│   ├── pom.xml
│   └── src
│       └── main
│           └── java
└── user-service
    ├── pom.xml
    └── src
        └── main
            ├── java
            └── resources

user目录作为是一个父模块,而user-api和user-service是user模块的子模块,子模块和父模块以嵌套方式组织。
user/pom.xml的内容如下

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>wingyiu</groupId>
    <artifactId>user</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <packaging>pom</packaging>
    <name>User Parent</name>

    <modules>
        <module>user-api</module>
        <module>user-service</module>
    </modules>

    <properties>
        <java.version>1.8</java.version>
        <java.source.version>1.8</java.source.version>
        <java.target.version>1.8</java.target.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <spring-boot.version>1.5.14.RELEASE</spring-boot.version>
        <dubbo.version>2.6.2</dubbo.version>
        <dubbo.starter.version>0.1.1</dubbo.starter.version>
        <zkclient.version>0.2</zkclient.version>
        <zookeeper.version>3.4.9</zookeeper.version>
        <curator-framework.version>2.12.0</curator-framework.version>
        <!-- Build args -->
        <argline>-server -Xms256m -Xmx512m -XX:PermSize=64m -XX:MaxPermSize=128m -Dfile.encoding=UTF-8
-Djava.net.preferIPv4Stack=true
        </argline>
        <arguments/>

        <!-- Maven plugins -->
        <maven-jar-plugin.version>3.0.2</maven-jar-plugin.version>
        <maven-compiler-plugin.version>3.6.0</maven-compiler-plugin.version>
        <maven-source-plugin.version>3.0.1</maven-source-plugin.version>
        <maven-jacoco-plugin.version>0.8.1</maven-jacoco-plugin.version>
        <maven-gpg-plugin.version>1.5</maven-gpg-plugin.version>
        <apache-rat-plugin.version>0.12</apache-rat-plugin.version>
        <maven-release-plugin.version>2.5.3</maven-release-plugin.version>
        <maven-surefire-plugin.version>2.19.1</maven-surefire-plugin.version>
    </properties>

    <dependencyManagement>
        <dependencies>
            <!-- Spring Boot -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot</artifactId>
                <version>${spring-boot.version}</version>
            </dependency>
            ...省略
        </dependencies>
    </dependencyManagement>

    <repositories>
        <repository>
            <id>nexus-snapshot</id>
            <url>http://192.168.0.22/nexus/content/repositories/gayhub-snapshot</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
            <releases>
                <enabled>false</enabled>
            </releases>
        </repository>
        <repository>
            <id>nexus-release</id>
            <url>http://192.168.0.22/nexus/content/repositories/gayhub-release</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
            <releases>
                <enabled>true</enabled>
            </releases>
        </repository>
        <!-- Repositories to allow snapshot and milestone BOM imports during development.
            This section is stripped by the flatten plugin during install/deploy. -->
        <repository>
            <id>central</id>
            <url>https://repo.maven.apache.org/maven2</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
        ...省略
    </repositories>

    <pluginRepositories>
        <pluginRepository>
            <id>central</id>
            <url>https://repo.maven.apache.org/maven2</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </pluginRepository>
        ...省略
    </pluginRepositories>

</project>

其中
<packaging>pom</packaging>
表明这是一个聚合模块,

<modules>
    <module>user-api</module>
    <module>user-service</module>
</modules>

modules元素里声明了两个聚合模块。
从聚合模块运行man clean install时,maven会首先解析觉和模块的POM,分析要构建的模块,并计算出一个反应堆构建顺序,然后依次构建各个模块。即实现了一次构建多个模块。
properties原理声明了共用的一次属性,子模块可以通过${spring-boot.version}这样的形式引用。

<dependencyManagement>
    <dependencies>
    </dependencies>
</dependencyManagement>

声明了共同的依赖,子模块可以继承这些依赖配置,dependencyManagement下生命的依赖不会实际引入,不过它能约束子模块dependencies下的依赖使用。同理

<build>
    <pluginManagement>
        <plugins>

帮助子模块进行插件管理。

子模块user-api的pom.xml内容如下


<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>wingyiu</groupId>
        <artifactId>user</artifactId>
        <version>1.0.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>user-api</artifactId>
    <packaging>jar</packaging>
</project>

其中, 元素指明了其父模块为user。不难发现user的POM,既是聚合POM,又是父POM,实际项目中经常如此。子POM并没有声明groupId和version,因为子模块隐式地从父模块继承了这两个元素。由于user-api仅包含以下interface定义,DO定义,没有依赖,因此pom文件没有任何dependency。模块打包方式为默认的jar。


<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <artifactId>user</artifactId>
        <groupId>wingyiu</groupId>
        <version>1.0.0-SNAPSHOT</version>
        <relativePath>../pom.xml</relativePath>
    </parent>

    <artifactId>user-service</artifactId>
    <packaging>jar</packaging>

    <properties>
        <micrometer.version>1.0.5</micrometer.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot</artifactId>
        </dependency>

        <dependency>
            <groupId>io.micrometer</groupId>
            <artifactId>micrometer-registry-prometheus</artifactId>
            <version>${micrometer.version}</version>
        </dependency>

    </dependencies>
</project>

同理user-service的pom中parent指明了父模块为user。特别的

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot</artifactId>
        </dependency>

并没有制定version,还剩去了依赖范围。这是因为完整的依赖声明已经包含在父POM中,子模块只需配置groupId和artifactId就能获得对应的依赖信息。这种依赖管理机制似乎不能减少太多pom配置,但是能够统一项目范围的依赖版本。有父模块统一声明version,子模块不声明version,就不会发生过个子模块使用的依赖版本不一致的情况,这可以帮助降低依赖冲突的几率。

2.只构建并发布api包

为了加快项目进度,通常服务提供方和调用方会先定好接口定义(包括接口名,参数),调用方先以约定好的接口进行逻辑编写,等提供方正在实现好之后再进行联调。在这种情况下,就会要求先发布jar包。
此时可以利用maven的反应堆裁剪功能。反应堆是指所有模块组成的一个构建结构。对于多模块项目,反应堆包含了模块之间继承与依赖关系,从而能够自动计算出合理的模块构建顺序。模块间的依赖关系会将反应堆构成一个有向非循环图(DAG)。
默认情况,在聚合模块执行mvn clean install会执行整个反应堆,maven提供了很多命令行选项支持裁剪反应堆:
– -am, –also-make,同时构建所列模块的依赖模块
– -amd, –also-make-dependents,同时构建依赖于所列模块的模块
– -pl,–projects 构建指定的模块,模块间用逗号分隔
– -rf, –resume-from 从指定的模块恢复反应堆

因此要单独构建api模块,可以运行如下命令
mvn clean install -pl user-api

3.开发中的接口不断地改变

这种情形很常见,一个开发中的接口,参数不断的调整,此时调用方也要不断的修改,即调用方需要不断的更新依赖服务的api jar包。一般地,maven把依赖从远程仓库拉下来后会进行缓存。因此经常会出现要手动删除本地缓存包,或者增加版本号,但服务还在开发不断增加版本号显然不合适。
maven提供了更加简单有效的解决方法:SNAPSHOT,快照版本。当依赖版本为快照版本的时候,maven会自动找到最新的快照。即每次都会去仓库拉去元数据,计算真实的时间戳格式版本号,然后拉取对应的jar,保证本地使用到最新的jar。

注:默认情况下,maven每天检查一次更新(有仓库udatePolicy控制),用户也可以用-U参数强制检查更新,如mvn clean install -U

因此,快发中,我们可以把api模块(上例user-api)的版本号设为1.0.0-SNAPSHOT

发布过程中,maven自动为构件打上时间戳,如1.0.0-20180720.221414-13。

4.更新所有模块的版本号

当旧版本a.b.c已经发布,要进行下一轮迭代时,要把父模块版本号改为a.b.d-SNAPSHOT,测试通过后正式发布,要改为正式版本号a.b.d。由于每个子模块的<parent><version>都要于父POM一致,手动修改每个pom.xml显然很容易出错。

可以使用org.codehaus.mojo:versions-maven-plugin插件

5.使用企业内部私有maven仓库

假设以nexus搭建私有仓库,snapshot版本和正式版本的构件发布到不同仓库,可以在父POM中配置repository

<repositories>
        <repository>
            <id>nexus-snapshot</id>
            <url>http://192.168.0.22/nexus/content/repositories/gayhub-snapshot</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
            <releases>
                <enabled>false</enabled>
            </releases>
        </repository>
        <repository>
            <id>nexus-release</id>
            <url>http://192.168.0.22/nexus/content/repositories/gayhub-release</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
            <releases>
                <enabled>true</enabled>
            </releases>
        </repository>
</repositories>

<repository><id>是和$MAVEN/settings.xml中的<server><id>对应的,<server>配置repository的账号和密码。

出于规范考虑,开发人员可以自由提交snapshot版本构件,正式版本构件只能在测试和审核后才能由发布系统进行提交。因此release仓库的密码不宜开放给开发人员,开发人员只能得到snapshot仓库的账号密码。

6.快速开发新的微服务

可以利用maven的archtype功能。首先根据代码规范,编写一个包含基本的、可复用逻辑的微服务,以此作为模板,把模板项目代码中的一些类名、变量名、文件名使用模板变量替换,然后编写自定义archtype 插件。利用此插件,运行时输入模板变量值就可以生产新的微服务代码了。

7.接口文档编写与更新

参考swagger-maven-plugin,swagger可以利用maven插件,解析接口的类名、方法名、参数等,生成一个可以部署的接口文档站点。

进一步的,代码可以利用JSR 303 Bean Validation的注解,标注参数的要求。插件解析这些注解,生产的文档包含完整的参数要求信息,如是否必填、类型、最大最小值、长度、可选枚举值等。

不同于swagger-maven-plugin,我们也可以把解析的接口信息统一上报到接口文档管理系统,接口管理系统保留每个接口的url、参数要求、所属服务及版本、变更历史,提供分组和搜索功能。利用插件自动解析和生产接口文档,降低手动维护接口文档的痛苦和出错机会。

8.接口发布到网关

原理类似利用maven插件接口接口生成接口文档,只不过这里是把接口信息提交给网关的接口配置管理中心,降低维护网关接口配置的痛苦

HBaseWD:通过顺序RowKey避免HBase RegionServer热点问题

在看Pinpoint源码时,发现hbaseOperations2.findParallelRowKeyDistributorByHashPrefix,似乎是RowKey做了哈希打散。后来发现是来自https://github.com/sematext/HBaseWD,用于解决HBase写时由于顺序RowKey导致某个Region成为热点。采用了给RowKey加前缀,把写分散到不同Region,同时分区后的新RowKey还是保持了原有的顺序,对并行Scan的性能没有影响。

原文地址:
https://sematext.com/blog/hbasewd-avoid-regionserver-hotspotting-despite-writing-records-with-sequential-keys/
译文来源:
https://blog.csdn.net/jiangmingzhi23/article/details/78575757

在HBase领域,RegionServer热点是一个共性问题。用一句话来描述HBase热点:以顺序RowKey记录数据时,可以通过startRowkey和endRowKey区间最高效地读取数据,但是这种顺序写入却会不可避免地产生RegionServer热点。接下来两部分我们将讨论并告诉你如何避免这个问题。

问题描述

Hbase中的记录是按照字典顺序存储的。因此可以通过确定的RowKey快速找到某个记录,或者通过start RowKey和end RowKey作为区间以快速查询RowKey在这个区间的记录。所以你可能会认为,以顺序RowKey记录数据,然后可以快速通过上面的方法查询某个区间的记录的方式一个不出错的主意。例如我们可能会希望每个记录都与时间戳有关,以便我们可以通过时间区间查询一段时间的记录,这样的RowKey的例子如下:
  • 基于时间戳的格式:Long.MAX_VALUE – new Date().getTime()
  • 递增/递减序列:”001”, ”002”, ”003”,… or ”499”, ”498”, ”497”, …
但是这种幼稚的RowKey会导致RegionServer写热点。

RegionServer Hotspotting

以顺序RowKey记录数据到HBase中时,所有的写操作会命中一个region。但是如果多个RegionServer服务同一个region时,这个问题就不存在了,不过实际并非如此,一个region通常只在一个RegionServer上。每个region都会预定义大小,当一个region达到最大大小时,就会被分割成两个小的region。然后两个较小的region中的一个负责记录所有的新写入的记录,这个region和它说在的RegionServer就成了新的写热点。显然,这种不均匀地写入是需要避免的,因为它将写操作的性能限制到单个RegionServer的写入能力,而没有将集群中所有RegionServer的性能释放出来。这种负载不均匀地写入操作可以从下图一看出:

hbasewd-pic1

可以看出其中一台服务器满负荷运行以写入记录,但是其他服务器却处于空闲状态。HBase的RegionServer热点问题更多的信息可以参考HBase官方文档

解决方案

如何解决这个问题呢?本文讨论的前提是我们不是一次性批量把所有数据写入HBase,而是以数据流的方式不断持续达到。批量数据导入HBase时如何避免热点的问题在HBase文档中有相关的最佳解决方案。但是,如果你和我们一样,数据持续流入并且需要处理和存储,那么解决热点的最简单的方案就是通过随机RowKey将数据流分发到不同的Region中。然而不幸的是,这样会牺牲通过start RowKey和end RowKey快速检索区间数据的能力。这个在HBase mail list和其它地方多次提及的解决方案就是为RowKey增加前缀。例如可以考虑通过下面的方式构造Rowkey:
new_row_key = (++index % BUCKETS_NUMBER) + original_key
这种方式构造的RowKey,以我们可见的数据类型方式展示如下图2示:

hbasewd-pic2

这里:
  • index是我们用于特定记录的RowID的数字部分,例如1,2,3,4,
  • BUCKETS_NUMBER是我们希望新构建的RowKey想要分发到不同的‘桶’的数量。每一个‘桶’内,数据保持着他们原始ID记录的顺序。
  • original_key是写入数据的原始主键。
  • new_row_key是数据写入HBase中的实际RowKey(即distributed key或者prefixed key)。后文中,distributed records用来表示通过distributed key写入的记录。
所以,新的记录将被分发到不同的‘桶’中,被HBase集群的不同RegionServer处理入库。新写入的记录的RowKey不再是顺序序列,但是在每一个‘桶’中,它们依然保持着原始的字典顺序。当然,如果你开始写数据到一个空的HBase表,在这个表被分割成多个region前你可能要等一段时间,等待的时长取决于流入数据大的大小和速度、压缩比以及region的大小。提示:通过HBase region预分割特效,可以避免这个等待。通过上面的方案写数据到不同的region,你的HBase节点负载看起来就好看多了:

Scan操作

数据在写入过程中被分发到不同的‘桶’中,因此我们可以通过基于start RowKey和end RowKey的scan操作从多个‘桶’中提取数据,并且保证数据的原始排序状态。这也意味着BUCKETS_NUMBER个scan操作可能会影响性能。但是幸运的是这些scan操作可以并行,所以性能不至于降低,甚至会有所提高。对比一下,从一个region中读取100K数据,和从10个region中并行的读取10K数据,哪个更快?

hbasewd-pic3

Get/Delete

对单条记录进行Get/Delete操作,操作复杂度为O(1)到O(BUCKETS_NUMBER)。 例如。 当使用“静态”散列作为前缀时,给定原始RowKey,就可以精确地识别prefised rowkey。 如果我们使用随机前缀,我们将不得不在每个可能的‘桶’中执行Get操作。Delete操作也是如此。

MapReduce Input

我们仍旧希望从数据本身出发,因此将distributed record提供给MapReduce作业可能会打乱数据到达mapper的顺序。至少在HBaseWD的实现中,这个问题是存在的。每个map task处理特定的‘桶’中的数据,所以,数据处理的顺序将于它们在‘桶’中的原始顺序一致。然而由于两个原始RowKey相邻的记录可能被分发存储到不同的‘桶’中,它们将会被分配到不同的map task。因此如果mapper认为数据严格的按照其原始顺序流入,我们则很受伤,因为数据只在每个桶保证原始顺序,并非全局保证顺序。

Increased Number of Map Tasks

当我们以上述方案的数据提供给MapReduce作业时,‘桶’的数目可能会增加。在HBaseWD的实现中,与使用相同参数的常规MapReduce作业相比,你需要进行BUCKETS_NUMBER倍分割。这与前面讨论的Get操作的逻辑相似。所以(HBaseWD的实现中,)MapReduce作业需要有BUCKETS_NUMBER倍的map task。如果BUCKETS_NUMBER不大,理论上性能不会降低,当然,MapReduce作业本身的初始化和清理工作需要更多的时间。而且在很多情况下,更多的mapper可以提升性能。很多用户报告指出,基于标准的HBase Tbase输入的MapReduce作业的map task数目过少(每个region对应一个map task),所以(我们的实现)可以不需要额外编码就可以对MapReduce产生积极作用。
如果在你的应用中,除了按顺序Rowkey写入数据到HBase,还需要通过MapReduce持续的处理新数据的增量,那么本文建议的方案及其实现很可能会有所帮助。在这种情况下,数据持续频繁写入,增量处理只会位于少数region中,或者如果写负载不高时,(增量处理)只会在一个region中,亦或者如果最大region大小很大时,批量处理会很频繁。

方案实现: HBaseWD

我们实现了上述方案,并且将之作为一个叫做HBaseWD的项目开源。由于HBaseWD支持HBase本地客户端API,所以它实际上是独立的,而且很容易集成到现有的项目代码中。HBaseWD项目首次在

Configuring Distribution

Simple Even Distribution

使用顺序RowKey分发记录,最多分发到Byte.MAX_VALUE个‘桶’中,(通过在原始Rowkey前添加一个字节的前缀):
byte bucketsCount = (byte) 32; // distributing into 32 buckets
RowKeyDistributor keyDistributor =  new RowKeyDistributorByOneBytePrefix(bucketsCount);
Put put = new Put(keyDistributor.getDistributedKey(originalKey));
... // add values
hTable.put(put);

Hash-Based Distribution

另一个有用的RowKey分发器是RowKeyDistributorByHashPrefix。参考下面的示例。它通过原始RowKey创建distributed key,如果稍后希望通过原始Rowkey更新记录时,可以直接计算出distributed key,而无需调用HBase,也无需知道记录在哪个‘桶’中。或者,你可以在知道原始Rowkey的情况下通过Get操作获取记录,而无需到所有的‘桶’中寻找。
AbstractRowKeyDistributor keyDistributor =
     new RowKeyDistributorByHashPrefix(
            new RowKeyDistributorByHashPrefix.OneByteSimpleHash(15));
你亦可以通过实现下面的接口以使用你自己的hash逻辑:
public static interface Hasher extends Parametrizable {
  byte[] getHashPrefix(byte[] originalKey);
  byte[][] getAllPossiblePrefixes();
}

自定义分发逻辑

HBaseWD的设计灵活,特别是在支持自定义distributed key方法时。 除了上面提到的用于实现用于RowKeyDistributorByHashPrefix的定制哈希逻辑的功能之外,还可以通过扩展AbstractRowKeyDistributor抽象类来定义自己的RowKey分发逻辑,该类的接口非常简单:
public abstract class AbstractRowKeyDistributor implements Parametrizable {
  public abstract byte[] getDistributedKey(byte[] originalKey);
  public abstract byte[] getOriginalKey(byte[] adjustedKey);
  public abstract byte[][] getAllDistributedKeys(byte[] originalKey);
  ... // some utility methods
}

Common Operations

Scan

对数据执行基于范围的scan操作:
Scan scan = new Scan(startKey, stopKey);
ResultScanner rs = DistributedScanner.create(hTable, scan, keyDistributor);
for (Result current : rs) {
  ...
}

Configuring MapReduce Job

通过scan操作在指定的数据块上自习MapReduce作业:
Configuration conf = HBaseConfiguration.create();
Job job = new Job(conf, "testMapreduceJob");
Scan scan = new Scan(startKey, stopKey);
TableMapReduceUtil.initTableMapperJob("table", scan,
RowCounterMapper.class, ImmutableBytesWritable.class, Result.class, job);
// Substituting standard TableInputFormat which was set in
// TableMapReduceUtil.initTableMapperJob(...)
job.setInputFormatClass(WdTableInputFormat.class);
keyDistributor.addInfo(job.getConfiguration());

网络编程总结

同步/异步/阻塞/非阻塞/多路复用

QQ20180606-234210@2x

设计模式(套路)

QQ20180606-232414@2x

并发服务端程式设计范式(9种)

QQ20180606-232736@2x

经典开源组件

QQ20180606-232750@2x

快速体验HBase

standalone运行hbase

下载,并解压

# wget http://mirror.reverse.net/pub/apache/hbase/2.0.0/hbase-2.0.0-bin.tar.gz
# tar xzvf hbase-2.0.0-bin.tar.gz -C /opt
# cd /opt/hbase-2.0.0/

配置

# vi conf/hbase-site.xml

  
    hbase.rootdir
    file:///Users/wingyiu/data/hbase
  
  
    hbase.zookeeper.property.dataDir
    /Users/wingyiu/data/zookeeper
  
  
    hbase.unsafe.stream.capability.enforce
    false
    
      Controls whether HBase will check for stream capabilities (hflush/hsync).

      Disable this if you intend to run on LocalFileSystem, denoted by a rootdir
      with the 'file://' scheme, but be mindful of the NOTE below.

      WARNING: Setting this to false blinds you to potential data loss and
      inconsistent system state in the event of process and/or node failures. If
      HBase is complaining of an inability to use hsync or hflush it's most
      likely not a false positive.
    
  

启动

# bin/start-hbase.sh
# jps
22660 HMaster
22681 Jps

访问Hbase Web UI,在浏览器打开http://localhost:16010
huaban (9)

命令客户端Shell体验

连接

$ ./bin/hbase shell
hbase(main):001:0>

创建table和列族

hbase(main):001:0> create 'test', 'cf'
0 row(s) in 0.4170 seconds

=> Hbase::Table - test

hbase(main):002:0> list 'test'
TABLE
test
1 row(s) in 0.0180 seconds

=> ["test"]

插入Row

hbase(main):003:0> put 'test', 'row1', 'cf:a', 'value1'
0 row(s) in 0.0850 seconds

hbase(main):004:0> put 'test', 'row2', 'cf:b', 'value2'
0 row(s) in 0.0110 seconds

hbase(main):005:0> put 'test', 'row3', 'cf:c', 'value3'
0 row(s) in 0.0100 seconds

遍历表

hbase(main):006:0> scan 'test'
ROW                                      COLUMN+CELL
 row1                                    column=cf:a, timestamp=1421762485768, value=value1
 row2                                    column=cf:b, timestamp=1421762491785, value=value2
 row3                                    column=cf:c, timestamp=1421762496210, value=value3
3 row(s) in 0.0230 seconds

SSO和CAS

SSO(Single sign-on,单点登录),就是只登录一次(只提供一次凭证,如账号密码),就可以畅通无阻的访问平台的多个应用/服务。不应该把SSO和OAuth等授权协议混淆,OAuth协议要求在登录不同系统是都要进行认证和授权,而SSO只认证一次。OAuth本质是解决授权问题。

要实现SSO有很多方案。CAS(Central Authentication Service,集中式认证服务)就是其中一种。最初的CAS由Yale实现,现在CAS协议的实现有很多。CAS协议已经发展到了3.0版本。

一下对CAS协议的描述来自apereo CAS实现的文档。

CAS protocol

The CAS protocol is a simple and powerful ticket-based protocol developed exclusively for CAS. A complete protocol specification may be found here.

It involves one or many clients and one server. Clients are embedded in CASified applications (called “CAS services”) whereas the CAS server is a standalone component:

  • The CAS server is responsible for authenticating users and granting accesses to applications
  • The CAS clients protect the CAS applications and retrieve the identity of the granted users from the CAS server.

The key concepts are:

  • The TGT (Ticket Granting Ticket), stored in the CASTGC cookie, represents a SSO session for a user
  • The ST (Service Ticket), transmitted as a GET parameter in urls, stands for the access granted by the CAS server to the CASified application for a specific user.

交互流程

cas_flow_diagram

从上图可以看出,web的CAS利用到了session cookie和ticket令牌。不难分析得出CASTGC用于保持浏览器到CAS server的会话,第二次访问不需要再提供凭证。利用GET参数传递ST使得认证结果可以在不同域名之间传递。浏览器和受保护应用之间还有一个session cookie,保持了两者之间的会话,使得第二次直接访问应用也不需要提供凭证,应用也不需要再去CAS认证对令牌进行认证。

redis+lua实现Token Bucket限流

在做网关限流时,经过研究后决定使用redis+lua实现Token Bucket算法做分布式限流。

Token Bucket算法要求匀速放入令牌,用定时器可以实现。但是网关的来源IP太多,桶很多,需要很多线程加定时器才能实时处理得过来,不是一个好办法。
换个思路,由于生产token是匀速的,且取token是有时间差的,可以利用两次取token的时间差计算出间隔间应该产生的token数量。原有token+新产生token数-要获取的token数,就是桶里剩余的token。因此对于一个桶,要记录剩余token数,上次取token时间戳。获取token是要传入当前时间戳,要获取token数量(默认1)。在修改桶的两个信息时,需要是原子操作,而且获取令牌过程中带有计算逻辑。
redis是以单线程串行处理请求,在处理EVAL命令时是一个原子操作。因此使用redis+lua脚本刚好可以满足上述要求。

脚本如下

-- bucket name
local key = KEYS[1]
-- token generate interval
local intervalPerPermit = tonumber(ARGV[1])
-- grant timestamp
local refillTime = tonumber(ARGV[2])
-- limit token count
local limit = tonumber(ARGV[3])
-- ratelimit time period
local interval = tonumber(ARGV[4])

local counter = redis.call('hgetall', key)

if table.getn(counter) == 0 then
    -- first check if bucket not exists, if yes, create a new one with full capacity, then grant access
    redis.call('hmset', key, 'lastRefillTime', refillTime, 'tokensRemaining', limit - 1)
    -- expire will save memory
    redis.call('expire', key, interval)
    return 1
elseif table.getn(counter) == 4 then
    -- if bucket exists, first we try to refill the token bucket
    local lastRefillTime, tokensRemaining = tonumber(counter[2]), tonumber(counter[4])
    local currentTokens
    if refillTime > lastRefillTime then
        -- check if refillTime larger than lastRefillTime.
        -- if not, it means some other operation later than this call made the call first.
        -- there is no need to refill the tokens.
        local intervalSinceLast = refillTime - lastRefillTime
        if intervalSinceLast > interval then
            currentTokens = limit
            redis.call('hset', key, 'lastRefillTime', refillTime)
        else
            local grantedTokens = math.floor(intervalSinceLast / intervalPerPermit)
            if grantedTokens > 0 then
                -- ajust lastRefillTime, we want shift left the refill time.
                local padMillis = math.fmod(intervalSinceLast, intervalPerPermit)
                redis.call('hset', key, 'lastRefillTime', refillTime - padMillis)
            end
            currentTokens = math.min(grantedTokens + tokensRemaining, limit)
        end
    else
        -- if not, it means some other operation later than this call made the call first.
        -- there is no need to refill the tokens.
        currentTokens = tokensRemaining
    end
    
    assert(currentTokens >= 0)

    if currentTokens == 0 then
        -- we didn't consume any keys
        redis.call('hset', key, 'tokensRemaining', currentTokens)
        return 0
    else
        -- we take 1 token from the bucket
        redis.call('hset', key, 'tokensRemaining', currentTokens - 1)
        return 1
    end
else
    error("Size of counter is " .. table.getn(counter) .. ", Should Be 0 or 4.")
end

调用这个脚本需要4个参数,其中key是桶的名字,intervalPerPermit是产生令牌的时间间隔,refillTime是调用脚本的即时时间戳,limit是最大间隔访问次数,interval桶的限流间隔。key生产规则可以是”ratelimit:$IP:$PATH:$INTERVAL”。当token不存在时,默认创建一个带有最大token数量的桶。当超过interval都没有访问桶时,对该通的缓存会过期,减少内存使用量。返回1代表成功获取一个令牌,返回0代表未获取到令牌(此次访问应该被限制)。

假设要限制某个IP对某个接口每秒钟最多访问2次,则
interval = 1000(毫秒)
limit = 2
intervalPerPermit = interval/limit = 500
key=”ratelimit:127.0.0.1:/user/get:1000″

命令行调用

redis-cli -h localhost -p 6379 -a wingyiu --ldb --eval ~/Git/api-gateway/src/main/resources/ratelimit_token_bucket.lua ratelimit:127.0.0.1:/user/get:1000 , 500 1520402606803 2 1000

网关使用spring cloud netflix zuul,可以编写一个filter,作为限流功能的实现,调用redis,执行上述Lua脚本

package com.yunzhijia.platform.api.gateway.zuul.filter;

import com.google.common.net.InetAddresses;
import com.netflix.zuul.ZuulFilter;
import com.netflix.zuul.context.RequestContext;
import com.xxx.gateway.vo.SimpleRouteVo;
import com.xxx.gateway.common.constants.CacheConstants;
import com.xxx.gateway.common.constants.ErrorMsgConstants;
import com.xxx.gateway.exception.GatewayException;
import com.xxx.gateway.utils.RedisTemplateUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.stereotype.Component;

import javax.servlet.http.HttpServletRequest;
import java.util.Collections;

/**
 * 目前仅实现IP+URI的限流
 * 基于令牌桶算法
 * 借助于redis+lua实现对令牌桶的原子操作
 */
@Component
public class RateLimitPreFilter extends ZuulFilter {

	protected static final Logger LOG = LoggerFactory.getLogger(RateLimitPreFilter.class);

	@Override
	public String filterType() {
		return "pre";
	}

	@Override
	public int filterOrder() {
		return 4;
	}

	@Override
	public boolean shouldFilter() {
		return true;
	}

	@Autowired
	RedisScript redisScript;

	@Override
	public Object run() {
		RequestContext ctx = RequestContext.getCurrentContext();
		HttpServletRequest request = ctx.getRequest();

		String uri = request.getRequestURI();
		LOG.info("开始限流检查:{}", uri);
		// 获取ip
		String ip = request.getHeader("X-Forwarded-For");
		if (StringUtils.isBlank(ip)) {
			ip = request.getRemoteAddr();
		}
		ip = ip.split(",")[0];
		LOG.info("remote ip: {}",  ip);

		if (isSiteLocalAddress(ip)) {
			LOG.info("私有地址跳过: {}", ip);
			return null;
		}

		SimpleRouteVo routeVo = (SimpleRouteVo) ctx.get("methodDefinition");

		if (routeVo.getReqPerSec() <= 0 && routeVo.getReqPerMin() <= 0) {
			// 不需要限流
			LOG.info("接口:{} 不限流", uri);
			return null;
		}

		if (routeVo.getReqPerSec() > 0) {
			boolean canAccess = access(ip, uri, 1000, routeVo.getReqPerSec());
			if (!canAccess) {
				LOG.info("被限流: ip={}, uri={}, interval=sec, limit={}", ip, uri, routeVo.getReqPerSec());
				ctx.setSendZuulResponse(false);
				// raise exception to jump to error filter
				throw new GatewayException(ErrorMsgConstants.TOO_MANY_REQUESTS);
			}
		}

		if (routeVo.getReqPerMin() > 0) {
			boolean canAccess = access(ip, uri, 60000, routeVo.getReqPerMin());
			if (!canAccess) {
				LOG.info("被限流: ip={}, uri={}, interval=min, limit={}", ip, uri, routeVo.getReqPerMin());
				ctx.setSendZuulResponse(false);
				// raise exception to jump to error filter
				throw new GatewayException(ErrorMsgConstants.TOO_MANY_REQUESTS);
			}
		}

		return null;
	}

	private boolean isSiteLocalAddress(String ip) {
		// refer to RFC 1918
		// 10/8 prefix
		// 172.16/12 prefix
		// 192.168/16 prefix
		int address = InetAddresses.coerceToInteger(InetAddresses.forString(ip));
		return (((address >>> 24) & 0xFF) == 10)
				|| ((((address >>> 24) & 0xFF) == 172)
				&& ((address >>> 16) & 0xFF) >= 16
				&& ((address >>> 16) & 0xFF) <= 31)
				|| ((((address >>> 24) & 0xFF) == 192)
				&& (((address >>> 16) & 0xFF) == 168));
	}

	public boolean access(String ip, String uri, long intervalInMills, long limit) {
		LOG.info(redisScript.getSha1());
		String key = genKey(ip, uri, intervalInMills, limit);
		key = CacheConstants.genKey(key);
		long intervalPerPermit = intervalInMills / limit;

		try {
			RedisTemplate redisTemplate = RedisTemplateUtils.getRedisTemplate();
			Long refillTime = System.currentTimeMillis();
			LOG.info("调用redis执行lua脚本, {} {} {} {} {}", key, String.valueOf(intervalPerPermit), String.valueOf(refillTime),
					String.valueOf(limit), String.valueOf(intervalInMills));
			Long res = (Long)redisTemplate.execute(redisScript, Collections.singletonList(key),
					String.valueOf(intervalPerPermit), String.valueOf(System.currentTimeMillis()),
					//String.valueOf(limit),
					String.valueOf(limit), String.valueOf(intervalInMills));
			LOG.info("调用redis执行lua脚本:{}", res);
			return res == 1L ? true : false;

		} catch (Exception e) {
			LOG.error("调用redis执行lua脚本出错", e);
			return true; // 内部异常,直接放过吧,保证能访问,不计较太多
		}
	}

	private String genKey(String ip, String uri, long intervalInMills, long limit) {
		return String.format("ratelimit:%s:%s:%s", ip, uri, intervalInMills);
	}
}

实现了对[IP,秒,接口],[IP,分钟,接口]两种限流,具体限流大小由管理后台配置。

留个问题:
Redis Cluster模式下,上述代码能正常工作吗?

参考:
https://zhuanlan.zhihu.com/p/20872901
https://github.com/YigWoo/toys/tree/master/src/main/java/com/yichao/woo/ratelimiter

对rate limiting功能的思考

频率限制,应该是针对调用方的限制,同时也对被调用对象的保护。

例如每个IP对`/gateway/ticket/ticket2ctx`接口每秒钟调用次数不超过200次。

故频率定义为: 调用方属性组合 X 被调用对象属性组合 x 次数 x 间隔

调用方属性包括: IP,应用ID等

被调用对象属性:一般是接口

间隔:1秒、1分钟、1小时、1每天

可以在两个地方进行频率限制:
一个是在网关,使用“调用方IP X 接口”
一个是在开放平台,使用“调用方AppId x 接口” | 调用方appId x 接口

不同间隔的限制可以同时使用,如 rate <= 100rp/sec 且 rate <= 3500 rp/min 在逻辑正确及性能方面较符合的算法包括:吊桶算法、循环队列算法、令牌通算法、漏桶算法。

吊桶算法

image2018-2-28 18_45_59
这种算法没有办法保证任意1秒(不是整数开始)都限制n个请求。本质是计数器方式,粗暴,非精确。

循环队列算法

image2018-2-28 18_48_46

维护最近n个请求的时间,当有新请求时,查看一下n个之前的请求的时间,如果大于1秒就放过,小于1秒就拒绝。
缺点,n很大时,空间消耗大。

token bucket算法

image2018-2-28 18_51_9

令牌桶会以一定速率产生令牌放入桶中,满了以后会丢弃或暂停产生,数据通过时需要持有一个令牌,没有令牌说明超过了速率限制。令牌桶算法允许突发流量,如突然将桶内令牌都消耗完成

漏桶算法

image2018-2-28 18_51_58
漏桶算法中,水以一定速率放到漏桶里,然后漏桶以一定的速率向外流水。所以最大的速率就是出水的速率。不能出现突发流量。

限流可以分为:
1.应用级限流,只是单应用实例内的请求限流
2.分布式限流。

网关部署到多台机器,为了保证正确的限流,需要全局限流,即分布式限流。

限流vs熔断:
限流一般在被调用侧实施,而熔断则在调用侧做。

资料:

http://jinnianshilongnian.iteye.com/blog/2305117