Python API#
WRAPP provides a Python API for programmatic access to all package management functionality. Use it to integrate WRAPP into your build pipelines, automation scripts, or custom tools.
The primary API functions are the command functions (create, install, mirror, etc.)
documented in the Commands Reference — each command page includes its
Python API reference.
This section covers the supporting infrastructure needed to use those commands: initialization, authentication, configuration options, and the data structures they accept and return.
API functions#
Setup and tear down#
Here is an example of a main function that sets everything up for running one or more wrapp commands
using the supplied class wrapp.ContextManager. That class takes full responsibility for setup and tear down
and calls initialize and shutdown appropriately:
# Copyright (c) 2024-2025, NVIDIA CORPORATION. All rights reserved.
#
# NVIDIA CORPORATION and its licensors retain all intellectual property
# and proprietary rights in and to this software, related documentation
# and any modifications thereto. Any use, reproduction, disclosure or
# distribution of this software and related documentation without an express
# license agreement from NVIDIA CORPORATION is strictly prohibited.
import asyncio
import os
import random
import sys

import wrapp


async def async_main():
    """
    Example main that shows how to use the WRAPP commands within a wrapp.ContextManager block
    """
    auth_info = wrapp.StorageAuthInfo.nucleus(
        server_url=os.getenv("HOST_URL", "omniverse://localhost"),
        username=os.getenv("OMNI_USER", "omniverse"),
        password_or_token=os.getenv("OMNI_PASS", "omniverse"),
    )
    random_version = str(random.randint(1, 1000))
    async with wrapp.ContextManager([auth_info]) as wrapp_context:
        await wrapp.create(
            "test_project",
            random_version,
            source=sys.argv[1],
            catalog=False,
            scheduler=wrapp_context,
        )
        print(f"Created test_project version {random_version} in {auth_info.server_url}")


asyncio.run(async_main())
You should either use the ContextManager class as above to manage the setup and tear down of wrapp, or call the initialize() and shutdown() functions manually. The latter is only required when embedding wrapp in a larger existing program. See the examples in the Initializing wrapp section for working example code.
- wrapp.initialize(
- debug_level: bool = False,
- throttling_params: ThrottlingParameters | None = None,
Initialize the wrapp API. It is safe to call this function multiple times.
- Parameters:
debug_level – Enable debug level logging
throttling_params – Optional scheduling parameters for throttling control. If not provided, uses default ThrottlingParameters().
Raises RuntimeError if the wrapp API is unable to initialize.
- wrapp.shutdown(
- exc_type=None,
- exc_val=None,
- exc_tb=None,
- shutdown_client_lib=True,
Shuts down the WRAPP API and should be invoked before process exit. Afterwards, no further WRAPP commands may be issued unless you call initialize() again and shutdown_client_lib was False. When shutdown_client_lib=True, the underlying client library is fully torn down – no subsequent WRAPP or omni.client calls (including a new initialize()) are possible in the same process.
The exception parameters can be passed on using shutdown(*sys.exc_info()) if the function is called within code handling exceptions.
Authentication and Credentials#
Authentication is controlled by the AuthManager class, which provides methods to register and clear the credentials available to subsequent API calls.
- class wrapp.AuthInfo(server: str, username: str, password_or_token: str)#
@deprecated Provided for backwards compatibility. Please use StorageAuthInfo instead.
Data object to store credentials for a given base URL. For example,
>>> AuthInfo(server='omniverse://localhost',username='omniverse',password_or_token='omniverse')
will generate credentials for a workstation Nucleus with the default password. To use a user-generated API token (e.g. for single sign-on), use the special username $omni-api-token
- class wrapp.StorageAuthInfo(
- server_url: str,
- auth_data: NucleusAuthInfo | Boto3AuthInfo | AzureBlobAuthInfo,
Class to store credentials for a given server URL.
- classmethod boto3(
- server_url: str,
- aws_access_key_id: str,
- aws_secret_access_key: str,
- aws_session_token: str | None = None,
Convenience function to set up StorageAuthInfo for accessing a S3 bucket directly using boto3.
- classmethod nucleus(
- server_url: str,
- username: str,
- password_or_token: str,
Convenience function to set up StorageAuthInfo for a Nucleus server.
- class wrapp.AuthManager(
- auth_infos: List[StorageAuthInfo] | None = None,
- interactive_fallback: bool = True,
Use this class to register credential information to be used by subsequent API calls. This class can be used to programmatically set credentials used by a storage provider.
Example code for setting up credentials for Nucleus and using them:
>>> def download_file_from_staging(url, api_token):
>>>     with AuthManager(
>>>         [StorageAuthInfo.nucleus(
>>>             server_url="omniverse://staging.nvidia.com",
>>>             username="$omni-api-token",
>>>             password_or_token=api_token,
>>>         )],
>>>         interactive_fallback=False,
>>>     ) as auth:
>>>         result, version, content = wrapp_read_file_content(url)
>>>         if result == Result.OK:
>>>             return content
>>>         else:
>>>             raise Exception(f"Download error: {result} for {url}")
Note the use of the special username to designate an API token created for that server.
- add_auth(
- auth_info: StorageAuthInfo,
Add one credential pair to be used for the given server. These are not persisted but only kept in RAM.
- Parameters:
auth_info –
new auth info to be stored
- clear()#
Clear all previously stored credential information. This does not log out of a server or disable the registered callback
- initialize() Self#
Initializes the AuthManager after creation and installs the authentication hook. Only after this call will the provided authentication info be used. Make sure to call shutdown() in a ‘finally’ block somewhere to uninstall the authentication hook again.
- Returns:
self
- shutdown(exc_type=None, exc_val=None, exc_tb=None)#
De-registers the authentication callback hook installed by a previous call to initialize().
If called when handling exceptions, the function can be called using
auth_manager.shutdown(*sys.exc_info()). Otherwise the exception parameters can be left to their default values.
If no AuthManager is used, the default client library behavior applies: it first checks for environment variables, then falls back to an interactive login flow compatible with the Nucleus server version it is trying to connect to. This may open a browser window, so do not rely on it in non-interactive shells.
Working with package files#
For more advanced use cases, wrapp allows you to read and write the content of the wrapp files themselves, exposed as the wrapp.PackageInfo class.
Any given wrapp file can be read using the function
package = await read_package_info(file_url)
and subsequently inspected. The file format is versioned JSON, and the best way to write a potentially modified package back to the storage is via
await write_package_info(destination_file_url, package)
The relevant functions with their full parameter specifications:
- async wrapp.read_package_info(
- package_url: str,
Given a source url of a wrapp file, load and parse the content returning a PackageInfo object.
- Parameters:
package_url – URL of the wrapp file
- Returns:
The PackageInfo retrieved and parsed. None if the package file does not exist.
On failure, raises FailedCommand or StorageOperationError exception.
- async wrapp.write_package_info(
- package_url: str,
- package: PackageInfo,
- context: CommandParameters | None = None,
Given a destination URL including the file name and a package info, write the given information in the current JSON format.
- Parameters:
package_url – Destination path for the wrapp file to be created, including the file name
package – PackageInfo object to be stored
context – optional CommandParameters() object to control the storage operation
- Returns:
The filename written to
On failure, this will raise a StorageOperationError.
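The read-modify-write cycle above can be wrapped in a small helper. The following is a sketch only: set_repository is a name invented for this example, and the two wrapp functions are passed in as parameters so the snippet stays self-contained and testable without the wrapp package.

```python
import asyncio


async def set_repository(read_package_info, write_package_info, package_url, new_repository):
    """Load a wrapp file, update its repository field, and write it back.

    read_package_info / write_package_info are expected to behave like
    wrapp.read_package_info and wrapp.write_package_info as documented above.
    """
    package = await read_package_info(package_url)
    if package is None:
        # read_package_info returns None when the wrapp file does not exist
        raise FileNotFoundError(f"No wrapp file at {package_url}")
    package.repository = new_repository
    # Returns the filename written to; raises StorageOperationError on failure
    return await write_package_info(package_url, package)
```

In real code you would simply pass wrapp.read_package_info and wrapp.write_package_info for the first two parameters.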
Data Objects#
These simple data objects are used by the API to carry more complex data. We use pydantic to make it easier to write code with structs in Python.
- class wrapp.Catalog(*args: Any, **kwargs: Any)#
The Catalog contains a bill of materials (BOM) of files at a specific version, identified by their path relative to the recorded root URL.
- items: list[CatalogItem]#
Content of catalog
- root: str#
Root URL of catalog operation, all relative_path entries of the Item resolve to this
- size_of_catalog() int#
Return the sum of bytes in all items of this catalog.
- size_of_catalog_deduped() int#
Return the sum of bytes in all items of this catalog, counting each distinct hash only once.
- storage_map: StorageMap | None = None#
Currently only used in TAR files to map the relative_path to the file with the content for deduplication
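The relationship between size_of_catalog() and size_of_catalog_deduped() can be illustrated with a standalone sketch that mirrors the documented semantics on plain (hash, size) tuples. This reimplements the idea for illustration only; use the Catalog methods in real code.

```python
def total_size(items):
    """Sum of sizes over all items, analogous to Catalog.size_of_catalog()."""
    return sum(size for _hash, size in items)


def deduped_size(items):
    """Sum of sizes counting each distinct hash only once,
    analogous to Catalog.size_of_catalog_deduped()."""
    seen = {}
    for item_hash, size in items:
        seen.setdefault(item_hash, size)
    return sum(seen.values())


# Two items share the same content hash, so they are deduplicated
items = [("abc", 100), ("abc", 100), ("def", 50)]
print(total_size(items))    # 250
print(deduped_size(items))  # 150
```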
- class wrapp.CatalogItem(*args: Any, **kwargs: Any)#
A single item in a catalog represents a file; folders are not part of the catalog.
- hash: str | None = None#
Content-based hash. For comparison, be aware that multiple hash algorithms are in use.
- relative_path: str#
PRIMARY KEY in catalog, the location of the item relative to the root of the catalog
- size: int | None = None#
size of the file
- source_path: str#
An absolute URL pointing to the content of the item at the time of catalog creation
- class wrapp.PackageInfo(*args: Any, **kwargs: Any)#
The PackageInfo class encapsulates all information about a wrapp package. These are stored in .wrapp files.
To load and save wrapp files, use the load() and save() member functions of the PackageInfo class.
- dependencies: list[Dependency] | None = None#
Optional list of dependency entries
- name: str = ''#
Name of the package
- repository: str | None = None#
Optional repository of last create or install operation.
- source: str | None = None#
Optional URL of source of last create operation
- version: str = ''#
Version referenced
- class wrapp.Dependency(*args: Any, **kwargs: Any)#
A Dependency records the relative installation path of a subpackage, its name and version and the repository from which it was installed.
- package: str#
Name of the referenced package
- relative_destination: str#
Relative installation path
- repository: str#
The repository URL from which the package was installed
- version: str#
Version of the referenced package
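A dependency list like this can be rendered for display with a few lines of duck-typed code. The helper below (describe_dependencies, an invented name, not part of the wrapp API) relies only on the fields documented above, so a plain namespace object works as well as a real PackageInfo:

```python
def describe_dependencies(package):
    """Render each dependency of a PackageInfo-like object as one line.

    Expects the documented fields: a dependencies list (or None) whose entries
    have package, version, repository, and relative_destination attributes.
    """
    return [
        f"{dep.package}@{dep.version} from {dep.repository} -> {dep.relative_destination}"
        for dep in (package.dependencies or [])
    ]
```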
- class wrapp.DependencySet(*args: Any, **kwargs: Any)#
A DependencySet represents a set of Dependencies, e.g. to create a metapackage easily.
- class wrapp.LsResultItem(*args: Any, **kwargs: Any)#
The LsResultItem represents a single directory entry returned by the ls command; it can be a file or a folder.
- hash: str | None = None#
A content-based hash. For comparison, be aware there are multiple algorithms around.
- item_type: LsResultItemType#
Designating the item as a file or folder
- relative_path: str#
The primary key in the catalog, the location of the item relative to the root of the catalog
- size: int | None = None#
The size of the item. None if the item is a folder.
- source_path: str#
An absolute URL pointing to the content of the item at the time of catalog creation
- class wrapp.LsResultItemType(
- value,
- names=None,
- *,
- module=None,
- qualname=None,
- type=None,
- start=1,
- boundary=None,
Designates the type of an item produced by a listing operation
- class wrapp.datastructures.catalog.StorageMap(*args: Any, **kwargs: Any)#
- storage_map: Dict[str, str]#
Map relative_path to a path containing the content of that item. Used for deduplication
- class wrapp.datastructures.catalog.Tag(*args: Any, **kwargs: Any)#
Represents a tag as defined by the Nucleus Tagging Service
- class wrapp.InstalledPackage(*args: Any, **kwargs: Any)#
Represents a package that has potentially been installed in a storage location; it might also be just a wrapp file found in a folder tree underneath a top-level wrapp file.
This class stores metadata about an installed package including its name, version, location, and repository information. It inherits from Pydantic’s BaseModel for data validation and serialization capabilities.
- name#
The package name.
- Type:
str
- version#
The package version installed.
- Type:
str
- wrapp_file_url#
The absolute URL of the wrapp file in the installed location.
- Type:
str
- install_location#
The path of the package relative to the top level package.
- Type:
str
- repository#
The URL of the repository this package was installed from. None indicates this is a fresh package that has not been created in a repo yet.
- Type:
Optional[str]
- package#
The detailed package information including the catalog, if already loaded
- Type:
Optional[PackageInfo]
- property num_items#
For backward compatibility with 2.0.0-a9 and 2.0.0-a10, which had the num_items field but not the package and catalog fields.
- class wrapp.datastructures.workspace_analysis.PackageStatusItemType(
- value,
- names=None,
- *,
- module=None,
- qualname=None,
- type=None,
- start=1,
- boundary=None,
Enum for the different package error types identified by the wrapp status operations. Each key has its name as a string value.
- EXTRA_ITEM = 'extra_item'#
There is an extra item which was not produced by the install operation
- EXTRA_PACKAGE = 'extra_package'#
There is an extra package which is not listed as a dependency
- ITEM_DIFFERS = 'item_differs'#
An item differs in size or content
- ITEM_MISSING = 'item_missing'#
An item is missing which is listed in the package’s catalog
- PACKAGE_MISSING = 'package_missing'#
A package file is missing
- PACKAGE_MODIFIED = 'package_modified'#
This package was modified; a top-level error that will be supplemented by more specific errors
- PACKAGE_NO_CATALOG = 'package_no_catalog'#
A package has no catalog, indicating it has never been installed to the location
- PACKAGE_VERSION_MISMATCH = 'package_version_mismatch'#
There is a version mismatch between the package in the dependency and the wrapp file of the subpackage
- SUBPACKAGE_MODIFIED = 'subpackage_modified'#
At least one subpackage is modified
- class wrapp.datastructures.workspace_analysis.PackageStatusItem(*args: Any, **kwargs: Any)#
Provides details about a package error found during a wrapp status call.
- package_name#
The name of the package
- Type:
str
- absolute_wrapp_file_path#
Location of the package file
- Type:
str
- status_message#
A detailed human-readable description of the package error
- Type:
str
- status_item_type#
An enum indicating the class of package error
- Type:
PackageStatusItemType
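For reporting, status items can be grouped per package with a small duck-typed helper. This is an illustrative sketch (group_status_by_package is an invented name, not part of the wrapp API) that relies only on the attributes documented above:

```python
from collections import defaultdict


def group_status_by_package(status_items):
    """Group PackageStatusItem-like objects by package_name.

    Returns a dict mapping each package name to a list of
    (status_item_type, status_message) tuples, preserving input order.
    """
    grouped = defaultdict(list)
    for item in status_items:
        grouped[item.package_name].append((item.status_item_type, item.status_message))
    return dict(grouped)
```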
Patch Operations#
- class wrapp.Patch#
Class to execute a set of patch operations returned by “install” efficiently and in the correct order. To execute all patch operations in a list, create an object of this class, use Patch.add() to add all operations, and run Patch.execute(). The correct order of operations is determined by the ‘<’ operator for PatchOpBase.
- add(operation: PatchOpBase) None#
Add a patch operation to this patch. All operations are executed in the order they are added to the Patch object, with the exception of adding tags and deleting files: all other operations are executed first, followed by all operations adding tags, and then all operations deleting files.
- async execute(
- context: CommandParameters,
- scheduler: SchedulerContext,
Execute all operations in the patch. Returns True if all operations succeed or if the Patch is empty.
Errors from all patch operations are gathered. If any error occurred, False is returned and the errors can be retrieved through Patch.get_errors().
- get_error_msg(
- operation: PatchOpBase,
Check if the given operation has failed and return the error message for the operation if it failed, or None if it succeeded.
- get_errors() list[tuple[PatchOpBase, str]]#
Return all errors encountered during Patch.execute() as a list of tuples: first the failed patch operation as PatchOpBase, second the error message.
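Putting these pieces together, a caller might drive a Patch as in the sketch below. run_patch is an invented helper name; the patch, context, and scheduler objects are assumed to follow the interface documented above, so duck-typed stand-ins work for testing.

```python
import asyncio


async def run_patch(patch, context, scheduler):
    """Execute all operations in a Patch-like object, raising with details on failure.

    Relies on the documented interface: execute() returns True on full success,
    and get_errors() yields (operation, message) tuples after a failure.
    """
    if await patch.execute(context, scheduler):
        return
    lines = [f"{op.explanation_txt()}: {message}" for op, message in patch.get_errors()]
    raise RuntimeError("Patch failed:\n" + "\n".join(lines))
```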
- class wrapp.datastructures.patch.PatchOperationType(
- value,
- names=None,
- *,
- module=None,
- qualname=None,
- type=None,
- start=1,
- boundary=None,
Enum for the different patch operations. Each key has its name as a string value.
- add_tag = 'add_tag'#
Enum value indicating
PatchOpAddTag
- copy = 'copy'#
Enum value indicating
PatchOpCopy
- create_folder = 'create_folder'#
Enum value indicating
PatchOpCreateFolder
- delete = 'delete'#
Enum value indicating
PatchOpDelete
- delete_tag = 'delete_tag'#
Enum value indicating
PatchOpDeleteTag
- class wrapp.datastructures.patch.PatchOpBase(*args: Any, **kwargs: Any)#
Abstract base class for patch operations. Each concrete class has a field “operation_type” of type PatchOperationType.
- explanation_txt() str#
Description of the patch operation.
- class wrapp.datastructures.patch.PatchOpCopy(*args: Any, **kwargs: Any)#
Patch operation copying a file.
- dst: str#
The target file path.
- explanation_txt() str#
Description of the patch operation.
- hash: str | None = None#
If available, the hash of the source file for optimization
- item_type: ItemType#
Deprecated - WRAPP only copies files which implicitly creates folders.
- src: str#
The source file.
- class wrapp.datastructures.patch.PatchOpDelete(*args: Any, **kwargs: Any)#
Patch operation deleting a file.
- explanation_txt() str#
Description of the patch operation.
- url: str#
The URL of the deleted file.
- class wrapp.datastructures.patch.PatchOpCreateFolder(*args: Any, **kwargs: Any)#
Patch operation to create a folder. Only for backwards compatibility.
Deprecated since version 1.2.0.
- explanation_txt() str#
Description of the patch operation.
Common Configuration#
All Python API functions can be called without explicitly specifying the context parameter. However, if any of the standard command-line configuration options need to be specified, it is possible to provide them with a CommandParameters object.
- class wrapp.CommandParameters(
- debug: bool = False,
- verbose: bool = False,
- dry_run: bool = False,
- log_file: str | None = None,
- hash_cache_file: str | None = None,
Class storing all common parameters for wrapp commands.
Construct this class with CommandParameters(<keyword params>). For a more detailed explanation for the individual parameters, please refer to the section Generic Parameters.
- debug: bool = False#
Debug mode.
- dry_run: bool = False#
Dry run.
- hash_cache_file: str | None = None#
The path to the hash cache.
- log_file: str | None = None#
The path to the log file.
- verbose: bool = False#
Verbose.
Progress Reporting#
When initializing the SchedulingParameters object, it is possible to provide a progress report callback, and to configure how frequently that callback is called. The callback receives a hierarchy of ProgressReport objects reflecting the state of the operations WRAPP is currently performing.
- class wrapp.ProgressReport(
- job_name: str,
- num_tasks_executing: int,
- num_tasks_done: int,
- expected_num_sub_jobs: int | None,
Progress report for WRAPP operations (or “jobs”) running in the background.
To get a progress report for a specific WRAPP operation, make sure to start the operation with a recognizable job name, e.g. wrapp.list_repo(url, scheduler=scheduler_context.with_job_name(f"list repo for url '{url}'"))
Also specify a progress report callback when the SchedulerContext is set up. If you are using the ContextManager, you can specify the callback in its constructor.
Once this is done and the progress report verbosity in SchedulingParameters is not “off”, the progress report callback will be called regularly with the progress report for the root job containing all other jobs, allowing the application to update any progress indicators.
Each progress report represents the state of a job, including its sub-jobs. A single root job report with the job_id “WRAPP” serves as the top-level container for all other job reports.
Jobs which are siblings in the hierarchy are executed concurrently. A job is completed when all its directly spawned tasks are completed, and all its children jobs are completed. On the lowest level a job is an asyncio.task.
For a simple example of how to transform the progress report into user-readable output, please check example_complex.py.
- job_name() str#
Name of the job, used to identify jobs. The name of the main job can be set with SchedulerContext.with_job_name(…). The root job is always named “WRAPP”.
Job names might not be unique.
- num_tasks_done() int#
Number of tasks spawned by this job which are completed (this includes failed tasks).
- num_tasks_executing() int#
Number of tasks spawned by this job directly which are currently executing.
- num_tasks_processing(resource_name: str) int#
The number of tasks spawned directly by this job being executed and making use of the limited resource with the given name. Raises KeyError if the resource name is not part of the list returned by throttled_resource_names.
- num_tasks_waiting(resource_name: str) int#
The number of tasks spawned directly by this job waiting for the limited resources with the given name to become available. Raises KeyError if the resource name is not part of the list returned by throttled_resource_names.
- progress() float#
A rough measure of progress for this job between 0 and 1 if the job status is “Running”, otherwise return 1. Note that the number might go up as well as down if additional tasks are created during the execution of the jobs.
- status() ProgressReportStatus#
The status of the job. A job can be running without having any tasks or sub jobs spawned and running - then it is just the job being currently executed, blocking its parent job (unless it has been spawned within a task).
- sub_jobs() list[ProgressReport]#
All sub jobs running as part of this job.
- throttled_resource_names() Iterable[str]#
The names of all resources limiting concurrent execution of tasks spawned directly by this job. The name of a resource is given by WRAPP and user readable, examples are “nucleus_server.nvidia.com (max requests)”, “S3:my_bucket (max data transfers)”, “CPU threads”.
- total_num_tasks_done() int#
Number of tasks spawned by this job or its sub jobs which are completed (this includes failed tasks).
- total_num_tasks_executing() int#
Number of tasks spawned by this job or its sub jobs which are currently executing.
- total_num_tasks_processing() int#
The number of tasks spawned by this job or its sub jobs being executed and making use of one of the limited resources.
- total_num_tasks_processing_for_resource(
- resource_name: str,
The number of tasks spawned by this job or its sub jobs being executed and making use of the limited resource with the given name. Raises KeyError if the resource name is not part of the list returned by throttled_resource_names.
- total_num_tasks_waiting() int#
The number of tasks spawned by this job or its sub jobs waiting for one of the limited resources to become available.
- total_num_tasks_waiting_for_resource(
- resource_name: str,
The number of tasks spawned by this job or its sub jobs waiting for the limited resources with the given name to become available. Raises KeyError if the resource name is not part of the list returned by throttled_resource_names.
- total_throttled_resource_names() Iterable[str]#
The names of all resources limiting concurrent execution of tasks spawned by this job or its sub jobs
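As an illustration of walking this hierarchy, the sketch below renders a report tree as indented text lines. It is not part of the wrapp API (render_progress is an invented name) and relies only on the accessors documented above, so any object with the same duck-typed interface works.

```python
def render_progress(report, indent=0):
    """Return indented text lines for a ProgressReport-like tree.

    Relies only on the documented accessors: job_name(), progress(),
    total_num_tasks_done(), and sub_jobs().
    """
    lines = [
        f"{'  ' * indent}{report.job_name()}: "
        f"{report.progress() * 100:.0f}% ({report.total_num_tasks_done()} tasks done)"
    ]
    # Sibling jobs run concurrently; render each sub tree one level deeper
    for sub in report.sub_jobs():
        lines.extend(render_progress(sub, indent + 1))
    return lines
```

A real application would call this from the progress_report_callback and print or display the resulting lines.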
Async operations#
To avoid server overload, it is strongly recommended to use the WrappScheduler context manager, which creates a block in which API rate throttling and parallel execution are performed as specified by a SchedulingParameters instance.
- class wrapp.ProgressReportVerbosity(
- value,
- names=None,
- *,
- module=None,
- qualname=None,
- type=None,
- start=1,
- boundary=None,
The amount of detail wrapp’s progress report prints out.
- off#
no progress is printed.
- normal#
an overview of the progress of the top-level tasks is given.
- verbose#
all scheduled jobs are listed.
- class wrapp.SchedulingParameters(
- jobs: int = 200,
- tagging_jobs: int = 50,
- file_transfer_jobs: int = 10,
- progress_report: ProgressReportVerbosity = ProgressReportVerbosity.off,
- job_name: str | None = None,
- progress_report_callback: Callable[[ProgressReport], None] | None = None,
- progress_report_interval: float | None = None,
Class storing all parameters for constructing schedulers and rate limiting for wrapp commands.
Construct this class with SchedulingParameters(<keyword params>). Hand this into the run_scheduler() method of the SchedulerContext class to construct an appropriate scheduling setup. For a more detailed explanation of the parameters related to the job numbers, please refer to the section Generic Parameters.
- file_transfer_jobs: int = 10#
The number of concurrent file transfer jobs. Deprecated in favour of ThrottlingParameters.
- job_name: str | None = None#
The job name under which all tasks for the executed operation appear in the ProgressReport. Default is “WRAPP”.
- jobs: int = 200#
The number of concurrent roundtrip jobs. Deprecated in favour of ThrottlingParameters.
- progress_report_callback: Callable[[ProgressReport], None] | None = None#
Callback which can be used to monitor the progress of the operation. See ProgressReport for more details.
- progress_report_interval: float | None = None#
Interval in seconds between calls to the progress_report_callback. If 0, the callback is called as frequently as possible. Default 0.2.
- class wrapp.SchedulerContext(
- params: SchedulingParameters | None = None,
All WRAPP operations executed using the same SchedulerContext will be reported together by the progress reporting. Use this class directly if it is not convenient to use the WrappScheduler context manager class. Make sure to call start_scheduler() and stop_scheduler() accordingly:
>>> import wrapp
>>> async def example(root_dir: str) -> wrapp.Catalog:
>>>     scheduler_context = wrapp.SchedulerContext(wrapp.SchedulingParameters())
>>>     scheduler_context.start_scheduler()
>>>     try:
>>>         result = await wrapp.catalog(root_dir, scheduler=scheduler_context)
>>>     finally:
>>>         await scheduler_context.stop_scheduler()
>>>     return result
- get_parameters() SchedulingParameters#
- Returns:
Retrieve the SchedulingParameters which were used to construct this SchedulerContext
- start_scheduler() None#
Initialize the progress reporting. Use this in standalone usage, and make sure to call stop_scheduler in a ‘finally’ block.
- async stop_scheduler(
- *,
- exc_type=None,
- exc_val=None,
- exc_tb=None,
Stops the progress reporting. Call this only after start_scheduler() has been called.
- class wrapp.ThrottlingParameters(
- jobs: int = 200,
- tagging_jobs: int = 50,
- file_transfer_jobs: int = 10,
Class storing all common parameters for rate limiting for wrapp commands.
Construct this class with ThrottlingParameters(<keyword params>). For a more detailed explanation for the parameters related to the job numbers, please refer to the section Generic Parameters.
- class wrapp.WrappScheduler(
- *,
- context: SchedulerContext | None = None,
- params: SchedulingParameters | None = None,
ContextManager for wrapp to schedule its tasks. This helps throttle the number of tasks sent simultaneously to the server, avoiding server overload and back-off responses.
Example
>>> import asyncio, wrapp
>>> async def example(test_folder_1: str, test_folder_2: str):
>>>     tasks = []
>>>     async with WrappScheduler(params=SchedulingParameters(jobs=10)) as scheduler_context:
>>>         tasks.append(asyncio.create_task(wrapp.catalog(test_folder_1, scheduler=scheduler_context)))
>>>         tasks.append(asyncio.create_task(wrapp.catalog(test_folder_2, scheduler=scheduler_context)))
>>>         results = await asyncio.gather(*tasks)
Initializing wrapp#
Simple: ContextManager#
The easiest way to initialize WRAPP in a stand-alone Python script is the ContextManager.
- class wrapp.ContextManager(
- auth_infos: list[StorageAuthInfo] | None = None,
- interactive_login_allowed=False,
- standalone_mode=True,
- scheduler_context: SchedulerContext | SchedulingParameters | None = None,
- throttling_params: ThrottlingParameters | None = None,
Helper class to make it easy to initialize WRAPP. If you don’t know what to do to initialize WRAPP, use this class to get started.
See Setup and tear down for example code.
Initialize the context manager setting up the desired mode of operation
- Parameters:
auth_infos – Optional list of AuthInfo instances giving credentials for the servers to be involved
interactive_login_allowed – If False [default], no interactive browser window will pop up, and all authentication information needs to be supplied via the AuthInfo data
standalone_mode – If True [default], call wrapp.shutdown() on exit. No further wrapp calls are possible beyond this
scheduler_context – Assigned to the command_context field, which can be used in calls to the API for command throttling. Alternatively, a SchedulingParameters object can be passed in this parameter, which will be used to construct a SchedulerContext object.
throttling_params – The maximum number of concurrent tasks for several categories.
- add_auth(
- auth_info: StorageAuthInfo,
Add another StorageAuthInfo credential to the AuthManager used
- Parameters:
auth_info
Complex: individual calls#
If you need to embed wrapp in a larger program, using the wrapp.ContextManager might be difficult due to your program structure. In those cases it is possible to call the individual initialize and shutdown methods separately:
# Copyright (c) 2024-2025, NVIDIA CORPORATION. All rights reserved.
#
# NVIDIA CORPORATION and its licensors retain all intellectual property
# and proprietary rights in and to this software, related documentation
# and any modifications thereto. Any use, reproduction, disclosure or
# distribution of this software and related documentation without an express
# license agreement from NVIDIA CORPORATION is strictly prohibited.
import asyncio
import logging
import os
import sys

import omni.client
from rich.live import Live
from rich.logging import RichHandler
from rich.table import (
    Column,
    Table,
)

import wrapp.api
from wrapp import (
    CommandParameters,
    ProgressReport,
    ProgressReportVerbosity,
    SchedulerContext,
    SchedulingParameters,
    ThrottlingParameters,
)
from wrapp.utils.progress_report import ProgressReportStatus

from .create_new_version import create_new_version


# Setup rich live display
def setup_live_display() -> Live:
    # Lazy creation to make the redirect of logging work
    _live_display = Live()
    rich_handler = RichHandler()
    rich_handler.setLevel(logging.getLogger().level)
    formatter = logging.Formatter("%(message)s")
    rich_handler.setFormatter(formatter)
    logging.getLogger().handlers = [
        rich_handler,
        *filter(lambda x: isinstance(x, logging.FileHandler), logging.getLogger().handlers),
    ]
    _live_display.start()
    return _live_display


_live_display = setup_live_display()


def print_progress(progress_report: ProgressReport):
    """
    Example function handling the progress report output.
    """
    progress_table = Table(
        "Progress ",
        Column("Name", overflow="fold", ratio=1),
        "Discovered",
        "Running ",
        "Done ",
        expand=True,
    )

    def add_rows(job: ProgressReport, indent: int = 0):
        if job.status() == ProgressReportStatus.Running:
            progress_str = f"{(job.progress()) * 100:.0f}%"
        else:
            progress_str = job.status().value
        progress_table.add_row(
            progress_str,
            (" " * indent) + job.job_name(),
            str(job.total_num_tasks_executing() + job.total_num_tasks_done()),
            str(job.total_num_tasks_executing() - job.total_num_tasks_waiting()),
            str(job.total_num_tasks_done()),
        )
        if indent < 3:
            for sub_job in job.sub_jobs():
                add_rows(sub_job, indent + 1)

    # Skip the main root node called "WRAPP"
    for j in progress_report.sub_jobs():
        add_rows(j)
    _live_display.update(progress_table, refresh=True)


def main():
    """
    Example main function that shows how to use the WRAPP commands in a larger application
    using the initialize() and shutdown() pattern
    """
    # First, initialize the wrapp API which will also initialize the omni.client library
    wrapp.api.initialize(throttling_params=ThrottlingParameters(file_transfer_jobs=1))
    try:
        # Setup authentication for subsequent calls
        auth = wrapp.AuthManager(interactive_fallback=False)
        auth.initialize()
        try:
            # Read credentials from environment variables, supplying test defaults
            auth_info = wrapp.AuthInfo(
                server=os.getenv("HOST_URL", "omniverse://localhost"),
                username=os.getenv("OMNI_USER", "omniverse"),
                password_or_token=os.getenv("OMNI_PASS", "omniverse"),
            )
            auth.add_auth(auth_info)
            # To make sure we're not using cached credentials, force a sign-out and reconnect to the server
            omni.client.sign_out(auth_info.server)
            omni.client.reconnect(auth_info.server)
            # Define common parameters for all subsequent commands
            command_params = CommandParameters(verbose=True)

            async def async_main():
                scheduler_context = SchedulerContext(
                    SchedulingParameters(
                        progress_report_callback=print_progress,
                        progress_report=ProgressReportVerbosity.normal,
                    )
                )
                scheduler_context.start_scheduler()
                try:
                    tasks = []
                    tasks.append(
                        asyncio.create_task(
                            create_new_version(
                                root_url=f"{auth_info.server}/Projects",
                                package_name="all_projects",
                                repository=f"{auth_info.server}",
                                scheduler=scheduler_context.create_sub_context_and_node("Creating sample project"),
                                context=command_params,
                            )
                        )
                    )
                    results = await asyncio.gather(*tasks)
                finally:
                    await scheduler_context.stop_scheduler()

            asyncio.run(async_main())
        finally:
            # Deregister the AuthManager
            auth.shutdown()
    except wrapp.FailedCommand as e:
        print(f"Could not create package, error message by wrapp is: '{e}'")
    finally:
        # Shutdown the WRAPP API
        wrapp.api.shutdown(*sys.exc_info())


if __name__ == "__main__":
    main()
The example function invoked above, create_new_version, takes the wrapp.SchedulerContext and passes it on to every wrapp API function it calls:
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
#
# NVIDIA CORPORATION and its licensors retain all intellectual property
# and proprietary rights in and to this software, related documentation
# and any modifications thereto. Any use, reproduction, disclosure or
# distribution of this software and related documentation without an express
# license agreement from NVIDIA CORPORATION is strictly prohibited.
from typing import (
List,
Tuple,
)
from urllib.parse import urlparse
import semver
import wrapp
from wrapp import (
CommandParameters,
)
def calculate_next_version(versions: List[str], version_part: str) -> Tuple[str, str]:
"""
Given a list of version-number strings and the version part to increment (major, minor, patch), returns a pair of (latest version, next version)
"""
sem_versions = [semver.Version.parse(v) for v in versions]
latest_version = sorted(sem_versions)[-1]
return str(latest_version), str(latest_version.next_version(version_part))
async def create_new_version(
    root_url: str, package_name: str, repository: str, scheduler: wrapp.SchedulerContext, context: CommandParameters
) -> None:
"""
Example function that creates a package in omniverse://localhost/.packages, calculating the next minor semver version from listing the existing packages
in that repository. If no package is found, it starts with 1.0.0. This uses the semver package to parse version numbers and calculate the next minor version.
:param root_url: URL of source package
:param package_name: name to give to the package.
:param repository: URL of the repository
:param scheduler: pre-constructed SchedulerContext object determining the amount of jobs to run
:param context: command context to use
"""
# Calculate the next version number
packages_in_repo = await wrapp.list_repo(repository, scheduler=scheduler)
next_version = None
for package, versions in packages_in_repo:
parsed_url = urlparse(package)
path_parts = parsed_url.path.split("/")
if path_parts[-1] == package_name:
if len(versions) > 0:
latest_version, next_version = calculate_next_version(versions, "minor")
print(f"Latest version of package {package_name} is {latest_version}, using {next_version}")
break
if next_version is None:
print(f"No prior version found for {package_name}, starting at 1.0.0")
next_version = "1.0.0"
print(f"Creating package {package_name} version {next_version}")
await wrapp.create(package_name, next_version, source=root_url, catalog=False, repo=repository, context=context, scheduler=scheduler)
print("Done")