Skip to content

Join operator

FieldMetadata

Bases: NamedTuple

Metadata for tracking field provenance

Source code in blue/operators/join_operator.py
150
151
152
153
154
155
156
157
158
class FieldMetadata(NamedTuple):
    """Metadata for tracking field provenance.

    An immutable named-tuple record with four mandatory fields followed by
    two optional ones (``join_key_index`` and ``is_original``) that carry
    defaults and may be omitted when constructing an instance.
    """

    # Name of the field; presumably the name it had in its source record
    # before any suffixing — TODO confirm against the join implementation.
    original_name: str
    # Integer identifying a data source; presumably the index of the input
    # array the field came from — TODO confirm.
    data_source: int
    # Suffix string associated with the field.
    suffix: str
    # Whether the field is a join key.
    is_join_key: bool
    # Defaults to -1; NOTE(review): looks like a "not a join key" sentinel —
    # verify against callers.
    join_key_index: int = -1
    # Defaults to True.
    is_original: bool = True

JoinOperator

Bases: Operator

The join operator performs an N-way join on JSON array data.

Attributes:

| Name | Type | Required | Default | Description |
|------|------|----------|---------|-------------|
| `join_on` | `list[list[str]]` | yes | - | List of join key lists for each data source |
| `join_type` | `str` | no | `"inner"` | Type of join: 'inner', 'left', 'right', 'outer' |
| `join_suffix` | `list[str]` | no | `[]` | Suffixes for non-key fields |
| `keep_keys` | `str` | no | `"left"` | 'left' to keep left keys only, 'both' to keep both |
Source code in blue/operators/join_operator.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
class JoinOperator(Operator):
    """
    Join operator performs an N-way join on JSON array data.

    Attributes:
    ----------
    | Name        | Type           | Required | Default | Description                                      |
    |------------|----------------|----------|---------|--------------------------------------------------|
    | `join_on`    | list[list[str]] | :fontawesome-solid-circle-check: {.green-check}     | -       | List of join key lists for each data source     |
    | `join_type`  | str             |     | "inner" | Type of join: 'inner', 'left', 'right', 'outer'|
    | `join_suffix`| list[str]       |     | []      | Suffixes for non-key fields                      |
    | `keep_keys`  | str             |     | "left"  | 'left' to keep left keys only, 'both' to keep both |
    """

    PROPERTIES = {}

    name = "join"
    description = "Joins multiple JSON array data sources using N-way join operations"
    default_attributes = {
        "join_on": {"type": "list[list[str]]", "description": "List of join key lists for each data source", "required": True},
        "join_type": {"type": "str", "description": "Type of join: 'inner', 'left', 'right', 'outer'", "required": False, "default": "inner"},
        "join_suffix": {"type": "list[str]", "description": "Suffixes for non-key fields", "required": False, "default": []},
        "keep_keys": {"type": "str", "description": "'left' to keep left keys only, 'both' to keep both", "required": False, "default": "left"},
    }

    def __init__(self, description: str = None, properties: Dict[str, Any] = None):
        """Create the operator and register its function, validator and explainer.

        Parameters:
            description: Optional description; when falsy, the class-level
                ``description`` is used instead.
            properties: Optional properties dictionary, forwarded unchanged to
                the base ``Operator`` constructor.
        """
        super().__init__(
            self.name,
            function=join_operator_function,
            description=description or self.description,
            properties=properties,
            validator=join_operator_validator,
            explainer=join_operator_explainer,
        )

    def _initialize_properties(self):
        """Run the base initialization, then install the attribute definitions."""
        import copy

        super()._initialize_properties()

        # attribute definitions
        # Store a deep copy: ``default_attributes`` is a class-level dict that
        # contains a mutable ``[]`` default, so handing out the same object
        # would let a change made through one instance leak into the class
        # and into every other instance.
        self.properties["attributes"] = copy.deepcopy(self.default_attributes)

join_operator_explainer(output, input_data, attributes)

Generate explanation for join operator execution.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `output` | `Any` | The output result from the operator execution. | required |
| `input_data` | `List[List[Dict[str, Any]]]` | The input data that was processed. | required |
| `attributes` | `Dict[str, Any]` | The attributes used for the operation. | required |

Returns:

Type Description
Dict[str, Any]

Dictionary containing explanation of the operation.

Source code in blue/operators/join_operator.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
def join_operator_explainer(output: Any, input_data: List[List[Dict[str, Any]]], attributes: Dict[str, Any]) -> Dict[str, Any]:
    """Explain a join operator execution.

    The join operator has no custom explanation logic; the three arguments
    are handed, in the same order, to ``default_operator_explainer``.

    Parameters:
        output: The output result from the operator execution.
        input_data: The input data that was processed.
        attributes: The attributes used for the operation.

    Returns:
        Whatever ``default_operator_explainer`` returns for these arguments.
    """
    explanation = default_operator_explainer(output, input_data, attributes)
    return explanation

join_operator_function(input_data, attributes, properties=None)

