socket.io、cluster、express 和 sync 事件。

时间:2022-08-22 15:26:10

I have a big problem sice 1 week. I try to convert my node.JS project actually run on single core to multi core with cluster.

一周以来我遇到了一个大问题:我尝试把目前运行在单核上的 node.JS 项目用 cluster 转换为多核运行。

With websockets, at this moment, i have no problems for events but, for xhr-polling or jsonp-polling, i have big problems with socket.io on cluster mode.

使用 websockets 时,目前事件没有问题;但对于 xhr-polling 或 jsonp-polling,socket.io 在集群模式下问题很大。

this is my server configuration :

这是我的服务器配置:

00-generic.js

00-generic.js

'use strict';

var http            = require('http'),
    os              = require('os'),
    cluster         = require('cluster');

module.exports = function(done) {
    var app = this.express,
        port = process.env.PORT || 3000,
        address = '0.0.0.0';

    if(this.env == 'test'){
        port = 3030;
    }

    var self = this;
    var size = os.cpus().length;

    if (cluster.isMaster) {
        console.info('Creating HTTP server cluster with %d workers', size);

        for (var i = 0; i < size; ++i) {
            console.log('spawning worker process %d', (i + 1));
            cluster.fork();
        }

        cluster.on('fork', function(worker) {
            console.log('worker %s spawned', worker.id);
        });
        cluster.on('online', function(worker) {
            console.log('worker %s online', worker.id);
        });
        cluster.on('listening', function(worker, addr) {
            console.log('worker %s listening on %s:%d', worker.id, addr.address, addr.port);
        });
        cluster.on('disconnect', function(worker) {
            console.log('worker %s disconnected', worker.id);
        });
        cluster.on('exit', function(worker, code, signal) {
            console.log('worker %s died (%s)', worker.id, signal || code);
            if (!worker.suicide) {
                console.log('restarting worker');
                cluster.fork();
            }
        });
    } else {
        http.createServer(app).listen(port, address, function() {
            var addr = this.address();
            console.log('listening on %s:%d', addr.address, addr.port);
            self.server = this;
            done();
        });
    }
};

03-socket.io.js

03-socket.io.js

"use strict";
var _               = require('underscore'),
    socketio        = require('socket.io'),
    locomotive      = require('locomotive'),
    RedisStore      = require("socket.io/lib/stores/redis"),
    redis           = require("socket.io/node_modules/redis"),
    v1              = require(__dirname + '/../app/socket.io/v1'),
    sockets         = require(__dirname + '/../../app/socket/socket'),
    config          = require(__dirname + '/../app/global'),
    cluster         = require('cluster');

module.exports = function () {
    // socket.io bootstrap (socket.io 0.9.x API). Runs only in worker
    // processes; the master never owns an HTTP server.
    if (!cluster.isMaster) {
        // Attach socket.io to this worker's HTTP server (set up in 00-generic.js).
        this.io = socketio.listen(this.server);

        // Three dedicated Redis connections, as the 0.9 RedisStore requires
        // (publish, subscribe, and a general-purpose client).
        var pub             = redis.createClient(),
            sub             = redis.createClient(),
            client          = redis.createClient();

        this.io.enable('browser client minification');  // send minified client
        this.io.enable('browser client etag');          // apply etag caching logic based on version number
        this.io.enable('browser client gzip');          // gzip the file

        // Share handshake/connection state across workers through Redis, so a
        // polling request that lands on another worker can be recognized.
        this.io.set("store", new RedisStore({
            redisPub        : pub,
            redisSub        : sub,
            redisClient     : client
        }));
        this.io.set('log level', 2);
        this.io.set('transports', [
            'websocket',
            'jsonp-polling'
        ]);
        // 24-hour timeouts — presumably to avoid dropping long-lived clients;
        // NOTE(review): confirm this is intended rather than a debugging value.
        this.io.set('close timeout', 24*60*60);
        this.io.set('heartbeat timeout', 24*60*60);

        this.io.sockets.on('connection', function (socket) {
            // NOTE(review): in socket.io 0.9 the per-socket transport usually
            // lives at io.sockets.manager.transports[socket.id]; verify that
            // io.transports[socket.id] is actually defined here.
            console.log('connected with ' + this.io.transports[socket.id].name);

            // v1 part @deprecated
            v1.events(socket);

            // v1.1 part, rewritten
            _.each(sockets['1.1'], function(Mod) {
                var mod = new Mod();
                // Each module gets the connecting socket and the io instance.
                mod.launch({
                    socket  : socket,
                    io      : this.io
                });
            }, this);

        }.bind(this));
    }
};

