Skip to content

MongoDB source

MongoDBSource

Bases: DataSource

Source code in blue/data/sources/mongodb_source.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
class MongoDBSource(DataSource):
    """
    DataSource implementation that reads from a MongoDB server.

    Connections are created with `MongoClient`; the methods below read the
    live client from `self.connection` (presumably assigned by the base
    class from the result of `_connect` -- confirm against `DataSource`).
    """

    def __init__(self, name, properties=None):
        """
        Create the source and an empty per-collection schema cache.

        Parameters:
            name (str): Name of the data source.
            properties (dict, optional): Source properties forwarded to the
                base class. Defaults to a new empty dictionary.
        """
        # Build a fresh dict per call: a `{}` default in the signature would be
        # one shared object that every instance created without properties mutates.
        if properties is None:
            properties = {}
        super().__init__(name, properties=properties)
        # Maps (database, collection) -> schema object built by extract_schema.
        self._schema_cache = {}

    ###### connection
    def _initialize_connection_properties(self):
        """
        Set the default MongoDB connection properties (host, port, protocol).
        """
        super()._initialize_connection_properties()

        # set host, port, protocol
        self.properties['connection']['host'] = 'localhost'
        self.properties['connection']['port'] = 27017
        self.properties['connection']['protocol'] = 'mongodb'

    def _connect(self, **connection):
        """
        Open a client for the configured MongoDB server.

        Parameters:
            **connection: Connection settings; `host` and `port` are required.

        Returns:
            MongoClient: Client created for `<protocol>://<host>:<port>`.
        """
        host = connection['host']
        port = connection['port']
        # The protocol is taken from the source properties, not from **connection.
        protocol = self.properties['connection']['protocol']

        return MongoClient(f"{protocol}://{host}:{port}")

    def _disconnect(self):
        """
        Disconnect hook; currently a no-op that returns None.
        """
        # TODO:
        return None

    ######### source
    def fetch_metadata(self):
        """
        Fetch metadata for the MongoDB source.

        Returns:
            dict: General metadata about the MongoDB source.
                Currently returns an empty dictionary.
        """
        return {}

    def fetch_schema(self):
        """
        Fetch schema for the MongoDB source.

        Returns:
            dict: Schema information for the entire MongoDB source.
                Currently returns an empty dictionary.
        """
        return {}

    ######### database
    def fetch_databases(self):
        """
        List the user databases in the MongoDB source.

        Returns:
            list[str]: Database names, excluding the internal `admin`,
                `config` and `local` databases.
        """
        # Exclude MongoDB internal databases
        system_dbs = {"admin", "config", "local"}
        return [db for db in self.connection.list_database_names() if db not in system_dbs]

    def fetch_database_metadata(self, database):
        """
        Fetch metadata for a specific database.

        Parameters:
            database (str): Database name.

        Returns:
            dict: Database-level metadata (default empty).
        """
        return {}

    def fetch_database_schema(self, database):
        """
        Fetch schema information for a specific database.

        Parameters:
            database (str): Database name.

        Returns:
            dict: Schema definition for all collections in the database.
                Currently returns an empty dictionary.
        """
        return {}

    ######### database/collection
    def fetch_database_collections(self, database):
        """
        List all collections within a database.

        Parameters:
            database (str): Database name (not consulted at present).

        Returns:
            list[str]: Names of collections. Currently always the single
                placeholder name "public".
        """
        return ["public"]

    def fetch_database_collection_metadata(self, database, collection):
        """
        Fetch metadata for a specific collection within a database.

        Parameters:
            database (str): Name of the database.
            collection (str): Name of the collection.

        Returns:
            dict: Metadata for the collection (default empty).
        """
        return {}

    def _get_collection_schema(self, database, collection):
        """
        Internal helper: return cached schema object for a collection.

        On a cache miss the schema is inferred from a single document
        (`find_one`) and stored under the (database, collection) key.

        Parameters:
            database (str): Database name.
            collection (str): Collection name.

        Returns:
            DataSchema: Schema inferred by `extract_schema`.
        """
        cache_key = (database, collection)
        if cache_key in self._schema_cache:
            return self._schema_cache[cache_key]

        # NOTE: find_one() yields None for an empty collection; extract_schema
        # then returns a schema that contains only the ROOT entity.
        sample = self.connection[database][collection].find_one()
        schema = self.extract_schema(sample)

        self._schema_cache[cache_key] = schema
        return schema

    def fetch_database_collection_entities(self, database, collection, sample_limit=10):
        """
        Fetch entities (document structures) for a collection.

        Every MongoDB collection in `database` is reported as one entity whose
        attributes are inferred from up to `sample_limit` documents.

        Parameters:
            database (str): Database name.
            collection (str): Collection name (not used to filter the result).
            sample_limit (int): Limit passed to the cursor when sampling
                documents from each MongoDB collection. Defaults to 10.

        Returns:
            dict: A dictionary mapping entity names to their inferred schema, where each entry contains:
            - properties (dict): Metadata such as sample count.
            - contents (dict): Attributes inferred from sample documents with their types.
        """
        db = self.connection[database]

        schema = {}

        for coll_name in db.list_collection_names():
            sample_docs = list(db[coll_name].find().limit(sample_limit))

            attributes = {}

            # Infer attributes from documents; the first document that carries
            # a key decides the reported type for that key.
            for doc in sample_docs:
                for key, value in doc.items():
                    if key not in attributes:
                        attributes[key] = {"type": type(value).__name__}

            # Add entity metadata
            schema[coll_name] = {
                "properties": {"sample_count": len(sample_docs)},
                "contents": {"attributes": attributes},
            }

        return schema

    def fetch_database_collection_relations(self, database, collection):
        """
        Fetch relations (document nesting relationships) for a collection.

        Parameters:
            database (str): Database name.
            collection (str): Collection name.

        Returns:
            list[str]: Relations between entities.
        """
        schema = self._get_collection_schema(database, collection)
        return schema.get_relations()

    def extract_schema(self, sample, schema=None, source=None):
        """
        Recursively infer schema structure from a sample MongoDB document.

        Lists and nested documents become entities related to the current
        entity; every other value becomes a property of the current entity,
        typed by its Python class name. For a list only the first element is
        inspected.

        Parameters:
            sample (dict): Sample document for inference. Non-dict samples
                (including None) add nothing beyond the current entity.
            schema (DataSchema, optional): Existing schema object to update.
            source (str, optional): Current entity source node.

        Returns:
            DataSchema: Inferred or updated schema object.
        """
        if schema is None:
            schema = DataSchema()

        if source is None:
            source = schema.add_entity("ROOT")

        # isinstance (rather than an exact type comparison) so that dict/list
        # subclasses such as OrderedDict are still treated as nested structure.
        if isinstance(sample, dict):
            for key, value in sample.items():
                if isinstance(value, list):
                    target = schema.add_entity(key)
                    # (1)-->(M)
                    schema.add_relation(source, f"{source}:{target}", target)
                    if value:
                        self.extract_schema(value[0], schema=schema, source=target)
                elif isinstance(value, dict):
                    target = schema.add_entity(key)
                    # (1)-->(1)
                    schema.add_relation(source, f"{source}:{target}", target)
                    self.extract_schema(value, schema=schema, source=target)
                else:
                    schema.add_entity_property(source, key, value.__class__.__name__)

        return schema

    ######### execute query
    def execute_query(self, query, database=None, collection=None, optional_properties=None):
        """
        Execute a MongoDB query on a specific collection.

        Parameters:
            query (str): JSON-formatted MongoDB query string.
            database (str, optional): Target database name.
            collection (str, optional): Target collection name.
            optional_properties (dict, optional): Extra query parameters.
                Accepted for interface compatibility; not used at present.

        Returns:
            list[dict]: List of documents matching the query, with `_id` as string.

        Raises:
            Exception: If `database` or `collection` is not provided.
        """
        if database is None:
            raise Exception("No database provided")

        if collection is None:
            raise Exception("No collection provided")

        col = self.connection[database][collection]

        # The query arrives as JSON text and is decoded into a filter document.
        result = col.find(json.loads(query))

        # Convert cursor to a list of dictionaries and handle ObjectId safely
        result_list = []
        for doc in result:
            if '_id' in doc:  # Check if '_id' exists
                doc['_id'] = str(doc['_id'])  # Convert ObjectId to string
            result_list.append(doc)

        return result_list

