-
Notifications
You must be signed in to change notification settings - Fork 8.1k
/
call_client.js
125 lines (103 loc) · 3.45 KB
/
call_client.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import _ from 'lodash';
import IsRequestProvider from './is_request';
import MergeDuplicatesRequestProvider from './merge_duplicate_requests';
import ReqStatusProvider from './req_status';
/**
 * Factory for the courier's `callClient` function, which sends a batch of
 * requests through a single client call and hands each request its own result.
 *
 * @param {Function} Private - Resolver used to obtain the is-request check,
 *   the duplicate-request merger and the request status constants from their
 *   providers.
 * @param {Object} Promise - Injected promise service. This block relies on its
 *   `defer`, `map`, `method` and `try` helpers (it shadows the global Promise).
 * @param {Object} es - Client object; the method named by
 *   `strategy.clientMethod` is invoked on it.
 * @param {*} esShardTimeout - Forwarded as the `timeout` option of the client call.
 * @param {*} sessionId - Forwarded as the `preference` option of the client call.
 * @returns {Function} `callClient(strategy, requests)`.
 */
export default function CourierFetchCallClient(Private, Promise, es, esShardTimeout, sessionId) {
  const isRequest = Private(IsRequestProvider);
  const mergeDuplicateRequests = Private(MergeDuplicatesRequestProvider);
  const { ABORTED, DUPLICATE } = Private(ReqStatusProvider);

  /**
   * Fetch every executable request in `requests` with one client call.
   *
   * @param {Object} strategy - Must provide `clientMethod` (name of the method
   *   to call on `es`), `reqsFetchParamsToBody(reqsFetchParams)` and
   *   `getResponses(clientResp)`.
   * @param {Array} requests - Entries to fetch. Request entries are expected to
   *   expose `whenAborted(fn)`, `getFetchParams()` and `handleFailure(err)`;
   *   entries flagged as duplicates are read through `req._uniq.resp`.
   * @returns {Promise} Resolves with an array aligned with `requests`: ABORTED
   *   for entries whose status is ABORTED, `req._uniq.resp` for duplicates and
   *   the matching client response otherwise. If anything fails, the error is
   *   passed to `handleFailure` of every entry whose status is not ABORTED and
   *   the promise resolves with `undefined`.
   */
  function callClient(strategy, requests) {
    // Merging can turn an entry's status into DUPLICATE, so all bookkeeping is
    // done against the merged statuses rather than against `requests`.
    const statuses = mergeDuplicateRequests(requests);

    // The entries that will actually be sent to the client.
    const executable = statuses.filter(isRequest);
    let execCount = executable.length;

    // Holds the in-flight client promise, or ABORTED once every executable
    // request has been aborted.
    let esPromise;

    // Settled by respond() on success, or rejected by any failure path below.
    const defer = Promise.defer();

    // Build one result per entry of `requests` and settle `defer` with it.
    const respond = function (responses) {
      const received = responses || [];

      return Promise.map(requests, function (req, i) {
        switch (statuses[i]) {
          case ABORTED:
            return ABORTED;
          case DUPLICATE:
            return req._uniq.resp;
          default:
            // Look the request up by identity. Passing an object to
            // `_.findIndex` makes lodash treat it as a `_.matches` pattern,
            // i.e. a structural comparison, which is slower and can select a
            // different request that merely has equal properties.
            return received[executable.indexOf(req)];
        }
      })
      .then(
        (res) => defer.resolve(res),
        (err) => defer.reject(err)
      );
    };

    // Handle a request being aborted while the batch is being fetched.
    const requestWasAborted = Promise.method(function (req, i) {
      if (statuses[i] === ABORTED) {
        // NOTE(review): nothing in this function writes ABORTED into
        // `statuses`, so this guard presumably only fires for entries that
        // were already ABORTED after merging -- confirm.
        defer.reject(new Error('Request was aborted twice?'));
        // Stop here: falling through would decrement `execCount` for an entry
        // that is not part of the executable count.
        return;
      }

      execCount -= 1;
      if (execCount > 0) {
        // The multi-request still contains other live requests.
        return;
      }

      // Every executable request is gone; cancel the client call if it
      // supports cancellation.
      if (esPromise && _.isFunction(esPromise.abort)) {
        esPromise.abort();
      }

      // Flag for the fetch chain below, which may not have issued the client
      // call yet.
      esPromise = ABORTED;

      return respond();
    });

    // Attach abort handlers, closing over each request's index.
    statuses.forEach(function (req, i) {
      if (!isRequest(req)) return;
      req.whenAborted(function () {
        requestWasAborted(req, i).catch(defer.reject);
      });
    });

    // With the bookkeeping in place, resolve the fetch params of each request,
    // build the body and issue the client call.
    Promise.map(executable, function (req) {
      return Promise.try(req.getFetchParams, void 0, req)
      .then(function (fetchParams) {
        // The resolved params are also stored on the request itself.
        return (req.fetchParams = fetchParams);
      });
    })
    .then(function (reqsFetchParams) {
      return strategy.reqsFetchParamsToBody(reqsFetchParams);
    })
    .then(function (body) {
      // While the strategy was building the body, every request was aborted.
      if (esPromise === ABORTED) {
        throw ABORTED;
      }

      return (esPromise = es[strategy.clientMethod]({
        timeout: esShardTimeout,
        ignore_unavailable: true,
        preference: sessionId,
        body: body
      }));
    })
    .then(function (clientResp) {
      return strategy.getResponses(clientResp);
    })
    .then(respond)
    .catch(function (err) {
      // ABORTED is thrown above purely as a signal, not as a real failure.
      if (err === ABORTED) respond();
      else defer.reject(err);
    });

    // Return our promise, but route any error to the requests instead of the
    // caller; the returned promise then resolves with `undefined`.
    return defer.promise
    .catch(function (err) {
      requests.forEach(function (req, i) {
        if (statuses[i] !== ABORTED) {
          req.handleFailure(err);
        }
      });
    });
  }

  return callClient;
}