With polling, the client connects from time to time on a different process than that initiated listeners. Similarly, the communication server to the client with emit.

使用轮询时,客户端有时会连接到与注册监听器的进程不同的进程上;服务器用 emit 向客户端发送消息时也是同样的情况。

With a little searching, I found it necessary to pass by a store for socket.io to share the data connection. So I built RedisStore socket.io as shown in the documentation but even with that, I find myself with events not arriving safely and I still get this error message:

我搜索了一下,发现需要为 socket.io 配置一个 store 来共享连接数据。于是我按照文档为 socket.io 搭建了 RedisStore,但即便如此,事件仍不能可靠到达,而且我仍然收到这个错误信息:

warn: client not handshaken client should reconnect

EDIT

编辑

Now, the warn error is not called. I change the redisStore to socket.io-clusterhub BUT now, events are not always called. Sometimes as if the polling request was captured by another worker than that which began the listeners and so it nothing happens. Here is the new configuration:

现在,警告错误不再出现。我把 RedisStore 换成了 socket.io-clusterhub,但现在事件并不总是被触发。有时轮询请求似乎被另一个 worker 捕获,而不是注册监听器的那个 worker,于是什么也没有发生。这是新的配置:

'use strict';

var http            = require('http'),
    locomotive      = require('locomotive'),
    os              = require('os'),
    cluster         = require('cluster'),
    config          = require(__dirname + '/../app/global'),
    _               = require('underscore'),
    socketio        = require('socket.io'),
    v1              = require(__dirname + '/../app/socket.io/v1'),
    sockets         = require(__dirname + '/../../app/socket/socket');

