2 / 2
Sep 2024

Hi all,
I’m working on a script that creates a new database (this is no problem) and a 2 new users with a specific user role. I’ve did a lot of tries in both python as node-js but I always get an authorisation error. Here I continue with the python version but they are basically the same.
In my project access manager, I created an API key with project permissions set to Project Owner and I access the database with MongoDB Roles set to atlasAdmin @admin but still I get this reply from the script:

Response status code: 401
Response content: b’{\n “error” : 401,\n “reason” : “Unauthorized”,\n “detail” : “You are not authorized for this resource.”\n}’

Does anybody has a clue or an working example? Here is my code:

import sys
import json
import random
import string
import requests
from pymongo import MongoClient
from pymongo.errors import OperationFailure
from config import Config
from bson import ObjectId

class MongoDBAtlasManager:
def init(self, public_key: str, private_key: str, project_id: str):
self.public_key = public_key
self.private_key = private_key
self.project_id = project_id
self.base_url = f"https://cloud.mongodb.com/api/atlas/v1.0/groups/{project_id}"

def create_database(self, customer_id: str):
    # MongoDB Atlas does not have a direct API to create a database.
    # The database is created when the first collection is created.
    print(f"Creating collections for customer '{customer_id}'")
    uri = Config.MONGO_URI
    client = MongoClient(uri)
    db = client[customer_id]
    db.create_collection("consumers")
    db.create_collection("users")
    db.create_collection("channels")
    print(f"Collections 'consumers', 'users', and 'channels' created for database '{customer_id}'")

def create_role(self, role_name: str, privileges: list):
    url = f"{self.base_url}/customDBRoles/admin"
    headers = {
        "Content-Type": "application/json"
    }
    auth = (self.public_key, self.private_key)
    payload = {
        "roleName": role_name,
        "actions": privileges
    }
    print(f"Creating role with URL: {url}")
    print(f"Payload: {json.dumps(payload)}")
    response = requests.post(url, headers=headers, auth=auth, data=json.dumps(payload))
    print(f"Response status code: {response.status_code}")
    print(f"Response content: {response.content}")
    if response.status_code == 201:
        print(f"Role '{role_name}' created successfully.")
    else:
        print(f"Failed to create role: {response.json()}")

def create_user(self, username: str, password: str, roles: list):
    url = f"{self.base_url}/databaseUsers"
    headers = {
        "Content-Type": "application/json"
    }
    auth = (self.public_key, self.private_key)
    payload = {
        "databaseName": "admin",
        "username": username,
        "password": password,
        "roles": roles
    }
    print(f"Creating user with URL: {url}")
    print(f"Payload: {json.dumps(payload)}")
    response = requests.post(url, headers=headers, auth=auth, data=json.dumps(payload))
    print(f"Response status code: {response.status_code}")
    print(f"Response content: {response.content}")
    if response.status_code == 201:
        print(f"User '{username}' created successfully.")
    else:
        print(f"Failed to create user: {response.json()}")

class MongoDBManager:
def init(self, uri: str, telemetry_db_name: str):
self.client = MongoClient(uri)
self.telemetry_db = self.client[telemetry_db_name]
self.customers_collection = self.telemetry_db[‘customers’]
self.atlas_manager = MongoDBAtlasManager(Config.PUBLIC_KEY, Config.PRIVATE_KEY, Config.PROJECT_ID)

def generate_password(self, length=12):
    characters = string.ascii_letters + string.digits + string.punctuation
    return ''.join(random.choice(characters) for i in range(length))

