Scaling MongoDB by Kristina Chodorow

Chapter 4. Working With a Cluster

Querying a MongoDB cluster is usually identical to querying a single mongod. However, there are some exceptions that are worth knowing about.


If you are using replica sets as shards and a mongos version 1.7.4 or more recent, you can distribute reads to slaves in a cluster. This can be handy for handling read load, although the usual caveats on querying slaves apply: you must be willing to get older data.

To query a slave through mongos, you must set the “slave okay” option (basically checking off that you’re okay with getting possibly out-of-date data) with whatever driver you’re using. In the shell, this looks like:

> db.getMongo().setSlaveOk()

Then query the mongos normally.

“Why Am I Getting This?”

When you work with a cluster, you lose the ability to examine an entire collection as a single “snapshot in time.” Many people don’t realize the ramifications of this until it hits them in the nose, so we’ll go over some of the common ways it can affect applications.


When you do a count on a sharded collection, you may not get the result you expect: the count can be considerably higher than the number of documents that actually exist.

Here is how a count works: the mongos forwards the count command to every shard in the cluster. Each shard does a count and sends its result back to the mongos, which totals the results and sends the sum to the user. If a migration is in progress, many documents can be present (and thus counted) on more than one shard.

When MongoDB migrates a chunk, it starts copying the chunk from one shard to another. All reads and writes to that chunk are still routed to the old shard while the copy on the new shard is gradually populated. Once the chunk has finished “moving,” it actually exists on both shards. As the final step, MongoDB updates the config servers and deletes the copy of the data from the original shard (see Figure 4-1).

Figure 4-1. A chunk is migrated by copying it to the new shard, then deleting it from the shard it came from

Thus, documents in a chunk that is being migrated can be counted twice. MongoDB may work around this in the future, but for now, keep in mind that counts may overshoot the actual number of documents.
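To make the double counting concrete, here is a minimal sketch in plain JavaScript. It is a toy model, not MongoDB's implementation: each shard is just an array, and the names shards and clusterCount are invented for this illustration.

```javascript
// Toy model: each shard is an array of documents. Not MongoDB's real code.
var shards = {
    shard1 : [{_id : 1}, {_id : 2}, {_id : 3}],
    shard2 : [{_id : 4}]
};

// mongos-style count: ask every shard for its count and total the answers.
function clusterCount(shards) {
    var total = 0;
    for (var name in shards) {
        total += shards[name].length;
    }
    return total;
}

var before = clusterCount(shards);   // 4 documents exist

// Migration, first step: copy the chunk holding _id 2 and 3 to shard2.
var chunk = shards.shard1.filter(function(doc) { return doc._id >= 2; });
shards.shard2 = shards.shard2.concat(chunk);
var during = clusterCount(shards);   // 6: two documents are counted twice

// Migration, final step: delete the chunk from the original shard.
shards.shard1 = shards.shard1.filter(function(doc) { return doc._id < 2; });
var after = clusterCount(shards);    // back to 4
```

The count is only wrong in the window between the copy and the delete, which is why the overshoot comes and goes as chunks move.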

Unique Indexes

Suppose we were sharding on email and wanted to have a unique index on username. This is not possible to enforce with a cluster.

Let’s say we have two application servers processing users. One application server adds a new user document with the following fields:

    "_id" : ObjectId("4d2a2e9f74de15b8306fe7d0"),
    "username" : "andrew",
    "email" : "awesome.guy@example.com"

The only way to check that “andrew” is the only “andrew” in the cluster is to go through every username entry on every machine. Let’s say MongoDB goes through all the shards and no one else has an “andrew” username, so it’s just about to write the document on Shard 3 when the second appserver sends this document to be inserted:

    "_id" : ObjectId("4d2a2f7c56d1bb09196fe7d0"),
    "username" : "andrew",
    "email" : "cool.guy@example.com"

Once again, every shard checks that it has no users with username “andrew”. They still don’t because the first document hasn’t been written yet, so Shard 1 goes ahead and writes this document. Then Shard 3 finally gets around to writing the first document. Now there are two people with the same username!
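The interleaving above can be replayed in a few lines of plain JavaScript. This is only a sketch of the race, assuming a simplified cluster in which each shard is an array; userShards, usernameExists, and write are hypothetical names, not MongoDB functions.

```javascript
// Toy model of the race: three shards, each an array of user documents.
var userShards = [[], [], []];

// Step 1 of a "unique" insert: look for the username on every shard.
function usernameExists(username) {
    return userShards.some(function(shard) {
        return shard.some(function(doc) { return doc.username === username; });
    });
}

// Step 2: write the document to the shard that owns it.
function write(shardIndex, doc) {
    userShards[shardIndex].push(doc);
}

var first = {username : "andrew", email : "awesome.guy@example.com"};
var second = {username : "andrew", email : "cool.guy@example.com"};

// Both checks run before either write lands, so both see no "andrew".
var firstIsFree = !usernameExists(first.username);
var secondIsFree = !usernameExists(second.username);

if (secondIsFree) { write(0, second); }  // "Shard 1" writes the second document
if (firstIsFree) { write(2, first); }    // "Shard 3" then writes the first

// Count the "andrew" documents across the whole cluster.
var andrews = 0;
userShards.forEach(function(shard) {
    shard.forEach(function(doc) {
        if (doc.username === "andrew") { andrews++; }
    });
});
```

Each check was correct at the moment it ran, yet andrews ends up as 2, because nothing ties the check and the write together across shards.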

The only way to guarantee no duplicates between shards in the general case is to lock down the entire cluster every time you do a write until the write has been confirmed successful. This is not performant for a system with a decent rate of writes.

Therefore, you cannot guarantee uniqueness on any key other than the shard key. You can guarantee uniqueness on the shard key because all documents with a given shard key value belong to exactly one chunk, and that chunk lives on one shard. If the value is unique on that one shard, it is guaranteed to be unique in the whole cluster. You can also have a unique index that is prefixed by the shard key. For example, if we instead sharded the users collection on username with the unique option, we could create a unique index on {username : 1, email : 1}.
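As a rough way to reason about which unique indexes are allowed, the following sketch checks whether an index key is prefixed by the shard key. The function isPrefixedByShardKey is a hypothetical helper written for this illustration, not part of MongoDB, and it compares field names only (it ignores index direction).

```javascript
// Returns true if indexKey starts with the fields of shardKey, in order.
// Hypothetical helper for illustration; MongoDB does its own validation.
function isPrefixedByShardKey(shardKey, indexKey) {
    var shardFields = Object.keys(shardKey);
    var indexFields = Object.keys(indexKey);

    if (shardFields.length > indexFields.length) {
        return false;
    }
    for (var i = 0; i < shardFields.length; i++) {
        if (shardFields[i] !== indexFields[i]) {
            return false;
        }
    }
    return true;
}

// Sharded on username: a unique index on {username, email} is prefixed.
var okIndex = isPrefixedByShardKey({username : 1}, {username : 1, email : 1});

// Sharded on email: a unique index on username alone is not.
var badIndex = isPrefixedByShardKey({email : 1}, {username : 1});
```

Here okIndex is true and badIndex is false, matching the two cases discussed in this section.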

One interesting consequence of this is that, unless you’re sharding on _id, you can create non-unique _ids. This isn’t recommended (and it can get you into trouble if chunks move), but it is possible.


Updates, by default, only update a single record. This means that they run into the same problem unique indexes do: there’s no good way of guaranteeing that something happens exactly once across multiple shards. A single-document update must therefore include the shard key in its criteria (update’s first argument), so that mongos can route it to one shard. If it does not, you’ll get an error.

> db.adminCommand({shardCollection : "test.x", key : {"y" : 1}})
{ "shardedCollection" : "test.x", "ok" : 1 }
> // works okay
> db.x.update({y : 1}, {$set : {z : 2}}, true)
> // error
> db.x.update({z : 2}, {$set : {w : 4}})
can't do non-multi update with query that doesn't have the shard key

You can do a multiupdate using any criteria you want.

> db.x.update({z : 2}, {$set : {w : 4}}, false, true)
> // no error

If you run across an odd error message, consider whether the operation you’re trying to perform would have to atomically look at the entire cluster. Such operations are not allowed.
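The rule for updates can be summarized as a small decision function. This is a sketch under simplifying assumptions, not how mongos is actually written: routeUpdate is a hypothetical name, and it only checks that every shard key field appears in the criteria.

```javascript
// Hypothetical helper: decide how a mongos-like router could treat an update.
function routeUpdate(shardKey, criteria, multi) {
    if (multi) {
        // Multiupdates may touch many documents, so any criteria is allowed.
        return {ok : true, target : "all shards"};
    }

    // A single-document update needs every shard key field in its criteria.
    var fields = Object.keys(shardKey);
    for (var i = 0; i < fields.length; i++) {
        if (!criteria.hasOwnProperty(fields[i])) {
            return {
                ok : false,
                error : "can't do non-multi update with query that doesn't have the shard key"
            };
        }
    }
    return {ok : true, target : "one shard"};
}

var shardKey = {y : 1};
var byShardKey = routeUpdate(shardKey, {y : 1}, false);  // allowed, one shard
var withoutKey = routeUpdate(shardKey, {z : 2}, false);  // rejected
var asMulti = routeUpdate(shardKey, {z : 2}, true);      // allowed, all shards
```

The three calls mirror the three shell updates shown earlier in this section.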


When you run a MapReduce on a cluster, each shard performs its own map and reduce. mongos chooses a “leader” shard and sends all the reduced data from the other shards to that one for a final reduce. Once the data is reduced to its final form, it is output using whatever method you’ve specified.

As sharding splits the job across multiple machines, it can perform MapReduces faster than a single server. However, it still isn’t meant for real-time calculations.
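The two-stage reduce described above can be sketched in plain JavaScript. This is a toy model that assumes two shards held as arrays and a job that counts documents per tag; shardData, reduceGroups, and the other names are invented for the example and are not MongoDB APIs.

```javascript
// Toy model: count documents per "tag" across two shards.
var shardData = [
    [{tag : "a"}, {tag : "b"}, {tag : "a"}],
    [{tag : "a"}, {tag : "b"}]
];

function map(doc, emit) {
    emit(doc.tag, 1);
}

function reduce(key, values) {
    var sum = 0;
    for (var i = 0; i < values.length; i++) {
        sum += values[i];
    }
    return sum;
}

// Group {key, value} pairs by key, then reduce each group to one pair.
function reduceGroups(pairs) {
    var groups = {};
    pairs.forEach(function(pair) {
        if (!groups.hasOwnProperty(pair.key)) {
            groups[pair.key] = [];
        }
        groups[pair.key].push(pair.value);
    });
    var out = [];
    for (var key in groups) {
        out.push({key : key, value : reduce(key, groups[key])});
    }
    return out;
}

// Stage 1: each shard maps and reduces its own documents.
var perShard = shardData.map(function(docs) {
    var emitted = [];
    docs.forEach(function(doc) {
        map(doc, function(key, value) {
            emitted.push({key : key, value : value});
        });
    });
    return reduceGroups(emitted);
});

// Stage 2: the "leader" re-reduces the partial results from every shard.
var finalResult = reduceGroups([].concat.apply([], perShard));
```

Because the reduce function runs again on already-reduced values, it has to accept its own output as input. A sum works; a function that simply counted the values it was handed would not.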

Temporary Collections

In 1.6, MapReduce created temporary collections unless you specified the “out” option. These temporary collections were dropped when the connection that created them was closed. This worked well on a single server, but mongos keeps its own connection pools and never closes connections to shards. Thus, temporary collections were never cleaned up (because the connection that created them never closed), and they would just hang around forever, growing more and more numerous.

If you’re running 1.6 and doing MapReduces, you’ll have to manually clean up your temporary collections. You can run the following function to delete all of the temporary collections in a given database:

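var dropTempCollections = function(dbName) {
    var target = db.getSisterDB(dbName);
    var names = target.getCollectionNames();

    for (var i = 0; i < names.length; i++) {
        // MapReduce temporary collections have "tmp.mr." in their names
        if (names[i].match(/tmp\.mr\./)) {
            target.getCollection(names[i]).drop();
        }
    }
};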

In later versions, MapReduce requires you to specify what to do with your output through the “out” option. See the documentation for details.
