目录

DataLoader2

轻量级 DataLoader2 被引入,以将过载的数据操作功能从 torch.utils.data.DataLoader 解耦到 DataPipe 操作。此外,某些功能只能通过 DataLoader2 实现,例如快照和切换后端服务以执行高性能操作。

DataLoader2

class torchdata.dataloader2.DataLoader2(datapipe: Union[IterDataPipe, MapDataPipe], datapipe_adapter_fn: Optional[Union[Iterable[Adapter], Adapter]] = None, reading_service: Optional[ReadingServiceInterface] = None)

DataLoader2 是用于优化和执行给定的 DataPipe 图形 基于 ReadingServiceAdapter 函数,支持

  • 动态分片以支持多进程和分布式数据加载

  • 多个后端 ReadingServices

  • DataPipe 图像在内存中的局部修改,如重排控制、内存绑定等。

  • 截图数据预处理管道的状态(WIP)

Parameters:
  • datapipe (IterDataPipe or MapDataPipe) – DataPipe 从何处加载数据。在初始化过程中,将创建一个该对象的深拷贝,以便在不同的DataLoader2中使用输入时不会共享状态。

  • datapipe_adapter_fn (Iterable[Adapter] or Adapter, optional) – Adapter 函数(s) 将被应用于 DataPipe (默认: None).

  • reading_service (ReadingServiceInterface, 可选) – 定义了 DataLoader2 如何在 DataPipe 上执行操作,例如多进程/分布式(默认:None)。在此初始化期间将对此进行深拷贝,允许在不同的 DataLoader2 中重用输入而不共享状态。

__iter__() DataLoader2Iterator[T_co]

返回一个单例迭代器,从 DataPipe 适应的图中创建。 DataPipe 将在提供序列化状态以构造时恢复。 DataLoader2finalize_iterator 将分别在迭代开始和结束时调用。

classmethod from_state(state: Dict[str, Any], reading_service: CheckpointableReadingServiceInterface) DataLoader2[T_co]

创建新的 DataLoader2DataPipe 图形和 ReadingService 从序列化状态恢复

load_state_dict(state: Dict[str, Any]) None

对于现有的 DataLoader2,加载序列化状态以恢复 DataPipe 图表,并重置 ReadingService 的内部状态。

shutdown() None

关闭 ReadingService 并清理迭代器。

state_dict() Dict[str, Any]

返回一个字典来表示数据处理管道的状态,键为:

  • serialized_datapipe:序列化 DataPipe 之前 ReadingService 适应。

  • reading_service_state: 当前状态的 ReadingService 和适应性 DataPipe

注意: DataLoader2 不支持 torch.utils.data.Datasettorch.utils.data.IterableDataset。请将它们中的每一个用对应的 DataPipe 包裹起来:

ReadingService

ReadingService 指定数据处理图的执行后端。TorchData中有三种类型的ReadingServices

DistributedReadingService

DistributedReadingSerivceDataPipe的图上处理分布式分片,并通过在分布式进程中共享相同的种子来保证随机性。

MultiProcessingReadingService

MultiProcessingReadingService 用于 torch.utils.data.DataLoader 启动子进程以执行 DataPipe 图形。

PrototypeMultiProcessingReadingService

PrototypeMultiProcessingReadingService 生成多个子进程以迭代 DataPipe 图形。

每个 ReadingServices 都会使用 DataPipe 图形并对其进行修改,以实现一些功能,如动态分片、共享随机种子和多/分布式过程的快照。

这也可以更容易地将数据预处理管道从研究转移到生产。在创建和使用DataPipe图并通过ReadingServices验证后,可以提供一个不同的ReadingService来配置并连接到生产服务/基础设施,例如AIStore,作为对DataLoader2的直接替换。该ReadingService可能会搜索图,并找到可以委托给生产服务/基础设施的DataPipe操作,然后相应地修改图以实现更高性能的执行。

以下是自定义ReadingService的接口。

class torchdata.dataloader2.ReadingServiceInterface

接口用于 ReadingService。请基于此接口类扩展自定义 ReadingService

finalize() None

ReadingService 清理内部状态并完全关闭服务。 在 DataLoader2shutdown__del__ 中调用。

finalize_iteration() None

