Lua5.3 GC源码阅读(5)

这篇内容并不是上篇提到的关于弱表实现的分析。而是最近有位同学与我探讨了一些Lua的GC时,发现了两个以前从没注意过的智慧闪光点。

首先是luaC_fullgc的实现.


 #define keepinvariant(g)	((g)->gcstate <= GCSatomic)

static void entersweep (lua_State *L) {
  global_State *g = G(L);
  g->gcstate = GCSswpallgc;
  lua_assert(g->sweepgc == NULL);
  g->sweepgc = sweeplist(L, &g->allgc, 1);
}

void luaC_fullgc (lua_State *L, int isemergency) {
  global_State *g = G(L);
  lua_assert(g->gckind == KGC_NORMAL);
  if (isemergency) g->gckind = KGC_EMERGENCY;  /* set flag */
  if (keepinvariant(g)) {  /* black objects? */
    entersweep(L); /* sweep everything to turn them back to white */
  }
  /* finish any pending sweep phase to start a new cycle */
  luaC_runtilstate(L, bitmask(GCSpause));
  luaC_runtilstate(L, ~bitmask(GCSpause));  /* start new collection */
  luaC_runtilstate(L, bitmask(GCScallfin));  /* run up to finalizers */
  /* estimate must be correct after a full GC cycle */
  lua_assert(g->GCestimate == gettotalbytes(g));
  luaC_runtilstate(L, bitmask(GCSpause));  /* finish collection */
  g->gckind = KGC_NORMAL;
  setpause(g);
}

在luaC_fullgc中,主要的意图是首先尽可能快的结束执行到一半的GC循环,然后从头开始一轮新的GC循环,以达到fullgc的语义。

那么怎么快速结束当前未结束的循环呢,Lua采用了一个很有智慧的方法,那就是快速进入sweep阶段(即如果当前还没有处理sweep阶段,就直接进入sweep阶段)。那么我为什么说这是一个很有智慧的方式呢,这是因为不论在整个GC状态机的任意一个状态进入sweep阶段,整个GC状态都不会出错,这是我以前没有意识到的。

先来看一下整个GC状态机全部状态:

 #define GCSpropagate	0
 #define GCSatomic	1
 #define GCSswpallgc	2
 #define GCSswpfinobj	3
 #define GCSswptobefnz	4
 #define GCSswpend	5
 #define GCScallfin	6
 #define GCSpause	7

如果将一个GC循环按照标记和清除来区分,很显然GC循环的开始在GCSpropagate, 结束在GCSpause。整个GC循环被GCSatomic状态分为标记和清除两个大阶段。

为了更清楚的看清整个GC循环,我们将每个GC循环概括为GCMark和GCSweep两个阶段,并把几个GC循环连起来看。

GCMark1, GCSweep1, GCMark2, GCSweep2,GCMark3, GCSweep3…

由于atomic函数会改变当前GC循环的白色值,因此GCSweep1,和GCMakr2阶段拥有相同的白色值,而GCSweep2,GCMark3拥有相同的白色值。

从sweeplist函数可以看出GCSweep2只会清理GCSweep1和GCMark2阶段的白色对象。并把所有存活对象都变成当前GC循环的白色值。

假如我们在执行GCMark2过程中调用了luaC_fullgc导致跳过了atomic阶段的执行,那么就不会产生更换GC循环白色值的行为,这样GCSweep1,GCMark2,GCSweep2,GCMark3都使用了相同的白色值来执行标记清除逻辑。由于GCSweep1已经清除过GCSweep0和GCMark1阶段产生(包括由黑变白)的白色对象,那么GCSweep2的作用其实只剩下“把所有在GCMark2阶段标记过的对象都变白”。相当于撤销了GCMark1的作用。而从GCMark3开始就能执行一轮全新的GC循环。

这就变相达到了最快结束当前GC循环的目的。


另一个就是luaC_barrierback的实现.

 
 #define luaC_barrierback(L,p,v) (  \
	(iscollectable(v) && isblack(p) && iswhite(gcvalue(v))) ? \
	luaC_barrierback_(L,p) : cast_void(0))

void luaC_barrierback_ (lua_State *L, Table *t) {
  global_State *g = G(L);
  lua_assert(isblack(t) && !isdead(g, t));
  black2gray(t);  /* make table gray (again) */
  linkgclist(t, g->grayagain);
}

