Aggregates

The Aggregates class provides static factory methods that build aggregation pipeline stages. Each method returns an instance of the Bson type, which can in turn be passed to the aggregate method of MongoCollection.

For brevity, you may choose to import the methods of the Aggregates class statically:

import static com.mongodb.client.model.Aggregates.*;

All the examples below assume this static import.

Match

The $match pipeline stage passes all documents matching the specified filter to the next stage. Though the filter can be an instance of any class that implements Bson, it’s convenient to combine with use of the Filters class. In the example below, it’s assumed that the eq method of the Filters class has been statically imported.

This example creates a pipeline stage that matches all documents where the author field is equal to "Dave":

match(eq("author", "Dave"))

Project

The $project pipeline stage passes the projected fields of all documents to the next stage. Though the projection can be an instance of any class that implements Bson, it’s convenient to combine with use of the Projections class. In the example below, it’s assumed that the include, excludeId, and fields methods of the Projections class have been statically imported.

This example creates a pipeline stage that excludes the _id field but includes the title and author fields:

project(fields(include("title", "author"), excludeId()))

Projecting Computed Fields

The $project stage can project computed fields as well.

This example simply projects the qty field into a new field called quantity. In other words, it renames the field:

project(computed("quantity", "$qty"))

Sample

The $sample pipeline stage randomly select N documents from its input. This example creates a pipeline stage that randomly selects 5 documents from the collection:

sample(5)

Sort

The $sort pipeline stage passes all documents to the next stage, sorted by the specified sort criteria. Though the sort criteria can be an instance of any class that implements Bson, it’s convenient to combine with use of the Sorts class. In the example below, it’s assumed that the descending, ascending, and orderBy methods of the Sorts class have been statically imported.

This example creates a pipeline stage that sorts in descending order according to the value of the age field and then in ascending order according to the value of the posts field:

sort(orderBy(descending("age"), ascending("posts")))

Skip

The $skip pipeline stage skips over the specified number of documents that pass into the stage and passes the remaining documents to the next stage.

This example skips the first 5 documents:

skip(5)

Limit

The $limit pipeline stage limits the number of documents passed to the next stage.

This example limits the number of documents to 10:

limit(10)

Lookup

Starting in 3.2, MongoDB provides a new $lookup pipeline stage that performs a left outer join with another collection to filter in documents from the joined collection for processing.

This example performs a left outer join on the fromCollection collection, joining the local field to the from field and outputted in the joinedOutput field:

lookup("fromCollection", "local", "from", "joinedOutput")

Starting in 3.6, the $lookup pipeline stage also supports uncorrelated subqueries between two collections as well as allows other join conditions besides a single equality match.

For example, given a collection orders with the following documents:

  { "_id" : 1, "item" : "almonds", "price" : 12, "ordered" : 2 }
  { "_id" : 2, "item" : "pecans",  "price" : 20, "ordered" : 1 }
  { "_id" : 3, "item" : "cookies", "price" : 10, "ordered" : 60 }

and another collection warehouses with the following documents:

  { "_id" : 1, "stock_item" : "almonds", "warehouse": "A", "instock" : 120 }
  { "_id" : 2, "stock_item" : "pecans",  "warehouse": "A", "instock" : 80 }
  { "_id" : 3, "stock_item" : "almonds", "warehouse": "B", "instock" : 60 }
  { "_id" : 4, "stock_item" : "cookies", "warehouse": "B", "instock" : 40 }
  { "_id" : 5, "stock_item" : "cookies", "warehouse": "A", "instock" : 80 }

The following $lookup stage, executed in an aggregation against the orders collection, joins with the warehouses collection by the item and whether the quantity in stock is sufficient to cover the ordered quantity:

        List<Variable<?>> variables = asList(
                new Variable<>("order_item", "$item"),
                new Variable<>("order_qty", "$ordered"));

        List<Bson> pipeline = asList(
                match(expr(new Document("$and",
                        asList(new Document("$eq", asList("$stock_item", "$$order_item")),
                                new Document("$gte", asList("$instock", "$$order_qty")))))),
                project(fields(exclude("stock_item"), excludeId())));

        lookup("warehouses", variables, pipeline, "stockdata");

The aggregation produces the following documents:

{ "_id" : 1, "item" : "almonds", "price" : 12, "ordered" : 2,
   "stockdata" : [ { "warehouse" : "A", "instock" : 120 }, { "warehouse" : "B", "instock" : 60 } ] }
{ "_id" : 2, "item" : "pecans", "price" : 20, "ordered" : 1,
   "stockdata" : [ { "warehouse" : "A", "instock" : 80 } ] }
{ "_id" : 3, "item" : "cookies", "price" : 10, "ordered" : 60,
   "stockdata" : [ { "warehouse" : "A", "instock" : 80 } ] }

Group

