JavaScript 中的发布订阅模式

发布订阅模式在 JS 中很常见,本质是将指定事件的订阅者存储起来,等到触发事件时一块通知,基础的发布订阅实现如下:

const PubSub = {
    // 存储事件及其队列
    events: {},
    // 为事件添加订阅者,或者说订阅事件
    listen: function(ev, fn) {
        if (!this.events[ev]) {
            this.events[ev] = [];
        }
        // 将订阅者存储起来
        this.events[ev].push(fn);
    },
    // 触发事件
    dispatch: function(ev, ...args) {
        if (!this.events[ev]) return;
        // 所有订阅过该事件的订阅者都将会被通知
        this.events[ev].forEach(fn => {
            fn(...args);
        })
    },
    // 移除订阅
    remove: function(ev, fn) {
        if (!this.events[ev]) return;
        if (!fn) {
            // 没传递 fn,表示清除该事件下的所有订阅者
            this.events.length = 0;
            return;
        }
        const index = this.events[ev].findIndex(_fn => _fn === fn);
        this.events[ev].splice(index, 1);
    },
    // 单次订阅
    // 注意,调用 one 之后就不能手动调用 remove 移除了,因为 fn 被 x 替换了,要想还能手动移除,可能得改 events 的结构了
    one: function(ev, fn) {
        const x = () => {
            fn();
            this.remove(ev, x);
        }
        this.listen(ev, x);
    },
    // clear... 清除全部事件
}

JavaScript 设计模式与开发实践 中举了个开发中的例子,我深有感触,假如我们负责登录模块,用户信息模块、消息模块等需要知道登录状态才能正常显示。如果没有采用发布订阅模式,单纯的将依赖模块的逻辑放入登录成功的回调:

login().then(data => {
    UserInfo.init(data);
    Message.init(data);
    // ...
})

这样的话,其他模块的初始化工作就得由我们负责了,而且每次有一个新的模块依赖登录状态的话,都要由我们去改代码,即便登录逻辑并没有变动。这显然已经超出了我们的职责边界,不符合单一职责原则和开放封闭原则。

因此,我们希望登录模块在登录成功时发送一个事件,依赖模块只需要订阅这个事件就可以了,一旦事件来临,该模块就会自己去处理相关逻辑。再有新的模块依赖登录时,它都可以自己去处理。代码应该是这个样子的:

login().then(data => {
    // 登录成功后触发 loginSuccess 事件,并将用户信息传递过去
    LoginPubSub.dispatch("loginSuccess", data);
})

UserInfo.listen("loginSuccess", data => {
    // 用户信息模块的处理逻辑
})

Message.listen("loginSuccess", data => {
    // 消息模块的处理逻辑
})

// 再来个购物车模块,各个模块自己订阅消息就可以了

简易的发布订阅模式够用了,但最好还是设置一个命名空间以减少事件冲突的可能,同时还能做一些业务划分,如上面代码中的 LoginPubSub,表明是和登录相关代的发布订阅,当然也可以在事件名上加前缀。

class PubSub {
    constructor() {
        // 存储事件及其订阅者
        this.events = {};
        // 存储离线事件及其触发函数
        this.offline = {};
    }

    listen(ev, fn) {
        if (!this.events[ev]) {
            this.events[ev] = [];
        }
        this.events[ev].push(fn);
        // 发现有存储起来的 _dispatch,则依次触发
        if (this.offline[ev]) {
            this.offline[ev].forEach(_dispatch => _dispatch());
            // 清空离线事件
            this.offline[ev].length = 0;
        }
    }

    dispatch(ev, ...args) {
        // 这个 _dispath 其实可以提出去,专门做触发逻辑,这样符合单一职责原则
        // 然后这里再用一个函数包裹 _dispatch 的调用,也符合下面的解释
        // 这里偷懒了,莫捶我!
        const _dispatch = () => {
            if (!this.events[ev]) return;
            // 依次调用订阅者
            this.events[ev].forEach(fn => fn(...args));
        }

        if (!this.events[ev]) {
            // 没有此事件的订阅者,先将 _dispatch 方法存储起来,作为离线事件
            // 等有人订阅事件时再触发
            if (!this.offline[ev]) {
                this.offline[ev] = [];
            }
            this.offline[ev].push(_dispatch);
        } else {
            // 有此事件的订阅者,直接触发
            _dispatch();
        }
    }
}

const loginPubSub = new PubSub();
// 先触发
loginPubSub.dispatch("hello", "tom");
// 再订阅
loginPubSub.listen("hello", name => {
    console.log("Hello, ", name);
})
// 先订阅
loginPubSub.listen("hello", name => {
    console.log(`老${name}呀!唉,老${name}!(茄子掩面)`);
})
// 再触发
loginPubSub.dispatch("hello", "马");

上面的代码使用 class 实现了命名空间,每个业务模块都可创建自己的事件发布对象。同时还实现了离线事件,即事件触发可能先于事件订阅。即在事件触发时还没有人订阅,那就将此次触发存储起来,等待下次有人订阅再触发。如何存储呢?在触发函数外层再包裹一层函数,这样能保存触发函数的参数,确保触发函数在后续被调用时能使用正确的参数。


其实这里的 “离线事件” 是有问题的,这里作者举的例子是 QQ 的离线消息,没登录时消息都存储起来,等登录后再一次性推给用户,然后将这些离线消息清空,在这个场景下是没问题的,因为依赖只有一个人,即账号本人。但如果是用户信息模块、消息模块依赖登录模块的例子,就不太妥当了。作者说因为网络原因,登录成功的消息先发出去了,其他依赖模块的代码还没加载完于是还没订阅,按照上面的代码逻辑,用户信息模块加载完并执行 listen,导致 offline 里面的触发函数全部被执行然后清空,如果这时消息模块刚加载完,那么它执行 listen 时就无法响应那些触发函数了。这就相当于学校发布了通知,不管学生当时在没在学校,等他到了学校,他都应该能看到这个通知,总不能另一个学生看完后将通知撕了。

