media.js 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498
  1. import async from "async";
  2. import CoreClass from "../core";
  // Module-level handles shared by every job in this file.
  // MediaModule is set to the instance in the _MediaModule constructor; the remaining handles are
  // assigned from this.moduleManager.modules inside initialize(), so they are undefined until then.
  3. let MediaModule;
  4. let CacheModule;
  5. let DBModule;
  6. let UtilsModule;
  7. let YouTubeModule;
  8. let SongsModule;
  9. let WSModule;
  /**
   * Media module.
   *
   * Keeps the cached "ratings" table in sync with the ratings collection, recalculates
   * like/dislike counts from playlists, resolves media by YouTube id and relays import job
   * events to the "admin.import" websocket room.
   */
  class _MediaModule extends CoreClass {
    // eslint-disable-next-line require-jsdoc
    constructor() {
      super("media");

      MediaModule = this;
    }

    /**
     * Initialises the media module
     *
     * Resolves the sibling modules and models, subscribes to the import job channels, removes
     * cached ratings that no longer exist in Mongo and then writes every Mongo rating to the cache.
     *
     * @returns {Promise} - returns promise (reject, resolve)
     */
    async initialize() {
      this.setStage(1);

      CacheModule = this.moduleManager.modules.cache;
      DBModule = this.moduleManager.modules.db;
      UtilsModule = this.moduleManager.modules.utils;
      YouTubeModule = this.moduleManager.modules.youtube;
      SongsModule = this.moduleManager.modules.songs;
      WSModule = this.moduleManager.modules.ws;

      this.RatingsModel = await DBModule.runJob("GET_MODEL", { modelName: "ratings" });
      this.RatingsSchemaCache = await CacheModule.runJob("GET_SCHEMA", { schemaName: "ratings" });
      this.ImportJobModel = await DBModule.runJob("GET_MODEL", { modelName: "importJob" });

      this.setStage(2);

      return new Promise((resolve, reject) => {
        // Forward import job pub/sub messages to admins watching the import page.
        CacheModule.runJob("SUB", {
          channel: "importJob.updated",
          cb: importJob => {
            WSModule.runJob("EMIT_TO_ROOM", {
              room: "admin.import",
              args: ["event:admin.importJob.updated", { data: { importJob } }]
            });
          }
        });

        CacheModule.runJob("SUB", {
          channel: "importJob.removed",
          cb: jobId => {
            WSModule.runJob("EMIT_TO_ROOM", {
              room: "admin.import",
              args: ["event:admin.importJob.removed", { data: { jobId } }]
            });
          }
        });

        async.waterfall(
          [
            // Load every cached rating.
            next => {
              this.setStage(2);
              CacheModule.runJob("HGETALL", { table: "ratings" })
                .then(ratings => {
                  next(null, ratings);
                })
                .catch(next);
            },

            // Drop cached ratings that have no matching Mongo document.
            (ratings, next) => {
              this.setStage(3);

              if (!ratings) return next();

              const youtubeIds = Object.keys(ratings);

              return async.each(
                youtubeIds,
                (youtubeId, next) => {
                  MediaModule.RatingsModel.findOne({ youtubeId }, (err, rating) => {
                    if (err) next(err);
                    else if (!rating)
                      CacheModule.runJob("HDEL", {
                        table: "ratings",
                        key: youtubeId
                      })
                        .then(() => next())
                        .catch(next);
                    else next();
                  });
                },
                next
              );
            },

            // Load every rating stored in Mongo.
            next => {
              this.setStage(4);
              MediaModule.RatingsModel.find({}, next);
            },

            // Write each Mongo rating to the cache.
            (ratings, next) => {
              this.setStage(5);

              async.each(
                ratings,
                (rating, next) => {
                  CacheModule.runJob("HSET", {
                    table: "ratings",
                    key: rating.youtubeId,
                    value: MediaModule.RatingsSchemaCache(rating)
                  })
                    .then(() => next())
                    .catch(next);
                },
                next
              );
            }
          ],
          async err => {
            if (!err) return resolve();

            try {
              const message = await UtilsModule.runJob("GET_ERROR", { error: err });
              return reject(new Error(message));
            } catch (formatError) {
              // Formatting the error failed; still reject so initialisation can never hang.
              return reject(new Error(err));
            }
          }
        );
      });
    }

    /**
     * Recalculates dislikes and likes
     *
     * Counts the "user-liked" and "user-disliked" playlists containing the media, upserts the
     * counts into the ratings collection and writes the result to the ratings cache.
     *
     * @param {object} payload - returns an object containing the payload
     * @param {string} payload.youtubeId - the youtube id
     * @returns {Promise} - returns a promise (resolve, reject), resolving with { likes, dislikes }
     */
    async RECALCULATE_RATINGS(payload) {
      const playlistModel = await DBModule.runJob("GET_MODEL", { modelName: "playlist" }, this);

      return new Promise((resolve, reject) => {
        async.waterfall(
          [
            next => {
              playlistModel.countDocuments(
                { songs: { $elemMatch: { youtubeId: payload.youtubeId } }, type: "user-liked" },
                (err, likes) => {
                  if (err) return next(err);
                  return next(null, likes);
                }
              );
            },

            (likes, next) => {
              playlistModel.countDocuments(
                { songs: { $elemMatch: { youtubeId: payload.youtubeId } }, type: "user-disliked" },
                (err, dislikes) => {
                  if (err) return next(err);
                  return next(null, { likes, dislikes });
                }
              );
            },

            ({ likes, dislikes }, next) => {
              MediaModule.RatingsModel.findOneAndUpdate(
                { youtubeId: payload.youtubeId },
                {
                  $set: {
                    likes,
                    dislikes
                  }
                },
                { new: true, upsert: true },
                next
              );
            },

            (ratings, next) => {
              CacheModule.runJob(
                "HSET",
                {
                  table: "ratings",
                  key: payload.youtubeId,
                  value: ratings
                },
                this
              )
                .then(ratings => next(null, ratings))
                .catch(next);
            }
          ],
          (err, ratings) => {
            // Check the error before reading the result: on failure no result is passed, and
            // destructuring it in the parameter list would throw instead of rejecting.
            if (err) return reject(new Error(err));

            const { likes, dislikes } = ratings;
            return resolve({ likes, dislikes });
          }
        );
      });
    }

    /**
     * Recalculates all dislikes and likes
     *
     * Runs RECALCULATE_RATINGS for every youtube id found in the songs and youtube videos
     * collections, two at a time.
     *
     * @returns {Promise} - returns a promise (resolve, reject)
     */
    RECALCULATE_ALL_RATINGS() {
      return new Promise((resolve, reject) => {
        async.waterfall(
          [
            next => {
              SongsModule.SongModel.find({}, { youtubeId: true }, next);
            },

            (songs, next) => {
              YouTubeModule.youtubeVideoModel.find({}, { youtubeId: true }, (err, videos) => {
                if (err) return next(err);

                // The same youtube id can exist as both a song and a video; recalculate it once.
                const youtubeIds = new Set([
                  ...songs.map(song => song.youtubeId),
                  ...videos.map(video => video.youtubeId)
                ]);

                return next(null, [...youtubeIds]);
              });
            },

            (youtubeIds, next) => {
              async.eachLimit(
                youtubeIds,
                2,
                (youtubeId, next) => {
                  MediaModule.runJob("RECALCULATE_RATINGS", { youtubeId }, this)
                    .then(() => {
                      next();
                    })
                    .catch(err => {
                      next(err);
                    });
                },
                err => {
                  next(err);
                }
              );
            }
          ],
          err => {
            if (err) return reject(new Error(err));
            return resolve();
          }
        );
      });
    }

    /**
     * Gets ratings by id from the cache or Mongo, and if it isn't in the cache yet, adds it the cache
     *
     * @param {object} payload - object containing the payload
     * @param {string} payload.youtubeId - the youtube id
     * @param {boolean} payload.createMissing - whether to create missing ratings
     * @returns {Promise} - returns a promise (resolve, reject), resolving with { ratings }
     */
    GET_RATINGS(payload) {
      return new Promise((resolve, reject) => {
        async.waterfall(
          [
            next =>
              CacheModule.runJob("HGET", { table: "ratings", key: payload.youtubeId }, this)
                .then(ratings => next(null, ratings))
                .catch(next),

            (ratings, next) => {
              // `true` as the error short-circuits the waterfall with a successful result.
              if (ratings) return next(true, ratings);
              return MediaModule.RatingsModel.findOne({ youtubeId: payload.youtubeId }, next);
            },

            (ratings, next) => {
              if (ratings)
                return CacheModule.runJob(
                  "HSET",
                  {
                    table: "ratings",
                    key: payload.youtubeId,
                    value: ratings
                  },
                  this
                )
                  .then(ratings => next(true, ratings))
                  // Without this a failed cache write would leave the job pending forever.
                  .catch(next);

              if (!payload.createMissing) return next("Ratings not found.");

              return MediaModule.runJob("RECALCULATE_RATINGS", { youtubeId: payload.youtubeId }, this)
                .then(() => next())
                .catch(next);
            },

            // Ratings were just created; fetch them again through the normal path.
            next =>
              MediaModule.runJob("GET_RATINGS", { youtubeId: payload.youtubeId }, this)
                .then(res => next(null, res.ratings))
                .catch(next)
          ],
          (err, ratings) => {
            if (err && err !== true) return reject(new Error(err));
            return resolve({ ratings });
          }
        );
      });
    }

    /**
     * Remove ratings by id from the cache and Mongo
     *
     * @param {object} payload - object containing the payload
     * @param {string|string[]} payload.youtubeIds - a single youtube id or an array of youtube ids
     * @returns {Promise} - returns a promise (resolve, reject)
     */
    REMOVE_RATINGS(payload) {
      return new Promise((resolve, reject) => {
        let { youtubeIds } = payload;
        if (!Array.isArray(youtubeIds)) youtubeIds = [youtubeIds];

        async.eachLimit(
          youtubeIds,
          1,
          (youtubeId, next) => {
            async.waterfall(
              [
                next => {
                  MediaModule.RatingsModel.deleteOne({ youtubeId }, err => {
                    if (err) next(err);
                    else next();
                  });
                },

                next => {
                  CacheModule.runJob("HDEL", { table: "ratings", key: youtubeId }, this)
                    .then(() => {
                      next();
                    })
                    .catch(next);
                }
              ],
              next
            );
          },
          err => {
            if (err && err !== true) return reject(new Error(err));
            return resolve();
          }
        );
      });
    }

    /**
     * Get song or youtube video by youtubeId
     *
     * A stored song with a positive duration is returned as is. Otherwise the youtube video is
     * fetched: a stored song gets its duration updated and saved, and when no song exists an
     * unverified song-like object is built from the video.
     *
     * @param {object} payload - an object containing the payload
     * @param {string} payload.youtubeId - the youtube id of the song/video
     * @param {string} payload.userId - the user id
     * @returns {Promise} - returns a promise (resolve, reject), resolving with { song }
     */
    GET_MEDIA(payload) {
      return new Promise((resolve, reject) => {
        async.waterfall(
          [
            next => {
              SongsModule.SongModel.findOne({ youtubeId: payload.youtubeId }, next);
            },

            (song, next) => {
              // `true` as the error short-circuits the waterfall with a successful result.
              if (song && song.duration > 0) next(true, song);
              else {
                YouTubeModule.runJob(
                  "GET_VIDEO",
                  { identifier: payload.youtubeId, createMissing: true },
                  this
                )
                  .then(response => {
                    const { youtubeId, title, author, duration } = response.video;
                    next(null, song, { youtubeId, title, artists: [author], duration });
                  })
                  .catch(next);
              }
            },

            (song, youtubeVideo, next) => {
              if (song && song.duration <= 0) {
                song.duration = youtubeVideo.duration;
                song.save({ validateBeforeSave: true }, err => {
                  // Return on error so the waterfall callback is only ever invoked once.
                  if (err) return next(err, song);
                  return next(null, song);
                });
              } else {
                next(null, {
                  ...youtubeVideo,
                  skipDuration: 0,
                  requestedBy: payload.userId,
                  requestedAt: Date.now(),
                  verified: false
                });
              }
            }
          ],
          (err, song) => {
            if (err && err !== true) return reject(new Error(err));
            return resolve({ song });
          }
        );
      });
    }

    /**
     * Finds import jobs by id in Mongo and publishes an "importJob.updated" message for each
     *
     * @param {object} payload - object containing the payload
     * @param {string|string[]} payload.jobIds - a single job id or an array of job ids
     * @returns {Promise} - returns a promise (resolve, reject), resolving with { importJobs }
     */
    UPDATE_IMPORT_JOBS(payload) {
      return new Promise((resolve, reject) => {
        let { jobIds } = payload;
        if (!Array.isArray(jobIds)) jobIds = [jobIds];

        async.waterfall(
          [
            next => {
              MediaModule.ImportJobModel.find({ _id: { $in: jobIds } }, next);
            },

            (importJobs, next) => {
              async.eachLimit(
                importJobs,
                1,
                (importJob, next) => {
                  CacheModule.runJob("PUB", {
                    channel: "importJob.updated",
                    value: importJob
                  })
                    .then(() => next())
                    .catch(next);
                },
                err => {
                  if (err) next(err);
                  else next(null, importJobs);
                }
              );
            }
          ],
          (err, importJobs) => {
            if (err && err !== true) return reject(new Error(err));
            return resolve({ importJobs });
          }
        );
      });
    }

    /**
     * Remove import jobs by id from Mongo and publishes an "importJob.removed" message for each id
     *
     * @param {object} payload - object containing the payload
     * @param {string|string[]} payload.jobIds - a single job id or an array of job ids
     * @returns {Promise} - returns a promise (resolve, reject)
     */
    REMOVE_IMPORT_JOBS(payload) {
      return new Promise((resolve, reject) => {
        let { jobIds } = payload;
        if (!Array.isArray(jobIds)) jobIds = [jobIds];

        async.waterfall(
          [
            next => {
              MediaModule.ImportJobModel.deleteMany({ _id: { $in: jobIds } }, err => {
                if (err) next(err);
                else next();
              });
            },

            next => {
              async.eachLimit(
                jobIds,
                1,
                (jobId, next) => {
                  CacheModule.runJob("PUB", {
                    channel: "importJob.removed",
                    value: jobId
                  })
                    .then(() => next())
                    .catch(next);
                },
                next
              );
            }
          ],
          err => {
            if (err && err !== true) return reject(new Error(err));
            return resolve();
          }
        );
      });
    }
  }
  455. export default new _MediaModule();