diff --git a/fixmissing.js b/fixmissing.js index 13a2040..2e2ee51 100644 --- a/fixmissing.js +++ b/fixmissing.js @@ -1,4 +1,5 @@ import co from 'co'; +import kue from 'kue'; import debuglog from 'debug'; import models from './models'; @@ -6,6 +7,10 @@ import services from './lib/services'; const debug = debuglog('combine.fm:fixmissing'); +const queue = kue.createQueue({ + redis: process.env.REDIS_URL, +}); + debug('Fixing missing'); const serviceIds = []; @@ -24,41 +29,18 @@ const query = { ], }; -function* search(data) { +function search(data) { const share = data.share; const service = services.find(item => data.service.id === item.id); debug(`Matching ${share.name} on ${data.service.id}`); - const match = yield service.search(share); - - debug(`Match found for ${share.name} on ${data.service.id}`); - - if (match.id) { - models.match.create({ - trackId: share.type === 'track' ? share.id : null, - albumId: share.type === 'album' ? share.id : null, - externalId: match.id.toString(), - service: match.service, - name: match.name, - streamUrl: match.streamUrl, - purchaseUrl: match.purchaseUrl, - artworkSmall: match.artwork.small, - artworkLarge: match.artwork.large, + const job = queue.create('search-backlog', { title: `Matching ${share.name} on ${service.id}`, share, service }) + .attempts(3) + .backoff({ type: 'exponential' }) + .save((err) => { + debug(err || `JobID: ${job.id}`); }); - } else { - models.match.create({ - trackId: share.type === 'track' ? share.id : null, - albumId: share.type === 'album' ? share.id : null, - externalId: null, - service: match.service, - name: null, - streamUrl: null, - purchaseUrl: null, - artworkSmall: null, - artworkLarge: null, - }); - } } function* find(model) { @@ -71,17 +53,21 @@ function* find(model) { if (unmatched.length > 0) { debug(`Matching ${unmatched.join(', ')}`); for (const toMatch of unmatched) { - yield search({ share: item, service: { id: toMatch } }); + search({ share: item, service: { id: toMatch } }); } } else { debug(`No broken matches for ${item.name}`); } } + return Promise.resolve(); } co(function* main() { yield find('album'); yield find('track'); + setTimeout(() => { + process.exit(0); + }, 1000); }).catch((err) => { debug(err.stack); }); diff --git a/lib/services/amazon/index.js b/lib/services/amazon/index.js index c18c66a..8cb302a 100644 --- a/lib/services/amazon/index.js +++ b/lib/services/amazon/index.js @@ -22,7 +22,7 @@ export function* lookupId(id, type) { const result = results[0]; - if (!result || !result.Error) { + if (!result || result.Error) { return { service: 'amazon' }; } @@ -68,7 +68,7 @@ export function* lookupId(id, type) { return { service: 'amazon' }; } -export function* search(data, original = {}) { +export function* search(data) { try { const type = data.type; const results = yield client.itemSearch({ @@ -80,7 +80,7 @@ export function* search(data, original = {}) { const result = results[0]; - if (!result || !result.Error) { + if (!result || result.Error) { return { service: 'amazon' }; } @@ -122,6 +122,10 @@ export function* search(data, original = {}) { } } catch (err) { debug(inspect(err, { depth: 4 })); + if (err[0].Error[0].Code[0] === 'RequestThrottled') { + debug('Rate Limited'); + throw err; + } } return { service: 'amazon' }; } diff --git a/worker.js b/worker.js index bf46725..d71c36d 100644 --- a/worker.js +++ b/worker.js @@ -69,5 +69,12 @@ queue.process('search', 5, (job, done) => { search(job.data, done); }); +queue.process('search-backlog', 1, (job, ctx, done) => { + search(job.data, done); + ctx.pause(7000, () => { + console.log('Worker is paused... '); + setTimeout(() => { ctx.resume(); }, 10000); + }); +}); kue.app.listen(3000);