Skip to content

Source

DataSource

Source code in blue/data/source.py
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
class DataSource:
    """
    Generic base class for a data source.

    Constructing an instance builds the default properties, applies any
    caller-supplied overrides, sets up a logger and then starts the
    connection. The ``fetch_*``, ``create_*`` and ``execute_query`` methods
    defined here are placeholders that ignore their arguments and return
    empty values; presumably concrete sources override them.
    """

    def __init__(self, name, properties=None):
        """
        Initialize a generic data source.

        Parameters:
            name (str): Name of the source.
            properties (dict, optional): Additional configuration properties.
                ``None`` (the default) means "no overrides".
        """

        # name must be set first: the logger configured in _initialize reads it
        self.name = name

        self._initialize(properties=properties)

        self._start()

    ###### initialization
    def _initialize(self, properties=None):
        """
        Perform internal initialization: properties and logger.

        Parameters:
            properties (dict, optional): Overrides applied on top of the defaults.
        """
        self._initialize_properties()
        self._update_properties(properties=properties)

        self._initialize_logger()

    def _initialize_properties(self):
        """
        Initialize default properties, including connection settings.
        """

        self.properties = {}

        # connection properties
        self._initialize_connection_properties()

    def _update_properties(self, properties=None):
        """
        Override default properties with provided dictionary.

        The override is shallow: each top-level key in ``properties`` replaces
        the current value wholesale, so supplying ``'connection'`` replaces the
        entire default connection dictionary.

        Parameters:
            properties (dict, optional): Properties to update. ``None`` is a no-op.
        """
        if properties is None:
            return

        # override
        for p in properties:
            self.properties[p] = properties[p]

    def _initialize_connection_properties(self):
        """
        Install the default connection settings under ``properties['connection']``.
        """
        connection_properties = {}

        connection_properties['protocol'] = 'default'
        self.properties['connection'] = connection_properties

    def _initialize_logger(self):
        """
        Create the instance logger and attach the source name to it.
        """
        self.logger = log_utils.CustomLogger()
        # customize log
        self.logger.set_config_data(
            "stack",
            "%(call_stack)s",
        )
        # NOTE(review): meaning of the trailing -1 is defined by CustomLogger -- confirm
        self.logger.set_config_data("source", self.name, -1)

    ###### connection
    def _start_connection(self):
        """
        Connect using ``properties['connection']`` and keep the result on ``self.connection``.
        """
        connection = self.properties['connection']

        # connection settings are forwarded as keyword arguments
        self.connection = self._connect(**connection)

    def _stop_connection(self):
        """
        Tear down the connection by delegating to ``_disconnect``.
        """
        self._disconnect()

    def _connect(self, **connection):
        """
        Open a connection. The base implementation does nothing.

        Parameters:
            **connection: Connection settings taken from ``properties['connection']``.

        Returns:
            None: The base class has no connection object.
        """
        return None

    def _disconnect(self):
        """
        Close the connection. The base implementation does nothing.

        Returns:
            None
        """
        return None

    def _start(self):
        """
        Start the source: open the connection, then log that it started.
        """
        self._start_connection()

        self.logger.info('Started source {name}'.format(name=self.name))

    def _stop(self):
        """
        Stop the source: close the connection, then log that it stopped.
        """
        self._stop_connection()

        self.logger.info('Stopped source {name}'.format(name=self.name))

    ######### source
    def fetch_metadata(self):
        """
        Retrieve high-level metadata about the data source.

        Returns:
            dict: Source metadata such as name, type, or description.
                Default is an empty dictionary.
        """
        return {}

    def fetch_schema(self):
        """
        Retrieve the overall schema of the data source.

        Returns:
            dict: Schema definition including databases, collections, and entities.
                Default is an empty dictionary.
        """
        return {}

    ######### source/database
    def fetch_databases(self):
        """
        List all databases available in the data source.

        Returns:
            list[str]: Names of databases. Default is an empty list.
        """
        return []

    def fetch_database_metadata(self, database):
        """
        Retrieve metadata for a specific database.

        Parameters:
            database (str): Name of the database.

        Returns:
            dict: Database-level metadata such as size, owner, or creation date.
                Default is an empty dictionary.
        """
        return {}

    def fetch_database_schema(self, database):
        """
        Retrieve the schema of a specific database.

        Parameters:
            database (str): Name of the database.

        Returns:
            dict: Database schema including collections and entities.
                Default is an empty dictionary.
        """
        return {}

    def create_database(self, database, properties=None):
        """
        Create a new database in the data source.

        Parameters:
            database (str): Name of the new database.
            properties (dict, optional): Database-specific configuration options.
                ``None`` (the default) means no options.

        Returns:
            dict: Metadata or result of the creation operation. Default is empty.
        """
        return {}

    ######### source/database/collection
    def fetch_database_collections(self, database):
        """
        List all collections in a database.

        Parameters:
            database (str): Database name.

        Returns:
            list: Collection names (default empty).
        """
        return []

    def fetch_database_collection_metadata(self, database, collection):
        """
        Fetch metadata for a collection.

        Parameters:
            database (str): Database name.
            collection (str): Collection name.

        Returns:
            dict: Collection metadata (default empty).
        """
        return {}

    def fetch_database_collection_entities(self, database, collection):
        """
        Retrieve entities (tables, objects, or equivalent) for a specific collection.

        Parameters:
            database (str): Name of the database.
            collection (str): Name of the collection or schema.

        Returns:
            dict: Dictionary of entities with their properties and metadata.
                Default is empty.
        """
        return {}

    def fetch_database_collection_relations(self, database, collection):
        """
        Retrieve relationships (e.g., foreign keys or links) between entities in a collection.

        Parameters:
            database (str): Name of the database.
            collection (str): Name of the collection or schema.

        Returns:
            dict: Dictionary of relations between entities.
                Default is empty.
        """
        return {}

    def create_database_collection(self, database, collection, properties=None):
        """
        Create a new collection (schema, table group, or equivalent) in a database.

        Parameters:
            database (str): Database name.
            collection (str): Name of the new collection.
            properties (dict, optional): Collection-specific properties.
                ``None`` (the default) means no properties.

        Returns:
            dict: Metadata or result of the creation operation. Default is empty.
        """
        return {}

    ######### source/database/collection/entity
    # Expected shape of the `properties` argument when creating an entity:
    # {
    #     "properties": [    <--- entity properties
    #         {
    #             "name": "",
    #             "type": "",
    #             "misc":
    #         }
    #      ]
    # }
    # note: misc can include primary key, etc. features that are db specific
    #
    def create_database_collection_entity(self, database, collection, entity, properties=None):
        """
        Create a new entity (table, object, or equivalent) within a collection.

        Parameters:
            database (str): Database name.
            collection (str): Collection or schema name.
            entity (str): Name of the entity to create.
            properties (dict, optional): Entity properties, including column definitions,
                                        types, and misc metadata such as primary keys.
                                        ``None`` (the default) means no properties.

        Returns:
            dict: Metadata or result of the creation operation. Default is empty.
        """
        return {}

    ######### source/database/collection/relation
    def create_database_collection_relation(self, database, collection, relation, properties=None):
        """
        Create a new relationship between entities within a collection.

        Parameters:
            database (str): Database name.
            collection (str): Collection or schema name.
            relation (str): Name or identifier of the relation.
            properties (dict, optional): Relation-specific metadata.
                ``None`` (the default) means no metadata.

        Returns:
            dict: Metadata or result of the creation operation. Default is empty.
        """
        return {}

    ######### execute query
    def execute_query(self, query, database=None, collection=None, optional_properties=None):
        """
        Execute a query on the data source and return results.

        Parameters:
            query (str): Query string to execute.
            database (str, optional): Target database name.
            collection (str, optional): Target collection name.
            optional_properties (dict, optional): Additional execution options.
                ``None`` (the default) means no options.

        Returns:
            list[dict]: List of results, each row as a dictionary.
                        Default is a single empty dictionary.
        """
        return [{}]

    #######  stats ############
    def fetch_source_stats(self):
        """
        Retrieve high-level statistics about the source itself.

        Returns:
            dict or None: Source-level statistics such as connection info,
                        number of databases, or performance metrics.
                        Default is None.
        """
        return None

    def fetch_database_stats(self, database):
        """
        Retrieve statistics for a specific database.

        Parameters:
            database (str): Database name.

        Returns:
            dict or None: Database-level statistics such as size, table count,
                        or other relevant metrics. Default is None.
        """
        return None

    def fetch_collection_stats(self, database, collection_name, schema_json=None, sample_limit=None):
        """
        Retrieve statistics for a specific collection (schema) in a database.

        Parameters:
            database (str): Database name.
            collection_name (str): Collection or schema name.
            schema_json (dict, optional): Schema definition for computing statistics.
            sample_limit (int, optional): Maximum number of samples to collect for properties.

        Returns:
            dict or None: Collection-level statistics such as number of entities,
                        relations, or sampled property values. Default is None.
        """
        return None

    def fetch_entity_stats(self, database, collection, entity):
        """
        Retrieve statistics for a specific entity (table/object) in a collection.

        Parameters:
            database (str): Database name.
            collection (str): Collection or schema name.
            entity (str): Entity name.

        Returns:
            dict or None: Entity-level statistics such as row count, size, or other metrics.
                        Default is None.
        """
        return None

    def fetch_property_stats(self, database, collection, entity, property_name, sample_limit=None):
        """
        Retrieve statistics for a specific property (column/attribute) of an entity.

        Parameters:
            database (str): Database name.
            collection (str): Collection or schema name.
            entity (str): Entity name.
            property_name (str): Property/column name.
            sample_limit (int, optional): Maximum number of sample values to fetch.

        Returns:
            dict or None: Property-level statistics such as count, distinct values,
                        null count, min/max, or sampled values. Default is None.
        """
        return None

