给Lua实现了一个数学库

我的玩具引擎计划以Lua语言为第一业务语言。

引擎可以加载出一个场景之后,我就需要一个相机控制器,来接收用户输入来移动和旋转相机,以实现场景漫游。

我打算使用Lua来编写这一逻辑。在计算相机的Transform时,需要进行一定的数学运算。这就需要一个Lua版的数学库

怎么给Lua写一个简洁高效的数学库,这并不是最近才开始思考的问题。

早些年在使用tolua框架时,就发现在Lua中进行数学计算时会产生大量的临时对象,极大的加重GC的负担。

虽然这两年时不时就会想起这个问题,也一直没有解决方案。

这次,在没有了Unity的包袱之后, 希望能找到一条全新的思路。

为什么在C#中写数学运算就不会产生GC呢。根本原因是,在C#中(vector3,quaternion,matrix)等对象都是struct类型,即值类型。这些对象都是在栈上分配,函数返回即销毁。就算当值返回,也是直接值拷贝出去的。

在Lua中,严格意义的值类型只有boolean,number两种类型。虽然string表现的像个值类型,但是临时string对象一样会产生内存垃圾。

所以我们一般实现vector3时,会使用Table或userdata来保存xyz。这是因为Lua中的值类型不足以装下xyz这么多数据。

一个很直觉的思路,我们能不能扩展Lua中的值类型,使他最多能包含xyzw四个字段。

答案是能,但是代价很大。内存的代价,性能的代价,以及维护的代价。

我仔细回忆了这几年有限的客户端经历,我发现数学运算都是扎堆的。

换句话说,我们的数学运算一般都是几个有限的输入和几处有限的输出。但是中间计算过程很复杂,只要解决了这些中间过程产生的临时变量,那也算基本符合预期了。

沿着这个思路,即然Lua中只有numbert和boolean是值类型,那我有没有可能用number来代表一个vector3或quaternion呢?

答案是肯定的。我们只需要用C实现一片额外的空间,然后用索引指向这个vector3或quaternion的值就大功告成了。

基于以上思路,我实现了一个数学栈。这个栈的范围只能在一个函数内使用。

如果你想将计算结果返回到另一个函数使用,你只能将栈中的值取出,然后显式返回给其他函数。

如果其他函数需要再次进行数学计算,就需要重新开辟一个数学栈空间。

大致用法如下:

local mathx = require "engine.math"
local camera_up = {x = 0, y = 0, z = 0}
local camera_forward = {x = 0, y = 0, z = 0}
local camera_right = {x = 0, y = 0, z = 0}

local stk<close> = mathx.begin()
print("rotation", component.get_quaternion(self))
local rot = stk:quaternion(component.get_quaternion(self))
local up = stk:mul(rot, stk:vector3f_up())
local forward = stk:mul(rot, stk:vector3f_forward())
local right = stk:mul(rot, stk:vector3f_right())
stk:save(up, camera_up)
stk:save(right, camera_right)
stk:save(forward, camera_forward)

首先使用math.begin()来创建一个数学栈,接着我们就可以在栈上进行各种数学计算。

当数学计算结束时,我们可以使用stk:save来取出数学栈中的xyzw的值。

stk:save有两种使用方式,当我们传入一个table时,stk会直接将xyzw的值置入table内。我们还可以不传入参数,这时stk:save就会根据值的类型返回xyz或xyzw的值。

这里使用了Lua的toclose特性, 当栈使用完之后,__close函数会自动将栈对象放入Cache中。

下次调用math.begin时,直接从Cache中分配,这样可以做到0内存分配。

在实现完这个库之后,我特意与xlua做了一个性能对比。

local stk<close> = math.begin()
local v1 = stk:vector3f(xx_v3_1)
local v2 = stk:vector3f(xx_v3_2)
v2 = stk:vector3f_cross(v1, v2)
v2 = stk:vector3f_cross(v1, v2)
v2 = stk:vector3f_cross(v1, v2)
v2 = stk:vector3f_cross(v1, v2)
stk:save(v2, xx_v3_2)

与同样逻辑的xlua写法对比,性能要高出300%左右。并且随着计算过程的增加,性能优势会越来越明显。

除此之外,在进行数学计算之前,我们往往需要获取到transform中的position,rotation,scale等属性。

这些属性要么是vector3, 要么是quaternion。如果我们用table或userdata来返回依然会加重GC负担。

仔细数一下其实这些数据类型最大也只有4个变量。因此,我让函数component.get_position直接返回xyz三个值,而component.get_rotation直接返回xyzw四个值。

至于是否需要存到table里,这个交由业务逻辑来控制。

寻路和Flocking算法的结合

最近本来在研究行为树, 然后无意间发现了一本名叫《Artificial Intelligence for Games, Second Edition》的书,就顺便看了起来。

书中第三章提到了一个Flocking算法,该算法一般用于模拟群体(羊群,鸟群,鱼群,人群)的移动行为。

这让我想起了大约一年前,他们QQ群里分享了一个蚁群行军的视频。当时为了研究他是如何时实现的,还特意去学习了VO,RVO算法(没有学会),最终也没有实现出来。

这次,我想用Flocking算法再试一次。


先简单介绍一下Flocking算法。

对于鸟群中的一只鸟而言,除了他本身要飞行的速度向量Velocity外,还有三个额外的分量来辅助校正最终的速度向量。

这三个额外分量分别如下:

Separation:每只鸟都会考虑到它们相对周围其他鸟的位置,如果过近,就会产生一个排斥速度分量。

Cohesion: 每只鸟都会检查自己半径R范围内鸟的位置,计算出这群鸟的质心,产生一个向质心靠拢的速度分量。

Alignment: 每只鸟都会检查自己半径R范围内的鸟的速度,计算出这群鸟的平均速度,然后产生一个向平均速度靠拢的速度分量。

最终每只鸟的速度为:Velocity + Separation + Cohesion + Alignment(在叠加过程中,可以根据情况给每个分量加上相应的权重)。

Flocking在没有障碍物的场景,比如天空,海底,平原等表现都很不错。但是一旦进入有障碍物的场景如蚁穴,就会很难工作。

这时就需要加入寻路系统来提供路径支持。


然而,事情并没有这么简单。

由于有SeparationCohesionAlignment速度分量的存在,即使我们给每只鸟单独寻出来一条路径,也不能保证这只鸟就一定会严格按照路径行走。

比如我们为某只鸟寻出来的路径为((0,0), (0,1),(0,2))(我们把地图切成很多小块格子,坐标为格子坐标,不是实际的世界坐标)。

在从(0,0)到(0,1)运行的过程中,由于鸟群的干扰,可能会把这只鸟挤到了(1,1)格子,这时可能(1,1)是到不了(0,2)的,需要重新寻路。

这就意味着,每只鸟每跨过一个格子,就需要重新寻路一次,这么大的开销足以使FPS降到5。


在网上搜到一种解决方案。

给整个鸟群指定一个Leader。为Leader计算一条路径,Leader严格按照路径行走。鸟群中的其他鸟使用Flocking算法来跟随Leader即可。

我尝试了这种方案后,发现这个方案在绕过大片障碍物时非常好用。但是在通过狭窄通道时,很容易发生跟随失败,导致一些鸟永远卡在那里不能行动。

比如下面这种情况:

                   xxxxxL
                  x
  ------------ x --------------
                  x
                   x
                      xB

Leader在位置L处,B位置处的鸟要跟随Leader,必然要产生一个从B位置向L位置的速度。

如果B鸟按这个跟随速度运动,就会被卡在墙的一侧,永远的脱离队伍。

我尝试优化这种方案,除了Leader之外,我加入了Target角色。

所有的鸟在运动时,会在自身周围一定范围内寻找一个Leader或Target作为跟随的目标。

