# From https://github.com/hwchase17/langchain/tree/master/langchain/experimental/generative_agents
"""
The MIT License

Copyright (c) Harrison Chase

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""

import re
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

from pydantic import BaseModel, Field

from langchain import LLMChain
from langchain.base_language import BaseLanguageModel
from langchain.prompts import PromptTemplate

# The local .memory module supersedes the duplicate import of
# GenerativeAgentMemory from langchain.experimental.generative_agents.memory.
from .memory import GenerativeAgentMemory
from .prompts import get_prompt, get_stop_tokens


class GenerativeAgent(BaseModel):
    """A character with memory and innate characteristics."""

    name: str
    """The character's name."""

    sex: str
    """The character's sex."""

    age: Optional[int] = None
    """The optional age of the character."""

    traits: str = "N/A"
    """Permanent traits to ascribe to the character."""

    status: str
    """The character's current status (traits you wish not to change)."""

    memory: GenerativeAgentMemory
    """The memory object that combines relevance, recency, and 'importance'."""

    llm: BaseLanguageModel
    """The underlying language model."""

    verbose: bool = True
    """Whether to print intermediate chain output for debugging."""

    summary: str = "N/A"  #: :meta private:
    """Stateful self-summary generated via reflection on the character's memory."""

    summary_refresh_seconds: int = 3600  #: :meta private:
    """How frequently to re-generate the summary."""

    last_refreshed: datetime = Field(default_factory=datetime.now)  #: :meta private:
    """The last time the character's summary was regenerated."""

    summaries: List[str] = Field(default_factory=list)  #: :meta private:
    """A history of the self-summaries generated so far."""

    class Config:
        """Configuration for this pydantic object."""

        arbitrary_types_allowed = True

    # LLM-related methods
    @staticmethod
    def _parse_list(text: str) -> List[str]:
        """Parse a newline-separated string into a list of strings."""
        lines = re.split(r"\n", text.strip())
        # Strip any leading "1. "-style enumeration the model may emit.
        return [re.sub(r"^\s*\d+\.\s*", "", line).strip() for line in lines]
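
    # Illustrative example: _parse_list("1. wake up\n2. eat breakfast")
    # returns ["wake up", "eat breakfast"].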

    def chain(self, prompt: PromptTemplate) -> LLMChain:
        return LLMChain(
            llm=self.llm, prompt=prompt, verbose=self.verbose, memory=self.memory
        )

    def _get_entity_from_observation(self, observation: str) -> str:
        prompt = PromptTemplate.from_template(get_prompt('entity_from_observation'))
        response = self.chain(prompt).run(
            stop=get_stop_tokens([".", "(", "'"]), observation=observation
        ).strip()
        if self.verbose:
            print(response)
        return response

    def _get_entity_action(self, observation: str, entity_name: str) -> str:
        prompt = PromptTemplate.from_template(get_prompt('entity_action'))
        response = self.chain(prompt).run(
            stop=get_stop_tokens(), entity=entity_name, observation=observation
        ).strip()
        if self.verbose:
            print(response)
        return response

    def get_most_recent_memories(self, last_k: int = 4) -> List[str]:
        """Return the contents of the `last_k` most recent memories."""
        memories = self.memory.memory_retriever.memory_stream[-last_k:]
        return [document.page_content for document in memories]
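
    # Illustrative example: if the last four Documents in the memory stream
    # have page_content "saw Bob", "ate lunch", "read a book", "took a nap",
    # get_most_recent_memories() returns those four strings, oldest first.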

    def summarize_related_memories(self, observation: str) -> str:
        """Summarize memories that are most relevant to an observation."""
        prompt = PromptTemplate.from_template(get_prompt('summarize_related_memories'))
        q1 = "What is the relationship between the subjects in that interaction?"
        summary = self.chain(prompt=prompt).run(
            name=self.name,
            stop=get_stop_tokens(),
            q1=q1,
            observation=observation,
            queries=[observation],
        ).strip()
        # Original LangChain implementation, kept for reference:
        #
        # entity_name = self._get_entity_from_observation(observation).split("\n")[0].strip()
        # q1 = f"What is the relationship between {self.name} and {entity_name}"
        # if self.name.strip() in entity_name:
        #     return "N/A"
        #
        # entity_action = self._get_entity_action(observation, entity_name)
        # q2 = f"{entity_name} is {entity_action}"
        # summary = self.chain(prompt=prompt).run(
        #     name=self.name, stop=get_stop_tokens(), q1=q1, queries=[q1, q2]
        # ).strip()
        # return self.chain(prompt=prompt).run(stop=get_stop_tokens(), q1=q1, q2=q2).strip()
        return f'{self.name} {summary}'
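
    # Note: the template behind get_prompt('summarize_related_memories') is
    # presumably expected to expose {name}, {q1}, and {observation}
    # placeholders; `queries` is consumed by GenerativeAgentMemory to retrieve
    # relevant memories.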

    def _generate_reaction(self, observation: str, suffix: str) -> str:
        """React to a given observation or dialogue act."""
        prompt = PromptTemplate.from_template(
            get_prompt('generate_reaction').replace("{suffix}", suffix)
        )
        summary = self.get_summary().replace(u"\u200B", "").strip()
        relevant_memories = self.summarize_related_memories(observation).replace(u"\u200B", "").strip()
        recent_memories = "\n".join(self.get_most_recent_memories())

        # relevant_memories is presumably meant to provide context only for the
        # relationship between the agent and the observed entity, as the query
        # suggests, but the original implementation appears to use it simply to
        # further filter for relevant memories, per the name.
        if relevant_memories and relevant_memories != "N/A":
            memory = relevant_memories
        else:
            memory = recent_memories

        current_time_str = datetime.now().strftime("%B %d, %Y, %I:%M %p")
        kwargs: Dict[str, Any] = dict(
            current_time=current_time_str,
            name=self.name,
            status=self.status if self.status else "N/A",
            summary=summary if summary else "N/A",
            memory=memory if memory else "N/A",
            # relevant_memories=relevant_memories if relevant_memories else "N/A",
            # recent_memories=recent_memories if recent_memories else "N/A",
            observation=observation if observation else "N/A",
        )
        reaction = self.chain(prompt=prompt).run(stop=get_stop_tokens(), **kwargs).strip()
        if self.verbose:
            print(reaction)
        return reaction
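
    # Note: the generate_reaction template is assumed to expose {current_time},
    # {name}, {status}, {summary}, {memory}, and {observation} placeholders,
    # matching the kwargs above, plus the {suffix} slot substituted in.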

    def _clean_response(self, text: str) -> str:
        # re.escape guards against regex metacharacters in the agent's name.
        return re.sub(f"^{re.escape(self.name)} ", "", text.strip()).strip()

    def generate_response(self, observation: str) -> Tuple[bool, str]:
        """Generate a response to the given observation and save it to memory."""
        call_to_action_template = get_prompt('suffix_generate_response')
        full_result = f"{self.name} {self._generate_reaction(observation, call_to_action_template)}"

        self.memory.save_context(
            {},
            {
                self.memory.add_memory_key: full_result
            },
        )

        return True, full_result

    def generate_reaction(self, observation: str) -> Tuple[bool, str]:
        """React to a given observation and save the reaction to memory."""
        full_result = self._generate_reaction(observation, get_prompt('suffix_generate_reaction'))
        candidates = full_result.replace(u"\u200B", "").strip().split("\n")

        response = ""
        results = []

        for candidate in candidates:
            if "REACT:" in candidate or "SAY:" in candidate:
                # Normalize marker casing up front rather than handling each
                # variant separately below.
                candidate = candidate.strip().replace("React:", "REACT:").replace("Say:", "SAY:")
                results.append(
                    f'{candidate}'.replace("SAY:", "said").replace(f"REACT: {self.name}", "").replace("REACT:", "")
                )
        if len(results) > 0:
            response = " and ".join(results).strip().replace("  ", " ")
            valid = True
        else:
            response = "did not react in a relevant way"
            valid = False

        # Record both the observation and the reaction in memory.
        self.memory.save_context(
            {},
            {
                self.memory.add_memory_key: f"{self.name} observed: {observation}; {self.name}'s reaction: {response}"
            },
        )

        return valid, f"{self.name} {response}"

        # Original LangChain implementation, kept for reference:
        #
        # if "REACT:" in result:
        #     reaction = self._clean_response(result.split("REACT:")[-1])
        #     return True, f"{self.name} {reaction}"
        # if "SAY:" in result:
        #     said_value = self._clean_response(result.split("SAY:")[-1])
        #     return True, f"{self.name} said {said_value}"
        # else:
        #     return False, f"{self.name} did not react in a relevant way"
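
    # Illustrative example: with name "Alice", a model output of
    #   REACT: Alice frowns.
    #   SAY: "I'm busy right now."
    # parses to (True, 'Alice frowns. and said "I\'m busy right now."').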

    def generate_dialogue(self, observation: str) -> Tuple[bool, str]:
        """Generate a line of dialogue in reaction to the given observation."""
        call_to_action_template = get_prompt('suffix_generate_dialogue')
        full_result = self._generate_reaction(observation, call_to_action_template)
        result = full_result.strip().split("\n")[0]
        if "GOODBYE:" in result:
            farewell = self._clean_response(result.split("GOODBYE:")[-1])
            self.memory.save_context(
                {},
                {
                    self.memory.add_memory_key: f"{self.name} observed: {observation}; {self.name}'s farewell response: {farewell}"
                },
            )
            return False, f"{self.name} said {farewell}"
        if "SAY:" in result:
            response_text = self._clean_response(result.split("SAY:")[-1])
            self.memory.save_context(
                {},
                {
                    self.memory.add_memory_key: f"{self.name} observed: {observation}; {self.name}'s response: {response_text}"
                },
            )
            return True, f"{self.name} said {response_text}"
        else:
            return False, result
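
    # Illustrative example: a first line of 'SAY: "Nice to meet you."' yields
    # (True, '<name> said "Nice to meet you."'), while a 'GOODBYE:' line yields
    # False to signal that the dialogue should end.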

    ######################################################
    # Agent stateful summary methods.                    #
    # Each dialog or response prompt includes a header   #
    # summarizing the agent's self-description. This is  #
    # updated periodically by probing its memories.      #
    ######################################################
    def _compute_agent_summary(self) -> str:
        """Compute a self-summary from the agent's core characteristics."""
        # The agent seeks to think about their core characteristics.
        prompt = PromptTemplate.from_template(get_prompt('compute_agent_summary'))
        summary = self.chain(prompt).run(
            stop=get_stop_tokens(),
            name=self.name,
            summary=self.summaries[-1] if len(self.summaries) else self.summary,
            queries=[f"{self.name}'s core characteristics"],
        ).strip()
        if self.verbose:
            print(summary)
        return summary

    def get_summary(self, force_refresh: bool = False) -> str:
        """Return a descriptive summary of the agent."""
        current_time = datetime.now()
        # total_seconds() is used because timedelta.seconds wraps around per day.
        since_refresh = (current_time - self.last_refreshed).total_seconds()
        if (
            not self.summary
            or since_refresh >= self.summary_refresh_seconds
            or force_refresh
        ):
            self.summary = self._compute_agent_summary()
            self.summaries.append(self.summary)
            self.last_refreshed = current_time

        values = [
            f"Name: {self.name} (sex: {self.sex}, age: {self.age if self.age is not None else 'N/A'})",
            f"Innate traits: {self.traits}",
            f"Status: {self.status}",
        ]

        # Drop any line whose value is still the "N/A" placeholder.
        return "\n".join([value for value in values if value[-3:] != "N/A"]) + f"\n{self.summary.strip()}"
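
    # Illustrative output (hypothetical values):
    #   Name: Alice (sex: female, age: 29)
    #   Innate traits: curious, talkative
    #   Status: looking for a new job
    #   <latest reflective self-summary>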

    def get_full_header(self, force_refresh: bool = False) -> str:
        """Return a full header of the agent's status, summary, and current time."""
        summary = self.get_summary(force_refresh=force_refresh)
        current_time_str = datetime.now().strftime("%B %d, %Y, %I:%M %p")
        return (
            f"{summary}\nIt is {current_time_str}.\n{self.name}'s status: {self.status}"
        )
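
# Minimal usage sketch (illustrative only, not part of the original module).
# It assumes a LangChain-compatible LLM and a GenerativeAgentMemory built
# elsewhere; the memory's constructor arguments are project-specific and
# omitted here.
#
#     from langchain.llms import OpenAI
#
#     llm = OpenAI()
#     memory = GenerativeAgentMemory(llm=llm)  # hypothetical: other args omitted
#     alice = GenerativeAgent(
#         name="Alice",
#         sex="female",
#         age=29,
#         traits="curious, talkative",
#         status="looking for a new job",
#         memory=memory,
#         llm=llm,
#     )
#     print(alice.get_summary(force_refresh=True))
#     print(alice.generate_dialogue("Bob says hello")[1])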