__init__(name, properties={})

Initialize a generic data source.

Parameters:

Name Type Description Default
name str

Name of the source.

required
properties dict

Additional configuration properties.

{}
Source code in blue/data/source.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def __init__(self, name, properties={}):
    """
    Set up a generic data source and start it.

    Parameters:
        name (str): Name given to this source.
        properties (dict, optional): Extra configuration forwarded to
            ``_initialize`` as overrides.
    """
    # stored before _initialize/_start run, both of which are given no name argument
    self.name = name

    # build properties (with overrides) and the logger
    self._initialize(properties=properties)

    # finally bring the source up
    self._start()

create_database(database, properties={})

Create a new database in the data source.

Parameters:

Name Type Description Default
database str

Name of the new database.

required
properties dict

Database-specific configuration options.

{}

Returns:

Name Type Description
dict

Metadata or result of the creation operation. Default is empty.

Source code in blue/data/source.py
161
162
163
164
165
166
167
168
169
170
171
172
def create_database(self, database, properties={}):
    """
    Create a database inside this data source.

    This base implementation ignores its arguments and reports an empty result.

    Parameters:
        database (str): Name of the database to create.
        properties (dict, optional): Configuration options specific to the database.

    Returns:
        dict: Result/metadata of the creation; empty in this implementation.
    """
    return {}

