• Home

如何构建一个实时图片处理服务

图片处理的必要性

现如今的互联网应用,有大量的场景会涉及图片的展示,如头像、微博和朋友圈的图片、即时聊天里的图片、电商网站的商品图、首页的促销轮播图。这些图片可能来源于内容编辑的 PS 软件,也可能来自于用户的高清摄像头拍摄产生,这些图片的文件大小和尺寸越来越大。而几乎每一个应用都会有Web 端、手机客户端、Pad 客户端、小程序、Mobile 版的 Web 站点,在设计针对这些不同尺寸大小的设备的应用界面时,展示的图片的控件的尺寸也是不一样的。假设每次展示图片时都加载很大的原图的话,肯定会很影响体验。一方面,原图的文件大小较大,下载到端本地需要更长的时间;另一方面,原图的尺寸很大,应用需要花更多的计算来解析图片和压缩到图片控件的像素尺寸。加载原图既浪费流量占用带宽,也浪费端的 CPU 和 GPU 资源,展示图片的延时会造成不好的用户体验。因此灵活的、高性能的图片缩略对应互联网应用来说必不可少。

常见云服务商的图片服务

云厂商提供的图片处理能力较丰富,本节只介绍和基本的缩略、裁剪、压缩、格式转换有关的功能。

阿里云OSS

图片处理功能就集成在 OSS,通过 GET 请求 URL 查询参数方式来控制输出的图片,控制参数格式为

x-oss-process = image/action1,param11_value11,param12_value12/action2,param21_value21

其中 action 为操作,目前 image 模块支持的操作包括:缩放 resize、裁剪 crop、旋转 rotate、锐化 sharpen、格式转换 format、质量调节 quality、水印 watermark。action 后紧跟多个参数名值对,使用_分隔参数名和参数值。如

https://image-demo.oss-cn-hangzhou.aliyuncs.com/example.jpg?x-oss-process=image/resize,w_500

将图片将图缩略成宽度为 500 px,高度按原比例处理。

七牛云存储

七牛图片处理包括基本处理(缩略裁剪压缩) imageView2、水印 watermark、基本信息 imageInfo、圆角处理 roundPic、瘦身imageslim等,以基本处理为例,通过在原始 URL添加

?imageView2/<mode>/w/<LongEdge>/h/<ShortEdge>
/format/<Format>/interlace/<Interlace>
/q/<Quality>/colors/<colors>
/ignore-error/<ignoreError>

即可得到想要的转换图片。官方示例

http://dn-odum9helk.qbox.me/resource/gogopher.jpg?imageView2/1/w/200/h/200

可以得到裁剪正中部分,等比缩小生成200×200缩略图。

腾讯云数据万象(Cloud Infinite,CI)

基础图片处理服务提供缩放、裁剪、旋转、格式转换、质量变换、搞死模糊、锐化、水印、EXIF处理等功能。类似的,也是通过在原 URL 后添加自定义参数方式进行转换输出,以缩放为例,接口形如为

download_url?imageMogr2/thumbnail/<imageSizeAndOffsetGeometry>

假设缩放图片宽高为原图 50%,示例如下:

http://examples-1251000004.picsh.myqcloud.com/sample.jpeg?imageMogr2/thumbnail/!50p

华为云

通过示例

https://obs.cn-north-1.myhuaweicloud.com/image-demo/example.jpg?x-image-process=image/resize,w_500,limit_0

可以看出,与阿里云 OSS 基本是一样的。

预备知识

图片的缩放模式

假设图片控件为主,尺寸保持固定,按图片内容与控件的关系,可以分为:
Fit to Frame: 图片按原貌,完全展示在图片控件中,且紧贴控件的最小边;
Fill Frame:用图片填充满图片控件。

不难分析得出,为了达到 Fit 的效果,图片必需保持宽高比例(Aspect To Fit),因此这种现实模式称为AspectFit,此模式下,图片居中于控件,图片控件部分区域(大边两侧)可能会留白。

为了达到 Fill 的效果,可以有两种方法。一种是拉伸图片(Scale To Fill),原图所有内容都会展示在控件中,但是会扭曲。效果如下方所示。

另一种方法是裁剪 (Crop),只有部分内容显示在图片框中,一定会填充满图片控件的最大边,因此还伴随着保持图形的长宽比前提下的放大或者缩小的操作,因此又可称为Aspect To Fill。效果如下方所示。

最后一种是复制平铺了,这已经是多图了,离题了。

抛开图片控件,图片缩放还有一种模式是保持宽高比的前提下,按百分比缩放,我们称为Aspect Percent Scale。

裁剪模式

裁剪图片的某个区域,需要使用偏移坐标x, y和画框大小 w, h,你可能想裁剪左上角的一块,也可能是正中心的一块,也可能是右侧的一块。为了更灵活的指定裁剪位置,引入了新的重力(Gravity)方向这个概念,重力方向包括:NorthWest、 North、 NorthEast、 West、 Center、 East、 SouthWest、 South、 SouthEast,如下图所示。

重力迫使裁剪框和原图和一个方向上对齐,我们用下面几个图例来解释,假设篮框为原图,红框为裁剪输出图。

正的偏移量,表示裁剪框对抗重力,逃离原图,负偏移量表示臣服于重力,裁剪框超重力方向移动。下图分别展示了在 g = SE 下正偏移和负偏移的裁剪。

特殊的,g = Center,指定偏移量的效果与 g = NW 类似,-x 超西移动, -y超北移动。

图片格式

有损 vs 无损图片

文件格式有可能会对图片的文件大小进行不同程度的压缩,图片的压缩分为有损压缩和无损压缩两种。

  • 有损压缩。指在压缩文件大小的过程中,损失了一部分图片的像素信息,并且这种损失是不可逆的,我们不可能从有一个有损压缩过的图片中恢复出和原来一模一样的图片。

  • 无损压缩。只在压缩文件大小的过程中,图片的像素信息没有任何丢失。我们任何时候都可以从无损压缩过的图片中恢复出原来的信息。

索引色 vs 直接色

存储1个像素的颜色所用的位数(bit)成为色阶(Color Depth)。这里所谓的颜色有两种形式,一种称作索引颜色(Index Color),一种称作直接颜色(Direct Color)。

  • 索引色。抽取出图片中的最共性的颜色,放入一个颜色数组。这个颜色在数组的索引就可以用来表代这个颜色。这个数组成为调色板,特定到单个图片。如果是8位索引色,则调色板最多有256种颜色。原来需要24位表示一个像素,现在只需要8位,图片大小得以降低。

  • 直接色。使用四个字节来代表一种颜色,这四个数字分别代表这个颜色中红色、绿色、蓝色以及透明度。现在流行的显示设备可以在这四个维度分别支持 256 种变化,所以直接色可以表示2的32次方种颜色。当然并非所有的直接色都支持这么多种,为压缩空间使用,有可能只有表达红、绿、蓝的三个数字,每个数字也可能不支持 256 种变化之多。

点阵图vs矢量图

  • 点阵图,也叫做位图,像素图。构成点阵图的最小单位是像素,位图就是由像素阵列的排列来实现其显示效果的,每个像素有自己的颜色信息,在对位图图像进行编辑操作的时候,可操作的对象是每个像素,。点阵图缩放会失真,用最近非常流行的沙画来比喻最恰当不过,当你从远处看的时候,画面细腻多彩,但是当你靠的非常近的时候,你就能看到组成画面的每粒沙子以及每个沙粒的颜色。

  • 矢量图,也叫做向量图。矢量图并不纪录画面上每一点的信息,而是纪录了元素形状及颜色的算法,当你打开一付矢量图的时候,软件对图形象对应的函数进行运算,将运算结果[图形的形状和颜色]显示给你看。无论显示画面是大还是小,画面上的对象对应的算法是不变的,所以,即使对画面进行倍数相当大的缩放,其显示效果仍然不失真。

BMP

BitMap 的缩写,是无损的、既支持索引色也支持直接色的、点阵图。这是一种比较老的图片格式。BMP 是无损的,但同时这种图片格式几乎没有对数据进行压缩,所以 BMP 格式的图片通常具有较大的文件大小。虽然同时支持索引色和直接色是一个优点,但是太大的文件格式格式导致它几乎没有用武之地,现在除了在Windows 操作系统中还比较常见之外,我们几乎看不到它。

JPEG

JPEG 是原是指一种标准有的有损压缩方法。现在口头所说的 JPEG 图片实际是指采用 JPEG 方法压缩 JFIF 格式存储的以 jpg 或者 jpeg 为后缀的图片文件。jpeg 是有损的、采用直接色的、点阵图。JPEG 的设计目标,是在不影响人眼可分辨的图片质量的前提下,尽可能的压缩文件大小。这意味着 JPEG 去掉了一部分图片的原始信息,也即是进行了有损压缩。JPEG 的图片的优点,是采用了直接色,得益于更丰富的色彩,JPEG 非常适合用来存储照片,用来表达更生动的图像效果,比如颜色渐变。JPEG 不适合用来存储企业 Logo、线框类的图。因为有损压缩会导致图片模糊,而直接色的选用,又会导致图片文件较大。

GIF

全称 Graphics Interchange Format,采用 LZW 压缩算法进行编码。是无损的、采用索引色的、点阵图。 GIF 是无损的,采用 GIF 格式保存图片不会降低图片质量。但得益于数据的压缩, GIF 格式的图片,其文件大小要远小于 BMP 格式的图片。文件小,是 GIF 格式的优点,同时,GIF 格式还具有支持动画以及透明的优点。但 GIF 格式仅支持 8 bit 的索引色,即在整个图片中,只能存在 256 种不同的颜色。 GIF 格式适用于对色彩要求不高同时需要文件体积较小的场景,比如企业Logo、线框类的图等。因其体积小的特点,现在 GIF 被广泛的应用在各类网站中。

PNG-8

PNG 全称 Portable Network Graphics,PNG-8 是 PNG 的索引色版本。PNG-8 是无损的、使用索引色的、点阵图。PNG-8 最初就是开发出来作为有专利纠纷的 GIF 的替代。除了支持 GIF 已有特性之外,PNG-8 还支持透明度。现在,除非需要动画的支持,否则我们没有理由使用 GIF 而不是 PNG-8。

PNG-24

PNG-24 是 PNG 的直接色版本。PNG-24 是无损的、使用直接色的、点阵图。无损的、使用直接色的点阵图,听起来非常像 BMP,是的,从显示效果上来看,PNG-24 跟 BMP 没有不同。PNG-24 的优点在于,它压缩了图片的数据,使得同样效果的图片,PNG-24 格式的文件大小要比 BMP 小得多。当然,PNG24 的图片还是要比 JPEG、GIF、PNG-8 大得多。虽然 PNG-24 的一个很大的目标,是替换 JPEG 的使用。但一般而言,PNG-24 的文件大小是 JPEG 的五倍之多,而显示效果则通常只能获得一点点提升。所以,只有在你不在乎图片的文件体积,而想要最好的显示效果时,才应该使用 PNG-24 格式。另外,PNG-24 跟 PNG-8 一样,是支持图片透明度的。

SVG

全称 Scalable Vector Graphics,是无损的、矢量图。SVG 跟上面这些图片格式最大的不同,是 SVG 是矢量图。这意味着 SVG 图片由直线和曲线以及绘制它们的方法组成。当你放大一个 SVG图片的时候,你看到的还是线和曲线,而不会出现像素点。这意味着 SVG 图片在放大时,不会失真,所以它非常适合用来绘制企业 Logo、Icon 等。SVG 是很多种矢量图中的一种,它的特点是使用XML来描述图片。借助于前几年 XML 技术的流行, SVG 也流行了很多。使用 XML 的优点是,任何时候你都可以把它当做一个文本文件来对待,也就是说,你可以非常方便的修改 SVG 图片,你所需要的只需要一个文本编辑器。SVG 并非只能绘制简单的Logo类的图片,它可以绘制出精致的图片的。

WebP

WebP 是谷歌开发的一种新图片格式,WebP 是同时支持有损和无损压缩的、使用直接色的、点阵图。从名字就可以看出来它是为Web而生的,什么叫为 Web 而生呢?就是说相同质量的图片,WebP 具有更小的文件体积。现在网站上充满了大量的图片,如果能够降低每一个图片的文件大小,那么将大大减少浏览器和服务器之间的数据传输量,进而降低访问延迟,提升访问体验。在无损压缩的情况下,相同质量的 WebP 图片,文件大小要比PNG 小 26%;在有损压缩的情况下,具有相同图片精度的 WebP 图片,文件大小要比JPEG 小 25%~34%;WebP 图片格式支持图片透明度,一个无损压缩的WebP 图片,如果要支持透明度只需要 22% 的格外文件大小。想象Web上的图片之多,百分之几十的提升,是非常非常大的优化。目前只有 Chrome 浏览器和 Opera 浏览器支持 WebP 格式,所以WebP的应用并不广泛。

GraphicsMagick 功能、参数介绍,以及命令行示例

GraphicsMagick 号称图像处理领域的瑞士军刀。 短小精悍的代码却提供了一个鲁棒、高效的工具和库集合,来处理图像的读取、写入和操作,支持超过 88 种图像格式,包括重要的 DPX、GIF、JPEG、JPEG-2000、PNG、PDF、PNM 和 TIFF。

通过使用 OpenMP 可是利用多线程进行图片处理,增强了通过扩展CPU提高处理能力。

GraphicsMagick 可以在绝大多数的平台上使用,Linux、Mac、Windows 都没有问题。

GraphicsMagick 支持大图片的处理,并且已经做过GB级别的图像处理实验。GraphicsMagick 能够动态的生成图片,特别适用于互联网的应用。可以用来处理调整尺寸、旋转、加亮、颜色调整、增加特效等方面。GaphicsMagick 不仅支持命令行的模式,同时也支持 C、C++、Perl、PHP、Tcl、Ruby 等的调用。事实上,GraphicsMagick 是从 ImageMagick 5.5.2 分支出来的,但是现在他变得更稳定和优秀

安装GraphicsMagick后,以命令行方式调用 gm command [options ...],其中 command 是子命令,options 是相关命令参数,子命令包括:
* convert 对单张或者多种图片进行转换,如缩放、裁剪、转变格式、质量变换
* mogrify 和convert相似,但是默认会覆盖原图
* identify 获取图片的基本信息,如格式、大小、压缩等级、尺寸等
* composite 图片层叠
* montage 拼接图片
* compare 可视化图片差异比较
* conjure 使用MSL脚本进行处理
* batch 进入批量处理模式
* time 计算另一个子命令的耗时
* benchmark 对另一个子命令进行性能测试并返回结果

示例0 安装

# yum install -y gcc libpng libjpeg libpng-devel libjpeg-devel libtiff libtiff-devel freetype freetype-devel jasper jasper-devel  libwebp-devel libwebp gd-devel giflib giflib-devel

# yum install -y GraphicsMagick
gm version
GraphicsMagick 1.3.21 2015-02-28 Q8 http://www.GraphicsMagick.org/
Copyright (C) 2002-2014 GraphicsMagick Group.
Additional copyrights and licenses apply to this software.
See http://www.GraphicsMagick.org/www/Copyright.html for details.

Feature Support:
  Native Thread Safe       yes
  Large Files (> 32 bit)   yes
  Large Memory (> 32 bit)  yes
  BZIP                     yes
  DPS                      no
  FlashPix                 no
  FreeType                 yes
  Ghostscript (Library)    no
  JBIG                     no
  JPEG-2000                yes
  JPEG                     yes
  Little CMS               yes
  Loadable Modules         no
  OpenMP                   yes (200805)
  PNG                      yes
  TIFF                     yes
  TRIO                     no
  UMEM                     no
  WebP                     yes
  WMF                      no
  X11                      yes
  XML                      yes
  ZLIB                     yes

Host type: x86_64-unknown-linux-gnu

Configured using the command:
  ./configure 

...省略

为了支持 JPEG、PNG 等格式图片,必需先安装对应的库,Feature Supportyes的项表示已经开启的特性,可以看到 PNG 、JPEG、WebP、TIFF 格式均以支持。

示例1 获取详细图片基本信息

gm identify -verbose IMG20191209100903.jpg
Image: IMG20191209100903.jpg
  Format: JPEG (Joint Photographic Experts Group JFIF format)
  Geometry: 3120x4160
  Class: DirectClass
  Type: true color
  Depth: 8 bits-per-pixel component
  Channel Depths:
    Red:      8 bits
    Green:    8 bits
    Blue:     8 bits
  Channel Statistics:
    ...省略...
  Filesize: 2.9Mi
  Interlace: No
  Orientation: Unknown
  Background Color: white
  Border Color: #DFDFDF
  Matte Color: #BDBDBD
  Page geometry: 3120x4160+0+0
  Compose: Over
  Dispose: Undefined
  Iterations: 0
  Compression: JPEG
  JPEG-Quality: 95
  JPEG-Colorspace: 2
  JPEG-Colorspace-Name: RGB
  JPEG-Sampling-factors: 2x2,1x1,1x1
  Signature: 9992bc81f6c1b99c4504ef40144a7dd3217805d48fe4e9bf9e9af49c1211f280
  Profile-EXIF: 12966 bytes
    Make: OPPO
    Model: OPPO R7sm
    X Resolution: 72/1
    Y Resolution: 72/1
    Resolution Unit: 2
    Y Cb Cr Positioning: 1
    Exif Offset: 142
    Exposure Time: 1/20
    F Number: 220/100
    ISO Speed Ratings: 878
    Exif Version: 0220
    ...
    Flash: 16
    Focal Length: 3790/1000
    ...
    Color Space: 1
    Exif Image Width: 3120
    Exif Image Length: 4160
    Interoperability Offset: 478
    Exposure Index: 338/1
    Gain Control: 8
    ... 省略GPS信息 ...
  Tainted: False
  User Time: 0.240u
  Elapsed Time: 0m:0.240000s
  Pixels Per Second: 51.6Mi

示例2 按需获取图片信息

gm identify -format "%f\n%e\n%b\n%m\n%w\n%h\n%Q\n%q\n%#" IMG20191209100903.jpg
IMG20191209100903.jpg
jpg
2.9Mi
JPEG
3120
4160
95
8
9992bc81f6c1b99c4504ef40144a7dd3217805d48fe4e9bf9e9af49c1211f280

其中,%Q 是质量信息,%w%h是宽高,%e%m是图片后缀和图片格式,%#是图片有效像素的哈希值。值的注意的是,获取%#都是非常耗时的。

示例3 缩放图片

gm convert cockatoo.jpg -resize 120x120 thumbnail.jpg

表示把原图 cockatoo.jpg 在保持宽高比的前提下缩放到小于 120×120,生成新图片thubmnail.jpg

接口设计

通过观察以上云厂商的现有产品文档,可以看出都是提供对象存储的基础上,在原图 URL 的添加控制参数的方式进行图片实时处理和输出。支持原图格式包括 jpg、jpeg、png、bmp、webp、gif、tiff,支持处理后格式包括 jpg、png、bmp、webp,会限制可处理的原图文件大小,会限制原图宽高,gif 图会限制大小和帧数,超过限制或处理失败降级返回原图。
控制参数设计上,我们觉得阿里云的设计更规整,更容易拓展,因此参考设计为

x-process=cmd1:param1_value1,param2_value2/cmd2:param3_value3

其中 cmd 为操作,作为示例,只支持缩放 resize、格式转换 format、裁剪 crop、质量变换 quality。param_value 为参数名值对,多对参数间用,分隔,多个 cmd 直接用 / 分隔。

假设原图为1280×900 的 png,示例

/img/get?fileId=5ceba7e7b6238e31272dc506&x-process=resize:m_aspectFit,w_800,h_800/quality:q_90/format:t_jpg

表示保持宽高比缩略到大不于 800×800 的jpg,质量为原来的 90%。

各操作的参数设计如下。

缩放 resize

参数 长参名 必要 说明 默认
m mode Y 缩略模式
* aspectFit 按最大边缩略,保持宽高比,确保不超出wxh矩形区域,可能不完全填充,不放大
* aspectFill 按最小边优先缩略,填充,保持宽高比,超出矩形区域时居中裁剪,不放大
* scaleToFill 拉伸填充,填充,不保持宽高比
* aspectPercent 按百分比缩放,保持宽高比
aspectFit
w width Y 宽度,最大4096
h height Y 高度,最大4096
p percent N 百分比,只在aspectPercent,大于0

裁剪 crop

