In this next part we'll break down how I set up the Agenda task worker, how I defined the 'upload ready' task, and how the recursive uploader works.

Logging a Task

Agenda is a background task scheduler similar to the 'Whenever' gem in Ruby on Rails. Rather than hosting the tasks in a separate datastore like Redis, we opted to keep things light by letting Agenda create an 'agendaJobs' collection within our primary MongoDB database, then using Agenda's methods to push new tasks to it for processing.

In the previous post we did this via:

const Agenda = require('agenda');
var agenda = new Agenda({db: {address: process.env.MONGODB_URI}});

var job = agenda.create('upload ready', {
  extract_location: record.extract_location,
  story_id: record.id
});

In the first two lines we require Agenda and create an instance with a connection to our MongoDB database. Afterwards we use the .create method with the name of the task and an object of any attributes it may need for processing; here it's the extract location and the id of the record it will eventually need to update.
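
Note that .create only builds the job in memory; it still has to be saved before it lands in agendaJobs. A minimal sketch of that save, assuming a promise-based release of Agenda:

// persist the job so the worker can pick it up later
job.save().then(function() {
  console.log("'upload ready' job queued");
}).catch(function(err) {
  console.log('Failed to queue job', err);
});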

The Worker

So we've logged the task into our agendaJobs collection for processing. How do we build the process that watches for new tasks and handles them when they come in?

For this we'll create a simple JS script that we can run with pm2 on our server, or via a Procfile on Heroku.

// worker.js
'use strict';

const Agenda = require('agenda');
const mongoose = require('mongoose');
var { uploadDirToS3 } = require('./lib/workers/s3uploader');
require('dotenv').config();

// setup mongoose
var mongooseOptions = {
  reconnectInterval: 500, // Reconnect every 500ms
  reconnectTries: 30, // max number of retries
  keepAlive: true, // keep alive for long running connections
  poolSize: 10, // Maintain up to 10 socket connections
  bufferMaxEntries: 0, // If not connected, return errors immediately
  useNewUrlParser: true
};
mongoose.Promise = global.Promise;
mongoose.connect(process.env.MONGODB_URI || 'mongodb://localhost/et', mongooseOptions)

// setup delayed job worker
var agenda = new Agenda({db: {address: process.env.MONGODB_URI}});

// -----------------------

// define story uploader
agenda.define('upload ready', {priority: 'highest', concurrency: 1}, function(job, done) {
  var data = job.attrs.data;
  uploadDirToS3(data.extract_location, data.story_id, job, done)
});

// start job runner
agenda.on('ready', function() {
  agenda.start();
  console.log("Worker started")
});

agenda.on('success:upload ready', function(job) {
  console.log('Successfully uploaded story');
});

From the top we:

  • Require Agenda.
  • Require our s3uploader (which we'll talk about next).
  • Set up our connection to MongoDB via the Mongoose library.
  • Give Agenda a connection to our Mongo database server.
  • Define the 'upload ready' task, set its priority and concurrency (how many of these jobs can run at once), then use job.attrs.data to access the attributes we defined when we created the task and pass them to the uploadDirToS3 method.
  • Next we'll start the job runner.
  • And finally, on success, we'll notify the console (a matching failure listener is sketched below).
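
Agenda also emits a matching fail event for each job name, which is handy while debugging uploads. A minimal sketch, assuming the same worker.js setup:

// log failures for the 'upload ready' job
agenda.on('fail:upload ready', function(err, job) {
  console.log('Story upload failed:', err.message);
});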

In our package.json we can wire the worker up as a 'worker' script.

"scripts": {
  "start": "nodemon --ignore media/ --ignore client/ --exec 'node server.js'",
  "worker": "node worker.js",
  "test": "jest"
},
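
On Heroku the same script can be pointed at from a Procfile, or started directly with pm2 on our own server. A quick sketch of both (the process name 'worker' is just a label):

# Procfile
worker: npm run worker

# or, with pm2
pm2 start worker.js --name worker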

'upload ready' Script

So far we've built the client and server side to get the zip file there, unzipped it, and set up a task worker to operate on our uploads when they get logged. But how are we going to handle the actual uploads?

Here we need to build a method that, given a directory, can find all the files inside it, then walk from directory to directory pushing the content up to S3.

Now we can do this in two ways.

Depth First, where we go down each directory as far as we can, pushing content to S3 before moving on to the next.

Breadth First, where we stay as close to the top as possible, working one level at a time until we reach the bottom of the tree.

As we don't want to overtax our server or our connection to S3, we'll go with the Depth First approach. It may take more time, but it has less chance of bottlenecking our server. Later down the line we can look at streamlining the process so the pipe is more concurrent, and improve our error handling, but for this example let's focus on walking the tree.

// s3uploader.js

// setup libraries
const AWS = require('aws-sdk');
const mime = require('mime');
const fs = require('fs');
const path = require("path");
const story = require('../models/story.js');

// build s3 story url, each archive contains a index.html
// we need to display on the front-end
function buildS3ExtractLocation(dir) {
  return `${process.env.AWS_ROOT}${dir}/index.html`
}

// update story status and where to find it for the frontend
function updateStoryStatus(uploadPath, storyId = null) {
  const s3_extract_location = buildS3ExtractLocation(uploadPath)

  if (storyId) {
    story.findById(storyId, function(err, record) {
      if (s3_extract_location !== null) {
        record.s3_extract_location = s3_extract_location
        record.s3_extract_status = "done"
      }
      record.save()
      console.log("story updated.")
    })
  }
}

// upload story to s3
function uploadDirToS3(uploadPath, storyId, job, job_done) {

  // instantiate aws object for s3
  var s3 = new AWS.S3();
  var fileCount = 0;

  // directory and file walk method
  function walk(currentDirPath, callback) {
    fs.readdir(currentDirPath, function (err, files) {
      if (err) {
        throw new Error(err);
      }

      // each directory adds its entry count minus one, which across the
      // whole tree sums to (total files - 1); the per-upload decrement
      // below drops fileCount under zero once the last file finishes
      fileCount = fileCount + (files.length - 1)

      files.forEach(function (name) {
        var filePath = path.join(currentDirPath, name);
        var stat = fs.statSync(filePath);

        // if file, upload
        if (stat.isFile()) {
          callback(filePath, stat);

        // if directory, walk thru files
        } else if (stat.isDirectory()) {
          walk(filePath, callback);
        }
      });
    });
  }

  // walk the tree
  walk(uploadPath, function(filePath) {
    fs.readFile(filePath, function (err, data) {

      // if error
      if (err) { throw err; }

      // get content-type (html,jpeg,gif,etc...)
      var metaData = mime.getType(filePath)

      // set bucket, key (filename), body (file),
      // public read-only and content-type
      var params = {
        Bucket: process.env.AWS_BUCKET,
        Key: filePath,
        Body: data,
        ACL: 'public-read',
        ContentType: metaData
      };

      // upload object to s3
      s3.putObject(params, function(err, data) {
        if (err) {
          // handle error, disable job, save and mark as done
          console.log(err)
          job.disable()
          job.save()
          job_done()
        } else {
          console.log(`Successfully uploaded (${fileCount}) ${filePath}`);
          fileCount--;

          // once fileCount drops below 0, all files are exhausted;
          // update story record, disable job, save and mark as done
          if (fileCount < 0) {
            console.log(`All files successfully uploaded`)
            updateStoryStatus(uploadPath, storyId)
            job.disable()
            job.save()
            job_done()
          }
        }
      });
    });
  })
}

module.exports = {
  uploadDirToS3
}

The core of it revolves around recursively running the walk function: it builds up a file count, pares that count down as each file is uploaded to S3, and spawns a new branch of the walk whenever it hits a directory, until the entire tree's contents are uploaded and ready on Amazon S3.
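
If you want to exercise the uploader on its own, outside of Agenda, you can stub the pieces of the job it touches and pass null for the story id so the database update is skipped. A rough sketch, assuming your AWS credentials and bucket are in .env and using a made-up extract path:

// test-upload.js - hypothetical standalone run, bypassing Agenda
require('dotenv').config();
const { uploadDirToS3 } = require('./lib/workers/s3uploader');

// stub out the Agenda job methods the uploader calls
var fakeJob = {
  disable: function() {},
  save: function() {}
};

// a null story id means updateStoryStatus won't touch Mongo
uploadDirToS3('media/extracted/example-story', null, fakeJob, function() {
  console.log('upload run finished');
});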

In the future this can be improved with better error handling and data management so the uploads are more consistent without bottlenecking the pipe, but for now this should get you started.