Skip to content

Insert table operator

InsertTableOperator

Bases: Operator

Insert table operator that inserts data rows into database tables

Attributes:

Name Type Required Default Description
source str default_source Name of the data source where the table is located
database str default Name of the database where the table is located
collection str public Name of the collection where the table is located. For SQLite sources, 'public' is always used, even if a different collection is specified
table str "" Name of the table to insert data into
batch_size int 100 Number of rows to insert in each batch (default: 100)
Source code in blue/operators/insert_table_operator.py
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
class InsertTableOperator(Operator):
    """
    Insert table operator that inserts data rows into database tables

    Attributes:
    ----------
    | Name       | Type | Required | Default          | Description                                                                 |
    |------------|------|----------|-----------------|-----------------------------------------------------------------------------|
    | `source`     | str  | :fontawesome-solid-circle-check: {.green-check}     | default_source  | Name of the data source where the table is located                          |
    | `database`   | str  | :fontawesome-solid-circle-check: {.green-check}     | default         | Name of the database where the table is located                             |
    | `collection` | str  |     | public          | Name of the collection where the table is located. For SQLite sources, defaults to 'public' if not specified |
    | `table`      | str  | :fontawesome-solid-circle-check: {.green-check}     | ""              | Name of the table to insert data into                                       |
    | `batch_size` | int  |     | 100             | Number of rows to insert in each batch (default: 100)                       |

    Note: `insert_table_operator_function` always uses 'public' as the collection
    when the source's connection protocol is 'sqlite'.

    """

    PROPERTIES = {}

    # Identity of the operator as passed to the Operator base class.
    name = "insert_table"
    description = "Inserts data rows into database tables in a data source."

    # Attribute schema: one entry per supported attribute with its type,
    # description, required flag and default value.
    default_attributes = {
        "source": {"type": "str", "description": "Name of the data source where the table is located", "required": True, "default": "default_source"},
        "database": {"type": "str", "description": "Name of the database where the table is located", "required": True, "default": "default"},
        "collection": {
            "type": "str",
            "description": "Name of the collection where the table is located. For SQLite sources, defaults to 'public' if not specified",
            "required": False,
            "default": "public",
        },
        "table": {"type": "str", "description": "Name of the table to insert data into", "required": True, "default": ""},
        "batch_size": {"type": "int", "description": "Number of rows to insert in each batch (default: 100)", "required": False, "default": 100},
    }

    def __init__(self, description: str = None, properties: Dict[str, Any] = None):
        """Create the operator and wire in its function, validator and explainer.

        Parameters:
            description: Optional description; the class-level description is used when omitted or empty.
            properties: Optional properties dictionary forwarded to the base class.
        """
        super().__init__(
            self.name,
            function=insert_table_operator_function,
            description=description or self.description,
            properties=properties,
            validator=insert_table_operator_validator,
            explainer=insert_table_operator_explainer,
        )

    def _initialize_properties(self):
        """Initialize base properties and register this operator's attribute definitions."""
        from copy import deepcopy

        super()._initialize_properties()

        # attribute definitions
        # Each instance gets its own copy: assigning the class-level dict directly
        # would let a change made through one instance's properties leak into the
        # class and into every other instance.
        self.properties["attributes"] = deepcopy(self.default_attributes)

insert_table_operator_explainer(output, input_data, attributes)

Generate explanation for insert table operator execution.

Parameters:

Name Type Description Default
output Any

The output result from the operator execution.

required
input_data List[List[Dict[str, Any]]]

The input data that was processed.

required
attributes Dict[str, Any]

The attributes used for the operation.

required

Returns:

Type Description
Dict[str, Any]

Dictionary containing explanation of the table insertion operation.

Source code in blue/operators/insert_table_operator.py
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
def insert_table_operator_explainer(output: Any, input_data: List[List[Dict[str, Any]]], attributes: Dict[str, Any]) -> Dict[str, Any]:
    """Build a human-readable explanation of an insert table operation.

    Parameters:
        output: Result produced by the operator; it is not used when building the explanation.
        input_data: Data groups that were processed; each group is a list of rows.
        attributes: Operator attributes; missing keys fall back to the operator defaults.

    Returns:
        Dictionary with the keys 'input_data', 'attributes' and 'explanation'.
    """
    # Target location and batching, with the same fallbacks the operator uses.
    batch_size = attributes.get('batch_size', 100)
    table = attributes.get('table', '')
    collection = attributes.get('collection', 'public')
    database = attributes.get('database', 'default')
    source = attributes.get('source', 'default_source')

    # Input size: rows summed over all groups, then the number of groups.
    total_rows = sum(map(len, input_data))
    num_data_groups = len(input_data)

    explanation = f"Insert table operator inserted {total_rows} data rows from {num_data_groups} data groups into table '{table}' in database '{database}' collection '{collection}' of source '{source}' using batch size {batch_size}."

    return {
        'input_data': input_data,
        'attributes': attributes,
        'explanation': explanation,
    }

insert_table_operator_function(input_data, attributes, properties=None)

Insert data rows into database tables in a data source.

Parameters:

Name Type Description Default
input_data List[List[Dict[str, Any]]]

List of JSON arrays (List[List[Dict[str, Any]]]) containing records to insert.

required
attributes Dict[str, Any]

Dictionary containing insertion parameters including source, database, collection, table, and batch_size.

required
properties Dict[str, Any]

Optional properties dictionary containing data registry information. Defaults to None.

None

Returns:

Type Description
List[List[Dict[str, Any]]]

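The input data passed through unchanged; `[[]]` is returned instead when the input is empty (or its first group is empty) or when inserting a data group returns `None`.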

Source code in blue/operators/insert_table_operator.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
def insert_table_operator_function(input_data: List[List[Dict[str, Any]]], attributes: Dict[str, Any], properties: Dict[str, Any] = None) -> List[List[Dict[str, Any]]]:
    """Insert data rows into database tables in a data source.

    Every non-empty data group in ``input_data`` is handed to
    ``_insert_data_group``; empty groups are skipped.

    Parameters:
        input_data: List of JSON arrays (List[List[Dict[str, Any]]]) containing records to insert.
        attributes: Dictionary containing insertion parameters including source, database, collection, table, and batch_size.
        properties: Optional properties dictionary containing data registry information. Defaults to None.

    Returns:
        The input data passed through unchanged after the groups were processed.
        The input data is also returned unchanged when no data registry is found
        or when an unexpected exception is raised (both cases are logged).
        ``[[]]`` is returned when there are no rows to insert or when
        ``_insert_data_group`` returns ``None`` for a group.
    """
    # Extract attributes
    source = attributes.get('source', 'default_source')
    database = attributes.get('database', 'default')
    collection = attributes.get('collection', 'public')
    table = attributes.get('table', '')
    batch_size = attributes.get('batch_size', 100)

    # Get data registry from properties - follow agent pattern
    data_registry = _get_data_registry_from_properties(properties)
    if not data_registry:
        logging.error("Error: Data registry not found")
        return input_data

    # Force the collection to 'public' for SQLite sources, even if the caller specified a different one
    try:
        source_properties = data_registry.get_source_properties(source)
        if source_properties and 'connection' in source_properties:
            protocol = source_properties['connection'].get('protocol', '')
            if protocol == 'sqlite':
                collection = 'public'  # always use 'public' for SQLite as collection name
    except Exception:
        # Best effort: keep the requested collection, but leave a trace instead of
        # silently hiding why the SQLite override could not be evaluated.
        logging.warning(f"Could not read connection properties of source '{source}'; keeping collection '{collection}'.", exc_info=True)

    try:
        # Nothing to insert when there are no groups or every group is empty.
        # Checking all groups (not only the first) keeps this guard consistent with
        # the loop below, which skips empty groups individually.
        if not input_data or not any(input_data):
            return [[]]

        # Process each data group
        total_inserted = 0
        for group_idx, row_group in enumerate(input_data):
            if not row_group:
                continue

            group_inserted = _insert_data_group(data_registry, source, database, collection, table, row_group, batch_size, group_idx)

            # NOTE(review): None presumably signals a failed group insert - confirm against _insert_data_group.
            if group_inserted is None:
                return [[]]

            total_inserted += group_inserted

        logging.info(f"Successfully inserted {total_inserted} rows into table '{table}' in database '{database}' collection '{collection}' of source '{source}'.")

        return input_data

    except Exception:
        logging.error(traceback.format_exc())
        return input_data

insert_table_operator_validator(input_data, attributes, properties=None)

Validate insert table operator attributes.

Parameters:

Name Type Description Default
input_data List[List[Dict[str, Any]]]

List of JSON arrays (List[List[Dict[str, Any]]]) to validate.

required
attributes Dict[str, Any]

Dictionary containing operator attributes to validate.

required
properties Dict[str, Any]

Optional properties dictionary. Defaults to None.

None

Returns:

Type Description
bool

True if attributes are valid, False otherwise.

Source code in blue/operators/insert_table_operator.py
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
def insert_table_operator_validator(input_data: List[List[Dict[str, Any]]], attributes: Dict[str, Any], properties: Dict[str, Any] = None) -> bool:
    """Validate insert table operator attributes.

    The attributes are valid when the default operator validation passes,
    ``source``, ``database`` and ``table`` are non-blank strings, and
    ``batch_size`` (100 when absent) is a positive integer.

    Parameters:
        input_data: List of JSON arrays (List[List[Dict[str, Any]]]) to validate.
        attributes: Dictionary containing operator attributes to validate.
        properties: Optional properties dictionary. Defaults to None.

    Returns:
        True if attributes are valid, False otherwise.
    """
    # Any failure of the default validation, including an exception, is a rejection.
    try:
        if not default_operator_validator(input_data, attributes, properties):
            return False
    except Exception:
        return False

    # Check required attributes: each must be a non-blank string. The type check
    # makes a non-string value (e.g. a number) a validation failure instead of an
    # AttributeError from calling .strip() on it.
    for required_name in ('source', 'database', 'table'):
        value = attributes.get(required_name, '')
        if not isinstance(value, str) or not value.strip():
            return False

    # Validate batch_size if provided. bool is a subclass of int, so True would
    # otherwise be accepted as a batch size; reject it explicitly.
    batch_size = attributes.get('batch_size', 100)
    if isinstance(batch_size, bool) or not isinstance(batch_size, int) or batch_size <= 0:
        return False

    return True
Last update: 2025-10-08