Docs #12

Merged: 4 commits, Sep 18, 2023

27 changes: 27 additions & 0 deletions docs/running_a_miner.md
@@ -0,0 +1,27 @@
# Setup
Install the text-prompting repository. We recommend using `pm2` to manage your processes; see the pm2 installation [guide](https://pm2.io/docs/runtime/guide/installation/).
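
If `pm2` is not already installed, one common route is via npm. This is a minimal sketch that assumes Node.js and npm are already available on the machine; adjust to your environment:

```bash
# Install pm2 globally via npm (assumes Node.js/npm are available)
npm install -g pm2

# Confirm the install
pm2 --version
```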

Below, we show how to run a Vicuna miner. Start by installing the `text_prompting` package in editable mode:

```bash
python -m pip install -e ~/text_prompting
```

Install the requirements for your specific miner, if any exist:
```bash
python -m pip install -r ~/text_prompting/miners/vicuna/requirements.txt
```
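
As an optional sanity check before starting the miner, you can confirm that the core packages import cleanly. This assumes the Vicuna requirements include `transformers`; adjust the imports to whatever your miner actually needs:

```bash
# Optional: confirm key dependencies resolve in the environment pm2 will use
python -c "import bittensor as bt; print(bt.__version__)"
python -c "import transformers; print(transformers.__version__)"
```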

Load it up:
```bash
CUDA_VISIBLE_DEVICES=0 pm2 start ~/text_prompting/neurons/miners/vicuna/miner.py \
  --name vicuna \
  --interpreter <path-to-python-binary> -- \
  --vicuna.model_name TheBloke/Wizard-Vicuna-7B-Uncensored-HF \
  --wallet.name <wallet-name> \
  --wallet.hotkey <wallet-hotkey> \
  --netuid <netuid> \
  --subtensor.network <network> \
  --logging.debug \
  --axon.port <port>
```

Flag notes:
- `--vicuna.model_name` can be changed to any Vicuna-style model.
- netuid 8 is currently on testnet.
- `--subtensor.network` can be `finney`, `test`, `local`, etc.
- `--logging.debug` sets the logging level; adjust as desired.
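
Once the miner is running, the standard pm2 commands can be used to monitor and manage it (shown here with the `vicuna` process name from the command above):

```bash
pm2 status          # list managed processes and their state
pm2 logs vicuna     # tail the miner's logs
pm2 restart vicuna  # restart the miner after config or code changes
```
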
23 changes: 23 additions & 0 deletions docs/running_a_validator.md
@@ -0,0 +1,23 @@
# Setup

We recommend using `pm2` to manage your processes. See the pm2 install [guide](https://pm2.io/docs/runtime/guide/installation/) for more info.

## Hardware requirements
- Recommended: A100 80GB
- Minimum: A40 48GB or A6000 48GB (a quick GPU check is shown below)
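
To confirm which GPU the validator will actually see, you can query the NVIDIA driver directly. This is a minimal sketch and assumes the NVIDIA driver and `nvidia-smi` are installed:

```bash
# Show all visible GPUs, their memory, and driver/CUDA versions
nvidia-smi

# Print only the GPU name and total memory, one line per device
nvidia-smi --query-gpu=name,memory.total --format=csv
```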

## Run command
```bash
CUDA_VISIBLE_DEVICES=0 pm2 start ~/text_prompting/neurons/validators/validator.py \
  --name <your-validator-name> \
  --interpreter <path-to-your-env-python> -- \
  --wallet.name <validator-wallet> \
  --wallet.hotkey <validator-hotkey> \
  --netuid <netuid> \
  --subtensor.network <network> \
  --logging.debug \
  --neuron.reward_path ~/.bittensor/validators \
  --axon.port <port> \
  --neuron.followup_sample_size <k> \
  --neuron.answer_sample_size <k>
```

Flag notes:
- The script path points at `validator.py` inside the cloned repo.
- netuid 8 is currently the test netuid.
- `--subtensor.network` can be `finney`, `local`, `test`, etc.
- `--logging.debug` sets the desired logging level.
- `--neuron.reward_path` is where logs are stored.
- `--neuron.followup_sample_size <k>` sets the top-k for followup prompts; `--neuron.answer_sample_size <k>` sets the top-k for answer prompts.

> Note: Make sure you have at least 50GB of free disk space for wandb logs.
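
To check that disk-space requirement up front, you can inspect the filesystem that holds your home directory. This is a small sketch and assumes the wandb and reward logs end up under your home directory (as suggested by the `~/.bittensor/validators` reward path above):

```bash
# Show free space on the filesystem that contains your home directory
df -h "$HOME"
```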
337 changes: 337 additions & 0 deletions examples/streaming.ipynb
@@ -0,0 +1,337 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import bittensor as bt\n",
"import pydantic\n",
"from starlette.types import Send\n",
"from starlette.responses import Response, StreamingResponse\n",
"from functools import partial\n",
"from typing import Callable, Awaitable, List, Tuple\n",
"import asyncio\n",
"from transformers import GPT2Tokenizer\n",
"\n",
"bt.debug()\n",
"\n",
"\n",
"# This is a subclass of StreamingSynapse for prompting network functionality\n",
"class StreamPrompting(bt.StreamingSynapse):\n",
" \"\"\"\n",
" StreamPrompting is a subclass of StreamingSynapse that is specifically designed for prompting network functionality.\n",
" It overrides abstract methods from the parent class to provide concrete implementations for processing streaming responses,\n",
" deserializing the response, and extracting JSON data.\n",
"\n",
" Attributes:\n",
" roles: List of roles associated with the prompt.\n",
" messages: List of messages to be processed.\n",
" completion: A string to store the completion result.\n",
" \"\"\"\n",
"\n",
" roles: List[str] = pydantic.Field(\n",
" ...,\n",
" title=\"Roles\",\n",
" description=\"A list of roles in the Prompting scenario. Immuatable.\",\n",
" allow_mutation=False,\n",
" )\n",
"\n",
" messages: List[str] = pydantic.Field(\n",
" ...,\n",
" title=\"Messages\",\n",
" description=\"A list of messages in the Prompting scenario. Immutable.\",\n",
" allow_mutation=False,\n",
" )\n",
"\n",
" # NOTE: It is required to have a hash of the roles and messages in the Prompting scenario\n",
" # These are required to hash the body of the request and verify that the response is valid\n",
" messages_hash: str = pydantic.Field(\n",
" \"\",\n",
" title=\"Messages Hash\",\n",
" description=\"Hash of the messages in the Prompting scenario. Immutable.\",\n",
" allow_mutation=False,\n",
" )\n",
"\n",
" roles_hash: str = pydantic.Field(\n",
" \"\",\n",
" title=\"Roles Hash\",\n",
" description=\"Hash of the roles in the Prompting scenario. Immutable.\",\n",
" allow_mutation=False,\n",
" )\n",
"\n",
" completion: str = pydantic.Field(\n",
" \"\",\n",
" title=\"Completion\",\n",
" description=\"Completion status of the current Prompting object. This attribute is mutable and can be updated.\",\n",
" )\n",
"\n",
" async def process_streaming_response(self, response):\n",
" \"\"\"\n",
" Asynchronously processes chunks of a streaming response, decoding the chunks from utf-8 to strings \n",
" and appending them to the `completion` attribute. The primary goal of this method is to accumulate the \n",
" content from the streaming response in a sequential manner.\n",
"\n",
" This method is particularly vital when the streaming response from the server is broken down into multiple \n",
" chunks, and a comprehensive result needs to be constructed from these individual chunks.\n",
"\n",
" Args:\n",
" response: The response object from which the streamed content is fetched. This content typically \n",
" contains chunks of string data that are being streamed from the server.\n",
"\n",
" Raises:\n",
" ValueError: If there is an issue decoding the streamed chunks.\n",
"\n",
" Note:\n",
" This method is designed for utf-8 encoded strings. If the streamed content has a different encoding, \n",
" it may need to be adjusted accordingly.\n",
" \"\"\"\n",
" if self.completion is None:\n",
" self.completion = \"\"\n",
" async for chunk in response.content.iter_any():\n",
" tokens = chunk.decode('utf-8').split('\\n')\n",
" for token in tokens:\n",
" if token:\n",
" self.completion += token\n",
"\n",
" def deserialize(self):\n",
" \"\"\"\n",
" Deserializes the response by returning the completion attribute.\n",
"\n",
" Returns:\n",
" str: The completion result.\n",
" \"\"\"\n",
" return self.completion\n",
"\n",
" def extract_response_json(self, response):\n",
" \"\"\"\n",
" Extracts various components of the response object, including headers and specific information related \n",
" to dendrite and axon, into a structured JSON format. This method aids in simplifying the raw response \n",
" object into a format that's easier to read and interpret.\n",
"\n",
" The method is particularly useful for extracting specific metadata from the response headers which \n",
" provide insights about the response or the server's configurations. Moreover, details about dendrite \n",
" and axon extracted from headers can provide information about the neural network layers that were \n",
" involved in the request-response cycle.\n",
"\n",
" Args:\n",
" response: The response object, typically an instance of an HTTP response, containing the headers \n",
" and the content that needs to be extracted.\n",
"\n",
" Returns:\n",
" dict: A dictionary containing the structured data extracted from the response object. This includes \n",
" data such as the server's name, timeout details, data sizes, and information about dendrite \n",
" and axon among others.\n",
"\n",
" Raises:\n",
" KeyError: If expected headers or response components are missing.\n",
"\n",
" Note:\n",
" This method assumes a certain structure and naming convention for the headers. If the server \n",
" changes its header naming convention or structure, this method may need adjustments.\n",
" \"\"\"\n",
" headers = {k.decode('utf-8'): v.decode('utf-8') for k, v in response.__dict__[\"_raw_headers\"]}\n",
"\n",
" def extract_info(prefix):\n",
" return {key.split('_')[-1]: value for key, value in headers.items() if key.startswith(prefix)}\n",
"\n",
" return {\n",
" \"name\": headers.get('name', ''),\n",
" \"timeout\": float(headers.get('timeout', 0)),\n",
" \"total_size\": int(headers.get('total_size', 0)),\n",
" \"header_size\": int(headers.get('header_size', 0)),\n",
" \"dendrite\": extract_info('bt_header_dendrite'),\n",
" \"axon\": extract_info('bt_header_axon'),\n",
" \"roles\": self.roles,\n",
" \"messages\": self.messages,\n",
" \"completion\": self.completion,\n",
" }\n",
"\n",
"# This should encapsulate all the logic for generating a streaming response\n",
"def prompt(synapse: StreamPrompting) -> StreamPrompting:\n",
" \"\"\"\n",
" Generates a streaming response based on the given StreamPrompting synapse.\n",
"\n",
" This function integrates tokenization, model inference, and streaming functionality to \n",
" generate a response for the Bittensor network. It defines the tokenizer based on the GPT-2 model, \n",
" the decoding mechanism for the model, and the actual streaming mechanism to stream tokens.\n",
"\n",
" The streaming process mimics a model inference on the provided message, encodes the output tokens,\n",
" and sends them in a streaming fashion to the client. To simulate the behavior of streaming \n",
" responses, there's an artificial delay introduced between sending tokens.\n",
"\n",
" Args:\n",
" synapse (StreamPrompting): A StreamPrompting instance containing the messages that need to be processed. \n",
" This object contains details like roles, messages, and other prompting \n",
" details which guide the streaming response's behavior.\n",
"\n",
" Returns:\n",
" StreamPrompting: A modified StreamPrompting instance with the streaming response set up. This response \n",
" is ready to be sent to the client as a continuous stream of tokens based on the input \n",
" message and the model inference.\n",
"\n",
" Note:\n",
" The function assumes the use of the GPT-2 tokenizer and simulates model inference. In a real-world \n",
" scenario, it can be integrated with an actual model inference mechanism.\n",
" \"\"\"\n",
" tokenizer = GPT2Tokenizer.from_pretrained('gpt2')\n",
"\n",
" def model(ids):\n",
" return (tokenizer.decode(id) for id in ids)\n",
"\n",
" async def _prompt(text: str, send: Send):\n",
" \"\"\"\n",
" Asynchronously simulates model inference and streams decoded tokens to the client.\n",
"\n",
" This function takes a given input text, tokenizes it, and simulates the behavior of model inference\n",
" by decoding the tokenized input. Decoded tokens are sent to the client in a streaming manner. An artificial\n",
" delay is introduced between sending tokens to mimic the real-world behavior of streaming responses.\n",
"\n",
" Args:\n",
" text (str): The input message text to be tokenized and used for simulated model inference.\n",
" This typically represents a part of the client's request or a message from the StreamPrompting synapse.\n",
" \n",
" send (Send): A callable provided by the ASGI server, responsible for sending the response to the client.\n",
" This function is used to stream the decoded tokens to the client in real-time.\n",
"\n",
" Note:\n",
" The function uses a simulated model inference mechanism and an artificial delay to showcase \n",
" the streaming effect. In a real-world scenario, this could be integrated with an actual \n",
" deep learning model for generating response tokens.\n",
" \"\"\"\n",
" # Simulate model inference\n",
" input_ids = tokenizer(text, return_tensors=\"pt\").input_ids.squeeze()\n",
" for token in model(input_ids):\n",
" await send({\"type\": \"http.response.body\", \"body\": (token + '\\n').encode('utf-8'), \"more_body\": True})\n",
" bt.logging.debug(f\"Streamed token: {token}\")\n",
" # Sleep to show the streaming effect\n",
" await asyncio.sleep(1)\n",
"\n",
" message = synapse.messages[0]\n",
" token_streamer = partial(_prompt, message)\n",
" return synapse.create_streaming_response(token_streamer)\n",
"\n",
"def blacklist(synapse: StreamPrompting) -> Tuple[bool, str]:\n",
" \"\"\"\n",
" Determines whether the synapse should be blacklisted.\n",
"\n",
" Args:\n",
" synapse: A StreamPrompting instance.\n",
"\n",
" Returns:\n",
" Tuple[bool, str]: Always returns False, indicating that the synapse should not be blacklisted.\n",
" \"\"\"\n",
" return False, \"\"\n",
"\n",
"def priority(synapse: StreamPrompting) -> float:\n",
" \"\"\"\n",
" Determines the priority of the synapse.\n",
"\n",
" Args:\n",
" synapse: A StreamPrompting instance.\n",
"\n",
" Returns:\n",
" float: Always returns 0.0, indicating the default priority.\n",
" \"\"\"\n",
" return 0.0\n"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"\u001b[34m2023-09-15 20:57:00.086\u001b[0m | \u001b[34m\u001b[1m DEBUG \u001b[0m | dendrite | --> | 3468 B | StreamPrompting | 5C86aJ2uQawR6P6veaJQXNK9HaWh6NMbUhTiLs65kq4ZW3NH | 216.153.62.113:8099 | 0 | Success\n",
"\u001b[34m2023-09-15 20:57:00.097\u001b[0m | \u001b[34m\u001b[1m DEBUG \u001b[0m | axon | <-- | 843 B | StreamPrompting | 5C86aJ2uQawR6P6veaJQXNK9HaWh6NMbUhTiLs65kq4ZW3NH | 127.0.0.1:46428 | 200 | Success \n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\u001b[34m2023-09-15 20:57:00.465\u001b[0m | \u001b[34m\u001b[1m DEBUG \u001b[0m | axon | --> | -1 B | StreamPrompting | 5C86aJ2uQawR6P6veaJQXNK9HaWh6NMbUhTiLs65kq4ZW3NH | 127.0.0.1:46428 | 200 | Success\n",
"\u001b[34m2023-09-15 20:57:00.467\u001b[0m | \u001b[34m\u001b[1m DEBUG \u001b[0m | Streamed token: hello \n",
"\u001b[34m2023-09-15 20:57:01.468\u001b[0m | \u001b[34m\u001b[1m DEBUG \u001b[0m | Streamed token: this \n",
"\u001b[34m2023-09-15 20:57:02.470\u001b[0m | \u001b[34m\u001b[1m DEBUG \u001b[0m | Streamed token: is \n",
"\u001b[34m2023-09-15 20:57:03.470\u001b[0m | \u001b[34m\u001b[1m DEBUG \u001b[0m | Streamed token: a \n",
"\u001b[34m2023-09-15 20:57:04.472\u001b[0m | \u001b[34m\u001b[1m DEBUG \u001b[0m | Streamed token: test \n",
"\u001b[34m2023-09-15 20:57:05.473\u001b[0m | \u001b[34m\u001b[1m DEBUG \u001b[0m | Streamed token: of \n",
"\u001b[34m2023-09-15 20:57:06.473\u001b[0m | \u001b[34m\u001b[1m DEBUG \u001b[0m | Streamed token: a \n",
"\u001b[34m2023-09-15 20:57:07.475\u001b[0m | \u001b[34m\u001b[1m DEBUG \u001b[0m | Streamed token: streaming \n",
"\u001b[34m2023-09-15 20:57:08.476\u001b[0m | \u001b[34m\u001b[1m DEBUG \u001b[0m | Streamed token: response \n",
"\u001b[34m2023-09-15 20:57:09.477\u001b[0m | \u001b[34m\u001b[1m DEBUG \u001b[0m | Streamed token: . \n",
"\u001b[34m2023-09-15 20:57:10.488\u001b[0m | \u001b[34m\u001b[1m DEBUG \u001b[0m | dendrite | <-- | 4008 B | StreamPrompting | 5C86aJ2uQawR6P6veaJQXNK9HaWh6NMbUhTiLs65kq4ZW3NH | 216.153.62.113:8099 | 200 | Success\n",
"\u001b[34m2023-09-15 20:57:10.489\u001b[0m | \u001b[34m\u001b[1m DEBUG \u001b[0m | dendrite | <-- | 4008 B | StreamPrompting | 5C86aJ2uQawR6P6veaJQXNK9HaWh6NMbUhTiLs65kq4ZW3NH | 216.153.62.113:8099 | 200 | Success\n"
]
},
{
"data": {
"text/plain": [
"['hello this is a test of a streaming response.']"
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Create an Axon instance on port 8099.\n",
"axon = bt.axon(port=8099)\n",
"\n",
"# Attach the forward, blacklist, and priority functions to the Axon.\n",
"# forward_fn: The function to handle forwarding logic.\n",
"# blacklist_fn: The function to determine if a request should be blacklisted.\n",
"# priority_fn: The function to determine the priority of the request.\n",
"axon.attach(\n",
" forward_fn=prompt,\n",
" blacklist_fn=blacklist,\n",
" priority_fn=priority\n",
")\n",
"\n",
"# Start the Axon to begin listening for requests.\n",
"axon.start()\n",
"\n",
"# Create a Dendrite instance to handle client-side communication.\n",
"d = bt.dendrite()\n",
"\n",
"# Send a request to the Axon using the Dendrite, passing in a StreamPrompting instance with roles and messages.\n",
"# The response is awaited, as the Dendrite communicates asynchronously with the Axon.\n",
"resp = await d(\n",
" [axon],\n",
" StreamPrompting(roles=[\"user\"], messages=[\"hello this is a test of a streaming response.\"])\n",
")\n",
"\n",
"# The response object contains the result of the streaming operation.\n",
"resp\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "rev2",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.12"
},
"orig_nbformat": 4
},
"nbformat": 4,
"nbformat_minor": 2
}