media.js 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502
  1. import async from "async";
  2. import CoreClass from "../core";
  3. let MediaModule;
  4. let CacheModule;
  5. let DBModule;
  6. let UtilsModule;
  7. let YouTubeModule;
  8. let SongsModule;
  9. let WSModule;
  10. class _MediaModule extends CoreClass {
  11. // eslint-disable-next-line require-jsdoc
  12. constructor() {
  13. super("media");
  14. MediaModule = this;
  15. }
  16. /**
  17. * Initialises the media module
  18. *
  19. * @returns {Promise} - returns promise (reject, resolve)
  20. */
  21. async initialize() {
  22. this.setStage(1);
  23. CacheModule = this.moduleManager.modules.cache;
  24. DBModule = this.moduleManager.modules.db;
  25. UtilsModule = this.moduleManager.modules.utils;
  26. YouTubeModule = this.moduleManager.modules.youtube;
  27. SongsModule = this.moduleManager.modules.songs;
  28. WSModule = this.moduleManager.modules.ws;
  29. this.RatingsModel = await DBModule.runJob("GET_MODEL", { modelName: "ratings" });
  30. this.RatingsSchemaCache = await CacheModule.runJob("GET_SCHEMA", { schemaName: "ratings" });
  31. this.ImportJobModel = await DBModule.runJob("GET_MODEL", { modelName: "importJob" });
  32. this.setStage(2);
  33. return new Promise((resolve, reject) => {
  34. CacheModule.runJob("SUB", {
  35. channel: "importJob.updated",
  36. cb: importJob => {
  37. WSModule.runJob("EMIT_TO_ROOM", {
  38. room: "admin.import",
  39. args: ["event:admin.importJob.updated", { data: { importJob } }]
  40. });
  41. }
  42. });
  43. CacheModule.runJob("SUB", {
  44. channel: "importJob.removed",
  45. cb: jobId => {
  46. WSModule.runJob("EMIT_TO_ROOM", {
  47. room: "admin.import",
  48. args: ["event:admin.importJob.removed", { data: { jobId } }]
  49. });
  50. }
  51. });
  52. async.waterfall(
  53. [
  54. next => {
  55. this.setStage(2);
  56. CacheModule.runJob("HGETALL", { table: "ratings" })
  57. .then(ratings => {
  58. next(null, ratings);
  59. })
  60. .catch(next);
  61. },
  62. (ratings, next) => {
  63. this.setStage(3);
  64. if (!ratings) return next();
  65. const youtubeIds = Object.keys(ratings);
  66. return async.each(
  67. youtubeIds,
  68. (youtubeId, next) => {
  69. MediaModule.RatingsModel.findOne({ youtubeId }, (err, rating) => {
  70. if (err) next(err);
  71. else if (!rating)
  72. CacheModule.runJob("HDEL", {
  73. table: "ratings",
  74. key: youtubeId
  75. })
  76. .then(() => next())
  77. .catch(next);
  78. else next();
  79. });
  80. },
  81. next
  82. );
  83. },
  84. next => {
  85. this.setStage(4);
  86. MediaModule.RatingsModel.find({}, next);
  87. },
  88. (ratings, next) => {
  89. this.setStage(5);
  90. async.each(
  91. ratings,
  92. (rating, next) => {
  93. CacheModule.runJob("HSET", {
  94. table: "ratings",
  95. key: rating.youtubeId,
  96. value: MediaModule.RatingsSchemaCache(rating)
  97. })
  98. .then(() => next())
  99. .catch(next);
  100. },
  101. next
  102. );
  103. }
  104. ],
  105. async err => {
  106. if (err) {
  107. err = await UtilsModule.runJob("GET_ERROR", { error: err });
  108. reject(new Error(err));
  109. } else resolve();
  110. }
  111. );
  112. });
  113. }
  114. /**
  115. * Recalculates dislikes and likes
  116. *
  117. * @param {object} payload - returns an object containing the payload
  118. * @param {string} payload.youtubeId - the youtube id
  119. * @returns {Promise} - returns a promise (resolve, reject)
  120. */
  121. async RECALCULATE_RATINGS(payload) {
  122. const playlistModel = await DBModule.runJob("GET_MODEL", { modelName: "playlist" }, this);
  123. return new Promise((resolve, reject) => {
  124. async.waterfall(
  125. [
  126. next => {
  127. playlistModel.countDocuments(
  128. { songs: { $elemMatch: { youtubeId: payload.youtubeId } }, type: "user-liked" },
  129. (err, likes) => {
  130. if (err) return next(err);
  131. return next(null, likes);
  132. }
  133. );
  134. },
  135. (likes, next) => {
  136. playlistModel.countDocuments(
  137. { songs: { $elemMatch: { youtubeId: payload.youtubeId } }, type: "user-disliked" },
  138. (err, dislikes) => {
  139. if (err) return next(err);
  140. return next(err, { likes, dislikes });
  141. }
  142. );
  143. },
  144. ({ likes, dislikes }, next) => {
  145. MediaModule.RatingsModel.findOneAndUpdate(
  146. { youtubeId: payload.youtubeId },
  147. {
  148. $set: {
  149. likes,
  150. dislikes
  151. }
  152. },
  153. { new: true, upsert: true },
  154. next
  155. );
  156. },
  157. (ratings, next) => {
  158. CacheModule.runJob(
  159. "HSET",
  160. {
  161. table: "ratings",
  162. key: payload.youtubeId,
  163. value: ratings
  164. },
  165. this
  166. )
  167. .then(ratings => next(null, ratings))
  168. .catch(next);
  169. }
  170. ],
  171. (err, { likes, dislikes }) => {
  172. if (err) return reject(new Error(err));
  173. return resolve({ likes, dislikes });
  174. }
  175. );
  176. });
  177. }
  178. /**
  179. * Recalculates all dislikes and likes
  180. *
  181. * @returns {Promise} - returns a promise (resolve, reject)
  182. */
  183. RECALCULATE_ALL_RATINGS() {
  184. return new Promise((resolve, reject) => {
  185. async.waterfall(
  186. [
  187. next => {
  188. SongsModule.SongModel.find({}, { youtubeId: true }, next);
  189. },
  190. (songs, next) => {
  191. YouTubeModule.youtubeVideoModel.find({}, { youtubeId: true }, (err, videos) => {
  192. if (err) next(err);
  193. else
  194. next(null, [
  195. ...songs.map(song => song.youtubeId),
  196. ...videos.map(video => video.youtubeId)
  197. ]);
  198. });
  199. },
  200. (youtubeIds, next) => {
  201. async.eachLimit(
  202. youtubeIds,
  203. 2,
  204. (youtubeId, next) => {
  205. this.publishProgress({
  206. status: "update",
  207. message: `Recalculating ratings for ${youtubeId}`
  208. });
  209. MediaModule.runJob("RECALCULATE_RATINGS", { youtubeId }, this)
  210. .then(() => {
  211. next();
  212. })
  213. .catch(err => {
  214. next(err);
  215. });
  216. },
  217. err => {
  218. next(err);
  219. }
  220. );
  221. }
  222. ],
  223. err => {
  224. if (err) return reject(new Error(err));
  225. return resolve();
  226. }
  227. );
  228. });
  229. }
  230. /**
  231. * Gets ratings by id from the cache or Mongo, and if it isn't in the cache yet, adds it the cache
  232. *
  233. * @param {object} payload - object containing the payload
  234. * @param {string} payload.youtubeId - the youtube id
  235. * @param {string} payload.createMissing - whether to create missing ratings
  236. * @returns {Promise} - returns a promise (resolve, reject)
  237. */
  238. GET_RATINGS(payload) {
  239. return new Promise((resolve, reject) => {
  240. async.waterfall(
  241. [
  242. next =>
  243. CacheModule.runJob("HGET", { table: "ratings", key: payload.youtubeId }, this)
  244. .then(ratings => next(null, ratings))
  245. .catch(next),
  246. (ratings, next) => {
  247. if (ratings) return next(true, ratings);
  248. return MediaModule.RatingsModel.findOne({ youtubeId: payload.youtubeId }, next);
  249. },
  250. (ratings, next) => {
  251. if (ratings)
  252. return CacheModule.runJob(
  253. "HSET",
  254. {
  255. table: "ratings",
  256. key: payload.youtubeId,
  257. value: ratings
  258. },
  259. this
  260. ).then(ratings => next(true, ratings));
  261. if (!payload.createMissing) return next("Ratings not found.");
  262. return MediaModule.runJob("RECALCULATE_RATINGS", { youtubeId: payload.youtubeId }, this)
  263. .then(() => next())
  264. .catch(next);
  265. },
  266. next =>
  267. MediaModule.runJob("GET_RATINGS", { youtubeId: payload.youtubeId }, this)
  268. .then(res => next(null, res.ratings))
  269. .catch(next)
  270. ],
  271. (err, ratings) => {
  272. if (err && err !== true) return reject(new Error(err));
  273. return resolve({ ratings });
  274. }
  275. );
  276. });
  277. }
  278. /**
  279. * Remove ratings by id from the cache and Mongo
  280. *
  281. * @param {object} payload - object containing the payload
  282. * @param {string} payload.youtubeIds - the youtube id
  283. * @returns {Promise} - returns a promise (resolve, reject)
  284. */
  285. REMOVE_RATINGS(payload) {
  286. return new Promise((resolve, reject) => {
  287. let { youtubeIds } = payload;
  288. if (!Array.isArray(youtubeIds)) youtubeIds = [youtubeIds];
  289. async.eachLimit(
  290. youtubeIds,
  291. 1,
  292. (youtubeId, next) => {
  293. async.waterfall(
  294. [
  295. next => {
  296. MediaModule.RatingsModel.deleteOne({ youtubeId }, err => {
  297. if (err) next(err);
  298. else next();
  299. });
  300. },
  301. next => {
  302. CacheModule.runJob("HDEL", { table: "ratings", key: youtubeId }, this)
  303. .then(() => {
  304. next();
  305. })
  306. .catch(next);
  307. }
  308. ],
  309. next
  310. );
  311. },
  312. err => {
  313. if (err && err !== true) return reject(new Error(err));
  314. return resolve();
  315. }
  316. );
  317. });
  318. }
  319. /**
  320. * Get song or youtube video by youtubeId
  321. *
  322. * @param {object} payload - an object containing the payload
  323. * @param {string} payload.youtubeId - the youtube id of the song/video
  324. * @param {string} payload.userId - the user id
  325. * @returns {Promise} - returns a promise (resolve, reject)
  326. */
  327. GET_MEDIA(payload) {
  328. return new Promise((resolve, reject) => {
  329. async.waterfall(
  330. [
  331. next => {
  332. SongsModule.SongModel.findOne({ youtubeId: payload.youtubeId }, next);
  333. },
  334. (song, next) => {
  335. if (song && song.duration > 0) next(true, song);
  336. else {
  337. YouTubeModule.runJob(
  338. "GET_VIDEO",
  339. { identifier: payload.youtubeId, createMissing: true },
  340. this
  341. )
  342. .then(response => {
  343. const { youtubeId, title, author, duration } = response.video;
  344. next(null, song, { youtubeId, title, artists: [author], duration });
  345. })
  346. .catch(next);
  347. }
  348. },
  349. (song, youtubeVideo, next) => {
  350. if (song && song.duration <= 0) {
  351. song.duration = youtubeVideo.duration;
  352. song.save({ validateBeforeSave: true }, err => {
  353. if (err) next(err, song);
  354. next(null, song);
  355. });
  356. } else {
  357. next(null, {
  358. ...youtubeVideo,
  359. skipDuration: 0,
  360. requestedBy: payload.userId,
  361. requestedAt: Date.now(),
  362. verified: false
  363. });
  364. }
  365. }
  366. ],
  367. (err, song) => {
  368. if (err && err !== true) return reject(new Error(err));
  369. return resolve({ song });
  370. }
  371. );
  372. });
  373. }
  374. /**
  375. * Remove import job by id from Mongo
  376. *
  377. * @param {object} payload - object containing the payload
  378. * @param {string} payload.jobIds - the job ids
  379. * @returns {Promise} - returns a promise (resolve, reject)
  380. */
  381. UPDATE_IMPORT_JOBS(payload) {
  382. return new Promise((resolve, reject) => {
  383. let { jobIds } = payload;
  384. if (!Array.isArray(jobIds)) jobIds = [jobIds];
  385. async.waterfall(
  386. [
  387. next => {
  388. MediaModule.ImportJobModel.find({ _id: { $in: jobIds } }, next);
  389. },
  390. (importJobs, next) => {
  391. async.eachLimit(
  392. importJobs,
  393. 1,
  394. (importJob, next) => {
  395. CacheModule.runJob("PUB", {
  396. channel: "importJob.updated",
  397. value: importJob
  398. })
  399. .then(() => next())
  400. .catch(next);
  401. },
  402. err => {
  403. if (err) next(err);
  404. else next(null, importJobs);
  405. }
  406. );
  407. }
  408. ],
  409. (err, importJobs) => {
  410. if (err && err !== true) return reject(new Error(err));
  411. return resolve({ importJobs });
  412. }
  413. );
  414. });
  415. }
  416. /**
  417. * Remove import job by id from Mongo
  418. *
  419. * @param {object} payload - object containing the payload
  420. * @param {string} payload.jobIds - the job ids
  421. * @returns {Promise} - returns a promise (resolve, reject)
  422. */
  423. REMOVE_IMPORT_JOBS(payload) {
  424. return new Promise((resolve, reject) => {
  425. let { jobIds } = payload;
  426. if (!Array.isArray(jobIds)) jobIds = [jobIds];
  427. async.waterfall(
  428. [
  429. next => {
  430. MediaModule.ImportJobModel.deleteMany({ _id: { $in: jobIds } }, err => {
  431. if (err) next(err);
  432. else next();
  433. });
  434. },
  435. next => {
  436. async.eachLimit(
  437. jobIds,
  438. 1,
  439. (jobId, next) => {
  440. CacheModule.runJob("PUB", {
  441. channel: "importJob.removed",
  442. value: jobId
  443. })
  444. .then(() => next())
  445. .catch(next);
  446. },
  447. next
  448. );
  449. }
  450. ],
  451. err => {
  452. if (err && err !== true) return reject(new Error(err));
  453. return resolve();
  454. }
  455. );
  456. });
  457. }
  458. }
  459. export default new _MediaModule();