The aggregation pipeline is a framework for data aggregation, modeled on the concept of data processing pipelines.
The examples below require a `restaurants` collection in the `test` database. To create and populate the collection, follow the directions in GitHub.
Include the following import statements:
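```scala
import org.mongodb.scala._
// The helper objects themselves, since the examples call them by name (e.g. Aggregates.group)
import org.mongodb.scala.model.{Accumulators, Aggregates, Filters, Projections}
// Their members, for unqualified use
import org.mongodb.scala.model.Aggregates._
import org.mongodb.scala.model.Accumulators._
import org.mongodb.scala.model.Filters._
import org.mongodb.scala.model.Projections._
```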
This guide uses the `Observable` implicits as covered in the Quick Start Primer.
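The examples call `printResults()`, which comes from the Quick Start helpers rather than from the driver itself. If you are not using those helpers, the following is a minimal sketch of an equivalent implicit class. The names `PrintHelpers`, `PrintableObservable`, and `results` are hypothetical, and blocking with `Await` is only appropriate for short examples like these, not for production code.

```scala
import org.mongodb.scala._
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical stand-in for the Quick Start helpers; not part of the driver.
object PrintHelpers {
  implicit class PrintableObservable[T](val observable: Observable[T]) {
    // Blocks until the observable completes, then returns every emitted result.
    def results(timeout: Duration = 10.seconds): Seq[T] =
      Await.result(observable.toFuture(), timeout)

    // Prints each result on its own line.
    def printResults(): Unit = results().foreach(println)
  }
}
```

Import `PrintHelpers._` in place of the Quick Start helpers, not alongside them; importing both would make `printResults()` ambiguous.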
Connect to a MongoDB Deployment
Connect to a MongoDB deployment, then declare and define a `MongoDatabase` and a `MongoCollection` instance.
For example, include the following code to connect to a standalone MongoDB deployment running on `localhost` on port `27017`, and define `database` to refer to the `test` database and `collection` to refer to the `restaurants` collection:
```scala
// With no arguments, MongoClient() connects to localhost:27017
val mongoClient: MongoClient = MongoClient()
val database: MongoDatabase = mongoClient.getDatabase("test")
val collection: MongoCollection[Document] = database.getCollection("restaurants")
```
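The no-argument `MongoClient()` is shorthand for the connection string `mongodb://localhost:27017`. As a sketch, the same setup with an explicit connection string looks like this; change the URI to point at a different host, port, or replica set. The wrapper object `ConnectWithUri` is hypothetical and only groups the values for illustration.

```scala
import org.mongodb.scala._

object ConnectWithUri {
  // Equivalent to MongoClient(), but with the target deployment spelled out
  val mongoClient: MongoClient = MongoClient("mongodb://localhost:27017")
  val database: MongoDatabase = mongoClient.getDatabase("test")
  val collection: MongoCollection[Document] = database.getCollection("restaurants")
}
```

Creating the client does not run any query by itself, so this setup is cheap; operations contact the server only when an `Observable` is subscribed to.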
For additional information on connecting to MongoDB, see Connect to MongoDB.
Perform Aggregation
To perform aggregation, pass a list of aggregation stages to the `MongoCollection.aggregate()` method. The Scala driver provides the `Aggregates` helper class, which contains builders for aggregation stages.
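In the following example, the aggregation pipeline first uses a `$match` stage to filter for documents whose `categories` array field contains the element `Bakery`. The example uses `Aggregates.filter` to build the `$match` stage (the Scala driver names this builder `filter` because `match` is a Scala keyword).
Then, the pipeline uses a `$group` stage to group the matching documents by the `stars` field, accumulating a count of documents for each distinct value of `stars`. The example uses `Aggregates.group` to build the `$group` stage and `Accumulators.sum` to build the accumulator expression. For accumulator expressions used within the `$group` stage, the Scala driver provides the `Accumulators` helper class.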
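```scala
collection.aggregate(Seq(
  // $match: keep only documents whose categories array contains "Bakery"
  Aggregates.filter(Filters.equal("categories", "Bakery")),
  // $group: group by stars and count the documents in each group
  Aggregates.group("$stars", Accumulators.sum("count", 1))
)).printResults()
```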
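Each builder returns a `Bson` value that the driver renders into an ordinary stage document before sending the pipeline to the server. To see exactly what those stages look like, you can render them yourself. The sketch below needs no running deployment and uses the driver's default codec registry; the object name `InspectPipeline` and the helper `render` are hypothetical.

```scala
import com.mongodb.MongoClientSettings
import org.bson.BsonDocument
import org.mongodb.scala.bson.conversions.Bson
import org.mongodb.scala.model.{Accumulators, Aggregates, Filters}

object InspectPipeline {
  // The same two stages as the example above
  val pipeline: Seq[Bson] = Seq(
    Aggregates.filter(Filters.equal("categories", "Bakery")),
    Aggregates.group("$stars", Accumulators.sum("count", 1))
  )

  // Encodes one stage into a BsonDocument using the default codec registry
  def render(stage: Bson): BsonDocument =
    stage.toBsonDocument(classOf[BsonDocument], MongoClientSettings.getDefaultCodecRegistry)

  def main(args: Array[String]): Unit =
    pipeline.map(render).foreach(doc => println(doc.toJson))
}
```

Running `main` prints the `$match` and `$group` stages as JSON, which is handy when comparing builder code against a pipeline written for the MongoDB shell.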
Use Aggregation Expressions
In the following example, the aggregation pipeline uses a `$project` stage to return only the `name` field and the calculated field `firstCategory`, whose value is the first element in the `categories` array. The example uses `Aggregates.project` and various `Projections` methods to build the `$project` stage.
```scala
collection.aggregate(
  Seq(
    Aggregates.project(
      Projections.fields(
        Projections.excludeId(),
        Projections.include("name"),
        // firstCategory = element 0 of the categories array, via $arrayElemAt
        Projections.computed(
          "firstCategory",
          Document("""{ "$arrayElemAt": ["$categories", 0] }""")
        )
      )
    )
  )
).printResults()
```
Explain an Aggregation
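To explain an aggregation pipeline, call `explain()` on the `AggregateObservable` that `aggregate()` returns:
```scala
collection.aggregate(Seq(
  Aggregates.filter(Filters.equal("categories", "Bakery")),
  Aggregates.group("$stars", Accumulators.sum("count", 1))
))
  .explain() // emits the explain output as a Document
  .printResults()
```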
The driver supports explaining aggregation pipelines on MongoDB 3.6 and later.