Skip to content
This repository has been archived by the owner on Aug 8, 2018. It is now read-only.

Commit

Permalink
Rewrite header verification
Browse files Browse the repository at this point in the history
  • Loading branch information
rairyx committed Feb 1, 2018
1 parent 68520af commit c478bb0
Showing 1 changed file with 38 additions and 38 deletions.
76 changes: 38 additions & 38 deletions pyethapp/synchronizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def __init__(self, synchronizer, proto, blockhash, chain_difficulty=0, originato
self.chain_difficulty = chain_difficulty
self.requests = dict() # proto: Event
self.header_processed = 0
self.batch_requests = [] #batch header request
self.batch_requests = dict() #batch header request
self.batch_result= [None]*self.max_skeleton_size*self.max_blockheaders_per_request
self.headertask_queue = Q.PriorityQueue()
self.pending_headerRequests = dict()
Expand Down Expand Up @@ -129,7 +129,8 @@ def fetch_hashchain(self):
else:
self.skeleton_peer=self.originating_proto
self.requests[self.skeleton_peer] = deferred
self.skeleton_peer.send_getblockheaders(from0+self.max_blockheaders_per_request,self.max_skeleton_size,self.max_blockheaders_per_request-1,0)
self.skeleton_peer.send_getblockheaders(from0+self.max_blockheaders_per_request-1,self.max_skeleton_size,self.max_blockheaders_per_request-1,0)

try:
skeleton = deferred.get(block=True,timeout=self.blockheaders_request_timeout)
# assert isinstance(skeleton,list)
Expand Down Expand Up @@ -185,16 +186,17 @@ def fetch_headerbatch(self,origin,skeleton):
# while True
self.header_processed = 0
#from0=skeleton[0]
self.batch_requests=[]
self.batch_requests=dict()
self.batch_result= [None]*self.max_skeleton_size*self.max_blockheaders_per_request
headers= []
proto = None
proto_received=None #proto which delivered the header
retry = 0
received = False
for header in skeleton:
self.batch_requests.append(header)
self.headertask_queue.put((header.number,header.number))
for i, header in enumerate(skeleton):
index = origin + i*self.max_blockheaders_per_request
self.batch_requests[index] = header
self.headertask_queue.put((index,index))

while True:
# requests = iter(self.batch_requests)
Expand Down Expand Up @@ -230,14 +232,16 @@ def fetch_headerbatch(self,origin,skeleton):
task_empty = False
pending=len(self.pending_headerRequests)
for proto in self.idle_protocols():

proto_deferred = AsyncResult()
# check if it's finished

# check if the proto is already busy

if self.pending_headerRequests.get(proto):
continue

if not self.headertask_queue.empty():
start=self.headertask_queue.get()[1]
self.requests[proto] = proto_deferred
proto.send_getblockheaders(start,self.max_blockheaders_per_request)
proto.send_getblockheaders(start,self.max_blockheaders_per_request,0,0)
self.pending_headerRequests[proto] = HeaderRequest(start)
proto.idle = False
fetching = True
Expand Down Expand Up @@ -281,21 +285,34 @@ def fetch_headerbatch(self,origin,skeleton):


def deliver_headers(self,origin,proto,header):
if header[0] not in self.batch_requests:
if header[0].number not in self.batch_requests:
log_st.debug('header delivered not matching requested headers')
return
start_header= self.pending_headerRequests[proto].start
index = self.pending_headerRequests[proto].start

log_st.debug('index', index=index)
del self.pending_headerRequests[proto]
verified = self.verify_headers(proto,header)

#start= self.batch_requests[index].number
headerhash= self.batch_requests[index].hash
verified = True
if len(header) != self.max_blockheaders_per_request :
verified = False
if verified:
if header[0].number != index:
log_st.warn('First header broke chain ordering',proto=proto,number = header[0].number,start=index)
verified= False
elif header[len(header)-1].hash != headerhash:
log_st.warn('Last header broke skeleton structure', proto= proto,
number= header[len(header)-1].number, headerhash = header[len(header)-1].hash, expected= headerhash)
verified= False
#
if not verified:
log_st.debug('header delivered not verified')
self.headertask_queue.put((start_header,start_header))
return
batch_header= header[::-1] #in hight rising order
self.batch_result[(batch_header[0].number-origin-1):batch_header[0].number-origin-1+len(batch_header)]= batch_header
# log_st.debug('batch result',batch_result= self.batch_result)
self.batch_requests.remove(header[0])
self.batch_result[(header[0].number-origin):header[0].number-origin+len(header)]=header
del self.batch_requests[index]
del self.requests[proto]
header_ready = 0
while (self.header_processed + header_ready) < len(self.batch_result) and self.batch_result[self.header_processed + header_ready]:
Expand All @@ -304,15 +321,15 @@ def deliver_headers(self,origin,proto,header):
if header_ready > 0 :
# Headers are ready for delivery, gather them
processed = self.batch_result[self.header_processed:self.header_processed+header_ready]
# log_st.debug('issue fetching blocks',header_processed=self.header_processed,blocks=processed, proto=proto,count=len(processed),start=processed[0].number)
log_st.debug('issue fetching blocks',header_processed=self.header_processed,blocks=processed, proto=proto,count=len(processed),start=processed[0].number)

count=len(processed)
self.synchronizer.blockheader_queue.put(processed)
# if self.fetch_blocks(processed):
self.header_processed += count
#else:
# return self.batch_result[:self.header_processed]
log_st.debug('remaining headers',num=len(self.batch_requests),headers=self.batch_requests)
log_st.debug('remaining headers',num=len(self.batch_requests),headers=self.batch_requests.items())



Expand All @@ -325,23 +342,6 @@ def idle_protocols(self):
return idle


def verify_headers(self,proto,headers):
# start = self.pending_headerRequests[proto]
# headerhash= self.batch_requests[start].hash
# if headers[0].number != start:
# log_st.warn('First header broke chain ordering', proto=proto,number =headers[0].number,headerhash = headers[0].hash,start=start)
# return False
# elif headers[len(headers)-1].hash != target

# if request:
# return 0, errNoFetchesPending
if len(headers) != self.max_blockheaders_per_request:
log_st.debug('headers batch count', count=len(headers))

return False

return True


def fetch_headers(self,proto, fromx):
deferred = AsyncResult()
Expand All @@ -365,7 +365,7 @@ def receive_blockheaders(self, proto, blockheaders):
if proto not in self.requests:
log.debug('unexpected blockheaders')
return
if self.batch_requests:
if any(self.batch_requests):
self.header_request.set({'proto':proto,'headers':blockheaders})
elif proto == self.skeleton_peer: #make sure it's from the originating proto
self.requests[proto].set(blockheaders)
Expand Down Expand Up @@ -399,7 +399,7 @@ def __init__(self, synchronizer, blockhash, chain_difficulty=0, originator_only=
self.block_requests_pool = []
self.bodytask_queue = Q.PriorityQueue()
self.body_cache = [None]*self.body_cache_limit
self.body_cache_offset= self.chain.head.number+1
self.body_cache_offset= self.chain.head.number
self.body_downloaded = dict()
self.pending_bodyRequests = dict()
self.requests = dict() # proto: Event
Expand Down

0 comments on commit c478bb0

Please sign in to comment.