highland
High-level streams library for Node.js and the browser
Highland.js
Consider a problem:
- split file by lines
- write lines to a result file
- if a result file exceeds some size create a new result file
For example, if I have a file that weighs 4 GB and the split size is 1 GB, the result is four files of 1 GB each.
I'm looking for a solution with something like Rx*/Bacon or any other similar library in any language.
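The size-based rollover described above can be sketched independently of any stream library. The following is a minimal sketch in plain Node.js, not a Highland, Rx, or Bacon solution; the function name `groupLinesBySize` and the `maxBytes` parameter are hypothetical, and it assumes each line is written with a trailing newline. It starts a new file just before the limit would be exceeded, and it groups lines in memory only to show the rule; a real 4 GB file would need the same rule applied while streaming.

```javascript
// Hypothetical helper: assign lines to output "files" so that no file
// exceeds maxBytes (unless a single line is itself larger than maxBytes).
function groupLinesBySize(lines, maxBytes) {
  const files = [];
  let current = [];
  let currentBytes = 0;

  for (const line of lines) {
    // Each line is assumed to be written with a trailing newline.
    const lineBytes = Buffer.byteLength(line + '\n', 'utf8');

    // Start a new result file if adding this line would exceed the limit.
    if (current.length > 0 && currentBytes + lineBytes > maxBytes) {
      files.push(current);
      current = [];
      currentBytes = 0;
    }

    current.push(line);
    currentBytes += lineBytes;
  }

  if (current.length > 0) {
    files.push(current);
  }
  return files;
}

const groups = groupLinesBySize(['aaa', 'bbb', 'ccc', 'dd'], 8);
console.log(groups);
```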
Source: (StackOverflow)
I'm trying to figure out if it's possible to retry an operation with Highland.js:
highland( arrayOfOperations )
  .map( doOperation )
  .parallel( numberOfParallelOperations )
  .errors( (operation) ->
    if operation.attempts < maxRetries
      // retry the operation
  )
  .each( (operation) ->
    // operation success
  )
  .done(
    // all complete
  )
Here's my code. I'm trying to figure out how to retry an operation when I get an error.
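One way to think about the retry is to move it inside the operation itself, so the stream only sees the final success or failure. The sketch below is plain JavaScript with Node-style callbacks and does not use any Highland API; `withRetry`, `flaky` and the extra `attempts` callback argument are hypothetical names introduced for illustration. A function wrapped this way could presumably stand in for `doOperation` in the pipeline above.

```javascript
// Hypothetical helper: wrap a Node-style async function so that it is
// re-invoked on error, up to maxRetries additional attempts.
function withRetry(operation, maxRetries) {
  return function (input, callback) {
    let attempts = 0;

    function attempt() {
      attempts += 1;
      operation(input, function (err, result) {
        if (err && attempts <= maxRetries) {
          return attempt(); // retry the operation
        }
        // Report the final outcome, plus how many attempts were made.
        callback(err, result, attempts);
      });
    }

    attempt();
  };
}

// Example operation that fails twice before succeeding.
let calls = 0;
function flaky(input, callback) {
  calls += 1;
  if (calls < 3) {
    return callback(new Error('temporary failure'));
  }
  callback(null, input * 2);
}

let outcome = null;
withRetry(flaky, 5)(21, function (err, result, attempts) {
  outcome = { err: err, result: result, attempts: attempts };
  console.log(outcome);
});
```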
Source: (StackOverflow)
I am trying to use the request library which streams responses. Following code works as expected.
request.get('http://someurl.com')
  .on('response', function(response) {
    console.log(response.statusCode) // 200
  });
However, when I try to do the same thing with highland.js the event handler is never fired.
var r = request.get('http://someurl.com');
var p = _('response', r);
p.map(function(x){
  console.log(x.statusCode);
});
Is there something super trivial that I am missing here?
Source: (StackOverflow)
So I have a highland stream:
var getData = highland.wrapCallback(fs.readFile);
var fileStream = getData('myfile')
console.log(fileStream)
How do I read from fileStream? According to the Highland docs, there is no read() method on streams.
Source: (StackOverflow)
I have written a simple script that should count lines:
var
  H = require('highland'),
  fs = require('fs'),
  split = require('split');
var lineStream = fs.createReadStream('data-samples/sample.log').pipe(split());
H('data', lineStream).reduce(0, function(count) {
  return count + 1;
}).each(console.log);
But for some reason I see nothing in the console. The documentation mentions laziness, but the each call should "invoke" the stream. How can I fix this?
NB: It's a question about highland.js and not about a way to count lines
Source: (StackOverflow)
This is probably a newbie question but I searched and couldn't find a satisfying answer.
My node.js application seems to consume a lot of memory. Each process consumes about 100MB. I heard that nodejs itself has a ~30MB memory footprint per process.
The application is a JSON API, backed by MongoDB. In many cases, one API request will result in many database requests, mainly to populate the child relationships. A typical query is like this: (1) get an array of objectIds based on the query condition, and (2) iterate over each objectId and issue a query to the database to populate the data (some call that hydration).
The code uses async.js heavily. I tried to profile the memory usage, and it seems async.js is using a lot of memory, but there is no sign of a memory leak. The author of async.js also came out with a stream library, highland.js (http://highlandjs.org/). I am new to Node.js streams, and I am curious whether this is a possible tool to replace async.js. The web site seems to mention underscore, but I mainly use async.js for the asynchronous processing.
Thanks!
Source: (StackOverflow)
I am having trouble getting the Node.js request stream to work: by the time I pass it to map, it is already an empty array.
var _ = require('highland'),
    fs = require('fs'),
    request = require('request');
// This works but not using the stream approach
// function get(path) {
//   return _(function (push, next) {
//     request(path, function (error, response, body) {
//       // The response itself also contains the body
//       push(error, response);
//       push(null, _.nil);
//     });
//   });
// }
var google = _(request.get('http://www.google.com'));
google
  // res is empty array
  .map(function (res) {
    // console.log(res);
    return res;
  })
  // res is empty array
  .toArray(function (res) {
    console.log(res);
  });
Source: (StackOverflow)
I'm working off of a module that's built in knockout.js and I want to map changes to observables onto an event stream after the manner of RxJs, highland.js or bacon.js.
I know there are ways to do this by extending knockout, like ko.subscribable.fn.someFilter. But that assumes I want knockout to do my work for me - I don't, particularly.
Typically in Bacon or highland you would see something like this:
//bacon
var strm1 = $('#el').toEventStream('click');
strm1.map(transformFn);
//highland
var strm2 = _('click',$('#el'));
strm2.map(transformFn);
Unfortunately in knockout I'm not able to find an easy way to hook onto events since knockout implements pub/sub rather than emitters. Any insights into how I might hook on?
I was thinking something like a mediator emitter that subscribes to all observable properties and then emits the events.
Assuming I have something like this bound by a ko viewmodel:
<input data-bind="value:name" />
and the viewmodel is something like
function Person(){
this.name = ko.observable('Bob');
this.age = ko.observable(21);
//...
};
var person = new Person();
var emitter = Emitter({});
var streams = {};
and then I am mapping as follows, where _ is highland.
for(var key in person){
  streams[key]=_(key+':changed',watcher);
  person[key].subscribe(function(){
    watcher.emit(key+':changed',arguments);
  });
}
function streamHandler(){
  console.log('streamHandler got',arguments);
}
All well and good, my emitter fires just fine, but then this does nothing:
streams['name'].each(streamHandler);
So, I am missing something. What's wrong?
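One thing worth checking in the loop above, independent of Highland: `key` is declared with `var`, so every subscribe callback sees the value `key` had when the loop finished. The sketch below reproduces this with Node's built-in `EventEmitter`; the `fakeObservable` helper is a hypothetical stand-in for a knockout observable, not knockout's API, and `wire` and `run` are illustrative names. The loop also references `watcher` while the emitter is named `emitter`, which may simply be a typo in the post.

```javascript
const EventEmitter = require('events');

// Hypothetical stand-in for a knockout observable: subscribe() registers a
// callback and set() notifies all subscribers.
function fakeObservable() {
  const subscribers = [];
  return {
    subscribe: function (fn) { subscribers.push(fn); },
    set: function (value) { subscribers.forEach(function (fn) { fn(value); }); }
  };
}

function wire(person, emitter, useClosurePerKey) {
  if (useClosurePerKey) {
    // Each callback gets its own `key` binding.
    Object.keys(person).forEach(function (key) {
      person[key].subscribe(function (value) {
        emitter.emit(key + ':changed', value);
      });
    });
  } else {
    // Same shape as the loop above: one shared `key` for all callbacks.
    for (var key in person) {
      person[key].subscribe(function (value) {
        emitter.emit(key + ':changed', value);
      });
    }
  }
}

function run(useClosurePerKey) {
  const person = { name: fakeObservable(), age: fakeObservable() };
  const emitter = new EventEmitter();
  const seen = [];
  emitter.on('name:changed', function (v) { seen.push('name:' + v); });
  emitter.on('age:changed', function (v) { seen.push('age:' + v); });
  wire(person, emitter, useClosurePerKey);
  person.name.set('Alice');
  return seen;
}

const sharedKey = run(false); // change to `name` is reported under the last key
const ownKey = run(true);     // change to `name` is reported under `name`
console.log(sharedKey, ownKey);
```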
Source: (StackOverflow)
I'm trying to create a project development automation tool (like grunt/gulp) using Highland.js to replace my current gulp.js setup: https://github.com/Industrial/id-project. I'm not sure exactly how to use Highland.js, so the current code is nothing but a guess:
fs = require "fs"
path = require "path"
_ = require "highland"
mkdirp = require "mkdirp"
rimraf = require "rimraf"
sourceDirectory = path.resolve "./src"
buildDirectory = path.resolve "./build"
# 1) How do I avoid using wrapCallback for all the logic and actually do it
# functionally and with Highland.js?
createDirectories = _.wrapCallback (cb) ->
  # 2) How do I parallelize these? Don't tell me "Use async.js." ...
  mkdirp sourceDirectory, ->
    mkdirp buildDirectory, cb
cleanBuildDirectory = _.wrapCallback (cb) ->
  rimraf buildDirectory, cb
compile = _.wrapCallback (cb) ->
  # Eventually I want to compile CoffeeScript, Jade, Less, Copy other files,
  # Browserify, and BrowserSync.
  cb()
watch = _.wrapCallback (cb) ->
  # And on the fly.
  cb()
# 3) How do I get a pipeline of streams to start executing these steps in series
# and within the steps things in series and in parallel?
pipeline = _.pipeline createDirectories,
  cleanBuildDirectory,
  compile,
  watch
# 4) Is the way I initially start the program okay?
pipeline
  .pipe process.stdout
pipeline.write "start"
Source: (StackOverflow)
I need a consumer that will be able to:
- buffer incoming data
- not pause the provider
- emit new data asynchronously
I.e. something like this:
var consumer = function() {
  var buffer = [];
  var jobStarted = false;
  var startAsyncJob = function() {
    if(jobStarted) {
      return;
    }
    jobStarted = true;
    // later
    onJobComplete();
  };
  var onJobComplete = function() {
    jobStarted = false;
    // TODO: push buffer or some other data downstream
  };
  return function(err, value, push, next) {
    // I will skip here all if(err) and if (value === _.nil)
    buffer.push(value);
    startAsyncJob();
  };
};
// usage
source
  .consume(consumer())
  .map(function() {/* something here */})
  .each(_.log);
Is it possible to do this with highland.js?
Thanks
Source: (StackOverflow)
I have a complex workflow I'm trying to get working in highlandjs. What makes it complex is (1) you need to transform a list-of-futures to future-of-list and (2) it's a sequence of async calls where the 3rd call depends on both the output of the 1st and 2nd.
It's working in Scala. Check out this gist: https://gist.github.com/frankandrobot/ad857698f680aceb44e8
Unfortunately, I haven't been able to convert #preprocess1 into the highland equivalent.
- The highland version doesn't return anything (when calling #apply).
- A debug "hi from console" doesn't get fired either, so it's not getting called completely.
- The 3 async calls have been tested and are working.
UPDATE: yea, I'm in highlandjs nested hell. So pointers on how to get out of there would help.
UPDATE: It turns out I wasn't passing "targets" to preprocess(). Doh. It's working as expected.
The question is now how can I clean this up? Best practices? Something about it doesn't seem right.
This is the JS code:
//this spits out a list of tags
this._fetchAllTags = _.wrapCallback(db.fetchAllTags);
//this also spits out a list of tags
this._fetchCombinedNearestNeighbors = function(targets, alltags) {
  var run_algorithm = _.wrapCallback(function(target, callback) {
    db.findKNearestNeighbors(target, alltags, callback)
  });
  return _(targets)
    .map(run_algorithm)
    .parallel(4)
    .sequence()
    .collect()
    .map(uniq); //get unique
};
//as you can see this depends on output of previous two async calls
this._fetchSSA = _.wrapCallback(function(allnearestneighbors, alltags, callback) {
  db.fetchSSA(allnearestneighbors, alltags, callback)
});
//can we clean this up?
this.preprocess = function() {
  return that._fetchAllTags()
    .flatMap(function(alltags) {
      return that._fetchCombinedNearestNeighbors(targets, alltags)
        .flatMap(function (allneighbors) {
          console.log('hi')
          return that._fetchSSA(allneighbors, alltags);
        });
    });
};
Source: (StackOverflow)
I'm trying to marry two different ways of functional reactive programming.
Meteor has its own way of doing reactive things using Tracker. Using an autorun, you can bind two things together. For example, b.get() will always be equal to a.get() in the following scenario.
a = new ReactiveVar()
b = new ReactiveVar()
Tracker.autorun ->
  b.set(a.get())
Meteor's livequery basically gives you this reactivity from a database which is pretty cool. And now I'm trying to get this to work with Highland.js.
The naive approach is actually really easy:
stream = _()
Tracker.autorun ->
  results = Posts.find().fetch()
  stream.write(results)
Then you could do something like:
stream.map(renderResults)
The tricky part is cleaning up. Tracker.autorun returns a computation object that will stop the autorun when you're finished with it. Thus, once the stream is ended, I would like to stop the autorun.
stream = _()
c = Tracker.autorun ->
  results = Posts.find().fetch()
  stream.write(results)
# a function that doesn't exist...
stream.onEnd ->
  c.stop()
So the question, at the end of the day, is: how can I stop the Tracker computation when the stream ends, so I can clean up?
Source: (StackOverflow)
I can't get the following code to work, since the 'upload' request is closed by the server before I can start piping in data. I'm not sure how to get this to work.
import _ from 'highland'
import request from 'request'
request.debug = true
_(process.stdin)
  .split()
  .filter(id => id)
  .map(id => _([id])
    .through(request.post(`${process.env.HOST}/download`))
    .map(str => str.toString().toUpperCase())
    .through(request.post(`${process.env.HOST}/upload`))) // How to keep this open until .end()?
  .sequence()
  .pipe(process.stdout)
Output:
REQUEST make request http://localhost:8080/upload
REQUEST onRequestResponse http://localhost:8080/upload 200 { 'x-powered-by': 'Express',
date: 'Sun, 19 Apr 2015 14:42:42 GMT',
connection: 'close',
'transfer-encoding': 'chunked' }
REQUEST finish init function http://localhost:8080/upload
REQUEST response end http://localhost:8080/upload 200 { 'x-powered-by': 'Express',
date: 'Sun, 19 Apr 2015 14:42:42 GMT',
connection: 'close',
'transfer-encoding': 'chunked' }
REQUEST onRequestResponse http://localhost:8080/download 200 { 'x-powered-by': 'Express',
date: 'Sun, 19 Apr 2015 14:42:42 GMT',
connection: 'close',
'transfer-encoding': 'chunked' }
REQUEST finish init function http://localhost:8080/download
events.js:85
throw er; // Unhandled 'error' event
^
Error: write after end
at ClientRequest.OutgoingMessage.write (_http_outgoing.js:413:15)
Source: (StackOverflow)
I am getting huge value out of node.js and loving the stream processing model. I'm mostly using it for stream processing with data enrichment and ETL like jobs.
For enrichment, I may have a record like this...
{ "ip":"123.45.789.01", "productId": 12345 }
I would like to enrich this perhaps by adding product details
{ "ip":"123.45.789.01", "productId": 12345, "description" : "Coca-Cola 12Pk", "price":4.00 }
The data for the descriptions and the data for the prices both come from separate streams. What is the best way to approach such dependencies in highland?
H = require('highland')
fs = require('fs')
descriptionStream = H(['[{"productId":1,"description":"Coca-Cola 12Pk"},{"productId":2,"description":"Coca-Cola 20oz Bottle"}]'])
  .flatMap(JSON.parse)
priceStream = H(['[{"productId":1,"price":4.00},{"productId":2,"price":1.25}]'])
  .flatMap(JSON.parse)
# the file is a 10G file with a json record on each line
activityStream = H(fs.createReadStream('8-11-all.json',{flags:'r',encoding:'utf8'}))
  .splitBy("\n")
  .take(100000) # just take 100k for testing
  .filter((line)-> line.trim().length > 0) # to prevent barfing on empty lines
  .doto((v)->
    # here i want to add the description from the descriptionStream
    # and i want to add the price from the price stream.
    # in order to do that, i need to make the execution of this
    # stream dependent on the completion of the first two and
    # availability of that data. this is easy with declarative
    # programming but less intuitive with functional programming
  )
  .toArray((results)->
    # dump my results here
  )
Any thoughts?
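Setting the stream wiring aside, the enrichment itself amounts to a lookup join: collect the two small reference sets into maps keyed by `productId`, then merge the matching entries into each activity record. The sketch below shows only that step in plain JavaScript with in-memory arrays; `indexByProductId` and `enrich` are hypothetical names, and how the maps get populated before the activity stream starts is left open.

```javascript
// Hypothetical helper: build a Map from productId to record.
function indexByProductId(records) {
  const index = new Map();
  for (const record of records) {
    index.set(record.productId, record);
  }
  return index;
}

// Hypothetical helper: merge description and price into one activity record.
// A missing lookup leaves the record unchanged for that source.
function enrich(activity, descriptions, prices) {
  return Object.assign(
    {},
    activity,
    descriptions.get(activity.productId),
    prices.get(activity.productId)
  );
}

const descriptions = indexByProductId([
  { productId: 1, description: 'Coca-Cola 12Pk' },
  { productId: 2, description: 'Coca-Cola 20oz Bottle' }
]);
const prices = indexByProductId([
  { productId: 1, price: 4.0 },
  { productId: 2, price: 1.25 }
]);

const enriched = enrich({ ip: '123.45.789.01', productId: 1 }, descriptions, prices);
console.log(enriched);
```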
Source: (StackOverflow)
I'm trying to use Highland.js for a database update script on a set of Mongoose models; it seems pretty perfect for a QueryStream call on a Model.find(). I have some synchronous things to do (updating my model to conform to a new schema, a few cleanup operations), and at the end I want to save() the document. I have some pre-save hooks configured which need to run, and the updates aren't really compatible with a straight Model.update(). I've managed to get it sort-of working through a combination of Q.js and Highland:
var sender_stream = Sender.find({}).stream();
var promise_save = function(document) {
  var deferred = Q.defer();
  document.save(deferred.makeNodeResolver());
  return _(deferred.promise);
}
var sender_deferred = Q.defer();
_(sender_stream).map(function(sender) {
  // set some fields on sender...
  return sender;
}).map(promise_save).series().on('done', sender_deferred.resolve).resume();
However, this doesn't seem to resolve the promise and I'm not sure if this is the "right" way to keep things nice and stream-y...it also seems weird to combine Q.js and Highland.js so intimately. Is there a better way?
Source: (StackOverflow)