
Intersect operator

IntersectOperator

Bases: Operator

The intersect operator finds records that exist in all input data sources. It supports two matching strategies: key-based matching and sequential matching.

Attributes:

| Name | Type | Required | Default | Description |
|------|------|----------|---------|-------------|
| `match_option` | str | No | "key_match" | Matching strategy for record comparison: 'key_match' (exact field names and values) or 'seq_match' (position-based comparison regardless of field names) |
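
The two strategies described above can be illustrated with a minimal sketch. The page does not show how the library compares records internally, so `record_signature` and `intersect_records` below are hypothetical helpers written only to show one plausible reading of 'key_match' and 'seq_match'; they are not the library's implementation.

```python
from typing import Any, Dict, Hashable, List

Record = Dict[str, Any]


def record_signature(record: Record, match_option: str = "key_match") -> Hashable:
    """Hypothetical helper: reduce a record to a hashable comparison key.

    Assumes all field values are hashable.
    """
    if match_option == "key_match":
        # Field names and values must both agree; field order is ignored.
        return frozenset(record.items())
    if match_option == "seq_match":
        # Only the values, in field order, are compared; names are ignored.
        return tuple(record.values())
    raise ValueError(f"unknown match_option: {match_option!r}")


def intersect_records(sources: List[List[Record]], match_option: str = "key_match") -> List[Record]:
    """Keep records of the first source whose signature occurs in every other source."""
    other_signatures = [
        {record_signature(r, match_option) for r in source} for source in sources[1:]
    ]
    return [
        r
        for r in sources[0]
        if all(record_signature(r, match_option) in sigs for sigs in other_signatures)
    ]


customers = [{"id": 1, "name": "Ada"}, {"id": 2, "name": "Bob"}]
contacts = [{"uid": 1, "label": "Ada"}, {"id": 2, "name": "Bob"}]

# Only the second record shares both field names and values.
by_key = intersect_records([customers, contacts], "key_match")
# Both records share the same values in the same positions.
by_seq = intersect_records([customers, contacts], "seq_match")
```

Under this reading, 'seq_match' is the looser strategy: it treats two records as equal when their values line up positionally, even if the sources use different field names.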
Source code in blue/operators/intersect_operator.py
class IntersectOperator(Operator):
    """
    Intersect operator finds records that exist in all input data sources.
    Supports key-based matching and sequential matching strategies.

    Attributes:
    ----------
    | Name         | Type | Required | Default     | Description                                                                                                      |
    |--------------|------|----------|-------------|------------------------------------------------------------------------------------------------------------------|
    | `match_option` | str  |     | "key_match" | Matching strategy for record comparison: 'key_match' (exact field names and values) or 'seq_match' (position-based comparison regardless of field names) |

    """

    PROPERTIES = {}

    name = "intersect"
    description = "Given multiple input data sources, return only records that exist in all data sources"
    default_attributes = {
        "match_option": {
            "type": "str",
            "description": "Matching strategy for record comparison: 'key_match' (exact field names and values) or 'seq_match' (position-based comparison regardless of field names)",
            "required": False,
            "default": "key_match",
        },
    }

    def __init__(self, description: str = None, properties: Dict[str, Any] = None):
        super().__init__(
            self.name,
            function=intersect_operator_function,
            description=description or self.description,
            properties=properties,
            validator=intersect_operator_validator,
            explainer=intersect_operator_explainer,
        )

    def _initialize_properties(self):
        super()._initialize_properties()

        # attribute definitions
        self.properties["attributes"] = self.default_attributes

intersect_operator_explainer(output, input_data, attributes)

Generate explanation for intersect operator execution.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `output` | Any | The output result from the operator execution. | required |
| `input_data` | List[List[Dict[str, Any]]] | The input data that was processed. | required |
| `attributes` | Dict[str, Any] | The attributes used for the operation. | required |

Returns:

| Type | Description |
|------|-------------|
| Dict[str, Any] | Dictionary containing explanation of the operation. |

Source code in blue/operators/intersect_operator.py
def intersect_operator_explainer(output: Any, input_data: List[List[Dict[str, Any]]], attributes: Dict[str, Any]) -> Dict[str, Any]:
    """Generate explanation for intersect operator execution.

    Parameters:
        output: The output result from the operator execution.
        input_data: The input data that was processed.
        attributes: The attributes used for the operation.

    Returns:
        Dictionary containing explanation of the operation.
    """
    return default_operator_explainer(output, input_data, attributes)

intersect_operator_function(input_data, attributes, properties=None)

Find records that exist in all input data sources.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `input_data` | List[List[Dict[str, Any]]] | List of JSON arrays to intersect. At least two non-empty data sources are required; empty sources are dropped before matching. | required |
| `attributes` | Dict[str, Any] | Dictionary containing intersection parameters. Only `match_option` is read, defaulting to 'key_match'. | required |
| `properties` | Dict[str, Any] | Optional properties dictionary. | None |

Returns:

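| Type | Description |
|------|-------------|
| List[List[Dict[str, Any]]] | A list wrapping a single list of the records that exist in all non-empty data sources, or an empty list when fewer than two non-empty data sources are given. |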

Source code in blue/operators/intersect_operator.py
def intersect_operator_function(input_data: List[List[Dict[str, Any]]], attributes: Dict[str, Any], properties: Dict[str, Any] = None) -> List[List[Dict[str, Any]]]:
    """Find records that exist in all input data sources.

    Parameters:
        input_data: List of JSON arrays (List[List[Dict[str, Any]]]) to intersect, requires at least 2 data sources.
        attributes: Dictionary containing intersection parameters including match_option.
        properties: Optional properties dictionary. Defaults to None.

    Returns:
        List containing records that exist in all data sources.
    """
    # Extract attributes
    match_option = attributes.get('match_option', 'key_match')

    # Validate input
    if not input_data or len(input_data) < 2:
        return []
    non_empty_data = [data for data in input_data if data]
    if len(non_empty_data) < 2:
        return []

    intersect_records = _find_intersect_records(non_empty_data, match_option)
    return [intersect_records]
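
The input handling in the function above can be traced with a small standalone sketch. It reproduces the guard logic from the source, but the private `_find_intersect_records` helper is not shown on this page, so `naive_find` is a hypothetical stand-in that uses plain dictionary equality and ignores `match_option`. The name `intersect_with_guards` is likewise an illustration, not a library function.

```python
from typing import Any, Callable, Dict, List

Record = Dict[str, Any]
Finder = Callable[[List[List[Record]], str], List[Record]]


def naive_find(sources: List[List[Record]], match_option: str) -> List[Record]:
    """Hypothetical stand-in for the private matching helper (exact dict equality)."""
    return [r for r in sources[0] if all(r in s for s in sources[1:])]


def intersect_with_guards(
    input_data: List[List[Record]],
    attributes: Dict[str, Any],
    find_records: Finder = naive_find,
) -> List[List[Record]]:
    """Mirror of the guard logic shown in the source listing."""
    match_option = attributes.get("match_option", "key_match")
    # Fewer than two sources: nothing to intersect.
    if not input_data or len(input_data) < 2:
        return []
    # Empty sources are dropped, not treated as an empty intersection.
    non_empty = [data for data in input_data if data]
    if len(non_empty) < 2:
        return []
    # The matching records are wrapped in an outer list.
    return [find_records(non_empty, match_option)]


a = [{"id": 1}, {"id": 2}]
b = [{"id": 2}, {"id": 3}]

two_sources = intersect_with_guards([a, b], {})
one_source = intersect_with_guards([a], {})
one_empty = intersect_with_guards([a, []], {})
extra_empty = intersect_with_guards([a, b, []], {})
```

Note the last case: because empty sources are filtered out before matching, `[a, b, []]` yields the same result as `[a, b]` instead of an empty intersection.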

intersect_operator_validator(input_data, attributes, properties=None)

Validate intersect operator attributes.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `input_data` | List[List[Dict[str, Any]]] | List of JSON arrays to validate. | required |
| `attributes` | Dict[str, Any] | Dictionary containing operator attributes to validate. | required |
| `properties` | Dict[str, Any] | Optional properties dictionary. | None |

Returns:

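| Type | Description |
|------|-------------|
| bool | True if the default operator validation passes and `match_option` is 'key_match' or 'seq_match'; False otherwise, including when the default validation raises an exception. |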

Source code in blue/operators/intersect_operator.py
def intersect_operator_validator(input_data: List[List[Dict[str, Any]]], attributes: Dict[str, Any], properties: Dict[str, Any] = None) -> bool:
    """Validate intersect operator attributes.

    Parameters:
        input_data: List of JSON arrays (List[List[Dict[str, Any]]]) to validate.
        attributes: Dictionary containing operator attributes to validate.
        properties: Optional properties dictionary. Defaults to None.

    Returns:
        True if attributes are valid, False otherwise.
    """
    try:
        if not default_operator_validator(input_data, attributes, properties):
            return False
    except Exception:
        return False

    match_option = attributes.get('match_option', 'key_match')
    if match_option not in ['seq_match', 'key_match']:
        return False

    return True
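
The operator-specific part of the validator can be sketched on its own. This version deliberately leaves out the call to `default_operator_validator`, whose behavior is not shown on this page; `ALLOWED_MATCH_OPTIONS` and `is_valid_match_option` are hypothetical names used only for illustration.

```python
from typing import Any, Dict

# The two strategies accepted by the validator in the source listing.
ALLOWED_MATCH_OPTIONS = ("key_match", "seq_match")


def is_valid_match_option(attributes: Dict[str, Any]) -> bool:
    """Check only the match_option attribute, using the same default as the source."""
    return attributes.get("match_option", "key_match") in ALLOWED_MATCH_OPTIONS


default_ok = is_valid_match_option({})
seq_ok = is_valid_match_option({"match_option": "seq_match"})
unknown_ok = is_valid_match_option({"match_option": "fuzzy_match"})
```

An omitted `match_option` passes because it falls back to 'key_match', while any other string is rejected.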
Last update: 2025-10-08