这个函数其实在上篇分析中也有提到过。但那是在GCMark阶段时分析的luaC_barrierback的作用。我从来没有注意过如果在GCSweep阶段,触发了luaC_barrierback会怎么样。

在GCSweep阶段会触发luaC_barrierback么?答案是肯定的。因为sweeplist是增量执行的,不可能一下子就把所有黑对象变白。如果这时操作黑对象,就会触发luaC_barrierback。

那么在GCSweep阶段,如果触发了luaCbarrierback,那么相关黑对象会被变成灰对象,然后被挂在g->grayagain链表上。同时如果一个对象触发了luaC_barrierback, 说明他还没有对sweeplist处理过,也就是说最终他一定会被sweeplist函数变成白色。

这时就会存在一种情况,一个对象被挂在g->grayagain链表上,但是它是白色。这就是其中的智慧闪光点,虽然一个白色对象挂在g->grayagain, 但是他不会有副作用。因为在restartcollection函数中的 g->gray = g->grayagain = NULL语句会将整个grayagain链表清空。同时虽然白色对象的gclist字段依然有无效值,但是在下次插入gray或grayagain之前,会对此对象的gclist字段重新赋值,因此也不会有任何副作用。

纵观这两个例子可以发现,sweeplist本来是为了GCSatomic阶段之后清除对象而生,而luaC_barrierback本来也是为了在GCMark阶段保持GC循环不变式而生。但是他们在这两个场景之外,依然有良好的适用场景。这得益于良好的抽象,我有时甚至都在想,这也许已经是人类智慧的极限了。

实现一个数据库存储队列

大约在在两年前, 我就意识到定点存库的必要性,并在今年应用到实际项目中了。

从历史经验来看,将数据库存储行为收集并合并起来,确实可以极大的降低抽象代码过程中的心智负担。我们在写代码时已经不需要考虑数据库的存储频率,反正最终所有修改过的数据,只会存一次。

从我意识到定点存库的必要性之后,我就一直对当时的抽象不太满意,在最原始的抽象中,刷新数据是需要业务逻辑来完成的。而数据库合并模块本质上只做了去重而已,这也就是说,其实大量定点存储代码实际是散落在业务逻辑各处的。而在此之后虽然我一直在思考该怎么缓存这些数据库操作,但是一直没有什么实质性进展。

直到最近一段时间,我一直在刻意强迫自己放弃部分性能,以达到更好的抽象后才突然有了点想法。

以Redis数据库的hset(key, field, value)为例,最直接的设计就是把每个key下面的value有变动的field收集起来。然后在特定时机将这些field写入相应的key。相应的伪码如下:

	struct key {
		std::unordered_set<uint32_t> dirty_fields;
	};
	std::unordered_map<string, key> dirty_keys;

这其实就是第一版的抽象,因为没有value相关信息,所以序列化及更新数据库的责任都落在了相关的业务逻辑模块,这个数据库操作缓存模块有与没有其实区别不大。

最近我仔细研究了一下Redis的数据模型和我们业务逻辑的数据模型。我发现其实只要抽象出’set’,’hset’, ‘del’, ‘hdel’这些对应的操作,然后对这些操作进行缓存和去重就能满足99%的需求。

现在还有一个问题需要解决,如何将序列化的工作接管过来。这样就不会存在任何回调了。这个问题其实不难解决,只要我们设计一个通用的接口类提供一个serialize接口即可。

伪码大概如下:

	struct obj {
		std::string serialize() const = 0;
	};
	struct cmd {
		std::string name;
		uint32_t field;
		obj *value;
	};
	
	struct key {
		std::unordered_map<uint32_t, cmd> dirty_fields;
	};
	std::unordered_map<string, key> dirty_keys;
	
	void hset(const std::string &key, uint32_t field, obj *value) {
		auto &set = dirty_keys[key];
		auto &cmd = set[field];
		cmd.name = "hset";
		cmd.field = field;
		cmd.value = value;
	}
	void hdel(const std::string &key, uint32_t field) {
		auto &set = dirty_keys[key];
		auto &cmd = set[field];
		cmd.name = "hdel";
		cmd.field = field;
		cmd.value = nullptr;
	}
	void set(const std::string &key, obj *value) {
		auto &set = dirty_keys[key];
		auto &cmd = set[0];
		cmd.name = "set"
		cmd.value = value;
	}
	
	void del(const std::string &key) {
		auto &set = dirty_keys[key];
		set.clear();
		auto &cmd = set[0];
		cmd.name = "del";
		cmd.value = nullptr;
	}
	void flush() {
		for (auto &kiter:dirty_keys) {
			auto &key = kiter.second;
			auto &kname = kiter.first;
			for (auto &citer:key.dirty_fields) {
				auto &cmd = citer.second;
				if (cmd.name == "hset") {
					auto dat = cmd.value->serialize();
					HSET(kname, cmd.field, dat);
				} else if (cmd.name == "set") {
					auto dat = cmd.value->serialize();
					SET(kname, dat);
				} else if (cmd.name == "del") {
					DEL(kname);
				} else if (cmd.name == "hdel") {
					HDEL(kname, cmd.field);
				}
			}
		}
	}

