diff --git a/docs/running_a_miner.md b/docs/running_a_miner.md
new file mode 100644
index 0000000..eb589c0
--- /dev/null
+++ b/docs/running_a_miner.md
@@ -0,0 +1,38 @@
+# Setup
+
+We recommend using `pm2` to manage your processes. See the pm2 [installation guide](https://pm2.io/docs/runtime/guide/installation/) for more info.
+
+The example below shows how to run a Vicuna miner.
+
+Install the text-prompting repository:
+
+```bash
+python -m pip install -e ~/text_prompting
+```
+
+Install the requirements for your specific miner, if they exist:
+
+```bash
+python -m pip install -r ~/text_prompting/miners/vicuna/requirements.txt
+```
+
+Load it up (replace the `<...>` placeholders with your own values):
+
+```bash
+CUDA_VISIBLE_DEVICES=0 pm2 start ~/text_prompting/neurons/miners/vicuna/miner.py \
+    --name vicuna \
+    --interpreter -- \
+    --vicuna.model_name TheBloke/Wizard-Vicuna-7B-Uncensored-HF \
+    --wallet.name <wallet_name> \
+    --wallet.hotkey <wallet_hotkey> \
+    --netuid <netuid> \
+    --subtensor.network <network> \
+    --logging.debug \
+    --axon.port <port>
+```
+
+Flag notes:
+- `--vicuna.model_name`: can be changed to any Vicuna-style model.
+- `--netuid`: netuid 8 is currently on testnet.
+- `--subtensor.network`: finney, test, local, etc.
+- `--logging.debug`: set your desired logging level.
\ No newline at end of file
diff --git a/docs/running_a_validator.md b/docs/running_a_validator.md
new file mode 100644
index 0000000..08983eb
--- /dev/null
+++ b/docs/running_a_validator.md
@@ -0,0 +1,44 @@
+# Setup
+
+We recommend using `pm2` to manage your processes. See the pm2 install [guide](https://pm2.io/docs/runtime/guide/installation/) for more info.
+
+## Hardware requirements
+- Recommended: A100 80GB
+- Minimum: A40 48GB or A6000 48GB
+
+## Run command
+
+Replace the `<...>` placeholders with your own values:
+
+```bash
+CUDA_VISIBLE_DEVICES=0 pm2 start ~/text_prompting/neurons/validators/validator.py \
+    --name <process_name> \
+    --interpreter -- \
+    --wallet.name <wallet_name> \
+    --wallet.hotkey <wallet_hotkey> \
+    --netuid <netuid> \
+    --subtensor.network <network> \
+    --logging.debug \
+    --neuron.reward_path ~/.bittensor/validators \
+    --axon.port <port> \
+    --neuron.followup_sample_size <top_k> \
+    --neuron.answer_sample_size <top_k>
+```
+
+Flag notes:
+- The path passed to `pm2 start` should point to `validator.py` in your clone of the repository.
+- `--netuid`: netuid 8 is currently the test netuid.
+- `--subtensor.network`: finney, local, test, etc.
+- `--logging.debug`: set your desired logging level.
+- `--neuron.reward_path`: where to store logs.
+- `--neuron.followup_sample_size`: sets top-k for the followup prompts.
+- `--neuron.answer_sample_size`: sets top-k for the answer prompts.
+
+> Note: Make sure you have at least 50GB of free disk space for wandb logs.
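+
+Once the validator is running, you can check on the `pm2` process (a minimal example; `<process_name>` is whatever you passed to `--name` above):
+
+```bash
+pm2 status
+pm2 logs <process_name>
+```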
\ No newline at end of file diff --git a/examples/streaming.ipynb b/examples/streaming.ipynb new file mode 100644 index 0000000..7fac4a8 --- /dev/null +++ b/examples/streaming.ipynb @@ -0,0 +1,337 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "import bittensor as bt\n", + "import pydantic\n", + "from starlette.types import Send\n", + "from starlette.responses import Response, StreamingResponse\n", + "from functools import partial\n", + "from typing import Callable, Awaitable, List, Tuple\n", + "import asyncio\n", + "from transformers import GPT2Tokenizer\n", + "\n", + "bt.debug()\n", + "\n", + "\n", + "# This is a subclass of StreamingSynapse for prompting network functionality\n", + "class StreamPrompting(bt.StreamingSynapse):\n", + " \"\"\"\n", + " StreamPrompting is a subclass of StreamingSynapse that is specifically designed for prompting network functionality.\n", + " It overrides abstract methods from the parent class to provide concrete implementations for processing streaming responses,\n", + " deserializing the response, and extracting JSON data.\n", + "\n", + " Attributes:\n", + " roles: List of roles associated with the prompt.\n", + " messages: List of messages to be processed.\n", + " completion: A string to store the completion result.\n", + " \"\"\"\n", + "\n", + " roles: List[str] = pydantic.Field(\n", + " ...,\n", + " title=\"Roles\",\n", + " description=\"A list of roles in the Prompting scenario. Immuatable.\",\n", + " allow_mutation=False,\n", + " )\n", + "\n", + " messages: List[str] = pydantic.Field(\n", + " ...,\n", + " title=\"Messages\",\n", + " description=\"A list of messages in the Prompting scenario. Immutable.\",\n", + " allow_mutation=False,\n", + " )\n", + "\n", + " # NOTE: It is required to have a hash of the roles and messages in the Prompting scenario\n", + " # These are required to hash the body of the request and verify that the response is valid\n", + " messages_hash: str = pydantic.Field(\n", + " \"\",\n", + " title=\"Messages Hash\",\n", + " description=\"Hash of the messages in the Prompting scenario. Immutable.\",\n", + " allow_mutation=False,\n", + " )\n", + "\n", + " roles_hash: str = pydantic.Field(\n", + " \"\",\n", + " title=\"Roles Hash\",\n", + " description=\"Hash of the roles in the Prompting scenario. Immutable.\",\n", + " allow_mutation=False,\n", + " )\n", + "\n", + " completion: str = pydantic.Field(\n", + " \"\",\n", + " title=\"Completion\",\n", + " description=\"Completion status of the current Prompting object. This attribute is mutable and can be updated.\",\n", + " )\n", + "\n", + " async def process_streaming_response(self, response):\n", + " \"\"\"\n", + " Asynchronously processes chunks of a streaming response, decoding the chunks from utf-8 to strings \n", + " and appending them to the `completion` attribute. The primary goal of this method is to accumulate the \n", + " content from the streaming response in a sequential manner.\n", + "\n", + " This method is particularly vital when the streaming response from the server is broken down into multiple \n", + " chunks, and a comprehensive result needs to be constructed from these individual chunks.\n", + "\n", + " Args:\n", + " response: The response object from which the streamed content is fetched. 
This content typically \n", + " contains chunks of string data that are being streamed from the server.\n", + "\n", + " Raises:\n", + " ValueError: If there is an issue decoding the streamed chunks.\n", + "\n", + " Note:\n", + " This method is designed for utf-8 encoded strings. If the streamed content has a different encoding, \n", + " it may need to be adjusted accordingly.\n", + " \"\"\"\n", + " if self.completion is None:\n", + " self.completion = \"\"\n", + " async for chunk in response.content.iter_any():\n", + " tokens = chunk.decode('utf-8').split('\\n')\n", + " for token in tokens:\n", + " if token:\n", + " self.completion += token\n", + "\n", + " def deserialize(self):\n", + " \"\"\"\n", + " Deserializes the response by returning the completion attribute.\n", + "\n", + " Returns:\n", + " str: The completion result.\n", + " \"\"\"\n", + " return self.completion\n", + "\n", + " def extract_response_json(self, response):\n", + " \"\"\"\n", + " Extracts various components of the response object, including headers and specific information related \n", + " to dendrite and axon, into a structured JSON format. This method aids in simplifying the raw response \n", + " object into a format that's easier to read and interpret.\n", + "\n", + " The method is particularly useful for extracting specific metadata from the response headers which \n", + " provide insights about the response or the server's configurations. Moreover, details about dendrite \n", + " and axon extracted from headers can provide information about the neural network layers that were \n", + " involved in the request-response cycle.\n", + "\n", + " Args:\n", + " response: The response object, typically an instance of an HTTP response, containing the headers \n", + " and the content that needs to be extracted.\n", + "\n", + " Returns:\n", + " dict: A dictionary containing the structured data extracted from the response object. This includes \n", + " data such as the server's name, timeout details, data sizes, and information about dendrite \n", + " and axon among others.\n", + "\n", + " Raises:\n", + " KeyError: If expected headers or response components are missing.\n", + "\n", + " Note:\n", + " This method assumes a certain structure and naming convention for the headers. If the server \n", + " changes its header naming convention or structure, this method may need adjustments.\n", + " \"\"\"\n", + " headers = {k.decode('utf-8'): v.decode('utf-8') for k, v in response.__dict__[\"_raw_headers\"]}\n", + "\n", + " def extract_info(prefix):\n", + " return {key.split('_')[-1]: value for key, value in headers.items() if key.startswith(prefix)}\n", + "\n", + " return {\n", + " \"name\": headers.get('name', ''),\n", + " \"timeout\": float(headers.get('timeout', 0)),\n", + " \"total_size\": int(headers.get('total_size', 0)),\n", + " \"header_size\": int(headers.get('header_size', 0)),\n", + " \"dendrite\": extract_info('bt_header_dendrite'),\n", + " \"axon\": extract_info('bt_header_axon'),\n", + " \"roles\": self.roles,\n", + " \"messages\": self.messages,\n", + " \"completion\": self.completion,\n", + " }\n", + "\n", + "# This should encapsulate all the logic for generating a streaming response\n", + "def prompt(synapse: StreamPrompting) -> StreamPrompting:\n", + " \"\"\"\n", + " Generates a streaming response based on the given StreamPrompting synapse.\n", + "\n", + " This function integrates tokenization, model inference, and streaming functionality to \n", + " generate a response for the Bittensor network. 
It defines the tokenizer based on the GPT-2 model, \n", + " the decoding mechanism for the model, and the actual streaming mechanism to stream tokens.\n", + "\n", + " The streaming process mimics a model inference on the provided message, encodes the output tokens,\n", + " and sends them in a streaming fashion to the client. To simulate the behavior of streaming \n", + " responses, there's an artificial delay introduced between sending tokens.\n", + "\n", + " Args:\n", + " synapse (StreamPrompting): A StreamPrompting instance containing the messages that need to be processed. \n", + " This object contains details like roles, messages, and other prompting \n", + " details which guide the streaming response's behavior.\n", + "\n", + " Returns:\n", + " StreamPrompting: A modified StreamPrompting instance with the streaming response set up. This response \n", + " is ready to be sent to the client as a continuous stream of tokens based on the input \n", + " message and the model inference.\n", + "\n", + " Note:\n", + " The function assumes the use of the GPT-2 tokenizer and simulates model inference. In a real-world \n", + " scenario, it can be integrated with an actual model inference mechanism.\n", + " \"\"\"\n", + " tokenizer = GPT2Tokenizer.from_pretrained('gpt2')\n", + "\n", + " def model(ids):\n", + " return (tokenizer.decode(id) for id in ids)\n", + "\n", + " async def _prompt(text: str, send: Send):\n", + " \"\"\"\n", + " Asynchronously simulates model inference and streams decoded tokens to the client.\n", + "\n", + " This function takes a given input text, tokenizes it, and simulates the behavior of model inference\n", + " by decoding the tokenized input. Decoded tokens are sent to the client in a streaming manner. An artificial\n", + " delay is introduced between sending tokens to mimic the real-world behavior of streaming responses.\n", + "\n", + " Args:\n", + " text (str): The input message text to be tokenized and used for simulated model inference.\n", + " This typically represents a part of the client's request or a message from the StreamPrompting synapse.\n", + " \n", + " send (Send): A callable provided by the ASGI server, responsible for sending the response to the client.\n", + " This function is used to stream the decoded tokens to the client in real-time.\n", + "\n", + " Note:\n", + " The function uses a simulated model inference mechanism and an artificial delay to showcase \n", + " the streaming effect. 
In a real-world scenario, this could be integrated with an actual \n", + " deep learning model for generating response tokens.\n", + " \"\"\"\n", + " # Simulate model inference\n", + " input_ids = tokenizer(text, return_tensors=\"pt\").input_ids.squeeze()\n", + " for token in model(input_ids):\n", + " await send({\"type\": \"http.response.body\", \"body\": (token + '\\n').encode('utf-8'), \"more_body\": True})\n", + " bt.logging.debug(f\"Streamed token: {token}\")\n", + " # Sleep to show the streaming effect\n", + " await asyncio.sleep(1)\n", + "\n", + " message = synapse.messages[0]\n", + " token_streamer = partial(_prompt, message)\n", + " return synapse.create_streaming_response(token_streamer)\n", + "\n", + "def blacklist(synapse: StreamPrompting) -> Tuple[bool, str]:\n", + " \"\"\"\n", + " Determines whether the synapse should be blacklisted.\n", + "\n", + " Args:\n", + " synapse: A StreamPrompting instance.\n", + "\n", + " Returns:\n", + " Tuple[bool, str]: Always returns False, indicating that the synapse should not be blacklisted.\n", + " \"\"\"\n", + " return False, \"\"\n", + "\n", + "def priority(synapse: StreamPrompting) -> float:\n", + " \"\"\"\n", + " Determines the priority of the synapse.\n", + "\n", + " Args:\n", + " synapse: A StreamPrompting instance.\n", + "\n", + " Returns:\n", + " float: Always returns 0.0, indicating the default priority.\n", + " \"\"\"\n", + " return 0.0\n" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\u001b[34m2023-09-15 20:57:00.086\u001b[0m | \u001b[34m\u001b[1m DEBUG \u001b[0m | dendrite | --> | 3468 B | StreamPrompting | 5C86aJ2uQawR6P6veaJQXNK9HaWh6NMbUhTiLs65kq4ZW3NH | 216.153.62.113:8099 | 0 | Success\n", + "\u001b[34m2023-09-15 20:57:00.097\u001b[0m | \u001b[34m\u001b[1m DEBUG \u001b[0m | axon | <-- | 843 B | StreamPrompting | 5C86aJ2uQawR6P6veaJQXNK9HaWh6NMbUhTiLs65kq4ZW3NH | 127.0.0.1:46428 | 200 | Success \n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\u001b[34m2023-09-15 20:57:00.465\u001b[0m | \u001b[34m\u001b[1m DEBUG \u001b[0m | axon | --> | -1 B | StreamPrompting | 5C86aJ2uQawR6P6veaJQXNK9HaWh6NMbUhTiLs65kq4ZW3NH | 127.0.0.1:46428 | 200 | Success\n", + "\u001b[34m2023-09-15 20:57:00.467\u001b[0m | \u001b[34m\u001b[1m DEBUG \u001b[0m | Streamed token: hello \n", + "\u001b[34m2023-09-15 20:57:01.468\u001b[0m | \u001b[34m\u001b[1m DEBUG \u001b[0m | Streamed token: this \n", + "\u001b[34m2023-09-15 20:57:02.470\u001b[0m | \u001b[34m\u001b[1m DEBUG \u001b[0m | Streamed token: is \n", + "\u001b[34m2023-09-15 20:57:03.470\u001b[0m | \u001b[34m\u001b[1m DEBUG \u001b[0m | Streamed token: a \n", + "\u001b[34m2023-09-15 20:57:04.472\u001b[0m | \u001b[34m\u001b[1m DEBUG \u001b[0m | Streamed token: test \n", + "\u001b[34m2023-09-15 20:57:05.473\u001b[0m | \u001b[34m\u001b[1m DEBUG \u001b[0m | Streamed token: of \n", + "\u001b[34m2023-09-15 20:57:06.473\u001b[0m | \u001b[34m\u001b[1m DEBUG \u001b[0m | Streamed token: a \n", + "\u001b[34m2023-09-15 20:57:07.475\u001b[0m | \u001b[34m\u001b[1m DEBUG \u001b[0m | Streamed token: streaming \n", + "\u001b[34m2023-09-15 20:57:08.476\u001b[0m | \u001b[34m\u001b[1m DEBUG \u001b[0m | Streamed token: response \n", + "\u001b[34m2023-09-15 20:57:09.477\u001b[0m | \u001b[34m\u001b[1m DEBUG \u001b[0m | Streamed token: . 
\n", + "\u001b[34m2023-09-15 20:57:10.488\u001b[0m | \u001b[34m\u001b[1m DEBUG \u001b[0m | dendrite | <-- | 4008 B | StreamPrompting | 5C86aJ2uQawR6P6veaJQXNK9HaWh6NMbUhTiLs65kq4ZW3NH | 216.153.62.113:8099 | 200 | Success\n", + "\u001b[34m2023-09-15 20:57:10.489\u001b[0m | \u001b[34m\u001b[1m DEBUG \u001b[0m | dendrite | <-- | 4008 B | StreamPrompting | 5C86aJ2uQawR6P6veaJQXNK9HaWh6NMbUhTiLs65kq4ZW3NH | 216.153.62.113:8099 | 200 | Success\n" + ] + }, + { + "data": { + "text/plain": [ + "['hello this is a test of a streaming response.']" + ] + }, + "execution_count": 2, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "# Create an Axon instance on port 8099.\n", + "axon = bt.axon(port=8099)\n", + "\n", + "# Attach the forward, blacklist, and priority functions to the Axon.\n", + "# forward_fn: The function to handle forwarding logic.\n", + "# blacklist_fn: The function to determine if a request should be blacklisted.\n", + "# priority_fn: The function to determine the priority of the request.\n", + "axon.attach(\n", + " forward_fn=prompt,\n", + " blacklist_fn=blacklist,\n", + " priority_fn=priority\n", + ")\n", + "\n", + "# Start the Axon to begin listening for requests.\n", + "axon.start()\n", + "\n", + "# Create a Dendrite instance to handle client-side communication.\n", + "d = bt.dendrite()\n", + "\n", + "# Send a request to the Axon using the Dendrite, passing in a StreamPrompting instance with roles and messages.\n", + "# The response is awaited, as the Dendrite communicates asynchronously with the Axon.\n", + "resp = await d(\n", + " [axon],\n", + " StreamPrompting(roles=[\"user\"], messages=[\"hello this is a test of a streaming response.\"])\n", + ")\n", + "\n", + "# The response object contains the result of the streaming operation.\n", + "resp\n" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "rev2", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.12" + }, + "orig_nbformat": 4 + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/neurons/miners/vicuna/miner.py b/neurons/miners/vicuna/miner.py index 1336f81..7946b75 100644 --- a/neurons/miners/vicuna/miner.py +++ b/neurons/miners/vicuna/miner.py @@ -44,7 +44,7 @@ class VicunaMiner(Miner): model (AutoModelForCausalLM): The causal language model for text generation. """ - def config(self) -> "bt.Config": + def config(self) -> "bt.config": """ Configures the Vicuna Miner with relevant arguments. @@ -65,9 +65,9 @@ def add_args(cls, parser: argparse.ArgumentParser): """ parser.add_argument( "--vicuna.model_name", - default="TheBloke/Wizard-Vicuna-7B-Uncensored-HF", type=str, - help="Name/path of model to load", + default="TheBloke/Wizard-Vicuna-7B-Uncensored-HF", + help="Name/path of model to load. Also can be a filepath to the model weights (HF)", ) parser.add_argument( "--vicuna.device", type=str, help="Device to load model", default="cuda"