Skip to content

Service utils

ServiceClient

Source code in blue/utils/service_utils.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
class ServiceClient:
    def __init__(self, name, properties=None):
        """Initialize ServiceClient to support calling external services.

        Parameters:
            name: Name of the service client.
            properties: Properties for the service client.
        """
        self.name = name

        self._initialize(properties=properties)

    ###### initialization
    def _initialize(self, properties=None):
        self._initialize_properties()
        self._update_properties(properties=properties)

    def _initialize_properties(self):
        self.properties = {}

        # service url
        self.properties['service_url'] = "ws://localhost:8001"

        # input / output processing properties
        self.properties['input_json'] = None
        self.properties['input_context'] = None
        self.properties['input_context_field'] = None
        self.properties['input_field'] = 'input'
        self.properties['output_path'] = 'output'

        # overrride with additional properties if needed
        # properties to pass on to API call should have a prefix of self.name

    def _update_properties(self, properties=None):
        if properties is None:
            return

        # override
        for p in properties:
            self.properties[p] = properties[p]

    def get_properties(self, properties=None):
        """Get properties, overriding with provided properties.

        Parameters:
            properties: Properties to override.

        Returns:
            Merged properties.
        """
        if properties is None:
            properties = {}
        return json_utils.merge_json(self.properties, properties)

    def extract_input_params(self, input_data, properties=None):
        """Extract input parameters from input data based on optional properties.

        Parameters:
            input_data: Input data to extract parameters from.
            properties: Optional properties to use for extraction

        Returns:
            Extracted input parameters.
        """
        properties = self.get_properties(properties=properties)

        return {"input": input_data}

    def extract_output_params(self, output_data, properties=None):
        """Extract output parameters from output data based on optional properties.

        Parameters:
            output_data: Output data to extract parameters from.
            properties: Optional properties to use for extraction

        Returns:
            Extracted output parameters.
        """
        properties = self.get_properties(properties=properties)
        return {}

    def extract_api_properties(self, properties=None):
        """Extract API-related properties based on service prefix.

        Parameters:
            properties: Optional properties to override.

        Returns:
            Extracted API properties.
        """
        properties = self.get_properties(properties=properties)

        api_properties = {}

        # api properties have a prefix of name, e.g. openai.model
        for p in properties:
            if p.find(self.get_service_prefix()) == 0:
                property = p[len(self.get_service_prefix()) + 1 :]
                api_properties[property] = properties[p]

        return api_properties

    def create_message(self, input_data, properties=None, additional_data=None):
        """Create message to send to service based on input data and properties.

        Parameters:
            input_data: Input data to create the message.
            properties: Optional properties to override.
            additional_data: Additional data to be used for creating the message.

        Returns:
            Created message.
        """
        # add properties to pass onto api
        message = self.extract_api_properties(properties=properties)

        properties = self.get_properties(properties=properties)

        if additional_data is None:
            additional_data = {}
        ## prepare input
        if 'input_template' in properties and properties['input_template'] is not None:
            input_template = properties['input_template']
            input_params = self.extract_input_params(input_data, properties=properties)
            input_data = string_utils.safe_substitute(input_template, **properties, **input_params, **additional_data)

        # set input text to message
        input_object = input_data

        if 'input_json' in properties and properties['input_json'] is not None:
            input_object = {}
            if type(properties['input_json']) == str:
                input_object = json.loads(properties['input_json'])
            else:
                input_object = copy.deepcopy(properties['input_json'])

            # set input text in object
            json_utils.json_query_set(input_object, properties['input_context_field'], input_data, context=properties['input_context'])

        message[properties['input_field']] = input_object
        return message

    def create_output(self, response, properties=None):
        """Create output from service response based on properties.

        Parameters:
            response: Service response to create output from.
            properties: Optional properties to override.
        Returns:
            Created output.
        """
        # get properties, overriding with properties provided
        properties = self.get_properties(properties=properties)

        output_data = json_utils.json_query(response, properties['output_path'], single=True)

        # pre-process output from response
        output_data = self._preprocess_output(output_data, properties=properties)

        # apply output template
        if 'output_template' in properties and properties['output_template'] is not None:
            output_template = properties['output_template']
            output_params = self.extract_output_params(output_data, properties=properties)
            output_data = string_utils.safe_substitute(output_template, **properties, **output_params, output=output_data)
        return output_data

    def validate_input(self, input_data, properties=None):
        """Validate input data based on properties.

        Parameters:
            input_data: Input data to validate.
            properties: Optional properties to override.

        Returns:
            True if input is valid, False otherwise.
        """
        # get properties, overriding with properties provided
        properties = self.get_properties(properties=properties)

        return True

    def process_output(self, output_data, properties=None):
        """Process output data based on properties, such as casting.
        Parameters:
            output_data: Output data to process.
            properties: Optional properties to override.

        Returns:
            Processed output data.
        """
        # get properties, overriding with properties provided
        properties = self.get_properties(properties=properties)

        # cast
        if 'output_cast' in properties:
            if properties['output_cast'].lower() == "int":
                output_data = int(output_data)
            elif properties['output_cast'].lower() == "float":
                output_data = float(output_data)
            elif properties['output_cast'].lower() == "json":
                output_data = json.loads(output_data)
            elif properties['output_cast'].lower() == "str":
                output_data = str(output_data)

        return output_data

    def _preprocess_output(self, output_data, properties=None):
        """Preprocess output data based on properties, such as stripping and transformations.

        Parameters:
            output_data: Output data to preprocess.
            properties: Optional properties to override.

        Returns:
            Preprocessed output data.
        """
        # get properties, overriding with properties provided
        properties = self.get_properties(properties=properties)

        # string transformations
        if type(output_data) == str:

            # strip
            if 'output_strip' in properties:
                output_data = output_data.strip()

            # re transformations
            if 'output_transformations' in properties:
                transformations = properties['output_transformations']
                for transformation in transformations:
                    tf = transformation['transformation']
                    if tf == 'replace':
                        tfrom = transformation['from']
                        tto = transformation['to']
                        output_data = output_data.replace(tfrom, tto)
                    elif tf == 'sub':
                        tfrom = transformation['from']
                        tto = transformation['to']
                        tfromre = re.compile(tfrom)
                        ttore = re.compile(tfrom)
                        output_data = re.sub(tfromre, ttore, output_data)

        return output_data

    def execute_api_call(self, input, properties=None, additional_data=None):
        """Execute API call to the service with the given input and properties.

        Parameters:
            input: Input data for the API call.
            properties: Optional properties to override.
            additional_data: Additional data to be used for creating the message for the API call.

        Returns:
            Output from the service after processing.
        """
        # create message from input
        message = self.create_message(input, properties=properties, additional_data=additional_data)

        # serialize message, call service
        url = self.get_service_address(properties=properties)
        m = json.dumps(message)
        r = self.call_service(url, m)

        response = json.loads(r)

        # create output from response
        output = self.create_output(response, properties=properties)

        # process output data
        output = self.process_output(output, properties=properties)

        return output

    def get_service_prefix(self):
        """Get service prefix from properties.

        Returns:
            Service prefix.
        """
        service_prefix = self.name.lower()
        if 'service_prefix' in self.properties:
            service_prefix = self.properties['service_prefix']
        return service_prefix

    def get_service_address(self, properties=None):
        """Get service address (URL) from properties.

        Parameters:
            properties: Optional properties to override.

        Returns:
            Service address (URL).
        """
        properties = self.get_properties(properties=properties)
        if 'service_url' in properties:
            return properties['service_url']
        return None

    def call_service(self, url, data):
        """Call the service at the given URL with the provided data.

        Parameters:
            url: Service URL.
            data: Data to send to the service.

        Returns:
            Response from the service.
        """
        logging.info("sending data to:" + str(url))
        logging.info(str(data))
        with connect(url) as websocket:
            websocket.send(data)
            message = websocket.recv()
            return message

