Ray:Data

Ray:Data

四月 07, 2022

Ray数据集:分布式数据加载和计算

Ray数据集是在Ray库和应用程序中加载和交换数据的标准方式。它们提供基本的分布式数据转换,如映射过滤重分区,并与各种文件格式、数据源和分布式框架兼容。

Ray数据集简化了通用GPU和CPU并行计算;例如,GPU批处理推理。在这种尴尬的并行计算情况下,它为Ray任务和角色提供了更高层次的API,内部处理批处理、流水线和内存管理等操作。

Ray_data_head

作为Ray生态系统的一部分,Ray数据集可以充分利用Ray的分布式调度器的全部功能,例如,使用actor来优化设置时间和GPU调度。

数据集快速启动

Ray数据集实现分布式 Arrow。数据集由Ray对象引用blocks的列表组成。每个block包含Arrow table(创建或转换为表格或张量数据时)、Pandas DataFrame(创建或转换为Pandas数据时)或Python列表(否则)中的一组项。让我们从创建数据集开始。

创建数据集

运行pip install "ray[data]"开始!

可以从使用ray.data.range()ray.data.from_items()从合成数据创建数据集开始。数据集既可以保存普通的Python对象(即它们的模式是Python类型),也可以保存Arrow记录(在这种情况下它们的模式是Arrow)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import ray

# Create a Dataset of Python objects.
ds = ray.data.range(10000)
# -> Dataset(num_blocks=200, num_rows=10000, schema=<class 'int'>)

ds.take(5)
# -> [0, 1, 2, 3, 4]

ds.count()
# -> 10000

# Create a Dataset of Arrow records.
ds = ray.data.from_items([{"col1": i, "col2": str(i)} for i in range(10000)])
# -> Dataset(num_blocks=200, num_rows=10000, schema={col1: int64, col2: string})

ds.show(5)
# -> {'col1': 0, 'col2': '0'}
# -> {'col1': 1, 'col2': '1'}
# -> {'col1': 2, 'col2': '2'}
# -> {'col1': 3, 'col2': '3'}
# -> {'col1': 4, 'col2': '4'}

ds.schema()
# -> col1: int64
# -> col2: string

数据集可以从本地磁盘或远程数据源(如S3)上的文件创建。pyarrow支持的任何文件系统都可以用来指定文件位置:

1
2
3
4
5
6
7
8
# Read a directory of files in remote storage.
ds = ray.data.read_csv("s3://bucket/path")

# Read multiple local files.
ds = ray.data.read_csv(["/path/to/file1", "/path/to/file2"])

# Read multiple directories.
ds = ray.data.read_csv(["s3://bucket/path1", "s3://bucket/path2"])

最后,你可以从Ray对象存储或兼容Ray的分布式DataFrames中创建一个数据集:

1
2
3
4
5
6
7
8
9
10
11
import pandas as pd
import dask.dataframe as dd

# Create a Dataset from a list of Pandas DataFrame objects.
pdf = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
ds = ray.data.from_pandas([pdf])

# Create a Dataset from a Dask-on-Ray DataFrame.
dask_df = dd.from_pandas(pdf, npartitions=10)
ds = ray.data.from_dask(dask_df)

保存数据集

数据集可以使用.write_csv().write_json().write_parquet()写入本地或远程存储。

1
2
3
4
5
6
7
# Write to csv files in /tmp/output.
ray.data.range(10000).write_csv("/tmp/output")
# -> /tmp/output/data0.csv, /tmp/output/data1.csv, ...

# Use repartition to control the number of output files:
ray.data.range(10000).repartition(1).write_csv("/tmp/output2")
# -> /tmp/output2/data0.csv

Transforming Datasets

数据集可以使用.map_batches()进行并行转换。Ray将使用给定的函数转换数据集中的记录批。函数必须返回一批记录。允许您筛选或向批处理中添加其他记录,这将改变数据集的大小。
转换被急切地执行并阻塞,直到操作完成。

1
2
3
4
5
6
7
8
9
def transform_batch(df: pandas.DataFrame) -> pandas.DataFrame:
return df.applymap(lambda x: x * 2)

ds = ray.data.range_arrow(10000)
ds = ds.map_batches(transform_batch, batch_format="pandas")
# -> Map Progress: 100%|████████████████████| 200/200 [00:00<00:00, 1927.62it/s]
ds.take(5)
# -> [{'value': 0}, {'value': 2}, ...]

批处理格式可以使用batch_format选项指定,默认值为” native “,这意味着支持arrow的批处理使用pandas格式,其他类型使用Python列表。您还可以显式地指定“arrow”或“pandas”来强制转换为该批格式。批量大小也可以选择。如果没有给出,批处理大小将默认为整个块。

数据集还提供了方便的方法map、flat_map和filter,它们不是向量化的(比map_batches慢),但可能对开发有用。

默认情况下,使用Ray任务执行转换。对于需要设置的转换,指定compute=ray.data.ActorPoolStrategy(min, max)和Ray将使用一个由min到max的角色自动伸缩的角色池来执行你的转换。对于固定大小的actor池,指定ActorPoolStrategy(n, n)。下面是一个使用Ray Data读取、转换和保存批处理推理结果的端到端示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from ray.data import ActorPoolStrategy

# Example of GPU batch inference on an ImageNet model.
def preprocess(images: List[bytes]) -> List[bytes]:
return images

class BatchInferModel:
def __init__(self):
self.model = ImageNetModel()
def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
return self.model(batch)

ds = ray.data.read_binary_files("s3://bucket/image-dir")

# Preprocess the data.
ds = ds.map_batches(preprocess)
# -> Map Progress: 100%|████████████████████| 200/200 [00:00<00:00, 1123.54it/s]

# Apply GPU batch inference with actors, and assign each actor a GPU using
# ``num_gpus=1`` (any Ray remote decorator argument can be used here).
ds = ds.map_batches(
BatchInferModel, compute=ActorPoolStrategy(10, 20),
batch_size=256, num_gpus=1)
# -> Map Progress (16 actors 4 pending): 100%|██████| 200/200 [00:07, 27.60it/s]

# Save the results.
ds.repartition(1).write_json("s3://bucket/inference-results")

传递和访问数据集

数据集可以传递给Ray任务或角色,并通过.iter_batches()或.iter_rows()访问。这不会导致复制,因为数据集的块是作为Ray对象引用传递的:

1
2
3
4
5
6
7
8
9
10
11
@ray.remote
def consume(data: Dataset[int]) -> int:
num_batches = 0
for batch in data.iter_batches():
num_batches += 1
return num_batches

ds = ray.data.range(10000)
ray.get(consume.remote(ds))
# -> 200

数据集可以被分割成不相交的子数据集。如果您向split()函数传递一个actor句柄列表以及所需的分割数量,那么支持位置感知的分割。这是一个常见的模式,用于在分布式训练参与者之间加载和分割数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@ray.remote(num_gpus=1)
class Worker:
def __init__(self, rank: int):
pass

def train(self, shard: ray.data.Dataset[int]) -> int:
for batch in shard.iter_batches(batch_size=256):
pass
return shard.count()

workers = [Worker.remote(i) for i in range(16)]
# -> [Actor(Worker, ...), Actor(Worker, ...), ...]

ds = ray.data.range(10000)
# -> Dataset(num_blocks=200, num_rows=10000, schema=<class 'int'>)

shards = ds.split(n=16, locality_hints=workers)
# -> [Dataset(num_blocks=13, num_rows=650, schema=<class 'int'>),
# Dataset(num_blocks=13, num_rows=650, schema=<class 'int'>), ...]

ray.get([w.train.remote(s) for w, s in zip(workers, shards)])
# -> [650, 650, ...]