
NL2Query router operator

NL2QueryRouterOperator

Bases: Operator

NL2Query router operator refines the plan to the right nl2q operator based on the source.

Attributes:

| Name | Type | Required | Default | Description |
|------|------|----------|---------|-------------|
| `search_query` | str | yes | - | Natural language query to process |
| `columns` | list[dict] | no | `[]` | List of attribute specifications (dicts with name and optional type) |
| `execute_query` | bool | no | `True` | Whether to execute query or just translate NL to query |
| `protocol` | str | no | `""` | Protocol of the source |
Source code in blue/operators/nl2query_router_operator.py
class NL2QueryRouterOperator(Operator):
    """
    NL2Query router operator refines the plan to the right nl2q operator based on the source.

    Attributes:
    ----------
    | Name           | Type        | Required | Default | Description                                                |
    |----------------|------------|---------|---------|------------------------------------------------------------|
    | `search_query`    | str        | :fontawesome-solid-circle-check: {.green-check}    | -       | Natural language query to process                          |
    | `columns`         | list[dict] |    | []      | List of attribute specifications (dicts with name and optional type) |
    | `execute_query`   | bool       |    | True    | Whether to execute query or just translate NL to query    |
    | `protocol`        | str        |    | ""      | Protocol of the source                                     |

    """

    PROPERTIES = {}

    name = "nl2query_router"
    description = "Routees the execution of query, based on source"
    default_attributes = {
        "search_query": {"type": "str", "description": "Natural language query to process", "required": True},
        "columns": {"type": "list[dict]", "description": "List of attribute specifications (dicts with name and optional type)", "required": False, "default": []},
        "execute_query": {"type": "bool", "description": "Whether to execute query or just translate nl to query", "required": False, "default": True},
        "protocol": {"type": "str", "description": "Protocol of the source", "required": False, "default": ""},
    }

    def __init__(self, description: str = None, properties: Dict[str, Any] = None):
        super().__init__(
            self.name,
            function=nl2query_router_operator_function,
            description=description or self.description,
            properties=properties,
            validator=default_attributes_validator,
            explainer=nl2query_router_operator_explainer,
            refiner=nl2query_router_operator_refiner,
        )

    def _initialize_properties(self):
        super()._initialize_properties()

        # attribute definitions
        self.properties["attributes"] = self.default_attributes

        # refine
        self.properties["refine"] = True

nl2query_router_operator_explainer(output, input_data, attributes)

Explain nl2query router operator output.

Source code in blue/operators/nl2query_router_operator.py
def nl2query_router_operator_explainer(output: Any, input_data: List[List[Dict[str, Any]]], attributes: Dict[str, Any]) -> Dict[str, Any]:
    """Explain nl2query router operator output."""
    nl2query_router_explanation = {
        'output': output,
        'input_data': input_data,
        'attributes': attributes,
        'explanation': f"routes the execution of query to...",
    }
    return nl2query_router_explanation
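
Since the explainer simply echoes its arguments alongside a fixed explanation string, a direct call with illustrative inputs looks like this:

```python
explanation = nl2query_router_operator_explainer(
    output=[[]],
    input_data=[[{"name": "hr_db", "type": "source", "scope": "/source/hr_db"}]],
    attributes={"search_query": "list employees hired after 2020"},
)
# explanation["output"], explanation["input_data"], and explanation["attributes"]
# echo the inputs; explanation["explanation"] holds the fixed routing message.
```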

nl2query_router_operator_function(input_data, attributes, properties=None)

Route the execution of the query to the right nl2q operator based on the source.

NL2QueryRouterOperator only does plan refinement; this function simply returns an empty output.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `input_data` | `List[List[Dict[str, Any]]]` | List of JSON arrays; uses first data source as base. | required |
| `attributes` | `Dict[str, Any]` | Dictionary containing operator attributes including `search_query`, `columns`, `execute_query`, `protocol`. | required |
| `properties` | `Dict[str, Any]` | Optional properties dictionary. Defaults to `None`. | `None` |

Returns:

| Type | Description |
|------|-------------|
| `List[List[Dict[str, Any]]]` | Empty list. |

Source code in blue/operators/nl2query_router_operator.py
def nl2query_router_operator_function(input_data: List[List[Dict[str, Any]]], attributes: Dict[str, Any], properties: Dict[str, Any] = None) -> List[List[Dict[str, Any]]]:
    """Route the execution of query to the right nl2q operator based on source.

    NL2QueryRouterOperator only does plan refinement. This function simply returns empty output.

    Parameters:
        input_data: List of JSON arrays (List[List[Dict[str, Any]]]), uses first data source as base.
        attributes: Dictionary containing operator attributes including search_query, columns, execute_query, protocol.
        properties: Optional properties dictionary. Defaults to None.

    Returns:
        Empty list.
    """
    return [[]]
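
Because the actual routing happens during refinement, calling the function directly always returns an empty result:

```python
result = nl2query_router_operator_function(
    input_data=[[{"name": "hr_db", "type": "source", "scope": "/source/hr_db"}]],
    attributes={"search_query": "list employees hired after 2020"},
)
assert result == [[]]  # no rows are produced; routing is handled by the refiner
```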

nl2query_router_operator_refiner(input_data, attributes, properties=None)

Refine the nl2query router plan by constructing a data pipeline for each source or collection.

Depending on the protocol of the source or collection, it routes to either the nl2llm or the nl2sql operator and may perform additional data discovery.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `input_data` | `List[List[Dict[str, Any]]]` | List of JSON arrays; each array represents a source or collection to route the query to. | required |
| `attributes` | `Dict[str, Any]` | Dictionary containing operator attributes including `search_query`, `columns`, `execute_query`, `protocol`. | required |
| `properties` | `Dict[str, Any]` | Optional properties dictionary. Defaults to `None`. | `None` |

Returns:

| Type | Description |
|------|-------------|
| `List[Dict[str, Any]]` | List of data pipelines (as dictionaries) representing the refined nl2query router plans. |

Source code in blue/operators/nl2query_router_operator.py
def nl2query_router_operator_refiner(input_data: List[List[Dict[str, Any]]], attributes: Dict[str, Any], properties: Dict[str, Any] = None) -> List[Dict[str, Any]]:
    """Refine the nl2query router plan by constructing a data pipeline for each source or collection.

    Depending on the protocol of the source or collection, it routes to either the nl2llm or the nl2sql operator and may perform additional data discovery.

    Parameters:
        input_data: List of JSON arrays (List[List[Dict[str, Any]]]), each array represents a source or collection to route the query to.
        attributes: Dictionary containing operator attributes including search_query, columns, execute_query, protocol.
        properties: Optional properties dictionary. Defaults to None.

    Returns:
        List of data pipelines (as dictionaries) representing the refined nl2query router plans.
    """
    pipelines = []

    if len(input_data) == 0:
        return pipelines

    elements = input_data[0]

    query = attributes.get('search_query')
    columns = attributes.get('columns')
    execute_query = attributes.get('execute_query', True)
    protocol = attributes.get('protocol', '')

    for element in elements:
        name = element['name']
        type = element['type']
        scope = element['scope']

        parsed = parse_scope(scope)

        # create pipeline for each source
        pipeline = DataPipeline(properties=properties)

        # input
        input_node = pipeline.define_input(value=None)

        # output
        output_node = pipeline.define_output(properties={})

        # set plan input / output
        pipeline.set_plan_input(input_node)
        pipeline.set_plan_output(output_node)

        if type == "source":
            # route based on source
            source = element
            # source name
            source_name = source['name']
            # check source protocol
            if 'properties' not in source:
                continue
            # use a local name so the operator-level `properties` argument is not shadowed
            source_properties = source['properties']
            if 'connection' not in source_properties:
                continue
            connection = source_properties['connection']
            if 'protocol' not in connection:
                continue
            protocol = connection['protocol']

            if protocol == "openai":
                nl2llm_attributes = {"source": source_name, "query": query, "attrs": columns}
                # TODO: need to pass the specific source to the nl2llm as well (if there are multiple)
                nl2llm_node = pipeline.define_operator("/server/blue_ray/operator/nl2llm", attributes=nl2llm_attributes, properties={})

                # directly refine to nl2llm
                pipeline.connect_nodes(input_node, nl2llm_node)
                pipeline.connect_nodes(nl2llm_node, output_node)

            elif protocol == "postgres" or protocol == "mysql" or protocol == "sqlite":
                # do further data discovery and route again
                data_discovery_attributes = {
                    "source": source_name,
                    "scope": "/source/" + source_name,
                    "search_query": query,
                    "approximate": True,
                    "concept_type": 'collection',
                    'limit': 1,
                    'use_hierarchical_search': True,
                }
                data_discovery_node = pipeline.define_operator("/server/blue_ray/operator/data_discover", attributes=data_discovery_attributes, properties={})

                nl2query_router_attributes = {"search_query": query, "protocol": protocol, "execute_query": True, "columns": columns}
                nl2query_router_node = pipeline.define_operator("/server/blue_ray/operator/nl2query_router", attributes=nl2query_router_attributes, properties={})

                # first data_discover, then nl2query_router
                pipeline.connect_nodes(input_node, data_discovery_node)
                pipeline.connect_nodes(data_discovery_node, nl2query_router_node)
                pipeline.connect_nodes(nl2query_router_node, output_node)
            else:
                # TODO: support other protocols
                continue

        elif type == "collection":
            source = parsed['source']
            database = parsed['database']
            collection = name

            if protocol == "openai":
                nl2llm_attributes = {"query": query, "attrs": columns}
                nl2llm_node = pipeline.define_operator("/server/blue_ray/operator/nl2llm", attributes=nl2llm_attributes, properties={})

                # directly refine to nl2llm
                pipeline.connect_nodes(input_node, nl2llm_node)
                pipeline.connect_nodes(nl2llm_node, output_node)

            elif protocol == "postgres" or protocol == "mysql" or protocol == "sqlite":
                attr_names = [column['name'] for column in columns]
                nl2sql_attributes = {
                    "question": query,
                    "protocol": protocol,
                    "source": source,
                    "database": database,
                    "collection": collection,
                    "attr_names": attr_names,
                    "execute_query": execute_query,
                }
                nl2sql_node = pipeline.define_operator("/server/blue_ray/operator/nl2sql", attributes=nl2sql_attributes, properties={})

                # directly refine to nl2sql
                pipeline.connect_nodes(input_node, nl2sql_node)
                pipeline.connect_nodes(nl2sql_node, output_node)

        # add to pipelines
        pipelines.append(pipeline.to_dict())

    return pipelines
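
For illustration, the elements below are hypothetical but shaped by the fields the refiner reads (`name`, `type`, `scope`, and `properties.connection.protocol`); the call itself assumes `DataPipeline` accepts the `properties` value passed through (here `None`) and that `parse_scope` handles scopes of the form `/source/<name>`.

```python
# Hypothetical input elements for the two routing paths the refiner implements.
postgres_source = {
    "name": "hr_db",
    "type": "source",
    "scope": "/source/hr_db",
    "properties": {"connection": {"protocol": "postgres"}},
}
openai_source = {
    "name": "kb_llm",
    "type": "source",
    "scope": "/source/kb_llm",
    "properties": {"connection": {"protocol": "openai"}},
}
attributes = {
    "search_query": "list employees hired after 2020",
    "columns": [{"name": "name"}, {"name": "hire_date"}],
    "execute_query": True,
    "protocol": "",
}

# One pipeline dict per element: the postgres source refines to
# data_discover -> nl2query_router, the openai source directly to nl2llm.
# Assumes DataPipeline(properties=None) is valid in this environment.
pipelines = nl2query_router_operator_refiner([[postgres_source, openai_source]], attributes)
```
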
Last update: 2025-10-09