Aggregation Framework

The aggregation pipeline is a framework for data aggregation, modeled on the concept of data processing pipelines.

Prerequisites

  • The examples below require a restaurants collection in the test database. To create and populate the collection, follow the directions on GitHub.

  • Include the following import statements:

     import org.mongodb.scala._
    
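     // The examples refer to the helper objects by name (for example, Aggregates.filter),
     // so import the objects themselves rather than only their members.
     import org.mongodb.scala.model.Accumulators
     import org.mongodb.scala.model.Aggregates
     import org.mongodb.scala.model.Filters
     import org.mongodb.scala.model.Projections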
    
Important

This guide uses the Observable implicits as covered in the Quick Start Primer.
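
The printResults() calls in the examples below come from that helper, not from the driver itself. For readers without the primer at hand, the following is a minimal sketch of what such a helper might look like. The names Helpers and BlockingObservable and the 10-second timeout are assumptions made for illustration; the primer's actual helper may differ.

```scala
import scala.concurrent.Await
import scala.concurrent.duration._

import org.mongodb.scala._

object Helpers {
  // Assumed timeout for blocking on results; adjust to suit the deployment.
  private val timeout = 10.seconds

  // Hypothetical implicit wrapper that adds blocking helpers to any Observable.
  implicit class BlockingObservable[C](observable: Observable[C]) {

    // Subscribes to the observable and blocks until all results have arrived.
    def results(): Seq[C] = Await.result(observable.toFuture(), timeout)

    // Prints each result on its own line, rendering documents as JSON.
    def printResults(initial: String = ""): Unit = {
      if (initial.nonEmpty) print(initial)
      results().foreach {
        case doc: Document => println(doc.toJson())
        case other         => println(other)
      }
    }
  }
}
```

With import Helpers._ in scope, any Observable returned by the driver gains results() and printResults(). Blocking like this is convenient in short examples but is generally avoided in production code.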

Connect to a MongoDB Deployment

Connect to a MongoDB deployment, then declare and define a MongoDatabase instance and a MongoCollection instance.

For example, the following code connects to a standalone MongoDB deployment running on localhost on port 27017, then defines database to refer to the test database and collection to refer to the restaurants collection.

val mongoClient: MongoClient = MongoClient()
val database: MongoDatabase = mongoClient.getDatabase("test")
val collection: MongoCollection[Document] = database.getCollection("restaurants")

For additional information on connecting to MongoDB, see Connect to MongoDB.
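
Calling MongoClient() with no arguments relies on the driver's default of localhost on port 27017. As a sketch, the same connection can be written with an explicit connection string, which is easier to adapt to another host or port. The URI below is only the local default spelled out.

```scala
import org.mongodb.scala._

// Same deployment as MongoClient(), with the host and port written out.
val mongoClient: MongoClient = MongoClient("mongodb://localhost:27017")
val database: MongoDatabase = mongoClient.getDatabase("test")
val collection: MongoCollection[Document] = database.getCollection("restaurants")

// ... run operations against the collection ...

// Release the client's resources once the application no longer needs it.
mongoClient.close()
```

The client connects lazily, so creating these values does not by itself contact the server.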

Perform Aggregation

To perform aggregation, pass a list of aggregation stages to the MongoCollection.aggregate() method. The Scala driver provides the Aggregates helper class that contains builders for aggregation stages.

In the following example, the aggregation pipeline

  • First uses a $match stage to filter for documents whose categories array field contains the element Bakery. The example uses Aggregates.filter to build the $match stage.

  • Then uses a $group stage to group the matching documents by the stars field, accumulating a count of documents for each distinct value of stars. The example uses Aggregates.group to build the $group stage and Accumulators.sum to build the accumulator expression. The Scala driver provides the Accumulators helper class for the accumulator expressions used within a $group stage.

    // Note: this code example uses a custom implicit helper referenced in the Quick Start Primer
    collection.aggregate(Seq(
      Aggregates.filter(Filters.equal("categories", "Bakery")),
      Aggregates.group("$stars", Accumulators.sum("count", 1))
    )).printResults()
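
To make clear what the builders stand for, the sketch below writes the same two stages as plain Document values that mirror the shell syntax. It is intended as an equivalent formulation for illustration, not as the form the driver produces internally. The names matchStage, groupStage and pipeline are illustrative.

```scala
import org.mongodb.scala._

// $match: keep documents whose categories array contains "Bakery".
val matchStage = Document("$match" -> Document("categories" -> "Bakery"))

// $group: one output document per distinct stars value, with a running count.
val groupStage = Document(
  "$group" -> Document("_id" -> "$stars", "count" -> Document("$sum" -> 1))
)

val pipeline: Seq[Document] = Seq(matchStage, groupStage)

// Print each stage as JSON to inspect it without contacting the server.
pipeline.foreach(stage => println(stage.toJson()))
```

Because Document is a Bson type, pipeline can be passed to collection.aggregate() in place of the builder-based sequence.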
    

Use Aggregation Expressions

For $group accumulator expressions, the Scala driver provides the Accumulators helper class. For other aggregation expressions, build the expression Document manually.

In the following example, the aggregation pipeline uses a $project stage to return only the name field and the calculated field firstCategory whose value is the first element in the categories array. The example uses Aggregates.project and various Projections methods to build the $project stage.

// Note: this code example uses a custom implicit helper referenced in the Quick Start Primer
collection.aggregate(
  Seq(
    Aggregates.project(
      Projections.fields(
        Projections.excludeId(),
        Projections.include("name"),
        Projections.computed(
          "firstCategory",
          Document("$arrayElemAt" -> Seq("$categories", 0))
        )
      )
    )
  )
).printResults()
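
An expression document can also be parsed from a JSON string, which some readers find closer to the shell syntax. The sketch below builds the same $project stage that way. The names firstCategoryExpr and projectStage are illustrative.

```scala
import org.mongodb.scala._
import org.mongodb.scala.model.{Aggregates, Projections}

// Parse the $arrayElemAt expression from JSON instead of building it from pairs.
val firstCategoryExpr = Document("""{ "$arrayElemAt": ["$categories", 0] }""")

// Same $project stage as above: drop _id, keep name, add the computed field.
val projectStage = Aggregates.project(
  Projections.fields(
    Projections.excludeId(),
    Projections.include("name"),
    Projections.computed("firstCategory", firstCategoryExpr)
  )
)
```

The stage can then be passed to collection.aggregate(Seq(projectStage)).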

Explain an Aggregation

To explain an aggregation pipeline, call the AggregateObservable.explain() method:

// Note: this code example uses a custom implicit helper referenced in the Quick Start Primer
collection.aggregate(
  Seq(
    Aggregates.filter(Filters.equal("categories", "Bakery")),
    Aggregates.group("$stars", Accumulators.sum("count", 1))
  )
).explain().printResults()

The driver supports explaining aggregation pipelines on MongoDB 3.6 and later.
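
The call above uses the server's default verbosity. The following sketch assumes a driver version whose explain() also has an overload accepting com.mongodb.ExplainVerbosity; check the API documentation for the version in use before relying on it. The name explained is illustrative.

```scala
import com.mongodb.ExplainVerbosity
import org.mongodb.scala._
import org.mongodb.scala.model.{Accumulators, Aggregates, Filters}

val collection: MongoCollection[Document] =
  MongoClient().getDatabase("test").getCollection("restaurants")

// Assumption: this driver version provides explain(verbosity).
// Nothing is sent to the server until the observable is subscribed to.
val explained: SingleObservable[Document] = collection
  .aggregate(Seq(
    Aggregates.filter(Filters.equal("categories", "Bakery")),
    Aggregates.group("$stars", Accumulators.sum("count", 1))
  ))
  .explain(ExplainVerbosity.EXECUTION_STATS)
```

Subscribing to explained, for example through the printResults() helper, returns the explain output as a single document.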