#!/usr/bin/env python3
'''
Alternative implementation of self-play using Ray Tune custom schedulers.
'''

import argparse
from collections import defaultdict

import numpy as np
import yaml

import ray
from ray.tune import run_experiments
from ray.tune.registry import ENV_CREATOR, _global_registry
from ray.tune.result import TIME_TOTAL_S, TIMESTEPS_TOTAL, EPISODES_TOTAL, TRAINING_ITERATION
from ray.tune.schedulers import FIFOScheduler, TrialScheduler
from ray.tune.trial import Trial, Checkpoint

import algorithms
import environments

class SelfPlayTrial:
    '''Rotates which policy is being trained within a single Tune trial to implement self-play.'''

    def __init__(self, trial, trial_runner, config):
        self._trial = trial
        self._trial_runner = trial_runner
        self._config = config

        # Initialize iteration counter
        self._self_play_round = 0

        # Initialize previous result
        self._last_update = defaultdict(lambda: 0)

        # Get multiagent config
        multiagent = trial.config["multiagent"]

        if "policies_to_train" in multiagent:
            self._policy_ids = multiagent["policies_to_train"]
        else:
            self._policy_ids = list(multiagent["policies"].keys())

        if "initial_policy" in config:
            self._current_policy = self._policy_ids.index(config["initial_policy"])
        else:
            self._current_policy = 0

        self._is_burn_in = config.get("burn_in", False)

        # Restrict training to the current policy unless we are in a burn-in round
        if not self._is_burn_in:
            multiagent["policies_to_train"] = [self._policy_ids[self._current_policy]]

    def _update(self, result):
        self._self_play_round += 1

        # Advance to the next training policy, unless this round was the burn-in
        if self._is_burn_in:
            self._is_burn_in = False
        else:
            self._current_policy = (self._current_policy + 1) % len(self._policy_ids)

        # Record the result values that triggered this round
        for key in self._config["round"].keys():
            self._last_update[key] = result[key]

        # Save current state to an in-memory checkpoint
        checkpoint = self._trial_runner.trial_executor.save(self._trial, Checkpoint.MEMORY, result=result)

        # Update config with the new training policy
        self._trial.config["multiagent"]["policies_to_train"] = [self._policy_ids[self._current_policy]]

        # Reset the trial in place with the updated config
        self._trial_runner.trial_executor.reset_trial(self._trial, self._trial.config, self._trial.experiment_tag)

        # Restore weights from the checkpoint taken above
        self._trial_runner.trial_executor.restore(self._trial, checkpoint)

    def on_result(self, result):
        # Start a new round once any of the configured "round" thresholds is exceeded
        for key, value in self._config["round"].items():
            if result[key] - self._last_update[key] >= value:
                self._update(result)
                break

        result["self_play_round"] = self._self_play_round

        return TrialScheduler.CONTINUE

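# For reference, the "self_play" section consumed by SelfPlayTrial looks roughly like the
# sketch below (key names come from the class above; the values are illustrative only):
#
#     "self_play": {
#         "burn_in": True,                      # optional: leave policies_to_train unrestricted for the first round
#         "initial_policy": "policy_0",         # optional: defaults to the first policy id
#         "round": {"timesteps_total": 2000},   # start a new round every 2000 timesteps
#     }
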
class SelfPlayScheduler(FIFOScheduler):

    def __init__(self):
        super().__init__()
        self._trials = {}

    def on_trial_add(self, trial_runner, trial):
        # Only wrap trials that request self-play via a "self_play" section in their config
        if "self_play" in trial.config:
            self._trials[trial] = SelfPlayTrial(trial, trial_runner, trial.config.pop("self_play"))

    def on_trial_result(self, trial_runner, trial, result):
        if trial in self._trials:
            return self._trials[trial].on_result(result)

        # Trials without a "self_play" section are scheduled as plain FIFO trials
        return TrialScheduler.CONTINUE

    def debug_string(self):
        return "Using custom scheduler to implement self play"

def parse_args():
    parser = argparse.ArgumentParser("Generic training script for any registered multi-agent environment, logs intrinsic reward stats")

    parser.add_argument("-f", "--config-file", default=None, type=str, action="append",
                        help="If specified, use config options from these files.")
    parser.add_argument("--local-dir", type=str, default="../ray_results",
                        help="path to save training results")
    parser.add_argument("--num-cpus", type=int, default=7,
                        help="the maximum number of CPUs ray is allowed to use, useful for running locally")
    parser.add_argument("--num-gpus", type=int, default=0,
                        help="the maximum number of GPUs ray is allowed to use")
    parser.add_argument("--num-workers", type=int, default=0,
                        help="the number of parallel workers per experiment")

    return parser.parse_args()

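# Example invocation (the script and config file names here are illustrative, not part of the repo):
#
#     python self_play_scheduler.py -f experiments/roshambo.yaml --num-cpus 4 --num-workers 2
#
# Multiple -f/--config-file flags may be given; experiments with the same name in later
# files overwrite those from earlier ones, since main() merges them with dict.update().
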
def log_intrinsic(info):
    episode = info["episode"]
    batch = info["post_batch"]

    if "intrinsic_reward" in batch:
        reward = np.sum(batch["intrinsic_reward"])

        if "intrinsic_reward" not in episode.custom_metrics:
            episode.custom_metrics["intrinsic_reward"] = reward
        else:
            episode.custom_metrics["intrinsic_reward"] += reward

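# A config file passed with -f is a YAML mapping from experiment name to experiment spec,
# mirroring the default EXPERIMENTS dict in main() below. This snippet is an illustrative
# sketch only; the "multiagent" section is filled in automatically by main():
#
#     PPO_KeepAway:
#       run: PPO
#       stop:
#         timesteps_total: 20000
#       config:
#         env: roshambo
#         self_play:
#           burn_in: true
#           round:
#             timesteps_total: 2000
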
def main(args):
    if args.config_file:
        EXPERIMENTS = dict()

        for config_file in args.config_file:
            with open(config_file) as f:
                EXPERIMENTS.update(yaml.load(f, Loader=yaml.FullLoader))
    else:
        EXPERIMENTS = {
            "PPO_KeepAway": {
                "run": "PPO",
                "stop": {
                    "timesteps_total": 20000,
                },
                "checkpoint_freq": 10,
                "num_samples": 1,
                "config": {
                    "self_play": {
                        "burn_in": True,
                        "round": {
                            "timesteps_total": 2000,
                        },
                    },
                    "horizon": 200,
                    "env": "roshambo",
                    "env_config": {},
                    "gamma": 0.99,
                    "lambda": 0.95,
                    "entropy_coeff": 0.001,
                    "clip_param": 0.1,
                    "lr": 0.001,
                    "num_sgd_iter": 4,
                    "train_batch_size": 400,
                    "rollout_fragment_length": 200,
                },
            },
        }

    # Add multiagent configurations
    for experiment in EXPERIMENTS.values():
        exp_config = experiment["config"]

        # Create temporary env instance to query observation space, action space and number of agents
        env_name = exp_config["env"]

        if _global_registry.contains(ENV_CREATOR, env_name):
            env_creator = _global_registry.get(ENV_CREATOR, env_name)
        else:
            import gym
            env_creator = lambda env_config: gym.make(env_name)

        env = env_creator(exp_config.get("env_config", {}))

        # One policy per agent for multiple individual learners
        policies = dict()
        for pid in env.observation_space_dict.keys():
            policies[f"policy_{pid}"] = (
                None,
                env.observation_space_dict[pid],
                env.action_space_dict[pid],
                {}
            )

        exp_config["multiagent"] = {"policies": policies,
                                    "policy_mapping_fn": lambda pid: f"policy_{pid}"}

        # Set local directory for checkpoints
        experiment["local_dir"] = str(args.local_dir)

        # Set num workers
        experiment["config"]["num_workers"] = args.num_workers

        # Add intrinsic reward logging
        experiment["config"]["callbacks"] = {
            "on_postprocess_traj": log_intrinsic
        }

        # Modify config to reduce TensorFlow thrashing
        experiment["config"]["tf_session_args"] = {
            "intra_op_parallelism_threads": 1,
            "inter_op_parallelism_threads": 1,
        }

        experiment["config"]["local_tf_session_args"] = {
            "intra_op_parallelism_threads": 1,
            "inter_op_parallelism_threads": 1,
        }

    # Build scheduler
    scheduler = SelfPlayScheduler()

    ray.init(num_cpus=args.num_cpus, num_gpus=args.num_gpus)
    run_experiments(EXPERIMENTS, verbose=1, scheduler=scheduler)


if __name__ == '__main__':
    args = parse_args()
    main(args)