Getting Started with the Ray AI Runtime
You can run this notebook directly in Colab.
For this chapter you will also need to install the following dependencies:
In [ ]:
                Copied!
                
                
            ! pip install "ray[air]==2.2.0" "xgboost-ray>=0.1.10" "xgboost>=1.6.2"
! pip install "numpy>=1.19.5" "pandas>=1.3.5" "pyarrow>=6.0.1" "aiorwlock==1.3.0"
! pip install "ray[air]==2.2.0" "xgboost-ray>=0.1.10" "xgboost>=1.6.2"
! pip install "numpy>=1.19.5" "pandas>=1.3.5" "pyarrow>=6.0.1" "aiorwlock==1.3.0"
    
        








In [ ]:
                Copied!
                
                
            # Load the example data and prepare the train / validation / test datasets.
import ray
from ray.data.preprocessors import StandardScaler

# Read the breast-cancer CSV from the `air-example-data` S3 bucket into a Ray Dataset.
dataset = ray.data.read_csv("s3://anonymous@air-example-data/breast_cancer.csv")

# Hold out 20% of the rows for validation.
train_dataset, valid_dataset = dataset.train_test_split(test_size=0.2)

# The test set is the validation split with the "target" label column removed.
test_dataset = valid_dataset.drop_columns(cols=["target"])

# Scaler for the two listed feature columns; it is passed to the trainer in a later cell.
preprocessor = StandardScaler(columns=["mean radius", "mean texture"])
    
        In [ ]:
                Copied!
                
                
            # NOTE: Colab does not have enough resources to run this example.
# Try using num_workers=1, resources_per_worker={"CPU": 1, "GPU": 0} in the
# ScalingConfig below.
# In any case, this training loop will take considerable time to run.
from ray.air.config import ScalingConfig
from ray.train.xgboost import XGBoostTrainer

# XGBoost trainer running on two CPU-only workers. It trains on `train_dataset`,
# reports metrics for both the "train" and "valid" datasets, and uses the
# preprocessor defined in the data-loading cell.
trainer = XGBoostTrainer(
    scaling_config=ScalingConfig(
        num_workers=2,
        use_gpu=False,
    ),
    label_column="target",
    num_boost_round=20,
    params={
        "objective": "binary:logistic",
        "eval_metric": ["logloss", "error"],
    },
    datasets={"train": train_dataset, "valid": valid_dataset},
    preprocessor=preprocessor,
)

# Run the training job once and show the metrics it reported.
result = trainer.fit()
print(result.metrics)
    
        In [ ]:
                Copied!
                
                
            # NOTE: Colab does not have enough resources to run this example.
from ray import tune
from ray.air.config import RunConfig
from ray.tune.tuner import Tuner, TuneConfig

# Search space for the trainer's XGBoost `params`. `tune.randint` excludes its
# upper bound, so `max_depth` is sampled from 1 through 8.
param_space = {"params": {"max_depth": tune.randint(1, 9)}}

# Metric the tuner minimizes (see `mode="min"` below).
metric = "train-logloss"

# Wrap the existing `trainer` in a Tuner that runs two trials.
tuner = Tuner(
    trainer,
    param_space=param_space,
    run_config=RunConfig(verbose=1),
    tune_config=TuneConfig(num_samples=2, metric=metric, mode="min"),
)

# Run the trials once and keep the best one according to `metric` / `mode`.
result_grid = tuner.fit()
best_result = result_grid.get_best_result()
print("Best Result:", best_result)
    
        In [ ]:
                Copied!
                
                
            # Checkpoint attached to the best tuning trial; later cells use it for prediction and serving.
checkpoint = best_result.checkpoint
print(checkpoint)
    
        In [ ]:
                Copied!
                
                
            # Build an AIR checkpoint directly from an in-memory Keras model.
# NOTE(review): tensorflow is not among the packages installed in the first cell;
# presumably it is expected to be preinstalled (as on Colab) — confirm.
from ray.train.tensorflow import TensorflowCheckpoint
import tensorflow as tf

# Minimal model: one input feature feeding a single Dense unit. It is not trained here.
model = tf.keras.Sequential([
    tf.keras.layers.InputLayer(input_shape=(1,)),
    tf.keras.layers.Dense(1)
])

keras_checkpoint = TensorflowCheckpoint.from_model(model)
    
        In [ ]:
                Copied!
                
                
            # Run batch inference on the test dataset with the best checkpoint from tuning.