当然这里面其实还有两个个细节问题。

  1. 如果你的数据是直接使用std::unordered_map<uint32_t, dbst> DB进行存储, 直接调用hset("foo", 1, &DB[1])需要考虑DB进行rehash的情况。因此需要new一个新的dbst并将新指针传递过去,类似这样hset("foo", 1, new dbst(DB[1]))。同时struct cmd需要做出如下改变:

	struct cmd {
		std::string name;
		uint32_t field;
		std::unique_ptr<obj> value;
	};

看似我们需要很多次new, 但是仔细分析一下整个内存分配行为,就会发现,整个操作序列大概类似这样:new(首次对一个对象进行set/hset),new,new,new/delete(重复对一个对象进行set/hset),new/delete,delete,delete。也就是说这种分配具有局部性,一般不太会造成内存碎片和性能问题。

  1. 需要保证在调用flush函数之前,所有指针的有效性,这个只要稍微留意一下,其实不难保证。

做完如上抽象后,我在想,有没有可能再次简化上述数据结构。

我仔细研究了一下我们的数据模型,我发现一件很有意思的事。

我们在数据库的每一条数据, 都在内存中有一份惟一的对应。也就是说一个指针一定只对应一个value(set(key, value)/hset(key, field, value))。只要这个指针在整个数据生命周期期间,不发生改变,我们就可以直接使用指针来作主键去重,在业务逻辑层使用std::unordered_map<uint32_t, std::unique_ptr<dbst>来缓存数据库数据即可。

这样数据结构就可以简化为如下:

	struct obj {
		std::string serialize() const = 0;
	};
	struct cmd {
		std::string name;
		std::string key;
		uint32_t field;
		obj *value;
	};
	std::unordered_map<intptr_t, size_t> v2i;
	std::vector<cmd> cmds;

	static cmd &overwrite(obj *v) {
		auto iter = v2i.find((intptr_t)v);
		if (iter != v2i.end()) {
			cmds[iter->second].name = "nop"
		}
		cmds.emplace_back();
		return cmds.back();
	}
	
	void hset(const std::string &key, uint32_t field, obj *value) {
		auto &cmd = overwrite(value);
		cmd.name = "hset";
		cmd.key = key;
		cmd.field = field;
		cmd.value = value;
	}
	void set(const std::string &key, obj *value) {
		auto &cmd = overwrite(value);
		cmd.name = "set"
		cmd.key = key;
		cmd.value = value;
	}
	void hdel(const std::string &key, uint32_t field) {
		cmds.emplace_back();
		auto &cmd = cmds.back();
		cmd.name = "hdel";
		cmd.key = key;
		cmd.field = field;
		cmd.value = nullptr;
	}
	void del(const std::string &key) {
		cmds.emplace_back();
		auto &cmd = cmds.back();
		cmd.name = "del";
		cmd.key = key;
		cmd.value = nullptr;
	}
	void flush() {
		v2i.clear();
		for (auto &cmd:cmds) {
			if (cmd.name == "hset") {
				auto dat = cmd.value->serialize();
				HSET(cmd.key, cmd.field, dat);
			} else if (cmd.name == "set") {
				auto dat = cmd.value->serialize();
				SET(cmd.key, dat);
			} else if (cmd.name == "del") {
				DEL(cmd.key);
			} else if (cmd.name == "hdel") {
				HDEL(cmd.key, cmd.field);
			}
		}
	}

做成一个操作队列,最麻烦的其实是在两个hset/set之间插入hdel/del。这时会有两种选择:

  1. 扫描cmds, 找到相同的key+field, 将删除,并将最后一个相同key+field的cmd结构改成hdel/del。

  2. 直接将del/hdel添加到队列末尾。

由于没有直接证据表明,方式1会快,加上我最近刻意强迫自己牺牲部分性来保持简洁性。因此我选用了方式2.