execute_query(query, database=None, collection=None, optional_properties={})

Execute a MongoDB query on a specific collection.

Parameters:

Name Type Description Default
query str

JSON-formatted MongoDB query string.

required
database str

Target database name.

None
collection str

Target collection name.

None
optional_properties dict

Extra query parameters.

{}

Returns:

Type Description

list[dict]: List of documents matching the query, with _id as string.

Raises:

Type Description
Exception

If database or collection is not provided.

Source code in blue/data/sources/mongodb_source.py
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
def execute_query(self, query, database=None, collection=None, optional_properties=None):
    """
    Execute a MongoDB query on a specific collection.

    Parameters:
        query (str): JSON-formatted MongoDB query string.
        database (str, optional): Target database name.
        collection (str, optional): Target collection name.
        optional_properties (dict, optional): Extra query parameters.
            Accepted for interface compatibility; not used at present.
            Defaults to None instead of a shared mutable `{}`.

    Returns:
        list[dict]: List of documents matching the query, with `_id` as string.

    Raises:
        Exception: If `database` or `collection` is not provided.
    """
    if database is None:
        raise Exception("No database provided")

    if collection is None:
        raise Exception("No collection provided")

    col = self.connection[database][collection]

    # The query arrives as JSON text and is decoded into a filter document.
    result = col.find(json.loads(query))

    # Convert cursor to a list of dictionaries and handle ObjectId safely
    result_list = []
    for doc in result:
        if '_id' in doc:  # Check if '_id' exists
            doc['_id'] = str(doc['_id'])  # Convert ObjectId to string
        result_list.append(doc)

    return result_list

