Jul 2024

Hello,

I am using mongo-c-driver version 1.25.1. I have a process running inside a Docker container that uses this driver.

I am attempting to sync all collections from one database to another. I don't have access to mongodump and mongorestore, so I have to do it manually.

Here is how I am doing it:

bson_t filter = BSON_INITIALIZER;
mongoc_cursor_t *cursor = mongoc_collection_find_with_opts(collectionRemote, &filter, NULL, NULL);
bson_destroy(&filter);
const bson_t *doc;

if (sync_individual_documents) {
    mongoc_cursor_set_batch_size(cursor, (uint32_t)1);
    while (mongoc_cursor_next(cursor, &doc)) {
        //do something
    }
} else {
    mongoc_cursor_set_batch_size(cursor, (uint32_t)1000);
    while (mongoc_cursor_next(cursor, &doc)) {
        //do something
    }
}

mongoc_cursor_destroy(cursor);
return;
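(Error handling is omitted above for brevity. If it is relevant, this is a minimal sketch of a cursor error check that could follow either loop, reusing the same cursor variable:)

// After mongoc_cursor_next() returns false, check whether the cursor stopped
// because of an error rather than simply running out of documents.
bson_error_t cursorError;
if (mongoc_cursor_error(cursor, &cursorError)) {
    fprintf(stderr, "Cursor failure: %s\n", cursorError.message);
}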

I have verified with mongoc-stat, Valgrind, and ASAN that no memory leaks appear in my code.

However, while executing this mongoc_cursor_next() loop, the memory of my program rises considerably. This is expected, as I am reading in potentially hundreds of thousands of documents. The memory drops at the end after mongoc_cursor_destroy(), but not back to what it was before.

Each time this code is executed, the memory of my program ends up higher than before. Eventually, after many executions, I run out of memory.
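One thing I would like to rule out is the allocator simply holding on to freed heap pages rather than an actual leak. A minimal sketch of that experiment, assuming glibc malloc (malloc_trim is glibc-specific), run right after a sync pass finishes:

#include <malloc.h>  // glibc-specific

// If RES drops noticeably after this call, the "retained" memory was
// allocator caching of freed blocks, not memory still held by the driver
// or the application.
malloc_trim(0);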

This code is executed for multiple collections, with individual document sizes ranging from a couple hundred bytes to 1 MB or so per document.

Any ideas on how to fix this issue?

Here is a more detailed version of the code:

function (mongoc_collection_t *pCollectionRemote, mongoc_collection_t *pCollectionLocal)
{
    bson_error_t errorLocal;
    mongoc_bulk_operation_t *bulk = NULL;
    mongoc_cursor_t *cursor = NULL;
    mongoc_cursor_t *indexCursor = NULL;

    //add unique indexes
    int numIndexKeys = 0;
    const char **indexKeyList = NULL;
    indexCursor = mongoc_collection_find_indexes_with_opts(pCollectionRemote, NULL);
    const bson_t *indexDoc;
    mongoc_cursor_set_batch_size (indexCursor, (uint32_t)1);
    while (mongoc_cursor_next(indexCursor, &indexDoc)) {
        //if the indexName is _id_ - it is the automatically created one - no need to copy it over
        bson_iter_t iter;
        const char *indexName;
        bool bAddNewIndex = false;
        bool bIsUniqueIndex = false;
        if (bson_iter_init (&iter, indexDoc)) {
            while (bson_iter_next (&iter)) {
                const char *key = bson_iter_key(&iter);
                if (!bson_strcasecmp(key, "unique") && BSON_ITER_HOLDS_BOOL(&iter)) {
                    bIsUniqueIndex = true;
                } else if (!bson_strcasecmp(key, "key") && BSON_ITER_HOLDS_DOCUMENT(&iter)) {
                    //count the index keys, then collect their names
                    numIndexKeys = 0;
                    bson_iter_t keyIterFindKeys;
                    if (bson_iter_recurse(&iter, &keyIterFindKeys)) {
                        while (bson_iter_next(&keyIterFindKeys)) {
                            numIndexKeys++;
                        }
                    }
                    indexKeyList = calloc(numIndexKeys, sizeof(char *));
                    bson_iter_t keyIter;
                    if (bson_iter_recurse(&iter, &keyIter)) {
                        int loopCount = 0;
                        while (bson_iter_next(&keyIter)) {
                            indexKeyList[loopCount] = bson_iter_key(&keyIter);
                            loopCount++;
                        }
                    }
                } else if (!bson_strcasecmp(key, "name") && BSON_ITER_HOLDS_UTF8(&iter)) {
                    indexName = bson_iter_utf8(&iter, NULL);
                    if (bson_strcasecmp(indexName, "_id_") != 0) {
                        bAddNewIndex = true;
                    }
                }
            }
        }
        if (bAddNewIndex) {
            bson_t index_keys = BSON_INITIALIZER;
            bson_t *index_opts = NULL;
            for (int i = 0; i < numIndexKeys; i++) {
                BSON_APPEND_INT32 (&index_keys, indexKeyList[i], 1);
            }
            if (bIsUniqueIndex) {
                index_opts = BCON_NEW("unique", BCON_BOOL (true), "name", BCON_UTF8(indexName));
            } else {
                index_opts = BCON_NEW("name", BCON_UTF8(indexName));
            }
            mongoc_index_model_t *im = mongoc_index_model_new (&index_keys, index_opts);
            if (mongoc_collection_create_indexes_with_opts (pCollectionLocal, &im, 1, NULL, NULL, &errorLocal)) {
                CNCORE_LOG(PACKAGE_NAME, LOG_DEBUG, NULL, "Created new index");
            } else {
                CNCORE_LOG(PACKAGE_NAME, LOG_ERR, NULL, "Failed to create new index due to %s", errorLocal.message);
                mongoc_index_model_destroy(im);
                bson_destroy (&index_keys);
                bson_destroy (index_opts);
                if (indexKeyList) {
                    free(indexKeyList);
                    indexKeyList = NULL;
                }
                goto cleanup;
            }
            mongoc_index_model_destroy(im);
            bson_destroy (&index_keys);
            //chris comment
            bson_destroy (index_opts);
        }
        //free(indexName);
        if (indexKeyList) {
            free(indexKeyList);
            indexKeyList = NULL;
        }
    }
    if (indexCursor) {
        mongoc_cursor_destroy (indexCursor);
        indexCursor = NULL;
    }

    //copy all documents from the remote collection to the local one
    const bson_t *doc;
    bson_t filter = BSON_INITIALIZER;
    cursor = mongoc_collection_find_with_opts (pCollectionRemote, &filter, NULL, NULL);
    mongoc_cursor_set_batch_size (cursor, custom_batch_size);
    bson_destroy(&filter);
    bool ret;
    bson_error_t retError;
    //perform individual writes
    if (custom_batch_size == 1) {
        while (mongoc_cursor_next (cursor, &doc)) {
            ret = mongoc_collection_insert_one(pCollectionLocal, doc, NULL, NULL, &retError);
            if (!ret) {
                CNCORE_LOG(PACKAGE_NAME, LOG_ERR, NULL, "Individual write error occurred");
                goto cleanup;
            }
        }
    }
    //perform bulk writes
    else {
        bulk = mongoc_collection_create_bulk_operation_with_opts (pCollectionLocal, NULL);
        uint32_t numDocumentsProcessed = 0;
        bson_t reply;
        while (mongoc_cursor_next (cursor, &doc)) {
            //int64_t currentBulkLatencyWrite = 0;
            //struct timeval stop, start;
            numDocumentsProcessed++;
            mongoc_bulk_operation_insert(bulk, doc);
            //bulk write in batches of 1000
            if (numDocumentsProcessed == custom_batch_size) {
                numDocumentsProcessed = 0;
                ret = mongoc_bulk_operation_execute (bulk, &reply, &retError);
                bson_destroy (&reply);
                if (bulk) {
                    mongoc_bulk_operation_destroy (bulk);
                    bulk = NULL;
                    bulk = mongoc_collection_create_bulk_operation_with_opts (pCollectionLocal, NULL);
                }
                if (!ret) {
                    CNCORE_LOG(PACKAGE_NAME, LOG_ERR, NULL, "Bulk Write Error Occurred: %s", retError.message);
                    goto cleanup;
                }
            }
        }
        //flush any remaining documents
        if (numDocumentsProcessed > 0) {
            ret = mongoc_bulk_operation_execute (bulk, NULL, &retError);
            if (!ret) {
                CNCORE_LOG(PACKAGE_NAME, LOG_ERR, NULL, "Bulk Write Error Occurred: %s", retError.message);
                goto cleanup;
            }
        }
    }

cleanup:
    mongoc_bulk_operation_destroy (bulk);
    bulk = NULL;
    mongoc_cursor_destroy(cursor);
    cursor = NULL;
    mongoc_cursor_destroy (indexCursor);
    return status;
}
//function returns and collection_destroy() is called on collectionLocal and collectionRemote
//this function is called for every collection
//collections are of various document and collection sizes.

@Rishabh_Bisht any suggestions?

Valgrind doesn't show any leaks. The issue only happens during the reads from mongoc_cursor_next(). I have investigated all other areas of the code and they are not causing this issue.

If I comment out all my index logic and all my writing of documents, there are no problems.

I have been tracking the 'RES' memory of my process via the top command. This is what I see increasing when I execute this code.
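For reference, this is roughly how I could sample the same value from inside the process instead of top (a sketch that reads VmRSS from /proc/self/status, Linux-only):

#include <stdio.h>
#include <string.h>

// Print the current resident set size (what top shows as RES) by reading
// /proc/self/status. Intended purely for logging the baseline before and
// after each sync pass.
static void log_rss (const char *label)
{
    FILE *f = fopen ("/proc/self/status", "r");
    if (!f) {
        return;
    }
    char line[256];
    while (fgets (line, sizeof line, f)) {
        if (strncmp (line, "VmRSS:", 6) == 0) {
            printf ("%s: %s", label, line);  // line already ends in '\n'
            break;
        }
    }
    fclose (f);
}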

This code is in a for loop that runs over multiple collections; some of them are small, some are large. Some of these collections have very large documents, some very small.

Nothing seems out of the ordinary at a quick glance. Running that example locally with ASAN also does not report leaks.
Do you have a sample collection that can be used to reproduce the issue?