activities.js 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468
  1. import async from "async";
  2. import CoreClass from "../core";
  3. let ActivitiesModule;
  4. let DBModule;
  5. let CacheModule;
  6. let UtilsModule;
  7. let WSModule;
  8. let PlaylistsModule;
  9. class _ActivitiesModule extends CoreClass {
  10. // eslint-disable-next-line require-jsdoc
  11. constructor() {
  12. super("activities");
  13. ActivitiesModule = this;
  14. }
  15. /**
  16. * Initialises the activities module
  17. *
  18. * @returns {Promise} - returns promise (reject, resolve)
  19. */
  20. initialize() {
  21. return new Promise(resolve => {
  22. DBModule = this.moduleManager.modules.db;
  23. CacheModule = this.moduleManager.modules.cache;
  24. UtilsModule = this.moduleManager.modules.utils;
  25. WSModule = this.moduleManager.modules.ws;
  26. PlaylistsModule = this.moduleManager.modules.playlists;
  27. resolve();
  28. });
  29. }
  30. /**
  31. * Adds a new activity to the database
  32. *
  33. * @param {object} payload - object that contains the payload
  34. * @param {string} payload.userId - the id of the user who's activity is to be added
  35. * @param {string} payload.type - the type of activity (enum specified in schema)
  36. * @param {object} payload.payload - the details of the activity e.g. an array of songs that were added
  37. * @param {string} payload.payload.message - the main message describing the activity e.g. 50 songs added to playlist 'playlist name'
  38. * @param {string} payload.payload.thumbnail - url to a thumbnail e.g. song album art to be used when display an activity
  39. * @param {string} payload.payload.mediaSource - (optional) if relevant, the media source of the song related to the activity
  40. * @param {string} payload.payload.reportId - (optional) if relevant, the id of the report related to the activity
  41. * @param {string} payload.payload.playlistId - (optional) if relevant, the id of the playlist related to the activity
  42. * @param {string} payload.payload.stationId - (optional) if relevant, the id of the station related to the activity
  43. * @returns {Promise} - returns promise (reject, resolve)
  44. */
  45. ADD_ACTIVITY(payload) {
  46. return new Promise((resolve, reject) => {
  47. async.waterfall(
  48. [
  49. next => {
  50. DBModule.runJob("GET_MODEL", { modelName: "activity" }, this)
  51. .then(res => next(null, res))
  52. .catch(next);
  53. },
  54. (ActivityModel, next) => {
  55. const { userId, type } = payload;
  56. const activity = new ActivityModel({
  57. userId,
  58. type,
  59. payload: payload.payload
  60. });
  61. activity.save(next);
  62. },
  63. (activity, next) => {
  64. WSModule.runJob("SOCKETS_FROM_USER", { userId: activity.userId }, this)
  65. .then(sockets => {
  66. sockets.forEach(socket =>
  67. socket.dispatch("event:activity.created", { data: { activity } })
  68. );
  69. next(null, activity);
  70. })
  71. .catch(next);
  72. },
  73. (activity, next) => {
  74. const mergeableActivities = ["playlist__remove_song", "playlist__add_song"];
  75. const spammableActivities = [
  76. "user__toggle_nightmode",
  77. "user__toggle_autoskip_disliked_songs",
  78. "user__toggle_activity_watch",
  79. "song__like",
  80. "song__unlike",
  81. "song__dislike",
  82. "song__undislike"
  83. ];
  84. CacheModule.runJob("HGET", { table: "recentActivities", key: activity.userId })
  85. .then(recentActivity => {
  86. if (recentActivity) {
  87. const timeDifference = mins =>
  88. new Date() - new Date(recentActivity.createdAt) < mins * 60 * 1000;
  89. // if both activities have the same type, if within last 15 mins and if activity is within the spammableActivities array
  90. if (
  91. recentActivity.type === activity.type &&
  92. !!timeDifference(15) &&
  93. spammableActivities.includes(activity.type)
  94. )
  95. return ActivitiesModule.runJob(
  96. "CHECK_FOR_ACTIVITY_SPAM_TO_HIDE",
  97. { userId: activity.userId, type: activity.type },
  98. this
  99. )
  100. .then(() => next(null, activity))
  101. .catch(next);
  102. // if activity is within the mergeableActivities array, if both activities are about removing/adding and if within last 5 mins
  103. if (
  104. mergeableActivities.includes(activity.type) &&
  105. recentActivity.type === activity.type &&
  106. !!timeDifference(5)
  107. ) {
  108. return PlaylistsModule.runJob("GET_PLAYLIST", {
  109. playlistId: activity.payload.playlistId
  110. })
  111. .then(playlist =>
  112. ActivitiesModule.runJob(
  113. "CHECK_FOR_ACTIVITY_SPAM_TO_MERGE",
  114. {
  115. userId: activity.userId,
  116. type: activity.type,
  117. playlist: {
  118. playlistId: playlist._id,
  119. displayName: playlist.displayName
  120. }
  121. },
  122. this
  123. )
  124. .then(() => next(null, activity))
  125. .catch(next)
  126. )
  127. .catch(next);
  128. }
  129. return next(null, activity);
  130. }
  131. return next(null, activity);
  132. })
  133. .catch(next);
  134. },
  135. // store most recent activity in cache to be quickly accessible
  136. (activity, next) =>
  137. CacheModule.runJob(
  138. "HSET",
  139. {
  140. table: "recentActivities",
  141. key: activity.userId,
  142. value: { createdAt: activity.createdAt, type: activity.type }
  143. },
  144. this
  145. )
  146. .then(() => next(null))
  147. .catch(next)
  148. ],
  149. async (err, activity) => {
  150. if (err) {
  151. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  152. return reject(new Error(err));
  153. }
  154. return resolve(activity);
  155. }
  156. );
  157. });
  158. }
  159. /**
  160. * Merges activities about adding/removing songs from a playlist within a 5-minute period to prevent spam
  161. *
  162. * @param {object} payload - object that contains the payload
  163. * @param {string} payload.userId - the id of the user to check for duplicates
  164. * @param {object} payload.playlist - object that contains info about the relevant playlist
  165. * @param {string} payload.playlist.playlistId - the id of the playlist
  166. * @param {string} payload.playlist.displayName - the display name of the playlist
  167. * @param {string} payload.type - the type of activity to check for duplicates
  168. * @returns {Promise} - returns promise (reject, resolve)
  169. */
  170. async CHECK_FOR_ACTIVITY_SPAM_TO_MERGE(payload) {
  171. const activityModel = await DBModule.runJob("GET_MODEL", { modelName: "activity" }, this);
  172. return new Promise((resolve, reject) => {
  173. async.waterfall(
  174. [
  175. // find all activities of this type from the last 5 minutes
  176. next => {
  177. activityModel
  178. .find(
  179. {
  180. userId: payload.userId,
  181. type: { $in: [payload.type, `${payload.type}s`] },
  182. hidden: false,
  183. createdAt: {
  184. $gte: new Date(new Date() - 5 * 60 * 1000)
  185. },
  186. "payload.playlistId": payload.playlist.playlistId
  187. },
  188. ["_id", "type", "payload.message"]
  189. )
  190. .sort({ createdAt: -1 })
  191. .exec(next);
  192. },
  193. // hide these activities, emit to socket listeners and count number of songs in each
  194. (activities, next) => {
  195. let howManySongs = 0; // how many songs added/removed
  196. activities.forEach(activity => {
  197. activityModel.updateOne({ _id: activity._id }, { $set: { hidden: true } }).catch(next);
  198. WSModule.runJob("SOCKETS_FROM_USER", { userId: payload.userId }, this)
  199. .then(sockets =>
  200. sockets.forEach(socket =>
  201. socket.dispatch("event:activity.hidden", { data: { activityId: activity._id } })
  202. )
  203. )
  204. .catch(next);
  205. if (activity.type === payload.type) howManySongs += 1;
  206. else if (activity.type === `${payload.type}s`)
  207. howManySongs += parseInt(
  208. activity.payload.message.replace(
  209. /(?:Removed|Added)\s(?<songs>\d+)\ssongs.+/g,
  210. "$<songs>"
  211. )
  212. );
  213. });
  214. return next(null, howManySongs);
  215. },
  216. // // delete in cache the most recent activity to avoid issues when adding a new activity
  217. (howManySongs, next) => {
  218. CacheModule.runJob("HDEL", { table: "recentActivities", key: payload.userId }, this)
  219. .then(() => next(null, howManySongs))
  220. .catch(next);
  221. },
  222. // add a new activity that merges the activities together
  223. (howManySongs, next) => {
  224. const activity = {
  225. userId: payload.userId,
  226. type: "",
  227. payload: {
  228. message: "",
  229. playlistId: payload.playlist.playlistId
  230. }
  231. };
  232. if (payload.type === "playlist__remove_song" || payload.type === "playlist__remove_songs") {
  233. activity.payload.message = `Removed ${howManySongs} songs from playlist <playlistId>${payload.playlist.displayName}</playlistId>`;
  234. activity.type = "playlist__remove_songs";
  235. } else if (payload.type === "playlist__add_song" || payload.type === "playlist__add_songs") {
  236. activity.payload.message = `Added ${howManySongs} songs to playlist <playlistId>${payload.playlist.displayName}</playlistId>`;
  237. activity.type = "playlist__add_songs";
  238. }
  239. ActivitiesModule.runJob("ADD_ACTIVITY", activity, this)
  240. .then(() => next())
  241. .catch(next);
  242. }
  243. ],
  244. async err => {
  245. if (err) {
  246. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  247. return reject(new Error(err));
  248. }
  249. return resolve();
  250. }
  251. );
  252. });
  253. }
  254. /**
  255. * Removes any references to a station, playlist or song in activities
  256. *
  257. * @param {object} payload - object that contains the payload
  258. * @param {string} payload.type - type of reference. enum: ["mediaSource", "stationId", "playlistId", "playlistId"]
  259. * @param {string} payload.stationId - (optional) the id of a station
  260. * @param {string} payload.reportId - (optional) the id of a report
  261. * @param {string} payload.playlistId - (optional) the id of a playlist
  262. * @param {string} payload.mediaSource - (optional) the id of a song
  263. * @returns {Promise} - returns promise (reject, resolve)
  264. */
  265. async REMOVE_ACTIVITY_REFERENCES(payload) {
  266. const activityModel = await DBModule.runJob("GET_MODEL", { modelName: "activity" }, this);
  267. return new Promise((resolve, reject) => {
  268. async.waterfall(
  269. [
  270. next => {
  271. if (
  272. (payload.type !== "mediaSource" &&
  273. payload.type !== "stationId" &&
  274. payload.type !== "reportId" &&
  275. payload.type !== "playlistId") ||
  276. !payload.type
  277. )
  278. return next("Please use a valid reference type.");
  279. if (!payload[payload.type]) return next(`Please provide a ${payload.type} in the job payload.`);
  280. return next();
  281. },
  282. // find all activities that include the reference
  283. next => {
  284. const query = {};
  285. query[`payload.${payload.type}`] = payload[payload.type];
  286. activityModel
  287. .find(query, ["_id", "userId", "payload.message"])
  288. .sort({ createdAt: -1 })
  289. .exec(next);
  290. },
  291. (activities, next) => {
  292. async.eachLimit(
  293. activities,
  294. 1,
  295. (activity, next) => {
  296. // remove the reference tags
  297. if (payload.mediaSource) {
  298. activity.payload.message = activity.payload.message.replace(
  299. /<mediaSource>(.*)<\/mediaSource>/g,
  300. "$1"
  301. );
  302. }
  303. if (payload.reportId) {
  304. activity.payload.message = activity.payload.message.replace(
  305. /<reportId>(.*)<\/reportId>/g,
  306. "$1"
  307. );
  308. }
  309. if (payload.playlistId) {
  310. activity.payload.message = activity.payload.message.replace(
  311. /<playlistId>(.*)<\/playlistId>/g,
  312. `$1`
  313. );
  314. }
  315. if (payload.stationId) {
  316. activity.payload.message = activity.payload.message.replace(
  317. /<stationId>(.*)<\/stationId>/g,
  318. `$1`
  319. );
  320. }
  321. activityModel
  322. .updateOne(
  323. { _id: activity._id },
  324. { $set: { "payload.message": activity.payload.message } }
  325. )
  326. .then(() => {
  327. WSModule.runJob("SOCKETS_FROM_USER", { userId: activity.userId })
  328. .then(sockets =>
  329. sockets.forEach(socket =>
  330. socket.dispatch("event:activity.updated", {
  331. data: {
  332. activityId: activity._id,
  333. message: activity.payload.message
  334. }
  335. })
  336. )
  337. )
  338. .catch(next);
  339. return next();
  340. })
  341. .catch(next);
  342. },
  343. err => next(err)
  344. );
  345. }
  346. ],
  347. async err => {
  348. if (err) {
  349. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  350. return reject(new Error(err));
  351. }
  352. return resolve();
  353. }
  354. );
  355. });
  356. }
  357. /**
  358. * Hides any activities of the same type within a 15-minute period to prevent spam
  359. *
  360. * @param {object} payload - object that contains the payload
  361. * @param {string} payload.userId - the id of the user to check for duplicates
  362. * @param {string} payload.type - the type of activity to check for duplicates
  363. * @returns {Promise} - returns promise (reject, resolve)
  364. */
  365. async CHECK_FOR_ACTIVITY_SPAM_TO_HIDE(payload) {
  366. const activityModel = await DBModule.runJob("GET_MODEL", { modelName: "activity" }, this);
  367. return new Promise((resolve, reject) => {
  368. async.waterfall(
  369. [
  370. // find all activities of this type from the last 15 minutes
  371. next => {
  372. activityModel
  373. .find(
  374. {
  375. userId: payload.userId,
  376. type: payload.type,
  377. hidden: false,
  378. createdAt: {
  379. $gte: new Date(new Date() - 15 * 60 * 1000)
  380. }
  381. },
  382. "_id"
  383. )
  384. .sort({ createdAt: -1 })
  385. .skip(1)
  386. .exec(next);
  387. },
  388. // hide these activities and emit to socket listeners
  389. (activities, next) => {
  390. activities.forEach(activity => {
  391. activityModel.updateOne({ _id: activity._id }, { $set: { hidden: true } }).catch(next);
  392. WSModule.runJob("SOCKETS_FROM_USER", { userId: payload.userId })
  393. .then(sockets => {
  394. sockets.forEach(socket =>
  395. socket.dispatch("event:activity.hidden", {
  396. data: { activityId: activity._id }
  397. })
  398. );
  399. })
  400. .catch(next);
  401. });
  402. return next();
  403. }
  404. ],
  405. async err => {
  406. if (err) {
  407. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  408. return reject(new Error(err));
  409. }
  410. return resolve();
  411. }
  412. );
  413. });
  414. }
  415. }
  416. export default new _ActivitiesModule();