
NL2SQL operator

NL2SQLOperator

Bases: Operator, ServiceClient

NL2SQL operator translates natural language questions into SQL queries using LLM models. It can also execute the generated SQL query against the specified database and return the results.

Attributes:

| Name | Type | Required | Default | Description |
|------|------|----------|---------|-------------|
| `source` | str | ✓ | `""` | Data source name |
| `question` | str | ✓ | | Natural language question to translate to SQL |
| `protocol` | str | ✓ | `"postgres"` | Database protocol (postgres, mysql, sqlite) |
| `database` | str | ✓ | `""` | Database name |
| `collection` | str | ✓ | `""` | Collection/schema name |
| `case_insensitive` | bool | | `True` | Case-insensitive string matching |
| `additional_requirements` | str | | `""` | Additional requirements for SQL generation |
| `context` | str | | `""` | Optional context for domain knowledge |
| `schema` | str | | `""` | JSON string of the database schema (optional; fetched automatically if not provided) |
| `attr_names` | list[str] | | `[]` | Optional list of target field names for the output objects |
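For illustration, a minimal attribute set might look like the following. The values (`"sales_db"`, the question text, and so on) are made-up examples, not defaults shipped with the operator:

```python
# Illustrative attribute set for the NL2SQL operator; values are examples only.
attributes = {
    "source": "sales_db",
    "question": "How many orders were placed last month?",
    "protocol": "postgres",
    "database": "sales",
    "collection": "public",
    "case_insensitive": True,
}

# Per the table above, these five attributes are required.
required = {"source", "question", "protocol", "database", "collection"}
missing = required - attributes.keys()
```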
Source code in blue/operators/nl2sql_operator.py
class NL2SQLOperator(Operator, ServiceClient):
    """
    NL2SQL operator translates natural language questions into SQL queries using LLM models.
    It can also execute the generated SQL query against the specified database and return the results.

    Attributes:
    ----------
    | Name                  | Type         | Required | Default   | Description                                                                 |
    |-----------------------|--------------|----------|-----------|-----------------------------------------------------------------------------|
    | `source`              | str          | :fontawesome-solid-circle-check: {.green-check}     | ""        | Data source name                                                            |
    | `question`            | str          | :fontawesome-solid-circle-check: {.green-check}     |           | Natural language question to translate to SQL                               |
    | `protocol`            | str          | :fontawesome-solid-circle-check: {.green-check}     | "postgres"| Database protocol (postgres, mysql, sqlite)                                 |
    | `database`            | str          | :fontawesome-solid-circle-check: {.green-check}     | ""        | Database name                                                               |
    | `collection`          | str          | :fontawesome-solid-circle-check: {.green-check}     | ""        | Collection/schema name                                                      |
    | `case_insensitive`    | bool         |     | True      | Case insensitive string matching                                            |
    | `additional_requirements`| str         |     | ""        | Additional requirements for SQL generation                                  |
    | `context`             | str          |     | ""        | Optional context for domain knowledge                                       |
    | `schema`              | str          |     | ""        | JSON string of database schema (optional - will be fetched automatically if not provided) |
    | `attr_names`          | list[str]    |     | []        | Optional list of target field names for the output objects                 |

    """

    PROMPT = """
Your task is to translate a natural language question into a SQL query based on the provided database schema.

## Here are the requirements:
- The output should be a JSON object with the following fields:
  - "question": the original natural language question
  - "query": the SQL query that is translated from the natural language question
- The SQL query should be compatible with the provided schema.
- The SQL query should be compatible with the syntax of the corresponding database's protocol.
- For enum fields, do not use LOWER(), ILIKE, or other string functions. Compare enum fields using exact equality.
- Always do case-${sensitivity} matching for string comparison.
- The query should start with any of the following prefixes: ${force_query_prefixes}
- When interpreting the "question" use additional context provided, if available.
- Output the JSON directly. Do not generate explanation or other additional output.
${additional_requirements}

## Database Protocol: 
```
${protocol}
```

## Database Schema:
```
${schema}
```

## Context: ${context}

## Question: ${question}

${attr_names_section}

---
## Output (JSON only):
"""

    PROPERTIES = {
        # nl2sql related
        "execute_query": True,
        # "force_query_prefixes": "SELECT",
        "validate_query_prefixes": ["SELECT"],
        # service utils related
        "openai.api": "ChatCompletion",
        "openai.model": "gpt-4o",
        "openai.stream": False,
        "openai.max_tokens": 512,
        "openai.temperature": 0,
        "input_json": "[{\"role\": \"user\"}]",
        "input_context": "$[0]",
        "input_context_field": "content",
        "input_field": "messages",
        "input_template": PROMPT,
        "output_path": "$.choices[0].message.content",
        "service_prefix": "openai",
        "output_transformations": [{"transformation": "replace", "from": "```", "to": ""}, {"transformation": "replace", "from": "json", "to": ""}],
        "output_strip": True,
        "output_cast": "json",
        # connection
        # "connection": {"host": "localhost", "port": 5432, "protocol": "postgres", "user": "postgres", "password": "postgres"},
    }

    name = "nl2sql"
    description = "Translates natural language questions into SQL queries using LLM models"
    default_attributes = {
        "source": {"type": "str", "description": "Data source name", "required": True, "default": ""},
        "question": {"type": "str", "description": "Natural language question to translate to SQL", "required": True},
        "protocol": {"type": "str", "description": "Database protocol (postgres, mysql, sqlite)", "required": True, "default": "postgres"},
        "database": {"type": "str", "description": "Database name", "required": True, "default": ""},
        "collection": {"type": "str", "description": "Collection/schema name", "required": True, "default": ""},
        "case_insensitive": {"type": "bool", "description": "Case insensitive string matching", "required": False, "default": True},
        "additional_requirements": {"type": "str", "description": "Additional requirements for SQL generation", "required": False, "default": ""},
        "context": {"type": "str", "description": "Optional context for domain knowledge", "required": False, "default": ""},
        "schema": {"type": "str", "description": "JSON string of database schema (optional - will be fetched automatically if not provided)", "required": False, "default": ""},
        "attr_names": {"type": "list[str]", "description": "Optional list of target field names for the output objects", "required": False, "default": []},
    }

    def __init__(self, description: str = None, properties: Dict[str, Any] = None):
        super().__init__(
            self.name,
            function=nl2sql_operator_function,
            description=description or self.description,
            properties=properties,
            validator=nl2sql_operator_validator,
            explainer=nl2sql_operator_explainer,
        )

    def _initialize_properties(self):
        super()._initialize_properties()

        # attribute definitions
        self.properties["attributes"] = self.default_attributes

        # service_url, set as default
        self.properties["service_url"] = PROPERTIES["services.openai.service_url"]

    def extract_input_attributes(self, input_data, properties=None):
        """Extract input attributes for template substitution"""
        # For NL2SQL, input_data is a dictionary containing all the template variables
        if isinstance(input_data, dict):
            return input_data
        return {}
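The `${...}` placeholders in `PROMPT` match Python's `string.Template` syntax, so the substitution step can be sketched in isolation. The trimmed-down template below is an illustrative stand-in, not the full prompt:

```python
from string import Template

# A shortened stand-in for PROMPT, using the same ${...} placeholder style.
template = Template(
    "Always do case-${sensitivity} matching for string comparison.\n"
    "## Question: ${question}"
)
rendered = template.substitute(
    sensitivity="insensitive",  # derived from case_insensitive=True
    question="List the ten most recent orders",
)
```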

extract_input_attributes(input_data, properties=None)

Extract input attributes for template substitution

Source code in blue/operators/nl2sql_operator.py
def extract_input_attributes(self, input_data, properties=None):
    """Extract input attributes for template substitution"""
    # For NL2SQL, input_data is a dictionary containing all the template variables
    if isinstance(input_data, dict):
        return input_data
    return {}

nl2sql_operator_explainer(output, input_data, attributes)

Generate explanation for nl2sql operator execution.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `output` | Any | The output result from the operator execution. | required |
| `input_data` | List[List[Dict[str, Any]]] | The input data that was processed. | required |
| `attributes` | Dict[str, Any] | The attributes used for the operation. | required |

Returns:

| Type | Description |
|------|-------------|
| Dict[str, Any] | Dictionary containing explanation of the SQL generation and execution operation. |

Source code in blue/operators/nl2sql_operator.py
def nl2sql_operator_explainer(output: Any, input_data: List[List[Dict[str, Any]]], attributes: Dict[str, Any]) -> Dict[str, Any]:
    """Generate explanation for nl2sql operator execution.

    Parameters:
        output: The output result from the operator execution.
        input_data: The input data that was processed.
        attributes: The attributes used for the operation.

    Returns:
        Dictionary containing explanation of the SQL generation and execution operation.
    """
    nl2sql_explanation = {
        'output': output,
        "attributes": attributes,
    }
    return nl2sql_explanation
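Called on its own, the explainer simply bundles the output and the attributes into one dictionary. A self-contained usage sketch (the output and question values are made up):

```python
from typing import Any, Dict, List

def nl2sql_operator_explainer(output: Any, input_data: List[List[Dict[str, Any]]], attributes: Dict[str, Any]) -> Dict[str, Any]:
    # Same body as above: echo the output and the attributes used.
    return {"output": output, "attributes": attributes}

explanation = nl2sql_operator_explainer(
    output=[[{"count": 42}]],
    input_data=[[]],
    attributes={"question": "How many users are there?"},
)
```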

nl2sql_operator_function(input_data, attributes, properties=None)

Translate natural language questions into SQL queries using LLM models.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `input_data` | List[List[Dict[str, Any]]] | List of JSON arrays (List[List[Dict[str, Any]]]); not used for query processing. | required |
| `attributes` | Dict[str, Any] | Dictionary containing query parameters including question, source, protocol, database, collection, and other SQL generation settings. | required |
| `properties` | Dict[str, Any] | Optional properties dictionary containing service configuration and data registry information. | None |

Returns:

| Type | Description |
|------|-------------|
| List[List[Dict[str, Any]]] | List containing SQL query results, or the generated SQL query if execution is disabled. |

Source code in blue/operators/nl2sql_operator.py
def nl2sql_operator_function(input_data: List[List[Dict[str, Any]]], attributes: Dict[str, Any], properties: Dict[str, Any] = None) -> List[List[Dict[str, Any]]]:
    """Translate natural language questions into SQL queries using LLM models.

    Parameters:
        input_data: List of JSON arrays (List[List[Dict[str, Any]]]), not used for query processing.
        attributes: Dictionary containing query parameters including question, source, protocol, database, collection, and other SQL generation settings.
        properties: Optional properties dictionary containing service configuration and data registry information. Defaults to None.

    Returns:
        List containing SQL query results or the generated SQL query if execution is disabled.
    """
    question = attributes.get('question', '')
    source = attributes.get('source', '')
    protocol = attributes.get('protocol', 'postgres')
    database = attributes.get('database', '')
    collection = attributes.get('collection', '')
    force_query_prefixes = attributes.get('force_query_prefixes', 'SELECT')
    case_insensitive = attributes.get('case_insensitive', True)
    additional_requirements = attributes.get('additional_requirements', '')
    context = attributes.get('context', '')
    schema = attributes.get('schema', '')
    attr_names = attributes.get('attr_names', [])

    if not question or not question.strip():
        return [[]]

    # protocol, database, collection are required
    if not protocol or not database or not collection:
        raise ValueError("Protocol, database, and collection are required")

    data_registry = _get_data_registry_from_properties(properties)
    if not data_registry:
        return [[]]

    # get schema from data registry if not explicitly provided
    if not schema:
        schema = data_registry.get_data_source_schema(source, database, collection)
    logging.debug("SCHEMA:")
    logging.debug(schema)
    # convert schema to JSON string if it's a dictionary
    if isinstance(schema, dict):
        schema_str = json.dumps(schema, indent=2)
    else:
        schema_str = str(schema)

    execute_query = properties.get('execute_query', True) if properties else True
    validate_query_prefixes = properties.get('validate_query_prefixes', ['SELECT']) if properties else ['SELECT']
    if protocol not in ['postgres', 'mysql', 'sqlite']:
        raise ValueError(f"Unsupported protocol: {protocol}. Supported protocols are: postgres, mysql, sqlite")

    service_client = ServiceClient(name="nl2sql_operator_service_client", properties=properties)

    # Create optional attr_names_section
    attr_names_section = ""
    if attr_names and len(attr_names) > 0:
        attr_names_list = ", ".join(attr_names)
        attr_names_section = f"## Target Field Names (SELECT AS):\nPlease use the following field names as column aliases if they differ from the column names. Use the 'AS' keyword to alias columns where needed:\n{attr_names_list}\n"

    additional_data = {
        'question': question,
        'schema': schema_str,
        'protocol': protocol,
        'sensitivity': 'insensitive' if case_insensitive else 'sensitive',
        'force_query_prefixes': force_query_prefixes,
        'additional_requirements': additional_requirements,
        'context': context,
        'attr_names_section': attr_names_section,
    }
    sql_result = service_client.execute_api_call({}, properties=properties, additional_data=additional_data)

    # Parse the result to get the query
    if isinstance(sql_result, str):
        try:
            sql_data = json.loads(sql_result)
        except json.JSONDecodeError as e:
            raise ValueError(f"Invalid JSON response from LLM: {str(e)}")
    elif isinstance(sql_result, dict):
        sql_data = sql_result
    else:
        raise ValueError("Invalid response from LLM: " + str(sql_result))

    generated_query = sql_data.get('query', '')
    if not generated_query:
        raise ValueError("No query found in LLM response")

    # Validate query prefix
    if not any(generated_query.upper().startswith(prefix.upper()) for prefix in validate_query_prefixes):
        raise ValueError(f'Invalid query prefix: {generated_query}')

    # If execution is enabled, execute the generated SQL
    if execute_query and generated_query:
        # use data registry to execute query
        logging.info("Generated Query: " + generated_query)
        result = data_registry.execute_query(generated_query, source, database, collection)
        logging.debug("Result: ")
        logging.debug(result)
        result = _format_execution_result_format(result)
        return result
    # if execution is disabled, return the sql query only
    return [[{"sql": generated_query}]]
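The response cleanup and prefix check above can be reproduced standalone. The raw reply below is a made-up example of a fenced LLM response; the cleanup mirrors the `output_transformations`, `output_strip`, and `output_cast` settings in `PROPERTIES` (strip the code fences and the `json` language tag, trim whitespace, parse as JSON):

```python
import json

# Made-up raw LLM reply wrapped in a fenced code block.
raw = '```json\n{"question": "How many users are there?", "query": "SELECT COUNT(*) FROM users"}\n```'

# Mirror output_transformations + output_strip + output_cast.
cleaned = raw.replace("```", "").replace("json", "").strip()
sql_data = json.loads(cleaned)
query = sql_data["query"]

# Mirror the validate_query_prefixes check.
validate_query_prefixes = ["SELECT"]
valid = any(query.upper().startswith(p.upper()) for p in validate_query_prefixes)
```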

nl2sql_operator_validator(input_data, attributes, properties=None)

Validate nl2sql operator attributes.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `input_data` | List[List[Dict[str, Any]]] | List of JSON arrays (List[List[Dict[str, Any]]]) to validate. | required |
| `attributes` | Dict[str, Any] | Dictionary containing operator attributes to validate. | required |
| `properties` | Dict[str, Any] | Optional properties dictionary. | None |

Returns:

| Type | Description |
|------|-------------|
| bool | True if attributes are valid, False otherwise. |

Source code in blue/operators/nl2sql_operator.py
def nl2sql_operator_validator(input_data: List[List[Dict[str, Any]]], attributes: Dict[str, Any], properties: Dict[str, Any] = None) -> bool:
    """Validate nl2sql operator attributes.

    Parameters:
        input_data: List of JSON arrays (List[List[Dict[str, Any]]]) to validate.
        attributes: Dictionary containing operator attributes to validate.
        properties: Optional properties dictionary. Defaults to None.

    Returns:
        True if attributes are valid, False otherwise.
    """
    return default_operator_validator(input_data, attributes, properties)
Last update: 2025-10-08