-
Notifications
You must be signed in to change notification settings - Fork 5
/
pmd5
executable file
·236 lines (196 loc) · 7.23 KB
/
pmd5
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
#!/usr/bin/env python
# Copyright Genome Research Ltd 2014
# Author Guy Coates <gmpc@sanger.ac.uk>
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
"""
Parallel md5 comparision. This program compares the MD5sums of
files in two directory trees.
md5s are calculated in 100 MB chunks; we do this to ensure
good load distribution between the workers.
"""
from mpi4py import MPI
from lib.parallelwalk import ParallelWalk
import argparse
import os
import stat
import sys
import traceback
import hashlib
import ctypes
from lib.readdir import readdir
import math
def safestat(filename):
"""lstat sometimes get Interrupted system calls; wrap it up so we can
retry"""
while True:
try:
statdata = os.lstat(filename)
return(statdata)
except IOError, error:
if error.errno != 4:
raise
def fadviseSeqNoCache(fileD):
"""Advise the kernel that we are only going to access file-descriptor
fileD once, sequentially."""
clib = ctypes.CDLL("libc.so.6", use_errno=True)
POSIX_FADV_SEQUENTIAL = 2
POSIX_FADV_DONTNEED = 4
offset = ctypes.c_int64(0)
length = ctypes.c_int64(0)
clib.posix_fadvise(fileD, offset, length, POSIX_FADV_SEQUENTIAL)
clib.posix_fadvise(fileD, offset, length, POSIX_FADV_DONTNEED)
def mungePath(src, dst, f):
"""Convert the sourcepath to the desinationpath"""
suffix = f.partition(src)[2]
dest = dst + suffix
return(dest)
def calcmd5(filename, chunk):
"""calculate the md5sum of part of a file."""
md5hash = hashlib.new("md5")
# Use the optimal blocksize for IO.
filestat = safestat(filename)
blksize = filestat.st_blksize
mode = filestat.st_mode
if stat.S_ISLNK(mode):
return(None)
fh = open(filename, "rb")
fadviseSeqNoCache(fh.fileno())
# MD5 the whole file
if chunk < 0:
while True:
data = fh.read(blksize)
if not data:
break
md5hash.update(data)
else:
#MD5 just our chunk
fh.seek(chunk * CHUNKSIZE)
nreads, remainder = divmod(CHUNKSIZE, blksize)
for i in xrange(nreads):
data = fh.read(blksize)
md5hash.update(data)
if remainder > 0:
data = fh.read(remainder)
md5hash.update(data)
fh.close()
digest = md5hash.hexdigest()
return(digest)
class parmd5(ParallelWalk):
"""Extend the generic parallel walk so that it md5s the files
"""
def ProcessFile(self, filename, chunk):
"""
md5 sum the source and destination files; complain if they are different.
"""
dstfile = mungePath(dir1, dir2, filename)
try:
srchash = calcmd5(filename, chunk)
except OSError as error:
print "cannot access `%s':" % filename,
print os.strerror(error.errno)
srchash = "NAN"
try:
dsthash = calcmd5(dstfile, chunk)
except OSError as error:
print "cannot access `%s':" % dstfile,
print os.strerror(error.errno)
dsthash = "NAN"
if srchash <> dsthash:
print "R%i: MISMATCH: %s %s: %s %s chunk %i" %(rank, filename, dstfile, srchash, dsthash,
chunk)
if VERBOSE:
print "R%i: %s %s: %s %s chunk %i" %(rank, filename, dstfile, srchash, dsthash,
chunk)
def _ProcessNode(self):
"""Process a node in the directory tree. If the node is another directory,
enumerate its contents and add it to the list of nodes to be processed in the
future. If the node is a file, check whether it is larger than chunksize. If
it is, chunk it."""
# chunk = -1, file is to small to be chunked.
# chunk >=0, file has been split into chunk 0...n.
item = self.items.pop()
# The seed won't have a chunk set.
if len(item) == 2:
filename, filetype, chunk = item[0], item[1], -1
else:
(filename, filetype, chunk) = (item)
try:
# If the filesystem supports readdir d_type, then we will know if the node is
# a file or a directory without doing any extra work. If it does not, we have
# to do a stat.
if filetype == 0:
s = os.lstat(filename)
if stat.S_ISDIR(s.st_mode):
filetype = 4
else:
filetype = 8
# If we a directory, enumerate its contents and add them to the list of nodes
# to be processed.
if filetype == 4:
for node in readdir(filename):
if not node.d_name in (".",".."):
fullname = os.path.join(filename, node.d_name)
self.items.appendleft((fullname, node.d_type, -1))
self.ProcessDir(filename)
else:
# Check to see if the file is large; if it is, chunk it and move on.
if chunk == -1:
size = os.lstat(filename).st_size
if size > CHUNKSIZE:
chunks = int(math.ceil(size / float(CHUNKSIZE)))
for i in range(chunks):
self.items.appendleft((filename, 8, i))
return()
self.ProcessFile(filename, chunk)
except OSError as error:
print "cannot access `%s':" % filename,
print os.strerror(error.errno)
return()
class MPIargparse(argparse.ArgumentParser):
"""Subclass argparse so we can add a call to Abort, to tidy up MPI bits and pieces."""
def error(self,message):
self.print_usage(sys.stderr)
Abort()
def print_help(self, file=None):
argparse.ArgumentParser.print_help(self, file=None)
Abort()
def Abort():
print ""
MPI.COMM_WORLD.Abort(0)
exit (1)
def parseargs():
parser = MPIargparse(
formatter_class = argparse.RawDescriptionHelpFormatter,
description = """Parallel md5tree compare. This program will walk
two directory trees and compare the md5sums of all files.""")
parser.add_argument("DIR", nargs=2, help="Directories to compare.")
parser.add_argument("-v", help="verbose", default=False, action="store_true")
if len(sys.argv) == 1:
parser.print_help()
Abort()
args = parser.parse_args()
return(args)
# Begin main program. We have to catch exceptions
# and call MPIABORT, otherwise other ranks will
# hang indefinately.
try:
# 100 MB chunk
CHUNKSIZE = 1024 * 1024 * 100
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
workers = comm.size
args = parseargs()
dir1 = args.DIR[0].rstrip("/")
dir2 = args.DIR[1].rstrip("/")
VERBOSE = args.v
crawler = parmd5(comm)
results = crawler.Execute(dir1)
print "done"
exit(0)
except (Exception, KeyboardInterrupt), err:
print "Exception on rank %i" %rank
print traceback.format_exc()
Abort()