Skip to content

Postgres source

PostgresDBSource

Bases: DataSource

Source code in blue/data/sources/postgres_source.py
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
class PostgresDBSource(DataSource):
    """PostgreSQL-backed DataSource implementation; connects via psycopg2."""

    def __init__(self, name, properties={}):
        # NOTE(review): mutable default `properties={}` is shared across calls;
        # safe only if neither this class nor DataSource mutates it in place — confirm upstream.
        super().__init__(name, properties=properties)

    ###### initialization
    def _initialize_properties(self):
        """Initialize source-level properties; delegates entirely to the base class."""
        super()._initialize_properties()

    ###### connection
    def _initialize_connection_properties(self):
        """Seed the connection properties with PostgreSQL defaults."""
        super()._initialize_connection_properties()

        # Default endpoint settings for a PostgreSQL server.
        self.properties['connection'].update(
            {
                'host': 'localhost',
                'port': 5432,
                'protocol': 'postgres',
            }
        )

    def _connect(self, **connection):
        """Open and return a psycopg2 connection built from keyword settings."""
        params = copy.deepcopy(connection)
        # 'protocol' is source metadata, not a psycopg2.connect() argument.
        params.pop('protocol', None)
        return psycopg2.connect(**params)

    def _disconnect(self):
        """Tear down the source-level connection (currently a no-op)."""
        # TODO: actually close self.connection once teardown semantics are defined.
        return None

    ######### source
    def fetch_metadata(self):
        """
        Return source-level metadata for the PostgreSQL server.

        Returns:
            dict: Empty for now; may later report server info, version, etc.
        """
        return {}

    def fetch_schema(self):
        """
        Return global schema metadata for the PostgreSQL source.

        Returns:
            dict: Empty for now; may later include schema definitions.
        """
        return {}

    ######### database
    def fetch_databases(self):
        """
        Retrieve the list of databases available on the PostgreSQL server.

        Queries the `pg_database` system catalog and filters out template
        databases (any name starting with "template", e.g. `template0`,
        `template1`).

        Returns:
            list[str]: Names of the non-template databases on the server.
        """
        cursor = self.connection.cursor()
        try:
            cursor.execute("SELECT datname FROM pg_database;")
            rows = cursor.fetchall()
        finally:
            # Fix: the cursor was previously never closed.
            cursor.close()
        # ignore template<d> databases
        return [row[0] for row in rows if not row[0].startswith("template")]

    def fetch_database_metadata(self, database):
        """
        Return high-level metadata for one PostgreSQL database.

        Parameters:
            database (str): Database whose metadata is requested.

        Returns:
            dict: Empty placeholder for now.
        """
        return {}

    def fetch_database_schema(self, database):
        """
        Return schema definitions for one PostgreSQL database.

        Parameters:
            database (str): Database name.

        Returns:
            dict: Empty placeholder; may later return table/column definitions.
        """
        return {}

    ######### database/collection
    def _db_connect(self, database):
        """
        Open a connection to a specific database using this source's
        connection settings.

        Parameters:
            database (str): Database name to connect to.

        Returns:
            A new psycopg2 connection bound to `database`.
        """
        # Copy the source-level settings and override the target database.
        # Note: _connect() itself strips the non-psycopg2 'protocol' key, so
        # the redundant deletion that used to live here was removed.
        c = copy.deepcopy(self.properties['connection'])
        c['database'] = database
        return self._connect(**c)

    def _db_disconnect(self, connection):
        """
        Close a per-database connection opened by `_db_connect`.

        Parameters:
            connection: psycopg2 connection to close (may be None).

        Returns:
            None (kept for interface consistency with the original stub).
        """
        # Fix: the stub previously returned without closing, so every
        # fetch_*/execute_query call leaked a server connection.
        try:
            if connection is not None:
                connection.close()
        except Exception as e:
            # Best-effort cleanup; a failed close must not mask the caller's work.
            logging.warning(f"Failed to close database connection: {e}")
        return None

    def fetch_database_collections(self, database):
        """
        Retrieve the list of collections (schemas) within a PostgreSQL database.

        Connects to the given database and queries `information_schema.tables`
        for the distinct schemas that contain tables, excluding the system
        schemas `pg_catalog` and `information_schema`.

        Parameters:
            database (str): The name of the database to inspect.

        Returns:
            list[str]: Schema (collection) names within the database.
        """
        # connect to specific database (not source directly)
        db_connection = self._db_connect(database)

        # exclude 'pg_catalog', 'information_schema'
        query = "SELECT DISTINCT table_schema FROM information_schema.tables WHERE table_schema NOT IN ('pg_catalog', 'information_schema');"
        try:
            cursor = db_connection.cursor()
            try:
                cursor.execute(query)
                rows = cursor.fetchall()
            finally:
                # Fix: the cursor was previously never closed.
                cursor.close()
        finally:
            # Fix: ensure the connection is released even if the query fails.
            self._db_disconnect(db_connection)

        return [row[0] for row in rows]

    def fetch_database_collection_metadata(self, database, collection):
        """
        Return metadata for a specific schema (collection) in a database.

        Parameters:
            database (str): Database to connect to.
            collection (str): Schema (collection) whose metadata is requested.

        Returns:
            dict: Empty placeholder for now.
        """
        return {}

    def fetch_enum_types(self, db_connection):
        """
        Fetch all user-defined PostgreSQL ENUM types and their values.

        Queries the system catalogs (`pg_type`, `pg_enum`, `pg_namespace`),
        excluding the `pg_catalog` and `information_schema` schemas. Values are
        returned in enum sort order.

        Parameters:
            db_connection:
                A live PostgreSQL database connection object (e.g., from psycopg2).

        Returns:
            dict[str, list[str]]:
                Mapping of enum type name to its list of values. Note: keys are
                bare type names (the schema is selected but not used), so
                same-named enums in different schemas would collide.
        """

        query = """
        SELECT
          n.nspname AS schema,
          t.typname AS type_name,
          e.enumlabel AS enum_value
        FROM
          pg_type t
        JOIN
          pg_enum e ON t.oid = e.enumtypid
        JOIN
          pg_catalog.pg_namespace n ON n.oid = t.typnamespace
        WHERE
          n.nspname NOT IN ('pg_catalog', 'information_schema')
        ORDER BY
          t.typname, e.enumsortorder;
        """
        cursor = db_connection.cursor()
        try:
            cursor.execute(query)
            rows = cursor.fetchall()
        finally:
            # Fix: the cursor was previously never closed.
            cursor.close()

        enum_types = {}
        for _schema, type_name, enum_value in rows:
            enum_types.setdefault(type_name, []).append(enum_value)

        return enum_types

    def fetch_database_collection_entities(self, database, collection, max_distinct=50, max_ratio=0.1, max_length=100):
        """
        Collect entity (table) and property (column) metadata for a given schema in a PostgreSQL database.

        This method retrieves table and column definitions from the `information_schema.columns`
        view for a specific schema (`collection`). It identifies ENUM-backed columns, gathers
        column-level statistics (distinct counts, average text lengths), and collects
        representative sample values for categorical string columns.

        The result populates a `DataSchema` object modeling entities and their
        properties, for metadata inspection, schema inference, or documentation.

        Parameters:
            database (str):
                The logical name or connection identifier for the target PostgreSQL database.
            collection (str):
                The schema name within the database to inspect (e.g., "public").
            max_distinct (int, optional):
                Maximum number of distinct values for a text column to still be
                considered categorical. Defaults to 50.
            max_ratio (float, optional):
                Maximum allowed ratio of distinct values to total rows for a column to
                be considered categorical. Defaults to 0.1.
            max_length (int, optional):
                Maximum average string length allowed for categorical columns. Defaults to 100.

        Returns:
            dict:
                Entities (tables) and their properties (columns), as produced by
                `DataSchema.get_entities()`.

        Raises:
            psycopg2.Error:
                If any SQL execution or database interaction fails.
        """
        db_connection = self._db_connect(database)

        query = """
        SELECT table_name, column_name, data_type, udt_name
        FROM information_schema.columns
        WHERE table_schema = %s
        """
        try:
            cursor = db_connection.cursor()
            try:
                cursor.execute(query, (collection,))
                data = cursor.fetchall()

                enum_types = self.fetch_enum_types(db_connection)
                schema = DataSchema()

                for table_name, column_name, data_type, udt_name in data:
                    if not schema.has_entity(table_name):
                        schema.add_entity(table_name)

                    property_def = {"type": data_type}

                    # ENUM-backed columns expose their allowed values directly.
                    if enum_types and udt_name in enum_types:
                        property_def["enum"] = enum_types[udt_name]

                    # For string-ish columns, decide whether they look categorical.
                    if data_type.lower() in ("character varying", "varchar", "character", "text", "name"):
                        # NOTE(review): identifiers are interpolated into SQL; they
                        # come from information_schema, but embedded double quotes
                        # in an identifier would still break the statement.
                        cursor.execute(
                            f"""
                  SELECT COUNT(DISTINCT "{column_name}"), COUNT(*) 
                  FROM "{collection}"."{table_name}"
                  WHERE "{column_name}" IS NOT NULL
                 """
                        )
                        distinct_count, total_count = cursor.fetchone()

                        cursor.execute(
                            f'''
                    SELECT AVG(LENGTH("{column_name}"))
                    FROM "{collection}"."{table_name}"
                    WHERE "{column_name}" IS NOT NULL
                '''
                        )
                        avg_length = cursor.fetchone()[0] or 0

                        # Categorical: few distinct values, low distinct/total ratio,
                        # and short average length.
                        if distinct_count <= max_distinct and (total_count == 0 or distinct_count / total_count <= max_ratio) and avg_length <= max_length:
                            cursor.execute(
                                f"""
                        SELECT DISTINCT "{column_name}"
                        FROM "{collection}"."{table_name}"
                        WHERE "{column_name}" IS NOT NULL
                        LIMIT {max_distinct};
                    """
                            )
                            property_def["values"] = [row[0] for row in cursor.fetchall()]

                    schema.add_entity_property(table_name, column_name, property_def)
            finally:
                # Fix: the cursor was previously never closed.
                cursor.close()
        finally:
            # Fix: ensure the connection is released even if a query fails.
            self._db_disconnect(db_connection)

        return schema.get_entities()

    ### TODO
    def fetch_database_collection_relations(self, database, collection):
        """
        Retrieve relationships (foreign key constraints) between tables in a schema.

        Placeholder: intended to extract relational metadata such as foreign
        key relationships, joins, and dependencies between tables in the
        specified database and schema/collection.

        Parameters:
            database (str): The database name to inspect.
            collection (str): The schema name within the database.

        Returns:
            dict: Table relationships; currently always empty.
        """
        return {}

    ######### execute query
    def execute_query(self, query, database=None, collection=None, optional_properties={}):
        """
        Execute a SQL query on a specific PostgreSQL database and return results as JSON.

        This method connects to the specified database, executes the provided
        SQL query, fetches all results, converts them to a pandas DataFrame,
        and finally serializes the DataFrame into a JSON array.

        Parameters:
            query (str): The SQL query string to execute.
            database (str): Name of the database to connect to.
            collection (str, optional): Not used for PostgreSQL, kept for interface consistency.
            optional_properties (dict, optional): Additional options. Supported keys:
                - 'commit' (bool): If True, commits the transaction after execution.

        Raises:
            Exception: If `database` is not provided.

        Returns:
            list[dict]: Query results serialized as a list of dictionaries (JSON objects),
                        where each dictionary corresponds to a row in the query result.
        """
        if database is None:
            raise Exception("No database provided")

        # create connection to db
        db_connection = self._db_connect(database)
        try:
            cursor = db_connection.cursor()
            cursor.execute(query)
            data = cursor.fetchall()
            columns = [desc[0] for desc in cursor.description]

            # Fix: honor the documented 'commit' option (it was previously ignored).
            if optional_properties and optional_properties.get('commit'):
                db_connection.commit()
        finally:
            # Fix: release the connection even if execution fails.
            self._db_disconnect(db_connection)

        # transform to json
        df = pd.DataFrame(data, columns=columns)
        df.fillna(value=np.nan, inplace=True)
        result = json.loads(df.to_json(orient='records'))

        return result

    ######### stats

    def fetch_source_stats(self):
        """
        Gather high-level statistics about the PostgreSQL source connection.

        Collects the server version string, the number of non-template
        databases, and the uptime since postmaster start. On failure the
        returned dict carries an "error" entry instead.

        Returns:
            dict: Source-level statistics (version, database_count, uptime),
            or an error message if collection fails.
        """

        stats = {}

        try:
            with self.connection.cursor() as cursor:
                cursor.execute("SELECT version()")
                stats["version"] = cursor.fetchone()[0]

                # Non-template databases only.
                cursor.execute("SELECT datname FROM pg_database WHERE datistemplate = false;")
                stats["database_count"] = len(cursor.fetchall())

                cursor.execute(
                    """
                    SELECT now() - pg_postmaster_start_time() AS uptime;
                """
                )
                stats["uptime"] = str(cursor.fetchone()[0])

        except Exception as e:
            logging.warning(f"Failed to collect source-level stats: {e}")
            stats["error"] = str(e)

        return stats

    def fetch_database_stats(self, database):
        """
        Collect size and table-level statistics for a specific PostgreSQL database.

        Connects to the given database to retrieve:
        - Total database size in bytes
        - Total number of user tables (excluding system schemas)

        On failure the returned dict carries an "error" entry, consistent
        with `fetch_source_stats`.

        Parameters:
            database (str): Name of the database to analyze.

        Returns:
            dict: Database-level stats such as size (bytes) and table count,
            or an "error" message if collection fails.
        """

        conn = self._db_connect(database)
        cur = conn.cursor()

        stats = {}
        try:
            # Size of database in bytes
            cur.execute("SELECT pg_database_size(%s);", (database,))
            size = cur.fetchone()
            stats["size_bytes"] = size[0] if size else None

            cur.execute(
                """
            SELECT COUNT(*) 
            FROM information_schema.tables 
            WHERE table_schema NOT IN ('pg_catalog', 'information_schema') 
            AND table_type = 'BASE TABLE';
            """
            )

            stats["table_count"] = cur.fetchone()[0]

        except Exception as e:
            logging.warning(f"Error fetching database stats for {database}: {e}")
            # Fix: record the error like fetch_source_stats does.
            stats["error"] = str(e)
        finally:
            cur.close()
            # Fix: previously only the cursor was closed and the connection leaked.
            self._db_disconnect(conn)

        return stats

    def fetch_collection_stats(self, database, collection_name, entities, relations):
        """
        Summarize a collection within a PostgreSQL database.

        Computes basic counts of entities (tables) and relations to provide
        high-level structural metadata for the data registry.

        Parameters:
            database (str): Name of the database the collection belongs to.
            collection_name (str): Name of the collection or schema.
            entities (list): Entities (tables) in the collection.
            relations (list): Relationships among entities.

        Returns:
            dict: Counts of entities and relations.
        """

        return {
            "num_entities": len(entities),
            "num_relations": len(relations),
        }

    def fetch_entity_stats(self, database, collection, entity):
        """
        Count the rows of one table (entity) within a schema (collection).

        Parameters:
            database (str): Database to connect to.
            collection (str): Schema containing the table.
            entity (str): Table whose rows are counted.

        Returns:
            dict: {"row_count": int} on success, {"row_count": None} on failure.
        """
        conn = self._db_connect(database)
        cursor = conn.cursor()

        stats = {}

        try:
            cursor.execute(f'SELECT COUNT(*) FROM "{collection}"."{entity}";')
            stats["row_count"] = cursor.fetchone()[0]
        except psycopg2.Error as e:
            logging.warning(f"Failed to get row count for {collection}.{entity}: {e}")
            stats["row_count"] = None
        finally:
            self._db_disconnect(conn)

        return stats

    def fetch_property_stats(self, database, collection, table, property_name, sample_limit=10):
        """
        Fetch statistics for a specific column (property) in a PostgreSQL table.

        This method queries both `information_schema.columns` and `pg_stats` to
        gather metadata about the column, including counts, distinct values,
        nulls, sample values, min/max values (when applicable), and most common values.

        Parameters:
            database (str): The database name to connect to.
            collection (str): The schema name (PostgreSQL schema) of the table.
            table (str): The table name containing the property.
            property_name (str): The column name (property) to analyze.
            sample_limit (int, optional): Maximum number of distinct sample values
                to return. Defaults to 10.

        Returns:
            dict: A dictionary containing the following keys:
                - count (int): Number of non-null values in the column.
                - distinct_count (int): Number of distinct non-null values.
                - null_count (int): Number of null values.
                - sample_values (list of str): Up to `sample_limit` distinct sample values.
                - min (str or None): Minimum value (if column type supports it), else None.
                - max (str or None): Maximum value (if column type supports it), else None.
                - most_common_vals (list): List of most common values from `pg_stats`.

        Notes:
            - Min/max values are only computed for numeric, date, timestamp,
            boolean, and enum-like column types.
            - If an error occurs (e.g., invalid table or column), an empty dict is returned.
        """

        conn = self._db_connect(database)
        cursor = conn.cursor()

        schema = collection

        column = f'"{property_name}"'
        # Fix: the table reference was previously unquoted and not
        # schema-qualified, so the query resolved the table via search_path
        # instead of the requested schema (inconsistent with the other
        # methods, which all use "{collection}"."{table}").
        qualified_table = f'"{schema}"."{table}"'

        try:
            cursor.execute(
                """
                SELECT data_type
                FROM information_schema.columns
                WHERE table_schema = %s AND table_name = %s AND column_name = %s;
            """,
                (schema, table, property_name),
            )
            type_result = cursor.fetchone()
            column_type = type_result[0] if type_result else None

            # Set flags for whether to compute min/max
            include_min_max = column_type in (
                'integer',
                'bigint',
                'smallint',
                'numeric',
                'real',
                'double precision',
                'date',
                'timestamp without time zone',
                'timestamp with time zone',
                'boolean',
                'enum',
            )

            # Build query dynamically
            query = f"""
                SELECT
                COUNT({column}) AS non_null_count,
                COUNT(DISTINCT {column}) AS distinct_count,
                COUNT(*) FILTER (WHERE {column} IS NULL) AS null_count,
                ARRAY(
                    SELECT DISTINCT {column}
                    FROM {qualified_table}
                    WHERE {column} IS NOT NULL
                    LIMIT {sample_limit}
                )::text[] AS sample_values
            """

            if include_min_max:
                query += f""",
                    MIN({column})::text AS min_value,
                    MAX({column})::text AS max_value
                """
            else:
                query += ", NULL AS min_value, NULL AS max_value"

            query += f" FROM {qualified_table};"

            cursor.execute(query)
            row = cursor.fetchone()

            stats = {
                "count": row[0],
                "distinct_count": row[1],
                "null_count": row[2],
                "sample_values": row[3],
                "min": row[4],
                "max": row[5],
            }

            # Additional query for most_common_vals from pg_stats
            cursor.execute(
                """
                SELECT most_common_vals
                FROM pg_stats
                WHERE schemaname = %s AND tablename = %s AND attname = %s;
            """,
                (schema, table, property_name),
            )

            mc_row = cursor.fetchone()

            if mc_row and mc_row[0]:
                stats["most_common_vals"] = mc_row[0]
            else:
                stats["most_common_vals"] = []

            return stats

        except Exception as e:
            logging.warning(f"Failed to fetch property stats for {collection}.{table}.{property_name}: {str(e)}")
            return {}
        finally:
            self._db_disconnect(conn)

execute_query(query, database=None, collection=None, optional_properties={})

Execute a SQL query on a specific PostgreSQL database and return results as JSON.

This method connects to the specified database, executes the provided SQL query, fetches all results, converts them to a pandas DataFrame, and finally serializes the DataFrame into a JSON array.

Parameters:

Name Type Description Default
query str

The SQL query string to execute.

required
database str

Name of the database to connect to.

None
collection str

Not used for PostgreSQL, kept for interface consistency.

None
optional_properties dict

Additional options. Supported keys: 'commit' (bool) — if True, commits the transaction after execution.

{}

Raises:

Type Description
Exception

If database is not provided.

Returns:

Type Description

list[dict]: Query results serialized as a list of dictionaries (JSON objects), where each dictionary corresponds to a row in the query result.

Source code in blue/data/sources/postgres_source.py
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
def execute_query(self, query, database=None, collection=None, optional_properties={}):
    """
    Execute a SQL query on a specific PostgreSQL database and return results as JSON.

    This method connects to the specified database, executes the provided
    SQL query, fetches all results, converts them to a pandas DataFrame,
    and finally serializes the DataFrame into a JSON array.

    Parameters:
        query (str): The SQL query string to execute.
        database (str): Name of the database to connect to.
        collection (str, optional): Not used for PostgreSQL, kept for interface consistency.
        optional_properties (dict, optional): Additional options. Supported keys:
            - 'commit' (bool): If True, commits the transaction after execution.

    Raises:
        Exception: If `database` is not provided.

    Returns:
        list[dict]: Query results serialized as a list of dictionaries (JSON objects),
                    where each dictionary corresponds to a row in the query result.
    """
    if database is None:
        raise Exception("No database provided")

    # create connection to db
    db_connection = self._db_connect(database)

    cursor = db_connection.cursor()
    cursor.execute(query)
    data = cursor.fetchall()

    # transform to json
    columns = [desc[0] for desc in cursor.description]
    df = pd.DataFrame(data, columns=columns)
    df.fillna(value=np.nan, inplace=True)
    result = json.loads(df.to_json(orient='records'))

    # disconnect
    self._db_disconnect(db_connection)

    return result

