Skip to content

Commit

Permalink
Add IPFS integration (#35)
Browse files Browse the repository at this point in the history
* add IPFS integration
* add rpc wait_timeout for ipfs peers
* add more options, upgrade hivemind, add test
* allow the use of persistent identities
* ensure ipfs bootstrap success
* commit docker recommendations

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Jirka Borovec <6035284+Borda@users.noreply.github.com>
  • Loading branch information
3 people committed Jul 3, 2024
1 parent 27b1bda commit a140804
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 2 deletions.
15 changes: 15 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Test image for lightning-hivemind: Ubuntu base with Python 3 and the
# package (plus its test requirements) installed.
FROM ubuntu:22.04

# Keep apt from prompting for input during the image build.
ENV DEBIAN_FRONTEND="noninteractive"

# Install Python 3 tooling; remove the apt lists afterwards to keep the layer small.
RUN apt-get update \
    && apt-get install -y --no-install-recommends \
        python3-full \
        python3-pip \
        python3-packaging \
    && rm -rf /var/lib/apt/lists/*

WORKDIR /app

# Copy the repository in, then install the package together with the test requirements.
COPY . ./
RUN python3 -m pip install . -r tests/requirements.txt
14 changes: 14 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Compose setup for running the test suite inside the image built from the
# local Dockerfile.
version: '3.9'

services:
  # Runs pytest against the project sources mounted from the host.
  tests:
    image: lightning-universe/lightning-hivemind:latest
    command: pytest tests/ -v
    # Keep a TTY and stdin open so the container can also be used interactively.
    tty: true
    stdin_open: true
    build:
      context: .
      dockerfile: Dockerfile
    # Mount sources and tests so local edits are picked up without rebuilding the image.
    volumes:
      - ./src:/app/src
      - ./tests:/app/tests
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
numpy <2.0 # needed for older Torch
torch <2.3 # fixme: freeze until AMP compatibility is resolved
lightning >=2.0.0
hivemind >=1.1.0, <=1.1.5; sys_platform == 'linux'
hivemind >=1.1.0, <=1.1.10.post2; sys_platform == 'linux'

pydantic <2.0.0 # fixme: lift when resolved
29 changes: 29 additions & 0 deletions src/lightning_hivemind/strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,22 @@ class HivemindStrategy(Strategy):
initial_peers: If connecting to a running process, a list of initial peers needs to be passed in.
This can also be set via the env variable ``INITIAL_PEERS``.
use_ipfs: Use IPFS to find initial_peers. If enabled, you only need to provide /p2p/XXXX part of the
multiaddrs for the initial_peers (no need to specify a particular IPv4/IPv6 host and port)"
wait_timeout: a kademlia rpc request is deemed lost if we did not receive a reply in this many seconds,
useful if `use_ipfs=True`
bootstrap_timeout: after one of peers responds, await other peers for at most this many seconds
use_relay: disable circuit relay functionality in libp2p (see https://docs.libp2p.io/concepts/nat/circuit-relay/)
use_auto_relay: look for libp2p relays to become reachable if we are behind NAT/firewall
identity_path: Path to a private key file. If defined, makes the peer ID deterministic.
If the file does not exist, writes a new private key to this file.
)
**optimizer_kwargs: kwargs are passed to the :class:`hivemind.Optimizer` class.
"""

Expand All @@ -128,6 +144,12 @@ def __init__(
averager_opts: Optional[Dict] = None,
host_maddrs: Optional[List] = None,
initial_peers: Optional[Union[str, List]] = None,
use_ipfs: bool = False,
wait_timeout: int = 3,
bootstrap_timeout: Optional[float] = None,
use_relay: bool = True,
use_auto_relay: bool = False,
identity_path: Optional[str] = None,
**optimizer_kwargs: Any,
):
if platform.system() != "Linux":
Expand Down Expand Up @@ -165,6 +187,13 @@ def __init__(
start=True,
initial_peers=initial_peers,
host_maddrs=host_maddrs if host_maddrs is not None else ["/ip4/0.0.0.0/tcp/0", "/ip4/0.0.0.0/udp/0/quic"],
use_ipfs=use_ipfs,
ensure_bootstrap_success=True,
wait_timeout=wait_timeout,
bootstrap_timeout=bootstrap_timeout,
use_relay=use_relay,
use_auto_relay=use_auto_relay,
identity_path=identity_path,
)

visible_addresses = [
Expand Down
21 changes: 20 additions & 1 deletion tests/test_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,25 @@ def configure_optimizers(self):
trainer.fit(model)


@mock.patch.dict(os.environ, {"HIVEMIND_MEMORY_SHARING_STRATEGY": "file_descriptor"}, clear=True)
def test_ipfs_integration():
    """Run a ``fast_dev_run`` training with IPFS-based peer discovery enabled."""

    class _IpfsModel(BoringModel):
        def on_before_backward(self, loss: Tensor) -> None:
            # The strategy is expected to have wrapped the LR scheduler.
            lr_config = self.trainer.lr_scheduler_configs[0]
            assert isinstance(lr_config.scheduler, HiveMindScheduler)

        def configure_optimizers(self):
            opt = torch.optim.SGD(self.layer.parameters(), lr=0.1)
            sched = torch.optim.lr_scheduler.ExponentialLR(opt, gamma=0.9)
            return [opt], [sched]

    strategy = HivemindStrategy(target_batch_size=1, use_ipfs=True, use_relay=True, use_auto_relay=True)
    trainer = Trainer(strategy=strategy, fast_dev_run=True)
    trainer.fit(_IpfsModel())


@mock.patch.dict(
os.environ,
{
Expand Down Expand Up @@ -139,7 +158,7 @@ def test_raise_exception_no_batch_size(mock__extract_batch_size):
[(True, True, True), (False, True, False)],
)
def test_warn_if_argument_passed(delay_grad_averaging, delay_state_averaging, delay_optimizer_step):
"""Ensure that valid combination of HiveMind delay arguments warn if scheduler isn't passed in as a function."""
"""Ensure that valid combination of HiveMind delay arguments warn if scheduler isn't passed in as a function."""
model = BoringModel()
trainer = Trainer(
strategy=HivemindStrategy(
Expand Down

0 comments on commit a140804

Please sign in to comment.