Test Ashwin and Jiani's changes #4436

Closed · wants to merge 93 commits

Commits (93)

05a2696
Bump ratarmountcore to 0.3.1
epicfaace Apr 11, 2022
ff048fe
Merge branch 'master' into epicfaace-patch-9
epicfaace May 31, 2022
ac5ed66
POC: parallel file uploading and index creation
epicfaace Aug 16, 2022
b9fbc35
local test
wwwjn Jan 2, 2023
20aea6d
still buggy
wwwjn Jan 3, 2023
a1842a4
find error with GZipStream
wwwjn Jan 4, 2023
ce7de68
clean
wwwjn Jan 4, 2023
78e2b0d
more tests
wwwjn Jan 13, 2023
14cbc59
more tests
wwwjn Jan 21, 2023
fda189a
might be good
wwwjn Jan 25, 2023
54507b9
clean
wwwjn Jan 25, 2023
40e0fe8
Merge remote-tracking branch 'origin/master' into parallel-upload
wwwjn Feb 1, 2023
de13ee5
indexed_gzip success, but does not work for folder
wwwjn Feb 1, 2023
2db90e7
works for both file and folder
wwwjn Feb 1, 2023
bde9f10
format
wwwjn Feb 1, 2023
cdb2240
test
wwwjn Feb 10, 2023
0a6e9da
Merge remote-tracking branch 'origin/master' into parallel-upload
wwwjn Feb 14, 2023
d4c6b58
fix unit test
wwwjn Feb 14, 2023
3a9f8d0
fix
wwwjn Feb 15, 2023
8ba4f68
fix
wwwjn Feb 21, 2023
0f409b0
fix half
wwwjn Feb 21, 2023
5c9cc8a
fix file size
wwwjn Feb 21, 2023
d4509f6
temporary fix to pass unittest
wwwjn Feb 21, 2023
99402f7
fix
wwwjn Feb 22, 2023
5f55e54
update file size
wwwjn Feb 23, 2023
f29e7fa
add API
wwwjn Feb 28, 2023
5cc9025
finish test1
wwwjn Mar 1, 2023
cbbb4f9
checkout all tests
wwwjn Mar 1, 2023
5ce3bf8
fix client
wwwjn Mar 1, 2023
0e50f94
Kubernetes: ensure only one pod can be scheduled per node
epicfaace Mar 1, 2023
0490cf5
fix unittest
wwwjn Mar 1, 2023
48d430a
add test
epicfaace Mar 1, 2023
29ba357
Remove assign_cpu_and_gpu_sets
epicfaace Mar 1, 2023
00e1674
fix
epicfaace Mar 1, 2023
4cb879c
Report cpu/gpu/memory from parent node
epicfaace Mar 1, 2023
90b13f0
fix client
wwwjn Mar 8, 2023
e48d91b
fix format
wwwjn Mar 8, 2023
32d39db
Merge remote-tracking branch 'origin/master' into parallel-upload
wwwjn Mar 8, 2023
7e8f511
add requirements
wwwjn Mar 8, 2023
28e5e14
fix upload string
wwwjn Mar 8, 2023
4111e81
add more print
wwwjn Mar 15, 2023
ef3c907
fix --force-compression
wwwjn Mar 15, 2023
c17aa9a
Merge branch 'master' into parallel-upload
wwwjn Mar 15, 2023
ce715e8
Merge remote-tracking branch 'origin/master' into parallel-upload
wwwjn Mar 18, 2023
8b28b1f
fix stream file error
wwwjn Mar 18, 2023
76d2888
comment
epicfaace Mar 21, 2023
61617b3
test
epicfaace Mar 21, 2023
b0af9ed
revert changes, simpler GHA
epicfaace Mar 21, 2023
7741c94
fix
epicfaace Mar 21, 2023
f193e80
revert gha changes
epicfaace Mar 21, 2023
77480f1
cleanup v1
wwwjn Mar 22, 2023
1bd3bbd
delete pycache
wwwjn Mar 22, 2023
1ab8002
Merge branch 'master' into kstats
epicfaace Mar 22, 2023
570aadd
Upgrade kind to fix bug
epicfaace Mar 22, 2023
68594ce
swap
epicfaace Mar 22, 2023
5b97796
Add resources test
epicfaace Mar 22, 2023
caf2864
fix runtime -- bytes reporting
epicfaace Mar 22, 2023
63b185d
comment
epicfaace Mar 22, 2023
bbbc46a
int
epicfaace Mar 22, 2023
1355b0c
cmt
epicfaace Mar 22, 2023
6ed8bf8
fix
epicfaace Mar 22, 2023
92df8e1
fix
epicfaace Mar 22, 2023
8311595
fmt
epicfaace Mar 22, 2023
25e2de5
fix
epicfaace Mar 22, 2023
01d8d66
update
epicfaace Mar 22, 2023
8bb7c5b
rm __pycache__
wwwjn Mar 22, 2023
d6bdefd
more fix
wwwjn Mar 22, 2023
f0e3215
fmt
wwwjn Mar 22, 2023
6d86462
fix fmt
wwwjn Mar 23, 2023
2ab01be
fix docs
wwwjn Mar 23, 2023
f6ab881
change signed url expire time
wwwjn Apr 5, 2023
1e7862e
Allow specifying kubernetes cert directly, not cert path
epicfaace Apr 5, 2023
abe8431
Update test_cli.py
epicfaace Apr 5, 2023
8a31cbf
Merge remote-tracking branch 'origin/kstats' into test
epicfaace Apr 5, 2023
61da1ae
Merge branch 'cpu-gpu-set' into test
epicfaace Apr 5, 2023
d643d55
Merge branch 'epicfaace-patch-18' into test
epicfaace Apr 5, 2023
611a973
Merge branch 'epicfaace-patch-9' into test
epicfaace Apr 5, 2023
d298db5
Merge remote-tracking branch 'origin/parallel-upload' into test
epicfaace Apr 5, 2023
4960cf7
updates, fixes to k8s worker manager
epicfaace Apr 5, 2023
3b4f4dc
pyc
epicfaace Apr 5, 2023
2d930ab
Update docker-compose
epicfaace Apr 6, 2023
7370976
fix
epicfaace Apr 6, 2023
f4589ba
synchronize busy waiting version
wwwjn Apr 10, 2023
0201b6b
Merge branch 'master' into test
wwwjn Apr 12, 2023
3edf962
Merge branch 'parallel-upload' into test
wwwjn Apr 12, 2023
6ad6950
remove pycache
wwwjn Apr 12, 2023
f89109b
Merge branch 'master' into parallel-upload
wwwjn Apr 12, 2023
ddac93b
downgrade ratarmountcore
wwwjn Apr 12, 2023
c06f37d
fmt
wwwjn Apr 12, 2023
eae14d0
merge
wwwjn Apr 12, 2023
75f81af
fix upload1
wwwjn Apr 12, 2023
0ea65a2
Merge branch 'parallel-upload' into test
wwwjn Apr 12, 2023
20fd24e
rm pycache
wwwjn Apr 12, 2023

Files changed

5 changes: 4 additions & 1 deletion .github/workflows/test.yml
@@ -118,12 +118,15 @@ jobs:
           - netcat netcurl
           - edit
           - open wopen
-          - store_add
+          - store_add parallel
+          - kubernetes_runtime
         runtime: [docker, kubernetes]
         exclude:
           # netcat / netcurl not supported for kubernetes.
           - test: netcat netcurl
             runtime: kubernetes
+          - test: kubernetes_runtime
+            runtime: docker
     steps:
       - name: Clear free space
         run: |
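
GitHub Actions expands the matrix above as the cross product of `test` and `runtime`, then drops the `exclude` pairs. A tiny sketch of that expansion, using only the entries visible in this hunk (the real matrix has more tests above the fold):

```python
from itertools import product

# Only the matrix entries visible in this hunk; the full list is longer.
tests = ["netcat netcurl", "edit", "open wopen", "store_add parallel", "kubernetes_runtime"]
runtimes = ["docker", "kubernetes"]
excluded = {("netcat netcurl", "kubernetes"), ("kubernetes_runtime", "docker")}

jobs = [(t, r) for t, r in product(tests, runtimes) if (t, r) not in excluded]
assert len(jobs) == 5 * 2 - 2  # 8 jobs after the two exclusions
```
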
2 changes: 1 addition & 1 deletion codalab/client/json_api_client.py
@@ -490,7 +490,7 @@ def update(self, resource_type, data, params=None):
             )
         )
         # Return list iff original data was list
-        return result if isinstance(data, list) else result[0]
+        return result if isinstance(data, list) or result is None else result[0]
 
     @wrap_exception('Unable to delete {1}')
     def delete(self, resource_type, resource_ids, params=None):
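
For context on the one-line change above: when `update` produces a `None` result, the old expression crashed on `result[0]`; the new guard passes `None` through. A standalone sketch (the function names are hypothetical, not part of the client):

```python
# Hypothetical names for the old and new unwrapping expressions.
def unwrap_old(result, data):
    return result if isinstance(data, list) else result[0]

def unwrap_new(result, data):
    return result if isinstance(data, list) or result is None else result[0]

# unwrap_old(None, {"id": 1}) raises TypeError: 'NoneType' object is not subscriptable
assert unwrap_new(None, {"id": 1}) is None                    # None passes through
assert unwrap_new([{"id": 1}], [{"id": 1}]) == [{"id": 1}]    # list in, list out
assert unwrap_new([{"id": 1}], {"id": 1}) == {"id": 1}        # single item unwrapped
```
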
4 changes: 2 additions & 2 deletions codalab/common.py
@@ -286,7 +286,7 @@ def _get_azure_sas_url(self, path, **kwargs):
             account_name=AZURE_BLOB_ACCOUNT_NAME,
             container_name=AZURE_BLOB_CONTAINER_NAME,
             account_key=AZURE_BLOB_ACCOUNT_KEY,
-            expiry=datetime.datetime.now() + datetime.timedelta(hours=1),
+            expiry=datetime.datetime.now() + datetime.timedelta(hours=10),
             blob_name=blob_name,
         )
         return f"{AZURE_BLOB_HTTP_ENDPOINT}/{AZURE_BLOB_CONTAINER_NAME}/{blob_name}?{sas_token}"
@@ -306,7 +306,7 @@ def _get_gcs_signed_url(self, path, **kwargs):
         blob = bucket.blob(blob_name)
         signed_url = blob.generate_signed_url(
             version="v4",
-            expiration=datetime.timedelta(hours=1),
+            expiration=datetime.timedelta(hours=10),
             method=kwargs.get("method", "GET"),  # HTTP method, e.g. GET, PUT
             content_type=kwargs.get("request_content_type", None),
             response_disposition=kwargs.get("content_disposition", None),
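
Both hunks extend the signed-URL lifetime from 1 hour to 10 hours, matching the "change signed url expire time" commit. Note the asymmetry the diff itself shows: Azure SAS takes an absolute expiry timestamp, while GCS takes a relative duration. A sketch with a shared constant (the constant is illustrative, not part of this diff):

```python
import datetime

# Illustrative constant; the diff hard-codes hours=10 in both backends.
SIGNED_URL_LIFETIME = datetime.timedelta(hours=10)

azure_expiry = datetime.datetime.now() + SIGNED_URL_LIFETIME  # absolute datetime for Azure SAS
gcs_expiration = SIGNED_URL_LIFETIME                          # relative timedelta for GCS
```
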
81 changes: 81 additions & 0 deletions codalab/lib/beam/MultiReaderFileStream.py
@@ -0,0 +1,81 @@
from io import BytesIO
from threading import Lock

from codalab.worker.un_gzip_stream import BytesBuffer


class MultiReaderFileStream(BytesIO):
    """
    FileStream that supports multiple readers over the same underlying file object.
    Bytes read from the source are fanned out to a per-reader buffer, so each
    reader consumes the stream independently.
    """

    NUM_READERS = 2
    # Max memory usage <= MAX_BUF_SIZE + max(num_bytes passed to read())
    MAX_BUF_SIZE = 1024 * 1024 * 1024  # 1 GiB

    def __init__(self, fileobj):
        self._bufs = [BytesBuffer() for _ in range(self.NUM_READERS)]
        self._pos = [0 for _ in range(self.NUM_READERS)]
        self._fileobj = fileobj
        # Lock to ensure one does not concurrently read self._fileobj / write to the buffers.
        self._lock = Lock()
        self._current_max_buf_length = 0

        class FileStreamReader(BytesIO):
            # Uses `s` rather than `self` so the enclosing MultiReaderFileStream
            # instance stays reachable through the closed-over `self`.
            def __init__(s, index):
                s._index = index

            def read(s, num_bytes=None):
                return self.read(s._index, num_bytes)

            def peek(s, num_bytes):
                return self.peek(s._index, num_bytes)

        self.readers = [FileStreamReader(i) for i in range(self.NUM_READERS)]

    def _fill_buf_bytes(self, index: int, num_bytes=None):
        with self._lock:
            while num_bytes is None or len(self._bufs[index]) < num_bytes:
                s = self._fileobj.read(num_bytes)
                if not s:
                    break
                # Fan the bytes out to every reader's buffer.
                for i in range(self.NUM_READERS):
                    self._bufs[i].write(s)
                self.find_largest_buffer()

    def find_largest_buffer(self):
        self._current_max_buf_length = max(len(buf) for buf in self._bufs)

    def read(self, index: int, num_bytes=None):  # type: ignore
        """Read the specified number of bytes from the associated file.

        index: index that specifies which reader is reading.
        """
        # Busy-wait until the largest buffer drops below MAX_BUF_SIZE or this
        # reader is the slowest one (i.e. owns the largest buffer); only the
        # slowest reader may proceed while the limit is exceeded.
        while (
            self._current_max_buf_length > self.MAX_BUF_SIZE
            and len(self._bufs[index]) < self._current_max_buf_length
        ):
            pass

        # If this reader is the slowest and num_bytes is None or exceeds the
        # buffered amount, the buffers keep growing, so max memory usage stays
        # <= MAX_BUF_SIZE + max(num_bytes passed to read()).
        self._fill_buf_bytes(index, num_bytes)
        assert self._current_max_buf_length <= 2 * self.MAX_BUF_SIZE
        if num_bytes is None:
            num_bytes = len(self._bufs[index])
        s = self._bufs[index].read(num_bytes)
        self.find_largest_buffer()
        self._pos[index] += len(s)
        return s

    def peek(self, index: int, num_bytes):  # type: ignore
        self._fill_buf_bytes(index, num_bytes)
        return self._bufs[index].peek(num_bytes)

    def close(self):
        # Close the wrapped source file object.
        self._fileobj.close()
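
A minimal usage sketch of `MultiReaderFileStream`, assuming two threads drain the same source independently (arbitrary in-memory data; in a real workload one reader might upload while the other builds an index):

```python
import threading
from io import BytesIO

source = BytesIO(b"x" * (1024 * 1024))  # arbitrary 1 MiB payload
stream = MultiReaderFileStream(source)

def consume(reader, sink, chunk_size=64 * 1024):
    # Drain one reader; read() returns b"" once the source is exhausted.
    while True:
        chunk = reader.read(chunk_size)
        if not chunk:
            break
        sink.append(chunk)

sinks = [[] for _ in range(MultiReaderFileStream.NUM_READERS)]
threads = [
    threading.Thread(target=consume, args=(stream.readers[i], sinks[i]))
    for i in range(MultiReaderFileStream.NUM_READERS)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Every reader observes the complete byte stream.
assert all(b"".join(chunks) == source.getvalue() for chunks in sinks)
```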