fetch_collection_stats(database, collection_name, entities, relations)

Collect summary statistics for a collection within a PostgreSQL database.

Computes basic counts of entities (tables) and relations to provide high-level structural metadata for the data registry.

Parameters:

Name Type Description Default
database str

Name of the database the collection belongs to.

required
collection_name str

Name of the collection or schema.

required
entities list

List of entities (tables) in the collection.

required
relations list

List of relationships among entities.

required

Returns:

Name Type Description
dict

Dictionary with counts of entities and relations.

Source code in blue/data/sources/postgres_source.py
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
def fetch_collection_stats(self, database, collection_name, entities, relations):
    """
    Collect summary statistics for a collection within a PostgreSQL database.

    Computes basic counts of entities (tables) and relations to provide
    high-level structural metadata for the data registry.

    Parameters:
        database (str): Name of the database the collection belongs to.
        collection_name (str): Name of the collection or schema.
        entities (list): List of entities (tables) in the collection.
        relations (list): List of relationships among entities.

    Returns:
        dict: Dictionary with counts of entities and relations.
    """

    stats = {}
    num_entities = len(entities)
    num_relations = len(relations)

    stats["num_entities"] = num_entities
    stats["num_relations"] = num_relations

    return stats

fetch_database_collection_entities(database, collection, max_distinct=50, max_ratio=0.1, max_length=100)

