Skip to content

Create table operator

CreateTableOperator

Bases: Operator

Create table operator that creates tables (entities) in database collections

Attributes:

Name Type Required Default Description
source str Yes "default_source" Name of the data source where the table will be created.
database str Yes "default" Name of the database where the table will be created.
collection str No "public" Name of the collection where the table will be created. For SQLite sources, 'public' is always used.
table str Yes "" Name of the table to be created.
description str No "" Description of the table to be created.
properties dict No {} Properties of the table to be created.
columns list Yes [] Column definitions of the table to be created.
misc dict No {} Miscellaneous keys such as primary and foreign keys.
created_by str No "" Creator of the table.
overwrite bool No False Whether to overwrite the existing table.
Source code in blue/operators/create_table_operator.py
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
class CreateTableOperator(Operator):
    """
    Create table operator that creates tables (entities) in database collections

    Attributes:
    ----------
    | Name         | Type  | Required | Default          | Description                                                                 |
    |---------------|-------|-----------|------------------|-----------------------------------------------------------------------------|
    | `source`        | str   | :fontawesome-solid-circle-check: {.green-check}       | "default_source" | Name of the data source where the table will be created.                   |
    | `database`      | str   | :fontawesome-solid-circle-check: {.green-check}       | "default"        | Name of the database where the table will be created.                      |
    | `collection`    | str   |       | "public"         | Name of the collection where the table will be created. For SQLite sources, defaults to 'public' if not specified. |
    | `table`         | str   | :fontawesome-solid-circle-check: {.green-check}       | ""               | Name of the table to be created.                                           |
    | `description`   | str   |       | ""               | Description of the table to be created.                                    |
    | `properties`    | dict  |       | {}               | Properties of the table to be created.                                     |
    | `columns`       | list  | :fontawesome-solid-circle-check: {.green-check}       | []               | Column definitions of the table to be created.                             |
    | `misc`          | dict  |       | {}               | Miscellaneous keys such as primary and foreign keys.                       |
    | `created_by`    | str   |       | ""               | Creator of the table.                                                      |
    | `overwrite`     | bool  |       | False            | Whether to overwrite the existing table.                                   |

    """

    PROPERTIES = {}

    # Operator name passed to the base class constructor.
    name = "create_table"
    # Default description, used when the constructor is not given one.
    description = "Creates tables (entities) in database collections using the data registry. If the table already exists, it will be overwritten if overwrite is True."
    # Attribute schema published under properties["attributes"]; keep in sync
    # with the table in the class docstring.
    default_attributes = {
        "source": {"type": "str", "description": "Name of the data source where the table will be created", "required": True, "default": "default_source"},
        "database": {"type": "str", "description": "Name of the database where the table will be created", "required": True, "default": "default"},
        "collection": {
            "type": "str",
            "description": "Name of the collection where the table will be created. For SQLite sources, defaults to 'public' if not specified",
            "required": False,
            "default": "public",
        },
        "table": {"type": "str", "description": "Name of the table to be created", "required": True, "default": ""},
        "description": {"type": "str", "description": "Description of the table to be created", "required": False, "default": ""},
        # Declared as a dict: the default is {} and the value is forwarded as the entity's properties.
        "properties": {"type": "dict", "description": "Properties of the table to be created", "required": False, "default": {}},
        "columns": {"type": "list", "description": "Column definitions of the table to be created", "required": True, "default": []},
        "misc": {"type": "dict", "description": "Miscellaneous keys such as primary and foreign keys ", "required": False, "default": {}},
        "created_by": {"type": "str", "description": "Creator of the table", "required": False, "default": ""},
        "overwrite": {"type": "bool", "description": "Whether to overwrite the existing table", "required": False, "default": False},
    }

    def __init__(self, description: str = None, properties: Dict[str, Any] = None):
        """Set up the operator with its function, validator and explainer.

        Parameters:
            description: Optional description; falls back to the class-level description when empty or None.
            properties: Optional properties dictionary forwarded to the base class.
        """
        super().__init__(
            self.name,
            function=create_table_operator_function,
            description=description or self.description,
            properties=properties,
            validator=create_table_operator_validator,
            explainer=create_table_operator_explainer,
        )

    def _initialize_properties(self):
        """Initialize base properties, then publish this operator's attribute definitions."""
        import copy

        super()._initialize_properties()

        # attribute definitions
        # Deep-copy so that the mutable defaults ([] / {}) inside the class-level
        # schema are never shared between operator instances.
        self.properties["attributes"] = copy.deepcopy(self.default_attributes)

create_table_operator_explainer(output, input_data, attributes)

Generate explanation for create table operator execution.

Parameters:

Name Type Description Default
output Any

The output result from the operator execution.

required
input_data List[List[Dict[str, Any]]]

The input data that was processed.

required
attributes Dict[str, Any]

The attributes used for the operation.

required

Returns:

Type Description
Dict[str, Any]

Dictionary containing explanation of the table creation operation.

Source code in blue/operators/create_table_operator.py
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
def create_table_operator_explainer(output: Any, input_data: List[List[Dict[str, Any]]], attributes: Dict[str, Any]) -> Dict[str, Any]:
    """Generate explanation for create table operator execution.

    The table name reported in the explanation is taken from the ``table``
    attribute, which is the name the operator function creates. Only when that
    attribute is missing or empty does it fall back to the ``name`` field of
    the first input record.

    Parameters:
        output: The output result from the operator execution (not used).
        input_data: The input data that was processed.
        attributes: The attributes used for the operation.

    Returns:
        Dictionary with the keys ``input_data``, ``attributes`` and
        ``explanation`` describing the table creation operation.
    """
    source = attributes.get('source', 'default_source')
    database = attributes.get('database', 'default')
    collection = attributes.get('collection', 'public')
    overwrite = attributes.get('overwrite', False)

    table_name = attributes.get('table') or ''
    if not table_name:
        # Fallback kept for backward compatibility with callers that only
        # supply the table name through the first input record.
        try:
            table_name = input_data[0][0].get('name', '') if input_data and input_data[0] else ''
        except (IndexError, KeyError, TypeError, AttributeError):
            table_name = ''

    create_table_explanation = {
        'input_data': input_data,
        'attributes': attributes,
        'explanation': f"Create table operator {'overwrote' if overwrite else 'created'} table '{table_name}' in database '{database}' collection '{collection}' of source '{source}'.",
    }
    return create_table_explanation

create_table_operator_function(input_data, attributes, properties=None)

Create tables (entities) in database collections using the data registry.

Parameters:

Name Type Description Default
input_data List[List[Dict[str, Any]]]

List of JSON arrays (List[List[Dict[str, Any]]]); passed through unchanged.

required
attributes Dict[str, Any]

Dictionary containing table creation parameters including source, database, collection, table, columns, and other table properties.

required
properties Dict[str, Any]

Optional properties dictionary containing data registry information. Defaults to None.

None

Returns:

Type Description
List[List[Dict[str, Any]]]

List containing the input data passed through unchanged.

Source code in blue/operators/create_table_operator.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
def create_table_operator_function(input_data: List[List[Dict[str, Any]]], attributes: Dict[str, Any], properties: Dict[str, Any] = None) -> List[List[Dict[str, Any]]]:
    """Create tables (entities) in database collections using the data registry.

    Failures (missing data registry, errors raised while creating the table)
    are logged and never raised; the input data is returned in every case.

    Parameters:
        input_data: List of JSON arrays (List[List[Dict[str, Any]]]); passed through unchanged.
        attributes: Dictionary containing table creation parameters including source, database, collection, table, columns, and other table properties. It is not modified.
        properties: Optional properties dictionary containing data registry information. Defaults to None.

    Returns:
        List containing the input data passed through unchanged.
    """
    # Extract attributes
    overwrite = attributes.get('overwrite', False)
    source = attributes.get('source', 'default_source')
    database = attributes.get('database', 'default')
    collection = attributes.get('collection', 'public')
    table = attributes.get('table')
    table_description = attributes.get('description', '')
    table_properties = attributes.get('properties', {})
    columns = attributes.get('columns')
    # An explicit None is treated the same as "no misc keys".
    misc = attributes.get('misc') or {}

    # Get data registry from properties - follow agent pattern
    data_registry = _get_data_registry_from_properties(properties)
    if not data_registry:
        logging.error("Error: Data registry not found")
        # pass through input to output
        return input_data

    # Set collection to 'public' for SQLite sources even if the caller specifies a different collection
    try:
        source_properties = data_registry.get_source_properties(source)
        if source_properties and 'connection' in source_properties:
            protocol = source_properties['connection'].get('protocol', '')
            if protocol == 'sqlite':
                collection = 'public'  # always use 'public' for SQLite as collection name
    except Exception:
        # Best effort only: keep the requested collection, but leave a trace
        # instead of swallowing the failure silently.
        logging.warning(f"Could not determine the protocol of source '{source}'; using collection '{collection}' as given.")

    try:

        # TODO: modify this after discussion
        # creation related properties
        # Work on a copy so the caller's attributes['misc'] does not gain a
        # 'cols_definition' key as a side effect of running the operator.
        creation_properties = dict(misc)
        creation_properties['cols_definition'] = columns

        # Create the table using data registry
        data_registry.create_source_database_collection_entity(
            source=source,
            database=database,
            collection=collection,
            entity=table,
            properties=table_properties,
            creation_properties=creation_properties,
            overwrite=overwrite,
            rebuild=True,
            recursive=False,
        )

        # Set the description after table creation
        if table_description:
            data_registry.set_source_database_collection_entity_description(source=source, database=database, collection=collection, entity=table, description=table_description, rebuild=True)

        # Set the created_by after table creation
        created_by = attributes.get('created_by')
        if created_by:
            data_registry.set_record_data(name=table, type='entity', scope=f'/source/{source}/database/{database}/collection/{collection}', key='created_by', value=created_by, rebuild=True)

        logging.info(f"Successfully created table '{table}' in database '{database}' collection '{collection}' of source '{source}'.")

        # pass through input to output
        return input_data

    except Exception:
        logging.error(traceback.format_exc())
        # pass through input to output
        return input_data