ReadingService 在一个epoch结束后终止服务。当DataLoader2的迭代器耗尽时调用。

abstract initialize(datapipe: Union[IterDataPipe, MapDataPipe]) Union[IterDataPipe, MapDataPipe]

ReadingService 接受一个 DataPipe 图,根据自定义需求将其转换为一个新的 DataPipe 图。 在首次创建 DataLoader2 迭代器时调用一次。

Parameters:

datapipe – 原始 DataPipe 图表。

Returns:

一个适应的或新的 DataPipe 图表。

initialize_iteration() None

ReadingService 启动一个epoch的服务。在每次获取 DataLoader2 迭代器时调用。

检查点/快照功能正在进行中。这里是初步的接口(可能会有小的变化):

class torchdata.dataloader2.CheckpointableReadingServiceInterface

使用两种额外的方法扩展 ReadingServiceInterface 以保存/恢复数据处理图的状态。

abstract checkpoint() bytes

ReadingService 序列化内部状态。在 DataLoader2.state_dict 中调用。

abstract restore(datapipe: Union[IterDataPipe, MapDataPipe], serialized_state: bytes) Union[IterDataPipe, MapDataPipe]

ReadingService 根据序列化状态调整 DataPipe 图。 在首次创建 DataLoader2 迭代器时调用一次。 与 initialize 相对应,后者从头开始调整 DataPipe 图。

Parameters:
  • datapipe – 原始 DataPipe 图表在被 ReadingService 适配之前

  • serialized_state – 用于恢复适应的 DataPipe 图内部状态的序列化状态。

Returns:

从序列化状态生成的经过调整的DataPipe

并且,torchdata.dataloader.graph 中提供了图形实用函数,以帮助用户定义自己的 ReadingService 并修改图形:

traverse_dps

遍历 DataPipes 及其属性以提取 DataPipe 图。

find_dps

给定由traverse_dps函数生成的数据管道图,返回具有所提供数据管道类型的数据管道实例。

replace_dp

给定由traverse_dps函数生成的DataPipe图以及要替换的DataPipe和新的DataPipe,返回新的DataPipe图。

remove_dp

给定由traverse_dps函数生成的数据管道图以及要移除的数据管道,返回新的数据管道图。

适配器

Adapter 用于配置、修改和扩展 DataPipe 图表在 DataLoader2 中。它允许就地修改或替换由 PyTorch 领域提供的预组装 DataPipe 图表。例如,可以将 Shuffle(False) 提供给 DataLoader2,这将禁用 DataPipes 图表中的任何 shuffle 操作。

class torchdata.dataloader2.adapter.Adapter

遵循 Python Callable 协议的适配器基类。

abstract __call__(datapipe: Union[IterDataPipe, MapDataPipe]) Union[IterDataPipe, MapDataPipe]

可调用函数,该函数要么对DataPipe图进行原地修改,要么返回一个新的DataPipe图。

Parameters:

datapipeDataPipe 需要进行调整。

Returns:

适应的 DataPipe 或新的 DataPipe

以下是TorchData在torchdata.dataloader2.adapter中提供的Adapter列表:

Shuffle

Shuffle DataPipes适配器允许控制图表中所有现有的Shuffler(shuffle)DataPipes。

CacheTimeout

CacheTimeout DataPipes 适配器允许控制图中所有现有的 EndOnDiskCacheHolder (end_caching) 的超时。

并且,我们将提供更多 Adapters 以涵盖数据处理选项:

  • PinMemory: 在数据处理图的末尾附加一个DataPipe,将输出数据转换为torch.Tensor固定内存中的数据。

  • FullSync: 附加一个 DataPipe 以确保分布式进程之间的数据处理图同步,防止挂起。

  • ShardingPolicy: 如果sharding_filter出现在DataPipe图中,则修改分片策略。

  • PrefetchPolicy, InvalidateCache,等等。

如果您对Adapters有功能请求,希望提供,请在GitHub上打开一个问题。对于特定需求,DataLoader2也接受任何自定义的Adapter,只要它继承自Adapter类。

文档

访问 PyTorch 的全面开发人员文档

查看文档

教程

获取面向初学者和高级开发人员的深入教程

查看教程

资源

查找开发资源并解答您的问题

查看资源