事件与发布订阅模式
2025年3月7日大约 15 分钟
事件与发布订阅模式
事件驱动编程是JavaScript异步编程的另一种重要模式。本文将介绍发布-订阅模式的概念和实现,以及如何使用事件系统构建松耦合、可扩展的应用程序架构。
发布-订阅模式基础
什么是发布-订阅模式
发布-订阅模式(Publish-Subscribe Pattern,简称Pub-Sub)是一种消息范式,它将消息的发送者(发布者)和接收者(订阅者)解耦,使发布者不需要知道谁在接收消息,订阅者也不需要知道谁在发送消息。
这种模式的核心组件包括:
- 发布者(Publisher):负责发布事件/消息
- 订阅者(Subscriber):对特定事件感兴趣并注册监听器
- 事件总线/调度中心(Event Bus/Dispatcher):负责管理订阅关系并将事件从发布者路由到订阅者
与观察者模式的区别
虽然发布-订阅模式和观察者模式(Observer Pattern)经常被混用,但它们有一个关键区别:
- 观察者模式:观察者直接订阅主题(Subject),主题维护观察者列表并直接通知它们
- 发布-订阅模式:发布者和订阅者之间有一个事件通道/调度中心,它们彼此不知道对方的存在
// 观察者模式
class Subject {
constructor() {
this.observers = [];
}
addObserver(observer) {
this.observers.push(observer);
}
notifyAll(data) {
this.observers.forEach(observer => observer.update(data));
}
}
// 发布-订阅模式
class EventBus {
constructor() {
this.subscribers = {};
}
subscribe(event, callback) {
if (!this.subscribers[event]) {
this.subscribers[event] = [];
}
this.subscribers[event].push(callback);
}
publish(event, data) {
if (!this.subscribers[event]) return;
this.subscribers[event].forEach(callback => callback(data));
}
}
实现发布-订阅模式
基本实现
下面是一个简单的发布-订阅模式实现:
class EventEmitter {
constructor() {
this.events = {};
}
// 订阅事件
on(eventName, listener) {
if (!this.events[eventName]) {
this.events[eventName] = [];
}
this.events[eventName].push(listener);
return this; // 支持链式调用
}
// 发布事件
emit(eventName, ...args) {
const listeners = this.events[eventName];
if (!listeners || listeners.length === 0) return false;
listeners.forEach(listener => {
listener.apply(this, args);
});
return true;
}
// 移除特定事件的特定监听器
off(eventName, listenerToRemove) {
if (!this.events[eventName]) return this;
this.events[eventName] = this.events[eventName].filter(
listener => listener !== listenerToRemove
);
return this;
}
// 只订阅一次,触发后自动移除
once(eventName, listener) {
const onceWrapper = (...args) => {
listener.apply(this, args);
this.off(eventName, onceWrapper);
};
return this.on(eventName, onceWrapper);
}
}
使用示例
// 创建事件发射器
const emitter = new EventEmitter();
// 订阅事件
function messageHandler(message) {
console.log('收到消息:', message);
}
emitter.on('message', messageHandler);
// 只订阅一次的事件
emitter.once('specialEvent', data => {
console.log('这个事件只会触发一次:', data);
});
// 发布事件
emitter.emit('message', 'Hello World'); // 输出: 收到消息: Hello World
emitter.emit('specialEvent', { id: 1 }); // 输出: 这个事件只会触发一次: { id: 1 }
emitter.emit('specialEvent', { id: 2 }); // 不会有输出,因为监听器已被移除
// 取消订阅
emitter.off('message', messageHandler);
emitter.emit('message', 'This won\'t be logged'); // 没有输出
高级功能
我们可以扩展基本实现,添加一些高级功能:
class AdvancedEventEmitter extends EventEmitter {
// 添加错误处理
emit(eventName, ...args) {
try {
return super.emit(eventName, ...args);
} catch (error) {
console.error(`事件 "${eventName}" 处理过程中出错:`, error);
// 发布错误事件
super.emit('error', error, eventName);
return false;
}
}
// 移除所有特定事件的监听器
removeAllListeners(eventName) {
if (eventName) {
this.events[eventName] = [];
} else {
this.events = {};
}
return this;
}
// 获取特定事件的监听器数量
listenerCount(eventName) {
const listeners = this.events[eventName] || [];
return listeners.length;
}
// 获取所有事件名称
eventNames() {
return Object.keys(this.events);
}
// 设置最大监听器数量(防止内存泄漏)
setMaxListeners(n) {
this.maxListeners = n;
return this;
}
}
浏览器中的事件系统
DOM事件模型
浏览器的DOM事件系统是发布-订阅模式的一个实现。它包括三个阶段:捕获阶段、目标阶段和冒泡阶段。
// 添加事件监听器
element.addEventListener('click', function(event) {
console.log('元素被点击');
}, false); // 第三个参数为false表示在冒泡阶段触发
// 移除事件监听器
element.removeEventListener('click', handler);
// 阻止事件冒泡
function stopPropagation(event) {
event.stopPropagation();
}
// 阻止默认行为
function preventDefault(event) {
event.preventDefault();
}
自定义事件
浏览器还支持创建和分发自定义事件:
// 创建自定义事件
const customEvent = new CustomEvent('userLogin', {
detail: { userId: 123, username: 'john_doe' },
bubbles: true,
cancelable: true
});
// 分发自定义事件
document.dispatchEvent(customEvent);
// 监听自定义事件
document.addEventListener('userLogin', function(event) {
console.log('用户登录:', event.detail.username);
});
Node.js中的事件系统
EventEmitter类
Node.js提供了内置的EventEmitter
类,它是Node.js异步事件驱动架构的核心:
const EventEmitter = require('events');
// 创建事件发射器实例
const myEmitter = new EventEmitter();
// 订阅事件
myEmitter.on('data', (data) => {
console.log('收到数据:', data);
});
// 发布事件
myEmitter.emit('data', { id: 1, name: '示例数据' });
// 错误处理
myEmitter.on('error', (err) => {
console.error('发生错误:', err.message);
});
// 如果没有监听error事件,则会抛出异常
myEmitter.emit('error', new Error('出错了'));
继承EventEmitter
许多Node.js核心模块继承自EventEmitter,我们也可以在自己的类中继承它:
const EventEmitter = require('events');
const fs = require('fs');
class FileWatcher extends EventEmitter {
constructor(filename) {
super();
this.filename = filename;
}
watch() {
fs.watch(this.filename, (eventType, filename) => {
this.emit('change', filename, eventType);
});
console.log(`正在监视文件: ${this.filename}`);
}
// 添加自定义方法
async readContent() {
try {
const content = await fs.promises.readFile(this.filename, 'utf8');
this.emit('content', content);
return content;
} catch (error) {
this.emit('error', error);
throw error;
}
}
}
// 使用自定义的EventEmitter子类
const watcher = new FileWatcher('./config.json');
watcher.on('change', (filename, eventType) => {
console.log(`文件 ${filename} 已${eventType}`);
watcher.readContent().catch(err => console.error('读取文件失败:', err));
});
watcher.on('content', (content) => {
console.log('文件内容:', content);
});
watcher.on('error', (error) => {
console.error('监视器错误:', error.message);
});
watcher.watch();
实际应用场景
用户界面交互
发布-订阅模式非常适合处理UI交互:
// UI组件之间的通信
class UIEventBus extends EventEmitter {}
const uiEvents = new UIEventBus();
// 导航组件
class Navigation {
constructor(eventBus) {
this.eventBus = eventBus;
// 监听事件
this.eventBus.on('user:login', this.updateNavigation.bind(this));
this.eventBus.on('cart:update', this.updateCartIcon.bind(this));
}
updateNavigation(user) {
console.log(`更新导航为已登录用户: ${user.name}`);
// 更新导航UI
}
updateCartIcon(cart) {
console.log(`购物车中有 ${cart.items.length} 件商品`);
// 更新购物车图标
}
navigateTo(route) {
// 导航到新页面
this.eventBus.emit('navigation:change', { route });
}
}
// 用户组件
class UserManager {
constructor(eventBus) {
this.eventBus = eventBus;
this.currentUser = null;
}
login(username, password) {
// 模拟登录API调用
setTimeout(() => {
this.currentUser = { id: 1, name: username };
this.eventBus.emit('user:login', this.currentUser);
}, 1000);
}
logout() {
this.currentUser = null;
this.eventBus.emit('user:logout');
}
}
// 购物车组件
class ShoppingCart {
constructor(eventBus) {
this.eventBus = eventBus;
this.items = [];
// 监听用户登出事件,清空购物车
this.eventBus.on('user:logout', this.clear.bind(this));
}
addItem(product) {
this.items.push(product);
this.eventBus.emit('cart:update', { items: this.items });
}
clear() {
this.items = [];
this.eventBus.emit('cart:update', { items: this.items });
}
}
// 使用示例
const eventBus = new UIEventBus();
const nav = new Navigation(eventBus);
const userManager = new UserManager(eventBus);
const cart = new ShoppingCart(eventBus);
// 用户登录
userManager.login('john_doe', 'password123');
// 添加商品到购物车
cart.addItem({ id: 101, name: '商品1', price: 99.99 });
// 用户登出
userManager.logout();
模块间通信
在大型应用中,发布-订阅模式可以帮助不同模块之间通信:
// 应用程序核心
class Application {
constructor() {
this.eventBus = new EventEmitter();
this.modules = {};
}
registerModule(name, moduleInstance) {
this.modules[name] = moduleInstance;
moduleInstance.init(this.eventBus);
console.log(`模块 "${name}" 已注册`);
}
start() {
this.eventBus.emit('app:start');
console.log('应用程序已启动');
}
}
// 模块基类
class Module {
init(eventBus) {
this.eventBus = eventBus;
this.registerEventHandlers();
}
registerEventHandlers() {
// 由子类实现
}
}
// 具体模块实现
class AuthModule extends Module {
registerEventHandlers() {
this.eventBus.on('auth:login', this.login.bind(this));
this.eventBus.on('auth:logout', this.logout.bind(this));
this.eventBus.on('app:start', () => {
console.log('Auth模块: 应用启动,检查登录状态');
});
}
login(credentials) {
console.log('Auth模块: 处理登录', credentials);
// 登录逻辑...
this.eventBus.emit('user:authenticated', { id: 1, name: credentials.username });
}
logout() {
console.log('Auth模块: 处理登出');
// 登出逻辑...
this.eventBus.emit('user:unauthenticated');
}
}
class DataModule extends Module {
constructor() {
super();
this.data = {};
}
registerEventHandlers() {
this.eventBus.on('data:fetch', this.fetchData.bind(this));
this.eventBus.on('user:authenticated', this.loadUserData.bind(this));
this.eventBus.on('user:unauthenticated', this.clearUserData.bind(this));
}
fetchData(params) {
console.log('Data模块: 获取数据', params);
// 模拟API请求
setTimeout(() => {
const data = { id: params.id, content: '获取的数据内容' };
this.data[params.id] = data;
this.eventBus.emit('data:fetched', data);
}, 500);
}
loadUserData(user) {
console.log(`Data模块: 加载用户 ${user.id} 的数据`);
// 加载用户特定数据...
this.eventBus.emit('data:user-loaded', { userId: user.id, preferences: {} });
}
clearUserData() {
console.log('Data模块: 清除用户数据');
// 清除用户特定数据...
this.data = {};
}
}
// 使用示例
const app = new Application();
app.registerModule('auth', new AuthModule());
app.registerModule('data', new DataModule());
// 启动应用
app.start();
// 触发登录事件
app.eventBus.emit('auth:login', { username: 'user123', password: 'pass123' });
// 获取数据
app.eventBus.emit('data:fetch', { id: 'article-123' });
// 登出
app.eventBus.emit('auth:logout');
异步操作管理
发布-订阅模式可以用于管理复杂的异步操作流程:
class AsyncTaskManager extends EventEmitter {
constructor() {
super();
this.tasks = new Map();
this.running = false;
}
addTask(id, taskFn) {
this.tasks.set(id, {
fn: taskFn,
status: 'pending',
result: null,
error: null
});
this.emit('task:added', { id, status: 'pending' });
return this;
}
async runAll() {
if (this.running) return;
this.running = true;
this.emit('manager:start');
const taskIds = Array.from(this.tasks.keys());
for (const id of taskIds) {
const task = this.tasks.get(id);
try {
this.emit('task:start', { id });
task.status = 'running';
// 执行任务
const result = await task.fn();
task.status = 'completed';
task.result = result;
this.emit('task:complete', { id, result });
} catch (error) {
task.status = 'failed';
task.error = error;
this.emit('task:error', { id, error });
}
}
this.running = false;
this.emit('manager:complete', {
successful: Array.from(this.tasks.entries())
.filter(([_, task]) => task.status === 'completed')
.map(([id]) => id),
failed: Array.from(this.tasks.entries())
.filter(([_, task]) => task.status === 'failed')
.map(([id]) => id)
});
}
getTaskStatus(id) {
return this.tasks.has(id) ? this.tasks.get(id).status : null;
}
getTaskResult(id) {
return this.tasks.has(id) ? this.tasks.get(id).result : null;
}
clearTasks() {
this.tasks.clear();
this.emit('manager:clear');
return this;
}
}
// 使用示例
async function demoAsyncTaskManager() {
const taskManager = new AsyncTaskManager();
// 监听事件
taskManager.on('task:added', ({ id }) => {
console.log(`任务 ${id} 已添加`);
});
taskManager.on('task:start', ({ id }) => {
console.log(`任务 ${id} 开始执行`);
});
taskManager.on('task:complete', ({ id, result }) => {
console.log(`任务 ${id} 完成,结果:`, result);
});
taskManager.on('task:error', ({ id, error }) => {
console.log(`任务 ${id} 失败:`, error.message);
});
taskManager.on('manager:complete', ({ successful, failed }) => {
console.log(`所有任务执行完毕。成功: ${successful.length}, 失败: ${failed.length}`);
});
// 添加任务
taskManager.addTask('task1', async () => {
await new Promise(resolve => setTimeout(resolve, 1000));
return '任务1结果';
});
taskManager.addTask('task2', async () => {
await new Promise(resolve => setTimeout(resolve, 500));
return '任务2结果';
});
taskManager.addTask('task3', async () => {
await new Promise((_, reject) => setTimeout(() => reject(new Error('任务3失败')), 800));
});
// 运行所有任务
await taskManager.runAll();
// 获取结果
console.log('任务1状态:', taskManager.getTaskStatus('task1'));
console.log('任务1结果:', taskManager.getTaskResult('task1'));
}
demoAsyncTaskManager();
性能与内存管理
内存泄漏问题
使用发布-订阅模式时,需要注意可能的内存泄漏问题:
// 潜在的内存泄漏
function createLeakyPattern() {
const emitter = new EventEmitter();
const heavyData = new Array(1000000).fill('大量数据');
// 问题:闭包引用了heavyData,但没有移除监听器
emitter.on('data', () => {
console.log(heavyData.length);
});
return emitter;
}
// 更好的做法
function createCleanPattern() {
const emitter = new EventEmitter();
// 使用弱引用或确保移除监听器
const handler = () => {
console.log('处理数据');
};
emitter.on('data', handler);
// 提供清理方法
return {
emitter,
cleanup() {
emitter.off('data', handler);
}
};
}
性能优化技巧
// 1. 使用命名空间避免事件名冲突
emitter.on('user:login', handler); // 比 emitter.on('login', handler) 更好
// 2. 限制监听器数量
emitter.setMaxListeners(20); // 默认Node.js中是10
// 3. 使用once()减少手动移除的需要
emitter.once('initialize', init);
// 4. 使用通配符匹配多个事件(需要特殊实现)
class WildcardEventEmitter extends EventEmitter {
emit(eventName, ...args) {
// 精确匹配
super.emit(eventName, ...args);
// 通配符匹配
const wildcardEvent = eventName.split(':')[0] + ':*';
super.emit(wildcardEvent, eventName, ...args);
// 全局通配符
super.emit('*', eventName, ...args);
}
}
// 使用通配符
const wEmitter = new WildcardEventEmitter();
wEmitter.on('user:*', (fullEventName, data) => {
console.log(`捕获到用户事件 ${fullEventName}:`, data);
});
测试发布-订阅系统
单元测试
// 使用Jest测试EventEmitter
describe('EventEmitter', () => {
let emitter;
beforeEach(() => {
emitter = new EventEmitter();
});
test('应该能够订阅和发布事件', () => {
const mockFn = jest.fn();
emitter.on('test', mockFn);
emitter.emit('test', 'data');
expect(mockFn).toHaveBeenCalledTimes(1);
expect(mockFn).toHaveBeenCalledWith('data');
});
test('应该能够移除监听器', () => {
const mockFn = jest.fn();
emitter.on('test', mockFn);
emitter.off('test', mockFn);
emitter.emit('test', 'data');
expect(mockFn).not.toHaveBeenCalled();
});
test('once方法应该只触发一次', () => {
const mockFn = jest.fn();
emitter.once('test', mockFn);
emitter.emit('test', 'first');
emitter.emit('test', 'second');
expect(mockFn).toHaveBeenCalledTimes(1);
expect(mockFn).toHaveBeenCalledWith('first');
});
});
模拟事件
// 模拟DOM事件
function simulateClick(element) {
const event = new MouseEvent('click', {
bubbles: true,
cancelable: true,
view: window
});
element.dispatchEvent(event);
}
// 测试DOM事件处理
test('点击按钮应该触发处理函数', () => {
// 设置
document.body.innerHTML = '<button id="testButton">点击我</button>';
const button = document.getElementById('testButton');
const mockHandler = jest.fn();
// 添加事件监听器
button.addEventListener('click', mockHandler);
// 模拟点击
simulateClick(button);
// 断言
expect(mockHandler).toHaveBeenCalledTimes(1);
});
框架中的事件系统
Vue中的事件系统
Vue提供了内置的事件系统,用于组件间通信:
// 父组件向子组件传递事件
// Parent.vue
<template>
<div>
<child-component @custom-event="handleCustomEvent" />
<button @click="triggerChildEvent">触发子组件事件</button>
</div>
</template>
<script>
import ChildComponent from './ChildComponent.vue';
export default {
components: {
ChildComponent
},
methods: {
handleCustomEvent(data) {
console.log('父组件收到自定义事件:', data);
},
triggerChildEvent() {
this.$refs.child.$emit('trigger-from-parent');
}
}
}
</script>
// 子组件
// ChildComponent.vue
<template>
<div>
<button @click="emitEvent">发送事件到父组件</button>
</div>
</template>
<script>
export default {
methods: {
emitEvent() {
this.$emit('custom-event', { message: '来自子组件的数据' });
}
},
created() {
this.$on('trigger-from-parent', () => {
console.log('子组件收到父组件触发的事件');
});
}
}
</script>
React中的事件处理
React使用合成事件系统:
// 简单的事件处理
function Button({ onClick, children }) {
return (
<button onClick={onClick}>
{children}
</button>
);
}
// 在类组件中使用事件
class Counter extends React.Component {
constructor(props) {
super(props);
this.state = { count: 0 };
// 绑定this
this.handleIncrement = this.handleIncrement.bind(this);
}
handleIncrement() {
this.setState(state => ({ count: state.count + 1 }));
}
render() {
return (
<div>
<p>计数: {this.state.count}</p>
<Button onClick={this.handleIncrement}>增加</Button>
</div>
);
}
}
// 在函数组件中使用事件
function FunctionalCounter() {
const [count, setCount] = React.useState(0);
const handleIncrement = () => {
setCount(count + 1);
};
return (
<div>
<p>计数: {count}</p>
<Button onClick={handleIncrement}>增加</Button>
</div>
);
}
全局事件总线
在一些框架中,可以实现全局事件总线:
// Vue中的全局事件总线
// main.js
Vue.prototype.$eventBus = new Vue();
// 组件A
this.$eventBus.$on('global-event', this.handleGlobalEvent);
// 组件B
this.$eventBus.$emit('global-event', { data: 'some data' });
// 记得在组件销毁时移除监听器
beforeDestroy() {
this.$eventBus.$off('global-event', this.handleGlobalEvent);
}
// React中使用第三方库或自定义事件总线
import { EventEmitter } from 'events';
// 创建全局事件总线
export const eventBus = new EventEmitter();
// 在组件中使用
import { eventBus } from './eventBus';
class ComponentA extends React.Component {
componentDidMount() {
eventBus.on('global-event', this.handleGlobalEvent);
}
componentWillUnmount() {
eventBus.off('global-event', this.handleGlobalEvent);
}
handleGlobalEvent = (data) => {
console.log('收到全局事件:', data);
}
render() {
return <div>组件A</div>;
}
}
// 在另一个组件中触发事件
class ComponentB extends React.Component {
triggerGlobalEvent = () => {
eventBus.emit('global-event', { message: '全局消息' });
}
render() {
return (
<div>
<button onClick={this.triggerGlobalEvent}>
触发全局事件
</button>
</div>
);
}
}
微前端中的事件通信
在微前端架构中,发布-订阅模式是不同应用之间通信的重要方式:
// 创建一个全局事件总线用于微前端通信
(function(global) {
// 确保只创建一次
if (global.MicroAppEventBus) return;
class MicroAppEventBus {
constructor() {
this.events = {};
}
on(eventName, callback, appName) {
if (!this.events[eventName]) {
this.events[eventName] = [];
}
this.events[eventName].push({
callback,
appName
});
return () => this.off(eventName, callback);
}
off(eventName, callback) {
if (!this.events[eventName]) return;
this.events[eventName] = this.events[eventName].filter(
listener => listener.callback !== callback
);
}
emit(eventName, data, sourceApp) {
if (!this.events[eventName]) return;
console.log(`[MicroAppEventBus] 事件 "${eventName}" 由 "${sourceApp}" 触发`);
this.events[eventName].forEach(listener => {
try {
// 可以选择性地过滤接收者
listener.callback({
type: eventName,
data,
source: sourceApp,
timestamp: Date.now()
});
} catch (error) {
console.error(`[MicroAppEventBus] 应用 "${listener.appName}" 处理事件出错:`, error);
}
});
}
// 获取特定事件的监听器数量
listenerCount(eventName) {
return this.events[eventName] ? this.events[eventName].length : 0;
}
}
// 创建全局单例
global.MicroAppEventBus = new MicroAppEventBus();
console.log('[MicroAppEventBus] 已初始化');
})(window);
// 在主应用中使用
function MainApp() {
// 注册事件监听
useEffect(() => {
const unsubscribe = window.MicroAppEventBus.on('micro:data-update',
event => {
console.log(`主应用收到来自 ${event.source} 的数据更新:`, event.data);
updateMainAppState(event.data);
},
'main-app'
);
// 清理函数
return unsubscribe;
}, []);
// 发送事件到微应用
const notifyMicroApps = () => {
window.MicroAppEventBus.emit('main:config-change',
{ theme: 'dark', permissions: ['read', 'write'] },
'main-app'
);
};
return (
<div>
<button onClick={notifyMicroApps}>更新配置</button>
{/* 其他主应用内容 */}
</div>
);
}
// 在微应用中使用
function MicroApp() {
useEffect(() => {
// 监听来自主应用的配置变更
const unsubscribe = window.MicroAppEventBus.on('main:config-change',
event => {
console.log('微应用收到配置变更:', event.data);
applyNewConfig(event.data);
},
'micro-app-1'
);
return unsubscribe;
}, []);
// 向主应用发送数据更新
const sendDataToMain = () => {
window.MicroAppEventBus.emit('micro:data-update',
{ id: 'micro-1-data', value: Math.random() },
'micro-app-1'
);
};
return (
<div>
<button onClick={sendDataToMain}>发送数据到主应用</button>
{/* 微应用内容 */}
</div>
);
}
服务端的发布-订阅模式
使用Redis实现发布-订阅
Redis提供了内置的发布-订阅功能,可用于构建分布式系统中的事件总线:
// 使用Node.js和Redis实现分布式事件系统
const redis = require('redis');
class RedisEventBus {
constructor(options = {}) {
this.publisher = redis.createClient(options);
this.subscriber = redis.createClient(options);
this.handlers = new Map();
// 设置消息处理器
this.subscriber.on('message', (channel, message) => {
try {
const data = JSON.parse(message);
const handlers = this.handlers.get(channel) || [];
handlers.forEach(handler => {
try {
handler(data);
} catch (error) {
console.error(`处理 ${channel} 事件出错:`, error);
}
});
} catch (error) {
console.error('解析消息失败:', error);
}
});
}
// 订阅事件
subscribe(channel, handler) {
if (!this.handlers.has(channel)) {
this.handlers.set(channel, []);
// 首次订阅时,向Redis订阅该频道
this.subscriber.subscribe(channel);
}
this.handlers.get(channel).push(handler);
return {
unsubscribe: () => this.unsubscribe(channel, handler)
};
}
// 取消订阅
unsubscribe(channel, handler) {
if (!this.handlers.has(channel)) return;
const handlers = this.handlers.get(channel);
const index = handlers.indexOf(handler);
if (index !== -1) {
handlers.splice(index, 1);
}
// 如果没有更多处理器,取消Redis订阅
if (handlers.length === 0) {
this.subscriber.unsubscribe(channel);
this.handlers.delete(channel);
}
}
// 发布事件
publish(channel, data) {
return new Promise((resolve, reject) => {
try {
const message = JSON.stringify(data);
this.publisher.publish(channel, message, (err, count) => {
if (err) return reject(err);
resolve(count); // 返回接收消息的客户端数量
});
} catch (error) {
reject(error);
}
});
}
// 关闭连接
close() {
this.publisher.quit();
this.subscriber.quit();
}
}
// 使用示例 - 服务A
const serviceA = new RedisEventBus({
host: 'localhost',
port: 6379
});
// 发布事件
async function publishUserActivity() {
try {
const recipients = await serviceA.publish('user:activity', {
userId: 123,
action: 'login',
timestamp: Date.now()
});
console.log(`事件已发送给 ${recipients} 个订阅者`);
} catch (error) {
console.error('发布事件失败:', error);
}
}
// 使用示例 - 服务B
const serviceB = new RedisEventBus({
host: 'localhost',
port: 6379
});
// 订阅事件
const subscription = serviceB.subscribe('user:activity', (data) => {
console.log(`用户 ${data.userId} 执行了 ${data.action} 操作`);
// 处理用户活动...
});
// 稍后取消订阅
// subscription.unsubscribe();
// 关闭连接
// serviceA.close();
// serviceB.close();
总结与最佳实践
发布-订阅模式的优势
- 解耦:发布者和订阅者之间不直接依赖,降低了组件间的耦合度
- 灵活性:可以动态添加或移除订阅者,不影响系统其他部分
- 可扩展性:新增功能只需添加新的订阅者,无需修改现有代码
- 异步通信:支持异步事件处理,提高系统响应性
发布-订阅模式的缺点
- 调试困难:事件流程不直观,难以追踪
- 内存泄漏风险:如果不正确管理订阅,可能导致内存泄漏
- 可能过度使用:滥用可能导致"事件地狱",使代码难以理解
最佳实践
- 使用命名空间:采用分层命名(如
domain:action
)避免冲突 - 文档化事件:清晰记录每个事件的目的、发布者和预期订阅者
- 错误处理:在事件处理器中添加适当的错误处理
- 资源清理:确保在组件销毁时取消订阅
- 避免过度使用:不要将发布-订阅模式作为组件间通信的唯一方式
- 考虑性能:对于高频事件,考虑节流或批处理
- 测试:编写单元测试验证事件系统的正确性