• Archive by category "微服务"

Blog Archives

使用maven解决微服务开发中的一些日常问题

1.多模块

微服务是按业务边界划分的,通常包含一个可供调用的服务和一些异步后台进程、定时任务,这些运行实际不同但又紧密的关联的进程一起完成这个业务的数据处理。因此可以把一个微服务拆分为多个模块,如api定义模块(以dubbo这类rpc微服务为例,通常会把服务接口构建为一个jar包)、服务实现模块、定时任务模块。而服务实现和定时任务会共用一些工具类代码,又可以提取为一个模块。
在工作经历中,见过一些人把服务api放在一个项目里,服务实现放在一个项目里,自动任务放在一个独立的项目里,且三个项目都是独立的git项目。每个模块使用独立的pom.xml,则很多共同的依赖和插件配置就需要写多次,在修改版本时难免出现不一致。
其实利用maven的「继承与聚合」功能,是一种更优雅的多模块组织方式。既可以把多个模块放在一个目录里管理(一个git项目,方便查看完整的变更历史),又解决多个模块依赖和插件管理问题,同时还能解决一条命令同时构建多个模块的问题。

以用户服务为例,项目结构可以组织成如下形式:

user
├── pom.xml
├── user-api
│   ├── pom.xml
│   └── src
│       └── main
│           └── java
└── user-service
    ├── pom.xml
    └── src
        └── main
            ├── java
            └── resources

user目录作为是一个父模块,而user-api和user-service是user模块的子模块,子模块和父模块以嵌套方式组织。
user/pom.xml的内容如下

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>wingyiu</groupId>
    <artifactId>user</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <packaging>pom</packaging>
    <name>User Parent</name>

    <modules>
        <module>user-api</module>
        <module>user-service</module>
    </modules>

    <properties>
        <java.version>1.8</java.version>
        <java.source.version>1.8</java.source.version>
        <java.target.version>1.8</java.target.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <spring-boot.version>1.5.14.RELEASE</spring-boot.version>
        <dubbo.version>2.6.2</dubbo.version>
        <dubbo.starter.version>0.1.1</dubbo.starter.version>
        <zkclient.version>0.2</zkclient.version>
        <zookeeper.version>3.4.9</zookeeper.version>
        <curator-framework.version>2.12.0</curator-framework.version>
        <!-- Build args -->
        <argline>-server -Xms256m -Xmx512m -XX:PermSize=64m -XX:MaxPermSize=128m -Dfile.encoding=UTF-8
-Djava.net.preferIPv4Stack=true
        </argline>
        <arguments/>

        <!-- Maven plugins -->
        <maven-jar-plugin.version>3.0.2</maven-jar-plugin.version>
        <maven-compiler-plugin.version>3.6.0</maven-compiler-plugin.version>
        <maven-source-plugin.version>3.0.1</maven-source-plugin.version>
        <maven-jacoco-plugin.version>0.8.1</maven-jacoco-plugin.version>
        <maven-gpg-plugin.version>1.5</maven-gpg-plugin.version>
        <apache-rat-plugin.version>0.12</apache-rat-plugin.version>
        <maven-release-plugin.version>2.5.3</maven-release-plugin.version>
        <maven-surefire-plugin.version>2.19.1</maven-surefire-plugin.version>
    </properties>

    <dependencyManagement>
        <dependencies>
            <!-- Spring Boot -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot</artifactId>
                <version>${spring-boot.version}</version>
            </dependency>
            ...省略
        </dependencies>
    </dependencyManagement>

    <repositories>
        <repository>
            <id>nexus-snapshot</id>
            <url>http://192.168.0.22/nexus/content/repositories/gayhub-snapshot</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
            <releases>
                <enabled>false</enabled>
            </releases>
        </repository>
        <repository>
            <id>nexus-release</id>
            <url>http://192.168.0.22/nexus/content/repositories/gayhub-release</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
            <releases>
                <enabled>true</enabled>
            </releases>
        </repository>
        <!-- Repositories to allow snapshot and milestone BOM imports during development.
            This section is stripped by the flatten plugin during install/deploy. -->
        <repository>
            <id>central</id>
            <url>https://repo.maven.apache.org/maven2</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
        ...省略
    </repositories>

    <pluginRepositories>
        <pluginRepository>
            <id>central</id>
            <url>https://repo.maven.apache.org/maven2</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </pluginRepository>
        ...省略
    </pluginRepositories>

</project>

其中
<packaging>pom</packaging>
表明这是一个聚合模块,

<modules>
    <module>user-api</module>
    <module>user-service</module>
</modules>

modules元素里声明了两个聚合模块。
从聚合模块运行man clean install时,maven会首先解析觉和模块的POM,分析要构建的模块,并计算出一个反应堆构建顺序,然后依次构建各个模块。即实现了一次构建多个模块。
properties原理声明了共用的一次属性,子模块可以通过${spring-boot.version}这样的形式引用。

<dependencyManagement>
    <dependencies>
    </dependencies>
</dependencyManagement>

