123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468 |
- import async from "async";
- import CoreClass from "../core";
- let ActivitiesModule;
- let DBModule;
- let CacheModule;
- let UtilsModule;
- let WSModule;
- let PlaylistsModule;
/** Activity types that get merged into one summary activity when repeated within 5 minutes */
const MERGEABLE_ACTIVITY_TYPES = ["playlist__remove_song", "playlist__add_song"];

/** Activity types whose repeats within 15 minutes get hidden */
const SPAMMABLE_ACTIVITY_TYPES = [
    "user__toggle_nightmode",
    "user__toggle_autoskip_disliked_songs",
    "user__toggle_activity_watch",
    "song__like",
    "song__unlike",
    "song__dislike",
    "song__undislike"
];

/** Reference types (and message tag names) accepted by REMOVE_ACTIVITY_REFERENCES */
const ACTIVITY_REFERENCE_TYPES = ["youtubeId", "stationId", "reportId", "playlistId"];

/** Matches the song count of a merged activity message, e.g. "Added 12 songs to playlist ..." */
const MERGED_SONG_COUNT_REGEX = /(?:Removed|Added)\s(?<songs>\d+)\ssongs/;

/**
 * Reads how many songs a merged add/remove activity message accounts for
 *
 * @param {string} message - the message of a merged activity
 * @returns {number} - the song count found in the message, or 0 if the message contains none
 */
const getMergedSongCount = message => {
    const match = typeof message === "string" ? MERGED_SONG_COUNT_REGEX.exec(message) : null;
    return match ? Number.parseInt(match.groups.songs, 10) : 0;
};

/**
 * Unwraps the reference tags of every reference id present in the payload, keeping the text inside them
 *
 * @param {string} message - the activity message to clean up
 * @param {object} payload - the REMOVE_ACTIVITY_REFERENCES payload; a tag is unwrapped if payload[tagName] is set
 * @returns {string} - the message without those tags
 */
const stripReferenceTags = (message, payload) =>
    ACTIVITY_REFERENCE_TYPES.filter(tag => payload[tag]).reduce(
        // lazy match so each tag pair is unwrapped on its own, [\s\S] so text containing newlines still matches
        (text, tag) => text.replace(new RegExp(`<${tag}>([\\s\\S]*?)</${tag}>`, "g"), "$1"),
        message
    );

/**
 * Builds the final waterfall callback of a job: resolves with the waterfall result or rejects with an Error
 *
 * @param {object} job - the running job, passed on as parent job to GET_ERROR
 * @param {Function} resolve - resolve function of the job's promise
 * @param {Function} reject - reject function of the job's promise
 * @returns {Function} - callback of the form (err, result)
 */
const settleJob = (job, resolve, reject) => (err, result) => {
    if (!err) return resolve(result);

    return UtilsModule.runJob("GET_ERROR", { error: err }, job).then(
        message => reject(new Error(message)),
        // still settle the promise if the error itself could not be converted
        () => reject(err instanceof Error ? err : new Error(err))
    );
};

/**
 * Marks the given activities as hidden and, once that has succeeded, emits a hidden event for each to the user's sockets
 *
 * @param {object} activityModel - the activity model
 * @param {string} userId - the id of the user the activities belong to
 * @param {Array} activities - the activities to hide
 * @param {object} parentJob - (optional) parent job passed on to the SOCKETS_FROM_USER job
 * @returns {Promise} - resolves when all activities are hidden and the events are dispatched
 */
const hideActivitiesAndNotify = async (activityModel, userId, activities, parentJob) => {
    if (activities.length === 0) return;

    await Promise.all(
        activities.map(activity => activityModel.updateOne({ _id: activity._id }, { $set: { hidden: true } }).exec())
    );

    // the sockets only depend on the user, so look them up once instead of once per activity
    const sockets = await WSModule.runJob("SOCKETS_FROM_USER", { userId }, parentJob);

    activities.forEach(activity =>
        sockets.forEach(socket => socket.dispatch("event:activity.hidden", { data: { activityId: activity._id } }))
    );
};

class _ActivitiesModule extends CoreClass {
    // eslint-disable-next-line require-jsdoc
    constructor() {
        super("activities");

        ActivitiesModule = this;
    }

    /**
     * Initialises the activities module
     *
     * @returns {Promise} - returns promise (reject, resolve)
     */
    initialize() {
        return new Promise(resolve => {
            DBModule = this.moduleManager.modules.db;
            CacheModule = this.moduleManager.modules.cache;
            UtilsModule = this.moduleManager.modules.utils;
            WSModule = this.moduleManager.modules.ws;
            PlaylistsModule = this.moduleManager.modules.playlists;

            resolve();
        });
    }

    /**
     * Adds a new activity to the database
     *
     * @param {object} payload - object that contains the payload
     * @param {string} payload.userId - the id of the user who's activity is to be added
     * @param {string} payload.type - the type of activity (enum specified in schema)
     * @param {object} payload.payload - the details of the activity e.g. an array of songs that were added
     * @param {string} payload.payload.message - the main message describing the activity e.g. 50 songs added to playlist 'playlist name'
     * @param {string} payload.payload.thumbnail - url to a thumbnail e.g. song album art to be used when display an activity
     * @param {string} payload.payload.youtubeId - (optional) if relevant, the youtube id of the song related to the activity
     * @param {string} payload.payload.reportId - (optional) if relevant, the id of the report related to the activity
     * @param {string} payload.payload.playlistId - (optional) if relevant, the id of the playlist related to the activity
     * @param {string} payload.payload.stationId - (optional) if relevant, the id of the station related to the activity
     * @returns {Promise} - resolves with the saved activity, rejects with an Error
     */
    ADD_ACTIVITY(payload) {
        return new Promise((resolve, reject) => {
            async.waterfall(
                [
                    next => {
                        DBModule.runJob("GET_MODEL", { modelName: "activity" }, this)
                            .then(res => next(null, res))
                            .catch(next);
                    },

                    (ActivityModel, next) => {
                        const { userId, type } = payload;
                        const activity = new ActivityModel({ userId, type, payload: payload.payload });

                        // only forward the saved document, whatever else the save callback is given
                        activity.save((err, savedActivity) => next(err, savedActivity));
                    },

                    (activity, next) => {
                        WSModule.runJob("SOCKETS_FROM_USER", { userId: activity.userId }, this)
                            .then(sockets => {
                                sockets.forEach(socket =>
                                    socket.dispatch("event:activity.created", { data: { activity } })
                                );
                                next(null, activity);
                            })
                            .catch(next);
                    },

                    (activity, next) => {
                        CacheModule.runJob("HGET", { table: "recentActivities", key: activity.userId })
                            .then(recentActivity => {
                                // both spam checks only apply when the previous activity has the same type
                                if (!recentActivity || recentActivity.type !== activity.type) return null;

                                const isWithinLastMinutes = mins =>
                                    new Date() - new Date(recentActivity.createdAt) < mins * 60 * 1000;

                                // spammable activity repeated within the last 15 mins: hide the older ones
                                if (isWithinLastMinutes(15) && SPAMMABLE_ACTIVITY_TYPES.includes(activity.type))
                                    return ActivitiesModule.runJob(
                                        "CHECK_FOR_ACTIVITY_SPAM_TO_HIDE",
                                        { userId: activity.userId, type: activity.type },
                                        this
                                    );

                                // song added/removed again within the last 5 mins: merge into one activity
                                if (MERGEABLE_ACTIVITY_TYPES.includes(activity.type) && isWithinLastMinutes(5))
                                    return PlaylistsModule.runJob("GET_PLAYLIST", {
                                        playlistId: activity.payload.playlistId
                                    }).then(playlist =>
                                        ActivitiesModule.runJob(
                                            "CHECK_FOR_ACTIVITY_SPAM_TO_MERGE",
                                            {
                                                userId: activity.userId,
                                                type: activity.type,
                                                playlist: {
                                                    playlistId: playlist._id,
                                                    displayName: playlist.displayName
                                                }
                                            },
                                            this
                                        )
                                    );

                                return null;
                            })
                            .then(() => next(null, activity))
                            .catch(next);
                    },

                    // store most recent activity in cache to be quickly accessible
                    (activity, next) =>
                        CacheModule.runJob(
                            "HSET",
                            {
                                table: "recentActivities",
                                key: activity.userId,
                                value: { createdAt: activity.createdAt, type: activity.type }
                            },
                            this
                        )
                            // hand the activity on so the job resolves with it
                            .then(() => next(null, activity))
                            .catch(next)
                ],
                settleJob(this, resolve, reject)
            );
        });
    }