Perform N-way join on multiple JSON array data sources.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `input_data` | `List[List[Dict[str, Any]]]` | List of JSON arrays (List[List[Dict[str, Any]]]) to join, requires at least 2 data sources. | required |
| `attributes` | `Dict[str, Any]` | Dictionary containing join parameters including join_on, join_type, join_suffix, and keep_keys. | required |
| `properties` | `Dict[str, Any]` | Optional properties dictionary. Defaults to None. | `None` |

Returns:

Type Description
List[List[Dict[str, Any]]]

List containing the joined records from all data sources.

Source code in blue/operators/join_operator.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
def join_operator_function(input_data: List[List[Dict[str, Any]]], attributes: Dict[str, Any], properties: Dict[str, Any] = None) -> List[List[Dict[str, Any]]]:
    """Perform N-way join on multiple JSON array data sources.

    Parameters:
        input_data: List of JSON arrays (List[List[Dict[str, Any]]]) to join, requires at least 2 data sources.
        attributes: Dictionary containing join parameters including join_on, join_type, join_suffix, and keep_keys.
        properties: Optional properties dictionary. Defaults to None.

    Returns:
        A one-element list wrapping the joined records, or an empty list when
        the inputs or attributes are rejected: fewer than two data sources, a
        ``join_on`` or ``join_suffix`` whose length differs from the number of
        data sources, or duplicate suffixes.
    """
    # ``or []`` also covers an explicit ``join_on: None``, which would
    # otherwise raise TypeError at the ``len`` check below instead of being
    # rejected like every other invalid configuration.
    join_on = attributes.get('join_on') or []
    join_type = attributes.get('join_type', 'inner')
    join_suffix = attributes.get('join_suffix', [])
    keep_keys = attributes.get('keep_keys', 'left')

    # validation check regarding input data and attributes
    if not input_data or len(input_data) < 2:
        return []
    if len(join_on) != len(input_data):
        return []
    # default suffix is _ds{i} for each data source, in future we can add prefix if needed
    if not join_suffix:
        join_suffix = [f"_ds{i}" for i in range(len(input_data))]
    if len(join_suffix) != len(input_data):
        return []
    if len(join_suffix) != len(set(join_suffix)):  # suffix must be unique
        return []

    # perform n-way join with metadata tracking; the schema information that
    # comes back alongside the records is not part of this function's result
    result, _ = _perform_n_way_join_with_metadata(input_data, join_on, join_type, join_suffix, keep_keys)

    return [result]

join_operator_validator(input_data, attributes, properties=None)

Validate join operator attributes.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `input_data` | `List[List[Dict[str, Any]]]` | List of JSON arrays (List[List[Dict[str, Any]]]) to validate. | required |
| `attributes` | `Dict[str, Any]` | Dictionary containing operator attributes to validate. | required |
| `properties` | `Dict[str, Any]` | Optional properties dictionary. Defaults to None. | `None` |

Returns:

Type Description
bool

True if attributes are valid, False otherwise.

Source code in blue/operators/join_operator.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
def join_operator_validator(input_data: List[List[Dict[str, Any]]], attributes: Dict[str, Any], properties: Dict[str, Any] = None) -> bool:
    """Validate join operator attributes.

    Runs ``default_operator_validator`` first, then checks the join-specific
    attributes:

    * ``join_on`` is a list with at least two entries, each a non-empty list
      of strings;
    * ``join_suffix``, when non-empty, is a list of unique strings with one
      entry per ``join_on`` entry;
    * ``keep_keys`` is ``'left'`` or ``'both'``;
    * ``join_type`` is ``'inner'``, ``'left'``, ``'right'`` or ``'outer'``.

    Parameters:
        input_data: List of JSON arrays (List[List[Dict[str, Any]]]) to validate.
        attributes: Dictionary containing operator attributes to validate.
        properties: Optional properties dictionary. Defaults to None.

    Returns:
        True if attributes are valid, False otherwise.
    """
    # Any failure of the default validator, including an exception it
    # raises, is reported as "invalid" rather than propagated.
    try:
        if not default_operator_validator(input_data, attributes, properties):
            return False
    except Exception:
        return False

    join_on = attributes.get('join_on', [])
    join_suffix = attributes.get('join_suffix', [])
    keep_keys = attributes.get('keep_keys', 'left')

    if not isinstance(join_on, list) or len(join_on) < 2:
        return False

    for field_list in join_on:
        if not isinstance(field_list, list) or not field_list:
            return False
        if not all(isinstance(field, str) for field in field_list):
            return False

    if join_suffix:
        if not isinstance(join_suffix, list):
            return False
        if len(join_suffix) != len(join_on):
            return False
        # Suffixes are declared as list[str]; checking this first also
        # guarantees the entries are hashable for the uniqueness test.
        if not all(isinstance(suffix, str) for suffix in join_suffix):
            return False
        # join_operator_function returns [] when suffixes repeat, so a
        # configuration with duplicates must not pass validation.
        if len(set(join_suffix)) != len(join_suffix):
            return False

    if keep_keys not in ['left', 'both']:
        return False

    join_type = attributes.get('join_type', 'inner')
    if join_type not in ['inner', 'left', 'right', 'outer']:
        return False
    return True
Last update: 2025-10-08