Monthly Archives: May 2018

SSO和CAS

SSO(Single sign-on,单点登录),就是只登录一次(只提供一次凭证,如账号密码),就可以畅通无阻的访问平台的多个应用/服务。不应该把SSO和OAuth等授权协议混淆,OAuth协议要求在登录不同系统是都要进行认证和授权,而SSO只认证一次。OAuth本质是解决授权问题。

要实现SSO有很多方案。CAS(Central Authentication Service,集中式认证服务)就是其中一种。最初的CAS由Yale实现,现在CAS协议的实现有很多。CAS协议已经发展到了3.0版本。

一下对CAS协议的描述来自apereo CAS实现的文档。

CAS protocol

The CAS protocol is a simple and powerful ticket-based protocol developed exclusively for CAS. A complete protocol specification may be found here.

It involves one or many clients and one server. Clients are embedded in CASified applications (called “CAS services”) whereas the CAS server is a standalone component:

  • The CAS server is responsible for authenticating users and granting accesses to applications
  • The CAS clients protect the CAS applications and retrieve the identity of the granted users from the CAS server.

The key concepts are:

  • The TGT (Ticket Granting Ticket), stored in the CASTGC cookie, represents a SSO session for a user
  • The ST (Service Ticket), transmitted as a GET parameter in urls, stands for the access granted by the CAS server to the CASified application for a specific user.

交互流程

cas_flow_diagram

从上图可以看出,web的CAS利用到了session cookie和ticket令牌。不难分析得出CASTGC用于保持浏览器到CAS server的会话,第二次访问不需要再提供凭证。利用GET参数传递ST使得认证结果可以在不同域名之间传递。浏览器和受保护应用之间还有一个session cookie,保持了两者之间的会话,使得第二次直接访问应用也不需要提供凭证,应用也不需要再去CAS认证对令牌进行认证。

redis+lua实现Token Bucket限流

在做网关限流时,经过研究后决定使用redis+lua实现Token Bucket算法做分布式限流。

Token Bucket算法要求匀速放入令牌,用定时器可以实现。但是网关的来源IP太多,桶很多,需要很多线程加定时器才能实时处理得过来,不是一个好办法。
换个思路,由于生产token是匀速的,且取token是有时间差的,可以利用两次取token的时间差计算出间隔间应该产生的token数量。原有token+新产生token数-要获取的token数,就是桶里剩余的token。因此对于一个桶,要记录剩余token数,上次取token时间戳。获取token是要传入当前时间戳,要获取token数量(默认1)。在修改桶的两个信息时,需要是原子操作,而且获取令牌过程中带有计算逻辑。
redis是以单线程串行处理请求,在处理EVAL命令时是一个原子操作。因此使用redis+lua脚本刚好可以满足上述要求。

脚本如下

-- bucket name
local key = KEYS[1]
-- token generate interval
local intervalPerPermit = tonumber(ARGV[1])
-- grant timestamp
local refillTime = tonumber(ARGV[2])
-- limit token count
local limit = tonumber(ARGV[3])
-- ratelimit time period
local interval = tonumber(ARGV[4])

local counter = redis.call('hgetall', key)

if table.getn(counter) == 0 then
    -- first check if bucket not exists, if yes, create a new one with full capacity, then grant access
    redis.call('hmset', key, 'lastRefillTime', refillTime, 'tokensRemaining', limit - 1)
    -- expire will save memory
    redis.call('expire', key, interval)
    return 1
elseif table.getn(counter) == 4 then
    -- if bucket exists, first we try to refill the token bucket
    local lastRefillTime, tokensRemaining = tonumber(counter[2]), tonumber(counter[4])
    local currentTokens
    if refillTime > lastRefillTime then
        -- check if refillTime larger than lastRefillTime.
        -- if not, it means some other operation later than this call made the call first.
        -- there is no need to refill the tokens.
        local intervalSinceLast = refillTime - lastRefillTime
        if intervalSinceLast > interval then
            currentTokens = limit
            redis.call('hset', key, 'lastRefillTime', refillTime)
        else
            local grantedTokens = math.floor(intervalSinceLast / intervalPerPermit)
            if grantedTokens > 0 then
                -- ajust lastRefillTime, we want shift left the refill time.
                local padMillis = math.fmod(intervalSinceLast, intervalPerPermit)
                redis.call('hset', key, 'lastRefillTime', refillTime - padMillis)
            end
            currentTokens = math.min(grantedTokens + tokensRemaining, limit)
        end
    else
        -- if not, it means some other operation later than this call made the call first.
        -- there is no need to refill the tokens.
        currentTokens = tokensRemaining
    end
    
    assert(currentTokens >= 0)

    if currentTokens == 0 then
        -- we didn't consume any keys
        redis.call('hset', key, 'tokensRemaining', currentTokens)
        return 0
    else
        -- we take 1 token from the bucket
        redis.call('hset', key, 'tokensRemaining', currentTokens - 1)
        return 1
    end
else
    error("Size of counter is " .. table.getn(counter) .. ", Should Be 0 or 4.")
end

调用这个脚本需要4个参数,其中key是桶的名字,intervalPerPermit是产生令牌的时间间隔,refillTime是调用脚本的即时时间戳,limit是最大间隔访问次数,interval桶的限流间隔。key生产规则可以是”ratelimit:$IP:$PATH:$INTERVAL”。当token不存在时,默认创建一个带有最大token数量的桶。当超过interval都没有访问桶时,对该通的缓存会过期,减少内存使用量。返回1代表成功获取一个令牌,返回0代表未获取到令牌(此次访问应该被限制)。

假设要限制某个IP对某个接口每秒钟最多访问2次,则
interval = 1000(毫秒)
limit = 2
intervalPerPermit = interval/limit = 500
key=”ratelimit:127.0.0.1:/user/get:1000″

命令行调用

redis-cli -h localhost -p 6379 -a wingyiu --ldb --eval ~/Git/api-gateway/src/main/resources/ratelimit_token_bucket.lua ratelimit:127.0.0.1:/user/get:1000 , 500 1520402606803 2 1000

网关使用spring cloud netflix zuul,可以编写一个filter,作为限流功能的实现,调用redis,执行上述Lua脚本

package com.yunzhijia.platform.api.gateway.zuul.filter;

import com.google.common.net.InetAddresses;
import com.netflix.zuul.ZuulFilter;
import com.netflix.zuul.context.RequestContext;
import com.xxx.gateway.vo.SimpleRouteVo;
import com.xxx.gateway.common.constants.CacheConstants;
import com.xxx.gateway.common.constants.ErrorMsgConstants;
import com.xxx.gateway.exception.GatewayException;
import com.xxx.gateway.utils.RedisTemplateUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.stereotype.Component;

import javax.servlet.http.HttpServletRequest;
import java.util.Collections;

/**
 * 目前仅实现IP+URI的限流
 * 基于令牌桶算法
 * 借助于redis+lua实现对令牌桶的原子操作
 */
@Component
public class RateLimitPreFilter extends ZuulFilter {

	protected static final Logger LOG = LoggerFactory.getLogger(RateLimitPreFilter.class);

	@Override
	public String filterType() {
		return "pre";
	}

	@Override
	public int filterOrder() {
		return 4;
	}

	@Override
	public boolean shouldFilter() {
		return true;
	}

	@Autowired
	RedisScript redisScript;

