EzDevInfo.com

highland

Highland.js: a high-level streams library for Node.js and the browser

Dynamic piping with FRP

Consider the following problem:

  • split file by lines
  • write lines to a result file
  • if a result file exceeds some size create a new result file

For example, if I have a file which weighs 4 GB and the split size is 1 GB, the result is four files of 1 GB each.

I'm looking for a solution with something like Rx*/Bacon or any other similar library in any language.


Source: (StackOverflow)

Retrying an operation with highlandjs when an error occurs

I'm trying to figure out if it's possible to retry an operation with highland js:

highland( arrayOfOperations )
    .map( doOperation )
    .parallel( numberOfParallelOperations )
    .errors( (operation) ->
        if operation.attempts < maxRetries
          # retry the operation
    )
    .each( (operation) ->
        # operation success
    )
    .done( ->
        # all complete
    )

Here's my code; I'm trying to figure out how I can retry an operation when I get an error.


Source: (StackOverflow)


highland.js event handlers not firing as expected

I am trying to use the request library, which streams responses. The following code works as expected.

request.get('http://someurl.com')
  .on('response', function(response) {
    console.log(response.statusCode) // 200
});

However, when I try to do the same thing with highland.js the event handler is never fired.

var r = request.get('http://someurl.com');
var p = _('response', r);
p.map(function(x){
  console.log(x.statusCode);
});

Is there something super trivial that I am missing here?


Source: (StackOverflow)

How do I read from a highland stream?

So I have a highland stream:

var getData = highland.wrapCallback(fs.readFile);
var fileStream = getData('myfile')
console.log(fileStream)

How do I read from fileStream? According to the Highland docs, there is no read() method on streams.


Source: (StackOverflow)

Print result from reduce in Highland.js

I wrote a simple script which should count lines:

var
  H = require('highland'),
  fs = require('fs'),
  split = require('split');

var lineStream = fs.createReadStream('data-samples/sample.log').pipe(split());

H('data', lineStream).reduce(0, function(count) {
  return count + 1;
}).each(console.log);

But for some reason I see nothing in the console. The documentation mentions laziness, but an each call should "invoke" the stream. How do I fix this?

NB: this is a question about highland.js, not about ways to count lines.


Source: (StackOverflow)

Reduce nodejs application memory using stream?

This is probably a newbie question but I searched and couldn't find a satisfying answer.

My node.js application seems to consume a lot of memory. Each process consumes about 100MB. I heard that nodejs itself has a ~30MB memory footprint per process.

The application is a JSON api, backed by MongoDB. In many cases, one API request will result in many database requests, mainly to populate the child relationships. A typical query is like this: (1) get an array of objectIds based on query condition, and (2) iterate each objectId, and issue a query to the database to populate the data (some call that hydration).

The code heavily uses async.js. I tried to profile the memory usage; async.js seems to use a lot of memory, but there is no sign of a memory leak. The author of async.js also came out with a stream library, highland.js (http://highlandjs.org/). I am new to Node.js streams, and I am curious whether this is a possible tool to replace async.js. The website seems to mention underscore, but I mainly use async.js for asynchronous processing.

Thanks!


Source: (StackOverflow)

The request stream in highland is not correctly passed forward

I am having problems getting the Node.js request stream to work: by the time it reaches map, it is already an empty array.

var _       = require('highland'),
    fs      = require('fs'),
    request = require('request');

// This works but not using the stream approach
// function get(path) {

//     return _(function (push, next) {

//         request(path, function (error, response, body) {
//             // The response itself also contains the body
//             push(error, response);
//             push(null, _.nil);
//         });
//     });
// }

var google = _(request.get('http://www.google.com'));

google
// res is empty array
.map(function (res) {
    // console.log(res);
    return res;
})
// res is empty array
.toArray(function (res) {

    console.log(res);
});

Source: (StackOverflow)

mapping knockout observable changes to event streams

I'm working off of a module that's built in knockout.js and I want to map changes to observables onto an event stream after the manner of RxJs, highland.js or bacon.js.

I know there are ways to do this by extending knockout, like ko.subscribable.fn.someFilter. But that assumes I want knockout to do my work for me - I don't, particularly.

Typically in Bacon or highland you would see something like this:

//bacon
var strm1 = $('#el').toEventStream('click');
strm1.map(transformFn);
//highland
var strm2 = _('click',$('#el'));
strm2.map(transformFn);

Unfortunately in knockout I'm not able to find an easy way to hook onto events since knockout implements pub/sub rather than emitters. Any insights into how I might hook on?

I was thinking something like a mediator emitter that subscribes to all observable properties and then emits the events.

Assuming I have something like this bound by a ko viewmodel:

<input data-bind="value:name" />

and the viewmodel is something like

function Person(){
  this.name = ko.observable('Bob');
  this.age = ko.observable(21);
  //...
};
var person = new Person();
var emitter = Emitter({});
var streams = {};

and then I am mapping as follows, where _ is highland.

for(var key in person){
  streams[key] = _(key + ':changed', watcher);
  person[key].subscribe(function(){
    watcher.emit(key + ':changed', arguments);
  });
}
function streamHandler(){
  console.log('streamHandler got', arguments);
}

All well and good, my emitter fires just fine, but then this does nothing:

streams['name'].each(streamHandler);

So, I am missing something. What's wrong?


Source: (StackOverflow)

How do I use Highland.js to create a project workflow?

I'm trying to create a project development automation (like grunt/gulp) tool using Highland.js to replace my current gulp.js setup: https://github.com/Industrial/id-project. I'm not sure exactly how to use Highland.js, so the current code is nothing but a guess:

fs   = require "fs"
path = require "path"

_      = require "highland"
mkdirp = require "mkdirp"
rimraf = require "rimraf"

sourceDirectory = path.resolve "./src"
buildDirectory  = path.resolve "./build"

# 1) How do I avoid using wrapCallback for all the logic and actually do it
# functionally and with Highland.js?

createDirectories = _.wrapCallback (cb) ->
  # 2) How do I parallelize these? Don't tell me "Use async.js." ...
  mkdirp sourceDirectory, ->
    mkdirp buildDirectory, cb

cleanBuildDirectory = _.wrapCallback (cb) ->
  rimraf buildDirectory, cb

compile = _.wrapCallback (cb) ->
  # Eventually I want to compile CoffeeScript, Jade, Less, Copy other files,
  # Browserify, and BrowserSync.
  cb()

watch = _.wrapCallback (cb) ->
  # And on the fly.
  cb()

# 3) How do I get a pipeline of streams to start executing these steps in series
# and within the steps things in series and in parallel?
pipeline = _.pipeline createDirectories,
  cleanBuildDirectory,
  compile,
  watch

# 4) Is the way I initially start the program okay?
pipeline
  .pipe process.stdout

pipeline.write "start"

Source: (StackOverflow)

Highland.js consumer that can buffer incoming data, not pause the provider, and emit new data

I need a consumer which will be able to:

  • buffer incoming data
  • not pause the provider
  • emit new data in async mode


I.e. something like this:

var consumer = function() {
  var buffer = [];

  var jobStarted = false;

  var startAsyncJob = function() {
    if(jobStarted) {
      return;
    }
    jobStarted = true;

    // later
    onJobComplete();
  };

  var onJobComplete = function() {
    jobStarted = false;

    // TODO: push buffer or some other data downstream
  };

  return function(err, value, push, next) {
    // I will skip here all if(err) and if (value === _.nil)
    buffer.push(value);
    startAsyncJob();
  };
};

// usage
source
  .consume(consumer())
  .map(function() {/* something here */})
  .each(_.log);

Is it possible to do this with the help of highland.js?

Thanks


Source: (StackOverflow)

Convert Scala async code to HighlandJS? Best practices?

I have a complex workflow I'm trying to get working in highlandjs. What makes it complex is that (1) you need to transform a list of futures into a future of a list, and (2) it's a sequence of async calls where the 3rd call depends on the output of both the 1st and 2nd.

It's working in Scala. Check out this gist: https://gist.github.com/frankandrobot/ad857698f680aceb44e8

Unfortunately, I haven't been able to convert #preprocess1 into the highland equivalent.

  • The highland version doesn't return anything (when calling #apply).
  • A debug "hi from console" doesn't get fired either, so it's not getting called completely.
  • The 3 async calls have been tested and are working.

UPDATE: yeah, I'm in highlandjs nested hell, so pointers on how to get out of there would help.

UPDATE: It turns out I wasn't passing "targets" to preprocess(). Doh. It's working as expected.
The question now is: how can I clean this up? Best practices? Something about it doesn't seem right.

This is the JS code:

//this spits out a list of tags

this._fetchAllTags = _.wrapCallback(db.fetchAllTags);


//this also spits out a list of tags

this._fetchCombinedNearestNeighbors = function(targets, alltags) {

  var run_algorithm = _.wrapCallback(function(target, callback) {
    db.findKNearestNeighbors(target, alltags, callback)
  });

  return _(targets)
    .map(run_algorithm)
    .parallel(4)
    .sequence()
    .collect()
    .map(uniq); // get unique values
};


//as you can see this depends on output of previous two async calls

this._fetchSSA = _.wrapCallback(function(allnearestneighbors, alltags, callback) {

  db.fetchSSA(allnearestneighbors, alltags, callback)
});


//can we clean this up?

this.preprocess = function() {

  return that._fetchAllTags()
    .flatMap(function(alltags) {

      return that._fetchCombinedNearestNeighbors(targets, alltags)
        .flatMap(function(allneighbors) {

          console.log('hi')
          return that._fetchSSA(allneighbors, alltags);
        });
    });
};

Source: (StackOverflow)

Integrating Meteor with Highland.js

I'm trying to marry two different ways of functional reactive programming.

Meteor has its own way of doing reactive things using Tracker. Using an autorun, you can bind two things together. For example, b.get() will always be equal to a.get() in the following scenario.

a = new ReactiveVar()
b = new ReactiveVar()
Tracker.autorun ->
  b.set(a.get())

Meteor's livequery basically gives you this reactivity from a database which is pretty cool. And now I'm trying to get this to work with Highland.js.

The naive approach is actually really easy:

stream = _()
Tracker.autorun ->
  results = Posts.find().fetch()
  stream.write(results)

Then you could do something like:

stream.map(renderResults)

The tricky part is cleaning up. Tracker.autorun returns a computation object that can stop the autorun when you're finished with it. Thus, once the stream has ended, I would like to stop the autorun.

stream = _()
c = Tracker.autorun ->
  results = Posts.find().fetch()
  stream.write(results)
# a function that doesn't exist...
stream.onEnd ->
  c.stop()

So the question at the end of the day, is how can I stop the Tracker computation when the stream ends so I can clean up?


Source: (StackOverflow)

Keep request POST stream open until .end()?

I can't get the following code to work, since the 'upload' request is closed by the server before I can start piping in data. How can I get this to work?

import _ from 'highland'
import request from 'request'

request.debug = true

_(process.stdin)
  .split()
  .filter(id => id)
  .map(id => _([id])
    .through(request.post(`${process.env.HOST}/download`))
    .map(str => str.toString().toUpperCase())
    .through(request.post(`${process.env.HOST}/upload`))) // How to keep this open until .end()?
  .sequence()
  .pipe(process.stdout)

Output:

REQUEST make request http://localhost:8080/upload
REQUEST onRequestResponse http://localhost:8080/upload 200 { 'x-powered-by': 'Express',
  date: 'Sun, 19 Apr 2015 14:42:42 GMT',
  connection: 'close',
  'transfer-encoding': 'chunked' }
