Skip to content

OpenAI source

OpenAISource

Bases: DataSource, ServiceClient

Source code in blue/data/sources/openai_source.py
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
class OpenAISource(DataSource, ServiceClient):
    """
    Data source that answers natural language queries through an OpenAI service.

    Databases, collections and entities are virtual: the fetch_* methods return
    fixed placeholder values, and `execute_query` forwards the query to
    `execute_api_call` together with this instance's properties.
    """

    # Default prompt template, stored under the "input_template" property.
    # NOTE(review): ${input}, ${attr_names} and ${context} look like placeholders
    # filled in by the caller of the template — confirm against the requestor.
    PROMPT = """
Your task is to process a natural language query and return the results in JSON format.
The response should be a valid JSON array containing the requested information.

Here are the requirements:
- There might be optional context provided for domain knowledge. Use it to assist the query if provided and not empty.
- There might be specificed attr_names, which are the attributes of the objects in the output.
- The output should be a JSON array of objects. Each element is a JSON object with proper attribute value pairs.
- Each object should contain the requested information in a structured format
- When interpreting the query, use additional context provided if provided and not empty.
- Please try to return non-empty output. If the query is not clear, please use your best judgement to return a non-empty output.
- The response should be well-formatted and easy to parse
- Output the JSON directly. Do not generate explanation or other additional output.

Query: ${input}

Attr_names:
${attr_names}

Context:
${context}

Output:
"""

    # Default properties copied onto every instance by _initialize_properties.
    PROPERTIES = {
        # openai related properties
        "openai.api": "ChatCompletion",
        "openai.model": "gpt-4o",
        "openai.stream": False,
        "openai.max_tokens": 4096,
        "openai.temperature": 0,
        # io related properties (used by requestor operator)
        "input_json": "[{\"role\": \"user\"}]",
        "input_context": "$[0]",
        "input_context_field": "content",
        "input_field": "messages",
        "input_template": PROMPT,
        "output_path": "$.choices[0].message.content",
        # service related properties
        "service_prefix": "openai",
        # output transformations
        "output_transformations": [{"transformation": "replace", "from": "```", "to": ""}, {"transformation": "replace", "from": "json", "to": ""}],
        "output_strip": True,
        "output_cast": "json",
    }

    def __init__(self, name, properties=None):
        """
        Create the source and hand its name and properties to the base initializer.

        Parameters:
            name: Name of the source, passed through to the base class.
            properties (dict, optional): Property overrides. When omitted, a new
                empty dictionary is used.
        """
        # A fresh dict per call avoids sharing one mutable default object
        # between every instance constructed without explicit properties.
        if properties is None:
            properties = {}
        super().__init__(name, properties=properties)

    ###### initialization
    def _initialize_properties(self):
        """Run the base initialization, then apply the defaults in PROPERTIES."""
        super()._initialize_properties()

        # Initialize default properties.
        # Values are assigned by reference, so mutable defaults (e.g. the
        # "output_transformations" list) are shared with the class-level dict.
        for key, value in OpenAISource.PROPERTIES.items():
            self.properties[key] = value

    def _update_properties(self, properties=None):
        """
        Apply property updates and refresh the cached host and port.

        Parameters:
            properties (dict, optional): Updates forwarded to the base class.
        """
        super()._update_properties(properties)

        # host/port are only (re)assigned when a 'connection' entry exists;
        # missing keys fall back to localhost:8001.
        if 'connection' in self.properties:
            connection = self.properties['connection']
            self.host = connection.get('host', 'localhost')
            self.port = connection.get('port', 8001)

    ###### connection
    def _initialize_connection_properties(self):
        """Run the base connection setup, then mark the protocol as 'openai'."""
        super()._initialize_connection_properties()

        # set protocol (host and port are read in _update_properties)
        self.properties['connection']['protocol'] = 'openai'

    ###### connection
    def _connect(self, **connection):
        """
        Return the connection object for this source.

        Parameters:
            **connection: Connection settings; ignored by this implementation.

        Returns:
            dict: Always an empty dictionary.
        """
        return {}

    def _disconnect(self):
        """Do nothing and return None."""
        # OpenAI source doesn't require persistent connection
        return None

    ######### source
    def fetch_metadata(self):
        """
        Fetch general metadata for the OpenAI source.

        Returns:
            dict: Empty dictionary, as OpenAI does not expose metadata like databases.
        """
        return {}

    def fetch_schema(self):
        """
        Fetch the overall schema of the OpenAI source.

        Returns:
            dict: Empty dictionary
        """
        return {}

    ######### database
    def fetch_databases(self):
        """
        Fetch available databases in the OpenAI source.

        Returns:
            list: Contains a single virtual database ['openai'].
        """
        return ["openai"]

    def fetch_database_metadata(self, database):
        """
        Fetch metadata for a given virtual OpenAI database.

        Parameters:
            database (str): The virtual database name.

        Returns:
            dict: Empty dictionary since OpenAI databases are virtual.
        """
        return {}

    def fetch_database_schema(self, database):
        """
        Fetch the schema of the specified OpenAI database.

        Parameters:
            database (str): The virtual database name.

        Returns:
            dict: Empty dictionary
        """
        return {}

    ######### database/collection
    def fetch_database_collections(self, database):
        """
        Fetch collections in the OpenAI database.

        Parameters:
            database (str): The virtual database name.

        Returns:
            list: A single virtual collection ['public'].
        """
        return ["public"]

    def fetch_database_collection_metadata(self, database, collection):
        """
        Fetch metadata for a specific OpenAI collection.

        Parameters:
            database (str): The virtual database name.
            collection (str): The virtual collection name.

        Returns:
            dict: Empty dictionary, as no collection metadata is stored.
        """
        return {}

    def fetch_database_collection_entities(self, database, collection):
        """
        Fetch entities (analogous to tables) and their attributes for OpenAI.

        Builds a DataSchema holding the entity "openai_entity" with one property,
        "openai_entity_attribute", of type "text".

        Parameters:
            database (str): The virtual database name.
            collection (str): The virtual collection name.

        Returns:
            The result of `DataSchema.get_entities()` for that schema
            (presumably a dict keyed by entity name — verify against DataSchema).
        """
        table_name = "openai_entity"
        column_name = "openai_entity_attribute"
        data_type = "text"

        schema = DataSchema()
        schema.add_entity(table_name)

        property_def = {"type": data_type}
        schema.add_entity_property(table_name, column_name, property_def)

        return schema.get_entities()

    def fetch_database_collection_relations(self, database, collection):
        """
        Fetch relationships between entities in an OpenAI collection.

        Parameters:
            database (str): The virtual database name.
            collection (str): The virtual collection name.

        Returns:
            dict: Empty dictionary since no relationships exist in OpenAI source.
        """
        return {}

    def get_service_address(self, properties=None):
        """
        Construct and return the WebSocket service address for OpenAI interaction.

        Parameters:
            properties (dict, optional): Not used by this implementation; the
                address is always built from `self.host` and `self.port`.

        Returns:
            str: Service address in 'ws://host:port' format.
        """
        return f"ws://{self.host}:{self.port}"

    ######### execute query
    def execute_query(self, query, database=None, collection=None, optional_properties=None):
        """
        Execute a natural language query against OpenAI service synchronously.

        Parameters:
            query: The query, forwarded unchanged to `execute_api_call`.
            database: Not used by this implementation.
            collection: Not used by this implementation.
            optional_properties (dict, optional): Passed to `execute_api_call`
                as `additional_data`. When omitted, a new empty dictionary is used.

        Returns:
            Whatever `execute_api_call` returns.
        """
        # A fresh dict per call keeps one invocation's additional data from
        # leaking into the next through a shared default object.
        if optional_properties is None:
            optional_properties = {}

        # Execute API Call
        return self.execute_api_call(query, properties=self.properties, additional_data=optional_properties)

execute_query(query, database=None, collection=None, optional_properties={})

Execute a natural language query against OpenAI service synchronously.

Source code in blue/data/sources/openai_source.py
240
241
242
243
244
def execute_query(self, query, database=None, collection=None, optional_properties=None):
    """
    Execute a natural language query against OpenAI service synchronously.

    Parameters:
        query: The query, forwarded unchanged to `execute_api_call`.
        database: Not used by this implementation.
        collection: Not used by this implementation.
        optional_properties (dict, optional): Passed to `execute_api_call` as
            `additional_data`. When omitted, a new empty dictionary is used.

    Returns:
        Whatever `execute_api_call` returns.
    """
    # A fresh dict per call keeps one invocation's additional data from
    # leaking into the next through a shared default object.
    if optional_properties is None:
        optional_properties = {}

    # Execute API Call
    return self.execute_api_call(query, properties=self.properties, additional_data=optional_properties)

fetch_database_collection_entities(database, collection)

Fetch entities (analogous to tables) and their attributes for OpenAI.

Parameters:

Name Type Description Default
database str

The virtual database name.

required
collection str

The virtual collection name.

required

Returns:

Name Type Description
dict

Dictionary of one virtual entity with a single text attribute.

Source code in blue/data/sources/openai_source.py
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
def fetch_database_collection_entities(self, database, collection):
    """
    Describe the single virtual entity exposed by the OpenAI source.

    A DataSchema is populated with the entity "openai_entity" carrying one
    property, "openai_entity_attribute", whose type is "text".

    Parameters:
        database (str): The virtual database name (not used).
        collection (str): The virtual collection name (not used).

    Returns:
        The result of `DataSchema.get_entities()` for that schema
        (presumably a dict keyed by entity name — verify against DataSchema).
    """
    entity = "openai_entity"

    entity_schema = DataSchema()
    entity_schema.add_entity(entity)
    entity_schema.add_entity_property(entity, "openai_entity_attribute", {"type": "text"})

    return entity_schema.get_entities()

fetch_database_collection_metadata(database, collection)

Fetch metadata for a specific OpenAI collection.

Parameters:

Name Type Description Default
database str

The virtual database name.

required
collection str

The virtual collection name.

required

Returns:

Name Type Description
dict

Empty dictionary, as no collection metadata is stored.

Source code in blue/data/sources/openai_source.py
177
178
179
180
181
182
183
184
185
186
187
188
def fetch_database_collection_metadata(self, database, collection):
    """
    Return the metadata of an OpenAI collection, which is always empty.

    Parameters:
        database (str): The virtual database name (not used).
        collection (str): The virtual collection name (not used).

    Returns:
        dict: A new empty dictionary on every call.
    """
    no_metadata = dict()
    return no_metadata

fetch_database_collection_relations(database, collection)

Fetch relationships between entities in an OpenAI collection.

Parameters:

Name Type Description Default
database str

The virtual database name.

required
collection str

The virtual collection name.

required

Returns:

Name Type Description
dict

Empty dictionary since no relationships exist in OpenAI source.

Source code in blue/data/sources/openai_source.py
213
214
215
216
217
218
219
220
221
222
223
224
def fetch_database_collection_relations(self, database, collection):
    """
    Return the entity relationships of an OpenAI collection, which are always empty.

    Parameters:
        database (str): The virtual database name (not used).
        collection (str): The virtual collection name (not used).

    Returns:
        dict: A new empty dictionary on every call.
    """
    no_relations = dict()
    return no_relations

fetch_database_collections(database)

Fetch collections in the OpenAI database.

Parameters:

Name Type Description Default
database str

The virtual database name.

required

Returns:

Name Type Description
list

A single virtual collection ['public'].

Source code in blue/data/sources/openai_source.py
163
164
165
166
167
168
169
170
171
172
173
174
175
def fetch_database_collections(self, database):
    """
    List the collections of the OpenAI database.

    Parameters:
        database (str): The virtual database name (not used).

    Returns:
        list: A new list holding the single collection name "public".
    """
    return ["public"]

fetch_database_metadata(database)

Fetch metadata for a given virtual OpenAI database.

Parameters:

Name Type Description Default
database str

The virtual database name.

required

Returns:

Name Type Description
dict

Empty dictionary since OpenAI databases are virtual.

Source code in blue/data/sources/openai_source.py
138
139
140
141
142
143
144
145
146
147
148
def fetch_database_metadata(self, database):
    """
    Return the metadata of a virtual OpenAI database, which is always empty.

    Parameters:
        database (str): The virtual database name (not used).

    Returns:
        dict: A new empty dictionary on every call.
    """
    no_metadata = dict()
    return no_metadata

fetch_database_schema(database)

Fetch the schema of the specified OpenAI database.

Parameters:

Name Type Description Default
database str

The virtual database name.

required

Returns:

Name Type Description
dict

Empty dictionary

Source code in blue/data/sources/openai_source.py
150
151
152
153
154
155
156
157
158
159
160
def fetch_database_schema(self, database):
    """
    Return the schema of a virtual OpenAI database, which is always empty.

    Parameters:
        database (str): The virtual database name (not used).

    Returns:
        dict: A new empty dictionary on every call.
    """
    empty_schema = dict()
    return empty_schema

fetch_databases()

Fetch available databases in the OpenAI source.

Returns:

Name Type Description
list

Contains a single virtual database ['openai'].

Source code in blue/data/sources/openai_source.py
127
128
129
130
131
132
133
134
135
136
def fetch_databases(self):
    """
    List the databases of the OpenAI source.

    Returns:
        list: A new list holding the single database name "openai".
    """
    return ["openai"]

fetch_metadata()

Fetch general metadata for the OpenAI source.

Returns:

Name Type Description
dict

Empty dictionary, as OpenAI does not expose metadata like databases.

Source code in blue/data/sources/openai_source.py
108
109
110
111
112
113
114
115
def fetch_metadata(self):
    """
    Return the general metadata of the OpenAI source, which is always empty.

    Returns:
        dict: A new empty dictionary on every call.
    """
    no_metadata = dict()
    return no_metadata

fetch_schema()

Fetch the overall schema of the OpenAI source.

Returns:

Name Type Description
dict

Empty dictionary

Source code in blue/data/sources/openai_source.py
117
118
119
120
121
122
123
124
def fetch_schema(self):
    """
    Return the overall schema of the OpenAI source, which is always empty.

    Returns:
        dict: A new empty dictionary on every call.
    """
    empty_schema = dict()
    return empty_schema

get_service_address(properties=None)

Construct and return the WebSocket service address for OpenAI interaction.

Parameters:

Name Type Description Default
properties dict

Optional properties. Currently unused: the address is always built from the instance's host and port.

None

Returns:

Name Type Description
str

Service address in 'ws://host:port' format.

Source code in blue/data/sources/openai_source.py
226
227
228
229
230
231
232
233
234
235
236
237
def get_service_address(self, properties=None):
    """
    Build the WebSocket address of the OpenAI service.

    Parameters:
        properties (dict, optional): Not used by this implementation; the
            address is always built from `self.host` and `self.port`.

    Returns:
        str: Service address in 'ws://host:port' format.
    """
    return "ws://{}:{}".format(self.host, self.port)
Last update: 2025-10-09