// activities.js

  1. import async from "async";
  2. import CoreClass from "../core";
  3. let ActivitiesModule;
  4. let DBModule;
  5. let CacheModule;
  6. let UtilsModule;
  7. let WSModule;
  8. let PlaylistsModule;
  9. class _ActivitiesModule extends CoreClass {
  10. // eslint-disable-next-line require-jsdoc
  11. constructor() {
  12. super("activities");
  13. ActivitiesModule = this;
  14. }
  15. /**
  16. * Initialises the activities module
  17. *
  18. * @returns {Promise} - returns promise (reject, resolve)
  19. */
  20. initialize() {
  21. return new Promise(resolve => {
  22. DBModule = this.moduleManager.modules.db;
  23. CacheModule = this.moduleManager.modules.cache;
  24. UtilsModule = this.moduleManager.modules.utils;
  25. WSModule = this.moduleManager.modules.ws;
  26. PlaylistsModule = this.moduleManager.modules.playlists;
  27. resolve();
  28. });
  29. }
  30. /**
  31. * Adds a new activity to the database
  32. *
  33. * @param {object} payload - object that contains the payload
  34. * @param {string} payload.userId - the id of the user who's activity is to be added
  35. * @param {string} payload.type - the type of activity (enum specified in schema)
  36. * @param {object} payload.payload - the details of the activity e.g. an array of songs that were added
  37. * @param {string} payload.payload.message - the main message describing the activity e.g. 50 songs added to playlist 'playlist name'
  38. * @param {string} payload.payload.thumbnail - url to a thumbnail e.g. song album art to be used when display an activity
  39. * @param {string} payload.payload.youtubeId - (optional) if relevant, the youtube id of the song related to the activity
  40. * @param {string} payload.payload.reportId - (optional) if relevant, the id of the report related to the activity
  41. * @param {string} payload.payload.playlistId - (optional) if relevant, the id of the playlist related to the activity
  42. * @param {string} payload.payload.stationId - (optional) if relevant, the id of the station related to the activity
  43. * @returns {Promise} - returns promise (reject, resolve)
  44. */
  45. ADD_ACTIVITY(payload) {
  46. return new Promise((resolve, reject) => {
  47. async.waterfall(
  48. [
  49. next => {
  50. DBModule.runJob("GET_MODEL", { modelName: "activity" }, this)
  51. .then(res => next(null, res))
  52. .catch(next);
  53. },
  54. (ActivityModel, next) => {
  55. const { userId, type } = payload;
  56. const activity = new ActivityModel({
  57. userId,
  58. type,
  59. payload: payload.payload
  60. });
  61. activity.save(next);
  62. },
  63. (activity, next) => {
  64. WSModule.runJob("SOCKETS_FROM_USER", { userId: activity.userId }, this)
  65. .then(sockets => {
  66. sockets.forEach(socket =>
  67. socket.dispatch("event:activity.created", { data: { activity } })
  68. );
  69. next(null, activity);
  70. })
  71. .catch(next);
  72. },
  73. (activity, next) => {
  74. WSModule.runJob("EMIT_TO_ROOM", {
  75. room: `profile-${activity.userId}-activities`,
  76. args: ["event:activity.created", { data: { activity } }]
  77. });
  78. return next(null, activity);
  79. },
  80. (activity, next) => {
  81. const mergeableActivities = ["playlist__remove_song", "playlist__add_song"];
  82. const spammableActivities = [
  83. "user__toggle_nightmode",
  84. "user__toggle_autoskip_disliked_songs",
  85. "user__toggle_activity_watch",
  86. "song__like",
  87. "song__unlike",
  88. "song__dislike",
  89. "song__undislike"
  90. ];
  91. CacheModule.runJob("HGET", { table: "recentActivities", key: activity.userId })
  92. .then(recentActivity => {
  93. if (recentActivity) {
  94. const timeDifference = mins =>
  95. new Date() - new Date(recentActivity.createdAt) < mins * 60 * 1000;
  96. // if both activities have the same type, if within last 15 mins and if activity is within the spammableActivities array
  97. if (
  98. recentActivity.type === activity.type &&
  99. !!timeDifference(15) &&
  100. spammableActivities.includes(activity.type)
  101. )
  102. return ActivitiesModule.runJob(
  103. "CHECK_FOR_ACTIVITY_SPAM_TO_HIDE",
  104. { userId: activity.userId, type: activity.type },
  105. this
  106. )
  107. .then(() => next(null, activity))
  108. .catch(next);
  109. // if activity is within the mergeableActivities array, if both activities are about removing/adding and if within last 5 mins
  110. if (
  111. mergeableActivities.includes(activity.type) &&
  112. recentActivity.type === activity.type &&
  113. !!timeDifference(5)
  114. ) {
  115. return PlaylistsModule.runJob("GET_PLAYLIST", {
  116. playlistId: activity.payload.playlistId
  117. })
  118. .then(playlist =>
  119. ActivitiesModule.runJob(
  120. "CHECK_FOR_ACTIVITY_SPAM_TO_MERGE",
  121. {
  122. userId: activity.userId,
  123. type: activity.type,
  124. playlist: {
  125. playlistId: playlist._id,
  126. displayName: playlist.displayName
  127. }
  128. },
  129. this
  130. )
  131. .then(() => next(null, activity))
  132. .catch(next)
  133. )
  134. .catch(next);
  135. }
  136. return next(null, activity);
  137. }
  138. return next(null, activity);
  139. })
  140. .catch(next);
  141. },
  142. // store most recent activity in cache to be quickly accessible
  143. (activity, next) =>
  144. CacheModule.runJob(
  145. "HSET",
  146. {
  147. table: "recentActivities",
  148. key: activity.userId,
  149. value: { createdAt: activity.createdAt, type: activity.type }
  150. },
  151. this
  152. )
  153. .then(() => next(null))
  154. .catch(next)
  155. ],
  156. async (err, activity) => {
  157. if (err) {
  158. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  159. return reject(new Error(err));
  160. }
  161. return resolve(activity);
  162. }
  163. );
  164. });
  165. }
  166. /**
  167. * Merges activities about adding/removing songs from a playlist within a 5-minute period to prevent spam
  168. *
  169. * @param {object} payload - object that contains the payload
  170. * @param {string} payload.userId - the id of the user to check for duplicates
  171. * @param {object} payload.playlist - object that contains info about the relevant playlist
  172. * @param {string} payload.playlist.playlistId - the id of the playlist
  173. * @param {string} payload.playlist.displayName - the display name of the playlist
  174. * @param {string} payload.type - the type of activity to check for duplicates
  175. * @returns {Promise} - returns promise (reject, resolve)
  176. */
  177. async CHECK_FOR_ACTIVITY_SPAM_TO_MERGE(payload) {
  178. const activityModel = await DBModule.runJob("GET_MODEL", { modelName: "activity" }, this);
  179. return new Promise((resolve, reject) => {
  180. async.waterfall(
  181. [
  182. // find all activities of this type from the last 5 minutes
  183. next => {
  184. activityModel
  185. .find(
  186. {
  187. userId: payload.userId,
  188. type: { $in: [payload.type, `${payload.type}s`] },
  189. hidden: false,
  190. createdAt: {
  191. $gte: new Date(new Date() - 5 * 60 * 1000)
  192. },
  193. "payload.playlistId": payload.playlist.playlistId
  194. },
  195. ["_id", "type", "payload.message"]
  196. )
  197. .sort({ createdAt: -1 })
  198. .exec(next);
  199. },
  200. // hide these activities, emit to socket listeners and count number of songs in each
  201. (activities, next) => {
  202. let howManySongs = 0; // how many songs added/removed
  203. activities.forEach(activity => {
  204. activityModel.updateOne({ _id: activity._id }, { $set: { hidden: true } }).catch(next);
  205. WSModule.runJob("SOCKETS_FROM_USER", { userId: payload.userId }, this)
  206. .then(sockets =>
  207. sockets.forEach(socket =>
  208. socket.dispatch("event:activity.hidden", { data: { activityId: activity._id } })
  209. )
  210. )
  211. .catch(next);
  212. WSModule.runJob("EMIT_TO_ROOM", {
  213. room: `profile-${payload.userId}-activities`,
  214. args: ["event:activity.hidden", { data: { activityId: activity._id } }]
  215. });
  216. if (activity.type === payload.type) howManySongs += 1;
  217. else if (activity.type === `${payload.type}s`)
  218. howManySongs += parseInt(
  219. activity.payload.message.replace(
  220. /(?:Removed|Added)\s(?<songs>\d+)\ssongs.+/g,
  221. "$<songs>"
  222. )
  223. );
  224. });
  225. return next(null, howManySongs);
  226. },
  227. // // delete in cache the most recent activity to avoid issues when adding a new activity
  228. (howManySongs, next) => {
  229. CacheModule.runJob("HDEL", { table: "recentActivities", key: payload.userId }, this)
  230. .then(() => next(null, howManySongs))
  231. .catch(next);
  232. },
  233. // add a new activity that merges the activities together
  234. (howManySongs, next) => {
  235. const activity = {
  236. userId: payload.userId,
  237. type: "",
  238. payload: {
  239. message: "",
  240. playlistId: payload.playlist.playlistId
  241. }
  242. };
  243. if (payload.type === "playlist__remove_song" || payload.type === "playlist__remove_songs") {
  244. activity.payload.message = `Removed ${howManySongs} songs from playlist <playlistId>${payload.playlist.displayName}</playlistId>`;
  245. activity.type = "playlist__remove_songs";
  246. } else if (payload.type === "playlist__add_song" || payload.type === "playlist__add_songs") {
  247. activity.payload.message = `Added ${howManySongs} songs to playlist <playlistId>${payload.playlist.displayName}</playlistId>`;
  248. activity.type = "playlist__add_songs";
  249. }
  250. ActivitiesModule.runJob("ADD_ACTIVITY", activity, this)
  251. .then(() => next())
  252. .catch(next);
  253. }
  254. ],
  255. async err => {
  256. if (err) {
  257. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  258. return reject(new Error(err));
  259. }
  260. return resolve();
  261. }
  262. );
  263. });
  264. }
  265. /**
  266. * Removes any references to a station, playlist or song in activities
  267. *
  268. * @param {object} payload - object that contains the payload
  269. * @param {string} payload.type - type of reference. enum: ["youtubeId", "stationId", "playlistId", "playlistId"]
  270. * @param {string} payload.stationId - (optional) the id of a station
  271. * @param {string} payload.reportId - (optional) the id of a report
  272. * @param {string} payload.playlistId - (optional) the id of a playlist
  273. * @param {string} payload.youtubeId - (optional) the id of a song
  274. * @returns {Promise} - returns promise (reject, resolve)
  275. */
  276. async REMOVE_ACTIVITY_REFERENCES(payload) {
  277. const activityModel = await DBModule.runJob("GET_MODEL", { modelName: "activity" }, this);
  278. return new Promise((resolve, reject) => {
  279. async.waterfall(
  280. [
  281. next => {
  282. if (
  283. (payload.type !== "youtubeId" &&
  284. payload.type !== "stationId" &&
  285. payload.type !== "reportId" &&
  286. payload.type !== "playlistId") ||
  287. !payload.type
  288. )
  289. return next("Please use a valid reference type.");
  290. if (!payload[payload.type]) return next(`Please provide a ${payload.type} in the job payload.`);
  291. return next();
  292. },
  293. // find all activities that include the reference
  294. next => {
  295. const query = {};
  296. query[`payload.${payload.type}`] = payload[payload.type];
  297. activityModel
  298. .find(query, ["_id", "userId", "payload.message"])
  299. .sort({ createdAt: -1 })
  300. .exec(next);
  301. },
  302. (activities, next) => {
  303. async.eachLimit(
  304. activities,
  305. 1,
  306. (activity, next) => {
  307. // remove the reference tags
  308. if (payload.youtubeId) {
  309. activity.payload.message = activity.payload.message.replace(
  310. /<youtubeId>(.*)<\/youtubeId>/g,
  311. "$1"
  312. );
  313. }
  314. if (payload.reportId) {
  315. activity.payload.message = activity.payload.message.replace(
  316. /<reportId>(.*)<\/reportId>/g,
  317. "$1"
  318. );
  319. }
  320. if (payload.playlistId) {
  321. activity.payload.message = activity.payload.message.replace(
  322. /<playlistId>(.*)<\/playlistId>/g,
  323. `$1`
  324. );
  325. }
  326. if (payload.stationId) {
  327. activity.payload.message = activity.payload.message.replace(
  328. /<stationId>(.*)<\/stationId>/g,
  329. `$1`
  330. );
  331. }
  332. activityModel
  333. .updateOne(
  334. { _id: activity._id },
  335. { $set: { "payload.message": activity.payload.message } }
  336. )
  337. .then(() => {
  338. WSModule.runJob("SOCKETS_FROM_USER", { userId: activity.userId })
  339. .then(sockets =>
  340. sockets.forEach(socket =>
  341. socket.dispatch("event:activity.updated", {
  342. data: {
  343. activityId: activity._id,
  344. message: activity.payload.message
  345. }
  346. })
  347. )
  348. )
  349. .catch(next);
  350. WSModule.runJob("EMIT_TO_ROOM", {
  351. room: `profile-${activity.userId}-activities`,
  352. args: [
  353. "event:activity.updated",
  354. {
  355. data: {
  356. activityId: activity._id,
  357. message: activity.payload.message
  358. }
  359. }
  360. ]
  361. });
  362. return next();
  363. })
  364. .catch(next);
  365. },
  366. err => next(err)
  367. );
  368. }
  369. ],
  370. async err => {
  371. if (err) {
  372. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  373. return reject(new Error(err));
  374. }
  375. return resolve();
  376. }
  377. );
  378. });
  379. }
  380. /**
  381. * Hides any activities of the same type within a 15-minute period to prevent spam
  382. *
  383. * @param {object} payload - object that contains the payload
  384. * @param {string} payload.userId - the id of the user to check for duplicates
  385. * @param {string} payload.type - the type of activity to check for duplicates
  386. * @returns {Promise} - returns promise (reject, resolve)
  387. */
  388. async CHECK_FOR_ACTIVITY_SPAM_TO_HIDE(payload) {
  389. const activityModel = await DBModule.runJob("GET_MODEL", { modelName: "activity" }, this);
  390. return new Promise((resolve, reject) => {
  391. async.waterfall(
  392. [
  393. // find all activities of this type from the last 15 minutes
  394. next => {
  395. activityModel
  396. .find(
  397. {
  398. userId: payload.userId,
  399. type: payload.type,
  400. hidden: false,
  401. createdAt: {
  402. $gte: new Date(new Date() - 15 * 60 * 1000)
  403. }
  404. },
  405. "_id"
  406. )
  407. .sort({ createdAt: -1 })
  408. .skip(1)
  409. .exec(next);
  410. },
  411. // hide these activities and emit to socket listeners
  412. (activities, next) => {
  413. activities.forEach(activity => {
  414. activityModel.updateOne({ _id: activity._id }, { $set: { hidden: true } }).catch(next);
  415. WSModule.runJob("SOCKETS_FROM_USER", { userId: payload.userId }, this)
  416. .then(sockets =>
  417. sockets.forEach(socket =>
  418. socket.dispatch("event:activity.hidden", { data: { activityId: activity._id } })
  419. )
  420. )
  421. .catch(next);
  422. WSModule.runJob("EMIT_TO_ROOM", {
  423. room: `profile-${payload.userId}-activities`,
  424. args: ["event:activity.hidden", { data: { activityId: activity._id } }]
  425. });
  426. });
  427. return next();
  428. }
  429. ],
  430. async err => {
  431. if (err) {
  432. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  433. return reject(new Error(err));
  434. }
  435. return resolve();
  436. }
  437. );
  438. });
  439. }
  440. }
  441. export default new _ActivitiesModule();