Collect entity (table) and property (column) metadata for a given schema in a PostgreSQL database.

This method retrieves table and column definitions from the information_schema.columns view for a specific schema (collection). It identifies ENUM-backed columns, gathers column-level statistics (distinct counts, average text lengths), and optionally collects representative sample values for categorical string columns.

The method is designed to populate a DataSchema object that models entities and their properties, useful for metadata inspection, schema inference, or automated documentation.

Parameters:

Name Type Description Default
database str

The logical name or connection identifier for the target PostgreSQL database.

required
collection str

The schema name within the database to inspect (e.g., "public").

required
max_distinct int

The maximum number of distinct values to allow for a text column before it is considered non-categorical. Defaults to 50.

50
max_ratio float

The maximum allowed ratio of distinct values to total rows for a column to be considered categorical. Defaults to 0.1.

0.1
max_length int

The maximum average string length allowed for categorical columns. Defaults to 100.

100

Returns:

Name Type Description
dict

A dictionary representation of entities (tables) and their properties (columns), as produced by DataSchema.get_entities().

Raises:

Type Description
psycopg2.Error

If any SQL execution or database interaction fails.

Source code in blue/data/sources/postgres_source.py
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
def fetch_database_collection_entities(self, database, collection, max_distinct=50, max_ratio=0.1, max_length=100):
    """
    Collect entity (table) and property (column) metadata for a given schema in a PostgreSQL database.

    Retrieves table and column definitions from `information_schema.columns`
    for the given schema (`collection`), attaches ENUM values for enum-backed
    columns, and — for short, low-cardinality text columns — collects
    representative sample values. Populates a `DataSchema` object, useful for
    metadata inspection, schema inference, or automated documentation.

    Parameters:
        database (str):
            The logical name or connection identifier for the target PostgreSQL database.
        collection (str):
            The schema name within the database to inspect (e.g., "public").
        max_distinct (int, optional):
            The maximum number of distinct values to allow for a text column before
            it is considered non-categorical. Defaults to 50.
        max_ratio (float, optional):
            The maximum allowed ratio of distinct values to total rows for a column to
            be considered categorical. Defaults to 0.1.
        max_length (int, optional):
            The maximum average string length allowed for categorical columns. Defaults to 100.

    Returns:
        dict:
            A dictionary representation of entities (tables) and their properties (columns),
            as produced by `DataSchema.get_entities()`.

    Raises:
        psycopg2.Error:
            If any SQL execution or database interaction fails.
    """
    db_connection = self._db_connect(database)
    # Bug fix: previously the connection leaked if any query below raised;
    # the disconnect now runs in a finally block.
    try:
        query = """
        SELECT table_name, column_name, data_type, udt_name
        FROM information_schema.columns
        WHERE table_schema = %s
        """
        cursor = db_connection.cursor()
        cursor.execute(query, (collection,))
        data = cursor.fetchall()

        enum_types = self.fetch_enum_types(db_connection)
        schema = DataSchema()

        for table_name, column_name, data_type, udt_name in data:
            if not schema.has_entity(table_name):
                schema.add_entity(table_name)

            property_def = {"type": data_type}

            # ENUM-backed column: record the allowed values.
            if enum_types and udt_name in enum_types:
                property_def["enum"] = enum_types[udt_name]

            if data_type.lower() in ("character varying", "varchar", "character", "text", "name"):
                # NOTE: identifiers below come from information_schema (not user
                # input) and are double-quoted before interpolation.
                cursor.execute(
                    f"""
                  SELECT COUNT(DISTINCT "{column_name}"), COUNT(*) 
                  FROM "{collection}"."{table_name}"
                  WHERE "{column_name}" IS NOT NULL
                 """
                )

                distinct_count, total_count = cursor.fetchone()

                cursor.execute(
                    f'''
                    SELECT AVG(LENGTH("{column_name}"))
                    FROM "{collection}"."{table_name}"
                    WHERE "{column_name}" IS NOT NULL
                '''
                )
                avg_length = cursor.fetchone()[0] or 0

                # Categorical only if the column has few, proportionally rare,
                # and short distinct values.
                if distinct_count <= max_distinct and (total_count == 0 or distinct_count / total_count <= max_ratio) and avg_length <= max_length:
                    cursor.execute(
                        f"""
                        SELECT DISTINCT "{column_name}"
                        FROM "{collection}"."{table_name}"
                        WHERE "{column_name}" IS NOT NULL
                        LIMIT {max_distinct};
                    """
                    )
                    values = [row[0] for row in cursor.fetchall()]
                    property_def["values"] = values

            schema.add_entity_property(table_name, column_name, property_def)

        return schema.get_entities()
    finally:
        self._db_disconnect(db_connection)

