Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ConnectionResetError on long running service #26

Open
nuthanmunaiah opened this issue Jun 26, 2019 · 2 comments
Open

ConnectionResetError on long running service #26

nuthanmunaiah opened this issue Jun 26, 2019 · 2 comments

Comments

@nuthanmunaiah
Copy link

Hello,

I'm a PhD student working on metrics to assist developers in discovering security vulnerabilities in software. I am using Nameko to develop a number of microservices to collect these metrics from large projects. I am migrating the services I have developed so far to use the nameko-grpc extension but I encountered a ConnectionResetError from one of the services. I have included two source code files (service.py and client.py) below which shows customized code to demonstrate the implementation. trace.txt contains the stack trace from the service when the exception occurred.

The service in question is an implementation of the collaboration centrality metric. The service depends on another nameko-grpc service (called repository) which stream changes (commits) from a git repository.

The exception seems to be thrown only when the service takes a long time to start streaming the results. Is there something I am missing in the implementation?

Thank you,
Nuthan Munaiah

service.py

import logging

import graph_tool
from graph_tool.centrality import betweenness

from nameko.dependency_providers import Config
from nameko_grpc.entrypoint import Grpc
from nameko_grpc.dependency_provider import GrpcProxy

from .centrality_pb2 import CentralityResponse
from .centrality_pb2_grpc import centralityStub
from .repository_pb2 import RepositoryRequest
from .repository_pb2_grpc import repositoryStub

logger = logging.getLogger(__name__)
grpc = Grpc.implementing(centralityStub)

def _build_graph(changes):
    graph = graph_tool.Graph()
    for change in changes:
        graph.add_edge(change.developer, change.path)
    logger.debug('# Nodes: %s', graph.num_vertices())
    logger.debug('# Edges: %s', graph.num_edges())   
    return graph

def _get_centrality(changes):
    graph = _build_graph(changes)
    _, centralities = betweenness(graph)
    for edge, centrality in centralities.items():
        yield CentralityResponse(edge=edge, centrality=centrality)

class CentralityService:
    name = 'centrality'

    config = Config()
    repository_grpc = GrpcProxy('//repository', repositoryStub)

    @grpc
    def get(self, request, context):
        repository_request = RepositoryRequest(project=request.project)
        changes = self.repository_grpc.get_changes(repository_request)
        for centrality in _get_centrality(changes):
            yield centrality

client.py

from nameko_grpc.client import Client

from .centrality_pb2 import CentralityRequest
from .centrality_pb2_grpc import centralityStub

def main():
    request = CentralityRequest(project='firefox')
    with Client('//127.0.0.1', centralityStub) as client:
        response = client.get(request)
        for centrality in response:
            print(centrality.edge, centrality.centrality)

if __name__ == '__main__':
    main()
@mattbennett
Copy link
Member

Thank you for reporting this @nuthanmunaiah. The problem is that the nameko-grpc entrypoint does not protect against errors on the underlying socket. Really we should catch it and attempt to re-establish the connection.

@nuthanmunaiah
Copy link
Author

Thanks so much for taking a look at this. I'll attempt a fix and open a pull request when I have one.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants