WRAPP API Reference
API functions
Setup and tear down
Here is an example of a main function that sets everything up for running one or more wrapp commands using the supplied class wrapp.api.ContextManager. That class takes full responsibility for setup and tear down and calls initialize and shutdown appropriately:
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
#
# NVIDIA CORPORATION and its licensors retain all intellectual property
# and proprietary rights in and to this software, related documentation
# and any modifications thereto. Any use, reproduction, disclosure or
# distribution of this software and related documentation without an express
# license agreement from NVIDIA CORPORATION is strictly prohibited.

import asyncio
import os
import random

import wrapp


async def async_main():
    """
    Example main that shows how to use the WRAPP commands within a wrapp.ContextManager block
    """
    auth_info = wrapp.AuthInfo(
        server=os.getenv("HOST_URL", "omniverse://localhost"),
        username=os.getenv("OMNI_USER", "omniverse"),
        password_or_token=os.getenv("OMNI_PASS", "omniverse"),
    )
    random_version = str(random.randint(1, 1000))
    async with wrapp.ContextManager([auth_info]) as wrapp_context:
        await wrapp.create(
            "test_project",
            random_version,
            source=f"{auth_info.server}/Projects",
            catalog=False,
            scheduler=wrapp_context,
        )
        print(f"Created test_project version {random_version} in {auth_info.server}")


asyncio.run(async_main())
You should either use the ContextManager class as above to manage the setup and tear down of wrapp, or call the initialize and shutdown functions manually. The latter is only required when embedding wrapp in a larger existing program. See the Initializing wrapp in a larger program section for working example code.
- wrapp.api.initialize(debug_level: bool = False) None
Initialize the wrapp API. This is safe to be called multiple times.
Raises RuntimeError if the wrapp API is unable to initialize.
- wrapp.api.shutdown() None
Shuts down the wrapp API. This should be called before exiting the program. No further wrapp commands should be issued after calling this function. As a side effect, this also calls omni.client.shutdown(), so no further omni.client calls can be issued by the process either.
Main functions
- async wrapp.commands.create(package_name: str, version: str, source: str, catalog: bool, tags: bool = False, ignore_file: Optional[str] = None, repo: Optional[str] = None, *, context: CommandParameters = CommandParameters(debug=False, verbose=False, dry_run=False, log_file=None, hash_cache_file=None), scheduler: Optional[SchedulerContext] = None) None
Takes a package name and version plus either a catalog or a root directory, and creates a new package in the repository.
The create command catalogs a subtree and creates a frozen version. The destination directory for the freeze operation is always a package cache directory, which by default is /.packages on the same network location as the source data. Change this location by specifying the repo option.
- Parameters
package_name – Name of the package to create
version – Version of the package to create, can be any string identifier
source – Root directory of source that will be copied
catalog – Flag to indicate source points to a catalog file instead of a root directory
tags – Set this to additionally query the tagging service and make tags part of the catalog
ignore_file – Specify the name of an ignore file containing rules for ignoring items
repo – Specify the URL of the repository where to create the package. If not specified, will use the root of the server of the source directory
context – Global configuration parameters
scheduler – Optionally pre-constructed SchedulerContext. When calling many functions in a row make sure to pre-construct the scheduler.
- Raises
FailedCommand – When prerequisites not matched
NucleusOperationError – Raised when network or file operations fail
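The default repository location described above can be pictured as the server root of the source URL plus /.packages. A minimal sketch of that derivation (the helper name is illustrative and not part of the wrapp API; the real logic may handle more URL schemes and edge cases):

```python
from urllib.parse import urlparse


def default_repo_for(source_url: str) -> str:
    """Illustrative: derive the default package cache location (/.packages)
    from the server root of a source URL, as described for the create command."""
    parts = urlparse(source_url)
    return f"{parts.scheme}://{parts.netloc}/.packages"


print(default_repo_for("omniverse://localhost/Projects/MyProject"))
# omniverse://localhost/.packages
```

Passing an explicit repo parameter bypasses this default entirely.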
- async wrapp.commands.install(package_name: str, version: str, destination: str, tags: bool, package: Optional[str] = None, patch: bool = False, conflicts: Optional[List[Tuple[str, PatchOpBase]]] = None, repo: Optional[str] = None, ignore_version_conflicts: bool = False, ignore_file: Optional[str] = None, *, context: CommandParameters = CommandParameters(debug=False, verbose=False, dry_run=False, log_file=None, hash_cache_file=None), scheduler: Optional[SchedulerContext] = None) None
Given a package name and version, installs or updates the package’s file set at the given destination.
Installs or updates a package described by package name and version string from the package cache. This looks in the package cache /.packages on the same server as the destination, and copies the package version found into the destination directory. If a package file is specified, the new dependency is recorded there. If repo is specified, the operation looks in that package cache instead of the default cache.
- Parameters
package_name – Name of the package to install
version – Version to install
destination – URL to install the package to. Can be an omniverse://, a URL supported by S3 or a file: URL.
tags – Whether to also create the tags in the target. Only available when destination is a Nucleus server with Tagging Service
package – Optionally specify the URL of a package file to record the dependency created
patch – Specify False to fail installation on conflict, or True to record conflicts in the conflicts parameter and keep going
conflicts – Supply list that gets extended by patch operations when patch is set and install conflicts are detected
repo – Optionally specify a repository URL. If not given, assume it is the root of the destination server (same server install).
ignore_version_conflicts – Set this to ignore version inconsistencies should versions create a diamond pattern conflict of nested packages
ignore_file – Specify the name of a file with ignore rules used when cataloging the existing destination.
context – Global configuration parameters
scheduler – Optionally pre-constructed SchedulerContext. When calling many functions in a row make sure to pre-construct the scheduler.
- Raises
FailedCommand – When prerequisites not matched
NucleusOperationError – Raised when network or file operations fail
- async wrapp.commands.catalog(root_dir: Optional[str] = None, file_list: Optional[Union[str, List[Tuple[str, str]]]] = None, tags: bool = False, ignores: Optional[Union[str, IgnoreEvaluator]] = None, local_hash: bool = False, checkpoints: bool = True, *, context: CommandParameters = CommandParameters(debug=False, verbose=False, dry_run=False, log_file=None, hash_cache_file=None), scheduler: Optional[SchedulerContext] = None) Catalog
Takes a root URL or a file list and builds a catalog, either via a file tree walk or from a tab-separated file list, returning a Catalog
- Parameters
root_dir – URL of the root of the files to be cataloged
file_list – Alternative to root_dir: specify the name of a tab-separated file containing a list of files to include, or a list explicitly listing all files as (base path, relative path) tuples.
tags – Set this to additionally query the tagging service and make tags part of the catalog
ignores – Specify the name of an ignore file containing rules for ignoring items
local_hash – Flag to allow the catalog operation to calculate the hash locally, potentially after a download of the data
checkpoints – Flag, default on, to list checkpoints of local items and add the latest checkpoint into the source URL if the source URL supports checkpoints
context – Global configuration parameters
scheduler – Optionally pre-constructed SchedulerContext. When calling many functions in a row make sure to pre-construct the scheduler.
- Returns
Catalog created
- Raises
FailedCommand – When prerequisites not matched
NucleusOperationError – Raised when network or file operations fail
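Conceptually, cataloging a root directory is a tree walk that records each file's path relative to the root together with its size and content hash. A self-contained sketch of that idea for a local directory (the record shape here is illustrative, not the real CatalogItem model, and the real command works on URLs and can include tags and checkpoints):

```python
import hashlib
import os


def walk_catalog(root_dir: str) -> list[dict]:
    """Illustrative tree walk: one record per file with relative path, size, and hash."""
    items = []
    for dirpath, _dirnames, filenames in os.walk(root_dir):
        for name in sorted(filenames):
            full = os.path.join(dirpath, name)
            # Normalize to forward slashes so catalogs compare across platforms
            rel = os.path.relpath(full, root_dir).replace(os.sep, "/")
            with open(full, "rb") as f:
                digest = hashlib.sha256(f.read()).hexdigest()
            items.append({"relative_path": rel, "size": os.path.getsize(full), "hash": digest})
    return items
```

Hash algorithm and record fields are assumptions for this sketch; the real catalog format is defined by the Catalog and CatalogItem data objects below.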
- async wrapp.commands.new(package_name: str, version: str, destination: str, create_catalog: bool, tags: bool = False, ignores: Optional[Union[str, IgnoreEvaluator]] = None, local_hash: bool = False, *, context: CommandParameters = CommandParameters(debug=False, verbose=False, dry_run=False, log_file=None, hash_cache_file=None), scheduler: Optional[Scheduling] = None) None
Takes package name and version, and creates a new .<package>.wrapp file
This creates a new .<package>.wrapp package file that can be used as a target for the install command to record dependencies. Name and version are just strings; destination must be a URL pointing to a folder.
- Parameters
package_name – Name of the package
version – Initial version. Can be overridden by the first create command for this package
destination – URL of the package. In this directory the new .wrapp file will be created.
create_catalog – Specify whether to catalog the location and put the catalog into the wrapp file created.
tags – Specify this to also catalog the tags when cataloging the destination
ignores – Specify a name of an ignore rule file to use during cataloging
local_hash – Set this to allow calculating local hashes during cataloging. Might download data when creating in a remote server.
context – Global configuration parameters
scheduler – Optionally pre-constructed SchedulerContext. When calling many functions in a row make sure to pre-construct the scheduler.
- Raises
FailedCommand – When prerequisites not matched
NucleusOperationError – Raised when network or file operations fail
- async wrapp.commands.uninstall(package_name: str, install_location: str, package: Optional[str], tags: bool = False, ignore_file: Optional[str] = None, ignore_tags: bool = False, force: bool = False, *, context: CommandParameters = CommandParameters(debug=False, verbose=False, dry_run=False, log_file=None, hash_cache_file=None), scheduler: Optional[SchedulerContext] = None) None
Given a package name and either an install_location or the URL of a wrapp file, uninstall that package after checking it has no local modifications that could be lost
Any package that has been installed can be uninstalled again. There are two modes of uninstallation: via the directory in which the package has been installed, or via the dependency file that was used to record the install operation. In the latter case, uninstall will also remove the dependency information recorded in that file.
- Parameters
package_name – The name of the package to uninstall
install_location – If install_location is specified as the URL of the package to uninstall, the package will be removed from that install location
package – If no install_location is given, specify the URL of a package file with a dependency to the package to uninstall
tags – If set, check before uninstalling if any tags have been set and are different from the install source. Refuse to uninstall in case of modification in tags.
ignore_file – Specify the name of an ignore file containing rules for ignoring items in the installed package
ignore_tags – If specified, don’t compare the package catalog and the installed location’s tags before allowing to uninstall.
force – Specify this to force uninstall - warning, this might potentially lose data!
context – Global configuration parameters
scheduler – Optionally pre-constructed SchedulerContext. When calling many functions in a row make sure to pre-construct the scheduler.
- Raises
FailedCommand – When prerequisites not matched
NucleusOperationError – Raised when network or file operations fail
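The safety check before an uninstall can be pictured as comparing the hashes recorded at install time against the current state of the install location, and refusing when anything changed unless force is set. A schematic, in-memory sketch of that decision (not the real implementation, which compares full catalogs and optionally tags):

```python
def safe_to_uninstall(recorded: dict[str, str], current: dict[str, str], force: bool = False) -> bool:
    """Illustrative: recorded/current map relative paths to content hashes.
    Refuse to uninstall when an installed file was modified locally, unless forced."""
    if force:
        return True  # mirrors the force flag: may lose local changes
    modified = [path for path, digest in recorded.items() if current.get(path, digest) != digest]
    return not modified


assert safe_to_uninstall({"a.usd": "h1"}, {"a.usd": "h1"})
assert not safe_to_uninstall({"a.usd": "h1"}, {"a.usd": "h2"})
assert safe_to_uninstall({"a.usd": "h1"}, {"a.usd": "h2"}, force=True)
```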
Diff and patch
- async wrapp.commands.diff(catalog_a: str, catalog_b: str, ignore_tags: bool) Tuple[List[Dict[str, Any]], List[Dict[str, Any]], List[Tuple[Dict[str, Any], Dict[str, Any]]]]
Takes two catalog files produced by the catalog command and returns three lists: files unique to A, files unique to B, and files that differ
- Parameters
catalog_a – File name of catalog A
catalog_b – File name of catalog B
ignore_tags – Specify this to ignore tags
- Returns
Tuple of three lists (unique_to_a, unique_to_b, differing_items)
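The shape of the result can be reproduced with a small sketch over two dicts mapping relative paths to hashes (the real function compares full catalog files and can also take tags into account):

```python
def diff_maps(a: dict[str, str], b: dict[str, str]):
    """Illustrative: return (unique_to_a, unique_to_b, differing) like the diff command."""
    unique_to_a = [path for path in a if path not in b]
    unique_to_b = [path for path in b if path not in a]
    differing = [(path, a[path], b[path]) for path in a if path in b and a[path] != b[path]]
    return unique_to_a, unique_to_b, differing


ua, ub, d = diff_maps({"x.usd": "1", "y.usd": "2"}, {"y.usd": "3", "z.usd": "4"})
print(ua, ub, d)  # ['x.usd'] ['z.usd'] [('y.usd', '2', '3')]
```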
- async wrapp.commands.create_patch(a_catalog: Catalog, b_catalog: Catalog, baseline: Catalog, patch: Optional[str], tags: bool = False, overwrite: bool = False, ignore: bool = False, force: bool = False, context: CommandParameters = CommandParameters(debug=False, verbose=False, dry_run=False, log_file=None, hash_cache_file=None)) Tuple[List[Tuple[str, PatchOpBase]], List[Tuple[str, PatchOpBase]]]
Takes three catalogs and calculates the patch operations to turn A into B. Returns a list of operations and a list of conflicts, if any.
Create a patch file specifying a list of operations that, when applied, will convert A to B, where the baseline catalog specifies the last common ancestor. This will check that A is unmodified, and fail otherwise. If you want to keep changes in A but copy over all non-conflicting changes, set the ignore parameter. If you want to record all operations that will roll back A and make A identical to B, set the force parameter.
- Parameters
a_catalog – The target catalog
b_catalog – The source catalog
baseline – The last common catalog before A and B diverged. For simple merge just specify B again.
patch – Optionally specify a file name to have the patch written out to a file for later use by apply_patch
tags – Flag to respect tags stored in catalogs. If set, differences in tags can cause conflicts and update operations.
overwrite – Specify to allow to overwrite the patch file if it already exists
ignore – Specify whether to ignore conflicting changes in A, e.g. files modified in both sets, or extra files in A. Don’t touch conflicting files, leave them as is.
force – Specify whether to rollback changes, guarantee that after apply patch A becomes B.
context – Global configuration parameters
- Returns
(patch_operation_list, conflict_list) tuple both being lists of PatchOp instances
- Raises
FailedCommand – When prerequisites not matched
NucleusOperationError – Raised when network or file operations fail
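The three-way comparison behind create_patch can be sketched with hash maps: a path where only B diverged from the baseline yields a clean operation, while a path where A itself was modified yields a conflict. A schematic sketch (names and the per-path decision are illustrative; the real command works on full catalogs with tags and folder operations):

```python
def plan_patch(a: dict[str, str], b: dict[str, str], base: dict[str, str]):
    """Illustrative three-way comparison per path: returns (operations, conflicts)."""
    ops, conflicts = [], []
    for path in sorted(set(a) | set(b) | set(base)):
        ha, hb, hbase = a.get(path), b.get(path), base.get(path)
        if ha == hb:
            continue  # identical in A and B: nothing to do
        if ha == hbase:
            # Only B diverged from the baseline: clean operation turning A into B
            ops.append(("copy" if hb is not None else "delete", path))
        else:
            # A was modified locally: conflict, whether or not B also changed
            conflicts.append((path, ha, hb))
    return ops, conflicts


ops, conflicts = plan_patch(
    a={"f": "0", "g": "1"},
    b={"f": "2", "g": "0"},
    base={"f": "0", "g": "0"},
)
print(ops, conflicts)  # [('copy', 'f')] [('g', '1', '0')]
```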
- async wrapp.commands.apply_patch(patch: str, *, context: CommandParameters = CommandParameters(debug=False, verbose=False, dry_run=False, log_file=None, hash_cache_file=None), scheduler: Optional[SchedulerContext] = None) None
Taking a patch file as input, apply the changes previously calculated by the create-patch command.
- Parameters
patch – Specify filename of patch file (local file path)
context – Global configuration parameters
scheduler – Optionally pre-constructed SchedulerContext. When calling many functions in a row make sure to pre-construct the scheduler.
- Raises
FailedCommand – in case the patch file specified cannot be found
NucleusOperationError – in case of failed patch operations
Export and Import
- async wrapp.commands.export_package(catalog: Optional[str], repo: Optional[str], package_name: str, version: str, output: str, overwrite: bool = False, dedup=False, *, context: CommandParameters = CommandParameters(debug=False, verbose=False, dry_run=False, log_file=None, hash_cache_file=None), scheduler: Optional[SchedulerContext] = None) None
Take package information and repository location and create an uncompressed tar file of a package from the given package cache
- Parameters
package_name – Name of the package to export
version – Version of the package to export
catalog – Name of the catalog file that describes the items to export
repo – URL of the repository containing the package to be exported
output – Name of tar file to create
overwrite – Flag to allow overwriting the tar file in case it already exists
dedup – Specify this to deduplicate content on export, i.e. don’t store files by their name in the tar but by their hash
context – Global configuration parameters
scheduler – Optionally pre-constructed SchedulerContext. When calling many functions in a row make sure to pre-construct the scheduler.
- Raises
FailedCommand – When prerequisites not matched
NucleusOperationError – Raised when network or file operations fail
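The dedup option can be pictured as storing each unique blob once under its hash, plus a manifest mapping paths to hashes. A self-contained sketch of that idea using the standard tarfile module (the layout, blob naming, and hash algorithm are assumptions for this sketch, not the actual wrapp tar format):

```python
import hashlib
import io
import json
import tarfile


def export_dedup(files: dict[str, bytes], output: str) -> None:
    """Illustrative: write each unique blob once under blobs/<hash>, plus a manifest."""
    manifest = {}
    with tarfile.open(output, "w") as tar:  # uncompressed, like export_package
        seen = set()
        for path, data in files.items():
            digest = hashlib.sha256(data).hexdigest()
            manifest[path] = digest
            if digest not in seen:  # store identical content only once
                seen.add(digest)
                info = tarfile.TarInfo(f"blobs/{digest}")
                info.size = len(data)
                tar.addfile(info, io.BytesIO(data))
        blob = json.dumps(manifest).encode()
        info = tarfile.TarInfo("manifest.json")
        info.size = len(blob)
        tar.addfile(info, io.BytesIO(blob))
```

With two paths sharing identical content, the archive holds one blob plus the manifest instead of two copies.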
- async wrapp.commands.import_package(package_file: str, repository: str, overwrite: bool = False, *, context: CommandParameters = CommandParameters(debug=False, verbose=False, dry_run=False, log_file=None, hash_cache_file=None), scheduler: Optional[SchedulerContext] = None) None
Given a previously exported tar file and a repository location, import the package and make it available for install commands using that repository
- Parameters
package_file – Name of tar file to import. Note the naming convention is that it is either <package_name>.tar or <package_name>.<version>.tar
repository – URL of the repository to write the package to (excluding the .packages directory)
overwrite – Set this to allow overwriting an existing package. Note this might leave extra files in the repository.
context – Global configuration parameters
scheduler – Optionally pre-constructed SchedulerContext. When calling many functions in a row make sure to pre-construct the scheduler.
- Raises
FailedCommand – When prerequisites not matched
NucleusOperationError – Raised when network or file operations fail
- async wrapp.commands.mirror(package_name: str, version: str, source_repo: str, destination_repo: str, tags: bool = False, resume: bool = False, template_version: Optional[str] = None, context: CommandParameters = CommandParameters(debug=False, verbose=False, dry_run=False, log_file=None, hash_cache_file=None), scheduler: Optional[SchedulerContext] = None) None
Take package name and version, and copy it from the source repository to the destination repository.
- Parameters
package_name – Name of the package to copy
version – Version of the package to copy
source_repo – Specify the URL of the source repository, excluding the .packages directory
destination_repo – Specify the URL of the target repository, again excluding the .packages directory name
tags – Set this to create the tags in the target repository.
resume – Set this flag to make a differential copy by first cataloging the target repository package directory and then doing a differential copy
template_version – Optionally specify a directory to copy into place first. This can accelerate creation if the template is on the same server and can be reused.
context – Global configuration parameters
scheduler – Optionally pre-constructed SchedulerContext. When calling many functions in a row make sure to pre-construct the scheduler.
- Raises
FailedCommand – When prerequisites not matched
NucleusOperationError – Raised when network or file operations fail
Freeze functions
These functions form the core of wrapp and are used by all the other commands. We use the term freeze instead of copy because the freeze functions only copy the current head version into a new location to make that version available; a copy could be expected to include the version history and checkpoints as well, which freeze does not do.
- async wrapp.commands.freeze(source: str, dest_dir: str, catalog: bool = False, overwrite: bool = False, tags: bool = False, ignore_file: Optional[str] = None, ignore_packages: bool = False, force_upload: bool = False, context: CommandParameters = CommandParameters(debug=False, verbose=False, dry_run=False, log_file=None, hash_cache_file=None), *, scheduler: Optional[SchedulerContext] = None) None
Taking a source and a destination, freeze a version of a package for later publishing or preparing a reproducible build
- Parameters
source – URL of the root of the files to freeze
dest_dir – URL of the destination
catalog – When set, the source parameter points to a file with a catalog previously produced by the catalog command; otherwise source is a source folder URL
overwrite – Specify to allow overwriting files in the destination
tags – Specify whether to freeze tags via the Tagging Service as well
ignore_file – Specify the name of an ignore file containing rules for ignoring items
ignore_packages – When specified, first find all package files in the source and load their file lists to exclude them from the freeze operation
force_upload – Specify this to disable the create_with_hash optimization that can accelerate uploads when files might already be on the target
context – Global configuration parameters
scheduler – Optionally pre-constructed SchedulerContext. When calling many functions in a row make sure to pre-construct the scheduler.
- Raises
FailedCommand – When prerequisites not matched
NucleusOperationError – Raised when network or file operations fail
- async wrapp.commands.freeze_file_tree(source: str, dest_dir: str, ignore_file: Optional[str] = None, overwrite: bool = False, copy_tags: bool = False, skip_files: Optional[Set[str]] = None, force_upload: bool = False, *, context: CommandParameters = CommandParameters(debug=False, verbose=False, dry_run=False, log_file=None, hash_cache_file=None), scheduler: Optional[SchedulerContext] = None) None
Taking a source and a destination, freeze a version of a package for later publishing or preparing a reproducible build
- Parameters
ignore_file – Specify the name of a file containing ignore rules to load and respect
source – URL of the root of the files to freeze
dest_dir – URL of the destination
overwrite – Specify to allow overwriting files in the destination
copy_tags – Specify whether to freeze tags via the Tagging Service as well
skip_files – Specify a set of files, by their relative paths within the source directory, that should not be part of the destination
force_upload – Specify this to disable the create_with_hash optimization that can accelerate uploads when files might already be on the target
context – Global configuration parameters
scheduler – Optionally pre-constructed SchedulerContext. When calling many functions in a row make sure to pre-construct the scheduler.
- Raises
FailedCommand – When prerequisites not matched
NucleusOperationError – Raised when network or file operations fail
- async wrapp.commands.freeze_catalog(source_catalog: str, catalog_root: str, catalog_items: List[CatalogItem], dest_dir: str, ignore_file: Optional[str] = None, overwrite: bool = False, copy_tags: bool = False, ignore_existing_folders: bool = False, force_upload: bool = False, *, context: CommandParameters = CommandParameters(debug=False, verbose=False, dry_run=False, log_file=None, hash_cache_file=None), scheduler: Optional[SchedulerContext] = None) None
Taking a source and a destination, freeze a version of a package for later publishing or preparing a reproducible build
- Parameters
source_catalog – File URL of the catalog file
catalog_root – The root URL for all catalog items
catalog_items – List of item objects that should become part of the frozen version
dest_dir – URL of the destination
ignore_file – Specify the name of an ignore file containing rules for ignoring items
overwrite – Specify to allow overwriting files in the destination
copy_tags – Specify whether to freeze tags via the Tagging Service as well
ignore_existing_folders – Specify this to ignore if folders in the destination already exist
force_upload – Specify this to disable the create_with_hash optimization that can accelerate uploads when files might already be on the target
context – Global configuration parameters
scheduler – Optionally pre-constructed SchedulerContext. When calling many functions in a row make sure to pre-construct the scheduler.
- Raises
FailedCommand – When prerequisites not matched
NucleusOperationError – Raised when network or file operations fail
Utility functions
- async wrapp.commands.get(url: str, filename: Optional[str], overwrite: bool = False, context: CommandParameters = CommandParameters(debug=False, verbose=False, dry_run=False, log_file=None, hash_cache_file=None), *, scheduler: Optional[SchedulerContext] = None) None
Download a single file from the given URL to the specified local filename
- Parameters
url – URL to download
filename – Optional alternate file name to download to. If not given the file will have the same name as in the source URL
overwrite – Specify to allow overwriting the target if it already exists
context – Global configuration parameters
scheduler – Optionally pre-constructed SchedulerContext. When calling many functions in a row make sure to pre-construct the scheduler.
- Raises
FailedCommand – When prerequisites not matched
NucleusOperationError – Raised when network or file operations fail
- async wrapp.commands.list_repo(repository: str, *, scheduler: Optional[SchedulerContext] = None) List[Tuple[str, List[str]]]
Takes a repository URL and returns a list of all packages and all versions stored there
- Parameters
repository – URL of a repository, excluding the .packages directory name
scheduler – Optionally pre-constructed SchedulerContext. When calling many functions in a row make sure to pre-construct the scheduler.
- Returns
List of tuples consisting of a package name and a list of version strings that package is available in
- Raises
FailedCommand – When prerequisites not matched
NucleusOperationError – Raised when network or file operations fail
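Assuming the package cache lays out versions as .packages/&lt;package&gt;/&lt;version&gt;/ (an assumption for this sketch; the real layout is handled internally by wrapp), the return shape of list_repo can be reproduced for a local directory:

```python
import os


def list_local_repo(repository_root: str) -> list[tuple[str, list[str]]]:
    """Illustrative: scan a local .packages directory assumed to be laid out as
    .packages/<package_name>/<version>/ and return (name, versions) tuples."""
    packages_dir = os.path.join(repository_root, ".packages")
    result = []
    for name in sorted(os.listdir(packages_dir)):
        pkg_dir = os.path.join(packages_dir, name)
        if os.path.isdir(pkg_dir):
            versions = sorted(
                v for v in os.listdir(pkg_dir) if os.path.isdir(os.path.join(pkg_dir, v))
            )
            result.append((name, versions))
    return result
```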
Authentication and Credentials
Authentication is controlled by the class AuthManager, which controls the set of available credentials.
- class wrapp.AuthInfo(*, server: str, username: str, password_or_token: str)
Data object to store credentials for a given base URL. For example,
>>> AuthInfo(server='omniverse://localhost', username='omniverse', password_or_token='omniverse')
will generate credentials for a workstation Nucleus with default password. To use a user generated API token (e.g. for single sign-on), use the special username $omni-api-token
- class wrapp.AuthManager(auth_infos: Optional[List[AuthInfo]] = None, interactive_fallback: bool = True)
Use this class to register credential information for the client library to be used by subsequent API calls. This class has no effect for S3 access via boto3; for authentication with boto3, please use the standard boto3 environment variables or configuration files as described in its manual. Note that these credentials can only be used within the lifetime of an auth registration obtained via setup_authentication().
Example code for setting up credentials and using them:
>>> def download_file_from_staging(url, api_token):
>>>     with AuthManager([AuthInfo(server="omniverse://staging.nvidia.com", username="$omni-api-token", password_or_token=api_token)], interactive_fallback=False) as auth:
>>>         result, version, content = omni.client.read_file(url)
>>>         if result == omni.client.Result.OK:
>>>             return content
>>>         else:
>>>             raise Exception(f"Download error: {result} for {url}")
Note the use of the special username to designate an API token created for that server.
If no AuthManager is used, the default client library behavior is enabled: it first checks for environment variables, then falls back to an interactive login flow compatible with the Nucleus server version it is trying to connect to. This might open a browser window, so don’t run this in non-interactive shells.
Data Objects
These simple data objects are used by the API to carry more complex data. We use pydantic to make it easier to write code with structs in Python.
- class wrapp.datastructures.catalog.Catalog(*, root: str, format_version: str = '1', items: list[wrapp.datastructures.catalog.CatalogItem], storage_map: Optional[StorageMap] = None)
- class wrapp.datastructures.catalog.CatalogItem(*, source_path: str, relative_path: str, type: CatalogItemType, hash: Optional[str] = None, size: Optional[int] = None, tags: Optional[List[Tag]] = None)
- class wrapp.datastructures.catalog.CatalogItemType(value)
Designates the type of item that can be stored in a catalog
- class wrapp.datastructures.catalog.StorageMap(*, storage_map: Dict[str, str])
- class wrapp.datastructures.catalog.Tag(*, name: str, ns: str, value: str)
Represents a tag as defined by the Nucleus Tagging Service
- class wrapp.datastructures.package.Dependency(*, format_version: str = '2', package: str, version: str, relative_destination: str, repository: str)
- class wrapp.datastructures.package.PackageInfo(*, format_version: str = '2', name: str = '', version: str = '', catalog: Optional[Catalog] = None, repository: Optional[str] = None, dependencies: Optional[List[Dependency]] = None)
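The nesting of these objects (a PackageInfo carrying an optional Catalog, which in turn carries CatalogItems) can be sketched with standard dataclasses. The real classes are pydantic models with validation and serialization, so this is only a structural analogue with illustrative names:

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class ItemSketch:  # structural analogue of CatalogItem
    source_path: str
    relative_path: str
    hash: Optional[str] = None
    size: Optional[int] = None


@dataclass
class CatalogSketch:  # structural analogue of Catalog
    root: str
    format_version: str = "1"
    items: list = field(default_factory=list)


@dataclass
class PackageInfoSketch:  # structural analogue of PackageInfo
    name: str = ""
    version: str = ""
    format_version: str = "2"
    catalog: Optional[CatalogSketch] = None


pkg = PackageInfoSketch(
    name="test_project",
    version="1",
    catalog=CatalogSketch(
        root="omniverse://localhost/Projects",
        items=[ItemSketch(source_path="omniverse://localhost/Projects/a.usd", relative_path="a.usd")],
    ),
)
```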
Patch Operations
- class wrapp.datastructures.patch.PatchOperationType(value)
An enumeration.
- class wrapp.datastructures.patch.PatchOpBase(*, operation_type: PatchOperationType, explanation: Optional[str] = None)
- class wrapp.datastructures.patch.PatchOpCopy(*, operation_type: PatchOperationType = PatchOperationType.copy, explanation: Optional[str] = None, src: str, dst: str, hash: Optional[str] = None, item_type: ItemType)
- class wrapp.datastructures.patch.PatchOpDelete(*, operation_type: PatchOperationType = PatchOperationType.delete, explanation: Optional[str] = None, url: str)
- class wrapp.datastructures.patch.PatchOpCreateFolder(*, operation_type: PatchOperationType = PatchOperationType.create_folder, explanation: Optional[str] = None, url: str)
- class wrapp.datastructures.patch.PatchOpAddTag(*, operation_type: PatchOperationType = PatchOperationType.add_tag, explanation: Optional[str] = None, url: str, tag: str)
- class wrapp.datastructures.patch.PatchOpDeleteTag(*, operation_type: PatchOperationType = PatchOperationType.delete_tag, explanation: Optional[str] = None, url: str, tag: str)
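The effect of these operation types can be sketched by applying a list of (type, payload) records to an in-memory file map. This is schematic: the real operations run against URLs, carry typed PatchOp objects rather than dicts, and can also manipulate tags via the Tagging Service:

```python
def apply_ops(fs: dict[str, str], ops: list[tuple[str, dict]]) -> dict[str, str]:
    """Illustrative interpreter for copy/delete/create_folder-style operations."""
    fs = dict(fs)  # leave the input untouched
    for op_type, payload in ops:
        if op_type == "copy":
            fs[payload["dst"]] = payload["content"]
        elif op_type == "delete":
            fs.pop(payload["url"], None)
        elif op_type == "create_folder":
            fs.setdefault(payload["url"] + "/", "")
    return fs


result = apply_ops(
    {"old.usd": "v1"},
    [("copy", {"dst": "new.usd", "content": "v2"}), ("delete", {"url": "old.usd"})],
)
print(result)  # {'new.usd': 'v2'}
```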
Common Configuration
All Python API functions can be called without explicitly specifying the context parameter. However, if any of the standard command-line configuration options need to be specified, it is possible to provide them with a CommandParameters object.
- class wrapp.CommandParameters(debug: bool = False, verbose: bool = False, dry_run: bool = False, log_file: Optional[str] = None, hash_cache_file: Optional[str] = None)
Class storing all common parameters for wrapp commands.
Construct this class with CommandParameters(<keyword params>) or CommandParameters.from_params() before calling wrapp API functions.
Async operations
To avoid server overload, it is strongly recommended to use the WrappScheduler context manager to create a block in which API rate throttling and parallel execution are performed as specified by a SchedulingParameters instance.
- class wrapp.ProgressReportVerbosity(value)
The amount of detail wrapp’s progress report prints out.
- off
no progress is printed.
- normal
an overview of the progress of the top-level tasks is given.
- verbose
all scheduled jobs are listed.
- class wrapp.SchedulingParameters(jobs: int = 100, tagging_jobs: int = 50, file_transfer_jobs: int = 10, progress_report: ProgressReportVerbosity = ProgressReportVerbosity.off, num_times_to_ignore_ctrl_c_in_scheduler_shutdown: int = 2)
Class storing all common parameters for constructing schedulers and rate limiting for wrapp commands.
Construct this class with SchedulingParameters(<keyword params>) or SchedulingParameters.from_params() before calling wrapp API functions. Hand this into the run_scheduler() method of the SchedulerContext class to construct an appropriate scheduling setup.
- class wrapp.SchedulerContext(params: ~wrapp.datastructures.command_context.SchedulingParameters = SchedulingParameters(jobs=100, tagging_jobs=50, file_transfer_jobs=10, progress_report=<ProgressReportVerbosity.off: 'off'>, num_times_to_ignore_ctrl_c_in_scheduler_shutdown=2))
Class managing the lifetime of a Scheduling object. Use this directly if it is not convenient to use the WrappScheduler context manager class.
Make sure to call start_scheduler() and stop_scheduler() accordingly:
>>> import wrapp
>>> async def example(root_dir: str) -> wrapp.Catalog:
>>>     scheduler_context = wrapp.SchedulerContext(wrapp.SchedulingParameters(jobs=1))
>>>     scheduler_context.start_scheduler()
>>>     try:
>>>         catalog = await wrapp.catalog(root_dir, scheduler=scheduler_context)
>>>     finally:
>>>         await scheduler_context.stop_scheduler()
>>>     return catalog
- class wrapp.WrappScheduler(*, context: Optional[SchedulerContext] = None, params: Optional[SchedulingParameters] = None)
ContextManager for wrapp to schedule its tasks. This helps throttle the number of tasks sent simultaneously to the server, avoiding server overload and back-off responses.
Example
>>> import asyncio, wrapp
>>> async def example(test_folder_1: str, test_folder_2: str):
...     tasks = []
...     async with wrapp.WrappScheduler(params=wrapp.SchedulingParameters(jobs=10)) as scheduler_context:
...         tasks.append(asyncio.create_task(wrapp.catalog(test_folder_1, scheduler=scheduler_context)))
...         tasks.append(asyncio.create_task(wrapp.catalog(test_folder_2, scheduler=scheduler_context)))
...         results = await asyncio.gather(*tasks)
- class wrapp.api.ContextManager(auth_infos: ~typing.Optional[~typing.List[~wrapp.datastructures.authentication.AuthInfo]] = None, interactive_login_allowed=False, standalone_mode=True, scheduler_context=<wrapp.datastructures.command_context.SchedulerContext object>)
Initializing wrapp in a larger program
If you need to embed wrapp in a larger program, using wrapp.ContextManager may not fit your program structure. In those cases you can call the individual initialize and shutdown methods separately:
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
#
# NVIDIA CORPORATION and its licensors retain all intellectual property
# and proprietary rights in and to this software, related documentation
# and any modifications thereto. Any use, reproduction, disclosure or
# distribution of this software and related documentation without an express
# license agreement from NVIDIA CORPORATION is strictly prohibited.
import asyncio
import os

import omni.client

import wrapp.api
from wrapp import (
    CommandParameters,
    SchedulerContext,
    SchedulingParameters,
)

from .create_new_version import create_new_version

"""
Example main function that shows how to use the WRAPP commands in a larger application using the initialize() and shutdown() pattern
"""

if __name__ == "__main__":
    # First, initialize the wrapp API which will also initialize the omni.client library
    wrapp.api.initialize()
    try:
        # Setup authentication for subsequent calls
        auth = wrapp.AuthManager(interactive_fallback=False)
        auth.initialize()
        try:
            # Read credentials from environment variables, supplying test defaults
            auth_info = wrapp.AuthInfo(
                server=os.getenv("HOST_URL", "omniverse://localhost"),
                username=os.getenv("OMNI_USER", "omniverse"),
                password_or_token=os.getenv("OMNI_PASS", "omniverse"),
            )
            auth.add_auth(auth_info)

            # To make sure we're not using cached credentials, force a sign-out and reconnect to the server
            omni.client.sign_out(auth_info.server)
            omni.client.reconnect(auth_info.server)

            # Define common parameters for all subsequent commands
            command_params = CommandParameters(verbose=True)

            async def async_main():
                scheduler_context = SchedulerContext(SchedulingParameters(file_transfer_jobs=1))
                scheduler_context.start_scheduler()
                try:
                    tasks = []
                    tasks.append(
                        asyncio.create_task(
                            create_new_version(
                                root_url=f"{auth_info.server}/Projects",
                                package_name="all_projects",
                                repository=f"{auth_info.server}",
                                scheduler=scheduler_context,
                                context=command_params,
                            )
                        )
                    )
                    results = await asyncio.gather(*tasks)
                finally:
                    await scheduler_context.stop_scheduler()

            asyncio.run(async_main())
        finally:
            # Deregister the AuthManager
            auth.shutdown()
    except wrapp.FailedCommand as e:
        print(f"Could not create package, error message by wrapp is: '{e}'")
    finally:
        # Shutdown the WRAPP API
        wrapp.api.shutdown()
The example function takes the wrapp.SchedulerContext and hands it to every wrapp API function it calls:
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
#
# NVIDIA CORPORATION and its licensors retain all intellectual property
# and proprietary rights in and to this software, related documentation
# and any modifications thereto. Any use, reproduction, disclosure or
# distribution of this software and related documentation without an express
# license agreement from NVIDIA CORPORATION is strictly prohibited.
from typing import (
    List,
    Tuple,
)
from urllib.parse import urlparse

import semver

import wrapp
from wrapp import (
    CommandParameters,
)


def calculate_next_version(versions: List[str], version_part: str) -> Tuple[str, str]:
    """
    Given a list of strings with version numbers and the version part to increment (major, minor, patch),
    returns a pair of the latest version and the next version
    """
    sem_versions = [semver.Version.parse(v) for v in versions]
    latest_version = sorted(sem_versions)[-1]
    return str(latest_version), str(latest_version.next_version(version_part))


async def create_new_version(
    root_url: str, package_name: str, repository: str, scheduler: wrapp.SchedulerContext, context: CommandParameters
) -> None:
    """
    Example function that creates a package in omniverse://localhost/.packages, calculating the next minor semver
    version from listing the existing packages in that repository. If no package is found, it starts with 1.0.0.
    This uses the semver package to parse version numbers and calculate the next minor version.

    :param root_url: URL of source package
    :param package_name: name to give to the package.
    :param repository: URL of the repository
    :param scheduler: pre-constructed SchedulerContext object determining the amount of jobs to run
    :param context: command context to use
    """
    # Calculate the next version number
    packages_in_repo = await wrapp.list_repo(repository, scheduler=scheduler)
    next_version = None
    for package, versions in packages_in_repo:
        parsed_url = urlparse(package)
        path_parts = parsed_url.path.split("/")
        if path_parts[-1] == package_name:
            if len(versions) > 0:
                latest_version, next_version = calculate_next_version(versions, "minor")
                print(f"Latest version of package {package_name} is {latest_version}, using {next_version}")
            break
    if next_version is None:
        print(f"No prior version found for {package_name}, starting at 1.0.0")
        next_version = "1.0.0"
    print(f"Creating package {package_name} version {next_version}")
    await wrapp.create(
        package_name, next_version, source=root_url, catalog=False, repo=repository, context=context, scheduler=scheduler
    )
    print("Done")
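The version arithmetic above relies on the third-party semver package. For reference, here is a dependency-free sketch of the same two pieces of logic; the helper names are our own and not part of wrapp:

```python
from urllib.parse import urlparse


def package_name_from_url(package_url: str) -> str:
    # Mirrors the listing loop above: the package name is the last path segment
    return urlparse(package_url).path.split("/")[-1]


def next_minor(version: str) -> str:
    # Rough stand-in for semver.Version.next_version("minor"):
    # bump the minor part and reset the patch part (no prerelease/build handling)
    major, minor, _patch = (int(part) for part in version.split("."))
    return f"{major}.{minor + 1}.0"


name = package_name_from_url("omniverse://localhost/Projects/all_projects")
bumped = next_minor("1.4.2")
```

Unlike this sketch, semver also validates the version string and handles prerelease and build metadata, which is why the example above uses it.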