Ray:Data
Ray数据集:分布式数据加载和计算
Ray数据集是在Ray库和应用程序中加载和交换数据的标准方式。它们提供基本的分布式数据转换,如映射
、过滤
和重分区
,并与各种文件格式、数据源和分布式框架兼容。
Ray数据集简化了通用GPU和CPU并行计算;例如,GPU批处理推理。在这种尴尬的并行计算情况下,它为Ray任务和角色提供了更高层次的API,内部处理批处理、流水线和内存管理等操作。
作为Ray生态系统的一部分,Ray数据集可以充分利用Ray的分布式调度器的全部功能,例如,使用actor来优化设置时间和GPU调度。
数据集快速启动
Ray数据集实现分布式 Arrow。数据集由Ray对象引用blocks的列表组成。每个block包含Arrow table(创建或转换为表格或张量数据时)、Pandas DataFrame(创建或转换为Pandas数据时)或Python列表(否则)中的一组项。让我们从创建数据集开始。
创建数据集
运行pip install "ray[data]"
开始!
可以从使用ray.data.range()
和ray.data.from_items()
从合成数据创建数据集开始。数据集既可以保存普通的Python对象(即它们的模式是Python类型),也可以保存Arrow记录(在这种情况下它们的模式是Arrow)
1 | import ray |
数据集可以从本地磁盘或远程数据源(如S3)上的文件创建。pyarrow支持的任何文件系统都可以用来指定文件位置:
1 | # Read a directory of files in remote storage. |
最后,你可以从Ray对象存储或兼容Ray的分布式DataFrames中创建一个数据集:
1 | import pandas as pd |
保存数据集
数据集可以使用.write_csv()
、.write_json()
和.write_parquet()
写入本地或远程存储。1
2
3
4
5
6
7# Write to csv files in /tmp/output.
ray.data.range(10000).write_csv("/tmp/output")
# -> /tmp/output/data0.csv, /tmp/output/data1.csv, ...
# Use repartition to control the number of output files:
ray.data.range(10000).repartition(1).write_csv("/tmp/output2")
# -> /tmp/output2/data0.csv
Transforming Datasets
数据集可以使用.map_batches()
进行并行转换。Ray将使用给定的函数转换数据集中的记录批。函数必须返回一批记录。允许您筛选或向批处理中添加其他记录,这将改变数据集的大小。
转换被急切地执行并阻塞,直到操作完成。
1 | def transform_batch(df: pandas.DataFrame) -> pandas.DataFrame: |
批处理格式可以使用batch_format选项指定,默认值为” native “,这意味着支持arrow的批处理使用pandas格式,其他类型使用Python列表。您还可以显式地指定“arrow”或“pandas”来强制转换为该批格式。批量大小也可以选择。如果没有给出,批处理大小将默认为整个块。
数据集还提供了方便的方法map、flat_map和filter,它们不是向量化的(比map_batches慢),但可能对开发有用。
默认情况下,使用Ray任务执行转换。对于需要设置的转换,指定compute=ray.data.ActorPoolStrategy(min, max)
和Ray将使用一个由min到max的角色自动伸缩的角色池来执行你的转换。对于固定大小的actor池,指定ActorPoolStrategy(n, n)
。下面是一个使用Ray Data读取、转换和保存批处理推理结果的端到端示例:
1 | from ray.data import ActorPoolStrategy |
传递和访问数据集
数据集可以传递给Ray任务或角色,并通过.iter_batches()或.iter_rows()访问。这不会导致复制,因为数据集的块是作为Ray对象引用传递的:
1 |
|
数据集可以被分割成不相交的子数据集。如果您向split()函数传递一个actor句柄列表以及所需的分割数量,那么支持位置感知的分割。这是一个常见的模式,用于在分布式训练参与者之间加载和分割数据:
1 |
|