Skip to content

Commit

Permalink
Merge pull request #55 from martindurant/sessions
Browse files Browse the repository at this point in the history
All HTTP calls through a session
  • Loading branch information
martindurant authored Jan 3, 2018
2 parents 428f2f4 + a0b3b57 commit 562719b
Showing 1 changed file with 38 additions and 30 deletions.
68 changes: 38 additions & 30 deletions gcsfs/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ def connect(self, refresh=False):
Force refresh, even if the token is expired.
"""
token = self.input_token
self.session = requests.Session()
project, access = self.project, self.access
if token is not None and str(token) != 'cloud':
if 'type' in token or isinstance(token, str):
Expand All @@ -247,16 +248,16 @@ def connect(self, refresh=False):
# no credentials - try to ask google in the browser
scope = "https://www.googleapis.com/auth/devstorage." + access
path = 'https://accounts.google.com/o/oauth2/device/code'
r = requests.post(path,
params={'client_id': not_secret['client_id'],
'scope': scope})
r = self.session.post(
path, params={'client_id': not_secret['client_id'],
'scope': scope})
validate_response(r, path)
data = json.loads(r.content.decode())
print('Navigate to:', data['verification_url'])
print('Enter code:', data['user_code'])
while True:
time.sleep(1)
r = requests.post(
r = self.session.post(
"https://www.googleapis.com/oauth2/v4/token",
params={
'client_id': not_secret['client_id'],
Expand All @@ -276,14 +277,15 @@ def connect(self, refresh=False):
if data.get('type', None) == 'cloud':
path = ('http://metadata.google.internal/computeMetadata/v1/'
'instance/service-accounts/default/token')
r = requests.get(path, headers={'Metadata-Flavor': 'Google'})
r = self.session.get(
path, headers={'Metadata-Flavor': 'Google'})
validate_response(r, path)
data = r.json()
data['timestamp'] = time.time()
data['type'] = 'cloud'
else:
path = "https://www.googleapis.com/oauth2/v4/token"
r = requests.post(
r = self.session.post(
path,
params={'client_id': data['client_id'],
'client_secret': data['client_secret'],
Expand Down Expand Up @@ -319,7 +321,7 @@ def _call(self, method, path, *args, **kwargs):
if v is None:
del kwargs[k]
json = kwargs.pop('json', None)
meth = getattr(requests, method)
meth = getattr(self.session, method)
if args:
path = path.format(*[quote_plus(p) for p in args])
for retry in range(self.retries):
Expand Down Expand Up @@ -526,7 +528,7 @@ def url(self, path):
def cat(self, path):
""" Simple one-shot get of file data """
details = self.info(path)
return _fetch_range(self.header, details)
return _fetch_range(self.header, details, self.session)

def get(self, rpath, lpath, blocksize=5 * 2 ** 20):
with self.open(rpath, 'rb', block_size=blocksize) as f1:
Expand Down Expand Up @@ -867,8 +869,9 @@ def _upload_chunk(self, final=False):
self.offset, self.offset + l - 1)
head.update({'Content-Type': 'application/octet-stream',
'Content-Length': str(l)})
r = requests.post(self.location, params={'uploadType': 'resumable'},
headers=head, data=data)
r = self.gcsfs.session.post(
self.location, params={'uploadType': 'resumable'},
headers=head, data=data)
validate_response(r, self.location)
if 'Range' in r.headers:
assert not final, "Response looks like upload is partial"
Expand Down Expand Up @@ -896,10 +899,12 @@ def _upload_chunk(self, final=False):
self.offset += l

def _initiate_upload(self):
r = requests.post('https://www.googleapis.com/upload/storage/v1/b/%s/o'
% quote_plus(self.bucket),
params={'uploadType': 'resumable'},
headers=self.gcsfs.header, json={'name': self.key, 'metadata': self.metadata})
r = self.gcsfs.session.post(
'https://www.googleapis.com/upload/storage/v1/b/%s/o'
% quote_plus(self.bucket),
params={'uploadType': 'resumable'},
headers=self.gcsfs.header, json={'name': self.key,
'metadata': self.metadata})
self.location = r.headers['Location']

def _simple_upload(self):
Expand All @@ -909,9 +914,9 @@ def _simple_upload(self):
data = self.buffer.read()
path = ('https://www.googleapis.com/upload/storage/v1/b/%s/o'
% quote_plus(self.bucket))
r = requests.post(path,
params={'uploadType': 'media', 'name': self.key},
headers=head, data=data)
r = self.gcsfs.session.post(
path, params={'uploadType': 'media', 'name': self.key},
headers=head, data=data)
validate_response(r, path)
size, md5 = int(r.json()['size']), r.json()['md5Hash']
if self.consistency == 'size':
Expand All @@ -929,17 +934,18 @@ def _fetch(self, start, end):
self.start = start
self.end = end + self.blocksize
self.cache = _fetch_range(self.gcsfs.header, self.details,
start, self.end)
self.gcsfs.session, start, self.end)
if start < self.start:
new = _fetch_range(self.gcsfs.header, self.details,
start, self.start)
self.gcsfs.session, start, self.start)
self.start = start
self.cache = new + self.cache
if end > self.end:
if self.end > self.size:
return
new = _fetch_range(self.gcsfs.header, self.details,
self.end, end + self.blocksize)
self.gcsfs.session, self.end,
end + self.blocksize)
self.end = end + self.blocksize
self.cache = self.cache + new

Expand Down Expand Up @@ -1007,12 +1013,13 @@ def __exit__(self, *args):
self.close()


def _fetch_range(head, obj_dict, start=None, end=None):
def _fetch_range(head, obj_dict, session, start=None, end=None):
""" Get data from GCS
head : dict
Contains authorization header
obj_dict : an entry from ls() or info()
session: requests.Session instance
start, end : None or integers
if not both None, fetch only given range
"""
Expand All @@ -1024,28 +1031,29 @@ def _fetch_range(head, obj_dict, start=None, end=None):
end = end or 0
head = head.copy()
head['Range'] = 'bytes=%i-%i' % (start, end - 1)
back = requests.get(obj_dict['mediaLink'], headers=head)
back = session.get(obj_dict['mediaLink'], headers=head)
data = back.content
if data == b'Request range not satisfiable':
return b''
return data


def put_object(credentials, bucket, name, data):
def put_object(credentials, bucket, name, data, session):
""" Simple put, up to 5MB of data
credentials : from auth()
bucket : string
name : object name
data : binary
session: requests.Session instance
"""
out = requests.post('https://www.googleapis.com/upload/storage/'
'v1/b/%s/o?uploadType=media&name=%s' % (
quote_plus(bucket), quote_plus(name)),
headers={'Authorization': 'Bearer ' +
credentials.access_token,
'Content-Type': 'application/octet-stream',
'Content-Length': len(data)}, data=data)
out = session.post('https://www.googleapis.com/upload/storage/'
'v1/b/%s/o?uploadType=media&name=%s' % (
quote_plus(bucket), quote_plus(name)),
headers={'Authorization': 'Bearer ' +
credentials.access_token,
'Content-Type': 'application/octet-stream',
'Content-Length': len(data)}, data=data)
assert out.status_code == 200


Expand Down

0 comments on commit 562719b

Please sign in to comment.