Optimize KnowledgeBase to complete workflow (#1598)

* Optimize KnowledgeBase to complete the workflow;
* Update Knowledge methods to handle data IO;
* Update tasks to handle multiple recorders;
* Integrate Knowledge into the workflow;

* Optimize KnowledgeBase to complete the workflow;
* Update TrainTask & AnalysisTask recorder handling;
* Update SummarizeTask;
* Update Workflow & Topic prompts;
Fivele-Li 2023-07-17 18:17:04 +08:00 committed by GitHub
Parent 1c9841b15e
Commit 8c1905d1d7
No known key found for this signature
GPG key ID: 4AEE18F83AFDEB23
4 changed files: 162 additions and 107 deletions
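For orientation, here is a minimal usage sketch (not part of the commit) of the reworked KnowledgeBase, based on the calls visible in the diffs below; the workdir and document contents are illustrative, and an LLM backend must be configured for the query to return anything.

from pathlib import Path
from qlib.finco.knowledge import KnowledgeBase

# The first call builds the singleton; later bare KnowledgeBase() calls return the same instance.
kb = KnowledgeBase(workdir=Path.cwd().joinpath("knowledge"))

# Each knowledge type now appends to a named YamlStorage instead of always creating
# a new one, so repeated calls accumulate documents in a single file.
kb.execute_knowledge.add(["[Fail]: qrun raised a memory error  # Keywords: data"])

# Retrieval is unchanged: query one knowledge type with free-form content.
print(KnowledgeBase().query(knowledge_type="execute", content="memory error"))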

View file

@ -7,7 +7,7 @@ import yaml
from qlib.workflow import R
from qlib.finco.log import FinCoLog
from qlib.finco.llm import APIBackend
from qlib.finco.utils import similarity, random_string
from qlib.finco.utils import similarity, random_string, SingletonBaseClass
logger = FinCoLog()
@ -140,8 +140,10 @@ class Knowledge:
Return
------
"""
knowledge = []
for storage in self.storages:
self.knowledge.extend(storage.documents)
knowledge.extend(storage.documents)
self.knowledge = knowledge
@classmethod
def load(cls, path: Union[str, Path]):
@ -212,12 +214,16 @@ class PracticeKnowledge(Knowledge):
self.summarize()
def add(self, docs: List):
storage = YamlStorage(path=self.workdir.joinpath(self.name).joinpath(YamlStorage.DEFAULT_NAME))
storage.add(documents=docs)
self.storages.append(storage)
self.summarize()
def add(self, docs: List, storage_name: str = YamlStorage.DEFAULT_NAME):
storage = self.get_storage(storage_name)
if storage is None:
storage = YamlStorage(path=self.workdir.joinpath(self.name).joinpath(storage_name))
storage.add(documents=docs)
self.storages.append(storage)
else:
storage.add(documents=docs)
self.summarize()
self.save()
@ -232,18 +238,27 @@ class FinanceKnowledge(Knowledge):
storage = self.get_storage(YamlStorage.DEFAULT_NAME)
if len(storage.documents) == 0:
docs = self.read_files_in_directory(self.workdir.joinpath(self.name))
docs.extend([
{"content": "[Success]: XXXX, the results looks reasonable # Keywords: supervised learning, data"},
{"content": "[Fail]: XXXX, it raise memory error due to YYYYY "
"# Keywords: supervised learning, data"}])
self.add(docs)
def add(self, docs: List):
storage = YamlStorage(path=self.workdir.joinpath(self.name).joinpath(YamlStorage.DEFAULT_NAME))
storage.add(documents=docs)
self.storages.append(storage)
self.summarize()
def add(self, docs: List, storage_name: str = YamlStorage.DEFAULT_NAME):
storage = self.get_storage(storage_name)
if storage is None:
storage = YamlStorage(path=self.workdir.joinpath(self.name).joinpath(storage_name))
storage.add(documents=docs)
self.storages.append(storage)
else:
storage.add(documents=docs)
self.summarize()
self.save()
@staticmethod
def read_files_in_directory(directory):
def read_files_in_directory(directory) -> List:
"""
read all .txt files under directory
"""
@ -265,12 +280,24 @@ class ExecuteKnowledge(Knowledge):
super().__init__(storages=storages, name="execute")
self.summarize()
def add(self, docs: List):
storage = YamlStorage(path=self.workdir.joinpath(YamlStorage.DEFAULT_NAME))
storage.add(documents=docs)
self.storages.append(storage)
storage = self.get_storage(YamlStorage.DEFAULT_NAME)
if len(storage.documents) == 0:
docs = [{"content": "[Success]: XXXX, the results looks reasonable # Keywords: supervised learning, data"},
{"content": "[Fail]: XXXX, it raise memory error due to YYYYY "
"# Keywords: supervised learning, data"}]
self.add(docs)
self.summarize()
def add(self, docs: List, storage_name: str = YamlStorage.DEFAULT_NAME):
storage = self.get_storage(storage_name)
if storage is None:
storage = YamlStorage(path=self.workdir.joinpath(self.name).joinpath(storage_name))
storage.add(documents=docs)
self.storages.append(storage)
else:
storage.add(documents=docs)
self.summarize()
self.save()
@ -285,17 +312,26 @@ class InfrastructureKnowledge(Knowledge):
storage = self.get_storage(YamlStorage.DEFAULT_NAME)
if len(storage.documents) == 0:
docs = self.get_functions_and_docstrings(Path(__file__).parent.parent.parent)
docs.extend([{"docstring": "All the models can be import from `qlib.contrib.models` "
"# Keywords: supervised learning"},
{"docstring": "The API to run rolling models can be found in … #Keywords: control"},
{"docstring": "Here are a list of Qlibs available analyzers. #KEYWORDS: analysis"}])
self.add(docs)
def add(self, docs: List):
storage = YamlStorage(path=self.workdir.joinpath(self.name).joinpath(YamlStorage.DEFAULT_NAME))
storage.add(documents=docs)
self.storages.append(storage)
self.summarize()
def add(self, docs: List, storage_name: str = YamlStorage.DEFAULT_NAME):
storage = self.get_storage(storage_name)
if storage is None:
storage = YamlStorage(path=self.workdir.joinpath(self.name).joinpath(storage_name))
storage.add(documents=docs)
self.storages.append(storage)
else:
storage.add(documents=docs)
self.summarize()
self.save()
def get_functions_and_docstrings(self, directory):
def get_functions_and_docstrings(self, directory) -> List:
"""
get all method and docstring in .py files under directory
@ -350,15 +386,16 @@ class Topic:
self.logger = FinCoLog()
def summarize(self, docs: list):
self.logger.info(f"Summarize topic: \nname: {self.name}\ndescribe: {self.describe.module}")
self.logger.info(f"Summarize Topic \nname: {self.name}\ndescribe: {self.describe.module}")
prompt_workflow_selection = self.describe.render(docs=docs)
response = APIBackend().build_messages_and_create_chat_completion(user_prompt=prompt_workflow_selection)
self.knowledge = response
self.docs = docs
self.logger.info(f"Summary of {self.name}:\n{self.knowledge}")
class KnowledgeBase:
class KnowledgeBase(SingletonBaseClass):
"""
Load knowledge, offer brief information of knowledge and common handle interfaces
"""
@ -431,10 +468,10 @@ class KnowledgeBase:
knowledge = self.infrastructure_knowledge.knowledge
else:
knowledge = (
self.execute_knowledge.knowledge
+ self.practice_knowledge.knowledge
+ self.finance_knowledge.knowledge
+ self.infrastructure_knowledge.knowledge
self.execute_knowledge.knowledge
+ self.practice_knowledge.knowledge
+ self.finance_knowledge.knowledge
+ self.infrastructure_knowledge.knowledge
)
return knowledge
@ -461,8 +498,12 @@ class KnowledgeBase:
similar_n_docs = [knowledge[i] for i in similar_n_indexes]
prompt = Template(
"""find the most relevant doc with this query: '{{content}}' from docs='{{docs}}'.
Just return the most relevant item I provided, no more explain. For example: {'function': 'config.resolve_path', 'docstring': None}"""
"""find the most relevant doc with this query: '{{content}}'
from docs='{{docs}}. Just return the most relevant item I provided, no more explain.
For example:
user: find the most relevant doc with this query: ab \n from docs = {abc, xyz, lmn}.
response: abc
"""
)
prompt_workflow_selection = prompt.render(content=content, docs=similar_n_docs)
response = APIBackend().build_messages_and_create_chat_completion(
@ -470,3 +511,7 @@ class KnowledgeBase:
)
return response
# perhaps init KnowledgeBase in other place
KnowledgeBase(workdir=Path.cwd().joinpath('knowledge'))
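In short, every Knowledge subclass above now follows the same get-or-create pattern for its backing storage. This condensed sketch is not in the commit itself; it only shows the common shape of the four add() overrides, with identifiers as in the diff and the docstring omitted.

def add(self, docs: List, storage_name: str = YamlStorage.DEFAULT_NAME):
    # reuse the named storage if it already exists, otherwise create and register it
    storage = self.get_storage(storage_name)
    if storage is None:
        storage = YamlStorage(path=self.workdir.joinpath(self.name).joinpath(storage_name))
        self.storages.append(storage)
    storage.add(documents=docs)
    self.summarize()  # rebuild the flattened self.knowledge from all storages
    self.save()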

View file

@ -993,7 +993,7 @@ SummarizeTask_context_user : |-
Here is my information: '{{key}}:{{value}}'
SummarizeTask_metrics_system : |-
Your purpose is to summarize the information by metrics in markdown format.
Your purpose is to summarize the information by metrics in markdown format. If possible, try to display data in percentages.
SummarizeTask_metrics_user : |-
Here is my information: '{{information}}'
@ -1012,7 +1012,10 @@ LearnManager_user : |-
you will adjust {{task}}'s system prompt to:
Topic_IC : |-
Summarize the influence of parameters on IC: {{docs}}
Summarize the influence of parameters on IC: {{docs}}. (Example response: Max draw-down become larger over time)
Topic_MaxDropDown : |-
Summarize the influence of parameters on max dropdown: {{docs}}
Summarize the influence of parameters on max dropdown: {{docs}}. (Example response: Max draw-down become larger over time)
Topic_RollingModel : |-
What conclusion can you draw from: {{docs}}. Answer questions as concisely as possible. (Example response: rolling model is good at making the Max draw-down smaller.)
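The Topic_* prompts above are consumed by Topic.summarize (see the knowledge.py changes), which renders the docs into the template and asks the LLM for a short conclusion. Below is a hedged sketch of that call path; the documents and printed response are illustrative.

from qlib.finco.prompt_template import PromptTemplate
from qlib.finco.knowledge import Topic

# describe is a template looked up by name, e.g. Topic_RollingModel defined above.
topic = Topic(name="RollingModel", describe=PromptTemplate().get("Topic_RollingModel"))
topic.summarize([{"experiment_metrics": "max draw-down: -0.08 (static model)"},
                 {"experiment_metrics": "max draw-down: -0.05 (rolling model)"}])
print(topic.knowledge)  # e.g. "rolling model is good at making the Max draw-down smaller."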

View file

@ -18,7 +18,7 @@ from qlib.contrib.analyzer import HFAnalyzer, SignalAnalyzer
from qlib.workflow import R
from qlib.finco.log import FinCoLog, LogColors
from qlib.finco.conf import Config
from qlib.finco.knowledge import KnowledgeBase, Topic
from qlib.finco.knowledge import KnowledgeBase
from qlib.finco.context import Design, Exp, WorkflowContextManager
@ -401,6 +401,8 @@ class TrainTask(Task):
if confirm is False:
return []
# todo: change global R.uri & experiment name
R.set_uri(Path(workspace).joinpath("mlruns").as_uri())
if not self._rolling:
command = ["qrun", str(workflow_path)]
try:
@ -415,10 +417,11 @@ class TrainTask(Task):
encoding="utf8",
cwd=str(workspace),
)
exp = R.get_exp(experiment_name="finCo")
except subprocess.CalledProcessError as e:
print(f"An error occurred while running the subprocess: {e.stderr} {e.stdout}")
real_error = e.stderr + e.stdout
real_error = e.stderr+e.stdout
KnowledgeBase().execute_knowledge.add([real_error])
if "data" in e.stdout.lower() or "handler" in e.stdout.lower():
@ -451,17 +454,27 @@ class TrainTask(Task):
# Run the command and capture the output
workspace = self._context_manager.struct_context.workspace
subprocess.run(
command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True, text=True, cwd=str(workspace)
command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True,
text=True, encoding="utf8", cwd=str(workspace)
)
# todo: dont manage record by id, experiment_id=2 doesnt contains metrics
exp = R.get_exp(experiment_id="3")
else:
command = f"python -m qlib.contrib.rolling ddgda --conf_path {workflow_path} run"
# Run the command and capture the output
workspace = self._context_manager.struct_context.workspace
subprocess.run(
command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True, text=True, cwd=str(workspace)
command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True,
encoding="utf8", text=True, cwd=str(workspace)
)
exp = R.get_exp(experiment_id="3")
return [AnalysisTask()]
# first recorder is the latest
recorder = exp.list_recorders(rtype=exp.RT_L)[0]
self._context_manager.set_context(f"experiment_{self._experiment_index}_recorder", recorder)
return []
def summarize(self):
if self._output is not None:
@ -520,30 +533,25 @@ class AnalysisTask(Task):
if isinstance(analysers, list) and len(analysers):
self.logger.info(f"selected analysers: {analysers}", plain=True)
experiment_count = self._context_manager.get_context("experiment_count")
workflow_config = (
self._context_manager.get_context("workflow_config")
if self._context_manager.get_context("workflow_config")
else "workflow_config.yaml"
)
workspace = self._context_manager.get_context("workspace")
for exp_id in range(1, experiment_count + 1):
recorder = self._context_manager.get_context(f"experiment_{exp_id}_recorder")
# todo: analysis multi experiment(get recorder by id)
experiment_name = "workflow"
R.set_uri(Path.joinpath(workspace, "mlruns").as_uri())
tasks = []
for analyser in analysers:
if analyser in self.__ANALYZERS_PROJECT.keys():
tasks.append(
self.__ANALYZERS_PROJECT.get(analyser)(
recorder=R.get_recorder(experiment_name=experiment_name), output_dir=workspace
tasks = []
for analyser in analysers:
if analyser in self.__ANALYZERS_PROJECT.keys():
tasks.append(
self.__ANALYZERS_PROJECT.get(analyser)(
recorder=recorder, output_dir=workspace
)
)
)
for task in tasks:
resp = task.analyse()
self._context_manager.set_context(resp, task.__class__.__doc__)
# todo: set by experiment
for task in tasks:
resp = task.analyse()
self._context_manager.set_context(resp, task.__class__.__doc__)
return []
@ -1100,7 +1108,7 @@ class CodeDumpTask(ActionTask):
class SummarizeTask(Task):
__DEFAULT_SUMMARIZE_CONTEXT = ["workflow_yaml", "metrics"]
__DEFAULT_SUMMARIZE_CONTEXT = ["metrics"]
# TODO: 2048 is close to exceed GPT token limit
__MAX_LENGTH_OF_FILE = 2048
@ -1129,63 +1137,61 @@ class SummarizeTask(Task):
def execute(self) -> Any:
workspace = self._context_manager.get_context("workspace")
user_prompt = self._context_manager.get_context("user_prompt")
workflow_yaml = self._context_manager.get_context("workflow_yaml")
file_info = self.get_info_from_file(workspace)
context_info = self.get_info_from_context() # too long context make response unstable.
# todo: experiments perhaps have the same name, summarize experiment by loop
record_info = self.get_info_from_recorder(workspace, "workflow")
figure_path = self.get_figure_path(workspace)
information = context_info + file_info + record_info
def _get_value_from_info(info: list, k: str):
for i in information:
if k in i.keys():
return i.get(k)
return ""
# todo: remove 'be' after test
be = APIBackend()
be.debug_mode = False
context_summary = {}
for key in self.__DEFAULT_SUMMARIZE_CONTEXT:
prompt_workflow_selection = self.summarize_context_user.render(
key=key, value=_get_value_from_info(info=information, k=key)
)
response = be.build_messages_and_create_chat_completion(
user_prompt=prompt_workflow_selection, system_prompt=self.summarize_context_system.render()
)
context_summary.update({key: response})
def _get_value_from_info(info: list, k: str):
for i in info:
if k in i.keys():
return i.get(k)
return ""
recorder = R.get_recorder(experiment_name="workflow")
recorder.save_objects(context_summary=context_summary)
experiment_count = self._context_manager.get_context("experiment_count")
for exp_id in range(1, experiment_count + 1):
recorder = self._context_manager.get_context(f"experiment_{exp_id}_recorder")
reason = self._context_manager.get_context(f"experiment_{exp_id}_config_finetune_reason")
workflow_yaml = self._context_manager.get_context(f"workflow_{exp_id}_yaml")
record_info = [{"metrics": recorder.list_metrics()}]
prompt_workflow_selection = self.summarize_metrics_user.render(
information=_get_value_from_info(info=record_info, k="metrics"), user_prompt=user_prompt
)
metrics_response = be.build_messages_and_create_chat_completion(
user_prompt=prompt_workflow_selection, system_prompt=self.summarize_metrics_system.render()
)
information = context_info + file_info + record_info
context_summary = {}
for key in self.__DEFAULT_SUMMARIZE_CONTEXT:
prompt_workflow_selection = self.summarize_context_user.render(
key=key, value=_get_value_from_info(info=information, k=key)
)
response = be.build_messages_and_create_chat_completion(
user_prompt=prompt_workflow_selection, system_prompt=self.summarize_context_system.render()
)
context_summary.update({key: response})
recorder.save_objects(context_summary=context_summary)
prompt_workflow_selection = self.summarize_metrics_user.render(
information=_get_value_from_info(info=record_info, k="metrics"), user_prompt=user_prompt
)
metrics_response = be.build_messages_and_create_chat_completion(
user_prompt=prompt_workflow_selection, system_prompt=self.summarize_metrics_system.render()
)
KnowledgeBase().practice_knowledge.add([{"user_intention": user_prompt, "experiment_id": exp_id,
"workflow": workflow_yaml, "reason": reason,
"experiment_metrics": metrics_response}])
prompt_workflow_selection = self.user.render(
information=file_info + [{"metrics": metrics_response}], figure_path=figure_path, user_prompt=user_prompt
information=file_info + KnowledgeBase().practice_knowledge.knowledge[-2:],
figure_path=figure_path, user_prompt=user_prompt
)
response = be.build_messages_and_create_chat_completion(
user_prompt=prompt_workflow_selection, system_prompt=self.system.render()
)
KnowledgeBase().practice_knowledge.add(
[{"user_intention": user_prompt, "experiment_metrics": metrics_response}]
)
# notes: summarize after all experiment added to KnowledgeBase
topic = Topic(name="rollingModel", describe=Template("What conclusion can you draw"))
topic.summarize(KnowledgeBase().practice_knowledge.knowledge)
self.logger.info(f"Summary of topic: {topic.name}: {topic.knowledge}")
self._context_manager.set_context("summary", response)
self.save_markdown(content=response, path=workspace)
self.logger.info(f"Report has saved to {self.__DEFAULT_REPORT_NAME}", title="End")
@ -1209,7 +1215,8 @@ class SummarizeTask(Task):
result = []
for file in file_list:
postfix = file.name.split(".")[-1]
if postfix in ["py", "log", "yaml"]:
# todo: filter file info more reasonable
if postfix in ["py", "log", "yaml"] and file.name.startswith("experiment"):
with open(file) as f:
content = f.read()
self.logger.info(f"file to summarize: {file}", plain=True)

View file

@ -1,8 +1,9 @@
import sys
import shutil
from typing import List
from pathlib import Path
from qlib.finco.task import HighLevelPlanTask, SummarizeTask
from qlib.finco.task import HighLevelPlanTask, SummarizeTask, AnalysisTask
from qlib.finco.prompt_template import PromptTemplate, Template
from qlib.finco.log import FinCoLog, LogColors
from qlib.finco.llm import APIBackend
@ -52,7 +53,7 @@ class WorkflowManager:
self.prompt_template = PromptTemplate()
self.context = WorkflowContextManager(workspace=self._workspace)
self.default_user_prompt = "Please help me build a low turnover strategy that focus more on longterm return in China A csi300. Please help to use lightgbm model."
self.default_user_prompt = "build an A-share stock market daily portfolio in quantitative investment and minimize the maximum drawdown."
def _confirm_and_rm(self):
# if workspace exists, please confirm and remove it. Otherwise exit.
@ -114,7 +115,7 @@ class WorkflowManager:
self.logger.info(f"user_prompt: {self.get_context().get_context('user_prompt')}", title="Start")
# NOTE: list may not be enough for general task list
task_list = [HighLevelPlanTask(), SummarizeTask()]
task_list = [HighLevelPlanTask(), AnalysisTask(), SummarizeTask()]
task_finished = []
while len(task_list):
task_list_info = [str(task) for task in task_list]
@ -143,7 +144,7 @@ class WorkflowManager:
class LearnManager:
__DEFAULT_TOPICS = ["IC", "MaxDropDown"]
__DEFAULT_TOPICS = ["IC", "MaxDropDown", "RollingModel"]
def __init__(self):
self.epoch = 0
@ -152,9 +153,7 @@ class LearnManager:
self.topics = [
Topic(name=topic, describe=self.wm.prompt_template.get(f"Topic_{topic}")) for topic in self.__DEFAULT_TOPICS
]
self.knowledge_base = KnowledgeBase(workdir=Path.cwd().joinpath("knowledge"))
self.knowledge_base.execute_knowledge.add([])
self.knowledge_base.query(knowledge_type="infrastructure", content="resolve_path")
self.knowledge_base = KnowledgeBase()
def run(self, prompt):
# todo: add early stop condition
@ -180,7 +179,8 @@ class LearnManager:
user_prompt = self.wm.context.get_context("user_prompt")
summary = self.wm.context.get_context("summary")
[topic.summarize(self.knowledge_base.get_knowledge()) for topic in self.topics]
[topic.summarize(self.knowledge_base.practice_knowledge.knowledge[-2:]) for topic in self.topics]
[self.knowledge_base.practice_knowledge.add([{"practice_knowledge": topic.knowledge}]) for topic in self.topics]
knowledge_of_topics = [{topic.name: topic.knowledge} for topic in self.topics]
for task in task_finished: