美文网首页
Node.js中多线程,多主机下数据的处理

Node.js中多线程,多主机下数据的处理

作者: Nestor_Gu | 来源:发表于2022-05-31 15:39 被阅读0次

    先看代码

    demo代码背景:通过Socket,实时同步笔记内容

    const interviewDict: Record<string, InterviewDetail> = {};
    
    export const interviewWebSocket: expressWs.WebsocketRequestHandler = (ws: IWebSocket, req) => {
      const id = req.query.id as string;
      ws.interviewId = id;
      ws.uuid = uuidv4();
      ws.on('message', msg => {
        if (!ArrayBuffer.isView(msg)) {
          return;
        }
        const [type] = msg;
        const valueBytes = msg.slice(1);
        const value = decode.decode(valueBytes);
        switch (type) {
          case encode.encode(WSTypes.heartbeat)[0]:
            ws.send(getBuffer(WSTypes.heartbeat, id));
            return;
          case encode.encode(WSTypes.setValue)[0]:
            if (value && interviewDict[id]) {
              interviewDict[id].value = value;
              getWsInstance()
                ?.getWss()
                ?.clients?.forEach((wsItem: IWebSocket) => {
                console.log('ws', wsItem.interviewId, wsItem.uuid, ws.uuid);
                if (wsItem.interviewId === id && wsItem.uuid !== ws.uuid) {
                  wsItem.send(getBuffer(WSTypes.getValue, value));
                }
              });
            }
            return;
          case encode.encode(WSTypes.getValue)[0]:
            if (interviewDict[id]) {
              ws.send(getBuffer(WSTypes.getValue, interviewDict[id].value));
            }
            return;
        }
      });
    };
    复制代码
    

    会不会出现问题?

    什么时候出现问题?

    场景一:

    为了充分利用cpu的多核,需要通过fork方式进行多进程运行。

    例:Egg默认执行start会创建和 CPU 核数相当的 app worker进程。

    解决方法:进程间通信

    Egg中的Master、Agent和Worker

    当一个应用启动时,会同时启动这三类进程。

    类型 进程数量 作用 稳定性 是否运行业务代码
    Master 1 进程管理,进程间消息转发 非常高
    Agent 1 后台运行工作(长连接客户端) 少量
    Worker 一般设置为 CPU 核数 执行业务代码 一般

    interviewDict需要放在Agent中

    // 发送
    app.messenger.once('egg-ready', () => {
        app.messenger.sendToAgent('agent-event', { foo: 'bar' });
        app.messenger.sendToApp('app-event', { foo: 'bar' });
      });
    };
    // 接收
    app.messenger.on(action, (data) => {
      // process data
    });
    app.messenger.once(action, (data) => {
      // process data
    });
    复制代码
    

    扩展:为什么Node.js不适合cpu密集型任务

    进程间通信与线程间通讯:

    node是单线程应用(一个进程一个线程),单线程的进程只能运行在一个 CPU内核 上,node通过创建多进程来进行实现多线程。

    线程:由于同一进程中的多个线程具有相同的地址空间,线程间可以直接读写进程数据段(如全局变量)来进行通信只需要进程同步和互斥手段的辅助,保证数据的一致性。

    进程:进程通讯需要通过系统IPC等方式进行通信,远比线程间通讯开销大。

    场景二:

    生产环境部署服务的应用,一般会部署到两台以上(可以是实体机也可以是虚拟机)。然后nginx负载均衡到每台机器上。

    解决方法:redis、其它数据库。

    这种情况下更适合redis

    Redis数据全部存在内存,定期写入磁盘,当内存不够时,通过LRU算法删除数据。

    Egg中redis的使用

    // 配置redis
    config.redis = {
      client: {
        host: 'www.yzapp.cn',
        port: 6379,
        password: process.env.NESTOR_REDIS,
        db: 0
      }
    };
    
    config.websocket = {
      // 配置 websocket 使用 redis 作消息广播
      redis: config.redis,
    };
    复制代码
    
    import { Service } from 'egg';
    const prefix = '00353:';
    
    /**
     * 调用redis的服务
     */
    export default class RedisService extends Service {
        /**
         * 根据key获得值
         * @param key key
         */
        public async get(key: string) {
            const { redis, logger } = this.app;
            const t = Date.now();
            let data = await redis.getBuffer(prefix + key);
            if (!data) return;
            const duration = Date.now() - t;
            logger.debug('Cache', 'get', key, duration + 'ms');
            return data;
        }
    
        /**
         * 根据key存值
         * @param key key
         * @param value value
         */
        public async set(key: string, value: any) {
            const { redis, logger } = this.app;
            const t = Date.now();
            await redis.set(prefix + key, value);
            const duration = Date.now() - t;
            logger.debug('Cache', 'set', key, duration + 'ms');
        }
    
        /**
         * 根据key存值并设置过期时间
         * @param key key
         * @param value value
         * @param seconds 过期时间
         */
        public async setex(key: string, value: any, seconds: number) {
            const { redis, logger } = this.app;
            const t = Date.now();
            await redis.set(prefix + key, value, 'EX', seconds);
            const duration = Date.now() - t;
            logger.debug('Cache', 'set', key, value, duration + 'ms');
        }
    
        /**
         * 根据key删除缓存
         * @param key key
         */
        public async del(key: string) {
            const { redis, logger } = this.app;
            const t = Date.now();
            await redis.del(prefix + key);
            const duration = Date.now() - t;
            logger.debug('Cache', 'del', key, duration + 'ms');
        }
    
        /**
         * 递增值并设定过期时间
         * @param key
         * @param seconds
         */
        public async incr(key: string, seconds: number) {
            const { redis, logger } = this.app;
            const t = Date.now();
            const result = await redis
                .multi()
                .incr(prefix + key)
                .expire(key, seconds)
                .exec();
            const duration = Date.now() - t;
            logger.debug('Cache', 'set', key, duration + 'ms');
            return result[0][1];
        }
    }
    复制代码
    
    /**
     * 同步答题内容
     * @param websocket
     * @param id
     */
    public async syncValue(websocket: EggWsClient, id: string): Promise<string | null> {
        const uuid = uuidV4();
        // 根据interview id创建房间
        websocket.room.join(id, ({message}) => {
            const data = JSON.parse(message.toString());
            if (!data || !data.value || data.from === uuid) {
                return;
            }
            // 将消息发送到除自己外的其它房间成员
            websocket.send(encode.encode(data.value));
        });
    
        // 从数据库里读取
        const info = await this.findInterviewById(Number(id));
        if (info === null) {
            return `没有找到${id}的数据`;
        }
        // 从redis读取最新的笔试信息
        const value = await this.service.redis.get("room" + id);
        websocket
            .on('message', async (msg) => {
                // console.log('receive', msg);
                if (!ArrayBuffer.isView(msg)) {
                    return;
                }
                const type = msg[0];
                switch (type) {
                    case encode.encode(WSTypes.heartbeat)[0]:
                        websocket.send(this.getBuffer(WSTypes.heartbeat, encode.encode(id)));
                        return;
                    case encode.encode(WSTypes.setValue)[0]:
                        // 存入redis最新的笔试信息
                        await this.service.redis.set("room" + id, msg.slice(1));
                        if (msg.slice(1)) {
                            this.app.ws.sendJsonTo(id, {
                                from: uuid,
                                value: decode.decode(this.getBuffer(WSTypes.getValue, msg.slice(1)))
                            });
                        }
                        return;
                    case encode.encode(WSTypes.getValue)[0]:
                        if (!value) {
                            return;
                        }
                        websocket.send(this.getBuffer(WSTypes.getValue, value));
                        return;
                }
            })
            .on('close', (code, reason) => {
                console.log('websocket closed', code, reason);
            });
        return null;
    }
    复制代码
    

    具体代码见:github.com/nesror/biki…

    相关文章

      网友评论

          本文标题:Node.js中多线程,多主机下数据的处理

          本文链接:https://www.haomeiwen.com/subject/vkebzhtx.html