Hyperparameter Optimization with Ray Tune¶
You can run this notebook directly in
Colab.
For this chapter you need to install the following dependencies:
In [ ]:
                Copied!
                
                
            ! pip install "ray[tune]==2.2.0"
! pip install "hyperopt==0.2.7"
! pip install "bayesian-optimization==1.3.1"
! pip install "tensorflow>=2.9.0"
! pip install "ray[tune]==2.2.0"
! pip install "hyperopt==0.2.7"
! pip install "bayesian-optimization==1.3.1"
! pip install "tensorflow>=2.9.0"
    
        To import utility files for this chapter, on Colab you will also have to clone the repo and copy the code files to the base path of the runtime:
In [ ]:
                Copied!
                
                
            !git clone https://github.com/maxpumperla/learning_ray
%cp -r learning_ray/notebooks/* .
!git clone https://github.com/maxpumperla/learning_ray
%cp -r learning_ray/notebooks/* .
    
        In [ ]:
                Copied!
                
                
            from maze_gym_env import Environment
import time
import numpy as np
class Policy:
    def __init__(self, env):
        """A Policy suggests actions based on the current state.
        We do this by tracking the value of each state-action pair.
        """
        self.state_action_table = [
            [0 for _ in range(env.action_space.n)]
            for _ in range(env.observation_space.n)
        ]
        self.action_space = env.action_space
    def get_action(self, state, explore=True, epsilon=0.1):
        """Explore randomly or exploit the best value currently available."""
        if explore and random.uniform(0, 1) < epsilon:
            return self.action_space.sample()
        return np.argmax(self.state_action_table[state])
class Simulation(object):
    def __init__(self, env):
        """Simulates rollouts of an environment, given a policy to follow."""
        self.env = env
    def rollout(self, policy, render=False, explore=True, epsilon=0.1):
        """Returns experiences for a policy rollout."""
        experiences = []
        state = self.env.reset()
        done = False
        while not done:
            action = policy.get_action(state, explore, epsilon)
            next_state, reward, done, info = self.env.step(action)
            experiences.append([state, action, reward, next_state])
            state = next_state
            if render:
                time.sleep(0.05)
                self.env.render()
        return experiences
def update_policy(policy, experiences, weight=0.1, discount_factor=0.9):
    """Updates a given policy with a list of (state, action, reward, state)
    experiences."""
    for state, action, reward, next_state in experiences:
        next_max = np.max(policy.state_action_table[next_state])
        value = policy.state_action_table[state][action]
        new_value = (1 - weight) * value + weight * \
                    (reward + discount_factor * next_max)
        policy.state_action_table[state][action] = new_value
def train_policy(env, num_episodes=10000, weight=0.1, discount_factor=0.9):
    """Training a policy by updating it with rollout experiences."""
    policy = Policy(env)
    sim = Simulation(env)
    for _ in range(num_episodes):
        experiences = sim.rollout(policy)
        update_policy(policy, experiences, weight, discount_factor)
    return policy
def evaluate_policy(env, policy, num_episodes=10):
    """Evaluate a trained policy through rollouts."""
    simulation = Simulation(env)
    steps = 0
    for _ in range(num_episodes):
        experiences = simulation.rollout(policy, render=True, explore=False)
        steps += len(experiences)
    print(f"{steps / num_episodes} steps on average "
          f"for a total of {num_episodes} episodes.")
    return steps / num_episodes
from maze_gym_env import Environment
import time
import numpy as np
class Policy:
    def __init__(self, env):
        """A Policy suggests actions based on the current state.
        We do this by tracking the value of each state-action pair.
        """
        self.state_action_table = [
            [0 for _ in range(env.action_space.n)]
            for _ in range(env.observation_space.n)
        ]
        self.action_space = env.action_space
    def get_action(self, state, explore=True, epsilon=0.1):
        """Explore randomly or exploit the best value currently available."""
        if explore and random.uniform(0, 1) < epsilon:
            return self.action_space.sample()
        return np.argmax(self.state_action_table[state])
class Simulation(object):
    def __init__(self, env):
        """Simulates rollouts of an environment, given a policy to follow."""
        self.env = env
    def rollout(self, policy, render=False, explore=True, epsilon=0.1):
        """Returns experiences for a policy rollout."""
        experiences = []
        state = self.env.reset()
        done = False
        while not done:
            action = policy.get_action(state, explore, epsilon)
            next_state, reward, done, info = self.env.step(action)
            experiences.append([state, action, reward, next_state])
            state = next_state
            if render:
                time.sleep(0.05)
                self.env.render()
        return experiences
def update_policy(policy, experiences, weight=0.1, discount_factor=0.9):
    """Updates a given policy with a list of (state, action, reward, state)
    experiences."""
    for state, action, reward, next_state in experiences:
        next_max = np.max(policy.state_action_table[next_state])
        value = policy.state_action_table[state][action]
        new_value = (1 - weight) * value + weight * \
                    (reward + discount_factor * next_max)
        policy.state_action_table[state][action] = new_value
def train_policy(env, num_episodes=10000, weight=0.1, discount_factor=0.9):
    """Training a policy by updating it with rollout experiences."""
    policy = Policy(env)
    sim = Simulation(env)
    for _ in range(num_episodes):
        experiences = sim.rollout(policy)
        update_policy(policy, experiences, weight, discount_factor)
    return policy
def evaluate_policy(env, policy, num_episodes=10):
    """Evaluate a trained policy through rollouts."""
    simulation = Simulation(env)
    steps = 0
    for _ in range(num_episodes):
        experiences = simulation.rollout(policy, render=True, explore=False)
        steps += len(experiences)
    print(f"{steps / num_episodes} steps on average "
          f"for a total of {num_episodes} episodes.")
    return steps / num_episodes
    
        
In [ ]:
                Copied!
                
                
            import random
search_space = []
for i in range(10):
    random_choice = {
        'weight': random.uniform(0, 1),
        'discount_factor': random.uniform(0, 1)
    }
    search_space.append(random_choice)
import random
search_space = []
for i in range(10):
    random_choice = {
        'weight': random.uniform(0, 1),
        'discount_factor': random.uniform(0, 1)
    }
    search_space.append(random_choice)
    
        In [ ]:
                Copied!
                
                
            import ray
@ray.remote
def objective(config):
    environment = Environment()
    policy = train_policy(
        environment,
        weight=config["weight"],
        discount_factor=config["discount_factor"]
    )
    score = evaluate_policy(environment, policy)
    return [score, config]
import ray
@ray.remote
def objective(config):
    environment = Environment()
    policy = train_policy(
        environment,
        weight=config["weight"],
        discount_factor=config["discount_factor"]
    )
    score = evaluate_policy(environment, policy)
    return [score, config]
    
        In [ ]:
                Copied!
                
                
            result_objects = [objective.remote(choice) for choice in search_space]
results = ray.get(result_objects)
results.sort(key=lambda x: x[0])
print(results[-1])
result_objects = [objective.remote(choice) for choice in search_space]
results = ray.get(result_objects)
results.sort(key=lambda x: x[0])
print(results[-1])
    
        In [ ]:
                Copied!
                
                
            from ray import tune
search_space = {
    "weight": tune.uniform(0, 1),
    "discount_factor": tune.uniform(0, 1),
}
from ray import tune
search_space = {
    "weight": tune.uniform(0, 1),
    "discount_factor": tune.uniform(0, 1),
}
    
        In [ ]:
                Copied!
                
                
            def tune_objective(config):
    environment = Environment()
    policy = train_policy(
        environment,
        weight=config["weight"],
        discount_factor=config["discount_factor"]
    )
    score = evaluate_policy(environment, policy)
    return {"score": score}
def tune_objective(config):
    environment = Environment()
    policy = train_policy(
        environment,
        weight=config["weight"],
        discount_factor=config["discount_factor"]
    )
    score = evaluate_policy(environment, policy)
    return {"score": score}
    
        In [ ]:
                Copied!
                
                
            analysis = tune.run(tune_objective, config=search_space)
print(analysis.get_best_config(metric="score", mode="min"))
analysis = tune.run(tune_objective, config=search_space)
print(analysis.get_best_config(metric="score", mode="min"))
    
        In [ ]:
                Copied!
                
                
            from ray.tune.suggest.bayesopt import BayesOptSearch
algo = BayesOptSearch(random_search_steps=4)
tune.run(
    tune_objective,
    config=search_space,
    metric="score",
    mode="min",
    search_alg=algo,
    stop={"training_iteration": 10},
)
from ray.tune.suggest.bayesopt import BayesOptSearch
algo = BayesOptSearch(random_search_steps=4)
tune.run(
    tune_objective,
    config=search_space,
    metric="score",
    mode="min",
    search_alg=algo,
    stop={"training_iteration": 10},
)
    
        In [ ]:
                Copied!
                
                
            def objective(config):
    for step in range(30):
        score = config["weight"] * (step ** 0.5) + config["bias"]
        tune.report(score=score)
search_space = {"weight": tune.uniform(0, 1), "bias": tune.uniform(0, 1)}
def objective(config):
    for step in range(30):
        score = config["weight"] * (step ** 0.5) + config["bias"]
        tune.report(score=score)
search_space = {"weight": tune.uniform(0, 1), "bias": tune.uniform(0, 1)}
    
        In [ ]:
                Copied!
                
                
            from ray.tune.schedulers import HyperBandScheduler
scheduler = HyperBandScheduler(metric="score", mode="min")
analysis = tune.run(
    objective,
    config=search_space,
    scheduler=scheduler,
    num_samples=10,
)
print(analysis.get_best_config(metric="score", mode="min"))
from ray.tune.schedulers import HyperBandScheduler
scheduler = HyperBandScheduler(metric="score", mode="min")
analysis = tune.run(
    objective,
    config=search_space,
    scheduler=scheduler,
    num_samples=10,
)
print(analysis.get_best_config(metric="score", mode="min"))
    
        In [ ]:
                Copied!
                
                
            # NOTE: in the book we have 0.5 GPUs, but set this to 0 here so that it runs on Colab.
from ray import tune
tune.run(
    objective,
    config=search_space,
    num_samples=10,
    resources_per_trial={"cpu": 2, "gpu": 0}
)
# NOTE: in the book we have 0.5 GPUs, but set this to 0 here so that it runs on Colab.
from ray import tune
tune.run(
    objective,
    config=search_space,
    num_samples=10,
    resources_per_trial={"cpu": 2, "gpu": 0}
)
    
        In [ ]:
                Copied!
                
                
            from ray import tune
from ray.tune import Callback
from ray.tune.logger import pretty_print
class PrintResultCallback(Callback):
    def on_trial_result(self, iteration, trials, trial, result, **info):
        print(f"Trial {trial} in iteration {iteration}, "
              f"got result: {result['score']}")
def objective(config):
    for step in range(30):
        score = config["weight"] * (step ** 0.5) + config["bias"]
        tune.report(score=score, step=step, more_metrics={})
from ray import tune
from ray.tune import Callback
from ray.tune.logger import pretty_print
class PrintResultCallback(Callback):
    def on_trial_result(self, iteration, trials, trial, result, **info):
        print(f"Trial {trial} in iteration {iteration}, "
              f"got result: {result['score']}")
def objective(config):
    for step in range(30):
        score = config["weight"] * (step ** 0.5) + config["bias"]
        tune.report(score=score, step=step, more_metrics={})
    
        In [ ]:
                Copied!
                
                
            search_space = {"weight": tune.uniform(0, 1), "bias": tune.uniform(0, 1)}
analysis = tune.run(
    objective,
    config=search_space,
    mode="min",
    metric="score",
    callbacks=[PrintResultCallback()])
best = analysis.best_trial
print(pretty_print(best.last_result))
search_space = {"weight": tune.uniform(0, 1), "bias": tune.uniform(0, 1)}
analysis = tune.run(
    objective,
    config=search_space,
    mode="min",
    metric="score",
    callbacks=[PrintResultCallback()])
best = analysis.best_trial
print(pretty_print(best.last_result))
    
        In [ ]:
                Copied!
                
                
            # NOTE: this will only run if you insert a correct logdir.
analysis = tune.run(
    objective,
    name="<your-logdir>",
    resume=True,
    config=search_space)
# NOTE: this will only run if you insert a correct logdir.
analysis = tune.run(
    objective,
    name="",
    resume=True,
    config=search_space) 
    
        In [ ]:
                Copied!
                
                
            tune.run(
    objective,
    config=search_space,
    stop={"training_iteration": 10})
tune.run(
    objective,
    config=search_space,
    stop={"training_iteration": 10})
    
        In [ ]:
                Copied!
                
                
            def stopper(trial_id, result):
    return result["score"] < 2
tune.run(
    objective,
    config=search_space,
    stop=stopper)
def stopper(trial_id, result):
    return result["score"] < 2
tune.run(
    objective,
    config=search_space,
    stop=stopper)
    
        In [ ]:
                Copied!
                
                
            from ray import tune
import numpy as np
search_space = {
    "weight": tune.sample_from(
        lambda context: np.random.uniform(low=0.0, high=1.0)
    ),
    "bias": tune.sample_from(
        lambda context: context.config.weight * np.random.normal()
    )}
tune.run(objective, config=search_space)
from ray import tune
import numpy as np
search_space = {
    "weight": tune.sample_from(
        lambda context: np.random.uniform(low=0.0, high=1.0)
    ),
    "bias": tune.sample_from(
        lambda context: context.config.weight * np.random.normal()
    )}
tune.run(objective, config=search_space)
    
        In [ ]:
                Copied!
                
                
            # NOTE: this run will take incredibly long on Colab, be warned!
from ray import tune
analysis = tune.run(
    "DQN",
    metric="episode_reward_mean",
    mode="max",
    config={
        "env": "CartPole-v1",
        "lr": tune.uniform(1e-5, 1e-4),
        "train_batch_size": tune.choice([10000, 20000, 40000]),
    },
)
# NOTE: this run will take incredibly long on Colab, be warned!
from ray import tune
analysis = tune.run(
    "DQN",
    metric="episode_reward_mean",
    mode="max",
    config={
        "env": "CartPole-v1",
        "lr": tune.uniform(1e-5, 1e-4),
        "train_batch_size": tune.choice([10000, 20000, 40000]),
    },
)
    
        
In [ ]:
                Copied!
                
                
            from tensorflow.keras.datasets import mnist
from tensorflow.keras.utils import to_categorical
def load_data():
    (x_train, y_train), (x_test, y_test) = mnist.load_data()
    num_classes = 10
    x_train, x_test = x_train / 255.0, x_test / 255.0
    y_train = to_categorical(y_train, num_classes)
    y_test = to_categorical(y_test, num_classes)
    return (x_train, y_train), (x_test, y_test)
load_data()
from tensorflow.keras.datasets import mnist
from tensorflow.keras.utils import to_categorical
def load_data():
    (x_train, y_train), (x_test, y_test) = mnist.load_data()
    num_classes = 10
    x_train, x_test = x_train / 255.0, x_test / 255.0
    y_train = to_categorical(y_train, num_classes)
    y_test = to_categorical(y_test, num_classes)
    return (x_train, y_train), (x_test, y_test)
load_data()
    
        In [ ]:
                Copied!
                
                
            from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Flatten, Dense, Dropout
from ray.tune.integration.keras import TuneReportCallback
def objective(config):
    (x_train, y_train), (x_test, y_test) = load_data()
    model = Sequential()
    model.add(Flatten(input_shape=(28, 28)))
    model.add(Dense(config["hidden"], activation=config["activation"]))
    model.add(Dropout(config["rate"]))
    model.add(Dense(10, activation="softmax"))
    model.compile(loss="categorical_crossentropy", metrics=["accuracy"])
    model.fit(x_train, y_train, batch_size=128, epochs=10,
              validation_data=(x_test, y_test),
              callbacks=[TuneReportCallback({"mean_accuracy": "accuracy"})])
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Flatten, Dense, Dropout
from ray.tune.integration.keras import TuneReportCallback
def objective(config):
    (x_train, y_train), (x_test, y_test) = load_data()
    model = Sequential()
    model.add(Flatten(input_shape=(28, 28)))
    model.add(Dense(config["hidden"], activation=config["activation"]))
    model.add(Dropout(config["rate"]))
    model.add(Dense(10, activation="softmax"))
    model.compile(loss="categorical_crossentropy", metrics=["accuracy"])
    model.fit(x_train, y_train, batch_size=128, epochs=10,
              validation_data=(x_test, y_test),
              callbacks=[TuneReportCallback({"mean_accuracy": "accuracy"})])
    
        In [ ]:
                Copied!
                
                
            from ray import tune
from ray.tune.suggest.hyperopt import HyperOptSearch
initial_params = [{"rate": 0.2, "hidden": 128, "activation": "relu"}]
algo = HyperOptSearch(points_to_evaluate=initial_params)
search_space = {
    "rate": tune.uniform(0.1, 0.5),
    "hidden": tune.randint(32, 512),
    "activation": tune.choice(["relu", "tanh"])
}
analysis = tune.run(
    objective,
    name="keras_hyperopt_exp",
    search_alg=algo,
    metric="mean_accuracy",
    mode="max",
    stop={"mean_accuracy": 0.99},
    num_samples=10,
    config=search_space,
)
print("Best hyperparameters found were: ", analysis.best_config)
from ray import tune
from ray.tune.suggest.hyperopt import HyperOptSearch
initial_params = [{"rate": 0.2, "hidden": 128, "activation": "relu"}]
algo = HyperOptSearch(points_to_evaluate=initial_params)
search_space = {
    "rate": tune.uniform(0.1, 0.5),
    "hidden": tune.randint(32, 512),
    "activation": tune.choice(["relu", "tanh"])
}
analysis = tune.run(
    objective,
    name="keras_hyperopt_exp",
    search_alg=algo,
    metric="mean_accuracy",
    mode="max",
    stop={"mean_accuracy": 0.99},
    num_samples=10,
    config=search_space,
)
print("Best hyperparameters found were: ", analysis.best_config)