fetch_database_collection_metadata(database, collection)

Fetch metadata for a specific schema (collection) in a PostgreSQL database.

Parameters:

Name Type Description Default
database str

Name of the database to connect to.

required
collection str

Name of the schema (collection) whose metadata should be retrieved.

required

Returns:

Name Type Description
dict

Currently returns an empty dictionary.

Source code in blue/data/sources/postgres_source.py
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
def fetch_database_collection_metadata(self, database, collection):
    """
    Fetch metadata for a specific schema (collection) in a PostgreSQL database.

    Placeholder implementation: no metadata is collected yet.

    Parameters:
        database (str): Name of the database to connect to.
        collection (str): Name of the schema (collection) whose metadata
            should be retrieved.

    Returns:
        dict: Always an empty dictionary for now.
    """
    return {}

fetch_database_collection_relations(database, collection)

Retrieve relationships (foreign key constraints) between tables in a given schema.

Currently a placeholder method. Intended to extract relational metadata such as foreign key relationships, joins, and dependencies between tables in the specified database and schema/collection.

Parameters:

Name Type Description Default
database str

The database name to inspect.

required
collection str

The schema name within the database.

required

Returns:

Name Type Description
dict

A dictionary representing table relationships. Currently returns an empty dictionary.

Source code in blue/data/sources/postgres_source.py
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
def fetch_database_collection_relations(self, database, collection):
    """
    Retrieve relationships (foreign key constraints) between tables in a given schema.

    Placeholder: intended to extract relational metadata such as foreign key
    relationships, joins, and dependencies between tables in the specified
    database and schema/collection.

    Parameters:
        database (str): The database name to inspect.
        collection (str): The schema name within the database.

    Returns:
        dict: A dictionary representing table relationships; currently always empty.
    """
    return {}

