注意力
2024 年 6 月状态更新:删除 DataPipes 和 DataLoader V2
我们将 torchdata 存储库重新调整为torch.utils.data.DataLoader的迭代增强。我们不打算 继续开发或维护 [DataPipes] 和 [DataLoaderV2] 解决方案,它们将从 torchdata 存储库。我们还将重新访问 pytorch/pytorch 中的 DataPipes 引用。在 torchdata==0.8.0(2024 年 7 月)版本中,它们将被标记为已弃用,而在 0.9.0(2024 年 10 月)中,它们将被删除。现存 建议用户固定到 torchdata==0.8.0 或更旧版本,直到他们能够迁移出去。随后的 版本将不包含 DataPipes 或 DataLoaderV2。 如果您有建议或评论,请联系我们(请使用此问题进行反馈)
DataPipe 教程¶
使用 DataPipes¶
假设我们想通过以下步骤从 CSV 文件加载数据:
列出目录中的所有 CSV 文件
加载 CSV 文件
解析 CSV 文件并生成行
将我们的数据集拆分为训练集和验证集
有一些内置的 DataPipes 可以帮助我们完成上述操作。
FileLister
- 列出目录中的文件Filter
- 根据给定的 功能FileOpener
- 使用文件路径并返回打开的文件 流CSVParser
- 使用文件流,解析 CSV 内容,并在 时间RandomSplitter
- 将源 DataPipe 中的样本随机拆分为 组
例如,的源代码如下所示:CSVParser
@functional_datapipe("parse_csv")
class CSVParserIterDataPipe(IterDataPipe):
def __init__(self, dp, **fmtparams) -> None:
self.dp = dp
self.fmtparams = fmtparams
def __iter__(self) -> Iterator[Union[Str_Or_Bytes, Tuple[str, Str_Or_Bytes]]]:
for path, file in self.source_datapipe:
stream = self._helper.skip_lines(file)
stream = self._helper.strip_newline(stream)
stream = self._helper.decode(stream)
yield from self._helper.return_path(stream, path=path) # Returns 1 line at a time as List[str or bytes]
如另一节所述,DataPipes 可以使用其函数形式(推荐)或 类构造函数。管道可以按以下方式组装:
import torchdata.datapipes as dp
FOLDER = 'path/2/csv/folder'
datapipe = dp.iter.FileLister([FOLDER]).filter(filter_fn=lambda filename: filename.endswith('.csv'))
datapipe = dp.iter.FileOpener(datapipe, mode='rt')
datapipe = datapipe.parse_csv(delimiter=',')
N_ROWS = 10000 # total number of rows of data
train, valid = datapipe.random_split(total_length=N_ROWS, weights={"train": 0.5, "valid": 0.5}, seed=0)
for x in train: # Iterating through the training dataset
pass
for y in valid: # Iterating through the validation dataset
pass
您可以在此处找到内置 IterDataPipes 的完整列表,在此处找到 MapDataPipes 的完整列表。
使用 DataLoader¶
在本节中,我们将演示如何使用 。
在大多数情况下,您应该能够通过作为输入参数传递来使用它
到 .有关的详细文档 ,
请访问此 PyTorch Core 页面。DataPipe
DataLoader
dataset=datapipe
DataLoader
DataLoader
请参阅此页面,了解如何与 一起使用。DataPipe
DataLoader2
在此示例中,我们首先使用一个辅助函数,该函数生成一些具有随机标签和数据的 CSV 文件。
import csv
import random
def generate_csv(file_label, num_rows: int = 5000, num_features: int = 20) -> None:
fieldnames = ['label'] + [f'c{i}' for i in range(num_features)]
writer = csv.DictWriter(open(f"sample_data{file_label}.csv", "w", newline=''), fieldnames=fieldnames)
writer.writeheader()
for i in range(num_rows):
row_data = {col: random.random() for col in fieldnames}
row_data['label'] = random.randint(0, 9)
writer.writerow(row_data)
接下来,我们将构建 DataPipes 来读取和解析生成的 CSV 文件。请注意,我们更喜欢 将定义的函数传递给 DataPipes 而不是 lambda 函数,因为前者可以使用 pickle 序列化。
import numpy as np
import torchdata.datapipes as dp
def filter_for_data(filename):
return "sample_data" in filename and filename.endswith(".csv")
def row_processor(row):
return {"label": np.array(row[0], np.int32), "data": np.array(row[1:], dtype=np.float64)}
def build_datapipes(root_dir="."):
datapipe = dp.iter.FileLister(root_dir)
datapipe = datapipe.filter(filter_fn=filter_for_data)
datapipe = datapipe.open_files(mode='rt')
datapipe = datapipe.parse_csv(delimiter=",", skip_lines=1)
# Shuffle will happen as long as you do NOT set `shuffle=False` later in the DataLoader
datapipe = datapipe.shuffle()
datapipe = datapipe.map(row_processor)
return datapipe
最后,我们将所有内容放在一起,并将 DataPipe 传递到 DataLoader 中。请注意,
如果您选择使用 while 设置为 DataLoader,则您的样本将为
batched 多次。您应该选择一个或另一个。'__main__'
Batcher
batch_size > 1
from torch.utils.data import DataLoader
if __name__ == '__main__':
num_files_to_generate = 3
for i in range(num_files_to_generate):
generate_csv(file_label=i, num_rows=10, num_features=3)
datapipe = build_datapipes()
dl = DataLoader(dataset=datapipe, batch_size=5, num_workers=2)
first = next(iter(dl))
labels, features = first['label'], first['data']
print(f"Labels batch shape: {labels.size()}")
print(f"Feature batch shape: {features.size()}")
print(f"{labels = }\n{features = }")
n_sample = 0
for row in iter(dl):
n_sample += 1
print(f"{n_sample = }")
将打印以下语句以显示单个标签和特征的形状。
Labels batch shape: torch.Size([5])
Feature batch shape: torch.Size([5, 3])
labels = tensor([8, 9, 5, 9, 7], dtype=torch.int32)
features = tensor([[0.2867, 0.5973, 0.0730],
[0.7890, 0.9279, 0.7392],
[0.8930, 0.7434, 0.0780],
[0.8225, 0.4047, 0.0800],
[0.1655, 0.0323, 0.5561]], dtype=torch.float64)
n_sample = 12
原因是因为 () 没有被使用,使得
每个 worker 将独立返回所有样本。在这种情况下,每个文件有 10 行和 3 个文件,其中
batch 大小为 5,则每个 worker 有 6 个批次。使用 2 个工作线程,我们从 中获得总共 12 个批次。n_sample = 12
ShardingFilter
datapipe.sharding_filter()
DataLoader
为了使 DataPipe 分片能够正常工作,我们需要添加以下内容。DataLoader
def build_datapipes(root_dir="."):
datapipe = ...
# Add the following line to `build_datapipes`
# Note that it is somewhere after `Shuffler` in the DataPipe line, but before expensive operations
datapipe = datapipe.sharding_filter()
return datapipe
当我们重新运行时,我们将获得:
...
n_sample = 6
注意:
尽早将 () 放置在管道中,尤其是在昂贵的 操作,以避免在 worker/分布式进程之间重复这些昂贵的操作。
ShardingFilter
datapipe.sharding_filter
对于需要分片的数据源,之前添加以确保数据在拆分为分片之前进行全局混洗至关重要。否则,每个 worker 进程将 始终为所有 epoch 处理相同的数据分片。而且,这意味着每个批次将仅包含数据 ,这会导致训练期间的准确率较低。但是,它不适用于数据 source 中已为每个多/分布式进程分片的 source 的 SOURCE,因为 no 更长的时间需要呈现在管道中。
Shuffler
ShardingFilter
ShardingFilter
在某些情况下,在管道中较早放置会导致性能变差,因为某些 使用 Sequential Reading 时,操作(例如解压缩)会更快。在这些情况下,我们建议解压缩 随机排序之前的文件(可能在加载任何数据之前)。
Shuffler
您可以在此页面上找到各种研究领域的更多 DataPipe 实现示例。
实现自定义 DataPipe¶
目前,我们已经有大量的内置 DataPipe,我们希望它们能够覆盖最必要的 数据处理操作。如果它们都不支持您的需求,您可以创建自己的自定义 DataPipe。
作为一个指导性示例,让我们实现一个将可调用对象应用于输入迭代器的 an。对于 ,请查看 map 文件夹的示例,然后对方法而不是方法执行以下步骤。IterDataPipe
MapDataPipe
__getitem__
__iter__
命名¶
的命名约定是 “Operation”-er,后跟 或 ,因为每个
DataPipe 本质上是一个容器,用于将操作应用于从 source 生成的数据。为了简洁,
我们在 init 文件中仅别名为 “Operation-er”。对于我们的示例,我们将模块命名并为其别名,如下所示。DataPipe
IterDataPipe
MapDataPipe
DataPipe
IterDataPipe
MapperIterDataPipe
iter.Mapper
torchdata.datapipes
对于函数方法名称,命名约定为 .例如
的函数方法名称 为 ,以便它可以由 调用。datapipe.<operation>
Mapper
map
datapipe.map(...)
构造 函数¶
DataSet 现在通常构造为 ,因此每个 DataSet 通常都采用
source 作为其第一个参数。下面是一个简化版本的 Mapper 作为示例:DataPipes
DataPipe
DataPipe
from torchdata.datapipes.iter import IterDataPipe
class MapperIterDataPipe(IterDataPipe):
def __init__(self, source_dp: IterDataPipe, fn) -> None:
super().__init__()
self.source_dp = source_dp
self.fn = fn
注意:
避免在函数中从源 DataPipe 加载数据,以支持延迟数据加载和保存 记忆。
__init__
如果 instance 将数据保存在内存中,请注意数据的就地修改。当第二个 iterator 是从实例创建的,则数据可能已经更改。请将 class 作为每个迭代器的 data 的引用。
IterDataPipe
IterableWrapper
deepcopy
避免使用现有 DataPipes 的函数名称获取的变量名称。例如,是 可用于调用 的函数名称 。在 另一个可能会导致混乱。
.filter
FilterIterDataPipe
filter
IterDataPipe
迭 代¶
对于 ,需要一个函数来使用来自源的数据,然后
对 之前的数据应用操作。IterDataPipes
__iter__
IterDataPipe
yield
class MapperIterDataPipe(IterDataPipe):
# ... See __init__() defined above
def __iter__(self):
for d in self.dp:
yield self.fn(d)
长度¶
在许多情况下,如我们的示例所示,DataPipe 的方法返回
source DataPipe 的 API API 中。MapperIterDataPipe
__len__
class MapperIterDataPipe(IterDataPipe):
# ... See __iter__() defined above
def __len__(self):
return len(self.dp)
但是,请注意,这是可选的,并且通常不可取。For 在下面的 using DataPipes 部分中,未实现,因为每个文件中的行数
在加载之前是未知的。在某些特殊情况下,可以返回一个整数或引发
错误,具体取决于输入。在这些情况下,Error 必须是 a 才能支持 Python 的
内置函数,如 .__len__
IterDataPipe
CSVParserIterDataPipe
__len__
__len__
TypeError
list(dp)
使用函数式 API 注册 DataPipes¶
每个 DataPipe 都可以注册以支持使用 decorator 进行功能调用 。functional_datapipe
@functional_datapipe("map")
class MapperIterDataPipe(IterDataPipe):
# ...
然后可以使用其函数形式(推荐)或类构造函数构造 DataPipes 堆栈:
import torchdata.datapipes as dp
# Using functional form (recommended)
datapipes1 = dp.iter.FileOpener(['a.file', 'b.file']).map(fn=decoder).shuffle().batch(2)
# Using class constructors
datapipes2 = dp.iter.FileOpener(['a.file', 'b.file'])
datapipes2 = dp.iter.Mapper(datapipes2, fn=decoder)
datapipes2 = dp.iter.Shuffler(datapipes2)
datapipes2 = dp.iter.Batcher(datapipes2, 2)
在上面的例子中,和 表示完全相同的 s 堆栈。我们
建议使用 DataPipes 的函数形式。datapipes1
datapipes2
IterDataPipe
与云存储提供商合作¶
在本节中,我们将展示使用内置 DataPipes 访问 AWS S3、Google Cloud Storage 和 Azure Cloud Storage 的示例。
虽然这里只讨论了这两个提供程序,但对于其他库,DataPipes
应该还允许您与其他存储系统连接(已知
implementations) 的 Implementations)。fsspec
fsspec
如果您有其他云存储提供商的支持请求,请在 GitHub 上告诉我们。 或者您有代码示例要与社区共享。
使用 DataPipes 访问 AWS S3fsspec
¶
这需要安装库 (documentation) 和 (s3fs GitHub repo)。fsspec
s3fs
您可以通过传递以
替换为 FSSpecFileLister ()。"s3://BUCKET_NAME"
.list_files_by_fsspec(...)
from torchdata.datapipes.iter import IterableWrapper
dp = IterableWrapper(["s3://BUCKET_NAME"]).list_files_by_fsspec()
您还可以使用 FSSpecFileOpener () 打开文件并流式传输它们
(如果文件格式支持)。.open_files_by_fsspec(...)
请注意,您还可以通过
参数 .这对于访问特定的
Bucket 版本,您可以通过传入
关于 S3 存储桶版本感知 )。支持的参数因您正在访问的 (云) 文件系统而异。kwargs_for_open
{version_id: 'SOMEVERSIONID'}
s3fs
在下面的示例中,我们使用 TarArchiveLoader () 流式传输存档,
与通常的 .这使我们能够开始处理档案中的数据
而无需先将整个存档下载到内存中。.load_from_tar(mode="r|")
mode="r:"
from torchdata.datapipes.iter import IterableWrapper
dp = IterableWrapper(["s3://BUCKET_NAME/DIRECTORY/1.tar"])
dp = dp.open_files_by_fsspec(mode="rb", anon=True).load_from_tar(mode="r|") # Streaming version
# The rest of data processing logic goes here
最后,FSSpecFileSaver 也可用于将数据写入云。
使用 DataPipes 访问 Google Cloud Storage (GCS)fsspec
¶
这需要安装库 (documentation) 和 (gcsfs GitHub repo)。fsspec
gcsfs
您可以通过指定以
跟。以下示例中的存储桶名称为 。"gcs://BUCKET_NAME"
uspto-pair
from torchdata.datapipes.iter import IterableWrapper
dp = IterableWrapper(["gcs://uspto-pair/"]).list_files_by_fsspec()
print(list(dp))
# ['gcs://uspto-pair/applications', 'gcs://uspto-pair/docs', 'gcs://uspto-pair/prosecution-history-docs']
以下是从
目录。05900035.zip
uspto-pair
applications
from torchdata.datapipes.iter import IterableWrapper
dp = IterableWrapper(["gcs://uspto-pair/applications/05900035.zip"]) \
.open_files_by_fsspec(mode="rb") \
.load_from_zip()
# Logic to process those archive files comes after
for path, filestream in dp:
print(path, filestream)
# gcs:/uspto-pair/applications/05900035.zip/05900035/README.txt, StreamWrapper<...>
# gcs:/uspto-pair/applications/05900035.zip/05900035/05900035-address_and_attorney_agent.tsv, StreamWrapper<...>
# gcs:/uspto-pair/applications/05900035.zip/05900035/05900035-application_data.tsv, StreamWrapper<...>
# gcs:/uspto-pair/applications/05900035.zip/05900035/05900035-continuity_data.tsv, StreamWrapper<...>
# gcs:/uspto-pair/applications/05900035.zip/05900035/05900035-transaction_history.tsv, StreamWrapper<...>
使用 DataPipes 访问 Azure Blob 存储fsspec
¶
这需要安装库 (documentation) 和 (adlfs GitHub repo)。
可以通过提供以 .
例如,FSSpecFileLister ()
可用于列出容器中目录中的文件:fsspec
adlfs
abfs://
.list_files_by_fsspec(...)
from torchdata.datapipes.iter import IterableWrapper
storage_options={'account_name': ACCOUNT_NAME, 'account_key': ACCOUNT_KEY}
dp = IterableWrapper(['abfs://CONTAINER/DIRECTORY']).list_files_by_fsspec(**storage_options)
print(list(dp))
# ['abfs://container/directory/file1.txt', 'abfs://container/directory/file2.txt', ...]
您还可以使用 FSSpecFileOpener () 打开文件并流式传输它们
(如果文件格式支持)。.open_files_by_fsspec(...)
以下是从
目录 ,属于帐户 。ecdc_cases.csv
curated/covid-19/ecdc_cases/latest
pandemicdatalake
from torchdata.datapipes.iter import IterableWrapper
dp = IterableWrapper(['abfs://public/curated/covid-19/ecdc_cases/latest/ecdc_cases.csv']) \
.open_files_by_fsspec(account_name='pandemicdatalake') \
.parse_csv()
print(list(dp)[:3])
# [['date_rep', 'day', ..., 'iso_country', 'daterep'],
# ['2020-12-14', '14', ..., 'AF', '2020-12-14'],
# ['2020-12-13', '13', ..., 'AF', '2020-12-13']]
如有必要,还可以使用以 和 开头的 URI 访问 Azure Data Lake Storage Gen1 中的数据,如 adlfs 存储库的自述文件中所述adl://
abfs://
使用 DataPipes 访问 Azure ML 数据存储fsspec
¶
Azure ML 数据存储是对 Azure 上现有存储帐户的引用。创建和使用 Azure ML 数据存储的主要优势包括:
一个通用且易于使用的 API,用于与 Azure 中的不同存储类型 (Blob/Files/<datastore>) 进行交互。
在团队合作时更容易发现有用的数据存储。
身份验证是自动处理的 - 支持基于凭据的访问(服务主体/SAS/密钥)和基于身份的访问(Azure Active Directory/托管身份)。使用基于凭据的身份验证时,您无需在代码中公开密钥。
这需要安装库 (documentation)。azureml-fsspec
您可以通过提供以 .
例如,FSSpecFileLister ()
可用于列出容器中目录中的文件:azureml://
.list_files_by_fsspec(...)
from torchdata.datapipes.iter import IterableWrapper
# set the subscription_id, resource_group, and AzureML workspace_name
subscription_id = "<subscription_id>"
resource_group = "<resource_group>"
workspace_name = "<workspace_name>"
# set the datastore name and path on the datastore
datastore_name = "<datastore_name>"
path_on_datastore = "<path_on_datastore>"
uri = f"azureml://subscriptions/{subscription_id}/resourcegroups/{resource_group}/workspaces/{workspace_name}/datastores/{datastore_name}/paths/{path_on_datastore}"
dp = IterableWrapper([uri]).list_files_by_fsspec()
print(list(dp))
# ['azureml:///<sub_id>/resourcegroups/<rg_name>/workspaces/<ws_name>/datastores/<datastore>/paths/<folder>/file1.txt',
# 'azureml:///<sub_id>/resourcegroups/<rg_name>/workspaces/<ws_name>/datastores/<datastore>/paths/<folder>/file2.txt', ...]
您还可以使用 FSSpecFileOpener () 打开文件并流式传输它们
(如果文件格式支持)。.open_files_by_fsspec(...)
下面是从路径所在的默认 Azure ML 数据存储(顶级文件夹)加载 tar 文件的示例。workspaceblobstore
/cifar-10-python.tar.gz
from torchdata.datapipes.iter import IterableWrapper
# set the subscription_id, resource_group, and AzureML workspace_name
subscription_id = "<subscription_id>"
resource_group = "<resource_group>"
workspace_name = "<workspace_name>"
# set the datastore name and path on the datastore
datastore_name = "workspaceblobstore"
path_on_datastore = "cifar-10-python.tar.gz"
uri = f"azureml://subscriptions/{subscription_id}/resourcegroups/{resource_group}/workspaces/{workspace_name}/datastores/{datastore_name}/paths/{path_on_datastore}"
dp = IterableWrapper([uri]) \
.open_files_by_fsspec(mode="rb") \
.load_from_tar()
for path, filestream in dp:
print(path)
# ['azureml:/subscriptions/<sub_id>/resourcegroups/<rg_name>/workspaces/<ws_name>/datastores/<datastore>/paths/cifar-10-python.tar.gz/cifar-10-batches-py/data_batch_4',
# 'azureml:/subscriptions/<sub_id>/resourcegroups/<rg_name>/workspaces/<ws_name>/datastores/<datastore>/paths/cifar-10-python.tar.gz/cifar-10-batches-py/readme.html',
# 'azureml:/subscriptions/<sub_id>/resourcegroups/<rg_name>/workspaces/<ws_name>/datastores/<datastore>/paths/cifar-10-python.tar.gz/cifar-10-batches-py/test_batch',
# 'azureml:/subscriptions/<sub_id>/resourcegroups/<rg_name>/workspaces/<ws_name>/datastores/<datastore>/paths/cifar-10-python.tar.gz/cifar-10-batches-py/data_batch_3',
# 'azureml:/subscriptions/<sub_id>/resourcegroups/<rg_name>/workspaces/<ws_name>/datastores/<datastore>/paths/cifar-10-python.tar.gz/cifar-10-batches-py/batches.meta',
# 'azureml:/subscriptions/<sub_id>/resourcegroups/<rg_name>/workspaces/<ws_name>/datastores/<datastore>/paths/cifar-10-python.tar.gz/cifar-10-batches-py/data_batch_2',
# 'azureml:/subscriptions/<sub_id>/resourcegroups/<rg_name>/workspaces/<ws_name>/datastores/<datastore>/paths/cifar-10-python.tar.gz/cifar-10-batches-py/data_batch_5',
# 'azureml:/subscriptions/<sub_id>/resourcegroups/<rg_name>/workspaces/<ws_name>/datastores/<datastore>/paths/cifar-10-python.tar.gz/cifar-10-batches-py/data_batch_1]
下面是从路径所在的 Azure ML 数据存储(顶级文件夹)加载 CSV 文件(著名的泰坦尼克号数据集(下载))的示例。workspaceblobstore
/titanic.csv
from torchdata.datapipes.iter import IterableWrapper
# set the subscription_id, resource_group, and AzureML workspace_name
subscription_id = "<subscription_id>"
resource_group = "<resource_group>"
workspace_name = "<workspace_name>"
# set the datastore name and path on the datastore
datastore_name = "workspaceblobstore"
path_on_datastore = "titanic.csv"
uri = f"azureml://subscriptions/{subscription_id}/resourcegroups/{resource_group}/workspaces/{workspace_name}/datastores/{datastore_name}/paths/{path_on_datastore}"
def row_processer(row):
# return the label and data (the class and age of the passenger)
# if missing age, set to 50
if row[5] == "":
row[5] = 50.0
return {"label": np.array(row[1], np.int32), "data": np.array([row[2],row[5]], dtype=np.float32)}
dp = IterableWrapper([uri]) \
.open_files_by_fsspec() \
.parse_csv(delimiter=",", skip_lines=1) \
.map(row_processer)
print(list(dp)[:3])
# [{'label': array(0, dtype=int32), 'data': array([ 3., 22.], dtype=float32)},
# {'label': array(1, dtype=int32), 'data': array([ 1., 38.], dtype=float32)},
# {'label': array(1, dtype=int32), 'data': array([ 3., 26.], dtype=float32)}]