Semantic transform operator

SemanticTransformOperator

Bases: Operator, ServiceClient

Operator that transforms data into target fields and values using LLM-based transformations.

Attributes:

| Name | Type | Required | Default | Description |
|------|------|----------|---------|-------------|
| `input_meta` | dict | No | {} | Optional metadata about input fields |
| `output_desc` | dict | Yes | N/A | Required description of target fields to create |
| `strategy` | str | No | "auto" | Execution strategy: 'auto' (automatic cost-based selection), 'per_record' (one LLM call per record), 'distinct_required_values' (deduplicate by distinct values), 'distinct_required_values_with_merged_fields' (merged distinct optimization) |
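
As a rough illustration of the attributes above, the following sketch builds an attributes dictionary and a matching input. All field names and values are hypothetical. Only the `description` key of each `output_desc` entry is confirmed by the validator on this page; the `type` and `required` keys and the shape of `input_meta` are assumptions based on the wording of the transform prompt.

```python
# Hypothetical attribute values for illustration only.
# Confirmed by the validator: each output_desc entry is a dict with a string "description".
# Assumed: the "type" and "required" keys, and the shape of input_meta.
attributes = {
    "input_meta": {"name": "Customer name as entered", "age": "Age in years"},
    "output_desc": {
        "full_name": {"description": "Customer's full name", "type": "str", "required": True},
        "age_group": {"description": "Age bucket such as 'minor' or 'adult'"},
    },
    "strategy": "per_record",
}

# input_data is a list of record groups; each group is a list of JSON-like records.
input_data = [[{"name": "Ada Lovelace", "age": 36}, {"name": "Alan Turing", "age": 41}]]
```

Leaving `strategy` out would fall back to the documented default, `"auto"`.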
Source code in blue/operators/semantic_transform_operator.py
class SemanticTransformOperator(Operator, ServiceClient):
    """
    Operator that transforms data into target fields and values using LLM-based transformations.

    Attributes:
    ----------
    | Name     | Type | Required | Default | Description |
    |----------|------|----------|---------|-------------|
    | `input_meta` | dict |  | {} | Optional metadata about input fields |
    | `output_desc` | dict | :fontawesome-solid-circle-check: {.green-check} | N/A | Required description of target fields to create |
    | `strategy` | str |  | "auto" | Execution strategy: 'auto' (automatic cost-based selection), 'per_record' (one LLM call per record), 'distinct_required_values' (deduplicate by distinct values), 'distinct_required_values_with_merged_fields' (merged distinct optimization) |


    """

    PLAN_RESOLUTION_PROMPT = """## Task
You are a data transformation planner. Analyze the input schema and output requirements to create a transformation plan.

## Input Schema
${schema}

## Input Metadata (if provided)
${input_metadata}

## Sample Data (first 5 records)
${sample_data}

## Output Requirements
${output_requirements}

## Instructions
Create a transformation plan that maps each target field to its source fields and transformation type.

For each target field, determine:
1. **source_fields**: List of input fields needed to create this target field
2. **transformation_type**: Either "field_mapping_without_value_change" or "field_mapping_with_value_change"
   - Use "field_mapping_without_value_change" when you can simply rename/copy a field without any value modification
   - Use "field_mapping_with_value_change" when you need to modify, combine, derive, or transform values

## Output Format
Return a JSON object where each key is a target field name and the value is:
{
  "source_fields": ["field1", "field2", ...],
  "transformation_type": "field_mapping_without_value_change" | "field_mapping_with_value_change"
}

## Examples
- Simple rename: `{"full_name": {"source_fields": ["name"], "transformation_type": "field_mapping_without_value_change"}}`
- Value transformation: `{"age_group": {"source_fields": ["age"], "transformation_type": "field_mapping_with_value_change"}}`
- Field combination: `{"full_address": {"source_fields": ["street", "city", "state"], "transformation_type": "field_mapping_with_value_change"}}`

Return only the JSON object, no additional text.
"""

    TRANSFORM_PROMPT = """## Task
You are a data transformation expert. Transform the input data to match the required output schema.

## Available Input Fields
${schema}

## Target Output Fields
${output_requirements}

## Input Data Record
${input_data}

## Instructions
1. Transform the input data record to create the target output fields
2. Use the field descriptions, optional types, optional required flags, and optional hints to guide your transformations.
3. Ensure output values match the specified data types (str, integer, boolean, etc.) if specified.
4. **Field Inclusion Rules**:
   - If a field is marked as "required: True", it MUST be present in the output with an appropriate value
   - If a field is marked as "required: False", it can be omitted if you cannot reasonably infer its value from the input data
   - If no "required" information is provided, only include fields that you confidently infer or derive from the available input data
5. **Required Field Generation**: If a required field cannot be created from available data, use a null value (None) that satisfies the field type.

6. **Conservative Approach**: When in doubt about whether to include a field, err on the side of omitting it rather than generating speculative values
7. **Do NOT generate fake or speculative values** for fields that cannot be reasonably inferred from the input data
8. Return the transformed data as a JSON object

Return only the transformed JSON object, no additional text.
"""

    PROPERTIES = {
        # openai related properties
        "openai.api": "ChatCompletion",
        "openai.model": "gpt-4o",
        "openai.stream": False,
        "openai.max_tokens": 1024,
        "openai.temperature": 0.0,
        # io related properties
        "input_json": "[{\"role\": \"user\"}]",
        "input_context": "$[0]",
        "input_context_field": "content",
        "input_field": "messages",
        "input_template": TRANSFORM_PROMPT,
        "output_path": "$.choices[0].message.content",
        # service related properties
        "service_prefix": "openai",
        # output transformations
        "output_transformations": [{"transformation": "replace", "from": "```", "to": ""}, {"transformation": "replace", "from": "json", "to": ""}],
        "output_strip": True,
        "output_cast": "json",
    }

    name = "semantic_transform"
    description = "Transforms data into target fields and values using LLM-based transformations"
    default_attributes = {
        "input_meta": {"type": "dict", "description": "Optional metadata about input fields", "required": False, "default": {}},
        "output_desc": {"type": "dict", "description": "Required description of target fields to create", "required": True},
        "strategy": {
            "type": "str",
            "description": "Execution strategy: 'auto' (automatic cost-based selection), 'per_record' (one LLM call per record), 'distinct_required_values' (deduplicate by distinct values), 'distinct_required_values_with_merged_fields' (merged distinct optimization).",
            "required": False,
            "default": "auto",
        },
    }

    def __init__(self, description: str = None, properties: Dict[str, Any] = None):
        super().__init__(
            self.name,
            function=semantic_transform_operator_function,
            description=description or self.description,
            properties=properties,
            validator=semantic_transform_operator_validator,
            explainer=semantic_transform_operator_explainer,
        )

    def _initialize_properties(self):
        super()._initialize_properties()
        self.properties["attributes"] = self.default_attributes

        # service_url, set as default
        self.properties["service_url"] = PROPERTIES["services.openai.service_url"]
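
The `input_template` and `output_*` properties above suggest a round trip: fill the `${...}` placeholders in the prompt, then clean and parse the model's reply. The sketch below imitates that flow with the standard library only. It assumes the placeholders follow Python's `string.Template` syntax and that each `replace` transformation is a plain substring replacement; `build_prompt` and `clean_response` are hypothetical helpers, not part of the library.

```python
import json
from string import Template

# Shortened stand-in for TRANSFORM_PROMPT; the real template is longer.
PROMPT = Template(
    "## Available Input Fields\n${schema}\n\n"
    "## Target Output Fields\n${output_requirements}\n\n"
    "## Input Data Record\n${input_data}\n"
)

# Same two steps as the "output_transformations" property above.
OUTPUT_TRANSFORMATIONS = [
    {"transformation": "replace", "from": "```", "to": ""},
    {"transformation": "replace", "from": "json", "to": ""},
]


def build_prompt(schema, output_requirements, record):
    """Fill the ${...} placeholders with JSON-encoded values (hypothetical helper)."""
    return PROMPT.substitute(
        schema=json.dumps(schema),
        output_requirements=json.dumps(output_requirements),
        input_data=json.dumps(record),
    )


def clean_response(raw, transformations):
    """Apply replace steps in order, strip whitespace, then parse as JSON."""
    text = raw
    for step in transformations:
        if step["transformation"] == "replace":
            text = text.replace(step["from"], step["to"])
    return json.loads(text.strip())


prompt = build_prompt(
    ["name", "age"],
    {"age_group": {"description": "Age bucket"}},
    {"name": "Ada", "age": 36},
)

# A typical fenced reply is reduced to the bare JSON object.
parsed = clean_response('```json\n{"age_group": "adult"}\n```', OUTPUT_TRANSFORMATIONS)

# Caveat: under plain substring replacement, "json" is also removed inside values.
altered = clean_response('{"format": "json"}', OUTPUT_TRANSFORMATIONS)
```

If the replacement really is a plain substring match, any output value that contains the text `json` would be altered, so a more targeted fence-stripping step may be worth considering.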

semantic_transform_operator_explainer(output, input_data, attributes)

Generate explanation for semantic transform operator execution.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `output` | `Any` | The output result from the operator execution. | required |
| `input_data` | `List[List[Dict[str, Any]]]` | The input data that was processed. | required |
| `attributes` | `Dict[str, Any]` | The attributes used for the operation. | required |

Returns:

| Type | Description |
|------|-------------|
| `Dict[str, Any]` | Dictionary containing explanation of the operation. |

Source code in blue/operators/semantic_transform_operator.py
def semantic_transform_operator_explainer(output: Any, input_data: List[List[Dict[str, Any]]], attributes: Dict[str, Any]) -> Dict[str, Any]:
    """Generate explanation for semantic transform operator execution.

    Parameters:
        output: The output result from the operator execution.
        input_data: The input data that was processed.
        attributes: The attributes used for the operation.

    Returns:
        Dictionary containing explanation of the operation.
    """
    return default_operator_explainer(output, input_data, attributes)

semantic_transform_operator_function(input_data, attributes, properties=None)

Transform data into target fields and values using LLM-based transformations.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `input_data` | `List[List[Dict[str, Any]]]` | List of JSON arrays containing records to transform. | required |
| `attributes` | `Dict[str, Any]` | Dictionary containing transformation parameters including input_meta, output_desc, and strategy. | required |
| `properties` | `Dict[str, Any]` | Optional properties dictionary containing service configuration. | `None` |

Returns:

| Type | Description |
|------|-------------|
| `List[List[Dict[str, Any]]]` | List containing transformed records with target fields and values. |

Source code in blue/operators/semantic_transform_operator.py
def semantic_transform_operator_function(input_data: List[List[Dict[str, Any]]], attributes: Dict[str, Any], properties: Dict[str, Any] = None) -> List[List[Dict[str, Any]]]:
    """Transform data into target fields and values using LLM-based transformations.

    Parameters:
        input_data: List of JSON arrays (List[List[Dict[str, Any]]]) containing records to transform.
        attributes: Dictionary containing transformation parameters including input_meta, output_desc, and strategy.
        properties: Optional properties dictionary containing service configuration. Defaults to None.

    Returns:
        List containing transformed records with target fields and values.
    """
    input_meta = attributes.get('input_meta', {})
    output_desc = attributes.get('output_desc', {})

    if not input_data or not input_data[0] or not output_desc:
        return []

    service_client = ServiceClient(name="semantic_transform_operator_service_client", properties=properties)

    results = []
    for data_group in input_data:
        if not data_group:
            results.append([])
            continue

        # Generate transformation plan and merge into output_desc
        enhanced_output_desc = _generate_transformation_plan(data_group, input_meta, output_desc, service_client, properties)
        print(f"Enhanced output desc: {enhanced_output_desc}")

        # Select transformation execution strategy
        specified_strategy = attributes.get('strategy', 'auto')
        if specified_strategy == 'auto':
            strategy_info = _select_execution_strategy(enhanced_output_desc, data_group)
        else:
            strategy_info = _force_strategy(enhanced_output_desc, data_group, specified_strategy)

        print(f"Strategy info: {strategy_info}")

        # Execute transformation based on strategy
        strategy = strategy_info["strategy"]
        simple_fields = strategy_info["simple_fields"]
        complex_fields = strategy_info["complex_fields"]

        print(f"Selected strategy: {strategy}")
        print(f"Simple fields: {simple_fields}")
        print(f"Complex fields: {complex_fields}")

        if strategy == "simple_rename":
            result = _execute_simple_rename(data_group, enhanced_output_desc, simple_fields)
        elif strategy == "per_record":
            result = _execute_per_record(data_group, enhanced_output_desc, simple_fields, complex_fields, service_client, properties)
        elif strategy == "distinct_required_values":
            result = _execute_distinct_required_values(data_group, enhanced_output_desc, simple_fields, complex_fields, service_client, properties)
        elif strategy == "distinct_required_values_with_merged_fields":
            result = _execute_distinct_required_values_with_merged_fields(data_group, enhanced_output_desc, simple_fields, complex_fields, strategy_info, service_client, properties)
        else:  # fallback to per_record
            result = _execute_per_record(data_group, enhanced_output_desc, simple_fields, complex_fields, service_client, properties)

        results.append(result)

    return results
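
The `distinct_required_values` strategy is described as deduplicating by distinct values, but its implementation is not shown on this page. The following is a minimal sketch of that general idea, not the library's code: `transform_distinct` and `fake_llm_transform` are hypothetical names, and the stand-in transform replaces what would be an LLM call.

```python
import json
from typing import Any, Callable, Dict, List


def transform_distinct(
    records: List[Dict[str, Any]],
    source_fields: List[str],
    transform: Callable[[Dict[str, Any]], Dict[str, Any]],
) -> List[Dict[str, Any]]:
    """Call `transform` once per distinct combination of source field values."""
    cache: Dict[str, Dict[str, Any]] = {}
    results = []
    for record in records:
        # Only the fields the transformation needs take part in the cache key.
        needed = {field: record.get(field) for field in source_fields}
        key = json.dumps(needed, sort_keys=True, default=str)
        if key not in cache:
            cache[key] = transform(needed)
        results.append(dict(cache[key]))  # copy so rows do not share one dict
    return results


calls = []


def fake_llm_transform(values: Dict[str, Any]) -> Dict[str, Any]:
    """Stand-in for an LLM call; records each invocation."""
    calls.append(values)
    return {"age_group": "adult" if values["age"] >= 18 else "minor"}


records = [{"name": "Ada", "age": 36}, {"name": "Bob", "age": 12}, {"name": "Cy", "age": 36}]
out = transform_distinct(records, ["age"], fake_llm_transform)
```

Here three records trigger only two transform calls, because two of them share the same `age`. The saving grows with the amount of repetition in the source fields, which is presumably what the cost-based `auto` selection weighs.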

semantic_transform_operator_validator(input_data, attributes, properties=None)

Validate semantic transform operator attributes.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `input_data` | `List[List[Dict[str, Any]]]` | List of JSON arrays to validate. | required |
| `attributes` | `Dict[str, Any]` | Dictionary containing operator attributes to validate. | required |
| `properties` | `Dict[str, Any]` | Optional properties dictionary. | `None` |

Returns:

| Type | Description |
|------|-------------|
| `bool` | True if attributes are valid, False otherwise. |

Source code in blue/operators/semantic_transform_operator.py
def semantic_transform_operator_validator(input_data: List[List[Dict[str, Any]]], attributes: Dict[str, Any], properties: Dict[str, Any] = None) -> bool:
    """Validate semantic transform operator attributes.

    Parameters:
        input_data: List of JSON arrays (List[List[Dict[str, Any]]]) to validate.
        attributes: Dictionary containing operator attributes to validate.
        properties: Optional properties dictionary. Defaults to None.

    Returns:
        True if attributes are valid, False otherwise.
    """
    try:
        if not default_operator_validator(input_data, attributes, properties):
            return False
    except Exception:
        return False

    output_desc = attributes.get('output_desc', {})
    if not output_desc or not isinstance(output_desc, dict):
        return False

    for field_name, field_info in output_desc.items():
        if not isinstance(field_name, str) or not isinstance(field_info, dict):
            return False
        if 'description' not in field_info or not isinstance(field_info['description'], str):
            return False

    return True
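
To make the `output_desc` rules above concrete, the sketch below re-implements only those checks in a standalone function and runs them on a few inputs. `output_desc_is_valid` is a hypothetical name; the real validator also calls `default_operator_validator`, which is omitted here.

```python
from typing import Any


def output_desc_is_valid(output_desc: Any) -> bool:
    """Mirror of the output_desc checks, without default_operator_validator."""
    if not output_desc or not isinstance(output_desc, dict):
        return False
    for field_name, field_info in output_desc.items():
        if not isinstance(field_name, str) or not isinstance(field_info, dict):
            return False
        # A missing description yields None, which is not a str.
        if not isinstance(field_info.get("description"), str):
            return False
    return True


ok = output_desc_is_valid({"age_group": {"description": "Age bucket"}})
missing_description = output_desc_is_valid({"age_group": {"type": "str"}})
not_a_dict = output_desc_is_valid({"age_group": "Age bucket"})
empty = output_desc_is_valid({})
```

Only the first call passes: each target field must map to a dictionary with a string `description`, and an empty `output_desc` is rejected.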
Last update: 2025-10-08