def add_customer(self, customer_id: str):
    try:
        customer_id_obj = ObjectId(customer_id)
    except:
        customer_id_obj = customer_id

    if self.customers_collection.find_one({"_id": customer_id_obj}):
        if customer_id not in self.client.list_database_names():
            self.atlas_manager.create_database(customer_id)

            customer_role_password = self.generate_password()
            consumer_role_password = self.generate_password()

            self.atlas_manager.create_user(f"{customer_id}_customer_user", customer_role_password, [])
            self.atlas_manager.create_user(f"{customer_id}_consumer_user", consumer_role_password, [])

            customer_role_privileges = [
                {"action": "INSERT", "resources": [{"db": customer_id, "collection": ""}]},
                {"action": "FIND", "resources": [{"db": customer_id, "collection": ""}]},
                {"action": "UPDATE", "resources": [{"db": customer_id, "collection": ""}]},
                {"action": "REMOVE", "resources": [{"db": customer_id, "collection": ""}]},
                {"action": "FIND", "resources": [{"db": Config.MONGO_RO_DATABASE_NAME, "collection": ""}]}
            ]
            consumer_role_privileges = [
                {"action": "FIND", "resources": [{"db": customer_id, "collection": ""}]},
                {"action": "FIND", "resources": [{"db": Config.MONGO_RO_DATABASE_NAME, "collection": ""}]}
            ]

            self.atlas_manager.create_role(f"{customer_id}_customer_role", customer_role_privileges)
            self.atlas_manager.create_role(f"{customer_id}_consumer_role", consumer_role_privileges)

            self.atlas_manager.create_user(f"{customer_id}_customer_user", customer_role_password,
                                           [{"roleName": f"{customer_id}_customer_role", "databaseName": "admin"}])
            self.atlas_manager.create_user(f"{customer_id}_consumer_user", consumer_role_password,
                                           [{"roleName": f"{customer_id}_consumer_role", "databaseName": "admin"}])

            self.customers_collection.update_one(
                {"_id": customer_id_obj},
                {"$set": {"customer_role_password": customer_role_password,
                          "consumer_role_password": consumer_role_password}}
            )
            print(f"Customer '{customer_id}' added successfully.")
        else:
            print(f"Database for customer '{customer_id}' already exists.")
    else:
        print(f"Customer with ID '{customer_id}' not found in the 'customers' collection.")
'''
def add_customer(self, customer_id: str):
    try:
        customer_id_obj = ObjectId(customer_id)
    except:
        customer_id_obj = customer_id

    if self.customers_collection.find_one({"_id": customer_id_obj}):
        if customer_id not in self.client.list_database_names():
            self.atlas_manager.create_database(customer_id)

            customer_role_password = self.generate_password()
            consumer_role_password = self.generate_password()

            customer_role_privileges = [
                {"action": "INSERT", "resources": [{"db": customer_id, "collection": ""}]},
                {"action": "FIND", "resources": [{"db": customer_id, "collection": ""}]},
                {"action": "UPDATE", "resources": [{"db": customer_id, "collection": ""}]},
                {"action": "REMOVE", "resources": [{"db": customer_id, "collection": ""}]},
                {"action": "FIND", "resources": [{"db": Config.MONGO_RO_DATABASE_NAME, "collection": ""}]}
            ]
            consumer_role_privileges = [
                {"action": "FIND", "resources": [{"db": customer_id, "collection": ""}]},
                {"action": "FIND", "resources": [{"db": Config.MONGO_RO_DATABASE_NAME, "collection": ""}]}
            ]

            self.atlas_manager.create_role(f"{customer_id}_customer_role", customer_role_privileges)
            self.atlas_manager.create_role(f"{customer_id}_consumer_role", consumer_role_privileges)

            self.atlas_manager.create_user(f"{customer_id}_customer_user", customer_role_password, [{"roleName": f"{customer_id}_customer_role", "databaseName": "admin"}])
            self.atlas_manager.create_user(f"{customer_id}_consumer_user", consumer_role_password, [{"roleName": f"{customer_id}_consumer_role", "databaseName": "admin"}])

            self.customers_collection.update_one(
                {"_id": customer_id_obj},
                {"$set": {"customer_role_password": customer_role_password, "consumer_role_password": consumer_role_password}}
            )
            print(f"Customer '{customer_id}' added successfully.")
        else:
            print(f"Database for customer '{customer_id}' already exists.")
    else:
        print(f"Customer with ID '{customer_id}' not found in the 'customers' collection.")
'''
def remove_customer(self, customer_id: str):
    try:
        customer_id_obj = ObjectId(customer_id)
    except:
        customer_id_obj = customer_id

    if self.customers_collection.find_one({"_id": customer_id_obj}):
        if customer_id in self.client.list_database_names():
            self.client.drop_database(customer_id)
            print(f"Database '{customer_id}' removed.")
        else:
            print(f"Database for customer '{customer_id}' does not exist.")
    else:
        print(f"Customer with ID '{customer_id}' not found in the 'customers' collection.")

def main():
if len(sys.argv) != 3:
print(“Usage: python manage_customers.py <add|remove> <customer_id>”)
sys.exit(1)

action = sys.argv[1]
customer_id = sys.argv[2]

manager = MongoDBManager(Config.MONGO_URI, Config.MONGO_RO_DATABASE_NAME)

if action == "add":
    manager.add_customer(customer_id)
elif action == "remove":
    manager.remove_customer(customer_id)
else:
    print("Invalid action. Use 'add' or 'remove'.")
    sys.exit(1)

if name == “main”:
main()