class delu.EarlyStopping[source]#

Bases: EarlyStopping

[DEPRECATED] (Renamed to delu.tools.EarlyStopping)

forget_bad_updates() None[source]#

In the new class, see delu.tools.EarlyStopping.reset_unsuccessful_updates.

class delu.Timer[source]#

Bases: Timer

[DEPRECATED] (Renamed to delu.tools.Timer)

__call__() float[source]#

In the new class, see delu.tools.Timer.elapsed.

format(format_str: str, /) str[source]#

In the new class, see the tutorial in delu.tools.Timer.

class delu.ProgressTracker[source]#

Bases: object

Helps with early stopping and tracks the best metric value.

[DEPRECATED] (Instead, use delu.EarlyStopping and manually track the best score.)

For ProgressTracker, the greater score is the better score. At any moment the tracker is in one of the following states:

  • success: the last update increased the best score

  • fail: last n > patience updates did not improve the best score

  • neutral: if neither success nor fail


progress = delu.ProgressTracker(2)
assert progress.success  # the first update always updates the best score

assert progress.success
assert progress.best_score == 123

assert not progress.success and not progress.fail

assert not progress.success and not progress.fail
# patience is 2 and the best score is not updated for more than 2 steps
assert progress.fail
assert progress.best_score == 123  # fail doesn't affect the best score
assert progress.fail  # still no improvements

assert not progress.fail and not progress.success
assert progress.best_score == 123
assert not progress.fail  # just 1 bad update (the patience is 2)

assert not progress.fail and not progress.success
assert progress.best_score is None
__init__(patience: int | None, min_delta: float = 0.0) None[source]#
  • patience – Allowed number of unsuccessfull updates. For example, if patience is 2, then 2 unsuccessfull updates in a row is not a fail, but 3 unsuccessfull updates in a row is a fail. None means “infinite patience” and the progress tracker is never in the “fail” state.

  • min_delta – the minimal improvement over the current best score to count it as success.


progress = delu.ProgressTracker(2)
progress = delu.ProgressTracker(3, 0.1)
property best_score: float | None#

The best score so far.

If the tracker is just created/reset, return None.

property fail: bool#

Check if the tracker is in the “fail” state.

forget_bad_updates() None[source]#

Reset unsuccessfull update counter and set the status to “neutral”.

Note that this method does NOT reset the best score.

reset() None[source]#

Reset everything.

property success: bool#

Check if the tracker is in the “success” state.

update(score: float) None[source]#

Submit a new score and update the tracker’s state accordingly.


score – the score to use for the update.

class delu.Stream[source]#

Bases: object

Smart wrapper for data loaders and iterables.


Stream simplifies managing loops, especially in typical Deep Learning scenarios. Stream:

  • manages the “epoch” and “iteration” variables

  • allows to dump and restore loop’s state: epoch, iteration, etc.

  • allows to customize the size of epoch

  • allows to change the underlying data loader on the fly

  • enables useful patterns


Let’s start with the most common training loop:

loader = DataLoader(...)
iteration = 0
for epoch in range(max_epoch):
    for batch in loader:
        iteration += 1
        print('Epoch:', epoch, 'Iteration:', iteration)

Let’s enhance the loop using Stream:

stream = delu.Stream(DataLoader(...))  # (A)
for epoch in stream.epochs(max_epoch):  # (B)
    for batch in epoch:  # (C)
        print('Epoch:', stream.epoch, 'Iteration:', stream.iteration)  # (D)

Some comments for the above code:

  • (A) Stream is created by passing a dataloader as a single argument (in fact, you can pass any iterable object); the dataloader is accessible via Stream.loader

  • (B) epoch is an iterator over batches for one epoch

  • (C) a progress bar for batches is displayed (for the whole training loop, not just for one epoch)

  • (D) Stream.epoch and Stream.iteration are managed automatically

Saving the loop’s state and resuming the loop is possible with the methods Stream.state_dict, Stream.load_state_dict. In practice, it can look like this:

model = ...
optimizer = ...
stream = delu.Stream(DataLoader(...))
if load_from_checkpoint:
    checkpoint = torch.load(checkpoint_path)