create_table_operator_validator(input_data, attributes, properties=None)

Validate create table operator attributes.

Parameters:

Name Type Description Default
input_data List[List[Dict[str, Any]]]

List of JSON arrays (List[List[Dict[str, Any]]]) to validate.

required
attributes Dict[str, Any]

Dictionary containing operator attributes to validate.

required
properties Dict[str, Any]

Optional properties dictionary. Defaults to None.

None

Returns:

Type Description
bool

True if attributes are valid, False otherwise.

Source code in blue/operators/create_table_operator.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
def create_table_operator_validator(input_data: List[List[Dict[str, Any]]], attributes: Dict[str, Any], properties: Dict[str, Any] = None) -> bool:
    """Validate create table operator attributes.

    Checks, in order: the default operator validation; that ``source``,
    ``database``, ``collection`` and ``table`` are non-blank strings; that
    ``columns`` is a non-empty list of dicts each with a non-blank string
    ``name`` (and string ``type`` / ``misc`` when present); and that any
    ``primary_key`` / ``foreign_keys`` entries in ``misc`` refer only to
    declared column names. Malformed values yield False rather than an
    exception.

    Parameters:
        input_data: List of JSON arrays (List[List[Dict[str, Any]]]) to validate.
        attributes: Dictionary containing operator attributes to validate.
        properties: Optional properties dictionary. Defaults to None.

    Returns:
        True if attributes are valid, False otherwise.
    """
    try:
        if not default_operator_validator(input_data, attributes, properties):
            return False
    except Exception:
        return False

    # Check required attributes: each must be a non-blank string. The
    # isinstance guard keeps a non-string value from raising on .strip().
    name_attributes = (('source', ''), ('database', ''), ('collection', 'public'), ('table', ''))
    for key, default in name_attributes:
        value = attributes.get(key, default)
        if not isinstance(value, str) or not value.strip():
            return False

    columns = attributes.get('columns', [])
    if not isinstance(columns, list) or not columns:
        return False

    # Validate each column definition
    for column in columns:
        if not isinstance(column, dict):
            return False
        column_name = column.get('name')
        if not isinstance(column_name, str) or not column_name.strip():
            return False
        # type is optional but if provided should be a string
        if 'type' in column and not isinstance(column['type'], str):
            return False
        # misc is optional but if provided should be a string
        if 'misc' in column and not isinstance(column['misc'], str):
            return False

    # Built once and shared by the primary key and foreign key checks.
    col_names = [column['name'] for column in columns]

    misc = attributes.get('misc', {})
    if not isinstance(misc, dict):
        return False

    # Validate primary_key if provided
    primary_key = misc.get('primary_key', [])
    if primary_key:
        if not isinstance(primary_key, list):
            return False
        # Check that all primary key columns exist in columns
        if any(pk_col not in col_names for pk_col in primary_key):
            return False

    # Validate foreign_keys if provided
    foreign_keys = misc.get('foreign_keys', [])
    if foreign_keys:
        if not isinstance(foreign_keys, list):
            return False
        required_fk_fields = ['foreign_keys_source_columns', 'foreign_keys_target_table', 'foreign_keys_target_columns']
        for fk in foreign_keys:
            if not isinstance(fk, dict):
                return False
            if any(field not in fk for field in required_fk_fields):
                return False
            # Check that source columns exist in columns. A bare string would
            # otherwise be iterated character by character, and None would raise.
            source_columns = fk['foreign_keys_source_columns']
            if not isinstance(source_columns, (list, tuple)):
                return False
            if any(fk_col not in col_names for fk_col in source_columns):
                return False

    return True
Last update: 2025-10-08