diff --git a/qlib/finco/knowledge.py b/qlib/finco/knowledge.py index f3f834ea..538345c5 100644 --- a/qlib/finco/knowledge.py +++ b/qlib/finco/knowledge.py @@ -7,7 +7,7 @@ import yaml from qlib.workflow import R from qlib.finco.log import FinCoLog from qlib.finco.llm import APIBackend -from qlib.finco.utils import similarity, random_string +from qlib.finco.utils import similarity, random_string, SingletonBaseClass logger = FinCoLog() @@ -140,8 +140,10 @@ class Knowledge: Return ------ """ + knowledge = [] for storage in self.storages: - self.knowledge.extend(storage.documents) + knowledge.extend(storage.documents) + self.knowledge = knowledge @classmethod def load(cls, path: Union[str, Path]): @@ -212,12 +214,16 @@ class PracticeKnowledge(Knowledge): self.summarize() - def add(self, docs: List): - storage = YamlStorage(path=self.workdir.joinpath(self.name).joinpath(YamlStorage.DEFAULT_NAME)) - storage.add(documents=docs) - self.storages.append(storage) - self.summarize() + def add(self, docs: List, storage_name: str = YamlStorage.DEFAULT_NAME): + storage = self.get_storage(storage_name) + if storage is None: + storage = YamlStorage(path=self.workdir.joinpath(self.name).joinpath(storage_name)) + storage.add(documents=docs) + self.storages.append(storage) + else: + storage.add(documents=docs) + self.summarize() self.save() @@ -232,18 +238,27 @@ class FinanceKnowledge(Knowledge): storage = self.get_storage(YamlStorage.DEFAULT_NAME) if len(storage.documents) == 0: docs = self.read_files_in_directory(self.workdir.joinpath(self.name)) + docs.extend([ + {"content": "[Success]: XXXX, the results looks reasonable # Keywords: supervised learning, data"}, + {"content": "[Fail]: XXXX, it raise memory error due to YYYYY " + "# Keywords: supervised learning, data"}]) self.add(docs) - - def add(self, docs: List): - storage = YamlStorage(path=self.workdir.joinpath(self.name).joinpath(YamlStorage.DEFAULT_NAME)) - storage.add(documents=docs) - self.storages.append(storage) self.summarize() + def add(self, docs: List, storage_name: str = YamlStorage.DEFAULT_NAME): + storage = self.get_storage(storage_name) + if storage is None: + storage = YamlStorage(path=self.workdir.joinpath(self.name).joinpath(storage_name)) + storage.add(documents=docs) + self.storages.append(storage) + else: + storage.add(documents=docs) + + self.summarize() self.save() @staticmethod - def read_files_in_directory(directory): + def read_files_in_directory(directory) -> List: """ read all .txt files under directory """ @@ -265,12 +280,24 @@ class ExecuteKnowledge(Knowledge): super().__init__(storages=storages, name="execute") self.summarize() - def add(self, docs: List): - storage = YamlStorage(path=self.workdir.joinpath(YamlStorage.DEFAULT_NAME)) - storage.add(documents=docs) - self.storages.append(storage) + storage = self.get_storage(YamlStorage.DEFAULT_NAME) + if len(storage.documents) == 0: + docs = [{"content": "[Success]: XXXX, the results looks reasonable # Keywords: supervised learning, data"}, + {"content": "[Fail]: XXXX, it raise memory error due to YYYYY " + "# Keywords: supervised learning, data"}] + self.add(docs) self.summarize() + def add(self, docs: List, storage_name: str = YamlStorage.DEFAULT_NAME): + storage = self.get_storage(storage_name) + if storage is None: + storage = YamlStorage(path=self.workdir.joinpath(self.name).joinpath(storage_name)) + storage.add(documents=docs) + self.storages.append(storage) + else: + storage.add(documents=docs) + + self.summarize() self.save() @@ -285,17 +312,26 @@ class InfrastructureKnowledge(Knowledge): storage = self.get_storage(YamlStorage.DEFAULT_NAME) if len(storage.documents) == 0: docs = self.get_functions_and_docstrings(Path(__file__).parent.parent.parent) + docs.extend([{"docstring": "All the models can be import from `qlib.contrib.models` " + "# Keywords: supervised learning"}, + {"docstring": "The API to run rolling models can be found in … #Keywords: control"}, + {"docstring": "Here are a list of Qlib’s available analyzers. #KEYWORDS: analysis"}]) self.add(docs) - - def add(self, docs: List): - storage = YamlStorage(path=self.workdir.joinpath(self.name).joinpath(YamlStorage.DEFAULT_NAME)) - storage.add(documents=docs) - self.storages.append(storage) self.summarize() + def add(self, docs: List, storage_name: str = YamlStorage.DEFAULT_NAME): + storage = self.get_storage(storage_name) + if storage is None: + storage = YamlStorage(path=self.workdir.joinpath(self.name).joinpath(storage_name)) + storage.add(documents=docs) + self.storages.append(storage) + else: + storage.add(documents=docs) + + self.summarize() self.save() - def get_functions_and_docstrings(self, directory): + def get_functions_and_docstrings(self, directory) -> List: """ get all method and docstring in .py files under directory @@ -350,15 +386,16 @@ class Topic: self.logger = FinCoLog() def summarize(self, docs: list): - self.logger.info(f"Summarize topic: \nname: {self.name}\ndescribe: {self.describe.module}") + self.logger.info(f"Summarize Topic \nname: {self.name}\ndescribe: {self.describe.module}") prompt_workflow_selection = self.describe.render(docs=docs) response = APIBackend().build_messages_and_create_chat_completion(user_prompt=prompt_workflow_selection) self.knowledge = response self.docs = docs + self.logger.info(f"Summary of {self.name}:\n{self.knowledge}") -class KnowledgeBase: +class KnowledgeBase(SingletonBaseClass): """ Load knowledge, offer brief information of knowledge and common handle interfaces """ @@ -431,10 +468,10 @@ class KnowledgeBase: knowledge = self.infrastructure_knowledge.knowledge else: knowledge = ( - self.execute_knowledge.knowledge - + self.practice_knowledge.knowledge - + self.finance_knowledge.knowledge - + self.infrastructure_knowledge.knowledge + self.execute_knowledge.knowledge + + self.practice_knowledge.knowledge + + self.finance_knowledge.knowledge + + self.infrastructure_knowledge.knowledge ) return knowledge @@ -461,8 +498,12 @@ class KnowledgeBase: similar_n_docs = [knowledge[i] for i in similar_n_indexes] prompt = Template( - """find the most relevant doc with this query: '{{content}}' from docs='{{docs}}'. - Just return the most relevant item I provided, no more explain. For example: {'function': 'config.resolve_path', 'docstring': None}""" + """find the most relevant doc with this query: '{{content}}' + from docs='{{docs}}. Just return the most relevant item I provided, no more explain. + For example: + user: find the most relevant doc with this query: ab \n from docs = {abc, xyz, lmn}. + response: abc + """ ) prompt_workflow_selection = prompt.render(content=content, docs=similar_n_docs) response = APIBackend().build_messages_and_create_chat_completion( @@ -470,3 +511,7 @@ class KnowledgeBase: ) return response + + +# perhaps init KnowledgeBase in other place +KnowledgeBase(workdir=Path.cwd().joinpath('knowledge')) diff --git a/qlib/finco/prompt_template.yaml b/qlib/finco/prompt_template.yaml index b15bf8a9..5ca56294 100644 --- a/qlib/finco/prompt_template.yaml +++ b/qlib/finco/prompt_template.yaml @@ -993,7 +993,7 @@ SummarizeTask_context_user : |- Here is my information: '{{key}}:{{value}}' SummarizeTask_metrics_system : |- - Your purpose is to summarize the information by metrics in markdown format. + Your purpose is to summarize the information by metrics in markdown format. If possible, try to display data in percentages. SummarizeTask_metrics_user : |- Here is my information: '{{information}}' @@ -1012,7 +1012,10 @@ LearnManager_user : |- you will adjust {{task}}'s system prompt to: Topic_IC : |- - Summarize the influence of parameters on IC: {{docs}} + Summarize the influence of parameters on IC: {{docs}}. (Example response: Max draw-down become larger over time) Topic_MaxDropDown : |- - Summarize the influence of parameters on max dropdown: {{docs}} \ No newline at end of file + Summarize the influence of parameters on max dropdown: {{docs}}. (Example response: Max draw-down become larger over time) + +Topic_RollingModel : |- + What conclusion can you draw from: {{docs}}. Answer questions as concisely as possible. (Example response: rolling model is good at making the Max draw-down smaller.) \ No newline at end of file diff --git a/qlib/finco/task.py b/qlib/finco/task.py index 9ae87a60..c6c4ee46 100644 --- a/qlib/finco/task.py +++ b/qlib/finco/task.py @@ -18,7 +18,7 @@ from qlib.contrib.analyzer import HFAnalyzer, SignalAnalyzer from qlib.workflow import R from qlib.finco.log import FinCoLog, LogColors from qlib.finco.conf import Config -from qlib.finco.knowledge import KnowledgeBase, Topic +from qlib.finco.knowledge import KnowledgeBase from qlib.finco.context import Design, Exp, WorkflowContextManager @@ -401,6 +401,8 @@ class TrainTask(Task): if confirm is False: return [] + # todo: change global R.uri & experiment name + R.set_uri(Path(workspace).joinpath("mlruns").as_uri()) if not self._rolling: command = ["qrun", str(workflow_path)] try: @@ -415,10 +417,11 @@ class TrainTask(Task): encoding="utf8", cwd=str(workspace), ) + exp = R.get_exp(experiment_name="finCo") except subprocess.CalledProcessError as e: print(f"An error occurred while running the subprocess: {e.stderr} {e.stdout}") - real_error = e.stderr + e.stdout + real_error = e.stderr+e.stdout KnowledgeBase().execute_knowledge.add([real_error]) if "data" in e.stdout.lower() or "handler" in e.stdout.lower(): @@ -451,17 +454,27 @@ class TrainTask(Task): # Run the command and capture the output workspace = self._context_manager.struct_context.workspace subprocess.run( - command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True, text=True, cwd=str(workspace) + command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True, + text=True, encoding="utf8", cwd=str(workspace) ) + # todo: dont manage record by id, experiment_id=2 doesnt contains metrics + exp = R.get_exp(experiment_id="3") + else: command = f"python -m qlib.contrib.rolling ddgda --conf_path {workflow_path} run" # Run the command and capture the output workspace = self._context_manager.struct_context.workspace subprocess.run( - command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True, text=True, cwd=str(workspace) + command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True, + encoding="utf8", text=True, cwd=str(workspace) ) + exp = R.get_exp(experiment_id="3") - return [AnalysisTask()] + # first recorder is the latest + recorder = exp.list_recorders(rtype=exp.RT_L)[0] + self._context_manager.set_context(f"experiment_{self._experiment_index}_recorder", recorder) + + return [] def summarize(self): if self._output is not None: @@ -520,30 +533,25 @@ class AnalysisTask(Task): if isinstance(analysers, list) and len(analysers): self.logger.info(f"selected analysers: {analysers}", plain=True) + experiment_count = self._context_manager.get_context("experiment_count") - workflow_config = ( - self._context_manager.get_context("workflow_config") - if self._context_manager.get_context("workflow_config") - else "workflow_config.yaml" - ) workspace = self._context_manager.get_context("workspace") + for exp_id in range(1, experiment_count + 1): + recorder = self._context_manager.get_context(f"experiment_{exp_id}_recorder") - # todo: analysis multi experiment(get recorder by id) - experiment_name = "workflow" - R.set_uri(Path.joinpath(workspace, "mlruns").as_uri()) - - tasks = [] - for analyser in analysers: - if analyser in self.__ANALYZERS_PROJECT.keys(): - tasks.append( - self.__ANALYZERS_PROJECT.get(analyser)( - recorder=R.get_recorder(experiment_name=experiment_name), output_dir=workspace + tasks = [] + for analyser in analysers: + if analyser in self.__ANALYZERS_PROJECT.keys(): + tasks.append( + self.__ANALYZERS_PROJECT.get(analyser)( + recorder=recorder, output_dir=workspace + ) ) - ) - for task in tasks: - resp = task.analyse() - self._context_manager.set_context(resp, task.__class__.__doc__) + # todo: set by experiment + for task in tasks: + resp = task.analyse() + self._context_manager.set_context(resp, task.__class__.__doc__) return [] @@ -1100,7 +1108,7 @@ class CodeDumpTask(ActionTask): class SummarizeTask(Task): - __DEFAULT_SUMMARIZE_CONTEXT = ["workflow_yaml", "metrics"] + __DEFAULT_SUMMARIZE_CONTEXT = ["metrics"] # TODO: 2048 is close to exceed GPT token limit __MAX_LENGTH_OF_FILE = 2048 @@ -1129,63 +1137,61 @@ class SummarizeTask(Task): def execute(self) -> Any: workspace = self._context_manager.get_context("workspace") user_prompt = self._context_manager.get_context("user_prompt") - workflow_yaml = self._context_manager.get_context("workflow_yaml") file_info = self.get_info_from_file(workspace) context_info = self.get_info_from_context() # too long context make response unstable. - # todo: experiments perhaps have the same name, summarize experiment by loop - record_info = self.get_info_from_recorder(workspace, "workflow") figure_path = self.get_figure_path(workspace) - information = context_info + file_info + record_info - - def _get_value_from_info(info: list, k: str): - for i in information: - if k in i.keys(): - return i.get(k) - return "" - # todo: remove 'be' after test be = APIBackend() be.debug_mode = False - context_summary = {} - for key in self.__DEFAULT_SUMMARIZE_CONTEXT: - prompt_workflow_selection = self.summarize_context_user.render( - key=key, value=_get_value_from_info(info=information, k=key) - ) - response = be.build_messages_and_create_chat_completion( - user_prompt=prompt_workflow_selection, system_prompt=self.summarize_context_system.render() - ) - context_summary.update({key: response}) + def _get_value_from_info(info: list, k: str): + for i in info: + if k in i.keys(): + return i.get(k) + return "" - recorder = R.get_recorder(experiment_name="workflow") - recorder.save_objects(context_summary=context_summary) + experiment_count = self._context_manager.get_context("experiment_count") + for exp_id in range(1, experiment_count + 1): + recorder = self._context_manager.get_context(f"experiment_{exp_id}_recorder") + reason = self._context_manager.get_context(f"experiment_{exp_id}_config_finetune_reason") + workflow_yaml = self._context_manager.get_context(f"workflow_{exp_id}_yaml") + record_info = [{"metrics": recorder.list_metrics()}] - prompt_workflow_selection = self.summarize_metrics_user.render( - information=_get_value_from_info(info=record_info, k="metrics"), user_prompt=user_prompt - ) - metrics_response = be.build_messages_and_create_chat_completion( - user_prompt=prompt_workflow_selection, system_prompt=self.summarize_metrics_system.render() - ) + information = context_info + file_info + record_info + + context_summary = {} + for key in self.__DEFAULT_SUMMARIZE_CONTEXT: + prompt_workflow_selection = self.summarize_context_user.render( + key=key, value=_get_value_from_info(info=information, k=key) + ) + response = be.build_messages_and_create_chat_completion( + user_prompt=prompt_workflow_selection, system_prompt=self.summarize_context_system.render() + ) + context_summary.update({key: response}) + + recorder.save_objects(context_summary=context_summary) + + prompt_workflow_selection = self.summarize_metrics_user.render( + information=_get_value_from_info(info=record_info, k="metrics"), user_prompt=user_prompt + ) + metrics_response = be.build_messages_and_create_chat_completion( + user_prompt=prompt_workflow_selection, system_prompt=self.summarize_metrics_system.render() + ) + + KnowledgeBase().practice_knowledge.add([{"user_intention": user_prompt, "experiment_id": exp_id, + "workflow": workflow_yaml, "reason": reason, + "experiment_metrics": metrics_response}]) prompt_workflow_selection = self.user.render( - information=file_info + [{"metrics": metrics_response}], figure_path=figure_path, user_prompt=user_prompt + information=file_info + KnowledgeBase().practice_knowledge.knowledge[-2:], + figure_path=figure_path, user_prompt=user_prompt ) response = be.build_messages_and_create_chat_completion( user_prompt=prompt_workflow_selection, system_prompt=self.system.render() ) - - KnowledgeBase().practice_knowledge.add( - [{"user_intention": user_prompt, "experiment_metrics": metrics_response}] - ) - - # notes: summarize after all experiment added to KnowledgeBase - topic = Topic(name="rollingModel", describe=Template("What conclusion can you draw")) - topic.summarize(KnowledgeBase().practice_knowledge.knowledge) - self.logger.info(f"Summary of topic: {topic.name}: {topic.knowledge}") - self._context_manager.set_context("summary", response) self.save_markdown(content=response, path=workspace) self.logger.info(f"Report has saved to {self.__DEFAULT_REPORT_NAME}", title="End") @@ -1209,7 +1215,8 @@ class SummarizeTask(Task): result = [] for file in file_list: postfix = file.name.split(".")[-1] - if postfix in ["py", "log", "yaml"]: + # todo: filter file info more reasonable + if postfix in ["py", "log", "yaml"] and file.name.startswith("experiment"): with open(file) as f: content = f.read() self.logger.info(f"file to summarize: {file}", plain=True) diff --git a/qlib/finco/workflow.py b/qlib/finco/workflow.py index 15d0ac34..8f1fd1e0 100644 --- a/qlib/finco/workflow.py +++ b/qlib/finco/workflow.py @@ -1,8 +1,9 @@ import sys import shutil from typing import List + from pathlib import Path -from qlib.finco.task import HighLevelPlanTask, SummarizeTask +from qlib.finco.task import HighLevelPlanTask, SummarizeTask, AnalysisTask from qlib.finco.prompt_template import PromptTemplate, Template from qlib.finco.log import FinCoLog, LogColors from qlib.finco.llm import APIBackend @@ -52,7 +53,7 @@ class WorkflowManager: self.prompt_template = PromptTemplate() self.context = WorkflowContextManager(workspace=self._workspace) - self.default_user_prompt = "Please help me build a low turnover strategy that focus more on longterm return in China A csi300. Please help to use lightgbm model." + self.default_user_prompt = "build an A-share stock market daily portfolio in quantitative investment and minimize the maximum drawdown." def _confirm_and_rm(self): # if workspace exists, please confirm and remove it. Otherwise exit. @@ -114,7 +115,7 @@ class WorkflowManager: self.logger.info(f"user_prompt: {self.get_context().get_context('user_prompt')}", title="Start") # NOTE: list may not be enough for general task list - task_list = [HighLevelPlanTask(), SummarizeTask()] + task_list = [HighLevelPlanTask(), AnalysisTask(), SummarizeTask()] task_finished = [] while len(task_list): task_list_info = [str(task) for task in task_list] @@ -143,7 +144,7 @@ class WorkflowManager: class LearnManager: - __DEFAULT_TOPICS = ["IC", "MaxDropDown"] + __DEFAULT_TOPICS = ["IC", "MaxDropDown", "RollingModel"] def __init__(self): self.epoch = 0 @@ -152,9 +153,7 @@ class LearnManager: self.topics = [ Topic(name=topic, describe=self.wm.prompt_template.get(f"Topic_{topic}")) for topic in self.__DEFAULT_TOPICS ] - self.knowledge_base = KnowledgeBase(workdir=Path.cwd().joinpath("knowledge")) - self.knowledge_base.execute_knowledge.add([]) - self.knowledge_base.query(knowledge_type="infrastructure", content="resolve_path") + self.knowledge_base = KnowledgeBase() def run(self, prompt): # todo: add early stop condition @@ -180,7 +179,8 @@ class LearnManager: user_prompt = self.wm.context.get_context("user_prompt") summary = self.wm.context.get_context("summary") - [topic.summarize(self.knowledge_base.get_knowledge()) for topic in self.topics] + [topic.summarize(self.knowledge_base.practice_knowledge.knowledge[-2:]) for topic in self.topics] + [self.knowledge_base.practice_knowledge.add([{"practice_knowledge": topic.knowledge}]) for topic in self.topics] knowledge_of_topics = [{topic.name: topic.knowledge} for topic in self.topics] for task in task_finished: