美文网首页
Chapter 9 Advanced Asynchronous

Chapter 9 Advanced Asynchronous

作者: 宫若石 | 来源:发表于2018-01-05 17:44 被阅读0次

    现在我们了解到的设计模式能够应对很多情况,不过针对一些特定的问题,我们还有一些方案可以使⽤。就像做菜的菜谱⼀样,它提供了一个⼤概的步骤,我们还可以⾃己创造一些,在这个章节,我们主要提供以下情形的解决方案:

    • 需要进行异步初始化的模块
    • 以较小的开销,在忙碌应⽤用中实现⾼表现⼒的批量量异步缓存
    • 运⾏可能阻塞事件循环或者可能削弱Node.js处理并发请求能⼒的,同步计算密集(CPUbound)操作

    处理需要异步初始化模块的方案

    我们在第⼆章提到过,require()和module.exports是同步工作的,这是同步API存在于核⼼模块和很多npm的库中的⼀个主要原因。这样的方案提供了了更方便的转换机制,这也是最初使用同步而不是异步方案开初始化模块的原型。
    不幸的是,有时候同步API并不不好用,特别是在需要连接⽹网络进⾏初始化的时候,需要花费精力去配置参数、使⽤握⼿进⾏连接等等。这种场景存在于许多数据库驱动和类似消息队列这样的中间件里。

    标准解决方案

    举例,我们需要通过请求,连接⼀个叫db的远程数据库,我们通常有两种选择:

    • 在使⽤前先确保它已经被初始化,否则就等它初始化完毕后再处理接下来的异步操作
    const db = require('aDb');  //  The async module
    module.exports = function findAll(tyoe, callback) {
      if (db.connected) {
        runFild();
      } else {
        db.once('connected', runFild);
      }
      function runFild() {
        db.findAll(type, callback);
      }
    };
    
    • 使⽤依赖注⼊(Dependency Injection)⽽不是直接加载异步模块。延迟其他模块的初始化直到它的依赖都加载完成,这样就转移了了重点到别的地⽅
    //  in module app.js
    const db = require('aDb');
    const findAllFactory = require('./findAll');
    db.on('connected', function(){
      const findAll = findAllFactory(db);
    });
    //  in module findAll.js
    module.exports = db => {
      return function findAll(type, callback) {
        db.findAll(type, callback);
      }
    };
    

    显然第一种⽅方案就很不受欢迎,因为包含很多引⽤模板(boilerplate)
    第⼆种⽅案有时候也不太好,在大型项目中使用依赖注入会很复杂,特别是需要手动去初始化一些模块的时候。不过使⽤用一些⽀持异步初始化的依赖注⼊容器会有些帮助。
    下⾯的第三种⽅案,能够轻易的帮助我们从初始化依赖里分离出需要的模块

    预初始化队列方案

    方案的主要思想:把含未完成的初始化的模块操作保存下来,一旦初始化步骤完成,就执行行这些操作。下⾯是⼀个例⼦:

    • asyncModule.js(⼀个需要异步初始化的模块)
    const asyncModule = module.exports;
    
    asyncModule.initialized = false;
    asyncModule.initialize = callback => {  //  一个延迟10s的初始化操作
      setTimeout(() => {
        asyncModule.initialized = true;
        callback();
      }, 10000);
    };
    
    asyncModule.tellMeSomething = callback => {
      process.nextTick(() => {
        if(!asyncModule.initialized) {
          return callback(
            new Error('I don\'t have anything to say right now') //如果还没初始化
          );
        }
        callback(null, 'Current time is: ' + new Date());
      });
    };
    
    • routes.js(一个HTTP请求的处理模块)
    const asyncModule = require('./asyncModule');
    
    module.exports.say = (req, res) => {
      asyncModule.tellMeSomething((err, something) => {
        if(err) {
          res.writeHead(500);
          return res.end('Error:' + err.message);
        }
        res.writeHead(200);
        res.end('I say: ' + something);
      });
    };
    

    这个HTTP处理模块(handler)主要是触发刚刚的tellMeSomething()方法并将结果写入HTTP响应头。可以看到这⾥,我们require()这个异步的asyncModule时并没有检测它是不是已经初始化了,这就可能导致问题。
    让我们⽤基本的http核⼼模块,创建⼀个HTTP server

    • app.js
    const http = require('http');
    const routes = require('./routes');
    const asyncModule = require('./asyncModule.js');
    
    asyncModule.initialize(() => {
      console.log('Async module initialized');
    });
    
    http.createServer((req, res) => {
      if (req.method === 'GET' && req.url === '/say') {
        return routes.say(req, res);
      }
      res.writeHead(404);
      res.end('Not found');
    }).listen(8000, () => console.log('Started'));
    

    这个app.js是程序的入⼝,它触发了AsyncModule的初始化、利用routes.say()创建了一个
    HTTPServer。运⾏一下,正如想象,如果我们在运行server时,快一点打开http://localhost:8000/say (也就是调⽤了say()的时候,AsyncModule还没有异步初始化完成),我们会在浏览器上看到:

    image.png
    本例的显示取决于异步初始化的细节,如果慢⼀点等初始化结束,我们就能看到正确的结果了。我们应该尽量避免这种问题,因为这可能导致丢失信息、程序崩溃等更严重的问题。虽然初始化一般很快,我们⼀般不会去注意失败的请求,但是对于加载频繁和自动设置(autoscale)的云服务情况来说,这可能导致阻塞等更多问题。
    ⽤用预加载队列列去包装模块

    我们⽤队列去存放未初始化的操作,这有点像状态模式(State Pattern)。存在两种状态:⼀一种是把未初始化的操作排队,另一种是当初始化完成时,再把最后的方法委托给原模块(也就是AsyncModule)

    • AsyncModuleWarpper.js
    const asyncModule = require('./asyncModule');
    
    //The wrapper 给 activeState 设置分发操作
    const asyncModuleWrapper = module.exports;
    asyncModuleWrapper.initialized = false;
    asyncModuleWrapper.initialize = function() {
      activeState.initialize.apply(activeState, arguments);
    };
    
    asyncModuleWrapper.tellMeSomething = function() {
      activeState.tellMeSomething.apply(activeState, arguments);
    };
    
    //Module没有初始化成功的时候的state
    let pending = [];
    let notInitializedState = {
      initialize: function(callback) {
        asyncModule.initialize(function() {\
          asyncModuleWrapper.initalized = true;
          activeState = initializedState; //  [1]更更新activeState的变量量
    
          pending.forEach(function(req) { //  [2]运⾏行行存储队列列的操作
            asyncModule[req.method].apply(null, req.args);
          });
          pending = [];
    
          callback(); //  [3]触发原来的回调
        });
      },
    
      tellMeSomething: function(callback) {
        return pending.push({
          method: 'tellMeSomething',
          args: arguments
        });
      }
    };
    
    //  The state to use when the module is initialized
    let initializedState = asyncModule;
    
    //  Set the initial state to the notInitializedState
    let activeState = notInitializedState; // 设置更新activeState
    

    当开始初始化的时候,我们提供了一个回调代理,它让我们的warpper知道什么时候已经初始化好了,应该进⾏接下来的操作了。
    可⻅代码,如果没有初始化完成,tellMeSomething会放在pending队列里等完成初始化再调⽤。
    而且,通过这个模式我们可以清晰的知道AsyncModule这个模块有没有被初始化完成,因为完成之后会切换状态为initializedState(倒数第二⾏代码)
    运行一下:

    image.png
    通过队列我们可以把为可能含初始化模块的操作给挂起,这是⼀个更稳健的行为。
    核⼼在于:如果要初始化异步模块,我们让需要的操作排队等待直到初始化完成。
    在自然情况下
    刚刚这种模式在很多数据库驱动和ORM库中经常使用,最著名的就是MongoDB的ORM库
    Moogoose了,这样就不需要等待数据库的连接情况,因为这都⾃动被放入队列里,并且在初始化建立完成后执⾏,这种方案增强了了API的可用性。

    处理批量异步缓存的方案

    在高负载的应用中,缓存在web里里起到很重要的作用。无论是静态的文件资源还是数据库的查询结果。在这个章节我们了解高吞吐量下的异步操作缓存。

    暂时没有缓存和批量量处理理的Server

    这是一个管理理销售情况的情景,JSON数据格式:transactionId{ amount, item}

    • totalSales.js
    const level = require('level');
    const sublevel = require('level-sublevel');
    
    const db = sublevel(level('example-db', {valueEncoding: 'json'}));
    const salesDb = db.sublevel('sales');
    
    module.exports = function totalSales(item, callback) {
        console.log('totalSales() invoked');
        let sum = 0;
        salesDb.createValueStream() // [1] 从数据库流式输⼊入数据
        .on('data', data => {
          if(!item || data.item === item) { // [2] 如果同样商品,求和
            sum += data.amount;
          }
        })
        .on('end', () => {
          callback(null, sum); // [3]返回刚刚的求和结果
        });
    };
    

    这种方案显然表现不大好,我们没有用index去区分transaction,求和计算也没有⽤更⽅便的map或者reduce。接下来暴露接⼝给HTTP server:

    • app.js
    const http = require('http');
    const url = require('url');
    const totalSales = require('./totalSales');
    //const totalSales = require('./totalSalesBatch');
    //const totalSales = require('./totalSalesCache');
    
    http.createServer((req, res) => {
      const query = url.parse(req.url, true).query;
      totalSales(query.item, (err, sum) => {
        res.writeHead(200);
        res.end(`Total sales for item ${query.item} is ${sum}`);
      });
    }).listen(8000, () => console.log('Started'));
    

    接下来访问http://localhost:8000/?item=book,顺⼿造一点假数据更方便查看

    批量处理异步请求

    最基本的方案是使用相同的API处理⼀系列的调用,通过回调的方式:


    image.png

    图示两个客户端(可能是两个不同的对象或者两个不同的web请求)触发了相同输入的的异步操作,在这第一个图中,它们单独发起⾃己的异步操作执⾏完毕后通过回调返回。第⼆张图针对这种相同输⼊的异步操作进⾏了批量处理,当操作完成时,通知两个客户端,这种方式优化了应⽤的加载,避免了了使⽤复杂的缓存机制,也避免了这种机制带来的内存管理和失效问题。


    image.png

    在这个web server使用批量处理请求

    主要采⽤队列的方案,把回调加⼊队列,等异步操作完成之后⼀块儿触发

    • totalSalesBatch.js
    const totalSales = require('./totalSales');
    const queues = {};
    
    module.exports = function totalSalesBatch(item, callback) {
      if(queues[item]) { // [1] 队列列存在,请求正在执⾏行行,回调加⼊入队列列
        console.log('Batching operation');
        return queues[item].push(callback);
      }
      queues[item] = [callback]; // [2]我们需建⽴立新请求因为队列列有内容了了
      totalSales(item, (err, res) => {
        const queue = queues[item]; // [3]获得操作结果后⼀一个个触发回调
        queues[item] = null;
        queue.forEach(cb => cb(err, res));
      });
    };
    
    缓存异步请求

    刚刚的批量处理方案在着快速API和少量批处理请求的情况下会有点问题。在这种情况下,采⽤缓存模式是比较好的方案。
    缓存模式的核心思想是:请求完成后我们就把结果保存在缓存里(可以是一个变量、⼀个数据库入口等),下一次就不请求了,直接从缓存里取出
    缓存模式很常见,但真正厉害的是把缓存模式和批处理模式结合起来使用:


    image.png

    由图可知,没有缓存时,和批处理模式是差不多的,不过当请求完成后,结果会被放入缓存里,方便下次直接从缓存里提取。

    在这个web server使用缓存请求

    • totalSalesCache.js (在批处理的基础上加上缓存层)
    const totalSales = require('./totalSales');
    const queues = {};
    const cache = {};
    
    module.exports = function totalSalesBatch(item, callback) {
        const cached = cache[item];
        if (cached) { //如果缓存,⽤用回调返回,注意⽤用的nextTick哦
          console.log('Cache hit');
          return process.nextTick(callback.bind(null, null, cached));
        }
        if (queues[item]) {
          console.log('Batching operation');
          return queues[item].push(callback);
        }
        queues[item] = [callback];
        totalSales(item, (err, res) => {
        if (!err) {
          cache[item] = res;
            setTimeout(() => {
            delete cache[item];
          }, 30 * 1000); //30 seconds expiry 设置了了超时时间
        }
        const queue = queues[item];
        queues[item] = null;
        queue.forEach(cb => cb(err, res));
      });
    };
    

    缓存函数调⽤结果是实现记忆的⼀种方式,在npm中你可以找到很多方便用于异步存储记忆的⼯具,比如memoizee

    缓存机制的具体实现

    在实际应用中,我们需要更高级的存储和释放机制,有以下的原因:

    • 缓存大量的结果会消耗大量内存,我们可以采用LRU(Least Recently Used)算法来只缓存最近的缓存的结果
    • 当应用通过多进程执⾏时,不同server实例的缓存结果可能不一样,解决方案是共享缓存,流
      ⾏的方案有Redis和Memcached
    • 可以手动的清理缓存,或者进⾏时间限制,从而避免时间过期缓存。虽然这会有点不易管理。

    使用Promise进行批处理和缓存

    在这种情况下,使用Promise有以下几种优势:

    • 一个Promise可以用多个then()方法,在这里⽅便执行批处理请求
    • then()方法最多触发一次,在这里刚好只缓存一次结果
    • resolved的Promise也可以使用then(),在这里⽅便持久的获取缓存值
    • then()方法是异步调用的,着这里⽅便异步返回结果
      在这里,我们使用Promise包装totalSale()接口:
    • totalSalesPromises.js
    // [1]引⼊入promisification模块,使返回Promise⽽而不不是回调
    const pify = require('pify');
    const totalSales = pify(require('./totalSales'));
    const cache = {};
    
    module.exports = function totalSalesPromises(item) {
      if (cache[item]) { // [2]检测缓存情况
        return cache[item];
      }
      cache[item] = totalSales(item) // [3]新建Promise
        .then(res => {
          // [4]resolve这个Promise之后,设计缓存清理理
          setTimeout(() => {delete cache[item]}, 30 * 1000);
          //30 seconds expiry
          return res;
        })
        .catch(err => { // [5]Promise被reject,删除缓存抛出错误
          delete cache[item];
          throw err;
        });
        return cache[item]; // [6]返回缓存结果
    };
    

    可见使用Promise之后代码变得优雅简洁,并且同时使用了批处理和缓存

    处理计算密集型(CPU-bound)任务的方案

    在第⼆章我们知道,通过触发异步操作我们让堆栈回到事件循环,从而可以自由处理其它的请求。
    但是如果运⾏一个同步请求时间很⻓(比如计算密集型任务),它可能久久不返回给事件循环,因为它在重度利用CPU而不是在频繁使⽤I/O操作

    解决子集求和问题(subset sum problem)

    子集求和问题,即求一个集合的非空子集满足和为0
    最简单的解决方案是把⼦集进行排列组合,这有2^n的复杂度,这就很计算复杂了

    • subsetSum.js实现该算法
    const EventEmitter = require('events').EventEmitter;
    //继承EventEmitter,每次实现匹配就就触发⼀一下事件
    class SubsetSum extends EventEmitter {
      constructor(sum, set) {
        super();
        this.sum = sum;
        this.set = set;
        this.totalSubsets = 0;
      }
      //实现可能的组合,注意它是同步的
      _combine(set, subset) {
        for(let i = 0; i < set.length; i++) {
          let newSubset = subset.concat(set[i]);
          this._combine(set.slice(i + 1), newSubset);
          this._processSubset(newSubset);
        }
      }
      //⼀一旦实现组合之后,确定是否匹配,匹配就emit⼀一个match事件
      _processSubset(subset) {
        console.log('Subset', ++this.totalSubsets, subset);
        const res = subset.reduce((prev, item) => (prev + item), 0);
        if(res == this.sum) {
          this.emit('match', subset);
        }
      }
    
      start() {
        this._combine(this.set, []); //触发同步的排列列组合
        this.emit('end'); //所有排列列组合结果计算之后触发end
      }
    }
    module.exports = SubsetSum;
    
    • app.js 把刚才的SubsetSum放在HTTP server里执⾏
    const http = require('http');
    const SubsetSum = require('./subsetSum');
    //const SubsetSum = require('./subsetSumDefer');
    //const SubsetSum = require('./subsetSumFork');
    
    http.createServer((req, res) => {
      const url = require('url').parse(req.url, true);
      if(url.pathname === '/subsetSum') {
        const data = JSON.parse(url.query.data);
        res.writeHead(200);
        const subsetSum = new SubsetSum(url.query.sum, data);
        subsetSum.on('match', match => {
          res.write('Match: ' + JSON.stringify(match) + '\n');
        });
        subsetSum.on('end', () => res.end());
        subsetSum.start();
      } else {
        res.writeHead(200);
        res.end('I\m alive!\n');
      }
    }).listen(8000, () => console.log('Started'));
    

    通过事件机制,我们知道,当算法执行完毕时,也就是要等⼀会儿,会⾃自动返回结果:


    image.png

    可以看到,在算法返回结果时请求⼀直被挂起,也就是暂时都不会显示I'm alive,Node.js的单线程就这样被一个⻓的同步计算操作阻塞。

    使⽤用setImmediate实现交错

    通常来说,计算密集型(CPU-bound)的算法都是由一系列步骤组成,可能是递归调⽤、循环或者是和其他的组合。所以⼀个简单的解决⽅案就是让每一个(或者一定数量)步骤执行完成之后把执行权交给事件循环。
    核心思想是:通过setImmediate实现异步任务和密集计算的交错执⾏

    采⽤交错执⾏的⼦集求和问题

    • subsetSumDefer.js 在刚刚的基础上修改
    class SubsetSumDefer extends EventEmitter {
      constructor(sum, set) {
        super();
        this.sum = sum;
        this.set = set;
        this.totalSubsets = 0;
      }
      //实现交错执行的核心函数
      _combineInterleaved(set, subset) {
        this.runningCombine++; //需要新的参数来计数
        setImmediate(() => {
          this._combine(set, subset);
          if(--this.runningCombine === 0) {
            this.emit('end');
          }
        });
      }
    
      _combine(set, subset) {
        for(let i = 0; i < set.length; i++) {
          let newSubset = subset.concat(set[i]);
          this._combineInterleaved(set.slice(i + 1), newSubset); //替换为step
          this._processSubset(newSubset);
        }
      }
    
      _processSubset(subset) {
        console.log('Subset', ++this.totalSubsets, subset);
        const res = subset.reduce((prev, item) => prev + item, 0);
        if(res == this.sum) {
          this.emit('match', subset);
        }
      }
    
      start() {
        this.runningCombine = 0; //计数参数确认计算完毕
        this._combineInterleaved(this.set, []);
      }
    }
    

    交错执行模式的思考

    尽管刚刚的模式解决了阻塞的问题,但是它并不不是一个很好的模式,因为延迟同步操作会带来事件开销,这可能导致⼤大的影响。特别是当我们想尽快响应用户时,不不希望等待太久。我们可以设置step的数量,但这并不能从根本上解决问题。
    不过也不能因为它有时间开销就完全放弃这种模式,实际上,如果同步操作顺利,偶尔执行行异步的操作,这种利用setImmediate的模式还是可以被认可的
    注意process.nextTick() 不适用与这种交错的模式,正如第一章讲到,它在I/O之前执⾏操作,频繁的调用可能导致I/O饥饿。

    使⽤多进程

    另⼀个防止事件循环阻塞的方案是使用⼦进程,从而不把昂贵的计算密集型任务运行在主进程上,这样做有以下好处:

    • 同步的任务能够全心的去运⾏,不用进⾏交错执⾏
    • 使⽤多进程会比setImmediate更简单,而且不⽤考虑主程序的规模⼤小
    • 如果需要更好的性能,我们甚⾄至可以用更底层的语言去实现(比如C)
      Node.js有很多操作外部进程的工具库可以使用,比如可以使用child_process模块。甚至外部进程可以是Node.js程序,我们只需要做相关的连接。child_process.fork()方法创建的子进程会⾃动的创建进程间的交流通道,还可以⽤⼀一个类似EventEmitter的接⼝实现进程间的信息交互

    把子集求和任务交付其他进程处理

    核⼼思想是创建⼀个子进程去处理同步任务,让时间循环自由处理⽹络请求,下面提出可行的⽅案:

    1. 创建processPool.js 模块来建立进程池。因为开新进程要花时间,所以就让他们在进程池⾥里运行来等待请求,这样可以节约时间和CPU。另外限制一下进程的数量,可以防止Dos攻击
    const fork = require('child_process').fork; //创建⼦子进程
    class ProcessPool {
      constructor(file, poolMax) {
        this.file = file;
        this.poolMax = poolMax;
        this.pool = []; //pool是⼀系列将被使⽤用的运⾏的进程
        this.active = []; //active包含现在正在使用的进程
        this.waiting = []; //因为缺少进程资源 等待的回调队列
      }
    
      acquire(callback) {
      let worker;
      if(this.pool.length > 0) { // [1] 可⽤用的进程就把它变为active
        worker = this.pool.pop();
        this.active.push(worker);
        return process.nextTick(callback.bind(null, null, worker));
      }
      if(this.active.length >= this.poolMax) {
        // [2]缺少进程资源 等待
        return this.waiting.push(callback);
      }
      worker = fork(this.file); // [3]如果进程数还没到最⼤大就再开⼀一个进程
        this.active.push(worker);
        process.nextTick(callback.bind(null, null, worker));
      }
      
      release(worker) {
        if(this.waiting.length > 0) { // [1]如果请求队列列⾥里里有,则赋给worker
          const waitingCallback = this.waiting.shift();
          waitingCallback(null, worker);
        }
        this.active = this.active.filter(w => worker !== w);
        // [2]完成worker之后,放回池中
        this.pool.push(worker);
      }
    }
    
    module.exports = ProcessPool;
    

    进程⼀直在运行不停止,想要减少常驻内存的使用、增强代码的健壮性,可以:

    • 在⼀段时间后清理闲置的进程
    • 设计⼀种机制,将无响应和崩溃的进程重启
    1. 接下来的subsetSumFork.js把⼦集求和的任务分发到子进程。它的作用是和⼦进程交流然后传输结果
    const EventEmitter = require('events').EventEmitter;
    const ProcessPool = require('./processPool');
    //初始化进程池,并且设置最⼤可用的进程数为2
    const workers = new ProcessPool(__dirname + '/subsetSumWorker.js', 2);
    
    class SubsetSumFork extends EventEmitter {
      constructor(sum, set) { //接收sum和set的EventEmitter
        super();
        this.sum = sum;
        this.set = set;
      }
    
      start() { //触发算法运⾏行行
        workers.acquire((err, worker) => { // [1]从进程池获取⼀一个⼦子进程
        worker.send({sum: this.sum, set: this.set}); //进程通信的通道
    
        const onMessage = msg => {
          // [3]监听任务完成 删掉onmessage然后放回进程池
          if (msg.event === 'end') {
            worker.removeListener('message', onMessage);
            workers.release(worker);
          }
          this.emit(msg.event, msg.data); // [4]⽆无缝传递信息
        };
          worker.on('message', onMessage); // [2]监听信息
        });
      }
    }
    
    module.exports = SubsetSumFork;
    

    send()⽅法在子进程也是可用的,这也被cluster模块用来实现多线程分发HTTP server

    1. 最后我们的subsetSumWorker.js 需要⼀一个worker(子进程)来执⾏子集求和的算法,并且把它的算法结果传给⽗进程。
    const SubsetSum = require('./subsetSum');
    
    process.on('message', msg => {
      // [1]从⽗进程监测信息,⼀旦有信息我们就新建实例,然后说明匹配
      const subsetSum = new SubsetSum(msg.sum, msg.set);
    
      subsetSum.on('match', data => {
        // [2]⽤一个对象封装匹配结果传给父进程
        process.send({event: 'match', data: data});
      });
      
      subsetSum.on('end', data => {
        process.send({event: 'end', data: data});
      });
      
      subsetSum.start();
    });
    

    可以看到,我们重⽤了原来的subsetSum(同步版本),但是这次由于我们单独开了进程,所以不⽤担心事件循环被阻塞。
    综上,可以看到,应用程序的一部分可以交付外部进程去处理的。
    不过当子进程不是Node.js程序时,进程间通信的通道可能就不可⽤了。我们可以⾃己通过流式输入输出协议设计接口,可以参考child_process的实现

    多进程模式的思考

    我们可以并发的开两个⾃己求和任务,如果开3个会挂起⼀个直到其中一个任务完成,这是因为我们之前设置了进程的最大数。
    可⻅多进程模式比交错执⾏模式更高效有力,不过,因为单一设备对资源有硬性限制,它并不不⽅便扩展。所以我们可以通过多个设备去实现分发加载任务。这也是我们接下来会提到的分布式架构模式。

    相关文章

      网友评论

          本文标题:Chapter 9 Advanced Asynchronous

          本文链接:https://www.haomeiwen.com/subject/dipsnxtx.html