
[WebServerPlugin ] Add support for Asynchronous response streaming #289

Closed
ahmedtalhakhan opened this issue Feb 4, 2020 · 8 comments · Fixed by #675
ahmedtalhakhan commented Feb 4, 2020

Describe the bug
WebServerPlugin does not render all data from upstream in a timely manner

To Reproduce
Steps to reproduce the behavior:

  1. Run proxy.py as '--enable-webserver --plugin proxy.plugin.WebServerPlugin'
  2. Apply the modifications to the WebServerPlugin described in the additional context section
  3. Do a curl to trigger error
  4. See error

Expected behavior
All response data from upstream should come through in a timely fashion. Instead, the socket is held by the proxy/plugin for some time even after the upstream data has been read, and there is a delay in sending the data back to the client.

Version information

  • OS: macOS Catalina
  • Browser: curl
  • Device: Mac
  • proxy.py Version [e.g. 1.1.1]

Additional context
The following changes have been made to the WebServerPlugin. Note that the route contains a *, which is supposed to match all paths.

The upstream server is hosted at local port 5678 and hosts a plain css file.

The modifications are inspired by the reverse_proxy plugin, which reaches out to an upstream server. Note that these modifications are required for this plugin to process all data correctly: the original code in the reverse_proxy plugin does a single conn.recv call, which is not enough to handle larger responses.

from typing import List, Tuple
from urllib import parse as urlparse

from proxy.common.constants import DEFAULT_BUFFER_SIZE, DEFAULT_HTTP_PORT
from proxy.common.utils import socket_connection, text_
from proxy.http.parser import HttpParser
from proxy.http.server import HttpWebServerBasePlugin, httpProtocolTypes


class WebServerPlugin(HttpWebServerBasePlugin):
    """Demonstrates inbuilt web server routing using plugin."""

    def routes(self) -> List[Tuple[int, str]]:
        return [
            (httpProtocolTypes.HTTP, r'/*'),
        ]

    def handle_request(self, request: HttpParser) -> None:
        upstream = b'http://localhost:5678'

        url = urlparse.urlsplit(upstream)
        assert url.hostname

        with socket_connection((text_(url.hostname), url.port if url.port else DEFAULT_HTTP_PORT)) as conn:
            while True:
                print("I am reading data")
                data = conn.recv(DEFAULT_BUFFER_SIZE)
                print(len(data))
                self.client.queue(memoryview(data))
                print("pushed data")
                if not data:
                    print("breaking from loop")
                    break

Now run the following curl command. Note that the CSS file is just a random CSS file of size 268K.

curl -v http://localhost:8899/css/app.cff42e0c.css

The output of the curl command is the following. Note that the transfer took upwards of 5 seconds, whereas the code has no such delay or sleep. Running the code with the debug prints makes it evident that data is read from the upstream socket very quickly, but is not returned to the client for some time.

curl -v http://localhost:8899/css/app.cff42e0c.css >o
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0*   Trying ::1...
* TCP_NODELAY set
* Connected to localhost (::1) port 8899 (#0)
> GET /css/app.cff42e0c.css HTTP/1.1
> Host: localhost:8899
> User-Agent: curl/7.64.1
> Accept: */*
> 
  0     0    0     0    0     0      0      0 --:--:--  0:00:04 --:--:--     0< HTTP/1.1 200 OK
< X-Powered-By: Express
< Access-Control-Allow-Origin: http://localhost:5678
< Access-Control-Allow-Methods: GET, POST, OPTIONS, PUT, PATCH, DELETE
< Access-Control-Allow-Headers: Origin, X-Requested-With, Content-Type, Accept, sessionid
< Accept-Ranges: bytes
< Cache-Control: public, max-age=0
< Last-Modified: Sat, 01 Feb 2020 23:47:30 GMT
< ETag: W/"43093-1700326f6c3"
< Content-Type: text/css; charset=UTF-8
< Content-Length: 274579
< Vary: Accept-Encoding
< Date: Tue, 04 Feb 2020 21:04:27 GMT
< Connection: keep-alive
< 
{ [54576 bytes data]
100  268k  100  268k    0     0  54697      0  0:00:05  0:00:05 --:--:-- 72030
* Connection #0 to host localhost left intact
* Closing connection 0

@ahmedtalhakhan ahmedtalhakhan added the Bug Bug report in proxy server label Feb 4, 2020
@abhinavsingh abhinavsingh added Enhancement and removed Bug Bug report in proxy server labels Feb 4, 2020

abhinavsingh commented Feb 4, 2020

Hi @ahmedtalhakhan thanks for reporting such a scenario.

I think what's happening here is that the response is being queued but never flushed, since you have a while True loop within handle_request. This blocks the proxy.py core on handle_request, preventing it from flushing the buffer to the client even when the client is ready to receive the response.

What you will have to do is make receiving from the upstream socket non-blocking, so that the proxy.py core can continue flushing the client buffer. Then you can call client.queue whenever data is received from the upstream server.

I have marked this ticket as enhancement as it's not really a bug :) But we should be able to create ways to handle such scenarios (more efficiently).
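To illustrate the non-blocking idea, here is a minimal generic sketch (not proxy.py API; `recv_ready` is a hypothetical helper name): instead of blocking on conn.recv inside handle_request, poll the upstream socket and return immediately when nothing is pending, so the core loop stays free to flush the client buffer.

```python
import selectors
import socket


def recv_ready(conn: socket.socket, bufsize: int = 4096, timeout: float = 0.0) -> bytes:
    """Return whatever is readable right now (b'' if nothing), never blocking."""
    sel = selectors.DefaultSelector()
    sel.register(conn, selectors.EVENT_READ)
    try:
        # timeout=0.0 makes select a pure poll: it returns immediately.
        events = sel.select(timeout=timeout)
        if not events:
            return b''
        return conn.recv(bufsize)
    finally:
        sel.unregister(conn)
        sel.close()
```

Inside a plugin, each time the core ticks you would call this helper and queue any data received (e.g. `self.client.queue(memoryview(data))`), then hand control back to the core instead of looping.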

@ahmedtalhakhan

@abhinavsingh that makes sense. But how do I make receiving from upstream non-blocking in the current scenario? If I return from handle_request, the context of the request will be lost. How will I re-enter the flow again? Also, there is no callback mechanism here.


abhinavsingh commented Feb 5, 2020

When you return from handle_request, the request context is not gone. Every plugin object in proxy.py handles a single request, and the request context remains valid for the plugin object until the connection is closed by the server or client (or times out). handle_request is just a lifecycle event, signaling to the plugin: "Hey, the request is complete, now you can process it."

  1. Here web server core calls handle_request callback https://github.com/abhinavsingh/proxy.py/blob/develop/proxy/http/server/web.py#L155-L156

  2. As you can see, it returns False by default, which signals to the underlying event loop: "Hey, don't tear down the connection just yet."

So it should work, but I haven't tried it before (for non-websocket connections). Can you try asynchronous delivery of responses? For a quick proof-of-concept, ignore non-blocking sockets and simply try to dispatch a response chunk every second asynchronously and lemme know how it goes.

Thank you!!!
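The proof-of-concept suggested above could look roughly like this sketch (everything here is hypothetical, not proxy.py API; `FakeClient` only mimics the `queue` signature discussed in this thread, and `stream_chunks` is an illustrative helper):

```python
import threading
import time


class FakeClient:
    """Stand-in for the plugin's self.client; only queue() is assumed."""

    def __init__(self) -> None:
        self.chunks = []
        self.lock = threading.Lock()

    def queue(self, data: memoryview) -> None:
        with self.lock:
            self.chunks.append(bytes(data))


def stream_chunks(client: FakeClient, chunks, interval: float = 1.0) -> threading.Thread:
    """Queue one chunk on the client every `interval` seconds, asynchronously."""
    def run() -> None:
        for chunk in chunks:
            client.queue(memoryview(chunk))
            time.sleep(interval)
    t = threading.Thread(target=run, daemon=True)
    t.start()
    return t
```

The point of the experiment is that handle_request returns immediately while the background thread keeps queueing data, leaving the core loop free to flush.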

@abhinavsingh abhinavsingh changed the title WebServerPlugin does not render all data from upstream. Asynchronous response streaming for WebServerPlugin Feb 5, 2020
@ahmedtalhakhan

Hey @abhinavsingh. Thanks for the clarification.

The issue of core server not being able to flush can be resolved by calling the flush from within the loop. The above code can be changed to

with socket_connection((text_(url.hostname), url.port if url.port else DEFAULT_HTTP_PORT)) as conn:
    while True:
        print("I am reading data")
        data = conn.recv(DEFAULT_BUFFER_SIZE)
        self.client.queue(memoryview(data))
        self.client.flush()  # call flush here
        if not data:
            print("breaking from loop")
            break

I think the problem then becomes a little different. In the original code, the line
data = conn.recv(DEFAULT_BUFFER_SIZE)

seems to block for any upstream server when HTTP/1.1 is used, because the upstream server never closes the socket. The only options at that point are to either set a Connection: close header on the upstream request (not desirable at all) or come up with a way to read exactly the expected number of bytes. Do you have any recommendations on this front?

On your suggestion to "try to dispatch a response chunk every second asynchronously", did you mean launching a new thread/process within handle_request, which then keeps calling self.client.queue with some data every second?

@abhinavsingh

The issue of core server not being able to flush can be resolved by calling the flush from within the loop. The above code can be changed to

with socket_connection((text_(url.hostname), url.port if url.port else DEFAULT_HTTP_PORT)) as conn:
    while True:
        print("I am reading data")
        data = conn.recv(DEFAULT_BUFFER_SIZE)
        self.client.queue(memoryview(data))
        self.client.flush()  # call flush here
        if not data:
            print("breaking from loop")
            break

I frankly wouldn't recommend doing this if you care about scalability. The problem is that you are now calling flush explicitly, which results in a blocking IO call because the client might not be ready.

I think the problem then becomes a little different. In the original code, the line
data = conn.recv(DEFAULT_BUFFER_SIZE)

seems to block for any upstream server when HTTP/1.1 is used, because the upstream server never closes the socket. The only options at that point are to either set a Connection: close header on the upstream request (not desirable at all) or come up with a way to read exactly the expected number of bytes. Do you have any recommendations on this front?

Of course this is bound to happen: the business logic is now just a tunnel and doesn't keep any state. By tunneling transparently, you have offloaded connection teardown to either the upstream server or the client, because the tunnel doesn't know when the response has finished.
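For the "read exactly bytes" idea, the usual way to keep that state is to parse the upstream response's Content-Length and read exactly that many body bytes, so the loop ends without waiting for the server to close a keep-alive socket. A minimal generic sketch (`recv_exact` is a hypothetical helper, not proxy.py API):

```python
import socket


def recv_exact(conn: socket.socket, n: int, bufsize: int = 4096) -> bytes:
    """Read exactly n bytes from conn, raising if the peer closes early."""
    chunks = []
    remaining = n
    while remaining > 0:
        # Never ask for more than is still owed, so we don't consume
        # bytes belonging to a subsequent keep-alive response.
        data = conn.recv(min(bufsize, remaining))
        if not data:
            raise ConnectionError('upstream closed before %d bytes arrived' % n)
        chunks.append(data)
        remaining -= len(data)
    return b''.join(chunks)
```

Chunked transfer encoding would need its own parsing loop; this sketch only covers the fixed Content-Length case.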

On your suggestion about "try to dispatch a response chunk every second asynchronously", did you mean launch some new thread/process within the handle_request which then keeps on calling self.client.queue with some data after every 1 second.

Yes. I think this is the way to go here. Start a separate thread (not a process) and then repeatedly call client.queue whenever there is data to be queued. This way everything remains non-blocking.

@abhinavsingh abhinavsingh changed the title Asynchronous response streaming for WebServerPlugin [WebServerPlugin ] Asynchronous response streaming Mar 24, 2020
@abhinavsingh abhinavsingh changed the title [WebServerPlugin ] Asynchronous response streaming [WebServerPlugin ] Add support for Asynchronous response streaming Mar 24, 2020
sebastiendarocha commented Aug 12, 2020

Thanks @abhinavsingh

I have the same problem. I'm proxying images, so the standard plugin didn't work. When looping to send server data to the client, there was a 10-second timeout after the server finished sending the data.

Using a thread greatly improved the performance:

# Assumed imports for this snippet; `context` is an ssl.SSLContext created elsewhere.
import logging
import ssl
from threading import Thread

from proxy.common.constants import DEFAULT_BUFFER_SIZE
from proxy.common.utils import socket_connection, text_

logger = logging.getLogger(__name__)
context = ssl.create_default_context()


class DataFetcher(Thread):
    def __init__(self, url, request, client):
        Thread.__init__(self)
        self.url = url
        self.request = request
        self.client = client
        logger.debug("DataFetcher created")

    def run(self):
        logger.debug("DataFetcher running on %s", self.url)
        url = self.url
        with socket_connection((text_(url.hostname), url.port if url.port else 443)) as sock:
            if url.scheme == b'https':
                with context.wrap_socket(sock, server_hostname=url.hostname) as conn:
                    self.fetch_server_data(conn, self.request)
            else:
                self.fetch_server_data(sock, self.request)

    def fetch_server_data(self, conn, request):
        conn.send(request.build())
        while True:
            buff = conn.recv(DEFAULT_BUFFER_SIZE)
            if not buff:
                logger.debug('Remote host connection ended')
                return
            self.client.queue(memoryview(buff))

On the web plugin side, only this is needed now:

    def handle_request(self, request: HttpParser) -> None:

        upstream = random.choice(self.REVERSE_PROXY_PASS)
        url = urlsplit(upstream)
        assert url.hostname

        fetch = DataFetcher(url, request, self.client)
        fetch.start()

But a 1-second delay remains; any idea why?

Edit: see these logs:

2020-08-12 14:28:19,871 - pid:28897 [D] fetch_server_data:79 - Remote host connection ended
2020-08-12 14:28:20,872 - pid:28897 [D] handle_writables:277 - Client is ready for writes, flushing buffer
2020-08-12 14:28:20,872 - pid:28897 [D] flush:91 - flushed 17 bytes to client
2020-08-12 14:28:20,872 - pid:28897 [D] handle_writables:277 - Client is ready for writes, flushing buffer
2020-08-12 14:28:20,872 - pid:28897 [D] flush:91 - flushed 135 bytes to client
2020-08-12 14:28:20,872 - pid:28897 [D] handle_writables:277 - Client is ready for writes, flushing buffer
2020-08-12 14:28:20,872 - pid:28897 [D] flush:91 - flushed 2 bytes to client
2020-08-12 14:28:20,872 - pid:28897 [D] handle_readables:302 - Client is ready for reads, reading
2020-08-12 14:28:20,872 - pid:28897 [D] handle_readables:320 - Client closed connection, tearing down...
2020-08-12 14:28:20,872 - pid:28897 [I] access_log:232 - ::1:56800 - GET /maps/my_mapfile?SERVICE=WMS&VERSION=1.3.0&REQUEST=GetMap&BBOX=42.33270000000000266,1.721810000000000063,44.94259999999999877,4.407079999999999664&CRS=EPSG:4326&WIDTH=281&HEIGHT=274&LAYERS=layer_public&STYLES=&FORMAT=image/png&DPI=96&MAP_RESOLUTION=96&FORMAT_OPTIONS=dpi:96&TRANSPARENT=TRUE - 1006.13 ms
2020-08-12 14:28:20,872 - pid:28897 [D] shutdown:217 - Closing client connection <socket.socket fd=21, family=AddressFamily.AF_INET6, type=SocketKind.SOCK_STREAM, proto=0, laddr=('::1', 8899, 0, 0), raddr=('::1', 56800, 0, 0)> at address ('::1', 56800, 0, 0) has buffer False
2020-08-12 14:28:20,872 - pid:28897 [D] shutdown:225 - Client connection shutdown successful
2020-08-12 14:28:20,872 - pid:28897 [D] shutdown:230 - Client connection closed

The proxy seems to wait 1 second before sending the data to the client. Do I have to do something to tell the core that the request is finished?

@abhinavsingh

@sebastiendarocha @ahmedtalhakhan Necessary support to allow async I/O operations within the WebServerPlugin context has landed. This should now be possible.

I'll update the ReverseProxyPlugin, which can be used as a base for writing your own custom async I/O operations. Then large files will also not be an issue.

Previously, synchronous operations within a plugin didn't work well, as a sync operation blocks the core loop.

See https://github.com/abhinavsingh/proxy.py/blob/develop/proxy/http/server/plugin.py#L41-L70 for new plugin callback methods.


abhinavsingh commented Nov 5, 2021

@sebastiendarocha @ahmedtalhakhan

PTAL at #675

Thank you for your patience.
