Union operator

UnionOperator

Bases: Operator

Union operator performs n-way union on multiple data sources with exact duplicate removal. Supports key-based matching and sequential matching strategies.

Attributes:

| Name | Type | Required | Default | Description |
|------|------|----------|---------|-------------|
| `match_option` | str | No | "key_match" | Matching strategy for duplicate detection: 'key_match' (exact field names and values) or 'seq_match' (position-based comparison regardless of field names) |
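
The source does not show `_remove_duplicates`, so the following is only a sketch of how the two strategies might differ. It assumes `key_match` treats two records as duplicates when they have the same field names and values (ignoring key order), and `seq_match` treats them as duplicates when their values agree position by position. The helper names `record_signature` and `remove_duplicates_sketch` are hypothetical and not part of the library.

```python
import json
from typing import Any, Dict, List


def record_signature(record: Dict[str, Any], match_option: str = "key_match") -> str:
    """Hypothetical helper: build a comparable signature for one record."""
    if match_option == "seq_match":
        # Position-based: ignore field names, keep values in field order.
        payload = list(record.values())
    else:
        # Key-based (assumed): field names and values must both match.
        payload = sorted(record.items(), key=lambda kv: kv[0])
    return json.dumps(payload, sort_keys=True, default=str)


def remove_duplicates_sketch(records: List[Dict[str, Any]], match_option: str = "key_match") -> List[Dict[str, Any]]:
    """Hypothetical stand-in for _remove_duplicates: keep the first occurrence."""
    seen = set()
    unique = []
    for record in records:
        signature = record_signature(record, match_option)
        if signature not in seen:
            seen.add(signature)
            unique.append(record)
    return unique


rows = [
    {"id": 1, "name": "Ada"},
    {"id": 1, "name": "Ada"},        # exact duplicate under both strategies
    {"user_id": 1, "label": "Ada"},  # same values, different field names
]
by_key = remove_duplicates_sketch(rows, "key_match")
by_seq = remove_duplicates_sketch(rows, "seq_match")
```

Under these assumptions, `by_key` keeps the first and third records because their field names differ, while `by_seq` keeps only the first because all three records carry the same values in the same positions.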
Source code in blue/operators/union_operator.py
class UnionOperator(Operator):
    """
    Union operator performs n-way union on multiple data sources with exact duplicate removal.
    Supports key-based matching and sequential matching strategies.

    Attributes:
    ----------
    | Name           | Type | Required | Default     | Description |
    |----------------|------|----------|-------------|-------------|
    | `match_option` | str  | No       | "key_match" | Matching strategy for duplicate detection: 'key_match' (exact field names and values) or 'seq_match' (position-based comparison regardless of field names) |

    """

    PROPERTIES = {}

    name = "union"
    description = "Given multiple input data sources, combine all records and remove exact duplicates"
    default_attributes = {
        "match_option": {
            "type": "str",
            "description": "Matching strategy for duplicate detection: 'key_match' (exact field names and values) or 'seq_match' (position-based comparison regardless of field names)",
            "required": False,
            "default": "key_match",
        },
    }

    def __init__(self, description: str = None, properties: Dict[str, Any] = None):
        super().__init__(
            self.name,
            function=union_operator_function,
            description=description or self.description,
            properties=properties,
            validator=union_operator_validator,
            explainer=union_operator_explainer,
        )

    def _initialize_properties(self):
        super()._initialize_properties()

        # attribute definitions
        self.properties["attributes"] = self.default_attributes

union_operator_explainer(output, input_data, attributes)

Generate explanation for union operator execution.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `output` | Any | The output result from the operator execution. | required |
| `input_data` | List[List[Dict[str, Any]]] | The input data that was processed. | required |
| `attributes` | Dict[str, Any] | The attributes used for the operation. | required |

Returns:

| Type | Description |
|------|-------------|
| Dict[str, Any] | Dictionary containing explanation of the operation. |

Source code in blue/operators/union_operator.py
def union_operator_explainer(output: Any, input_data: List[List[Dict[str, Any]]], attributes: Dict[str, Any]) -> Dict[str, Any]:
    """Generate explanation for union operator execution.

    Parameters:
        output: The output result from the operator execution.
        input_data: The input data that was processed.
        attributes: The attributes used for the operation.

    Returns:
        Dictionary containing explanation of the operation.
    """
    return default_operator_explainer(output, input_data, attributes)

union_operator_function(input_data, attributes, properties=None)

Perform N-way union on multiple data sources with exact duplicate removal.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `input_data` | List[List[Dict[str, Any]]] | List of JSON arrays to union; requires at least one data source. | required |
| `attributes` | Dict[str, Any] | Dictionary containing union parameters, including match_option. | required |
| `properties` | Dict[str, Any] | Optional properties dictionary. | None |

Returns:

| Type | Description |
|------|-------------|
| List[List[Dict[str, Any]]] | A list holding a single list of all unique records from all data sources, or an empty list when input_data is empty. |

Source code in blue/operators/union_operator.py
def union_operator_function(input_data: List[List[Dict[str, Any]]], attributes: Dict[str, Any], properties: Dict[str, Any] = None) -> List[List[Dict[str, Any]]]:
    """Perform N-way union on multiple data sources with exact duplicate removal.

    Parameters:
        input_data: List of JSON arrays (List[List[Dict[str, Any]]]) to union, requires at least 1 data source.
        attributes: Dictionary containing union parameters including match_option.
        properties: Optional properties dictionary. Defaults to None.

    Returns:
        A list holding a single list of all unique records from all data sources, or an empty list when input_data is empty.
    """
    # Extract attributes
    match_option = attributes.get('match_option', 'key_match')

    # Validate input
    if not input_data:
        return []

    all_records = []
    for data_source in input_data:
        if data_source:
            all_records.extend(data_source)
    unique_records = _remove_duplicates(all_records, match_option)

    return [unique_records]
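
The flow above (flatten the sources, deduplicate, wrap the result in an outer list) can be sketched without the library. This is a minimal illustration, not the actual implementation: `union_sketch` is a hypothetical name, and plain dictionary equality stands in for `_remove_duplicates`, which roughly corresponds to the assumed `key_match` behavior.

```python
from typing import Any, Dict, List


def union_sketch(input_data: List[List[Dict[str, Any]]]) -> List[List[Dict[str, Any]]]:
    """Hypothetical stand-in mirroring the flow above with key_match-like semantics only."""
    if not input_data:
        return []
    # Flatten every non-empty source into one list, preserving order.
    all_records = [record for source in input_data if source for record in source]
    # Stand-in for _remove_duplicates: keep the first occurrence of each record.
    unique_records: List[Dict[str, Any]] = []
    for record in all_records:
        if record not in unique_records:
            unique_records.append(record)
    # The result is wrapped in an outer list: one output data source.
    return [unique_records]


source_a = [{"id": 1}, {"id": 2}]
source_b = [{"id": 2}, {"id": 3}]
result = union_sketch([source_a, [], source_b])
empty_result = union_sketch([])
```

Here `result` contains one inner list with the records for ids 1, 2, and 3, and the empty source is skipped. An empty `input_data` returns `[]` rather than `[[]]`, which matches the early return in the function above.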

union_operator_validator(input_data, attributes, properties=None)

Validate union operator attributes.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `input_data` | List[List[Dict[str, Any]]] | List of JSON arrays to validate. | required |
| `attributes` | Dict[str, Any] | Dictionary containing operator attributes to validate. | required |
| `properties` | Dict[str, Any] | Optional properties dictionary. | None |

Returns:

| Type | Description |
|------|-------------|
| bool | True if attributes are valid, False otherwise. |

Source code in blue/operators/union_operator.py
def union_operator_validator(input_data: List[List[Dict[str, Any]]], attributes: Dict[str, Any], properties: Dict[str, Any] = None) -> bool:
    """Validate union operator attributes.

    Parameters:
        input_data: List of JSON arrays (List[List[Dict[str, Any]]]) to validate.
        attributes: Dictionary containing operator attributes to validate.
        properties: Optional properties dictionary. Defaults to None.

    Returns:
        True if attributes are valid, False otherwise.
    """
    try:
        if not default_operator_validator(input_data, attributes, properties):
            return False
    except Exception:
        return False

    match_option = attributes.get('match_option', 'key_match')
    if match_option not in ['seq_match', 'key_match']:
        return False

    return True
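
The `match_option` check in the validator can be exercised on its own. The sketch below reproduces only that check; it leaves out the call to `default_operator_validator`, whose behavior is not shown here. `match_option_is_valid` and `ALLOWED_MATCH_OPTIONS` are hypothetical names used for illustration.

```python
from typing import Any, Dict

# The two strategies accepted by the validator above.
ALLOWED_MATCH_OPTIONS = ("seq_match", "key_match")


def match_option_is_valid(attributes: Dict[str, Any]) -> bool:
    """Hypothetical helper reproducing only the match_option check."""
    # A missing match_option falls back to the default, "key_match".
    return attributes.get("match_option", "key_match") in ALLOWED_MATCH_OPTIONS


default_ok = match_option_is_valid({})
seq_ok = match_option_is_valid({"match_option": "seq_match"})
unknown_ok = match_option_is_valid({"match_option": "fuzzy_match"})
```

Omitting `match_option` passes because the default is a valid strategy, while any value outside the two listed strategies is rejected.
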
Last update: 2025-10-08