"distinct" and "mapReduce"
On this page
This document provides some practical, simple, examples to demonstrate the distinct
and mapReduce
commands.
Setup
First we'll write some code to insert sample data:
bool insert_data (mongoc_collection_t *collection) { mongoc_bulk_operation_t *bulk; enum N { ndocs = 4 }; bson_t *docs[ndocs]; bson_error_t error; int i = 0; bool ret; bulk = mongoc_collection_create_bulk_operation_with_opts (collection, NULL); docs[0] = BCON_NEW ("x", BCON_DOUBLE (1.0), "tags", "[", "dog", "cat", "]"); docs[1] = BCON_NEW ("x", BCON_DOUBLE (2.0), "tags", "[", "cat", "]"); docs[2] = BCON_NEW ("x", BCON_DOUBLE (2.0), "tags", "[", "mouse", "cat", "dog", "]"); docs[3] = BCON_NEW ("x", BCON_DOUBLE (3.0), "tags", "[", "]"); for (i = 0; i < ndocs; i++) { mongoc_bulk_operation_insert (bulk, docs[i]); bson_destroy (docs[i]); docs[i] = NULL; } ret = mongoc_bulk_operation_execute (bulk, NULL, &error); if (!ret) { fprintf (stderr, "Error inserting data: %s\n", error.message); } mongoc_bulk_operation_destroy (bulk); return ret; } /* A helper which we'll use a lot later on */ void print_res (const bson_t *reply) { char *str; BSON_ASSERT (reply); str = bson_as_canonical_extended_json (reply, NULL); printf ("%s\n", str); bson_free (str); }
"distinct" Command
This is how to use the distinct
command to get the distinct values of x
which are greater
than 1
:
bool distinct (mongoc_database_t *database) { bson_t *command; bson_t reply; bson_error_t error; bool res; bson_iter_t iter; bson_iter_t array_iter; double val; command = BCON_NEW ("distinct", BCON_UTF8 (COLLECTION_NAME), "key", BCON_UTF8 ("x"), "query", "{", "x", "{", "$gt", BCON_DOUBLE (1.0), "}", "}"); res = mongoc_database_command_simple (database, command, NULL, &reply, &error); if (!res) { fprintf (stderr, "Error with distinct: %s\n", error.message); goto cleanup; } /* Do something with reply (in this case iterate through the values) */ if (!(bson_iter_init_find (&iter, &reply, "values") && BSON_ITER_HOLDS_ARRAY (&iter) && bson_iter_recurse (&iter, &array_iter))) { fprintf (stderr, "Couldn't extract \"values\" field from response\n"); goto cleanup; } while (bson_iter_next (&array_iter)) { if (BSON_ITER_HOLDS_DOUBLE (&array_iter)) { val = bson_iter_double (&array_iter); printf ("Next double: %f\n", val); } } cleanup: /* cleanup */ bson_destroy (command); bson_destroy (&reply); return res; }
"mapReduce" - Basic Example
A simple example using the map reduce framework. It simply adds up the number of occurrences of each "tag".
First define the map
and reduce
functions:
const char *const COLLECTION_NAME = "things"; /* Our map function just emits a single (key, 1) pair for each tag in the array: */ const char *const MAPPER = "function () {" "this.tags.forEach(function(z) {" "emit(z, 1);" "});" "}"; /* The reduce function sums over all of the emitted values for a given key: */ const char *const REDUCER = "function (key, values) {" "var total = 0;" "for (var i = 0; i < values.length; i++) {" "total += values[i];" "}" "return total;" "}"; /* Note We can't just return values.length as the reduce function might be called iteratively on the results of other reduce steps. */
Run the mapReduce
command. Use the generic command helpers (e.g. mongoc_database_command_simple).
Do not the read command helpers (e.g. mongoc_database_read_command_with_opts)
because they are considered retryable read operations. If retryable reads are enabled, those operations will retry once on
a retryable error, giving undesirable behavior for mapReduce
.
bool map_reduce_basic (mongoc_database_t *database) { bson_t reply; bool res = false; bson_error_t error; mongoc_cursor_t *cursor = NULL; bool query_done = false; const char *out_collection_name = "outCollection"; mongoc_collection_t *out_collection = NULL; /* Empty find query */ bson_t find_query = BSON_INITIALIZER; /* Construct the mapReduce command */ /* Other arguments can also be specified here, like "query" or "limit" and so on */ bson_t *const command = BCON_NEW ("mapReduce", BCON_UTF8 (COLLECTION_NAME), "map", BCON_CODE (MAPPER), "reduce", BCON_CODE (REDUCER), "out", BCON_UTF8 (out_collection_name)); res = mongoc_database_command_simple (database, command, NULL, &reply, &error); if (!res) { fprintf (stderr, "MapReduce failed: %s\n", error.message); goto cleanup; } /* Do something with the reply (it doesn't contain the mapReduce results) */ print_res (&reply); /* Now we'll query outCollection to see what the results are */ out_collection = mongoc_database_get_collection (database, out_collection_name); cursor = mongoc_collection_find_with_opts (out_collection, &find_query, NULL, NULL); query_done = true; /* Do something with the results */ const bson_t *doc = NULL; while (mongoc_cursor_next (cursor, &doc)) { print_res (doc); } if (mongoc_cursor_error (cursor, &error)) { fprintf (stderr, "ERROR: %s\n", error.message); res = false; goto cleanup; } cleanup: /* cleanup */ if (query_done) { mongoc_cursor_destroy (cursor); mongoc_collection_destroy (out_collection); } bson_destroy (&reply); bson_destroy (command); return res; }
"mapReduce" - More Complicated Example
You must have replica set running for this.
In this example we contact a secondary in the replica set and do an "inline" map reduce, so the results are returned immediately:
bool map_reduce_advanced (mongoc_database_t *database) { bson_t *command; bson_error_t error; bool res = true; mongoc_cursor_t *cursor; mongoc_read_prefs_t *read_pref; const bson_t *doc; /* Construct the mapReduce command */ /* Other arguments can also be specified here, like "query" or "limit" and so on */ /* Read the results inline from a secondary replica */ command = BCON_NEW ("mapReduce", BCON_UTF8 (COLLECTION_NAME), "map", BCON_CODE (MAPPER), "reduce", BCON_CODE (REDUCER), "out", "{", "inline", "1", "}"); read_pref = mongoc_read_prefs_new (MONGOC_READ_SECONDARY); cursor = mongoc_database_command (database, MONGOC_QUERY_NONE, 0, 0, 0, command, NULL, read_pref); /* Do something with the results */ while (mongoc_cursor_next (cursor, &doc)) { print_res (doc); } if (mongoc_cursor_error (cursor, &error)) { fprintf (stderr, "ERROR: %s\n", error.message); res = false; } mongoc_cursor_destroy (cursor); mongoc_read_prefs_destroy (read_pref); bson_destroy (command); return res; }
Running the Examples
Here's how to run the example code
/* * Copyright 2009-present MongoDB, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ int main (int argc, char *argv[]) { mongoc_database_t *database = NULL; mongoc_client_t *client = NULL; mongoc_collection_t *collection = NULL; mongoc_uri_t *uri = NULL; bson_error_t error; char *host_and_port = NULL; int exit_code = EXIT_FAILURE; if (argc != 2) { fprintf (stderr, "usage: %s CONNECTION-STRING\n", argv[0]); fprintf (stderr, "the connection string can be of the following forms:\n"); fprintf (stderr, "localhost\t\t\t\tlocal machine\n"); fprintf (stderr, "localhost:27018\t\t\t\tlocal machine on port 27018\n"); fprintf (stderr, "mongodb://user:pass@localhost:27017\t" "local machine on port 27017, and authenticate with username " "user and password pass\n"); return exit_code; } mongoc_init (); if (strncmp (argv[1], "mongodb://", 10) == 0) { host_and_port = bson_strdup (argv[1]); } else { host_and_port = bson_strdup_printf ("mongodb://%s", argv[1]); } uri = mongoc_uri_new_with_error (host_and_port, &error); if (!uri) { fprintf (stderr, "failed to parse URI: %s\n" "error message: %s\n", host_and_port, error.message); goto cleanup; } client = mongoc_client_new_from_uri (uri); if (!client) { goto cleanup; } mongoc_client_set_error_api (client, 2); database = mongoc_client_get_database (client, "test"); collection = mongoc_database_get_collection (database, COLLECTION_NAME); printf ("Inserting data\n"); if (!insert_data (collection)) { goto cleanup; } printf ("distinct\n"); if (!distinct (database)) { goto cleanup; } printf ("map reduce\n"); if (!map_reduce_basic (database)) { goto cleanup; } printf ("more complicated map reduce\n"); if (!map_reduce_advanced (database)) { goto cleanup; } exit_code = EXIT_SUCCESS; cleanup: if (collection) { mongoc_collection_destroy (collection); } if (database) { mongoc_database_destroy (database); } if (client) { mongoc_client_destroy (client); } if (uri) { mongoc_uri_destroy (uri); } if (host_and_port) { bson_free (host_and_port); } mongoc_cleanup (); return exit_code; }
If you want to try the advanced map reduce example with a secondary, start a replica set (instructions for how to do this can be found here).
Otherwise, just start an instance of MongoDB:
$ mongod
Now compile and run the example program:
$ cd examples/basic_aggregation/ $ gcc -Wall -o agg-example basic-aggregation.c $(pkg-config --cflags --libs libmongoc-1.0) $ ./agg-example localhost Inserting data distinct Next double: 2.000000 Next double: 3.000000 map reduce { "result" : "outCollection", "timeMillis" : 155, "counts" : { "input" : 84, "emit" : 126, "reduce" : 3, "output" : 3 }, "ok" : 1 } { "_id" : "cat", "value" : 63 } { "_id" : "dog", "value" : 42 } { "_id" : "mouse", "value" : 21 } more complicated map reduce { "results" : [ { "_id" : "cat", "value" : 63 }, { "_id" : "dog", "value" : 42 }, { "_id" : "mouse", "value" : 21 } ], "timeMillis" : 14, "counts" : { "input" : 84, "emit" : 126, "reduce" : 3, "output" : 3 }, "ok" : 1 }