实现一个数据库存储队列

大约在在两年前, 我就意识到定点存库的必要性,并在今年应用到实际项目中了。

从历史经验来看,将数据库存储行为收集并合并起来,确实可以极大的降低抽象代码过程中的心智负担。我们在写代码时已经不需要考虑数据库的存储频率,反正最终所有修改过的数据,只会存一次。

从我意识到定点存库的必要性之后,我就一直对当时的抽象不太满意,在最原始的抽象中,刷新数据是需要业务逻辑来完成的。而数据库合并模块本质上只做了去重而已,这也就是说,其实大量定点存储代码实际是散落在业务逻辑各处的。而在此之后虽然我一直在思考该怎么缓存这些数据库操作,但是一直没有什么实质性进展。

直到最近一段时间,我一直在刻意强迫自己放弃部分性能,以达到更好的抽象后才突然有了点想法。

以Redis数据库的hset(key, field, value)为例,最直接的设计就是把每个key下面的value有变动的field收集起来。然后在特定时机将这些field写入相应的key。相应的伪码如下:

	struct key {
		std::unordered_set<uint32_t> dirty_fields;
	};
	std::unordered_map<string, key> dirty_keys;

这其实就是第一版的抽象,因为没有value相关信息,所以序列化及更新数据库的责任都落在了相关的业务逻辑模块,这个数据库操作缓存模块有与没有其实区别不大。

最近我仔细研究了一下Redis的数据模型和我们业务逻辑的数据模型。我发现其实只要抽象出’set’,’hset’, ‘del’, ‘hdel’这些对应的操作,然后对这些操作进行缓存和去重就能满足99%的需求。

现在还有一个问题需要解决,如何将序列化的工作接管过来。这样就不会存在任何回调了。这个问题其实不难解决,只要我们设计一个通用的接口类提供一个serialize接口即可。

伪码大概如下:

	struct obj {
		std::string serialize() const = 0;
	};
	struct cmd {
		std::string name;
		uint32_t field;
		obj *value;
	};
	
	struct key {
		std::unordered_map<uint32_t, cmd> dirty_fields;
	};
	std::unordered_map<string, key> dirty_keys;
	
	void hset(const std::string &key, uint32_t field, obj *value) {
		auto &set = dirty_keys[key];
		auto &cmd = set[field];
		cmd.name = "hset";
		cmd.field = field;
		cmd.value = value;
	}
	void hdel(const std::string &key, uint32_t field) {
		auto &set = dirty_keys[key];
		auto &cmd = set[field];
		cmd.name = "hdel";
		cmd.field = field;
		cmd.value = nullptr;
	}
	void set(const std::string &key, obj *value) {
		auto &set = dirty_keys[key];
		auto &cmd = set[0];
		cmd.name = "set"
		cmd.value = value;
	}
	
	void del(const std::string &key) {
		auto &set = dirty_keys[key];
		set.clear();
		auto &cmd = set[0];
		cmd.name = "del";
		cmd.value = nullptr;
	}
	void flush() {
		for (auto &kiter:dirty_keys) {
			auto &key = kiter.second;
			auto &kname = kiter.first;
			for (auto &citer:key.dirty_fields) {
				auto &cmd = citer.second;
				if (cmd.name == "hset") {
					auto dat = cmd.value->serialize();
					HSET(kname, cmd.field, dat);
				} else if (cmd.name == "set") {
					auto dat = cmd.value->serialize();
					SET(kname, dat);
				} else if (cmd.name == "del") {
					DEL(kname);
				} else if (cmd.name == "hdel") {
					HDEL(kname, cmd.field);
				}
			}
		}
	}

当然这里面其实还有两个个细节问题。

  1. 如果你的数据是直接使用std::unordered_map<uint32_t, dbst> DB进行存储, 直接调用hset("foo", 1, &DB[1])需要考虑DB进行rehash的情况。因此需要new一个新的dbst并将新指针传递过去,类似这样hset("foo", 1, new dbst(DB[1]))。同时struct cmd需要做出如下改变:

	struct cmd {
		std::string name;
		uint32_t field;
		std::unique_ptr<obj> value;
	};

看似我们需要很多次new, 但是仔细分析一下整个内存分配行为,就会发现,整个操作序列大概类似这样:new(首次对一个对象进行set/hset),new,new,new/delete(重复对一个对象进行set/hset),new/delete,delete,delete。也就是说这种分配具有局部性,一般不太会造成内存碎片和性能问题。

  1. 需要保证在调用flush函数之前,所有指针的有效性,这个只要稍微留意一下,其实不难保证。

做完如上抽象后,我在想,有没有可能再次简化上述数据结构。

我仔细研究了一下我们的数据模型,我发现一件很有意思的事。

我们在数据库的每一条数据, 都在内存中有一份惟一的对应。也就是说一个指针一定只对应一个value(set(key, value)/hset(key, field, value))。只要这个指针在整个数据生命周期期间,不发生改变,我们就可以直接使用指针来作主键去重,在业务逻辑层使用std::unordered_map<uint32_t, std::unique_ptr<dbst>来缓存数据库数据即可。

这样数据结构就可以简化为如下:

	struct obj {
		std::string serialize() const = 0;
	};
	struct cmd {
		std::string name;
		std::string key;
		uint32_t field;
		obj *value;
	};
	std::unordered_map<intptr_t, size_t> v2i;
	std::vector<cmd> cmds;

	static cmd &overwrite(obj *v) {
		auto iter = v2i.find((intptr_t)v);
		if (iter != v2i.end()) {
			cmds[iter->second].name = "nop"
		}
		cmds.emplace_back();
		return cmds.back();
	}
	
	void hset(const std::string &key, uint32_t field, obj *value) {
		auto &cmd = overwrite(value);
		cmd.name = "hset";
		cmd.key = key;
		cmd.field = field;
		cmd.value = value;
	}
	void set(const std::string &key, obj *value) {
		auto &cmd = overwrite(value);
		cmd.name = "set"
		cmd.key = key;
		cmd.value = value;
	}
	void hdel(const std::string &key, uint32_t field) {
		cmds.emplace_back();
		auto &cmd = cmds.back();
		cmd.name = "hdel";
		cmd.key = key;
		cmd.field = field;
		cmd.value = nullptr;
	}
	void del(const std::string &key) {
		cmds.emplace_back();
		auto &cmd = cmds.back();
		cmd.name = "del";
		cmd.key = key;
		cmd.value = nullptr;
	}
	void flush() {
		v2i.clear();
		for (auto &cmd:cmds) {
			if (cmd.name == "hset") {
				auto dat = cmd.value->serialize();
				HSET(cmd.key, cmd.field, dat);
			} else if (cmd.name == "set") {
				auto dat = cmd.value->serialize();
				SET(cmd.key, dat);
			} else if (cmd.name == "del") {
				DEL(cmd.key);
			} else if (cmd.name == "hdel") {
				HDEL(cmd.key, cmd.field);
			}
		}
	}

做成一个操作队列,最麻烦的其实是在两个hset/set之间插入hdel/del。这时会有两种选择:

  1. 扫描cmds, 找到相同的key+field, 将删除,并将最后一个相同key+field的cmd结构改成hdel/del。

  2. 直接将del/hdel添加到队列末尾。

由于没有直接证据表明,方式1会快,加上我最近刻意强迫自己牺牲部分性来保持简洁性。因此我选用了方式2.



发表评论