Refactor uploads

parent 75a46212da
commit f59b9a5d22

14 changed files with 297 additions and 269 deletions

@@ -1,6 +1,6 @@
 import fs from 'fs';
 import createError from 'http-errors';
-import { get as getFile } from './s3';
+import { get as getFile } from './sftp';

 import debugname from 'debug';
 const debug = debugname('hostr:file-stream');

@@ -10,22 +10,23 @@ export default function* hostrFileStream(localPath, remotePath) {
   return new Promise((resolve, reject) => {
     localRead.once('error', () => {
       debug('local error');
-      const remoteRead = getFile(remotePath);
+      const remoteFile = getFile(remotePath);

-      remoteRead.once('readable', () => {
-        debug('remote readable');
+      remoteFile.then((remoteRead) => {
         const localWrite = fs.createWriteStream(localPath);
         localWrite.once('finish', () => {
           debug('local write end');
           resolve(fs.createReadStream(localPath));
         });
         remoteRead.pipe(localWrite);
+
+        remoteRead.once('error', () => {
+          debug('remote error');
+          reject(createError(404));
+        });
       });
-
-      remoteRead.once('error', () => {
-        debug('remote error');
-        reject(createError(404));
-      });

     });
     localRead.once('readable', () => {
       debug('local readable');
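
A rough sketch of how the refactored helper is consumed. The co wrapper and the paths below are assumptions for illustration (Koa 1.x drives generators through co in the same way); none of this wiring is part of the commit.

import co from 'co';
import hostrFileStream from './file-stream';

co(function* () {
  // Resolves with a read stream for the local copy; on a local read error the file is
  // fetched over SFTP, written to localPath, and then streamed from disk.
  const stream = yield hostrFileStream('/tmp/store/abc123_cat.png', 'abc123_cat.png');
  stream.pipe(process.stdout);
}).catch((err) => {
  // Rejects with an http-errors 404 when the remote copy cannot be read either.
  console.error(err.status);
});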

@@ -1,75 +0,0 @@
/**
 * Module dependencies.
 */

var swig = require('swig');
var http = require('http');

/**
 * Expose `error`.
 */

module.exports = error;

/**
 * Error middleware.
 *
 * - `template` defaults to ./error.html
 *
 * @param {Object} opts
 * @api public
 */

function error(opts) {
  opts = opts || {};

  // template
  var path = opts.template || __dirname + '/error.html';
  var render = swig.compileFile(path);

  // env
  var env = process.env.NODE_ENV || 'development';

  return function *error(next){
    try {
      yield next;
      if (404 == this.response.status && !this.response.body) this.throw(404);
    } catch (err) {
      this.status = err.status || 500;

      // application
      this.app.emit('error', err, this);

      // accepted types
      switch (this.accepts('html', 'text', 'json')) {
        case 'text':
          this.type = 'text/plain';
          if ('development' == env) this.body = err.message
          else if (err.expose) this.body = err.message
          else throw err;
          break;

        case 'json':
          this.type = 'application/json';
          if ('development' == env) this.body = { error: err.message }
          else if (err.expose) this.body = { error: err.message }
          else this.body = { error: http.STATUS_CODES[this.status] }
          break;

        case 'html':
          this.type = 'text/html';
          this.body = render({
            env: env,
            ctx: this,
            request: this.request,
            response: this.response,
            error: err.message,
            stack: err.stack,
            status: this.status,
            code: err.code
          });
          break;
      }
    }
  }
}

@@ -1,33 +1,71 @@
+import fs from 'mz/fs';
+import lwip from 'lwip';
 import debugname from 'debug';
 const debug = debugname('hostr-api:resize');
-import lwip from 'lwip';
-import imageType from 'image-type';
-
-const supported = ['jpg', 'png', 'gif'];
-
-export default function(input, size) {
-  debug('Resizing');
-
-  const type = imageType(input);
-
-  if (!type.ext || supported.indexOf(type.ext) < 0) {
-    throw new Error('Not a supported image.');
-  }
-
+function cover(path, type, size) {
   return new Promise((resolve, reject) => {
-    lwip.open(input, type.ext, (errIn, image) => {
+    lwip.open(path, type, (errIn, image) => {
       debug('Image Opened');
       if (errIn) {
-        return reject(errIn);
+        reject(errIn);
       }

       image.cover(size.width, size.height, (errOut, resized) => {
         debug('Image Resized');
         if (errOut) {
-          return reject(errOut);
+          reject(errOut);
         }

-        resized.toBuffer(type.ext, (errBuf, buffer) => {
+        resized.toBuffer(type, (errBuf, buffer) => {
           debug('Image Buffered');
           if (errBuf) {
             reject(errBuf);
           }
           resolve(buffer);
         });
       });
     });
   });
 }

+function scale(path, type, size) {
+  return new Promise((resolve, reject) => {
+    lwip.open(path, type, (errIn, image) => {
+      debug('Image Opened');
+      if (errIn) {
+        reject(errIn);
+      }
+
+      image.cover(size.width, size.height, (errOut, resized) => {
+        debug('Image Resized');
+        if (errOut) {
+          reject(errOut);
+        }
+
+        resized.toBuffer(type, (errBuf, buffer) => {
+          debug('Image Buffered');
+          if (errBuf) {
+            reject(errBuf);
+          }
+          resolve(buffer);
+        });
+      });
+    });
+  });
+}
+
+export default function resize(path, type, currentSize, newSize) {
+  debug('Resizing');
+  const ratio = 970 / currentSize.width;
+  if (newSize.width <= 150) {
+    debug('Cover');
+    return cover(path, type, newSize);
+  } else if (newSize.width > 970 && ratio > 1) {
+    debug('Scale');
+    newSize.height = currentSize.height * ratio;
+    return scale(path, type, newSize);
+  }
+  debug('Copy');
+  return fs.readFile(path);
+}
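
For orientation, a small sketch of how the new resize signature is driven. It mirrors the resizeImage call sites in lib/upload.js below; the file path is a placeholder.

import sizeOf from 'image-size';
import resize from './resize';

const path = '/tmp/store/abc123_cat.png';
const current = sizeOf(path); // { width, height, type }, e.g. type: 'png'

// A 150px target is cropped with cover(); other targets either go through scale()
// or fall back to fs.readFile(path), depending on the 970px ratio check above.
resize(path, current.type, current, { width: 150, height: 150 }).then((buffer) => {
  // buffer is the resized image, or the untouched file when no resize branch matched
});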
11 lib/s3.js

@@ -1,17 +1,20 @@
 import aws from 'aws-sdk';
-import s3UploadStream from 's3-upload-stream';
 import debugname from 'debug';
 const debug = debugname('hostr:s3');

 const s3 = new aws.S3();
-const s3Stream = s3UploadStream(s3);

 export function get(key) {
   debug('fetching from s3: %s', 'hostr_files/' + key);
   return s3.getObject({Bucket: process.env.AWS_BUCKET, Key: 'hostr_files/' + key}).createReadStream();
 }

-export function upload(key) {
+export function upload(stream, key, callback) {
   debug('sending to s3: %s', 'hostr_files/' + key);
-  return s3Stream.upload({Bucket: process.env.AWS_BUCKET, Key: 'hostr_files/' + key});
+  const params = {Bucket: process.env.AWS_BUCKET, Key: 'hostr_files/' + key, Body: stream};
+  const uploading = s3.upload(params);
+  uploading.on('error', (err) => {
+    console.log(err)
+  });
+  uploading.send(callback);
 }
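
A minimal sketch of the new callback-based upload shape (the bucket still comes from AWS_BUCKET and keys are still prefixed with hostr_files/; the file path and key below are placeholders, not values from the commit).

import { createReadStream } from 'fs';
import { get, upload } from './s3';

upload(createReadStream('/tmp/store/abc123_cat.png'), 'abc123_cat.png', (err, data) => {
  if (err) throw err;
  console.log('stored at', data.Location);
  // The same key can be streamed straight back out:
  get('abc123_cat.png').pipe(process.stdout);
});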
40 lib/sftp.js Normal file

@@ -0,0 +1,40 @@
import { dirname } from 'path';
import Client from 'ssh2-sftp-client';
import debugname from 'debug';
const debug = debugname('hostr:sftp');

export function get(remotePath) {
  const sftp = new Client();
  return sftp.connect({
    host: process.env.SFTP_HOST,
    port: process.env.SFTP_PORT,
    username: process.env.SFTP_USERNAME,
    password: process.env.SFTP_PASSWORD,
  }).then(() => {
    return sftp.get('hostr/uploads/' + remotePath, true);
  });
}

export function upload(localPath, remotePath) {
  debug('SFTP connecting');
  const sftp = new Client();
  return sftp.connect({
    host: process.env.SFTP_HOST,
    port: process.env.SFTP_PORT,
    username: process.env.SFTP_USERNAME,
    password: process.env.SFTP_PASSWORD,
  }).then(() => {
    return sftp.put(localPath, remotePath, true).then(() => {
      sftp.end();
    });
  }).catch(() => {
    debug('Creating ' + dirname(remotePath));
    return sftp.mkdir(dirname(remotePath), true).then(() => {
      return sftp.put(localPath, remotePath, true).then(() => {
        sftp.end();
      });
    });
  }).then(() => {
    sftp.end();
  });
}
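
A short usage sketch (requires the SFTP_* environment variables; the paths are placeholders). Note the asymmetry: get prefixes the remote path with hostr/uploads/, while upload expects the full remote path, which is how lib/upload.js calls it below.

import { get, upload } from './sftp';

upload('/tmp/store/a/970/abc123_cat.png', 'hostr/uploads/a/970/abc123_cat.png')
  .then(() => get('a/970/abc123_cat.png'))
  .then((remoteStream) => {
    // get() resolves with a read stream from ssh2-sftp-client once connected
    remoteStream.pipe(process.stdout);
  });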
108 lib/upload.js Normal file

@@ -0,0 +1,108 @@
import { join } from 'path';
import parse from 'co-busboy';
import fs from 'mz/fs';
import sizeOf from 'image-size';
import hostrId from './hostr-id';
import resize from './resize';
import { upload as sftpUpload } from './sftp';

import debugname from 'debug';
const debug = debugname('hostr-api:upload');

const storePath = process.env.UPLOAD_STORAGE_PATH;
const baseURL = process.env.WEB_BASE_URL;
const supported = ['jpg', 'png', 'gif'];

export function* checkLimit() {
  const count = yield this.db.Files.count({
    owner: this.user.id,
    time_added: {'$gt': Math.ceil(Date.now() / 1000) - 86400},
  });
  const userLimit = this.user.daily_upload_allowance;
  const underLimit = (count < userLimit || userLimit === 'unlimited');
  if (!underLimit) {
    this.statsd.incr('file.overlimit', 1);
  }
  this.assert(underLimit, 400, `{
    "error": {
      "message": "Daily upload limits (${this.user.daily_upload_allowance}) exceeded.",
      "code": 602
    }
  }`);
  return true;
}

export function* accept() {
  yield checkLimit.call(this);

  const upload = yield parse(this, {
    autoFields: true,
    headers: this.request.headers,
    limits: { files: 1},
    highWaterMark: 1000000,
  });

  upload.promise = new Promise((resolve, reject) => {
    upload.on('error', (err) => {
      this.statsd.incr('file.upload.error', 1);
      debug(err);
      reject();
    });

    upload.on('end', () => {
      resolve();
    });
  });

  upload.tempGuid = this.request.headers['hostr-guid'];
  upload.originalName = upload.filename;
  upload.filename = upload.filename.replace(/[^a-zA-Z0-9\.\-\_\s]/g, '').replace(/\s+/g, '');
  upload.id = yield hostrId(this.db.Files);

  const acceptedEvent = `{"type": "file-accepted", "data": {"id": "${upload.id}", "guid": "${upload.tempGuid}", "href": "${baseURL}/${upload.id}"}}`;
  this.redis.publish('/user/' + this.user.id, acceptedEvent);
  this.statsd.incr('file.upload.accepted', 1);

  return upload;
}

export function resizeImage(upload, type, currentSize, newSize) {
  return resize(join(storePath, upload.path), type, currentSize, newSize).then((image) => {
    const path = join(upload.id[0], String(newSize.width), upload.id + '_' + upload.filename);
    debug('Writing file');
    debug(join(storePath, path));
    return fs.writeFile(join(storePath, path), image).then(() => {
      debug('Uploading file');
      return sftpUpload(join(storePath, path), join('hostr', 'uploads', path));
    }).catch(debug);
  }).catch(debug);
}

export function* processImage(upload) {
  debug('Processing image');
  return new Promise((resolve) => {
    const size = sizeOf(join(storePath, upload.path));
    debug('Size: ', size);
    if (!size.width || supported.indexOf(size.type) < 0) {
      resolve();
    }

    Promise.all([
      resizeImage(upload, size.type, size, {width: 150, height: 150}),
      resizeImage(upload, size.type, size, {width: 970}),
    ]).then(() => {
      resolve(size);
    });
  });
}

export function progressEvent() {
  percentComplete = Math.floor(receivedSize * 100 / expectedSize);
  if (percentComplete > lastPercent && lastTick < Date.now() - 1000) {
    const progressEvent = `{"type": "file-progress", "data": {"id": "${upload.id}", "complete": ${percentComplete}}}`;
    this.redis.publish('/file/' + upload.id, progressEvent);
    this.redis.publish('/user/' + this.user.id, progressEvent);
    lastTick = Date.now();
  }
  lastPercent = percentComplete;
}
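
As committed, progressEvent reads upload, receivedSize, expectedSize, lastPercent and lastTick from the enclosing scope, none of which are defined in this module, so it only works when inlined into a route's closure. A self-contained variant could take them as parameters and return the throttling state for the caller to carry forward; the signature and return shape below are assumptions, not part of the commit.

export function progressEvent(upload, receivedSize, expectedSize, lastPercent, lastTick) {
  const percentComplete = Math.floor(receivedSize * 100 / expectedSize);
  if (percentComplete > lastPercent && lastTick < Date.now() - 1000) {
    const event = `{"type": "file-progress", "data": {"id": "${upload.id}", "complete": ${percentComplete}}}`;
    this.redis.publish('/file/' + upload.id, event);
    this.redis.publish('/user/' + this.user.id, event);
    lastTick = Date.now();
  }
  // Call with a Koa context, e.g. ({lastPercent, lastTick} = progressEvent.call(this, ...));
  return { lastPercent: percentComplete, lastTick };
}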