fetch_database_collections(database)

Retrieve a list of collections (schemas) within a PostgreSQL database.

This method connects to a specific database and queries the information_schema.tables system catalog to find all distinct schemas that contain tables, excluding system schemas such as 'pg_catalog' and 'information_schema'.

Parameters:

Name Type Description Default
database str

The name of the database to inspect.

required

Returns:

Type Description

list[str]: A list of schema (collection) names within the specified database.

Source code in blue/data/sources/postgres_source.py
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
def fetch_database_collections(self, database):
    """
    List the collections (schemas) that contain tables in a PostgreSQL database.

    Connects to the given database and queries the `information_schema.tables`
    system catalog for all distinct schemas, skipping the system schemas
    `pg_catalog` and `information_schema`.

    Parameters:
        database (str):
            The name of the database to inspect.

    Returns:
        list[str]:
            A list of schema (collection) names within the specified database.
    """
    # Connect to the specific database (not the source connection directly).
    connection = self._db_connect(database)

    # System schemas 'pg_catalog' and 'information_schema' are excluded.
    query = "SELECT DISTINCT table_schema FROM information_schema.tables WHERE table_schema NOT IN ('pg_catalog', 'information_schema');"
    cursor = connection.cursor()
    cursor.execute(query)
    schemas = [row[0] for row in cursor.fetchall()]

    self._db_disconnect(connection)
    return schemas

fetch_database_metadata(database)

Fetch high-level metadata for a specific PostgreSQL database.

Parameters:

Name Type Description Default
database str

The name of the database for which to fetch metadata.

required

Returns:

Name Type Description
dict

Currently returns an empty dictionary.

Source code in blue/data/sources/postgres_source.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
def fetch_database_metadata(self, database):
    """
    Fetch high-level metadata for a specific PostgreSQL database.

    Placeholder implementation: no metadata is collected yet.

    Parameters:
        database (str): The name of the database for which to fetch metadata.

    Returns:
        dict: Always an empty dictionary for now.
    """
    return {}

fetch_database_schema(database)

Fetch schema definitions for a specific PostgreSQL database.

Parameters:

Name Type Description Default
database str

Database name.

required

Returns:

Name Type Description
dict

Currently returns an empty dictionary. Can be extended to return table and column definitions.

Source code in blue/data/sources/postgres_source.py
108
109
110
111
112
113
114
115
116
117
118
def fetch_database_schema(self, database):
    """
    Fetch schema definitions for a specific PostgreSQL database.

    Placeholder: can be extended to return table and column definitions.

    Parameters:
        database (str): Database name.

    Returns:
        dict: Always an empty dictionary for now.
    """
    return {}

fetch_database_stats(database)

Collect size and table-level statistics for a specific PostgreSQL database.

Connects to the given database to retrieve the total database size in bytes and the total number of user tables (excluding system schemas).

Parameters:

Name Type Description Default
database str

Name of the database to analyze.

required

Returns:

Name Type Description
dict

Dictionary containing database-level stats such as size in bytes and table count.