    /**
     * Merges activities about adding/removing songs from a playlist within a 5-minute period to prevent spam
     *
     * @param {object} payload - object that contains the payload
     * @param {string} payload.userId - the id of the user to check for duplicates
     * @param {object} payload.playlist - object that contains info about the relevant playlist
     * @param {string} payload.playlist.playlistId - the id of the playlist
     * @param {string} payload.playlist.displayName - the display name of the playlist
     * @param {string} payload.type - the type of activity to check for duplicates
     * @returns {Promise} - returns promise (reject, resolve)
     */
    async CHECK_FOR_ACTIVITY_SPAM_TO_MERGE(payload) {
        const activityModel = await DBModule.runJob("GET_MODEL", { modelName: "activity" }, this);

        return new Promise((resolve, reject) => {
            async.waterfall(
                [
                    // find all activities of this type from the last 5 minutes
                    next => {
                        activityModel
                            .find(
                                {
                                    userId: payload.userId,
                                    type: { $in: [payload.type, `${payload.type}s`] },
                                    hidden: false,
                                    createdAt: {
                                        $gte: new Date(new Date() - 5 * 60 * 1000)
                                    },
                                    "payload.playlistId": payload.playlist.playlistId
                                },
                                ["_id", "type", "payload.message"]
                            )
                            .sort({ createdAt: -1 })
                            .exec(next);
                    },

                    // count the songs in these activities, then hide them and emit to socket listeners
                    (activities, next) => {
                        const howManySongs = activities.reduce((total, activity) => {
                            // a single-song activity counts as one song
                            if (activity.type === payload.type) return total + 1;
                            // an already merged activity carries its count in the message
                            if (activity.type === `${payload.type}s`)
                                return total + getMergedSongCount(activity.payload && activity.payload.message);
                            return total;
                        }, 0);

                        // continue only once hiding has finished, so next is called exactly once
                        hideActivitiesAndNotify(activityModel, payload.userId, activities, this).then(
                            () => next(null, howManySongs),
                            next
                        );
                    },

                    // delete in cache the most recent activity to avoid issues when adding a new activity
                    (howManySongs, next) => {
                        CacheModule.runJob("HDEL", { table: "recentActivities", key: payload.userId }, this)
                            .then(() => next(null, howManySongs))
                            .catch(next);
                    },

                    // add a new activity that merges the activities together
                    (howManySongs, next) => {
                        const activity = {
                            userId: payload.userId,
                            type: "",
                            payload: {
                                message: "",
                                playlistId: payload.playlist.playlistId
                            }
                        };

                        if (payload.type === "playlist__remove_song" || payload.type === "playlist__remove_songs") {
                            activity.payload.message = `Removed ${howManySongs} songs from playlist <playlistId>${payload.playlist.displayName}</playlistId>`;
                            activity.type = "playlist__remove_songs";
                        } else if (payload.type === "playlist__add_song" || payload.type === "playlist__add_songs") {
                            activity.payload.message = `Added ${howManySongs} songs to playlist <playlistId>${payload.playlist.displayName}</playlistId>`;
                            activity.type = "playlist__add_songs";
                        }

                        ActivitiesModule.runJob("ADD_ACTIVITY", activity, this)
                            .then(() => next())
                            .catch(next);
                    }
                ],
                settleJob(this, resolve, reject)
            );
        });
    }

