// activities.js
  1. import async from "async";
  2. import CoreClass from "../core";
  3. let ActivitiesModule;
  4. let DBModule;
  5. let CacheModule;
  6. let UtilsModule;
  7. let WSModule;
  8. let PlaylistsModule;
  9. class _ActivitiesModule extends CoreClass {
  10. // eslint-disable-next-line require-jsdoc
  11. constructor() {
  12. super("activities");
  13. ActivitiesModule = this;
  14. }
  15. /**
  16. * Initialises the activities module
  17. * @returns {Promise} - returns promise (reject, resolve)
  18. */
  19. initialize() {
  20. return new Promise(resolve => {
  21. DBModule = this.moduleManager.modules.db;
  22. CacheModule = this.moduleManager.modules.cache;
  23. UtilsModule = this.moduleManager.modules.utils;
  24. WSModule = this.moduleManager.modules.ws;
  25. PlaylistsModule = this.moduleManager.modules.playlists;
  26. resolve();
  27. });
  28. }
  29. /**
  30. * Adds a new activity to the database
  31. * @param {object} payload - object that contains the payload
  32. * @param {string} payload.userId - the id of the user who's activity is to be added
  33. * @param {string} payload.type - the type of activity (enum specified in schema)
  34. * @param {object} payload.payload - the details of the activity e.g. an array of songs that were added
  35. * @param {string} payload.payload.message - the main message describing the activity e.g. 50 songs added to playlist 'playlist name'
  36. * @param {string} payload.payload.thumbnail - url to a thumbnail e.g. song album art to be used when display an activity
  37. * @param {string} payload.payload.mediaSource - (optional) if relevant, the media source of the song related to the activity
  38. * @param {string} payload.payload.reportId - (optional) if relevant, the id of the report related to the activity
  39. * @param {string} payload.payload.playlistId - (optional) if relevant, the id of the playlist related to the activity
  40. * @param {string} payload.payload.stationId - (optional) if relevant, the id of the station related to the activity
  41. * @returns {Promise} - returns promise (reject, resolve)
  42. */
  43. ADD_ACTIVITY(payload) {
  44. return new Promise((resolve, reject) => {
  45. async.waterfall(
  46. [
  47. next => {
  48. DBModule.runJob("GET_MODEL", { modelName: "activity" }, this)
  49. .then(res => next(null, res))
  50. .catch(next);
  51. },
  52. (ActivityModel, next) => {
  53. const { userId, type } = payload;
  54. const activity = new ActivityModel({
  55. userId,
  56. type,
  57. payload: payload.payload
  58. });
  59. activity.save(next);
  60. },
  61. (activity, next) => {
  62. WSModule.runJob("SOCKETS_FROM_USER", { userId: activity.userId }, this)
  63. .then(sockets => {
  64. sockets.forEach(socket =>
  65. socket.dispatch("event:activity.created", { data: { activity } })
  66. );
  67. next(null, activity);
  68. })
  69. .catch(next);
  70. },
  71. (activity, next) => {
  72. const mergeableActivities = ["playlist__remove_song", "playlist__add_song"];
  73. const spammableActivities = [
  74. "user__toggle_nightmode",
  75. "user__toggle_autoskip_disliked_songs",
  76. "user__toggle_activity_watch",
  77. "song__like",
  78. "song__unlike",
  79. "song__dislike",
  80. "song__undislike"
  81. ];
  82. CacheModule.runJob("HGET", { table: "recentActivities", key: activity.userId })
  83. .then(recentActivity => {
  84. if (recentActivity) {
  85. const timeDifference = mins =>
  86. new Date() - new Date(recentActivity.createdAt) < mins * 60 * 1000;
  87. // if both activities have the same type, if within last 15 mins and if activity is within the spammableActivities array
  88. if (
  89. recentActivity.type === activity.type &&
  90. !!timeDifference(15) &&
  91. spammableActivities.includes(activity.type)
  92. )
  93. return ActivitiesModule.runJob(
  94. "CHECK_FOR_ACTIVITY_SPAM_TO_HIDE",
  95. { userId: activity.userId, type: activity.type },
  96. this
  97. )
  98. .then(() => next(null, activity))
  99. .catch(next);
  100. // if activity is within the mergeableActivities array, if both activities are about removing/adding and if within last 5 mins
  101. if (
  102. mergeableActivities.includes(activity.type) &&
  103. recentActivity.type === activity.type &&
  104. !!timeDifference(5)
  105. ) {
  106. return PlaylistsModule.runJob("GET_PLAYLIST", {
  107. playlistId: activity.payload.playlistId
  108. })
  109. .then(playlist =>
  110. ActivitiesModule.runJob(
  111. "CHECK_FOR_ACTIVITY_SPAM_TO_MERGE",
  112. {
  113. userId: activity.userId,
  114. type: activity.type,
  115. playlist: {
  116. playlistId: playlist._id,
  117. displayName: playlist.displayName
  118. }
  119. },
  120. this
  121. )
  122. .then(() => next(null, activity))
  123. .catch(next)
  124. )
  125. .catch(next);
  126. }
  127. return next(null, activity);
  128. }
  129. return next(null, activity);
  130. })
  131. .catch(next);
  132. },
  133. // store most recent activity in cache to be quickly accessible
  134. (activity, next) =>
  135. CacheModule.runJob(
  136. "HSET",
  137. {
  138. table: "recentActivities",
  139. key: activity.userId,
  140. value: { createdAt: activity.createdAt, type: activity.type }
  141. },
  142. this
  143. )
  144. .then(() => next(null))
  145. .catch(next)
  146. ],
  147. async (err, activity) => {
  148. if (err) {
  149. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  150. return reject(new Error(err));
  151. }
  152. return resolve(activity);
  153. }
  154. );
  155. });
  156. }
  157. /**
  158. * Merges activities about adding/removing songs from a playlist within a 5-minute period to prevent spam
  159. * @param {object} payload - object that contains the payload
  160. * @param {string} payload.userId - the id of the user to check for duplicates
  161. * @param {object} payload.playlist - object that contains info about the relevant playlist
  162. * @param {string} payload.playlist.playlistId - the id of the playlist
  163. * @param {string} payload.playlist.displayName - the display name of the playlist
  164. * @param {string} payload.type - the type of activity to check for duplicates
  165. * @returns {Promise} - returns promise (reject, resolve)
  166. */
  167. async CHECK_FOR_ACTIVITY_SPAM_TO_MERGE(payload) {
  168. const activityModel = await DBModule.runJob("GET_MODEL", { modelName: "activity" }, this);
  169. return new Promise((resolve, reject) => {
  170. async.waterfall(
  171. [
  172. // find all activities of this type from the last 5 minutes
  173. next => {
  174. activityModel
  175. .find(
  176. {
  177. userId: payload.userId,
  178. type: { $in: [payload.type, `${payload.type}s`] },
  179. hidden: false,
  180. createdAt: {
  181. $gte: new Date(new Date() - 5 * 60 * 1000)
  182. },
  183. "payload.playlistId": payload.playlist.playlistId
  184. },
  185. ["_id", "type", "payload.message"]
  186. )
  187. .sort({ createdAt: -1 })
  188. .exec(next);
  189. },
  190. // hide these activities, emit to socket listeners and count number of songs in each
  191. (activities, next) => {
  192. let howManySongs = 0; // how many songs added/removed
  193. activities.forEach(activity => {
  194. activityModel.updateOne({ _id: activity._id }, { $set: { hidden: true } }).catch(next);
  195. WSModule.runJob("SOCKETS_FROM_USER", { userId: payload.userId }, this)
  196. .then(sockets =>
  197. sockets.forEach(socket =>
  198. socket.dispatch("event:activity.hidden", { data: { activityId: activity._id } })
  199. )
  200. )
  201. .catch(next);
  202. if (activity.type === payload.type) howManySongs += 1;
  203. else if (activity.type === `${payload.type}s`)
  204. howManySongs += parseInt(
  205. activity.payload.message.replace(
  206. /(?:Removed|Added)\s(?<songs>\d+)\ssongs.+/g,
  207. "$<songs>"
  208. )
  209. );
  210. });
  211. return next(null, howManySongs);
  212. },
  213. // // delete in cache the most recent activity to avoid issues when adding a new activity
  214. (howManySongs, next) => {
  215. CacheModule.runJob("HDEL", { table: "recentActivities", key: payload.userId }, this)
  216. .then(() => next(null, howManySongs))
  217. .catch(next);
  218. },
  219. // add a new activity that merges the activities together
  220. (howManySongs, next) => {
  221. const activity = {
  222. userId: payload.userId,
  223. type: "",
  224. payload: {
  225. message: "",
  226. playlistId: payload.playlist.playlistId
  227. }
  228. };
  229. if (payload.type === "playlist__remove_song" || payload.type === "playlist__remove_songs") {
  230. activity.payload.message = `Removed ${howManySongs} songs from playlist <playlistId>${payload.playlist.displayName}</playlistId>`;
  231. activity.type = "playlist__remove_songs";
  232. } else if (payload.type === "playlist__add_song" || payload.type === "playlist__add_songs") {
  233. activity.payload.message = `Added ${howManySongs} songs to playlist <playlistId>${payload.playlist.displayName}</playlistId>`;
  234. activity.type = "playlist__add_songs";
  235. }
  236. ActivitiesModule.runJob("ADD_ACTIVITY", activity, this)
  237. .then(() => next())
  238. .catch(next);
  239. }
  240. ],
  241. async err => {
  242. if (err) {
  243. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  244. return reject(new Error(err));
  245. }
  246. return resolve();
  247. }
  248. );
  249. });
  250. }
  251. /**
  252. * Removes any references to a station, playlist or song in activities
  253. * @param {object} payload - object that contains the payload
  254. * @param {string} payload.type - type of reference. enum: ["mediaSource", "stationId", "playlistId", "playlistId"]
  255. * @param {string} payload.stationId - (optional) the id of a station
  256. * @param {string} payload.reportId - (optional) the id of a report
  257. * @param {string} payload.playlistId - (optional) the id of a playlist
  258. * @param {string} payload.mediaSource - (optional) the id of a song
  259. * @returns {Promise} - returns promise (reject, resolve)
  260. */
  261. async REMOVE_ACTIVITY_REFERENCES(payload) {
  262. const activityModel = await DBModule.runJob("GET_MODEL", { modelName: "activity" }, this);
  263. return new Promise((resolve, reject) => {
  264. async.waterfall(
  265. [
  266. next => {
  267. if (
  268. (payload.type !== "mediaSource" &&
  269. payload.type !== "stationId" &&
  270. payload.type !== "reportId" &&
  271. payload.type !== "playlistId") ||
  272. !payload.type
  273. )
  274. return next("Please use a valid reference type.");
  275. if (!payload[payload.type]) return next(`Please provide a ${payload.type} in the job payload.`);
  276. return next();
  277. },
  278. // find all activities that include the reference
  279. next => {
  280. const query = {};
  281. query[`payload.${payload.type}`] = payload[payload.type];
  282. activityModel
  283. .find(query, ["_id", "userId", "payload.message"])
  284. .sort({ createdAt: -1 })
  285. .exec(next);
  286. },
  287. (activities, next) => {
  288. async.eachLimit(
  289. activities,
  290. 1,
  291. (activity, next) => {
  292. // remove the reference tags
  293. if (payload.mediaSource) {
  294. activity.payload.message = activity.payload.message.replace(
  295. /<mediaSource>(.*)<\/mediaSource>/g,
  296. "$1"
  297. );
  298. }
  299. if (payload.reportId) {
  300. activity.payload.message = activity.payload.message.replace(
  301. /<reportId>(.*)<\/reportId>/g,
  302. "$1"
  303. );
  304. }
  305. if (payload.playlistId) {
  306. activity.payload.message = activity.payload.message.replace(
  307. /<playlistId>(.*)<\/playlistId>/g,
  308. `$1`
  309. );
  310. }
  311. if (payload.stationId) {
  312. activity.payload.message = activity.payload.message.replace(
  313. /<stationId>(.*)<\/stationId>/g,
  314. `$1`
  315. );
  316. }
  317. activityModel
  318. .updateOne(
  319. { _id: activity._id },
  320. { $set: { "payload.message": activity.payload.message } }
  321. )
  322. .then(() => {
  323. WSModule.runJob("SOCKETS_FROM_USER", { userId: activity.userId })
  324. .then(sockets =>
  325. sockets.forEach(socket =>
  326. socket.dispatch("event:activity.updated", {
  327. data: {
  328. activityId: activity._id,
  329. message: activity.payload.message
  330. }
  331. })
  332. )
  333. )
  334. .catch(next);
  335. return next();
  336. })
  337. .catch(next);
  338. },
  339. err => next(err)
  340. );
  341. }
  342. ],
  343. async err => {
  344. if (err) {
  345. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  346. return reject(new Error(err));
  347. }
  348. return resolve();
  349. }
  350. );
  351. });
  352. }
  353. /**
  354. * Hides any activities of the same type within a 15-minute period to prevent spam
  355. * @param {object} payload - object that contains the payload
  356. * @param {string} payload.userId - the id of the user to check for duplicates
  357. * @param {string} payload.type - the type of activity to check for duplicates
  358. * @returns {Promise} - returns promise (reject, resolve)
  359. */
  360. async CHECK_FOR_ACTIVITY_SPAM_TO_HIDE(payload) {
  361. const activityModel = await DBModule.runJob("GET_MODEL", { modelName: "activity" }, this);
  362. return new Promise((resolve, reject) => {
  363. async.waterfall(
  364. [
  365. // find all activities of this type from the last 15 minutes
  366. next => {
  367. activityModel
  368. .find(
  369. {
  370. userId: payload.userId,
  371. type: payload.type,
  372. hidden: false,
  373. createdAt: {
  374. $gte: new Date(new Date() - 15 * 60 * 1000)
  375. }
  376. },
  377. "_id"
  378. )
  379. .sort({ createdAt: -1 })
  380. .skip(1)
  381. .exec(next);
  382. },
  383. // hide these activities and emit to socket listeners
  384. (activities, next) => {
  385. activities.forEach(activity => {
  386. activityModel.updateOne({ _id: activity._id }, { $set: { hidden: true } }).catch(next);
  387. WSModule.runJob("SOCKETS_FROM_USER", { userId: payload.userId })
  388. .then(sockets => {
  389. sockets.forEach(socket =>
  390. socket.dispatch("event:activity.hidden", {
  391. data: { activityId: activity._id }
  392. })
  393. );
  394. })
  395. .catch(next);
  396. });
  397. return next();
  398. }
  399. ],
  400. async err => {
  401. if (err) {
  402. err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);
  403. return reject(new Error(err));
  404. }
  405. return resolve();
  406. }
  407. );
  408. });
  409. }
  410. }
  411. export default new _ActivitiesModule();