Files
dbis_core/sdk/python/dbis_iru/client.py

220 lines
5.3 KiB
Python
Raw Normal View History

"""
DBIS IRU Python Client
"""
import requests
from typing import Optional, Dict, Any, List
from .types import IRUOffering, IRUInquiry, IRUSubscription
class IRUClient:
"""Client for DBIS IRU API"""
def __init__(
self,
api_base_url: str,
api_key: Optional[str] = None,
timeout: int = 30
):
"""
Initialize IRU Client
Args:
api_base_url: Base URL for DBIS API
api_key: API key for authentication (optional)
timeout: Request timeout in seconds
"""
self.api_base_url = api_base_url.rstrip('/')
self.api_key = api_key
self.timeout = timeout
self.session = requests.Session()
if api_key:
self.session.headers.update({
'Authorization': f'Bearer {api_key}'
})
self.session.headers.update({
'Content-Type': 'application/json'
})
def get_offerings(
self,
capacity_tier: Optional[int] = None,
institutional_type: Optional[str] = None
) -> List[IRUOffering]:
"""
Get all IRU offerings
Args:
capacity_tier: Filter by capacity tier (1-5)
institutional_type: Filter by institutional type
Returns:
List of IRU offerings
"""
params = {}
if capacity_tier:
params['capacityTier'] = capacity_tier
if institutional_type:
params['institutionalType'] = institutional_type
response = self._request(
'GET',
'/api/v1/iru/marketplace/offerings',
params=params
)
return response['data']
def get_offering(self, offering_id: str) -> IRUOffering:
"""
Get offering by ID
Args:
offering_id: Offering ID
Returns:
IRU offering details
"""
response = self._request(
'GET',
f'/api/v1/iru/marketplace/offerings/{offering_id}'
)
return response['data']
def submit_inquiry(self, inquiry: IRUInquiry) -> Dict[str, Any]:
"""
Submit initial inquiry
Args:
inquiry: Inquiry details
Returns:
Inquiry result with inquiry ID and status
"""
response = self._request(
'POST',
'/api/v1/iru/marketplace/inquiries',
json=inquiry.dict()
)
return response['data']
def get_inquiry_status(self, inquiry_id: str) -> Dict[str, Any]:
"""
Get inquiry status
Args:
inquiry_id: Inquiry ID
Returns:
Inquiry status details
"""
response = self._request(
'GET',
f'/api/v1/iru/marketplace/inquiries/{inquiry_id}'
)
return response['data']
def calculate_pricing(
self,
offering_id: str,
usage_profile: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
Calculate pricing for an offering
Args:
offering_id: Offering ID
usage_profile: Optional usage profile
Returns:
Pricing breakdown
"""
params = {}
if usage_profile:
import json
params['usageProfile'] = json.dumps(usage_profile)
response = self._request(
'GET',
f'/api/v1/iru/marketplace/offerings/{offering_id}/pricing',
params=params
)
return response['data']
def get_dashboard(self) -> Dict[str, Any]:
"""
Get participant dashboard
Returns:
Dashboard data
"""
response = self._request(
'GET',
'/api/v1/iru/portal/dashboard'
)
return response['data']
def get_service_health(self, subscription_id: str) -> Dict[str, Any]:
"""
Get service health
Args:
subscription_id: Subscription ID
Returns:
Service health data
"""
response = self._request(
'GET',
f'/api/v1/iru/portal/monitoring/{subscription_id}/health'
)
return response['data']
def get_deployment_status(self, subscription_id: str) -> Dict[str, Any]:
"""
Get deployment status
Args:
subscription_id: Subscription ID
Returns:
Deployment status
"""
response = self._request(
'GET',
f'/api/v1/iru/portal/deployment/{subscription_id}'
)
return response['data']
def _request(
self,
method: str,
path: str,
params: Optional[Dict[str, Any]] = None,
json: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""Make HTTP request"""
url = f"{self.api_base_url}{path}"
try:
response = self.session.request(
method=method,
url=url,
params=params,
json=json,
timeout=self.timeout
)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
raise Exception(f"API request failed: {str(e)}")