参数名 长参名 必要 说明 默认
w width Y 输出图宽
h height Y 输出图高
x x offset Y 反重力的横向偏移量 0
y y offset Y 反重力的纵向偏移量 0
g gravity N 重力向,取值
nw n ne
w c e
sw s se
nw

质量变换 quality

参数 长参名 必要 说明 默认值
q relative quality N 相对质量,只对jpg有效
假设原图jpg, 质量为90,令q=90,则输出图质量为81
Q absolute quaity N 绝对质量
假设原图为jpg, 质量为80,令Q=90,则输出图质量为90,图片大小会变大
假设原图为png,则Q取值从1到1009,其中Q/10为zlib压缩等级(0最快-100最慢),Q%10为过滤类型
png:75(压缩等级7;过滤类型5,自适应)
jpeg:原图质量系数

q和Q同时有时,优先采用q

格式转换 format

参数 长参名 必要 说明 默认
t type N 输出格式,目前仅支持格式:jpeg、jpg、png、bmp、webp、gif、tiff 原图格式

限制:
* 输出图去掉了除EXIF以外的所有profile信息
* 最大原图20M

如何构建图片处理程序

思路很直接,大致分为以下几步:

  1. 参数解析和校验
  2. 下载原图
  3. 根据 x-process 的参数值,生成对应的 GraphicsMagick 命令行
  4. fork 子进程,运行 gm 命令
  5. 输出原图

如何提升单机程序性能

缓存缩略图 + 只生成一次

短时间内(具体多长要看应用和场景)大量用户对同一张缩略的疯狂访问是必然的。因此缓存缩略图显得很有必要,尤其对缩略耗时较长的图,能有效减少请求耗时。对相同参数,多次计算产生的缩略图必然相同,因此建立参数和结果图的关系,下次请求时,就可以用参数直接找到结果图了。一种直观的方法是把参数规则后作为结果图的文件名,如

5de873920d5ce4000c92f0b6+crop-w_200,h_200,x_-10,y_-10,g_c.jpeg

表示原图id为 5de873920d5ce4000c92f0b6,缩略参数为 x-process = crop:g_c,w_200,h_200,x_-10,y_-10

笔者实际项目中,缓存命中率在5成左右。

针对某一个生产参数,首先检查文件系统内是否有缩略图,有的话直接返回。否则使用JVM内的锁,控制只有一个请求线程去生产缩略图,其他请求等待。实例代码。

if (shouldDownload) {
         // 获取锁,并标记文件需要生产
            LockingHelper.gainLockOf(outputFilePath);
            if (lock != null) {
                try {
                   // 加锁
                    lock.lock();
                    // 再次检查是否需要下载
                    if (LockingHelper.containLockOf(outputFilePath)) {
                        try {
                            // TODO 生产缩略图,保存到磁盘
                        } catch (Exception e) {
                            ...
                        } finally {
                            // 清除下载标记
                            LockingHelper.releaseLockOf(outputFilePath);
                        }
                    }
                } finally {
                   // 解锁
                    lock.unlock();
                }
            }
        }

等待的线程拿到锁后,还需要再次检查是否需要下载,双重检查的典型套路。这里的锁粒度是缩略图的输出路径(outputFilePath),即粒度细到具体控制参数。缩略到 100×100 和缩略到 200×200 是可以并发执行的,不会相互影响。

缓存原图 + 只下载一次

一张原图在产生后,会很快被多端多界面多用户拉取到,此时会并发的请求不同尺寸的缩略图,它们都需要使用到原图,因此图片处理服务只拉取一次原图,并进行缓存,能一定程度上的提升性能。
当某个实例接收到并发的多个对同一原图的缩略请求,首先检查文件系统内是否有原图,有的话进行下一步,否则获取锁进行下载,如上面所述相似,锁粒度到fileId。

写文件优化

首先介绍文件系统中文件与目录的一些基础知识:
1. 目录实际上是一个记录文件名和 inode 对应关系的映射文件
2. 单个目录下的文件越多,这个映射文件越大,需要的读取次数就越多(一般系统调用会每次读 32k 或者类似的量级)

假设有 1M 个文件,一级目录使用两个字母或数字,可以有 (26 + 10)^2 个二级目录,也就是 1296 个目录,每个目录名两个字节,加上 inode 和其他一些消耗, 10-20 字节完全够用,一次读取就能获得所有二级目录;而二级目录平均是 772 个文件,一次读取也能完成,总共两次读取,找到缓存文件,而如果把 1M 个文件放在一个目录下,如果每个记录 32 字节,需要 1000 次读取。

因此作为一个优化,最好构建一个多级的目录,用来保存原图和缩略图,尽可能的减少每一个目录里的子文件/子目录的个数,文件均匀分布,层级也不宜过深,具体如何平衡要根据自己业务特性和可利用条件决定。举例可以用 fileId 中的变化较大的四位,外加原图上传日期,构造了一个4级的路径。

20191205/ab/cd/5de873920d5ce4000c92f0b6+crop-w_200,h_200,x_-10,y_-10,g_c.jpeg

进程池

每次生产图片是都去fork gm 子进程,不仅代价大,而且在高并发的情况下,容易造成子进程数过大(处理网络请求的线程数比较大,因为fork出的子进程也会很多),系统负载飙高,上下文切换频繁。因此控制高峰期的子进程数在一定范围,可以保护系统自身。
前文提到 gm 有一个 batch 批量模式,运行在此模式下在 gm 进程,会一直读取标准输入,逐行接收命令及参数,实时进行处理。

# gm batch -feedback on

...省略...

GM> identify -format "%f\n%e\n%b\n%m\n%w\n%h\n%Q\n%q" IMG20191209100903.jpg
IMG20191209100903.jpg
jpg
2.9Mi
JPEG
3120
4160
95
8
PASS
GM> convert IMG20191209100903.jpg -resize 200x200 IMG20191209100903_200.jpg
PASS
GM> 

因此我们可以预先 fork 一批 gm 子进程,每次要运行命令时,从子进程池中挑选一个子进程,通过 pipe 这种 IPC 方式,发送命令。这又是一直常见的技术的套路:池化,一般的,最大活跃进程数、最大空闲进程数、最小空闲进程数、获取等待时间都可参数化控制。具体架构示意图如下。

获取等待时间不宜设置太大,在获取不到子进程时,可以降级直接返回原图。另一方面,要控制可接受的原图大小和尺寸,实践表明,大图处理耗时极大,瞬时多张大图容易占用完子进程,因此大图直接降级为输出原图即可。
最大活跃进程数控制在 CPU 核数的 3/4 左右,留出一些给 Web 线程。
最小空闲数设置在 1/4 的总 CPU 核数,能应对低峰期的小波的突发请求。

gm 参数优化

  • -size wxh ,提示 JPEG 解码器只需返回指定大小的像素,在大尺寸原图缩小时有优化效果
  • 开启OpenMP,并使用系统变量 OMP_NUM_THREADS 或者命令参数 -limit Threads n 控制每个gm进程的线程数
  • +profile '!EXIF,*',删除结果图的 profile,可以一定程序减小结果图的文件大小,EXIF包含图片方向,实际生产中按需保留
  • -strip 删除结果图的 profile 和其他信息,可以一定程序减小结果图的文件大小
  • 其他参数

HTTP Cache Control

使用HTTP的Cache Control特性,能有效的对服务端的请求,减少下载流量,减少计算。具体就是在响应时输出一系列的Header:
1. Cache-Control: max-age=$$
2. Expires: $$
3. Last-Modified: $$
4. ETag: $$ 。

而在接收到请求时,比较两个头部:
1. If-Modified-Since: $$
2. If-None-Match: $$ 。

因此可以缓存每张缩略图的宽高、大小、修改时间、ETag、文件哈希值等元数据到类似LevelDB、RocketDB等本地缓存。

其他手段

可能借助物理硬件加速,如某些云厂商就提供了「FPGA图片转码加速服务」。

如何构建图片处理集群

集群架构如下图所示。

使用 Nginx作为负载均衡,反向代理负载到后端的图片处理实例。当请求到来时,根据请求中的?fileId=xxx使用一致性哈希的方式分发到某一个实例。

为什么使用一致性哈希这种负载均衡算法?
* 首先对于同一张原图的所有处理请求,一定会落到同一个实例,可以充分利用该实例已有的原图缓存,结果图缓存,避免重复生成,减少整个集群的磁盘消耗和计算消耗。一致性哈希负载提高缓存的命中率,减少计算。
* 使用虚拟节点的一致性哈希算法在物理节点时候不可用时,数据和请求迁移更少。在某个实例不可能用时,只有部分请求会被转移到新节点处理,只有图片需要重新生产。

在 Nginx 还开启了proxy buffer功能后,会缓冲后端节点的响应内容。开启此功能后,业务实例快速完成图片处理,把响应交给 Nginx,快速释放业务线程的占用,而 Nginx 可以慢慢的完成与客户端的文件传输。proxy buffer 对文件服务这类响应较大的业务来说必不可少,是低速公网与高速内网的差速齿轮。

如何控制磁盘消耗

通过定时删除原图和缩略,可以控制磁盘消耗。大部分的业务场景都有热点效应,用户只会查看最近产生的内容,因此可以放心大胆的删除较早前的文件。
一方面控制缓存的时长,另一方面调整定时任务的频次。具体使用怎么的缓存淘汰策略就不在这里说太多,一句话:看业务。
另一个方面是减少缓存量,比如对生成速度快、文件小的图片不缓存,每次都直接生成。

安全问题

通过 URL 方式指定参数,很容易被通过修改参数的方式攻击,可以建立一组特定的样式(style),每个样式对一组常用参数进行封装,对外只以样式名方式访问。例如建立样式 smallmediumbig 分别指代三个常用尺寸的缩略图。

small -> m_aspectFit:w_100,h_100,q_90
medium -> m_aspectFit:w_400,h_400,q_90
big -> m_aspectFit:w_1280,h_1280,q_90

/img/get?fileId=$$&style=big

一点程度上也可以减少URL变得冗长,便于管理与阅读。

另一方面是防止盗图,控制Referer。

总结

前文首先讲解了如何设计图片处理功能,然后介绍了实现以及需要优化的点,最后如何集群化。大概总结一下为了提高性能的关键点
1. 减少对服务端的请求
2. 减少生成
3. 充分利用缓存(原图、结果图、元数据、集群定向)
4. 利用可控进程池控制系统不过载
5. 利用proxy buffer避免慢客户端影响
6. gm参数调整
7. 减少目录大小。
8. 锁粒度控制

使用Micrometer进行业务指标上报

[TOC]

什么是指标

指标,metric,是指对事物的度量。通常是对事物的属性进行采样测量,记录下属性的值。采样结果记录为:<指标名,时间戳,值>,例如对当前大气温度的测量<weather.temperature, 2018-07-25T00:00:00Z, 30>。但上面的记录很笼统,无法反应是哪个地区的气温,无法满足按省市区进行汇总、细化的统计分析(即多维分析)的需求。因此一般地,会把记录格式拓展为<指标名,标签集,时间戳,值>,标签集是形如city: shenzhen, district: nanshan这样的代表维度的标签名值对。值可以是整数、浮点数、字符串。大多数的指标存储系统(时序数据库)只支持数值类型的值,因此本文也只讨论数值类型。

指标上报/表示格式

时序数据库是以<指标名,标签集,时间戳,值>作为一行存储一次采样的数据。类似以下:

sys.cpu.user host=webserver01        1356998400  50
sys.cpu.user host=webserver01,cpu=0  1356998400  1
sys.cpu.user host=webserver01,cpu=1  1356998400  0
sys.cpu.user host=webserver01,cpu=2  1356998400  2
sys.cpu.user host=webserver01,cpu=3  1356998400  0
...
sys.cpu.user host=webserver01,cpu=63 1356998400  1

实际时序数据库的存储并非以上述文本格式,比上面要复杂得多。

指标定义

指标定义是指指标的元数据,用于帮助对指标数据进行解读。一般包括:
* metric_name,指标名,如web.response.size
* type,指标类型, 如Gauge/Counter等
* interval,上报(采样)间隔,
* unit_name,字符串形式单位,如Byte/Bit
* description,指标的描述
* tags,指标支持的标签(及取值范围),如app.web.request支持的tag可能包括area,host,instance分表机房、主机和实例

指标类型

根据指标监控的对象个数、监视目的和含义的不同,指标类型大体可以分为Gauge、Counter、DistributionSummary、Timer四种。

Gauge 刻度

1a7f4299-8026-4197-835f-52ad109277b1
Gauge是监视单个对象属性的当前状态值,时序数据库存储Guage指标的每次采样记录(时间,值)。

sys.load host=webserver01,cpu=0  1356998400  11
sys.load host=webserver01,cpu=0  1356998401  21
sys.load host=webserver01,cpu=0  1356998402  15
...

sys.cpu.user host=webserver01,cpu=0就是webserver01的0号cpu的负载属性。

Gauge类型的指标值是随时间连续变化的。Gauge指标的时间序列图(非采样)类似下图,可以看出数值随时间上下波动,而且是连续的。
gague_unsmapled_ts
像CPU当前负载、消息队列中当前的消息数、JVM当前使用内存大小,这些都是典型的Gauge指标。Gauge是对单个对象属性的度量。

聚合指标

聚合指标是对多个对象的度量进行过统计处理的后产生的指标。

Counter 计数器

计数就是监视对象、事件发生的次数。其中一种办法就是每次检测到一个对象就上报一个记录,那么就是如下

web.request.accept host=host1 1356998400 1
web.request.accept host=host1 1356998401 1
web.request.accept host=host1 1356998401 1
web.request.accept host=host1 1356998402 1

指标值1代表一次请求,可以看出同一时间(戳)可能有多次请求发生。通过sum()就可以计算出总数。
另一种办法是利用一个计数器,记下所有请求的累计数,再对这个计数器采样上报,这就是Counter类型。
419SoKslhU

如果把Counter的值进行上报(不清零),展示出来会得到下图的效果。
counter_ts
Counter类型的指标值是随时间单调递增,而且是跳跃式增加的。像nginx启动以来一共处理的请求数、用户认证服务中认证不通过的次数,这些都是Counter指标。

Counter指标本质上是对象个数聚合后产生的Gauge指标。Gauge数值有升有降,而Counter只数值是一直上升的。

所有聚合指标都是有意义的Gauge指标。

Counter指标又可以分为两种:每次上报后清零,每次上报后不清零。

复合指标

复合指标是对一组对象\事件的属性的复合统计。在上报和存储时是以多个聚合指标联合构成的。
以web服务处理请求为例,一个请求响应就是一个事件,响应时间就是监控的指标。如果把所有请求的响应时间上报后进行展示,会得到下图这样的散点图。
QQ20180725-134113@2x
这样的散点图无法看出某一时间段的请求数量,无法看出平均响应时间,无法知晓大于1s和小于1s的响应的数量。也就是散点图无法以一个数值来说明事件发展的整体趋势。因此我们需要对这些响应先进行预处理(统计),生产成多个新的指标,逻辑上新指标都是原指标的组成部分。

DistributionSummary

DistributionSummary是用于跟踪事件的分布情况,有多个指标组成:
* count,事件的个数,聚合指标,如响应的个数
* sum,综合,聚合指标,如响应大小的综合
* histogram,分布,聚合指标,包含le标签用于区分bucket,例如web.response.size.historgram{le=512} = 99,表示响应大小不超过512(Byte)的响应个数是99个。一般有多个bucket,如le=128,le=256,le=512,le=1024,le=+Inf等。
每个bucket展示为一条时间序列,会得到类似下面的图。
QQ20180725-184823@2x

  • percentile(quantile),百分位数,聚合指标,包含percentile标签用于区分不同的百分位,例如web.response.size.percentile{p=90) = 512,表示90%的响应大小都小于512。一般有多个percentile,如p50,p75,p90,p99。
    每个百分位展示为一条时间序列,会得到类似下面的图。
    QQ20180725-191921@2x

Timer

Timer是DistributionSummary的特化,专门用于计时类的指标,可以对记录的时间值(duration)进行单位换算。

指标怎么展示?

采样什么的图标进行展示指标要根据指标的实际意义和使用场景决定。

展示最新

即只展示最新的采样点的数值。如下图展示进程的累计运行时长
QQ20180725-161758@2x

展示历史

时间序列,是指唯一指标名+唯一标签名值组合的所有采样数据点组成的序列。一般采用折线图,横坐标表示时间,纵坐标表示数值,一个图上可以展示多条时间序列,方便对比。
key-pg-classi
通过time shift,可以再同一个图展示一个时间序列的两个时间段。下图展示今日(红色)和昨日(蓝色)的接口访问量。
QQ20180725-163952@2x

Counter与Rate

Counter类型(不清零型)的时间序列,如果不做处理,展示出来就是一个单调增长的折线,如下图所示。
counter_ts
实际生产中我们希望知道某个时间段的请求速率,因此需要做上述时间序列做转换。通常是对时间段(period)第一个数据点和最后一个数据点进行delta计算,delta值作为该时间段的rate。转换后的时间序列类似下图,可以看出每天早上7点左右请求数有明显的增加。
QQ20180725-163952@2x
时序数据库提供rate()聚合函数用户上述需求的计算,rate()实现比delta要复杂很多,要处理计数器清零、反转、数据点丢失等情况。rate可以看做是原时间序列函数的一阶求导。

什么是业务指标?

按指标的来源进行分类。纵向实现了自底向上各层级的监控,包括网络、服务器、系统层、应用层、业务层,如下图所示:
836418C4-0D04-4A6F-AD92-21DF75E819A5
* 网络监控,包括机房出口VIP是否存活,流量是否正常,机房间专线流量和质量是否正常,以及网络设备及流量是否正常等
* 服务器监控,包括服务器是否宕机,服务器硬件是否有异常等。
* 系统监控,包括CPU、内存、磁盘、网卡等各项指标。
* 应用监控,包括:端口连通性,进程存活,QPS,连接数等指标。常用的开源中间件如nginx、redis、mysql等的监控数据也算是这一类的。
* 业务监控,包括业务关心的各项指标,例如登录验证成功/失败次数、某类型业务错误的次数、订单数等,业务指标通常与业务逻辑需求有关。

怎么上报业务指标?

通过在项目中使用micrometer库进行打点,由micrometer异步上报到监控系统。下面以使用spring boot 1.5的api-gatway项目为例。

micrometer简介

  • spring boot 2的默认指标库
  • 支持指标的多维,即Tag
  • 支持上报到Atlas、Graphite、Influx、Prometheus、Datadog等监控系统,可以看做Metric版的SL4J,屏蔽了各监控系统命名差异、Counter上报方式差异、percentile计算差异。
  • 支持Gauge、Counter、DistributionSummary、Timer(LongTimer)等Metric
  • 内置了一些指标,如JVM指标
  • 提供大量针对后端监控系统的spring boot starter,配置简单,文档齐全

引入micrometer包

    <properties>
        <micrometer.version>1.0.5</micrometer.version>
    </properties>

    <dependencies>
        ...
        <dependency>
            <groupId>io.micrometer</groupId>
            <artifactId>micrometer-spring-legacy</artifactId>
            <version>${micrometer.version}</version>
        </dependency>
        <dependency>
            <groupId>com.yunzhijia.devops</groupId>
            <artifactId>micrometer-registry-falcon</artifactId>
            <version>1.0.5</version>
        </dependency>
        ...
    </dependencies>

    <repositories>
        ...
        <repository>
            <id>nexus-release</id>
            <url>http://192.168.0.22/nexus/content/repositories/cloudhub-release</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
            <releases>
                <enabled>true</enabled>
            </releases>
        </repository>
        ...
    </repositories>

io.micrometer:micrometer-spring-legacy是micrometer官方提供的spring boot 1.5的stater。com.yunzhijia.devops:micrometer-registry-falcon是自定义的micrometer插件,专门用于按格式要求上报到云之家的监控系统。

配置

application.properties里配置云之家的监控系统的上报接口的地址、上报时间间隔。

management.metrics.export.falcon.url=http://localhost:6060/api/push
management.metrics.export.falcon.hostTag=instance
management.metrics.export.falcon.step=30s

新建MicrometerConfig.java

@Configuration
public class MicrometerConfig {

    private static Logger logger = LoggerFactory.getLogger(MicrometerConfig.class);

    @Value("${spring.application.name}")
    private String appName;

    private String getInstanceIp() {
        String addr = null;
        try {
            InetAddress ip = InetAddress.getLocalHost();
            addr = ip.getHostAddress();
        } catch (UnknownHostException e) {

        }
        logger.info("ip addr: {}", addr);
        return addr;
    }

    @Bean
    MeterRegistryCustomizer<MeterRegistry> metricsCommonTags() {
        return registry -> {
            registry.config().commonTags("app", appName);
            String addr = getInstanceIp();
            {
                registry.config().commonTags("instance", addr);
            }
            registry.config().meterFilter(
                    new MeterFilter() {
                        @Override
                        public DistributionStatisticConfig configure(Meter.Id id, DistributionStatisticConfig config) {

                            if (id.getType() == Meter.Type.TIMER) {
                                return DistributionStatisticConfig.builder()
                                        .percentilesHistogram(true)
                                        .percentiles(0.5, 0.75, 0.90)
                                        .sla(Duration.ofMillis(50).toNanos(),
                                                Duration.ofMillis(100).toNanos(),
                                                Duration.ofMillis(200).toNanos(),
                                                Duration.ofMillis(500).toNanos(),
                                                Duration.ofSeconds(1).toNanos(),
                                                Duration.ofSeconds(2).toNanos(),
                                                Duration.ofSeconds(5).toNanos(),
                                                Duration.ofSeconds(10).toNanos())
                                        .minimumExpectedValue(Duration.ofMillis(1).toNanos())
                                        .maximumExpectedValue(Duration.ofSeconds(10).toNanos())
                                        .build()
                                        .merge(config);
                            } else {
                                return config;
                            }
                        }
                    });
        };
    }

    @Bean(name = "requestCounter")
    Counter requestCounter() {
        return Counter.builder(MetricDef.C_REQUEST)
                .register(Metrics.globalRegistry);
    }

    @Bean(name = "methodTypeRpcCounter")
    Counter methodTypeRpcCounter() {
        return Counter.builder(MetricDef.C_METHOD_TYPE).tags("type", "rpc")
                .register(Metrics.globalRegistry);
    }

    @Bean(name = "methodTypeHttpCounter")
    Counter methodTypeHttpCounter() {
        return Counter.builder(MetricDef.C_METHOD_TYPE).tags("type", "http")
                .register(Metrics.globalRegistry);
    }

上述代码配置了
1. 所有Metric的公共标签app和instance,app表示来自哪个服务,instance表示来自服务的实例(所有服务端口不重叠,故用IP表示实例)。
2. 对Timer进行设置,percentilesHistogram(true)表示上报分布的所有bucket;统计和上报的百分位数包括0.5,0.75,0.90;sla()额外添加了[50ms,100ms,200ms,500ms,1s,2s,5s,10s]这些bucket;并限定统计的耗时范围在0至10s之间。
3. 定义了3个不同的Counter,总请求数,rpc和http类型接口的请求数。定义为bean,之后利用Spring注入的方式可以减少Counter对象的查找耗时。

打点统计

在相应的业务代码文件引入counter,并打点。

@Component
public class MethodExistedPreFilter extends ZuulFilter{
    ...

    @Autowired
    @Qualifier("methodTypeRpcCounter")
    Counter methodTypeRpcCounter;

    @Autowired
    @Qualifier("methodTypeHttpCounter")
    Counter methodTypeHttpCounter;

    ...

    @Override
    public Object run() {

        ...
        if (StringUtils.equalsIgnoreCase(routeVo.getProtocol(), "rpc")) {
            methodTypeRpcCounter.increment();

        } else {
            methodTypeHttpCounter.increment();
        }
        ...
    }

    ...

}

QQ20180725-194449@2x

什么不该/不需要上报?

  • 内部服务之间的相互调用的次数、耗时不需要上报。因为已经由PinPoint统一进行采集和上报了,指标监控系统回去PinPoint拉取这些数据。
  • 如果tag值是无限的,不能上报。比如你可能想按appId统计接口调用次数,don’t do that.

mac使用virtualbox和cloudera manager搭建大数据集群

装备集群

利用virtualbox,创建4个guest虚拟机,一个作为管理机,其他3个作为worker。虚拟机网络设置为NAT + HostOnly。NAT保证虚拟机能访问WAN,但是虚拟机之间无法通信。Host Only保证Mac和4个虚拟机之间处在一个网络192.168.56.0/24,可以相互通信。

  • mac 192.168.56.1 安装virtualbox
  • master 192.168.56.10 虚拟机,安装cm server
  • app1 192.168.56.11 虚拟机,安装cm agent
  • app2 192.168.56.12 虚拟机,安装cm agent
  • app3 192.168.56.13 虚拟机,安装cm agent

安装cloudera manager

在master上运行

# wget http://archive.cloudera.com/cm5/installer/5.15.0/cloudera-manager-installer.bin
# chmod u+x cloudera-manager-installer.bin
# ./cloudera-manager-installer.bin

有些包很大,可能会卡很久,或者下载很慢。可以手动下载到/var/cache/yum/x86_64/6/cloudera-manager/packages,然后重新运行./cloudera-manager-installer.bin

最好先手动下载
– oracle-j2sdk1.7-1.7.0+update67-1.x86_64.rpm
– cloudera-manager-agent-5.15.0-1.cm5150.p0.62.el6.x86_64.rpm
– cloudera-manager-server-5.15.0-1.cm5150.p0.62.el6.x86_64.rpm
– cloudera-manager-daemons-5.15.0-1.cm5150.p0.62.el6.x86_64.rpm
然后cp/scp到四个虚拟机的/var/cache/yum/x86_64/6/cloudera-manager/packages下。

下载
– CDH-5.15.0-1.cdh5.15.0.p0.21-el6.parcel
– CDH-5.15.0-1.cdh5.15.0.p0.21-el6.parcel.sha1

扔到master的/opt/cloudera/parcel-repo

配置集群

访问http://192.168.56.10:7180/ ,账号admin,密码admin,选择一些配置选项,加入机器,填写ssh用户名密码(所有机器使用同样账号密码,关闭iptables和selinux)。然后就开始安装jdk、cm agent。

使用maven解决微服务开发中的一些日常问题

1.多模块

微服务是按业务边界划分的,通常包含一个可供调用的服务和一些异步后台进程、定时任务,这些运行实际不同但又紧密的关联的进程一起完成这个业务的数据处理。因此可以把一个微服务拆分为多个模块,如api定义模块(以dubbo这类rpc微服务为例,通常会把服务接口构建为一个jar包)、服务实现模块、定时任务模块。而服务实现和定时任务会共用一些工具类代码,又可以提取为一个模块。
在工作经历中,见过一些人把服务api放在一个项目里,服务实现放在一个项目里,自动任务放在一个独立的项目里,且三个项目都是独立的git项目。每个模块使用独立的pom.xml,则很多共同的依赖和插件配置就需要写多次,在修改版本时难免出现不一致。
其实利用maven的「继承与聚合」功能,是一种更优雅的多模块组织方式。既可以把多个模块放在一个目录里管理(一个git项目,方便查看完整的变更历史),又解决多个模块依赖和插件管理问题,同时还能解决一条命令同时构建多个模块的问题。

以用户服务为例,项目结构可以组织成如下形式:

user
├── pom.xml
├── user-api
│   ├── pom.xml
│   └── src
│       └── main
│           └── java
└── user-service
    ├── pom.xml
    └── src
        └── main
            ├── java
            └── resources

user目录作为是一个父模块,而user-api和user-service是user模块的子模块,子模块和父模块以嵌套方式组织。
user/pom.xml的内容如下

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>wingyiu</groupId>
    <artifactId>user</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <packaging>pom</packaging>
    <name>User Parent</name>

    <modules>
        <module>user-api</module>
        <module>user-service</module>
    </modules>

    <properties>
        <java.version>1.8</java.version>
        <java.source.version>1.8</java.source.version>
        <java.target.version>1.8</java.target.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <spring-boot.version>1.5.14.RELEASE</spring-boot.version>
        <dubbo.version>2.6.2</dubbo.version>
        <dubbo.starter.version>0.1.1</dubbo.starter.version>
        <zkclient.version>0.2</zkclient.version>
        <zookeeper.version>3.4.9</zookeeper.version>
        <curator-framework.version>2.12.0</curator-framework.version>
        <!-- Build args -->
        <argline>-server -Xms256m -Xmx512m -XX:PermSize=64m -XX:MaxPermSize=128m -Dfile.encoding=UTF-8
-Djava.net.preferIPv4Stack=true
        </argline>
        <arguments/>

        <!-- Maven plugins -->
        <maven-jar-plugin.version>3.0.2</maven-jar-plugin.version>
        <maven-compiler-plugin.version>3.6.0</maven-compiler-plugin.version>
        <maven-source-plugin.version>3.0.1</maven-source-plugin.version>
        <maven-jacoco-plugin.version>0.8.1</maven-jacoco-plugin.version>
        <maven-gpg-plugin.version>1.5</maven-gpg-plugin.version>
        <apache-rat-plugin.version>0.12</apache-rat-plugin.version>
        <maven-release-plugin.version>2.5.3</maven-release-plugin.version>
        <maven-surefire-plugin.version>2.19.1</maven-surefire-plugin.version>
    </properties>

    <dependencyManagement>
        <dependencies>
            <!-- Spring Boot -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot</artifactId>
                <version>${spring-boot.version}</version>
            </dependency>
            ...省略
        </dependencies>
    </dependencyManagement>

    <repositories>
        <repository>
            <id>nexus-snapshot</id>
            <url>http://192.168.0.22/nexus/content/repositories/gayhub-snapshot</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
            <releases>
                <enabled>false</enabled>
            </releases>
        </repository>
        <repository>
            <id>nexus-release</id>
            <url>http://192.168.0.22/nexus/content/repositories/gayhub-release</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
            <releases>
                <enabled>true</enabled>
            </releases>
        </repository>
        <!-- Repositories to allow snapshot and milestone BOM imports during development.
            This section is stripped by the flatten plugin during install/deploy. -->
        <repository>
            <id>central</id>
            <url>https://repo.maven.apache.org/maven2</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
        ...省略
    </repositories>

