activities.js 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488
  1. import async from "async";
  2. import CoreClass from "../core";
// Module-level references shared by every job in this file.
// ActivitiesModule is assigned in the class constructor; the remaining five are
// looked up on this.moduleManager.modules inside initialize().
let ActivitiesModule;
let DBModule;
let CacheModule;
let UtilsModule;
let WSModule;
let PlaylistsModule;
  9. class _ActivitiesModule extends CoreClass {
  10. // eslint-disable-next-line require-jsdoc
  11. constructor() {
  12. super("activities");
  13. ActivitiesModule = this;
  14. }
  15. /**
  16. * Initialises the activities module
  17. *
  18. * @returns {Promise} - returns promise (reject, resolve)
  19. */
  20. initialize() {
  21. return new Promise(resolve => {
  22. DBModule = this.moduleManager.modules.db;
  23. CacheModule = this.moduleManager.modules.cache;
  24. UtilsModule = this.moduleManager.modules.utils;
  25. WSModule = this.moduleManager.modules.ws;
  26. PlaylistsModule = this.moduleManager.modules.playlists;
  27. resolve();
  28. });
  29. }
  30. /**
  31. * Adds a new activity to the database
  32. *
  33. * @param {object} payload - object that contains the payload
  34. * @param {string} payload.userId - the id of the user who's activity is to be added
  35. * @param {string} payload.type - the type of activity (enum specified in schema)
  36. * @param {object} payload.payload - the details of the activity e.g. an array of songs that were added
  37. * @param {string} payload.payload.message - the main message describing the activity e.g. 50 songs added to playlist 'playlist name'
  38. * @param {string} payload.payload.thumbnail - url to a thumbnail e.g. song album art to be used when display an activity
  39. * @param {string} payload.payload.youtubeId - (optional) if relevant, the youtube id of the song related to the activity
  40. * @param {string} payload.payload.playlistId - (optional) if relevant, the id of the playlist related to the activity
  41. * @param {string} payload.payload.stationId - (optional) if relevant, the id of the station related to the activity
  42. * @returns {Promise} - returns promise (reject, resolve)
  43. */
  44. ADD_ACTIVITY(payload) {
  45. return new Promise((resolve, reject) => {
  46. async.waterfall(
  47. [
  48. next => {
  49. DBModule.runJob("GET_MODEL", { modelName: "activity" }, this)
  50. .then(res => next(null, res))
  51. .catch(next);
  52. },
  53. (ActivityModel, next) => {
  54. const { userId, type } = payload;
  55. const activity = new ActivityModel({
  56. userId,
  57. type,
  58. payload: payload.payload
  59. });
  60. activity.save(next);
  61. },
  62. (activity, next) => {
  63. WSModule.runJob("SOCKETS_FROM_USER", { userId: activity.userId }, this)
  64. .then(sockets => {
  65. sockets.forEach(socket =>
  66. socket.dispatch("event:activity.created", { data: { activity } })
  67. );
  68. next(null, activity);
  69. })
  70. .catch(next);
  71. },
  72. (activity, next) => {
  73. WSModule.runJob("EMIT_TO_ROOM", {
  74. room: `profile-${activity.userId}-activities`,
  75. args: ["event:activity.created", { data: { activity } }]
  76. });
  77. return next(null, activity);
  78. },
  79. (activity, next) => {
  80. const mergeableActivities = ["playlist__remove_song", "playlist__add_song"];
  81. const spammableActivities = [
  82. "user__toggle_nightmode",
  83. "user__toggle_autoskip_disliked_songs",
  84. "user__toggle_activity_watch",
  85. "song__like",
  86. "song__unlike",
  87. "song__dislike",
  88. "song__undislike"
  89. ];
  90. CacheModule.runJob("HGET", { table: "recentActivities", key: activity.userId })
  91. .then(recentActivity => {
  92. if (recentActivity) {
  93. const timeDifference = mins =>
  94. new Date() - new Date(recentActivity.createdAt) < mins * 60 * 1000;
  95. // if both activities have the same type, if within last 15 mins and if activity is within the spammableActivities array
  96. if (
  97. recentActivity.type === activity.type &&
  98. !!timeDifference(15) &&
  99. spammableActivities.includes(activity.type)
  100. )
  101. return ActivitiesModule.runJob(
  102. "CHECK_FOR_ACTIVITY_SPAM_TO_HIDE",
  103. { userId: activity.userId, type: activity.type },
  104. this
  105. )
  106. .then(() => next(null, activity))
  107. .catch(next);
  108. // if activity is within the mergeableActivities array, if both activities are about removing/adding and if within last 5 mins
  109. if (
  110. mergeableActivities.includes(activity.type) &&
  111. recentActivity.type === activity.type &&
  112. !!timeDifference(5)
  113. ) {
  114. return PlaylistsModule.runJob("GET_PLAYLIST", {
  115. playlistId: activity.payload.playlistId
  116. })
  117. .then(playlist =>
  118. ActivitiesModule.runJob(
  119. "CHECK_FOR_ACTIVITY_SPAM_TO_MERGE",
  120. {
  121. userId: activity.userId,
  122. type: activity.type,
  123. playlist: {
  124. playlistId: playlist._id,
  125. displayName: playlist.displayName
  126. }
  127. },
  128. this
  129. )
  130. .then(() => next(null, activity))
  131. .catch(next)
  132. )
  133. .catch(next);
  134. }
  135. return next(null, activity);
  136. }
  137. return next(null, activity);
  138. })
  139. .catch(next);
  140. },
  141. // store most recent activity in cache to be quickly accessible
  142. (activity, next) =>
  143. CacheModule.runJob(
  144. "HSET",
  145. {
  146. table: "recentActivities",
  147. key: activity.userId,
  148. value: { createdAt: activity.createdAt, type: activity.type }
  149. },
  150. this
  151. )
  152. .then(() => next(null))
  153. .catch(next)
  154. ],
  155. async (err, activity) => {
  156. if (err) {
  157. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  158. return reject(new Error(err));
  159. }
  160. return resolve(activity);
  161. }
  162. );
  163. });
  164. }
  165. /**
  166. * Merges activities about adding/removing songs from a playlist within a 5-minute period to prevent spam
  167. *
  168. * @param {object} payload - object that contains the payload
  169. * @param {string} payload.userId - the id of the user to check for duplicates
  170. * @param {object} payload.playlist - object that contains info about the relevant playlist
  171. * @param {string} payload.playlist.playlistId - the id of the playlist
  172. * @param {string} payload.playlist.displayName - the display name of the playlist
  173. * @param {string} payload.type - the type of activity to check for duplicates
  174. * @returns {Promise} - returns promise (reject, resolve)
  175. */
  176. async CHECK_FOR_ACTIVITY_SPAM_TO_MERGE(payload) {
  177. const activityModel = await DBModule.runJob("GET_MODEL", { modelName: "activity" }, this);
  178. return new Promise((resolve, reject) => {
  179. async.waterfall(
  180. [
  181. // find all activities of this type from the last 5 minutes
  182. next => {
  183. activityModel
  184. .find(
  185. {
  186. userId: payload.userId,
  187. type: { $in: [payload.type, `${payload.type}s`] },
  188. hidden: false,
  189. createdAt: {
  190. $gte: new Date(new Date() - 5 * 60 * 1000)
  191. },
  192. "payload.playlistId": payload.playlist.playlistId
  193. },
  194. ["_id", "type", "payload.message"]
  195. )
  196. .sort({ createdAt: -1 })
  197. .exec(next);
  198. },
  199. // hide these activities, emit to socket listeners and count number of songs in each
  200. (activities, next) => {
  201. let howManySongs = 0; // how many songs added/removed
  202. activities.forEach(activity => {
  203. activityModel.updateOne({ _id: activity._id }, { $set: { hidden: true } }).catch(next);
  204. WSModule.runJob("SOCKETS_FROM_USER", { userId: payload.userId }, this)
  205. .then(sockets =>
  206. sockets.forEach(socket =>
  207. socket.dispatch("event:activity.hidden", { data: { activityId: activity._id } })
  208. )
  209. )
  210. .catch(next);
  211. WSModule.runJob("EMIT_TO_ROOM", {
  212. room: `profile-${payload.userId}-activities`,
  213. args: ["event:activity.hidden", { data: { activityId: activity._id } }]
  214. });
  215. if (activity.type === payload.type) howManySongs += 1;
  216. else if (activity.type === `${payload.type}s`)
  217. howManySongs += parseInt(
  218. activity.payload.message.replace(
  219. /(?:Removed|Added)\s(?<songs>\d+)\ssongs.+/g,
  220. "$<songs>"
  221. )
  222. );
  223. });
  224. return next(null, howManySongs);
  225. },
  226. // // delete in cache the most recent activity to avoid issues when adding a new activity
  227. (howManySongs, next) => {
  228. CacheModule.runJob("HDEL", { table: "recentActivities", key: payload.userId }, this)
  229. .then(() => next(null, howManySongs))
  230. .catch(next);
  231. },
  232. // add a new activity that merges the activities together
  233. (howManySongs, next) => {
  234. const activity = {
  235. userId: payload.userId,
  236. type: "",
  237. payload: {
  238. message: "",
  239. playlistId: payload.playlist.playlistId
  240. }
  241. };
  242. if (payload.type === "playlist__remove_song" || payload.type === "playlist__remove_songs") {
  243. activity.payload.message = `Removed ${howManySongs} songs from playlist <playlistId>${payload.playlist.displayName}</playlistId>`;
  244. activity.type = "playlist__remove_songs";
  245. } else if (payload.type === "playlist__add_song" || payload.type === "playlist__add_songs") {
  246. activity.payload.message = `Added ${howManySongs} songs to playlist <playlistId>${payload.playlist.displayName}</playlistId>`;
  247. activity.type = "playlist__add_songs";
  248. }
  249. ActivitiesModule.runJob("ADD_ACTIVITY", activity, this)
  250. .then(() => next())
  251. .catch(next);
  252. }
  253. ],
  254. async err => {
  255. if (err) {
  256. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  257. return reject(new Error(err));
  258. }
  259. return resolve();
  260. }
  261. );
  262. });
  263. }
  264. /**
  265. * Removes any references to a station, playlist or song in activities
  266. *
  267. * @param {object} payload - object that contains the payload
  268. * @param {string} payload.type - type of reference. enum: ["youtubeId", "stationId", "playlistId"]
  269. * @param {string} payload.stationId - (optional) the id of a station
  270. * @param {string} payload.playlistId - (optional) the id of a playlist
  271. * @param {string} payload.youtubeId - (optional) the id of a song
  272. * @returns {Promise} - returns promise (reject, resolve)
  273. */
  274. async REMOVE_ACTIVITY_REFERENCES(payload) {
  275. const activityModel = await DBModule.runJob("GET_MODEL", { modelName: "activity" }, this);
  276. return new Promise((resolve, reject) => {
  277. async.waterfall(
  278. [
  279. next => {
  280. if (
  281. (payload.type !== "youtubeId" &&
  282. payload.type !== "stationId" &&
  283. payload.type !== "playlistId") ||
  284. !payload.type
  285. )
  286. return next("Please use a valid reference type.");
  287. if (!payload[payload.type]) return next(`Please provide a ${payload.type} in the job payload.`);
  288. return next();
  289. },
  290. // find all activities that include the reference
  291. next => {
  292. const query = {};
  293. query[`payload.${payload.type}`] = payload[payload.type];
  294. activityModel
  295. .find(query, ["_id", "userId", "payload.message"])
  296. .sort({ createdAt: -1 })
  297. .exec(next);
  298. },
  299. (activities, next) => {
  300. async.eachLimit(
  301. activities,
  302. 1,
  303. (activity, next) => {
  304. // remove the reference tags
  305. if (payload.youtubeId) {
  306. activity.payload.message = activity.payload.message.replace(
  307. /<youtubeId>(.*)<\/youtubeId>/g,
  308. "$1"
  309. );
  310. }
  311. if (payload.playlistId) {
  312. activity.payload.message = activity.payload.message.replace(
  313. /<playlistId>(.*)<\/playlistId>/g,
  314. `$1`
  315. );
  316. }
  317. if (payload.stationId) {
  318. activity.payload.message = activity.payload.message.replace(
  319. /<stationId>(.*)<\/stationId>/g,
  320. `$1`
  321. );
  322. }
  323. activityModel
  324. .updateOne(
  325. { _id: activity._id },
  326. { $set: { "payload.message": activity.payload.message } }
  327. )
  328. .then(() => {
  329. WSModule.runJob("SOCKETS_FROM_USER", { userId: activity.userId })
  330. .then(sockets =>
  331. sockets.forEach(socket =>
  332. socket.dispatch("event:activity.updated", {
  333. data: {
  334. activityId: activity._id,
  335. message: activity.payload.message
  336. }
  337. })
  338. )
  339. )
  340. .catch(next);
  341. WSModule.runJob("EMIT_TO_ROOM", {
  342. room: `profile-${activity.userId}-activities`,
  343. args: [
  344. "event:activity.updated",
  345. {
  346. data: {
  347. activityId: activity._id,
  348. message: activity.payload.message
  349. }
  350. }
  351. ]
  352. });
  353. return next();
  354. })
  355. .catch(next);
  356. },
  357. err => next(err)
  358. );
  359. }
  360. ],
  361. async err => {
  362. if (err) {
  363. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  364. return reject(new Error(err));
  365. }
  366. return resolve();
  367. }
  368. );
  369. });
  370. }
  371. /**
  372. * Hides any activities of the same type within a 15-minute period to prevent spam
  373. *
  374. * @param {object} payload - object that contains the payload
  375. * @param {string} payload.userId - the id of the user to check for duplicates
  376. * @param {string} payload.type - the type of activity to check for duplicates
  377. * @returns {Promise} - returns promise (reject, resolve)
  378. */
  379. async CHECK_FOR_ACTIVITY_SPAM_TO_HIDE(payload) {
  380. const activityModel = await DBModule.runJob("GET_MODEL", { modelName: "activity" }, this);
  381. return new Promise((resolve, reject) => {
  382. async.waterfall(
  383. [
  384. // find all activities of this type from the last 15 minutes
  385. next => {
  386. activityModel
  387. .find(
  388. {
  389. userId: payload.userId,
  390. type: payload.type,
  391. hidden: false,
  392. createdAt: {
  393. $gte: new Date(new Date() - 15 * 60 * 1000)
  394. }
  395. },
  396. "_id"
  397. )
  398. .sort({ createdAt: -1 })
  399. .skip(1)
  400. .exec(next);
  401. },
  402. // hide these activities and emit to socket listeners
  403. (activities, next) => {
  404. activities.forEach(activity => {
  405. activityModel.updateOne({ _id: activity._id }, { $set: { hidden: true } }).catch(next);
  406. WSModule.runJob("SOCKETS_FROM_USER", { userId: payload.userId }, this)
  407. .then(sockets =>
  408. sockets.forEach(socket =>
  409. socket.dispatch("event:activity.hidden", { data: { activityId: activity._id } })
  410. )
  411. )
  412. .catch(next);
  413. WSModule.runJob("EMIT_TO_ROOM", {
  414. room: `profile-${payload.userId}-activities`,
  415. args: ["event:activity.hidden", { data: { activityId: activity._id } }]
  416. });
  417. });
  418. return next();
  419. }
  420. ],
  421. async err => {
  422. if (err) {
  423. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  424. return reject(new Error(err));
  425. }
  426. return resolve();
  427. }
  428. );
  429. });
  430. }
  431. }
// The module is exported as a single instance created when this file is first loaded.
export default new _ActivitiesModule();