Getting Started with the Ray AI Runtime
You can run this notebook directly in Colab.
For this chapter you will also need to install the following dependencies:
In [ ]:
Copied!
! pip install "ray[air]==2.2.0" "xgboost-ray>=0.1.10" "xgboost>=1.6.2"
! pip install "numpy>=1.19.5" "pandas>=1.3.5" "pyarrow>=6.0.1" "aiorwlock==1.3.0"
! pip install "ray[air]==2.2.0" "xgboost-ray>=0.1.10" "xgboost>=1.6.2"
! pip install "numpy>=1.19.5" "pandas>=1.3.5" "pyarrow>=6.0.1" "aiorwlock==1.3.0"
In [ ]:
Copied!
import ray
from ray.data.preprocessors import StandardScaler
# Read the example breast-cancer CSV from S3 with ray.data.read_csv.
dataset = ray.data.read_csv("s3://anonymous@air-example-data/breast_cancer.csv")
# Split off a validation part (test_size=0.2).
train_dataset, valid_dataset = dataset.train_test_split(test_size=0.2)
# The test data is the validation data with the "target" column dropped.
test_dataset = valid_dataset.drop_columns(cols=["target"])
# Scaler configured for the two listed columns; handed to the trainer later on.
preprocessor = StandardScaler(columns=["mean radius", "mean texture"])
import ray
from ray.data.preprocessors import StandardScaler
# Read the example breast-cancer CSV from S3 with ray.data.read_csv.
dataset = ray.data.read_csv("s3://anonymous@air-example-data/breast_cancer.csv")
# Split off a validation part (test_size=0.2).
train_dataset, valid_dataset = dataset.train_test_split(test_size=0.2)
# The test data is the validation data with the "target" column dropped.
test_dataset = valid_dataset.drop_columns(cols=["target"])
# Scaler configured for the two listed columns; handed to the trainer later on.
preprocessor = StandardScaler(columns=["mean radius", "mean texture"])
In [ ]:
Copied!
# NOTE: Colab does not have enough resources to run this example as written.
# Try num_workers=1 and resources_per_worker={"CPU": 1, "GPU": 0} in the
# ScalingConfig below. Either way, expect this training run to take a
# considerable amount of time.
from ray.air.config import ScalingConfig
from ray.train.xgboost import XGBoostTrainer

# Two CPU-only workers.
scaling_config = ScalingConfig(num_workers=2, use_gpu=False)

# Parameters forwarded to XGBoost through the trainer's `params` argument.
xgboost_params = {
    "objective": "binary:logistic",
    "eval_metric": ["logloss", "error"],
}

trainer = XGBoostTrainer(
    scaling_config=scaling_config,
    label_column="target",
    num_boost_round=20,
    params=xgboost_params,
    datasets={"train": train_dataset, "valid": valid_dataset},
    preprocessor=preprocessor,
)

# Run training and print the metrics attached to the result.
result = trainer.fit()
print(result.metrics)
# NOTE: Colab does not have enough resources to run this example as written.
# Try num_workers=1 and resources_per_worker={"CPU": 1, "GPU": 0} in the
# ScalingConfig below. Either way, expect this training run to take a
# considerable amount of time.
from ray.air.config import ScalingConfig
from ray.train.xgboost import XGBoostTrainer

# Two CPU-only workers.
scaling_config = ScalingConfig(num_workers=2, use_gpu=False)

# Parameters forwarded to XGBoost through the trainer's `params` argument.
xgboost_params = {
    "objective": "binary:logistic",
    "eval_metric": ["logloss", "error"],
}

trainer = XGBoostTrainer(
    scaling_config=scaling_config,
    label_column="target",
    num_boost_round=20,
    params=xgboost_params,
    datasets={"train": train_dataset, "valid": valid_dataset},
    preprocessor=preprocessor,
)

# Run training and print the metrics attached to the result.
result = trainer.fit()
print(result.metrics)
In [ ]:
Copied!
# NOTE: Colab does not have enough resources to run this example.
from ray import tune
from ray.tune.tuner import Tuner, TuneConfig
from ray.air.config import RunConfig

# Search space: max_depth is sampled with tune.randint(1, 9).
param_space = {"params": {"max_depth": tune.randint(1, 9)}}
# Metric name passed to TuneConfig together with mode="min".
metric = "train-logloss"

tuner = Tuner(
    trainer,
    param_space=param_space,
    run_config=RunConfig(verbose=1),
    tune_config=TuneConfig(num_samples=2, metric=metric, mode="min"),
)

# Run the trials, then pick and print the best result from the grid.
result_grid = tuner.fit()
best_result = result_grid.get_best_result()
print("Best Result:", best_result)
# NOTE: Colab does not have enough resources to run this example.
from ray import tune
from ray.tune.tuner import Tuner, TuneConfig
from ray.air.config import RunConfig

# Search space: max_depth is sampled with tune.randint(1, 9).
param_space = {"params": {"max_depth": tune.randint(1, 9)}}
# Metric name passed to TuneConfig together with mode="min".
metric = "train-logloss"

tuner = Tuner(
    trainer,
    param_space=param_space,
    run_config=RunConfig(verbose=1),
    tune_config=TuneConfig(num_samples=2, metric=metric, mode="min"),
)

# Run the trials, then pick and print the best result from the grid.
result_grid = tuner.fit()
best_result = result_grid.get_best_result()
print("Best Result:", best_result)
In [ ]:
Copied!
# Take the checkpoint attached to the best tuning result and print it.
checkpoint = best_result.checkpoint
print(checkpoint)
# Take the checkpoint attached to the best tuning result and print it.
checkpoint = best_result.checkpoint
print(checkpoint)
In [ ]:
Copied!
from ray.train.tensorflow import TensorflowCheckpoint
import tensorflow as tf

# Minimal Keras model: an input layer of shape (1,) followed by Dense(1).
model = tf.keras.Sequential(
    [
        tf.keras.layers.InputLayer(input_shape=(1,)),
        tf.keras.layers.Dense(1),
    ]
)

# Build a checkpoint object directly from the in-memory model.
keras_checkpoint = TensorflowCheckpoint.from_model(model)
from ray.train.tensorflow import TensorflowCheckpoint
import tensorflow as tf

# Minimal Keras model: an input layer of shape (1,) followed by Dense(1).
model = tf.keras.Sequential(
    [
        tf.keras.layers.InputLayer(input_shape=(1,)),
        tf.keras.layers.Dense(1),
    ]
)

# Build a checkpoint object directly from the in-memory model.
keras_checkpoint = TensorflowCheckpoint.from_model(model)
In [ ]:
Copied!
from ray.train.batch_predictor import BatchPredictor
from ray.train.xgboost import XGBoostPredictor
# Checkpoint of the best tuning result.
checkpoint = best_result.checkpoint
# Build a batch predictor from that checkpoint, using XGBoostPredictor.
batch_predictor = BatchPredictor.from_checkpoint(checkpoint, XGBoostPredictor)
# Predict on the test data (the validation data without "target") and show the output.
predicted_probabilities = batch_predictor.predict(test_dataset)
predicted_probabilities.show()
from ray.train.batch_predictor import BatchPredictor
from ray.train.xgboost import XGBoostPredictor
# Checkpoint of the best tuning result.
checkpoint = best_result.checkpoint
# Build a batch predictor from that checkpoint, using XGBoostPredictor.
batch_predictor = BatchPredictor.from_checkpoint(checkpoint, XGBoostPredictor)
# Predict on the test data (the validation data without "target") and show the output.
predicted_probabilities = batch_predictor.predict(test_dataset)
predicted_probabilities.show()
In [ ]:
Copied!
from ray import serve
from fastapi import Request
import pandas as pd
from ray.serve import PredictorDeployment


async def adapter(request: Request):
    """Convert the JSON body of an incoming request into a pandas DataFrame.

    Args:
        request: The incoming HTTP request; its JSON body is awaited and parsed.

    Returns:
        A DataFrame built from the parsed payload via ``DataFrame.from_dict``.
    """
    payload = await request.json()
    return pd.DataFrame.from_dict(payload)


# Start Serve in detached mode, then deploy the XGBoost predictor behind the
# adapter above. XGBoostPredictor and checkpoint come from earlier cells.
serve.start(detached=True)
deployment = PredictorDeployment.options(name="XGBoostService")
deployment.deploy(
    XGBoostPredictor,
    checkpoint,
    http_adapter=adapter,
)
print(deployment.url)
from ray import serve
from fastapi import Request
import pandas as pd
from ray.serve import PredictorDeployment


async def adapter(request: Request):
    """Convert the JSON body of an incoming request into a pandas DataFrame.

    Args:
        request: The incoming HTTP request; its JSON body is awaited and parsed.

    Returns:
        A DataFrame built from the parsed payload via ``DataFrame.from_dict``.
    """
    payload = await request.json()
    return pd.DataFrame.from_dict(payload)


# Start Serve in detached mode, then deploy the XGBoost predictor behind the
# adapter above. XGBoostPredictor and checkpoint come from earlier cells.
serve.start(detached=True)
deployment = PredictorDeployment.options(name="XGBoostService")
deployment.deploy(
    XGBoostPredictor,
    checkpoint,
    http_adapter=adapter,
)
print(deployment.url)
In [ ]:
Copied!
import requests

# Use the first row of the test dataset as the request payload.
first_item = test_dataset.take(1)
sample_input = dict(first_item[0])

# POST the row, wrapped in a list, to the deployment URL and print the reply.
result = requests.post(deployment.url, json=[sample_input])
print(result.json())

# Shut Serve down once the request has been answered.
serve.shutdown()
import requests

# Use the first row of the test dataset as the request payload.
first_item = test_dataset.take(1)
sample_input = dict(first_item[0])

# POST the row, wrapped in a list, to the deployment URL and print the reply.
result = requests.post(deployment.url, json=[sample_input])
print(result.json())

# Shut Serve down once the request has been answered.
serve.shutdown()
In [ ]:
Copied!
from ray.tune.tuner import Tuner
from ray.train.rl.rl_trainer import RLTrainer
from ray.air.config import RunConfig, ScalingConfig

# PPO on CartPole-v1, stopped after 5 training iterations, two CPU-only workers.
trainer = RLTrainer(
    run_config=RunConfig(stop={"training_iteration": 5}),
    scaling_config=ScalingConfig(num_workers=2, use_gpu=False),
    algorithm="PPO",
    config={"env": "CartPole-v1"},
)

tuner = Tuner(trainer, _tuner_kwargs={"checkpoint_at_end": True})

# Keep only the first entry of what fit() returns.
result = tuner.fit()[0]
from ray.tune.tuner import Tuner
from ray.train.rl.rl_trainer import RLTrainer
from ray.air.config import RunConfig, ScalingConfig

# PPO on CartPole-v1, stopped after 5 training iterations, two CPU-only workers.
trainer = RLTrainer(
    run_config=RunConfig(stop={"training_iteration": 5}),
    scaling_config=ScalingConfig(num_workers=2, use_gpu=False),
    algorithm="PPO",
    config={"env": "CartPole-v1"},
)

tuner = Tuner(trainer, _tuner_kwargs={"checkpoint_at_end": True})

# Keep only the first entry of what fit() returns.
result = tuner.fit()[0]
In [ ]:
Copied!
from ray.train.rl.rl_predictor import RLPredictor
from ray.serve import PredictorDeployment
# `serve` and `result` are not defined in this cell; they come from earlier cells.
serve.start(detached=True)
# Deployment handle whose `url` is queried by the evaluation loop later on.
deployment = PredictorDeployment.options(name="RLDeployment")
deployment.deploy(RLPredictor, result.checkpoint)
# NOTE(review): "RLDeployment" is set up a second time here via serve.run(...bind(...)),
# right after deployment.deploy(...) above. This looks redundant — confirm whether
# both calls are needed.
serve.run(
PredictorDeployment.options(name="RLDeployment").bind(RLPredictor, result.checkpoint)
)
from ray.train.rl.rl_predictor import RLPredictor
from ray.serve import PredictorDeployment
# `serve` and `result` are not defined in this cell; they come from earlier cells.
serve.start(detached=True)
# Deployment handle whose `url` is queried by the evaluation loop later on.
deployment = PredictorDeployment.options(name="RLDeployment")
deployment.deploy(RLPredictor, result.checkpoint)
# NOTE(review): "RLDeployment" is set up a second time here via serve.run(...bind(...)),
# right after deployment.deploy(...) above. This looks redundant — confirm whether
# both calls are needed.
serve.run(
PredictorDeployment.options(name="RLDeployment").bind(RLPredictor, result.checkpoint)
)
In [ ]:
Copied!
import gym
import requests

num_episodes = 5
env = gym.make("CartPole-v1")

# Total reward collected in each episode.
rewards = []
for i in range(num_episodes):
    obs = env.reset()
    reward = 0.0
    done = False
    while not done:
        # Ask the served policy for an action given the current observation.
        action = requests.post(
            deployment.url,
            json={"array": obs.tolist()},
        ).json()
        # NOTE(review): the four-value unpacking assumes the older gym step API —
        # confirm against the installed gym version.
        obs, rew, done, _ = env.step(action)
        reward += rew
    # Record the episode total once the episode has finished.
    rewards.append(reward)

print("Episode rewards:", rewards)

serve.shutdown()
import gym
import requests

num_episodes = 5
env = gym.make("CartPole-v1")

# Total reward collected in each episode.
rewards = []
for i in range(num_episodes):
    obs = env.reset()
    reward = 0.0
    done = False
    while not done:
        # Ask the served policy for an action given the current observation.
        action = requests.post(
            deployment.url,
            json={"array": obs.tolist()},
        ).json()
        # NOTE(review): the four-value unpacking assumes the older gym step API —
        # confirm against the installed gym version.
        obs, rew, done, _ = env.step(action)
        reward += rew
    # Record the episode total once the episode has finished.
    rewards.append(reward)

print("Episode rewards:", rewards)

serve.shutdown()