module.exports = function(done) {
    // Combined HTTP-cluster + socket.io bootstrap using socket.io-clusterhub
    // as the store. The master forks one worker per CPU core; each worker
    // runs its own HTTP server with socket.io attached.
    var app = this.express,
        port = process.env.PORT || 3000,
        address = '0.0.0.0';

    if(this.env == 'test'){
        port = 3030;
    }

    var self = this;
    var size = os.cpus().length;

    // Instantiated in every process (master and workers); clusterhub relays
    // events between processes — NOTE(review): verify master participation
    // is required by socket.io-clusterhub's protocol.
    this.clusterStore = new (require('socket.io-clusterhub'));

    if (cluster.isMaster) {
        // Fork one worker per CPU core.
        for (var i = 0; i < size; ++i) {
            console.log('spawning worker process %d', (i + 1));
            cluster.fork();
        }

        // Lifecycle logging for the worker pool.
        cluster.on('fork', function(worker) {
            console.log('worker %s spawned', worker.id);
        });
        cluster.on('online', function(worker) {
            console.log('worker %s online', worker.id);
        });
        cluster.on('listening', function(worker, addr) {
            console.log('worker %s listening on %s:%d', worker.id, addr.address, addr.port);
        });
        cluster.on('disconnect', function(worker) {
            console.log('worker %s disconnected', worker.id);
        });
        cluster.on('exit', function(worker, code, signal) {
            console.log('worker %s died (%s)', worker.id, signal || code);
            // Respawn unless the worker shut down deliberately (worker.suicide
            // is the pre-Node-6 name of exitedAfterDisconnect).
            if (!worker.suicide) {
                console.log('restarting worker');
                cluster.fork();
            }
        });
    } else {
        var server = http.createServer(app);

        // Attach socket.io (0.9 API) to this worker's server.
        this.io = socketio.listen(server);

        this.io.configure(function() {
            this.io.enable('browser client minification');  // send minified client
            this.io.enable('browser client etag');          // apply etag caching logic based on version number
            this.io.enable('browser client gzip');          // gzip the file

            // Share connection state across workers through clusterhub.
            this.io.set('store', this.clusterStore);
            this.io.set('log level', 2);
            this.io.set('transports', [
                'websocket',
                'jsonp-polling'
            ]);
            //this.io.set('close timeout', 24*60*60);
            //this.io.set('heartbeat timeout', 24*60*60);
        }.bind(this));

        this.io.sockets.on('connection', function (socket) {
            // NOTE(review): in socket.io 0.9 the per-socket transport usually
            // lives at io.sockets.manager.transports[socket.id]; verify that
            // io.transports[socket.id] is actually defined here.
            console.log('connected with ' + this.io.transports[socket.id].name);
            console.log('connected to worker: ' + cluster.worker.id);

            // v1 part @deprecated
            v1.events(socket);

            // v1.1 part, rewritten
            _.each(sockets['1.1'], function(Mod) {
                var mod = new Mod();
                // Each module gets the connecting socket and the io instance.
                mod.launch({
                    socket  : socket,
                    io      : this.io
                });
            }, this);

        }.bind(this));

        server.listen(port, address, function() {
            var addr = this.address();
            console.log('listening on %s:%d', addr.address, addr.port);
            // Expose the bound server on the context, then signal readiness.
            self.server = this;
            done();
        });
    }
};

3 个解决方案

#1


2  

From that source : http://socket.io/docs/using-multiple-nodes/

来源:http://socket.io/docs/using-multiple-nodes/

If you plan to distribute the load of connections among different processes or machines, you have to make sure that requests associated with a particular session id connect to the process that originated them.

如果您计划在不同的进程或机器之间分配连接负载,则必须确保与特定会话id相关联的请求连接到产生它们的进程。

This is due to certain transports like XHR Polling or JSONP Polling relying on firing several requests during the lifetime of the “socket”.

这是由于XHR轮询或JSONP轮询等特定传输方式,依赖于在“套接字”的生命周期内触发多个请求。

To route connections to the same worker every time :

每次都将连接路由到同一个 worker:

sticky-session

粘性会话

This is, in the socket.io documentation, the recommended way to route requests to the same worker every time.

这是,在插座里。io文档,推荐的方式每次将请求路由到同一工作者。

https://github.com/indutny/sticky-session

https://github.com/indutny/sticky-session

A simple performant way to use socket.io with a cluster.

一种在集群中使用 socket.io 的简单高效的方法。

Socket.io is doing multiple requests to perform handshake and establish connection with a client. With a cluster those requests may arrive to different workers, which will break handshake protocol.

套接字。io正在执行多个请求,以执行握手并与客户机建立连接。如果有一个集群,这些请求可能会到达不同的工作人员,这将打破握手协议。

// Bootstraps a sticky-session cluster: the callback runs only in worker
// processes and must return the HTTP server for sticky-session to bind,
// so every request from a given client reaches the same worker.
var sticky = require('sticky-session'); // fixed: package name was misspelled 'sticky-sesion'

sticky(function() {
  // This code will be executed only in slave workers

  var http = require('http'),
      io = require('socket.io');

  var server = http.createServer(function(req, res) {
    // ....
  });
  // Attach socket.io to the worker's server (0.9-style listen).
  io.listen(server);

  return server;
}).listen(3000, function() {
  console.log('server started on 3000 port');
});

To pass messages between nodes :

在节点之间传递消息:

socket.io-redis

socket.io-redis

This is, in socket.io documentation, the recommended way to share messages between workers.