__init__(name, properties=None)

Initialize ServiceClient to support calling external services.

Parameters:

Name Type Description Default
name

Name of the service client.

required
properties

Properties for the service client.

None
Source code in blue/utils/service_utils.py
20
21
22
23
24
25
26
27
28
29
def __init__(self, name, properties=None):
    """Initialize ServiceClient to support calling external services.

    Parameters:
        name: Name of the service client.
        properties: Properties for the service client.
    """
    self.name = name

    self._initialize(properties=properties)

call_service(url, data)

Call the service at the given URL with the provided data.

Parameters:

Name Type Description Default
url

Service URL.

required
data

Data to send to the service.

required

Returns:

Type Description

Response from the service.

Source code in blue/utils/service_utils.py
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
def call_service(self, url, data):
    """Call the service at the given URL with the provided data.

    Parameters:
        url: Service URL.
        data: Data to send to the service.

    Returns:
        Response from the service.
    """
    logging.info("sending data to:" + str(url))
    logging.info(str(data))
    with connect(url) as websocket:
        websocket.send(data)
        message = websocket.recv()
        return message

create_message(input_data, properties=None, additional_data=None)

Create message to send to service based on input data and properties.

Parameters:

Name Type Description Default
input_data

Input data to create the message.

required
properties

Optional properties to override.

