media.js 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672
  1. import async from "async";
  2. import config from "config";
  3. import CoreClass from "../core";
  // Handle to this module's own instance; assigned in the constructor.
  let MediaModule;

  // Infrastructure modules; assigned from the module manager in initialize().
  let CacheModule;
  let DBModule;
  let UtilsModule;
  let WSModule;

  // Media related modules; assigned from the module manager in initialize().
  let YouTubeModule;
  let SoundCloudModule;
  let SpotifyModule;
  let SongsModule;
  13. class _MediaModule extends CoreClass {
  14. // eslint-disable-next-line require-jsdoc
  15. constructor() {
  16. super("media");
  17. MediaModule = this;
  18. }
  19. /**
  20. * Initialises the media module
  21. *
  22. * @returns {Promise} - returns promise (reject, resolve)
  23. */
  24. async initialize() {
  25. this.setStage(1);
  26. CacheModule = this.moduleManager.modules.cache;
  27. DBModule = this.moduleManager.modules.db;
  28. UtilsModule = this.moduleManager.modules.utils;
  29. YouTubeModule = this.moduleManager.modules.youtube;
  30. SoundCloudModule = this.moduleManager.modules.soundcloud;
  31. SpotifyModule = this.moduleManager.modules.spotify;
  32. SongsModule = this.moduleManager.modules.songs;
  33. WSModule = this.moduleManager.modules.ws;
  34. this.RatingsModel = await DBModule.runJob("GET_MODEL", { modelName: "ratings" });
  35. this.RatingsSchemaCache = await CacheModule.runJob("GET_SCHEMA", { schemaName: "ratings" });
  36. this.ImportJobModel = await DBModule.runJob("GET_MODEL", { modelName: "importJob" });
  37. this.setStage(2);
  38. return new Promise((resolve, reject) => {
  39. CacheModule.runJob("SUB", {
  40. channel: "importJob.updated",
  41. cb: importJob => {
  42. WSModule.runJob("EMIT_TO_ROOM", {
  43. room: "admin.import",
  44. args: ["event:admin.importJob.updated", { data: { importJob } }]
  45. });
  46. }
  47. });
  48. CacheModule.runJob("SUB", {
  49. channel: "importJob.removed",
  50. cb: jobId => {
  51. WSModule.runJob("EMIT_TO_ROOM", {
  52. room: "admin.import",
  53. args: ["event:admin.importJob.removed", { data: { jobId } }]
  54. });
  55. }
  56. });
  57. async.waterfall(
  58. [
  59. next => {
  60. this.setStage(2);
  61. CacheModule.runJob("HGETALL", { table: "ratings" })
  62. .then(ratings => {
  63. next(null, ratings);
  64. })
  65. .catch(next);
  66. },
  67. (ratings, next) => {
  68. this.setStage(3);
  69. if (!ratings) return next();
  70. const mediaSources = Object.keys(ratings);
  71. return async.each(
  72. mediaSources,
  73. (mediaSource, next) => {
  74. MediaModule.RatingsModel.findOne({ mediaSource }, (err, rating) => {
  75. if (err) next(err);
  76. else if (!rating)
  77. CacheModule.runJob("HDEL", {
  78. table: "ratings",
  79. key: mediaSource
  80. })
  81. .then(() => next())
  82. .catch(next);
  83. else next();
  84. });
  85. },
  86. next
  87. );
  88. },
  89. next => {
  90. this.setStage(4);
  91. MediaModule.RatingsModel.find({}, next);
  92. },
  93. (ratings, next) => {
  94. this.setStage(5);
  95. async.each(
  96. ratings,
  97. (rating, next) => {
  98. CacheModule.runJob("HSET", {
  99. table: "ratings",
  100. key: rating.mediaSource,
  101. value: MediaModule.RatingsSchemaCache(rating)
  102. })
  103. .then(() => next())
  104. .catch(next);
  105. },
  106. next
  107. );
  108. }
  109. ],
  110. async err => {
  111. if (err) {
  112. err = await UtilsModule.runJob("GET_ERROR", { error: err });
  113. reject(new Error(err));
  114. } else resolve();
  115. }
  116. );
  117. });
  118. }
  119. /**
  120. * Recalculates dislikes and likes
  121. *
  122. * @param {object} payload - returns an object containing the payload
  123. * @param {string} payload.mediaSource - the media source
  124. * @returns {Promise} - returns a promise (resolve, reject)
  125. */
  126. async RECALCULATE_RATINGS(payload) {
  127. const playlistModel = await DBModule.runJob("GET_MODEL", { modelName: "playlist" }, this);
  128. return new Promise((resolve, reject) => {
  129. async.waterfall(
  130. [
  131. next => {
  132. playlistModel.countDocuments(
  133. { songs: { $elemMatch: { mediaSource: payload.mediaSource } }, type: "user-liked" },
  134. (err, likes) => {
  135. if (err) return next(err);
  136. return next(null, likes);
  137. }
  138. );
  139. },
  140. (likes, next) => {
  141. playlistModel.countDocuments(
  142. { songs: { $elemMatch: { mediaSource: payload.mediaSource } }, type: "user-disliked" },
  143. (err, dislikes) => {
  144. if (err) return next(err);
  145. return next(err, { likes, dislikes });
  146. }
  147. );
  148. },
  149. ({ likes, dislikes }, next) => {
  150. MediaModule.RatingsModel.findOneAndUpdate(
  151. { mediaSource: payload.mediaSource },
  152. {
  153. $set: {
  154. likes,
  155. dislikes
  156. }
  157. },
  158. { new: true, upsert: true },
  159. next
  160. );
  161. },
  162. (ratings, next) => {
  163. CacheModule.runJob(
  164. "HSET",
  165. {
  166. table: "ratings",
  167. key: payload.mediaSource,
  168. value: ratings
  169. },
  170. this
  171. )
  172. .then(ratings => next(null, ratings))
  173. .catch(next);
  174. }
  175. ],
  176. (err, { likes, dislikes }) => {
  177. if (err) return reject(new Error(err));
  178. return resolve({ likes, dislikes });
  179. }
  180. );
  181. });
  182. }
  183. /**
  184. * Recalculates all dislikes and likes
  185. *
  186. * @returns {Promise} - returns a promise (resolve, reject)
  187. */
  188. RECALCULATE_ALL_RATINGS() {
  189. return new Promise((resolve, reject) => {
  190. async.waterfall(
  191. [
  192. next => {
  193. SongsModule.SongModel.find({}, { mediaSource: true }, next);
  194. },
  195. (songs, next) => {
  196. // TODO support spotify
  197. YouTubeModule.youtubeVideoModel.find({}, { youtubeId: true }, (err, videos) => {
  198. if (err) next(err);
  199. else
  200. next(null, [
  201. ...songs.map(song => song.mediaSource),
  202. ...videos.map(video => `youtube:${video.youtubeId}`)
  203. ]);
  204. });
  205. },
  206. (mediaSources, next) => {
  207. async.eachLimit(
  208. mediaSources,
  209. 2,
  210. (mediaSource, next) => {
  211. this.publishProgress({
  212. status: "update",
  213. message: `Recalculating ratings for ${mediaSource}`
  214. });
  215. MediaModule.runJob("RECALCULATE_RATINGS", { mediaSource }, this)
  216. .then(() => {
  217. next();
  218. })
  219. .catch(err => {
  220. next(err);
  221. });
  222. },
  223. err => {
  224. next(err);
  225. }
  226. );
  227. }
  228. ],
  229. err => {
  230. if (err) return reject(new Error(err));
  231. return resolve();
  232. }
  233. );
  234. });
  235. }
  236. /**
  237. * Gets ratings by id from the cache or Mongo, and if it isn't in the cache yet, adds it the cache
  238. *
  239. * @param {object} payload - object containing the payload
  240. * @param {string} payload.mediaSource - the media source
  241. * @param {string} payload.createMissing - whether to create missing ratings
  242. * @returns {Promise} - returns a promise (resolve, reject)
  243. */
  244. GET_RATINGS(payload) {
  245. return new Promise((resolve, reject) => {
  246. async.waterfall(
  247. [
  248. next =>
  249. CacheModule.runJob("HGET", { table: "ratings", key: payload.mediaSource }, this)
  250. .then(ratings => next(null, ratings))
  251. .catch(next),
  252. (ratings, next) => {
  253. if (ratings) return next(true, ratings);
  254. return MediaModule.RatingsModel.findOne({ mediaSource: payload.mediaSource }, next);
  255. },
  256. (ratings, next) => {
  257. if (ratings)
  258. return CacheModule.runJob(
  259. "HSET",
  260. {
  261. table: "ratings",
  262. key: payload.mediaSource,
  263. value: ratings
  264. },
  265. this
  266. ).then(ratings => next(true, ratings));
  267. if (!payload.createMissing) return next("Ratings not found.");
  268. return MediaModule.runJob("RECALCULATE_RATINGS", { mediaSource: payload.mediaSource }, this)
  269. .then(() => next())
  270. .catch(next);
  271. },
  272. next =>
  273. MediaModule.runJob("GET_RATINGS", { mediaSource: payload.mediaSource }, this)
  274. .then(res => next(null, res.ratings))
  275. .catch(next)
  276. ],
  277. (err, ratings) => {
  278. if (err && err !== true) return reject(new Error(err));
  279. return resolve({ ratings });
  280. }
  281. );
  282. });
  283. }
  284. /**
  285. * Remove ratings by id from the cache and Mongo
  286. *
  287. * @param {object} payload - object containing the payload
  288. * @param {string} payload.mediaSources - the media source
  289. * @returns {Promise} - returns a promise (resolve, reject)
  290. */
  291. REMOVE_RATINGS(payload) {
  292. return new Promise((resolve, reject) => {
  293. let { mediaSources } = payload;
  294. if (!Array.isArray(mediaSources)) mediaSources = [mediaSources];
  295. async.eachLimit(
  296. mediaSources,
  297. 1,
  298. (mediaSource, next) => {
  299. async.waterfall(
  300. [
  301. next => {
  302. MediaModule.RatingsModel.deleteOne({ mediaSource }, err => {
  303. if (err) next(err);
  304. else next();
  305. });
  306. },
  307. next => {
  308. CacheModule.runJob("HDEL", { table: "ratings", key: mediaSource }, this)
  309. .then(() => {
  310. next();
  311. })
  312. .catch(next);
  313. }
  314. ],
  315. next
  316. );
  317. },
  318. err => {
  319. if (err && err !== true) return reject(new Error(err));
  320. return resolve();
  321. }
  322. );
  323. });
  324. }
  325. /**
  326. * Get song or youtube video by mediaSource
  327. *
  328. * @param {object} payload - an object containing the payload
  329. * @param {string} payload.mediaSource - the media source of the song/video
  330. * @param {string} payload.userId - the user id
  331. * @returns {Promise} - returns a promise (resolve, reject)
  332. */
  333. GET_MEDIA(payload) {
  334. return new Promise((resolve, reject) => {
  335. async.waterfall(
  336. [
  337. next => {
  338. SongsModule.SongModel.findOne({ mediaSource: payload.mediaSource }, next);
  339. },
  340. (song, next) => {
  341. if (song && song.duration > 0) return next(true, song);
  342. if (payload.mediaSource.startsWith("youtube:")) {
  343. const youtubeId = payload.mediaSource.split(":")[1];
  344. return YouTubeModule.runJob(
  345. "GET_VIDEOS",
  346. { identifiers: [youtubeId], createMissing: true },
  347. this
  348. )
  349. .then(response => {
  350. if (response.videos.length === 0) {
  351. next("Media not found.");
  352. return;
  353. }
  354. const { youtubeId, title, author, duration } = response.videos[0];
  355. next(null, song, {
  356. mediaSource: `youtube:${youtubeId}`,
  357. title,
  358. artists: [author],
  359. duration
  360. });
  361. })
  362. .catch(next);
  363. }
  364. if (config.get("experimental.soundcloud")) {
  365. if (payload.mediaSource.startsWith("soundcloud:")) {
  366. const trackId = payload.mediaSource.split(":")[1];
  367. return SoundCloudModule.runJob(
  368. "GET_TRACK",
  369. { identifier: trackId, createMissing: true },
  370. this
  371. )
  372. .then(response => {
  373. const { trackId, title, username, artworkUrl, duration } = response.track;
  374. next(null, song, {
  375. mediaSource: `soundcloud:${trackId}`,
  376. title,
  377. artists: [username],
  378. thumbnail: artworkUrl,
  379. duration
  380. });
  381. })
  382. .catch(next);
  383. }
  384. if (payload.mediaSource.indexOf("soundcloud.com") !== -1) {
  385. return SoundCloudModule.runJob(
  386. "GET_TRACK_FROM_URL",
  387. { identifier: payload.mediaSource, createMissing: true },
  388. this
  389. )
  390. .then(response => {
  391. const { trackId, title, username, artworkUrl, duration } = response.track;
  392. next(null, song, {
  393. mediaSource: `soundcloud:${trackId}`,
  394. title,
  395. artists: [username],
  396. thumbnail: artworkUrl,
  397. duration
  398. });
  399. })
  400. .catch(next);
  401. }
  402. }
  403. if (config.get("experimental.spotify") && payload.mediaSource.startsWith("spotify:")) {
  404. const trackId = payload.mediaSource.split(":")[1];
  405. return SpotifyModule.runJob("GET_TRACK", { identifier: trackId, createMissing: true }, this)
  406. .then(response => {
  407. const { trackId, name, artists, albumImageUrl, duration } = response.track;
  408. next(null, song, {
  409. mediaSource: `spotify:${trackId}`,
  410. title: name,
  411. artists,
  412. thumbnail: albumImageUrl,
  413. duration
  414. });
  415. })
  416. .catch(next);
  417. }
  418. return next("Invalid media source provided.");
  419. },
  420. (song, youtubeVideo, next) => {
  421. if (song && song.duration <= 0) {
  422. song.duration = youtubeVideo.duration;
  423. song.save({ validateBeforeSave: true }, err => {
  424. if (err) next(err, song);
  425. next(null, song);
  426. });
  427. } else {
  428. next(null, {
  429. ...youtubeVideo,
  430. skipDuration: 0,
  431. requestedBy: payload.userId,
  432. requestedAt: Date.now(),
  433. verified: false
  434. });
  435. }
  436. }
  437. ],
  438. (err, song) => {
  439. if (err && err !== true) return reject(new Error(err));
  440. return resolve({ song });
  441. }
  442. );
  443. });
  444. }
  445. /**
  446. * Gets media from media sources
  447. *
  448. * @param {object} payload - an object containing the payload
  449. * @param {string} payload.mediaSources - the media sources
  450. * @returns {Promise} - returns a promise (resolve, reject)
  451. */
  452. GET_MEDIA_FROM_MEDIA_SOURCES(payload) {
  453. return new Promise((resolve, reject) => {
  454. const songMap = {};
  455. const youtubeMediaSources = payload.mediaSources.filter(mediaSource => mediaSource.startsWith("youtube:"));
  456. const soundcloudMediaSources = payload.mediaSources.filter(mediaSource =>
  457. mediaSource.startsWith("soundcloud:")
  458. );
  459. async.waterfall(
  460. [
  461. next => {
  462. const allPromises = [];
  463. youtubeMediaSources.forEach(mediaSource => {
  464. const youtubeId = mediaSource.split(":")[1];
  465. const promise = YouTubeModule.runJob(
  466. "GET_VIDEOS",
  467. { identifiers: [youtubeId], createMissing: true },
  468. this
  469. )
  470. .then(response => {
  471. const { youtubeId, title, author, duration } = response.videos[0];
  472. songMap[mediaSource] = {
  473. mediaSource: `youtube:${youtubeId}`,
  474. title,
  475. artists: [author],
  476. duration
  477. };
  478. })
  479. .catch(err => {
  480. MediaModule.log(
  481. "ERROR",
  482. `Failed to get media in GET_MEDIA_FROM_MEDIA_SOURCES with mediaSource ${mediaSource} and error`,
  483. typeof err === "string" ? err : err.message
  484. );
  485. });
  486. allPromises.push(promise);
  487. });
  488. if (config.get("experimental.soundcloud"))
  489. soundcloudMediaSources.forEach(mediaSource => {
  490. const trackId = mediaSource.split(":")[1];
  491. const promise = SoundCloudModule.runJob(
  492. "GET_TRACK",
  493. { identifier: trackId, createMissing: true },
  494. this
  495. )
  496. .then(response => {
  497. const { trackId, title, username, artworkUrl, duration } = response.track;
  498. songMap[mediaSource] = {
  499. mediaSource: `soundcloud:${trackId}`,
  500. title,
  501. artists: [username],
  502. thumbnail: artworkUrl,
  503. duration
  504. };
  505. })
  506. .catch(err => {
  507. MediaModule.log(
  508. "ERROR",
  509. `Failed to get media in GET_MEDIA_FROM_MEDIA_SOURCES with mediaSource ${mediaSource} and error`,
  510. typeof err === "string" ? err : err.message
  511. );
  512. });
  513. allPromises.push(promise);
  514. });
  515. Promise.allSettled(allPromises).then(() => {
  516. next();
  517. });
  518. }
  519. ],
  520. err => {
  521. if (err && err !== true) return reject(new Error(err));
  522. return resolve(songMap);
  523. }
  524. );
  525. });
  526. }
  527. /**
  528. * Remove import job by id from Mongo
  529. *
  530. * @param {object} payload - object containing the payload
  531. * @param {string} payload.jobIds - the job ids
  532. * @returns {Promise} - returns a promise (resolve, reject)
  533. */
  534. UPDATE_IMPORT_JOBS(payload) {
  535. return new Promise((resolve, reject) => {
  536. let { jobIds } = payload;
  537. if (!Array.isArray(jobIds)) jobIds = [jobIds];
  538. async.waterfall(
  539. [
  540. next => {
  541. MediaModule.ImportJobModel.find({ _id: { $in: jobIds } }, next);
  542. },
  543. (importJobs, next) => {
  544. async.eachLimit(
  545. importJobs,
  546. 1,
  547. (importJob, next) => {
  548. CacheModule.runJob("PUB", {
  549. channel: "importJob.updated",
  550. value: importJob
  551. })
  552. .then(() => next())
  553. .catch(next);
  554. },
  555. err => {
  556. if (err) next(err);
  557. else next(null, importJobs);
  558. }
  559. );
  560. }
  561. ],
  562. (err, importJobs) => {
  563. if (err && err !== true) return reject(new Error(err));
  564. return resolve({ importJobs });
  565. }
  566. );
  567. });
  568. }
  569. /**
  570. * Remove import job by id from Mongo
  571. *
  572. * @param {object} payload - object containing the payload
  573. * @param {string} payload.jobIds - the job ids
  574. * @returns {Promise} - returns a promise (resolve, reject)
  575. */
  576. REMOVE_IMPORT_JOBS(payload) {
  577. return new Promise((resolve, reject) => {
  578. let { jobIds } = payload;
  579. if (!Array.isArray(jobIds)) jobIds = [jobIds];
  580. async.waterfall(
  581. [
  582. next => {
  583. MediaModule.ImportJobModel.deleteMany({ _id: { $in: jobIds } }, err => {
  584. if (err) next(err);
  585. else next();
  586. });
  587. },
  588. next => {
  589. async.eachLimit(
  590. jobIds,
  591. 1,
  592. (jobId, next) => {
  593. CacheModule.runJob("PUB", {
  594. channel: "importJob.removed",
  595. value: jobId
  596. })
  597. .then(() => next())
  598. .catch(next);
  599. },
  600. next
  601. );
  602. }
  603. ],
  604. err => {
  605. if (err && err !== true) return reject(new Error(err));
  606. return resolve();
  607. }
  608. );
  609. });
  610. }
  611. }
  612. export default new _MediaModule();