
Operator

DeclarativeOperator

Bases: Operator

DeclarativeOperator is a specialized Operator that specifies its execution declaratively, as a set of plans. The plans are defined in the operator's plans property and are added to the main plan during the planning/refine phase.
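
As a rough illustration, a plans property could look like the dictionary below. The node keys (type, name, attributes, properties, value) and the node types INPUT, OPERATOR and OUTPUT are the ones declarative_operator_refiner reads in this excerpt. The operator name "select", its attributes and the node labels are hypothetical. Connections between nodes are handled in a part of the refiner that is not shown here, so no connection keys are included. The helper summarize_plan is also only for illustration.

```python
from typing import Any, Dict, List

# Hypothetical plan specification; only keys read by declarative_operator_refiner are used.
PLANS: List[Dict[str, Any]] = [
    {
        "nodes": {
            # INPUT without "value": becomes the plan input and receives the operator's input_data
            "source": {"type": "INPUT"},
            # OPERATOR node: "select" and its attributes are hypothetical placeholders
            "filter": {"type": "OPERATOR", "name": "select", "attributes": {"limit": 10}},
            # OUTPUT node: any OUTPUT node is treated as the plan output
            "result": {"type": "OUTPUT"},
        }
    }
]

properties = {"name": "my_declarative_op", "plans": PLANS}


def summarize_plan(plan: Dict[str, Any]) -> Dict[str, List[str]]:
    """Group node labels by node type (INPUT, OPERATOR, OUTPUT)."""
    summary: Dict[str, List[str]] = {}
    for label, node in plan["nodes"].items():
        summary.setdefault(node["type"], []).append(label)
    return summary


print(summarize_plan(properties["plans"][0]))
# {'INPUT': ['source'], 'OPERATOR': ['filter'], 'OUTPUT': ['result']}
```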

Parameters:

Name        Type            Default  Description
properties  Dict[str, Any]  None     Properties dictionary containing plan definitions and other operator properties. Defaults to None.
Source code in blue/operators/operator.py
class DeclarativeOperator(Operator):
    """DeclarativeOperator is a specialized Operator that declaratively specifies the execution of the operator as a set of plans.
    The plans are defined in the operator's `plans` property and are added to the main plan during the planning/refine phase.

    Parameters:
        properties: Properties dictionary containing plan definitions and other operator properties. Defaults to None.
    """

    PROPERTIES = {"plans": []}

    name = "declarative_operator"
    description = "Declaratively specifies the execution of the operator as a set of plans"
    default_attributes = {}

    def __init__(self, properties: Dict[str, Any] = None):
        # properties may be None; fall back to the class-level name and description
        props = properties if properties is not None else {}
        super().__init__(
            props.get('name', self.name),
            function=declarative_operator_function,
            description=props.get('description', self.description),
            properties=properties,
            validator=declarative_operator_validator,
            explainer=declarative_operator_explainer,
            refiner=declarative_operator_refiner,
        )

    def _initialize_properties(self):
        super()._initialize_properties()

        # refine
        self.properties["refine"] = True
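
Here is a minimal sketch of the constructor's fallback behaviour. It uses a hypothetical stand-in class, not the Blue classes. The name and description come from properties when present and from the class attributes otherwise. A missing properties argument is treated as an empty dictionary.

```python
from typing import Any, Dict, Optional


class StubDeclarativeOperator:
    """Hypothetical stand-in mirroring only the name/description fallback of DeclarativeOperator.__init__."""

    name = "declarative_operator"
    description = "Declaratively specifies the execution of the operator as a set of plans"

    def __init__(self, properties: Optional[Dict[str, Any]] = None):
        props = properties if properties is not None else {}  # tolerate the None default
        # Values in properties win; otherwise the class-level defaults are used
        self.resolved_name = props.get("name", self.name)
        self.resolved_description = props.get("description", self.description)


op = StubDeclarativeOperator({"name": "top_customers"})
print(op.resolved_name)                      # top_customers
print(StubDeclarativeOperator().resolved_name)  # declarative_operator
```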

Operator

Bases: Tool

Base class for all operators in the Blue framework.

In the operator scope, data means a JSON array of records (a list of dictionaries). Operator is a specialized Tool for performing data operations in Blue. Input: operators always expect multiple data sets, [data_1, data_2, ...] (a list of lists of dictionaries). Output: the same shape as the input. Operators always return a list of JSON arrays of records, so a single result is returned as a one-element list, [data].
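
The shape contract can be sketched with plain Python types. Record, Data and as_operator_output are hypothetical names used only for illustration. The helper wraps a single JSON array in a one-element list, following the output convention described above.

```python
from typing import Any, Dict, List, Union

Record = Dict[str, Any]
Data = List[Record]  # one JSON array of records


def as_operator_output(result: Union[Data, List[Data]]) -> List[Data]:
    """Hypothetical helper: wrap a single JSON array so the result is always a list of arrays."""
    # An empty list or a list of records counts as a single data set
    if not result or isinstance(result[0], dict):
        return [result]
    return result


customers: Data = [{"id": 1, "name": "Ada"}, {"id": 2, "name": "Lin"}]
orders: Data = [{"customer_id": 1, "total": 42.0}]

input_data: List[Data] = [customers, orders]  # operators always receive a list of data sets
print(len(input_data))                                # 2
print(as_operator_output(customers) == [customers])  # True
```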

Parameters:

Name         Type            Default   Description
name         str             required  Name of the operator.
function     Callable        None      The operator function to execute. Defaults to default_operator_function.
description  str             None      Description of the operator. Defaults to None.
properties   Dict[str, Any]  None      Properties dictionary for the operator. Defaults to None.
validator    Callable        None      Validation function for operator attributes. Defaults to default_operator_validator.
explainer    Callable        None      Explanation function for operator output. Defaults to default_operator_explainer.
refiner      Callable        None      Refinement function for operator planning. Defaults to default_operator_refiner.
Source code in blue/operators/operator.py
class Operator(Tool):
    """Base class for all operators in the Blue framework.

    In the operator scope, data means a JSON array of records (a list of dictionaries).
    Operator is a specialized Tool for performing data operations in Blue.
    Input data: operators always expect multiple data sets, [data_1, data_2, ...] (a list of lists of dictionaries).
    Output data: the same shape as the input; operators always return a list of JSON arrays of records. A single result is returned as a one-element list, [data].

    Parameters:
        name: Name of the operator.
        function: The operator function to execute. Defaults to default_operator_function.
        description: Description of the operator. Defaults to None.
        properties: Properties dictionary for the operator. Defaults to None.
        validator: Validation function for operator attributes. Defaults to default_operator_validator.
        explainer: Explanation function for operator output. Defaults to default_operator_explainer.
        refiner: Refinement function for operator planning. Defaults to default_operator_refiner.
    """

    PROPERTIES = {}

    def __init__(
        self, name: str, function: Callable = None, description: str = None, properties: Dict[str, Any] = None, validator: Callable = None, explainer: Callable = None, refiner: Callable = None
    ):
        """Initialize the Operator.

        Parameters:
            name: Name of the operator.
            function: The operator function to execute. Defaults to default_operator_function.
            description: Description of the operator. Defaults to None.
            properties: Properties dictionary for the operator. Defaults to None.
            validator: Validation function for operator attributes. Defaults to default_operator_validator.
            explainer: Explanation function for operator output. Defaults to default_operator_explainer.
            refiner: Refinement function for operator planning. Defaults to default_operator_refiner.
        """
        if function is None:
            function = default_operator_function
        if validator is None:
            validator = default_operator_validator
        if explainer is None:
            explainer = default_operator_explainer
        if refiner is None:
            refiner = default_operator_refiner
        self.refiner = refiner

        super().__init__(name, function, description=description, properties=properties, validator=validator, explainer=explainer)

    def _initialize_properties(self):
        super()._initialize_properties()

        # Tool type
        self.properties["tool_type"] = "operator"

        # Validation properties
        self.properties["validate_input"] = True
        self.properties["validate_output"] = True
        self.properties["validation_error_handling"] = "fail"  # fail, log, skip
        # Processing properties
        self.properties["max_records"] = None  # None means no limit
        self.properties["timeout"] = 300  # 5 minutes timeout

        # Error handling properties
        self.properties["error_handling"] = "skip"  # fail, log, skip
        self.properties["log_processing_stats"] = True

        # attribute definitions
        self.properties["attributes"] = {}

        # hyperparameter definitions
        self.properties["hyperparameters"] = {}

        # refine
        self.properties["refine"] = False

        # process default PROPERTIES (copied so instances do not share mutable defaults such as lists)
        for key, value in self.PROPERTIES.items():
            self.properties[key] = copy.deepcopy(value)

    def _extract_signature(self):
        super()._extract_signature()

        # expand parameters with attribute metadata, in function signature attributes
        signature = self.properties['signature']

        if 'attributes' in signature['parameters']:
            attributes = signature['parameters']['attributes']
            attributes['properties'] = copy.deepcopy(self.properties["attributes"])
            for a in attributes['properties']:
                attribute = attributes['properties'][a]
                attribute['type'] = tool_utils.convert_type_string_to_mcp(attribute['type'])

    def get_attributes(self):
        """Get all operator attributes.

        Returns:
            (dict): Dictionary containing all operator attributes and their definitions.
        """
        return self.properties["attributes"]

    def update_attributes(self, attributes=None):
        """Update operator attributes with new definitions.

        Parameters:
            attributes (dict, None): Dictionary of attribute definitions to update. Defaults to None.
        """
        if attributes is None:
            return

        # override
        for p in attributes:
            self.properties["attributes"][p] = attributes[p]

        # update signature with updated attributes
        self._extract_signature()

    def get_attribute(self, attribute):
        """Get a specific operator attribute definition.

        Parameters:
            attribute: Name of the attribute to retrieve.

        Returns:
            (dict, None): Dictionary containing the attribute definition, or None if not found.
        """
        attributes = self.get_attributes()
        if attribute in attributes:
            return attributes[attribute]
        return None

    def get_attribute_type(self, attribute):
        """Get the type of a specific operator attribute.

        Parameters:
            attribute: Name of the attribute.

        Returns:
            (str, None): String containing the attribute type, or None if not found.
        """
        attribute = self.get_attribute(attribute)
        if attribute:
            if 'type' in attribute:
                return attribute['type']
        return None

    def get_attribute_description(self, attribute):
        """Get the description of a specific operator attribute.

        Parameters:
            attribute: Name of the attribute.

        Returns:
            (str, None): String containing the attribute description, or None if not found.
        """
        attribute = self.get_attribute(attribute)
        if attribute:
            if 'description' in attribute:
                return attribute['description']
        return None

    def set_attribute_description(self, attribute, description):
        """Set the description of a specific operator attribute.

        Parameters:
            attribute: Name of the attribute.
            description (str): New description for the attribute.
        """
        attribute = self.get_attribute(attribute)
        if attribute:
            attribute['description'] = description
            # update signature with updated attributes
            self._extract_signature()
        return None

    def set_attribute_required(self, attribute, required):
        """Set whether a specific operator attribute is required.

        Parameters:
            attribute: Name of the attribute.
            required (bool): Boolean indicating if the attribute is required.
        """
        attribute = self.get_attribute(attribute)
        if attribute:
            attribute['required'] = required
            # update signature with updated attributes
            self._extract_signature()
        return None

    def set_attribute_hidden(self, attribute, hidden):
        """Set whether a specific operator attribute is hidden.

        Parameters:
            attribute: Name of the attribute.
            hidden: Boolean indicating if the attribute is hidden.
        """
        attribute = self.get_attribute(attribute)
        if attribute:
            attribute['hidden'] = hidden
            # update signature with updated attributes
            self._extract_signature()
        return None

    def is_attribute_required(self, attribute):
        """Check if a specific operator attribute is required.

        Parameters:
            attribute: Name of the attribute.

        Returns:
            Boolean indicating if the attribute is required, or None if not found.
        """
        attribute = self.get_attribute(attribute)
        if attribute:
            if 'required' in attribute:
                return attribute['required']
        return None

    def is_attribute_hidden(self, attribute):
        """Check if a specific operator attribute is hidden.

        Parameters:
            attribute: Name of the attribute.

        Returns:
            Boolean indicating if the attribute is hidden, or None if not found.
        """
        attribute = self.get_attribute(attribute)
        if attribute:
            if 'hidden' in attribute:
                return attribute['hidden']
        return None

    def get_hyperparameters(self):
        """Get all operator hyperparameters.

        Returns:
            Dictionary containing all operator hyperparameters.
        """
        return self.properties["hyperparameters"]

    def update_hyperparameters(self, hyperparameters=None):
        """Update operator hyperparameters with new values.

        Parameters:
            hyperparameters: Dictionary of hyperparameter values to update. Defaults to None.
        """
        if hyperparameters is None:
            return
        # override
        for p in hyperparameters:
            self.properties["hyperparameters"][p] = hyperparameters[p]

    ######### Separation functions: tell an LLM or other caller whether a tool is an operator or a function
    @classmethod
    def is_operator(cls, function_or_operator) -> bool:
        """Check if a tool/operator is actually an operator.

        Parameters:
            function_or_operator: The object to check.

        Returns:
            True if the object is an operator, False otherwise.
        """
        if hasattr(function_or_operator, 'properties'):
            return function_or_operator.properties.get("tool_type") == "operator"
        return False

    @classmethod
    def get_tool_type(cls, function_or_operator) -> str:
        """Get the type of a function/operator.

        Parameters:
            function_or_operator: The object to check.

        Returns:
            String indicating the type: "operator" or "function".
        """
        if cls.is_operator(function_or_operator):
            return "operator"
        return "function"
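
The order of property initialization can be sketched with two stand-in classes, not the Blue classes. Generic defaults are set first. Class-level PROPERTIES are applied on top of them. A subclass override such as DeclarativeOperator's refine = True runs after super(). The sketch copies each PROPERTIES value with copy.deepcopy. This is an assumption made here so that two instances never share one mutable plans list.

```python
import copy
from typing import Any, Dict


class BaseSketch:
    """Hypothetical stand-in for the Operator properties chain; not the Blue implementation."""

    PROPERTIES: Dict[str, Any] = {}

    def __init__(self):
        self.properties: Dict[str, Any] = {}
        self._initialize_properties()

    def _initialize_properties(self):
        # Generic defaults (a subset of those set in Operator._initialize_properties)
        self.properties["tool_type"] = "operator"
        self.properties["refine"] = False
        # Class-level PROPERTIES are applied last, so they override the defaults above
        for key, value in self.PROPERTIES.items():
            self.properties[key] = copy.deepcopy(value)


class DeclarativeSketch(BaseSketch):
    PROPERTIES = {"plans": []}

    def _initialize_properties(self):
        super()._initialize_properties()
        self.properties["refine"] = True  # subclass override runs after the base chain


a, b = DeclarativeSketch(), DeclarativeSketch()
a.properties["plans"].append({"nodes": {}})
print(a.properties["refine"], len(b.properties["plans"]))  # True 0
```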

__init__(name, function=None, description=None, properties=None, validator=None, explainer=None, refiner=None)

Initialize the Operator.

Parameters:

Name         Type            Default   Description
name         str             required  Name of the operator.
function     Callable        None      The operator function to execute. Defaults to default_operator_function.
description  str             None      Description of the operator. Defaults to None.
properties   Dict[str, Any]  None      Properties dictionary for the operator. Defaults to None.
validator    Callable        None      Validation function for operator attributes. Defaults to default_operator_validator.
explainer    Callable        None      Explanation function for operator output. Defaults to default_operator_explainer.
refiner      Callable        None      Refinement function for operator planning. Defaults to default_operator_refiner.
Source code in blue/operators/operator.py
def __init__(
    self, name: str, function: Callable = None, description: str = None, properties: Dict[str, Any] = None, validator: Callable = None, explainer: Callable = None, refiner: Callable = None
):
    """Initialize the Operator.

    Parameters:
        name: Name of the operator.
        function: The operator function to execute. Defaults to default_operator_function.
        description: Description of the operator. Defaults to None.
        properties: Properties dictionary for the operator. Defaults to None.
        validator: Validation function for operator attributes. Defaults to default_operator_validator.
        explainer: Explanation function for operator output. Defaults to default_operator_explainer.
        refiner: Refinement function for operator planning. Defaults to default_operator_refiner.
    """
    if function is None:
        function = default_operator_function
    if validator is None:
        validator = default_operator_validator
    if explainer is None:
        explainer = default_operator_explainer
    if refiner is None:
        refiner = default_operator_refiner
    self.refiner = refiner

    super().__init__(name, function, description=description, properties=properties, validator=validator, explainer=explainer)
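
The defaulting logic in __init__ amounts to the pattern below. The functions default_function and default_refiner are hypothetical placeholders for default_operator_function and default_operator_refiner, whose bodies are not part of this excerpt. CallbackSketch is a stand-in, not the Operator class.

```python
from typing import Callable, List, Optional


def default_function(input_data: List[list], attributes: dict) -> List[list]:
    """Hypothetical stand-in for default_operator_function."""
    return [[]]


def default_refiner(input_data: List[list], attributes: dict) -> List[dict]:
    """Hypothetical stand-in for default_operator_refiner."""
    return []


class CallbackSketch:
    """Stand-in for Operator.__init__: callbacks left as None fall back to module defaults."""

    def __init__(self, name: str, function: Optional[Callable] = None, refiner: Optional[Callable] = None):
        self.name = name
        # Test against None explicitly so any callable passed by the caller is kept as-is
        self.function = default_function if function is None else function
        self.refiner = default_refiner if refiner is None else refiner


def my_refiner(input_data: List[list], attributes: dict) -> List[dict]:
    return [{"plan": "custom"}]  # hypothetical custom refiner


op = CallbackSketch("passthrough", refiner=my_refiner)
print(op.function is default_function, op.refiner is my_refiner)  # True True
```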

get_attribute(attribute)

Get a specific operator attribute definition.

Parameters:

Name       Type  Default   Description
attribute  str   required  Name of the attribute to retrieve.

Returns:

Type          Description
(dict, None)  Dictionary containing the attribute definition, or None if not found.

Source code in blue/operators/operator.py
def get_attribute(self, attribute):
    """Get a specific operator attribute definition.

    Parameters:
        attribute: Name of the attribute to retrieve.

    Returns:
        (dict, None): Dictionary containing the attribute definition, or None if not found.
    """
    attributes = self.get_attributes()
    if attribute in attributes:
        return attributes[attribute]
    return None
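
Attribute definitions are dictionaries keyed by attribute name. The keys type, description, required and hidden are the ones the getters on this page read. The attribute name limit and its values are hypothetical, and AttributeSketch is a stand-in rather than the Operator class.

```python
from typing import Any, Dict, Optional


class AttributeSketch:
    """Stand-in holding properties["attributes"] the way Operator does; lookups mirror its getters."""

    def __init__(self, attributes: Dict[str, Dict[str, Any]]):
        self.properties = {"attributes": attributes}

    def get_attribute(self, attribute: str) -> Optional[Dict[str, Any]]:
        return self.properties["attributes"].get(attribute)

    def get_attribute_description(self, attribute: str) -> Optional[str]:
        definition = self.get_attribute(attribute)
        return definition.get("description") if definition else None


# Hypothetical attribute definition using the keys the Operator getters read
op = AttributeSketch({
    "limit": {"type": "int", "description": "Maximum number of records to keep", "required": False, "hidden": False},
})
print(op.get_attribute("limit")["type"])        # int
print(op.get_attribute_description("missing"))  # None
```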

get_attribute_description(attribute)

Get the description of a specific operator attribute.

Parameters:

Name       Type  Default   Description
attribute  str   required  Name of the attribute.

Returns:

Type         Description
(str, None)  String containing the attribute description, or None if not found.

Source code in blue/operators/operator.py
def get_attribute_description(self, attribute):
    """Get the description of a specific operator attribute.

    Parameters:
        attribute: Name of the attribute.

    Returns:
        (str, None): String containing the attribute description, or None if not found.
    """
    attribute = self.get_attribute(attribute)
    if attribute:
        if 'description' in attribute:
            return attribute['description']
    return None

get_attribute_type(attribute)

Get the type of a specific operator attribute.

Parameters:

Name       Type  Default   Description
attribute  str   required  Name of the attribute.

Returns:

Type         Description
(str, None)  String containing the attribute type, or None if not found.

Source code in blue/operators/operator.py
def get_attribute_type(self, attribute):
    """Get the type of a specific operator attribute.

    Parameters:
        attribute: Name of the attribute.

    Returns:
        (str, None): String containing the attribute type, or None if not found.
    """
    attribute = self.get_attribute(attribute)
    if attribute:
        if 'type' in attribute:
            return attribute['type']
    return None
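
This getter pattern has one subtlety. Because it tests `if attribute:`, an attribute defined as an empty dictionary is treated the same as a missing one. A definition without a type key also yields None. The sketch below mirrors that logic over a plain dictionary, using hypothetical attribute names.

```python
from typing import Any, Dict, Optional


def get_attribute_type(attributes: Dict[str, Dict[str, Any]], name: str) -> Optional[str]:
    """Mirror of Operator.get_attribute_type over a plain attributes dict."""
    attribute = attributes.get(name)
    if attribute:  # an empty definition {} is falsy, so it is treated like a missing one
        if "type" in attribute:
            return attribute["type"]
    return None


attrs = {"limit": {"type": "int"}, "tag": {}, "note": {"description": "free text"}}
print(get_attribute_type(attrs, "limit"))  # int
print(get_attribute_type(attrs, "tag"))    # None (empty definition)
print(get_attribute_type(attrs, "note"))   # None (no type key)
```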

get_attributes()

Get all operator attributes.

Returns:

Type  Description
dict  Dictionary containing all operator attributes and their definitions.

Source code in blue/operators/operator.py
def get_attributes(self):
    """Get all operator attributes.

    Returns:
        (dict): Dictionary containing all operator attributes and their definitions.
    """
    return self.properties["attributes"]

get_hyperparameters()

Get all operator hyperparameters.

Returns:

Type  Description
dict  Dictionary containing all operator hyperparameters.

Source code in blue/operators/operator.py
def get_hyperparameters(self):
    """Get all operator hyperparameters.

    Returns:
        Dictionary containing all operator hyperparameters.
    """
    return self.properties["hyperparameters"]

get_tool_type(function_or_operator) classmethod

Get the type of a function/operator.

Parameters:

Name                  Type  Default   Description
function_or_operator  Any   required  The object to check.

Returns:

Type  Description
str   String indicating the type: "operator" or "function".

Source code in blue/operators/operator.py
@classmethod
def get_tool_type(cls, function_or_operator) -> str:
    """Get the type of a function/operator.

    Parameters:
        function_or_operator: The object to check.

    Returns:
        String indicating the type: "operator" or "function".
    """
    if cls.is_operator(function_or_operator):
        return "operator"
    return "function"
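
is_operator is duck-typed. It inspects the tool_type property rather than checking isinstance(obj, Operator). Any object whose properties["tool_type"] equals "operator" is therefore classified as an operator. Here is a sketch with a hypothetical FakeTool class:

```python
from typing import Any


def is_operator(obj: Any) -> bool:
    """Mirror of Operator.is_operator: checks the tool_type property, not the class."""
    if hasattr(obj, "properties"):
        return obj.properties.get("tool_type") == "operator"
    return False


def get_tool_type(obj: Any) -> str:
    """Mirror of Operator.get_tool_type."""
    return "operator" if is_operator(obj) else "function"


class FakeTool:
    """Hypothetical object; any object with a matching properties dict counts as an operator."""

    def __init__(self, tool_type: str):
        self.properties = {"tool_type": tool_type}


print(get_tool_type(FakeTool("operator")))  # operator
print(get_tool_type(FakeTool("function")))  # function
print(get_tool_type(len))                   # function (no properties attribute)
```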

is_attribute_hidden(attribute)

Check if a specific operator attribute is hidden.

Parameters:

Name       Type  Default   Description
attribute  str   required  Name of the attribute.

Returns:

Type          Description
(bool, None)  Boolean indicating if the attribute is hidden, or None if not found.

Source code in blue/operators/operator.py
def is_attribute_hidden(self, attribute):
    """Check if a specific operator attribute is hidden.

    Parameters:
        attribute: Name of the attribute.

    Returns:
        Boolean indicating if the attribute is hidden, or None if not found.
    """
    attribute = self.get_attribute(attribute)
    if attribute:
        if 'hidden' in attribute:
            return attribute['hidden']
    return None

is_attribute_required(attribute)

Check if a specific operator attribute is required.

Parameters:

Name       Type  Default   Description
attribute  str   required  Name of the attribute.

Returns:

Type          Description
(bool, None)  Boolean indicating if the attribute is required, or None if not found.

Source code in blue/operators/operator.py
def is_attribute_required(self, attribute):
    """Check if a specific operator attribute is required.

    Parameters:
        attribute: Name of the attribute.

    Returns:
        Boolean indicating if the attribute is required, or None if not found.
    """
    attribute = self.get_attribute(attribute)
    if attribute:
        if 'required' in attribute:
            return attribute['required']
    return None
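
is_attribute_required is effectively tri-state. It returns True, False, or None when the attribute or its required key is absent. A caller that needs a plain boolean has to decide how to treat None. The sketch below runs over a plain dictionary with hypothetical attribute names and treats None as not required by comparing with `is True`.

```python
from typing import Any, Dict, Optional


def is_attribute_required(attributes: Dict[str, Dict[str, Any]], name: str) -> Optional[bool]:
    """Mirror of Operator.is_attribute_required over a plain attributes dict."""
    attribute = attributes.get(name)
    if attribute and "required" in attribute:
        return attribute["required"]
    return None


attrs = {"limit": {"type": "int", "required": True}, "tag": {"type": "str"}}

for name in ("limit", "tag", "missing"):
    flag = is_attribute_required(attrs, name)
    # `flag is True` treats "not specified" (None) the same as not required
    print(name, flag, flag is True)
# limit True True
# tag None False
# missing None False
```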

is_operator(function_or_operator) classmethod

Check if a tool/operator is actually an operator.

Parameters:

Name                  Type  Default   Description
function_or_operator  Any   required  The object to check.

Returns:

Type  Description
bool  True if the object is an operator, False otherwise.

Source code in blue/operators/operator.py
@classmethod
def is_operator(cls, function_or_operator) -> bool:
    """Check if a tool/operator is actually an operator.

    Parameters:
        function_or_operator: The object to check.

    Returns:
        True if the object is an operator, False otherwise.
    """
    if hasattr(function_or_operator, 'properties'):
        return function_or_operator.properties.get("tool_type") == "operator"
    return False

set_attribute_description(attribute, description)

Set the description of a specific operator attribute.

Parameters:

Name         Type  Default   Description
attribute    str   required  Name of the attribute.
description  str   required  New description for the attribute.
Source code in blue/operators/operator.py
def set_attribute_description(self, attribute, description):
    """Set the description of a specific operator attribute.

    Parameters:
        attribute: Name of the attribute.
        description (str): New description for the attribute.
    """
    attribute = self.get_attribute(attribute)
    if attribute:
        attribute['description'] = description
        # update signature with updated attributes
        self._extract_signature()
    return None
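
The setters have two effects worth noting. They modify the stored definition in place and refresh the signature only when the attribute exists. Unknown names are silently ignored. A hypothetical stand-in class makes both effects visible, with _extract_signature reduced to a counter:

```python
from typing import Any, Dict


class SetterSketch:
    """Hypothetical stand-in mirroring set_attribute_description; counts signature refreshes."""

    def __init__(self, attributes: Dict[str, Dict[str, Any]]):
        self.properties = {"attributes": attributes}
        self.signature_refreshes = 0

    def _extract_signature(self) -> None:
        self.signature_refreshes += 1  # the real method rebuilds properties['signature']

    def set_attribute_description(self, attribute: str, description: str) -> None:
        definition = self.properties["attributes"].get(attribute)
        if definition:  # unknown (or empty) attributes are silently ignored
            definition["description"] = description  # mutates the stored definition in place
            self._extract_signature()


op = SetterSketch({"limit": {"type": "int"}})
op.set_attribute_description("limit", "Maximum rows to return")
op.set_attribute_description("missing", "ignored")
print(op.properties["attributes"]["limit"], op.signature_refreshes)
# {'type': 'int', 'description': 'Maximum rows to return'} 1
```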

set_attribute_hidden(attribute, hidden)

Set whether a specific operator attribute is hidden.

Parameters:

Name       Type  Default   Description
attribute  str   required  Name of the attribute.
hidden     bool  required  Boolean indicating if the attribute is hidden.
Source code in blue/operators/operator.py
def set_attribute_hidden(self, attribute, hidden):
    """Set whether a specific operator attribute is hidden.

    Parameters:
        attribute: Name of the attribute.
        hidden: Boolean indicating if the attribute is hidden.
    """
    attribute = self.get_attribute(attribute)
    if attribute:
        attribute['hidden'] = hidden
        # update signature with updated attributes
        self._extract_signature()
    return None

set_attribute_required(attribute, required)

Set whether a specific operator attribute is required.

Parameters:

Name       Type  Default   Description
attribute  str   required  Name of the attribute.
required   bool  required  Boolean indicating if the attribute is required.
Source code in blue/operators/operator.py
def set_attribute_required(self, attribute, required):
    """Set whether a specific operator attribute is required.

    Parameters:
        attribute: Name of the attribute.
        required (bool): Boolean indicating if the attribute is required.
    """
    attribute = self.get_attribute(attribute)
    if attribute:
        attribute['required'] = required
        # update signature with updated attributes
        self._extract_signature()
    return None

update_attributes(attributes=None)

Update operator attributes with new definitions.

Parameters:

Name        Type          Default  Description
attributes  (dict, None)  None     Dictionary of attribute definitions to update. Defaults to None.
Source code in blue/operators/operator.py
def update_attributes(self, attributes=None):
    """Update operator attributes with new definitions.

    Parameters:
        attributes (dict, None): Dictionary of attribute definitions to update. Defaults to None.
    """
    if attributes is None:
        return

    # override
    for p in attributes:
        self.properties["attributes"][p] = attributes[p]

    # update signature with updated attributes
    self._extract_signature()
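
update_attributes replaces each named definition wholesale rather than merging keys into it. The sketch below mirrors the loop over plain dictionaries, with the signature refresh omitted. It also shows one way to change a single key while keeping the rest. The attribute name is hypothetical.

```python
from typing import Any, Dict, Optional


def update_attributes(current: Dict[str, Dict[str, Any]], updates: Optional[Dict[str, Dict[str, Any]]] = None) -> None:
    """Mirror of Operator.update_attributes without the signature refresh."""
    if updates is None:
        return
    for name in updates:
        current[name] = updates[name]  # whole definition replaced; keys are not merged


attrs = {"limit": {"type": "int", "required": True}}
update_attributes(attrs, {"limit": {"description": "Max rows"}})
print(attrs["limit"])  # {'description': 'Max rows'}  ('type' and 'required' are gone)

# To change a single key while keeping the rest, merge explicitly before updating
attrs = {"limit": {"type": "int", "required": True}}
update_attributes(attrs, {"limit": {**attrs["limit"], "description": "Max rows"}})
print(attrs["limit"])  # {'type': 'int', 'required': True, 'description': 'Max rows'}
```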

update_hyperparameters(hyperparameters=None)

Update operator hyperparameters with new values.

Parameters:

Name             Type          Default  Description
hyperparameters  (dict, None)  None     Dictionary of hyperparameter values to update. Defaults to None.
Source code in blue/operators/operator.py
def update_hyperparameters(self, hyperparameters=None):
    """Update operator hyperparameters with new values.

    Parameters:
        hyperparameters: Dictionary of hyperparameter values to update. Defaults to None.
    """
    if hyperparameters is None:
        return
    # override
    for p in hyperparameters:
        self.properties["hyperparameters"][p] = hyperparameters[p]
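
update_hyperparameters overrides top-level keys only. Unlike update_attributes, it does not call _extract_signature. The sketch below works over a plain dictionary with hypothetical hyperparameter names and shows that a nested dictionary is replaced rather than merged:

```python
from typing import Any, Dict, Optional


def update_hyperparameters(current: Dict[str, Any], updates: Optional[Dict[str, Any]] = None) -> None:
    """Mirror of Operator.update_hyperparameters: top-level keys are overwritten."""
    if updates is None:
        return
    for key in updates:
        current[key] = updates[key]


# Hypothetical hyperparameter names
hp = {"temperature": 0.0, "sampling": {"seed": 7, "size": 100}}
update_hyperparameters(hp, {"sampling": {"size": 50}})
print(hp)  # {'temperature': 0.0, 'sampling': {'size': 50}}  (nested 'seed' is dropped)
```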

declarative_operator_explainer(output, input_data, attributes)

Generate explanation for declarative operator execution.

Parameters:

Name        Type                        Default   Description
output      Any                         required  The output result from the operator execution.
input_data  List[List[Dict[str, Any]]]  required  The input data that was processed.
attributes  Dict[str, Any]              required  The attributes used for the operation.

Returns:

Type            Description
Dict[str, Any]  Dictionary containing explanation of the declarative operation.

Source code in blue/operators/operator.py
def declarative_operator_explainer(output: Any, input_data: List[List[Dict[str, Any]]], attributes: Dict[str, Any]) -> Dict[str, Any]:
    """Generate explanation for declarative operator execution.

    Parameters:
        output: The output result from the operator execution.
        input_data: The input data that was processed.
        attributes: The attributes used for the operation.

    Returns:
        Dictionary containing explanation of the declarative operation.
    """
    declarative_operator_explanation = {
        'output': output,
        'input_data': input_data,
        'attributes': attributes,
        'explanation': "Declarative operator passed input data to plans specified",
    }
    return declarative_operator_explanation
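
Here is a short usage sketch. The explainer echoes its three arguments and adds a fixed explanation string. The function is re-declared locally so the example runs without the blue package. The sample records are hypothetical.

```python
from typing import Any, Dict, List


def declarative_operator_explainer(output: Any, input_data: List[List[Dict[str, Any]]], attributes: Dict[str, Any]) -> Dict[str, Any]:
    # Local re-declaration of the function above so the example is self-contained
    return {
        "output": output,
        "input_data": input_data,
        "attributes": attributes,
        "explanation": "Declarative operator passed input data to plans specified",
    }


explanation = declarative_operator_explainer([[]], [[{"id": 1}]], {})
print(sorted(explanation))  # ['attributes', 'explanation', 'input_data', 'output']
```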

declarative_operator_function(input_data, attributes, properties=None)

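Default function for the declarative operator. Execution is meant to be passed to the sub-plans; in the current source this is still a TODO, and the function returns a single empty result.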

Parameters:

Name        Type                        Default   Description
input_data  List[List[Dict[str, Any]]]  required  List of JSON arrays (List[List[Dict[str, Any]]]) containing records to process.
attributes  Dict[str, Any]              required  Dictionary containing operator-specific parameters.
properties  Dict[str, Any]              None      Optional properties dictionary containing plan definitions. Defaults to None.

Returns:

Type                        Description
List[List[Dict[str, Any]]]  A list containing a single empty JSON array ([[]]), as a placeholder default implementation.

Source code in blue/operators/operator.py
def declarative_operator_function(input_data: List[List[Dict[str, Any]]], attributes: Dict[str, Any], properties: Dict[str, Any] = None) -> List[List[Dict[str, Any]]]:
    """Default function for declarative operator, simply passes execution to sub plans.

    Parameters:
        input_data: List of JSON arrays (List[List[Dict[str, Any]]]) containing records to process.
        attributes: Dictionary containing operator-specific parameters.
        properties: Optional properties dictionary containing plan definitions. Defaults to None.

    Returns:
        List containing empty results as default implementation.
    """
    # TODO:
    # pass execution to plans
    return [[]]
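
Whatever it receives, the placeholder function returns one output data set with no records. According to its docstring, the real work is meant to be delegated to the sub-plans. A local re-declaration makes the shape concrete:

```python
from typing import Any, Dict, List, Optional


def declarative_operator_function(
    input_data: List[List[Dict[str, Any]]], attributes: Dict[str, Any], properties: Optional[Dict[str, Any]] = None
) -> List[List[Dict[str, Any]]]:
    # Local copy of the placeholder above; no records are produced here
    return [[]]


result = declarative_operator_function([[{"id": 1}], [{"id": 2}]], {})
print(len(result), len(result[0]))  # 1 0  -> one output data set with no records
```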

declarative_operator_refiner(input_data, attributes, properties=None)

Default refiner for the declarative operator; builds pipelines from the plans declared in the operator's properties.

Parameters:

Name        Type                        Default   Description
input_data  List[List[Dict[str, Any]]]  required  List of JSON arrays (List[List[Dict[str, Any]]]) containing records to process.
attributes  Dict[str, Any]              required  Dictionary containing operator-specific parameters.
properties  Dict[str, Any]              None      Optional properties dictionary containing plan definitions. Defaults to None.

Returns:

Type                  Description
List[Dict[str, Any]]  List containing pipeline definitions based on declarative plans.
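
The first pass of the refiner, the part visible in this excerpt, creates one pipeline node per plan node. It also records a two-way mapping between node labels and node ids. The sketch below reproduces that bookkeeping with a hypothetical StubPipeline in place of DataPipeline. The id format and the choice to skip unknown node types are assumptions of the sketch, not behaviour of the Blue code. As the comments in the source describe, an INPUT node without a value becomes the plan input, and the last OUTPUT node seen becomes the plan output.

```python
import itertools
from typing import Any, Dict, List, Optional, Tuple


class StubPipeline:
    """Hypothetical stand-in for DataPipeline: hands out sequential node ids."""

    def __init__(self) -> None:
        self._ids = itertools.count(1)
        self.nodes: Dict[str, Dict[str, Any]] = {}

    def define(self, kind: str, **data: Any) -> str:
        node_id = f"{kind.lower()}_{next(self._ids)}"
        self.nodes[node_id] = {"kind": kind, **data}
        return node_id


def first_pass(plan: Dict[str, Any], input_data: List[list]) -> Tuple[Dict[str, str], Optional[str], Optional[str]]:
    """Create one node per label and record label <-> id mappings, as the refiner's first pass does."""
    pipeline = StubPipeline()
    mappings: Dict[str, str] = {}
    plan_input: Optional[str] = None
    plan_output: Optional[str] = None
    for label, node in plan["nodes"].items():
        if node["type"] == "OPERATOR":
            node_id = pipeline.define("OPERATOR", name=node["name"], attributes=node.get("attributes", {}))
        elif node["type"] == "INPUT":
            value = node.get("value")
            # An INPUT without a value becomes the plan input and receives the operator's input data
            node_id = pipeline.define("INPUT", value=input_data if value is None else value)
            if value is None:
                plan_input = node_id
        elif node["type"] == "OUTPUT":
            node_id = pipeline.define("OUTPUT", value=node.get("value"))
            plan_output = node_id  # the last OUTPUT node seen wins
        else:
            continue  # assumption of this sketch: unknown node types are skipped
        mappings[label] = node_id
        mappings[node_id] = label
    return mappings, plan_input, plan_output


plan = {"nodes": {"source": {"type": "INPUT"}, "filter": {"type": "OPERATOR", "name": "select"}, "result": {"type": "OUTPUT"}}}
mappings, plan_input, plan_output = first_pass(plan, [[{"id": 1}]])
print(mappings["source"], plan_input, plan_output)  # input_1 input_1 output_3
```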

Source code in blue/operators/operator.py
def declarative_operator_refiner(input_data: List[List[Dict[str, Any]]], attributes: Dict[str, Any], properties: Dict[str, Any] = None) -> List[Dict[str, Any]]:
    """Default refiner for declarative operator, returning plans declaratively specified as operator properties.

    Parameters:
        input_data: List of JSON arrays (List[List[Dict[str, Any]]]) containing records to process.
        attributes: Dictionary containing operator-specific parameters.
        properties: Optional properties dictionary containing plan definitions. Defaults to None.

    Returns:
        List containing pipeline definitions based on declarative plans.
    """
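    # properties may be None or lack 'plans'; in that case there is nothing to refine
    plans = properties.get('plans') if properties is not None else None

    if plans is None:
        return []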

    # process plan specifications to full pipelines
    pipelines = []

    for plan in plans:
        nodes = plan['nodes']
        mappings = {}
        pipeline = DataPipeline(properties=properties)

        plan_input_node = None
        plan_output_node = None
        # first pass, create nodes/entities
        for node_label in nodes:
            node = nodes[node_label]
            node_type = node['type']
            node_id = None
            if node_type == "OPERATOR":
                operator_name = node['name']
                operator_attributes = node['attributes'] if 'attributes' in node else {}
                operator_properties = node['properties'] if 'properties' in node else {}
                operator_node = pipeline.define_operator(operator_name, attributes=operator_attributes, properties=operator_properties)
                node_id = operator_node.get_id()
            elif node_type == "INPUT":
                input_label = node_label
                input_value = node['value'] if 'value' in node else None
                input_properties = node['properties'] if 'properties' in node else {}
                input_node = pipeline.define_input(label=input_label, value=input_value, properties=input_properties)
                node_id = input_node.get_id()
                # if no value specified, designate as plan input node
                if input_value is None:
                    plan_input_node = input_node
                    plan_input_node.set_data("value", input_data)
                else:
                    input_node.set_data("status", str(Status.EXECUTED))
            elif node_type == "OUTPUT":
                output_label = node_label
                output_value = node['value'] if 'value' in node else None
                output_properties = node['properties'] if 'properties' in node else {}
                output_node = pipeline.define_output(label=output_label, value=output_value, properties=output_properties)
                node_id = output_node.get_id()
                # any output is plan output
                plan_output_node = output_node
            # mappings
            mappings[node_label] = node_id
            mappings[node_id] = node_label

        # create plan input and output nodes, if missing
        if plan_input_node is None:
            plan_input_node = pipeline.define_input(value=input_data, properties={})

        if plan_output_node is None:
            plan_output_node = pipeline.define_output(properties={})
        # second pass, connect
        for node_label in nodes:
            node = nodes[node_label]
            node_id = mappings[node_label]
            # prev
            node_prev = node['prev'] if 'prev' in node else []
            for prev_label in node_prev:
                from_id = mappings[prev_label]
                to_id = node_id
                pipeline.connect_nodes(from_id, to_id)
            # next
            node_next = node['next'] if 'next' in node else []
            for next_label in node_next:
                from_id = node_id
                to_id = mappings[next_label]
                pipeline.connect_nodes(from_id, to_id)

        # third pass, operators with no prev, connect to input; no next, connect to output
        operators = pipeline.filter_nodes(filter_node_type=["OPERATOR"])
        for operator_id in operators:
            prev_nodes = pipeline.get_prev_nodes(operator_id)
            next_nodes = pipeline.get_next_nodes(operator_id)
            operator_node = pipeline.get_node(operator_id)
            if len(prev_nodes) == 0:
                # connect to input
                pipeline.connect_nodes(plan_input_node, operator_node)
            if len(next_nodes) == 0:
                # connect to output
                pipeline.connect_nodes(operator_node, plan_output_node)

        # set plan input / output
        pipeline.set_plan_input(plan_input_node)
        pipeline.set_plan_output(plan_output_node)

        # add to pipelines
        pipelines.append(pipeline.to_dict())

    return pipelines
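
The three wiring passes can be illustrated without the `blue` runtime. The sketch below assumes a plan shaped like the `nodes` mapping the refiner reads (`type`, `name`, `value`, `prev`, `next`) and records plain `(from, to)` label pairs instead of calling `DataPipeline`. The helper `wire_plan`, the `__input__`/`__output__` labels for auto-created nodes, and the operator names in the example are hypothetical.

```python
from typing import Any, Dict, Set, Tuple

Edge = Tuple[str, str]  # (from_label, to_label)


def wire_plan(nodes: Dict[str, Dict[str, Any]]) -> Set[Edge]:
    """Hypothetical stand-in for the refiner's wiring passes, using node labels instead of ids."""
    # First pass (simplified): pick the plan input and output labels.
    # An INPUT without a "value" is the plan input; any OUTPUT is the plan output (last one wins).
    plan_input, plan_output = "__input__", "__output__"  # hypothetical labels for auto-created nodes
    for label, node in nodes.items():
        if node["type"] == "INPUT" and node.get("value") is None:
            plan_input = label
        elif node["type"] == "OUTPUT":
            plan_output = label

    # Second pass: explicit connections from "prev" and "next".
    edges: Set[Edge] = set()
    for label, node in nodes.items():
        for prev_label in node.get("prev", []):
            edges.add((prev_label, label))
        for next_label in node.get("next", []):
            edges.add((label, next_label))

    # Third pass: operators with no predecessor read from the plan input;
    # operators with no successor write to the plan output.
    for label, node in nodes.items():
        if node["type"] != "OPERATOR":
            continue
        if not any(dst == label for _, dst in edges):
            edges.add((plan_input, label))
        if not any(src == label for src, _ in edges):
            edges.add((label, plan_output))
    return edges


# Example plan; the operator names "select" and "sort" are illustrative only.
plan = {
    "nodes": {
        "records": {"type": "INPUT"},
        "select": {"type": "OPERATOR", "name": "select", "next": ["sort"]},
        "sort": {"type": "OPERATOR", "name": "sort"},
        "result": {"type": "OUTPUT"},
    }
}
print(sorted(wire_plan(plan["nodes"])))
# [('records', 'select'), ('select', 'sort'), ('sort', 'result')]
```

Because the sketch stores edges in a set, listing the same connection under both `prev` and `next` adds it only once; whether `DataPipeline.connect_nodes` deduplicates the same way is not shown in this section.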

declarative_operator_validator(input_data, attributes, properties=None)

Validate declarative operator attributes by delegating to `default_operator_validator`.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `input_data` | `List[List[Dict[str, Any]]]` | List of JSON arrays to validate. | required |
| `attributes` | `Dict[str, Any]` | Dictionary containing operator attributes to validate. | required |
| `properties` | `Dict[str, Any]` | Optional properties dictionary. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `bool` | True if attributes are valid, False otherwise. |

Source code in blue/operators/operator.py
def declarative_operator_validator(input_data: List[List[Dict[str, Any]]], attributes: Dict[str, Any], properties: Dict[str, Any] = None) -> bool:
    """Validate declarative operator attributes.

    Parameters:
        input_data: List of JSON arrays (List[List[Dict[str, Any]]]) to validate.
        attributes: Dictionary containing operator attributes to validate.
        properties: Optional properties dictionary. Defaults to None.

    Returns:
        True if attributes are valid, False otherwise.
    """
    return default_operator_validator(input_data, attributes=attributes, properties=properties)

default_attributes_validator(attributes, properties=None)

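Validate the actual attribute values in `attributes` against the attribute definitions in `properties["attributes"]`, checking that required attributes are present and that present attributes have the declared type. Errors raised by the type check itself are handled according to `properties["validation_error_handling"]`: `"fail"` (the default) re-raises, `"log"` logs and returns False, and any other value logs and continues.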

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `attributes` | `Dict[str, Any]` | Dictionary containing actual attribute values to validate. | required |
| `properties` | `Dict[str, Any]` | Optional properties dictionary containing attribute definitions. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `bool` | True if attributes are valid, False otherwise. |

Source code in blue/operators/operator.py
def default_attributes_validator(attributes: Dict[str, Any], properties: Dict[str, Any] = None) -> bool:
    """Validate actual attributes (attributes) using the attribute definitions in properties.

    Parameters:
        attributes: Dictionary containing actual attribute values to validate.
        properties: Optional properties dictionary containing attribute definitions. Defaults to None.

    Returns:
        True if attributes are valid, False otherwise.
    """
    # Need to get the attributes definition and validation error handling from properties
    logging.debug("Validating attributes...")
    if properties is None:
        properties = {}
    attributes_def = properties.get("attributes", {})
    validation_error_handling = properties.get("validation_error_handling", "fail")
    # Validate required attributes
    for attrib_name, attrib_def in attributes_def.items():
        # check if required attribute is present
        required = attrib_def.get("required", False)
        if required and attrib_name not in attributes:
            logging.error("Failed for " + attrib_name)
            return False
        # validate attribute type
        if attrib_name in attributes:
            attrib_value = attributes[attrib_name]
            attrib_type = attrib_def.get("type")
            if attrib_type:
                try:
                    if not validate_parameter_type(attrib_value, attrib_type):
                        logging.error("Failed type for " + attrib_name)
                        return False
                except Exception as e:
                    # System failure in validation - handle based on configuration
                    error_msg = f"attribute validation system error for '{attrib_name}': {e}"
                    if validation_error_handling == "fail":
                        # re-raise the validation error
                        logging.error(error_msg)
                        raise
                    elif validation_error_handling == "log":
                        # log and treat the attribute as invalid
                        logging.error(error_msg)
                        return False
                    else:  # skip
                        # log and continue with validation (treat as if validation passed)
                        logging.error(error_msg)

    return True
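
The required/type checks and the three `validation_error_handling` modes can be exercised with a self-contained sketch. `validate_parameter_type` is not shown in this section, so the sketch substitutes a hypothetical `check_type` backed by a small type-name table; the type names, and the use of an unknown name such as `"enum"` to simulate a system error, are assumptions.

```python
import logging
from typing import Any, Dict, Optional

# Hypothetical type-name table standing in for validate_parameter_type, which is not shown here.
_TYPES = {"str": str, "int": int, "float": (int, float), "bool": bool, "list": list, "dict": dict}


def check_type(value: Any, type_name: str) -> bool:
    # An unknown type name raises KeyError, simulating a system error during validation.
    return isinstance(value, _TYPES[type_name])


def validate_attributes(attributes: Dict[str, Any], properties: Optional[Dict[str, Any]] = None) -> bool:
    """Sketch of the required/type checks and the validation_error_handling modes."""
    properties = properties or {}
    attributes_def = properties.get("attributes", {})
    handling = properties.get("validation_error_handling", "fail")
    for name, definition in attributes_def.items():
        if definition.get("required", False) and name not in attributes:
            return False  # required attribute is missing
        type_name = definition.get("type")
        if name in attributes and type_name:
            try:
                if not check_type(attributes[name], type_name):
                    return False  # attribute has the wrong type
            except Exception as e:
                if handling == "fail":
                    raise  # propagate the system error
                logging.error("attribute validation system error for '%s': %s", name, e)
                if handling == "log":
                    return False
                # any other mode ("skip"): treat the attribute as valid and keep going
    return True


properties = {
    "attributes": {
        "field": {"type": "str", "required": True},
        "limit": {"type": "int"},
        "mode": {"type": "enum"},  # not in _TYPES, so checking it raises KeyError
    },
    "validation_error_handling": "log",
}
print(validate_attributes({"field": "name", "limit": 10}, properties))    # True
print(validate_attributes({"limit": 10}, properties))                     # False: "field" missing
print(validate_attributes({"field": "name", "limit": "10"}, properties))  # False: "limit" not an int
print(validate_attributes({"field": "name", "mode": "x"}, properties))    # False: system error under "log"
```

Note that an optional attribute that is absent is never type-checked, so `"mode"` only triggers the error path when it is supplied.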

default_operator_explainer(output, input_data, attributes)

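Default explainer for operator output. It summarizes the operation with record counts for the input and output, the output-to-input record ratio, and the attributes used.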

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `output` | `Any` | The output result from the operator execution. | required |
| `input_data` | `List[List[Dict[str, Any]]]` | List of JSON arrays that were processed. | required |
| `attributes` | `Dict[str, Any]` | The attributes used for the operation. | required |

Returns:

| Type | Description |
| --- | --- |
| `Dict[str, Any]` | Dictionary containing an explanation of the operation with statistics. |

Source code in blue/operators/operator.py
def default_operator_explainer(output: Any, input_data: List[List[Dict[str, Any]]], attributes: Dict[str, Any]) -> Dict[str, Any]:
    """Default explainer for operator output.

    Parameters:
        output: The output result from the operator execution.
        input_data: List of JSON arrays (List[List[Dict[str, Any]]]) that was processed.
        attributes: The attributes used for the operation.

    Returns:
        Dictionary containing explanation of the operation with statistics.
    """
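    total_input_records = sum(len(data) for data in input_data)
    # output is normally a list of JSON arrays; a non-list output counts as a single data item/record
    if isinstance(output, list):
        num_output_data = len(output)
        total_output_records = sum(len(data) if isinstance(data, list) else 1 for data in output)
    else:
        num_output_data = 1
        total_output_records = 1

    explanation = {
        "output": output,  # maybe not needed
        "num_input_data": len(input_data),
        "num_input_records": total_input_records,
        "num_input_records_per_data": [len(data) for data in input_data],
        "num_output_data": num_output_data,
        "num_output_records": total_output_records,
        "transformation_ratio": total_output_records / total_input_records if total_input_records > 0 else 0,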
        "attributes": attributes,
    }
    return explanation
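
As a worked example of the input-side statistics, the sketch below computes the three input counts the explainer reports and the zero-guarded transformation ratio. The helper names `input_statistics` and `transformation_ratio` and the sample records are hypothetical; only the arithmetic mirrors the explainer.

```python
from typing import Any, Dict, List


def input_statistics(input_data: List[List[Dict[str, Any]]]) -> Dict[str, Any]:
    """Input-side counts computed the same way as in default_operator_explainer."""
    per_data = [len(data) for data in input_data]
    return {
        "num_input_data": len(input_data),       # number of JSON arrays
        "num_input_records": sum(per_data),      # records across all arrays
        "num_input_records_per_data": per_data,  # records in each array
    }


def transformation_ratio(num_output_records: int, num_input_records: int) -> float:
    """Output/input record ratio, guarded against empty input."""
    return num_output_records / num_input_records if num_input_records > 0 else 0


input_data = [[{"id": 1}, {"id": 2}, {"id": 3}], [{"id": 4}]]
stats = input_statistics(input_data)
print(stats)  # {'num_input_data': 2, 'num_input_records': 4, 'num_input_records_per_data': [3, 1]}
print(transformation_ratio(2, stats["num_input_records"]))  # 0.5
print(transformation_ratio(2, 0))  # 0
```

The guard makes the ratio 0 rather than raising `ZeroDivisionError` when the operator received no input records.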

default_operator_function(input_data, attributes, properties=None)

Default operator function. It is a placeholder that returns an empty list; each operator should supply its own function.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `input_data` | `List[List[Dict[str, Any]]]` | List of JSON arrays containing records to process. | required |
| `attributes` | `Dict[str, Any]` | Dictionary containing operator-specific parameters. | required |
| `properties` | `Dict[str, Any]` | Optional properties dictionary. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `List[List[Dict[str, Any]]]` | Empty list as default implementation. |

Source code in blue/operators/operator.py
def default_operator_function(input_data: List[List[Dict[str, Any]]], attributes: Dict[str, Any], properties: Dict[str, Any] = None) -> List[List[Dict[str, Any]]]:
    """Default function for operator. It should be overridden by each operator.

    Parameters:
        input_data: List of JSON arrays (List[List[Dict[str, Any]]]) containing records to process.
        attributes: Dictionary containing operator-specific parameters.
        properties: Optional properties dictionary. Defaults to None.

    Returns:
        Empty list as default implementation.
    """
    return []
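
A replacement operator function takes the same `(input_data, attributes, properties)` arguments and returns JSON arrays. The sketch below is a hypothetical filter operator that keeps records whose `field` equals `value`; the attribute names and the choice of one output array per input array are assumptions. Just as `DeclarativeOperator` passes `function=declarative_operator_function` to `Operator.__init__`, such a function would presumably be supplied through the `function` argument.

```python
from typing import Any, Dict, List, Optional


def filter_operator_function(input_data: List[List[Dict[str, Any]]],
                             attributes: Dict[str, Any],
                             properties: Optional[Dict[str, Any]] = None) -> List[List[Dict[str, Any]]]:
    """Hypothetical operator function: keep records where record[field] == value."""
    field = attributes["field"]
    value = attributes["value"]
    # Produce one filtered JSON array per input array.
    return [[record for record in data if record.get(field) == value] for data in input_data]


input_data = [[{"city": "Paris"}, {"city": "Rome"}], [{"city": "Paris"}]]
print(filter_operator_function(input_data, {"field": "city", "value": "Paris"}))
# [[{'city': 'Paris'}], [{'city': 'Paris'}]]
```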

default_operator_refiner(input_data, attributes, properties=None)

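Default operator refiner. It returns an empty list (no refined plans); operators that refine themselves into pipelines, such as the declarative operator, supply their own refiner.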

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `input_data` | `List[List[Dict[str, Any]]]` | List of JSON arrays containing records to process. | required |
| `attributes` | `Dict[str, Any]` | Dictionary containing operator-specific parameters. | required |
| `properties` | `Dict[str, Any]` | Optional properties dictionary. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `List[Dict[str, Any]]` | Empty list as default implementation. |

Source code in blue/operators/operator.py
def default_operator_refiner(input_data: List[List[Dict[str, Any]]], attributes: Dict[str, Any], properties: Dict[str, Any] = None) -> List[Dict[str, Any]]:
    """Default refiner for operator. It should be overridden by each operator.

    Parameters:
        input_data: List of JSON arrays (List[List[Dict[str, Any]]]) containing records to process.
        attributes: Dictionary containing operator-specific parameters.
        properties: Optional properties dictionary. Defaults to None.

    Returns:
        Empty list as default implementation.
    """
    return []

default_operator_validator(input_data, attributes, properties=None)

Default validator for operator attributes. It delegates to `default_attributes_validator` and returns False if that call raises an exception.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `input_data` | `List[List[Dict[str, Any]]]` | List of JSON arrays to validate. | required |
| `attributes` | `Dict[str, Any]` | Dictionary containing operator attributes to validate. | required |
| `properties` | `Dict[str, Any]` | Optional properties dictionary. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `bool` | True if attributes are valid, False otherwise. |

Source code in blue/operators/operator.py
def default_operator_validator(input_data: List[List[Dict[str, Any]]], attributes: Dict[str, Any], properties: Dict[str, Any] = None) -> bool:
    """Default validator for operator attributes.

    Parameters:
        input_data: List of JSON arrays (List[List[Dict[str, Any]]]) to validate.
        attributes: Dictionary containing operator attributes to validate.
        properties: Optional properties dictionary. Defaults to None.

    Returns:
        True if attributes are valid, False otherwise.
    """
    try:
        return default_attributes_validator(attributes, properties)
    except Exception as e:
        # validation error
        return False
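
`default_operator_validator` turns any exception from `default_attributes_validator`, including one re-raised under `validation_error_handling="fail"`, into a plain False. The sketch below shows that pattern as a generic wrapper; `safe_validator` and the raising `strict_validator` are hypothetical names used only for illustration.

```python
from typing import Any, Callable, Dict, List, Optional

Validator = Callable[..., bool]


def safe_validator(validator: Validator) -> Validator:
    """Wrap a validator so that any exception it raises is reported as invalid (False)."""
    def wrapped(input_data: List[List[Dict[str, Any]]], attributes: Dict[str, Any],
                properties: Optional[Dict[str, Any]] = None) -> bool:
        try:
            return validator(input_data, attributes, properties)
        except Exception:
            return False
    return wrapped


def strict_validator(input_data, attributes, properties=None) -> bool:
    # Hypothetical validator that raises instead of returning False.
    if "field" not in attributes:
        raise ValueError("missing 'field'")
    return True


check = safe_validator(strict_validator)
print(check([], {"field": "city"}))  # True
print(check([], {}))                 # False
```

One consequence of this design is that a caller of the wrapped validator cannot distinguish "invalid attributes" from "validation crashed"; both surface as False.
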
Last update: 2025-10-08