找到跟随目标之后,自身也会变成Target角色,供其他鸟跟随。

如果找不到合适的跟随目标,自己就会变成临时Leader。然后重新计算一条路径,并严格按照路径运动。直到遇见一个合适的Target之后,这只鸟就会再次变回Target。

这种方案可以应对各种极端障碍物情况。但是这个方案几乎把Flocking所有的特性都抹掉了,鸟群在整个运动过程中会排成一字长蛇阵,看起来非常不自然。


我找到当时的QQ聊天记录,仔细读了几遍,然后换了个思路。

计划让鸟群运行到某个目标点那一刻,使用Dijkstra算法计算出地图上所有格子到目标点的最佳运动方向。

这里有个小技巧,我们使用目标点作起始点,然后运行Dijkstra算法。

当Open列表为空时,就已经完成了地图上所有格子到目标点的最佳方向计算。

每只鸟在移动前,根据当前位置计算出当前格子,然后直接查询出下一步的目标点。

理论上,根据目标点计算出鸟的Velocity速度向量,再叠加SeparationCohesionAlignment速度分量就是最终的速度值。

然而,现实是残酷的。

经过实验发现,由于鸟群的作用力,经常会有鸟被挤进障碍物中,尤其是在经过狭窄通道时。

因此我们还需要静态避障速度分量。

在《Artificial Intelligence for Games, Second Edition》中第“3.3.15 Obstacle and Wall Avoidance”节中,讲到可以使用射线检测来躲避静态障碍物。

测试发现,当角度比较奇葩时,射线检测不到障碍物的存在,从而导致最终被挤到墙里面去,3.3.15节也有提到过这种情况。

最终,我采用了AABB来检测周围是否存在障碍物,当有障碍物时,根据障碍物的质心和当前鸟的位置来产生一个远离障碍物的速度分量,这个分量的权重要显著大于其他4个速度分量。

如果障碍物形状态复杂时,可能需要重写AABB检测逻辑,根据相交的边计算出远离障碍物的速度分量。


到目前为止,最大的开销就剩下为地图上所有格子计算最佳方向了。

如果地图过大,这样计算是不现实的。

在写这篇文章时,我想到了一个优化算法,还没来得及测试。

通过观察Flocking算法,不难发现鸟群中的鸟几乎全是按照大致相同的路线行走的。

也就是说,只要我们想办法生成一个有宽度的路径,基本上就可以满足给鸟群寻路的需求了。

首先使用AStar算法,从整个鸟群的质心到目标点计算出一条路径。

然后,对第一步中路径的每个格子,都使用Dijkstra算法,计算出周边格子到这个格子的最短路径。计算时要限制Dijkstra算法遍历的深度。只要我们选取的深度合适,大部分鸟行走的格子都会被命中。

值得一提的是,在应用Dijkstra算法时,路径中相临格子的周围是相互覆盖的,需要根据权重进行刷新。

举个例子:

已经使用AStar算法计算出A到D的路径为(A,B,C,D)。

对格子B应用Dijkstra算法时,对邻居E生成了最佳运动方向为向B运动,E到D的权重为E(1)+B(2) = 3。

对格子C应用Dijkstra算法时,同样会处理到邻居E,这时不能简单的跳过E,而应该计算E到D的权重为E(1) + C(1) = 2。

这时应将E的最佳运动方向改为向C而不是B。

如果某只鸟被挤到了一个我们事先没有计算过的格子上,就使用AStar以此格子为原点向目标点寻路。

这里有一个可以优化的地方,我们已经有了一条很宽的路径,只要AStar寻到已有的路径格子就可以停止继续寻路了。

最后,Demo在此

行为树的一种高效实现

我的玩具项目中,需要有一定智能的NPC来辅助人类攻击防御塔。

通常实现智能会采用状态机,行为树,GOAP等技术。

GOAP技术我没有研究过,行为树在早些年大致了解过一些。因为觉得行为树性能太差,不可能取代状态机实现,之后就再也没有研究过了。

随着这些年我性能强迫症的好转,再加上听到行为树的次数逐年增加,我打算趁机仔细研究一下。

我找来《Behavior Trees in Robotics and AI》仔细读了一遍。这本书详细介绍了行为树,并且对比了行为树和状态机之间的优劣。

根据《Behavior Trees in Robotics and AI》描述,行为树一般有4种控制节点(Sequence, Fallback, Parallel, Decorator)和两种执行节点(Action和Condition)。只有执行节点才能成为叶子节点。

先来简单描述一下最重要的两种控制节点, Sequence和Fallback。

Sequence节点: 当执行Sequence节点时,从左往右顺序执行子节点,直到某一个子节点返回Failure或Running状态,伪码如下:

//Algorithm 1: Pseudocode of a Sequence node with N children
for i 1 to N do
    childStatus <- Tick(child(i))
    if childStatus = Running then
        return Running
    else if childStatus = Failure then
        return Failure
return Success

Fallback节点:当执行Fallback节点时,从左往右顺序执行子节点,直到某一个子节点返回Success or Running状态,伪码如下:

//Algorithm 2: Pseudocode of a Fallback node with N children
for i 1 to N do
    childStatus <- Tick(child(i))
    if childStatus = Running then
        return Running
    else if childStatus = Success then
        return Success
return Failure

Action和Condition节点,是我们具体的业务逻辑,不是本次优化的重点。


对比行为树和状态机可以发现,行为树比状态机额外多出的开销, 就是在执行执行节点之前,必须要先穿过控制节点

如果我们在运行时能避过控制节点,只执行执行节点,那行为树和状态机的开销差别就只是多了几次函数调用而已。

仔细思考过之后, 我认为这是可能的。

结合上面对Sequence和Fallback节点的定义。我们不难发现,在编程语言中,Sequence就是and(与)逻辑,而Fallback就是or(或)逻辑。

整棵行为树的控制节点就是用来描述if-else的逻辑,叶子节点是相应的业务逻辑。从这个角度来看,行为树和语法树有颇多相似之处。

不难发现,整棵树的执行路径,其实依赖于特定执行节点的特定返回值。

某一个执行节点(叶子节点)返回Failure或Success, 整棵行为树下一步要执行的执行节点是固定的。

某个执行节点返回Running, 整棵树就停止执行。在下一Tick之后从头执行,这种情况比较简单,暂时不需要考虑。

来看一棵简单的行为树:

如果 Action 1 Done 返回Success,下一步将要执行的执行节点(叶子节点)就是 Actino 2 Done
如果 Action 1 Done 返回Failure, 下一步将要执行的执行节点(叶子节点)就是 Action 1

这种逻辑可以递归到所有的执行节点

这样,我们只需要两张跳转表(Success跳转表,Failure跳转表),就可以在运行时,以状态机的开销来实现行为树的功能。

以上面的行为树为例,我们可以生成如下跳转表:

local tree = {
["Action 1 Done"] = {
    ["Success"] = "Action 2 Done",
    ["Failure"] = "Action 1"
},
["Action 1"] = {
    ["Success"] = "Action 2 Done",
    ["Failure"] = nil, --nil 代表整棵树执行结束
},
["Action 2 Done"] = {
    ["Success"] = nil,
    ["Failure"] = "Action 2"
},
["Action 2"] = {
    ["Success"] = nil,
    ["Failure"] = nil,
}
}

在运行时,我们首先执行整棵行为树的第一个节点"Action 1 Done"。

如果"Action 1 Done"返回Success, 根据表tree可知,下一步需要执行的是"Action 2 Done"。

如果"Action 2 Done"返回Failure, 根据表tree可知,下一步需要执行的是"Action 2"。

这样我们仅需要生成一个跳转表,就可以在运行时抹掉所有控制节点所带来的开销。