	@Override
	public Object run() {
		RequestContext ctx = RequestContext.getCurrentContext();
		HttpServletRequest request = ctx.getRequest();

		String uri = request.getRequestURI();
		LOG.info("开始限流检查:{}", uri);
		// 获取ip
		String ip = request.getHeader("X-Forwarded-For");
		if (StringUtils.isBlank(ip)) {
			ip = request.getRemoteAddr();
		}
		ip = ip.split(",")[0];
		LOG.info("remote ip: {}",  ip);

		if (isSiteLocalAddress(ip)) {
			LOG.info("私有地址跳过: {}", ip);
			return null;
		}

		SimpleRouteVo routeVo = (SimpleRouteVo) ctx.get("methodDefinition");

		if (routeVo.getReqPerSec() <= 0 && routeVo.getReqPerMin() <= 0) {
			// 不需要限流
			LOG.info("接口:{} 不限流", uri);
			return null;
		}

		if (routeVo.getReqPerSec() > 0) {
			boolean canAccess = access(ip, uri, 1000, routeVo.getReqPerSec());
			if (!canAccess) {
				LOG.info("被限流: ip={}, uri={}, interval=sec, limit={}", ip, uri, routeVo.getReqPerSec());
				ctx.setSendZuulResponse(false);
				// raise exception to jump to error filter
				throw new GatewayException(ErrorMsgConstants.TOO_MANY_REQUESTS);
			}
		}

		if (routeVo.getReqPerMin() > 0) {
			boolean canAccess = access(ip, uri, 60000, routeVo.getReqPerMin());
			if (!canAccess) {
				LOG.info("被限流: ip={}, uri={}, interval=min, limit={}", ip, uri, routeVo.getReqPerMin());
				ctx.setSendZuulResponse(false);
				// raise exception to jump to error filter
				throw new GatewayException(ErrorMsgConstants.TOO_MANY_REQUESTS);
			}
		}

		return null;
	}

	private boolean isSiteLocalAddress(String ip) {
		// refer to RFC 1918
		// 10/8 prefix
		// 172.16/12 prefix
		// 192.168/16 prefix
		int address = InetAddresses.coerceToInteger(InetAddresses.forString(ip));
		return (((address >>> 24) & 0xFF) == 10)
				|| ((((address >>> 24) & 0xFF) == 172)
				&& ((address >>> 16) & 0xFF) >= 16
				&& ((address >>> 16) & 0xFF) <= 31)
				|| ((((address >>> 24) & 0xFF) == 192)
				&& (((address >>> 16) & 0xFF) == 168));
	}

	public boolean access(String ip, String uri, long intervalInMills, long limit) {
		LOG.info(redisScript.getSha1());
		String key = genKey(ip, uri, intervalInMills, limit);
		key = CacheConstants.genKey(key);
		long intervalPerPermit = intervalInMills / limit;

		try {
			RedisTemplate redisTemplate = RedisTemplateUtils.getRedisTemplate();
			Long refillTime = System.currentTimeMillis();
			LOG.info("调用redis执行lua脚本, {} {} {} {} {}", key, String.valueOf(intervalPerPermit), String.valueOf(refillTime),
					String.valueOf(limit), String.valueOf(intervalInMills));
			Long res = (Long)redisTemplate.execute(redisScript, Collections.singletonList(key),
					String.valueOf(intervalPerPermit), String.valueOf(System.currentTimeMillis()),
					//String.valueOf(limit),
					String.valueOf(limit), String.valueOf(intervalInMills));
			LOG.info("调用redis执行lua脚本:{}", res);
			return res == 1L ? true : false;

		} catch (Exception e) {
			LOG.error("调用redis执行lua脚本出错", e);
			return true; // 内部异常,直接放过吧,保证能访问,不计较太多
		}
	}

	private String genKey(String ip, String uri, long intervalInMills, long limit) {
		return String.format("ratelimit:%s:%s:%s", ip, uri, intervalInMills);
	}
}

实现了对[IP,秒,接口],[IP,分钟,接口]两种限流,具体限流大小由管理后台配置。

留个问题:
Redis Cluster模式下,上述代码能正常工作吗?

参考:
https://zhuanlan.zhihu.com/p/20872901
https://github.com/YigWoo/toys/tree/master/src/main/java/com/yichao/woo/ratelimiter

对rate limiting功能的思考