Source code in blue/data/sources/postgres_source.py
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
def fetch_database_stats(self, database):
    """
    Collect size and table-level statistics for a specific PostgreSQL database.

    Connects to the given database to retrieve:
    - Total database size in bytes
    - Total number of user tables (excluding system schemas)

    Parameters:
        database (str): Name of the database to analyze.

    Returns:
        dict: Dictionary containing database-level stats ("size_bytes",
            "table_count"); keys may be missing if their query failed.
    """

    conn = self._db_connect(database)
    cur = conn.cursor()

    stats = {}
    try:
        # Size of database in bytes
        cur.execute("SELECT pg_database_size(%s);", (database,))
        size = cur.fetchone()
        stats["size_bytes"] = size[0] if size else None

        # Count only user (base) tables; system catalogs are excluded.
        cur.execute(
            """
        SELECT COUNT(*) 
        FROM information_schema.tables 
        WHERE table_schema NOT IN ('pg_catalog', 'information_schema') 
        AND table_type = 'BASE TABLE';
        """
        )

        stats["table_count"] = cur.fetchone()[0]

    except Exception as e:
        # Best-effort: log and return whatever was collected so far.
        logging.warning(f"Error fetching database stats for {database}: {e}")
    finally:
        cur.close()
        # Bug fix: the connection was previously never released; disconnect
        # here like the other fetch_* methods do.
        self._db_disconnect(conn)

    return stats

fetch_databases()

Retrieve a list of available databases from the PostgreSQL server.

This method queries the system catalog pg_database to list all databases on the connected PostgreSQL instance, excluding template databases such as template0 and template1.

Returns:

Type Description

list[str]: A list of database names available on the server.

Source code in blue/data/sources/postgres_source.py
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
def fetch_databases(self):
    """
    Retrieve a list of available databases from the PostgreSQL server.

    Queries the `pg_database` system catalog on the source connection and
    filters out template databases (`template0`, `template1`, ...).

    Returns:
        list[str]:
            A list of database names available on the server.
    """
    cursor = self.connection.cursor()
    cursor.execute("SELECT datname FROM pg_database;")
    rows = cursor.fetchall()
    # Skip any database whose name begins with "template".
    return [row[0] for row in rows if not row[0].startswith("template")]

fetch_enum_types(db_connection)

Fetch all PostgreSQL ENUM types and their possible values.

This method queries the system catalog tables (pg_type, pg_enum, and pg_namespace) to collect all user-defined ENUM types and their corresponding labels (values). It excludes system schemas such as pg_catalog and information_schema.

Parameters:

Name Type Description Default
db_connection

A live PostgreSQL database connection object (e.g., from psycopg2).

required

Returns:

Type Description

dict[str, list[str]]: A mapping of enum type names (qualified by schema) to their list of values.

Source code in blue/data/sources/postgres_source.py
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
def fetch_enum_types(self, db_connection):
    """
    Fetch all PostgreSQL ENUM types and their possible values.

    Joins the system catalogs `pg_type`, `pg_enum`, and `pg_namespace` to
    collect every user-defined ENUM type with its labels, skipping the
    `pg_catalog` and `information_schema` system schemas.

    Parameters:
        db_connection:
            A live PostgreSQL database connection object (e.g., from psycopg2).

    Returns:
        dict[str, list[str]]:
            A mapping of enum type names to their ordered list of values.
            NOTE: keys are bare type names; the schema column is fetched but
            not used to qualify the key.
    """

    query = """
    SELECT
      n.nspname AS schema,
      t.typname AS type_name,
      e.enumlabel AS enum_value
    FROM
      pg_type t
    JOIN
      pg_enum e ON t.oid = e.enumtypid
    JOIN
      pg_catalog.pg_namespace n ON n.oid = t.typnamespace
    WHERE
      n.nspname NOT IN ('pg_catalog', 'information_schema')
    ORDER BY
      t.typname, e.enumsortorder;
    """
    cursor = db_connection.cursor()
    cursor.execute(query)

    enum_types = {}
    # The ORDER BY enumsortorder clause guarantees labels arrive in
    # declaration order, so plain appends preserve it.
    for _schema, type_name, label in cursor.fetchall():
        enum_types.setdefault(type_name, []).append(label)

    return enum_types

fetch_metadata()

Fetch source-level metadata for the PostgreSQL server.

Returns:

Name Type Description
dict

Currently returns an empty dictionary. Can be extended to include server info, version, etc.

Source code in blue/data/sources/postgres_source.py
49
50
51
52
53
54
55
56
def fetch_metadata(self):
    """
    Fetch source-level metadata for the PostgreSQL server.

    Placeholder: can be extended to include server info, version, etc.

    Returns:
        dict: Always an empty dictionary for now.
    """
    return {}

fetch_property_stats(database, collection, table, property_name, sample_limit=10)

Fetch statistics for a specific column (property) in a PostgreSQL table.

This method queries both information_schema.columns and pg_stats to gather metadata about the column, including counts, distinct values, nulls, sample values, min/max values (when applicable), and most common values.

Parameters:

Name Type Description Default
database str

The database name to connect to.

required
collection str

The schema name (PostgreSQL schema) of the table.

required
table str

The table name containing the property.

required
property_name str

The column name (property) to analyze.

required
sample_limit int

Maximum number of distinct sample values to return. Defaults to 10.

10

Returns:

Name Type Description
dict

A dictionary containing the following keys: count (int) — number of non-null values in the column; distinct_count (int) — number of distinct non-null values; null_count (int) — number of null values; sample_values (list of str) — up to sample_limit distinct sample values; min and max (str or None) — minimum/maximum value as text when the column type supports it, else None; most_common_vals (list) — most common values from pg_stats.

Notes
  • Min/max values are only computed for numeric, date, timestamp, boolean, and enum-like column types.
  • If an error occurs (e.g., invalid table or column), an empty dict is returned.
