Skip to content

Ray client

Bases: RayToolClient, OperatorClient

Ray client for operators; it inherits from both RayToolClient and OperatorClient.

Source code in blue/operators/clients/ray_client.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
class RayOperatorClient(RayToolClient, OperatorClient):
    """Operator-facing Ray client built on RayToolClient and OperatorClient.

    Operators are handed to the parent initializer as its ``tools``, and most
    operator methods simply delegate to the matching tool method.
    """

    def __init__(self, name, operators={}, properties={}):
        """Set up the client, registering the operators as the parent's tools.

        Parameters:
            name: Name of the ray operator client.
            operators: Operators to include, given as a dictionary of Operator objects. Defaults to {}.
            properties: Properties for the client. Defaults to {}.
        """
        super().__init__(name, properties=properties, tools=operators)

    # ---- operator discovery
    def fetch_operators(self):
        """List the operators available through this ray client.

        Delegates to ``fetch_tools()``.

        Returns:
            List of available operators.
        """
        available = self.fetch_tools()
        return available

    def fetch_operator_metadata(self, operator):
        """Look up the metadata of a single operator.

        Delegates to ``fetch_tool_metadata()``.

        Parameters:
            operator: Name of the operator.

        Returns:
            Dictionary containing operator metadata.
        """
        metadata = self.fetch_tool_metadata(operator)
        return metadata

    # ---- operator execution
    def execute_operator(self, operator, args, kwargs):
        """Run an operator with the supplied arguments.

        Delegates to ``execute_tool()``, passing all three values positionally.

        Parameters:
            operator: Name of the operator.
            args: Arguments for the operator.
            kwargs: Keyword arguments for the operator.

        Returns:
            Result of the operator execution
        """
        outcome = self.execute_tool(operator, args, kwargs)
        return outcome

    # ---- operator refinement
    def refine_operator(self, operator, args, kwargs):
        """Ask an operator's refiner for possible refinements.

        The operator's ``refiner`` is wrapped with ``ray.remote`` and invoked
        with ``kwargs`` only; ``args`` is accepted but not forwarded. The call
        waits on ``ray.get`` for the result.

        Parameters:
            operator: Name of the operator.
            args: Arguments for the operator (not passed to the refiner).
            kwargs: Keyword arguments for the operator, forwarded to the refiner.

        Raises:
            Exception: No operator matching

        Returns:
            List of possible refinements as DataPipeline objects, or an empty
            list when the operator is not among this client's tools.
        """
        if operator is None:
            raise Exception("No operator matching...")

        # Unknown operators yield no refinements rather than an error.
        if operator not in self.tools:
            return []

        refiner_task = ray.remote(self.tools[operator].refiner)
        pending = refiner_task.remote(**kwargs)

        if not pending:
            return []
        return ray.get(pending)

    def get_operator_attributes(self, operator):
        """Return the attributes reported by a specific operator.

        Parameters:
            operator: Name of the operator.

        Raises:
            Exception: No operator matching

        Returns:
            The value of the operator's ``get_attributes()``, or an empty
            dictionary when the operator is not among this client's tools.
        """
        if operator is None:
            raise Exception("No operator matching...")

        if operator not in self.tools:
            return {}

        return self.tools[operator].get_attributes()

__init__(name, operators={}, properties={})

Initialize the RayOperatorClient.

Parameters:

Name Type Description Default
name

Name of the ray operator client.

required
operators

Operators to be included, given as a dictionary of Operator objects. Defaults to {}.

{}
properties

Properties for the client. Defaults to {}.

{}
Source code in blue/operators/clients/ray_client.py
22
23
24
25
26
27
28
29
30
def __init__(self, name, operators={}, properties={}):
    """Set up the client, registering the operators as the parent's tools.

    Parameters:
        name: Name of the ray operator client.
        operators: Operators to include, given as a dictionary of Operator objects. Defaults to {}.
        properties: Properties for the client. Defaults to {}.
    """
    super().__init__(name, properties=properties, tools=operators)

execute_operator(operator, args, kwargs)

Execute a specific operator with given arguments.

Parameters:

Name Type Description Default
operator

Name of the operator.

required
args

Arguments for the operator.

required
kwargs

Keyword arguments for the operator.

required

Returns:

Type Description

Result of the operator execution

Source code in blue/operators/clients/ray_client.py
53
54
55
56
57
58
59
60
61
62
63
64
def execute_operator(self, operator, args, kwargs):
    """Run an operator with the supplied arguments.

    Delegates to ``execute_tool()``, passing all three values positionally.

    Parameters:
        operator: Name of the operator.
        args: Arguments for the operator.
        kwargs: Keyword arguments for the operator.

    Returns:
        Result of the operator execution
    """
    outcome = self.execute_tool(operator, args, kwargs)
    return outcome

fetch_operator_metadata(operator)

Fetch metadata for a specific operator from ray client.

Parameters:

Name Type Description Default
operator

Name of the operator.

required

Returns:

Type Description

Dictionary containing operator metadata.

Source code in blue/operators/clients/ray_client.py
41
42
43
44
45
46
47
48
49
50
def fetch_operator_metadata(self, operator):
    """Look up the metadata of a single operator.

    Delegates to ``fetch_tool_metadata()``.

    Parameters:
        operator: Name of the operator.

    Returns:
        Dictionary containing operator metadata.
    """
    metadata = self.fetch_tool_metadata(operator)
    return metadata

fetch_operators()

Fetch available operators from ray client.

Returns:

Type Description

List of available operators.

Source code in blue/operators/clients/ray_client.py
33
34
35
36
37
38
39
def fetch_operators(self):
    """List the operators available through this ray client.

    Delegates to ``fetch_tools()``.

    Returns:
        List of available operators.
    """
    available = self.fetch_tools()
    return available

get_operator_attributes(operator)

Get attributes of a specific operator.

Parameters:

Name Type Description Default
operator

Name of the operator.

required

Raises:

Type Description
Exception

No operator matching

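Returns:

The value returned by the operator's get_attributes(), or an empty dictionary if the operator is not among the client's tools.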

Source code in blue/operators/clients/ray_client.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
def get_operator_attributes(self, operator):
    """Return the attributes reported by a specific operator.

    Parameters:
        operator: Name of the operator.

    Raises:
        Exception: No operator matching

    Returns:
        The value of the operator's ``get_attributes()``, or an empty
        dictionary when the operator is not among this client's tools.
    """
    if operator is None:
        raise Exception("No operator matching...")

    if operator not in self.tools:
        return {}

    return self.tools[operator].get_attributes()

refine_operator(operator, args, kwargs)

Refine the operator based on the given arguments and return a list of possible refinements as DataPipeline objects.

Parameters:

Name Type Description Default
operator

Name of the operator.

required
args

Arguments for the operator.

required
kwargs

Keyword arguments for the operator.

required

Raises:

Type Description
Exception

No operator matching

Returns:

Type Description

List of possible refinements as DataPipeline objects.

Source code in blue/operators/clients/ray_client.py
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
def refine_operator(self, operator, args, kwargs):
    """Ask an operator's refiner for possible refinements.

    The operator's ``refiner`` is wrapped with ``ray.remote`` and invoked
    with ``kwargs`` only; ``args`` is accepted but not forwarded. The call
    waits on ``ray.get`` for the result.

    Parameters:
        operator: Name of the operator.
        args: Arguments for the operator (not passed to the refiner).
        kwargs: Keyword arguments for the operator, forwarded to the refiner.

    Raises:
        Exception: No operator matching

    Returns:
        List of possible refinements as DataPipeline objects, or an empty
        list when the operator is not among this client's tools.
    """
    if operator is None:
        raise Exception("No operator matching...")

    # Unknown operators yield no refinements rather than an error.
    if operator not in self.tools:
        return []

    refiner_task = ray.remote(self.tools[operator].refiner)
    pending = refiner_task.remote(**kwargs)

    if not pending:
        return []
    return ray.get(pending)
Last update: 2025-10-08