Skip to content

Data discover operator

DataDiscoverOperator

Bases: Operator

Data discover operator that searches for data sources

Attributes:

Name Type Required Default Description
search_query str Yes "" Text to search for in source names and descriptions.
approximate bool Yes True Whether to use approximate (vector) search.
hybrid bool False Whether to use hybrid search (text + vector).
limit int -1 Max number of results to return (-1 means unlimited).
page int 0 Page number for pagination.
page_size int 10 Number of results per page (default: 10, max: 100).
include_metadata bool False Whether to include metadata in results (description and properties always included).
threshold float 0.5 Similarity threshold for filtering results (0.0–1.0, lower = more similar, only applies to approximate/hybrid search).
progressive_pagination bool False Whether to use progressive pagination for approximate/hybrid search (searches all pages until threshold exceeded).
concept_type str "source" Record type to search for (e.g., 'source', 'database', 'collection', 'entity', 'attribute', 'relation').
use_hierarchical_search bool True Whether to use hierarchical search or regular search.
scope str None Search scope to limit results.
source str None Source name to limit search scope.
database str None Database name to limit search scope (requires source).
collection str None Collection name to limit search scope (requires source and database).
auto_construct_scope bool True Whether to auto-construct scope from individual attributes or use scope as-is.
filter_names list [] Filter out results with matching names in the filter list.
Source code in blue/operators/data_discover_operator.py
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
class DataDiscoverOperator(Operator):
    """
    Data discover operator that searches for data sources

    Attributes:
    ----------
    | Name                   | Type   | Required | Default | Description |
    |-------------------------|--------|-----------|----------|--------------|
    | `search_query`          | str    | :fontawesome-solid-circle-check: {.green-check}    | ""       | Text to search for in source names and descriptions. |
    | `approximate`           | bool   | :fontawesome-solid-circle-check: {.green-check}    | True     | Whether to use approximate (vector) search. |
    | `hybrid`                | bool   |    | False    | Whether to use hybrid search (text + vector). |
    | `limit`                 | int    |    | -1       | Max number of results to return (-1 means unlimited). |
    | `page`                  | int    |    | 0        | Page number for pagination. |
    | `page_size`             | int    |    | 10       | Number of results per page (default: 10, max: 100). |
    | `include_metadata`      | bool   |    | False    | Whether to include metadata in results (description and properties always included). |
    | `threshold`             | float  |    | 0.5      | Similarity threshold for filtering results (0.0–1.0, lower = more similar, only applies to approximate/hybrid search). |
    | `progressive_pagination`| bool   |    | False    | Whether to use progressive pagination for approximate/hybrid search (searches all pages until threshold exceeded). |
    | `concept_type`          | str    |    | "source" | Record type to search for (e.g., 'source', 'database', 'collection', 'entity', 'attribute', 'relation'). |
    | `use_hierarchical_search` | bool |    | True     | Whether to use hierarchical search or regular search. |
    | `scope`                 | str    |      | None     | Search scope to limit results. |
    | `source`                | str    |      | None     | Source name to limit search scope. |
    | `database`              | str    |      | None     | Database name to limit search scope (requires source). |
    | `collection`            | str    |      | None     | Collection name to limit search scope (requires source and database). |
    | `auto_construct_scope`  | bool   |      | True     | Whether to auto-construct scope from individual attributes or use scope as-is. |
    | `filter_names`          | list   |      | []       | Filter out results with matching names in the filter list. |

    """

    PROPERTIES = {}

    # Operator name and default description handed to the base class in __init__.
    name = "data_discover"
    description = "Discovers data sources using the data registry"

    # Declarative schema of the supported attributes (type, description,
    # required flag, default). This dict is class-level state and contains a
    # mutable default (the `filter_names` list), so instances receive a copy of
    # it rather than the dict itself (see _initialize_properties).
    default_attributes = {
        "search_query": {"type": "str", "description": "Text to search for in source names and descriptions", "required": True, "default": ""},
        "approximate": {"type": "bool", "description": "Whether to use approximate (vector) search", "required": True, "default": True},
        "hybrid": {"type": "bool", "description": "Whether to use hybrid search (text + vector)", "required": False, "default": False},
        "limit": {"type": "int", "description": "Max number of results to return (-1, unlimited)", "required": False, "default": -1},
        "page": {"type": "int", "description": "Page number for pagination", "required": False, "default": 0},
        "page_size": {"type": "int", "description": "Number of results per page (default: 10, max: 100)", "required": False, "default": 10},
        "include_metadata": {"type": "bool", "description": "Whether to include metadata in results (description and properties always included)", "required": False, "default": False},
        "threshold": {
            "type": "float",
            "description": "Similarity threshold for filtering results (0.0-1.0, lower = more similar, only applies to approximate/hybrid search)",
            "required": False,
            "default": 0.5,
        },
        "progressive_pagination": {
            "type": "bool",
            "description": "Whether to use progressive pagination for approximate/hybrid search (searches all pages until threshold exceeded)",
            "required": False,
            "default": False,
        },
        "concept_type": {
            "type": "str",
            "description": "Record type to search for (e.g., 'source', 'database', 'collection', 'entity', 'attribute', 'relation')",
            "required": False,
            "default": "source",
        },
        "use_hierarchical_search": {"type": "bool", "description": "Whether to use hierarchical search or regular search", "required": False, "default": True},
        "scope": {"type": "str", "description": "Search scope to limit results", "required": False, "default": None},
        "source": {"type": "str", "description": "Source name to limit search scope", "required": False, "default": None},
        "database": {"type": "str", "description": "Database name to limit search scope (requires source)", "required": False, "default": None},
        "collection": {"type": "str", "description": "Collection name to limit search scope (requires source and database)", "required": False, "default": None},
        "auto_construct_scope": {"type": "bool", "description": "Whether to auto-construct scope from individual attributes or use scope as-is", "required": False, "default": True},
        "filter_names": {"type": "list", "description": "Filter out results with matching names in the filter list", "required": False, "default": []},
    }

    def __init__(self, description: str = None, properties: Dict[str, Any] = None):
        """Create the operator and register its function, validator and explainer.

        Parameters:
            description: Optional description overriding the class-level default.
            properties: Optional properties dictionary forwarded to the base class.
        """
        super().__init__(
            self.name,
            function=data_discover_operator_function,
            description=description or self.description,
            properties=properties,
            validator=data_discover_operator_validator,
            explainer=data_discover_operator_explainer,
        )

    def _initialize_properties(self):
        """Initialize base properties, then install this operator's attribute schema.

        NOTE(review): presumably invoked by the base class during construction;
        confirm against `Operator`.
        """
        # Local import keeps this block self-contained without touching the
        # module's import section.
        import copy

        super()._initialize_properties()

        # attribute definitions
        # Deep-copied so that mutating one instance's attribute definitions
        # (including the nested `filter_names` default list) cannot leak into the
        # class-level schema or into other instances.
        self.properties["attributes"] = copy.deepcopy(self.default_attributes)

data_discover_operator_explainer(output, input_data, attributes)

Generate explanation for data discover operator execution.

Parameters:

Name Type Description Default
output Any

The output result from the operator execution.

required
input_data List[List[Dict[str, Any]]]

The input data that was processed.

required
attributes Dict[str, Any]

The attributes used for the operation.

required

Returns:

Type Description
Dict[str, Any]

Dictionary containing explanation of the data discovery operation.

Source code in blue/operators/data_discover_operator.py
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
def data_discover_operator_explainer(output: Any, input_data: List[List[Dict[str, Any]]], attributes: Dict[str, Any]) -> Dict[str, Any]:
    """Describe, in a single sentence, what a data discover run did.

    Parameters:
        output: The output result from the operator execution.
        input_data: The input data that was processed.
        attributes: The attributes used for the operation.

    Returns:
        Dictionary echoing `output`, `input_data` and `attributes`, plus an
        `explanation` string summarising the record type, search method, query,
        scope (when it is not the root '/') and the number of results.
    """
    record_type = attributes.get('concept_type', 'source')

    if attributes.get('use_hierarchical_search', True):
        method_label = "hierarchical"
    else:
        method_label = "regular"

    # Resolve the scope from the same attributes the search itself reads
    resolved_scope = _construct_scope(
        attributes.get('scope', None),
        attributes.get('source', None),
        attributes.get('database', None),
        attributes.get('collection', None),
        record_type,
        attributes.get('auto_construct_scope', True),
    )

    # The root scope is left out of the sentence
    if resolved_scope != '/':
        scope_clause = f" within scope '{resolved_scope}'"
    else:
        scope_clause = ""

    query_text = attributes.get('search_query', '')

    # Results are counted from the first (and only expected) output array
    if output and len(output) > 0:
        result_count = len(output[0])
    else:
        result_count = 0

    summary = (
        f"Data discover operator searched for {record_type} entities using {method_label} search "
        f"with query '{query_text}'{scope_clause} and returned {result_count} results."
    )

    return {
        'output': output,
        'input_data': input_data,
        'attributes': attributes,
        'explanation': summary,
    }

data_discover_operator_function(input_data, attributes, properties=None)

Discover data sources using the data registry with search capabilities.

Parameters:

Name Type Description Default
input_data List[List[Dict[str, Any]]]

List of JSON arrays (List[List[Dict[str, Any]]]), not used for discovery.

required
attributes Dict[str, Any]

Dictionary containing search parameters including search_query, approximate, hybrid, pagination settings, and scope information.

required
properties Dict[str, Any]

Optional properties dictionary containing data registry information. Defaults to None.

None

Returns:

Type Description
List[List[Dict[str, Any]]]

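A single-element list whose only item is the list of discovered records matching the search criteria; that inner list is empty when the data registry is unavailable or the search fails.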

Source code in blue/operators/data_discover_operator.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
def data_discover_operator_function(input_data: List[List[Dict[str, Any]]], attributes: Dict[str, Any], properties: Dict[str, Any] = None) -> List[List[Dict[str, Any]]]:
    """Discover data sources using the data registry with search capabilities.

    Two strategies are used:

    * Simple pagination (exact search, or `progressive_pagination` disabled):
      a single page is fetched; for approximate/hybrid search, results that
      carry a `score` above `threshold` are dropped.
    * Progressive pagination (approximate/hybrid search with
      `progressive_pagination` enabled): pages are fetched starting at `page`
      until a page is empty, a result's score exceeds `threshold`, or `limit`
      results have been collected.

    Parameters:
        input_data: List of JSON arrays (List[List[Dict[str, Any]]]), not used for discovery.
        attributes: Dictionary containing search parameters including search_query, approximate, hybrid, pagination settings, and scope information.
        properties: Optional properties dictionary containing data registry information. Defaults to None.

    Returns:
        A single-element list wrapping the list of discovered records, truncated
        to `limit` when `limit` is non-negative. `[[]]` is returned when the data
        registry cannot be obtained or when the search raises.
    """
    # Extract attributes
    search_query = attributes.get('search_query', '')
    approximate = attributes.get('approximate', True)
    hybrid = attributes.get('hybrid', False)
    limit = attributes.get('limit', -1)
    page = attributes.get('page', 0)
    page_size = attributes.get('page_size', 10)
    include_metadata = attributes.get('include_metadata', False)
    threshold = attributes.get('threshold', 0.5)
    progressive_pagination = attributes.get('progressive_pagination', False)
    concept_type = attributes.get('concept_type', 'source')
    use_hierarchical_search = attributes.get('use_hierarchical_search', True)
    scope = attributes.get('scope', None)
    source = attributes.get('source', None)
    database = attributes.get('database', None)
    collection = attributes.get('collection', None)
    auto_construct_scope = attributes.get('auto_construct_scope', True)
    # An explicit None means "no filter" / "no limit", like the defaults
    filter_names = attributes.get('filter_names') or []
    if limit is None:
        limit = -1

    search_scope = _construct_scope(scope, source, database, collection, concept_type, auto_construct_scope)

    data_registry = _get_data_registry_from_properties(properties)
    if not data_registry:
        logging.error("Error: Data registry not found")
        return [[]]

    has_limit = limit >= 0
    scored_search = approximate or hybrid
    results = []

    try:
        # Choose the search method based on use_hierarchical_search flag
        search_method = data_registry.search_records_hierarchical if use_hierarchical_search else data_registry.search_records

        def fetch_page(page_number):
            # The hierarchical search is not given the approximate/hybrid flags
            if use_hierarchical_search:
                return search_method(search_query, type=concept_type, scope=search_scope, page=page_number, page_size=page_size)
            return search_method(search_query, type=concept_type, scope=search_scope, approximate=approximate, hybrid=hybrid, page=page_number, page_size=page_size)

        if not scored_search or not progressive_pagination:
            # Simple pagination - single call
            for result in fetch_page(page):
                # filter names
                if result['name'] in filter_names:
                    continue

                # Apply threshold filtering for approximate/hybrid search even in simple
                # pagination mode; checked before transforming so rejected results cost nothing
                if scored_search and 'score' in result and not (float(result['score']) <= threshold):
                    continue

                results.append(_transform_result(result, concept_type, data_registry, include_metadata))
        else:
            # Progressive pagination - loop until threshold exceeded or limit reached
            current_page = page

            while True:
                search_results = fetch_page(current_page)

                if len(search_results) == 0:
                    break

                stop_searching = False
                for result in search_results:
                    # filter names
                    if result['name'] in filter_names:
                        continue

                    # Score exceeds threshold, stop searching (this page and all later ones)
                    if not (float(result['score']) <= threshold):
                        stop_searching = True
                        break

                    results.append(_transform_result(result, concept_type, data_registry, include_metadata))

                    # Enough results collected: further pages would only be sliced away
                    if has_limit and len(results) >= limit:
                        stop_searching = True
                        break

                # Also stop when the page's last result exceeded the threshold, which covers
                # the case where the exceeding results were skipped by the name filter
                if stop_searching or float(search_results[-1]['score']) > threshold:
                    break

                # Move to next page
                current_page += 1

    except Exception:
        # Best effort: any failure during search is logged and yields no results
        logging.error(traceback.format_exc())
        return [[]]

    # limit results
    if has_limit:
        return [results[:limit]]
    return [results]

data_discover_operator_validator(input_data, attributes, properties=None)

Validate data discover operator attributes.

Parameters:

Name Type Description Default
input_data List[List[Dict[str, Any]]]

List of JSON arrays (List[List[Dict[str, Any]]]) to validate.

required
attributes Dict[str, Any]

Dictionary containing operator attributes to validate.

required
properties Dict[str, Any]

Optional properties dictionary. Defaults to None.

None

Returns:

Type Description
bool

True if attributes are valid, False otherwise.

Source code in blue/operators/data_discover_operator.py
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
def data_discover_operator_validator(input_data: List[List[Dict[str, Any]]], attributes: Dict[str, Any], properties: Dict[str, Any] = None) -> bool:
    """Validate data discover operator attributes.

    Runs the default operator validation first, then checks that:

    * `search_query` is a non-blank string,
    * `page` is not negative and `page_size` is positive,
    * `threshold` lies within [0, 1],
    * `database` is only given together with `source`, and `collection` only
      together with both `source` and `database`,
    * `concept_type` is one of the supported record types.

    Wrongly-typed values make the validator return False rather than raise.

    Parameters:
        input_data: List of JSON arrays (List[List[Dict[str, Any]]]) to validate.
        attributes: Dictionary containing operator attributes to validate.
        properties: Optional properties dictionary. Defaults to None.

    Returns:
        True if attributes are valid, False otherwise.
    """
    # Any failure of the default validation (including an exception) is a rejection
    try:
        if not default_operator_validator(input_data, attributes, properties):
            return False
    except Exception:
        return False

    search_query = attributes.get('search_query', '')
    page = attributes.get('page', 0)
    page_size = attributes.get('page_size', 10)
    threshold = attributes.get('threshold', 0.5)
    source = attributes.get('source', None)
    database = attributes.get('database', None)
    collection = attributes.get('collection', None)

    # A non-string query cannot be searched for; reject instead of failing on .strip()
    if not isinstance(search_query, str) or not search_query.strip():
        return False

    try:
        if page < 0 or page_size <= 0:
            return False
        # Written as a chained comparison so that NaN is rejected as well
        if not (0 <= threshold <= 1):
            return False
    except TypeError:
        # Non-numeric values (e.g. None or a string) are invalid, not an error
        return False

    # Scope parts are hierarchical: a database needs a source, a collection needs both
    if database and not source:
        return False
    if collection and (not source or not database):
        return False

    concept_type = attributes.get('concept_type', 'source')
    valid_concept_types = ['source', 'database', 'collection', 'entity', 'relation', 'attribute']
    if concept_type not in valid_concept_types:
        return False

    return True
Last update: 2025-10-08