Datasets I/O — dream.io

dream.io is the Layer 1 dataset abstraction for bulk inference. It gives you a uniform way to load starting frames + action sequences from any source (Hugging Face, S3, or local disk), run a rollout for every row, and write the results to any sink — in one call. No iteration loop, no per-row boilerplate.

When to use dream.io

  • Synthetic data at scale — you have 50K episodes on Hugging Face and want 50K predicted rollouts back on HF. frames_from_hf + RolloutSink.hf + client.predict_many covers the whole pipeline in ~10 lines (see the sketch after this list).
  • Training-loop augmentation — your dataloader needs a small batch of predictions each step. frames_from_dir + a quick predict_many call keeps the loop tight without per-row glue.
  • Custom backends — you have a proprietary data store. Subclass IterableSource or RolloutSink; predict_many accepts any conformant implementation.
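
A minimal end-to-end sketch of the first scenario (the repo ids are placeholders, and dream.Client() is an assumption; this page documents only the client's methods):

PYTHON
import dream
from dream.io import frames_from_hf, RolloutSink

client = dream.Client()  # assumed constructor, not documented on this page
src = frames_from_hf(
    "my-org/episodes",  # placeholder source repo
    frame_field="start_frame",
    actions_field="action_sequence",
)
sink = RolloutSink.hf("my-org/predicted-rollouts")  # placeholder target repo
result = client.predict_many(src, sink, spec="dreamdojo-2b-gr1")
print(result.output_uri)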

For a walkthrough that ties these pieces together, see the Bulk inference quickstart and the bulk-augment-gr1 example.

Installing the [io] extra

BASH
pip install "dream-engine[io]"

The [io] extra adds datasets>=3.0, huggingface_hub>=0.24, pyarrow>=15, and boto3>=1.34. These are lazy-imported — only the loader/sink you actually call pays its import cost.

frames_from_dir works without [io]. RolloutSink.dir partially works without it, but needs pyarrow from [io] for the final metadata flush (see the note under RolloutSink.dir).


Source loaders

All loaders return an IterableSource. Pass the source directly to client.predict_many or client.estimate_cost.

frames_from_hf

PYTHON
from dream.io import frames_from_hf

src = frames_from_hf(
    repo,
    *,
    split="train",
    frame_field,
    actions_field,
    id_field=None,
    revision=None,
    token=None,
) -> IterableSource

Streams rows from a Hugging Face dataset via datasets.load_dataset(..., streaming=True). Each row is encoded to PNG bytes (frame) + .npy bytes (actions) before being yielded as a SourceRow.

  • repo (str): HF dataset repo id, e.g. "kingJulio/dream-engine-example-frames".
  • split (str): Dataset split. Default "train".
  • frame_field (str): Column name holding the starting frame. Accepts PIL.Image, {"bytes": …, "path": …} dicts (HF Image feature), numpy.ndarray (H × W × 3 uint8), or string paths.
  • actions_field (str): Column name holding the action sequence. Shape must be (T, action_dim) float32.
  • id_field (str | None): Column to use as the stable row id. None defaults to "row_{i:08d}".
  • revision (str | None): Dataset revision (branch, tag, or commit sha). None uses the default branch.
  • token (str | None): HF read token. Falls back to the HF_TOKEN env var. Public datasets work without a token.

Returns: IterableSource. rows_hint is populated from Hub metadata when available; for large streaming datasets you may get None (see estimate_cost).

Example:

PYTHON
from dream.io import frames_from_hf

src = frames_from_hf(
    "kingJulio/dream-engine-example-frames",
    frame_field="start_frame",
    actions_field="action_sequence",
)
for row in src:
    print(row.row_id, len(row.frame_bytes), len(row.actions_bytes))
    break  # 16 rows total in the public fixture
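
Before committing to a full run, you can check rows_hint and price the job. A minimal sketch; the spec keyword on estimate_cost is an assumption mirroring predict_many:

PYTHON
if src.rows_hint is not None:
    print(f"about {src.rows_hint} rows to roll out")
# estimate_cost accepts the source directly; the spec kwarg is assumed
# to mirror predict_many's
estimate = client.estimate_cost(src, spec="dreamdojo-2b-gr1")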

Requires [io] extra.


frames_from_s3

PYTHON
from dream.io import frames_from_s3

src = frames_from_s3(
    uri,
    *,
    frame_glob="*.png",
    actions_glob="*.npy",
    id_pattern=r"(.*)\.png$",
    boto3_kwargs=None,
) -> IterableSource

Lists objects under an S3 prefix, pairs frames with same-stem action files, and yields SourceRow instances.

  • uri (str): S3 URI prefix, e.g. "s3://my-bucket/frames/".
  • frame_glob (str): Glob pattern to find frame objects under the prefix.
  • actions_glob (str): Glob pattern to find matching action objects.
  • id_pattern (str): Regex applied to the frame key to extract the row id. Group 1 is used.
  • boto3_kwargs (dict | None): Forwarded to boto3.client("s3", ...). Use for cross-account or non-default-profile setups.

Example:

PYTHON
from dream.io import frames_from_s3

src = frames_from_s3(
    "s3://my-robot-data/episodes/",
    frame_glob="*.png",
    actions_glob="*.npy",
)
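
When keys carry a prefix, a custom id_pattern can extract a cleaner row id. A hypothetical variant (the key layout is illustrative):

PYTHON
# Extracts "ep_001" from keys like "episodes/ep_001.png" (group 1 is used)
src = frames_from_s3(
    "s3://my-robot-data/",
    frame_glob="episodes/*.png",
    actions_glob="episodes/*.npy",
    id_pattern=r"episodes/(.+)\.png$",
)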

Requires [io] extra (boto3).


frames_from_dir

PYTHON
from dream.io import frames_from_dir

src = frames_from_dir(
    path,
    *,
    frame_glob="*.png",
    actions_glob="*.npy",
) -> IterableSource

Globs a local directory for frame/actions pairs. The row id is the file stem (the part before the extension).

  • path (str | Path): Directory containing the frame and action files.
  • frame_glob (str): Glob pattern for frame files.
  • actions_glob (str): Glob pattern for action files. Must pair 1-to-1 with frame files by stem.

Example:

PYTHON
from dream.io import frames_from_dir

# Directory layout:
#   /data/episodes/ep_001.png   /data/episodes/ep_001.npy
#   /data/episodes/ep_002.png   /data/episodes/ep_002.npy
src = frames_from_dir("/data/episodes")

Does not require the [io] extra. This is the only loader that works with the base pip install dream-engine.


Sinks

Sinks receive completed rollouts from client.predict_many and write them to their backing store. Construct via the factory classmethods.

RolloutSink.dir

PYTHON
sink = RolloutSink.dir(
    path,
    *,
    videos_subdir="videos",
    metadata_filename="metadata.parquet",
) -> RolloutSink

Writes to a local directory. Videos land at <path>/<videos_subdir>/<row_id>.mp4; a single Parquet file is written at <path>/<metadata_filename> when finalize() is called.

  • path (str | Path): Output directory. Created if it does not exist.
  • videos_subdir (str): Subdirectory for mp4 files.
  • metadata_filename (str): Filename for the Parquet metadata table.

Example:

PYTHON
from dream.io import RolloutSink

sink = RolloutSink.dir("./out")
result = client.predict_many(src, sink, spec="dreamdojo-2b-gr1")
# ./out/videos/row_00000000.mp4
# ./out/videos/row_00000001.mp4
# …
# ./out/metadata.parquet
print(result.output_uri)  # /absolute/path/to/out

Requires [io] extra (uses pyarrow to write the metadata Parquet on finalize()). The mp4 writes during predict_many work without it, but the run errors on the final flush — install [io] whenever you use RolloutSink.dir.


RolloutSink.s3

PYTHON
sink = RolloutSink.s3(
    uri,
    *,
    boto3_kwargs=None,
    videos_subdir="videos",
    metadata_filename="metadata.parquet",
) -> RolloutSink

Mirrors the dir layout on S3. Each mp4 is uploaded as a separate put_object call; the Parquet metadata file is uploaded on finalize().

  • uri (str): S3 URI prefix, e.g. "s3://my-bucket/rollouts/".
  • boto3_kwargs (dict | None): Forwarded to boto3.client("s3", ...).
  • videos_subdir (str): Prefix subdirectory for mp4 objects.
  • metadata_filename (str): Object key (under uri) for the Parquet table.

Example:

PYTHON
sink = RolloutSink.s3("s3://my-bucket/rollouts/run-2026-05-06/")
result = client.predict_many(src, sink, spec="dreamdojo-2b-gr1")
print(result.output_uri)  # s3://my-bucket/rollouts/run-2026-05-06

Requires [io] extra (boto3).


RolloutSink.hf

PYTHON
sink = RolloutSink.hf(
    repo,
    *,
    token=None,
    private=False,
    shard_rows=500,
    license="cc-by-4.0",
    pretty_name=None,
) -> RolloutSink

Writes a Hugging Face dataset with path-based mp4 files + Parquet metadata shards, following the Hub's recommended layout:

<repo>/
  videos/<row_id>.mp4
  metadata/
    data-00000.parquet   # one shard per shard_rows rollouts (zero-indexed)
    data-00001.parquet
    …
  README.md              # auto-generated dataset card

Each shard is uploaded via HfApi.upload_file as rows accumulate, so your training pipeline can start reading via streaming=True before the run finishes. The README.md is written once on the first shard and is not rewritten on subsequent flushes — re-running predict_many against the same repo appends new shards without touching the existing README.

  • repo (str): HF dataset repo id, e.g. "my-org/predicted-rollouts". Created if it does not exist.
  • token (str | None): HF write token. Falls back to the HF_TOKEN env var or the huggingface-cli login cache.
  • private (bool): Create the repo as private on first commit. Has no effect on subsequent pushes.
  • shard_rows (int): Rows per Parquet shard. 500 ≈ 250 MB per shard for GR-1 rollouts.
  • license (str): Dataset card license identifier. Must be compatible with the source model's license.
  • pretty_name (str | None): Human-readable name for the dataset card.

Example:

PYTHON
import os

from dream.io import frames_from_hf, RolloutSink

src = frames_from_hf(
    "kingJulio/dream-engine-example-frames",
    frame_field="start_frame",
    actions_field="action_sequence",
)
sink = RolloutSink.hf(
    "my-org/gr1-predictions",
    token=os.environ["HF_WRITE_TOKEN"],
    shard_rows=500,
)
result = client.predict_many(src, sink, spec="dreamdojo-2b-gr1")
print(result.output_uri)  # hf://datasets/my-org/gr1-predictions
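
On the consumer side, the incremental shard uploads described above mean a training pipeline can start reading mid-run. A minimal sketch, assuming the Hub resolves the metadata/ Parquet shards as the dataset's data files and that they carry a row_id column:

PYTHON
from datasets import load_dataset

# Streams whichever shards have been uploaded so far
ds = load_dataset("my-org/gr1-predictions", split="train", streaming=True)
for row in ds:
    print(row["row_id"])  # assumed column name
    break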

Requires [io] extra (huggingface_hub, pyarrow).


Custom sources

Subclass IterableSource to load rows from any data origin. You only need to implement __iter__:

PYTHON
from io import BytesIO

import numpy as np
from PIL import Image

from dream.io import IterableSource, SourceRow


class MyDatabaseSource(IterableSource):
    """Loads episodes from a custom database."""

    def __init__(self, connection_string: str):
        self._conn_str = connection_string
        self.rows_hint = self._count_rows()  # optional; enables estimate_cost

    def _count_rows(self) -> int:
        # Query your database for the total row count.
        ...

    def __iter__(self):
        for episode in self._fetch_episodes():
            # Encode the starting frame to PNG bytes.
            buf = BytesIO()
            Image.fromarray(episode["start_frame"]).save(buf, format="PNG")
            frame_bytes = buf.getvalue()
            # Encode the action sequence to .npy bytes.
            actions_buf = BytesIO()
            np.save(actions_buf, episode["actions"].astype(np.float32))
            actions_bytes = actions_buf.getvalue()
            yield SourceRow(
                row_id=str(episode["id"]),
                frame_bytes=frame_bytes,
                actions_bytes=actions_bytes,
                metadata={"source": "my-database"},
            )

Pass it directly to client.predict_many:

PYTHON
src = MyDatabaseSource("postgresql://...")
result = client.predict_many(src, sink, spec="dreamdojo-2b-gr1")

Custom sinks

Subclass RolloutSink to write completed rollouts to your own backend. Implement write (called per-row) and finalize (called once at the end):

PYTHON
from dream.io import RolloutSink, RolloutRecord


class MyWarehouseSink(RolloutSink):
    """Writes rollouts to a proprietary video warehouse."""

    def __init__(self, warehouse_url: str):
        self._url = warehouse_url
        self._written = 0

    def write(self, record: RolloutRecord) -> None:
        # record.row_id — matches the source row id
        # record.mp4_bytes — raw mp4 from the engine
        # record.metadata — provenance dict from Client.predict_many
        self._upload(record.row_id, record.mp4_bytes)
        self._written += 1

    def finalize(self) -> str:
        return f"{self._url} ({self._written} rollouts)"

    def _upload(self, row_id: str, mp4_bytes: bytes) -> None:
        ...

write is called from predict_many's thread pool, so keep it thread-safe or use locking.
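
A minimal sketch of one way to do that, serializing writes behind a lock (LockedWarehouseSink is hypothetical):

PYTHON
import threading


class LockedWarehouseSink(MyWarehouseSink):
    """Hypothetical variant of the sink above that serializes uploads."""

    def __init__(self, warehouse_url: str):
        super().__init__(warehouse_url)
        self._lock = threading.Lock()

    def write(self, record) -> None:
        # predict_many may call write() from several worker threads;
        # the lock ensures one upload and counter increment at a time.
        with self._lock:
            super().write(record)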


See also