Skip to content

Query executor

QueryExecutorAgent

Bases: Agent

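An agent that executes queries against a specified data source and database using the DataRegistry. The agent takes input as a JSON object with a `source` path (PLATFORM:<platform_id>/<source>/<database>/<collection>), which identifies the source, database, and collection, and the `query` to execute.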

Properties (in addition to Agent properties):

Name Type Default Description
output_filters list of str ['all'] List of output filters to apply to the query result (e.g., 'all', 'question', 'source', 'query', 'result', 'error').
output_max_results int None Maximum number of results to return in the output. If not specified, all results are returned.

Inputs: - DEFAULT: The main input stream where the agent receives query requests in JSON format.

Outputs: - DEFAULT: The output stream where the query results are sent, tagged as QUERY, RESULT, and HIDDEN.

Source code in blue/agents/query_executor.py
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
class QueryExecutorAgent(Agent):
    """
    An agent that executes queries against a specified data source and database using the DataRegistry.
    The agent takes input as a JSON object with a `source` path
    (`PLATFORM:<platform_id>/<source>/<database>/<collection>`) and the `query` to execute.

    Properties (in addition to Agent properties):
    ----------
    | Name                 | Type          | Default   | Description |
    |----------------------|---------------|-----------|-------------|
    | `output_filters`     | `list of str` | `['all']` | List of output filters to apply to the query result (e.g., 'all', 'question', 'source', 'query', 'result', 'error'). |
    | `output_max_results` | `int`         | `None`    | Maximum number of results to return in the output. If not specified, all results are returned. |

    Inputs:
    - `DEFAULT`: The main input stream where the agent receives query requests in JSON format.

    Outputs:
    - `DEFAULT`: The output stream where the query results are sent, tagged as QUERY, RESULT, and HIDDEN.
    """

    def __init__(self, **kwargs):
        """Create the agent, defaulting its name to ``QUERYEXECUTOR`` when no name is given."""
        if 'name' not in kwargs:
            kwargs['name'] = "QUERYEXECUTOR"
        super().__init__(**kwargs)

    def _start(self):
        """Start the QueryExecutorAgent by initializing the data registry."""
        super()._start()

        # initialize registry
        self._init_registry()

    def _init_registry(self):
        """Initialize the data registry for the QueryExecutorAgent.

        Reads ``platform.name`` and ``data_registry.name`` from the agent properties
        and creates a DataRegistry scoped under the ``PLATFORM:<platform_id>`` prefix.
        """
        platform_id = self.properties["platform.name"]
        prefix = 'PLATFORM:' + platform_id
        self.registry = DataRegistry(id=self.properties['data_registry.name'], prefix=prefix, properties=self.properties)

    ####### inputs / outputs
    def _initialize_inputs(self):
        """Initialize input parameters for the query executor agent."""
        self.add_input("DEFAULT", description="input query")

    def _initialize_outputs(self):
        """Initialize outputs for the query executor agent, tagged as QUERY, RESULT, and HIDDEN."""
        self.add_output("DEFAULT", description="query results", tags=["QUERY", "RESULT", "HIDDEN"])

    def execute_sql_query(self, path, query):
        """Execute a SQL query against the specified data source and database.

        Parameters:
            path: The data source path in the format 'PLATFORM:<platform_id>/<source>/<database>/<collection>'.
            query: The SQL query to execute.

        Returns:
            A dictionary with keys 'question', 'source', 'query', 'result' and 'error'.
            On success 'error' is None; on any failure (malformed path, connection or
            query error) 'result' is None and 'error' holds the exception text.
        """
        result = None
        # this method is not given a question, so that field is always None
        question = None
        error = None
        try:
            # extract source, database, collection (the platform segment is ignored)
            _, source, database, collection = path.split('/')
            # connect
            source_connection = self.registry.connect_source(source)
            # execute query
            result = source_connection.execute_query(query, database=database, collection=collection)
        except Exception as e:
            # errors are reported in the returned dict rather than raised
            error = str(e)

        return {'question': question, 'source': path, 'query': query, 'result': result, 'error': error}

    def _apply_filter(self, output):
        """Apply output filters to the query result based on agent properties.

        Parameters:
            output: The output dictionary containing question, source, query, result, and error.

        Returns:
            - the full dict when 'all' is among the filters;
            - the bare value of the single selected field when exactly one filter is given;
            - a dict of the selected fields when several filters are given;
            - None whenever the filtered message is falsy (e.g. an empty result list).
        """
        output_filters = ['all']

        if 'output_filters' in self.properties:
            output_filters = self.properties['output_filters']

        question = output['question']
        source = output['source']
        query = output['query']
        result = output['result']
        error = output['error']

        # max results: only list results are truncated
        if "output_max_results" in self.properties and self.properties['output_max_results']:
            if isinstance(result, list):
                result = result[: self.properties['output_max_results']]

        message = None
        if 'all' in output_filters:
            message = {'question': question, 'source': source, 'query': query, 'result': result, 'error': error}
        elif len(output_filters) == 1:
            # a single filter emits the bare value rather than a dict
            if 'question' in output_filters:
                message = question
            if 'source' in output_filters:
                message = source
            if 'query' in output_filters:
                message = query
            if 'error' in output_filters:
                message = error
            if 'result' in output_filters:
                message = result
        else:
            message = {}
            if 'question' in output_filters:
                message['question'] = question
            if 'source' in output_filters:
                message['source'] = source
            if 'query' in output_filters:
                message['query'] = query
            if 'result' in output_filters:
                message['result'] = result
            if 'error' in output_filters:
                message['error'] = error

        # NOTE(review): falsy messages (e.g. an empty result list) collapse to None
        if message:
            return message
        return None

    def default_processor(self, message, input="DEFAULT", properties=None, worker=None):
        """Process messages for the query executor agent, executing SQL queries based on input JSON data.

        Text chunks on the DEFAULT stream are buffered between BOS and EOS. At EOS the
        buffered text is parsed as JSON with 'source' and 'query' fields, the query is
        executed, and the filtered output is written, followed by EOS. A data message
        whose content type is JSON is executed immediately and its filtered output is
        returned.

        Parameters:
            message: The message to process.
            input: The input stream label.
            properties: Additional properties for processing.
            worker: The worker handling the processing.

        Returns:
            None or a response message.
        """
        ##### Upon USER/Agent input text
        if input != "DEFAULT":
            return None

        if message.isEOS():
            # without a worker there is no buffered data to read and nowhere to write
            if not worker:
                return None

            # get all data received from user stream
            stream = message.getStream()
            text = " ".join(worker.get_data(stream))

            if text.strip() != '':
                try:
                    data = json.loads(text)
                    path, query = data['source'], data['query']
                except json.JSONDecodeError:
                    print("Input is not JSON")
                except (KeyError, TypeError):
                    # valid JSON, but not an object carrying both required fields
                    print("Input JSON must be an object with 'source' and 'query' fields")
                else:
                    output = self.execute_sql_query(path, query)
                    worker.write_data(self._apply_filter(output))

            worker.write_eos()

        elif message.isBOS():
            stream = message.getStream()

            # init private stream data to empty array
            if worker:
                worker.set_data(stream, [])

        elif message.isData():
            data = message.getData()
            stream = message.getStream()

            if message.getContentType() == ContentType.JSON:
                # structured request: execute right away and return the response
                path = data['source']
                query = data['query']
                output = self.execute_sql_query(path, query)

                return self._apply_filter(output)

            # plain text chunk: append to private stream data until EOS
            if worker:
                worker.append_data(stream, data)

        return None

default_processor(message, input='DEFAULT', properties=None, worker=None)

Process messages for the query executor agent, executing SQL queries based on input JSON data.

Parameters:

Name Type Description Default
message

The message to process.

required
input

The input stream label.

'DEFAULT'
properties

Additional properties for processing.

None
worker

The worker handling the processing.

None

Returns:

Type Description

None or a response message.

Source code in blue/agents/query_executor.py
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
def default_processor(self, message, input="DEFAULT", properties=None, worker=None):
    """Process messages for the query executor agent, executing SQL queries based on input JSON data.

    Text chunks on the DEFAULT stream are buffered between BOS and EOS. At EOS the
    buffered text is parsed as JSON with 'source' and 'query' fields, the query is
    executed, and the filtered output is written, followed by EOS. A data message
    whose content type is JSON is executed immediately and its filtered output is
    returned.

    Parameters:
        message: The message to process.
        input: The input stream label.
        properties: Additional properties for processing.
        worker: The worker handling the processing.

    Returns:
        None or a response message.
    """
    ##### Upon USER/Agent input text
    if input != "DEFAULT":
        return None

    if message.isEOS():
        # without a worker there is no buffered data to read and nowhere to write
        if not worker:
            return None

        # get all data received from user stream
        stream = message.getStream()
        text = " ".join(worker.get_data(stream))

        if text.strip() != '':
            try:
                data = json.loads(text)
                path, query = data['source'], data['query']
            except json.JSONDecodeError:
                print("Input is not JSON")
            except (KeyError, TypeError):
                # valid JSON, but not an object carrying both required fields
                print("Input JSON must be an object with 'source' and 'query' fields")
            else:
                output = self.execute_sql_query(path, query)
                worker.write_data(self._apply_filter(output))

        worker.write_eos()

    elif message.isBOS():
        stream = message.getStream()

        # init private stream data to empty array
        if worker:
            worker.set_data(stream, [])

    elif message.isData():
        data = message.getData()
        stream = message.getStream()

        if message.getContentType() == ContentType.JSON:
            # structured request: execute right away and return the response
            path = data['source']
            query = data['query']
            output = self.execute_sql_query(path, query)

            return self._apply_filter(output)

        # plain text chunk: append to private stream data until EOS
        if worker:
            worker.append_data(stream, data)

    return None

execute_sql_query(path, query)

Execute a SQL query against the specified data source and database.

Parameters:

Name Type Description Default
path

The data source path in the format 'PLATFORM:<platform_id>/<source>/<database>/<collection>'.

required
query

The SQL query to execute.

required

Returns:

Type Description

A dictionary containing the query results or an error message.

Source code in blue/agents/query_executor.py
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
def execute_sql_query(self, path, query):
    """Run ``query`` against the collection addressed by ``path`` and report the outcome.

    Parameters:
        path: Address of the target, shaped like
            'PLATFORM:<platform_id>/<source>/<database>/<collection>'.
        query: The SQL text handed to the source connection.

    Returns:
        A dict with the keys 'question', 'source', 'query', 'result' and 'error'.
        'question' is always None. Any exception raised while splitting the path,
        connecting, or querying is captured as a string under 'error', in which
        case 'result' stays None.
    """
    outcome = {'question': None, 'source': path, 'query': query, 'result': None, 'error': None}
    try:
        # the leading platform segment is not needed for the lookup
        _, source_name, db_name, collection_name = path.split('/')
        connection = self.registry.connect_source(source_name)
        outcome['result'] = connection.execute_query(query, database=db_name, collection=collection_name)
    except Exception as exc:
        outcome['error'] = str(exc)
    return outcome
Last update: 2025-10-09