    <pluginRepositories>
        <pluginRepository>
            <id>central</id>
            <url>https://repo.maven.apache.org/maven2</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </pluginRepository>
        ...省略
    </pluginRepositories>

</project>

其中
<packaging>pom</packaging>
表明这是一个聚合模块,

<modules>
    <module>user-api</module>
    <module>user-service</module>
</modules>

modules元素里声明了两个聚合模块。
从聚合模块运行man clean install时,maven会首先解析觉和模块的POM,分析要构建的模块,并计算出一个反应堆构建顺序,然后依次构建各个模块。即实现了一次构建多个模块。
properties原理声明了共用的一次属性,子模块可以通过${spring-boot.version}这样的形式引用。

<dependencyManagement>
    <dependencies>
    </dependencies>
</dependencyManagement>

声明了共同的依赖,子模块可以继承这些依赖配置,dependencyManagement下生命的依赖不会实际引入,不过它能约束子模块dependencies下的依赖使用。同理

<build>
    <pluginManagement>
        <plugins>

帮助子模块进行插件管理。

子模块user-api的pom.xml内容如下


<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>wingyiu</groupId>
        <artifactId>user</artifactId>
        <version>1.0.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>user-api</artifactId>
    <packaging>jar</packaging>
</project>

其中, 元素指明了其父模块为user。不难发现user的POM,既是聚合POM,又是父POM,实际项目中经常如此。子POM并没有声明groupId和version,因为子模块隐式地从父模块继承了这两个元素。由于user-api仅包含以下interface定义,DO定义,没有依赖,因此pom文件没有任何dependency。模块打包方式为默认的jar。


<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <artifactId>user</artifactId>
        <groupId>wingyiu</groupId>
        <version>1.0.0-SNAPSHOT</version>
        <relativePath>../pom.xml</relativePath>
    </parent>

    <artifactId>user-service</artifactId>
    <packaging>jar</packaging>

    <properties>
        <micrometer.version>1.0.5</micrometer.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot</artifactId>
        </dependency>

        <dependency>
            <groupId>io.micrometer</groupId>
            <artifactId>micrometer-registry-prometheus</artifactId>
            <version>${micrometer.version}</version>
        </dependency>

    </dependencies>
</project>

同理user-service的pom中parent指明了父模块为user。特别的

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot</artifactId>
        </dependency>

并没有制定version,还剩去了依赖范围。这是因为完整的依赖声明已经包含在父POM中,子模块只需配置groupId和artifactId就能获得对应的依赖信息。这种依赖管理机制似乎不能减少太多pom配置,但是能够统一项目范围的依赖版本。有父模块统一声明version,子模块不声明version,就不会发生过个子模块使用的依赖版本不一致的情况,这可以帮助降低依赖冲突的几率。

2.只构建并发布api包

为了加快项目进度,通常服务提供方和调用方会先定好接口定义(包括接口名,参数),调用方先以约定好的接口进行逻辑编写,等提供方正在实现好之后再进行联调。在这种情况下,就会要求先发布jar包。
此时可以利用maven的反应堆裁剪功能。反应堆是指所有模块组成的一个构建结构。对于多模块项目,反应堆包含了模块之间继承与依赖关系,从而能够自动计算出合理的模块构建顺序。模块间的依赖关系会将反应堆构成一个有向非循环图(DAG)。
默认情况,在聚合模块执行mvn clean install会执行整个反应堆,maven提供了很多命令行选项支持裁剪反应堆:
– -am, –also-make,同时构建所列模块的依赖模块
– -amd, –also-make-dependents,同时构建依赖于所列模块的模块
– -pl,–projects 构建指定的模块,模块间用逗号分隔
– -rf, –resume-from 从指定的模块恢复反应堆

因此要单独构建api模块,可以运行如下命令
mvn clean install -pl user-api

3.开发中的接口不断地改变

这种情形很常见,一个开发中的接口,参数不断的调整,此时调用方也要不断的修改,即调用方需要不断的更新依赖服务的api jar包。一般地,maven把依赖从远程仓库拉下来后会进行缓存。因此经常会出现要手动删除本地缓存包,或者增加版本号,但服务还在开发不断增加版本号显然不合适。
maven提供了更加简单有效的解决方法:SNAPSHOT,快照版本。当依赖版本为快照版本的时候,maven会自动找到最新的快照。即每次都会去仓库拉去元数据,计算真实的时间戳格式版本号,然后拉取对应的jar,保证本地使用到最新的jar。

注:默认情况下,maven每天检查一次更新(有仓库udatePolicy控制),用户也可以用-U参数强制检查更新,如mvn clean install -U

因此,快发中,我们可以把api模块(上例user-api)的版本号设为1.0.0-SNAPSHOT

发布过程中,maven自动为构件打上时间戳,如1.0.0-20180720.221414-13。

4.更新所有模块的版本号

当旧版本a.b.c已经发布,要进行下一轮迭代时,要把父模块版本号改为a.b.d-SNAPSHOT,测试通过后正式发布,要改为正式版本号a.b.d。由于每个子模块的<parent><version>都要于父POM一致,手动修改每个pom.xml显然很容易出错。

可以使用org.codehaus.mojo:versions-maven-plugin插件

5.使用企业内部私有maven仓库

假设以nexus搭建私有仓库,snapshot版本和正式版本的构件发布到不同仓库,可以在父POM中配置repository

<repositories>
        <repository>
            <id>nexus-snapshot</id>
            <url>http://192.168.0.22/nexus/content/repositories/gayhub-snapshot</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
            <releases>
                <enabled>false</enabled>
            </releases>
        </repository>
        <repository>
            <id>nexus-release</id>
            <url>http://192.168.0.22/nexus/content/repositories/gayhub-release</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
            <releases>
                <enabled>true</enabled>
            </releases>
        </repository>
</repositories>

<repository><id>是和$MAVEN/settings.xml中的<server><id>对应的,<server>配置repository的账号和密码。

出于规范考虑,开发人员可以自由提交snapshot版本构件,正式版本构件只能在测试和审核后才能由发布系统进行提交。因此release仓库的密码不宜开放给开发人员,开发人员只能得到snapshot仓库的账号密码。

6.快速开发新的微服务

可以利用maven的archtype功能。首先根据代码规范,编写一个包含基本的、可复用逻辑的微服务,以此作为模板,把模板项目代码中的一些类名、变量名、文件名使用模板变量替换,然后编写自定义archtype 插件。利用此插件,运行时输入模板变量值就可以生产新的微服务代码了。

7.接口文档编写与更新

参考swagger-maven-plugin,swagger可以利用maven插件,解析接口的类名、方法名、参数等,生成一个可以部署的接口文档站点。

进一步的,代码可以利用JSR 303 Bean Validation的注解,标注参数的要求。插件解析这些注解,生产的文档包含完整的参数要求信息,如是否必填、类型、最大最小值、长度、可选枚举值等。

不同于swagger-maven-plugin,我们也可以把解析的接口信息统一上报到接口文档管理系统,接口管理系统保留每个接口的url、参数要求、所属服务及版本、变更历史,提供分组和搜索功能。利用插件自动解析和生产接口文档,降低手动维护接口文档的痛苦和出错机会。

8.接口发布到网关

原理类似利用maven插件接口接口生成接口文档,只不过这里是把接口信息提交给网关的接口配置管理中心,降低维护网关接口配置的痛苦

HBaseWD:通过顺序RowKey避免HBase RegionServer热点问题

在看Pinpoint源码时,发现hbaseOperations2.findParallelRowKeyDistributorByHashPrefix,似乎是RowKey做了哈希打散。后来发现是来自https://github.com/sematext/HBaseWD,用于解决HBase写时由于顺序RowKey导致某个Region成为热点。采用了给RowKey加前缀,把写分散到不同Region,同时分区后的新RowKey还是保持了原有的顺序,对并行Scan的性能没有影响。

原文地址:
https://sematext.com/blog/hbasewd-avoid-regionserver-hotspotting-despite-writing-records-with-sequential-keys/
译文来源:
https://blog.csdn.net/jiangmingzhi23/article/details/78575757

在HBase领域,RegionServer热点是一个共性问题。用一句话来描述HBase热点:以顺序RowKey记录数据时,可以通过startRowkey和endRowKey区间最高效地读取数据,但是这种顺序写入却会不可避免地产生RegionServer热点。接下来两部分我们将讨论并告诉你如何避免这个问题。

问题描述

Hbase中的记录是按照字典顺序存储的。因此可以通过确定的RowKey快速找到某个记录,或者通过start RowKey和end RowKey作为区间以快速查询RowKey在这个区间的记录。所以你可能会认为,以顺序RowKey记录数据,然后可以快速通过上面的方法查询某个区间的记录的方式一个不出错的主意。例如我们可能会希望每个记录都与时间戳有关,以便我们可以通过时间区间查询一段时间的记录,这样的RowKey的例子如下:
  • 基于时间戳的格式:Long.MAX_VALUE – new Date().getTime()
  • 递增/递减序列:”001”, ”002”, ”003”,… or ”499”, ”498”, ”497”, …
但是这种幼稚的RowKey会导致RegionServer写热点。

RegionServer Hotspotting

以顺序RowKey记录数据到HBase中时,所有的写操作会命中一个region。但是如果多个RegionServer服务同一个region时,这个问题就不存在了,不过实际并非如此,一个region通常只在一个RegionServer上。每个region都会预定义大小,当一个region达到最大大小时,就会被分割成两个小的region。然后两个较小的region中的一个负责记录所有的新写入的记录,这个region和它说在的RegionServer就成了新的写热点。显然,这种不均匀地写入是需要避免的,因为它将写操作的性能限制到单个RegionServer的写入能力,而没有将集群中所有RegionServer的性能释放出来。这种负载不均匀地写入操作可以从下图一看出:

hbasewd-pic1

可以看出其中一台服务器满负荷运行以写入记录,但是其他服务器却处于空闲状态。HBase的RegionServer热点问题更多的信息可以参考HBase官方文档

解决方案

如何解决这个问题呢?本文讨论的前提是我们不是一次性批量把所有数据写入HBase,而是以数据流的方式不断持续达到。批量数据导入HBase时如何避免热点的问题在HBase文档中有相关的最佳解决方案。但是,如果你和我们一样,数据持续流入并且需要处理和存储,那么解决热点的最简单的方案就是通过随机RowKey将数据流分发到不同的Region中。然而不幸的是,这样会牺牲通过start RowKey和end RowKey快速检索区间数据的能力。这个在HBase mail list和其它地方多次提及的解决方案就是为RowKey增加前缀。例如可以考虑通过下面的方式构造Rowkey:
new_row_key = (++index % BUCKETS_NUMBER) + original_key
这种方式构造的RowKey,以我们可见的数据类型方式展示如下图2示:

hbasewd-pic2

这里:
  • index是我们用于特定记录的RowID的数字部分,例如1,2,3,4,
  • BUCKETS_NUMBER是我们希望新构建的RowKey想要分发到不同的‘桶’的数量。每一个‘桶’内,数据保持着他们原始ID记录的顺序。
  • original_key是写入数据的原始主键。
  • new_row_key是数据写入HBase中的实际RowKey(即distributed key或者prefixed key)。后文中,distributed records用来表示通过distributed key写入的记录。
所以,新的记录将被分发到不同的‘桶’中,被HBase集群的不同RegionServer处理入库。新写入的记录的RowKey不再是顺序序列,但是在每一个‘桶’中,它们依然保持着原始的字典顺序。当然,如果你开始写数据到一个空的HBase表,在这个表被分割成多个region前你可能要等一段时间,等待的时长取决于流入数据大的大小和速度、压缩比以及region的大小。提示:通过HBase region预分割特效,可以避免这个等待。通过上面的方案写数据到不同的region,你的HBase节点负载看起来就好看多了:

Scan操作

数据在写入过程中被分发到不同的‘桶’中,因此我们可以通过基于start RowKey和end RowKey的scan操作从多个‘桶’中提取数据,并且保证数据的原始排序状态。这也意味着BUCKETS_NUMBER个scan操作可能会影响性能。但是幸运的是这些scan操作可以并行,所以性能不至于降低,甚至会有所提高。对比一下,从一个region中读取100K数据,和从10个region中并行的读取10K数据,哪个更快?

hbasewd-pic3

Get/Delete

对单条记录进行Get/Delete操作,操作复杂度为O(1)到O(BUCKETS_NUMBER)。 例如。 当使用“静态”散列作为前缀时,给定原始RowKey,就可以精确地识别prefised rowkey。 如果我们使用随机前缀,我们将不得不在每个可能的‘桶’中执行Get操作。Delete操作也是如此。

MapReduce Input

我们仍旧希望从数据本身出发,因此将distributed record提供给MapReduce作业可能会打乱数据到达mapper的顺序。至少在HBaseWD的实现中,这个问题是存在的。每个map task处理特定的‘桶’中的数据,所以,数据处理的顺序将于它们在‘桶’中的原始顺序一致。然而由于两个原始RowKey相邻的记录可能被分发存储到不同的‘桶’中,它们将会被分配到不同的map task。因此如果mapper认为数据严格的按照其原始顺序流入,我们则很受伤,因为数据只在每个桶保证原始顺序,并非全局保证顺序。

Increased Number of Map Tasks

当我们以上述方案的数据提供给MapReduce作业时,‘桶’的数目可能会增加。在HBaseWD的实现中,与使用相同参数的常规MapReduce作业相比,你需要进行BUCKETS_NUMBER倍分割。这与前面讨论的Get操作的逻辑相似。所以(HBaseWD的实现中,)MapReduce作业需要有BUCKETS_NUMBER倍的map task。如果BUCKETS_NUMBER不大,理论上性能不会降低,当然,MapReduce作业本身的初始化和清理工作需要更多的时间。而且在很多情况下,更多的mapper可以提升性能。很多用户报告指出,基于标准的HBase Tbase输入的MapReduce作业的map task数目过少(每个region对应一个map task),所以(我们的实现)可以不需要额外编码就可以对MapReduce产生积极作用。
如果在你的应用中,除了按顺序Rowkey写入数据到HBase,还需要通过MapReduce持续的处理新数据的增量,那么本文建议的方案及其实现很可能会有所帮助。在这种情况下,数据持续频繁写入,增量处理只会位于少数region中,或者如果写负载不高时,(增量处理)只会在一个region中,亦或者如果最大region大小很大时,批量处理会很频繁。

方案实现: HBaseWD

我们实现了上述方案,并且将之作为一个叫做HBaseWD的项目开源。由于HBaseWD支持HBase本地客户端API,所以它实际上是独立的,而且很容易集成到现有的项目代码中。HBaseWD项目首次在

Configuring Distribution

Simple Even Distribution

使用顺序RowKey分发记录,最多分发到Byte.MAX_VALUE个‘桶’中,(通过在原始Rowkey前添加一个字节的前缀):
byte bucketsCount = (byte) 32; // distributing into 32 buckets
RowKeyDistributor keyDistributor =  new RowKeyDistributorByOneBytePrefix(bucketsCount);
Put put = new Put(keyDistributor.getDistributedKey(originalKey));
... // add values
hTable.put(put);

Hash-Based Distribution

另一个有用的RowKey分发器是RowKeyDistributorByHashPrefix。参考下面的示例。它通过原始RowKey创建distributed key,如果稍后希望通过原始Rowkey更新记录时,可以直接计算出distributed key,而无需调用HBase,也无需知道记录在哪个‘桶’中。或者,你可以在知道原始Rowkey的情况下通过Get操作获取记录,而无需到所有的‘桶’中寻找。
AbstractRowKeyDistributor keyDistributor =
     new RowKeyDistributorByHashPrefix(
            new RowKeyDistributorByHashPrefix.OneByteSimpleHash(15));
你亦可以通过实现下面的接口以使用你自己的hash逻辑:
public static interface Hasher extends Parametrizable {
  byte[] getHashPrefix(byte[] originalKey);
  byte[][] getAllPossiblePrefixes();
}

自定义分发逻辑

HBaseWD的设计灵活,特别是在支持自定义distributed key方法时。 除了上面提到的用于实现用于RowKeyDistributorByHashPrefix的定制哈希逻辑的功能之外,还可以通过扩展AbstractRowKeyDistributor抽象类来定义自己的RowKey分发逻辑,该类的接口非常简单:
public abstract class AbstractRowKeyDistributor implements Parametrizable {
  public abstract byte[] getDistributedKey(byte[] originalKey);
  public abstract byte[] getOriginalKey(byte[] adjustedKey);
  public abstract byte[][] getAllDistributedKeys(byte[] originalKey);
  ... // some utility methods
}

Common Operations

Scan

对数据执行基于范围的scan操作:
Scan scan = new Scan(startKey, stopKey);
ResultScanner rs = DistributedScanner.create(hTable, scan, keyDistributor);
for (Result current : rs) {
  ...
}

Configuring MapReduce Job

通过scan操作在指定的数据块上自习MapReduce作业:
Configuration conf = HBaseConfiguration.create();
Job job = new Job(conf, "testMapreduceJob");
Scan scan = new Scan(startKey, stopKey);
TableMapReduceUtil.initTableMapperJob("table", scan,
RowCounterMapper.class, ImmutableBytesWritable.class, Result.class, job);
// Substituting standard TableInputFormat which was set in
// TableMapReduceUtil.initTableMapperJob(...)
job.setInputFormatClass(WdTableInputFormat.class);
keyDistributor.addInfo(job.getConfiguration());

网络编程总结

同步/异步/阻塞/非阻塞/多路复用

QQ20180606-234210@2x

设计模式(套路)

QQ20180606-232414@2x

并发服务端程式设计范式(9种)

QQ20180606-232736@2x

经典开源组件

QQ20180606-232750@2x

快速体验HBase

standalone运行hbase

下载,并解压

# wget http://mirror.reverse.net/pub/apache/hbase/2.0.0/hbase-2.0.0-bin.tar.gz
# tar xzvf hbase-2.0.0-bin.tar.gz -C /opt
# cd /opt/hbase-2.0.0/

配置

# vi conf/hbase-site.xml

  
    hbase.rootdir
    file:///Users/wingyiu/data/hbase
  
  
    hbase.zookeeper.property.dataDir
    /Users/wingyiu/data/zookeeper
  
  
    hbase.unsafe.stream.capability.enforce
    false
    