这是插座。io文档,建议在员工之间共享消息的方法。

https://github.com/automattic/socket.io-redis

https://github.com/automattic/socket.io-redis

By running socket.io with the socket.io-redis adapter you can run multiple socket.io instances in different processes or servers that can all broadcast and emit events to and from each other.

通过运行插座。io的套接字。io-redis适配器可以运行多个套接字。不同进程或服务器上的io实例,它们可以相互广播和发送事件。

socket.io-redis is used this way :

套接字。io-redis是这样使用的:

// Attach the socket.io-redis adapter so multiple socket.io instances
// (in different processes or servers) can broadcast and emit events to
// each other through a shared Redis server.
var io = require('socket.io')(3000);
var redis = require('socket.io-redis');
io.adapter(redis({ host: 'localhost', port: 6379 }));

Also

I think you are not using socket.io v1.0.0. You might want to update your version in order to get more stability.

我想你没有使用插座。io v1.0.0。您可能想要更新您的版本以获得更多的稳定性。

You can check their migration guide at http://socket.io/docs/migrating-from-0-9/

您可以在 http://socket.io/docs/migrating-from-0-9/ 查看他们的迁移指南。

#2


2  

There is a step missing from the socket.io docs when using

从插座中漏出了一步。io文档在使用

var io = require('socket.io')(3000);
var redis = require('socket.io-redis');
io.adapter(redis({ host: 'localhost', port: 6379 }));

You need to tell the client that you want to use 'websockets' as the only form of transport or it will not work... so for the constructor on the client use

你需要告诉客户你想要使用“websockets”作为唯一的交通工具,否则它将无法工作。对于客户端使用的构造函数。

io.connect(yourURL , { transports : ['websocket']});

see my answer to a similar question here ( my answer might be more appropriate on this thread ): https://*.com/a/30791006/4127352

请参阅我在这里的类似问题的答案(我的答案可能在这个线程上更合适):https://*.com/a/30791006/4127352。

#3


1  

The below code work for me, this is socket.io who created clusters, i set config.clusterSticky on true for activate compatibility clusters and socket.io

下面的代码为我工作,这是套接字。io创建了集群,我设置了config。对于激活兼容性集群和socket.io, clusterSticky是正确的。

'use strict';

// Commented-out debug helper kept from the original answer: wraps
// console.log so every call also prints a stack trace.
/*
 var cl = console.log;
 console.log = function(){
 console.trace();
 cl.apply(console,arguments);
 };
 */

var cluster = require('cluster'),
    config = require('./config/all'),
    deferred = require('q').defer(),
    express = require('express'),
    app = express(),
    http = require('http'),
    sticky = require('socketio-sticky-session'),
    io = require('socket.io');

// Code to run if we're in the master process or if we are not in debug mode/ running tests
// NOTE(review): custom flags such as '--singleProcess' normally appear in
// process.argv, not process.execArgv (which only holds node options) — verify.

if ((cluster.isMaster) &&
    (process.execArgv.indexOf('--debug') < 0) &&
    (process.env.NODE_ENV !== 'test') && (process.env.NODE_ENV !== 'development') &&
    (process.execArgv.indexOf('--singleProcess') < 0) &&
    (!config.clusterSticky)) {

    console.log('for real!');
    // Count the machine's CPUs
    var cpuCount = process.env.CPU_COUNT || require('os').cpus().length;

    // Create a worker for each CPU
    for (var i = 0; i < cpuCount; i += 1) {
        console.log('forking ', i);
        cluster.fork();
    }

    // Listen for dying workers
    cluster.on('exit', function (worker) {
        // Replace the dead worker, we're not sentimental
        console.log('Worker ' + worker.id + ' died :(');
        cluster.fork();
    });

// Code to run if we're in a worker process
} else {
    var port = config.http.port;
    // Worker id used for logging only; 0 when running as the master
    // (single-process / sticky mode).
    var workerId = 0;
    if (!cluster.isMaster) {
        workerId = cluster.worker.id;
    }

    var server = http.createServer(app);
    // Attach socket.io (0.9-style); the returned io instance is discarded here.
    io.listen(server);

    //TODO routes etc (core)

    server.on('listening', function () {
        console.log('Slave app started on port ' + port + ' (' + process.env.NODE_ENV + ') cluster.worker.id:', workerId);
    });

    // With clusterSticky enabled (outside test/development), wrap the server
    // with socketio-sticky-session so each client always reaches the same
    // worker; otherwise listen directly.
    if(config.clusterSticky && (process.env.NODE_ENV !== 'test') && (process.env.NODE_ENV !== 'development')) {
        sticky(server).listen(port);
    } else {
        server.listen(port);
    }

    // Expose the server via a promise once this process has set it up.
    deferred.resolve(server);
}

