Skip to content

Commit

Permalink
Add youtube feed processing in backend
Browse files Browse the repository at this point in the history
  • Loading branch information
dbelokon committed Dec 8, 2021
1 parent a3f1a3e commit c0fba55
Show file tree
Hide file tree
Showing 5 changed files with 216 additions and 27 deletions.
23 changes: 1 addition & 22 deletions src/backend/data/post.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,7 @@ const Feed = require('./feed');
const hash = require('./hash');
const ArticleError = require('./article-error');
const { indexPost } = require('../utils/indexer');

/**
 * Makes sure that a given date can be constructed as a valid Date object.
 * Returns a newly constructed Date object, if possible.
 * Otherwise falls back on `fallbackDate`, and throws an Error if that is
 * not a valid Date either.
 * @param {string|Date} date a date string or Date object to construct as a Date object
 * @param {Date} [fallbackDate] an optional second Date to construct in case the first fails to do so
 * @returns {Date} a new, valid Date (never the same instance that was passed in)
 * @throws {Error} if neither `date` nor `fallbackDate` yields a valid Date
 */
function ensureDate(date, fallbackDate) {
  const typeTag = (value) => Object.prototype.toString.call(value);
  // An invalid Date is still a Date object; only its time value is NaN.
  const isValid = (value) => !Number.isNaN(value.getTime());

  if (date && (typeTag(date) === '[object String]' || typeTag(date) === '[object Date]')) {
    const parsed = new Date(date);
    // Unparseable strings and Invalid Date objects must not leak out as "dates"
    if (isValid(parsed)) {
      return parsed;
    }
  }

  if (typeTag(fallbackDate) === '[object Date]' && isValid(fallbackDate)) {
    return new Date(fallbackDate);
  }

  throw new Error(`post has an invalid date: '${date}'`);
}
const { ensureDate } = require('../utils/date-validator');

/**
* Makes sure that the given feed is a Feed and not just an id. If the latter
Expand Down
91 changes: 91 additions & 0 deletions src/backend/data/video.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
const { logger } = require('../utils/logger');
const ArticleError = require('./article-error');
const { ensureDate } = require('../utils/date-validator');
const { addVideo } = require('../utils/storage');
const Feed = require('./feed');
const hash = require('./hash');

/**
 * A video taken from a (YouTube) feed item, linked to the Feed it came from.
 */
class Video {
  /**
   * @param {string} title the video's title
   * @param {string} url the video's URL
   * @param {string} guid the unique identifier of the video; hashed to produce `id`
   * @param {string|Date} datePublished publication date, must be a valid date
   * @param {string|Date} dateUpdated date of last update; falls back on the publication date
   * @param {Feed} feed the Feed this video belongs to
   * @throws {Error} if the publication date is invalid or `feed` is not a Feed
   */
  constructor(title, url, guid, datePublished, dateUpdated, feed) {
    this.id = hash(guid);
    this.title = title;
    this.url = url;
    this.guid = guid;
    this.published = ensureDate(datePublished);
    // ensureDate only accepts a Date object as its fallback, while datePublished
    // is usually a string, so fall back on the already validated Date instead.
    this.updated = ensureDate(dateUpdated, this.published);

    if (!(feed instanceof Feed)) {
      throw new Error(`expected feed to be a Feed Object, got '${feed}'`);
    }
    this.feed = feed;
  }

  /**
   * Save the current Video to the database, using the
   * associated feed's id to link the Feed with the Video
   *
   * @returns an empty Promise
   */
  save() {
    return addVideo({
      ...this,
      feed: this.feed.id,
    });
  }

  /**
   * Create a Video object from an article object and save it.
   * If some data is missing, object creation will throw
   * an ArticleError exception. The given article is not modified.
   *
   * @param {*} article the source article for the video
   * @param {*} feed the feed associated to the video
   * @returns {Promise<string>} the id of the saved Video
   * @throws {Error} if `article` is missing
   * @throws {ArticleError} if link, pubDate, id or author is missing
   */
  static async createFromArticle(article, feed) {
    if (!article) {
      throw new Error('unable to read missing article');
    }

    // [property on the article, name reported in the error message]
    const requiredFields = [
      ['link', 'link'],
      ['pubDate', 'pubdate'],
      ['id', 'id'],
      ['author', 'author'],
    ];
    const missing = requiredFields
      .filter(([property]) => !article[property])
      .map(([, label]) => label);

    if (missing.length) {
      const message = `invalid youtube article: missing ${missing.join(', ')}`;
      logger.debug(message);
      throw new ArticleError(message);
    }

    // Allow for a missing title, but give it one
    let { title } = article;
    if (!title) {
      logger.debug('article missing title, substituting with "Untitled"');
      title = 'Untitled';
    }

    let updated = article.date;
    if (!updated) {
      logger.debug('article missing date of last update, substituting publication date');
      updated = article.pubDate;
    }

    const video = new Video(title, article.link, article.id, article.pubDate, updated, feed);

    await video.save();
    return video.id;
  }
}

module.exports = Video;
43 changes: 38 additions & 5 deletions src/backend/feed/processor.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
* with potentially multiple simultaneous instances, by the feed queue.
* https://github.com/OptimalBits/bull#separate-processes
*/
const { URL } = require('url');

const Parser = require('rss-parser');
const fetch = require('node-fetch');

const { logger } = require('../utils/logger');
const Video = require('../data/video');
const Post = require('../data/post');
const Feed = require('../data/feed');
const ArticleError = require('../data/article-error');
Expand All @@ -17,6 +19,13 @@ function hasHeaders(feed) {
return feed.etag || feed.lastModified;
}

/**
 * Check whether the feed link is from a YouTube feed.
 *
 * Only `youtube.com` and its subdomains (e.g. `www.youtube.com`) count; a
 * hostname that merely contains the word "youtube" (such as
 * `myyoutubeblog.wordpress.com`) is not a YouTube feed.
 *
 * @param {string} feedUrl - absolute URL of the feed
 * @returns {boolean} true if the feed is hosted on youtube.com
 * @throws {TypeError} if `feedUrl` is not a valid absolute URL
 */
function isFeedFromYoutube(feedUrl) {
  // The URL parser lower-cases the hostname, so a plain comparison is enough
  const { hostname } = new URL(feedUrl);

  return hostname === 'youtube.com' || hostname.endsWith('.youtube.com');
}

/**
* If we have extra cache/modification info about this feed, add it to the headers.
* @param {Feed} feed - the feed Object, possibly with etag and lastModified info
Expand Down Expand Up @@ -118,6 +127,21 @@ function articlesToPosts(articles, feed) {
);
}

/**
 * Create (and save) a Video for every article of a feed, in parallel.
 * Articles rejected with an ArticleError are skipped; any other
 * error rejects the returned Promise.
 * @param {Array} articles - the feed items to turn into Videos
 * @param {Feed} feed - the feed the articles belong to
 * @returns {Promise<Array<undefined>>} settles once every article was handled
 */
function articlesToVideos(articles, feed) {
  const storeAsVideo = async (article) => {
    try {
      await Video.createFromArticle(article, feed);
    } catch (error) {
      // Only an invalid article is tolerated, everything else is a real failure
      if (!(error instanceof ArticleError)) {
        throw error;
      }
    }
  };

  return Promise.all(articles.map((article) => storeAsVideo(article)));
}