      Controls whether HBase will check for stream capabilities (hflush/hsync).

      Disable this if you intend to run on LocalFileSystem, denoted by a rootdir
      with the 'file://' scheme, but be mindful of the NOTE below.

      WARNING: Setting this to false blinds you to potential data loss and
      inconsistent system state in the event of process and/or node failures. If
      HBase is complaining of an inability to use hsync or hflush it's most
      likely not a false positive.
    
  

启动

# bin/start-hbase.sh
# jps
22660 HMaster
22681 Jps

访问Hbase Web UI,在浏览器打开http://localhost:16010
huaban (9)

命令客户端Shell体验

连接

$ ./bin/hbase shell
hbase(main):001:0>

创建table和列族

hbase(main):001:0> create 'test', 'cf'
0 row(s) in 0.4170 seconds

=> Hbase::Table - test

hbase(main):002:0> list 'test'
TABLE
test
1 row(s) in 0.0180 seconds

=> ["test"]

插入Row

hbase(main):003:0> put 'test', 'row1', 'cf:a', 'value1'
0 row(s) in 0.0850 seconds

hbase(main):004:0> put 'test', 'row2', 'cf:b', 'value2'
0 row(s) in 0.0110 seconds

hbase(main):005:0> put 'test', 'row3', 'cf:c', 'value3'
0 row(s) in 0.0100 seconds

遍历表

hbase(main):006:0> scan 'test'
ROW                                      COLUMN+CELL
 row1                                    column=cf:a, timestamp=1421762485768, value=value1
 row2                                    column=cf:b, timestamp=1421762491785, value=value2
 row3                                    column=cf:c, timestamp=1421762496210, value=value3
3 row(s) in 0.0230 seconds

SSO和CAS

SSO(Single sign-on,单点登录),就是只登录一次(只提供一次凭证,如账号密码),就可以畅通无阻的访问平台的多个应用/服务。不应该把SSO和OAuth等授权协议混淆,OAuth协议要求在登录不同系统是都要进行认证和授权,而SSO只认证一次。OAuth本质是解决授权问题。

要实现SSO有很多方案。CAS(Central Authentication Service,集中式认证服务)就是其中一种。最初的CAS由Yale实现,现在CAS协议的实现有很多。CAS协议已经发展到了3.0版本。

一下对CAS协议的描述来自apereo CAS实现的文档。

CAS protocol

The CAS protocol is a simple and powerful ticket-based protocol developed exclusively for CAS. A complete protocol specification may be found here.

It involves one or many clients and one server. Clients are embedded in CASified applications (called “CAS services”) whereas the CAS server is a standalone component:

  • The CAS server is responsible for authenticating users and granting accesses to applications
  • The CAS clients protect the CAS applications and retrieve the identity of the granted users from the CAS server.

The key concepts are:

  • The TGT (Ticket Granting Ticket), stored in the CASTGC cookie, represents a SSO session for a user
  • The ST (Service Ticket), transmitted as a GET parameter in urls, stands for the access granted by the CAS server to the CASified application for a specific user.

交互流程

cas_flow_diagram

从上图可以看出,web的CAS利用到了session cookie和ticket令牌。不难分析得出CASTGC用于保持浏览器到CAS server的会话,第二次访问不需要再提供凭证。利用GET参数传递ST使得认证结果可以在不同域名之间传递。浏览器和受保护应用之间还有一个session cookie,保持了两者之间的会话,使得第二次直接访问应用也不需要提供凭证,应用也不需要再去CAS认证对令牌进行认证。

redis+lua实现Token Bucket限流

在做网关限流时,经过研究后决定使用redis+lua实现Token Bucket算法做分布式限流。

Token Bucket算法要求匀速放入令牌,用定时器可以实现。但是网关的来源IP太多,桶很多,需要很多线程加定时器才能实时处理得过来,不是一个好办法。
换个思路,由于生产token是匀速的,且取token是有时间差的,可以利用两次取token的时间差计算出间隔间应该产生的token数量。原有token+新产生token数-要获取的token数,就是桶里剩余的token。因此对于一个桶,要记录剩余token数,上次取token时间戳。获取token是要传入当前时间戳,要获取token数量(默认1)。在修改桶的两个信息时,需要是原子操作,而且获取令牌过程中带有计算逻辑。
redis是以单线程串行处理请求,在处理EVAL命令时是一个原子操作。因此使用redis+lua脚本刚好可以满足上述要求。

脚本如下

-- bucket name
local key = KEYS[1]
-- token generate interval
local intervalPerPermit = tonumber(ARGV[1])
-- grant timestamp
local refillTime = tonumber(ARGV[2])
-- limit token count
local limit = tonumber(ARGV[3])
-- ratelimit time period
local interval = tonumber(ARGV[4])

local counter = redis.call('hgetall', key)

if table.getn(counter) == 0 then
    -- first check if bucket not exists, if yes, create a new one with full capacity, then grant access
    redis.call('hmset', key, 'lastRefillTime', refillTime, 'tokensRemaining', limit - 1)
    -- expire will save memory
    redis.call('expire', key, interval)
    return 1
elseif table.getn(counter) == 4 then
    -- if bucket exists, first we try to refill the token bucket
    local lastRefillTime, tokensRemaining = tonumber(counter[2]), tonumber(counter[4])
    local currentTokens
    if refillTime > lastRefillTime then
        -- check if refillTime larger than lastRefillTime.
        -- if not, it means some other operation later than this call made the call first.
        -- there is no need to refill the tokens.
        local intervalSinceLast = refillTime - lastRefillTime
        if intervalSinceLast > interval then
            currentTokens = limit
            redis.call('hset', key, 'lastRefillTime', refillTime)
        else
            local grantedTokens = math.floor(intervalSinceLast / intervalPerPermit)
            if grantedTokens > 0 then
                -- ajust lastRefillTime, we want shift left the refill time.
                local padMillis = math.fmod(intervalSinceLast, intervalPerPermit)
                redis.call('hset', key, 'lastRefillTime', refillTime - padMillis)
            end
            currentTokens = math.min(grantedTokens + tokensRemaining, limit)
        end
    else
        -- if not, it means some other operation later than this call made the call first.
        -- there is no need to refill the tokens.
        currentTokens = tokensRemaining
    end
    
    assert(currentTokens >= 0)

    if currentTokens == 0 then
        -- we didn't consume any keys
        redis.call('hset', key, 'tokensRemaining', currentTokens)
        return 0
    else
        -- we take 1 token from the bucket
        redis.call('hset', key, 'tokensRemaining', currentTokens - 1)
        return 1
    end
else
    error("Size of counter is " .. table.getn(counter) .. ", Should Be 0 or 4.")
end

调用这个脚本需要4个参数,其中key是桶的名字,intervalPerPermit是产生令牌的时间间隔,refillTime是调用脚本的即时时间戳,limit是最大间隔访问次数,interval桶的限流间隔。key生产规则可以是”ratelimit:$IP:$PATH:$INTERVAL”。当token不存在时,默认创建一个带有最大token数量的桶。当超过interval都没有访问桶时,对该通的缓存会过期,减少内存使用量。返回1代表成功获取一个令牌,返回0代表未获取到令牌(此次访问应该被限制)。

假设要限制某个IP对某个接口每秒钟最多访问2次,则
interval = 1000(毫秒)
limit = 2
intervalPerPermit = interval/limit = 500
key=”ratelimit:127.0.0.1:/user/get:1000″

命令行调用

redis-cli -h localhost -p 6379 -a wingyiu --ldb --eval ~/Git/api-gateway/src/main/resources/ratelimit_token_bucket.lua ratelimit:127.0.0.1:/user/get:1000 , 500 1520402606803 2 1000

网关使用spring cloud netflix zuul,可以编写一个filter,作为限流功能的实现,调用redis,执行上述Lua脚本

package com.yunzhijia.platform.api.gateway.zuul.filter;

import com.google.common.net.InetAddresses;
import com.netflix.zuul.ZuulFilter;
import com.netflix.zuul.context.RequestContext;
import com.xxx.gateway.vo.SimpleRouteVo;
import com.xxx.gateway.common.constants.CacheConstants;
import com.xxx.gateway.common.constants.ErrorMsgConstants;
import com.xxx.gateway.exception.GatewayException;
import com.xxx.gateway.utils.RedisTemplateUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.stereotype.Component;

import javax.servlet.http.HttpServletRequest;
import java.util.Collections;

/**
 * 目前仅实现IP+URI的限流
 * 基于令牌桶算法
 * 借助于redis+lua实现对令牌桶的原子操作
 */
@Component
public class RateLimitPreFilter extends ZuulFilter {

	protected static final Logger LOG = LoggerFactory.getLogger(RateLimitPreFilter.class);

	@Override
	public String filterType() {
		return "pre";
	}

	@Override
	public int filterOrder() {
		return 4;
	}

	@Override
	public boolean shouldFilter() {
		return true;
	}

	@Autowired
	RedisScript redisScript;

	@Override
	public Object run() {
		RequestContext ctx = RequestContext.getCurrentContext();
		HttpServletRequest request = ctx.getRequest();

		String uri = request.getRequestURI();
		LOG.info("开始限流检查:{}", uri);
		// 获取ip
		String ip = request.getHeader("X-Forwarded-For");
		if (StringUtils.isBlank(ip)) {
			ip = request.getRemoteAddr();
		}
		ip = ip.split(",")[0];
		LOG.info("remote ip: {}",  ip);

		if (isSiteLocalAddress(ip)) {
			LOG.info("私有地址跳过: {}", ip);
			return null;
		}

		SimpleRouteVo routeVo = (SimpleRouteVo) ctx.get("methodDefinition");

		if (routeVo.getReqPerSec() <= 0 && routeVo.getReqPerMin() <= 0) {
			// 不需要限流
			LOG.info("接口:{} 不限流", uri);
			return null;
		}

		if (routeVo.getReqPerSec() > 0) {
			boolean canAccess = access(ip, uri, 1000, routeVo.getReqPerSec());
			if (!canAccess) {
				LOG.info("被限流: ip={}, uri={}, interval=sec, limit={}", ip, uri, routeVo.getReqPerSec());
				ctx.setSendZuulResponse(false);
				// raise exception to jump to error filter
				throw new GatewayException(ErrorMsgConstants.TOO_MANY_REQUESTS);
			}
		}

		if (routeVo.getReqPerMin() > 0) {
			boolean canAccess = access(ip, uri, 60000, routeVo.getReqPerMin());
			if (!canAccess) {
				LOG.info("被限流: ip={}, uri={}, interval=min, limit={}", ip, uri, routeVo.getReqPerMin());
				ctx.setSendZuulResponse(false);
				// raise exception to jump to error filter
				throw new GatewayException(ErrorMsgConstants.TOO_MANY_REQUESTS);
			}
		}

		return null;
	}

	private boolean isSiteLocalAddress(String ip) {
		// refer to RFC 1918
		// 10/8 prefix
		// 172.16/12 prefix
		// 192.168/16 prefix
		int address = InetAddresses.coerceToInteger(InetAddresses.forString(ip));
		return (((address >>> 24) & 0xFF) == 10)
				|| ((((address >>> 24) & 0xFF) == 172)
				&& ((address >>> 16) & 0xFF) >= 16
				&& ((address >>> 16) & 0xFF) <= 31)
				|| ((((address >>> 24) & 0xFF) == 192)
				&& (((address >>> 16) & 0xFF) == 168));
	}

	public boolean access(String ip, String uri, long intervalInMills, long limit) {
		LOG.info(redisScript.getSha1());
		String key = genKey(ip, uri, intervalInMills, limit);
		key = CacheConstants.genKey(key);
		long intervalPerPermit = intervalInMills / limit;

		try {
			RedisTemplate redisTemplate = RedisTemplateUtils.getRedisTemplate();
			Long refillTime = System.currentTimeMillis();
			LOG.info("调用redis执行lua脚本, {} {} {} {} {}", key, String.valueOf(intervalPerPermit), String.valueOf(refillTime),
					String.valueOf(limit), String.valueOf(intervalInMills));
			Long res = (Long)redisTemplate.execute(redisScript, Collections.singletonList(key),
					String.valueOf(intervalPerPermit), String.valueOf(System.currentTimeMillis()),
					//String.valueOf(limit),
					String.valueOf(limit), String.valueOf(intervalInMills));
			LOG.info("调用redis执行lua脚本:{}", res);
			return res == 1L ? true : false;

		} catch (Exception e) {
			LOG.error("调用redis执行lua脚本出错", e);
			return true; // 内部异常,直接放过吧,保证能访问,不计较太多
		}
	}

	private String genKey(String ip, String uri, long intervalInMills, long limit) {
		return String.format("ratelimit:%s:%s:%s", ip, uri, intervalInMills);
	}
}

实现了对[IP,秒,接口],[IP,分钟,接口]两种限流,具体限流大小由管理后台配置。

留个问题:
Redis Cluster模式下,上述代码能正常工作吗?

参考:
https://zhuanlan.zhihu.com/p/20872901
https://github.com/YigWoo/toys/tree/master/src/main/java/com/yichao/woo/ratelimiter