create_database_collection(database, collection, properties={})

Create a new collection (schema, table group, or equivalent) in a database.

Parameters:

Name Type Description Default
database str

Database name.

required
collection str

Name of the new collection.

required
properties dict

Collection-specific properties.

{}

Returns:

Name Type Description
dict

Metadata or result of the creation operation. Default is empty.

Source code in blue/data/source.py
228
229
230
231
232
233
234
235
236
237
238
239
240
def create_database_collection(self, database, collection, properties={}):
    """
    Create a collection (schema, table group, or equivalent) inside a database.

    This base implementation ignores its arguments and reports an empty result.

    Parameters:
        database (str): Name of the database that will hold the collection.
        collection (str): Name of the collection to create.
        properties (dict, optional): Properties specific to the collection.

    Returns:
        dict: Result/metadata of the creation; empty in this implementation.
    """
    return {}

create_database_collection_entity(database, collection, entity, properties={})

Create a new entity (table, object, or equivalent) within a collection.

Parameters:

Name Type Description Default
database str

Database name.

required
collection str

Collection or schema name.

required
entity str

Name of the entity to create.

required
properties dict

Entity properties, including column definitions, types, and misc metadata such as primary keys.

{}

Returns:

Name Type Description
dict

Metadata or result of the creation operation. Default is empty.

