media.js 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670
  1. import async from "async";
  2. import config from "config";
  3. import CoreClass from "../core";
  4. let MediaModule;
  5. let CacheModule;
  6. let DBModule;
  7. let UtilsModule;
  8. let YouTubeModule;
  9. let SoundCloudModule;
  10. let SpotifyModule;
  11. let SongsModule;
  12. let WSModule;
  13. class _MediaModule extends CoreClass {
  14. // eslint-disable-next-line require-jsdoc
  15. constructor() {
  16. super("media");
  17. MediaModule = this;
  18. }
  19. /**
  20. * Initialises the media module
  21. *
  22. * @returns {Promise} - returns promise (reject, resolve)
  23. */
  24. async initialize() {
  25. this.setStage(1);
  26. CacheModule = this.moduleManager.modules.cache;
  27. DBModule = this.moduleManager.modules.db;
  28. UtilsModule = this.moduleManager.modules.utils;
  29. YouTubeModule = this.moduleManager.modules.youtube;
  30. SoundCloudModule = this.moduleManager.modules.soundcloud;
  31. SpotifyModule = this.moduleManager.modules.spotify;
  32. SongsModule = this.moduleManager.modules.songs;
  33. WSModule = this.moduleManager.modules.ws;
  34. this.RatingsModel = await DBModule.runJob("GET_MODEL", { modelName: "ratings" });
  35. this.RatingsSchemaCache = await CacheModule.runJob("GET_SCHEMA", { schemaName: "ratings" });
  36. this.ImportJobModel = await DBModule.runJob("GET_MODEL", { modelName: "importJob" });
  37. this.setStage(2);
  38. return new Promise((resolve, reject) => {
  39. CacheModule.runJob("SUB", {
  40. channel: "importJob.updated",
  41. cb: importJob => {
  42. WSModule.runJob("EMIT_TO_ROOM", {
  43. room: "admin.import",
  44. args: ["event:admin.importJob.updated", { data: { importJob } }]
  45. });
  46. }
  47. });
  48. CacheModule.runJob("SUB", {
  49. channel: "importJob.removed",
  50. cb: jobId => {
  51. WSModule.runJob("EMIT_TO_ROOM", {
  52. room: "admin.import",
  53. args: ["event:admin.importJob.removed", { data: { jobId } }]
  54. });
  55. }
  56. });
  57. async.waterfall(
  58. [
  59. next => {
  60. this.setStage(2);
  61. CacheModule.runJob("HGETALL", { table: "ratings" })
  62. .then(ratings => {
  63. next(null, ratings);
  64. })
  65. .catch(next);
  66. },
  67. (ratings, next) => {
  68. this.setStage(3);
  69. if (!ratings) return next();
  70. const mediaSources = Object.keys(ratings);
  71. return async.each(
  72. mediaSources,
  73. (mediaSource, next) => {
  74. MediaModule.RatingsModel.findOne({ mediaSource }, (err, rating) => {
  75. if (err) next(err);
  76. else if (!rating)
  77. CacheModule.runJob("HDEL", {
  78. table: "ratings",
  79. key: mediaSource
  80. })
  81. .then(() => next())
  82. .catch(next);
  83. else next();
  84. });
  85. },
  86. next
  87. );
  88. },
  89. next => {
  90. this.setStage(4);
  91. MediaModule.RatingsModel.find({}, next);
  92. },
  93. (ratings, next) => {
  94. this.setStage(5);
  95. async.each(
  96. ratings,
  97. (rating, next) => {
  98. CacheModule.runJob("HSET", {
  99. table: "ratings",
  100. key: rating.mediaSource,
  101. value: MediaModule.RatingsSchemaCache(rating)
  102. })
  103. .then(() => next())
  104. .catch(next);
  105. },
  106. next
  107. );
  108. }
  109. ],
  110. async err => {
  111. if (err) {
  112. err = await UtilsModule.runJob("GET_ERROR", { error: err });
  113. reject(new Error(err));
  114. } else resolve();
  115. }
  116. );
  117. });
  118. }
  119. /**
  120. * Recalculates dislikes and likes
  121. *
  122. * @param {object} payload - returns an object containing the payload
  123. * @param {string} payload.mediaSource - the media source
  124. * @returns {Promise} - returns a promise (resolve, reject)
  125. */
  126. async RECALCULATE_RATINGS(payload) {
  127. const playlistModel = await DBModule.runJob("GET_MODEL", { modelName: "playlist" }, this);
  128. return new Promise((resolve, reject) => {
  129. async.waterfall(
  130. [
  131. next => {
  132. playlistModel.countDocuments(
  133. { songs: { $elemMatch: { mediaSource: payload.mediaSource } }, type: "user-liked" },
  134. (err, likes) => {
  135. if (err) return next(err);
  136. return next(null, likes);
  137. }
  138. );
  139. },
  140. (likes, next) => {
  141. playlistModel.countDocuments(
  142. { songs: { $elemMatch: { mediaSource: payload.mediaSource } }, type: "user-disliked" },
  143. (err, dislikes) => {
  144. if (err) return next(err);
  145. return next(err, { likes, dislikes });
  146. }
  147. );
  148. },
  149. ({ likes, dislikes }, next) => {
  150. MediaModule.RatingsModel.findOneAndUpdate(
  151. { mediaSource: payload.mediaSource },
  152. {
  153. $set: {
  154. likes,
  155. dislikes
  156. }
  157. },
  158. { new: true, upsert: true },
  159. next
  160. );
  161. },
  162. (ratings, next) => {
  163. CacheModule.runJob(
  164. "HSET",
  165. {
  166. table: "ratings",
  167. key: payload.mediaSource,
  168. value: ratings
  169. },
  170. this
  171. )
  172. .then(ratings => next(null, ratings))
  173. .catch(next);
  174. }
  175. ],
  176. (err, { likes, dislikes }) => {
  177. if (err) return reject(new Error(err));
  178. return resolve({ likes, dislikes });
  179. }
  180. );
  181. });
  182. }
  183. /**
  184. * Recalculates all dislikes and likes
  185. *
  186. * @returns {Promise} - returns a promise (resolve, reject)
  187. */
  188. RECALCULATE_ALL_RATINGS() {
  189. return new Promise((resolve, reject) => {
  190. async.waterfall(
  191. [
  192. next => {
  193. SongsModule.SongModel.find({}, { mediaSource: true }, next);
  194. },
  195. (songs, next) => {
  196. // TODO support spotify
  197. YouTubeModule.youtubeVideoModel.find({}, { youtubeId: true }, (err, videos) => {
  198. if (err) next(err);
  199. else
  200. next(null, [
  201. ...songs.map(song => song.mediaSource),
  202. ...videos.map(video => `youtube:${video.youtubeId}`)
  203. ]);
  204. });
  205. },
  206. (mediaSources, next) => {
  207. async.eachLimit(
  208. mediaSources,
  209. 2,
  210. (mediaSource, next) => {
  211. this.publishProgress({
  212. status: "update",
  213. message: `Recalculating ratings for ${mediaSource}`
  214. });
  215. MediaModule.runJob("RECALCULATE_RATINGS", { mediaSource }, this)
  216. .then(() => {
  217. next();
  218. })
  219. .catch(err => {
  220. next(err);
  221. });
  222. },
  223. err => {
  224. next(err);
  225. }
  226. );
  227. }
  228. ],
  229. err => {
  230. if (err) return reject(new Error(err));
  231. return resolve();
  232. }
  233. );
  234. });
  235. }
  236. /**
  237. * Gets ratings by id from the cache or Mongo, and if it isn't in the cache yet, adds it the cache
  238. *
  239. * @param {object} payload - object containing the payload
  240. * @param {string} payload.mediaSource - the media source
  241. * @param {string} payload.createMissing - whether to create missing ratings
  242. * @returns {Promise} - returns a promise (resolve, reject)
  243. */
  244. GET_RATINGS(payload) {
  245. return new Promise((resolve, reject) => {
  246. async.waterfall(
  247. [
  248. next =>
  249. CacheModule.runJob("HGET", { table: "ratings", key: payload.mediaSource }, this)
  250. .then(ratings => next(null, ratings))
  251. .catch(next),
  252. (ratings, next) => {
  253. if (ratings) return next(true, ratings);
  254. return MediaModule.RatingsModel.findOne({ mediaSource: payload.mediaSource }, next);
  255. },
  256. (ratings, next) => {
  257. if (ratings)
  258. return CacheModule.runJob(
  259. "HSET",
  260. {
  261. table: "ratings",
  262. key: payload.mediaSource,
  263. value: ratings
  264. },
  265. this
  266. ).then(ratings => next(true, ratings));
  267. if (!payload.createMissing) return next("Ratings not found.");
  268. return MediaModule.runJob("RECALCULATE_RATINGS", { mediaSource: payload.mediaSource }, this)
  269. .then(() => next())
  270. .catch(next);
  271. },
  272. next =>
  273. MediaModule.runJob("GET_RATINGS", { mediaSource: payload.mediaSource }, this)
  274. .then(res => next(null, res.ratings))
  275. .catch(next)
  276. ],
  277. (err, ratings) => {
  278. if (err && err !== true) return reject(new Error(err));
  279. return resolve({ ratings });
  280. }
  281. );
  282. });
  283. }
  284. /**
  285. * Remove ratings by id from the cache and Mongo
  286. *
  287. * @param {object} payload - object containing the payload
  288. * @param {string} payload.mediaSources - the media source
  289. * @returns {Promise} - returns a promise (resolve, reject)
  290. */
  291. REMOVE_RATINGS(payload) {
  292. return new Promise((resolve, reject) => {
  293. let { mediaSources } = payload;
  294. if (!Array.isArray(mediaSources)) mediaSources = [mediaSources];
  295. async.eachLimit(
  296. mediaSources,
  297. 1,
  298. (mediaSource, next) => {
  299. async.waterfall(
  300. [
  301. next => {
  302. MediaModule.RatingsModel.deleteOne({ mediaSource }, err => {
  303. if (err) next(err);
  304. else next();
  305. });
  306. },
  307. next => {
  308. CacheModule.runJob("HDEL", { table: "ratings", key: mediaSource }, this)
  309. .then(() => {
  310. next();
  311. })
  312. .catch(next);
  313. }
  314. ],
  315. next
  316. );
  317. },
  318. err => {
  319. if (err && err !== true) return reject(new Error(err));
  320. return resolve();
  321. }
  322. );
  323. });
  324. }
  325. /**
  326. * Get song or youtube video by mediaSource
  327. *
  328. * @param {object} payload - an object containing the payload
  329. * @param {string} payload.mediaSource - the media source of the song/video
  330. * @param {string} payload.userId - the user id
  331. * @returns {Promise} - returns a promise (resolve, reject)
  332. */
  333. GET_MEDIA(payload) {
  334. return new Promise((resolve, reject) => {
  335. async.waterfall(
  336. [
  337. next => {
  338. SongsModule.SongModel.findOne({ mediaSource: payload.mediaSource }, next);
  339. },
  340. (song, next) => {
  341. if (song && song.duration > 0) return next(true, song);
  342. console.log(123, payload);
  343. if (payload.mediaSource.startsWith("youtube:")) {
  344. const youtubeId = payload.mediaSource.split(":")[1];
  345. return YouTubeModule.runJob(
  346. "GET_VIDEOS",
  347. { identifiers: [youtubeId], createMissing: true },
  348. this
  349. )
  350. .then(response => {
  351. const { youtubeId, title, author, duration } = response.videos[0];
  352. next(null, song, {
  353. mediaSource: `youtube:${youtubeId}`,
  354. title,
  355. artists: [author],
  356. duration
  357. });
  358. })
  359. .catch(next);
  360. }
  361. if (config.get("experimental.soundcloud")) {
  362. if (payload.mediaSource.startsWith("soundcloud:")) {
  363. const trackId = payload.mediaSource.split(":")[1];
  364. return SoundCloudModule.runJob(
  365. "GET_TRACK",
  366. { identifier: trackId, createMissing: true },
  367. this
  368. )
  369. .then(response => {
  370. const { trackId, title, username, artworkUrl, duration } = response.track;
  371. next(null, song, {
  372. mediaSource: `soundcloud:${trackId}`,
  373. title,
  374. artists: [username],
  375. thumbnail: artworkUrl,
  376. duration
  377. });
  378. })
  379. .catch(next);
  380. }
  381. if (payload.mediaSource.indexOf("soundcloud.com") !== -1) {
  382. return SoundCloudModule.runJob(
  383. "GET_TRACK_FROM_URL",
  384. { identifier: payload.mediaSource, createMissing: true },
  385. this
  386. )
  387. .then(response => {
  388. const { trackId, title, username, artworkUrl, duration } = response.track;
  389. next(null, song, {
  390. mediaSource: `soundcloud:${trackId}`,
  391. title,
  392. artists: [username],
  393. thumbnail: artworkUrl,
  394. duration
  395. });
  396. })
  397. .catch(next);
  398. }
  399. }
  400. if (config.get("experimental.spotify") && payload.mediaSource.startsWith("spotify:")) {
  401. const trackId = payload.mediaSource.split(":")[1];
  402. return SpotifyModule.runJob("GET_TRACK", { identifier: trackId, createMissing: true }, this)
  403. .then(response => {
  404. const { trackId, name, artists, albumImageUrl, duration } = response.track;
  405. next(null, song, {
  406. mediaSource: `spotify:${trackId}`,
  407. title: name,
  408. artists,
  409. thumbnail: albumImageUrl,
  410. duration
  411. });
  412. })
  413. .catch(next);
  414. }
  415. return next("Invalid media source provided.");
  416. },
  417. (song, youtubeVideo, next) => {
  418. if (song && song.duration <= 0) {
  419. song.duration = youtubeVideo.duration;
  420. song.save({ validateBeforeSave: true }, err => {
  421. if (err) next(err, song);
  422. next(null, song);
  423. });
  424. } else {
  425. next(null, {
  426. ...youtubeVideo,
  427. skipDuration: 0,
  428. requestedBy: payload.userId,
  429. requestedAt: Date.now(),
  430. verified: false
  431. });
  432. }
  433. }
  434. ],
  435. (err, song) => {
  436. if (err && err !== true) return reject(new Error(err));
  437. return resolve({ song });
  438. }
  439. );
  440. });
  441. }
  442. /**
  443. * Gets media from media sources
  444. *
  445. * @param {object} payload - an object containing the payload
  446. * @param {string} payload.mediaSources - the media sources
  447. * @returns {Promise} - returns a promise (resolve, reject)
  448. */
  449. GET_MEDIA_FROM_MEDIA_SOURCES(payload) {
  450. return new Promise((resolve, reject) => {
  451. const songMap = {};
  452. const youtubeMediaSources = payload.mediaSources.filter(mediaSource => mediaSource.startsWith("youtube:"));
  453. const soundcloudMediaSources = payload.mediaSources.filter(mediaSource =>
  454. mediaSource.startsWith("soundcloud:")
  455. );
  456. async.waterfall(
  457. [
  458. next => {
  459. const allPromises = [];
  460. youtubeMediaSources.forEach(mediaSource => {
  461. const youtubeId = mediaSource.split(":")[1];
  462. const promise = YouTubeModule.runJob(
  463. "GET_VIDEOS",
  464. { identifiers: [youtubeId], createMissing: true },
  465. this
  466. )
  467. .then(response => {
  468. const { youtubeId, title, author, duration } = response.videos[0];
  469. songMap[mediaSource] = {
  470. mediaSource: `youtube:${youtubeId}`,
  471. title,
  472. artists: [author],
  473. duration
  474. };
  475. })
  476. .catch(err => {
  477. MediaModule.log(
  478. "ERROR",
  479. `Failed to get media in GET_MEDIA_FROM_MEDIA_SOURCES with mediaSource ${mediaSource} and error`,
  480. typeof err === "string" ? err : err.message
  481. );
  482. });
  483. allPromises.push(promise);
  484. });
  485. if (config.get("experimental.soundcloud"))
  486. soundcloudMediaSources.forEach(mediaSource => {
  487. const trackId = mediaSource.split(":")[1];
  488. const promise = SoundCloudModule.runJob(
  489. "GET_TRACK",
  490. { identifier: trackId, createMissing: true },
  491. this
  492. )
  493. .then(response => {
  494. const { trackId, title, username, artworkUrl, duration } = response.track;
  495. songMap[mediaSource] = {
  496. mediaSource: `soundcloud:${trackId}`,
  497. title,
  498. artists: [username],
  499. thumbnail: artworkUrl,
  500. duration
  501. };
  502. })
  503. .catch(err => {
  504. MediaModule.log(
  505. "ERROR",
  506. `Failed to get media in GET_MEDIA_FROM_MEDIA_SOURCES with mediaSource ${mediaSource} and error`,
  507. typeof err === "string" ? err : err.message
  508. );
  509. });
  510. allPromises.push(promise);
  511. });
  512. Promise.allSettled(allPromises).then(() => {
  513. next();
  514. });
  515. }
  516. ],
  517. err => {
  518. if (err && err !== true) return reject(new Error(err));
  519. return resolve(songMap);
  520. }
  521. );
  522. });
  523. }
  524. /**
  525. * Remove import job by id from Mongo
  526. *
  527. * @param {object} payload - object containing the payload
  528. * @param {string} payload.jobIds - the job ids
  529. * @returns {Promise} - returns a promise (resolve, reject)
  530. */
  531. UPDATE_IMPORT_JOBS(payload) {
  532. return new Promise((resolve, reject) => {
  533. let { jobIds } = payload;
  534. if (!Array.isArray(jobIds)) jobIds = [jobIds];
  535. async.waterfall(
  536. [
  537. next => {
  538. MediaModule.ImportJobModel.find({ _id: { $in: jobIds } }, next);
  539. },
  540. (importJobs, next) => {
  541. async.eachLimit(
  542. importJobs,
  543. 1,
  544. (importJob, next) => {
  545. CacheModule.runJob("PUB", {
  546. channel: "importJob.updated",
  547. value: importJob
  548. })
  549. .then(() => next())
  550. .catch(next);
  551. },
  552. err => {
  553. if (err) next(err);
  554. else next(null, importJobs);
  555. }
  556. );
  557. }
  558. ],
  559. (err, importJobs) => {
  560. if (err && err !== true) return reject(new Error(err));
  561. return resolve({ importJobs });
  562. }
  563. );
  564. });
  565. }
  566. /**
  567. * Remove import job by id from Mongo
  568. *
  569. * @param {object} payload - object containing the payload
  570. * @param {string} payload.jobIds - the job ids
  571. * @returns {Promise} - returns a promise (resolve, reject)
  572. */
  573. REMOVE_IMPORT_JOBS(payload) {
  574. return new Promise((resolve, reject) => {
  575. let { jobIds } = payload;
  576. if (!Array.isArray(jobIds)) jobIds = [jobIds];
  577. async.waterfall(
  578. [
  579. next => {
  580. MediaModule.ImportJobModel.deleteMany({ _id: { $in: jobIds } }, err => {
  581. if (err) next(err);
  582. else next();
  583. });
  584. },
  585. next => {
  586. async.eachLimit(
  587. jobIds,
  588. 1,
  589. (jobId, next) => {
  590. CacheModule.runJob("PUB", {
  591. channel: "importJob.removed",
  592. value: jobId
  593. })
  594. .then(() => next())
  595. .catch(next);
  596. },
  597. next
  598. );
  599. }
  600. ],
  601. err => {
  602. if (err && err !== true) return reject(new Error(err));
  603. return resolve();
  604. }
  605. );
  606. });
  607. }
  608. }
  609. export default new _MediaModule();