声明了共同的依赖,子模块可以继承这些依赖配置,dependencyManagement下生命的依赖不会实际引入,不过它能约束子模块dependencies下的依赖使用。同理

<build>
    <pluginManagement>
        <plugins>

帮助子模块进行插件管理。

子模块user-api的pom.xml内容如下


<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>wingyiu</groupId>
        <artifactId>user</artifactId>
        <version>1.0.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>user-api</artifactId>
    <packaging>jar</packaging>
</project>

其中, 元素指明了其父模块为user。不难发现user的POM,既是聚合POM,又是父POM,实际项目中经常如此。子POM并没有声明groupId和version,因为子模块隐式地从父模块继承了这两个元素。由于user-api仅包含以下interface定义,DO定义,没有依赖,因此pom文件没有任何dependency。模块打包方式为默认的jar。


<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <artifactId>user</artifactId>
        <groupId>wingyiu</groupId>
        <version>1.0.0-SNAPSHOT</version>
        <relativePath>../pom.xml</relativePath>
    </parent>

    <artifactId>user-service</artifactId>
    <packaging>jar</packaging>

    <properties>
        <micrometer.version>1.0.5</micrometer.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot</artifactId>
        </dependency>

        <dependency>
            <groupId>io.micrometer</groupId>
            <artifactId>micrometer-registry-prometheus</artifactId>
            <version>${micrometer.version}</version>
        </dependency>

    </dependencies>
</project>

同理user-service的pom中parent指明了父模块为user。特别的

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot</artifactId>
        </dependency>

并没有制定version,还剩去了依赖范围。这是因为完整的依赖声明已经包含在父POM中,子模块只需配置groupId和artifactId就能获得对应的依赖信息。这种依赖管理机制似乎不能减少太多pom配置,但是能够统一项目范围的依赖版本。有父模块统一声明version,子模块不声明version,就不会发生过个子模块使用的依赖版本不一致的情况,这可以帮助降低依赖冲突的几率。

2.只构建并发布api包

为了加快项目进度,通常服务提供方和调用方会先定好接口定义(包括接口名,参数),调用方先以约定好的接口进行逻辑编写,等提供方正在实现好之后再进行联调。在这种情况下,就会要求先发布jar包。
此时可以利用maven的反应堆裁剪功能。反应堆是指所有模块组成的一个构建结构。对于多模块项目,反应堆包含了模块之间继承与依赖关系,从而能够自动计算出合理的模块构建顺序。模块间的依赖关系会将反应堆构成一个有向非循环图(DAG)。
默认情况,在聚合模块执行mvn clean install会执行整个反应堆,maven提供了很多命令行选项支持裁剪反应堆:
– -am, –also-make,同时构建所列模块的依赖模块
– -amd, –also-make-dependents,同时构建依赖于所列模块的模块
– -pl,–projects 构建指定的模块,模块间用逗号分隔
– -rf, –resume-from 从指定的模块恢复反应堆

因此要单独构建api模块,可以运行如下命令
mvn clean install -pl user-api

3.开发中的接口不断地改变

这种情形很常见,一个开发中的接口,参数不断的调整,此时调用方也要不断的修改,即调用方需要不断的更新依赖服务的api jar包。一般地,maven把依赖从远程仓库拉下来后会进行缓存。因此经常会出现要手动删除本地缓存包,或者增加版本号,但服务还在开发不断增加版本号显然不合适。
maven提供了更加简单有效的解决方法:SNAPSHOT,快照版本。当依赖版本为快照版本的时候,maven会自动找到最新的快照。即每次都会去仓库拉去元数据,计算真实的时间戳格式版本号,然后拉取对应的jar,保证本地使用到最新的jar。

注:默认情况下,maven每天检查一次更新(有仓库udatePolicy控制),用户也可以用-U参数强制检查更新,如mvn clean install -U

因此,快发中,我们可以把api模块(上例user-api)的版本号设为1.0.0-SNAPSHOT

发布过程中,maven自动为构件打上时间戳,如1.0.0-20180720.221414-13。

4.更新所有模块的版本号

当旧版本a.b.c已经发布,要进行下一轮迭代时,要把父模块版本号改为a.b.d-SNAPSHOT,测试通过后正式发布,要改为正式版本号a.b.d。由于每个子模块的<parent><version>都要于父POM一致,手动修改每个pom.xml显然很容易出错。

可以使用org.codehaus.mojo:versions-maven-plugin插件

5.使用企业内部私有maven仓库

假设以nexus搭建私有仓库,snapshot版本和正式版本的构件发布到不同仓库,可以在父POM中配置repository

<repositories>
        <repository>
            <id>nexus-snapshot</id>
            <url>http://192.168.0.22/nexus/content/repositories/gayhub-snapshot</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
            <releases>
                <enabled>false</enabled>
            </releases>
        </repository>
        <repository>
            <id>nexus-release</id>
            <url>http://192.168.0.22/nexus/content/repositories/gayhub-release</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
            <releases>
                <enabled>true</enabled>
            </releases>
        </repository>
</repositories>

<repository><id>是和$MAVEN/settings.xml中的<server><id>对应的,<server>配置repository的账号和密码。

出于规范考虑,开发人员可以自由提交snapshot版本构件,正式版本构件只能在测试和审核后才能由发布系统进行提交。因此release仓库的密码不宜开放给开发人员,开发人员只能得到snapshot仓库的账号密码。

6.快速开发新的微服务

可以利用maven的archtype功能。首先根据代码规范,编写一个包含基本的、可复用逻辑的微服务,以此作为模板,把模板项目代码中的一些类名、变量名、文件名使用模板变量替换,然后编写自定义archtype 插件。利用此插件,运行时输入模板变量值就可以生产新的微服务代码了。

7.接口文档编写与更新

参考swagger-maven-plugin,swagger可以利用maven插件,解析接口的类名、方法名、参数等,生成一个可以部署的接口文档站点。

进一步的,代码可以利用JSR 303 Bean Validation的注解,标注参数的要求。插件解析这些注解,生产的文档包含完整的参数要求信息,如是否必填、类型、最大最小值、长度、可选枚举值等。

不同于swagger-maven-plugin,我们也可以把解析的接口信息统一上报到接口文档管理系统,接口管理系统保留每个接口的url、参数要求、所属服务及版本、变更历史,提供分组和搜索功能。利用插件自动解析和生产接口文档,降低手动维护接口文档的痛苦和出错机会。

8.接口发布到网关

原理类似利用maven插件接口接口生成接口文档,只不过这里是把接口信息提交给网关的接口配置管理中心,降低维护网关接口配置的痛苦

redis+lua实现Token Bucket限流

在做网关限流时,经过研究后决定使用redis+lua实现Token Bucket算法做分布式限流。

Token Bucket算法要求匀速放入令牌,用定时器可以实现。但是网关的来源IP太多,桶很多,需要很多线程加定时器才能实时处理得过来,不是一个好办法。
换个思路,由于生产token是匀速的,且取token是有时间差的,可以利用两次取token的时间差计算出间隔间应该产生的token数量。原有token+新产生token数-要获取的token数,就是桶里剩余的token。因此对于一个桶,要记录剩余token数,上次取token时间戳。获取token是要传入当前时间戳,要获取token数量(默认1)。在修改桶的两个信息时,需要是原子操作,而且获取令牌过程中带有计算逻辑。
redis是以单线程串行处理请求,在处理EVAL命令时是一个原子操作。因此使用redis+lua脚本刚好可以满足上述要求。

脚本如下

-- bucket name
local key = KEYS[1]
-- token generate interval
local intervalPerPermit = tonumber(ARGV[1])
-- grant timestamp
local refillTime = tonumber(ARGV[2])
-- limit token count
local limit = tonumber(ARGV[3])
-- ratelimit time period
local interval = tonumber(ARGV[4])

local counter = redis.call('hgetall', key)

if table.getn(counter) == 0 then
    -- first check if bucket not exists, if yes, create a new one with full capacity, then grant access
    redis.call('hmset', key, 'lastRefillTime', refillTime, 'tokensRemaining', limit - 1)
    -- expire will save memory
    redis.call('expire', key, interval)
    return 1
elseif table.getn(counter) == 4 then
    -- if bucket exists, first we try to refill the token bucket
    local lastRefillTime, tokensRemaining = tonumber(counter[2]), tonumber(counter[4])
    local currentTokens
    if refillTime > lastRefillTime then
        -- check if refillTime larger than lastRefillTime.
        -- if not, it means some other operation later than this call made the call first.
        -- there is no need to refill the tokens.
        local intervalSinceLast = refillTime - lastRefillTime
        if intervalSinceLast > interval then
            currentTokens = limit
            redis.call('hset', key, 'lastRefillTime', refillTime)
        else
            local grantedTokens = math.floor(intervalSinceLast / intervalPerPermit)
            if grantedTokens > 0 then
                -- ajust lastRefillTime, we want shift left the refill time.
                local padMillis = math.fmod(intervalSinceLast, intervalPerPermit)
                redis.call('hset', key, 'lastRefillTime', refillTime - padMillis)
            end
            currentTokens = math.min(grantedTokens + tokensRemaining, limit)
        end
    else
        -- if not, it means some other operation later than this call made the call first.
        -- there is no need to refill the tokens.
        currentTokens = tokensRemaining
    end
    
    assert(currentTokens >= 0)

    if currentTokens == 0 then
        -- we didn't consume any keys
        redis.call('hset', key, 'tokensRemaining', currentTokens)
        return 0
    else
        -- we take 1 token from the bucket
        redis.call('hset', key, 'tokensRemaining', currentTokens - 1)
        return 1
    end
else
    error("Size of counter is " .. table.getn(counter) .. ", Should Be 0 or 4.")
end

