Update stuff

This commit is contained in:
Jonathan Cremin 2018-06-02 15:50:39 +00:00
parent 0254e42b9c
commit 553ba9db9a
40 changed files with 7343 additions and 717 deletions

View file

@ -1,17 +1,18 @@
import fs from 'fs';
import createError from 'http-errors';
import { get as getSFTP } from './sftp';
import { get as getS3 } from './s3';
import debugname from 'debug';
const debug = debugname('hostr:file-stream');
function writer(localPath, remoteRead) {
return new Promise((resolve, reject) => {
remoteRead.once('error', () => {
debug('remote error');
const localWrite = fs.createWriteStream(localPath);
remoteRead.once('error', (err) => {
debug('remote error', err);
reject(createError(404));
});
const localWrite = fs.createWriteStream(localPath);
localWrite.once('finish', () => {
debug('local write end');
resolve(fs.createReadStream(localPath));
@ -29,12 +30,9 @@ export default function hostrFileStream(localPath, remotePath) {
return new Promise((resolve, reject) => {
localRead.once('error', () => {
debug('not found locally');
getSFTP(remotePath)
.then((remoteRead) => writer(localPath, remoteRead))
.then(resolve)
.catch((err) => {
debug('not on sftp', err);
});
writer(localPath, getS3(remotePath)).then((readable) => {
resolve(readable);
}).catch(reject);
});
localRead.once('readable', () => {
debug('found locally');

View file

@ -10,17 +10,17 @@ function randomID() {
return rand;
}
function* checkId(Files, fileId, attempts) {
async function checkId(Files, fileId, attempts) {
if (attempts > 10) {
return false;
}
const file = yield models.file.findById(fileId);
const file = await models.file.findById(fileId);
if (file === null) {
return fileId;
}
return checkId(randomID(), ++attempts); // eslint-disable-line no-param-reassign
}
export default function* (Files) {
return yield checkId(Files, randomID(), 0);
export default function (Files) {
return checkId(Files, randomID(), 0);
}

37
lib/koa-statsd.js Normal file
View file

@ -0,0 +1,37 @@
/**
* Module dependencies.
*/
var Stats = require('statsy');
/**
* Initialize stats middleware with `opts`
* which are passed to statsy.
*
* @param {Object} [opts]
* @return {Function}
* @api public
*/
module.exports = function(opts){
opts = opts || {};
var s = new Stats(opts);
return async (ctx, next) => {
// counters
s.incr('request.count');
s.incr('request.' + ctx.method + '.count');
// size
s.histogram('request.size', ctx.request.length || 0);
// remote addr
// s.set('request.addresses', this.ip);
// duration
ctx.res.on('finish', s.timer('request.duration'));
await next();
}
};

View file

@ -44,15 +44,15 @@ const wrapped = new Promise((resolve, reject) =>
);
export function sessionStore() {
return function* sessionStoreMiddleware(next) {
const sess = yield redisSession;
yield sess.bind(this)(next);
return async (ctx, next) => {
const sess = await redisSession;
await sess.bind(ctx)(next());
};
}
export function middleware() {
return function* redisMiddleware(next) {
this.redis = yield wrapped;
yield next;
return async (ctx, next) => {
ctx.redis = await wrapped;
await next();
};
}

View file

@ -1,23 +1,29 @@
import fs from 'mz/fs';
import lwip from 'lwip';
import jimp from 'jimp';
import debugname from 'debug';
const debug = debugname('hostr-api:resize');
const types = {
jpg: jimp.MIME_JPEG,
png: jimp.MIME_PNG,
gif: jimp.MIME_JPEG,
}
function cover(path, type, size) {
return new Promise((resolve, reject) => {
lwip.open(path, type, (errIn, image) => {
jimp.read(path, (errIn, image) => {
debug('Image Opened');
if (errIn) {
reject(errIn);
}
image.cover(size.width, size.height, (errOut, resized) => {
image.quality(80).cover(size.width, size.height, (errOut, resized) => {
debug('Image Resized');
if (errOut) {
reject(errOut);
}
resized.toBuffer(type, (errBuf, buffer) => {
resized.getBuffer(types[type], (errBuf, buffer) => {
debug('Image Buffered');
if (errBuf) {
reject(errBuf);
@ -31,19 +37,19 @@ function cover(path, type, size) {
function scale(path, type, size) {
return new Promise((resolve, reject) => {
lwip.open(path, type, (errIn, image) => {
jimp.read(path, (errIn, image) => {
debug('Image Opened');
if (errIn) {
reject(errIn);
}
image.cover(size.width, size.height, (errOut, resized) => {
image.quality(80).cover(size.width, size.height, (errOut, resized) => {
debug('Image Resized');
if (errOut) {
reject(errOut);
}
resized.toBuffer(type, (errBuf, buffer) => {
resized.getBuffer(types[type], (errBuf, buffer) => {
debug('Image Buffered');
if (errBuf) {
reject(errBuf);

View file

@ -2,13 +2,32 @@ import aws from 'aws-sdk';
import debugname from 'debug';
const debug = debugname('hostr:s3');
const s3 = new aws.S3();
const s3 = new aws.S3({
endpoint: process.env.AWS_ENDPOINT,
s3ForcePathStyle: true,
signatureVersion: 'v4',
});
export function get(key) {
let fullKey = `hostr_files/${key}`;
let fullKey = `uploads/${key}`;
if (key.substr(2, 5) === '970/' || key.substr(2, 5) === '150/') {
fullKey = `hostr_files/${key.substr(2)}`;
fullKey = `uploads/${key.substr(2)}`;
}
debug('fetching from s3: %s', fullKey);
return s3.getObject({ Bucket: process.env.AWS_BUCKET, Key: fullKey }).createReadStream();
return s3.getObject({ Bucket: process.env.AWS_BUCKET, Key: fullKey })
.createReadStream()
.on('error', (err) => {
debug('S3 error', err);
});
}
export function upload(stream, key, callback) {
debug(`sending to s3: uploads/'${key}`);
const params = { Bucket: process.env.AWS_BUCKET, Key: `uploads/${key}`, Body: stream };
const uploading = s3.upload(params);
uploading.on('error', (err) => {
debug('S3 Error', err);
});
uploading.send(callback);
return uploading;
}

View file

@ -1,5 +1,5 @@
import { join } from 'path';
import parse from 'co-busboy';
import Busboy from 'busboy';
import crypto from 'crypto';
import fs from 'mz/fs';
import sizeOf from 'image-size';
@ -10,6 +10,7 @@ import { formatFile } from './format';
import resize from './resize';
import malware from './malware';
import { sniff } from './type';
import { upload as s3upload } from './s3';
import debugname from 'debug';
const debug = debugname('hostr-api:uploader');
@ -33,109 +34,8 @@ export default class Uploader {
this.receivedSize = 0;
}
*accept() {
this.upload = yield parse(this.context, {
autoFields: true,
headers: this.context.request.headers,
limits: { files: 1 },
highWaterMark: 10000000,
});
this.promise = new Promise((resolve, reject) => {
this.upload.on('error', (err) => {
this.statsd.incr('file.upload.error', 1);
debug(err);
reject();
});
this.upload.on('end', () => {
resolve();
});
});
this.tempGuid = this.tempGuid;
this.file = yield models.file.create({
id: yield createHostrId(),
name: this.upload.filename.replace(/[^a-zA-Z0-9\.\-_\s]/g, '').replace(/\s+/g, ''),
originalName: this.upload.filename,
userId: this.context.user.id,
status: 'uploading',
type: sniff(this.upload.filename),
ip: this.remoteIp,
accessedAt: null,
width: null,
height: null,
});
yield this.file.save();
}
receive() {
return new Promise((resolve) => {
this.path = join(this.file.id[0], `${this.file.id}_${this.file.name}`);
this.localStream = fs.createWriteStream(join(storePath, this.path));
this.upload.pause();
this.localStream.on('finish', () => {
resolve();
});
this.upload.on('data', (data) => {
this.receivedSize += data.length;
if (this.receivedSize > this.context.user.max_filesize) {
fs.unlink(join(storePath, this.path));
this.context.throw(413, `{"error": {"message": "The file you uploaded is too large.",
"code": 601}}`);
}
this.localStream.write(data);
this.percentComplete = Math.floor(this.receivedSize * 100 / this.expectedSize);
if (this.percentComplete > this.lastPercent && this.lastTick < Date.now() - 1000) {
const progressEvent = `{"type": "file-progress", "data":
{"id": "${this.file.id}", "complete": ${this.percentComplete}}}`;
this.context.redis.publish(`/file/${this.file.id}`, progressEvent);
this.context.redis.publish(`/user/${this.context.user.id}`, progressEvent);
this.lastTick = Date.now();
}
this.lastPercent = this.percentComplete;
this.md5sum.update(data);
});
this.upload.on('end', () => {
this.file.size = this.receivedSize;
this.file.md5 = this.md5sum.digest('hex');
this.localStream.end();
});
this.upload.resume();
});
}
acceptedEvent() {
const accepted = `{"type": "file-accepted", "data":
{"id": "${this.file.id}", "guid": "${this.tempGuid}", "href": "${baseURL}/${this.file.id}"}}`;
this.context.redis.publish(`/user/${this.context.user.id}`, accepted);
this.context.statsd.incr('file.upload.accepted', 1);
}
processingEvent() {
const processing = `{"type": "file-progress", "data":
{"id": "${this.file.id}", "complete": 100}}`;
this.context.redis.publish(`/file/${this.file.id}`, processing);
this.context.redis.publish(`/user/${this.context.user.id}`, processing);
this.context.statsd.incr('file.upload.complete', 1);
}
completeEvent() {
const complete = `{"type": "file-added", "data": ${JSON.stringify(formatFile(this.file))}}`;
this.context.redis.publish(`/file/${this.file.id}`, complete);
this.context.redis.publish(`/user/${this.context.user.id}`, complete);
}
*checkLimit() {
const count = yield models.file.count({
async checkLimit() {
const count = await models.file.count({
where: {
userId: this.context.user.id,
createdAt: {
@ -157,23 +57,96 @@ export default class Uploader {
return true;
}
*finalise() {
this.file.size = this.receivedSize;
this.file.status = 'active';
this.file.processed = 'true';
yield this.file.save();
async accept() {
return new Promise((resolve) => {
this.upload = new Busboy({
autoFields: true,
headers: this.context.request.headers,
limits: { files: 1 },
highWaterMark: 10000000,
});
this.upload.on('file', async (fieldname, file, filename, encoding, mimetype) => {
debug('FILE', fieldname, file, filename, encoding, mimetype);
this.upload.filename = filename;
this.file = await models.file.create({
id: await createHostrId(),
name: this.upload.filename.replace(/[^a-zA-Z0-9\.\-_\s]/g, '').replace(/\s+/g, ''),
originalName: this.upload.filename,
userId: this.context.user.id,
status: 'uploading',
type: sniff(this.upload.filename),
ip: this.remoteIp,
accessedAt: null,
width: null,
height: null,
});
await this.file.save();
this.path = join(this.file.id[0], `${this.file.id}_${this.file.name}`);
this.localStream = fs.createWriteStream(join(storePath, this.path));
this.localStream.on('finish', () => {
resolve();
});
file.on('data', (data) => {
this.receivedSize += data.length;
if (this.receivedSize > this.context.user.max_filesize) {
fs.unlink(join(storePath, this.path));
this.context.throw(413, `{"error": {"message": "The file you uploaded is too large.",
"code": 601}}`);
}
this.localStream.write(data);
this.percentComplete = Math.floor(this.receivedSize * 100 / this.expectedSize);
if (this.percentComplete > this.lastPercent && this.lastTick < Date.now() - 1000) {
const progressEvent = `{"type": "file-progress", "data":
{"id": "${this.file.id}", "complete": ${this.percentComplete}}}`;
this.context.redis.publish(`/file/${this.file.id}`, progressEvent);
this.context.redis.publish(`/user/${this.context.user.id}`, progressEvent);
this.lastTick = Date.now();
}
this.lastPercent = this.percentComplete;
this.md5sum.update(data);
});
debug('accepted');
const accepted = `{"type": "file-accepted", "data":
{"id": "${this.file.id}", "guid": "${this.tempGuid}", "href": "${baseURL}/${this.file.id}"}}`;
this.context.redis.publish(`/user/${this.context.user.id}`, accepted);
this.context.statsd.incr('file.upload.accepted', 1);
file.on('end', () => {
this.file.size = this.receivedSize;
this.file.md5 = this.md5sum.digest('hex');
this.localStream.end();
this.processingEvent();
});
this.localStream.on('end', () => {
s3upload(fs.createReadStream(join(storePath, this.path)), this.path);
});
});
this.context.req.pipe(this.upload);
});
}
resizeImage(upload, type, currentSize, dim) {
return resize(join(storePath, this.path), type, currentSize, dim).then((image) => {
const path = join(this.file.id[0], String(dim.width), `${this.file.id}_${this.file.name}`);
debug('Writing file');
debug(join(storePath, path));
return fs.writeFile(join(storePath, path), image).catch(debug);
}).catch(debug);
processingEvent() {
debug('processing');
const processing = `{"type": "file-progress", "data":
{"id": "${this.file.id}", "complete": 100}}`;
this.context.redis.publish(`/file/${this.file.id}`, processing);
this.context.redis.publish(`/user/${this.context.user.id}`, processing);
this.context.statsd.incr('file.upload.complete', 1);
}
*processImage(upload) {
async processImage(upload) {
return new Promise((resolve) => {
let size;
try {
@ -205,16 +178,43 @@ export default class Uploader {
});
}
resizeImage(upload, type, currentSize, dim) {
return resize(join(storePath, this.path), type, currentSize, dim).then((image) => {
const path = join(this.file.id[0], String(dim.width), `${this.file.id}_${this.file.name}`);
debug('Writing file');
debug(join(storePath, path));
return fs.writeFile(join(storePath, path), image).then(() => {
s3upload(fs.createReadStream(join(storePath, path)), path);
}).catch(debug);
}).catch(debug);
}
async finalise() {
debug('finalise');
this.file.size = this.receivedSize;
this.file.status = 'active';
this.file.processed = 'true';
await this.file.save();
this.completeEvent();
}
completeEvent() {
debug('complete');
const complete = `{"type": "file-added", "data": ${JSON.stringify(formatFile(this.file))}}`;
this.context.redis.publish(`/file/${this.file.id}`, complete);
this.context.redis.publish(`/user/${this.context.user.id}`, complete);
}
malwareScan() {
if (process.env.VIRUSTOTAL_KEY) {
// Check in the background
process.nextTick(function* scan() {
process.nextTick(async () => {
debug('Malware Scan');
const result = yield malware(this);
const result = await malware(this);
if (result) {
this.file.malwarePositives = result.positives;
this.file.save();
const fileMalware = yield models.malware.create({
const fileMalware = await models.malware.create({
fileId: this.file.id,
positives: result.positives,
virustotal: result,