activities.js 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362
  1. import async from "async";
  2. import CoreClass from "../core";
  3. let ActivitiesModule;
  4. let DBModule;
  5. let CacheModule;
  6. let UtilsModule;
  7. let WSModule;
  8. let PlaylistsModule;
  9. class _ActivitiesModule extends CoreClass {
  10. // eslint-disable-next-line require-jsdoc
  11. constructor() {
  12. super("activities");
  13. ActivitiesModule = this;
  14. }
  15. /**
  16. * Initialises the activities module
  17. *
  18. * @returns {Promise} - returns promise (reject, resolve)
  19. */
  20. initialize() {
  21. return new Promise(resolve => {
  22. DBModule = this.moduleManager.modules.db;
  23. CacheModule = this.moduleManager.modules.cache;
  24. UtilsModule = this.moduleManager.modules.utils;
  25. WSModule = this.moduleManager.modules.ws;
  26. PlaylistsModule = this.moduleManager.modules.playlists;
  27. resolve();
  28. });
  29. }
  30. // TODO: Migrate
  31. /**
  32. * Adds a new activity to the database
  33. *
  34. * @param {object} payload - object that contains the payload
  35. * @param {string} payload.userId - the id of the user who's activity is to be added
  36. * @param {string} payload.type - the type of activity (enum specified in schema)
  37. * @param {object} payload.payload - the details of the activity e.g. an array of songs that were added
  38. * @param {string} payload.payload.message - the main message describing the activity e.g. 50 songs added to playlist 'playlist name'
  39. * @param {string} payload.payload.thumbnail - url to a thumbnail e.g. song album art to be used when display an activity
  40. * @param {string} payload.payload.youtubeId - (optional) if relevant, the youtube id of the song related to the activity
  41. * @param {string} payload.payload.playlistId - (optional) if relevant, the id of the playlist related to the activity
  42. * @param {string} payload.payload.stationId - (optional) if relevant, the id of the station related to the activity
  43. * @returns {Promise} - returns promise (reject, resolve)
  44. */
  45. ADD_ACTIVITY(payload) {
  46. return new Promise((resolve, reject) => {
  47. async.waterfall(
  48. [
  49. next => {
  50. DBModule.runJob("GET_MODEL", { modelName: "activity" }, this)
  51. .then(res => next(null, res))
  52. .catch(next);
  53. },
  54. (ActivityModel, next) => {
  55. const { userId, type } = payload;
  56. const activity = new ActivityModel({
  57. userId,
  58. type,
  59. payload: payload.payload
  60. });
  61. activity.save(next);
  62. },
  63. (activity, next) => {
  64. WSModule.runJob("SOCKETS_FROM_USER", { userId: activity.userId }, this)
  65. .then(sockets => {
  66. sockets.forEach(socket => socket.dispatch("event:activity.create", activity));
  67. next(null, activity);
  68. })
  69. .catch(next);
  70. },
  71. (activity, next) => {
  72. WSModule.runJob("EMIT_TO_ROOM", {
  73. room: `profile-${activity.userId}-activities`,
  74. args: ["event:activity.create", activity]
  75. });
  76. return next(null, activity);
  77. },
  78. (activity, next) => {
  79. const mergeableActivities = ["playlist__remove_song", "playlist__add_song"];
  80. const spammableActivities = [
  81. "user__toggle_nightmode",
  82. "user__toggle_autoskip_disliked_songs",
  83. "user__toggle_activity_watch",
  84. "song__like",
  85. "song__unlike",
  86. "song__dislike",
  87. "song__undislike"
  88. ];
  89. CacheModule.runJob("HGET", { table: "recentActivities", key: activity.userId })
  90. .then(recentActivity => {
  91. if (recentActivity) {
  92. const timeDifference = mins =>
  93. new Date() - new Date(recentActivity.createdAt) < mins * 60 * 1000;
  94. // if both activities have the same type, if within last 15 mins and if activity is within the spammableActivities array
  95. if (
  96. recentActivity.type === activity.type &&
  97. !!timeDifference(15) &&
  98. spammableActivities.includes(activity.type)
  99. )
  100. return ActivitiesModule.runJob(
  101. "CHECK_FOR_ACTIVITY_SPAM_TO_HIDE",
  102. { userId: activity.userId, type: activity.type },
  103. this
  104. )
  105. .then(() => next(null, activity))
  106. .catch(next);
  107. // if activity is within the mergeableActivities array, if both activities are about removing/adding and if within last 5 mins
  108. if (
  109. mergeableActivities.includes(activity.type) &&
  110. recentActivity.type === activity.type &&
  111. !!timeDifference(5)
  112. ) {
  113. return PlaylistsModule.runJob("GET_PLAYLIST", {
  114. playlistId: activity.payload.playlistId
  115. })
  116. .then(playlist =>
  117. ActivitiesModule.runJob(
  118. "CHECK_FOR_ACTIVITY_SPAM_TO_MERGE",
  119. {
  120. userId: activity.userId,
  121. type: activity.type,
  122. playlist: {
  123. playlistId: playlist._id,
  124. displayName: playlist.displayName
  125. }
  126. },
  127. this
  128. )
  129. .then(() => next(null, activity))
  130. .catch(next)
  131. )
  132. .catch(next);
  133. }
  134. return next(null, activity);
  135. }
  136. return next(null, activity);
  137. })
  138. .catch(next);
  139. },
  140. // store most recent activity in cache to be quickly accessible
  141. (activity, next) =>
  142. CacheModule.runJob(
  143. "HSET",
  144. {
  145. table: "recentActivities",
  146. key: activity.userId,
  147. value: { createdAt: activity.createdAt, type: activity.type }
  148. },
  149. this
  150. )
  151. .then(() => next(null))
  152. .catch(next)
  153. ],
  154. async (err, activity) => {
  155. if (err) {
  156. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  157. return reject(new Error(err));
  158. }
  159. return resolve(activity);
  160. }
  161. );
  162. });
  163. }
  164. /**
  165. * Merges activities about adding/removing songs from a playlist within a 5-minute period to prevent spam
  166. *
  167. * @param {object} payload - object that contains the payload
  168. * @param {string} payload.userId - the id of the user to check for duplicates
  169. * @param {object} payload.playlist - object that contains info about the relevant playlist
  170. * @param {string} payload.playlist.playlistId - the id of the playlist
  171. * @param {string} payload.playlist.displayName - the display name of the playlist
  172. * @param {string} payload.type - the type of activity to check for duplicates
  173. * @returns {Promise} - returns promise (reject, resolve)
  174. */
  175. async CHECK_FOR_ACTIVITY_SPAM_TO_MERGE(payload) {
  176. const activityModel = await DBModule.runJob("GET_MODEL", { modelName: "activity" }, this);
  177. return new Promise((resolve, reject) => {
  178. async.waterfall(
  179. [
  180. // find all activities of this type from the last 5 minutes
  181. next => {
  182. activityModel
  183. .find(
  184. {
  185. userId: payload.userId,
  186. type: { $in: [payload.type, `${payload.type}s`] },
  187. hidden: false,
  188. createdAt: {
  189. $gte: new Date(new Date() - 5 * 60 * 1000)
  190. },
  191. "payload.playlistId": payload.playlist.playlistId
  192. },
  193. ["_id", "type", "payload.message"]
  194. )
  195. .sort({ createdAt: -1 })
  196. .exec(next);
  197. },
  198. // hide these activities, emit to socket listeners and count number of songs in each
  199. (activities, next) => {
  200. let howManySongs = 0; // how many songs added/removed
  201. activities.forEach(activity => {
  202. activityModel.updateOne({ _id: activity._id }, { $set: { hidden: true } }).catch(next);
  203. WSModule.runJob("SOCKETS_FROM_USER", { userId: payload.userId }, this)
  204. .then(sockets =>
  205. sockets.forEach(socket => socket.dispatch("event:activity.hide", activity._id))
  206. )
  207. .catch(next);
  208. WSModule.runJob("EMIT_TO_ROOM", {
  209. room: `profile-${payload.userId}-activities`,
  210. args: ["event:activity.hide", activity._id]
  211. });
  212. if (activity.type === payload.type) howManySongs += 1;
  213. else if (activity.type === `${payload.type}s`)
  214. howManySongs += parseInt(
  215. activity.payload.message.replace(
  216. /(?:Removed|Added)\s(?<songs>\d+)\ssongs.+/g,
  217. "$<songs>"
  218. )
  219. );
  220. });
  221. return next(null, howManySongs);
  222. },
  223. // // delete in cache the most recent activity to avoid issues when adding a new activity
  224. (howManySongs, next) => {
  225. CacheModule.runJob("HDEL", { table: "recentActivities", key: payload.userId }, this)
  226. .then(() => next(null, howManySongs))
  227. .catch(next);
  228. },
  229. // add a new activity that merges the activities together
  230. (howManySongs, next) => {
  231. const activity = {
  232. userId: payload.userId,
  233. type: "",
  234. payload: {
  235. message: "",
  236. playlistId: payload.playlist.playlistId
  237. }
  238. };
  239. if (payload.type === "playlist__remove_song" || payload.type === "playlist__remove_songs") {
  240. activity.payload.message = `Removed ${howManySongs} songs from playlist <playlistId>${payload.playlist.displayName}</playlistId>`;
  241. activity.type = "playlist__remove_songs";
  242. } else if (payload.type === "playlist__add_song" || payload.type === "playlist__add_songs") {
  243. activity.payload.message = `Added ${howManySongs} songs to playlist <playlistId>${payload.playlist.displayName}</playlistId>`;
  244. activity.type = "playlist__add_songs";
  245. }
  246. ActivitiesModule.runJob("ADD_ACTIVITY", activity, this)
  247. .then(() => next())
  248. .catch(next);
  249. }
  250. ],
  251. async err => {
  252. if (err) {
  253. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  254. return reject(new Error(err));
  255. }
  256. return resolve();
  257. }
  258. );
  259. });
  260. }
  261. /**
  262. * Hides any activities of the same type within a 15-minute period to prevent spam
  263. *
  264. * @param {object} payload - object that contains the payload
  265. * @param {string} payload.userId - the id of the user to check for duplicates
  266. * @param {string} payload.type - the type of activity to check for duplicates
  267. * @returns {Promise} - returns promise (reject, resolve)
  268. */
  269. async CHECK_FOR_ACTIVITY_SPAM_TO_HIDE(payload) {
  270. const activityModel = await DBModule.runJob("GET_MODEL", { modelName: "activity" }, this);
  271. return new Promise((resolve, reject) => {
  272. async.waterfall(
  273. [
  274. // find all activities of this type from the last 15 minutes
  275. next => {
  276. activityModel
  277. .find(
  278. {
  279. userId: payload.userId,
  280. type: payload.type,
  281. hidden: false,
  282. createdAt: {
  283. $gte: new Date(new Date() - 15 * 60 * 1000)
  284. }
  285. },
  286. "_id"
  287. )
  288. .sort({ createdAt: -1 })
  289. .skip(1)
  290. .exec(next);
  291. },
  292. // hide these activities and emit to socket listeners
  293. (activities, next) => {
  294. activities.forEach(activity => {
  295. activityModel.updateOne({ _id: activity._id }, { $set: { hidden: true } }).catch(next);
  296. WSModule.runJob("SOCKETS_FROM_USER", { userId: payload.userId }, this)
  297. .then(sockets =>
  298. sockets.forEach(socket => socket.dispatch("event:activity.hide", activity._id))
  299. )
  300. .catch(next);
  301. WSModule.runJob("EMIT_TO_ROOM", {
  302. room: `profile-${payload.userId}-activities`,
  303. args: ["event:activity.hide", activity._id]
  304. });
  305. });
  306. return next();
  307. }
  308. ],
  309. async err => {
  310. if (err) {
  311. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  312. return reject(new Error(err));
  313. }
  314. return resolve();
  315. }
  316. );
  317. });
  318. }
  319. }
  320. export default new _ActivitiesModule();