Python API#
WRAPP provides a Python API for programmatic access to all package management functionality. Use it to integrate WRAPP into your build pipelines, automation scripts, or custom tools.
The API is organized into two parts:
- Command functions (create, install, mirror, etc.) — the primary operations for managing packages. These functions are mirrored by the CLI commands and are documented in the Commands Reference.
- Supporting infrastructure — initialization, authentication, configuration, and data structures, documented on this page.
Commands#
See the Commands Reference for the primary API functions including
create, install, mirror, status, and more.
Setup and tear down#
Here is an example of a main function that sets everything up for running one or more wrapp commands
using the supplied wrapp.ContextManager class. That class takes full responsibility for setup and teardown,
calling initialize and shutdown appropriately:
# Copyright (c) 2024-2025, NVIDIA CORPORATION. All rights reserved.
#
# NVIDIA CORPORATION and its licensors retain all intellectual property
# and proprietary rights in and to this software, related documentation
# and any modifications thereto. Any use, reproduction, disclosure or
# distribution of this software and related documentation without an express
# license agreement from NVIDIA CORPORATION is strictly prohibited.
import asyncio
import os
import random
import sys

import wrapp


async def async_main():
    """
    Example main that shows how to use the WRAPP commands within a wrapp.ContextManager block
    """
    auth_info = wrapp.StorageAuthInfo.nucleus(
        server_url=os.getenv("HOST_URL", "omniverse://localhost"),
        username=os.getenv("OMNI_USER", "omniverse"),
        password_or_token=os.getenv("OMNI_PASS", "omniverse"),
    )
    random_version = str(random.randint(1, 1000))
    async with wrapp.ContextManager([auth_info]) as wrapp_context:
        await wrapp.create(
            "test_project",
            random_version,
            source=sys.argv[1],
            catalog=False,
            scheduler=wrapp_context,
        )
        print(f"Created test_project version {random_version} in {auth_info.server_url}")


asyncio.run(async_main())
You should either use the ContextManager class as above to manage the setup and teardown of wrapp, or call the initialize and shutdown functions manually. The latter is only required when embedding wrapp in a larger existing program. See the examples in the Initializing wrapp section for working example code.
- wrapp.initialize(
- debug_level: bool = False,
- throttling_params: ThrottlingParameters | None = None,
Initialize the wrapp API. It is safe to call this function multiple times.
Initialization performs the following steps in order:
Router — Creates all enabled storage backend objects (S3, Azure, GCS, Nucleus, Local, and optionally Storage API). Each backend is only created when its corresponding environment flag allows it (e.g. WRAPP_ENABLE_STORAGE_API=True).
Event loop thread — Starts the internal async event loop used by all storage operations.
Storage API eager initialization — If the Storage API backend is enabled, WRAPP attempts to connect to the gRPC endpoint, negotiate protocol versions, and populate URL route information so that subsequent URL routing is correct from the start. If this step fails (e.g. the discovery service is unreachable), it falls back to lazy initialization on the first actual storage operation.
Temp file manager — Initializes the temporary file pool used for cross-backend file transfers.
- Parameters:
debug_level – Enable debug level logging.
throttling_params – Optional scheduling parameters for throttling control. If not provided, uses the default ThrottlingParameters.
- Raises:
RuntimeError – If the wrapp API is unable to initialize.
- wrapp.shutdown(
- exc_type=None,
- exc_val=None,
- exc_tb=None,
- shutdown_client_lib=True,
- stop_timeout: float | None = None,
Shuts down the WRAPP API; it should be invoked before process exit. Afterwards, no further WRAPP commands may be issued unless shutdown_client_lib was False and you call initialize() again. When shutdown_client_lib=True, the underlying client library is fully torn down: no subsequent WRAPP or omni.client calls (including a new initialize()) are possible in the same process.
The exception parameters can be passed on using shutdown(*sys.exc_info()) if the function is called within code handling exceptions.
- Parameters:
stop_timeout – Timeout in seconds to wait for the event loop thread to stop. None (default) waits indefinitely. If a finite timeout is given and the thread does not stop in time, raises RuntimeError.
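The manual lifecycle described above can be sketched as follows. This is a hedged illustration: the command body is a placeholder, and the wrapp import is deferred into the function so the sketch stays definable even where wrapp is not installed.

```python
import asyncio


async def main() -> None:
    """Sketch of the manual lifecycle: initialize(), run commands, shutdown().

    Prefer wrapp.ContextManager unless you are embedding wrapp in a larger
    existing program.
    """
    import wrapp  # deferred so this sketch can be defined without wrapp

    wrapp.initialize()  # safe to call multiple times
    try:
        pass  # ... run wrapp commands here ...
    finally:
        # With shutdown_client_lib=False, a later initialize() is still
        # possible in the same process.
        wrapp.shutdown(shutdown_client_lib=False)


# To run: asyncio.run(main())
```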
Authentication and Credentials#
Authentication is controlled by the AuthManager class, which provides methods to manage the set of available credentials.
- class wrapp.AuthInfo(server: str, username: str, password_or_token: str)#
@deprecated Provided for backwards compatibility. Please use StorageAuthInfo instead.
Data object to store credentials for a given base URL. For example,
>>> AuthInfo(server='omniverse://localhost', username='omniverse', password_or_token='omniverse')
will generate credentials for a workstation Nucleus with the default password. To use a user-generated API token (e.g. for single sign-on), use the special username $omni-api-token.
- class wrapp.StorageAuthInfo(
- server_url: str,
- auth_data: NucleusAuthInfo | Boto3AuthInfo | AzureBlobAuthInfo | GCSAuthInfo | StorageAPIBearerAuthInfo | StorageAPIClientCredentialsAuthInfo,
Credential container for a single storage server.
Each instance pairs a server_url with backend-specific auth data. Use the class-method factories below to construct instances for the supported backends:

# Nucleus
auth = StorageAuthInfo.nucleus("omniverse://myserver", "user", "pass")

# S3 (boto3)
auth = StorageAuthInfo.boto3("s3://my-bucket", "<key>", "<secret>")

# GCS
auth = StorageAuthInfo.gcs("gs://my-bucket", credentials_file="/path/to/sa.json")

# Storage API — bearer token
auth = StorageAuthInfo.storage_api_bearer("https://discovery.example.com", "<TOKEN>")

# Storage API — client credentials (machine-to-machine)
auth = StorageAuthInfo.storage_api_client_credentials(
    "https://discovery.example.com", "<CLIENT_ID>", "<CLIENT_SECRET>"
)

Pass one or more of these to ContextManager or AuthManager to authenticate before running commands.
- classmethod boto3(
- server_url: str,
- aws_access_key_id: str,
- aws_secret_access_key: str,
- aws_session_token: str | None = None,
Convenience function to set up StorageAuthInfo for accessing an S3 bucket directly using boto3.
- classmethod gcs(
- server_url: str,
- project_id: str | None = None,
- credentials_file: str | None = None,
Convenience function to set up StorageAuthInfo for accessing Google Cloud Storage.
- classmethod nucleus(
- server_url: str,
- username: str,
- password_or_token: str,
Convenience function to set up StorageAuthInfo for a Nucleus server.
- classmethod storage_api_bearer(
- discovery_url: str,
- bearer_token: str,
Create credentials for a Storage API service using a pre-obtained bearer token.
- Parameters:
discovery_url – Base URL of the Storage API discovery service (e.g. https://my-discovery.example.com). Must match the value of WRAPP_STORAGE_API_DISCOVERY_URL.
bearer_token – OAuth2 access/bearer token.
- classmethod storage_api_client_credentials(
- discovery_url: str,
- client_id: str,
- client_secret: str,
Create credentials for a Storage API service using OAuth2 client credentials.
WRAPP will exchange these for an access token via the grant_type=client_credentials flow and automatically refresh it before expiry.
- Parameters:
discovery_url – Base URL of the Storage API discovery service.
client_id – OAuth2 client identifier.
client_secret – OAuth2 client secret.
- class wrapp.AuthManager(
- auth_infos: List[StorageAuthInfo] | None = None,
- interactive_fallback: bool = True,
Use this class to register credential information for subsequent API calls, programmatically setting the credentials used by each storage provider.
Example code for setting up credentials for Nucleus and using them:
>>> def download_file_from_staging(url, api_token):
>>>     with AuthManager([StorageAuthInfo.nucleus(
>>>             server_url="omniverse://staging.nvidia.com",
>>>             username="$omni-api-token",
>>>             password_or_token=api_token)], interactive_fallback=False) as auth:
>>>         result, version, content = wrapp_read_file_content(url)
>>>         if result == Result.OK:
>>>             return content
>>>         else:
>>>             raise Exception(f"Download error: {result} for {url}")
Note the use of the special username to designate an API token created for that server.
- add_auth(
- auth_info: StorageAuthInfo,
Add one credential pair to be used for the given server. These are not persisted but only kept in RAM.
- Parameters:
auth_info –
new auth info to be stored
- clear()#
Clear all previously stored credential information. This does not log out of a server or disable the registered callback.
- initialize() Self#
Initializes the AuthManager after creation and installs the authentication hook. Only after this call will the provided authentication info be used. Make sure to call shutdown() in a ‘finally’ block somewhere to uninstall the authentication hook again.
- Returns:
self
- shutdown(exc_type=None, exc_val=None, exc_tb=None)#
De-registers the authentication callback hook installed by a previous call to initialize().
When handling exceptions, the function can be called using auth_manager.shutdown(*sys.exc_info()). Otherwise the exception parameters can be left at their default values.
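A minimal sketch of pairing initialize() and shutdown() manually. The staging server URL and the caller-supplied run_commands callable are illustrative assumptions; the special $omni-api-token username is documented above.

```python
import sys


def run_with_manual_auth(api_token, run_commands):
    """Sketch of the manual AuthManager initialize()/shutdown() pairing.

    run_commands is a hypothetical zero-argument callable supplied by the
    caller; the staging server URL is also an assumption for illustration.
    """
    import wrapp  # deferred so this sketch can be defined without wrapp

    auth = wrapp.StorageAuthInfo.nucleus(
        server_url="omniverse://staging.nvidia.com",
        username="$omni-api-token",  # special user name for API tokens
        password_or_token=api_token,
    )
    # initialize() returns self, so setup can be chained onto construction.
    manager = wrapp.AuthManager([auth], interactive_fallback=False).initialize()
    try:
        return run_commands()
    finally:
        # Uninstall the authentication hook; sys.exc_info() forwards any
        # in-flight exception and is (None, None, None) otherwise.
        manager.shutdown(*sys.exc_info())
```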
If no AuthManager is used, the default client library behavior applies: first check for environment variables, then fall back to an interactive login flow compatible with the Nucleus server version it is trying to connect to. This might open a browser window, so don’t rely on it in non-interactive shells.
Working with package files#
For more advanced use cases, wrapp allows you to read and write the content of the wrapp files themselves, exposed
as the wrapp.PackageInfo class.
Any given wrapp file can be read using the function
package = await read_package_info(file_url)
and subsequently inspected. The file format is versioned JSON, and the best way to write a potentially modified package back to the storage is via
await write_package_info(destination_file_url, package)
The relevant functions with their full parameter specifications:
- async wrapp.read_package_info(
- package_url: str,
Given a source URL of a wrapp file, load and parse the content, returning a PackageInfo object.
- Parameters:
package_url – URL of the wrapp file
- Returns:
The PackageInfo retrieved and parsed. None if the package file does not exist.
On failure, raises FailedCommand or StorageOperationError exception.
- async wrapp.write_package_info(
- package_url: str,
- package: PackageInfo,
- context: CommandParameters | None = None,
Given a destination URL including the file name and a package info, write the given information in the current JSON format.
- Parameters:
package_url – Destination path for the wrapp file to be created, including the file name
package – PackageInfo object to be stored
context – optional CommandParameters() object to control the storage operation
- Returns:
The filename written to
On failure, this will raise a StorageOperationError.
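A minimal read-modify-write round trip might look like the following sketch. The function name and URLs are illustrative assumptions; read_package_info, write_package_info, and the PackageInfo.repository field are documented on this page.

```python
import asyncio


async def retarget_repository(wrapp_file_url: str, new_repository: str) -> None:
    """Sketch: read a wrapp file, change one field, and write it back.

    The helper name and arguments are hypothetical; only the wrapp calls
    mirror the documented API.
    """
    import wrapp  # deferred so this sketch can be defined without wrapp

    package = await wrapp.read_package_info(wrapp_file_url)
    if package is None:
        raise FileNotFoundError(f"No wrapp file at {wrapp_file_url}")
    package.repository = new_repository
    # Writes the package back in the current JSON format.
    await wrapp.write_package_info(wrapp_file_url, package)


# To run: asyncio.run(retarget_repository("omniverse://server/pkg.wrapp", "omniverse://server/repo"))
```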
Ignore Rules#
These classes are used to configure which files should be ignored during package operations
like create, catalog, install, freeze, new, and status.
API functions that support ignore rules accept an ignores parameter that can be either:
A string path or URL to an ignore file (using gitignore syntax)
An IgnoreEvaluator object for programmatic control
When ignores is not specified, functions automatically look for a .wrappignore file
in the root of the source directory being processed.
Path resolution behavior:
When an ignore file path is provided, it is resolved relative to the current working directory (for local paths) or used as-is (for URLs).
Patterns inside the ignore file are always resolved relative to the source directory being processed, regardless of where the ignore file is located.
For full documentation of the ignore file syntax, see Ignore Rules.
Example using a file path:
await wrapp.commands.create(
"my_package", "1.0.0", "omniverse://server/source",
catalog=False,
ignores="/path/to/myignore.txt"
)
Example using an IgnoreEvaluator:
evaluator = await wrapp.IgnoreEvaluator.create_from_file(
"omniverse://server/config/.wrappignore",
"omniverse://server/source"
)
await wrapp.commands.create(
"my_package", "1.0.0", "omniverse://server/source",
catalog=False,
ignores=evaluator
)
- class wrapp.IgnoreEvaluator(
- root_path: str | None,
- predicate: List[Callable[[...], bool]] | Callable[[...], bool],
Evaluates whether paths should be ignored based on configurable predicate functions.
This class is used throughout WRAPP to filter files during operations like create, catalog, install, and freeze. It supports combining multiple predicates and can be created from ignore files using create_from_file().
Example usage with a custom predicate:
evaluator = IgnoreEvaluator(
    root_path="/my/project",
    predicate=lambda path: path.endswith(".tmp"),
)
if evaluator.is_ignored("file.tmp"):
    print("File is ignored")
Example usage with an ignore file:
evaluator = await IgnoreEvaluator.create_from_file(
    "/path/to/.wrappignore",
    "omniverse://server/my/project",
)
Create an IgnoreEvaluator with a root path and one or more predicates.
- Parameters:
root_path – The root path used to resolve relative paths when checking. Can be None if predicates work with absolute paths only.
predicate – A single predicate function or a list of predicate functions. Each predicate takes a path string and returns True if ignored.
- add_evaluator(
- predicate: List[Callable[[...], bool]] | Callable[[...], bool],
Add one or more additional predicates to this evaluator.
A path is considered ignored if any predicate returns True.
- Parameters:
predicate – A single predicate function or a list of predicate functions to add.
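The rule that a path is ignored as soon as any predicate matches can be illustrated standalone; the helper below is a plain-Python stand-in for that semantics, not the wrapp implementation.

```python
from typing import Callable, List

# Matches the documented IgnorePredicate shape: path in, bool out.
IgnorePredicate = Callable[[str], bool]


def is_ignored(path: str, predicates: List[IgnorePredicate]) -> bool:
    # A path is ignored as soon as any predicate returns True.
    return any(pred(path) for pred in predicates)


predicates: List[IgnorePredicate] = [
    lambda p: p.endswith(".tmp"),      # editor scratch files
    lambda p: p.startswith("build/"),  # generated output
]

assert is_ignored("notes.tmp", predicates)
assert is_ignored("build/app.exe", predicates)
assert not is_ignored("src/main.py", predicates)
```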
- async static create_from_file(
- ignore_file: str | None,
- base_url: str,
Create an IgnoreEvaluator from an ignore file.
The ignore file uses the same format as .gitignore files. For full documentation of the supported syntax, see Ignore Rules.
- Parameters:
ignore_file – Path or URL to the ignore file. If None, looks for .wrappignore in base_url. If that file doesn’t exist, returns None.
base_url – The base URL for resolving patterns in the ignore file. Patterns are always resolved relative to this base_url, regardless of where the ignore file is located.
- Returns:
An IgnoreEvaluator configured with the rules from the file, or None if no ignore file was found (only when ignore_file is None).
- Raises:
FailedCommand – If the ignore file cannot be read.
Path resolution behavior:
When ignore_file is provided, it is used as-is without joining with base_url. This means local paths without a URL scheme (e.g., "myignore.txt") are resolved relative to the current working directory, NOT relative to base_url.
Only when ignore_file is None does this method look for .wrappignore in base_url.
- is_ignored(path: str) bool#
Check if a path should be ignored.
Files ending with .auto.png (thumbnails) are always ignored regardless of predicates.
- Parameters:
path – The path to check. Can be relative (resolved against root_path) or absolute.
- Returns:
True if the path should be ignored, False otherwise.
- class wrapp.IgnoreFilePredicate(ignore_text: str, normalized_base_url: str)#
A predicate that evaluates ignore rules loaded from a file using gitignore syntax.
This class parses ignore rules in the same format as .gitignore files and provides a callable interface to check if a given path should be ignored. It is typically used as the predicate for an IgnoreEvaluator.
For full documentation of the ignore file syntax, see Ignore Rules.
Create an IgnoreFilePredicate from ignore rules text.
- Parameters:
ignore_text – The content of the ignore file (gitignore-format rules). See Ignore Rules for syntax details.
normalized_base_url – The normalized base URL against which paths will be matched.
- is_ignored(normalized_abs_url: str) bool#
Check if a path should be ignored according to the loaded rules.
- Parameters:
normalized_abs_url – The normalized absolute URL to check.
- Returns:
True if the path should be ignored, False otherwise.
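For intuition only, a greatly simplified gitignore-style matcher can be sketched with fnmatch. This approximates, but does not reproduce, IgnoreFilePredicate's parsing; see Ignore Rules for the real syntax.

```python
from fnmatch import fnmatch


def matches_ignore_rules(relative_path: str, rules: list[str]) -> bool:
    """Greatly simplified gitignore-style check: rules are applied in
    order, the last matching rule wins, and a leading '!' negates."""
    ignored = False
    for rule in rules:
        negated = rule.startswith("!")
        pattern = rule[1:] if negated else rule
        if fnmatch(relative_path, pattern):
            ignored = not negated
    return ignored


rules = ["*.log", "!keep.log"]
assert matches_ignore_rules("debug.log", rules)       # caught by *.log
assert not matches_ignore_rules("keep.log", rules)    # re-included by !keep.log
```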
- wrapp.IgnorePredicate: Callable[..., bool]#
A callable that takes a path string and returns True if the path should be ignored. This is a type alias for predicate functions used by
IgnoreEvaluator.
Data Objects#
These simple data objects are used by the API to carry more complex data. We use pydantic to make it easier to write code with structs in Python.
- class wrapp.Catalog(*args: Any, **kwargs: Any)#
The Catalog contains a bill of materials (BOM) of files at a specific version, identified by their path relative to the recorded root URL.
- get_source_url_for_item(
- item: CatalogItem,
Returns the URL to use for fetching the actual data for a catalog item.
If a storage_map is present and has an entry for this item, returns the mapped URL (used for redirect packages). Otherwise returns the item’s source_path.
- Parameters:
item – The CatalogItem to get the source URL for
- Returns:
The URL to use for fetching the item’s data
- get_storage_path_for_item(
- item: CatalogItem,
Returns the storage path for a catalog item within a TAR file or similar archive.
If a storage_map is present, returns the mapped path. Otherwise returns the item’s relative_path. Used for TAR file operations with hash-based deduplication.
- Parameters:
item – The CatalogItem to get the storage path for
- Returns:
The path to use for storing/retrieving the item’s data
- items: list[CatalogItem]#
Content of catalog
- root: str#
Root URL of the catalog operation; all relative_path entries of the items resolve against this
- size_of_catalog() int#
Return the sum of bytes in all items of this catalog.
- size_of_catalog_deduped() int#
Return the sum of bytes in all items of this catalog, counting each distinct hash only once.
- storage_map: StorageMap | None = None#
Currently only used in TAR files to map the hash to the file with the content for deduplication
- class wrapp.CatalogItem(*args: Any, **kwargs: Any)#
A single item in a catalog represents a file. Folders are not part of the catalog.
- hash: str | None = None#
A content-based hash. For comparison, be aware there are multiple algorithms around.
- relative_path: str#
PRIMARY KEY in catalog, the location of the item relative to the root of the catalog
- size: int | None = None#
size of the file
- source_path: str#
An absolute URL pointing to the content of the item at the time of catalog creation
- class wrapp.PackageInfo(*args: Any, **kwargs: Any)#
The PackageInfo class encapsulates all information about a wrapp package. These are stored in .wrapp files.
To load and save wrapp files, use the load() and save() member functions of the PackageInfo class.
- dependencies: list[Dependency] | None = None#
Optional list of dependency entries
- name: str = ''#
Name of the package
- repository: str | None = None#
Optional repository of last create or install operation.
- source: str | None = None#
Optional URL of source of last create operation
- version: str = ''#
Version referenced
- class wrapp.Dependency(*args: Any, **kwargs: Any)#
A Dependency records the relative installation path of a subpackage, its name and version and the repository from which it was installed.
- package: str#
Name of the referenced package
- relative_destination: str#
Relative installation path
- repository: str#
The repository URL from which the package was installed
- version: str#
Version of the referenced package
- class wrapp.DependencySet(*args: Any, **kwargs: Any)#
A DependencySet represents a set of Dependencies, e.g. to create a metapackage easily.
- class wrapp.LsResultItem(*args: Any, **kwargs: Any)#
The LsResultItem represents a single directory entry returned by the ls command; it can be a file or a folder.
- hash: str | None = None#
A content-based hash. For comparison, be aware there are multiple algorithms around.
- item_type: LsResultItemType#
Designating the item as a file or folder
- relative_path: str#
The primary key in the catalog, the location of the item relative to the root of the catalog
- size: int | None = None#
The size of the item. None if the item is a folder.
- source_path: str#
An absolute URL pointing to the content of the item at the time of catalog creation
- class wrapp.LsResultItemType(
- value,
- names=None,
- *,
- module=None,
- qualname=None,
- type=None,
- start=1,
- boundary=None,
Designates the type of an item produced by a list operation.
- class wrapp.datastructures.catalog.StorageMap(*args: Any, **kwargs: Any)#
Defines a mapping between the hash of a CatalogItem and an alternative storage path; used to map hashes to file names in tar files for deduplication.
- storage_map: Dict[str, str]#
Map hash to a path containing the content of that item.
- class wrapp.datastructures.catalog.Tag(*args: Any, **kwargs: Any)#
Represents a tag as defined by the Nucleus Tagging Service
- class wrapp.InstalledPackage(*args: Any, **kwargs: Any)#
Represents a package that has potentially been installed in a storage location; it might also be just a wrapp file found in a folder tree underneath a top-level wrapp file.
This class stores metadata about an installed package including its name, version, location, and repository information. It inherits from Pydantic’s BaseModel for data validation and serialization capabilities.
- name#
The package name.
- Type:
str
- version#
The package version installed.
- Type:
str
- wrapp_file_url#
The absolute URL of the wrapp file in the installed location.
- Type:
str
- install_location#
The path of the package relative to the top level package.
- Type:
str
- repository#
The URL of the repository this package was installed from. None indicates this is a fresh package that has not been created in a repo yet.
- Type:
Optional[str]
- package#
The detailed package information including the catalog, if already loaded
- Type:
Optional[PackageInfo]
- property num_items#
For backward compatibility with versions 2.0.0-a9 and 2.0.0-a10, which had the num_items field but not the package and catalog fields.
- class wrapp.datastructures.workspace_analysis.PackageStatusItemType(
- value,
- names=None,
- *,
- module=None,
- qualname=None,
- type=None,
- start=1,
- boundary=None,
Enum for the different package error types identified by the wrapp status operations. Each key has its name as a string value.
- EXTRA_ITEM = 'extra_item'#
There is an extra item which was not produced by the install operation
- EXTRA_PACKAGE = 'extra_package'#
There is an extra package which is not listed as a dependency
- ITEM_DIFFERS = 'item_differs'#
An item differs in size or content
- ITEM_MISSING = 'item_missing'#
An item is missing which is listed in the package’s catalog
- PACKAGE_MISSING = 'package_missing'#
A package file is missing
- PACKAGE_MODIFIED = 'package_modified'#
This package was modified; a top-level error that will be supplemented by more specific errors
- PACKAGE_NO_CATALOG = 'package_no_catalog'#
A package has no catalog, indicating it has never been installed to the location
- PACKAGE_VERSION_MISMATCH = 'package_version_mismatch'#
There is a version mismatch between the package in the dependency and the wrapp file of the subpackage
- SUBPACKAGE_MODIFIED = 'subpackage_modified'#
At least one subpackage is modified
- class wrapp.datastructures.workspace_analysis.PackageStatusItem(*args: Any, **kwargs: Any)#
Provides details about a package error found during a wrapp status call.
- package_name#
The name of the package
- Type:
str
- absolute_wrapp_file_path#
Location of the package file
- Type:
str
- status_message#
A detailed human-readable description of the package error
- Type:
str
- status_item_type#
An enum indicating the class of package error
- Type:
PackageStatusItemType
Patch Operations#
- class wrapp.Patch#
Class to execute a set of patch operations returned by “install” efficiently and in the correct order. To execute all patch operations in a list, create an object of this class, use Patch.add() to add all operations, and run Patch.execute().
The correct order of operations is determined by the ‘<’ operator for PatchOpBase.
- add(operation: PatchOpBase) None#
Add a patch operation to this patch. Operations are executed in the order they are added to the Patch object, with the exception of tag additions and file deletions: all other operations are executed first, followed by all operations adding tags, and then all operations deleting files.
- async execute(
- context: CommandParameters,
- scheduler: SchedulerContext,
Execute all operations in the patch. Returns True if all operations succeed or if the Patch is empty.
Errors in all patch operations are gathered. If any error occurred, False is returned and the errors can be retrieved through Patch.get_errors().
- get_error_msg(
- operation: PatchOpBase,
Check if the given operation has failed and return the error message for the operation if it failed, or None if it succeeded.
- get_errors() list[tuple[PatchOpBase, str]]#
Return all errors encountered during Patch.execute() as a list of tuples: first the failed patch operation as a PatchOpBase, second the error message.
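The ordering rule documented for add() can be sketched as a bucketed sort, with plain (kind, name) pairs standing in for PatchOpBase objects; the kind strings mirror PatchOperationType values.

```python
def execution_order(ops: list[tuple[str, str]]) -> list[str]:
    """Sketch of the documented ordering: all other operations first
    (in insertion order), then all tag additions, then all deletions."""
    others = [name for kind, name in ops if kind not in ("add_tag", "delete")]
    tags = [name for kind, name in ops if kind == "add_tag"]
    deletes = [name for kind, name in ops if kind == "delete"]
    return others + tags + deletes


ops = [
    ("delete", "old.usd"),
    ("copy", "a.usd"),
    ("add_tag", "a.usd:tag"),
    ("copy", "b.usd"),
]
# Copies run first, then the tag addition, then the deletion.
assert execution_order(ops) == ["a.usd", "b.usd", "a.usd:tag", "old.usd"]
```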
- class wrapp.datastructures.patch.PatchOperationType(
- value,
- names=None,
- *,
- module=None,
- qualname=None,
- type=None,
- start=1,
- boundary=None,
Enum for the different patch operations. Each key has its name as a string value.
- add_tag = 'add_tag'#
Enum value indicating
PatchOpAddTag
- copy = 'copy'#
Enum value indicating
PatchOpCopy
- create_folder = 'create_folder'#
Enum value indicating
PatchOpCreateFolder
- delete = 'delete'#
Enum value indicating
PatchOpDelete
- delete_tag = 'delete_tag'#
Enum value indicating
PatchOpDeleteTag
- class wrapp.datastructures.patch.PatchOpBase(*args: Any, **kwargs: Any)#
Abstract base class for patch operations. Each concrete class has a field “operation_type” of type PatchOperationType.
- explanation_txt() str#
Description of the patch operation.
- class wrapp.datastructures.patch.PatchOpCopy(*args: Any, **kwargs: Any)#
Patch operation copying a file.
- dst: str#
The target file path.
- explanation_txt() str#
Description of the patch operation.
- hash: str | None = None#
If available, the hash of the source file for optimization
- item_type: ItemType#
Deprecated - WRAPP only copies files which implicitly creates folders.
- src: str#
The source file.
- class wrapp.datastructures.patch.PatchOpDelete(*args: Any, **kwargs: Any)#
Patch operation deleting a file.
- explanation_txt() str#
Description of the patch operation.
- url: str#
The URL of the deleted file.
- class wrapp.datastructures.patch.PatchOpCreateFolder(*args: Any, **kwargs: Any)#
Patch operation to create a folder. Only for backwards compatibility.
Deprecated since version 1.2.0.
- explanation_txt() str#
Description of the patch operation.
Common Configuration#
All Python API functions can be called without explicitly specifying the context parameter. However, if any of the standard command-line configuration options need to be specified, it is possible to provide them with a CommandParameters object.
- class wrapp.CommandParameters(
- debug: bool = False,
- verbose: bool = False,
- dry_run: bool = False,
- log_file: str | None = None,
- hash_cache_file: str | None = None,
Class storing all common parameters for wrapp commands.
Construct this class with CommandParameters(<keyword params>). For a more detailed explanation of the individual parameters, please refer to the section Generic Parameters.
- debug: bool = False#
Debug mode.
- dry_run: bool = False#
Dry run.
- hash_cache_file: str | None = None#
The path to the hash cache.
- log_file: str | None = None#
The path to the log file.
- verbose: bool = False#
Verbose.
Progress Reporting#
When initializing the SchedulingParameters object, it is possible to provide a progress report callback. You can also configure how frequently that callback is called. The callback receives a hierarchy of ProgressReport objects reflecting the state of the operations WRAPP is currently performing.
- class wrapp.ProgressReport(
- job_name: str,
- num_tasks_executing: int,
- num_tasks_done: int,
- expected_num_sub_jobs: int | None,
Progress report for WRAPP operations (or “jobs”) running in the background.
To group WRAPP operations under a recognizable name, use SchedulerContext.sub_job(), e.g.

wrapp.list_repo(url, scheduler=scheduler.sub_job("list repo"))

Also specify a progress report callback when the SchedulerContext is set up. If you are using the ContextManager, you can specify the callback in its constructor.
Once this is done and the progress report verbosity in SchedulingParameters is not “off”, the progress report callback will be called regularly with the progress report for the root job containing all other jobs, allowing the application to update any progress indicators.
Each progress report represents the state of a job, including its sub-jobs. A single root job report (named “WRAPP” by default, customizable via SchedulingParameters.job_name) serves as the top-level container for all other job reports.
Jobs which are siblings in the hierarchy are executed concurrently. A job is completed when all its directly spawned tasks are completed, and all its children jobs are completed. On the lowest level a job is an asyncio.task.
For a simple example of how to transform the progress report into user-readable output, please check example_complex.py.
- job_name() str#
Name of the job, used to identify jobs. The name of the root job can be customized via the job_name parameter of SchedulingParameters, or use SchedulerContext.sub_job() to group operations under a named sub-job. The root job defaults to “WRAPP” if no custom name is provided.
Job names might not be unique.
- num_tasks_done() int#
Number of tasks spawned by this job which are completed (this includes failed tasks).
- num_tasks_executing() int#
Number of tasks spawned by this job directly which are currently executing.
- num_tasks_processing(resource_name: str) int#
The number of tasks spawned directly by this job being executed and making use of the limited resource with the given name. Raises KeyError if the resource name is not part of the list returned by throttled_resource_names.
- num_tasks_waiting(resource_name: str) int#
The number of tasks spawned directly by this job waiting for the limited resources with the given name to become available. Raises KeyError if the resource name is not part of the list returned by throttled_resource_names.
- progress() float#
A rough measure of progress for this job, between 0 and 1, if the job status is “Running”; otherwise 1. Note that the number might go down as well as up if additional tasks are created during the execution of the jobs.
Progress is distributed equally: each sub-job and this job’s own tasks each receive 1/n weight. This prevents progress from jumping when only a fraction of total work has been discovered.
- status() ProgressReportStatus#
The status of the job. A job can be running without having any tasks or sub jobs spawned and running - then it is just the job being currently executed, blocking its parent job (unless it has been spawned within a task).
- sub_jobs() list[ProgressReport]#
All sub jobs running as part of this job.
- throttled_resource_names() Iterable[str]#
The names of all resources limiting concurrent execution of tasks spawned directly by this job. The name of a resource is given by WRAPP and user readable, examples are “nucleus_server.nvidia.com (max requests)”, “S3:my_bucket (max data transfers)”, “CPU threads”.
- total_num_tasks_done() int#
Number of tasks spawned by this job or its sub jobs which are completed (this includes failed tasks).
- total_num_tasks_executing() int#
Number of tasks spawned by this job or its sub jobs which are currently executing.
- total_num_tasks_processing() int#
The number of tasks spawned by this job or its sub jobs being executed and making use of one of the limited resources.
- total_num_tasks_processing_for_resource(
- resource_name: str,
The number of tasks spawned by this job or its sub jobs being executed and making use of the limited resource with the given name. Raises KeyError if the resource name is not part of the list returned by throttled_resource_names.
- total_num_tasks_waiting() int#
The number of tasks spawned by this job or its sub jobs waiting for one of the limited resources to become available.
- total_num_tasks_waiting_for_resource(
- resource_name: str,
The number of tasks spawned by this job or its sub jobs waiting for the limited resource with the given name to become available. Raises KeyError if the resource name is not part of the list returned by throttled_resource_names.
- total_throttled_resource_names() Iterable[str]#
The names of all resources limiting concurrent execution of tasks spawned by this job or its sub jobs.
Async operations#
To avoid server overload, it is strongly recommended to make use of the WrappScheduler context manager to create a block in which API rate throttling and parallel execution are performed as specified by an instance of the SchedulingParameters object.
- class wrapp.ProgressReportVerbosity(
- value,
- names=None,
- *,
- module=None,
- qualname=None,
- type=None,
- start=1,
- boundary=None,
The amount of detail wrapp’s progress report prints out.
- off#
no progress is printed.
- normal#
an overview of the progress of the top-level tasks is given.
- verbose#
all scheduled jobs are listed.
- class wrapp.SchedulingParameters(
- jobs: int = 200,
- tagging_jobs: int = 50,
- file_transfer_jobs: int = 10,
- progress_report: ProgressReportVerbosity = ProgressReportVerbosity.off,
- job_name: str | None = None,
- progress_report_callback: Callable[[ProgressReport], None] | None = None,
- progress_report_interval: float | None = None,
Class storing all parameters for constructing schedulers and rate limiting for wrapp commands.
Construct this class with SchedulingParameters(<keyword params>). Hand it to the SchedulerContext constructor to set up an appropriate scheduler. For a more detailed explanation of the parameters related to the job numbers, please refer to the section Generic Parameters.
- file_transfer_jobs: int = 10#
The number of concurrent file transfer jobs. Deprecated in favour of ThrottlingParameters.
- job_name: str | None = None#
The job name under which all tasks for the executed operation appear in the ProgressReport. Default is “WRAPP”.
- jobs: int = 200#
The number of concurrent roundtrip jobs. Deprecated in favour of ThrottlingParameters.
- progress_report_callback: Callable[[ProgressReport], None] | None = None#
Callback which can be used to monitor the progress of the operation. See ProgressReport for more details.
- progress_report_interval: float | None = None#
Interval in seconds between calls to the progress_report_callback. If 0, the callback is called as frequently as possible. Default 0.2.
- class wrapp.SchedulerContext(
- params: SchedulingParameters | None = None,
All WRAPP operations executed using the same SchedulerContext will be reported together by the progress reporting. Use this class directly if it is not convenient to use the WrappScheduler context manager class.
Make sure to call start_scheduler() and stop_scheduler() accordingly:
>>> import wrapp
>>> async def example(root_dir: str) -> wrapp.Catalog:
>>>     scheduler_context = wrapp.SchedulerContext(wrapp.SchedulingParameters())
>>>     scheduler_context.start_scheduler()
>>>     try:
>>>         catalog = await wrapp.catalog(root_dir, scheduler=scheduler_context)
>>>     finally:
>>>         await scheduler_context.stop_scheduler()
>>>     return catalog
- get_parameters() SchedulingParameters#
- Returns:
The SchedulingParameters which were used to construct this SchedulerContext.
- start_scheduler() None#
Initialize the progress reporting. Use this in standalone usage, and make sure to call stop_scheduler in a ‘finally’ block.
- async stop_scheduler(
- *,
- exc_type=None,
- exc_val=None,
- exc_tb=None,
Stops the progress reporting. Call only after start_scheduler() has been called.
- sub_job(
- job_name: str,
Create a named sub-job for progress reporting.
Returns a new SchedulerContext where all operations will be grouped under job_name in the ProgressReport hierarchy. This is useful for labeling or grouping multiple WRAPP API calls under a single progress entry.
Example:
await wrapp.create(..., scheduler=scheduler.sub_job("Packaging"))
- class wrapp.ThrottlingParameters(
- jobs: int = 200,
- tagging_jobs: int = 50,
- file_transfer_jobs: int = 10,
Class storing all common parameters for rate limiting for wrapp commands.
Construct this class with ThrottlingParameters(<keyword params>). For a more detailed explanation for the parameters related to the job numbers, please refer to the section Generic Parameters.
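As a sketch, the throttling limits can be handed to the ContextManager via its throttling_params parameter (the initialize()/shutdown() example further down this page passes them to wrapp.api.initialize() instead). The limit values below are illustrative, not recommendations:

```python
import asyncio

async def run_throttled():
    # Lazy import so this sketch stays self-contained; requires wrapp to be installed.
    import wrapp

    # Illustrative limits only; tune them to your server's capacity.
    throttling = wrapp.ThrottlingParameters(jobs=100, tagging_jobs=25, file_transfer_jobs=5)
    async with wrapp.ContextManager(throttling_params=throttling) as ctx:
        # Run wrapp commands here with scheduler=ctx; the limits above cap
        # concurrent roundtrip, tagging, and file transfer jobs respectively.
        pass
```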
- class wrapp.WrappScheduler(
- *,
- context: SchedulerContext | None = None,
- params: SchedulingParameters | None = None,
ContextManager for wrapp to schedule its tasks. This helps throttle the number of tasks sent simultaneously to the server, avoiding server overload and back-off responses.
Example
>>> import asyncio, wrapp
>>> async def example(test_folder_1: str, test_folder_2: str):
>>>     tasks = []
>>>     async with wrapp.WrappScheduler(params=wrapp.SchedulingParameters(jobs=10)) as scheduler_context:
>>>         tasks.append(asyncio.create_task(wrapp.catalog(test_folder_1, scheduler=scheduler_context)))
>>>         tasks.append(asyncio.create_task(wrapp.catalog(test_folder_2, scheduler=scheduler_context)))
>>>         results = await asyncio.gather(*tasks)
Initializing wrapp#
Simple: ContextManager#
The easiest way to initialize WRAPP in a stand-alone Python script is the ContextManager.
- class wrapp.ContextManager(
- auth_infos: list[StorageAuthInfo] | None = None,
- interactive_login_allowed=False,
- standalone_mode=True,
- scheduler_context: SchedulerContext | SchedulingParameters | None = None,
- throttling_params: ThrottlingParameters | None = None,
Helper class to make it easy to initialize WRAPP. If you don’t know what to do to initialize WRAPP, use this class to get started.
The context manager handles the full lifecycle — initialization, authentication, scheduling, and shutdown — so you can focus on WRAPP commands:
async with wrapp.ContextManager([auth_info]) as ctx:
    await wrapp.list_repo("s3://my-bucket/repo", scheduler=ctx)
Storage API usage — To use Storage API backends, set the environment variables WRAPP_ENABLE_STORAGE_API=True and WRAPP_STORAGE_API_DISCOVERY_URL=<url> before entering the context manager. Authentication can be supplied via StorageAuthInfo constructed with StorageAPIBearerAuthInfo or StorageAPIClientCredentialsAuthInfo, or through the CLI --auth option / WRAPP_AUTH environment variable. See Storage API Authentication for full details.
See Setup and tear down for example code.
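A minimal sketch of Storage API setup, assuming a hypothetical discovery URL and credentials supplied via WRAPP_AUTH as described above:

```python
import asyncio
import os

# Both variables must be set before the context manager is entered.
os.environ["WRAPP_ENABLE_STORAGE_API"] = "True"
os.environ["WRAPP_STORAGE_API_DISCOVERY_URL"] = "https://storage.example.com/discovery"  # hypothetical URL

async def list_storage_repo():
    # Lazy import so this sketch stays self-contained; requires wrapp to be installed.
    import wrapp

    # Credentials are picked up from WRAPP_AUTH / --auth here; alternatively,
    # pass a list of StorageAuthInfo objects to the ContextManager.
    async with wrapp.ContextManager() as ctx:
        await wrapp.list_repo("s3://my-bucket/repo", scheduler=ctx)
```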
Initializes the context manager, setting up the desired mode of operation.
- Parameters:
auth_infos – Optional list of AuthInfo instances giving credentials for the servers to be involved
interactive_login_allowed – If False [default], no interactive browser window will pop up, and all authentication information needs to be supplied via the AuthInfo data
standalone_mode – If True [default], call wrapp.shutdown() on exit. No further wrapp calls are possible after that point.
scheduler_context – Assigned to the command_context field, which can be used in calls to the API for command throttling. Alternatively, a SchedulingParameters object can be passed in this parameter, which will be used to construct a SchedulerContext object.
throttling_params – The maximum number of concurrent tasks for several categories.
- add_auth(
- auth_info: StorageAuthInfo,
Add another StorageAuthInfo credential to the AuthManager in use.
- Parameters:
auth_info – The credential to add
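For instance, a second server's credentials can be added after the context has been entered; the server names below are hypothetical:

```python
import asyncio
import os

async def use_two_servers():
    # Lazy import so this sketch stays self-contained; requires wrapp to be installed.
    import wrapp

    primary = wrapp.StorageAuthInfo.nucleus(
        server_url="omniverse://primary.example.com",  # hypothetical server
        username=os.getenv("OMNI_USER", "omniverse"),
        password_or_token=os.getenv("OMNI_PASS", "omniverse"),
    )
    async with wrapp.ContextManager([primary]) as ctx:
        secondary = wrapp.StorageAuthInfo.nucleus(
            server_url="omniverse://secondary.example.com",  # hypothetical server
            username=os.getenv("OMNI_USER", "omniverse"),
            password_or_token=os.getenv("OMNI_PASS", "omniverse"),
        )
        ctx.add_auth(secondary)
        # wrapp commands involving both servers can now run with scheduler=ctx
```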
Complex: individual calls#
If you need to embed wrapp in a larger program, wrapp.ContextManager might be difficult to use due to your program structure. In those cases it is possible to call the individual initialize and shutdown methods separately:
# Copyright (c) 2024-2026, NVIDIA CORPORATION. All rights reserved.
#
# NVIDIA CORPORATION and its licensors retain all intellectual property
# and proprietary rights in and to this software, related documentation
# and any modifications thereto. Any use, reproduction, disclosure or
# distribution of this software and related documentation without an express
# license agreement from NVIDIA CORPORATION is strictly prohibited.
import asyncio
import logging
import os
import sys

import omni.client
from rich.live import Live
from rich.logging import RichHandler
from rich.table import (
    Column,
    Table,
)

import wrapp.api
from wrapp import (
    CommandParameters,
    ProgressReport,
    ProgressReportVerbosity,
    SchedulerContext,
    SchedulingParameters,
    ThrottlingParameters,
)
from wrapp.utils.progress_report import ProgressReportStatus

from .create_new_version import create_new_version


# Setup rich live display
def setup_live_display() -> Live:
    # Lazy creation to make the redirect of logging work
    _live_display = Live()
    rich_handler = RichHandler()
    rich_handler.setLevel(logging.getLogger().level)
    formatter = logging.Formatter("%(message)s")
    rich_handler.setFormatter(formatter)
    logging.getLogger().handlers = [
        rich_handler,
        *filter(lambda x: isinstance(x, logging.FileHandler), logging.getLogger().handlers),
    ]
    _live_display.start()
    return _live_display


_live_display = setup_live_display()


def print_progress(progress_report: ProgressReport):
    """
    Example function handling the progress report output.
    """
    progress_table = Table(
        "Progress ",
        Column("Name", overflow="fold", ratio=1),
        "Discovered",
        "Running ",
        "Done ",
        expand=True,
    )

    def add_rows(job: ProgressReport, indent: int = 0):
        if job.status() == ProgressReportStatus.Running:
            progress_str = f"{(job.progress()) * 100:.0f}%"
        else:
            progress_str = job.status().value
        progress_table.add_row(
            progress_str,
            (" " * indent) + job.job_name(),
            str(job.total_num_tasks_executing() + job.total_num_tasks_done()),
            str(job.total_num_tasks_executing() - job.total_num_tasks_waiting()),
            str(job.total_num_tasks_done()),
        )
        if indent < 3:
            for sub_job in job.sub_jobs():
                add_rows(sub_job, indent + 1)

    # Skip the main root node called "WRAPP"
    for j in progress_report.sub_jobs():
        add_rows(j)
    _live_display.update(progress_table, refresh=True)


def main():
    """
    Example main function that shows how to use the WRAPP commands in a larger application
    using the initialize() and shutdown() pattern
    """
    # First, initialize the wrapp API which will also initialize the omni.client library
    wrapp.api.initialize(throttling_params=ThrottlingParameters(file_transfer_jobs=1))
    try:
        # Setup authentication for subsequent calls
        auth = wrapp.AuthManager(interactive_fallback=False)
        auth.initialize()
        try:
            # Read credentials from environment variables, supplying test defaults
            auth_info = wrapp.AuthInfo(
                server=os.getenv("HOST_URL", "omniverse://localhost"),
                username=os.getenv("OMNI_USER", "omniverse"),
                password_or_token=os.getenv("OMNI_PASS", "omniverse"),
            )
            auth.add_auth(auth_info)
            # To make sure we're not using cached credentials, force a sign-out and reconnect to the server
            omni.client.sign_out(auth_info.server)
            omni.client.reconnect(auth_info.server)

            # Define common parameters for all subsequent commands
            command_params = CommandParameters(verbose=True)
            # Define test data location
            root_url = f"{auth_info.server}/Projects/wrapp_examples"

            async def async_main():
                scheduler_context = SchedulerContext(
                    SchedulingParameters(
                        progress_report_callback=print_progress,
                        progress_report=ProgressReportVerbosity.normal,
                    )
                )
                scheduler_context.start_scheduler()
                try:
                    tasks = []
                    tasks.append(
                        asyncio.create_task(
                            create_new_version(
                                root_url=root_url,
                                package_name="all_projects",
                                repository=f"{auth_info.server}",
                                scheduler=scheduler_context.sub_job("Creating sample project"),
                                context=command_params,
                            )
                        )
                    )
                    results = await asyncio.gather(*tasks)
                finally:
                    await scheduler_context.stop_scheduler()

            asyncio.run(async_main())
        finally:
            # Deregister the AuthManager
            auth.shutdown()
    except wrapp.FailedCommand as e:
        print(f"Could not create package, error message by wrapp is: '{e}'")
    finally:
        # Shutdown the WRAPP API
        wrapp.api.shutdown(*sys.exc_info())


if __name__ == "__main__":
    main()
The example function takes the wrapp.SchedulerContext and hands it to every wrapp API function it calls:
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
#
# NVIDIA CORPORATION and its licensors retain all intellectual property
# and proprietary rights in and to this software, related documentation
# and any modifications thereto. Any use, reproduction, disclosure or
# distribution of this software and related documentation without an express
# license agreement from NVIDIA CORPORATION is strictly prohibited.
from typing import (
    List,
    Tuple,
)
from urllib.parse import urlparse

import semver

import wrapp
from wrapp import (
    CommandParameters,
)


def calculate_next_version(versions: List[str], version_part: str) -> Tuple[str, str]:
    """
    Given a list of strings with version numbers and the version part to increment
    (major, minor, patch), returns a pair of the latest version and the next version
    """
    sem_versions = [semver.Version.parse(v) for v in versions]
    latest_version = sorted(sem_versions)[-1]
    return str(latest_version), str(latest_version.next_version(version_part))


async def create_new_version(
    root_url: str, package_name: str, repository: str, scheduler: wrapp.SchedulerContext, context: CommandParameters
) -> None:
    """
    Example function that creates a package in omniverse://localhost/.packages, calculating the
    next minor semver version from listing the existing packages in that repository. If no
    package is found, it starts with 1.0.0. This uses the semver package to parse version
    numbers and calculate the next minor version.

    :param root_url: URL of source package
    :param package_name: name to give to the package.
    :param repository: URL of the repository
    :param scheduler: pre-constructed SchedulerContext object determining the amount of jobs to run
    :param context: command context to use
    """
    # Calculate the next version number
    packages_in_repo = await wrapp.list_repo(repository, scheduler=scheduler)
    next_version = None
    for package, versions in packages_in_repo:
        parsed_url = urlparse(package)
        path_parts = parsed_url.path.split("/")
        if path_parts[-1] == package_name:
            if len(versions) > 0:
                latest_version, next_version = calculate_next_version(versions, "minor")
                print(f"Latest version of package {package_name} is {latest_version}, using {next_version}")
            break
    if next_version is None:
        print(f"No prior version found for {package_name}, starting at 1.0.0")
        next_version = "1.0.0"

    print(f"Creating package {package_name} version {next_version}")
    await wrapp.create(
        package_name, next_version, source=root_url, catalog=False, repo=repository, context=context, scheduler=scheduler
    )
    print("Done")