Skip to content

Commit

Permalink
fix: websocket subscriptions
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael.Taylor committed Dec 31, 2021
1 parent 11356a7 commit 548afd4
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 61 deletions.
29 changes: 15 additions & 14 deletions src/models/projects/projects.model.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use strict';
import Sequelize from 'sequelize';
import rxjs from "rxjs";
import rxjs from 'rxjs';

const { Model } = Sequelize;

Expand All @@ -14,11 +14,11 @@ import { Rating } from './../ratings';
import { CoBenefit } from './../co-benefits';

import ModelTypes from './projects.modeltypes.cjs';
import { optionallyPaginatedResponse } from "../../controllers/helpers.js";
import { optionallyPaginatedResponse } from '../../controllers/helpers.js';

class Project extends Model {
static changes = new rxjs.Subject();

static associate() {
Project.hasMany(RelatedProject);
Project.hasMany(Vintage);
Expand All @@ -27,24 +27,25 @@ class Project extends Model {
Project.hasMany(CoBenefit);
Project.hasMany(ProjectLocation);
}

static async create(values, options) {
const createResult = await super.create(values, options);
const { orgUid } = createResult;


const { orgUid } = values['0'];

Project.changes.next(['projects', orgUid]);

return createResult;
}

static async destroy(values) {
const { id: projectId } = values.where;
const { orgUid } = await super.findOne(projectId);

const destroyResult = await super.destroy(values);

Project.changes.next(['projects', orgUid]);

return destroyResult;
}

Expand Down Expand Up @@ -78,7 +79,7 @@ class Project extends Model {

sql = `${sql} ORDER BY relevance DESC LIMIT :limit OFFSET :offset`;

const count = await Project.count()
const count = await Project.count();

return {
count,
Expand All @@ -88,11 +89,11 @@ class Project extends Model {
mapToModel: true, // pass true here if you have any mapped fields
offset,
limit,
})
}),
};
}

static findAllSqliteFts(searchStr, orgUid, pagination ) {
static findAllSqliteFts(searchStr, orgUid, pagination) {
const { offset, limit } = pagination;
let sql = `SELECT * FROM projects_fts WHERE projects_fts MATCH :search`;

Expand Down
1 change: 1 addition & 0 deletions src/routes/v1/resources/projects.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ ProjectRouter.get('/', (req, res) => {
});

const baseSchema = {
orgUid: Joi.string().required(),
currentRegistry: Joi.string().required(),
registryOfOrigin: Joi.string().required(),
originProjectId: Joi.string().required(),
Expand Down
4 changes: 2 additions & 2 deletions src/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import rootRouter from './routes';
import http from 'http';
import { Server } from 'socket.io';
import Debug from 'debug';
import { connection } from "./websocket";
import { connection } from './websocket';

const debug = Debug('climate-warehouse:server');

Expand All @@ -17,7 +17,7 @@ server.on('listening', onListening);
server.listen(port);

const io = new Server(server);
io.of('/v1/updates').on('connection', connection);
io.of('/v1/ws').on('connection', connection);

function onError(error) {
if (error.syscall !== 'listen') {
Expand Down
80 changes: 35 additions & 45 deletions src/websocket.js
Original file line number Diff line number Diff line change
@@ -1,67 +1,57 @@
'use strict';

import { Project, Unit } from "./models/index.js";
import { BehaviorSubject, zip } from "rxjs";
import { Project, Unit } from './models/index.js';

const socketSubscriptions = {};

const ORG_UID = 0;
const ENTITY_NAME = 1;

//future authentication logic goes here
const authenticate = (_payload) => true;

const subscribedChangeFeeds = ({ projects, units }) => [projects, units].filter(feed => Boolean(feed));

export const connection = (socket) => {
let combinedChangeFeedSubscription = null;

const changeFeeds = new BehaviorSubject({
projects: null,
units: null,
socket.on('authentication', () => {
console.log('Attempting to authenticate');
if (!authenticate(socket)) {
console.log('authentication failure');
return socket.disconnect();
} else {
socket.emit('authenticated');
}
});

socket.on('disconnect', () => {
if (changeFeedListSub) {
changeFeedListSub.unsubscribe();
}

if (socketSubscriptions[socket.id]) {
socketSubscriptions[socket.id].unsubscribe();
delete socketSubscriptions[socket.id];
}
});

socket.on('/subscribe', (feed) => {

socket.on('/subscribe', (feed, callback) => {
if (!socketSubscriptions[socket.id]) {
socketSubscriptions[socket.id] = [];
}

switch (feed) {
case 'projects':
if (!changeFeeds.projects) {
changeFeeds.projects = Project.changes;
if (!socketSubscriptions[socket.id].includes('projects')) {
Project.changes.subscribe((data) => {
socket.emit('change:projects', data);
});
socketSubscriptions[socket.id].push('projects');
callback('success');
} else {
callback('already subscribed');
}
break;
break;
case 'units':
if (!changeFeeds.units) {
changeFeeds.units = Unit.changes;
if (!socketSubscriptions[socket.id].includes('units')) {
Unit.changes.subscribe((data) => {
socket.emit('change:units', data);
});
socketSubscriptions[socket.id].push('units');
callback('success');
} else {
callback('already subscribed');
}
break;
}
});

if (!authenticate(socket)) {
console.log('authentication failure');
return socket.disconnect();
} else {
socket.broadcast.emit('authenticated');
}

const changeFeedListSub = changeFeeds.subscribe((feeds) => {
if (combinedChangeFeedSubscription) {
combinedChangeFeedSubscription.unsubscribe();
break;
}

// using zip to take an array of observables, and funnel all their outputs into a single handler
combinedChangeFeedSubscription = zip(...subscribedChangeFeeds(feeds)).subscribe(update => {
socket.broadcast.emit(`change:${update[ENTITY_NAME]}`, JSON.stringify({ orgUid: update[ORG_UID] }));
});
});
}
};

0 comments on commit 548afd4

Please sign in to comment.