EzDevInfo.com

RxJS

The Reactive Extensions for JavaScript ReactiveX

NodeJS Streams vs. Observables

After learning about Observables, I find them quite similar to NodeJS streams. Both have a mechanism of notifying the consumer whenever new data arrives, an error occurs or there is no more data (EOF).

I would love to learn about the conceptual/functional differences between the two. Thanks!


Source: (StackOverflow)

Create an Observable that delays the next value

I'm trying to create an observable using RxJS that does what is pictured.

[Image: expected observable mapping]

  • Grabs a value and waits a fixed period of time before getting the next one.
  • The next one will be the last value emitted in the period of the wait, skipping the rest.
  • If a wait interval goes by in which no value was emitted, the next one should be grabbed immediately, as the last example in the image depicts.
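
The three rules above can be sketched in plain JavaScript, without RxJS. This is only a sketch under assumptions: `createSampler`, `emit` and the injectable `schedule` parameter are hypothetical names introduced for illustration, not RxJS operators.

```javascript
// Hypothetical helper, not an RxJS operator: forwards a value, then holds
// back for `waitMs`, keeping only the latest value seen during the wait.
// `schedule` defaults to setTimeout; it is injectable so the timing can be faked.
function createSampler(waitMs, emit, schedule = setTimeout) {
  let waiting = false;     // true while a wait period is running
  let hasPending = false;  // true if a value arrived during the current wait
  let pending;             // latest value received during the current wait

  function startWait() {
    waiting = true;
    schedule(onWaitOver, waitMs);
  }

  function onWaitOver() {
    if (hasPending) {
      // Something arrived during the wait: emit the latest and wait again.
      const value = pending;
      hasPending = false;
      pending = undefined;
      emit(value);
      startWait();
    } else {
      // Quiet period: the next value will pass through immediately.
      waiting = false;
    }
  }

  return function push(value) {
    if (waiting) {
      pending = value;     // overwrite: only the last value survives
      hasPending = true;
    } else {
      emit(value);
      startWait();
    }
  };
}
```

Calling the returned `push` function for every incoming value, for example `createSampler(1000, console.log)`, would forward the first value at once and then at most one value per second.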

Source: (StackOverflow)

What's the difference between Knockout.js and Rx.js?

Does anyone know the differences between RxJS and Knockout? On the surface they seem to be trying to solve the same problem: building an event-driven UI. For someone who has experience with both, how do they differ and how are they similar? Can you describe them in a way that would help me choose?


Source: (StackOverflow)

GHCJS: How do I import a higher-order JavaScript function using the FFI?

How do I import a JavaScript function like the following into GHCJS?

xs.subscribe(function(x) { console.log(x) })

I tried various combinations of the following without success:

data Observable_
data Disposable_

type Observable a = JSRef Observable_
type Disposable = JSRef ()

foreign import javascript unsafe "$1.subscribe($2)"
  rx_subscribe :: Observable a -> JSRef (a -> IO()) -> IO Disposable

Any help is appreciated, as are links to documentation of the GHCJS FFI.

Thanks


Source: (StackOverflow)

What is a `Scheduler` in RxJS?

I've seen the term Scheduler very frequently in the documentation.

But what does this term mean? I don't even know how to use a so-called Scheduler. The official documentation doesn't say exactly what a Scheduler is. Is this a general concept or one specific to RxJS?


Source: (StackOverflow)

Reactive Programming - RxJS vs EventEmitter in Node.js

Recently I've started looking at the RxJS and RxJava (from Netflix) libraries, which work on the concept of Reactive Programming.

Node.js works on the basis of an event loop, which provides the whole arsenal for asynchronous programming, and Node libraries such as "cluster" help you get the best out of your multi-core machine. Node.js also provides the EventEmitter functionality, with which you can subscribe to events and act upon them asynchronously.

On the other hand, if I understand correctly, RxJS (and Reactive Programming in general) works on the principle of event streams: subscribing to event streams and transforming the event stream data asynchronously.
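
To make the contrast concrete, here is a rough sketch of how one EventEmitter event could be wrapped so that it can be transformed like a stream. `fromEvent` and `makeStream` are hypothetical helpers written for this illustration; they are not the RxJS implementation, and the `'reading'` event name is made up.

```javascript
const EventEmitter = require('events');

// Hypothetical: builds a tiny stream object around a subscribe function.
function makeStream(subscribe) {
  return {
    subscribe,
    // Each operator returns a new stream that subscribes to this one on demand.
    map: fn => makeStream(observer => subscribe(v => observer(fn(v)))),
    filter: pred =>
      makeStream(observer => subscribe(v => { if (pred(v)) observer(v); }))
  };
}

// Hypothetical: exposes one EventEmitter event as such a stream.
function fromEvent(emitter, eventName) {
  return makeStream(observer => emitter.on(eventName, observer));
}

const emitter = new EventEmitter();
const results = [];

// The event data is transformed declaratively before it reaches the subscriber.
fromEvent(emitter, 'reading')
  .filter(n => n % 2 === 0)
  .map(n => n * 10)
  .subscribe(n => results.push(n));

[1, 2, 3, 4].forEach(n => emitter.emit('reading', n));
// results now holds the even readings multiplied by ten.
```

With a bare EventEmitter the filtering and mapping would live inside the listener itself; the stream style moves them into composable steps between the source and the subscriber.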

So, the question is what does using Rx packages in Node.js mean. How different is the Node's event loop, event emitter & subscriptions to the Rx's streams and subscriptions.


Source: (StackOverflow)

Does Functional Reactive Programming in JavaScript cause bigger problems with listener references?

In JavaScript the observer pattern is used quite often. There is one tricky thing with it and that's the references the subject keeps of the observers. They require cleanup. For regular applications I use the following rules of thumb:

  • If the subject has a life span shorter than (or equal to) the observer, I can just do subject.on('event', ...)
  • If the subject has a life span longer than the observer, I need to use observer.listenTo(subject, 'event', ...)

In the second case, the listenTo is aware of the life-cycle of the observer and it will automatically remove the listeners when it's time for the observer to die.
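
A minimal sketch of what such a listenTo could look like on top of Node's EventEmitter. The `Observer` class and its `stopListening` method are assumptions made for this illustration, not the API of a particular framework.

```javascript
const EventEmitter = require('events');

// Hypothetical observer base class: remembers every listener it registers
// so that all of them can be removed in one call when the observer dies.
class Observer {
  constructor() {
    this._bindings = [];
  }

  listenTo(subject, event, handler) {
    subject.on(event, handler);
    this._bindings.push({ subject, event, handler });
  }

  // Call this when the observer's life ends.
  stopListening() {
    for (const { subject, event, handler } of this._bindings) {
      subject.removeListener(event, handler);
    }
    this._bindings = [];
  }
}

const subject = new EventEmitter();   // long-lived subject
const view = new Observer();          // short-lived observer
const received = [];

view.listenTo(subject, 'event', value => received.push(value));
subject.emit('event', 1);             // delivered
view.stopListening();                 // the subject no longer references the handler
subject.emit('event', 2);             // not delivered
```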

In modern day SPA (Single Page Application) style, where only parts of the application are active at any time this is something that becomes very important. If you combine that with web sockets, which are a perfect candidate for an event stream and most likely long lived, this becomes even more important.

With FRP, having something like an event stream representing changing values over time, I am (without knowing it) creating a lot of listeners. Each filter, map and flatMap creates a new stream that is tied (probably using a listener) to the previous one.

To me it seems quite tricky to determine how and when I need to remove those listeners. I cannot imagine that I am the first to think about this problem, yet I could not find much about it on the Internet.

I have seen some frameworks in other languages use weak references. JavaScript does not have the concept of weak references (WeakMap is not usable here). Even if it had though, it seems like a bad idea because it's unclear when garbage collection takes place.

  • How is this solved in the current frameworks?
  • Do the frameworks tie into the life-cycle of objects? If yes: how?
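
One common design, sketched here under assumptions rather than taken from any specific library, is to make subscription lazy and have subscribe return a dispose function: each derived stream attaches its upstream listener only when it is subscribed to, and disposing the final subscription removes that listener again. `createSource`, `map` and `filter` below are hypothetical stand-ins.

```javascript
// Minimal stand-in for an event source: subscribe returns a dispose function.
function createSource() {
  const listeners = new Set();
  return {
    listenerCount: () => listeners.size,
    emit: value => listeners.forEach(listener => listener(value)),
    subscribe(listener) {
      listeners.add(listener);
      return () => listeners.delete(listener);   // dispose
    }
  };
}

// Each operator subscribes upstream only when it is subscribed to itself,
// and hands the upstream dispose function back to its own subscriber.
function map(source, fn) {
  return { subscribe: listener => source.subscribe(v => listener(fn(v))) };
}

function filter(source, pred) {
  return {
    subscribe: listener => source.subscribe(v => { if (pred(v)) listener(v); })
  };
}

const socket = createSource();   // long-lived subject, e.g. a web socket
const seen = [];
const dispose = filter(map(socket, n => n * 2), n => n > 2)
  .subscribe(n => seen.push(n));

socket.emit(1);   // mapped to 2, filtered out
socket.emit(2);   // mapped to 4, delivered
dispose();        // one call detaches the whole chain from the source
socket.emit(3);   // nobody is listening any more
```

In this design the intermediate streams hold no listeners of their own while unsubscribed, so the only reference that needs tracking is the dispose function of the final subscription.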

Source: (StackOverflow)

ReactiveX (Rx) - Detecting Long Press Events

I am wondering what the canonical approach is to solving the following problem in Rx. Say I have two observables, mouse_down and mouse_up, whose elements represent mouse button presses. In a very simplistic scenario, if I wanted to detect a long press, I could do it the following way (in this case using RxPy, but conceptually the same in any Rx implementation):

mouse_long_press = mouse_down.delay(1000).take_until(mouse_up).repeat()

However, problems arise when we need to hoist some information from the mouse_down observable to the mouse_up observable. For example, consider if the elements of the observable stored information about which mouse button was pressed. Obviously, we would only want to pair mouse_down with mouse_up of the corresponding button. One solution that I came up with is this:

mouse_long_press = mouse_down.select_many(lambda x:
    rx.Observable.just(x).delay(1000)\
        .take_until(mouse_up.where(lambda y: x.button == y.button))
)

If there is a more straightforward solution, I would love to hear it, but as far as I can tell this works. However, things get more complicated if we also want to detect how far the mouse has moved between mouse_down and mouse_up. For this we need to introduce a new observable, mouse_move, which carries information about the mouse position.

mouse_long_press = mouse_down.select_many(lambda x:
    mouse_move.select(lambda z: distance(x, z) > 100).delay(1000)\
        .take_until(mouse_up.where(lambda y: x.button == y.button))
)

However, this is pretty much where I get stuck. Whenever a button is held down longer than 1 second, I get a bunch of boolean values. However, I only want to detect a long press when all of them are false, which sounds like the perfect case for the all operator. It feels like there's only a small step missing, but I haven't been able to figure out how to make it work so far. Perhaps I am also doing things in a backwards way. Looking forward to any suggestions.


Source: (StackOverflow)

Unexpected cold Observable behavior

I thought I understood the difference between cold and hot Observables, but apparently something escapes me. This code works as expected:

var obs = Rx.Observable.interval(2000);

var A = obs.subscribe(function(value) { console.log('A', value) });
var B = obs.subscribe(function(value) { console.log('B', value) });

With this, I get the following result:

A 0
B 0
A 1
B 1
...

But when I add a flatMap to retrieve a remote JSONP resource:

var obs = Rx.Observable.interval(2000).flatMap(function() {
  return Rx.DOM.jsonpRequest({ url: URL });
})
.map(function(value) { return value.prop; });

var A = obs.subscribe(function(value) { console.log('A', value) });
var B = obs.subscribe(function(value) { console.log('B', value) });

I only receive A logs:

A prop
A prop
A prop
...

If I turn the Observable into a hot one using publish().refCount(), it works as I would expect, which is that both subscribers receive the same values.

Now, I understand that with the Observable being cold, I shouldn't expect to receive the same values at the same time, but I would expect that both A and B observers receive values, not only A.
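
The expectation described here can be illustrated with hand-rolled stand-ins. This is not the RxJS implementation, and it does not explain the missing B logs; `createCold` and this `publish` are hypothetical helpers that only mirror the idea of one producer run per subscription versus one shared run.

```javascript
// Minimal stand-in for a cold observable: the producer runs once per subscription.
function createCold(producer) {
  return { subscribe: observer => producer(observer) };
}

let runs = 0;
const cold = createCold(observer => {
  runs += 1;                 // counts how many times the producer ran
  observer(`run ${runs}`);
});

const log = [];
cold.subscribe(v => log.push(`A ${v}`));   // first producer run
cold.subscribe(v => log.push(`B ${v}`));   // second, independent producer run

// Stand-in for the publish idea: one producer run, fanned out to all observers.
function publish(source) {
  const observers = [];
  let connected = false;
  return {
    subscribe: observer => { observers.push(observer); },
    connect() {
      if (connected) return;
      connected = true;
      source.subscribe(v => observers.forEach(o => o(v)));
    }
  };
}

const hot = publish(cold);
hot.subscribe(v => log.push(`A ${v}`));
hot.subscribe(v => log.push(`B ${v}`));
hot.connect();                             // third producer run, shared by A and B
```

After this runs, A and B each got a value from their own run in the cold case, and both got the value of the same run in the published case.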

What am I missing here?


Source: (StackOverflow)

Using RxJs groupBy with objects as keys

I'm trying to use groupBy with RxJs, and I need to use objects as keys. If I don't and I use, for example, simple strings like this:

var types = stream.groupBy(
    function (e) { return e.x; }); //x is a string

then everything goes fine and my subscription is called once, and once only, for each different key. But if I try with objects, the subscription is called for each element from stream, even if a key happens to be the same as previous ones.

There is of course a problem with object equality, but this is where I get confused, because I don't understand how to use the additional arguments for groupBy. The latest version of the docs says there is a third argument that can be a comparer, but it never gets called. Earlier docs talk about a key serializer, which is a totally different idea, but neither way works for me.

Looking at the Rx source code, I see attempts to check for a getHashCode function, but I cannot find any documentation about it. Code like this:
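
For reference, the object-equality problem and the key-serializer idea can be shown in plain JavaScript. This is a sketch, not the RxJS groupBy: `groupBySerialized` is a hypothetical helper, and using JSON.stringify as the serializer assumes the key properties are always created in the same order.

```javascript
// Two structurally equal object literals are still different keys.
const sameKey = ({ x: 'a', y: 1 }) === ({ x: 'a', y: 1 });   // false

// Hypothetical helper: object keys are turned into strings so that
// structurally equal keys fall into the same group.
function groupBySerialized(items, keySelector, keySerializer = JSON.stringify) {
  const groups = new Map();
  for (const item of items) {
    const id = keySerializer(keySelector(item));
    if (!groups.has(id)) groups.set(id, []);
    groups.get(id).push(item);
  }
  return groups;
}

const events = [
  { x: 'a', y: 1, n: 1 },
  { x: 'a', y: 1, n: 2 },
  { x: 'b', y: 1, n: 3 }
];

// Two groups result: one for {x: 'a', y: 1} and one for {x: 'b', y: 1}.
const groups = groupBySerialized(events, e => ({ x: e.x, y: e.y }));
```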

var types = stream.groupBy(
    function (e) {
        return { x: e.x, y: e.y };   //is this valid at all?
    },
    function (e) { return e; },
    function (...) { return ???; }); //what am I supposed to do here?

is what I'm trying to write, but no luck: whatever I put for the third callback is never called.

What's wrong here?


Source: (StackOverflow)

Reactive-Extensions / RxJS Implementation in node.js

I simply want to use

https://github.com/Reactive-Extensions/RxJS

in my Node project.

Sure, there is an npm package available, but it looks less up to date, has fewer modules, and ships only the minified files, so I want to use RxJS from the git sources.

I downloaded RxJS-master and copied all the files in its directory to ../myProject/lib/rx/

Among those files I see rx.node.js:

var Rx = require('./rx');
require('./rx.aggregates');
require('./rx.binding');
require('./rx.coincidence');
require('./rx.experimental');
require('./rx.joinpatterns');
require('./rx.testing');
require('./rx.time');
module.exports = Rx;

So my app.js code looks like this:

var rx = require("./lib/rx/rx.node.js")

function test()
{
    var as = new rx.AsyncSubject()
    setTimeout(function ()
    {
        as.onNext("works!")
        as.onCompleted()
    }, 500)
    return as
}

var a = test().subscribe(function (result)
{
    console.log("Got result: " + result)
})

This gives the following error:

.../rx/lib/rx/rx.binding.js:173
    var BehaviorSubject = Rx.BehaviorSubject = (function (_super) {
                          ^
ReferenceError: Rx is not defined
    at .../rx/lib/rx/rx.binding.js:173:27
    at Observable (.../rx/lib/rx/rx.binding.js:14:26)
    at Object.<anonymous> (.../rx/lib/rx/rx.binding.js:18:2)
    at Module._compile (module.js:449:26)
    at Object.Module._extensions..js (module.js:467:10)
    at Module.load (module.js:356:32)
    at Function.Module._load (module.js:312:12)
    at Module.require (module.js:362:17)
    at require (module.js:378:17)
    at Object.<anonymous> (.../rx/lib/rx/rx.node.js:3:1)

Process finished with exit code 1

What is wrong?


If I modify rx.node.js to

var Rx = require('./rx');
module.exports = Rx;

The code works as expected, so evidently the part that requires the sub-modules is what fails.


Thanks.


Source: (StackOverflow)

Is there an "async" version of filter operator in RxJs?

I need to filter entries emitted by an observable by checking the entry against some web service. The normal observable.filter operator is not suitable here, as it expects the predicate function to return the verdict synchronously, but in this situation, the verdict can only be retrieved asynchronously.

I can make do with the following code, but I was wondering whether there is a better operator I can use for this case.

someObservable.flatMap(function(entry) {
  return Rx.Observable.fromNodeCallback(someAsynCheckFunc)(entry).map(function(verdict) {
    return {
      verdict: verdict,
      entry: entry
    };
  });
}).filter(function(obj) {
  return obj.verdict === true;
}).map(function(obj) {
  return obj.entry;
});
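
The same pair-up-then-filter idea, written without RxJS for a finite list of entries, might look like the sketch below. `filterAsync` and `someAsyncCheck` are hypothetical names; the check is assumed to follow the Node callback convention `(entry, callback(err, verdict))`, as the use of fromNodeCallback above suggests.

```javascript
// Hypothetical helper (not an RxJS operator): keeps the entries for which the
// Node-style asynchronous check reports `true`, preserving the input order.
function filterAsync(entries, check, done) {
  const verdicts = new Array(entries.length);
  let remaining = entries.length;
  let failed = false;

  if (remaining === 0) return done(null, []);

  entries.forEach((entry, index) => {
    check(entry, (err, verdict) => {
      if (failed) return;              // an earlier check already reported an error
      if (err) {
        failed = true;
        return done(err);
      }
      verdicts[index] = verdict === true;
      remaining -= 1;
      if (remaining === 0) {
        // All verdicts are in: keep only the approved entries.
        done(null, entries.filter((_, i) => verdicts[i]));
      }
    });
  });
}

// Usage with a stand-in for the web-service check.
function someAsyncCheck(entry, callback) {
  setImmediate(() => callback(null, entry % 2 === 0));
}

filterAsync([1, 2, 3, 4], someAsyncCheck, (err, kept) => {
  if (err) throw err;
  console.log(kept);                   // the even entries
});
```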

Source: (StackOverflow)

Separating single clicks from click and hold

I need to implement a behavior:

  • when element clicked - one thing happens
  • but when it's clicked and held for more than one second, something else happens (e.g. the element becomes draggable) and then the first event never fires

I think I know how to catch click & hold type events, but how do I distinguish between the first and the second?

Can you show me how to do that using this jsbin? I already made the "click, hold & drag" part, except that it still fires the 'click' event after dragging the element, and it shouldn't.

Again: element clicked - one event; click and hold - the element is draggable (even after mouse up), and when clicked again it's back to its normal (undraggable) state.

I am not looking for a trivial solution; it has to be built using Rx.Observable or at least Bacon's EventStream object.

Thank you


Source: (StackOverflow)

How to avoid glitches in Rx

Unlike other "FRP" libraries, Rx doesn't prevent glitches: callbacks invoked with time-mismatched data. Is there a good way to work around this?

As an example, imagine that we have a series of expensive computations derived from a single stream (e.g. instead of _.identity, below, we do a sort, or an ajax fetch). We do distinctUntilChanged to avoid recomputing the expensive things.

sub = new Rx.Subject();
a = sub.distinctUntilChanged().share();
b = a.select(_.identity).distinctUntilChanged().share();
c = b.select(_.identity).distinctUntilChanged();
d = Rx.Observable.combineLatest(a, b, c, function () { return _.toArray(arguments); });
d.subscribe(console.log.bind(console));
sub.onNext('a');
sub.onNext('b');

The second event will end up causing a number of glitchy states: we get three events out instead of one, which wastes a lot of CPU and requires us to explicitly work around the mismatched data.

This particular example can be worked around by dropping the distinctUntilChanged, and writing some wonky scan() functions to pass through the previous result if the input hasn't changed. Then you can zip the results, instead of using combineLatest. It's clumsy, but doable.

However, if there is asynchrony anywhere, e.g. an ajax call, then zip doesn't work: the ajax call will complete either synchronously (if cached) or asynchronously, so you can't use zip.

Edit

Trying to clarify the desired behavior with a simpler example:

You have two streams, a and b. b depends on a. b is asynchronous, but the browser may cache it, so it can either update independently of a, or at the same time as a. So, a particular event in the browser can cause one of three things: a updates; b updates; both a and b update. The desired behavior is to have a callback (e.g. render method) invoked exactly once in all three cases.

zip does not work, because when a or b fires alone, we get no callback from zip. combineLatest does not work because when a and b fire together we get two callbacks.
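
The double callback can be reproduced with a few hand-rolled stand-ins. These are not the Rx classes; `createSubject`, `map` and this two-input `combineLatest` are simplified assumptions that only mimic the relevant behaviour when b is derived synchronously from a.

```javascript
// Minimal stand-ins (not the RxJS classes) to reproduce the double callback.
function createSubject() {
  const observers = [];
  return {
    subscribe: observer => { observers.push(observer); },
    next: value => observers.forEach(observer => observer(value))
  };
}

function map(source, fn) {
  return { subscribe: observer => source.subscribe(v => observer(fn(v))) };
}

// Calls `observer` with the latest pair whenever either input produces a value,
// once both inputs have produced at least one.
function combineLatest(left, right, observer) {
  let l, r, hasL = false, hasR = false;
  left.subscribe(v => { l = v; hasL = true; if (hasR) observer([l, r]); });
  right.subscribe(v => { r = v; hasR = true; if (hasL) observer([l, r]); });
}

const a = createSubject();
const b = map(a, v => v.toUpperCase());   // b depends on a, synchronously

const seen = [];
combineLatest(a, b, pair => seen.push(pair.join('')));

a.next('x');   // one callback: 'xX'
a.next('y');   // two callbacks: 'yX' (new a with stale b, the glitch), then 'yY'
```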


Source: (StackOverflow)

RxJS: Recursive list of observables and single observer

I've been having some trouble with a recursive chain of observables.

I am working with RxJS, which is currently in version 1.0.10621, and contains most basic Rx functionality, in conjunction with Rx for jQuery.

Let me introduce an example scenario for my problem: I am polling the Twitter search API (JSON response) for tweets/updates containing a certain keyword. The response also includes a "refresh_url", which one should use to generate the follow-up request. The response to that follow-up request will again contain a new refresh_url, and so on.

Rx.jQuery allows me to make the Twitter search API call an observable event, which produces one onNext and then completes. What I have tried so far is to have the onNext handler remember the refresh_url and use it in the onCompleted handler to produce both a new observable and corresponding observer for the next request. This way, one observable + observer pair follows the other indefinitely.

The problems with this approach are:

  1. The follow-up observable/observer are already alive when their predecessors have not yet been disposed of.

  2. I have to do lots of nasty bookkeeping to maintain a valid reference to the currently living observer, of which there can actually be two. (One in onCompleted and the other somewhere else in its life-cycle) This reference is, of course, needed to unsubscribe/dispose of the observer. An alternative to the bookkeeping would be to implement a side effect by the means of a "still running?"-boolean, as I have done in my example.

Example code:

running = true;
twitterUrl = "http://search.twitter.com/search.json";
twitterQuery = "?rpp=10&q=" + encodeURIComponent(text);
twitterMaxId = 0; //actually twitter ignores its since_id parameter

newTweetObserver = function () {
    return Rx.Observer.create(
        function (tweet) {
            if (tweet.id > twitterMaxId) {
                twitterMaxId = tweet.id;
                displayTweet(tweet);
            }
        }
    );
}

createTwitterObserver = function() {
    twitterObserver = Rx.Observer.create(
        function (response) {
            if (response.textStatus == "success") {
                var data = response.data;
                if (data.error == undefined) {
                    twitterQuery = data.refresh_url;
                    var tweetObservable;
                    tweetObservable = Rx.Observable.fromArray(data.results.reverse());
                    tweetObservable.subscribe(newTweetObserver());
                }
            }
        },
        function(error) { alert(error); },
        function () {
            //create and listen to new observer that includes a delay
            if (running) {
                twitterObservable = $.getJSONPAsObservable(twitterUrl, twitterQuery).delay(3000);
                twitterObservable.subscribe(createTwitterObserver());
            }
        }
    );
    return twitterObserver;
}
twitterObservable = $.getJSONPAsObservable(twitterUrl, twitterQuery);
twitterObservable.subscribe(createTwitterObserver());

Don't be fooled by the double layer of observables/observers from requests to tweets. My example concerns itself mainly with the first layer: requesting data from Twitter. If in solving this problem the second layer (converting responses into tweets) can become one with the first one, that would be fantastic, but I think that's a whole different thing. For now.

Erik Meijer pointed out the Expand operator to me (see example below), and suggested Join patterns as an alternative.

var ys = Observable.Expand(
    new[] { 0 }.ToObservable(),            // initial sequence
    i => i == 10
        ? Observable.Empty<int>()          // terminate
        : new[] { i + 1 }.ToObservable()   // recurse
);

ys.ToArray().Select(a => string.Join(",", a)).DumpLive();

This should be copy-pastable into LINQPad. It assumes singleton observables and produces one final observer.
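
For comparison, the recursion that Expand performs can be sketched in JavaScript over plain arrays. This is only an illustration of the idea, synchronous and without observables; `expand` here is a hypothetical helper, not an RxJS operator.

```javascript
// Array-based sketch of the expand idea: every value is emitted and then fed
// back into `selector`, whose results are expanded in turn until the selector
// returns an empty array.
function expand(seed, selector) {
  const output = [];
  const queue = seed.slice();          // values still waiting to be expanded
  while (queue.length > 0) {
    const value = queue.shift();
    output.push(value);                // emit the value itself
    queue.push(...selector(value));    // then schedule whatever it expands to
  }
  return output;
}

// Mirrors the C# example: start at 0, recurse with i + 1, stop at 10.
const ys = expand([0], i => (i === 10 ? [] : [i + 1]));
console.log(ys.join(','));
```

In the polling scenario, the selector would play the role of "issue the follow-up request for this response's refresh_url".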

So my question is: How can I do the expand trick nicest in RxJS?

EDIT:
The expand operator can probably be implemented as shown in this thread, but one would need generators (and I only have JS < 1.6).
Unfortunately, RxJS 2.0.20304-beta does not implement the Expand method.


Source: (StackOverflow)