频率限制,应该是针对调用方的限制,同时也对被调用对象的保护。

例如每个IP对`/gateway/ticket/ticket2ctx`接口每秒钟调用次数不超过200次。

故频率定义为: 调用方属性组合 X 被调用对象属性组合 x 次数 x 间隔

调用方属性包括: IP,应用ID等

被调用对象属性:一般是接口

间隔:1秒、1分钟、1小时、1每天

可以在两个地方进行频率限制:
一个是在网关,使用“调用方IP X 接口”
一个是在开放平台,使用“调用方AppId x 接口” | 调用方appId x 接口

不同间隔的限制可以同时使用,如 rate <= 100rp/sec 且 rate <= 3500 rp/min 在逻辑正确及性能方面较符合的算法包括:吊桶算法、循环队列算法、令牌通算法、漏桶算法。

吊桶算法

image2018-2-28 18_45_59
这种算法没有办法保证任意1秒(不是整数开始)都限制n个请求。本质是计数器方式,粗暴,非精确。

循环队列算法

image2018-2-28 18_48_46

维护最近n个请求的时间,当有新请求时,查看一下n个之前的请求的时间,如果大于1秒就放过,小于1秒就拒绝。
缺点,n很大时,空间消耗大。

token bucket算法

image2018-2-28 18_51_9

令牌桶会以一定速率产生令牌放入桶中,满了以后会丢弃或暂停产生,数据通过时需要持有一个令牌,没有令牌说明超过了速率限制。令牌桶算法允许突发流量,如突然将桶内令牌都消耗完成

漏桶算法

image2018-2-28 18_51_58
漏桶算法中,水以一定速率放到漏桶里,然后漏桶以一定的速率向外流水。所以最大的速率就是出水的速率。不能出现突发流量。

限流可以分为:
1.应用级限流,只是单应用实例内的请求限流
2.分布式限流。

网关部署到多台机器,为了保证正确的限流,需要全局限流,即分布式限流。

限流vs熔断:
限流一般在被调用侧实施,而熔断则在调用侧做。

资料:

http://jinnianshilongnian.iteye.com/blog/2305117

零侵入微服务日志追踪(五):网关

微服务横行的年代,网关已经成为微服务的标配。作为服务的入口,网关在请求进去实际业务集群前,可以做很多公共的事情,使得业务微服务更加的专注于业务,比如统一的鉴权、限流、熔断保护、降级。

但其实网关还可以做一件很重要的事:给每个响应增加TraceId。把TraceId返回给调用方,在前后端联调、开放接口出错等情况下,客户只需要把TraceId告诉接口负责人,负责人便可以利用APM系统、日志系统进行精准定位,极大的加快问题的定位和分析速度。

为例保证无侵入,网关可以把TraceId写入额外的HTTP Header,放回给调用方,这样就不需要解析业务微服务的响应体。

我们以Zuul为基础实现api网关,借助前面提到的Pinpoint无侵入的生成唯一TrxId。TrxId格式$agentId^$startTimestamp^$seq,$agentId为应用名-主机IP,$seq表示这个请求是这个实例处理的第$seq个请求,$startTimestamp是实例启动时间。TrxId包含了较多的敏感信息,不易直接暴露给客户端。因此可以做个映射,TrxId <-> TraceId,输出不含敏感信息的TraceId。

import com.netflix.zuul.ZuulFilter;
import com.netflix.zuul.context.RequestContext;
import com.vanke.gateway.common.constants.Constants;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.stereotype.Component;

import javax.servlet.http.HttpServletResponse;
import java.util.UUID;

@Component
public class TraceIdPreFilter extends ZuulFilter {

    protected static final Logger LOG = LoggerFactory.getLogger(TraceIdPreFilter.class);

    @Override
    public String filterType() {
        return "pre";
    }

    @Override
    public int filterOrder() {
        return 2;
    }

    @Override
    public boolean shouldFilter() {
        return true;
    }

    @Override
    public Object run() {
        // 更新MDC内的ptxId,不能删除
        // 由于线程复用,MDC可能是上个请求的trx信息,强制调用一次log,可以更新MDC的内容为本次请求对应的trx
        LOG.info("生产内部TraceId");
        String traceId = UUID.randomUUID().toString().replace("-", "");
        RequestContext context = RequestContext.getCurrentContext();
        context.set(Constants.REQ_CTX_TRACE_ID, traceId);

        String ptxId = MDC.get("PtxId");
        if (StringUtils.isBlank(ptxId)) {
            return null;
        }
        LOG.info("设置X-Trace-Id: {}, 关联pinpoint trxId: {}", traceId, ptxId);
        RequestContext ctx = RequestContext.getCurrentContext();
        HttpServletResponse resp = ctx.getResponse();
        resp.setHeader("X-Trace-Id", traceId);


        return null;
    }
}

输出示例如下:

通过ELK可以找到对应的Pinpoint TrxId:

利用Pinpoint TrxId就可以进行更进入的追踪了。

零侵入微服务日志追踪(四):ELK做日志分析

上文介绍了利用Pinpoint把trxId以低侵入方式注入到每行日志输出中,方便在日志文件内进行单个请求的日志识别。

但是依然没有解决一个问题是:快速找出某个请求涉及的所有服务的日志内容。

可以利用ELK这类分布式日志系统,自动收集所有服务、所有实例的每个日志文件,统一存储和索引。然后利用ES的查询语句,进行全局的搜索(精确条件过滤、文本模糊搜索)。

由于我们已经把Pinpoint trxId写入日志行中,因此可以对trxId进行索引。这样我们就可以利用ptxId精确的搜索到一个请求的链路经过的所有服务实例产生的日志了,极大的提升了日志定位的速度。

上图展示了一个涉及两个服务(api-gateway、authserver)的请求。

如何获取一个接口调用的trxId将会在下一篇文章介绍。

零侵入微服务日志追踪(三):pinpoint

dapper

Dapper是google的分布式日志追踪系统。在谷歌公开的关于dapper的论文中,阐述了其分布式追踪的原理。dapper在Correlation Id(论文中称为Trace Id)的基础是引入了跟踪树。Span表示每一个调用,记录Span之间父子关系。同一个事务的所有Span都挂在一个特定的跟踪上,也共用一个Trace id。所有这些ID用全局唯一的64位整数标示。Span有关联的时间信息和业务注释信息(Annotation)。

topu

以上图调用拓扑为例,转换为调用树如下

tree

一个Span的细节如下

spandetail

上图中这种某个span表述了两个“Helper.Call”的RPC(分别为server端和client端)。span的开始时间和结束时间,以及任何RPC的时间信息都通过Dapper在RPC组件库的植入记录下来。如果应用程序开发者选择在跟踪中增加他们自己的注释(如图中“foo”的注释)(业务数据),这些信息也会和其他span信息一样记录下来。

Pinpoint

Pinpoint是Google dapper模型的一个实现,但是有所不同。

Pinpoint 实现追踪的消息的数据结构主要包含三种类型 Span,Trace 和 TraceId。
1.Span:是最基本的调用追踪单元,当远程调用到达的时候,Span 指代处理该调用的作业,并且携带追踪数据。为了实现代码级别的可见性,Span 下面还包含一层 SpanEvent 的数据结构。每个 Span 都包含一个 SpanId。
2.Trace:是一组相互关联的 Span 集合,同一个 Trace 下的 Span 共享一个 TransactionId,而且会按照 SpanId 和 ParentSpanId 排列成一棵有层级关系的树形结构。
3.TraceId:是 TransactionId、SpanId 和 ParentSpanId 的组合。
4.TransactionId: (TxId) 是一个交易下的横跨整个分布式系统收发消息的 ID,其必须在整个服务器组中是全局唯一的。也就是说 TransactionId 识别了整个调用链;TxId由AgentId、JVM启动时间戳和消息在此实例的序列号组成。
5.SpanId: (SpanId) 是处理远程调用作业的 ID,当一个调用到达一个节点的时候随即产生;
6.ParentSpanId: (pSpanId) 顾名思义,就是产生当前 Span 的调用方 Span 的 ID。如果一个节点是交易的最初发起方,其 ParentSpanId 是 -1,以标志其是整个交易的根 Span。下图能够比较直观的说明这些 ID 结构之间的关系。
7.AgentId:JVM实例的名字,由用户设定,全局唯一。实践中,我们采用应用名加主机名。