调用这个脚本需要4个参数,其中key是桶的名字,intervalPerPermit是产生令牌的时间间隔,refillTime是调用脚本的即时时间戳,limit是最大间隔访问次数,interval桶的限流间隔。key生产规则可以是”ratelimit:$IP:$PATH:$INTERVAL”。当token不存在时,默认创建一个带有最大token数量的桶。当超过interval都没有访问桶时,对该通的缓存会过期,减少内存使用量。返回1代表成功获取一个令牌,返回0代表未获取到令牌(此次访问应该被限制)。

假设要限制某个IP对某个接口每秒钟最多访问2次,则
interval = 1000(毫秒)
limit = 2
intervalPerPermit = interval/limit = 500
key=”ratelimit:127.0.0.1:/user/get:1000″

命令行调用

redis-cli -h localhost -p 6379 -a wingyiu --ldb --eval ~/Git/api-gateway/src/main/resources/ratelimit_token_bucket.lua ratelimit:127.0.0.1:/user/get:1000 , 500 1520402606803 2 1000

网关使用spring cloud netflix zuul,可以编写一个filter,作为限流功能的实现,调用redis,执行上述Lua脚本

package com.yunzhijia.platform.api.gateway.zuul.filter;

import com.google.common.net.InetAddresses;
import com.netflix.zuul.ZuulFilter;
import com.netflix.zuul.context.RequestContext;
import com.xxx.gateway.vo.SimpleRouteVo;
import com.xxx.gateway.common.constants.CacheConstants;
import com.xxx.gateway.common.constants.ErrorMsgConstants;
import com.xxx.gateway.exception.GatewayException;
import com.xxx.gateway.utils.RedisTemplateUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.stereotype.Component;

import javax.servlet.http.HttpServletRequest;
import java.util.Collections;

/**
 * 目前仅实现IP+URI的限流
 * 基于令牌桶算法
 * 借助于redis+lua实现对令牌桶的原子操作
 */
@Component
public class RateLimitPreFilter extends ZuulFilter {

	protected static final Logger LOG = LoggerFactory.getLogger(RateLimitPreFilter.class);

	@Override
	public String filterType() {
		return "pre";
	}

	@Override
	public int filterOrder() {
		return 4;
	}

	@Override
	public boolean shouldFilter() {
		return true;
	}

	@Autowired
	RedisScript redisScript;

	@Override
	public Object run() {
		RequestContext ctx = RequestContext.getCurrentContext();
		HttpServletRequest request = ctx.getRequest();

		String uri = request.getRequestURI();
		LOG.info("开始限流检查:{}", uri);
		// 获取ip
		String ip = request.getHeader("X-Forwarded-For");
		if (StringUtils.isBlank(ip)) {
			ip = request.getRemoteAddr();
		}
		ip = ip.split(",")[0];
		LOG.info("remote ip: {}",  ip);

		if (isSiteLocalAddress(ip)) {
			LOG.info("私有地址跳过: {}", ip);
			return null;
		}

		SimpleRouteVo routeVo = (SimpleRouteVo) ctx.get("methodDefinition");

		if (routeVo.getReqPerSec() <= 0 && routeVo.getReqPerMin() <= 0) {
			// 不需要限流
			LOG.info("接口:{} 不限流", uri);
			return null;
		}

		if (routeVo.getReqPerSec() > 0) {
			boolean canAccess = access(ip, uri, 1000, routeVo.getReqPerSec());
			if (!canAccess) {
				LOG.info("被限流: ip={}, uri={}, interval=sec, limit={}", ip, uri, routeVo.getReqPerSec());
				ctx.setSendZuulResponse(false);
				// raise exception to jump to error filter
				throw new GatewayException(ErrorMsgConstants.TOO_MANY_REQUESTS);
			}
		}

		if (routeVo.getReqPerMin() > 0) {
			boolean canAccess = access(ip, uri, 60000, routeVo.getReqPerMin());
			if (!canAccess) {
				LOG.info("被限流: ip={}, uri={}, interval=min, limit={}", ip, uri, routeVo.getReqPerMin());
				ctx.setSendZuulResponse(false);
				// raise exception to jump to error filter
				throw new GatewayException(ErrorMsgConstants.TOO_MANY_REQUESTS);
			}
		}

		return null;
	}

	private boolean isSiteLocalAddress(String ip) {
		// refer to RFC 1918
		// 10/8 prefix
		// 172.16/12 prefix
		// 192.168/16 prefix
		int address = InetAddresses.coerceToInteger(InetAddresses.forString(ip));
		return (((address >>> 24) & 0xFF) == 10)
				|| ((((address >>> 24) & 0xFF) == 172)
				&& ((address >>> 16) & 0xFF) >= 16
				&& ((address >>> 16) & 0xFF) <= 31)
				|| ((((address >>> 24) & 0xFF) == 192)
				&& (((address >>> 16) & 0xFF) == 168));
	}

