ReactiveMongo
Non-blocking, Reactive MongoDB Driver for Scala
ReactiveMongo reactivemongo, reactive scala driver for mongodb
I have a project set up with playframework 2.2.0
and play2-reactivemongo 0.10.0-SNAPSHOT
. I'd like to query for few documents by their ids, in a fashion similar to this:
def usersCollection = db.collection[JSONCollection]("users")
val ids: List[String] = /* fetched from somewhere else */
val query = ??
val users = usersCollection.find(query).cursor[User].collect[List]()
As a query I tried:
Json.obj("_id" -> Json.obj("$in" -> ids)) // 1
Json.obj("_id.$oid" -> Json.obj("$in" -> ids)) // 2
Json.obj("_id" -> Json.obj("$oid" -> Json.obj("$in" -> ids))) // 3
for which first and second return empty lists and the third fails with error assertion 10068 invalid operator: $oid
.
Source: (StackOverflow)
I am integrating an application using ReactiveMongo with a legacy application.
As, I must maintain legacy application interfaces at some point I must block and/or transform my code into the specified interface types. I have that code distilled down to the example below.
Is there a better way than, getChunks, to consume all of an Enumerator with the output type being a List? What is the standard practice?
implicit def legacyAdapter[TInput,TResult]
(block: Future[Enumerator[TInput]])
(implicit translator : (TInput => TResult),
executionContext:ExecutionContext,
timeOut : Duration): List[TResult] = {
val iter = Iteratee.getChunks[TResult]
val exhaustFuture = block.flatMap{
enumy => { enumy.map(i => translator(i) ).run(iter) }
}
val r = Await.result(exhaustFuture , timeOut)
r
}
Source: (StackOverflow)
I want to update a JSON document in MongoDB like this:
{
"_id":{"$oid":"52dfc13ec20900c2093155cf"},
"email": "joe@domain.com",
"name": "joe",
"_version": 2
}
... and want to create a vermongo document like this on each update:
{
"_id { "_id":{"$oid":"52dfc13ec20900c2093155cf"}, "_version": 1},
"email": "joe@domain.com",
"name": "joe",
"_version": 1,
"_timestamp" : "2014-02-02T00:11:45.542"
}
I've tried a solution like this:
trait MyDao {
...
private val shadowCollection = ReactiveMongoPlugin.db.collection[JSONCollection](
collection.name + ".vermongo"
)
private def toVersioned(deleted: Boolean) = __.json.update(
(__ \ '_id).json.copyFrom((__ \ '_id \ '$oid).json.pickBranch) andThen
(__ \ '_id \ '_version).json.copyFrom((__ \ '_version).json.pick) andThen
// (__ \ '_version).json.put(if (deleted) JsString(s"deleted:$version") else JsNumber(version)) andThen
(__ \ '_timestamp).json.put(Json.toJson(LocalDateTime.now))
)
private def version(doc: JsValue, deleted: Boolean): Future[LastError] = {
shadowCollection.insert(doc.transform(toVersioned(deleted)).get)
}
}
The toVersioned
method has three problems:
Line 1: It doesn't create the multi-fields _id
Line 2: It crashes when I try to create _version
as the second field of _id
Line 3: (commented out) if parameter deleted
is true
, I'd like to mark the document as deleted by replacing "_version": 1
with "_version": "deleted:1"
; it is not clear to me how to handle the condition here.
Source: (StackOverflow)
Here is an simple JSON I want to write/read to/from MongoDB:
{
"id": "ff59ab34cc59ff59ab34cc59",
"name": "Joe",
"surname": "Cocker"
}
Before storing it in MongoDB, "ff59ab34cc59ff59ab34cc59"
has to be transformed into an ObjectID
and id
renamed to _id
... so given the following Reads
, how do I achieve that?
val personReads: Reads[JsObject] = (
(__ \ 'id).read[String] ~ // how do I rename id to _id AND transform "ff59ab34cc59ff59ab34cc59" to an ObjectID?
(__ \ 'name).read[String] ~
(__ \ 'surname).read[String]
) reduce
And of course, I also need the contrary for my Writes
, i.e. renaming _id
to id
and transforming an ObjectID
to plain text in the format "ff59ab34cc59ff59ab34cc59"
.
Source: (StackOverflow)
I'm getting a:
No Json deserializer found for type Option[reactivemongo.bson.BSONObjectID]. Try to implement an implicit Reads or Format for this type.
When trying to deserialise my review object.
Review:
case class Review(override var id: Option[BSONObjectID] = None,
override var createdAt: Option[DateTime] = None,
override var updatedAt: Option[DateTime] = None,
grade: Int,
text: String,
originIPAddress: Option[String],
status: ReviewStatus,
origin: ReviewOrigin,
rId: Option[Long],
userId: Option[Long]
) extends TemporalModel
object Review {
import mongo_models.enums.EnumFormat._
implicit val reviewStatusReads = enumReads(ReviewStatus)
implicit val reviewOriginReads = enumReads(ReviewOrigin)
implicit val reviewReads: Reads[Review] = (
(__ \ "id").read[Option[BSONObjectID]] and
(__ \ "createdAt").read[Option[DateTime]] and
(__ \ "updatedAt").read[Option[DateTime]] and
(__ \ "grade").read[Int] and
(__ \ "text").read[String] and
(__ \ "originIPAddress").read[Option[String]] and
(__ \ "status").read[ReviewStatus] and
(__ \ "origin").read[ReviewOrigin] and
(__ \ "rId").read[Option[Long]] and
(__ \ "userId").read[Option[Long]]
)(Review.apply _)
implicit val reviewWrites: Writes[Review] = (
(__ \ "id").write[Option[BSONObjectID]] and
(__ \ "createdAt").write[Option[DateTime]] and
(__ \ "updatedAt").write[Option[DateTime]] and
(__ \ "grade").write[Int] and
(__ \ "text").write[String] and
(__ \ "originIPAddress").write[Option[String]] and
(__ \ "status").write[ReviewStatus] and
(__ \ "origin").write[ReviewOrigin] and
(__ \ "rId").write[Option[Long]] and
(__ \ "userId").write[Option[Long]]
)(unlift(Review.unapply))
val form = Form(
mapping(
"id" -> optional(of[String] verifying pattern(
"""[a-fA-F0-9]{24}""".r,
"constraint.objectId",
"error.objectId")),
"creationDate" -> optional(of[Long]),
"updateDate" -> optional(of[Long]),
"grade" -> number,
"text" -> text(minLength = 30, maxLength = 5000),
"originIPAddress" -> optional(of[String]),
"status" -> text,
"origin" -> text,
"rId" -> optional(of[Long]),
"userId" -> optional(of[Long])
) {
(id, createdAt, updatedAt, grade, text, originIPAddress, status, origin, rId, userId) =>
Review(
id.map(new BSONObjectID(_)),
createdAt.map(new DateTime(_)),
updatedAt.map(new DateTime(_)),
grade,
text,
originIPAddress,
ReviewStatus.withName(status),
ReviewOrigin.withName(origin),
rId,
userId
)
} {
review => {
Some(
(review.id.map(_.stringify)),
review.createdAt.map(_.getMillis),
review.updatedAt.map(_.getMillis),
review.grade,
review.text,
review.originIPAddress,
review.status.toString,
review.origin.toString,
review.rId,
review.userId
)
}
}
)
}
Source: (StackOverflow)
I've started using Play and the Play-ReactiveMongo plugin and testing for a 404 response on a GET "document by id" scenario. Unfortunately instead of Play returning a 404 NotFound response I get this exception:
java.util.NoSuchElementException: JsError.get
at play.api.libs.json.JsError.get(JsResult.scala:11) ~[play_2.10.jar:2.1.1]
at play.api.libs.json.JsError.get(JsResult.scala:10) ~[play_2.10.jar:2.1.1]
at play.modules.reactivemongo.json.collection.JSONGenericHandlers$StructureBufferWriter$.write(jsoncollection.scala:44) ~[play2-reactivemongo_2.10-0.9.jar:0.9]
at play.modules.reactivemongo.json.collection.JSONGenericHandlers$StructureBufferWriter$.write(jsoncollection.scala:42) ~[play2-reactivemongo_2.10-0.9.jar:0.9]
at reactivemongo.api.collections.GenericQueryBuilder$class.reactivemongo$api$collections$GenericQueryBuilder$$write(genericcollection.scala:323) ~[reactivemongo_2.10-0.9.jar:0.9]
at reactivemongo.api.collections.GenericQueryBuilder$class.cursor(genericcollection.scala:333) ~[reactivemongo_2.10-0.9.jar:0.9]
The getById function below successfully returns a single document if the id parameter matches an existing document, but an exception on the line "one[JsValue]" if document not found.
Route file:
GET /items/:id controllers.ItemsController.getById(id: String)
Controller object:
object ItemsController extends Controller with MongoController {
def itemsCollection: JSONCollection = db.collection[JSONCollection]("items")
def getById(id: String) = Action {
Async {
val query = Json.obj("_id" -> Json.obj("$oid" ->id))
val futureItem = itemsCollection.
find(query).
one[JsValue]
futureItem.map {
case Some(item) => Ok(item)
case None => NotFound(Json.obj("message" -> "No such item"))
}
}
}
}
Maybe I missed something in the docs?
There is partial example documented here:
https://github.com/sgodbillon/reactivemongo-demo-app#simple-query
The mandubian coast-to-coast example handles BadRequest as well NotFound scenario, but code is maybe out of date as it doesn't use the newer looking find(...).one[...] semantics?
http://mandubian.com/2013/01/13/JSON-Coast-to-Coast/#action-get
Source: (StackOverflow)
Reading the documentation about the Play Framework and ReactiveMongo leads me to believe that ReactiveMongo works in such a way that it uses few threads and never blocks.
However, it seems that the communication from the Play application to the Mongo server would have to happen on some thread somewhere. How is this implemented? Links to the source code for Play, ReactiveMongo, Akka, etc. would also be very appreciated.
The Play Framework includes some documentation about this on this page about thread pools. It starts off:
Play framework is, from the bottom up, an asynchronous web framework. Streams are handled asynchronously using iteratees. Thread pools in Play are tuned to use fewer threads than in traditional web frameworks, since IO in play-core never blocks.
It then talks a little bit about ReactiveMongo:
The most common place that a typical Play application will block is when it’s talking to a database. Unfortunately, none of the major databases provide asynchronous database drivers for the JVM, so for most databases, your only option is to using blocking IO. A notable exception to this is ReactiveMongo, a driver for MongoDB that uses Play’s Iteratee library to talk to MongoDB.
Following is a note about using Futures:
Note that you may be tempted to therefore wrap your blocking code in Futures. This does not make it non blocking, it just means the blocking will happen in a different thread. You still need to make sure that the thread pool that you are using there has enough threads to handle the blocking.
There is a similar note in the Play documentation on the page Handling Asynchronous Results:
You can’t magically turn synchronous IO into asynchronous by wrapping it in a Future. If you can’t change the application’s architecture to avoid blocking operations, at some point that operation will have to be executed, and that thread is going to block. So in addition to enclosing the operation in a Future, it’s necessary to configure it to run in a separate execution context that has been configured with enough threads to deal with the expected concurrency.
The documentation seems to be saying that ReactiveMongo is non-blocking, so you don't have to worry about it eating up a lot of the threads in your thread pool. But ReactiveMongo has to communicate with the Mongo server somewhere.
How is this communication implemented so that Mongo doesn't use up threads from Play's default thread pool?
Once again, links to the specific files in Play, ReactiveMongo, Akka, etc, would be very appreciated.
Source: (StackOverflow)
I'm using Scala, Play Framework 2.1.x, and reactivemongo driver.
I have an api call :
def getStuff(userId: String) = Action(implicit request => {
Async {
UserDao().getStuffOf(userId = userId).toList() map {
stuffLst => Ok(stuffLst)
}
}
})
It works fine 99% of the time but it may fail sometimes (doesn't matter why, that's not the issue).
I wanted to recover in a case of an error so i added:
recover { case _ => BadRequest("")}
But this does not recover me from errors.
I tried the same concept on the scala console and it worked:
import scala.concurrent._
import scala.concurrent.duration._
import ExecutionContext.Implicits.global
var f = future { throw new Exception("") } map {_ => 2} recover { case _ => 1}
Await.result(f, 1 nanos)
This returns 1 as expected.
I currently wrapped the Async with:
try{
Async {...}
} catch {
case _ => BadRequest("")
}
And this catches the errors.
I went over some Scala's Future docs on the net and I'm baffled why recover did not work for me.
Does anyone know why? What do I miss to sort it out?
Source: (StackOverflow)
I have the following code that works in a console app when referencing "org.reactivemongo" %% "play2-reactivemongo" % "0.10.5.0.akka23"
when I update the reference to "org.reactivemongo" % "play2-reactivemongo_2.11" % "0.11.0.play23-M3"
I get:
No Json serializer as JsObject found for type play.api.libs.json.JsObject. Try to implement an implicit OWrites or OFormat for this type.
import org.joda.time.DateTime
import reactivemongo.bson.BSONObjectID
import play.modules.reactivemongo.json.BSONFormats._
case class GoogleToken
(
id: Option[BSONObjectID],
name: String,
emailAddress: String,
refreshToken: String,
expires: DateTime
)
object GoogleToken {
import play.api.libs.json.Json
// Generates Writes and Reads
implicit val googleTokenFormat = Json.format[GoogleToken]
}
and then
val collection = db.collectionJSONCollection
val query = Json.obj()
val cursor = collection.find(query).
cursor[GoogleToken](ReadPreference.nearest).
collect[List]()
What am I doing wrong?
Source: (StackOverflow)
I've started to learn reactivemongo by creating a simple Scala project. I started with SBT. Here is my build.sbt
file:
name := "mongo-test"
version := "1.0"
scalaVersion := "2.10.2"
libraryDependencies ++= Seq(
"org.reactivemongo" %% "reactivemongo" % "0.9"
)
But I get the error when executing the compile command:
[info] Resolving play#play-iteratees_2.10;2.1.0 ...
[warn] module not found: play#play-iteratees_2.10;2.1.0
[warn] ==== local: tried
[warn] /home/amir/.ivy2/local/play/play-iteratees_2.10/2.1.0/ivys/ivy.xml
[warn] ==== public: tried
[warn] http://repo1.maven.org/maven2/play/play-iteratees_2.10/2.1.0/play-iteratees_2.10-2.1.0.pom
[info] Resolving org.fusesource.jansi#jansi;1.4 ...
[warn] ::::::::::::::::::::::::::::::::::::::::::::::
[warn] :: UNRESOLVED DEPENDENCIES ::
[warn] ::::::::::::::::::::::::::::::::::::::::::::::
[warn] :: play#play-iteratees_2.10;2.1.0: not found
[warn] ::::::::::::::::::::::::::::::::::::::::::::::
UPDATE:
My SBT version is 0.13.0.
Source: (StackOverflow)
I am trying to find the best approach to sharing the same pool of connection between actors withing the cluster workers. I have the following structure:
Master Actor -> Worker Actors(can be up to 100 or more) -> MongoDB
Between workers and MongoDB I want to put reactivemongo, however I am not sure how exactly to provide connection pool sharing between all actors.
According to reactivemongo documentation:
A MongoDriver instance manages an actor system; a connection manages a pool of connections. In general, MongoDriver or create a MongoConnection are never instantiated more than once. You can provide a list of one ore more servers; the driver will guess if it's a standalone server or a replica set configuration. Even with one replica node, the driver will probe for other nodes and add them automatically.
Should I just create it in Master actor and then bundle with each message?
So, this would be in Master actor:
val driver = new MongoDriver
val connection = driver.connection(List("localhost"))
And then I pass connection to actors in a message. Or should I query a connection in each Work Actor and pass just driver in a message?
Any help is very appreciated.
Thanks.
Source: (StackOverflow)
I have a mongo entry where one field is a list. If entry doesn't exist, I want to add new one. If it exists, I want to append a new element to the list.
At the end I want to return an Ok to the client, but only after the operation was completed successfully. Not a strict requirement, but it's what makes most sense for the user, in my opinion.
This is what I have currently - it works, but on update it overwrites the old list, instead of appending a new element.
def myMethod(value:String, value2:String) = Action {
Async {
val myElement = Json.obj(
"key" -> value2
)
val myDBEntry = Json.obj(
"key" -> value,
"list" -> List(myElement)
)
collection.update(Json.obj("key" -> value), myDBEntry, upsert = true).map(lastError =>
Ok("Mongo LastError: %s".format(lastError)))
}
}
In order to check if the list exists and append the element / create a new list, I started with something like (this replaces the collection.update
block in previous code):
val futureResult:Future[Option[JsObject]] = collection.find(Json.obj("key" -> value)).one[JsObject]
futureResult.map { result =>
if (result.isEmpty) {
collection.insert(myDBEntry).map(lastError =>
Ok("Mongo LastError: %s".format(lastError)))
} else {
//this not the correct command yet - but compiler already fails because method is not returning correct future
collection.update(Json.obj("key" -> value), myDBEntry, upsert = true).map(lastError =>
Ok("Mongo LastError: %s".format(lastError)))
}
}
But the compiler doesn't seem to like this nesting: "Type mismatch, expected: Future[Result], actual: Future:[Future[SimpleResult[Nothing]]]"
Anyways I feel this way is a bit awkward, and there must be a more elegant way to solve this, but I'm new to Futures and ReactiveMongo and have no idea. How do I solve it?
Edit:
I also found this post But I think that's returning the response before the DB operation is finished and I don't want that.
Source: (StackOverflow)
I was wondering if there is a way to delete elements from the Play cache using a regex.
I'm using play 2.2.x and I'm storing elements in the cache following this pattern:
collectionName.identifier
Is there a way to expire the caché using a regular expression to match the key, like:
collectionName.[a-zA-Z0-9]+
The reason I want to do that is because sometimes I will update elements in db matching some fields, and I can't really know which elements were updated.
If there is a way in ReactiveMongo to get the updated object identifiers, that would help me as well.
Thanks for any help.
Source: (StackOverflow)
i'm writing a play 2.3 application using secure social and reactivemongo library, with scala.
Now i'm trying to implement the UserService[T] trait but i'm getting compile errors on the updatePasswordInfo method.
This is the method:
def updatePasswordInfo(user: LoginUser,info: PasswordInfo): scala.concurrent.Future[Option[BasicProfile]] = {
implicit val passwordInfoFormat = Json.format[PasswordInfo]
//the document query
val query = Json.obj("providerId" -> user.providerId,
"userId" -> user.userId
)
//search if the user exists
val futureUser: Future[Option[LoginUser]] = UserServiceLogin.find(query).one
futureUser map {
case Some(x) => val newPassword = Json.obj("passswordInfo" -> info)// the new password
UserServiceLogin.update(query, newPassword) //update the document
val newDocument: Future[Option[LoginUser]] = UserServiceLogin.find(query).one
newDocument map {
case Some(x) => x
case None => None
} //return the new LoginUser
case None => None
}
}
And this is the compiler error:
/Users/alberto/git/recommendation-system/app/security/UserService.scala:203: type mismatch;
[error] found : scala.concurrent.Future[Product with Serializable]
[error] required: Option[securesocial.core.BasicProfile]
[error] newDocument map {
What's wrong?
Source: (StackOverflow)
I am trying to save an attachment using reactivemongo in Play 2.1 using the following code:
def upload = Action(parse.multipartFormData) { request =>
request.body.file("carPicture").map { picture =>
val filename = picture.filename
val contentType = picture.contentType
val gridFS = new GridFS(db, "attachments")
val fileToSave = DefaultFileToSave(filename, contentType)
val futureResult: Future[ReadFile[BSONValue]] = gridFS.writeFromInputStream(fileToSave, new FileInputStream(new File(filename)))
Ok(Json.obj("e" -> 0))
}.getOrElse {
Redirect(routes.Application.index).flashing(
"error" -> "Missing file"
)
}
}
I am getting the following error:
could not find implicit value for parameter readFileReader: reactivemongo.bson.BSONDocumentReader[reactivemongo.api.gridfs.ReadFile[reactivemongo.bson.BSONValue]]
[error] val futureResult: Future[ReadFile[BSONValue]] = gridFS.writeFromInputStream(fileToSave, new FileInputStream(new File(filename)))
What am I missing?
Thanks,
GA
Source: (StackOverflow)