extract_schema(sample, schema=None, source=None)

Recursively infer schema structure from a sample MongoDB document.

Parameters:

Name Type Description Default
sample dict

Sample document for inference.

required
schema DataSchema

Existing schema object to update.

None
source str

Current entity source node.

None

Returns:

Name Type Description
DataSchema

Inferred or updated schema object.

Source code in blue/data/sources/mongodb_source.py
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
def extract_schema(self, sample, schema=None, source=None):
    """
    Recursively infer schema structure from a sample MongoDB document.

    Lists and nested documents become entities related to the current
    entity; every other value becomes a property of the current entity,
    typed by its Python class name. For a list only the first element is
    inspected.

    Parameters:
        sample (dict): Sample document for inference. Non-dict samples
            (including None) add nothing beyond the current entity.
        schema (DataSchema, optional): Existing schema object to update.
        source (str, optional): Current entity source node.

    Returns:
        DataSchema: Inferred or updated schema object.
    """
    if schema is None:
        schema = DataSchema()

    if source is None:
        source = schema.add_entity("ROOT")

    # isinstance (rather than an exact type comparison) so that dict/list
    # subclasses such as OrderedDict are still treated as nested structure.
    if isinstance(sample, dict):
        for key, value in sample.items():
            if isinstance(value, list):
                target = schema.add_entity(key)
                # (1)-->(M)
                schema.add_relation(source, f"{source}:{target}", target)
                if value:
                    self.extract_schema(value[0], schema=schema, source=target)
            elif isinstance(value, dict):
                target = schema.add_entity(key)
                # (1)-->(1)
                schema.add_relation(source, f"{source}:{target}", target)
                self.extract_schema(value, schema=schema, source=target)
            else:
                schema.add_entity_property(source, key, value.__class__.__name__)

    return schema

fetch_database_collection_entities(database, collection, sample_limit=10)

Fetch entities (document structures) for a collection.

Parameters:

Name Type Description Default
database str

Database name.

required
collection str

Collection name.

required

Returns:

Name Type Description
dict

A dictionary mapping entity names to their inferred schema, where each entry contains:

  • properties (dict): Metadata such as sample count.
  • contents (dict): Attributes inferred from sample documents with their types.
Source code in blue/data/sources/mongodb_source.py
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
def fetch_database_collection_entities(self, database, collection, sample_limit=10):
    """
    Describe every MongoDB collection of a database as an entity.

    Each MongoDB collection found in `database` is sampled and its attribute
    names and value types are inferred from the sampled documents.

    Parameters:
        database (str): Database name.
        collection (str): Collection name (not used to filter the result).
        sample_limit (int): Limit passed to the cursor when sampling documents
            from each MongoDB collection. Defaults to 10.

    Returns:
        dict: Mapping of entity name to a dictionary with:
        - properties (dict): `sample_count`, the number of documents sampled.
        - contents (dict): `attributes`, mapping attribute name to `{"type": ...}`.
    """
    mongo_db = self.connection[database]
    entities = {}

    for entity_name in mongo_db.list_collection_names():
        documents = list(mongo_db[entity_name].find().limit(sample_limit))

        inferred = {}
        for document in documents:
            for field, field_value in document.items():
                # The first document carrying a field decides its reported type;
                # later occurrences leave the existing entry untouched.
                inferred.setdefault(field, {"type": type(field_value).__name__})

        entities[entity_name] = {
            "properties": {"sample_count": len(documents)},
            "contents": {"attributes": inferred},
        }

    return entities

fetch_database_collection_metadata(database, collection)

Fetch metadata for a specific collection within a database.

Parameters:

Name Type Description Default
database str

Name of the database.

required
collection str

Name of the collection.

required

Returns:

Name Type Description
dict

Metadata for the collection (default empty).