    /**
     * Removes any references to a station, report, playlist or song in activities
     *
     * @param {object} payload - object that contains the payload
     * @param {string} payload.type - type of reference. enum: ["youtubeId", "stationId", "reportId", "playlistId"]
     * @param {string} payload.stationId - (optional) the id of a station
     * @param {string} payload.reportId - (optional) the id of a report
     * @param {string} payload.playlistId - (optional) the id of a playlist
     * @param {string} payload.youtubeId - (optional) the id of a song
     * @returns {Promise} - returns promise (reject, resolve)
     */
    async REMOVE_ACTIVITY_REFERENCES(payload) {
        const activityModel = await DBModule.runJob("GET_MODEL", { modelName: "activity" }, this);

        return new Promise((resolve, reject) => {
            async.waterfall(
                [
                    next => {
                        if (!ACTIVITY_REFERENCE_TYPES.includes(payload.type))
                            return next("Please use a valid reference type.");

                        if (!payload[payload.type]) return next(`Please provide a ${payload.type} in the job payload.`);

                        return next();
                    },

                    // find all activities that include the reference
                    next => {
                        const query = { [`payload.${payload.type}`]: payload[payload.type] };

                        activityModel
                            .find(query, ["_id", "userId", "payload.message"])
                            .sort({ createdAt: -1 })
                            .exec(next);
                    },

                    (activities, next) => {
                        async.eachLimit(
                            activities,
                            1,
                            (activity, done) => {
                                const original = activity.payload && activity.payload.message;

                                // nothing to strip if the activity has no message
                                if (typeof original !== "string") return done();

                                // remove the reference tags
                                const message = stripReferenceTags(original, payload);

                                return activityModel
                                    .updateOne({ _id: activity._id }, { $set: { "payload.message": message } })
                                    .exec()
                                    .then(() => WSModule.runJob("SOCKETS_FROM_USER", { userId: activity.userId }))
                                    .then(sockets =>
                                        sockets.forEach(socket =>
                                            socket.dispatch("event:activity.updated", {
                                                data: { activityId: activity._id, message }
                                            })
                                        )
                                    )
                                    // two-argument then so done is called exactly once, on success or on failure
                                    .then(() => done(), done);
                            },
                            err => next(err)
                        );
                    }
                ],
                settleJob(this, resolve, reject)
            );
        });
    }

    /**
     * Hides any activities of the same type within a 15-minute period to prevent spam
     *
     * @param {object} payload - object that contains the payload
     * @param {string} payload.userId - the id of the user to check for duplicates
     * @param {string} payload.type - the type of activity to check for duplicates
     * @returns {Promise} - returns promise (reject, resolve)
     */
    async CHECK_FOR_ACTIVITY_SPAM_TO_HIDE(payload) {
        const activityModel = await DBModule.runJob("GET_MODEL", { modelName: "activity" }, this);

        return new Promise((resolve, reject) => {
            async.waterfall(
                [
                    // find all activities of this type from the last 15 minutes, except the newest one
                    next => {
                        activityModel
                            .find(
                                {
                                    userId: payload.userId,
                                    type: payload.type,
                                    hidden: false,
                                    createdAt: {
                                        $gte: new Date(new Date() - 15 * 60 * 1000)
                                    }
                                },
                                "_id"
                            )
                            .sort({ createdAt: -1 })
                            .skip(1)
                            .exec(next);
                    },

                    // hide these activities and emit to socket listeners
                    (activities, next) => {
                        // continue only once hiding has finished, so next is called exactly once
                        hideActivitiesAndNotify(activityModel, payload.userId, activities).then(() => next(), next);
                    }
                ],
                settleJob(this, resolve, reject)
            );
        });
    }
}
- export default new _ActivitiesModule();
|