Skip to content

MongoDB source

MongoDBSource

Bases: DataSource

Source code in blue/data/sources/mongodb_source.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
class MongoDBSource(DataSource):
    """
    Data source backed by a MongoDB server.

    Databases map to MongoDB databases and collections to MongoDB
    collections. Collection schemas are inferred from a single sample
    document (see ``extract_schema``) and cached per instance.
    """

    def __init__(self, name, properties=None):
        """
        Create a MongoDB data source.

        Parameters:
            name (str): Name of the data source.
            properties (dict, optional): Source properties. A fresh dict is
                created when omitted, so separate instances never share (and
                mutate) one default object.
        """
        if properties is None:
            properties = {}
        super().__init__(name, properties=properties)
        # (database, collection) -> schema object built by _get_collection_schema.
        # Entries are never invalidated for the lifetime of this instance.
        self._schema_cache = {}

    ###### connection
    def _initialize_connection_properties(self):
        """
        Set MongoDB connection defaults on top of the base-class defaults.

        Writes ``host``, ``port`` and ``protocol`` into
        ``self.properties['connection']``. This assumes the base-class
        initializer has created that dict (TODO confirm in DataSource).
        """
        super()._initialize_connection_properties()

        # set host, port, protocol
        self.properties['connection']['host'] = 'localhost'
        self.properties['connection']['port'] = 27017
        self.properties['connection']['protocol'] = 'mongodb'

    def _connect(self, **connection):
        """
        Open a client for ``protocol://host:port``.

        Parameters:
            **connection: Connection settings; ``host`` and ``port`` are
                required, ``protocol`` is optional.

        Returns:
            MongoClient: Client for the constructed URL.
        """
        host = connection['host']
        port = connection['port']
        # The protocol default lives under properties['connection'] (see
        # _initialize_connection_properties), not at the top level of
        # properties. Prefer the value passed in; fall back to a top-level
        # 'protocol' entry (the previous lookup), then to 'mongodb'.
        protocol = connection.get('protocol') or self.properties.get('protocol', 'mongodb')

        connection_url = f"{protocol}://{host}:{port}"
        return MongoClient(connection_url)

    def _disconnect(self):
        """
        Disconnect from the server.

        Not implemented yet: the client returned by ``_connect`` is not
        closed here.
        """
        # TODO:
        return None

    ######### source
    def fetch_metadata(self):
        """
        Fetch metadata for the MongoDB source.

        Returns:
            dict: General metadata about the MongoDB source.
                Currently returns an empty dictionary.
        """
        return {}

    def fetch_schema(self):
        """
        Fetch schema for the MongoDB source.

        Returns:
            dict: Schema information for the entire MongoDB source.
                Currently returns an empty dictionary.
        """
        return {}

    ######### database
    def fetch_databases(self):
        """
        List all databases in the MongoDB source.

        Returns:
            list[str]: Names of all available databases.
        """
        return self.connection.list_database_names()

    def fetch_database_metadata(self, database):
        """
        Fetch metadata for a specific database.

        Parameters:
            database (str): Database name.

        Returns:
            dict: Database-level metadata (default empty).
        """
        return {}

    def fetch_database_schema(self, database):
        """
        Fetch schema information for a specific database.

        Parameters:
            database (str): Database name.

        Returns:
            dict: Schema definition for all collections in the database.
                Currently returns an empty dictionary.
        """
        return {}

    ######### database/collection
    def fetch_database_collections(self, database):
        """
        List all collections within a database.

        Parameters:
            database (str): Database name.

        Returns:
            list[str]: Names of collections.
        """
        return self.connection[database].list_collection_names()

    def fetch_database_collection_metadata(self, database, collection):
        """
        Fetch metadata for a specific collection within a database.

        Parameters:
            database (str): Name of the database.
            collection (str): Name of the collection.

        Returns:
            dict: Metadata for the collection (default empty).
        """
        return {}

    def _get_collection_schema(self, database, collection):
        """
        Return the cached schema for a collection, inferring it on first use.

        The schema is inferred from the single document returned by
        ``find_one()``. Fields that do not appear in that sample are therefore
        missing from the schema. For an empty collection the sample is
        ``None``, and the resulting schema contains only the ROOT entity.
        """
        cache_key = (database, collection)
        if cache_key in self._schema_cache:
            return self._schema_cache[cache_key]

        sample = self.connection[database][collection].find_one()
        schema = self.extract_schema(sample)

        self._schema_cache[cache_key] = schema
        return schema

    def fetch_database_collection_entities(self, database, collection):
        """
        Fetch entities (document structures) for a collection.

        Parameters:
            database (str): Database name.
            collection (str): Collection name.

        Returns:
            list[str]: Entity names in the collection schema.
        """
        schema = self._get_collection_schema(database, collection)
        return schema.get_entities()

    def fetch_database_collection_relations(self, database, collection):
        """
        Fetch relations (document nesting relationships) for a collection.

        Parameters:
            database (str): Database name.
            collection (str): Collection name.

        Returns:
            list[str]: Relations between entities.
        """
        schema = self._get_collection_schema(database, collection)
        return schema.get_relations()

    def extract_schema(self, sample, schema=None, source=None):
        """
        Recursively infer schema structure from a sample MongoDB document.

        For each key in a mapping ``sample``:
          * a list value becomes a child entity plus a ``source:target``
            relation, and only its first element is inspected further;
          * a mapping value becomes a child entity plus a relation, and is
            recursed into;
          * any other value becomes a property of ``source``, typed by the
            value's class name.
        Mapping/list subclasses (e.g. ``OrderedDict``/SON documents) are
        handled like ``dict``/``list``. A non-mapping ``sample`` adds nothing.

        Parameters:
            sample (dict): Sample document for inference.
            schema (DataSchema, optional): Existing schema object to update.
            source (str, optional): Current entity source node; a "ROOT"
                entity is created when omitted.

        Returns:
            DataSchema: Inferred or updated schema object.
        """
        if schema is None:
            schema = DataSchema()

        if source is None:
            source = schema.add_entity("ROOT")

        if isinstance(sample, dict):
            for key, value in sample.items():
                if isinstance(value, list):
                    target = schema.add_entity(key)
                    # (1)-->(M)
                    schema.add_relation(source, source + ":" + target, target)
                    if value:
                        self.extract_schema(value[0], schema=schema, source=target)
                elif isinstance(value, dict):
                    target = schema.add_entity(key)
                    # (1)-->(1)
                    schema.add_relation(source, source + ":" + target, target)
                    self.extract_schema(value, schema=schema, source=target)
                else:
                    schema.add_entity_property(source, key, value.__class__.__name__)

        return schema

    ######### execute query
    def execute_query(self, query, database=None, collection=None, optional_properties=None):
        """
        Execute a MongoDB ``find`` query on a specific collection.

        Parameters:
            query (str | dict): MongoDB filter, either as a JSON string
                (parsed with ``json.loads``) or as an already-parsed ``dict``.
            database (str): Target database name (required despite the default).
            collection (str): Target collection name (required despite the default).
            optional_properties (dict, optional): Extra query parameters.
                Accepted for interface compatibility; currently unused.

        Returns:
            list[dict]: Matching documents; any ``_id`` value is converted to ``str``.

        Raises:
            ValueError: If ``database`` or ``collection`` is not provided
                (``ValueError`` subclasses ``Exception``, so existing handlers
                still catch it).
            json.JSONDecodeError: If ``query`` is a string that is not valid JSON.
        """
        if database is None:
            raise ValueError("No database provided")

        if collection is None:
            raise ValueError("No collection provided")

        col = self.connection[database][collection]

        # Accept a pre-built filter dict as well as a JSON string; anything
        # else still goes through json.loads, so invalid inputs fail as before.
        q = query if isinstance(query, dict) else json.loads(query)

        result_list = []
        for doc in col.find(q):
            # _id is typically a bson ObjectId; stringify it so results are
            # plain, serializable values.
            if '_id' in doc:
                doc['_id'] = str(doc['_id'])
            result_list.append(doc)

        return result_list

execute_query(query, database=None, collection=None, optional_properties={})

Execute a MongoDB query on a specific collection.

Parameters:

Name Type Description Default
query str

JSON-formatted MongoDB query string.

required
database str

Target database name.

None
collection str

Target collection name.

None
optional_properties dict

Extra query parameters.

{}

Returns:

Type Description

list[dict]: List of documents matching the query, with _id as string.

Raises:

Type Description
Exception

If database or collection is not provided.

Source code in blue/data/sources/mongodb_source.py
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
def execute_query(self, query, database=None, collection=None, optional_properties=None):
    """
    Execute a MongoDB ``find`` query on a specific collection.

    Parameters:
        query (str | dict): MongoDB filter, either as a JSON string (parsed
            with ``json.loads``) or as an already-parsed ``dict``.
        database (str): Target database name (required despite the default).
        collection (str): Target collection name (required despite the default).
        optional_properties (dict, optional): Extra query parameters.
            Accepted for interface compatibility; currently unused.

    Returns:
        list[dict]: Matching documents; any ``_id`` value is converted to ``str``.

    Raises:
        ValueError: If ``database`` or ``collection`` is not provided
            (``ValueError`` subclasses ``Exception``, so existing handlers
            still catch it).
        json.JSONDecodeError: If ``query`` is a string that is not valid JSON.
    """
    if database is None:
        raise ValueError("No database provided")

    if collection is None:
        raise ValueError("No collection provided")

    col = self.connection[database][collection]

    # Accept a pre-built filter dict as well as a JSON string; anything else
    # still goes through json.loads, so invalid inputs fail as before.
    q = query if isinstance(query, dict) else json.loads(query)

    result_list = []
    for doc in col.find(q):
        # _id is typically a bson ObjectId; stringify it so results are
        # plain, serializable values.
        if '_id' in doc:
            doc['_id'] = str(doc['_id'])
        result_list.append(doc)

    return result_list

