-
Notifications
You must be signed in to change notification settings - Fork 1
/
index.js
159 lines (130 loc) · 3.06 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
'use strict';
const uuid = require('uuid/v4');
const {
EventEmitter
} = require('events');
// Legal characters for a subject token, written as the inside of a regex
// character class (the dash and underscore are backslash-escaped).
const chars = 'a-zA-Z0-9\\-\\_';
/**
 * NB: The subs are stored in a single Map, keyed by subscription id (sid),
 * that is shared by all instances of the NATS class.
 */
let _subs = new Map();
/**
 * In-memory mock of a NATS client, intended for tests.
 *
 * Subscriptions are kept in the module-level `_subs` Map, which is shared by
 * every instance, and `publish` invokes the matching callbacks synchronously.
 */
class NATS extends EventEmitter {
  /**
   * The mocked transport's subs for testing purposes.
   *
   * @returns {Map} the shared subscription map, keyed by sid
   */
  static get subs() {
    return _subs;
  }

  /**
   * Replaces the shared subscription map.
   *
   * @param {Map} subs
   *
   * @throws {TypeError} if `subs` is not a Map
   */
  static set subs(subs) {
    if (!(subs instanceof Map)) {
      throw new TypeError('subs must be a map');
    }
    _subs = subs;
  }

  /**
   * Fakes a connection to a nats-server and returns the client.
   * The 'connect' event is emitted on the next tick so that callers have a
   * chance to attach their listeners first.
   *
   * @returns {NATS} client
   */
  static connect() {
    const nats = new NATS();
    process.nextTick(() => nats.emit('connect'));
    return nats;
  }

  /**
   * Fakes a disconnection by emitting 'disconnect' on the next tick.
   * Existing subscriptions are left in place.
   */
  close() {
    process.nextTick(() => this.emit('disconnect'));
  }

  /**
   * Subscribe to a given subject.
   * TODO: implement `options` argument.
   *
   * The subject is stored as the source of an anchored regular expression in
   * which '*' matches one token and '>' matches one or more tokens.
   *
   * @param {String} subject
   * @param {Function} callback called as `callback(message, replyTo, subject)`
   *
   * @returns {String} sid
   */
  subscribe(subject, callback) {
    // TODO: validate subject syntax
    const sid = uuid();

    // Escape regex metacharacters BEFORE expanding the wildcards. Doing it
    // afterwards would re-escape the `\.` inside the '>' character class and
    // make it accept literal backslashes. '*' is deliberately not escaped
    // here because it is expanded as the token wildcard just below.
    const escaped = subject.replace(/[.+?^${}()|[\]\\]/g, '\\$&');

    // Handle wild cards
    // NB: this assumes a valid subject syntax
    const pattern = escaped
      .replace('>', `[${chars}\\.]+`) // '>' full wildcard
      .replace(/\*/g, `[${chars}]+`); // '*' token wildcard

    this._addSub({
      sid,
      subject: `^${pattern}$`,
      callback
    });

    return sid;
  }

  /**
   * Unsubscribe to a given Subscriber Id. Unknown ids are ignored.
   *
   * @param {String} sid
   */
  unsubscribe(sid) {
    _subs.delete(sid);
  }

  /**
   * Publish a message to the given subject, with optional `replyTo`.
   * Matching subscribers are called synchronously, in subscription order.
   * TODO: implement optional callback
   *
   * @param {String} subject
   * @param {String} message
   * @param {String} replyTo
   */
  publish(subject, message, replyTo) {
    for (const sub of this._getSubsBySubject(subject)) {
      // A subscription may have been registered without a callback
      // (e.g. `request` called without one); there is nothing to invoke then.
      if (typeof sub.callback === 'function') {
        sub.callback(message, replyTo, subject);
      }
    }
  }

  /**
   * Subscribe to an ad hoc subject to get a reply after publishing using this
   * ad hoc subject as the replyTo.
   *
   * Supported call shapes:
   *   request(subject, callback)
   *   request(subject, message, callback)
   *   request(subject, message, options, callback)
   *
   * @param {String} subject
   * @param {String} message defaults to the empty string when omitted
   * @param {Object} options currently ignored
   * @param {Function} callback
   *
   * @returns {String} sid
   */
  request(subject, message, options, callback) {
    const sid = uuid();

    if (typeof message === 'function') {
      callback = message;
      message = '';
      options = null;
    }
    if (typeof options === 'function') {
      callback = options;
      options = null;
    }

    // The reply subject is the sid itself; anchor it so that only an exact
    // match (not any subject merely containing the sid) reaches the callback.
    this._addSub({
      sid,
      subject: `^${sid}$`,
      callback
    });
    this.publish(subject, message, sid);

    return sid;
  }

  /**
   * Returns the subscriptions whose stored pattern matches `subject`.
   *
   * @param {String} subject
   *
   * @returns {Array} matching subs, in insertion order
   */
  _getSubsBySubject(subject) {
    return Array.from(_subs.values())
      .filter(({ subject: pattern }) => new RegExp(pattern).test(subject));
  }

  /**
   * Registers a subscription in the shared map under its sid.
   *
   * @param {Object} sub `{ sid, subject, callback }`
   */
  _addSub(sub) {
    _subs.set(sub.sid, sub);
  }
}
// Expose the mock NATS class as this module's only export.
module.exports = NATS;