Refactor uploader

Jonathan Cremin 2016-06-06 13:39:42 +01:00
parent a1a39778aa
commit 6ec5f456f3
12 changed files with 647 additions and 261 deletions

@@ -1,35 +1,50 @@
 import fs from 'fs';
 import createError from 'http-errors';
-import { get as getFile } from './sftp';
+import { get as getSFTP } from './sftp';
+import { get as getS3 } from './s3';
 import debugname from 'debug';
 const debug = debugname('hostr:file-stream');
-export default function* hostrFileStream(localPath, remotePath) {
+function writer(localPath, remoteRead) {
+  return new Promise((resolve, reject) => {
+    remoteRead.once('error', () => {
+      debug('remote error');
+      reject(createError(404));
+    });
+    const localWrite = fs.createWriteStream(localPath);
+    localWrite.once('finish', () => {
+      debug('local write end');
+      resolve(fs.createReadStream(localPath));
+    });
+    remoteRead.once('readable', () => {
+      debug('writing');
+      remoteRead.pipe(localWrite);
+    });
+  });
+}
+export default function hostrFileStream(localPath, remotePath) {
   const localRead = fs.createReadStream(localPath);
   return new Promise((resolve, reject) => {
     localRead.once('error', () => {
-      debug('local error');
-      const remoteFile = getFile(remotePath);
-      remoteFile.then((remoteRead) => {
-        const localWrite = fs.createWriteStream(localPath);
-        localWrite.once('finish', () => {
-          debug('local write end');
-          resolve(fs.createReadStream(localPath));
+      debug('not found locally');
+      getSFTP(remotePath)
+      .then((remoteRead) => writer(localPath, remoteRead))
+      .then(resolve)
+      .catch((err) => {
+        debug('not on sftp', err);
+        writer(localPath, getS3(remotePath))
+        .then(resolve)
+        .catch((s3err) => {
+          debug('not on s3');
+          reject(s3err);
+        });
       });
-        remoteRead.pipe(localWrite);
-        remoteRead.once('error', () => {
-          debug('remote error');
-          reject(createError(404));
-        });
-      });
     });
     localRead.once('readable', () => {
-      debug('local readable');
+      debug('found locally');
      resolve(localRead);
    });
  });

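A quick usage sketch of the refactored module (paths are placeholders, not from this commit): hostrFileStream now resolves with a readable stream from the first store that has the file, trying local disk, then SFTP, then S3, and writer() caches the remote copy to disk before resolving.

import hostrFileStream from './file-stream';

hostrFileStream('/tmp/hostr/a/abcde1_photo.jpg', 'a/abcde1_photo.jpg')
  .then((stream) => stream.pipe(process.stdout))
  .catch((err) => console.error('not in any store', err));
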
@@ -12,7 +12,7 @@ function* checkId(Files, fileId, attempts) {
   if (attempts > 10) {
     return false;
   }
-  const file = yield Files.findOne({'_id': fileId});
+  const file = yield Files.findOne({ _id: fileId });
   if (file === null) {
     return fileId;
   }

@@ -5,16 +5,10 @@ const debug = debugname('hostr:s3');
 const s3 = new aws.S3();
 export function get(key) {
-  debug('fetching from s3: %s', 'hostr_files/' + key);
-  return s3.getObject({Bucket: process.env.AWS_BUCKET, Key: 'hostr_files/' + key}).createReadStream();
-}
-export function upload(stream, key, callback) {
-  debug('sending to s3: %s', 'hostr_files/' + key);
-  const params = {Bucket: process.env.AWS_BUCKET, Key: 'hostr_files/' + key, Body: stream};
-  const uploading = s3.upload(params);
-  uploading.on('error', (err) => {
-    console.log(err)
-  });
-  uploading.send(callback);
+  let fullKey = `hostr_files/${key}`;
+  if (key.substr(2, 4) === '970/' || key.substr(2, 4) === '150/') {
+    fullKey = `hostr_files/${key.substr(2)}`;
+  }
+  debug('fetching from s3: %s', fullKey);
+  return s3.getObject({ Bucket: process.env.AWS_BUCKET, Key: fullKey }).createReadStream();
 }

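An illustrative sketch of the key rewrite above, with assumed key shapes (the one-character prefix directory and the 970/150 thumbnail folders follow the upload paths used elsewhere in this commit); each call returns a readable stream from aws-sdk's getObject():

import { get } from './s3';

get('a/abcde1_photo.jpg');      // reads hostr_files/a/abcde1_photo.jpg
get('a/970/abcde1_photo.jpg');  // legacy resized copy: rewritten to hostr_files/970/abcde1_photo.jpg
get('a/150/abcde1_photo.jpg');  // legacy resized copy: rewritten to hostr_files/150/abcde1_photo.jpg
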
@@ -1,40 +1,29 @@
-import { dirname } from 'path';
-import Client from 'ssh2-sftp-client';
+import { dirname, join } from 'path';
+import Client from './ssh2-sftp-client';
 import debugname from 'debug';
 const debug = debugname('hostr:sftp');
 export function get(remotePath) {
+  debug('fetching', join('hostr', 'uploads', remotePath));
   const sftp = new Client();
   return sftp.connect({
     host: process.env.SFTP_HOST,
     port: process.env.SFTP_PORT,
     username: process.env.SFTP_USERNAME,
     password: process.env.SFTP_PASSWORD,
-  }).then(() => {
-    return sftp.get('hostr/uploads/' + remotePath, true);
-  });
+  })
+  .then(() => sftp.get(join('hostr', 'uploads', remotePath), { encoding: null }));
 }
 export function upload(localPath, remotePath) {
   debug('SFTP connecting');
   const sftp = new Client();
   return sftp.connect({
     host: process.env.SFTP_HOST,
     port: process.env.SFTP_PORT,
     username: process.env.SFTP_USERNAME,
     password: process.env.SFTP_PASSWORD,
-  }).then(() => {
-    return sftp.put(localPath, remotePath, true).then(() => {
-      sftp.end();
-    });
-  }).catch(() => {
-    debug('Creating ' + dirname(remotePath));
-    return sftp.mkdir(dirname(remotePath), true).then(() => {
-      return sftp.put(localPath, remotePath, true).then(() => {
-        sftp.end();
-      });
-    });
-  }).then(() => {
-    sftp.end();
-  });
+  })
+  .then(() => sftp.put(localPath, remotePath, true))
+  .catch(() => sftp.mkdir(dirname(remotePath), true)
+    .then(() => sftp.put(localPath, remotePath, true)));
 }

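A hypothetical usage sketch of the slimmed-down helpers (placeholder paths): note the asymmetry the callers rely on, where get() prepends hostr/uploads/ itself while upload() expects the full remote path.

import { get, upload } from './sftp';

upload('/tmp/hostr/a/abcde1_photo.jpg', 'hostr/uploads/a/abcde1_photo.jpg')
  .then(() => get('a/abcde1_photo.jpg'))
  .then((stream) => stream.pipe(process.stdout))
  .catch((err) => console.error('sftp error', err));
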
lib/ssh2-sftp-client.js (new file)

@@ -0,0 +1,303 @@
/**
* ssh2 sftp client for node
*/
'use strict';
let Client = require('ssh2').Client;
let SftpClient = function(){
this.client = new Client();
};
/**
* Retrieves a directory listing
*
* @param {String} path, a string containing the path to a directory
* @return {Promise} data, list info
*/
SftpClient.prototype.list = function(path) {
let reg = /-/gi;
return new Promise((resolve, reject) => {
let sftp = this.sftp;
if (sftp) {
sftp.readdir(path, (err, list) => {
if (err) {
reject(err);
return false;
}
// reset file info
list.forEach((item, i) => {
list[i] = {
type: item.longname.substr(0, 1),
name: item.filename,
size: item.attrs.size,
modifyTime: item.attrs.mtime * 1000,
accessTime: item.attrs.atime * 1000,
rights: {
user: item.longname.substr(1, 3).replace(reg, ''),
group: item.longname.substr(4,3).replace(reg, ''),
other: item.longname.substr(7, 3).replace(reg, '')
},
owner: item.attrs.uid,
group: item.attrs.gid
}
});
resolve(list);
});
} else {
reject('sftp connect error');
}
});
};
/**
* get file
*
* @param {String} path, path
* @param {Object} useCompression, config options
* @return {Promise} stream, readable stream
*/
SftpClient.prototype.get = function(path, useCompression) {
useCompression = Object.assign({}, {encoding: 'utf8'}, useCompression);
return new Promise((resolve, reject) => {
let sftp = this.sftp;
if (sftp) {
try {
let stream = sftp.createReadStream(path, useCompression);
stream.on('error', reject);
resolve(stream);
} catch(err) {
reject(err);
}
} else {
reject('sftp connect error');
}
});
};
/**
* Create file
*
* @param {String|Buffer|stream} input
* @param {String} remotePath,
* @param {Object} useCompression [description]
* @return {[type]} [description]
*/
SftpClient.prototype.put = function(input, remotePath, useCompression) {
useCompression = Object.assign({}, {encoding: 'utf8'}, useCompression);
return new Promise((resolve, reject) => {
let sftp = this.sftp;
if (sftp) {
if (typeof input === 'string') {
sftp.fastPut(input, remotePath, useCompression, (err) => {
if (err) {
reject(err);
return false;
}
resolve();
});
return false;
}
let stream = sftp.createWriteStream(remotePath, useCompression);
let data;
stream.on('error', reject);
stream.on('close', resolve);
if (input instanceof Buffer) {
data = stream.end(input);
return false;
}
data = input.pipe(stream);
} else {
reject('sftp connect error');
}
});
};
SftpClient.prototype.mkdir = function(path, recursive) {
recursive = recursive || false;
return new Promise((resolve, reject) => {
let sftp = this.sftp;
if (sftp) {
if (!recursive) {
sftp.mkdir(path, (err) => {
if (err) {
reject(err);
return false;
}
resolve();
});
return false;
}
let tokens = path.split(/\//g);
let p = '';
let mkdir = () => {
let token = tokens.shift();
if (!token && !tokens.length) {
resolve();
return false;
}
token += '/';
p = p + token;
sftp.mkdir(p, (err) => {
if (err && err.code !== 4) {
reject(err);
}
mkdir();
});
};
return mkdir();
} else {
reject('sftp connect error');
}
});
};
SftpClient.prototype.rmdir = function(path, recursive) {
recursive = recursive || false;
return new Promise((resolve, reject) => {
let sftp = this.sftp;
if (sftp) {
if (!recursive) {
return sftp.rmdir(path, (err) => {
if (err) {
reject(err);
}
resolve();
});
}
let rmdir = (p) => {
return this.list(p).then((list) => {
if (list.length > 0) {
let promises = [];
list.forEach((item) => {
let name = item.name;
let promise;
var subPath;
if (name[0] === '/') {
subPath = name;
} else {
if (p[p.length - 1] === '/') {
subPath = p + name;
} else {
subPath = p + '/' + name;
}
}
if (item.type === 'd') {
if (name !== '.' && name !== '..') {
promise = rmdir(subPath);
}
} else {
promise = this.delete(subPath);
}
promises.push(promise);
});
if (promises.length) {
return Promise.all(promises).then(() => {
return rmdir(p);
});
}
} else {
return new Promise((resolve, reject) => {
return sftp.rmdir(p, (err) => {
if (err) {
reject(err);
}
else {
resolve();
}
});
});
}
});
};
return rmdir(path).then(() => {resolve()})
.catch((err) => {reject(err)});
} else {
reject('sftp connect error');
}
});
};
SftpClient.prototype.delete = function(path) {
return new Promise((resolve, reject) => {
let sftp = this.sftp;
if (sftp) {
sftp.unlink(path, (err) => {
if (err) {
reject(err);
return false;
}
resolve();
});
} else {
reject('sftp connect error');
}
});
};
SftpClient.prototype.rename = function(srcPath, remotePath) {
return new Promise((resolve, reject) => {
let sftp = this.sftp;
if (sftp) {
sftp.rename(srcPath, remotePath, (err) => {
if (err) {
reject(err);
return false;
}
resolve();
});
} else {
reject('sftp connect error');
}
});
}
SftpClient.prototype.connect = function(config) {
var c = this.client;
return new Promise((resolve, reject) => {
this.client.on('ready', () => {
this.client.sftp((err, sftp) => {
if (err) {
reject(err);
}
this.sftp = sftp;
resolve(sftp);
});
}).on('error', (err) => {
reject(err);
}).connect(config);
});
};
SftpClient.prototype.end = function() {
return new Promise((resolve) => {
this.client.end();
resolve();
});
};
module.exports = SftpClient;

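A short usage sketch of the vendored client (host and credentials are placeholders): connect() resolves once the SFTP subsystem is ready, and list() resolves with the normalised entries built in readdir above.

const SftpClient = require('./ssh2-sftp-client');

const sftp = new SftpClient();
sftp.connect({ host: 'sftp.example.com', port: 22, username: 'hostr', password: 'secret' })
  .then(() => sftp.list('/hostr/uploads'))
  .then((entries) => {
    entries.forEach((entry) => console.log(entry.type, entry.name, entry.size));
    return sftp.end();
  })
  .catch((err) => console.error(err));
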
@@ -1,108 +0,0 @@
import { join } from 'path';
import parse from 'co-busboy';
import fs from 'mz/fs';
import sizeOf from 'image-size';
import hostrId from './hostr-id';
import resize from './resize';
import { upload as sftpUpload } from './sftp';
import debugname from 'debug';
const debug = debugname('hostr-api:upload');
const storePath = process.env.UPLOAD_STORAGE_PATH;
const baseURL = process.env.WEB_BASE_URL;
const supported = ['jpg', 'png', 'gif'];
export function* checkLimit() {
const count = yield this.db.Files.count({
owner: this.user.id,
time_added: {'$gt': Math.ceil(Date.now() / 1000) - 86400},
});
const userLimit = this.user.daily_upload_allowance;
const underLimit = (count < userLimit || userLimit === 'unlimited');
if (!underLimit) {
this.statsd.incr('file.overlimit', 1);
}
this.assert(underLimit, 400, `{
"error": {
"message": "Daily upload limits (${this.user.daily_upload_allowance}) exceeded.",
"code": 602
}
}`);
return true;
}
export function* accept() {
yield checkLimit.call(this);
const upload = yield parse(this, {
autoFields: true,
headers: this.request.headers,
limits: { files: 1},
highWaterMark: 1000000,
});
upload.promise = new Promise((resolve, reject) => {
upload.on('error', (err) => {
this.statsd.incr('file.upload.error', 1);
debug(err);
reject();
});
upload.on('end', () => {
resolve();
});
});
upload.tempGuid = this.request.headers['hostr-guid'];
upload.originalName = upload.filename;
upload.filename = upload.filename.replace(/[^a-zA-Z0-9\.\-\_\s]/g, '').replace(/\s+/g, '');
upload.id = yield hostrId(this.db.Files);
const acceptedEvent = `{"type": "file-accepted", "data": {"id": "${upload.id}", "guid": "${upload.tempGuid}", "href": "${baseURL}/${upload.id}"}}`;
this.redis.publish('/user/' + this.user.id, acceptedEvent);
this.statsd.incr('file.upload.accepted', 1);
return upload;
}
export function resizeImage(upload, type, currentSize, newSize) {
return resize(join(storePath, upload.path), type, currentSize, newSize).then((image) => {
const path = join(upload.id[0], String(newSize.width), upload.id + '_' + upload.filename);
debug('Writing file');
debug(join(storePath, path));
return fs.writeFile(join(storePath, path), image).then(() => {
debug('Uploading file');
return sftpUpload(join(storePath, path), join('hostr', 'uploads', path));
}).catch(debug);
}).catch(debug);
}
export function* processImage(upload) {
debug('Processing image');
return new Promise((resolve) => {
const size = sizeOf(join(storePath, upload.path));
debug('Size: ', size);
if (!size.width || supported.indexOf(size.type) < 0) {
resolve();
}
Promise.all([
resizeImage(upload, size.type, size, {width: 150, height: 150}),
resizeImage(upload, size.type, size, {width: 970}),
]).then(() => {
resolve(size);
});
});
}
export function progressEvent() {
percentComplete = Math.floor(receivedSize * 100 / expectedSize);
if (percentComplete > lastPercent && lastTick < Date.now() - 1000) {
const progressEvent = `{"type": "file-progress", "data": {"id": "${upload.id}", "complete": ${percentComplete}}}`;
this.redis.publish('/file/' + upload.id, progressEvent);
this.redis.publish('/user/' + this.user.id, progressEvent);
lastTick = Date.now();
}
lastPercent = percentComplete;
}

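For reference, the image gate in processImage above relies on the synchronous image-size API; a standalone sketch with a placeholder path (the same check, with 'jpeg' added to the supported list, reappears in the new uploader below):

import sizeOf from 'image-size';

const size = sizeOf('/tmp/hostr/a/abcde1_photo.jpg'); // { width, height, type }
if (size.width && ['jpg', 'png', 'gif'].indexOf(size.type) >= 0) {
  console.log(`resizable ${size.type}: ${size.width}x${size.height}`);
}
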
lib/uploader.js (new file)

@@ -0,0 +1,251 @@
import { join } from 'path';
import parse from 'co-busboy';
import crypto from 'crypto';
import fs from 'mz/fs';
import sizeOf from 'image-size';
import { formatFile } from './format';
import hostrId from './hostr-id';
import resize from './resize';
import malware from './malware';
import { sniff } from './type';
import { upload as sftpUpload } from './sftp';
import debugname from 'debug';
const debug = debugname('hostr-api:uploader');
const storePath = process.env.UPLOAD_STORAGE_PATH;
const baseURL = process.env.WEB_BASE_URL;
const supported = ['jpeg', 'jpg', 'png', 'gif'];
export default class Uploader {
constructor(context) {
this.context = context;
this.Files = context.db.Files;
this.expectedSize = context.request.headers['content-length'];
this.tempGuid = context.request.headers['hostr-guid'];
this.remoteIp = context.request.headers['x-real-ip'] || context.req.connection.remoteAddress;
this.md5sum = crypto.createHash('md5');
this.lastPercent = 0;
this.percentComplete = 0;
this.lastTick = 0;
this.receivedSize = 0;
}
*accept() {
yield this.checkLimit();
this.upload = yield parse(this.context, {
autoFields: true,
headers: this.context.request.headers,
limits: { files: 1 },
highWaterMark: 10000000,
});
this.promise = new Promise((resolve, reject) => {
this.upload.on('error', (err) => {
this.context.statsd.incr('file.upload.error', 1);
debug(err);
reject();
});
this.upload.on('end', () => {
resolve();
});
});
this.tempGuid = this.tempGuid;
this.originalName = this.upload.filename;
this.filename = this.upload.filename.replace(/[^a-zA-Z0-9\.\-_\s]/g, '').replace(/\s+/g, '');
this.id = yield hostrId(this.Files);
}
receive() {
this.path = join(this.id[0], `${this.id}_${this.filename}`);
this.localStream = fs.createWriteStream(join(storePath, this.path));
this.upload.pause();
this.upload.on('data', (data) => {
this.receivedSize += data.length;
if (this.receivedSize > this.context.user.max_filesize) {
fs.unlink(join(storePath, this.path));
this.context.throw(413, `{"error": {"message": "The file you tried to upload is too large.",
"code": 601}}`);
}
this.localStream.write(data);
this.percentComplete = Math.floor(this.receivedSize * 100 / this.expectedSize);
if (this.percentComplete > this.lastPercent && this.lastTick < Date.now() - 1000) {
const progressEvent = `{"type": "file-progress", "data":
{"id": "${this.upload.id}", "complete": ${this.percentComplete}}}`;
this.context.redis.publish(`/file/${this.upload.id}`, progressEvent);
this.context.redis.publish(`/user/${this.context.user.id}`, progressEvent);
this.lastTick = Date.now();
}
this.lastPercent = this.percentComplete;
this.md5sum.update(data);
});
this.upload.on('end', () => {
this.localStream.end();
});
this.upload.resume();
}
sendToSFTP() {
return sftpUpload(join(storePath, this.path), join('hostr', 'uploads', this.path));
}
acceptedEvent() {
const acceptedEvent = `{"type": "file-accepted", "data":
{"id": "${this.id}", "guid": "${this.tempGuid}", "href": "${baseURL}/${this.id}"}}`;
this.context.redis.publish(`/user/${this.context.user.id}`, acceptedEvent);
this.context.statsd.incr('file.upload.accepted', 1);
}
processingEvent() {
const processingEvent = `{"type": "file-progress", "data":
{"id": "${this.id}", "complete": 100}}`;
this.context.redis.publish(`/file/${this.id}`, processingEvent);
this.context.redis.publish(`/user/${this.context.user.id}`, processingEvent);
this.context.statsd.incr('file.upload.complete', 1);
}
completeEvent() {
const completeEvent = `{"type": "file-added", "data": ${JSON.stringify(this.toDBFormat())}}`;
this.context.redis.publish(`/file/${this.id}`, completeEvent);
this.context.redis.publish(`/user/${this.context.user.id}`, completeEvent);
}
toDBFormat() {
const formatted = {
owner: this.context.user.id,
ip: this.remoteIp,
system_name: this.id,
file_name: this.filename,
original_name: this.originalName,
file_size: this.receivedSize,
time_added: Math.ceil(Date.now() / 1000),
status: 'active',
last_accessed: null,
s3: false,
type: sniff(this.filename),
};
if (this.width) {
formatted.width = this.width;
formatted.height = this.height;
}
return formatted;
}
save() {
return this.Files.insertOne({ _id: this.id, ...this.toDBFormat() });
}
toJSON() {
return formatFile({ _id: this.id, ...this.toDBFormat() });
}
*checkLimit() {
const count = yield this.Files.count({
owner: this.context.user.id,
time_added: { $gt: Math.ceil(Date.now() / 1000) - 86400 },
});
const userLimit = this.context.user.daily_upload_allowance;
const underLimit = (count < userLimit || userLimit === 'unlimited');
if (!underLimit) {
this.context.statsd.incr('file.overlimit', 1);
}
this.context.assert(underLimit, 400, `{
"error": {
"message": "Daily upload limits (${this.context.user.daily_upload_allowance}) exceeded.",
"code": 602
}
}`);
return true;
}
*finalise() {
const dbFile = this.toDBFormat();
dbFile.file_size = this.receivedSize;
dbFile.status = 'active';
dbFile.md5 = this.md5sum.digest('hex');
if (this.width) {
dbFile.width = this.width;
dbFile.height = this.height;
}
yield this.Files.updateOne({ _id: this.id }, { $set: dbFile });
}
resizeImage(upload, type, currentSize, newSize) {
return resize(join(storePath, this.path), type, currentSize, newSize).then((image) => {
const path = join(this.id[0], String(newSize.width), `${this.id}_${this.filename}`);
debug('Writing file');
debug(join(storePath, path));
return fs.writeFile(join(storePath, path), image).then(() => {
debug('Uploading file');
return sftpUpload(join(storePath, path), join('hostr', 'uploads', path));
}).catch(debug);
}).catch(debug);
}
*processImage(upload) {
return new Promise((resolve) => {
let size;
try {
if (supported.indexOf(this.path.split('.').pop().toLowerCase()) < 0) {
resolve();
return;
}
size = sizeOf(join(storePath, this.path));
} catch (err) {
debug(err);
resolve();
return;
}
if (!size.width || supported.indexOf(size.type) < 0) {
resolve();
return;
}
this.width = size.width;
this.height = size.height;
Promise.all([
this.resizeImage(upload, size.type, size, { width: 150, height: 150 }),
this.resizeImage(upload, size.type, size, { width: 970 }),
]).then(() => {
resolve(size);
});
});
}
malwareScan() {
if (process.env.VIRUSTOTAL_KEY) {
// Check in the background
process.nextTick(function* scan() {
debug('Malware Scan');
const result = yield malware(this);
if (result) {
yield this.Files.updateOne({ _id: this.id },
{ $set: { malware: result.positive, virustotal: result } });
if (result.positive) {
this.context.statsd.incr('file.malware', 1);
}
}
});
} else {
debug('Skipping Malware Scan, VIRUSTOTAL env variable not found.');
}
}
}
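
The route handler that drives this class is not part of the diff shown here; the sketch below is a hypothetical koa 1.x caller inferred from the methods above, so the ordering, route shape and import path are assumptions.

import Uploader from '../lib/uploader';

export function* post() {
  const uploader = new Uploader(this);
  yield uploader.accept();        // parse the multipart stream, allocate an id
  uploader.acceptedEvent();
  uploader.receive();             // stream to disk, publishing progress events
  yield uploader.promise;         // set in accept(); resolves when co-busboy emits 'end'
  uploader.processingEvent();
  yield uploader.save();
  yield uploader.processImage();
  yield uploader.finalise();      // write size, md5 and dimensions back to Mongo
  uploader.sendToSFTP();          // fire-and-forget here; a real caller may yield it
  uploader.malwareScan();
  uploader.completeEvent();
  this.body = uploader.toJSON();
}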