module.exports = deferred.promise;

#1


2  

From that source : http://socket.io/docs/using-multiple-nodes/

来源:http://socket.io/docs/using-multiple-nodes/

If you plan to distribute the load of connections among different processes or machines, you have to make sure that requests associated with a particular session id connect to the process that originated them.

如果您计划在不同的进程或机器之间分配连接负载,则必须确保与特定会话id相关联的请求连接到产生它们的进程。

This is due to certain transports like XHR Polling or JSONP Polling relying on firing several requests during the lifetime of the “socket”.

这是由于XHR轮询或JSONP轮询等特定传输方式,依赖于在“套接字”的生命周期内触发多个请求。

To route connections to the same worker every time :

每次都将连接路由到同一个 worker:

sticky-session

粘性会话

This is, in the socket.io documentation, the recommended way to route requests to the same worker every time.

这是,在插座里。io文档,推荐的方式每次将请求路由到同一工作者。

https://github.com/indutny/sticky-session

https://github.com/indutny/sticky-session

A simple performant way to use socket.io with a cluster.

一种在集群中使用 socket.io 的简单高效的方法。

Socket.io is doing multiple requests to perform handshake and establish connection with a client. With a cluster those requests may arrive to different workers, which will break handshake protocol.

套接字。io正在执行多个请求,以执行握手并与客户机建立连接。如果有一个集群,这些请求可能会到达不同的工作人员,这将打破握手协议。

// Bootstraps a sticky-session cluster: the callback runs only in worker
// processes and must return the HTTP server for sticky-session to bind,
// so every request from a given client reaches the same worker.
var sticky = require('sticky-session'); // fixed: package name was misspelled 'sticky-sesion'

sticky(function() {
  // This code will be executed only in slave workers

  var http = require('http'),
      io = require('socket.io');

  var server = http.createServer(function(req, res) {
    // ....
  });
  // Attach socket.io to the worker's server (0.9-style listen).
  io.listen(server);

  return server;
}).listen(3000, function() {
  console.log('server started on 3000 port');
});

To pass messages between nodes :

在节点之间传递消息:

socket.io-redis

socket.io-redis

This is, in socket.io documentation, the recommended way to share messages between workers.

这是插座。io文档,建议在员工之间共享消息的方法。

https://github.com/automattic/socket.io-redis

https://github.com/automattic/socket.io-redis

By running socket.io with the socket.io-redis adapter you can run multiple socket.io instances in different processes or servers that can all broadcast and emit events to and from each other.

通过运行插座。io的套接字。io-redis适配器可以运行多个套接字。不同进程或服务器上的io实例,它们可以相互广播和发送事件。

socket.io-redis is used this way :

套接字。io-redis是这样使用的:

// Attach the socket.io-redis adapter so multiple socket.io instances
// (in different processes or servers) can broadcast and emit events to
// each other through a shared Redis server.
var io = require('socket.io')(3000);
var redis = require('socket.io-redis');
io.adapter(redis({ host: 'localhost', port: 6379 }));

Also

I think you are not using socket.io v1.0.0. You might want to update your version in order to get more stability.