REQUEST finish init function http://localhost:8080/upload
REQUEST response end http://localhost:8080/upload 200 { 'x-powered-by': 'Express',
  date: 'Sun, 19 Apr 2015 14:42:42 GMT',
  connection: 'close',
  'transfer-encoding': 'chunked' }
REQUEST onRequestResponse http://localhost:8080/download 200 { 'x-powered-by': 'Express',
  date: 'Sun, 19 Apr 2015 14:42:42 GMT',
  connection: 'close',
  'transfer-encoding': 'chunked' }
REQUEST finish init function http://localhost:8080/download
events.js:85
      throw er; // Unhandled 'error' event
            ^
Error: write after end
    at ClientRequest.OutgoingMessage.write (_http_outgoing.js:413:15)

Source: (StackOverflow)

Dependency management in node.js with highland.js

I am getting huge value out of node.js and loving the stream processing model. I'm mostly using it for stream processing with data enrichment and ETL-like jobs.

For enrichment, I may have a record like this...

{ "ip":"123.45.789.01", "productId": 12345 }

I would like to enrich this, perhaps by adding product details:

{ "ip":"123.45.789.01", "productId": 12345, "description" : "Coca-Cola 12Pk", "price":4.00 }

The data for the descriptions and the data for the prices both come from separate streams. What is the best way to approach such dependencies in highland?

H = require('highland')

descriptionStream = H(['[{"productId":1,"description":"Coca-Cola 12Pk"},{"productId":2,"description":"Coca-Cola 20oz Bottle"}]'])
  .flatMap(JSON.parse)

priceStream = H(['[{"productId":1,"price":4.00},{"productId":2,"price":1.25}]'])
  .flatMap(JSON.parse)

#  the file is a 10G file with a json record on each line
activityStream = H(fs.createReadStream('8-11-all.json',{flags:'r',encoding:'utf8'}))
  .splitBy("\n")
  .take(100000) # just take 100k for testing
  .filter((line)-> line.trim().length > 0) # to prevent barfing on empty lines
  .doto((v)->
    # here i want to add the description from the descriptionStream
    # and i want to add the price from the price stream.
    # in order to do that, i need to make the execution of this
    # stream dependent on the completion of the first two and
    # availability of that data.  this is easy with declarative
    # programming but less intuitive with functional programming
  )
  .toArray((results)->
    # dump my results here
  )

Any thoughts?


Source: (StackOverflow)

Asynchronous transforms on streams in Highland.js

I'm trying to use Highland.js for a database update script on a set of Mongoose models, it seems pretty perfect for a QueryStream call on a Model.find(). I have some synchronous things to do (updating my model to conform to a new schema, a few cleanup operations), and at the end I want to save() the document. I have some pre-save hooks configured which need to run, and the updates aren't really compatible with a straight Model.update(). I've managed to get it sort-of working through a combination of Q.js and Highland:

var sender_stream = Sender.find({}).stream();
var promise_save = function(document) {
    var deferred = Q.defer();
    document.save(deferred.makeNodeResolver());
    return _(deferred.promise);
}

var sender_deferred = Q.defer();
_(sender_stream).map(function(sender) {
    // set some fields on sender...
    return sender;
}).map(promise_save).series().on('done', sender_deferred.resolve).resume();

However, this doesn't seem to resolve the promise, and I'm not sure if this is the "right" way to keep things nice and stream-y... it also seems weird to combine Q.js and Highland.js so intimately. Is there a better way?


Source: (StackOverflow)