Files
dbis_core/sdk/python/dbis_iru/client.py
2026-03-02 12:14:07 -08:00

220 lines
5.3 KiB
Python

"""
DBIS IRU Python Client
"""
import requests
from typing import Optional, Dict, Any, List
from .types import IRUOffering, IRUInquiry, IRUSubscription
class IRUClient:
"""Client for DBIS IRU API"""
def __init__(
self,
api_base_url: str,
api_key: Optional[str] = None,
timeout: int = 30
):
"""
Initialize IRU Client
Args:
api_base_url: Base URL for DBIS API
api_key: API key for authentication (optional)
timeout: Request timeout in seconds
"""
self.api_base_url = api_base_url.rstrip('/')
self.api_key = api_key
self.timeout = timeout
self.session = requests.Session()
if api_key:
self.session.headers.update({
'Authorization': f'Bearer {api_key}'
})
self.session.headers.update({
'Content-Type': 'application/json'
})
def get_offerings(
self,
capacity_tier: Optional[int] = None,
institutional_type: Optional[str] = None
) -> List[IRUOffering]:
"""
Get all IRU offerings
Args:
capacity_tier: Filter by capacity tier (1-5)
institutional_type: Filter by institutional type
Returns:
List of IRU offerings
"""
params = {}
if capacity_tier:
params['capacityTier'] = capacity_tier
if institutional_type:
params['institutionalType'] = institutional_type
response = self._request(
'GET',
'/api/v1/iru/marketplace/offerings',
params=params
)
return response['data']
def get_offering(self, offering_id: str) -> IRUOffering:
"""
Get offering by ID
Args:
offering_id: Offering ID
Returns:
IRU offering details
"""
response = self._request(
'GET',
f'/api/v1/iru/marketplace/offerings/{offering_id}'
)
return response['data']
def submit_inquiry(self, inquiry: IRUInquiry) -> Dict[str, Any]:
"""
Submit initial inquiry
Args:
inquiry: Inquiry details
Returns:
Inquiry result with inquiry ID and status
"""
response = self._request(
'POST',
'/api/v1/iru/marketplace/inquiries',
json=inquiry.dict()
)
return response['data']
def get_inquiry_status(self, inquiry_id: str) -> Dict[str, Any]:
"""
Get inquiry status
Args:
inquiry_id: Inquiry ID
Returns:
Inquiry status details
"""
response = self._request(
'GET',
f'/api/v1/iru/marketplace/inquiries/{inquiry_id}'
)
return response['data']
def calculate_pricing(
self,
offering_id: str,
usage_profile: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
Calculate pricing for an offering
Args:
offering_id: Offering ID
usage_profile: Optional usage profile
Returns:
Pricing breakdown
"""
params = {}
if usage_profile:
import json
params['usageProfile'] = json.dumps(usage_profile)
response = self._request(
'GET',
f'/api/v1/iru/marketplace/offerings/{offering_id}/pricing',
params=params
)
return response['data']
def get_dashboard(self) -> Dict[str, Any]:
"""
Get participant dashboard
Returns:
Dashboard data
"""
response = self._request(
'GET',
'/api/v1/iru/portal/dashboard'
)
return response['data']
def get_service_health(self, subscription_id: str) -> Dict[str, Any]:
"""
Get service health
Args:
subscription_id: Subscription ID
Returns:
Service health data
"""
response = self._request(
'GET',
f'/api/v1/iru/portal/monitoring/{subscription_id}/health'
)
return response['data']
def get_deployment_status(self, subscription_id: str) -> Dict[str, Any]:
"""
Get deployment status
Args:
subscription_id: Subscription ID
Returns:
Deployment status
"""
response = self._request(
'GET',
f'/api/v1/iru/portal/deployment/{subscription_id}'
)
return response['data']
def _request(
self,
method: str,
path: str,
params: Optional[Dict[str, Any]] = None,
json: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""Make HTTP request"""
url = f"{self.api_base_url}{path}"
try:
response = self.session.request(
method=method,
url=url,
params=params,
json=json,
timeout=self.timeout
)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
raise Exception(f"API request failed: {str(e)}")