我想你没有使用插座。io v1.0.0。您可能想要更新您的版本以获得更多的稳定性。

You can check their migration guide at http://socket.io/docs/migrating-from-0-9/

您可以在 http://socket.io/docs/migrating-from-0-9/ 查看他们的迁移指南。

#2


2  

There is a step missing from the socket.io docs when using

从插座中漏出了一步。io文档在使用

var io = require('socket.io')(3000);
var redis = require('socket.io-redis');
io.adapter(redis({ host: 'localhost', port: 6379 }));

You need to tell the client that you want to use 'websockets' as the only form of transport or it will not work... so for the constructor on the client use

你需要告诉客户你想要使用“websockets”作为唯一的交通工具,否则它将无法工作。对于客户端使用的构造函数。

io.connect(yourURL , { transports : ['websocket']});

see my answer to a similar question here ( my answer might be more appropriate on this thread ): https://*.com/a/30791006/4127352

请参阅我在这里的类似问题的答案(我的答案可能在这个线程上更合适):https://*.com/a/30791006/4127352。

#3


1  

The below code work for me, this is socket.io who created clusters, i set config.clusterSticky on true for activate compatibility clusters and socket.io

下面的代码为我工作,这是套接字。io创建了集群,我设置了config。对于激活兼容性集群和socket.io, clusterSticky是正确的。

'use strict';

// Commented-out debug helper kept from the original answer: wraps
// console.log so every call also prints a stack trace.
/*
 var cl = console.log;
 console.log = function(){
 console.trace();
 cl.apply(console,arguments);
 };
 */

var cluster = require('cluster'),
    config = require('./config/all'),
    deferred = require('q').defer(),
    express = require('express'),
    app = express(),
    http = require('http'),
    sticky = require('socketio-sticky-session'),
    io = require('socket.io');

// Code to run if we're in the master process or if we are not in debug mode/ running tests
// NOTE(review): custom flags such as '--singleProcess' normally appear in
// process.argv, not process.execArgv (which only holds node options) — verify.

if ((cluster.isMaster) &&
    (process.execArgv.indexOf('--debug') < 0) &&
    (process.env.NODE_ENV !== 'test') && (process.env.NODE_ENV !== 'development') &&
    (process.execArgv.indexOf('--singleProcess') < 0) &&
    (!config.clusterSticky)) {

    console.log('for real!');
    // Count the machine's CPUs
    var cpuCount = process.env.CPU_COUNT || require('os').cpus().length;

    // Create a worker for each CPU
    for (var i = 0; i < cpuCount; i += 1) {
        console.log('forking ', i);
        cluster.fork();
    }

    // Listen for dying workers
    cluster.on('exit', function (worker) {
        // Replace the dead worker, we're not sentimental
        console.log('Worker ' + worker.id + ' died :(');
        cluster.fork();
    });

// Code to run if we're in a worker process
} else {
    var port = config.http.port;
    // Worker id used for logging only; 0 when running as the master
    // (single-process / sticky mode).
    var workerId = 0;
    if (!cluster.isMaster) {
        workerId = cluster.worker.id;
    }

    var server = http.createServer(app);
    // Attach socket.io (0.9-style); the returned io instance is discarded here.
    io.listen(server);

    //TODO routes etc (core)

    server.on('listening', function () {
        console.log('Slave app started on port ' + port + ' (' + process.env.NODE_ENV + ') cluster.worker.id:', workerId);
    });

    // With clusterSticky enabled (outside test/development), wrap the server
    // with socketio-sticky-session so each client always reaches the same
    // worker; otherwise listen directly.
    if(config.clusterSticky && (process.env.NODE_ENV !== 'test') && (process.env.NODE_ENV !== 'development')) {
        sticky(server).listen(port);
    } else {
        server.listen(port);
    }

    // Expose the server via a promise once this process has set it up.
    deferred.resolve(server);
}

module.exports = deferred.promise;