dapper

Pinpoint和google dapper的不同:
1.Pinpoint的TransactionId就是Google Dapper中的trace id
2.PinPoint中的Trace id是一值一组不同的ID,如上图所示

Pinpoint运用JavaAgent字节码增强技术,只需在JVM启动时加上一些参数即可,业务代码无需任何修改。实践中,由于我们是对遗留系统进行架构升级,因此我们首选Pinpoint这种零侵入、低开发成本的方案。

运行是额外的启动参数:
-javaagent:pinpoint提供的agent jar包的路径
-Dpinpoint.agentId:JVM唯一标识(应用实例,实践中,我们采用单机多应用部署,因此采用应用名加主机名)
-Dpinpoint.applicationName:应用名(微服务名,采用CMDB中统一服务名)

示例:

java -javaagent:/opt/pinpoint/pinpoint-bootstrap-1.6.0-RC1.jar -Dpinpoint.agentId=api-gw-10.0.0.1 -Dpinpoint.applicationName=api-gateway -jar api-gateway-1.0.0-SNAPSHOT.jar

一个服务调用及Pinpoint展示的跟踪树的例子:
td_figure5

Pinpoint与logback/log4j

Pinpoint的零侵入其实是建立在提前以插件形式对常用库的JVM增强基础上的。其中日志常用库log4j和logback都有相关plugin。以logback为例,pinpoint-logback-plugin插件会把txId和spanid存到logback的MDC中,因此只需要在logback的配置文件中设置layout为输出MDC中txId和spanId即可。


其中PtxId和PspanId即为pinpoint-logback-plugin插件存入MDC的两个属性。
默认情况下,Pinpoint并不是跟踪所有的请求,而且不把txId和spanId存入MDC,因此需求修改pinpoint的配置文件$AGENTPATH/pinpoint.config

# 修改采样率为100%(如果不设置为100%,则会有请求不会被trace)
profiler.sampling.rate=1
# 把transaction信息存入logback MDC
profiler.logback.logging.transactioninfo=true
# 把transaction信息存入log4j MDC
profiler.log4j.logging.transactioninfo=true

输出日志示例:

[2018-03-13 15:09:47.953] INFO [qtp1861416877-61] c.y.p.a.g.z.f.MethodExistedPreFilter [TxId:gateway^1520924888825^1, SpanId:-9016575308619697817] - 检查/gateway/oauth2/token/refreshToken是否存在对应接口定义
[2018-03-13 15:09:47.979] INFO [qtp1861416877-61] c.y.p.a.g.z.f.MethodExistedPreFilter [TxId:gateway^1520924888825^1, SpanId:-9016575308619697817] - /gateway/oauth2/token/refreshToken对应的接口定义: com.**.web.AuthorityService#refreshToken
[2018-03-13 15:09:47.983] INFO [qtp1861416877-61] c.y.p.a.g.z.f.RateLimitPreFilter [TxId:gateway^1520924888825^1, SpanId:-9016575308619697817] - 开始限流检查:0:0:0:0:0:0:0:1 /gateway/oauth2/token/refreshToken
[2018-03-13 15:09:47.983] INFO [qtp1861416877-61] c.y.p.a.g.z.f.RateLimitPreFilter [TxId:gateway^1520924888825^1, SpanId:-9016575308619697817] - 接口:/gateway/oauth2/token/refreshToken 不限流
[2018-03-13 15:09:47.984] INFO [qtp1861416877-61] c.y.p.a.g.z.f.AuthPreFilter [TxId:gateway^1520924888825^1, SpanId:-9016575308619697817] - /gateway/oauth2/token/refreshToken 开始认证
[2018-03-13 15:09:47.984] INFO [qtp1861416877-61] c.y.p.a.g.z.f.AuthPreFilter [TxId:gateway^1520924888825^1, SpanId:-9016575308619697817] - 不需要认证