Source code in blue/data/sources/postgres_source.py
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
def fetch_property_stats(self, database, collection, table, property_name, sample_limit=10):
    """
    Fetch statistics for a specific column (property) in a PostgreSQL table.

    This method queries both `information_schema.columns` and `pg_stats` to
    gather metadata about the column, including counts, distinct values,
    nulls, sample values, min/max values (when applicable), and most common values.

    Parameters:
        database (str): The database name to connect to.
        collection (str): The schema name (PostgreSQL schema) of the table.
        table (str): The table name containing the property.
        property_name (str): The column name (property) to analyze.
        sample_limit (int, optional): Maximum number of distinct sample values
            to return. Defaults to 10.

    Returns:
        dict: A dictionary containing the following keys:
            - count (int): Number of non-null values in the column.
            - distinct_count (int): Number of distinct non-null values.
            - null_count (int): Number of null values.
            - sample_values (list of str): Up to `sample_limit` distinct sample values.
            - min (str or None): Minimum value (if column type supports it), else None.
            - max (str or None): Maximum value (if column type supports it), else None.
            - most_common_vals (list): List of most common values from `pg_stats`.

    Notes:
        - Min/max values are only computed for numeric, date, timestamp,
        boolean, and enum-like column types.
        - If an error occurs (e.g., invalid table or column), an empty dict is returned.
    """

    conn = self._db_connect(database)
    cursor = conn.cursor()

    schema = collection

    column = f'"{property_name}"'
    # Bug fix: schema-qualify (and quote) the table so the stats queries hit
    # the requested schema instead of whatever is first on the search_path;
    # previously the FROM clause used the bare, unquoted table name.
    qualified_table = f'"{schema}"."{table}"'

    try:
        cursor.execute(
            """
            SELECT data_type
            FROM information_schema.columns
            WHERE table_schema = %s AND table_name = %s AND column_name = %s;
        """,
            (schema, table, property_name),
        )
        type_result = cursor.fetchone()
        column_type = type_result[0] if type_result else None

        # Min/max only make sense for ordered column types.
        include_min_max = column_type in (
            'integer',
            'bigint',
            'smallint',
            'numeric',
            'real',
            'double precision',
            'date',
            'timestamp without time zone',
            'timestamp with time zone',
            'boolean',
            'enum',
        )

        # Build query dynamically
        query = f"""
            SELECT
            COUNT({column}) AS non_null_count,
            COUNT(DISTINCT {column}) AS distinct_count,
            COUNT(*) FILTER (WHERE {column} IS NULL) AS null_count,
            ARRAY(
                SELECT DISTINCT {column}
                FROM {qualified_table}
                WHERE {column} IS NOT NULL
                LIMIT {sample_limit}
            )::text[] AS sample_values
        """

        if include_min_max:
            query += f""",
                MIN({column})::text AS min_value,
                MAX({column})::text AS max_value
            """
        else:
            query += ", NULL AS min_value, NULL AS max_value"

        query += f" FROM {qualified_table};"

        cursor.execute(query)
        row = cursor.fetchone()

        stats = {
            "count": row[0],
            "distinct_count": row[1],
            "null_count": row[2],
            "sample_values": row[3],
            "min": row[4],
            "max": row[5],
        }

        # most_common_vals comes from planner statistics, so it may be empty
        # or stale for tables that have not been ANALYZEd recently.
        cursor.execute(
            """
            SELECT most_common_vals
            FROM pg_stats
            WHERE schemaname = %s AND tablename = %s AND attname = %s;
        """,
            (schema, table, property_name),
        )

        mc_row = cursor.fetchone()

        if mc_row and mc_row[0]:
            stats["most_common_vals"] = mc_row[0]
        else:
            stats["most_common_vals"] = []

        return stats

    except Exception as e:
        logging.warning(f"Failed to fetch property stats for {collection}.{table}.{property_name}: {str(e)}")
        return {}
    finally:
        self._db_disconnect(conn)

fetch_schema()

Fetch global schema metadata for the PostgreSQL source.

Returns:

Name Type Description
dict

Currently returns an empty dictionary. Can be extended to include schema definitions.

Source code in blue/data/sources/postgres_source.py
58
59
60
61
62
63
64
65
def fetch_schema(self):
    """
    Fetch global schema metadata for the PostgreSQL source.

    Placeholder: can be extended to include schema definitions.

    Returns:
        dict: Always an empty dictionary for now.
    """
    return {}

fetch_source_stats()

Collect high-level statistics about the PostgreSQL source connection.

Retrieves general metadata such as the PostgreSQL version, the number of non-template databases, and the server uptime since postmaster start.

Returns:

Name Type Description
dict

Dictionary containing source-level statistics like version,

database count, uptime, or an error message if collection fails.

Source code in blue/data/sources/postgres_source.py
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
def fetch_source_stats(self):
    """
    Collect high-level statistics about the PostgreSQL source connection.

    Gathers the server version string, the number of non-template databases,
    and the uptime since postmaster start.

    Returns:
        dict: Source-level statistics ("version", "database_count", "uptime"),
        plus an "error" entry if collection fails.
    """

    stats = {}

    try:
        with self.connection.cursor() as cur:
            cur.execute("SELECT version()")
            stats["version"] = cur.fetchone()[0]

            # Count only non-template databases.
            cur.execute("SELECT datname FROM pg_database WHERE datistemplate = false;")
            stats["database_count"] = len(cur.fetchall())

            cur.execute(
                """
                SELECT now() - pg_postmaster_start_time() AS uptime;
            """
            )
            stats["uptime"] = str(cur.fetchone()[0])

    except Exception as e:
        logging.warning(f"Failed to collect source-level stats: {e}")
        stats["error"] = str(e)

    return stats
Last update: 2025-10-09