from ray.train.batch_predictor import BatchPredictor
from ray.train.xgboost import XGBoostPredictor

checkpoint = best_result.checkpoint

# Build a batch predictor that loads the checkpoint through XGBoostPredictor.
batch_predictor = BatchPredictor.from_checkpoint(checkpoint, XGBoostPredictor)

# Predict over `test_dataset` (the validation split without the "target" column)
# and display a sample of the output.
predicted_probabilities = batch_predictor.predict(test_dataset)
predicted_probabilities.show()
    
        In [ ]:
                Copied!
                
                
            # Deploy the tuned XGBoost model as an HTTP service with Ray Serve.
from ray import serve
from fastapi import Request
import pandas as pd
from ray.serve import PredictorDeployment


async def adapter(request: Request):
    """Convert the JSON body of an incoming request into a pandas DataFrame."""
    payload = await request.json()
    return pd.DataFrame.from_dict(payload)


# Start Serve once, then deploy the predictor. `XGBoostPredictor` and `checkpoint`
# come from the batch-prediction cell above.
serve.start(detached=True)
deployment = PredictorDeployment.options(name="XGBoostService")
deployment.deploy(
    XGBoostPredictor,
    checkpoint,
    http_adapter=adapter
)

# URL of the deployed service; the next cell sends a request to it.
print(deployment.url)
    
        In [ ]:
                Copied!
                
                
            # Send one sample row to the deployed XGBoost service, then shut Serve down.
import requests

# Take the first row of the test dataset and turn it into a plain dict.
first_item = test_dataset.take(1)
sample_input = dict(first_item[0])

# The payload is a one-element list of records; the service's `adapter` passes
# the decoded JSON to `pd.DataFrame.from_dict`.
result = requests.post(
    deployment.url,
    json=[sample_input]
)
print(result.json())

# Shut Serve down only after the request has completed: any request issued
# after this call would no longer reach the deployment.
serve.shutdown()
    
        In [ ]:
                Copied!
                
                
            # Train a PPO agent on CartPole-v1 through the AIR RLTrainer.
from ray.tune.tuner import Tuner
from ray.train.rl.rl_trainer import RLTrainer
from ray.air.config import RunConfig, ScalingConfig

# Stop after 5 training iterations; use two CPU-only workers.
trainer = RLTrainer(
    run_config=RunConfig(stop={"training_iteration": 5}),
    scaling_config=ScalingConfig(num_workers=2, use_gpu=False),
    algorithm="PPO",
    config={"env": "CartPole-v1"},
)

# `checkpoint_at_end` is passed so that the result carries a checkpoint, which
# the deployment cell reads as `result.checkpoint`.
tuner = Tuner(
    trainer,
    _tuner_kwargs={"checkpoint_at_end": True},
)

# No search space is given, so take the first entry of the result grid.
result = tuner.fit()[0]
    
        In [ ]:
                Copied!
                
                
            # Deploy the trained RL policy as an HTTP service with Ray Serve.
from ray.train.rl.rl_predictor import RLPredictor
from ray.serve import PredictorDeployment

# `serve` was imported in the XGBoost serving cell; `result` is the RL training
# result from the previous cell.
serve.start(detached=True)

# Deploy exactly once, using the same `deploy` pattern as the XGBoost service.
# The rollout cell below addresses the service through `deployment.url`, so this
# handle is the one to keep; the extra `serve.run(...bind(...))` call that
# redeployed the same model under the same name has been dropped.
deployment = PredictorDeployment.options(name="RLDeployment")
deployment.deploy(RLPredictor, result.checkpoint)
    
        In [ ]:
                Copied!
                
                
            # Roll out the served policy in a local CartPole environment and collect episode rewards.
import gym
import requests

num_episodes = 5
env = gym.make("CartPole-v1")

rewards = []
for i in range(num_episodes):
    # `reset()` is used as returning only the observation, and `step()` is
    # unpacked as a 4-tuple below.
    obs = env.reset()
    reward = 0.0
    done = False
    while not done:
        # Ask the deployed policy for an action for the current observation.
        action = requests.post(
            deployment.url,
            json={"array": obs.tolist()}
        ).json()
        obs, rew, done, _ = env.step(action)
        reward += rew
    rewards.append(reward)

print("Episode rewards:", rewards)

# Shut Serve down only after all episodes have finished: the loop above
# depends on the deployment being reachable.
serve.shutdown()