	public boolean access(String ip, String uri, long intervalInMills, long limit) {
		LOG.info(redisScript.getSha1());
		String key = genKey(ip, uri, intervalInMills, limit);
		key = CacheConstants.genKey(key);
		long intervalPerPermit = intervalInMills / limit;

		try {
			RedisTemplate redisTemplate = RedisTemplateUtils.getRedisTemplate();
			Long refillTime = System.currentTimeMillis();
			LOG.info("调用redis执行lua脚本, {} {} {} {} {}", key, String.valueOf(intervalPerPermit), String.valueOf(refillTime),
					String.valueOf(limit), String.valueOf(intervalInMills));
			Long res = (Long)redisTemplate.execute(redisScript, Collections.singletonList(key),
					String.valueOf(intervalPerPermit), String.valueOf(System.currentTimeMillis()),
					//String.valueOf(limit),
					String.valueOf(limit), String.valueOf(intervalInMills));
			LOG.info("调用redis执行lua脚本:{}", res);
			return res == 1L ? true : false;

		} catch (Exception e) {
			LOG.error("调用redis执行lua脚本出错", e);
			return true; // 内部异常,直接放过吧,保证能访问,不计较太多
		}
	}

	private String genKey(String ip, String uri, long intervalInMills, long limit) {
		return String.format("ratelimit:%s:%s:%s", ip, uri, intervalInMills);
	}
}

实现了对[IP,秒,接口],[IP,分钟,接口]两种限流,具体限流大小由管理后台配置。

留个问题:
Redis Cluster模式下,上述代码能正常工作吗?

参考:
https://zhuanlan.zhihu.com/p/20872901
https://github.com/YigWoo/toys/tree/master/src/main/java/com/yichao/woo/ratelimiter

对rate limiting功能的思考

频率限制,应该是针对调用方的限制,同时也对被调用对象的保护。

例如每个IP对`/gateway/ticket/ticket2ctx`接口每秒钟调用次数不超过200次。

故频率定义为: 调用方属性组合 X 被调用对象属性组合 x 次数 x 间隔

调用方属性包括: IP,应用ID等

被调用对象属性:一般是接口

间隔:1秒、1分钟、1小时、1每天

可以在两个地方进行频率限制:
一个是在网关,使用“调用方IP X 接口”
一个是在开放平台,使用“调用方AppId x 接口” | 调用方appId x 接口

不同间隔的限制可以同时使用,如 rate <= 100rp/sec 且 rate <= 3500 rp/min 在逻辑正确及性能方面较符合的算法包括:吊桶算法、循环队列算法、令牌通算法、漏桶算法。

吊桶算法

image2018-2-28 18_45_59
这种算法没有办法保证任意1秒(不是整数开始)都限制n个请求。本质是计数器方式,粗暴,非精确。

循环队列算法

image2018-2-28 18_48_46

维护最近n个请求的时间,当有新请求时,查看一下n个之前的请求的时间,如果大于1秒就放过,小于1秒就拒绝。
缺点,n很大时,空间消耗大。

token bucket算法

image2018-2-28 18_51_9

令牌桶会以一定速率产生令牌放入桶中,满了以后会丢弃或暂停产生,数据通过时需要持有一个令牌,没有令牌说明超过了速率限制。令牌桶算法允许突发流量,如突然将桶内令牌都消耗完成

漏桶算法

image2018-2-28 18_51_58
漏桶算法中,水以一定速率放到漏桶里,然后漏桶以一定的速率向外流水。所以最大的速率就是出水的速率。不能出现突发流量。

限流可以分为:
1.应用级限流,只是单应用实例内的请求限流
2.分布式限流。

网关部署到多台机器,为了保证正确的限流,需要全局限流,即分布式限流。

限流vs熔断:
限流一般在被调用侧实施,而熔断则在调用侧做。

资料:

http://jinnianshilongnian.iteye.com/blog/2305117

零侵入微服务日志追踪(五):网关

微服务横行的年代,网关已经成为微服务的标配。作为服务的入口,网关在请求进去实际业务集群前,可以做很多公共的事情,使得业务微服务更加的专注于业务,比如统一的鉴权、限流、熔断保护、降级。

但其实网关还可以做一件很重要的事:给每个响应增加TraceId。把TraceId返回给调用方,在前后端联调、开放接口出错等情况下,客户只需要把TraceId告诉接口负责人,负责人便可以利用APM系统、日志系统进行精准定位,极大的加快问题的定位和分析速度。

为例保证无侵入,网关可以把TraceId写入额外的HTTP Header,放回给调用方,这样就不需要解析业务微服务的响应体。

我们以Zuul为基础实现api网关,借助前面提到的Pinpoint无侵入的生成唯一TrxId。TrxId格式$agentId^$startTimestamp^$seq,$agentId为应用名-主机IP,$seq表示这个请求是这个实例处理的第$seq个请求,$startTimestamp是实例启动时间。TrxId包含了较多的敏感信息,不易直接暴露给客户端。因此可以做个映射,TrxId <-> TraceId,输出不含敏感信息的TraceId。

import com.netflix.zuul.ZuulFilter;
import com.netflix.zuul.context.RequestContext;
import com.vanke.gateway.common.constants.Constants;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.stereotype.Component;

import javax.servlet.http.HttpServletResponse;
import java.util.UUID;

@Component
public class TraceIdPreFilter extends ZuulFilter {

