media.js 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668
  1. import async from "async";
  2. import config from "config";
  3. import CoreClass from "../core";
  // Module-level handles shared by every job in this file.
  // MediaModule is assigned in the _MediaModule constructor; the remaining handles are
  // assigned from the module manager at the start of initialize().
  let MediaModule;
  let CacheModule;
  let DBModule;
  let UtilsModule;
  let YouTubeModule;
  let SoundCloudModule;
  let SpotifyModule;
  let SongsModule;
  let WSModule;
  13. class _MediaModule extends CoreClass {
  14. // eslint-disable-next-line require-jsdoc
  15. constructor() {
  16. super("media");
  17. MediaModule = this;
  18. }
  19. /**
  20. * Initialises the media module
  21. *
  22. * @returns {Promise} - returns promise (reject, resolve)
  23. */
  24. async initialize() {
  25. this.setStage(1);
  26. CacheModule = this.moduleManager.modules.cache;
  27. DBModule = this.moduleManager.modules.db;
  28. UtilsModule = this.moduleManager.modules.utils;
  29. YouTubeModule = this.moduleManager.modules.youtube;
  30. SoundCloudModule = this.moduleManager.modules.soundcloud;
  31. SpotifyModule = this.moduleManager.modules.spotify;
  32. SongsModule = this.moduleManager.modules.songs;
  33. WSModule = this.moduleManager.modules.ws;
  34. this.RatingsModel = await DBModule.runJob("GET_MODEL", { modelName: "ratings" });
  35. this.RatingsSchemaCache = await CacheModule.runJob("GET_SCHEMA", { schemaName: "ratings" });
  36. this.ImportJobModel = await DBModule.runJob("GET_MODEL", { modelName: "importJob" });
  37. this.setStage(2);
  38. return new Promise((resolve, reject) => {
  39. CacheModule.runJob("SUB", {
  40. channel: "importJob.updated",
  41. cb: importJob => {
  42. WSModule.runJob("EMIT_TO_ROOM", {
  43. room: "admin.import",
  44. args: ["event:admin.importJob.updated", { data: { importJob } }]
  45. });
  46. }
  47. });
  48. CacheModule.runJob("SUB", {
  49. channel: "importJob.removed",
  50. cb: jobId => {
  51. WSModule.runJob("EMIT_TO_ROOM", {
  52. room: "admin.import",
  53. args: ["event:admin.importJob.removed", { data: { jobId } }]
  54. });
  55. }
  56. });
  57. async.waterfall(
  58. [
  59. next => {
  60. this.setStage(2);
  61. CacheModule.runJob("HGETALL", { table: "ratings" })
  62. .then(ratings => {
  63. next(null, ratings);
  64. })
  65. .catch(next);
  66. },
  67. (ratings, next) => {
  68. this.setStage(3);
  69. if (!ratings) return next();
  70. const mediaSources = Object.keys(ratings);
  71. return async.each(
  72. mediaSources,
  73. (mediaSource, next) => {
  74. MediaModule.RatingsModel.findOne({ mediaSource }, (err, rating) => {
  75. if (err) next(err);
  76. else if (!rating)
  77. CacheModule.runJob("HDEL", {
  78. table: "ratings",
  79. key: mediaSource
  80. })
  81. .then(() => next())
  82. .catch(next);
  83. else next();
  84. });
  85. },
  86. next
  87. );
  88. },
  89. next => {
  90. this.setStage(4);
  91. MediaModule.RatingsModel.find({}, next);
  92. },
  93. (ratings, next) => {
  94. this.setStage(5);
  95. async.each(
  96. ratings,
  97. (rating, next) => {
  98. CacheModule.runJob("HSET", {
  99. table: "ratings",
  100. key: rating.mediaSource,
  101. value: MediaModule.RatingsSchemaCache(rating)
  102. })
  103. .then(() => next())
  104. .catch(next);
  105. },
  106. next
  107. );
  108. }
  109. ],
  110. async err => {
  111. if (err) {
  112. err = await UtilsModule.runJob("GET_ERROR", { error: err });
  113. reject(new Error(err));
  114. } else resolve();
  115. }
  116. );
  117. });
  118. }
  119. /**
  120. * Recalculates dislikes and likes
  121. *
  122. * @param {object} payload - returns an object containing the payload
  123. * @param {string} payload.mediaSource - the media source
  124. * @returns {Promise} - returns a promise (resolve, reject)
  125. */
  126. async RECALCULATE_RATINGS(payload) {
  127. const playlistModel = await DBModule.runJob("GET_MODEL", { modelName: "playlist" }, this);
  128. return new Promise((resolve, reject) => {
  129. async.waterfall(
  130. [
  131. next => {
  132. playlistModel.countDocuments(
  133. { songs: { $elemMatch: { mediaSource: payload.mediaSource } }, type: "user-liked" },
  134. (err, likes) => {
  135. if (err) return next(err);
  136. return next(null, likes);
  137. }
  138. );
  139. },
  140. (likes, next) => {
  141. playlistModel.countDocuments(
  142. { songs: { $elemMatch: { mediaSource: payload.mediaSource } }, type: "user-disliked" },
  143. (err, dislikes) => {
  144. if (err) return next(err);
  145. return next(err, { likes, dislikes });
  146. }
  147. );
  148. },
  149. ({ likes, dislikes }, next) => {
  150. MediaModule.RatingsModel.findOneAndUpdate(
  151. { mediaSource: payload.mediaSource },
  152. {
  153. $set: {
  154. likes,
  155. dislikes
  156. }
  157. },
  158. { new: true, upsert: true },
  159. next
  160. );
  161. },
  162. (ratings, next) => {
  163. CacheModule.runJob(
  164. "HSET",
  165. {
  166. table: "ratings",
  167. key: payload.mediaSource,
  168. value: ratings
  169. },
  170. this
  171. )
  172. .then(ratings => next(null, ratings))
  173. .catch(next);
  174. }
  175. ],
  176. (err, { likes, dislikes }) => {
  177. if (err) return reject(new Error(err));
  178. return resolve({ likes, dislikes });
  179. }
  180. );
  181. });
  182. }
  183. /**
  184. * Recalculates all dislikes and likes
  185. *
  186. * @returns {Promise} - returns a promise (resolve, reject)
  187. */
  188. RECALCULATE_ALL_RATINGS() {
  189. return new Promise((resolve, reject) => {
  190. async.waterfall(
  191. [
  192. next => {
  193. SongsModule.SongModel.find({}, { mediaSource: true }, next);
  194. },
  195. (songs, next) => {
  196. // TODO support spotify
  197. YouTubeModule.youtubeVideoModel.find({}, { youtubeId: true }, (err, videos) => {
  198. if (err) next(err);
  199. else
  200. next(null, [
  201. ...songs.map(song => song.mediaSource),
  202. ...videos.map(video => `youtube:${video.youtubeId}`)
  203. ]);
  204. });
  205. },
  206. (mediaSources, next) => {
  207. async.eachLimit(
  208. mediaSources,
  209. 2,
  210. (mediaSource, next) => {
  211. this.publishProgress({
  212. status: "update",
  213. message: `Recalculating ratings for ${mediaSource}`
  214. });
  215. MediaModule.runJob("RECALCULATE_RATINGS", { mediaSource }, this)
  216. .then(() => {
  217. next();
  218. })
  219. .catch(err => {
  220. next(err);
  221. });
  222. },
  223. err => {
  224. next(err);
  225. }
  226. );
  227. }
  228. ],
  229. err => {
  230. if (err) return reject(new Error(err));
  231. return resolve();
  232. }
  233. );
  234. });
  235. }
  236. /**
  237. * Gets ratings by id from the cache or Mongo, and if it isn't in the cache yet, adds it the cache
  238. *
  239. * @param {object} payload - object containing the payload
  240. * @param {string} payload.mediaSource - the media source
  241. * @param {string} payload.createMissing - whether to create missing ratings
  242. * @returns {Promise} - returns a promise (resolve, reject)
  243. */
  244. GET_RATINGS(payload) {
  245. return new Promise((resolve, reject) => {
  246. async.waterfall(
  247. [
  248. next =>
  249. CacheModule.runJob("HGET", { table: "ratings", key: payload.mediaSource }, this)
  250. .then(ratings => next(null, ratings))
  251. .catch(next),
  252. (ratings, next) => {
  253. if (ratings) return next(true, ratings);
  254. return MediaModule.RatingsModel.findOne({ mediaSource: payload.mediaSource }, next);
  255. },
  256. (ratings, next) => {
  257. if (ratings)
  258. return CacheModule.runJob(
  259. "HSET",
  260. {
  261. table: "ratings",
  262. key: payload.mediaSource,
  263. value: ratings
  264. },
  265. this
  266. ).then(ratings => next(true, ratings));
  267. if (!payload.createMissing) return next("Ratings not found.");
  268. return MediaModule.runJob("RECALCULATE_RATINGS", { mediaSource: payload.mediaSource }, this)
  269. .then(() => next())
  270. .catch(next);
  271. },
  272. next =>
  273. MediaModule.runJob("GET_RATINGS", { mediaSource: payload.mediaSource }, this)
  274. .then(res => next(null, res.ratings))
  275. .catch(next)
  276. ],
  277. (err, ratings) => {
  278. if (err && err !== true) return reject(new Error(err));
  279. return resolve({ ratings });
  280. }
  281. );
  282. });
  283. }
  284. /**
  285. * Remove ratings by id from the cache and Mongo
  286. *
  287. * @param {object} payload - object containing the payload
  288. * @param {string} payload.mediaSources - the media source
  289. * @returns {Promise} - returns a promise (resolve, reject)
  290. */
  291. REMOVE_RATINGS(payload) {
  292. return new Promise((resolve, reject) => {
  293. let { mediaSources } = payload;
  294. if (!Array.isArray(mediaSources)) mediaSources = [mediaSources];
  295. async.eachLimit(
  296. mediaSources,
  297. 1,
  298. (mediaSource, next) => {
  299. async.waterfall(
  300. [
  301. next => {
  302. MediaModule.RatingsModel.deleteOne({ mediaSource }, err => {
  303. if (err) next(err);
  304. else next();
  305. });
  306. },
  307. next => {
  308. CacheModule.runJob("HDEL", { table: "ratings", key: mediaSource }, this)
  309. .then(() => {
  310. next();
  311. })
  312. .catch(next);
  313. }
  314. ],
  315. next
  316. );
  317. },
  318. err => {
  319. if (err && err !== true) return reject(new Error(err));
  320. return resolve();
  321. }
  322. );
  323. });
  324. }
  325. /**
  326. * Get song or youtube video by mediaSource
  327. *
  328. * @param {object} payload - an object containing the payload
  329. * @param {string} payload.mediaSource - the media source of the song/video
  330. * @param {string} payload.userId - the user id
  331. * @returns {Promise} - returns a promise (resolve, reject)
  332. */
  333. GET_MEDIA(payload) {
  334. return new Promise((resolve, reject) => {
  335. async.waterfall(
  336. [
  337. next => {
  338. SongsModule.SongModel.findOne({ mediaSource: payload.mediaSource }, next);
  339. },
  340. (song, next) => {
  341. if (song && song.duration > 0) return next(true, song);
  342. if (payload.mediaSource.startsWith("youtube:")) {
  343. const youtubeId = payload.mediaSource.split(":")[1];
  344. return YouTubeModule.runJob(
  345. "GET_VIDEOS",
  346. { identifiers: [youtubeId], createMissing: true },
  347. this
  348. )
  349. .then(response => {
  350. const { youtubeId, title, author, duration } = response.videos[0];
  351. next(null, song, {
  352. mediaSource: `youtube:${youtubeId}`,
  353. title,
  354. artists: [author],
  355. duration
  356. });
  357. })
  358. .catch(next);
  359. }
  360. if (config.get("experimental.soundcloud")) {
  361. if (payload.mediaSource.startsWith("soundcloud:")) {
  362. const trackId = payload.mediaSource.split(":")[1];
  363. return SoundCloudModule.runJob(
  364. "GET_TRACK",
  365. { identifier: trackId, createMissing: true },
  366. this
  367. )
  368. .then(response => {
  369. const { trackId, title, username, artworkUrl, duration } = response.track;
  370. next(null, song, {
  371. mediaSource: `soundcloud:${trackId}`,
  372. title,
  373. artists: [username],
  374. thumbnail: artworkUrl,
  375. duration
  376. });
  377. })
  378. .catch(next);
  379. }
  380. if (payload.mediaSource.indexOf("soundcloud.com") !== -1) {
  381. return SoundCloudModule.runJob(
  382. "GET_TRACK_FROM_URL",
  383. { identifier: payload.mediaSource, createMissing: true },
  384. this
  385. )
  386. .then(response => {
  387. const { trackId, title, username, artworkUrl, duration } = response.track;
  388. next(null, song, {
  389. mediaSource: `soundcloud:${trackId}`,
  390. title,
  391. artists: [username],
  392. thumbnail: artworkUrl,
  393. duration
  394. });
  395. })
  396. .catch(next);
  397. }
  398. }
  399. if (config.get("experimental.spotify") && payload.mediaSource.startsWith("spotify:")) {
  400. const trackId = payload.mediaSource.split(":")[1];
  401. return SpotifyModule.runJob("GET_TRACK", { identifier: trackId, createMissing: true }, this)
  402. .then(response => {
  403. const { trackId, name, artists, albumImageUrl, duration } = response.track;
  404. next(null, song, {
  405. mediaSource: `spotify:${trackId}`,
  406. title: name,
  407. artists,
  408. thumbnail: albumImageUrl,
  409. duration
  410. });
  411. })
  412. .catch(next);
  413. }
  414. return next("Invalid media source provided.");
  415. },
  416. (song, youtubeVideo, next) => {
  417. if (song && song.duration <= 0) {
  418. song.duration = youtubeVideo.duration;
  419. song.save({ validateBeforeSave: true }, err => {
  420. if (err) next(err, song);
  421. next(null, song);
  422. });
  423. } else {
  424. next(null, {
  425. ...youtubeVideo,
  426. skipDuration: 0,
  427. requestedBy: payload.userId,
  428. requestedAt: Date.now(),
  429. verified: false
  430. });
  431. }
  432. }
  433. ],
  434. (err, song) => {
  435. if (err && err !== true) return reject(new Error(err));
  436. return resolve({ song });
  437. }
  438. );
  439. });
  440. }
  441. /**
  442. * Gets media from media sources
  443. *
  444. * @param {object} payload - an object containing the payload
  445. * @param {string} payload.mediaSources - the media sources
  446. * @returns {Promise} - returns a promise (resolve, reject)
  447. */
  448. GET_MEDIA_FROM_MEDIA_SOURCES(payload) {
  449. return new Promise((resolve, reject) => {
  450. const songMap = {};
  451. const youtubeMediaSources = payload.mediaSources.filter(mediaSource => mediaSource.startsWith("youtube:"));
  452. const soundcloudMediaSources = payload.mediaSources.filter(mediaSource =>
  453. mediaSource.startsWith("soundcloud:")
  454. );
  455. async.waterfall(
  456. [
  457. next => {
  458. const allPromises = [];
  459. youtubeMediaSources.forEach(mediaSource => {
  460. const youtubeId = mediaSource.split(":")[1];
  461. const promise = YouTubeModule.runJob(
  462. "GET_VIDEOS",
  463. { identifiers: [youtubeId], createMissing: true },
  464. this
  465. )
  466. .then(response => {
  467. const { youtubeId, title, author, duration } = response.videos[0];
  468. songMap[mediaSource] = {
  469. mediaSource: `youtube:${youtubeId}`,
  470. title,
  471. artists: [author],
  472. duration
  473. };
  474. })
  475. .catch(err => {
  476. MediaModule.log(
  477. "ERROR",
  478. `Failed to get media in GET_MEDIA_FROM_MEDIA_SOURCES with mediaSource ${mediaSource} and error`,
  479. typeof err === "string" ? err : err.message
  480. );
  481. });
  482. allPromises.push(promise);
  483. });
  484. if (config.get("experimental.soundcloud"))
  485. soundcloudMediaSources.forEach(mediaSource => {
  486. const trackId = mediaSource.split(":")[1];
  487. const promise = SoundCloudModule.runJob(
  488. "GET_TRACK",
  489. { identifier: trackId, createMissing: true },
  490. this
  491. )
  492. .then(response => {
  493. const { trackId, title, username, artworkUrl, duration } = response.track;
  494. songMap[mediaSource] = {
  495. mediaSource: `soundcloud:${trackId}`,
  496. title,
  497. artists: [username],
  498. thumbnail: artworkUrl,
  499. duration
  500. };
  501. })
  502. .catch(err => {
  503. MediaModule.log(
  504. "ERROR",
  505. `Failed to get media in GET_MEDIA_FROM_MEDIA_SOURCES with mediaSource ${mediaSource} and error`,
  506. typeof err === "string" ? err : err.message
  507. );
  508. });
  509. allPromises.push(promise);
  510. });
  511. Promise.allSettled(allPromises).then(() => {
  512. next();
  513. });
  514. }
  515. ],
  516. err => {
  517. if (err && err !== true) return reject(new Error(err));
  518. return resolve(songMap);
  519. }
  520. );
  521. });
  522. }
  523. /**
  524. * Remove import job by id from Mongo
  525. *
  526. * @param {object} payload - object containing the payload
  527. * @param {string} payload.jobIds - the job ids
  528. * @returns {Promise} - returns a promise (resolve, reject)
  529. */
  530. UPDATE_IMPORT_JOBS(payload) {
  531. return new Promise((resolve, reject) => {
  532. let { jobIds } = payload;
  533. if (!Array.isArray(jobIds)) jobIds = [jobIds];
  534. async.waterfall(
  535. [
  536. next => {
  537. MediaModule.ImportJobModel.find({ _id: { $in: jobIds } }, next);
  538. },
  539. (importJobs, next) => {
  540. async.eachLimit(
  541. importJobs,
  542. 1,
  543. (importJob, next) => {
  544. CacheModule.runJob("PUB", {
  545. channel: "importJob.updated",
  546. value: importJob
  547. })
  548. .then(() => next())
  549. .catch(next);
  550. },
  551. err => {
  552. if (err) next(err);
  553. else next(null, importJobs);
  554. }
  555. );
  556. }
  557. ],
  558. (err, importJobs) => {
  559. if (err && err !== true) return reject(new Error(err));
  560. return resolve({ importJobs });
  561. }
  562. );
  563. });
  564. }
  565. /**
  566. * Remove import job by id from Mongo
  567. *
  568. * @param {object} payload - object containing the payload
  569. * @param {string} payload.jobIds - the job ids
  570. * @returns {Promise} - returns a promise (resolve, reject)
  571. */
  572. REMOVE_IMPORT_JOBS(payload) {
  573. return new Promise((resolve, reject) => {
  574. let { jobIds } = payload;
  575. if (!Array.isArray(jobIds)) jobIds = [jobIds];
  576. async.waterfall(
  577. [
  578. next => {
  579. MediaModule.ImportJobModel.deleteMany({ _id: { $in: jobIds } }, err => {
  580. if (err) next(err);
  581. else next();
  582. });
  583. },
  584. next => {
  585. async.eachLimit(
  586. jobIds,
  587. 1,
  588. (jobId, next) => {
  589. CacheModule.runJob("PUB", {
  590. channel: "importJob.removed",
  591. value: jobId
  592. })
  593. .then(() => next())
  594. .catch(next);
  595. },
  596. next
  597. );
  598. }
  599. ],
  600. err => {
  601. if (err && err !== true) return reject(new Error(err));
  602. return resolve();
  603. }
  604. );
  605. });
  606. }
  607. }
  // The module is exported as a single instance created when this file is first loaded.
  export default new _MediaModule();