EzDevInfo.com

monifu

Reactive Programming for Scala and Scala.js (monifu/monifu on GitHub)

Is Javascript event loop task queue overflow possible?

Is there a limit on task scheduling that an application should stay under in order to scale well?

Questions:

  1. Is there a measurable cost to calling setTimeout, say 0.1 ms of CPU time? It is certainly an order of magnitude cheaper than spawning a thread in other environments, but is there any cost at all?
  2. Is it better to avoid setTimeout for small tasks that take only 1-2 ms?
  3. Is there anything that doesn't cope well with scheduling? For instance, I noticed some sort of IndexedDB starvation for write locks when scheduling store retrieval and other operations.
  4. Can DOM operations be scheduled safely?

I'm asking because I started using Scala.js with Monifu, an Rx implementation that uses scheduling on a massive scale. Sometimes a single line of code submits around 5 tasks to the event loop's queue, so I'm wondering whether there is such a thing as a task queue overflow that would degrade performance. This matters especially when running test suites, where hundreds of tasks might be enqueued per second.

This leads to another question: is it possible to list the cases where one should use a RunNow/Trampoline scheduler and the cases where one should use a Queue/Async scheduler with Rx? I wonder about this every time I write something like obs.buffer(3).last.flatMap{..}, which itself schedules multiple tasks.
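
To make the trampoline side of that question concrete, here is a minimal sketch of what trampolined execution means, written against the Scala standard library only. TrampolineEC and TrampolineDemo are hypothetical names invented for this illustration, not Monifu's scheduler classes, and the sketch assumes a single thread (as on a JavaScript event loop). A task submitted while another task is running is put in a local queue and run right after the current one, on the same stack frame, instead of going through setTimeout or a thread pool.

```scala
import scala.collection.mutable
import scala.concurrent.ExecutionContext

// Hypothetical helper for illustration only (not Monifu's API).
// Runs tasks on the calling thread; tasks submitted while another task
// is running are queued and executed afterwards, not nested.
// Not thread-safe: assumes a single thread.
final class TrampolineEC extends ExecutionContext {
  private val queue = mutable.Queue.empty[Runnable]
  private var running = false

  def execute(r: Runnable): Unit = {
    queue.enqueue(r)
    if (!running) {
      running = true
      try {
        while (queue.nonEmpty) queue.dequeue().run()
      } finally {
        running = false
      }
    }
  }

  def reportFailure(t: Throwable): Unit = t.printStackTrace()
}

object TrampolineDemo {
  // Returns the order in which the log entries were written.
  def demo(): List[String] = {
    val ec = new TrampolineEC
    val log = mutable.ListBuffer.empty[String]
    ec.execute(new Runnable {
      def run(): Unit = {
        log += "outer start"
        // Submitted from inside a running task: deferred, not nested.
        ec.execute(new Runnable { def run(): Unit = log += "inner" })
        log += "outer end"
      }
    })
    log.toList
  }

  def main(args: Array[String]): Unit =
    println(demo().mkString(", ")) // prints "outer start, outer end, inner"
}
```

The trade-off this illustrates: a trampoline avoids the per-task cost of going through the event loop, but everything runs in one uninterrupted turn, so long chains of trampolined tasks keep the loop from servicing other work in between.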


Source: (StackOverflow)

Scala Rx Observable using Monifu

I'm trying to grasp the difference between a hot and a cold observable and trying out the Monifu library. My understanding is that the following code should result in only one of the subscribers getting the events emitted by the Observable, but that is not what happens!

scala> :paste
// Entering paste mode (ctrl-D to finish)

import monifu.reactive._
import scala.concurrent.duration._

import monifu.concurrent.Implicits.globalScheduler

val obs = Observable.interval(1.second).take(10)

val x = obs.foreach(a => println(s"from x ${a}"))
val y = obs.foreach(a => println(s"from y ${a}"))

// Exiting paste mode, now interpreting.

from x 0
from y 0
import monifu.reactive._
import scala.concurrent.duration._
import monifu.concurrent.Implicits.globalScheduler
obs: monifu.reactive.Observable[Long] = monifu.reactive.Observable$$anon$5@2c3c615d
x: Unit = ()
y: Unit = ()

scala> from x 1
from y 1
from x 2
from y 2
from x 3
from y 3
from x 4
from y 4
from x 5
from y 5
from x 6
from y 6
from x 7
from y 7
from x 8
from y 8
from x 9
from y 9

So, to me this looks like the Observable is publishing events to all interested subscribers?
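
One way to picture the output above, assuming Observable.interval is a cold observable (each subscription starts its own independent sequence), is the following library-free sketch. ColdSource and HotSource are hypothetical classes made up for this illustration and are not Monifu types; they are synchronous and leave out timers entirely.

```scala
import scala.collection.mutable

object ColdVsHot {
  // Cold (hypothetical model): every subscribe call starts a fresh,
  // independent run that produces 0 until n for that subscriber alone.
  final class ColdSource(n: Int) {
    def subscribe(onNext: Long => Unit): Unit =
      (0L until n.toLong).foreach(onNext)
  }

  // Hot (hypothetical model): a single shared stream; a subscriber only
  // sees the events emitted after it subscribed.
  final class HotSource {
    private var subscribers = Vector.empty[Long => Unit]
    def subscribe(onNext: Long => Unit): Unit = subscribers :+= onNext
    def emit(value: Long): Unit = subscribers.foreach(_(value))
  }

  def demo(): List[String] = {
    val log = mutable.ListBuffer.empty[String]

    val cold = new ColdSource(2)
    cold.subscribe(a => log += s"cold x $a")
    cold.subscribe(a => log += s"cold y $a") // y gets its own full sequence

    val hot = new HotSource
    hot.subscribe(a => log += s"hot x $a")
    hot.emit(0)                              // only x is subscribed yet
    hot.subscribe(a => log += s"hot y $a")
    hot.emit(1)                              // both x and y receive this one

    log.toList
  }

  def main(args: Array[String]): Unit = demo().foreach(println)
}
```

In this model both cold subscribers receive every element, which matches the transcript: x and y each print 0 through 9. Neither kind of source delivers an event to only one of several subscribers.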


Source: (StackOverflow)


Subject that emits events to subscribers in specific order with back-pressure

Imagine a pipe of subscribers: you emit an event into it and the event visits one subscriber after another.

Given a PublishSubject and x subscribers/observers, events are normally emitted to the observers in a specific order, but without waiting for each observer to return. Is it possible to get this flow instead:

  1. emit event to observerA
  2. after observerA returns, emit the event to observerB
  3. after observerB returns, emit the event to observerC

I'm using the RxScala and Monifu Rx implementations.

Monifu even has a back-pressure implementation:

def onNext(elem: T): Future[Ack]

I'd like to see "And Result was : Changed !!" printed out in this sample:

  val subject = PublishSubject[Int]()

  var result = "Not Changed"
  subject.subscribe { i =>
    Observable.timerOneTime(3.seconds, Continue).asFuture.map { x =>
      result = "Changed !!"
      x.get
    }
  }

  subject.subscribe { i =>
    Observable.timerOneTime(1.seconds, Continue).asFuture.map { x =>
      println("And Result was : " + result)
      x.get
    }
  }

  subject.onNext(1)

Is this possible in RxScala/RxJava or Monifu without extending Subject and overriding the onNext implementation? These classes are declared final anyway, so that would be rather a hack.
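
The desired flow can be sketched independently of any Rx library with plain Scala Futures. This is not a Subject implementation and uses no RxScala or Monifu API: Observer here is a hypothetical type alias standing in for an onNext that returns a Future, and emitInOrder is an invented helper that chains the observers so each one is called only after the previous one's Future has completed.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SequentialDelivery {
  // Hypothetical stand-in for an observer whose onNext signals completion
  // through a Future (similar in spirit to onNext(elem): Future[Ack]).
  type Observer[T] = T => Future[Unit]

  // Calls each observer only after the previous observer's Future completed.
  def emitInOrder[T](observers: List[Observer[T]], elem: T): Future[Unit] =
    observers.foldLeft(Future.successful(())) { (previous, observer) =>
      previous.flatMap(_ => observer(elem))
    }

  // Returns the value of `result` as seen by the second observer.
  def demo(): String = {
    @volatile var result = "Not Changed"
    @volatile var seen = ""

    // observerA is slow and changes the shared state.
    val observerA: Observer[Int] = _ =>
      Future { Thread.sleep(300); result = "Changed !!" }
    // observerB is fast and reads the shared state.
    val observerB: Observer[Int] = _ =>
      Future { seen = result }

    Await.result(emitInOrder(List(observerA, observerB), 1), 5.seconds)
    seen
  }

  def main(args: Array[String]): Unit =
    println("And Result was : " + demo()) // prints "And Result was : Changed !!"
}
```

Even though observerB is the faster of the two, it only runs once observerA has finished, so it observes the changed value. Wiring this ordering into a real Subject is a separate matter that depends on the library.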


Source: (StackOverflow)