None
additional_data

Additional data to be used for creating the message.

None

Returns:

Type Description

Created message.

Source code in blue/utils/service_utils.py
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
def create_message(self, input_data, properties=None, additional_data=None):
    """Create message to send to service based on input data and properties.

    Parameters:
        input_data: Input data to create the message.
        properties: Optional properties to override.
        additional_data: Additional data to be used for creating the message.

    Returns:
        Created message.
    """
    # add properties to pass onto api
    message = self.extract_api_properties(properties=properties)

    properties = self.get_properties(properties=properties)

    if additional_data is None:
        additional_data = {}
    ## prepare input
    if 'input_template' in properties and properties['input_template'] is not None:
        input_template = properties['input_template']
        input_params = self.extract_input_params(input_data, properties=properties)
        input_data = string_utils.safe_substitute(input_template, **properties, **input_params, **additional_data)

    # set input text to message
    input_object = input_data

    if 'input_json' in properties and properties['input_json'] is not None:
        input_object = {}
        if type(properties['input_json']) == str:
            input_object = json.loads(properties['input_json'])
        else:
            input_object = copy.deepcopy(properties['input_json'])

        # set input text in object
        json_utils.json_query_set(input_object, properties['input_context_field'], input_data, context=properties['input_context'])

    message[properties['input_field']] = input_object
    return message

create_output(response, properties=None)

Create output from service response based on properties.

Parameters:

Name Type Description Default
response

Service response to create output from.

required
properties

Optional properties to override.

None

Returns: Created output.

Source code in blue/utils/service_utils.py
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
def create_output(self, response, properties=None):
    """Create output from service response based on properties.

    Parameters:
        response: Service response to create output from.
        properties: Optional properties to override.
    Returns:
        Created output.
    """
    # get properties, overriding with properties provided
    properties = self.get_properties(properties=properties)

    output_data = json_utils.json_query(response, properties['output_path'], single=True)

    # pre-process output from response
    output_data = self._preprocess_output(output_data, properties=properties)

    # apply output template
    if 'output_template' in properties and properties['output_template'] is not None:
        output_template = properties['output_template']
        output_params = self.extract_output_params(output_data, properties=properties)
        output_data = string_utils.safe_substitute(output_template, **properties, **output_params, output=output_data)
    return output_data

execute_api_call(input, properties=None, additional_data=None)

Execute API call to the service with the given input and properties.

Parameters:

Name Type Description Default
input

Input data for the API call.

required
properties

Optional properties to override.

None
additional_data

Additional data to be used for creating the message for the API call.

None

Returns:

Type Description

Output from the service after processing.

Source code in blue/utils/service_utils.py
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
def execute_api_call(self, input, properties=None, additional_data=None):
    """Execute API call to the service with the given input and properties.

    Parameters:
        input: Input data for the API call.
        properties: Optional properties to override.
        additional_data: Additional data to be used for creating the message for the API call.

    Returns:
        Output from the service after processing.
    """
    # create message from input
    message = self.create_message(input, properties=properties, additional_data=additional_data)

    # serialize message, call service
    url = self.get_service_address(properties=properties)
    m = json.dumps(message)
    r = self.call_service(url, m)

    response = json.loads(r)

    # create output from response
    output = self.create_output(response, properties=properties)

    # process output data
    output = self.process_output(output, properties=properties)

    return output

extract_api_properties(properties=None)

Extract API-related properties based on service prefix.

Parameters:

Name Type Description Default
properties

Optional properties to override.

None

Returns:

Type Description

Extracted API properties.

Source code in blue/utils/service_utils.py
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
def extract_api_properties(self, properties=None):
    """Extract API-related properties based on service prefix.

    Parameters:
        properties: Optional properties to override.

    Returns:
        Extracted API properties.
    """
    properties = self.get_properties(properties=properties)

    api_properties = {}

    # api properties have a prefix of name, e.g. openai.model
    for p in properties:
        if p.find(self.get_service_prefix()) == 0:
            property = p[len(self.get_service_prefix()) + 1 :]
            api_properties[property] = properties[p]

    return api_properties

extract_input_params(input_data, properties=None)