Source code in blue/data/source.py
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
def create_database_collection_entity(self, database, collection, entity, properties={}):
    """
    Create an entity (table, object, or equivalent) inside a collection.

    This base implementation ignores its arguments and reports an empty result.

    Parameters:
        database (str): Name of the database.
        collection (str): Name of the collection or schema.
        entity (str): Name of the entity to create.
        properties (dict, optional): Entity description: column definitions, types,
            and misc metadata such as primary keys.

    Returns:
        dict: Result/metadata of the creation; empty in this implementation.
    """
    return {}

create_database_collection_relation(database, collection, relation, properties={})

Create a new relationship between entities within a collection.

Parameters:

Name Type Description Default
database str

Database name.

required
collection str

Collection or schema name.

required
relation str

Name or identifier of the relation.

required
properties dict

Relation-specific metadata.

{}

Returns:

Name Type Description
dict

Metadata or result of the creation operation. Default is empty.

Source code in blue/data/source.py
271
272
273
274
275
276
277
278
279
280
281
282
283
284
def create_database_collection_relation(self, database, collection, relation, properties={}):
    """
    Create a relationship between entities of a collection.

    This base implementation ignores its arguments and reports an empty result.

    Parameters:
        database (str): Name of the database.
        collection (str): Name of the collection or schema.
        relation (str): Name or identifier of the relation to create.
        properties (dict, optional): Metadata specific to the relation.

    Returns:
        dict: Result/metadata of the creation; empty in this implementation.
    """
    return {}

execute_query(query, database=None, collection=None, optional_properties={})

Execute a query on the data source and return results.

Parameters:

Name Type Description Default
query str

Query string to execute.

required
database str

Target database name.

None
collection str

Target collection name.

None
optional_properties dict

Additional execution options.

{}

Returns:

Type Description

list[dict]: List of results, each row as a dictionary. Default is a single empty dictionary.

Source code in blue/data/source.py
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
def execute_query(self, query, database=None, collection=None, optional_properties={}):
    """
    Run a query against the data source and hand back its rows.

    This base implementation ignores its arguments and returns one empty row.

    Parameters:
        query (str): The query string to run.
        database (str, optional): Name of the target database.
        collection (str, optional): Name of the target collection.
        optional_properties (dict, optional): Extra execution options.

    Returns:
        list[dict]: Result rows, one dictionary per row; here a list holding
            a single empty dictionary.
    """
    return [{}]

fetch_collection_stats(database, collection_name, schema_json=None, sample_limit=None)

Retrieve statistics for a specific collection (schema) in a database.

Parameters:

Name Type Description Default
database str

Database name.

required
collection_name str

Collection or schema name.

required
schema_json dict

Schema definition for computing statistics.

None
sample_limit int

Maximum number of samples to collect for properties.

None

Returns:

Type Description

dict or None: Collection-level statistics such as number of entities, relations, or sampled property values. Default is None.

Source code in blue/data/source.py
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
def fetch_collection_stats(self, database, collection_name, schema_json=None, sample_limit=None):
    """
    Get statistics for one collection (schema) of a database.

    This base implementation ignores its arguments and returns None.

    Parameters:
        database (str): Name of the database.
        collection_name (str): Name of the collection or schema.
        schema_json (dict, optional): Schema definition to compute statistics from.
        sample_limit (int, optional): Upper bound on samples collected per property.

    Returns:
        dict or None: Collection-level statistics (entity/relation counts,
            sampled property values, ...); None in this implementation.
    """
    return None

fetch_database_collection_entities(database, collection)

Retrieve entities (tables, objects, or equivalent) for a specific collection.

Parameters:

Name Type Description Default
database str

Name of the database.

required
collection str

Name of the collection or schema.

required

Returns:

Name Type Description
dict

Dictionary of entities with their properties and metadata. Default is empty.

Source code in blue/data/source.py
200
201
202
203
204
205
206
207
208
209
210
211
212
def fetch_database_collection_entities(self, database, collection):
    """
    Get the entities (tables, objects, or equivalent) of one collection.

    This base implementation ignores its arguments and returns an empty dictionary.

    Parameters:
        database (str): Name of the database.
        collection (str): Name of the collection or schema.

    Returns:
        dict: Entities with their properties and metadata; empty here.
    """
    return {}

fetch_database_collection_metadata(database, collection)

Fetch metadata for a collection.

Parameters:

Name Type Description Default
database str

Database name.

required
collection str

Collection name.

required

Returns:

Name Type Description
dict

Collection metadata (default empty).

Source code in blue/data/source.py
187
188
189
190
191
192
193
194
195
196
197
198
def fetch_database_collection_metadata(self, database, collection):
    """
    Get the metadata of one collection.

    This base implementation ignores its arguments and returns an empty dictionary.

    Parameters:
        database (str): Name of the database.
        collection (str): Name of the collection.

    Returns:
        dict: Metadata of the collection; empty here.
    """
    return {}

fetch_database_collection_relations(database, collection)

Retrieve relationships (e.g., foreign keys or links) between entities in a collection.

Parameters:

Name Type Description Default
database str

Name of the database.

required
collection str

Name of the collection or schema.

required

Returns:

Name Type Description
dict

Dictionary of relations between entities. Default is empty.

Source code in blue/data/source.py
214
215
216
217
218
219
220
221
222
223
224
225
226
def fetch_database_collection_relations(self, database, collection):
    """
    Get the relationships (e.g., foreign keys or links) among a collection's entities.

    This base implementation ignores its arguments and returns an empty dictionary.

    Parameters:
        database (str): Name of the database.
        collection (str): Name of the collection or schema.

    Returns:
        dict: Relations between entities; empty here.
    """
    return {}

fetch_database_collections(database)

List all collections in a database.

Parameters:

Name Type Description Default
database str

Database name.

required

Returns:

Name Type Description
list

Collection names (default empty).

Source code in blue/data/source.py
175
176
177
178
179
180
181
182
183
184
185
def fetch_database_collections(self, database):
    """
    Enumerate the collections of a database.

    This base implementation ignores its argument and returns an empty list.

    Parameters:
        database (str): Name of the database.

    Returns:
        list: Names of the collections; empty here.
    """
    return []

fetch_database_metadata(database)

Retrieve metadata for a specific database.

Parameters:

Name Type Description Default
database str

Name of the database.

required

Returns:

Name Type Description
dict

Database-level metadata such as size, owner, or creation date. Default is an empty dictionary.

Source code in blue/data/source.py
135
136
137
138
139
140
141
142
143
144
145
146
def fetch_database_metadata(self, database):
    """
    Get the metadata of one database.

    This base implementation ignores its argument and returns an empty dictionary.

    Parameters:
        database (str): Name of the database.

    Returns:
        dict: Database-level metadata (size, owner, creation date, ...); empty here.
    """
    return {}

fetch_database_schema(database)

Retrieve the schema of a specific database.

Parameters:

Name Type Description Default
database str

Name of the database.

required

Returns:

Name Type Description
dict

Database schema including collections and entities. Default is an empty dictionary.

Source code in blue/data/source.py
148
149
150
151
152
153
154
155
156
157
158
159
def fetch_database_schema(self, database):
    """
    Get the schema of one database.

    This base implementation ignores its argument and returns an empty dictionary.

    Parameters:
        database (str): Name of the database.

    Returns:
        dict: Schema of the database, covering collections and entities; empty here.
    """
    return {}

fetch_database_stats(database)

Retrieve statistics for a specific database.

Parameters:

Name Type Description Default
database str

Database name.

required

Returns:

Type Description

dict or None: Database-level statistics such as size, table count, or other relevant metrics. Default is None.

Source code in blue/data/source.py
315
316
317
318
319
320
321
322
323
324
325
326
def fetch_database_stats(self, database):
    """
    Get statistics for one database.

    This base implementation ignores its argument and returns None.

    Parameters:
        database (str): Name of the database.

    Returns:
        dict or None: Database-level statistics (size, table count, ...);
            None in this implementation.
    """
    return None

fetch_databases()

List all databases available in the data source.

Returns:

Type Description

list[str]: Names of databases. Default is an empty list.

Source code in blue/data/source.py
126
127
128
129
130
131
132
133
def fetch_databases(self):
    """
    Enumerate the databases this data source exposes.

    This base implementation returns an empty list.

    Returns:
        list[str]: Database names; empty here.
    """
    return []

fetch_entity_stats(database, collection, entity)

Retrieve statistics for a specific entity (table/object) in a collection.

Parameters:

Name Type Description Default
database str

Database name.

required
collection str

Collection or schema name.

required
entity str

Entity name.

required

Returns:

Type Description

dict or None: Entity-level statistics such as row count, size, or other metrics. Default is None.

Source code in blue/data/source.py
344
345
346
347
348
349
350
351
352
353
354
355
356
357
def fetch_entity_stats(self, database, collection, entity):
    """
    Get statistics for one entity (table/object) of a collection.

    This base implementation ignores its arguments and returns None.

    Parameters:
        database (str): Name of the database.
        collection (str): Name of the collection or schema.
        entity (str): Name of the entity.

    Returns:
        dict or None: Entity-level statistics (row count, size, ...);
            None in this implementation.
    """
    return None

fetch_metadata()

Retrieve high-level metadata about the data source.

Returns:

Name Type Description
dict

Source metadata such as name, type, or description. Default is an empty dictionary.

Source code in blue/data/source.py
105
106
107
108
109
110
111
112
113
def fetch_metadata(self):
    """
    Retrieve high-level metadata about the data source.

    Returns:
        dict: Source metadata such as name, type, or description.
            Default is an empty dictionary.
    """
    return {}

fetch_property_stats(database, collection, entity, property_name, sample_limit=None)

Retrieve statistics for a specific property (column/attribute) of an entity.

Parameters:

Name Type Description Default
database str

Database name.

required
collection str

Collection or schema name.

required
entity str

Entity name.

required
property_name str

Property/column name.

required
sample_limit int

Maximum number of sample values to fetch.

None

Returns:

Type Description

dict or None: Property-level statistics such as count, distinct values, null count, min/max, or sampled values. Default is None.

Source code in blue/data/source.py
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
def fetch_property_stats(self, database, collection, entity, property_name, sample_limit=None):
    """
    Get statistics for one property (column/attribute) of an entity.

    This base implementation ignores its arguments and returns None.

    Parameters:
        database (str): Name of the database.
        collection (str): Name of the collection or schema.
        entity (str): Name of the entity.
        property_name (str): Name of the property/column.
        sample_limit (int, optional): Upper bound on sample values fetched.

    Returns:
        dict or None: Property-level statistics (count, distinct values, null
            count, min/max, sampled values, ...); None in this implementation.
    """
    return None

fetch_schema()

Retrieve the overall schema of the data source.

Returns:

Name Type Description
dict

Schema definition including databases, collections, and entities. Default is an empty dictionary.

Source code in blue/data/source.py
115
116
117
118
119
120
121
122
123
def fetch_schema(self):
    """
    Get the schema of the whole data source.

    This base implementation returns an empty dictionary.

    Returns:
        dict: Schema covering databases, collections, and entities; empty here.
    """
    return {}

fetch_source_stats()

Retrieve high-level statistics about the source itself.

Returns:

Type Description

dict or None: Source-level statistics such as connection info, number of databases, or performance metrics. Default is None.

Source code in blue/data/source.py
304
305
306
307
308
309
310
311
312
313
def fetch_source_stats(self):
    """
    Get high-level statistics about the source as a whole.

    This base implementation returns None.

    Returns:
        dict or None: Source-level statistics (connection info, number of
            databases, performance metrics, ...); None in this implementation.
    """
    return None
Last update: 2025-10-09