    protected static final Logger LOG = LoggerFactory.getLogger(TraceIdPreFilter.class);

    @Override
    public String filterType() {
        return "pre";
    }

    @Override
    public int filterOrder() {
        return 2;
    }

    @Override
    public boolean shouldFilter() {
        return true;
    }

    @Override
    public Object run() {
        // 更新MDC内的ptxId,不能删除
        // 由于线程复用,MDC可能是上个请求的trx信息,强制调用一次log,可以更新MDC的内容为本次请求对应的trx
        LOG.info("生产内部TraceId");
        String traceId = UUID.randomUUID().toString().replace("-", "");
        RequestContext context = RequestContext.getCurrentContext();
        context.set(Constants.REQ_CTX_TRACE_ID, traceId);

        String ptxId = MDC.get("PtxId");
        if (StringUtils.isBlank(ptxId)) {
            return null;
        }
        LOG.info("设置X-Trace-Id: {}, 关联pinpoint trxId: {}", traceId, ptxId);
        RequestContext ctx = RequestContext.getCurrentContext();
        HttpServletResponse resp = ctx.getResponse();
        resp.setHeader("X-Trace-Id", traceId);


        return null;
    }
}

输出示例如下:

通过ELK可以找到对应的Pinpoint TrxId:

利用Pinpoint TrxId就可以进行更进入的追踪了。

零侵入微服务日志追踪(三):pinpoint

dapper

Dapper是google的分布式日志追踪系统。在谷歌公开的关于dapper的论文中,阐述了其分布式追踪的原理。dapper在Correlation Id(论文中称为Trace Id)的基础是引入了跟踪树。Span表示每一个调用,记录Span之间父子关系。同一个事务的所有Span都挂在一个特定的跟踪上,也共用一个Trace id。所有这些ID用全局唯一的64位整数标示。Span有关联的时间信息和业务注释信息(Annotation)。

topu

以上图调用拓扑为例,转换为调用树如下

tree

一个Span的细节如下

spandetail

上图中这种某个span表述了两个“Helper.Call”的RPC(分别为server端和client端)。span的开始时间和结束时间,以及任何RPC的时间信息都通过Dapper在RPC组件库的植入记录下来。如果应用程序开发者选择在跟踪中增加他们自己的注释(如图中“foo”的注释)(业务数据),这些信息也会和其他span信息一样记录下来。

Pinpoint

Pinpoint是Google dapper模型的一个实现,但是有所不同。

Pinpoint 实现追踪的消息的数据结构主要包含三种类型 Span,Trace 和 TraceId。
1.Span:是最基本的调用追踪单元,当远程调用到达的时候,Span 指代处理该调用的作业,并且携带追踪数据。为了实现代码级别的可见性,Span 下面还包含一层 SpanEvent 的数据结构。每个 Span 都包含一个 SpanId。
2.Trace:是一组相互关联的 Span 集合,同一个 Trace 下的 Span 共享一个 TransactionId,而且会按照 SpanId 和 ParentSpanId 排列成一棵有层级关系的树形结构。
3.TraceId:是 TransactionId、SpanId 和 ParentSpanId 的组合。
4.TransactionId: (TxId) 是一个交易下的横跨整个分布式系统收发消息的 ID,其必须在整个服务器组中是全局唯一的。也就是说 TransactionId 识别了整个调用链;TxId由AgentId、JVM启动时间戳和消息在此实例的序列号组成。
5.SpanId: (SpanId) 是处理远程调用作业的 ID,当一个调用到达一个节点的时候随即产生;
6.ParentSpanId: (pSpanId) 顾名思义,就是产生当前 Span 的调用方 Span 的 ID。如果一个节点是交易的最初发起方,其 ParentSpanId 是 -1,以标志其是整个交易的根 Span。下图能够比较直观的说明这些 ID 结构之间的关系。
7.AgentId:JVM实例的名字,由用户设定,全局唯一。实践中,我们采用应用名加主机名。


dapper

Pinpoint和google dapper的不同:
1.Pinpoint的TransactionId就是Google Dapper中的trace id
2.PinPoint中的Trace id是一值一组不同的ID,如上图所示

Pinpoint运用JavaAgent字节码增强技术,只需在JVM启动时加上一些参数即可,业务代码无需任何修改。实践中,由于我们是对遗留系统进行架构升级,因此我们首选Pinpoint这种零侵入、低开发成本的方案。

运行是额外的启动参数:
-javaagent:pinpoint提供的agent jar包的路径
-Dpinpoint.agentId:JVM唯一标识(应用实例,实践中,我们采用单机多应用部署,因此采用应用名加主机名)
-Dpinpoint.applicationName:应用名(微服务名,采用CMDB中统一服务名)

示例:

java -javaagent:/opt/pinpoint/pinpoint-bootstrap-1.6.0-RC1.jar -Dpinpoint.agentId=api-gw-10.0.0.1 -Dpinpoint.applicationName=api-gateway -jar api-gateway-1.0.0-SNAPSHOT.jar

一个服务调用及Pinpoint展示的跟踪树的例子:
td_figure5

