Aggregation Framework
The aggregation pipeline is a framework for data aggregation, modeled on the concept of data processing pipelines.
Prerequisites
The example below requires a restaurants collection in the test database. To create and populate the collection, follow the directions in github.
Include the following import statements:
import org.mongodb.scala._
import org.mongodb.scala.model.Aggregates._
import org.mongodb.scala.model.Accumulators._
import org.mongodb.scala.model.Filters._
import org.mongodb.scala.model.Projections._
Important
This guide uses the Observable implicits as covered in the Quick Start Primer.
Connect to a MongoDB Deployment
Connect to a MongoDB deployment, then declare and define a MongoDatabase instance and a MongoCollection instance.
For example, include the following code to connect to a standalone MongoDB deployment running on localhost on port 27017 and define database to refer to the test database and collection to refer to the restaurants collection.
val mongoClient: MongoClient = MongoClient()
val database: MongoDatabase = mongoClient.getDatabase("test")
val collection: MongoCollection[Document] = database.getCollection("restaurants")
For additional information on connecting to MongoDB, see Connect to MongoDB.
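The same connection can also be written with an explicit connection string, which makes the host and port visible in the code. The following is a minimal sketch, assuming a standalone server on localhost:27017; the name uri is introduced here for illustration only. Creating the client does not run any command against the server.

```scala
import org.mongodb.scala._

// Assumption: a standalone MongoDB server is listening on localhost:27017.
val uri: String = "mongodb://localhost:27017"

// Create the client from the connection string, then get handles
// to the test database and its restaurants collection.
val mongoClient: MongoClient = MongoClient(uri)
val database: MongoDatabase = mongoClient.getDatabase("test")
val collection: MongoCollection[Document] = database.getCollection("restaurants")
```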
Perform Aggregation
To perform aggregation, pass a list of aggregation stages to the MongoCollection.aggregate() method.
The Scala driver provides the Aggregates helper class that contains builders for aggregation stages.
In the following example, the aggregation pipeline:
- First uses a $match stage to filter for documents whose categories array field contains the element Bakery. The example uses Aggregates.filter to build the $match stage.
- Then uses a $group stage to group the matching documents by the stars field, accumulating a count of documents for each distinct value of stars. The example uses Aggregates.group to build the $group stage and Accumulators.sum to build the accumulator expression. For accumulator expressions used within the $group stage, the Scala driver provides the Accumulators helper class.
// Note: this code example uses a custom implicit helper referenced in the Quick Start Primer
collection.aggregate(Seq(
  Aggregates.filter(Filters.equal("categories", "Bakery")),
  Aggregates.group("$stars", Accumulators.sum("count", 1))
)).printResults()
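The printResults() helper used above is a custom implicit from the Quick Start Primer and is not part of the driver itself. As a rough sketch of an alternative, the same pipeline can be run by converting the observable to a Future and blocking on it. The function name countBakeriesByStars and the 10 second timeout are assumptions made for illustration, and blocking with Await is only suitable for examples and scripts.

```scala
import org.mongodb.scala._
import org.mongodb.scala.bson.conversions.Bson
import org.mongodb.scala.model.{Accumulators, Aggregates, Filters}
import scala.concurrent.Await
import scala.concurrent.duration._

// The $match and $group stages from the example above, kept as a reusable value.
val pipeline: Seq[Bson] = Seq(
  Aggregates.filter(Filters.equal("categories", "Bakery")),
  Aggregates.group("$stars", Accumulators.sum("count", 1))
)

// Hypothetical helper: runs the pipeline and waits for all result documents.
// The timeout value is an arbitrary choice for this sketch.
def countBakeriesByStars(collection: MongoCollection[Document]): Seq[Document] =
  Await.result(collection.aggregate(pipeline).toFuture(), 10.seconds)

// Usage, given the collection defined earlier and a reachable server:
// countBakeriesByStars(collection).foreach(doc => println(doc.toJson()))
```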
Use Aggregation Expressions
For $group accumulator expressions, the Scala driver provides the Accumulators helper class. For other aggregation expressions, manually build the expression Document.
In the following example, the aggregation pipeline uses a $project stage to return only the name field and the calculated field firstCategory, whose value is the first element in the categories array. The example uses Aggregates.project and various Projections methods to build the $project stage.
// Note: this code example uses a custom implicit helper referenced in the Quick Start Primer
collection.aggregate(
  Seq(
    Aggregates.project(
      Projections.fields(
        Projections.excludeId(),
        Projections.include("name"),
        Projections.computed(
          "firstCategory",
          Document("$arrayElemAt" -> Seq("$categories", 0))
        )
      )
    )
  )
).printResults()
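An expression Document can also be parsed from a JSON string, which may be easier to read when the expression is copied from the MongoDB shell. The sketch below builds the same $project stage that way; the names firstCategoryExpr and projectStage are introduced here for illustration.

```scala
import org.mongodb.scala._
import org.mongodb.scala.bson.conversions.Bson
import org.mongodb.scala.model.{Aggregates, Projections}

// Parse the $arrayElemAt expression from JSON instead of building it
// from key/value pairs. The string is not interpolated, so "$" is literal.
val firstCategoryExpr: Document =
  Document("""{ "$arrayElemAt": ["$categories", 0] }""")

// Same $project stage as above: drop _id, keep name, add firstCategory.
val projectStage: Bson = Aggregates.project(
  Projections.fields(
    Projections.excludeId(),
    Projections.include("name"),
    Projections.computed("firstCategory", firstCategoryExpr)
  )
)
```

The resulting projectStage can be passed to collection.aggregate(Seq(projectStage)) in the same way as the stage built inline.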
Explain an Aggregation
To explain an aggregation pipeline, call the AggregateObservable.explain() method:
// Note: this code example uses a custom implicit helper referenced in the Quick Start Primer
collection.aggregate(
  Seq(
    Aggregates.filter(Filters.eq("categories", "Bakery")),
    Aggregates.group("$stars", Accumulators.sum("count", 1))
  )
).explain()
  .printResults()
The driver supports explain of aggregation pipelines starting with MongoDB 3.6.
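The explain output can also be requested at a specific verbosity level. The following sketch assumes a driver version in which AggregateObservable.explain accepts a com.mongodb.ExplainVerbosity argument; the function name explainBakeryPipeline and the timeout are hypothetical, and the result is read by blocking on a Future rather than through the printResults() helper.

```scala
import com.mongodb.ExplainVerbosity
import org.mongodb.scala._
import org.mongodb.scala.bson.conversions.Bson
import org.mongodb.scala.model.{Accumulators, Aggregates, Filters}
import scala.concurrent.Await
import scala.concurrent.duration._

// The pipeline to explain: the $match and $group stages used earlier.
val explainedPipeline: Seq[Bson] = Seq(
  Aggregates.filter(Filters.equal("categories", "Bakery")),
  Aggregates.group("$stars", Accumulators.sum("count", 1))
)

// Hypothetical helper: asks the server for the query planner output only.
// Assumption: explain(verbosity) is available in the driver version in use.
def explainBakeryPipeline(collection: MongoCollection[Document]): Document =
  Await.result(
    collection
      .aggregate(explainedPipeline)
      .explain(ExplainVerbosity.QUERY_PLANNER)
      .toFuture(),
    10.seconds
  )

// Usage, given the collection defined earlier and a reachable server:
// println(explainBakeryPipeline(collection).toJson())
```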