Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[NA] Offline logging support #115

Merged
merged 25 commits into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
66a7d9d
Add config variables for offline mode
alexkuzmik Mar 14, 2024
851111d
Add message processing
alexkuzmik Mar 14, 2024
0d3be9f
Fix lint errors
alexkuzmik Mar 14, 2024
17f31cf
Fix lint errors, change some names
alexkuzmik Mar 14, 2024
c6f613e
Refactor names, add message_processing namespace
alexkuzmik Mar 14, 2024
e69b9b0
Move prompt sending logic to online_senders
alexkuzmik Mar 14, 2024
9a94f4d
Move sending logic for chains to online_senders.chain
alexkuzmik Mar 14, 2024
629885e
Fix some lint errors
alexkuzmik Mar 14, 2024
e015358
Update message processing implementation, split the logic and refacto…
alexkuzmik Mar 18, 2024
6147c42
Fix some lint errors
alexkuzmik Mar 18, 2024
ce34afd
Remove odd files from another branch that were added here accidentally
alexkuzmik Mar 18, 2024
05740d3
Fix lint errors
alexkuzmik Mar 18, 2024
bb6e5a9
Add draft version of offline_message_processor (without file rotations)
alexkuzmik Mar 19, 2024
574df02
Fix lint errors, add file rotation logic
alexkuzmik Mar 20, 2024
a3f50ba
Refactor message processing logic, add unit tests
alexkuzmik Mar 20, 2024
64758ab
Exclude api_key from serialized messages, rename experiment_informati…
alexkuzmik Mar 21, 2024
36dd950
Add test for message to_dict and from_dict
alexkuzmik Mar 21, 2024
df7c785
Add directory creation if it doesnt exist, add versions to messages, …
alexkuzmik Mar 21, 2024
6b623b0
Don't raise errors if offline mode enabled
alexkuzmik Mar 21, 2024
ef2229e
Fix lint errors
alexkuzmik Mar 21, 2024
9df536e
Enable error raising in test workflows, fix few tests
alexkuzmik Mar 21, 2024
55146a7
Add new test for offline message processor
alexkuzmik Mar 22, 2024
8098025
Rename test
alexkuzmik Mar 22, 2024
5473f4c
Add random numbers to the end of the file
alexkuzmik Mar 22, 2024
99b0e2c
Config renamings
alexkuzmik Mar 26, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/lib-integration-tests-runner.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ on:
env:
SLACK_WEBHOOK_URL: ${{ secrets.ACTION_MONITORING_SLACK }}
LIBS: ${{ github.event.inputs.libs != '' && github.event.inputs.libs || 'all' }}
COMET_RAISE_EXCEPTIONS_ON_ERROR: "1"

jobs:
init_environment:
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/lib-openai-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ name: Lib OpenAI Tests
env:
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
OPENAI_ORG_ID: ${{ secrets.OPENAI_ORG_ID }}
COMET_RAISE_EXCEPTIONS_ON_ERROR: "1"
on:
workflow_call:

Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/sanity-tests.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
name: Sanity Tests Comet LLM
env:
COMET_RAISE_EXCEPTIONS_ON_ERROR: "1"
on:
pull_request:

Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/unit-tests.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
name: Unit Tests Comet LLM
env:
COMET_RAISE_EXCEPTIONS_ON_ERROR: "1"
on:
pull_request:

Expand Down
54 changes: 19 additions & 35 deletions src/comet_llm/chains/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
llm_result,
logging_messages,
)
from ..message_processing import api as message_processing_api, messages
from ..types import JSONEncodable
from . import chain, state