Source code in blue/data/sources/mongodb_source.py
116
117
118
119
120
121
122
123
124
125
126
127
def fetch_database_collection_metadata(self, database, collection):
    """
    Return collection-level metadata.

    Parameters:
        database (str): Name of the database.
        collection (str): Name of the collection.

    Returns:
        dict: Metadata for the collection; always an empty dictionary here.
    """
    # No collection metadata is gathered; neither argument is consulted.
    metadata = dict()
    return metadata

fetch_database_collection_relations(database, collection)

Fetch relations (document nesting relationships) for a collection.

Parameters:

Name Type Description Default
database str

Database name.

required
collection str

Collection name.

required

Returns:

Type Description

list[str]: Relations between entities.

Source code in blue/data/sources/mongodb_source.py
186
187
188
189
190
191
192
193
194
195
196
197
198
def fetch_database_collection_relations(self, database, collection):
    """
    Return the nesting relations recorded in a collection's schema.

    Parameters:
        database (str): Database name.
        collection (str): Collection name.

    Returns:
        list[str]: Relations between entities, as reported by the schema
            object's `get_relations()`.
    """
    # The schema comes from the shared helper, so any caching it does applies here.
    return self._get_collection_schema(database, collection).get_relations()

fetch_database_collections(database)

List all collections within a database.

Parameters:

Name Type Description Default
database str

Database name.

required

Returns:

Type Description

list[str]: Names of collections.

Source code in blue/data/sources/mongodb_source.py
102
103
104
105
106
107
108
109
110
111
112
113
114
def fetch_database_collections(self, database):
    """
    Return the collection names of a database.

    Parameters:
        database (str): Database name (not consulted at present).

    Returns:
        list[str]: Names of collections; currently always the single
            name "public".
    """
    # A new list is built on every call so callers may mutate the result freely.
    return ["public"]

fetch_database_metadata(database)

Fetch metadata for a specific database.

Parameters:

Name Type Description Default
database str

Database name.

required

Returns:

Name Type Description
dict

Database-level metadata (default empty).

Source code in blue/data/sources/mongodb_source.py
76
77
78
79
80
81
82
83
84
85
86
def fetch_database_metadata(self, database):
    """
    Return database-level metadata.

    Parameters:
        database (str): Database name.

    Returns:
        dict: Database-level metadata; always an empty dictionary here.
    """
    # No database metadata is gathered; the argument is not consulted.
    metadata = dict()
    return metadata

fetch_database_schema(database)

Fetch schema information for a specific database.

Parameters:

Name Type Description Default
database str

Database name.

required

Returns:

Name Type Description
dict

Schema definition for all collections in the database. Currently returns an empty dictionary.

Source code in blue/data/sources/mongodb_source.py
88
89
90
91
92
93
94
95
96
97
98
99
def fetch_database_schema(self, database):
    """
    Return schema information for one database.

    Parameters:
        database (str): Database name.

    Returns:
        dict: Schema definition for all collections in the database.
            Always an empty dictionary at present.
    """
    # No database-wide schema is assembled; the argument is not consulted.
    database_schema = dict()
    return database_schema

fetch_databases()

List all databases in the MongoDB source.

Returns:

Type Description

list[str]: Names of all available databases.

Source code in blue/data/sources/mongodb_source.py
63
64
65
66
67
68
69
70
71
72
73
74
def fetch_databases(self):
    """
    Return the names of the user databases in the MongoDB source.

    Returns:
        list[str]: Database names in the order reported by the connection,
            without the internal `admin`, `config` and `local` databases.
    """
    # Databases MongoDB maintains for its own bookkeeping; never reported.
    internal = ("admin", "config", "local")

    visible = []
    for name in self.connection.list_database_names():
        if name in internal:
            continue
        visible.append(name)
    return visible

fetch_metadata()

Fetch metadata for the MongoDB source.

Returns:

Name Type Description
dict

General metadata about the MongoDB source. Currently returns an empty dictionary.

Source code in blue/data/sources/mongodb_source.py
42
43
44
45
46
47
48
49
50
def fetch_metadata(self):
    """
    Return general metadata about the MongoDB source.

    Returns:
        dict: Source-level metadata; always an empty dictionary at present.
    """
    # No source-level metadata is gathered.
    metadata = dict()
    return metadata

fetch_schema()

Fetch schema for the MongoDB source.

Returns:

Name Type Description
dict

Schema information for the entire MongoDB source. Currently returns an empty dictionary.

Source code in blue/data/sources/mongodb_source.py
52
53
54
55
56
57
58
59
60
def fetch_schema(self):
    """
    Return schema information for the whole MongoDB source.

    Returns:
        dict: Source-wide schema; always an empty dictionary at present.
    """
    # No source-wide schema is assembled.
    source_schema = dict()
    return source_schema
Last update: 2025-10-09