# Perform Incremental Map-Reduce
## Aggregation Pipeline as Alternative

An aggregation pipeline provides better performance and a more coherent interface than map-reduce. Various map-reduce operations can be rewritten using aggregation pipeline operators such as `$group` and `$merge`. For map-reduce operations that require custom functionality, MongoDB provides the `$accumulator` and `$function` aggregation operators starting in version 4.4.

The example below includes an aggregation pipeline alternative that does not require custom functions. For examples that use the custom aggregation operators, see Map-Reduce to Aggregation Pipeline.
To perform map-reduce operations, MongoDB provides the `mapReduce` command and, in the mongo shell, the `db.collection.mapReduce()` wrapper method.
If the map-reduce data set is constantly growing, you may want to perform an incremental map-reduce rather than performing the map-reduce operation over the entire data set each time.
To perform incremental map-reduce:

- Run a map-reduce job over the current collection and output the result to a separate collection.
- When you have more data to process, run subsequent map-reduce jobs with:
  - the `query` parameter that specifies conditions that match only the new documents, and
  - the `out` parameter that specifies the `reduce` action to merge the new results into the existing output collection.
Consider the following example where you schedule a map-reduce operation on a `usersessions` collection to run at the end of each day.
## Data Setup
The `usersessions` collection contains documents that log users' sessions each day.
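For illustration, the session documents might look like the following sketch. The `userid`, `ts`, and `length` field names here are assumptions for the examples in this page, not necessarily the manual's exact schema:

```javascript
// Hypothetical sample session documents: one per user session, with the
// session start time (ts) and the session length in seconds (schema assumed).
const sessions = [
  { userid: "a", ts: new Date("2020-03-03T14:17:00Z"), length: 95 },
  { userid: "b", ts: new Date("2020-03-03T14:23:00Z"), length: 110 },
  { userid: "c", ts: new Date("2020-03-03T15:02:00Z"), length: 120 },
  { userid: "a", ts: new Date("2020-03-03T16:45:00Z"), length: 45 },
];

// In the mongo shell, the equivalent setup would be something like:
// db.usersessions.insertMany([ ... ]);
console.log(sessions.length); // 4
```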
## Initial Map-Reduce of Current Collection
Run the first map-reduce operation as follows:

- Define the map function that maps the `userid` to an object that contains the fields `total_time`, `count`, and `avg_time`.
- Define the corresponding reduce function with two arguments, `key` and `values`, to calculate the total time and the count. The `key` corresponds to the `userid`, and `values` is an array whose elements correspond to the individual objects mapped to the `userid` in the `mapFunction`.
- Define the finalize function with two arguments, `key` and `reducedValue`. The function computes the average session time on the `reducedValue` document and returns the modified document.
- Perform map-reduce on the `usersessions` collection using the `mapFunction`, the `reduceFunction`, and the `finalizeFunction` functions. Output the results to a collection named `session_stats`. If the `session_stats` collection already exists, the operation replaces its contents.
- Query the `session_stats` collection to verify the results.
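The steps above can be sketched in plain JavaScript. The three functions below follow the shapes the text describes, and a minimal in-memory driver simulates the server's map-reduce semantics so the logic can be checked without a running deployment. The document schema (`userid`, `length`) is an assumption:

```javascript
// Map: emit (userid, { total_time, count, avg_time }) for each session.
// In the mongo shell, `this` is the current document and emit() is provided
// by the server; here both are simulated explicitly.
function mapFunction(doc, emit) {
  emit(doc.userid, { total_time: doc.length, count: 1, avg_time: 0 });
}

// Reduce: sum total_time and count across all values emitted for one key.
function reduceFunction(key, values) {
  const out = { total_time: 0, count: 0, avg_time: 0 };
  for (const v of values) {
    out.total_time += v.total_time;
    out.count += v.count;
  }
  return out;
}

// Finalize: derive the average session time from the reduced totals.
function finalizeFunction(key, reducedValue) {
  if (reducedValue.count > 0) {
    reducedValue.avg_time = reducedValue.total_time / reducedValue.count;
  }
  return reducedValue;
}

// Minimal in-memory driver standing in for:
// db.usersessions.mapReduce(mapFunction, reduceFunction,
//   { out: "session_stats", finalize: finalizeFunction });
function runMapReduce(docs) {
  const groups = new Map();
  for (const doc of docs) {
    mapFunction(doc, (key, value) => {
      if (!groups.has(key)) groups.set(key, []);
      groups.get(key).push(value);
    });
  }
  const results = {};
  for (const [key, values] of groups) {
    results[key] = finalizeFunction(key, reduceFunction(key, values));
  }
  return results;
}

const stats = runMapReduce([
  { userid: "a", length: 95 },
  { userid: "a", length: 110 },
  { userid: "b", length: 120 },
]);
console.log(stats.a); // { total_time: 205, count: 2, avg_time: 102.5 }
```

Note that the real server only invokes the reduce function for keys with more than one emitted value; calling it unconditionally, as this sketch does, yields the same result because reducing a single-element array is an identity here.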
## Subsequent Incremental Map-Reduce
Later, as the `usersessions` collection grows, you can run additional map-reduce operations. For example, new session documents are added to the `usersessions` collection as users connect each day.
At the end of the day, perform an incremental map-reduce on the `usersessions` collection, using the `query` option to select only the new documents. Output the results to the `session_stats` collection with the `reduce` action, so that the new results are merged with the existing contents rather than replacing them. Then query the `session_stats` collection to verify the results.
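The merge semantics of the `reduce` output action can be sketched as follows: for a key present in both the new results and the existing output collection, the server re-reduces the existing value together with the newly reduced value, then re-finalizes. The functions repeat the earlier sketch, and the field names are the same assumptions:

```javascript
// Reduce and finalize, as in the initial map-reduce sketch.
function reduceFunction(key, values) {
  const out = { total_time: 0, count: 0, avg_time: 0 };
  for (const v of values) {
    out.total_time += v.total_time;
    out.count += v.count;
  }
  return out;
}

function finalizeFunction(key, reducedValue) {
  if (reducedValue.count > 0) {
    reducedValue.avg_time = reducedValue.total_time / reducedValue.count;
  }
  return reducedValue;
}

// Value already stored in session_stats for user "a", and the value reduced
// from today's new documents only (the ones matched by the query option).
const existing = { total_time: 205, count: 2, avg_time: 102.5 };
const fresh    = { total_time: 140, count: 1, avg_time: 140 };

// With out: { reduce: "session_stats" }, the server merges by re-reducing:
const merged = finalizeFunction("a", reduceFunction("a", [existing, fresh]));
console.log(merged); // { total_time: 345, count: 3, avg_time: 115 }

// The shell invocation would look something like this (the ts field and the
// cutoff date are assumptions; this only runs inside the mongo shell):
// db.usersessions.mapReduce(mapFunction, reduceFunction, {
//   query: { ts: { $gt: ISODate("2020-03-04T00:00:00Z") } },
//   out: { reduce: "session_stats" },
//   finalize: finalizeFunction
// });
```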
## Aggregation Alternative
As a prerequisite, reset the `usersessions` collection to its original state.
Using the available aggregation pipeline operators, you can rewrite the map-reduce example without defining custom functions. The pipeline stages behave as follows:
- The `$group` stage groups by the `userid` and calculates:
  - the `total_time` using the `$sum` operator,
  - the `count` using the `$sum` operator, and
  - the `avg_time` using the `$avg` operator.
- The `$project` stage reshapes the output documents to mirror the map-reduce output's two fields, `_id` and `value`. This stage is optional if you do not need to mirror the `_id` and `value` structure.
- The `$merge` stage outputs the results to a `session_stats_agg` collection. If an existing document has the same `_id` as a new result, the operation applies the specified pipeline to calculate the `total_time`, `count`, and `avg_time` from the new result and the existing document. If there is no existing document with the same `_id` in `session_stats_agg`, the operation inserts the document.

Query the `session_stats_agg` collection to verify the results.
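A plain-JavaScript sketch of what the `$group` and `$project` stages compute, under the same assumed document schema (`userid`, `length`):

```javascript
// Simulate the $group stage: group by userid, computing total_time
// ($sum of length), count ($sum: 1), and avg_time ($avg of length).
function groupAndProject(docs) {
  const acc = new Map();
  for (const d of docs) {
    if (!acc.has(d.userid)) acc.set(d.userid, { total_time: 0, count: 0 });
    const a = acc.get(d.userid);
    a.total_time += d.length;
    a.count += 1;
  }
  // Simulate the $project stage: reshape each group into the map-reduce
  // style { _id, value } document.
  return [...acc].map(([userid, a]) => ({
    _id: userid,
    value: {
      total_time: a.total_time,
      count: a.count,
      avg_time: a.total_time / a.count, // what $avg of length produces
    },
  }));
}

const out = groupAndProject([
  { userid: "a", length: 95 },
  { userid: "a", length: 110 },
  { userid: "b", length: 120 },
]);
console.log(out[0]); // { _id: "a", value: { total_time: 205, count: 2, avg_time: 102.5 } }

// A corresponding shell pipeline might look like (field names assumed):
// db.usersessions.aggregate([
//   { $group: { _id: "$userid",
//               total_time: { $sum: "$length" },
//               count: { $sum: 1 },
//               avg_time: { $avg: "$length" } } },
//   { $project: { value: { total_time: "$total_time",
//                          count: "$count",
//                          avg_time: "$avg_time" } } },
//   { $merge: { into: "session_stats_agg", whenMatched: [ /* pipeline */ ] } }
// ]);
```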
As with the map-reduce version, add new documents to the `usersessions` collection as sessions accumulate. To process only the new documents, add a `$match` stage at the start of the pipeline that specifies the date filter, then query the `session_stats_agg` collection to verify the results.
Optional. To avoid having to modify the aggregation pipeline's `$match` date condition each time you run it, you can wrap the aggregation in a helper function. Then, to run the aggregation, you pass the start date to the `updateSessionStats()` function.
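A sketch of such a helper, under the assumed schema. The function builds the pipeline with the caller-supplied start date; the `aggregate` call itself is commented out because it only runs inside a mongo shell, so the sketch returns the pipeline for inspection instead:

```javascript
// Helper wrapping the aggregation so only the start date changes between
// runs. Field names (ts, length) and the $merge details are assumptions.
function updateSessionStats(startDate) {
  const pipeline = [
    { $match: { ts: { $gte: startDate } } },  // only documents since startDate
    { $group: { _id: "$userid",
                total_time: { $sum: "$length" },
                count: { $sum: 1 },
                avg_time: { $avg: "$length" } } },
    { $project: { value: { total_time: "$total_time",
                           count: "$count",
                           avg_time: "$avg_time" } } },
    // { $merge: { into: "session_stats_agg", whenMatched: [ /* pipeline */ ] } },
  ];
  // db.usersessions.aggregate(pipeline);  // run inside the mongo shell
  return pipeline;
}

const pipeline = updateSessionStats(new Date("2020-03-05T00:00:00Z"));
console.log(pipeline[0].$match.ts.$gte.toISOString()); // 2020-03-05T00:00:00.000Z
```

With this in place, each nightly run is a one-liner such as `updateSessionStats(ISODate("2020-03-05T00:00:00Z"))`, with only the date changing.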