extract_schema(sample, schema=None, source=None)

Recursively infer schema structure from a sample MongoDB document.

Parameters:

Name Type Description Default
sample dict

Sample document for inference.

required
schema DataSchema

Existing schema object to update.

None
source str

Current entity source node.

None

Returns:

Name Type Description
DataSchema

Inferred or updated schema object.

Source code in blue/data/sources/mongodb_source.py
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
def extract_schema(self, sample, schema=None, source=None):
    """
    Recursively infer schema structure from a sample MongoDB document.

    For each key in a mapping ``sample``:
      * a list value becomes a child entity plus a ``source:target`` relation,
        and only its first element is inspected further;
      * a mapping value becomes a child entity plus a relation, and is
        recursed into;
      * any other value becomes a property of ``source``, typed by the
        value's class name.
    Mapping/list subclasses (e.g. ``OrderedDict``/SON documents) are handled
    like ``dict``/``list``. A non-mapping ``sample`` adds nothing.

    Parameters:
        sample (dict): Sample document for inference.
        schema (DataSchema, optional): Existing schema object to update.
        source (str, optional): Current entity source node; a "ROOT" entity
            is created when omitted.

    Returns:
        DataSchema: Inferred or updated schema object.
    """
    if schema is None:
        schema = DataSchema()

    if source is None:
        source = schema.add_entity("ROOT")

    if isinstance(sample, dict):
        for key, value in sample.items():
            if isinstance(value, list):
                target = schema.add_entity(key)
                # (1)-->(M)
                schema.add_relation(source, source + ":" + target, target)
                if value:
                    self.extract_schema(value[0], schema=schema, source=target)
            elif isinstance(value, dict):
                target = schema.add_entity(key)
                # (1)-->(1)
                schema.add_relation(source, source + ":" + target, target)
                self.extract_schema(value, schema=schema, source=target)
            else:
                schema.add_entity_property(source, key, value.__class__.__name__)

    return schema

fetch_database_collection_entities(database, collection)

Fetch entities (document structures) for a collection.

Parameters:

Name Type Description Default
database str

Database name.

required
collection str

Collection name.

required

Returns:

Type Description

list[str]: Entity names in the collection schema.

Source code in blue/data/sources/mongodb_source.py
141
142
143
144
145
146
147
148
149
150
151
152
153
def fetch_database_collection_entities(self, database, collection):
    """
    Return the entity names of a collection's inferred schema.

    Parameters:
        database (str): Database name.
        collection (str): Collection name.

    Returns:
        list[str]: Entity names, as reported by the cached schema's
            ``get_entities()``.
    """
    return self._get_collection_schema(database, collection).get_entities()

fetch_database_collection_metadata(database, collection)

Fetch metadata for a specific collection within a database.

Parameters:

Name Type Description Default
database str

Name of the database.

required
collection str

Name of the collection.

required

Returns:

Name Type Description
dict

Metadata for the collection (default empty).

Source code in blue/data/sources/mongodb_source.py
112
113
114
115
116
117
118
119
120
121
122
123
def fetch_database_collection_metadata(self, database, collection):
    """
    Return metadata for one collection of a database.

    Parameters:
        database (str): Name of the database.
        collection (str): Name of the collection.

    Returns:
        dict: Collection metadata; always a new, empty dict at present.
    """
    return dict()

fetch_database_collection_relations(database, collection)

Fetch relations (document nesting relationships) for a collection.

Parameters:

Name Type Description Default
database str

Database name.

required
collection str

Collection name.

required

Returns:

Type Description

list[str]: Relations between entities.

Source code in blue/data/sources/mongodb_source.py
155
156
157
158
159
160
161
162
163
164
165
166
167
def fetch_database_collection_relations(self, database, collection):
    """
    Return the relations (nesting links) of a collection's inferred schema.

    Parameters:
        database (str): Database name.
        collection (str): Collection name.

    Returns:
        list[str]: Relations, as reported by the cached schema's
            ``get_relations()``.
    """
    return self._get_collection_schema(database, collection).get_relations()

fetch_database_collections(database)

List all collections within a database.

Parameters:

Name Type Description Default
database str

Database name.

required

Returns:

Type Description

list[str]: Names of collections.

Source code in blue/data/sources/mongodb_source.py
 99
100
101
102
103
104
105
106
107
108
109
110
def fetch_database_collections(self, database):
    """
    Return the collection names of one database.

    Parameters:
        database (str): Database name.

    Returns:
        list[str]: Collection names from ``list_collection_names()``.
    """
    return self.connection[database].list_collection_names()

fetch_database_metadata(database)

Fetch metadata for a specific database.

Parameters:

Name Type Description Default
database str

Database name.

required

Returns:

Name Type Description
dict

Database-level metadata (default empty).

Source code in blue/data/sources/mongodb_source.py
73
74
75
76
77
78
79
80
81
82
83
def fetch_database_metadata(self, database):
    """
    Return metadata for one database.

    Parameters:
        database (str): Database name.

    Returns:
        dict: Database metadata; always a new, empty dict at present.
    """
    return dict()

fetch_database_schema(database)

Fetch schema information for a specific database.

Parameters:

Name Type Description Default
database str

Database name.

required

Returns:

Name Type Description
dict

Schema definition for all collections in the database. Currently returns an empty dictionary.

Source code in blue/data/sources/mongodb_source.py
85
86
87
88
89
90
91
92
93
94
95
96
def fetch_database_schema(self, database):
    """
    Return the schema of every collection in one database.

    Parameters:
        database (str): Database name.

    Returns:
        dict: Database-wide schema; not implemented yet, so always a new,
            empty dict.
    """
    return dict()

fetch_databases()

List all databases in the MongoDB source.

Returns:

Type Description

list[str]: Names of all available databases.

Source code in blue/data/sources/mongodb_source.py
63
64
65
66
67
68
69
70
71
def fetch_databases(self):
    """
    Return the names of the databases visible through this connection.

    Returns:
        list[str]: Database names from ``list_database_names()``.
    """
    return self.connection.list_database_names()

fetch_metadata()

Fetch metadata for the MongoDB source.

Returns:

Name Type Description
dict

General metadata about the MongoDB source. Currently returns an empty dictionary.

Source code in blue/data/sources/mongodb_source.py
42
43
44
45
46
47
48
49
50
def fetch_metadata(self):
    """
    Return source-level metadata for the MongoDB source.

    Returns:
        dict: General metadata; not populated yet, so always a new,
            empty dict.
    """
    return dict()

fetch_schema()

Fetch schema for the MongoDB source.

Returns:

Name Type Description
dict

Schema information for the entire MongoDB source. Currently returns an empty dictionary.

Source code in blue/data/sources/mongodb_source.py
52
53
54
55
56
57
58
59
60
def fetch_schema(self):
    """
    Return the schema of the whole MongoDB source.

    Returns:
        dict: Source-wide schema; not implemented yet, so always a new,
            empty dict.
    """
    return dict()
Last update: 2025-10-09