for epoch in stream.epochs(...):
    for batch in epoch:
            'model': model.state_dict(),
            'optimizer': optimizer.state_dict(),
            'stream': stream.state_dict(),


Stream’s state does not include the loader’s state. See Stream.state_dict and Stream.load_state_dict for details.

In order to customize the epoch size, pass the size as the second argument:

for epoch in stream.epochs(max_epoch, custom_epoch_size):
    for batch in epoch:

Changing the underlying loader on the fly is possible at any moment (even in the middle of epoch) via Stream.set_loader. For example:

for epoch in stream.epochs(max_epoch, custom_epoch_size):
    for batch in epoch:
        if need_new_data():

If the method Stream.epochs does not fit your workflow and you want more control over the loop, there are more “low-level” methods (in fact, Stream.epochs is just a thin wrapper around them):

For example, the most common training loop can be implemented as follows:

# A
while stream.epoch < max_epoch:
    for batch in stream.data():

# B
while stream.epoch < max_epoch:
    for _ in range(len(stream.loader)):
        batch = stream.next()  # stream.iteration is incremented automatically

The “infinite” stream of data can be implemented as follows:

for item in stream.data(float('inf')):
    if condition:  # for example: `if stream.iteration % frequency == 0`


For better technical understanding, keep in mind that Stream simply encapsulates an “infinite iterator” that is constantly moving forward. The behavior is absolutely the same for both finite and infinite iterables and can be expressed with the following loop:

while True:
    for item in loader:  # the loader which is passed to the constructor

Documentation for Stream.next and Stream.data provide helpful examples.

__init__(loader: Iterable) None[source]#

loader – any kind of iterable (DataLoader, list, iterator, generator, …)


stream = delu.Stream([0, 1, 2, 3])
stream = delu.Stream(range(10))
import itertools
stream = delu.Stream(itertools.repeat(0))

from torch.utils.data import DataLoader, TensorDataset
dataset = TensorDataset(torch.randn(10, 2))
stream = delu.Stream(DataLoader(dataset, batch_size=3, shuffle=True))
data(n_items: int | float | None = None) Iterator[source]#

Iterate over the loader.

Under the hood, Stream.next is called, hence, Stream.iteration changes during iterations.


n_items – how many items to produce. If None, interpreted as len(self.loader). If float, must be float('inf') or math.inf.


stream = delu.Stream(range(5))
assert list(stream.data()) == [0, 1, 2, 3, 4]
assert list(stream.data(3)) == [0, 1, 2]
# stream doesn't "start over"!
assert list(stream.data(3)) == [3, 4, 0]
assert list(stream.data(1)) == [1]
assert list(stream.data(2)) == [2, 3]
for x in stream.data(float('inf')):
    if stream.iteration % frequency:
property epoch: int#

Current epoch.

Technically, the number of Stream.increment_epoch calls.

max_epoch: int | float,
epoch_size: int | float | None = None,
progress_bar_config: Dict[str, Any] | None = {},
) Iterator[Iterator[Any]][source]#

Iterate over data epochs.

A shortcut for what is probably the most popular form of a training loop in Deep Learning (plus a progress bar):

for epoch in stream.epochs(max_epoch, epoch_size):
    for batch in epoch:

# is equivalent to:

while stream.epoch < max_epoch:
    for batch in stream.data(epoch_size):
  • max_epoch – defines the number of epochs. The loop keeps running while self.epoch < max_epoch. If float, must be float('inf') or math.inf.

  • epoch_size – the number of data items in one epoch (is forwarded to Stream.data).

  • progress_bar_config – if not None (the default value is {}!), a progress bar for iterations will be displayed and the argument will be interpreted as key-word arguments for tqdm. The following key-word arguments will be automatically added if not presented in progress_bar_config: initial, total (if can be inferred from the arguments and/or from Stream.loader). If [ipywidgets](https://ipywidgets.readthedocs.io) is installed and the program is running in JupyterLab (Jupyter Notebook), then the pretty widget is used instead of the text-based one.


Iterator over iterators over data from Stream.loader.


stream = delu.Stream(range(3))
for epoch in stream.epochs(2):
    for x in epoch:
stream = delu.Stream(range(3))
for epoch in stream.epochs(3, 2):
    for x in epoch:
increment_epoch() None[source]#

Increment Stream.epoch.


stream = delu.Stream(range(5))
assert stream.epoch == 0
assert stream.epoch == 1
assert stream.epoch == 2
property iteration: int#

Current iteration.

Technically, the number of Stream.next calls.

load_state_dict(state_dict: Dict[str, Any]) None[source]#

Load state dictionary.


state_dict – state. Must be produced by Stream.state_dict.


The method does not affect data that is produced by Stream.epochs, Stream.data, Stream.next (see the examples below), i.e. the method only sets some “metadata” such as epoch, iteration etc. If you want to load the “state of data stream”, you have to load the state of corresponding random number generators separately.


stream = delu.Stream(range(10))
assert stream.state_dict() == {'epoch': 1, 'iteration': 1}

new_stream = delu.Stream(range(10))
assert new_stream.state_dict() == {'epoch': 1, 'iteration': 1}
assert new_stream.next() == 0
assert new_stream.state_dict() == {'epoch': 1, 'iteration': 2}
property loader: Iterable#

The underlying loader.

next() Any[source]#

Get the next item and increment iteration.


The next item.


stream = delu.Stream(range(3))
assert stream.iteration == 0
assert stream.next() == 0
assert stream.iteration == 1
assert stream.next() == 1
assert stream.next() == 2
assert stream.next() == 0
assert stream.iteration == 4
while True:
    x = stream.next()
    if stream.iteration % frequency:
reload_iterator() None[source]#

Set the underlying iterator to iter(self.loader).

If the underlying loader is a finite iterable, the method can be used to interrupt and skip the current epoch (i.e. skip its data). If the loader is an iterator, the method does nothing.


stream = delu.Stream(range(5))
assert stream.next() == 0
assert stream.next() == 1
assert stream.next() == 0

stream = delu.Stream(iter(range(5)))
assert stream.next() == 0
assert stream.next() == 1
assert stream.next() == 2
set_loader(loader: Iterable) None[source]#

Set new loader.




from itertools import repeat
stream = delu.Stream(repeat(0))
for x in stream.data(5):
    print(stream.iteration, x)
    if stream.iteration == 2:
1 0
2 0
3 1
4 1
5 1
state_dict() Dict[str, Any][source]#

Get the stream’s state.

The result can be passed to Stream.load_state_dict. The result includes:

  • epoch

  • iteration


Fields related to data (loader, iterator etc.) are NOT included in the state. If you want to save the “state of data stream” then you have to save the state of corresponding random number generators separately.




stream = delu.Stream(range(10))
assert stream.state_dict() == {'epoch': 0, 'iteration': 0}
assert stream.state_dict() == {'epoch': 1, 'iteration': 2}
delu.collate(iterable: Iterable) Any[source]#

Almost an alias for torch.utils.data.dataloader.default_collate.

[DEPRECATED] (Instead, use torch.utils.data.default_collate)

Namely, the input is allowed to be any kind of iterable, not only a list. Firstly, if it is not a list, it is transformed to a list. Then, the list is passed to the original function and the result is returned as is.

delu.concat(*args, **kwargs)[source]#

[DEPRECATED] (Instead, use delu.cat.)

delu.improve_reproducibility(base_seed: int | None, one_cuda_seed: bool = False) int[source]#

Set seeds and turn off non-deterministic algorithms.

[DEPRECATED] (Instead, use delu.random.seed and manually set flags mentioned in the PyTorch docs on reproducibility)

Do everything possible to improve reproducibility for code that relies on global random number generators. See also the note below.


  1. seeds in random, numpy.random, torch, torch.cuda

  2. torch.backends.cudnn.benchmark to False

  3. torch.backends.cudnn.deterministic to True


if base_seed is set to None, the generated base seed is

returned; otherwise, base_seed is returned as is

Return type:



If you don’t want to choose the base seed, but still want to have a chance to reproduce things, you can use the following pattern:

print('Seed:', delu.improve_reproducibility(None))


100% reproducibility is not always possible in PyTorch. See this page for details.


assert delu.improve_reproducibility(0) == 0
seed = delu.improve_reproducibility(None)
class delu.evaluation[source]#

Context-manager & decorator for models evaluation.

[DEPRECATED] (Instead, use model.eval() + torch.no_inference/no_grad)

This code…

with delu.evaluation(model):  # or: evaluation(model_0, model_1, ...)
@delu.evaluation(model)  # or: @evaluation(model_0, model_1, ...)
def f():

…is equivalent to the following

context = getattr(torch, 'inference_mode', torch.no_grad)
with context():
def f():



The training status of modules is undefined once a context is finished or a decorated function returns.


The function must be used in the same way as torch.no_grad and torch.inference_mode, i.e. only as a context manager or a decorator as shown below in the examples. Otherwise, the behaviour is undefined.


Contrary to torch.no_grad and torch.inference_mode, the function cannot be used to decorate generators. So, in the case of generators, you have to manually create a context:

def my_generator():
    with delu.evaluation(...):
        for a in b:
            yield c


a = torch.nn.Linear(1, 1)
b = torch.nn.Linear(2, 2)
with delu.evaluation(a):
with delu.evaluation(a, b):
def f():
@delu.evaluation(a, b)
def f():
class delu.data.Enumerate[source]#

Bases: Enumerate

[DEPRECATED] (Renamed to delu.utils.data.Enumerate)

class delu.data.IndexDataset[source]#

Bases: IndexDataset

[DEPRECATED] (Instead, use delu.utils.data.IndexDataset)

class delu.data.FnDataset[source]#

Bases: Dataset

Create simple PyTorch datasets without classes and inheritance.


FnDataset allows avoiding implementing Dataset classes in simple cases.


First, a quick example. Without FnDataset:

from PIL import Image

class ImagesList(Dataset):
    def __init__(self, filenames, transform):
        self.filenames = filenames
        self.transform = transform

    def __len__(self):
        return len(self.filenames)

    def __getitem__(self, index):
        return self.transform(Image.open(self.filenames[index]))

dataset = ImagesList(filenames, transform)

With FnDataset:

dataset = delu.data.FnDataset(Image.open, filenames, transform)
# Cache images after the first load:
from functools import lru_cache
dataset = delu.data.FnDataset(lru_cache(None)(Image.open), filenames)

In other words, with the vanilla PyTorch, in order to create a dataset, you have to inherit from torch.utils.data.Dataset and implement three methods:

  • __init__

  • __len__

  • __getitem__

With FnDataset the only thing you may need to implement is the fn argument that will power __getitem__. The easiest way to learn FnDataset is to go through the examples below.

A list of images:

dataset = delu.data.FnDataset(Image.open, filenames)
# dataset[i] returns Image.open(filenames[i])

A list of images that are cached after the first load:

from functools import lru_cache
dataset = delu.data.FnDataset(lru_cache(None)(Image.open), filenames)

pathlib.Path is handy for creating datasets that read from files:

images_dir = Path(...)
dataset = delu.data.FnDataset(Image.open, images_dir.iterdir())

If you only need files with specific extensions:

dataset = delu.data.FnDataset(Image.open, images_dir.glob('*.png'))

If you only need files with specific extensions located in all subfolders:

dataset = delu.data.FnDataset(
    Image.open, (x for x in images_dir.rglob('**/*.png') if condition(x))

A segmentation dataset:

image_filenames = ...
gt_filenames = ...

def get(i):
    return Image.open(image_filenames[i]), Image.open(gt_filenames[i])

dataset = delu.data.FnDataset(get, len(image_filenames))

A dummy dataset that demonstrates that FnDataset is a very general thing:

def f(x):
    return x * 10

def g(x):
    return x * 2

dataset = delu.data.FnDataset(f, 3, g)
# dataset[i] returns g(f(i))
assert len(dataset) == 3
assert dataset[0] == 0
assert dataset[1] == 20
assert dataset[2] == 40
__getitem__(index: int) Any[source]#

Get value by index.

See FnDataset for details.





fn: Callable[[...], T],
args: int | Iterable,
transform: Callable[[T], Any] | None = None,
) None[source]#
  • fn – the function that produces values based on arguments from args

  • args – arguments for fn. If an iterable, but not a list, then is casted to a list. If an integer, then the behavior is the same as for list(range(args)). The size of args defines the return value for FnDataset.__len__.

  • transform – if presented, is applied to the return value of fn in FnDataset.__getitem__


import PIL.Image as Image
import torchvision.transforms as T

dataset = delu.data.FnDataset(Image.open, filenames, T.ToTensor())
__len__() int[source]#

Get the dataset size.

See FnDataset for details.



class delu.data.IndexLoader[source]#

Bases: object

Like DataLoader, but over indices instead of data.

[DEPRECATED] (Instead, use delu.data.IndexDataset and DataLoader)

The shuffling logic is delegated to the native PyTorch DataLoader, i.e. no custom logic is performed under the hood. The data loader which actually generates indices is available as IndexLoader.loader.


Usage for training:

train_loader = delu.data.IndexLoader(
    len(train_dataset), batch_size, shuffle=True
for epoch in range(n_epochs):
    for batch_idx in train_loader:

Other examples:

dataset_size = 10  # len(dataset)
for batch_idx in delu.data.IndexLoader(dataset_size, batch_size=3):
tensor([0, 1, 2])
tensor([3, 4, 5])
tensor([6, 7, 8])
dataset_size = 10  # len(dataset)
for batch_idx in delu.data.IndexLoader(dataset_size, 3, drop_last=True):
tensor([0, 1, 2])
tensor([3, 4, 5])
tensor([6, 7, 8])
size: int,
device: int | str | device = 'cpu',
) None[source]#
  • size – the number of items (for example, len(dataset))

  • args – positional arguments for torch.utils.data.DataLoader

  • device – if not CPU, then all indices are materialized and moved to the device at the beginning of every loop. It can be useful when the indices are applied to non-CPU data (e.g. CUDA-tensors) and moving data between devices takes non-negligible time (which can happen in the case of simple and fast models like MLPs).

  • kwargs – keyword arguments for torch.utils.data.DataLoader

__len__() int[source]#

Get the size of the original DataLoader.

property loader: DataLoader#

The original DataLoader.

size: int,
) DataLoader[source]#

