"use strict";
const countDownLatch = require('../../util/countDownLatch');
const utils = require('../../util/utils');
const ChannelRemote = require('../remote/frontend/channelRemote');
const logger = require('@sex-pomelo/sex-pomelo-logger').getLogger('pomelo', __filename);
/*!
* constant
*/
const ST_INITED = 0;
const ST_DESTROYED = 1;
/**
* Create and maintain channels for server local.
*
* ChannelService is created by channel component which is a default loaded
* component of pomelo and channel service would be accessed by `app.get('channelService')`.
*
* @class
* @constructor
*/
let ChannelService = function(app, opts) {
opts = opts || {};
this.app = app;
this.channels = {};
this.prefix = opts.prefix;
this.store = opts.store;
this.broadcastFilter = opts.broadcastFilter;
this.channelRemote = new ChannelRemote(app);
};
module.exports = ChannelService;
ChannelService.prototype.start = function(cb) {
restoreChannel(this, cb);
};
/**
* Create channel with name.
*
* @param {String} name channel's name
* @memberOf ChannelService
*/
ChannelService.prototype.createChannel = function(name) {
if(this.channels[name]) {
return this.channels[name];
}
let c = new Channel(name, this);
addToStore(this, genKey(this), genKey(this, name));
this.channels[name] = c;
return c;
};
/**
* Get channel by name.
*
* @param {String} name channel's name
* @param {Boolean} create if true, create channel
* @return {Channel}
* @memberOf ChannelService
*/
ChannelService.prototype.getChannel = function(name, create) {
let channel = this.channels[name];
if(!channel && !!create) {
channel = this.channels[name] = new Channel(name, this);
addToStore(this, genKey(this), genKey(this, name));
}
return channel;
};
/**
* Destroy channel by name.
*
* @param {String} name channel name
* @memberOf ChannelService
*/
ChannelService.prototype.destroyChannel = function(name) {
delete this.channels[name];
removeFromStore(this, genKey(this), genKey(this, name));
removeAllFromStore(this, genKey(this, name));
};
/**
* Push message by uids.
* Group the uids by group. ignore any uid if sid not specified.
*
* @param {String} route message route
* @param {Object} msg message that would be sent to client
* @param {Array} uids the receiver info list, [{uid: userId, sid: frontendServerId}]
* @param {Object} opts user-defined push options, optional
* @param {Function} cb cb(err)
* @memberOf ChannelService
*/
ChannelService.prototype.pushMessageByUids = function(route, msg, uids, opts, cb) {
if(typeof route !== 'string') {
cb = opts;
opts = uids;
uids = msg;
msg = route;
route = msg.route;
}
if(!cb && typeof opts === 'function') {
cb = opts;
opts = {};
}
if(!uids || uids.length === 0) {
utils.invokeCallback(cb, new Error('uids should not be empty'));
return;
}
let groups = {}, record;
for(let i=0, l=uids.length; i<l; i++) {
record = uids[i];
add(record.uid, record.sid, groups);
}
sendMessageByGroup(this, route, msg, groups, opts, cb);
};
/**
* Broadcast message to all the connected clients.
*
* @param {String} stype frontend server type string
* @param {String} route route string
* @param {Object} msg message
* @param {Object} opts user-defined broadcast options, optional
* opts.binded: push to binded sessions or all the sessions
* opts.filterParam: parameters for broadcast filter.
* @param {Function} cb callback
* @memberOf ChannelService
*/
ChannelService.prototype.broadcast = function(stype, route, msg, opts, cb) {
let app = this.app;
let namespace = 'sys';
let service = 'channelRemote';
let method = 'broadcast';
let servers = app.getServersByType(stype);
if(!servers || servers.length === 0) {
// server list is empty
utils.invokeCallback(cb);
return;
}
let count = servers.length;
let successFlag = false;
let latch = countDownLatch.createCountDownLatch(count, function() {
if(!successFlag) {
utils.invokeCallback(cb, new Error('broadcast fails'));
return;
}
utils.invokeCallback(cb, null);
});
let genCB = function(serverId) {
return function(err) {
if(err) {
logger.error('[broadcast] fail to push message to serverId: ' + serverId + ', err:' + err.stack);
latch.done();
return;
}
successFlag = true;
latch.done();
};
};
let self = this;
let sendMessage = function(serverId) {
return (function() {
if(serverId === app.serverId) {
self.channelRemote[method](route, msg, opts, genCB());
} else {
app.rpcInvoke(serverId, {namespace: namespace, service: service,
method: method, args: [route, msg, opts]}, genCB(serverId));
}
}());
};
opts = {type: 'broadcast', userOptions: opts || {}};
// for compatiblity
opts.isBroadcast = true;
if(opts.userOptions) {
opts.binded = opts.userOptions.binded;
opts.filterParam = opts.userOptions.filterParam;
}
for(let i=0, l=count; i<l; i++) {
sendMessage(servers[i].id);
}
};
/**
* Channel maintains the receiver collection for a subject. You can
* add users into a channel and then broadcast message to them by channel.
*
* @class channel
* @constructor
*/
let Channel = function(name, service) {
this.name = name;
this.groups = {}; // group map for uids. key: sid, value: [uid]
this.records = {}; // member records. key: uid
this.__channelService__ = service;
this.state = ST_INITED;
this.userAmount =0;
};
/**
* Add user to channel.
*
* @param {Number} uid user id
* @param {String} sid frontend server id which user has connected to
*/
Channel.prototype.add = function(uid, sid) {
if(this.state > ST_INITED) {
return false;
} else {
let res = add(uid, sid, this.groups);
if(res) {
this.records[uid] = {sid: sid, uid: uid};
this.userAmount =this.userAmount+1;
}
addToStore(this.__channelService__, genKey(this.__channelService__, this.name), genValue(sid, uid));
return res;
}
};
/**
* Remove user from channel.
*
* @param {Number} uid user id
* @param {String} sid frontend server id which user has connected to.
* @return [Boolean] true if success or false if fail
*/
Channel.prototype.leave = function(uid, sid) {
if(!uid || !sid) {
return false;
}
let res = deleteFrom(uid, sid, this.groups[sid]);
if(res){
delete this.records[uid];
this.userAmount = this.userAmount-1;
}
if(this.userAmount<0) this.userAmount=0;//robust
removeFromStore(this.__channelService__, genKey(this.__channelService__, this.name), genValue(sid, uid));
if(this.groups[sid] && this.groups[sid].length === 0) {
delete this.groups[sid];
}
return res;
};
/**
* Get channel UserAmount in a channel.
*
* @return {number } channel member amount
*/
Channel.prototype.getUserAmount = function() {
return this.userAmount;
};
/**
* Get channel members.
*
* <b>Notice:</b> Heavy operation.
*
* @return {Array} channel member uid list
*/
Channel.prototype.getMembers = function() {
let res = [], groups = this.groups;
let group, i, l;
for(let sid in groups) {
group = groups[sid];
for(i=0, l=group.length; i<l; i++) {
res.push(group[i]);
}
}
return res;
};
/**
* Get Member info.
*
* @param {String} uid user id
* @return {Object} member info
*/
Channel.prototype.getMember = function(uid) {
return this.records[uid];
};
/**
* Destroy channel.
*/
Channel.prototype.destroy = function() {
this.state = ST_DESTROYED;
this.__channelService__.destroyChannel(this.name);
};
/**
* Push message to all the members in the channel
*
* @param {String} route message route
* @param {Object} msg message that would be sent to client
* @param {Object} opts user-defined push options, optional
* @param {Function} cb callback function
*/
Channel.prototype.pushMessage = function(route, msg, opts, cb) {
if(this.state !== ST_INITED) {
utils.invokeCallback(new Error('channel is not running now'));
return;
}
if(typeof route !== 'string') {
cb = opts;
opts = msg;
msg = route;
route = msg.route;
}
if(!cb && typeof opts === 'function') {
cb = opts;
opts = {};
}
sendMessageByGroup(this.__channelService__, route, msg, this.groups, opts, cb);
};
/**
* add uid and sid into group. ignore any uid that uid not specified.
* @access private
* @param uid user id
* @param sid server id
* @param groups {Object} grouped uids, , key: sid, value: [uid]
*/
let add = function(uid, sid, groups) {
if(!sid) {
logger.warn('ignore uid %j for sid not specified.', uid);
return false;
}
let group = groups[sid];
if(!group) {
group = [];
groups[sid] = group;
}
group.push(uid);
return true;
};
/**
* delete element from array
* @access private
*/
let deleteFrom = function(uid, sid, group) {
if(!uid || !sid || !group) {
return false;
}
for(let i=0, l=group.length; i<l; i++) {
if(group[i] === uid) {
group.splice(i, 1);
return true;
}
}
return false;
};
/**
* push message by group
* @access private
* @param route {String} route route message
* @param msg {Object} message that would be sent to client
* @param groups {Object} grouped uids, , key: sid, value: [uid]
* @param opts {Object} push options
* @param cb {Function} cb(err)
*
* @api private
*/
let sendMessageByGroup = function(channelService, route, msg, groups, opts, cb) {
let app = channelService.app;
let namespace = 'sys';
let service = 'channelRemote';
let method = 'pushMessage';
let count = utils.size(groups);
let successFlag = false;
let failIds = [];
logger.debug('[%s] channelService sendMessageByGroup route: %s, msg: %j, groups: %j, opts: %j', app.serverId, route, msg, groups, opts);
if(count === 0) {
// group is empty
utils.invokeCallback(cb);
return;
}
let latch = countDownLatch.createCountDownLatch(count, function() {
if(!successFlag) {
utils.invokeCallback(cb, new Error('all uids push message fail'));
return;
}
utils.invokeCallback(cb, null, failIds);
});
let rpcCB = function(serverId) {
return function(err, fails) {
if(err) {
logger.error('[pushMessage] fail to dispatch msg to serverId: ' + serverId + ', err:' + err.stack);
latch.done();
return;
}
if(fails) {
failIds = failIds.concat(fails);
}
successFlag = true;
latch.done();
};
};
opts = {type: 'push', userOptions: opts || {}};
// for compatiblity
opts.isPush = true;
let sendMessage = function(sid) {
return (function() {
if(sid === app.serverId) {
channelService.channelRemote[method](route, msg, groups[sid], opts, rpcCB(sid));
} else {
app.rpcInvoke(sid, {namespace: namespace, service: service,
method: method, args: [route, msg, groups[sid], opts]}, rpcCB(sid));
}
})();
};
let group;
for(let sid in groups) {
group = groups[sid];
if(group && group.length > 0) {
sendMessage(sid);
} else {
// empty group
process.nextTick(rpcCB(sid));
}
}
};
let restoreChannel = function(self, cb) {
if(!self.store) {
utils.invokeCallback(cb);
return;
} else {
loadAllFromStore(self, genKey(self), function(err, list) {
if(!!err) {
utils.invokeCallback(cb, err);
return;
} else {
if(!list.length || !Array.isArray(list)) {
utils.invokeCallback(cb);
return;
}
let load = function(key,name) {
return (function() {
loadAllFromStore(self, key, function(err, items) {
for(let j=0; j<items.length; j++) {
let array = items[j].split(':');
let sid = array[0];
let uid = array[1];
let channel = self.channels[name];
let res = add(uid, sid, channel.groups);
if(res) {
channel.records[uid] = {sid: sid, uid: uid};
}
}
});
})();
};
for(let i=0; i<list.length; i++) {
let name = list[i].slice(genKey(self).length + 1);
self.channels[name] = new Channel(name, self);
load(list[i],name);
}
utils.invokeCallback(cb);
}
});
}
};
let addToStore = function(self, key, value) {
if(!!self.store) {
self.store.add(key, value, function(err) {
if(!!err) {
logger.error('add key: %s value: %s to store, with err: %j', key, value, err.stack);
}
});
}
};
let removeFromStore = function(self, key, value) {
if(!!self.store) {
self.store.remove(key, value, function(err) {
if(!!err) {
logger.error('remove key: %s value: %s from store, with err: %j', key, value, err.stack);
}
});
}
};
let loadAllFromStore = function(self, key, cb) {
if(!!self.store) {
self.store.load(key, function(err, list) {
if(!!err) {
logger.error('load key: %s from store, with err: %j', key, err.stack);
utils.invokeCallback(cb, err);
} else {
utils.invokeCallback(cb, null, list);
}
});
}
};
let removeAllFromStore = function(self, key) {
if(!!self.store) {
self.store.removeAll(key, function(err) {
if(!!err) {
logger.error('remove key: %s all members from store, with err: %j', key, err.stack);
}
});
}
};
let genKey = function(self, name) {
if(!!name) {
return self.prefix + ':' + self.app.serverId + ':' + name;
} else {
return self.prefix + ':' + self.app.serverId;
}
};
let genValue = function(sid, uid) {
return sid + ':' + uid;
};