The $group pipeline stage groups documents by some specified expression and outputs to the next stage a document for each distinct grouping. A group consists of an _id which specifies the expression on which to group, and zero or more accumulators which are evaluated for each grouping. To simplify the expression of accumulators, the driver includes an Accumulators class with static factory methods for each of the supported accumulators. In the example below, it’s assumed that the sum and avg methods of the Accumulators class have been statically imported.

This example groups documents by the value of the customerId field, and for each group accumulates the sum and average of the values of the quantity field into the totalQuantity and averageQuantity fields, respectively.

group("$customerId", sum("totalQuantity", "$quantity"), avg("averageQuantity", "$quantity"))

Unwind

The $unwind pipeline stage deconstructs an array field from the input documents to output a document for each element.

This example outputs, for each document, a document for each element in the sizes array:

unwind("$sizes")

Available with MongoDB 3.2, this example also includes any documents that have missing or null values for the $sizes field or where the $sizes list is empty:

unwind("$sizes", new UnwindOptions().preserveNullAndEmptyArrays(true))

Available with MongoDB 3.2, this example unwinds the sizes array and also outputs the array index into the $position field:

unwind("$sizes", new UnwindOptions().includeArrayIndex("$position"))

Out

The $out pipeline stage outputs all documents to the specified collection. It must be the last stage in any aggregate pipeline:

This example writes the pipeline to the authors collection:

out("authors")

Merge

The $merge pipeline stage merges all documents into the specified collection. It must be the last stage in any aggregate pipeline:

This example merges the pipeline into the authors collection using the default options:

merge("authors")

This example merges the pipeline into the authors collection using some non-default options:

merge(new MongoNamespace("reporting", customers"),
    new MergeOptions().uniqueIdentifier(Arrays.asList("date", "customerId"))
                      .whenMatched(MergeOptions.WhenMatched.REPLACE)
                      .whenNotMatched(MergeOptions.WhenNotMatched.INSERT))

GraphLookup

The $graphLookup pipeline stage performs a recursive search on a specified collection to match field A of one document to some field B of the other documents. For the matching documents, the stage repeats the search to match field A from the matching documents to the field B of the remaining documents until no new documents are encountered or until a specified depth. To each output document, $graphLookup adds a new array field that contains the traversal results of the search for that document.

The following example computes the social network graph for users in the contacts collection, recursively matching the value in the friends field to the name field, up to recursive depth of 1.

graphLookup("contacts", "$friends", "friends", "name", "socialNetwork",
	new GraphLookupOptions().maxDepth(1))

Using GraphLookupOptions, the output can be tailored to restrict the depth of the recursion as well to inject a field containing the depth of the recursion at which a document was included.

The recursive search can be filtered by specifying additional conditions.

graphLookup("contacts", "$friends", "friends", "name", "socialNetwork",
	new GraphLookupOptions().maxDepth(1).restrictSearchWithMatch(Filters.eq("hobbies","golf"))

SortByCount

The $sortByCount stage groups documents by a given expression and then sorts these groups by count in descending order. The sortByCount outputs documents that contains an _id field, which contains the discrete values of the given expression, and the count field that contains the number of documents that fall into that group.

The following example groups documents by the truncated value of the field x and computes the count for each distinct value of x.

sortByCount(new Document("$floor", "$x"))

ReplaceRoot

The $replaceRoot pipeline stage replaces each input document to the stage with the specified document. All existing fields, including the _id field, are replaced.

If each input document to the replaceRoot stage has a field a1 that contains a field b whose value is a document, the following operation replaces each input document with the document in the b field.

replaceRoot("$a1.b")

AddFields

The $addFields pipeline stage adds new fields to documents. The stage outputs documents that contain all existing fields from the input documents and the newly added fields.

This example adds two new fields, myNewField and z to the input documents; myNewField has the value {c: 3, d: 4}, z has the value 5.

addFields(new Field("myNewField", new Document("c", 3).append("d", 4)),
	new Field("z", 5))

These new fields do not need to be statically defined. The following example shows how to add a new field which is a function of the current document’s values. In this case, a new field alt3 is added with a value of true if the current value of the field a is less than 3. Otherwise, alt3 will be false in the new field.

addFields(new Field("alt3", new Document("$lt", asList("$a", 3))))

Count

The $count pipeline stage specifies the name of the field that will contain the number of documents that enter this stage. The $count stage is syntactic sugar for: {$group:{_id:null, count:{$sum:1}}}

There are two ways to invoke this stage. The first way is to explicitly name the resulting field as in the two following examples:

count("count")
count("total")

These two invocations will put the count in the count and total fields respectively. If count is the field name to be used, this can be shortened with the following convenience method:

count()

This invocation defaults the field name to count.

Bucket

The $bucket pipeline stage automates the bucketing of data around predefined boundary values.

The following example shows a basic $bucket stage:

bucket("$screenSize", [0, 24, 32, 50, 70, 200])

This will result in output that looks like this:

[_id:0, count:1]
[_id:24, count:2]
[_id:32, count:1]
[_id:50, count:1]
[_id:70, count:2]

The default output is simply the lower bound as the _id and a single field containing the size of that bucket. This output can be modified using the BucketOptions class. The above example can be expanded to look like this:

bucket("$screenSize", [0, 24, 32, 50, 70], new BucketOptions()
                .defaultBucket("monster")
                .output(sum("count", 1), push("matches", "$screenSize")))

The optional value defaultBucket defines the name of the bucket for values that fall outside defined bucket boundaries. If defaultBucket is undefined and values exist outside of the defined bucket boundaries, the stage will produce an error. The other value is the output field which defines the shape of the document output for each bucket. The output of this stage looks something like this:

[[_id: 0, count: 1, matches: [22]],
 [_id: 24, count: 2, matches: [24, 30]],
 [_id: 32, count: 1, matches: [42]],
 [_id: 50, count: 1, matches: [55]],
 [_id: monster, count: 2, matches: [75, 155]]]

This output contains not only the size of the bucket but also the values in the bucket. Notice the enormous screen sizes are found in the synthetic bucket named monster reflecting the outrageously large screen sizes.

BucketAuto

The $bucketAuto pipeline stage automatically determines the boundaries of each bucket in its attempt to distribute the documents evenly into a specified number of buckets. Depending on the input documents, the number of buckets may be less than the specified number of buckets.

For example, this stage creates 10 buckets:

bucketAuto("$price", 10)

This results in output that looks something like this:

[[_id: [min: 2, max: 30], count: 14],
 [_id: [min: 30, max: 58], count: 14],
 [_id: [min: 58, max: 86], count: 14],
 [_id: [min: 86, max: 114], count: 14],
 [_id: [min: 114, max: 142], count: 14],
 [_id: [min: 142, max: 170], count: 14],
 [_id: [min: 170, max: 198], count: 14],
 [_id: [min: 198, max: 226], count: 14],
 [_id: [min: 226, max: 254], count: 14],
 [_id: [min: 254, max: 274], count: 11]]

Note the uniformity of bucket sizes except for the last bucket. For a more precise scheme of bucket definition, the BucketAutoOptions class exposes the opportunity to use a preferred number based scheme to determine those boundary values. As with BucketOptions, the output document shape can be defined using the output value on BucketAutoOptions. An example of these options is shown below:

bucketAuto("$price", 10, new BucketAutoOptions()
            .granularity(BucketGranularity.POWERSOF2)
            .output(sum("count", 1), avg("avgPrice", "$price")))

Facet

The $facet pipeline stage allows for the definition of a faceted search. The stage is defined with a set of names and nested aggregation pipelines which define each particular facet. For example, to return to the example of the television screen size search, the following $facet will return a document that groups televisions by size and manufacturer:

facet(
	new Facet("Screen Sizes",
		unwind("$attributes"),
		bucketAuto("$attributes.screen_size", 5, new BucketAutoOptions()
			.output(sum("count", 1)))),
	new Facet("Manufacturer",
		sortByCount("$attributes.manufacturer"),
		limit(5))
)

This stage returns a document that looks like this:

{
	"Manufacturer": [
		{"_id": "Vizio", "count": 17},
		{"_id": "Samsung", "count": 17},
		{"_id": "Sony", "count": 17}
	],
	"Screen Sizes": [
		{"_id": {"min": 35, "max": 45}, "count": 10},
		{"_id": {"min": 45, "max": 55}, "count": 10},
		{"_id": {"min": 55, "max": 65}, "count": 10},
		{"_id": {"min": 65, "max": 75}, "count": 10},
		{"_id": {"min": 75, "max": 85}, "count": 11}
	]
}

SetWindowFields

important

Support for $setWindowFields is in beta. Backwards-breaking changes may be made before the final release.

The $setWindowFields pipeline stage allows using window operators. This stage partitions the input documents similarly to the $group pipeline stage, optionally sorts them, computes fields in the documents by computing window functions over windows specified per function (a window is a subset of a partition), and outputs the documents. The important difference from the $group pipeline stage is that documents belonging to the same partition or window are not folded into a single document.

The driver includes the WindowedComputations class with factory methods for supported window operators.

This example computes the accumulated rainfall and the average temperature over the past month per each locality from more fine-grained measurements presented via the rainfall and temperature fields:

Window pastMonth = Windows.timeRange(-1, Windows.Bound.CURRENT, MongoTimeUnit.MONTH);
setWindowFields("$localityId", Sorts.ascending("measurementDateTime"),
        WindowedComputations.sum("monthlyRainfall", "$rainfall", pastMonth),
        WindowedComputations.avg("monthlyAvgTemp", "$temperature", pastMonth));

Creating a Pipeline

The above pipeline stages are typically combined into a list and passed to the aggregate method of a MongoCollection. For instance:

collection.aggregate(Arrays.asList(match(eq("author", "Dave")),
                                   group("$customerId", sum("totalQuantity", "$quantity"),
                                                        avg("averageQuantity", "$quantity"))
                                   out("authors")));