feat: Integrate groot into flex architecture (#3566)
## What do these changes do?
- Flex Coordinator: support Groot storage, including operations on graphs, schemas, and data loading jobs.
- gsctl: add the `create vtype/etype`, `create schema`, and `delete vtype/etype` commands:
```bash
gsctl create vtype/etype -f /path/to/file
gsctl delete vtype person
gsctl delete etype knows -s person -d person
gsctl create schema -f /home/graphscope/alibaba/test/groot/schema.yaml
```
- Add a batch interface for updating vertex/edge properties in Groot (a usage sketch follows the snippet below):
```python
def update_vertex_properties_batch(self, vertices: list):
    request = to_write_requests_pb("VERTEX", vertices, write_service_pb2.UPDATE)
    return self._conn.batch_write(request)

def update_edge_properties_batch(self, edges: list):
    request = to_write_requests_pb("EDGE", edges, write_service_pb2.UPDATE)
    return self._conn.batch_write(request)
```
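
A minimal usage sketch of the new batch-update interface. It assumes the records follow the same `[VertexRecordKey, properties]` / `[EdgeRecordKey, properties]` pairs accepted by the existing `insert_vertices`/`insert_edges` helpers; the connection address, credentials, labels, and property values below are placeholders, not part of this change:

```python
# Hedged usage sketch -- record layout, address, and credentials are assumptions.
import graphscope
from graphscope.framework.record import EdgeRecordKey, VertexRecordKey

conn = graphscope.conn("localhost:55556", username="root", password="root")
g = conn.g()

# Update the `name` property of two existing person vertices in one request.
vertices = [
    [VertexRecordKey("person", {"id": 1}), {"name": "Alice"}],
    [VertexRecordKey("person", {"id": 2}), {"name": "Bob"}],
]
snapshot_id = g.update_vertex_properties_batch(vertices)

# Update the `weight` property of an existing `knows` edge in one request.
edges = [
    [
        EdgeRecordKey(
            "knows",
            VertexRecordKey("person", {"id": 1}),
            VertexRecordKey("person", {"id": 2}),
        ),
        {"weight": 0.8},
    ]
]
snapshot_id = g.update_edge_properties_batch(edges)
```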
- Integrate the Coordinator into the Groot image; the Flex Groot (Insight) image can now be built with the `gsctl flexbuild` command:
```bash
gsctl flexbuild insight --app docker
```

Docs: documentation will be added later, once the final format of `gsctl` is determined.
lidongze0629 authored Mar 15, 2024
1 parent 1ebbde6 commit e283fa4
Showing 61 changed files with 8,122 additions and 453 deletions.
5 changes: 5 additions & 0 deletions flex/coordinator/.openapi-generator/FILES
@@ -15,9 +15,11 @@ gs_flex_coordinator/models/base_model.py
gs_flex_coordinator/models/column_mapping.py
gs_flex_coordinator/models/connection.py
gs_flex_coordinator/models/connection_status.py
+ gs_flex_coordinator/models/data_source.py
gs_flex_coordinator/models/deployment_info.py
gs_flex_coordinator/models/deployment_info_graphs_info_value.py
gs_flex_coordinator/models/deployment_status.py
+ gs_flex_coordinator/models/edge_data_source.py
gs_flex_coordinator/models/edge_mapping.py
gs_flex_coordinator/models/edge_mapping_destination_vertex_mappings_inner.py
gs_flex_coordinator/models/edge_mapping_source_vertex_mappings_inner.py
@@ -28,6 +30,8 @@ gs_flex_coordinator/models/edge_type_vertex_type_pair_relations_inner.py
gs_flex_coordinator/models/edge_type_vertex_type_pair_relations_inner_x_csr_params.py
gs_flex_coordinator/models/graph.py
gs_flex_coordinator/models/graph_stored_procedures.py
+ gs_flex_coordinator/models/groot_dataloading_job_config.py
+ gs_flex_coordinator/models/groot_dataloading_job_config_edges_inner.py
gs_flex_coordinator/models/groot_edge_type.py
gs_flex_coordinator/models/groot_edge_type_relations_inner.py
gs_flex_coordinator/models/groot_graph.py
@@ -50,6 +54,7 @@ gs_flex_coordinator/models/service_status.py
gs_flex_coordinator/models/service_status_sdk_endpoints.py
gs_flex_coordinator/models/start_service_request.py
gs_flex_coordinator/models/update_alert_messages_request.py
+ gs_flex_coordinator/models/vertex_data_source.py
gs_flex_coordinator/models/vertex_mapping.py
gs_flex_coordinator/models/vertex_type.py
gs_flex_coordinator/openapi/openapi.yaml
@@ -0,0 +1,161 @@
import connexion
from typing import Dict
from typing import Tuple
from typing import Union

from gs_flex_coordinator.core import client_wrapper
from gs_flex_coordinator.core import handle_api_exception
from gs_flex_coordinator.models.data_source import DataSource # noqa: E501
from gs_flex_coordinator.models.edge_data_source import EdgeDataSource # noqa: E501
from gs_flex_coordinator.models.vertex_data_source import VertexDataSource # noqa: E501
from gs_flex_coordinator import util


@handle_api_exception()
def bind_edge_datasource(graph_name, edge_data_source): # noqa: E501
"""bind_edge_datasource
Bind data source on edge type # noqa: E501
:param graph_name:
:type graph_name: str
:param edge_data_source:
:type edge_data_source: dict | bytes
:rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]]
"""
if connexion.request.is_json:
edge_data_source = EdgeDataSource.from_dict(connexion.request.get_json()) # noqa: E501
return client_wrapper.bind_edge_datasource(graph_name, edge_data_source)


@handle_api_exception()
def bind_vertex_datasource(graph_name, vertex_data_source): # noqa: E501
"""bind_vertex_datasource
Bind data source on vertex type # noqa: E501
:param graph_name:
:type graph_name: str
:param vertex_data_source:
:type vertex_data_source: dict | bytes
:rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]]
"""
if connexion.request.is_json:
vertex_data_source = VertexDataSource.from_dict(connexion.request.get_json()) # noqa: E501
return client_wrapper.bind_vertex_datasource(graph_name, vertex_data_source)


@handle_api_exception()
def get_datasource(graph_name): # noqa: E501
"""get_datasource
List data source on graph # noqa: E501
:param graph_name:
:type graph_name: str
:rtype: Union[DataSource, Tuple[DataSource, int], Tuple[DataSource, int, Dict[str, str]]
"""
return client_wrapper.get_datasource(graph_name)


@handle_api_exception()
def get_edge_datasource(graph_name, type_name, source_vertex_type, destination_vertex_type): # noqa: E501
"""get_edge_datasource
Get edge data source # noqa: E501
:param graph_name:
:type graph_name: str
:param type_name:
:type type_name: str
:param source_vertex_type:
:type source_vertex_type: str
:param destination_vertex_type:
:type destination_vertex_type: str
:rtype: Union[EdgeDataSource, Tuple[EdgeDataSource, int], Tuple[EdgeDataSource, int, Dict[str, str]]
"""
return client_wrapper.get_edge_datasource(
graph_name,
type_name,
source_vertex_type,
destination_vertex_type
)


@handle_api_exception()
def get_vertex_datasource(graph_name, type_name): # noqa: E501
"""get_vertex_datasource
Get vertex data source # noqa: E501
:param graph_name:
:type graph_name: str
:param type_name:
:type type_name: str
:rtype: Union[VertexDataSource, Tuple[VertexDataSource, int], Tuple[VertexDataSource, int, Dict[str, str]]
"""
return client_wrapper.get_vertex_datasource(graph_name, type_name)


@handle_api_exception()
def import_datasource(graph_name, data_source): # noqa: E501
"""import_datasource
Import data source in batch # noqa: E501
:param graph_name:
:type graph_name: str
:param data_source:
:type data_source: dict | bytes
:rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]]
"""
if connexion.request.is_json:
data_source = DataSource.from_dict(connexion.request.get_json()) # noqa: E501
return client_wrapper.import_datasource(graph_name, data_source)


@handle_api_exception()
def unbind_edge_datasource(graph_name, type_name, source_vertex_type, destination_vertex_type): # noqa: E501
"""unbind_edge_datasource
Unbind datasource on an edge type # noqa: E501
:param graph_name:
:type graph_name: str
:param type_name:
:type type_name: str
:param source_vertex_type:
:type source_vertex_type: str
:param destination_vertex_type:
:type destination_vertex_type: str
:rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]]
"""
return client_wrapper.unbind_edge_datasource(
graph_name,
type_name,
source_vertex_type,
destination_vertex_type
)


@handle_api_exception()
def unbind_vertex_datasource(graph_name, type_name): # noqa: E501
"""unbind_vertex_datasource
Unbind datasource on a vertex type # noqa: E501
:param graph_name:
:type graph_name: str
:param type_name:
:type type_name: str
:rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]]
"""
return client_wrapper.unbind_vertex_datasource(graph_name, type_name)
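
For context, a hedged sketch of how a client might call the new data-source binding endpoint. The coordinator address, the route, and the `EdgeDataSource` field names are assumptions inferred from the controller signatures; the authoritative definitions live in `gs_flex_coordinator/openapi/openapi.yaml`:

```python
# Hypothetical client-side sketch -- the endpoint path and the payload field
# names are assumptions, not taken from openapi.yaml.
import requests

COORDINATOR = "http://localhost:8080"  # assumed coordinator address
GRAPH = "demo_graph"                   # assumed graph name

edge_data_source = {
    "type_name": "knows",
    "source_vertex": "person",
    "destination_vertex": "person",
    "location": "/home/graphscope/person_knows_person.csv",  # assumed CSV path
}

resp = requests.post(
    f"{COORDINATOR}/api/v1/graph/{GRAPH}/datasource/edge_datasource",
    json=edge_data_source,
)
resp.raise_for_status()
print(resp.text)  # coordinator acknowledgement
```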
@@ -27,7 +27,7 @@ def create_edge_type(graph_name, edge_type): # noqa: E501
"""
if connexion.request.is_json:
edge_type = EdgeType.from_dict(connexion.request.get_json()) # noqa: E501
- return 'do some magic!'
+ return client_wrapper.create_edge_type(graph_name, edge_type)


@handle_api_exception()
@@ -61,7 +61,7 @@ def create_vertex_type(graph_name, vertex_type): # noqa: E501
"""
if connexion.request.is_json:
vertex_type = VertexType.from_dict(connexion.request.get_json()) # noqa: E501
- return 'do some magic!'
+ return client_wrapper.create_vertex_type(graph_name, vertex_type)


@handle_api_exception()
@@ -81,7 +81,9 @@ def delete_edge_type(graph_name, type_name, source_vertex_type, destination_vert
:rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]]
"""
- return 'do some magic!'
+ return client_wrapper.delete_edge_type(
+     graph_name, type_name, source_vertex_type, destination_vertex_type
+ )


@handle_api_exception()
Expand Down Expand Up @@ -111,7 +113,7 @@ def delete_vertex_type(graph_name, type_name): # noqa: E501
:rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]]
"""
- return 'do some magic!'
+ return client_wrapper.delete_vertex_type(graph_name, type_name)


@handle_api_exception()
14 changes: 14 additions & 0 deletions flex/coordinator/gs_flex_coordinator/controllers/job_controller.py
@@ -66,3 +66,17 @@ def list_jobs(): # noqa: E501
:rtype: Union[List[JobStatus], Tuple[List[JobStatus], int], Tuple[List[JobStatus], int, Dict[str, str]]
"""
return client_wrapper.list_jobs()


@handle_api_exception()
def get_dataloading_config(graph_name): # noqa: E501
"""get_dataloading_config
get dataloading configuration # noqa: E501
:param graph_name:
:type graph_name: str
:rtype: Union[List[SchemaMapping], Tuple[List[SchemaMapping], int], Tuple[List[SchemaMapping], int, Dict[str, str]]
"""
return client_wrapper.get_dataloading_config(graph_name)
@@ -5,12 +5,32 @@

from gs_flex_coordinator.core import client_wrapper
from gs_flex_coordinator.core import handle_api_exception
from gs_flex_coordinator.models.groot_dataloading_job_config import GrootDataloadingJobConfig # noqa: E501
from gs_flex_coordinator.models.groot_graph import GrootGraph # noqa: E501
from gs_flex_coordinator.models.groot_schema import GrootSchema # noqa: E501
from gs_flex_coordinator.models.schema_mapping import SchemaMapping # noqa: E501
from gs_flex_coordinator import util


def create_groot_dataloading_job(graph_name, groot_dataloading_job_config): # noqa: E501
"""create_groot_dataloading_job
# noqa: E501
:param graph_name:
:type graph_name: str
:param groot_dataloading_job_config:
:type groot_dataloading_job_config: dict | bytes
:rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]]
"""
if connexion.request.is_json:
groot_dataloading_job_config = GrootDataloadingJobConfig.from_dict(connexion.request.get_json()) # noqa: E501
return client_wrapper.create_groot_dataloading_job(
graph_name, groot_dataloading_job_config
)


@handle_api_exception()
def get_groot_schema(graph_name): # noqa: E501
"""get_groot_schema
@@ -22,11 +42,11 @@ def get_groot_schema(graph_name): # noqa: E501
:rtype: Union[GrootSchema, Tuple[GrootSchema, int], Tuple[GrootSchema, int, Dict[str, str]]
"""
- return 'do some magic!'
+ return client_wrapper.get_groot_schema(graph_name)


@handle_api_exception()
- def import_schema(graph_name, groot_schema): # noqa: E501
+ def import_groot_schema(graph_name, groot_schema): # noqa: E501
"""import_schema
Import schema to groot graph # noqa: E501
@@ -40,7 +60,7 @@ def import_schema(graph_name, groot_schema): # noqa: E501
"""
if connexion.request.is_json:
groot_schema = GrootSchema.from_dict(connexion.request.get_json()) # noqa: E501
- return 'do some magic!'
+ return client_wrapper.import_groot_schema(graph_name, groot_schema)


@handle_api_exception()
@@ -52,4 +72,4 @@ def list_groot_graph(): # noqa: E501
:rtype: Union[List[GrootGraph], Tuple[List[GrootGraph], int], Tuple[List[GrootGraph], int, Dict[str, str]]
"""
- return 'do some magic!'
+ return client_wrapper.list_groot_graph()
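
For illustration, a hedged sketch of the kind of payload `create_groot_dataloading_job` might accept. Field names are inferred from the generated model names (`GrootDataloadingJobConfig`, `groot_dataloading_job_config_edges_inner`) and should be treated as assumptions until checked against `openapi.yaml`:

```python
# Hypothetical request-body sketch -- field names are assumptions inferred
# from the generated model names, not confirmed against openapi.yaml.
dataloading_job_config = {
    "vertices": ["person"],  # vertex types whose bound data sources should be loaded
    "edges": [
        {
            "type_name": "knows",
            "source_vertex": "person",
            "destination_vertex": "person",
        }
    ],
}

# The controller would deserialize this into GrootDataloadingJobConfig and
# forward it, e.g.:
# client_wrapper.create_groot_dataloading_job(
#     "demo_graph", GrootDataloadingJobConfig.from_dict(dataloading_job_config)
# )
```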
7 changes: 7 additions & 0 deletions flex/coordinator/gs_flex_coordinator/core/__init__.py
@@ -16,5 +16,12 @@
# limitations under the License.
#

import logging

logging.basicConfig(
format="%(asctime)s [%(levelname)s][%(module)s:%(lineno)d]: %(message)s",
level=logging.INFO,
)

from gs_flex_coordinator.core.client_wrapper import client_wrapper
from gs_flex_coordinator.core.utils import handle_api_exception
18 changes: 10 additions & 8 deletions flex/coordinator/gs_flex_coordinator/core/alert/alert_manager.py
@@ -23,15 +23,15 @@
from typing import List, Union

from gs_flex_coordinator.core.alert.alert_receiver import DingTalkReceiver
- from gs_flex_coordinator.core.alert.builtin_rules import init_builtin_alert_rules
- from gs_flex_coordinator.core.alert.message_collector import AlertMessageCollector
+ from gs_flex_coordinator.core.alert.builtin_rules import \
+     init_builtin_alert_rules
+ from gs_flex_coordinator.core.alert.message_collector import \
+     AlertMessageCollector
from gs_flex_coordinator.core.config import ALERT_WORKSPACE
from gs_flex_coordinator.core.scheduler import schedule
from gs_flex_coordinator.core.utils import decode_datetimestr, encode_datetime
from gs_flex_coordinator.models import AlertMessage, AlertReceiver, AlertRule

- logger = logging.getLogger("graphscope")


class AlertManager(object):
def __init__(self):
@@ -56,18 +56,20 @@ def __init__(self):
def _try_to_recover_from_disk(self):
try:
if os.path.exists(self._receiver_path):
logger.info("Recover alert receiver from file: %s", self._receiver_path)
logging.info(
"Recover alert receiver from file: %s", self._receiver_path
)
with open(self._receiver_path, "rb") as f:
self._receivers = pickle.load(f)
except Exception as e:
logger.warn("Failed to recover alert receiver: %s", str(e))
logging.warn("Failed to recover alert receiver: %s", str(e))

def _pickle_receiver_impl(self):
try:
with open(self._receiver_path, "wb") as f:
pickle.dump(self._receivers, f)
except Exception as e:
logger.warn("Failed to dump receiver: %s", str(e))
logging.warn("Failed to dump receiver: %s", str(e))

def list_alert_rules(self) -> List[AlertRule]:
rlt = []
@@ -114,7 +116,7 @@ def list_alert_messages(
# date -> date
start_date_filter = decode_datetimestr(start_time)
end_date_filter = decode_datetimestr(end_time)
- logger.info(
+ logging.info(
"Fetch alert messages from %s to %s",
encode_datetime(start_date_filter),
encode_datetime(end_date_filter),
@@ -20,12 +20,11 @@
import logging

import requests

from gs_flex_coordinator.core.alert.alert_message import AlertMessage
from gs_flex_coordinator.core.config import INSTANCE_NAME
from gs_flex_coordinator.core.utils import random_string

- logger = logging.getLogger("graphscope")


class DingTalkReceiver(object):
"""DingTalk webhook receiver."""
@@ -117,5 +116,5 @@ def send(self, message: AlertMessage):
raise RuntimeError(str(rlt))

except Exception as e:
logger.warn("Failed to send dingtalk: %s", str(e))
logging.warn("Failed to send dingtalk: %s", str(e))
self._error_msg = str(e)