为silly增加了互斥锁

silly是基于Lua语言的Coroutine机制而实现的一个高并发网络框架。

其coroutine调度机制很简单,所有的函数都被包裹在coroutine中被执行,当代码调用socket.read/core.sleep等指定API时,当前coroutine会挂起,直到socket.read/core.sleep返回结果为止。在此coroutine被挂起期间,会将CPU调度给其他coroutine来运行。

大部分情况下,基于coroutine编程和基于多线程编程的思路是一致的。

例外就是,由于是调用指定API时才会挂起当前coroutine,相比多线程而言,基于coroutine的调度机制会引入一个缺点和一个优点。

缺点就是,如果不慎编写死循环代码,当前coroutine永远也不会挂起,其他coroutine永远也得不到执行的机会。

优点刚好就是缺点的另一面,由于只在调用指定API才会挂起当前coroutine, 我们可以在coroutine中安心的修改所有数据而不必担心由于调度的不确定性而产生数据竞争问题。同样由于我们在需要挂起时才挂起,所有不必要的上下文切换都可以避免,可以极大地减少上下文切换的开销。


一直以来,我都认为mutex是为了解决抢占式调度的不确定性才被发明的。由于silly不存在抢占式调度,所以没有必要实现一个mutex锁。

但是就我这几年的经验来看,不管是不是抢占式调度,并发问题始终都存在。

只不过非抢占式调度所造成的影响范围更少,以致为我认为那不是并发的问题,是异步的问题(现在回头看看这两个概念也没有分得这么清楚)。

对于这种异步问题,我通常的做法,要么是做惰性队列,要么是做失败补偿。都是针对特定业务逻辑case by case的去实现。并没有一套成熟通用的方案。

下面来看两个例子(一个基于失败补偿,一个基于惰性队列):

---失败被偿
local user = {
    money = 100
}

func Foo() {
    if user.money < 50 then
        return false
    end
    user.money = user.money - 50
    ok = rpc:call("FooOtherService")
    if !ok {
        user.money = user.money + 50
    }
    return ok
}

func Bar() {
    if user.money < 60 then
        return false 
    end
    user.money = user.money - 60
    ok = rpc:call("BarOtherService")
    if !ok {
        user.money = user.money + 60
    }
    return ok
}
---惰性队列
local user = {
    money = 100,
    q = nil,
}

func enter() {
    if user.q then
        table.insert(user.q, core.running())
        core.wait()
    } else {
        user.q = {}
    }
}

func leave() {
    if user.q then
        co := table.remove(user.q, 1)
        if not co then
            user.q = nil
        else
            core.wakeup(co)
        end
    end
}

func Foo() {
    enter()
    if user.money < 50 then
        return false 
    end
    ok = rpc:call("FooOtherService")
    if ok {
        user.money = user.money - 50
    }
    leave()
    return ok
}

func Bar() {
    enter()
    if user.money < 60 then
        return false 
    end
    ok = rpc:call("BarOtherService")
    if ok {
        user.money = user.money - 60
    }
    leave()
    return ok
}

对比上面两段代码,可以发现,失败补偿逻辑基本没有任何被抽象的可能。

惰性队列虽然可以借助于Lua语言的动态性来抽象成库,但他只能解决单层异步问题,再复杂的异步问题惰性队列就无法完美抽象了。

还是上面的代码,假如Bar函数改为如下需求, 就需要新增一个惰性队列来处理。不然就会产生死锁。

local user = {
    money = 100,
    money2 = 100,
    q = nil,
    q2 = nil,
}

func enter(obj, name) {
    if obj[name] then
        table.insert(obj[name], core.running())
        core.wait()
    } else {
        obj[name] = {}
    }
}

func leave(obj, name) {
    if obj[name] then
        co := table.remove(obj[name], 1)
        if not co then
            obj[name] = nil
        else
            core.wakeup(co)
        end
    end
}

func Foo() {
    enter(user, "q")
    if user.money < 50 then
        return false 
    end
    ok = rpc:call("FooOtherService")
    if ok {
        user.money = user.money - 50
    }
    leave(user, "q")
    return ok
}

func Bar() {
    enter(user, "q2")
    if user.money2 < 60 then
        return false 
    end
    --do something
    ok := Foo()
    if ok {
        user.money2 = user.money2 - 60
    }
    leave(user, "q2")
}

最近突然觉得把这类问题归结于并发问题, 一把可重入锁就可以完美解决上面所有问题。

比如下面代码:

local user = {
    money = 100,
    money2 = 100,
    lock = lock:new()
}

func Foo() {
    local guad<close> = user.lock()
    if user.money < 50 then
        return fales
    end
    ok = rpc:call("FooOtherService")
    if ok {
        user.money = user.money - 50
    }
    return ok
}

func Bar() {
    local guad<close> = user.lock()
    if user.money2 < 60 then
        return false 
    end
    --do something
    ok := Foo()
    if ok {
        user.money2 = user.money2 - 60
    }
}

由于锁是可重入的,所以同一个coroutine内的函数者都可以加锁成功,这样不管后期这个模块怎么修改,模块内都不会产生死锁效果。

按照传统方式,一个锁在使用之前是需要new出来并初始化的,在一些极端场景(比如我之前的经历,有400W个格子)可能会需要new出来很多锁。

由于我们的非抢占式调度的机制,大部分情况下,锁冲突的概率很小,所有需要锁的地方都new一个锁,在这种场合下稍嫌浪费。

换句话说,虽然我们需要实现一把互斥锁,但是他是乐观的。

基于以上思路,我换了一种锁的实现方式,它的使用方式大概如下:

local mutex = require "sys.sync.mutex"
local user = {
    money = 100,
    money2 = 100,
}

func Foo() {
    local guad<close> = mutex.lock(user)
    if user.money < 50 then
        return fales
    end
    ok = rpc:call("FooOtherService")
    if ok {
        user.money = user.money - 50
    }
    return ok
}

func Bar() {
    local guad<close> = mutex.lock(user)
    if user.money2 < 60 then
        return false 
    end
    --do something
    ok := Foo()
    if ok {
        user.money2 = user.money2 - 60
    }
}

基于乐观锁的前提,这个锁被分配出来之后几乎不会发生碰撞,很快就会被释放掉。

我们在mutex内部可以做一些内存优化,比如我们做一个mutex cache, 当一把锁释放后就放入mutex cache中,当我们需要一个锁时先尝试从cache中分配等。

这样我们就有了一种通用的抽象方案。

ps. 写这篇文章时,我特意去查了wiki, 互斥锁原本就是为了解决并发问题而存在的,这种并发并不局限于抢占式并发还是非抢占式并发。

发表评论

two + four =