# generative-agents/src/utils.py
import logging
logging.basicConfig(level=logging.ERROR)
from datetime import datetime, timedelta
from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple
from termcolor import colored
import os
import copy
import math
import faiss
import re
import pickle
import random
from langchain.docstore import InMemoryDocstore
from langchain.retrievers import TimeWeightedVectorStoreRetriever
from langchain.callbacks.manager import CallbackManager
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from langchain.vectorstores import FAISS
# configuration that can be overridden via environment variables
LLM_TYPE = os.environ.get('LLM_TYPE', "llamacpp") # options: llamacpp, oai
LLM_LOCAL_MODEL = os.environ.get('LLM_MODEL',
#"./models/ggml-vicuna-13b-1.1/ggml-vic13b-uncensored-q4_2.bin"
"./models/ggml-vicuna-13b-cocktail-v1-q5_0.bin"
#"./models/llama-13b-supercot-ggml/ggml-model-q4_2.bin"
#"./models/llama-33b-supercot-ggml/ggml-model-q4_2.bin"
#"./models/gpt4-x-alpasta-30b-ggml-q4_1.bin"
)
LLM_CONTEXT = int(os.environ.get('LLM_CONTEXT', '2048'))
LLM_THREADS = int(os.environ.get('LLM_THREADS', '6'))
LLM_TEMPERATURE = float(os.environ.get('LLM_TEMPERATURE', '0.99'))
EMBEDDING_TYPE = os.environ.get("LLM_EMBEDDING_TYPE", "hf") # options: llamacpp, oai, hf
#LLM_TYPE="oai"
#os.environ['OPENAI_API_BASE']="https://oai.ecker.tech/proxy/openai"
#os.environ['OPENAI_API_KEY']=""
# deduce a default given a model path
if LLM_TYPE=="oai":
	LLM_PROMPT_TUNE_DEFAULT = "oai"
else:
	if "supercot" in LLM_LOCAL_MODEL.lower():
		LLM_PROMPT_TUNE_DEFAULT = "supercot"
	elif "vicuna" in LLM_LOCAL_MODEL.lower():
		LLM_PROMPT_TUNE_DEFAULT = "vicuna"
	elif "alpasta" in LLM_LOCAL_MODEL.lower():
		LLM_PROMPT_TUNE_DEFAULT = "alpasta"
	elif "cocktail" in LLM_LOCAL_MODEL.lower():
		LLM_PROMPT_TUNE_DEFAULT = "cocktail"
	else:
		LLM_PROMPT_TUNE_DEFAULT = "llama"
LLM_PROMPT_TUNE = os.environ.get('LLM_PROMPT_TUNE', LLM_PROMPT_TUNE_DEFAULT)
os.environ['LLM_PROMPT_TUNE'] = LLM_PROMPT_TUNE # sync it back to prompts
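
# example (hypothetical values) of overriding the settings above from a shell;
# the variable names are the ones read in this file, the entry point is whatever
# script imports this module:
#   LLM_TYPE=llamacpp LLM_MODEL=./models/ggml-vicuna-13b-cocktail-v1-q5_0.bin \
#   LLM_CONTEXT=2048 LLM_PROMPT_TUNE=cocktail python <your entry point>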
callback_manager = CallbackManager([StreamingStdOutCallbackHandler()]) # unnecessary, but harmless
# Overrides for some fixes, like scoring memory and LLM-specific promptings
from ext import GenerativeAgent, GenerativeAgentMemory, get_roles

if LLM_TYPE=="llamacpp":
	from langchain.llms import LlamaCpp

	LLM = LlamaCpp(
		model_path=LLM_LOCAL_MODEL,
		callback_manager=callback_manager,
		verbose=True,
		n_ctx=LLM_CONTEXT,
		temperature=LLM_TEMPERATURE,
		#n_threads=LLM_THREADS,
		#use_mlock=True,
		#use_mmap=True,
	)
elif LLM_TYPE=="oai":
	from langchain.chat_models import ChatOpenAI

	# Override for Todd
	if os.environ.get('LANGCHAIN_OVERRIDE_RESULT', '1') == '1':
		from langchain.schema import Generation, ChatResult, LLMResult, ChatGeneration
		from langchain.chat_models.openai import _convert_dict_to_message

		def _create_chat_result(self, response: Mapping[str, Any]) -> ChatResult:
			token_usage = { "prompt_tokens": 5, "completion_tokens": 5, "total_tokens": 10 }
			generations = []
			for res in response["choices"]:
				message = _convert_dict_to_message(res["message"])
				gen = ChatGeneration(message=message)
				generations.append(gen)
			llm_output = {"token_usage": response["usage"] if "usage" in response else token_usage, "model_name": self.model_name}
			return ChatResult(generations=generations, llm_output=llm_output)
		ChatOpenAI._create_chat_result = _create_chat_result

	LLM = ChatOpenAI(
		max_tokens=LLM_CONTEXT,
		temperature=LLM_TEMPERATURE,
		model_name=os.environ.get('OPENAI_MODEL_NAME', 'gpt-4'),
	)
else:
	raise ValueError(f"Invalid LLM type: {LLM_TYPE}")

if EMBEDDING_TYPE == "hf":
	from langchain.embeddings import HuggingFaceEmbeddings

	EMBEDDINGS_MODEL = HuggingFaceEmbeddings()
	EMBEDDINGS_SIZE = 768
elif EMBEDDING_TYPE == "oai":
	from langchain.embeddings import OpenAIEmbeddings

	EMBEDDINGS_MODEL = OpenAIEmbeddings()
	EMBEDDINGS_SIZE = 1536
elif EMBEDDING_TYPE == "llamacpp":
	from langchain.embeddings import LlamaCppEmbeddings

	EMBEDDINGS_MODEL = LlamaCppEmbeddings(
		model_path=LLM_LOCAL_MODEL,
	)
	EMBEDDINGS_SIZE = 5120
else:
	raise ValueError(f"Invalid embedding type: {EMBEDDING_TYPE}")

def _relevance_score_fn(score: float) -> float:
	"""Map a raw FAISS L2 distance to a relevance score, roughly in [0, 1] (higher is more relevant)."""
	# OpenAI embeddings are unit-normed, so the distance is scaled by sqrt(2)
	if EMBEDDING_TYPE == "oai":
		return 1.0 - score / math.sqrt(2)
	# other embeddings are unnormalized; divide by a rough empirical norm instead
	NORM = 3.5
	if EMBEDDING_TYPE == "llamacpp":
		NORM = 14000.0

	normalized = score / NORM
	res = 1.0 - normalized
	# print(score, normalized, res)
	return res
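# e.g. (illustrative numbers): with the default hf embeddings (NORM=3.5) an L2
# distance of 0.7 maps to a relevance of 1.0 - 0.7/3.5 = 0.8, while llamacpp
# embeddings produce far larger raw distances, hence the NORM of 14000.0.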

def _create_new_memory_retriever():
	"""Create a new vector store retriever unique to the agent."""
	index = faiss.IndexFlatL2(EMBEDDINGS_SIZE)
	vectorstore = FAISS(EMBEDDINGS_MODEL.embed_query, index, InMemoryDocstore({}), {}, relevance_score_fn=_relevance_score_fn)
	return TimeWeightedVectorStoreRetriever(vectorstore=vectorstore, other_score_keys=["importance"], k=15)

def _create_new_memories():
	return GenerativeAgentMemory(
		llm=LLM,
		memory_retriever=_create_new_memory_retriever(),
		reflection_threshold=8,
		verbose=True,
		max_tokens_limit=LLM_CONTEXT//2, # cap retrieved-memory tokens to roughly half the context window
	)

def create_agent(**kwargs):
	"""Construct a GenerativeAgent, filling in defaults and expanding "{name}" in any string field."""
	settings = {
		"llm": LLM,
		"verbose": True,
		"sex": "Male",
		"memory": _create_new_memories(),
	}
	settings.update(kwargs)
	for k in settings:
		if isinstance(settings[k], str):
			settings[k] = settings[k].replace("{name}", settings["name"])
	return GenerativeAgent(**settings)
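# e.g. (illustrative values): create_agent(name="Tommie", age=25,
#     traits="anxious, likes design", status="{name} is looking for a job")
# expands the "{name}" placeholder in every string field to "Tommie".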

def save_agent( agent ):
	os.makedirs("./agents/", exist_ok=True)
	obj = {
		"name": agent.name,
		"age": agent.age,
		"sex": agent.sex,
		"traits": agent.traits,
		"status": agent.status,
		"summary": agent.summary,
		"summaries": agent.summaries,
		"memory_retriever": agent.memory.memory_retriever,
	}
	path = f"./agents/{agent.name}.pth"
	pickle.dump(obj, open(path, 'wb'))
	print("Saved agent:", path)

def load_agent( name ):
	path = f"./agents/{name}.pth"
	obj = pickle.load(open(path, 'rb'))
	agent = create_agent(**obj)
	agent.memory.memory_retriever = obj["memory_retriever"]
	print("Loaded agent:", path)
	return agent
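# round trip (illustrative): save_agent(tommie) pickles the agent to ./agents/Tommie.pth,
# and load_agent("Tommie") rebuilds it and reattaches the saved memory retriever.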

def get_summary(agent: GenerativeAgent, force_refresh: bool = True) -> str:
	print(colored("[Summary]", "magenta"))
	summary = agent.get_summary(force_refresh=force_refresh)
	print(summary)
	return summary

def agent_observes( agent: GenerativeAgent, observations: List[str], importance_score=0 ):
	results = []
	for observation in observations:
		observation = observation.replace("{name}", agent.name)
		print(colored("[Observation]", "magenta"), f'[{agent.name}] {observation}')
		results.append(agent.memory.add_memory(observation, importance_score=importance_score))
	return results

def agent_reacts( agent: GenerativeAgent, observations: List[str] ):
	results = []
	for observation in observations:
		observation = observation.replace("{name}", agent.name)
		print(colored("[Observation]", "magenta"), f'[{agent.name}] {observation}')
		_, response = agent.generate_response(observation)
		print(colored("[Reaction]", "magenta"), f'[{agent.name}] {response}')
		results.append(response)
	return results

def interview_agent(agent: GenerativeAgent, message: str) -> str:
	message = message.replace("{name}", agent.name)
	print(colored("[Interview]", "magenta"), f"[User] {message}")
	_, response = agent.generate_response(message)
	print(colored("[Interview]", "magenta"), f"[{agent.name}] {response}")
	return response

def run_conversation(agents: List[GenerativeAgent], observation: str, limit: int = 0, p_reaction: float = 1 ) -> List[str]:
	# p_reaction is currently unused
	print(colored("[Conversation]", "magenta"))
	# everyone witnesses the opening observation, then the speaking order rotates
	for agent in agents:
		agent_observes( agent, [observation] )
	agents = agents[1:] + [agents[0]]

	dialogue = []
	# limit=0 runs indefinitely; otherwise stop once enough lines have been exchanged
	while True:
		for agent in agents:
			observation = agent_reacts( agent, [ observation ] )[0]
			dialogue.append(observation)
			for a in agents:
				if a is agent:
					continue
				agent_observes( a, [ observation ] )
			if limit > 0 and len(dialogue) >= limit:
				return dialogue
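
# --- usage sketch (hypothetical names and observations; guarded so importing
# --- this module stays side-effect free) ---
if __name__ == "__main__":
	tommie = create_agent(
		name="Tommie",
		age=25,
		traits="anxious, likes design",
		status="{name} is looking for a job",
	)
	eve = create_agent(
		name="Eve",
		age=34,
		sex="Female",
		traits="curious, helpful",
		status="{name} is taking a walk",
	)
	agent_observes( tommie, ["{name} wakes up to the sound of a noisy construction site outside his window."] )
	interview_agent( tommie, "What are you looking forward to today?" )
	# let the agents trade roughly four lines of dialogue, then stop
	run_conversation( [tommie, eve], "Tommie: Hey Eve, do you know of any design job openings?", limit=4 )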