Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Cython in HTTP and fix TCyBufferedTransport early flush issue #129

Merged
merged 4 commits into from
Mar 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 28 additions & 28 deletions thriftpy2/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,23 +54,10 @@

from thriftpy2.thrift import TProcessor, TClient
from thriftpy2.server import TServer
from thriftpy2.transport import (
TTransportBase,
TMemoryBuffer
)
# Explicitly use Python version instead of Cython version for libraries below
# to address some mystery issues for now.
#
# Avoid TypeError: Cannot convert TBufferedTransport to
# thriftpy2.transport.cybase.CyTransportBase.
from thriftpy2.protocol.binary import TBinaryProtocolFactory
# Avoid raised error of too small buffer allocated by TCyBufferedTransport.
# Also, using TCyBufferedTransportFactory will let THttpClient write a broken
# string to server, which making server freezed in transport.readall() method.
from thriftpy2.transport.buffered import (
TBufferedTransport,
TBufferedTransportFactory,
)
from thriftpy2.transport import TTransportBase, TMemoryBuffer

from thriftpy2.protocol import TBinaryProtocolFactory
from thriftpy2.transport import TBufferedTransportFactory


HTTP_URI = '{scheme}://{host}:{port}{path}'
Expand Down Expand Up @@ -120,14 +107,16 @@ class THttpServer(TServer):
def __init__(self,
processor,
server_address,
itrans_factory,
iprot_factory,
server_class=http_server.HTTPServer):
"""Set up protocol factories and HTTP server.
See http.server for server_address.
See TServer for protocol factories.
"""
TServer.__init__(self, processor, trans=None,
itrans_factory=None, iprot_factory=iprot_factory,
itrans_factory=itrans_factory,
iprot_factory=iprot_factory,
otrans_factory=None, oprot_factory=None)

thttpserver = self
Expand All @@ -137,12 +126,18 @@ class RequestHander(http_server.BaseHTTPRequestHandler):

def do_POST(self):
# Don't care about the request path.
itrans = TFileObjectTransport(self.rfile)
otrans = TFileObjectTransport(self.wfile)
itrans = TBufferedTransport(
itrans, int(self.headers['Content-Length']))
otrans = TMemoryBuffer()
# Pre-read all of the data into a BytesIO. Buffered transport
# was previously configured to read everything on the first
# consumption, but that was a hack relying on the internal
# mechanism and prevents other transports from working, so
# replicate that properly to prevent timeout issues
content_len = int(self.headers['Content-Length'])
buf = BytesIO(self.rfile.read(content_len))
itrans = TFileObjectTransport(buf)
itrans = thttpserver.itrans_factory.get_transport(itrans)
iprot = thttpserver.iprot_factory.get_protocol(itrans)

otrans = TMemoryBuffer()
oprot = thttpserver.oprot_factory.get_protocol(otrans)
try:
thttpserver.processor.process(iprot, oprot)
Expand Down Expand Up @@ -222,13 +217,16 @@ def write(self, buf):
self.__wbuf.write(buf)

def flush(self):
if self.isOpen():
self.close()
self.open()

# Pull data out of buffer
# Do this before opening a new connection in case there isn't data
data = self.__wbuf.getvalue()
self.__wbuf = BytesIO()
if not data: # No data to flush, ignore
return

if self.isOpen():
self.close()
self.open()

# HTTP request
self.__http.putrequest('POST', self.path, skip_host=True)
Expand Down Expand Up @@ -323,8 +321,10 @@ def client_context(service, host='localhost', port=9090, path='', scheme='http',


def make_server(service, handler, host, port,
proto_factory=TBinaryProtocolFactory()):
proto_factory=TBinaryProtocolFactory(),
trans_factory=TBufferedTransportFactory()):
processor = TProcessor(service, handler)
server = THttpServer(processor, (host, port),
itrans_factory=trans_factory,
iprot_factory=proto_factory)
return server
2 changes: 1 addition & 1 deletion thriftpy2/protocol/cybin/cybin.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ cdef class TCyBinaryProtocol(object):
write_i32(self.trans, seqid)

def write_message_end(self):
self.trans.c_flush()
pass

def read_struct(self, obj):
try:
Expand Down
7 changes: 5 additions & 2 deletions thriftpy2/transport/buffered/cybuffered.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ cdef class TCyBufferedTransport(CyTransportBase):
int r

if cap < sz:
self.c_flush()
self.c_dump_wbuf()

r = self.wbuf.write(sz, data)
if r == -1:
Expand All @@ -74,11 +74,14 @@ cdef class TCyBufferedTransport(CyTransportBase):
raise MemoryError("grow read buffer fail")

cdef c_flush(self):
self.c_dump_wbuf()
self.trans.flush()

cdef c_dump_wbuf(self):
cdef bytes data
if self.wbuf.data_size > 0:
data = self.wbuf.buf[:self.wbuf.data_size]
self.trans.write(data)
self.trans.flush()
self.wbuf.clean()

def getvalue(self):
Expand Down