最终,我花了200行代码实现了根据行为树生成上述跳转表的逻辑。

PS.我把生成跳转表的行为称之为编译。如果控制节点是Parallel或Decorator类型,或者有记忆功能。在编译过程中,需要将其保留,不能将其编译掉。不然无法完成和行为树等价的逻辑。

PPS. 在示例代码,我使用了behavior3来编辑行为树。

谈谈数据库的选型

在开发游戏服务器程序的过程中,好像大家都默认使用Mysql, 如果有性能问题,大不了再加个Memcached, 或者干脆使用Redis来做数据库。

但这么做是否真的对所有模式的游戏服务器都合适呢, 对于某些游戏模式,是不是有更好的选择?

这是我最近在看《MySql是怎样运行的》,突然想到的问题。

我挑了三款存储模式完全不同的数据库, 来对比一下它们的特点。


Mysql: 一款关系型数据库。

由于有RedoLog,UndoLog的存在, 支持事务数据落地比较可靠

存储引擎InnoDB采用B+Tree作为存储结构, 而由于B+Tree的性质以及RedoLog,BinLog,UndoLog等机制的存在,导致Mysql的写入性能远低于查询性能。

因此Mysql适用于查询压力大,但是写入压力小的场景。虽然这些复杂的机制拖慢了写入速度,但是MySql可以提供各种复杂的查询

即使如此,由于Mysql的数据结构是严格和磁盘对应的,相比Memcached和Redis等,将数据以内存数据结构的方式完全存储在内存的程序来讲,Mysql的查询性能还是要差不少。

这也是为什么在一些读流量大的地方,有时候会加Memcached或Redis作为前端,以防止大流量将Mysql冲垮(还可以使用从机做读写分离)。


Redis: 一款读写性能都很卓越的NoSql内存数据库。

本质上Redis就是一个带持久化功能的内存缓存,所有的数据以最适合内存访问的方式存储,因此查询极快写入极快不支持事务仅支持键-值查询

其持久化方式分为RDB和AOF两种方式:

RDB是通过定时将进程内存中的数据集快照写入磁盘文件,这种持久化方式性能较高, 但安全性较低。默认配置下,可能会丢失最近60s的数据,由于RDB每次都是重新写入全量数据集,随着持久化频率间隔的降低,会显著增加CPU和IO开销。

AOF是性能和可靠性的另一种折衷, 每一条修改命令都会尽可能快的(不是立即,Redis会在Sleep前才会尝试)写入到文件系统缓存。至于什么时机通知操作系统将文件系统的脏页刷新到磁盘上, Redis最高可以配置为每次写入到操作系统文件系统缓存时,都执行刷新操作,默认为每秒通知操作系统刷新。

AOF文件的大小会随着数据修改次数的增加而逐渐变大,当大到一定程度后,Redis会Fork一个进程对AOF文件进行重写,以达到减少AOF文件尺寸的目的。AOF的重写时机同样可以进行配置。

不管是AOF还RDF方案, 都有一个不可避免的缺点, 每次生成RDB文件或重写AOF文件时, 都会将内存中全量的数据写入文件, 在数据量很大的情况下, 会产生CPU峰值


LevelDB: 一款写性能卓越的NoSql数据库。

LevelDB底层采用LSM数据结构来存储数据, 所以写入极快查询较慢, 不支持事务仅支持键-值查询(还支持键的遍历)

与Redis相反,LevelDB将所有数据都存储在硬盘上,仅在自身内存中缓存热数据

由于LSM数据结构的特殊性,LevelDB还需要WAL(Write Ahead Log)来保证数据的可靠性。

WAL的作用和MySql的RedoLog的作用几乎一样,都是用于在意外Crash时,恢复还没有写入磁盘的数据。

在LSM数据结构中, 所有数据都是存储在SSTable中, 而SSTable是只读的。

这意味着随着数据增删改次数的增加,SSTable会变的越来越大。这时LevelDB的后台线程会在合适的时机,合并SSTable,以达到减少SSTable文件的目的。

LevelDB在合并数据时,是以SSTable文件为单位进行的, 而每个SSTable文件的大小一般为2M。这保证了,即使在数据库存有超大规模数据时,其合并过程依然是可控的


总的来讲,MySql适合查询场景复杂, 而且查询多于写入的场景。Redis适合单进程数据量不大,并且对查询和写入都要求极高的场景。而LevelDB则适合于写多,读少的场景。

不管是Redis的内存限制,还是RDB生成/AOF的重写机制,都限制了其单进程能处理的数据量要远低于Mysql和LevelDB。同时,Redis的查询和写入性能也是这三者之间最出色的。


在我们游戏中,玩家数据是需要长驻内存的,即使一个玩家下线,别的玩家还是可以影响他的所有数据(包括货币和英雄)。

这意味着,我们必须在开服期间,就要从数据库加载所有游戏数据到游戏进程。之后只需要操作进程内数据即可。

在不考虑数据安全的情况下,甚至我们都不需要数据库。

只需要在停服时,像Redis写入RDB一样,将每个系统的数据按照约定的格式写入到不同的文件,下次开服再加载回来。

使用数据库的理由是,它可以让我们按需写入数据,可以提高数据的安全性。

那么,我们对它的需求就只剩一点了:“写入要快,持久化要安全”。

从安全性上来讲,Mysql, LevelDB, Redis的AOF都满足要求。

就写入速度而言,显然MySql落选了,因为他是为查询设计的数据库系统。

就现在需求而言,而Redis和LevelDB在CPU和内存足够的情况下,其实差别不大,甚至Redis要优于LevelDB。

如果我们想再节约一点,就会发现LevelDB在内存和CPU峰值方面优于Redis, 同时他的写入性能要差于Redis, 因为LevelDB有WAL和SSTable两次写入。

ps. 这种取舍其实很像数据结构的优化,一份平均每写10次只查一次的数据,显然没有必要每次写入之后都对数据排一下序,选择时关键是还是要看清数据库的定位以及自己的需求。

实现一个数据库存储队列

大约在在两年前, 我就意识到定点存库的必要性,并在今年应用到实际项目中了。

从历史经验来看,将数据库存储行为收集并合并起来,确实可以极大的降低抽象代码过程中的心智负担。我们在写代码时已经不需要考虑数据库的存储频率,反正最终所有修改过的数据,只会存一次。

从我意识到定点存库的必要性之后,我就一直对当时的抽象不太满意,在最原始的抽象中,刷新数据是需要业务逻辑来完成的。而数据库合并模块本质上只做了去重而已,这也就是说,其实大量定点存储代码实际是散落在业务逻辑各处的。而在此之后虽然我一直在思考该怎么缓存这些数据库操作,但是一直没有什么实质性进展。

直到最近一段时间,我一直在刻意强迫自己放弃部分性能,以达到更好的抽象后才突然有了点想法。

以Redis数据库的hset(key, field, value)为例,最直接的设计就是把每个key下面的value有变动的field收集起来。然后在特定时机将这些field写入相应的key。相应的伪码如下:

	struct key {
		std::unordered_set<uint32_t> dirty_fields;
	};
	std::unordered_map<string, key> dirty_keys;

这其实就是第一版的抽象,因为没有value相关信息,所以序列化及更新数据库的责任都落在了相关的业务逻辑模块,这个数据库操作缓存模块有与没有其实区别不大。

最近我仔细研究了一下Redis的数据模型和我们业务逻辑的数据模型。我发现其实只要抽象出’set’,’hset’, ‘del’, ‘hdel’这些对应的操作,然后对这些操作进行缓存和去重就能满足99%的需求。

现在还有一个问题需要解决,如何将序列化的工作接管过来。这样就不会存在任何回调了。这个问题其实不难解决,只要我们设计一个通用的接口类提供一个serialize接口即可。

伪码大概如下:

	struct obj {
		std::string serialize() const = 0;
	};
	struct cmd {
		std::string name;
		uint32_t field;
		obj *value;
	};
	
	struct key {
		std::unordered_map<uint32_t, cmd> dirty_fields;
	};
	std::unordered_map<string, key> dirty_keys;
	
	void hset(const std::string &key, uint32_t field, obj *value) {
		auto &set = dirty_keys[key];
		auto &cmd = set[field];
		cmd.name = "hset";
		cmd.field = field;
		cmd.value = value;
	}
	void hdel(const std::string &key, uint32_t field) {
		auto &set = dirty_keys[key];
		auto &cmd = set[field];
		cmd.name = "hdel";
		cmd.field = field;
		cmd.value = nullptr;
	}
	void set(const std::string &key, obj *value) {
		auto &set = dirty_keys[key];
		auto &cmd = set[0];
		cmd.name = "set"
		cmd.value = value;
	}
	
	void del(const std::string &key) {
		auto &set = dirty_keys[key];
		set.clear();
		auto &cmd = set[0];
		cmd.name = "del";
		cmd.value = nullptr;
	}
	void flush() {
		for (auto &kiter:dirty_keys) {
			auto &key = kiter.second;
			auto &kname = kiter.first;
			for (auto &citer:key.dirty_fields) {
				auto &cmd = citer.second;
				if (cmd.name == "hset") {
					auto dat = cmd.value->serialize();
					HSET(kname, cmd.field, dat);
				} else if (cmd.name == "set") {
					auto dat = cmd.value->serialize();
					SET(kname, dat);
				} else if (cmd.name == "del") {
					DEL(kname);
				} else if (cmd.name == "hdel") {
					HDEL(kname, cmd.field);
				}
			}
		}
	}

当然这里面其实还有两个个细节问题。

  1. 如果你的数据是直接使用std::unordered_map<uint32_t, dbst> DB进行存储, 直接调用hset("foo", 1, &DB[1])需要考虑DB进行rehash的情况。因此需要new一个新的dbst并将新指针传递过去,类似这样hset("foo", 1, new dbst(DB[1]))。同时struct cmd需要做出如下改变:

	struct cmd {
		std::string name;
		uint32_t field;
		std::unique_ptr<obj> value;
	};

看似我们需要很多次new, 但是仔细分析一下整个内存分配行为,就会发现,整个操作序列大概类似这样:new(首次对一个对象进行set/hset),new,new,new/delete(重复对一个对象进行set/hset),new/delete,delete,delete。也就是说这种分配具有局部性,一般不太会造成内存碎片和性能问题。

  1. 需要保证在调用flush函数之前,所有指针的有效性,这个只要稍微留意一下,其实不难保证。

做完如上抽象后,我在想,有没有可能再次简化上述数据结构。

我仔细研究了一下我们的数据模型,我发现一件很有意思的事。

我们在数据库的每一条数据, 都在内存中有一份惟一的对应。也就是说一个指针一定只对应一个value(set(key, value)/hset(key, field, value))。只要这个指针在整个数据生命周期期间,不发生改变,我们就可以直接使用指针来作主键去重,在业务逻辑层使用std::unordered_map<uint32_t, std::unique_ptr<dbst>来缓存数据库数据即可。

这样数据结构就可以简化为如下:

	struct obj {
		std::string serialize() const = 0;
	};
	struct cmd {
		std::string name;
		std::string key;
		uint32_t field;
		obj *value;
	};
	std::unordered_map<intptr_t, size_t> v2i;
	std::vector<cmd> cmds;

	static cmd &overwrite(obj *v) {
		auto iter = v2i.find((intptr_t)v);
		if (iter != v2i.end()) {
			cmds[iter->second].name = "nop"
		}
		cmds.emplace_back();
		return cmds.back();
	}
	
	void hset(const std::string &key, uint32_t field, obj *value) {
		auto &cmd = overwrite(value);
		cmd.name = "hset";
		cmd.key = key;
		cmd.field = field;
		cmd.value = value;
	}
	void set(const std::string &key, obj *value) {
		auto &cmd = overwrite(value);
		cmd.name = "set"
		cmd.key = key;
		cmd.value = value;
	}
	void hdel(const std::string &key, uint32_t field) {
		cmds.emplace_back();
		auto &cmd = cmds.back();
		cmd.name = "hdel";
		cmd.key = key;
		cmd.field = field;
		cmd.value = nullptr;
	}
	void del(const std::string &key) {
		cmds.emplace_back();
		auto &cmd = cmds.back();
		cmd.name = "del";
		cmd.key = key;
		cmd.value = nullptr;
	}
	void flush() {
		v2i.clear();
		for (auto &cmd:cmds) {
			if (cmd.name == "hset") {
				auto dat = cmd.value->serialize();
				HSET(cmd.key, cmd.field, dat);
			} else if (cmd.name == "set") {
				auto dat = cmd.value->serialize();
				SET(cmd.key, dat);
			} else if (cmd.name == "del") {
				DEL(cmd.key);
			} else if (cmd.name == "hdel") {
				HDEL(cmd.key, cmd.field);
			}
		}
	}

做成一个操作队列,最麻烦的其实是在两个hset/set之间插入hdel/del。这时会有两种选择:

  1. 扫描cmds, 找到相同的key+field, 将删除,并将最后一个相同key+field的cmd结构改成hdel/del。

  2. 直接将del/hdel添加到队列末尾。

由于没有直接证据表明,方式1会快,加上我最近刻意强迫自己牺牲部分性来保持简洁性。因此我选用了方式2.

双向链表的三种实现

这篇文章,其实很像是“茴字的四种写法”。这让人不由的想起来孔乙己。在我印象中,大多数人对孔乙己是持嘲讽态度的。

但是从技术上讲,我觉得”茴字的四种写法”在满足需求的前提下,有助于我们简化实现。

在我的历史经验中,我一共写过三种双向链表。

在最开始实现时,就是按算法导论最朴素的实现。

    //算法1
    struct node {
        struct node *prev;
        struct node *next;
    }
    struct node *head = NULL;
    void insert(struct node *n) {
        n->prev = NULL;
        n->next = head;
        if (head != NULL)
            head->prev = n;
        head = n;
    }
    void remove(struct node *n) {
        if (n->prev != NULL)
            n->prev->next = n->next;
        else
            head = n;
        if (n->next != NULL)
            n-next->prev = n->prev;
    }

写了几次之后,我觉得每次修正head指针,心智负担有点重而且极易出错,于是浪费了一点点空间改进了一下。

    //算法2
    struct node {
        struct node *prev;
        struct node *next;
    }
    struct node head = {NULL, NULL}
    void insert(struct node *n) {
        n->next = head.next;
        if (n->next != NULL)
            n->next->prev = n;
        head.next = n;
        n->prev = &head;
    }
    void remove(struct node *n) {
        n->prev->next = n->next;
        if (n->next != NULL)
            n->next->prev = n->prev;
    }

虽然insert函数逻辑几乎没有减少,但是remove函数的逻辑大大减少,并且更容易理解了。

但是这样做有一个弊端,struct node是一个结构体而不是一个指针,在某些情况下不便于存储。

因此这些年,我几乎都是两种算法换着用。但是每次使用算法1时,总是要停下来仔细思考一下,才敢下手。

最近在Review几年前的代码时,发现之前使用算法1写的双向链表有bug.

这再次使我想对双向链表的算法2进行改进,我仔细思考了一下双向链表的特性。

双向链表主要有两个功能:

  1. 提供反向遍历
  2. 以O(1)的时间复杂度删除某个节点

但是到目前为止, 我从来没有使用过双向链表的特性1.

我使用双向链表的惟一原因就是要快速删除某一个节点。

即然如此,根据“这个世界是平衡的”原则,如果我去掉某个特性,就一定能简化部分实现,只是简化多少的问题。

我仔细研究了算法2,想从中找到某种启发。

最终我发现,在整个逻辑中,prev指针的惟一用处就是用来访问或修改前置节点的next变量。

而head的prev变量同样是多余的。

那么,如果将prev的含义修改为指向前置节点的next变量,关于prev的循环不变式同样成立。

优化后的代码如下:

    //算法3
    struct node {
        struct node **prev;
        struct node *next;
    }
    struct node *head = NULL;
    void insert(struct node *n) {
        n->prev = &head;
        n->next = head;
        if (head != NULL)
            head->prev = &n->next;
        head = n;
    }
    void remove(struct node *n) {
        *n->prev = n->next;
        if (n->next != NULL)
            n->next->prev = n->prev;
    }

由此,终于可以在享有算法2逻辑复杂度的同时,而不必要承担一个head结构体。

BTW,在写本文的前一天,我无意间发现Lua源码中也是这样做的 😀

一次关于Cache的性能分析

Lua5.4-alpha-rc2 已经发布了好一段时间了, 一直没时间去跑跑看性能如何。最近刚好有空,就跑来看看。结果第一段测试代码就把我惊住了。

--a.lua
collectgarbage("stop")
local function foo()
    local a = 3
    for i = 1, 64 * 1024 * 1024 do
        a = i
    end
    print(a)
end
foo()

在 Lua5.3.4 和 Lua5.4-alpha-rc2 上,这段代码运行时间分为0.55,0.42s。

通过`./luac -p -l ./lua ` 可以得知,上段这代码性能热点一定是OP_MOVE,和OP_FORLOOP。因此一定是这两个opcode的执行解释代码有修改。

我仔细对比了一下,关于OP_FORLOOP和OP_MOVE的实现,发现实现上一共有三处优化。

1. vmcase(OP_FORLOOP)的执行代码去掉了’0<step’的判断。(由于一次for循环期间,step的符号总是固定的,因此cpu分支预测成功率是100%)

2. vmcase(OP_FORLOOP)向回跳转时,偏移量改成了正值,因此将Bx寄存器直接当作无符号数去处理,省了一个符号转换操作。

3. vmcase(OP_FORLOOP)向回跳转时,由直接修改ci->u.savedpc改为了修改一个局部变量pc。通过反汇编得知,修改局部pc可以省掉一次store操作。

经过测试发现,这三处修改都达不到0.13s这么大幅度的提升。

万般无奈的情况下,我使用git bisec测试了从 Lua5.3.4 到 Lua5.4-alpha-rc2的所有变更(这里说所有不准确,因为git bisec是通过二分法查找的)。

最终发现引起性能影响的竟然是下面一段赋值操作的修改。

  
 typedef union Value {
   GCObject *gc;    /* collectable objects */
   void *p;         /* light userdata */
   int b;           /* booleans */
   lua_CFunction f; /* light C functions */
   lua_Integer i;   /* integer numbers */
   lua_Number n;    /* float numbers */
 } Value;

 #define TValuefields Value value_; int tt_

 typedef struct lua_TValue {
   TValuefields;
 } TValue;

 #define setobj(L,obj1,obj2) \