Expand All @@ -52,11 +53,11 @@ def start_chain(
tags: List[str] (optional) user-defined tags attached to the chain
"""

MESSAGE = """
CometLLM requires an API key. Please provide it as the
api_key argument to comet_llm.start_chain or as an environment
variable named COMET_API_KEY
"""
MESSAGE = (
None
if config.offline_enabled()
else (logging_messages.API_KEY_NOT_FOUND_MESSAGE % "comet_llm.start_chain")
)

experiment_info_ = experiment_info.get(
api_key,
Expand All @@ -77,7 +78,7 @@ def start_chain(
def end_chain(
outputs: Dict[str, JSONEncodable],
metadata: Optional[Dict[str, JSONEncodable]] = None,
) -> llm_result.LLMResult:
) -> Optional[llm_result.LLMResult]:
"""
Commits global chain and logs the result to Comet.
Args:
Expand All @@ -100,38 +101,21 @@ def end_chain(
return log_chain(global_chain)


def log_chain(chain: chain.Chain) -> llm_result.LLMResult:
def log_chain(chain: chain.Chain) -> Optional[llm_result.LLMResult]:
chain_data = chain.as_dict()

experiment_info_ = chain.experiment_info
experiment_api_ = experiment_api.ExperimentAPI.create_new(
api_key=experiment_info_.api_key,
workspace=experiment_info_.workspace,
project_name=experiment_info_.project_name,
message = messages.ChainMessage(
experiment_info_=chain.experiment_info,
tags=chain.tags,
chain_data=chain_data,
duration=chain_data["chain_duration"],
metadata=chain_data["metadata"],
others=chain.others,
)

if chain.tags is not None:
experiment_api_.log_tags(chain.tags)
result = message_processing_api.MESSAGE_PROCESSOR.process(message)

experiment_api_.log_asset_with_io(
name="comet_llm_data.json",
file=io.StringIO(json.dumps(chain_data)),
asset_type="llm_data",
)

experiment_api_.log_metric(
name="chain_duration", value=chain_data["chain_duration"]
)
if result is not None:
app.SUMMARY.add_log(result.project_url, "chain")

parameters = convert.chain_metadata_to_flat_parameters(chain_data["metadata"])
for name, value in parameters.items():
experiment_api_.log_parameter(name, value)

for name, value in chain.others.items():
experiment_api_.log_other(name, value)

app.SUMMARY.add_log(experiment_api_.project_url, "chain")

return llm_result.LLMResult(
id=experiment_api_.id, project_url=experiment_api_.project_url
)
return result
15 changes: 15 additions & 0 deletions src/comet_llm/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ def _extend_comet_ml_config() -> None:
"comet.logging.console": {"type": str, "default": "INFO"},
"comet.raise_exceptions_on_error": {"type": int, "default": 0},
"comet.internal.check_tls_certificate": {"type": bool, "default": True},
"comet.offline": {"type": bool, "default": False},
alexkuzmik marked this conversation as resolved.
Show resolved Hide resolved
"comet.offline_folder_path": {"type": str, "default": ".cometllm-runs"},
alexkuzmik marked this conversation as resolved.
Show resolved Hide resolved
"comet.offline_batch_duration_seconds": {"type": int, "default": 300},
}

comet_ml_config.CONFIG_MAP.update(CONFIG_MAP_EXTENSION)
Expand Down Expand Up @@ -95,6 +98,18 @@ def tls_verification_enabled() -> bool:
return _COMET_ML_CONFIG["comet.internal.check_tls_certificate"] # type: ignore


def offline_enabled() -> bool:
    """Return the "comet.offline" configuration value coerced to a bool."""
    offline_flag = _COMET_ML_CONFIG["comet.offline"]
    return bool(offline_flag)


def offline_folder_path() -> str:
    """Return the "comet.offline_folder_path" configuration value as a string."""
    folder = _COMET_ML_CONFIG["comet.offline_folder_path"]
    return str(folder)


def offline_batch_duration_seconds() -> int:
    """Return the "comet.offline_batch_duration_seconds" configuration value as an int."""
    batch_duration = _COMET_ML_CONFIG["comet.offline_batch_duration_seconds"]
    return int(batch_duration)


def init(
api_key: Optional[str] = None,
workspace: Optional[str] = None,
Expand Down
5 changes: 5 additions & 0 deletions src/comet_llm/logging_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@
variable named COMET_API_KEY
"""

# Shown when no API key is available and the only supported way to supply one
# is the COMET_API_KEY environment variable (no function argument to mention).
API_KEY_NOT_CONFIGURED = """
CometLLM requires an API key. Please provide it as
an environment variable named COMET_API_KEY
"""

NON_ALLOWED_SCORE = "Score can only be 0 or 1 when calling 'log_user_feedback'"

METADATA_KEY_COLLISION_DURING_DEEPMERGE = (
Expand Down
13 changes: 13 additions & 0 deletions src/comet_llm/message_processing/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# -*- coding: utf-8 -*-
# *******************************************************
# ____ _ _
# / ___|___ _ __ ___ ___| |_ _ __ ___ | |
# | | / _ \| '_ ` _ \ / _ \ __| | '_ ` _ \| |
# | |__| (_) | | | | | | __/ |_ _| | | | | | |
# \____\___/|_| |_| |_|\___|\__(_)_| |_| |_|_|
#
# Sign up for free at https://www.comet.com
# Copyright (C) 2015-2024 Comet ML INC
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this package.
# *******************************************************
31 changes: 31 additions & 0 deletions src/comet_llm/message_processing/api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# -*- coding: utf-8 -*-
# *******************************************************
# ____ _ _
# / ___|___ _ __ ___ ___| |_ _ __ ___ | |
# | | / _ \| '_ ` _ \ / _ \ __| | '_ ` _ \| |
# | |__| (_) | | | | | | __/ |_ _| | | | | | |
# \____\___/|_| |_| |_|\___|\__(_)_| |_| |_|_|
#
# Sign up for free at https://www.comet.com
# Copyright (C) 2015-2024 Comet ML INC
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this package.
# *******************************************************

from typing import Union

from .. import config
from . import offline_message_processor, online_message_processor

# Module-wide processor that every logged message is routed through.
MESSAGE_PROCESSOR: Union[
    offline_message_processor.OfflineMessageProcessor,
    online_message_processor.OnlineMessageProcessor,
]

# The processor is chosen once, when this module is first imported,
# from the value of the offline setting at that moment.
if not config.offline_enabled():
    MESSAGE_PROCESSOR = online_message_processor.OnlineMessageProcessor()
else:
    MESSAGE_PROCESSOR = offline_message_processor.OfflineMessageProcessor(
        offline_directory=config.offline_folder_path(),
        file_usage_duration=config.offline_batch_duration_seconds(),
    )
72 changes: 72 additions & 0 deletions src/comet_llm/message_processing/messages.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# -*- coding: utf-8 -*-
# *******************************************************
# ____ _ _
# / ___|___ _ __ ___ ___| |_ _ __ ___ | |
# | | / _ \| '_ ` _ \ / _ \ __| | '_ ` _ \| |
# | |__| (_) | | | | | | __/ |_ _| | | | | | |
# \____\___/|_| |_| |_|\___|\__(_)_| |_| |_|_|
#
# Sign up for free at https://www.comet.com
# Copyright (C) 2015-2023 Comet ML INC
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this package.
# *******************************************************

import dataclasses
import inspect
from typing import Any, ClassVar, Dict, List, Optional, Union

from comet_llm.types import JSONEncodable

from .. import experiment_info, logging_messages


@dataclasses.dataclass
class BaseMessage:
    """Common base for the messages handed to a message processor.

    Subclasses add their own payload fields and must define the class-level
    ``VERSION`` constant, which is written into the serialized form.
    """

    experiment_info_: experiment_info.ExperimentInfo
    VERSION: ClassVar[int]

    @classmethod
    def from_dict(
        cls, d: Dict[str, Any], api_key: Optional[str] = None
    ) -> "BaseMessage":
        """Rebuild a message from the dictionary produced by ``to_dict``.

        Args:
            d: serialized message; must contain the "VERSION" and
                "experiment_info_" keys, every other key is passed to the
                class constructor as a keyword argument. The dictionary
                itself is not modified.
            api_key: forwarded to ``experiment_info.get`` (serialized
                messages do not carry the key, see ``to_dict``).

        Returns:
            An instance of the class this method was called on.

        Raises:
            KeyError: if "VERSION" or "experiment_info_" is missing from ``d``.
        """
        # Shallow copy: popping from ``d`` directly would strip keys from the
        # caller's dictionary and make a second from_dict(d) call fail.
        constructor_kwargs = dict(d)

        # VERSION is a ClassVar, not a constructor argument.
        constructor_kwargs.pop("VERSION")

        experiment_info_dict: Dict[str, Optional[str]] = constructor_kwargs.pop(
            "experiment_info_"
        )
        experiment_info_ = experiment_info.get(
            **experiment_info_dict,
            api_key=api_key,
            api_key_not_found_message=logging_messages.API_KEY_NOT_CONFIGURED,
        )

        return cls(experiment_info_=experiment_info_, **constructor_kwargs)

    def to_dict(self) -> Dict[str, Any]:
        """Return the message as a plain dictionary.

        The result is ``dataclasses.asdict(self)`` with the API key removed
        from the nested "experiment_info_" entry and a "VERSION" entry added.
        """
        result = dataclasses.asdict(self)

        # The API key is deliberately kept out of the serialized form.
        del result["experiment_info_"]["api_key"]
        result["VERSION"] = self.VERSION

        return result


@dataclasses.dataclass
class PromptMessage(BaseMessage):
    """Message describing a single logged prompt."""

    # Prompt payload; the exact schema is not visible in this module.
    prompt_asset_data: Dict[str, Any]
    # NOTE(review): presumably measured in seconds - confirm against the caller.
    duration: Optional[float]
    # Flat mapping of metadata names to scalar values (or None).
    metadata: Optional[Dict[str, Union[str, bool, float, None]]]
    # Optional list of tags.
    tags: Optional[List[str]]

    # Written into the serialized message by `to_dict`.
    VERSION: ClassVar[int] = 1


@dataclasses.dataclass
class ChainMessage(BaseMessage):
    """Message describing a single logged chain."""

    # Dictionary form of the chain.
    chain_data: Dict[str, JSONEncodable]
    # NOTE(review): presumably measured in seconds - confirm against the caller.
    duration: float
    # Optional list of tags.
    tags: Optional[List[str]]
    # Optional chain metadata.
    metadata: Optional[Dict[str, JSONEncodable]]
    others: Dict[str, JSONEncodable]
    # 'other' is the name of an experiment attribute that is logged via log_other

    # Written into the serialized message by `to_dict`.
    VERSION: ClassVar[int] = 1
71 changes: 71 additions & 0 deletions src/comet_llm/message_processing/offline_message_processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# -*- coding: utf-8 -*-
# *******************************************************
# ____ _ _
# / ___|___ _ __ ___ ___| |_ _ __ ___ | |
# | | / _ \| '_ ` _ \ / _ \ __| | '_ ` _ \| |
# | |__| (_) | | | | | | __/ |_ _| | | | | | |
# \____\___/|_| |_| |_|\___|\__(_)_| |_| |_|_|
#
# Sign up for free at https://www.comet.com
# Copyright (C) 2015-2024 Comet ML INC
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this package.
# *******************************************************

import logging
import os
import pathlib
import random
import threading
import time
from typing import Optional

from . import messages
from .offline_senders import chain, prompt

LOGGER = logging.getLogger(__name__)


class OfflineMessageProcessor:
    """Writes messages to files in a local directory instead of sending them.

    Messages are appended to a "messages_<timestamp>_<random>.jsonl" file
    inside the offline directory. A new file name is picked once the current
    one has been in use for at least ``file_usage_duration`` seconds.
    """

    def __init__(self, offline_directory: str, file_usage_duration: float) -> None:
        """Remember the settings and make sure the offline directory exists.

        Args:
            offline_directory: directory the message files are written to;
                created (with parents) if it does not exist yet.
            file_usage_duration: number of seconds a file keeps receiving
                messages before a new file name is generated.
        """
        self._offline_directory = offline_directory
        self._batch_duration_seconds = file_usage_duration
        self._lock = threading.Lock()

        self._current_file_started_at: Optional[float] = None
        self._current_file_name: Optional[str] = None

        os.makedirs(self._offline_directory, exist_ok=True)

    def process(self, message: messages.BaseMessage) -> None:
        """Append ``message`` to the current offline file.

        Prompt and chain messages are passed to their offline senders. A
        failure inside a sender is logged and swallowed, so this method does
        not raise for it. Any other message type is only reported at debug
        level.
        """
        with self._lock:
            self._update_current_file_if_needed()
            # Always set by the call above; the assert narrows the Optional type.
            assert self._current_file_name is not None
            file_path = pathlib.Path(self._offline_directory, self._current_file_name)

            if isinstance(message, messages.PromptMessage):
                try:
                    return prompt.send(message, str(file_path))
                except Exception:
                    LOGGER.error("Failed to log prompt", exc_info=True)
                    # Stop here: the message type is supported, so the
                    # "unsupported" report below must not be emitted for it.
                    return None

            if isinstance(message, messages.ChainMessage):
                try:
                    return chain.send(message, str(file_path))
                except Exception:
                    LOGGER.error("Failed to log chain", exc_info=True)
                    return None

        LOGGER.debug("Unsupported message type %s", message)
        return None

    def _update_current_file_if_needed(self) -> None:
        """Pick a new file name if none is set yet or the current one expired."""
        current_time = time.time()

        if (
            self._current_file_started_at is None
            or (current_time - self._current_file_started_at)
            >= self._batch_duration_seconds
        ):
            self._current_file_started_at = current_time
            # The random suffix lowers the chance of two processors started at
            # the same moment choosing the same file name.
            self._current_file_name = (
                f"messages_{current_time}_{random.randint(1111,9999)}.jsonl"
            )
13 changes: 13 additions & 0 deletions src/comet_llm/message_processing/offline_senders/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# -*- coding: utf-8 -*-
# *******************************************************
# ____ _ _
# / ___|___ _ __ ___ ___| |_ _ __ ___ | |
# | | / _ \| '_ ` _ \ / _ \ __| | '_ ` _ \| |
# | |__| (_) | | | | | | __/ |_ _| | | | | | |
# \____\___/|_| |_| |_|\___|\__(_)_| |_| |_|_|
#
# Sign up for free at https://www.comet.com
# Copyright (C) 2015-2024 Comet ML INC
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this package.
# *******************************************************
26 changes: 26 additions & 0 deletions src/comet_llm/message_processing/offline_senders/chain.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# -*- coding: utf-8 -*-
# *******************************************************
# ____ _ _
# / ___|___ _ __ ___ ___| |_ _ __ ___ | |
# | | / _ \| '_ ` _ \ / _ \ __| | '_ ` _ \| |
# | |__| (_) | | | | | | __/ |_ _| | | | | | |
# \____\___/|_| |_| |_|\___|\__(_)_| |_| |_|_|
#
# Sign up for free at https://www.comet.com
# Copyright (C) 2015-2024 Comet ML INC
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this package.
# *******************************************************

import json
import pathlib

from .. import messages


def send(message: messages.ChainMessage, file_name: str) -> None:
    """Append the chain message to ``file_name`` as one JSON line.

    The line is a JSON object with a "type" entry set to "ChainMessage" and a
    "message" entry holding ``message.to_dict()``. The file is opened in
    append mode, so existing content is preserved.
    """
    record = {"type": "ChainMessage", "message": message.to_dict()}
    with open(file_name, mode="at", encoding="utf-8") as offline_file:
        serialized = json.dumps(record)
        offline_file.write(f"{serialized}\n")
Loading
Loading