Pinpoint与logback/log4j

Pinpoint的零侵入其实是建立在提前以插件形式对常用库的JVM增强基础上的。其中日志常用库log4j和logback都有相关plugin。以logback为例,pinpoint-logback-plugin插件会把txId和spanid存到logback的MDC中,因此只需要在logback的配置文件中设置layout为输出MDC中txId和spanId即可。


其中PtxId和PspanId即为pinpoint-logback-plugin插件存入MDC的两个属性。
默认情况下,Pinpoint并不是跟踪所有的请求,而且不把txId和spanId存入MDC,因此需求修改pinpoint的配置文件$AGENTPATH/pinpoint.config

# 修改采样率为100%(如果不设置为100%,则会有请求不会被trace)
profiler.sampling.rate=1
# 把transaction信息存入logback MDC
profiler.logback.logging.transactioninfo=true
# 把transaction信息存入log4j MDC
profiler.log4j.logging.transactioninfo=true

输出日志示例:

[2018-03-13 15:09:47.953] INFO [qtp1861416877-61] c.y.p.a.g.z.f.MethodExistedPreFilter [TxId:gateway^1520924888825^1, SpanId:-9016575308619697817] - 检查/gateway/oauth2/token/refreshToken是否存在对应接口定义
[2018-03-13 15:09:47.979] INFO [qtp1861416877-61] c.y.p.a.g.z.f.MethodExistedPreFilter [TxId:gateway^1520924888825^1, SpanId:-9016575308619697817] - /gateway/oauth2/token/refreshToken对应的接口定义: com.**.web.AuthorityService#refreshToken
[2018-03-13 15:09:47.983] INFO [qtp1861416877-61] c.y.p.a.g.z.f.RateLimitPreFilter [TxId:gateway^1520924888825^1, SpanId:-9016575308619697817] - 开始限流检查:0:0:0:0:0:0:0:1 /gateway/oauth2/token/refreshToken
[2018-03-13 15:09:47.983] INFO [qtp1861416877-61] c.y.p.a.g.z.f.RateLimitPreFilter [TxId:gateway^1520924888825^1, SpanId:-9016575308619697817] - 接口:/gateway/oauth2/token/refreshToken 不限流
[2018-03-13 15:09:47.984] INFO [qtp1861416877-61] c.y.p.a.g.z.f.AuthPreFilter [TxId:gateway^1520924888825^1, SpanId:-9016575308619697817] - /gateway/oauth2/token/refreshToken 开始认证
[2018-03-13 15:09:47.984] INFO [qtp1861416877-61] c.y.p.a.g.z.f.AuthPreFilter [TxId:gateway^1520924888825^1, SpanId:-9016575308619697817] - 不需要认证

[TxId:gateway^1520924888825^1, SpanId:-9016575308619697817]即为pinpoint生产的traceId,其中TxId即为全局唯一请求ID。

参考资料:
1.http://research.google.com/pubs/pub36356.html
2.https://bigbully.github.io/Dapper-translation/
3.http://naver.github.io/pinpoint/techdetail.html
4.http://www.tangrui.net/2016/zipkin-vs-pinpoint.html

零侵入微服务日志追踪(二):MDC

MDC

MDC(Mapped Diagnostic Context,映射调试上下文)是 log4j 和 logback 提供的一种方便在多线程条件下记录日志的功能。

某些应用程序采用多线程的方式来处理多个用户的请求。在一个用户的使用过程中,可能有多个不同的线程来进行处理。典型的例子是 Web 应用服务器。当用户访问某个页面时,应用服务器可能会创建一个新的线程来处理该请求,也可能从线程池中复用已有的线程。在一个用户的会话存续期间,可能有多个线程处理过该用户的请求。这使得比较难以区分不同用户所对应的日志。当需要追踪某个用户在系统中的相关日志记录时,就会变得很麻烦。

一种解决的办法是采用自定义的日志格式,把用户的信息采用某种方式编码在日志记录中。这种方式的问题在于要求在每个使用日志记录器的类中,都可以访问到用户相关的信息。这样才可能在记录日志时使用。这样的条件通常是比较难以满足的。MDC 的作用是解决这个问题。

MDC 可以看成是一个与当前线程绑定的哈希表,可以往其中添加键值对。MDC 中包含的内容可以被同一线程中执行的代码所访问。当前线程的子线程会继承其父线程中的 MDC 的内容。当需要记录日志时,只需要从 MDC 中获取所需的信息即可。

MDC的内容则由程序在适当的时候保存进去。对于一个 Web 应用来说,通常是在请求被处理的最开始保存这些数据。

以log4j为例

在记录日志前,首先在 MDC 中保存了名称为username的数据。其中包含的数据可以在格式化日志记录时直接引用

public class MdcSample { 
   private static final Logger LOGGER = Logger.getLogger("mdc"); 

   public void log() { 
       // 保存共享信息到MDC
       MDC.put("username", "Alex"); 

       if (LOGGER.isInfoEnabled()) { 
           LOGGER.info("This is a message."); 
       } 
   } 
}

修改log4j配置文件的日志输出格式的,其中%X{username}表示引用MDC中username的值

log4j.appender.stdout.layout.ConversionPattern=%X{username} %d{yyyy-MM-dd HH:mm:ss}

MDC与ThreadLocal对比

在前文中的示例使用ThreadLocal来存储Correlation Id,并在打印日志时从ThreadLocal取出,写入日志

// ...
    
    String correlationId = RequestCorrelation.getId();
    LOGGER.info("start REST request to {} with correlationId {}", uri, correlationId);

// ...

这要求开发人员在打日志时必须先取Correlation Id,再写入日志内容。特别容易忘记或者出错。虽然可以封装LogUtil类,但对于遗留项目,要求全部日志输出都得改用logUtil,工程量太大,侵入太强,肯定无法推广。

使用带MDC的日志框架,只需修改日志配置文件,代码中日志输出的地方无需任何修改,是一种更低成本,更不容易出错的解决方案。更服务零侵入的要求。

零侵入微服务日志追踪(一):Correlation Id

Correlation ids is an essential feature of service-oriented/microservice platforms for monitoring, reporting and diagnostics。
Correlation ids allow distributed tracing within complex service oriented platforms, where a single request into the application can often be dealt with by multiple downstream service. Without the ability to correlate downstream service requests it can be very difficult to understand how requests are being handled within your platform.

拆分微服务后,对一个接口的请求,往往会引发对其他若干个微服务的调用。如果缺少对这一连串请求的关联信息,就很难搞清楚服务间调用关系。这一问题就是链路追踪问题。链路追踪又分APM和日志追踪。APM关注调用间的耗时,日志追踪关注。

链路追踪我们通常使用Correlation id(下文也称Trace id)是来解决这,来自客户端的一个请求的产生的服务间调用都被管理同一个Correlation id。
通过在APM数据和日志中输出该ID,实现追踪。

具体如下:
1.服务收到请求时,检查是否带有Correlation id,如果没有产生新的Correlation id,如果有则使用请求中的Correlation id
2.在调用下游其他服务时,带上这个唯一的id
3.服务在打印响应耗时的指标数据或者日志内容中输出Correlation id
4.APM系统收集所有请求监控数据,以Correlation id和父子调用关系,生产调用树及耗时
5.日志系统收集所有微服务的日志,解析出Correlation id,并按Correlation id索引,排查问题时可以按Correlation id搜索

一个J2EE实现例子

public class CorrelationHeaderFilter implements Filter {

    //...

    @Override
    public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain)
            throws IOException, ServletException {

        final HttpServletRequest httpServletRequest = (HttpServletRequest) servletRequest;
        String currentCorrId = httpServletRequest.getHeader(RequestCorrelation.CORRELATION_ID_HEADER);

        if (!currentRequestIsAsyncDispatcher(httpServletRequest)) {
            if (currentCorrId == null) {
                // 生产新的Correlation id
                currentCorrId = UUID.randomUUID().toString();
                LOGGER.info("No correlationId found in Header. Generated : " + currentCorrId);
            } else {
                LOGGER.info("Found correlationId in Header : " + currentCorrId);
            }
            // 保存Correlation id到请求上下文
            RequestCorrelation.setId(currentCorrId);
        }

        filterChain.doFilter(httpServletRequest, servletResponse);
    }

    //...
    
}

例子中,通过filter拦截所有请求,判断http header是否有Correlation id,如果没有,则使用uuid生产新的Correlation id。最后存入RequestCorrelation中。

public class RequestCorrelation {

    public static final String CORRELATION_ID = "correlationId";

    private static final ThreadLocal id = new ThreadLocal();


    public static String getId() { return id.get(); }

    public static void setId(String correlationId) { id.set(correlationId); }
}

一般地可以使用ThreadLocal来存储Correlation id.因为使用一个线程处理一个请求的模式,可以使用threadlocal在请求中共享Correlation id。

当调用下游服务,已http方式为例,可利用http header来传递Correlation id。

@Component
// 子类隐藏Correlation Id传递细节
public class CorrelatingRestClient implements RestClient {

    private RestTemplate restTemplate = new RestTemplate();

    @Override
    public String getForString(String uri) {
        String correlationId = RequestCorrelation.getId();
        HttpHeaders httpHeaders = new HttpHeaders();
        
        // 以http header方式传递Correlation Id到下一个服务
        httpHeaders.set(RequestCorrelation.CORRELATION_ID, correlationId);

        LOGGER.info("start REST request to {} with correlationId {}", uri, correlationId);

        ResponseEntity response = restTemplate.exchange(uri, HttpMethod.GET,
                new HttpEntity(httpHeaders), String.class);
        // 日志中输出Correlation Id
        LOGGER.info("completed REST request to {} with correlationId {}", uri, correlationId);

        return response.getBody();
    }
}


// 调用示例
public String exampleMethod() {
        RestClient restClient = new CorrelatingRestClient();
        return restClient.getForString(URI_LOCATION); 
}