import co from "co"; import kue from "kue"; import Sentry from "@sentry/node"; import debuglog from "debug"; import { inspect } from "util"; import models from "./models/index.cjs"; import services from "./lib/services.js"; const debug = debuglog("combine.fm:worker"); Sentry.init({ dsn: process.env.SENTRY_DSN }); const queue = kue.createQueue({ redis: process.env.REDIS_URL }); function search(data, done) { const share = data.share; const service = services.find(item => data.service.id === item.id); if (!service) { return; } debug(`Searching on: ${service.id}`); co(function* gen() { // eslint-disable-line no-loop-func try { const match = yield service.search(share); if (match.id) { models.match.create({ trackId: share.type === "track" ? share.id : null, albumId: share.type === "album" ? share.id : null, externalId: match.id.toString(), service: match.service, name: match.name, streamUrl: match.streamUrl, purchaseUrl: match.purchaseUrl, artworkSmall: match.artwork.small, artworkLarge: match.artwork.large }); } else { models.match.create({ trackId: share.type === "track" ? share.id : null, albumId: share.type === "album" ? share.id : null, externalId: null, service: match.service, name: null, streamUrl: null, purchaseUrl: null, artworkSmall: null, artworkLarge: null }); } return done(); } catch (err) { debug(`Error searching on: ${service.id}`); debug(share); debug(inspect(err, { depth: 5 })); Sentry.captureException(err); return done(err); } }).catch(err => { debug(`Error searching on: ${service.id}`); debug(share); debug(inspect(err, { depth: 5 })); Sentry.captureException(err); return done(err); }); } queue.process("search", 5, (job, done) => { search(job.data, done); }); queue.process("search-backlog", 1, (job, ctx, done) => { search(job.data, done); ctx.pause(7000, () => { console.log("Worker is paused... "); setTimeout(() => { ctx.resume(); }, 10000); }); }); kue.app.listen(3001);