Skip to content
This repository has been archived by the owner on May 26, 2023. It is now read-only.

Commit

Permalink
Support decrypting of history payloads (#302)
Browse files Browse the repository at this point in the history
Co-authored-by: feedmeapples <aksenov.ro@outlook.com>
Co-authored-by: Ruslan <11838981+feedmeapples@users.noreply.github.com>
Co-authored-by: swyx <shawnthe1@gmail.com>
  • Loading branch information
4 people authored Apr 27, 2021
1 parent 0dc9de2 commit a49b7e5
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 11 deletions.
47 changes: 47 additions & 0 deletions client/features/data-encryption.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import WebSocketAsPromised from 'websocket-as-promised';

export const decryptEventPayloads = async (events, port) => {
const sock = new WebSocketAsPromised(`ws://localhost:${port}/`, {
packMessage: data => JSON.stringify(data),
unpackMessage: data => JSON.parse(data),
attachRequestId: (data, requestId) =>
Object.assign({ requestId: requestId }, data),
extractRequestId: data => data && data.requestId,
});

try {
await sock.open();
const requests = [];

events.forEach(event => {
let payloads = [];

if (event.details.input) {
payloads = event.details.input.payloads;
} else if (event.details.result) {
payloads = event.details.result.payloads;
}

payloads.forEach((payload, i) => {
requests.push(
sock
.sendRequest({ payload: JSON.stringify(payload) })
.then(response => {
payloads[i] = JSON.parse(response.content);
})
);
});
});
await Promise.all(requests);
} catch (err) {
const message = `Unable to decrypt event payload: ${err}`;

return Promise.reject({ message });
} finally {
if (sock.isOpened) {
await sock.close();
}
}

return events;
};
10 changes: 10 additions & 0 deletions client/main.js
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,16 @@ const routeOpts = {

// redirects

{
name: 'data-converter',
path: '/data-converter/:port',
beforeEnter: async (to, _from, next) => {
await http.global.post(
`/api/web-settings/data-converter/${to.params.port}`
);
next('/');
},
},
{
name: 'namespaces-redirect',
path: '/namespace/*',
Expand Down
43 changes: 39 additions & 4 deletions client/routes/workflow/index.vue
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ import {
import { NOTIFICATION_TYPE_ERROR } from '~constants';
import { getErrorMessage } from '~helpers';
import { NavigationBar, NavigationLink } from '~components';
import { decryptEventPayloads } from '~features/data-encryption';
export default {
data() {
Expand Down Expand Up @@ -107,10 +108,13 @@ export default {
taskQueue: {},
unwatch: [],
webSettings: undefined,
};
},
props: ['namespace', 'runId', 'workflowId'],
created() {
async created() {
await this.getWebSettings();
this.unwatch.push(
this.$watch('baseAPIURL', this.onBaseApiUrlChange, { immediate: true })
);
Expand All @@ -131,7 +135,10 @@ export default {
)}/${encodeURIComponent(runId)}`;
},
historyUrl() {
const historyUrl = `${this.baseAPIURL}/history?waitForNewEvent=true`;
const rawPayloads = this.webSettings?.dataConverter?.port
? '&rawPayloads=true'
: '';
const historyUrl = `${this.baseAPIURL}/history?waitForNewEvent=true${rawPayloads}`;
if (!this.nextPageToken) {
return historyUrl;
Expand Down Expand Up @@ -188,12 +195,33 @@ export default {
this.nextPageToken = res.nextPageToken;
});
const { events } = res.history;
return events;
})
.then(events => {
const port = this.webSettings?.dataConverter?.port;
if (port == undefined) {
return events;
}
return decryptEventPayloads(events, port).catch(error => {
console.error(error);
this.$emit('onNotification', {
message: getErrorMessage(error),
type: NOTIFICATION_TYPE_ERROR,
});
return events;
});
})
.then(events => {
const shouldHighlightEventId =
this.$route.query.eventId &&
this.events.length <= this.$route.query.eventId;
const { events } = res.history;
this.events = this.events.concat(events);
this.history.events = getHistoryEvents(this.events);
Expand Down Expand Up @@ -300,6 +328,13 @@ export default {
this.loading = false;
});
},
async getWebSettings() {
if (this.webSettings) {
return this.webSettings;
}
this.webSettings = await this.$http(`/api/web-settings`);
},
},
};
</script>
26 changes: 26 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@
"vue-template-compiler": "^2.5.2",
"vue-virtual-scroller": "^1.0.10",
"vue2-datepicker": "^3.4.1",
"webpack": "^3.12.0"
"webpack": "^3.12.0",
"websocket-as-promised": "^2.0.1"
},
"devDependencies": {
"atob": "^2.1.2",
Expand Down
8 changes: 8 additions & 0 deletions server/routes.js
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ router.get(
? Buffer.from(q.nextPageToken, 'base64')
: undefined,
waitForNewEvent: 'waitForNewEvent' in q ? true : undefined,
rawPayloads: 'rawPayloads' in q ? true : undefined,
});
}
);
Expand Down Expand Up @@ -350,17 +351,24 @@ router.get('/api/namespaces/:namespace/task-queues/:taskQueue/', async function(
ctx.body = tq;
});

router.post('/api/web-settings/data-converter/:port', async(ctx) => {
ctx.session.dataConverter = { port: ctx.params.port };
ctx.status = 200;
});

router.get('/api/web-settings', async (ctx) => {
const routing = await getRoutingConfig();
const { enabled } = await getAuthConfig();
const permitWriteApi = isWriteApiPermitted();
const dataConverter = ctx.session.dataConverter;

const auth = { enabled }; // only include non-sensitive data

ctx.body = {
routing,
auth,
permitWriteApi,
dataConverter,
};
});

Expand Down
18 changes: 13 additions & 5 deletions server/temporal-client/helpers.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ const _uiTransformPayloadKeys = [
_payloads,
];

function uiTransform(item) {
function uiTransform(item, rawPayloads=false, transformingPayloads=false) {
if (!item || typeof item !== 'object') {
return item;
}
Expand All @@ -79,6 +79,12 @@ function uiTransform(item) {
return;
}

if (rawPayloads && transformingPayloads) {
item[subkey] = subvalue.toString('base64');

return;
}

const stringval = subvalue.toString('utf8');

try {
Expand All @@ -96,7 +102,9 @@ function uiTransform(item) {
item[subkey] = stringval;
}
} else if (Array.isArray(subvalue)) {
if (subkey === _payloads) {
if (subkey === _payloads && rawPayloads) {
subvalue.forEach(function(item) { uiTransform(item, rawPayloads, true) });
} else if (subkey === _payloads) {
let payloads = [];

Object.entries(subvalue).forEach(([subkey, payload]) => {
Expand All @@ -120,7 +128,7 @@ function uiTransform(item) {
});
item[_payloads] = payloads;
} else {
subvalue.forEach(uiTransform);
subvalue.forEach(function(item) { uiTransform(item, rawPayloads) });
}
} else if (typeof subvalue == 'string') {
subvalue = enumTransform(subvalue);
Expand All @@ -144,10 +152,10 @@ function uiTransform(item) {
});
item[subkey] = values;
} else {
uiTransform(subvalue);
uiTransform(subvalue, rawPayloads, transformingPayloads);
}
} else {
uiTransform(subvalue);
uiTransform(subvalue, rawPayloads, transformingPayloads);
}
}
});
Expand Down
3 changes: 2 additions & 1 deletion server/temporal-client/temporal-client.js
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ TemporalClient.prototype.getHistory = async function(
nextPageToken,
execution,
waitForNewEvent,
rawPayloads,
maximumPageSize = 100,
}
) {
Expand All @@ -174,7 +175,7 @@ TemporalClient.prototype.getHistory = async function(

let res = await this.client.getWorkflowExecutionHistoryAsync(ctx, req);

res = uiTransform(res);
res = uiTransform(res, rawPayloads);

if (res.history && res.history.events) {
res.history = buildHistory(res);
Expand Down

0 comments on commit a49b7e5

Please sign in to comment.