Make DataLoader over indices instead of data.

[DEPRECATED] (Instead, use delu.data.IndexDataset and DataLoader)

This is just a shortcut for torch.utils.data.DataLoader(delu.data.IndexDataset(...), ...).



Usage for training:

train_loader = delu.data.make_index_dataloader(
    len(train_dataset), batch_size, shuffle=True
for epoch in range(n_epochs):
    for i_batch in train_loader:
        x_batch = X[i_batch]
        y_batch = Y[i_batch]

Other examples:

dataset_size = 10  # len(dataset)
for batch_idx in delu.data.make_index_dataloader(
    dataset_size, batch_size=3
tensor([0, 1, 2])
tensor([3, 4, 5])
tensor([6, 7, 8])
dataset_size = 10  # len(dataset)
for batch_idx in delu.data.make_index_dataloader(
    dataset_size, 3, drop_last=True
tensor([0, 1, 2])
tensor([3, 4, 5])
tensor([6, 7, 8])
delu.hardware.free_memory(*args, **kwargs) None[source]#

[DEPRECATED] (Instead, use delu.cuda.free_memory.)

delu.hardware.get_gpus_info() Dict[str, Any][source]#

Get information about GPU devices: driver version, memory, utilization etc.

[DEPRECATED] (Instead, use functions from torch.cuda)

The example below shows what kind of information is returned as the result. All figures about memory are given in bytes.


Information about GPU devices.


The ‘devices’ value contains information about all gpus regardless of the value of CUDA_VISIBLE_DEVICES.



Output example (formatted for convenience):

    'driver': '440.33.01',
    'devices': [
            'name': 'GeForce RTX 2080 Ti',
            'memory_total': 11554717696,
            'memory_free': 11554652160,
            'memory_used': 65536,
            'utilization': 0,
            'name': 'GeForce RTX 2080 Ti',
            'memory_total': 11552096256,
            'memory_free': 11552030720,
            'memory_used': 65536,
            'utilization': 0,