Extract input parameters from input data based on optional properties.

Parameters:

Name Type Description Default
input_data

Input data to extract parameters from.

required
properties

Optional properties to use for extraction

None

Returns:

Type Description

Extracted input parameters.

Source code in blue/utils/service_utils.py
73
74
75
76
77
78
79
80
81
82
83
84
85
def extract_input_params(self, input_data, properties=None):
    """Extract input parameters from input data based on optional properties.

    Parameters:
        input_data: Input data to extract parameters from.
        properties: Optional properties to use for extraction

    Returns:
        Extracted input parameters.
    """
    properties = self.get_properties(properties=properties)

    return {"input": input_data}

extract_output_params(output_data, properties=None)

Extract output parameters from output data based on optional properties.

Parameters:

Name Type Description Default
output_data

Output data to extract parameters from.

required
properties

Optional properties to use for extraction

None

Returns:

Type Description

Extracted output parameters.

Source code in blue/utils/service_utils.py
87
88
89
90
91
92
93
94
95
96
97
98
def extract_output_params(self, output_data, properties=None):
    """Extract output parameters from output data based on optional properties.

    Parameters:
        output_data: Output data to extract parameters from.
        properties: Optional properties to use for extraction

    Returns:
        Extracted output parameters.
    """
    properties = self.get_properties(properties=properties)
    return {}

get_properties(properties=None)

Get properties, overriding with provided properties.

Parameters:

Name Type Description Default
properties

Properties to override.

None

Returns:

Type Description

Merged properties.

Source code in blue/utils/service_utils.py
60
61
62
63
64
65
66
67
68
69
70
71
def get_properties(self, properties=None):
    """Get properties, overriding with provided properties.

    Parameters:
        properties: Properties to override.

    Returns:
        Merged properties.
    """
    if properties is None:
        properties = {}
    return json_utils.merge_json(self.properties, properties)

get_service_address(properties=None)

Get service address (URL) from properties.

Parameters:

Name Type Description Default
properties

Optional properties to override.

None

Returns:

Type Description

Service address (URL).

Source code in blue/utils/service_utils.py
303
304
305
306
307
308
309
310
311
312
313
314
315
def get_service_address(self, properties=None):
    """Get service address (URL) from properties.

    Parameters:
        properties: Optional properties to override.

    Returns:
        Service address (URL).
    """
    properties = self.get_properties(properties=properties)
    if 'service_url' in properties:
        return properties['service_url']
    return None

get_service_prefix()

Get service prefix from properties.

Returns:

Type Description

Service prefix.

Source code in blue/utils/service_utils.py
292
293
294
295
296
297
298
299
300
301
def get_service_prefix(self):
    """Get service prefix from properties.

    Returns:
        Service prefix.
    """
    service_prefix = self.name.lower()
    if 'service_prefix' in self.properties:
        service_prefix = self.properties['service_prefix']
    return service_prefix

process_output(output_data, properties=None)

Process output data based on properties, such as casting. Parameters: output_data: Output data to process. properties: Optional properties to override.

Returns:

Type Description

Processed output data.

Source code in blue/utils/service_utils.py
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
def process_output(self, output_data, properties=None):
    """Process output data based on properties, such as casting.
    Parameters:
        output_data: Output data to process.
        properties: Optional properties to override.

    Returns:
        Processed output data.
    """
    # get properties, overriding with properties provided
    properties = self.get_properties(properties=properties)

    # cast
    if 'output_cast' in properties:
        if properties['output_cast'].lower() == "int":
            output_data = int(output_data)
        elif properties['output_cast'].lower() == "float":
            output_data = float(output_data)
        elif properties['output_cast'].lower() == "json":
            output_data = json.loads(output_data)
        elif properties['output_cast'].lower() == "str":
            output_data = str(output_data)

    return output_data

validate_input(input_data, properties=None)

Validate input data based on properties.

Parameters:

Name Type Description Default
input_data

Input data to validate.

required
properties

Optional properties to override.

None

Returns:

Type Description

True if input is valid, False otherwise.

Source code in blue/utils/service_utils.py
185
186
187
188
189
190
191
192
193
194
195
196
197
198
def validate_input(self, input_data, properties=None):
    """Validate input data based on properties.

    Parameters:
        input_data: Input data to validate.
        properties: Optional properties to override.

    Returns:
        True if input is valid, False otherwise.
    """
    # get properties, overriding with properties provided
    properties = self.get_properties(properties=properties)

    return True
Last update: 2025-10-09