Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Keyerror data token #316

Merged
merged 12 commits into from
Feb 16, 2022
39 changes: 28 additions & 11 deletions pyicloud/services/drive.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
"""Drive service."""
from datetime import datetime, timedelta
import json
import logging
import io
import mimetypes
import os
import time
from re import search
from requests import Response

from pyicloud.exceptions import PyiCloudAPIResponseException


LOGGER = logging.getLogger(__name__)


class DriveService:
"""The 'Drive' iCloud service."""
Expand Down Expand Up @@ -42,6 +48,7 @@ def get_node_data(self, node_id):
]
),
)
self._raise_if_error(request)
return request.json()[0]

def get_file(self, file_id, **kwargs):
Expand All @@ -52,16 +59,22 @@ def get_file(self, file_id, **kwargs):
self._document_root + "/ws/com.apple.CloudDocs/download/by_id",
params=file_params,
)
if not response.ok:
return None
url = response.json()["data_token"]["url"]
return self.session.get(url, params=self.params, **kwargs)
self._raise_if_error(response)
response_json = response.json()
package_token = response_json.get("package_token")
data_token = response_json.get("data_token")
if data_token and data_token.get("url"):
return self.session.get(data_token["url"], params=self.params, **kwargs)
if package_token and package_token.get("url"):
return self.session.get(package_token["url"], params=self.params, **kwargs)
raise KeyError("'data_token' nor 'package_token'")

def get_app_data(self):
"""Returns the app library (previously ubiquity)."""
request = self.session.get(
self._service_root + "/retrieveAppLibraries", params=self.params
)
self._raise_if_error(request)
return request.json()["items"]

def _get_upload_contentws_url(self, file_object):
Expand Down Expand Up @@ -92,8 +105,7 @@ def _get_upload_contentws_url(self, file_object):
}
),
)
if not request.ok:
return None
self._raise_if_error(request)
return (request.json()[0]["document_id"], request.json()[0]["url"])

def _update_contentws(self, folder_id, sf_info, document_id, file_object):
Expand Down Expand Up @@ -131,19 +143,16 @@ def _update_contentws(self, folder_id, sf_info, document_id, file_object):
headers={"Content-Type": "text/plain"},
data=json.dumps(data),
)
if not request.ok:
return None
self._raise_if_error(request)
return request.json()

def send_file(self, folder_id, file_object):
"""Send new file to iCloud Drive."""
document_id, content_url = self._get_upload_contentws_url(file_object)

request = self.session.post(content_url, files={file_object.name: file_object})
if not request.ok:
return None
self._raise_if_error(request)
content_response = request.json()["singleFile"]

self._update_contentws(folder_id, content_response, document_id, file_object)

def create_folders(self, parent, name):
Expand All @@ -164,6 +173,7 @@ def create_folders(self, parent, name):
}
),
)
self._raise_if_error(request)
return request.json()

def rename_items(self, node_id, etag, name):
Expand All @@ -183,6 +193,7 @@ def rename_items(self, node_id, etag, name):
}
),
)
self._raise_if_error(request)
return request.json()

def move_items_to_trash(self, node_id, etag):
Expand All @@ -202,6 +213,7 @@ def move_items_to_trash(self, node_id, etag):
}
),
)
self._raise_if_error(request)
return request.json()

@property
Expand All @@ -217,6 +229,11 @@ def __getattr__(self, attr):
def __getitem__(self, key):
return self.root[key]

def _raise_if_error(self, response):
if not response.ok:
api_error = PyiCloudAPIResponseException(response.reason, response.status_code)
LOGGER.error(api_error)
raise api_error

class DriveNode:
"""Drive node."""
Expand Down