Skip to content
This repository has been archived by the owner on Aug 8, 2018. It is now read-only.

Commit

Permalink
body fetching flow control
Browse files Browse the repository at this point in the history
  • Loading branch information
rairyx committed Oct 2, 2017
1 parent 6652afe commit 2c5ca1a
Showing 1 changed file with 33 additions and 52 deletions.
85 changes: 33 additions & 52 deletions pyethapp/synchronizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ def fetch_hashchain(self):

if skeleton_fetch and not skeleton:
remaining = self.fetch_headers(self.skeleton_peer,from0)

skeleton_fetch = False
if not remaining:
log_st.warn('no more skeleton received')
Expand All @@ -155,20 +154,18 @@ def fetch_hashchain(self):
#should not continue??

if skeleton_fetch:

self.fetch_headerbatch(from0,skeleton)
log_st.debug('header batch', headerbatch= self.batch_result)
# log_st.debug('header batch', headerbatch= self.batch_result)
# check result
if self.header_processed>0 :
# self.fetch_blocks(header_batch)
#from0 = from0 + self.max_skeleton_size*self.max_blockheaders_per_request
from0 += self.header_processed
remaining =self.batch_result[self.header_processed:]
else:
return self.exit(success=False)

#scheduling block download for unprocessed headers in the skeleton or last header batch
if remaining:
log_st.debug('fetching new skeletons')
# self.fetch_blocks(remaining)
log_st.debug('scheduling new headers',count= len(remaining), start=from0)
self.synchronizer.blockheader_queue.put(remaining)
from0+=len(remaining)


Expand Down Expand Up @@ -233,7 +230,7 @@ def fetch_headerbatch(self,origin,skeleton):
self.pending_headerRequests[proto] = HeaderRequest(start)
proto.idle = False
fetching = True
log_st.debug('sent header request',request= start , proto=proto)
log_st.debug('sent header request',request=start , proto=proto)
else:
task_empty= True
break
Expand Down Expand Up @@ -331,15 +328,19 @@ def verify_headers(self,proto,headers):

def fetch_headers(self,proto, fromx):
deferred = AsyncResult()
blockheaders_batch=[]
proto.send_getblockheaders(fromx,self.max_blockheaders_per_request)
blockheaders_batch=None
proto.send_getblockheaders(fromx,self.max_blockheaders_per_request,0,1)
try:
self.requests[proto] = deferred
blockheaders_batch = deferred.get(block=True,timeout=self.blockheaders_request_timeout)
except gevent.Timeout:
log_st.warn('syncing batch hashchain timed out')
return []
proto.stop()
return self.exit(success=False)
finally:
return blockheaders_batch
del self.requests[proto]

return blockheaders_batch


def receive_blockheaders(self, proto, blockheaders):
Expand Down Expand Up @@ -405,7 +406,7 @@ def run(self):
def schedule_block_fetch(self):
batch_header = []
log_st.debug('start sheduleing blocks')
self.synchronizer.blockheader_queue = Queue()
self.synchronizer.blockheader_queue = Queue(maxsize=8192)

while True:
batch_header= self.synchronizer.blockheader_queue.get()
Expand Down Expand Up @@ -436,32 +437,18 @@ def fetch_blocks(self):
num_blocks = 0
num_fetched = 0
retry = 0
last_block = None
# batch_header = []
#while not self.blockheaders_queue.empty():
# self.synchronizer.blockheader_queue = Queue()
# while True:
# batch_header= self.synchronizer.blockheader_queue.get()
# num_blocks = len(batch_header)
# log_body_st.debug('delivered headers', delivered_heades=batch_header)
# gevent.sleep(0)
# while batch_header:
# limit = len(batch_header) if len(batch_header) < self.max_blocks_process else self.max_blocks_process
# blockbody_batch = batch_header[:limit]
# for header in blockbody_batch:
#check chain order
#self.block_requests_pool[header.hash]= header
# self.block_requests_pool.append(header)
# self.bodytask_queue.put((header.number,header))
#check if length block_requests_pool is equal to blockhashes_batch
# batch_header = batch_header[limit:]

last_block = None
throttled = False
while True:
try:
result = self.fetch_ready.get()
log_st.debug('start fetching blocks')
if throttled:
gevent.sleep(0.1)
num_blocks = len(self.block_requests_pool)
deferred = AsyncResult()
self.body_request=deferred
# throttled = False

#check timed out pending requests
for proto in list(self.pending_bodyRequests):
Expand All @@ -477,8 +464,6 @@ def fetch_blocks(self):
# proto.body_idle = True
proto.stop()

# log_st.debug('header task queue size, pending queue size, batch_requestsize',size=self.bodytask_queue.qsize(),pending=len(self.pending_blockRequests),batch_request=len(self.block_requests_pool))
#if self.headertask_queue.qsize == 0 and len(self.pending_headerRequests)==0 and len(self.batch_requests)==0 :
if len(self.block_requests_pool) == 0:
log_body_st.debug('block body fetching completed!')
# return True
Expand All @@ -491,6 +476,14 @@ def fetch_blocks(self):
# assert proto not in self.requests
if proto.is_stopped:
continue

#if self.chainservice.block_queue.qsize()> 1010:
if pending>8192-num_fetched:
throttled = True
log_body_st.debug('throttled')
break
else:
throttled = False
proto_deferred = AsyncResult()
# check if it's finished
block_batch=[]
Expand All @@ -515,10 +508,10 @@ def fetch_blocks(self):
break
# check if there are protos available for header fetching
#if not fetching and not self.headertask_queue.empty() and pending == len(self.pending_headerRequests):
if not fetching and not task_empty:
if not fetching and not task_empty and not throttled:
log_body_st.warn('no protocols available')
return self.exit(success=False)
elif task_empty and pending==len(self.pending_bodyRequests):
if task_empty or throttled and pending==len(self.pending_bodyRequests):
continue
try:
proto_received = deferred.get(timeout=self.blocks_request_timeout)['proto']
Expand Down Expand Up @@ -558,7 +551,7 @@ def fetch_blocks(self):
# add received t_blocks
num_fetched += len(bodies)
log_body_st.debug('received block bodies',
num=len(bodies),blockbody=bodies,num_fetched=num_fetched,
num=len(bodies),num_fetched=num_fetched,
total=num_blocks, missing=num_blocks - num_fetched)
proto_received.body_idle=True
del self.requests[proto_received]
Expand All @@ -581,18 +574,6 @@ def fetch_blocks(self):
except Exception as ex:
log_body_st.error(error = ex)



# for body in bodies:
# try:
# h = headers.pop(0)
# t_block = TransientBlock(h, body.transactions, body.uncles)
# self.chainservice.add_block(t_block, proto) # this blocks if the queue is full
# self.block_requests_pool.remove(h)
# except IndexError as e:
# log_st.error('headers and bodies mismatch', error=e)
# self.exit(success=False)
# log_st.debug('adding blocks done', took=time.time() - ts)

# done
last_block = t_block
Expand Down Expand Up @@ -665,7 +646,7 @@ def __init__(self, chainservice, force_sync=None):
self._protocols = dict() # proto: chain_difficulty
self.synctask = None
self.syncbody = None
self.blockheader_queue = Queue()
self.blockheader_queue = Queue(maxsize=8192)

def synctask_exited(self, success=False):
# note: synctask broadcasts best block
Expand Down

0 comments on commit 2c5ca1a

Please sign in to comment.