Source code for modelz.openapi_client

import httpx

from modelz.openapi.sdk.api.deployment import (
    delete_users_login_name_clusters_cluster_id_deployments_name,
    get_users_login_name_clusters_cluster_id_deployments,
    get_users_login_name_clusters_cluster_id_deployments_name,
    post_users_login_name_clusters_cluster_id_deployments,
    put_users_login_name_clusters_cluster_id_deployments_name,
)
from modelz.openapi.sdk.client import AuthenticatedClient
from modelz.openapi.sdk.models import (
    DeploymentCreateRequest,
    DeploymentListResponse,
    DeploymentResponse,
    DeploymentUpdateRequest,
)
from modelz.openapi.sdk.types import Response

TIMEOUT = httpx.Timeout(5, read=300, write=300)


[docs]class DeploymentClient: def __init__( self, login_name: str, key: str, host: str = "https://cloud.modelz.ai/api/v1", cluster_id: str = "modelz", ): """Create a Modelz Client for deployments. Args: login_name: UUID for operated user key: API key host: ModelZ apiserver base URL cluster_id: cluster UUID for operated agent """ self.client = ( AuthenticatedClient(base_url=host, token=key) .with_timeout(TIMEOUT) .with_headers({"X-API-Key": key}) ) self.login_name = login_name self.host = host self.cluster_id = cluster_id
[docs] def create(self, req: DeploymentCreateRequest) -> Response[DeploymentResponse]: """Create a new deployment. Args: req: spec of request body """ return post_users_login_name_clusters_cluster_id_deployments.sync_detailed( login_name=self.login_name, cluster_id=self.cluster_id, client=self.client, json_body=req, )
[docs] def list(self) -> Response[DeploymentListResponse]: """Create all exist deployments.""" return get_users_login_name_clusters_cluster_id_deployments.sync_detailed( login_name=self.login_name, cluster_id=self.cluster_id, client=self.client )
[docs] def get(self, deployment_id: str) -> Response[DeploymentResponse]: """Get exist deployment by id Args: deployment_id: deployment id """ return get_users_login_name_clusters_cluster_id_deployments_name.sync_detailed( login_name=self.login_name, cluster_id=self.cluster_id, name=deployment_id, client=self.client, )
[docs] def update( self, deployment_id: str, req: DeploymentUpdateRequest ) -> Response[DeploymentResponse]: """Update editable spec of any exist deployments. Args: deployment_id: deployment id req: spec of request body """ return put_users_login_name_clusters_cluster_id_deployments_name.sync_detailed( login_name=self.login_name, cluster_id=self.cluster_id, name=deployment_id, client=self.client, json_body=req, )
[docs] def delete(self, deployment_id: str) -> Response[None]: """Delete any exist deployments. Args: deployment_id: deployment id """ return ( delete_users_login_name_clusters_cluster_id_deployments_name.sync_detailed( login_name=self.login_name, cluster_id=self.cluster_id, name=deployment_id, client=self.client, ) )