[TxId:gateway^1520924888825^1, SpanId:-9016575308619697817]即为pinpoint生产的traceId,其中TxId即为全局唯一请求ID。

参考资料:
1.http://research.google.com/pubs/pub36356.html
2.https://bigbully.github.io/Dapper-translation/
3.http://naver.github.io/pinpoint/techdetail.html
4.http://www.tangrui.net/2016/zipkin-vs-pinpoint.html

零侵入微服务日志追踪(二):MDC

MDC

MDC(Mapped Diagnostic Context,映射调试上下文)是 log4j 和 logback 提供的一种方便在多线程条件下记录日志的功能。

某些应用程序采用多线程的方式来处理多个用户的请求。在一个用户的使用过程中,可能有多个不同的线程来进行处理。典型的例子是 Web 应用服务器。当用户访问某个页面时,应用服务器可能会创建一个新的线程来处理该请求,也可能从线程池中复用已有的线程。在一个用户的会话存续期间,可能有多个线程处理过该用户的请求。这使得比较难以区分不同用户所对应的日志。当需要追踪某个用户在系统中的相关日志记录时,就会变得很麻烦。

一种解决的办法是采用自定义的日志格式,把用户的信息采用某种方式编码在日志记录中。这种方式的问题在于要求在每个使用日志记录器的类中,都可以访问到用户相关的信息。这样才可能在记录日志时使用。这样的条件通常是比较难以满足的。MDC 的作用是解决这个问题。

MDC 可以看成是一个与当前线程绑定的哈希表,可以往其中添加键值对。MDC 中包含的内容可以被同一线程中执行的代码所访问。当前线程的子线程会继承其父线程中的 MDC 的内容。当需要记录日志时,只需要从 MDC 中获取所需的信息即可。

MDC的内容则由程序在适当的时候保存进去。对于一个 Web 应用来说,通常是在请求被处理的最开始保存这些数据。

以log4j为例

在记录日志前,首先在 MDC 中保存了名称为username的数据。其中包含的数据可以在格式化日志记录时直接引用

public class MdcSample { 
   private static final Logger LOGGER = Logger.getLogger("mdc"); 

   public void log() { 
       // 保存共享信息到MDC
       MDC.put("username", "Alex"); 

       if (LOGGER.isInfoEnabled()) { 
           LOGGER.info("This is a message."); 
       } 
   } 
}

修改log4j配置文件的日志输出格式的,其中%X{username}表示引用MDC中username的值

log4j.appender.stdout.layout.ConversionPattern=%X{username} %d{yyyy-MM-dd HH:mm:ss}

MDC与ThreadLocal对比

在前文中的示例使用ThreadLocal来存储Correlation Id,并在打印日志时从ThreadLocal取出,写入日志

// ...
    
    String correlationId = RequestCorrelation.getId();
    LOGGER.info("start REST request to {} with correlationId {}", uri, correlationId);

// ...

这要求开发人员在打日志时必须先取Correlation Id,再写入日志内容。特别容易忘记或者出错。虽然可以封装LogUtil类,但对于遗留项目,要求全部日志输出都得改用logUtil,工程量太大,侵入太强,肯定无法推广。

使用带MDC的日志框架,只需修改日志配置文件,代码中日志输出的地方无需任何修改,是一种更低成本,更不容易出错的解决方案。更服务零侵入的要求。

零侵入微服务日志追踪(一):Correlation Id