-{ TValue *io1=(obj1); *io1 = *(obj2); \
+{ TValue *io1=(obj1); const TValue *io2=(obj2); \
+  io1->value_ = io2->value_; io1->tt_ = io2->tt_; \
   (void)L; checkliveness(L,io1); }

两个赋值的作用都是复制一个结构体。只不过由于结构体对齐的存在,直接使用结构体赋值,会多复制了四个字节。

但是,在64位机器上,如果地址是对齐的,复制4个字节和复制8个字节不应该会有如此大的差异才对。毕竟都是一条指令完成的。为了近一步证明不是多复制4个字节带来的开销,我做了如下测试。

假设修改前的setobj是setobj_X, 修改后的setobj为setobj_Y。然后分别对setobj_X和setobj_Y进行测试tt_类型为char, short, int, long的情况。

测试结果如下:

 typeof(tt_)  char       short       int      long
 setobj_X     0.55s      0.55s       0.55s    0.41s
 setobj_Y     0.52s      0.43s       0.42s    0.42s

从测试结果可以看出,setobj_X在tt_类型为long时反而是最快的,这说明开销并不是多复制4字节造成的。


反汇编之后发现,setobj_X 和 setobj_Y 惟一的差别就是赋值顺序和寻址模式。

汇编如下:

;setobj_X
0x413e10 :        shr    r13d,0x17
0x413e14 :        shl    r13,0x4
0x413e18 :        mov    rax,QWORD PTR [r15+r13*1] ;value_
0x413e1c :        mov    rdx,QWORD PTR [r15+r13*1+0x8] ;tt_
0x413e21 :        mov    QWORD PTR [rbx],rax
0x413e24 :        mov    QWORD PTR [rbx+0x8],rdx
0x413e28 :        mov    rsi,QWORD PTR [rbp+0x28]
0x413e2c :        jmp    0x4131a0 

;setobj_Y
0x413da8 :        shr    r13d,0x17
0x413dac :        shl    r13,0x4
0x413db0 :        add    r13,r15
0x413db3 :        mov    eax,DWORD PTR [r13+0x8] ;tt_
0x413db7 :        mov    DWORD PTR [rbx+0x8],eax
0x413dba :        mov    rax,QWORD PTR [r13+0x0] ;value_
0x413dbe :        mov    QWORD PTR [rbx],rax
0x413dc1 :        mov    rax,QWORD PTR [rbp+0x28]
0x413dc5 :        jmp    0x413170 

猜测,难道是赋值顺序打乱了流水线并行,还是寻址模式需要额外的机器周期? 但是他们都无法解释,当我把tt_的类型改为long之后,setobj_X也会变得更快。

种种迹象把矛头指向Cache。 但这时我已经黔驴技穷了,我找不到更多的测试来继续缩小排查范围了。也没有办法进一步确定一定是Cache造成的(我这时还不知道PMU的存在)。


我开始查找《64-ia-32-architectures-optimization-manual》,试图能在这里找到答案。

找来找去,只在3.6.5.1节中找到了关于L1D Cache效率的相关内容。我又仔细阅读了一下lvm.c的代码,却并没有发现符合产生 Cache 惩罚的条件。(其实这里我犯了一个错误,不然走到这里我就已经找到答案了。以前看lparse.c中关于OP_FORLOOP部分时不仔细。欠的技术债这里终于还了。)

万般无奈下,我又测试了下面代码,想看看能否进一步缩小推断范围。

--b.lua
collectgarbage("stop")
local function foo()
    local a = 3
    local b = 4
    for i = 1, 64 * 1024 * 1024 do
        a = b
    end
    print(a)
end
foo()

这次测试其实是有点意外的,因为setobj_X版本的luaVM一下子跑的几乎跟setobj_Y版本一样快了。

看起来更像是3.6.5.1节中提到的L1D Cache的惩罚问题了。但是我依然没有找到惩罚的原因。

我把这一测试结果同步到lua的maillist上去(在我反汇编找不到答案后,就已经去maillist上提问了,虽然有进度,但是同样一直没有结论).

这一次maillist上的同学,终于有了进一步答案了。

他指出,在vmcase(OP_FORLOOP)中使用分开赋值的方式更新’i’(一次赋值value_, 一次赋值tt_,这次tt_赋值是store 32位)。而在vmcase(OP_MOVE)使用的setobj_X赋值时,使用了两次load 64位来读取value_和tt_。

这恰好就是3.6.5.1节中提到的规则(b),因此会有L1D Cache惩罚。

而这时我恰好已经通过perf观察到两个版本的setobj在PMU的l1d_pend_miss.pending_cycles和l1d_pend_miss.pending_cycles_any指标上有显著不同。 两相印证,基本可以90%的肯定就是这个问题。

现在来解释一下,我之前犯的错误。我之前一直认为,一个`for i = 1, 3, 1 do end`一共占三个lua寄存器:一个初始值i,一个最大值3, 暂时称为_m,一个步长1, 暂时称为_s。

但是经过maillist上的同学提醒后,我又仔细看了一下lparse.c,发现其实上面的for一共占四个lua寄存器:初始值1,暂称为_i,最大值_m, 步长_s,及变量i。

每次OP_FORLOOP在执行到最后会同步_i的值到变量i. 代码中的使用的值来自变量i所在的寄存器,而不是_i。

从lparse.c中得知,_i来自R(A), _m来自R(A+1), _s来自R(A+2), i来自R(A+3)。

再来看一下lvm.c中关于vmcase(OP_FORLOOP)的代码:

vmcase(OP_FORLOOP) {
  if (ttisinteger(ra)) {  /* integer loop? */
    lua_Integer step = ivalue(ra + 2);
    lua_Integer idx = intop(+, ivalue(ra), step); 
    lua_Integer limit = ivalue(ra + 1);
    if ((0 < step) ? (idx <= limit) : (limit <= idx)) {
        ci->u.l.savedpc += GETARG_sBx(i);  /* jump back */
        chgivalue(ra, idx);  /* update internal index... */
        setivalue(ra + 3, idx);  /* ...and external index */
    }
  }
  ...
  vmbreak;
}

可以很明显看出ra寄存器和(ra+3)的寄存器的赋值方式并不一样。其中chgivalue是只改value_部分,而setivalue是分别对value_和tt_进行赋值。

因此当接下来执行vmcase(OP_MOVE)时,setobj_X对tt_所在的地址,直接读取64位时就就会受到L1D Cache的惩罚。

而我之前犯的错误就是我一直认为修改i的值是通过chgivalue(ra, idx)来实现的。

为了更加确定是L1D Cache中Store-to-Load-Forwarding惩罚造成的开销。我将setivalue改为了chgivalue之后再测试。果然运行时间与setobj_Y的时间相差无几。这下结论已经99%可靠了,那剩下的1%恐怕要问Intel工程师了。

BTW,这次分析其实断断续续执行了4天。对于Cache对程序性能的影响,终于有了一次深刻的意识。

从CPU层面谈谈优化

大多数时间,大家都在从设计和算法上优化效率(这类优化往往效果比较明显,比如一个二分查找可以轻易将时间复杂度降低为lg(n))。但是在实现上,却很少有人注重实现效率,而理由是反正每年都会有更高频率的CPU出现,我何必花那个心思呢(Java程序员尤其擅长使用这个理由@_@)。

我个人不是完全同意上面的说辞,在不影响代码的可读性和花费不高的代价下,我习惯性的会写最我当前所知道的性能最高的写法。

在很早以前,我一度只有在编写汇编时才可以计算出汇编条数。随着代码量的增加,我慢慢意识到,这种想法是错误的。即使在写C语言时,其实生成的汇编也时有迹可寻的。

程序中所有代码一般都分为三个步骤(有些汇编指令会合并其中的某几步)。

1. 从内存读到寄存器(有可能直接从cache读)
2. 操作寄存器进行运算,并将结果保存在寄存器
3. 将结果写入内存(有可能会同时写入cache)

一般步骤1和3是对称的,只要步骤1的效率高,步骤3会同样高效。因此下面只分析步骤1和步骤2。

先来定义几个数据结构:

struct foo {
    int b;
    int a;
};
struct bar {
    struct foo f;
    struct foo *fp; //assume fp = &f
};

影响访问内存效率的第一个因素,汇编条数:

int access1(struct bar *b)
{
    return b->f.a;
}
int access2(struct bar *b)
{
    return b->fp->a;
}

在上面代码中access1比access2快一倍,这是因为结构体嵌套,不论嵌套多少层,其子成员偏移量都是常量。因此`b->f.a` 等价于`int a = *(int*)((unsigned char *)b + 4)`,所以这句代码翻译成汇编就是`movl 4(%rdi),%eax`(这里%rdi就是b的值)。

而access2就必须要经过两次内存访问, `struct foo *p = ((unsigned char *)b + 8); int a = *(int)((unsigned char *)p + 4)`。 翻译成汇编就成了`movq 8(%rdi),%rax, movl 4(%rax), %eax`(这里%rdi同样是参数b)。

因此当你写出类似下面代码时请慎重。

b->fp->a = xx;
b->fp->b = xx;
b->fp->c = xx;
b->fp->d = xx;
b->fp->e = xx;

反观下面的代码,除了代码前缀长一点,却并不会产生什么问题。

b->fp.a = xx;
b->fp.b = xx;
b->fp.c = xx;
b->fp.d = xx;
b->fp.e = xx;

影响访问内存效率的第二个因素, 访问内存的频率。 也许很多人都写过类似下面的代码:

void test(struct bar *b)
{
    int i;
    for (i = 0; i < b->foo.a; i++)
        do_something()
}

编译器在执行优化时,一般不会使用寄存器缓存指针指向的内容和函数调用的返回结果(这个不同的编译器实现可能不太一样,至少我使用的GCC在O2的情况下并不会做此优化),我称之为指针不可优化原则。因为从理论上来讲内存是整个进程共享的,而进程中到底会有多少个线程编译器并不知晓,它并不知道在执行for期间,b->foo.a是否会被其他线程修改。函数调用也是同理。

这也意味着每执行一次do_somthing之后都会先去访问b->foo.a的值,再来比较。这会增加访存次数, 即使有cache的存在,也没有直接访问寄存器来的快。因此这时我们可以向编译器做个暗示(增加一个局部变量,告诉编译器,我们需要的这个值,在for循环期间不会变化),比如像下面这样。

void test(struct bar *b)
{
    int i, n = b->foo.a;
    for (i = 0; i < n; i++)
        do_something()
}

影响访问内存效率的第三个因素, CPU CACHE。

数据结构的设计与CPU CACHE的命中率其实是息息相关。

由于Cache要远小于内存,因此一般会采用某种映射算法进行映射(是的,上学时“计算机组成原理”上讲的都是真的)。

然而不论CPU采用何种算法,Cache line的概念是不变的。即在Cache miss时,是按Cache line的模式来加载的.

比如在X86架构上Cache line一般为64字节,我们在访问内存地址0x100时,CPU会自动将0x100 ~ 0x140的内存全部载入Cache。

假如我们需要频繁在一个链表中,查出某几个结点进行数据修改,下面的数据结构就是低效的。

struct node {
    struct node *next;
    int select;
    char buffer[128];
};
void test(struct node *head, int select)
{
    struct node *h;
    for (h = head; h != NULL; h = h->next) {
        if (h->select == select) {
            cook(h->buffer, sizeof(h->buffer));
            break;
        }
    }
}

很显然,从上面的算法来看,在for循环时我们现关心node.next 和 node.select字段,node.buffer只有在最后一步时才会访问,而上面的代码每访问一次h->select的值,都会使Cache line充满了node.buffer的值,而这个值我们是不需要的。

因此我们需要优化struct node的内存布局,以使链表中所有的node.next, node.select 尽可能的排布在一起,以便可以充分使用Cache line的预加载功能。

最后,再简单看一下运算过程中的两个优化(并不是只有两个,而是我只会这几个:D)

一个是比较功能,在所有比较中,与0比较是最快的,因为大部分CPU的指令都会影响ZF标准位。比如下面代码:

void test1(struct foo *f)
{
	if ((f->a & 0xf) == 0)
		do_something();
}
int test2(struct foo *f)
{
	if ((f->a & 0xf) == 1)
		do_something();
}

test1函数比test2函数快3倍,因为test1函数可以直接用test指令然后判断ZF标准位,而test2需要先将f->a取出来,然后做and, 最后做compare操作。也许直接上汇编会更能说明问题。

//test1
testb	$15, 4(%rdi)
je	LBB3_2
//test2
movl	4(%rdi), %eax
andl	$15, %eax
cmpl	$1, %eax
jne	LBB4_1

再一个就是switch功能

在写switch语句时,如果case的值是连续的, 则编译器会采用跳转表的形式来直接jmp, 时间复杂度为O(1)。

如果值是不连续的,编译器默认会采用二分查找法来进行,时间复杂度为O(lgn)。因此如果有可能,应该尽可能将case的值定义为连续。

一次性能优化经历

自从上次修改backlog之后, Silly的IO能力,就一直以少量(约4~6K)的差距落后于redis,却一直找不到原因。

这次打算从头做一次profile来问题到底出在哪。

先用GNU提供的gprof分析一下C代码是否有值得优化的地方,结果发现CPU使用率最高的地方是luaVM内部和malloc/free。

我们所有的业务逻辑全在lua层做的,而IO线程与worker(lua)线程进行交互时是通过malloc来实现的。这几乎表明C代码几乎已经没有优化的余地了。

但是有个好消息,就是gprof不去profile系统调用和so,也许还有机会。

压测时,先使用top看一下每个cpu core的使用率,包括用户态与内核态。

用户态过高一般是应用逻辑代码消耗的,内核态则有可能是因为系统调用过多,上下文切换过频繁,等其他原因。

不过top只能看到当时的cpu状态,不太好看出整个测试区间,cpu消耗的曲线。可以改用’sar -u -P ALL 1’每隔一秒打印出cpu的使用情况。

通过观察发现,有一个线程的内核态有70+%之多,相比Redis来讲高出不少。

再使用vmstat命令查看in(系统中断)/cs(上下文切换), 可以确认在整个压测区间in和cs显著升高,推测应该是系统调用造成的。

为了近一步确认这些‘中断和上下文切换’是由Silly造成的,使用’pidstat -w -p PID 1’来打印出某一个线程的上下文切换频率。

当确认之后,再使用’strace -p $PID -c -f’来收集此进程所有系统调用次数。再根据收集到的信息有针对性的优化。

如果以上都做了,还是没有什么可以优化的余地。没关系,我们还有一个神器perf来查看Cache命中率,分支预测失败率,CPU调度迁移等与cpu密切相关的信息。

如果以上都已经做还是没有找到优化空间。

还有一个很常见但很容易被人忽略的因素,就是CPU的用户态和内核态都很低。

这种情况下,一般是程序或集群间有队列(这个队列可能是socket等一切有FIFO性质的设施),队列的一端处理过慢(比如由于某种原因,处理端被卡住了,而又不耗cpu) ,而队列的产生端在产生完请求之后,由于一直没有收到回应,也一直在idle中。

整个表现,看上去特别诡异,就像是突然间机器空载了一样。


最后,当我们发现应用层代码实在无法优化之后,别着急,也许还有最后几个免费午餐你还没有吃。

jemalloc

一款很优秀的内存分配器,即使对多线程也有很好的表现。以此次优化silly为例,把内存分配器换成jemalloc 5.0之后,请求处理速度有显著提升。

__builtin_expect

GNU内建函数,可以用来向GCC暗示哪个分支更高概率的被执行,以便GCC可以生成更好的代码,以方便CPU做分支预测。当我们的分支判断成功与失败的概率有显著差别时(比如异常处理),可以用来提高性能,至于能提高多少,要看具体情况。其中一种情况的测如见上篇

cpu affinity

linux内核向应用程序提供了一些接口,可以让我们微调内核,包括调度算法。cpu affinity可以修改进程或线程的cpu亲和力。以暗示内核,最少可能选成cpu迁移,cpu迁移数据可以通过perf工具来获取。

关于CPU分支预测

由于很偶然的原因,这两天瞄了几眼”ZeroMQ”项目,发现项目中使用了’likely/unlikely’这种暗示编译器做分支预测优化的宏(这些宏只是gcc内建函数__builtin_expect的别名)。

其实早在看linux kernel源码时,就不止一次看到这组宏。但是,对此并没有放在心上。仅仅粗略搜了一下,得知‘likely/unlikely这组宏可以向编译器提供分支预测信息,以便编译器可以更好的安排指令顺序来提高效率’就没有再深入下去了。

因为我觉得,这种级别的优化估计能提高个千分之几就已经算很不错了,内核可能是为了把CPU性能压榨到极致所以才使用的。而应用层远没有达到需要这种优化的极限。

然而今天这组宏出现在一个应用级的程序代码里面,让我意识到事情可能与我的想当然相差甚远。

于是写了段代码测了一下:

//make 'gcc -g -O2 -o a a.c b.c'
//b.c 
//之所以把foo1、foo2函数放在b.c文件里,是为了防止编译器把foo函数调用给优化掉
void foo1 {}
void foo1 {}
//a.c
#define	likely(x) __builtin_expect(!!(x), 1)
#define	unlikely(x) __builtin_expect(!!(x), 0)
void foo1();
void foo2();
void bar()
{
	foo2();foo2();foo2();foo2();foo2();foo2();foo2();foo2();
}
int main(int argc)
{
	int i;
	for (i = 0; i < 1024 * 1024 * 640; i++) {
		if (likely(argc == 1)) {
			foo1();
		} else {
			bar();
		}
	}
	return 0;
}

测试平台:

Intel(R) Core(TM) i5-3210M CPU @ 2.50GHz

测试结束:

不使用likely宏(即直接使用if(argc == 1))的前提下,使用‘time ./a’测试结果如下:

./a 2.01s user 0.03s system 97% cpu 2.100 total

使用likely宏之后,再次使用’time ./a’测试结果下如:

./a 1.63s user 0.03s system 99% cpu 1.672 total

从上面结果可以得出,性能相差了将近20%。


下面反汇编两种不同的情况,看看__builtin_expect到底让编译器做了什么,会让效果这么明显。

需要注意的是,只有使用了gcc -O2,__builtin_expect函数才会让编译器产生优化效果。

先来看不使用likely宏的a.c反汇编代码如下:

_bar:                                   ## @bar
	.cfi_startproc
	## 此处省略无关代码
Ltmp2:
	.cfi_def_cfa_register %rbp
	xorl	%eax, %eax
	callq	_foo2
	xorl	%eax, %eax
	callq	_foo2
	xorl	%eax, %eax
	callq	_foo2
	xorl	%eax, %eax
	callq	_foo2
	xorl	%eax, %eax
	callq	_foo2
	xorl	%eax, %eax
	callq	_foo2
	xorl	%eax, %eax
	callq	_foo2
	xorl	%eax, %eax
	popq	%rbp
	jmp	_foo2                   ## TAILCALL
	.cfi_endproc
	## 此处省略无关代码
_main:                                  ## @main
	.cfi_startproc
	## 此处省略无关代码
	movl	%edi, %r14d
	movl	$671088640, %ebx   ## imm = 0x28000000
	.p2align	4, 0x90
LBB1_1:                            ## =>This Inner Loop Header: Depth=1
	xorl	%eax, %eax
	cmpl	$1, %r14d
	jne	LBB1_3
## BB#2:                           ## in Loop: Header=BB1_1 Depth=1
	callq	_foo1
	jmp	LBB1_4
	.p2align	4, 0x90
LBB1_3:                           ## in Loop: Header=BB1_1 Depth=1
	callq	_foo2
	xorl	%eax, %eax
	callq	_foo2
	xorl	%eax, %eax
	callq	_foo2
	xorl	%eax, %eax
	callq	_foo2
	xorl	%eax, %eax
	callq	_foo2
	xorl	%eax, %eax
	callq	_foo2
	xorl	%eax, %eax
	callq	_foo2
	xorl	%eax, %eax
	callq	_foo2
LBB1_4:                           ## in Loop: Header=BB1_1 Depth=1
	decl	%ebx
	jne	LBB1_1
## BB#5:
	xorl	%eax, %eax
	popq	%rbx
	popq	%r14
	popq	%rbp
	retq
	.cfi_endproc

由于测试代码实现太简单了,以致于连编译器都没什么可优化的了,所以它惟一能做的就是把bar函数内联了。

从标号BB#2到LBB1_3之间即为if(cond == true)时调用的代码,即调用foo1函数,从标号LBB1_3到LBB1_4之间即为else执行的代码即调用了bar函数,编译器认为bar函数内联效率更高,所以这里是bar函数的内联形式。

可以看到,基本上汇编指令的排布就是我们C语言中的书写排布。

下面再来看一下使用likely之后,汇编指令是如何排布的。

_bar:                                   ## @bar
	.cfi_startproc
	## 省略无关代码
Ltmp2:
	.cfi_def_cfa_register %rbp
	xorl	%eax, %eax
	callq	_foo2
	xorl	%eax, %eax
	callq	_foo2
	xorl	%eax, %eax
	callq	_foo2
	xorl	%eax, %eax
	callq	_foo2
	xorl	%eax, %eax
	callq	_foo2
	xorl	%eax, %eax
	callq	_foo2
	xorl	%eax, %eax
	callq	_foo2
	xorl	%eax, %eax
	popq	%rbp
	jmp	_foo2              ## TAILCALL
	.cfi_endproc

	.globl	_main
	.p2align	4, 0x90
_main:                            ## @main
	.cfi_startproc
	## 省略无关代码
	movl	%edi, %r14d
	movl	$671088640, %ebx  ## imm = 0x28000000
	jmp	LBB1_1
LBB1_3:                           ## in Loop: Header=BB1_1 Depth=1
	callq	_foo2
	xorl	%eax, %eax
	callq	_foo2
	xorl	%eax, %eax
	callq	_foo2
	xorl	%eax, %eax
	callq	_foo2
	xorl	%eax, %eax
	callq	_foo2
	xorl	%eax, %eax
	callq	_foo2
	xorl	%eax, %eax
	callq	_foo2
	xorl	%eax, %eax
	callq	_foo2
	jmp	LBB1_4
	.p2align	4, 0x90
LBB1_1:                           ## =>This Inner Loop Header: Depth=1
	xorl	%eax, %eax
	cmpl	$1, %r14d
	jne	LBB1_3
## BB#2:                          ## in Loop: Header=BB1_1 Depth=1
	callq	_foo1
LBB1_4:                           ## in Loop: Header=BB1_1 Depth=1
	decl	%ebx
	jne	LBB1_1
## BB#5:
	## 省略无关代码
	.cfi_endproc

与未使用likely宏之前的代码相比,相同的是bar函数依被内联了,不同的是,for, if, esle 代码块的布局发生了根本性的变化。

将上面的汇编代码翻译成C代码之后会更加明显。

goto label_start;
label_else:
bar();
goto label_for:
label_start:
for (i = 0; i < 1024 * 1024 * 640; i++) {
	if (argc != 1)
		goto label_else;
	foo1();
label_for:
}

从上面的C代码看出,编译器将高概率指执的代码(for语句逻辑、调用foo1函数)安排在了一起,而将低概率执行的代码块挪到其他地方,以获得更高的效率。


Intel的优化手册中’3.4.1.3节静态预测’指出:

predict backward conditional branches to be taken; rule is suitable for loops
predict forward conditional branches to be NOT taken

Assembly/Compiler Coding Rule 3. (M impact, H generality) Arrange code to be consistent with the static branch prediction algorithm: make the fall-through code following a conditional branch be the likely target for a branch with a forward target, and make the fall-through code following a conditional branch be the unlikely target for a branch with a backward target.

当cpu第一次执行此分支时,由于cpu的BTB(Branch target buffers)还没有历史数据,因此部分CPU会采用静态预测算法。Intel的静态预测算法为:在条件分支预测中,预测所有向前跳转的都不执行,所有向后跳转都会执行。

因此,Intel手册指出,如果条件分支生成的是向前跳转代码,则应该将likely执行的代码紧跟着j*(jmp的条件形式)之后,如果条件分支生成的是向后跳转代码,则应该将likely执行的代码放在要跳转的标号处。

在’3.4.1.5节代码对齐’指出:

Careful arrangement of code can enhance cache and memory locality. Likely sequences of basic blocks should be laid out contiguously in memory. This may involve removing unlikely code, such as code to handle error conditions, from the sequence.See Section 3.7, “Prefetching,” on optimizing the instruction prefetcher.
Assembly/Compiler Coding Rule 12. (M impact, H generality) All branch targets should be 16-byte aligned.
Assembly/Compiler Coding Rule 13. (M impact, H generality) If the body of a conditional is not likely to be executed, it should be placed in another part of the program. If it is highly unlikely to be executed and code locality is an issue, it should be placed on a different code page.

代码对齐一节指出,所有likely执行的代码应该在内存中尽可能连续的存放,而那些unlikely的代码,比如异常处理代码,应该被从这段连续的代码中去除,放到其他地方。

对比上面的汇编发现,GCC的优化与文档描述不完合相同。

GCC确实将unlikely的代码单独拿出来,而让likely的代码尽可能连续。但是对于unlikely的代码却使用了向后跳转条件分支。

让代码更简单一些,然后重新反汇编看一看,C代码如下:

#define	likely(x) __builtin_expect(!!(x), 1)
#define	unlikely(x) __builtin_expect(!!(x), 0)
void foo1();
void foo2();
int main(int argc, char *argv[])
{
	if (likely(argc == 1))
		foo1();
	else
		foo2();
	return 0;
}

反汇编代码如下:

## 省略部分无关代码
_main:                                  ## @main
	.cfi_startproc
## 省略部分无关代码
	xorl	%eax, %eax
	cmpl	$1, %edi
	jne	LBB0_2
## BB#1:
	callq	_foo1
LBB0_3:
	xorl	%eax, %eax
	popq	%rbp
	retq
LBB0_2:
	callq	_foo2
	jmp	LBB0_3
	.cfi_endproc

可以看出,在编译上面代码时,GCC严格尊守了Intel手册的‘静态预测算法’和‘代码对齐规则’。

但是在编译有for语句的代码时,似乎有其他的规则,让GCC觉得违反静态分支预测会活的更好的性能。

再对比一下两个使用likely宏的代码(一个使用for语句,一个不使用for语句)反汇编之后的代码发现, 带有for语句的代码中有一句伪码‘.p2align 4, 0x90’。

这句代码把unlikely的代码和likely的代码使用nop分开了,猜测这里可能会使这两面代码分别处在不同的cache line上,符合’代码对齐’标准,而由于‘分支静态预测’仅在分支第一次执行时才会使用。而且面都会使用动态预测法。

因此,猜测可能‘代码对齐’带来的好处高于满足‘静态预测’算法,而让GCC选项这么做的原因就是‘代码对齐’规则。


另,在阅读Intel手册时发现,‘3.4.1.2 Spin-Wait and Idle Loops’提到,在实现spinlock时,插入PAUSE指令可以显著提高性能。我又去查阅了linux源码,发现kernel源码中的spinlock也有插入PAUSE指令。

而我之前都是使用while (__sync_test_and_set(lock, 1))来简单实现spinlock,这可能是一种错误的使用形式。