Source: longoWorker.js

/* global self:false */
/**
 * @project Longo.js
 * @module longoWorker
 * @requires longo
 * @requires {@linkplain http://underscorejs.org/|underscore}
 * @requires {@linkplain https://github.com/davidgtonge/underscore-query|underscore-query}
 * @desc This module is work only WebWorker thread.<br>
 *       Longo automatically create worker thread. So user application does not need use this module directory.
 *
 * @see https://github.com/georgeOsdDev/Longo
 *
 * @license   The MIT License (MIT)
 * @copyright Copyright (c) 2014 Takeharu Oshida <georgeosddev@gmail.com>
 */


// Global objects
var Longo, _;
if (typeof importScripts !== "undefined") {
  // For WebWorker
  importScripts("./lib/underscore/underscore-min.js",
    "./lib/underscore-query/lib/underscore-query.min.js",
    "./longo.js");
} else {
  // For command line test
  _ = require("underscore");
  require("underscore-query")(_);
  Longo = require("./longo.js");
}

(function(global, Longo, _, undefined) {
  "use strict";

  var _console = (typeof console !== "undefined") ? console : {};
  var logger = function(obj, method) {
    var loglevel = method || "log";
    // console in WebWorker thread does not accept more than one arguments
    // eg. console.log(1,2,3); -> console display just only `1`
    // So use JSON
    var msg = {
      "msg": obj,
      "workerName": Worker.name
    };
    if (typeof _console !== undefined && _console[loglevel]) _console[loglevel].call(_console, JSON.stringify(msg));
  };

  var Worker = {};
  var Utils = Longo.Utils;


  var SKIP_REST = "SKIP_REST";

  var UPDATE_OPERATORS = [
    "$inc",
    "$mul",
    "$rename",
    "$setOnInsert",
    "$set",
    "$unset",
    "$min",
    "$max",
    "$currentDate",
    "$invert"
    // "$",
    // "$addToSet",
    // "$pop",
    // "$pullAll",
    // "$pull",
    // "$pushAll",
    // "$push",
    // "$each",
    // "$slice",
    // "$sort",
    // "$position",
    // "$bit",
    // "$isolated",
  ];

  var gctx = {};
  gctx.dataset = [];
  gctx.option = {
    capped: false
  };
  gctx.isUpdatedBySeq = {};

  Worker.getDataset = function(ctx) {
    var _ctx = ctx || gctx;
    return _ctx.dataset;
  };

  Worker.setDataset = function(dataset, ctx) {
    var _ctx = ctx || gctx;
    _ctx.dataset = dataset;
  };


  Worker.applyOperator = function(doc, current) {
    var result = _.identity(current);
    var operators = _.keys(doc);
    var pairs;

    _.each(operators, function(op) {
      pairs = doc[op];
      switch (op) {
      case "$inc":
        _.each(_.keys(pairs), function(k) {
          if (_.isNumber(current[k]) && _.isNumber(pairs[k])) result[k] = current[k] + pairs[k];
        });
        break;
      case "$mul":
        _.each(_.keys(pairs), function(k) {
          if (_.isNumber(current[k]) && _.isNumber(pairs[k])) result[k] = current[k] * pairs[k];
        });
        break;
      case "$rename":
        _.each(_.keys(pairs), function(k) {
          if (_.has(current, k)) result[pairs[k]] = current[k];
        });
        break;
      case "$set":
        _.each(_.keys(pairs), function(k) {
          if (_.has(current, k)) result[k] = pairs[k];
        });
        break;
      case "$unset":
        _.each(_.keys(pairs), function(k) {
          if (_.has(current, k)) delete result[k];
        });
        break;
      case "$mod":
        _.each(_.keys(pairs), function(k) {
          if (_.isNumber(current[k]) && _.isNumber(pairs[k])) result[k] = current[k] % pairs[k];
        });
        break;
      default:
        //noop
      }
    });
    return result;
  };


  Worker.parseDotQuery = function(v,k){
    var token, parent, child, matcher;
    if (k.indexOf(".") > 0) {
      token = k.split(".");
      child  = _.last(token);
      parent = _.initial(token).join(".");
      matcher = {};
      matcher[child] = v;
      return Worker.parseDotQuery({$elemMatch:matcher}, parent);
    } else {
      return [v, k];
    }
  };

  Worker.toQuery = function(query) {
    var q = Utils.checkOrElse(query, {}, function(val) {
      return _.isObject(val);
    });

    var queryArray = {};
    _.each(q, function(v,k){
      if (_.contains(k, ".")){
        var tuple = Worker.parseDotQuery(v, k);
        var or = {};
        or[k] = v;
        or[tuple[1]] = tuple[0];
        queryArray.$or = or;
      } else {
        queryArray[k] = v;
      }
    });

    return queryArray;
  };

  Worker.toDocument = function(doc) {
    if (_.isObject(doc) && !_.isArray(doc)) return doc;
    var arr = Utils.toArray(doc);
    var keys = _.keys(arr);
    return _.object(keys, arr);
  };

  Worker.isSizeReached = function(dataset, doc, size) {
    var max = size || 1024 * 1024,
      current = Utils.str2ab(JSON.stringify(dataset)).byteLength,
      param = Utils.str2ab(JSON.stringify(doc)).byteLength;
    return current + param > max;
  };

  Worker.isCountReached = function(dataset, max) {
    return _.size(dataset) + 1 > (max || 1000);
  };


  // side effect to the ctx
  Worker.doStart = function(command, seq, ctx) {
    var _ctx = ctx || gctx;
    _ctx.name = command.name;
    _ctx.option = command.option;
    _ctx.dataset = command.dataset;
    if (!_ctx.isUpdatedBySeq) _ctx.isUpdatedBySeq = {};
    _ctx.isUpdatedBySeq[seq] = true;
    logger("Longo Collection Worker Started", "info");
    return [null, []];
  };

  Worker.doFind = function(dataset, query) {
    var q = Worker.toQuery(query);
    return [null, _.query(dataset, q)];
  };

  // side effect to the ctx
  Worker.doInsert = function(docs, seq, ctx) {
    var _ctx = ctx || gctx;
    if (Utils.isZero(_.size(docs))) {
      return [SKIP_REST, null];
    }
    if (!_ctx.dataset) _ctx.dataset = [];

    // TODO : what does this line mean ?
    var doc = _.omit(Worker.toDocument(_.first(Utils.toArray(docs))), UPDATE_OPERATORS);

    if (!doc._id) {
      doc._id = Utils.objectId();
    } else {
      if (_.where(_ctx.dataset, {
        "_id": doc._id
      }).length > 0) return [new Longo.Error(Longo.Error.DUPLICATE_KEY_ERROR, "_id: " + doc._id), doc];
    }

    if (_ctx.option.capped) {
      // Check max size of dataset
      if (Worker.isSizeReached(_ctx.dataset, doc, _ctx.option.size)) {
        logger("Reached to size count for capped Collection. Size: " + _ctx.option.size, "warn");
        _ctx.dataset.shift();
        if (_ctx.dataset.length === 0) return [SKIP_REST, null];
        return Worker.doInsert(docs, seq, _ctx);
      }
      // Check max count of dataset
      if (Worker.isCountReached(_ctx.dataset, _ctx.option.max)) {
        logger("Reached to max count for capped Collection. Count: " + _ctx.option.max, "warn");
        _ctx.dataset.shift();
      }
    }

    _ctx.dataset.push(doc);
    _ctx.isUpdatedBySeq[seq] = true;
    return Worker.doInsert(_.rest(docs), seq, _ctx);
  };

  // side effect to the ctx
  Worker.updateById = function(current, doc, seq, ctx) {
    var _ctx = ctx || gctx;
    doc = Worker.toDocument(doc);
    if (doc._id && current._id !== doc._id) return [new Longo.Error(Longo.Error.MOD_ID_NOT_ALLOWED, "_id: " + doc._id), null];

    var operators = _.pick(doc, UPDATE_OPERATORS);
    var normal = _.omit(doc, UPDATE_OPERATORS);

    if (_.size(operators) > 0) {
      if (_.size(normal) > 0) return [new Longo.Error(Longo.Error.INVALID_MODIFIER_SPECIFIED, normal[0]), null];

      doc = Worker.applyOperator(doc, current);
    }
    doc._id = current._id;
    _ctx.dataset = _.reject(_ctx.dataset, function(d) {
      return d._id === current._id;
    }).concat([doc]);
    _ctx.isUpdatedBySeq[seq] = true;
    return [null, null];
  };

  // side effect to the ctx
  Worker.doUpdate = function(query, update, option, seq, ctx) {
    var _ctx = ctx || gctx;
    var hits, current;
    hits = Worker.doFind(_ctx.dataset, query)[1];

    if (Utils.isZero(_.size(hits))) {
      if (option.upsert) return Worker.doInsert(update, seq, _ctx);
      return [new Longo.Error(Longo.Error.DOCUMENT_NOT_FOUND, "query: " + JSON.stringify(query)), null];
    } else if (Utils.isOne(_.size(hits))) {
      current = hits[0];
      return Worker.updateById(current, update, seq, _ctx);
    } else {
      if (!option.multi) return Worker.updateById(hits[0], update, seq, _ctx);
      if (update._id && current._id !== update._id) return [new Longo.Error(Longo.Error.MOD_ID_NOT_ALLOWED, "_id: " + update._id), null];

      var res = [null, null];
      _.every(hits, function(current) {
        res = Worker.updateById(current, update, seq, _ctx);
        return res[0] === null;
      });
      return res;
    }
  };

  // side effect to the ctx
  Worker.doSave = function(docs, seq, ctx) {
    var _ctx = ctx || gctx;
    var doc, result;
    docs = Utils.toArray(docs);
    if (Utils.isZero(_.size(docs))) {
      return [SKIP_REST, null];
    }
    doc = _.first(docs);

    if (!doc._id) {
      result = Worker.doInsert(doc, seq, _ctx);
    } else {
      result = Worker.doUpdate({
        "_id": doc._id
      }, doc, {
        upsert: true
      }, seq, _ctx);
      if (result[0] && result[0] !== SKIP_REST) return result;
    }
    return Worker.doSave(_.rest(docs), seq, _ctx);
  };

  // side effect to the ctx
  Worker.doRemove = function(query, justOne, seq, ctx) {
    var _ctx = ctx || gctx;
    var hits = Worker.doFind(_ctx.dataset, query)[1],
      ids;

    if (Utils.isZero(_.size(hits))) return [new Longo.Error(Longo.Error.DOCUMENT_NOT_FOUND, "query: " + JSON.stringify(query)), null];

    if (justOne) {
      _ctx.dataset = _.reject(_ctx.dataset, function(doc) {
        return doc._id === hits[0]._id;
      });
    } else {
      ids = _.pluck(hits, "_id");
      _ctx.dataset = _.reject(_ctx.dataset, function(doc) {
        return _.contains(ids, doc._id);
      });
    }
    _ctx.isUpdatedBySeq[seq] = true;
    return [null, null];
  };

  Worker.doProject = function(dataset, projection) {
    var pairs = _.pairs(projection),
      includes = _.filter(pairs, function(p) {
        return Utils.isOne(p[1]) || Utils.isTrue(p[1]);
      }),
      excludes = _.filter(pairs, function(p) {
        return Utils.isZero(p[1]) || Utils.isFalse(p[1]);
      }),
      keys;

    if (_.size(includes) > 0) {
      keys = _.pluck(includes, "0");
      if (Utils.isZero(projection._id) || Utils.isFalse(projection._id)) {
        keys = _.without(keys, "_id");
      } else {
        keys = _.union(keys, ["_id"]);
      }
      return [null, _.map(dataset, function(d) {
        return _.pick(d, keys);
      })];
    } else {
      keys = _.pluck(excludes, "0");
      return [null, _.map(dataset, function(d) {
        return _.omit(d, keys);
      })];
    }
  };

  Worker.doLimit = function(dataset, limit) {
    return [null, _.first(dataset, limit)];
  };

  Worker.doSkip = function(dataset, skip) {
    return [null, _.rest(dataset, skip)];
  };

  Worker.doCount = function(dataset) {
    return [SKIP_REST, [_.size(dataset)]];
  };

  Worker.doSize = function(dataset) {
    return [SKIP_REST, [Utils.str2ab(JSON.stringify(dataset)).byteLength]];
  };

  Worker.doToArray = function(dataset) {
    return [null, Utils.toArray(dataset)];
  };

  Worker.doMax = function(dataset, indexBounds) {
    var k, v, query;
    k = _.keys(indexBounds);
    v = _.values(indexBounds, function(val) {
      return {
        "$lte": val
      };
    });
    query = _.object(k, v);
    return Worker.doFind(dataset, query);
  };

  Worker.doMin = function(dataset, indexBounds) {
    var k, v, query;
    k = _.keys(indexBounds);
    v = _.values(indexBounds, function(val) {
      return {
        "$gte": val
      };
    });
    query = _.object(k, v);
    return Worker.doFind(dataset, query);
  };

  Worker.doForEach = function(dataset, func) {
    /*jshint -W054 */
    if (!func) return [null, dataset];
    var f = (new Function(func + ""))();
    try {
      return [null, _.forEach(dataset, f)];
    } catch (e) {
      return [new Longo.Error(Longo.Error.EVAL_ERROR, "function: " + func, e.stack), null];
    }
  };

  Worker.doMap = function(dataset, func) {
    /*jshint -W054 */
    if (!func) return [null, dataset];
    var f = (new Function(func + ""))();
    try {
      return [null, _.map(dataset, f)];
    } catch (e) {
      return [new Longo.Error(Longo.Error.EVAL_ERROR, "function: " + func, e.stack), null];
    }
  };

  Worker.doSort = function(dataset, sorter) {
    var sorted;
    /*jshint -W054 */
    if (_.isString(sorter)) {
      var f = (new Function(sorter + ""))();
      try {
        sorted = _.sortBy(dataset, f);
        // remove trailing comma
        if (_.isEmpty(_.last(sorted))) sorted = _.initial(sorted);
        return [null, sorted];
      } catch (e) {
        return [new Longo.Error(Longo.Error.EVAL_ERROR, "sorter: " + sorter, e.stack), null];
      }
    } else {
      var key = _.keys(sorter)[0],
        order = sorter[key];
      sorted = _.sortBy(dataset, key);

      // remove trailing comma
      if (_.isEmpty(_.last(sorted))) sorted = _.initial(sorted);
      if (!Utils.isOne(order)) return [null, sorted.reverse()];
      return [null, sorted];
    }
  };

  Worker.getExecuter = function(seq, ctx) {
    var _ctx = ctx || gctx;
    return function(memo, command) {
      var error = memo[0],
        dataset = memo[1];

      if (error) return memo;

      switch (command.cmd) {

      // start/insert/save/update/remove will change dataset of ctx
      case "start":
        return Worker.doStart(command, seq, _ctx);
      case "insert":
        return Worker.doInsert(Utils.toArray(Utils.getOrElse(command.doc), []), seq, _ctx);
      case "save":
        return Worker.doSave(Utils.toArray(command.doc), seq, _ctx);
      case "update":
        return Worker.doUpdate(command.query, Utils.getOrElse(command.update, {}), Utils.getOrElse(command.option, {}), seq, _ctx);
      case "remove":
        return Worker.doRemove(command.query, Utils.getOrElse(command.justOne, false), seq, _ctx);

      // rest commands have no side effect
      case "find":
        return Worker.doFind(dataset, command.query);
      case "project":
        return Worker.doProject(dataset, Utils.getOrElse(command.projection, {}));
      case "limit":
        return Worker.doLimit(dataset, Utils.getOrElse(command.value, 15));
      case "skip":
        return Worker.doSkip(dataset, Utils.getOrElse(command.value, 0));
      case "count":
        return Worker.doCount(dataset);
      case "size":
        return Worker.doSize(dataset);
      case "toArray":
        return Worker.doToArray(dataset);
      case "max":
        return Worker.doMax(dataset, Utils.getOrElse(command.indexBounds, {}));
      case "min":
        return Worker.doMin(dataset, Utils.getOrElse(command.indexBounds, {}));
      case "forEach":
        return Worker.doForEach(dataset, Utils.getOrElse(command.func, null));
      case "map":
        return Worker.doMap(dataset, Utils.getOrElse(command.func, null));
      case "sort":
        return Worker.doSort(dataset, Utils.getOrElse(command.sorter, {"_id": 1}));

      default:
        return memo;
      }
    };
  };

  global.postMessage = global.webkitPostMessage || global.postMessage || function() {};
  Worker.send = function(message) {
    message.seqs = self.seqs;
    var json = JSON.stringify(message),
      bytes = Utils.str2ab(json);
    global.postMessage(bytes, [bytes]);
  };

  global.addEventListener = global.addEventListener || function() {};
  global.addEventListener("message", function(e) {

    var request, data, cmds, seq, result = [],
      executer;
    request = Utils.tryParseJSON(Utils.ab2str(e.data));

    if (request[0]) {
      return Worker.send({
        "seq": -1,
        "error": request[0],
        "result": []
      });
    }

    data = request[1] || {};
    cmds = data.cmds;
    seq = data.seq;
    gctx.isUpdatedBySeq[seq] = false;
    executer = Worker.getExecuter(seq, gctx);

    try {
      result = _.reduce(cmds, executer, [null, Worker.getDataset()]);
    } catch (err) {
      return Worker.send({
        "seq": seq,
        "error": err,
        "result": []
      });
    }

    if (result[0] === SKIP_REST) result[0] = null;

    Worker.send({
      "seq": seq,
      "error": result[0],
      "result": result[1],
      "isUpdated": gctx.isUpdatedBySeq[seq]
    });
  }, false);

  if (typeof exports !== "undefined") {
    if (typeof module !== "undefined" && module.exports) {
      module.exports = Worker;
    }
    exports.LongoWorker = Worker;
  }

  // self is global at WebWorker thread
})((typeof self !== "undefined") ? self : this, Longo, _);