Correlation ids is an essential feature of service-oriented/microservice platforms for monitoring, reporting and diagnostics。
Correlation ids allow distributed tracing within complex service oriented platforms, where a single request into the application can often be dealt with by multiple downstream service. Without the ability to correlate downstream service requests it can be very difficult to understand how requests are being handled within your platform.

拆分微服务后,对一个接口的请求,往往会引发对其他若干个微服务的调用。如果缺少对这一连串请求的关联信息,就很难搞清楚服务间调用关系。这一问题就是链路追踪问题。链路追踪又分APM和日志追踪。APM关注调用间的耗时,日志追踪关注。

链路追踪我们通常使用Correlation id(下文也称Trace id)是来解决这,来自客户端的一个请求的产生的服务间调用都被管理同一个Correlation id。
通过在APM数据和日志中输出该ID,实现追踪。

具体如下:
1.服务收到请求时,检查是否带有Correlation id,如果没有产生新的Correlation id,如果有则使用请求中的Correlation id
2.在调用下游其他服务时,带上这个唯一的id
3.服务在打印响应耗时的指标数据或者日志内容中输出Correlation id
4.APM系统收集所有请求监控数据,以Correlation id和父子调用关系,生产调用树及耗时
5.日志系统收集所有微服务的日志,解析出Correlation id,并按Correlation id索引,排查问题时可以按Correlation id搜索

一个J2EE实现例子

public class CorrelationHeaderFilter implements Filter {

    //...

    @Override
    public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain)
            throws IOException, ServletException {

        final HttpServletRequest httpServletRequest = (HttpServletRequest) servletRequest;
        String currentCorrId = httpServletRequest.getHeader(RequestCorrelation.CORRELATION_ID_HEADER);

        if (!currentRequestIsAsyncDispatcher(httpServletRequest)) {
            if (currentCorrId == null) {
                // 生产新的Correlation id
                currentCorrId = UUID.randomUUID().toString();
                LOGGER.info("No correlationId found in Header. Generated : " + currentCorrId);
            } else {
                LOGGER.info("Found correlationId in Header : " + currentCorrId);
            }
            // 保存Correlation id到请求上下文
            RequestCorrelation.setId(currentCorrId);
        }

        filterChain.doFilter(httpServletRequest, servletResponse);
    }

    //...
    
}

例子中,通过filter拦截所有请求,判断http header是否有Correlation id,如果没有,则使用uuid生产新的Correlation id。最后存入RequestCorrelation中。

public class RequestCorrelation {

    public static final String CORRELATION_ID = "correlationId";

    private static final ThreadLocal id = new ThreadLocal();


    public static String getId() { return id.get(); }

    public static void setId(String correlationId) { id.set(correlationId); }
}

一般地可以使用ThreadLocal来存储Correlation id.因为使用一个线程处理一个请求的模式,可以使用threadlocal在请求中共享Correlation id。

当调用下游服务,已http方式为例,可利用http header来传递Correlation id。

@Component
// 子类隐藏Correlation Id传递细节
public class CorrelatingRestClient implements RestClient {

    private RestTemplate restTemplate = new RestTemplate();

    @Override
    public String getForString(String uri) {
        String correlationId = RequestCorrelation.getId();
        HttpHeaders httpHeaders = new HttpHeaders();
        
        // 以http header方式传递Correlation Id到下一个服务
        httpHeaders.set(RequestCorrelation.CORRELATION_ID, correlationId);

        LOGGER.info("start REST request to {} with correlationId {}", uri, correlationId);

        ResponseEntity response = restTemplate.exchange(uri, HttpMethod.GET,
                new HttpEntity(httpHeaders), String.class);
        // 日志中输出Correlation Id
        LOGGER.info("completed REST request to {} with correlationId {}", uri, correlationId);

        return response.getBody();
    }
}


// 调用示例
public String exampleMethod() {
        RestClient restClient = new CorrelatingRestClient();
        return restClient.getForString(URI_LOCATION); 
}