这种场景该如何实现呢?感觉要引入更多的状态了!

学校通知:对于指定的学校通知,学生应该只订阅一次。历史通知应该保留,便于未收到通知的学生主动查阅。

class PubSub {
    constructor() {
        // 保存消息及其订阅者
        this.events = {};
        // 保存消息及其发布者
        this.offline = {};
    }

    listen(ev, fn) {
        if (!this.events[ev]) {
            this.events[ev] = [];
        }
        this.events[ev].push(fn);
        // 每次监听都刷一遍历史消息
        // 对于只监听一次的订阅者,不会收到重复的消息
        if (this.offline[ev]) {
            this.offline[ev].forEach(_dispatch => _dispatch());
        }
    }

    once(ev, fn) {
        const cleanUp = () => {
            fn();
            this.remove(ev, cleanUp);
        }
        this.listen(ev, cleanUp);
    }

    remove(ev, fn) {
        if (!this.events[ev]) return;
        if (!fn) {
            this.events[ev] = [];
            return;
        }
        this.events[ev] = this.events[ev].filter(_fn => _fn !== fn);
    }

    _dispatch(ev, ...args) {
        if (!this.events[ev]) return;
        this.events[ev].forEach(fn => fn(...args));
    }

    dispatch(ev, ...args) {
        const fn = () => {
            this._dispatch(ev, ...args);
        }
        if (!this.offline[ev]) {
            this.offline[ev] = [];
        }
        // 任何消息都应该存储起来
        this.offline[ev].push(fn);
        // 如果当前有订阅者,则触发一次,_dispatch 内部做判断了
        fn();
    }
}

const school = new PubSub();

school.listen("fangjia", () => {
    console.log("憨憨收到");
})

school.once("fangjia", () => {
    console.log("张三收到");
})

// 学校通知放假
school.dispatch("fangjia");

school.once("fangjia", () => {
    console.log("李四收到");
})

school.once("fangjia", () => {
    console.log("王五收到");
})

// 学校又通知放假(只有憨憨会受到)
school.dispatch("fangjia");

// 憨憨收到
// 张三收到
// 憨憨收到
// 李四收到
// 憨憨收到
// 王五收到

// 憨憨收到(啊?)

这样倒是能解决问题,但是得要求所有订阅者只订阅一次事件,否则就和憨憨一样,每次都能收到重复的通知。但是如果学校发现之前的通知错了,又发了个新的,这下就只有憨憨收到了,要不然就让所有学生订阅新的通知事件,如 "fangjia 2.0",但这么做让学生压力很大,鬼知道要订阅到 “fangjia xx.0”,最好还是只有一个放假通知,即一个事件。

所以,重点在于订阅者如何区分这个消息对自己来说是新消息还是旧消息。新消息,响应即可,旧消息,直接丢掉!

这里使用历史消息 id (历史消息数组的长度)作为表示,给每个订阅者打上标记。

class PubSub {
    constructor() {
        // 保存消息及其订阅者
        this.events = {};
        // 保存历史消息及其发布者
        this.history = {};
    }

    listen(ev, fn) {
        if (!this.events[ev]) {
            this.events[ev] = [];
        }
        this.events[ev].push({
            // 新订阅者
            lastEventId: -1,
            callback: fn,
        });
        if (this.history[ev]) {
            // 如果有历史消息,就重现当时的消息
            this.history[ev].forEach(_dispatch => _dispatch(true));
        }
    }

    _dispatch(isNew, ev, ...args) {
        if (!this.events[ev]) return;
        if (isNew) {
            // isNew 表示只给后来订阅者 dispatch 消息(一般是最后一个,且仅它一个,毕竟是同步的代码)
            this.events[ev].filter(sub => sub.lastEventId === -1).forEach(sub => {
                sub.lastEventId = this.history[ev].length - 1;
                sub.callback(...args);
            });
        } else {
            this.events[ev].forEach(sub => {
                sub.lastEventId = this.history[ev].length - 1;
                sub.callback(...args);
            });
        }
    }

    dispatch(ev, ...args) {
        const fn = (isNew) => {
            // 注意,这么做能保存当时的参数
            this._dispatch(isNew, ev, ...args);
        }
        if (!this.history[ev]) {
            this.history[ev] = [];
        }
        // 任何消息都应该存储起来
        this.history[ev].push(fn);
        // 如果当前有订阅者,则触发一次,_dispatch 内部做判断了
        fn();
    }
}

const school = new PubSub();

school.listen("fangjia", (txt) => {
    console.log("憨憨收到: ", txt);
})

school.listen("fangjia", (txt) => {
    console.log("张三收到: ", txt);
})

// 学校通知放假
school.dispatch("fangjia", "放假通知");

school.listen("fangjia", (txt) => {
    console.log("李四收到: ", txt);
})

school.listen("fangjia", (txt) => {
    console.log("王五收到: ", txt);
})

// 学校又通知放假(只有憨憨会受到)
school.dispatch("fangjia", "新放假通知!!!");

// 憨憨收到: 放假通知
// 张三收到: 放假通知
// 李四收到: 放假通知
// 王五收到: 放假通知

// 憨憨收到: 新放假通知
// 张三收到: 新放假通知
// 李四收到: 新放假通知
// 王五收到: 新放假通知

大功告成!

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注