/**
* The processor for the feed queue receives feed jobs, where
* the job to process is an Object with the `id` of the feed.
Expand Down Expand Up @@ -184,22 +208,31 @@ module.exports = async function processor(job) {
['pubDate', 'pubdate'],
['creator', 'author'],
['content:encoded', 'contentEncoded'],
['updated', 'date'],
['link', 'link', { keepArray: true }],
],
},
},
feed
)
);
const articles = await parser.parseURL(feed.url);
// Transform the list of articles to a list of Post objects
await articlesToPosts(articles.items, feed);

const feedXml = await parser.parseURL(feed.url);

if (isFeedFromYoutube(feed.url)) {
// Transform the list of items to a list of Video objects
await articlesToVideos(feedXml.items, feed);
} else {
// Transform the list of articles to a list of Post objects
await articlesToPosts(feedXml.items, feed);
}

// Version info for this feed changed, so update the database
feed.etag = feed.etag || info.etag;
feed.lastModified = feed.lastModified || info.lastModified;
// If feed.link is empty and the feed has at least one item, take the link from the parsed feed
if (!feed.link && articles.items.length) {
feed.link = articles?.link || null;
if (!feed.link && feedXml.items.length) {
feed.link = feedXml?.link || null;
}
await feed.save();
} catch (error) {
Expand Down
23 changes: 23 additions & 0 deletions src/backend/utils/date-validator.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
module.exports = {
/**
* Makes sure that a given date can be constructed as a Date object
* Returns a constructed Date object, if possible
* Otherwise throws an Error
* @param {Object} date an Object to construct as a Date object
* @param {Date} [fallbackDate] an optional second Date to construct in case the first fails to do so
*/
ensureDate: function (date, fallbackDate) {
if (
date &&
(Object.prototype.toString.call(date) === '[object String]' ||
(Object.prototype.toString.call(date) === '[object Date]' && !Number.isNaN(date)))
) {
return new Date(date);
}
if (Object.prototype.toString.call(fallbackDate) === '[object Date]') {
return new Date(fallbackDate);
}

throw new Error(`date is invalid: ${date}'`);
},
};
63 changes: 63 additions & 0 deletions src/backend/utils/storage.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,20 @@ const { logger } = require('./logger');
const feedsKey = 't:feeds';
const flaggedFeedsKey = 't:feeds:flagged';
const postsKey = 't:posts';
const videosKey = 't:videos';

// Namespaces
const feedNamespace = 't:feed:';
const postNamespace = 't:post:';
const videoNamespace = 't:video:';
// Suffixes
const invalidSuffix = ':invalid';
const delayedSuffix = ':delayed';

// "6Xoj0UXOW3" to "t:post:6Xoj0UXOW3"
const createPostKey = (id) => `${postNamespace}${id}`;
// "MpoY0ZxK23" to "t:video:MpoY0ZxK23"
const createVideoKey = (id) => `${videoNamespace}${id}`;
// "NirlSYranl" to "t:feed:NirlSYranl"
const createFeedKey = (id) => `${feedNamespace}${id}`;
// "NirlSYranl" to "t:feed:NirlSYranl:invalid"
Expand Down Expand Up @@ -162,4 +166,63 @@ module.exports = {
.zrem(postsKey, id)
.exec();
},

/**
* Introduces a new Video record
* to the 't:video' namespace.
*
* The structure of video is of a Video-like object.
*
* @param {*} video a Video-like object
*/
addVideo: async (video) => {
const key = createVideoKey(video.id);
await redis
.multi()
.hset(
key,
'id',
video.id,
'title',
video.title,
'url',
video.url,
'published',
video.published,
'updated',
video.updated,
'guid',
video.guid,
'feed',
video.feed
)
// sort set by published date as scores
.zadd(videosKey, video.published.getTime(), video.id)
.exec();
},

/**
* Returns an array of video ids, where the range is 'from' index
* to 'to' index, exclusive; that is, 'from ..< to'
*
* @param {*} from starting index, included in the result
* @param {*} to ending index, excluded from the result
* @returns an array of video ids, length equal to "to - from"
*/
getVideos: (from, to) => redis.zrevrange(videosKey, from, to - 1),

/**
* Returns an array of video ids, where each video has been uploaded
* between the 'startDate' and the 'endDate'
*
* @param {*} startDate
* @param {*} endDate
* @returns an array of video ids, length can vary
*/
getVideosByDate: (startDate, endDate) =>
redis.zrangebyscore(videosKey, startDate.getTime(), endDate.getTime()),

getVideosCount: () => redis.zcard(videosKey),

getVideo: (id) => redis.hgetall(videoNamespace.concat(id)),
};

0 comments on commit c0fba55

Please sign in to comment.