#!/usr/bin/env python3
"""
TP-Link Omada Controller API Client

A Python client library for interacting with the TP-Link Omada Controller API.
"""

import json
from typing import Any, Dict, List, Optional
from urllib.parse import quote, urljoin

import requests


class OmadaError(Exception):
    """Base exception for Omada API errors"""


class AuthenticationError(OmadaError):
    """Authentication failed"""


class OmadaController:
    """
    TP-Link Omada Controller API Client

    Wraps a ``requests.Session`` and sends JSON requests to
    ``https://{host}:{port}``. After a successful ``login()`` the token
    returned by the controller (if any) is sent as a ``Bearer``
    Authorization header on every request.

    The client can be used as a context manager; leaving the ``with`` block
    logs out (best effort) and closes the HTTP session.
    """

    def __init__(
        self,
        host: str,
        username: str,
        password: str,
        port: int = 8043,
        verify_ssl: bool = True,
        timeout: int = 30
    ):
        """
        Initialize Omada Controller client

        No network traffic happens here; call ``login()`` to authenticate.

        Args:
            host: Omada Controller hostname or IP
            username: Controller username
            password: Controller password
            port: Controller port (default: 8043)
            verify_ssl: Verify SSL certificates (default: True)
            timeout: Request timeout in seconds (default: 30)
        """
        self.base_url = f"https://{host}:{port}"
        self.username = username
        self.password = password
        self.verify_ssl = verify_ssl
        self.timeout = timeout
        self.session = requests.Session()
        self.session.verify = verify_ssl
        self.token = None
        self.authenticated = False

    def __enter__(self) -> "OmadaController":
        """Return the client itself; authentication stays explicit."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Log out (errors ignored by ``logout``) and close the session."""
        try:
            self.logout()
        finally:
            self.close()

    def close(self) -> None:
        """Close the underlying HTTP session"""
        self.session.close()

    @staticmethod
    def _segment(value: Any) -> str:
        """
        Percent-encode a value for use as a single URL path segment

        Characters such as ``/``, ``?`` and ``#`` are escaped so that a
        caller-supplied identifier cannot alter the endpoint being called.
        ``:`` is left as-is so MAC addresses stay readable.
        """
        return quote(str(value), safe=":")

    def _request(
        self,
        method: str,
        endpoint: str,
        data: Optional[Dict] = None,
        params: Optional[Dict] = None
    ) -> Dict[str, Any]:
        """
        Make API request

        Args:
            method: HTTP method (GET, POST, PUT, DELETE)
            endpoint: API endpoint, joined onto the controller base URL
            data: Request body data, sent as JSON
            params: Query parameters

        Returns:
            API response as dictionary; an empty dictionary when the
            response has no body

        Raises:
            AuthenticationError: If the controller answers with HTTP 401
            OmadaError: If the request fails, returns another HTTP error
                status, or the response body is not valid JSON
        """
        url = urljoin(self.base_url, endpoint)
        headers = {
            "Content-Type": "application/json",
            "Accept": "application/json"
        }
        if self.token:
            headers["Authorization"] = f"Bearer {self.token}"

        try:
            response = self.session.request(
                method=method,
                url=url,
                headers=headers,
                json=data,
                params=params,
                timeout=self.timeout
            )
            response.raise_for_status()
        except requests.exceptions.HTTPError as e:
            status = e.response.status_code if e.response is not None else None
            if status == 401:
                raise AuthenticationError("Authentication failed") from e
            raise OmadaError(f"API request failed: {e}") from e
        except requests.exceptions.RequestException as e:
            raise OmadaError(f"Request failed: {e}") from e

        # A successful response without a body carries no JSON to decode.
        if not response.content:
            return {}

        # Decoding errors are ValueError subclasses; keep them inside the
        # OmadaError hierarchy so callers need only one except clause.
        try:
            return response.json()
        except ValueError as e:
            raise OmadaError(f"Invalid JSON in response: {e}") from e

    def login(self) -> bool:
        """
        Authenticate with Omada Controller

        Posts the stored username and password to the login endpoint and
        stores the ``token`` field of the response.

        Returns:
            True if authentication successful

        Raises:
            AuthenticationError: If authentication fails
        """
        endpoint = "/api/v2/login"
        data = {
            "username": self.username,
            "password": self.password
        }

        try:
            response = self._request("POST", endpoint, data=data)
        except OmadaError as e:
            self.authenticated = False
            raise AuthenticationError(f"Login failed: {e}") from e

        # NOTE(review): the client is marked authenticated even when the
        # response has no "token" field (the session may be cookie-backed);
        # confirm against the controller's actual login response.
        self.token = response.get("token")
        self.authenticated = True
        return True

    def logout(self) -> None:
        """
        Logout from Omada Controller

        Does nothing when not authenticated. Errors from the logout request
        are ignored; the local token and authenticated flag are always
        cleared.
        """
        if self.authenticated:
            endpoint = "/api/v2/logout"
            try:
                self._request("POST", endpoint)
            except OmadaError:
                pass  # Ignore errors on logout
            finally:
                self.token = None
                self.authenticated = False

    def is_authenticated(self) -> bool:
        """Check if authenticated"""
        return self.authenticated

    def get_sites(self) -> List[Dict[str, Any]]:
        """
        Get all sites

        Returns:
            List of site dictionaries (empty if the response has no "data")
        """
        endpoint = "/api/v2/sites"
        response = self._request("GET", endpoint)
        return response.get("data", [])

    def get_site(self, site_id: str) -> Dict[str, Any]:
        """
        Get site by ID

        Args:
            site_id: Site ID

        Returns:
            Site dictionary (empty if the response has no "data")
        """
        endpoint = f"/api/v2/sites/{self._segment(site_id)}"
        response = self._request("GET", endpoint)
        return response.get("data", {})

    def create_site(
        self,
        name: str,
        timezone: str = "UTC",
        description: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Create a new site

        Args:
            name: Site name
            timezone: Timezone (e.g., "America/New_York")
            description: Site description; omitted from the request when
                empty or None

        Returns:
            Created site dictionary
        """
        endpoint = "/api/v2/sites"
        data = {
            "name": name,
            "timezone": timezone
        }
        if description:
            data["description"] = description

        response = self._request("POST", endpoint, data=data)
        return response.get("data", {})

    def get_access_points(self, site_id: str) -> List[Dict[str, Any]]:
        """
        Get all access points for a site

        Args:
            site_id: Site ID

        Returns:
            List of access point dictionaries
        """
        endpoint = f"/api/v2/sites/{self._segment(site_id)}/access-points"
        response = self._request("GET", endpoint)
        return response.get("data", [])

    def get_access_point(self, ap_id: str) -> Dict[str, Any]:
        """
        Get access point by ID

        Args:
            ap_id: Access point ID

        Returns:
            Access point dictionary
        """
        endpoint = f"/api/v2/access-points/{self._segment(ap_id)}"
        response = self._request("GET", endpoint)
        return response.get("data", {})

    def configure_ap(
        self,
        ap_id: str,
        name: Optional[str] = None,
        location: Optional[str] = None,
        radio_config: Optional[Dict] = None
    ) -> Dict[str, Any]:
        """
        Configure access point

        Only the arguments that are given (non-empty) are included in the
        update request.

        Args:
            ap_id: Access point ID
            name: Access point name
            location: Physical location
            radio_config: Radio configuration

        Returns:
            Updated access point dictionary
        """
        endpoint = f"/api/v2/access-points/{self._segment(ap_id)}"
        data = {}
        if name:
            data["name"] = name
        if location:
            data["location"] = location
        if radio_config:
            data["radio_config"] = radio_config

        response = self._request("PUT", endpoint, data=data)
        return response.get("data", {})

    def get_ssids(self, site_id: str) -> List[Dict[str, Any]]:
        """
        Get all SSIDs for a site

        Args:
            site_id: Site ID

        Returns:
            List of SSID dictionaries
        """
        endpoint = f"/api/v2/sites/{self._segment(site_id)}/ssids"
        response = self._request("GET", endpoint)
        return response.get("data", [])

    def create_ssid(
        self,
        site_id: str,
        name: str,
        security: str = "wpa3",
        password: Optional[str] = None,
        vlan: Optional[int] = None,
        radios: Optional[List[str]] = None
    ) -> Dict[str, Any]:
        """
        Create SSID

        Args:
            site_id: Site ID
            name: SSID name
            security: Security type (open, wpa2, wpa3)
            password: WPA password (required for wpa2/wpa3); this method
                does not validate it
            vlan: VLAN ID; sent whenever it is not None, including 0
            radios: List of radios (["2.4GHz", "5GHz"])

        Returns:
            Created SSID dictionary
        """
        endpoint = f"/api/v2/sites/{self._segment(site_id)}/ssids"
        data = {
            "name": name,
            "security": security
        }
        if password:
            data["password"] = password
        # Compare against None so an explicit VLAN of 0 is not dropped.
        if vlan is not None:
            data["vlan"] = vlan
        if radios:
            data["radios"] = radios

        response = self._request("POST", endpoint, data=data)
        return response.get("data", {})

    def get_clients(self, site_id: str) -> List[Dict[str, Any]]:
        """
        Get all client devices for a site

        Args:
            site_id: Site ID

        Returns:
            List of client dictionaries
        """
        endpoint = f"/api/v2/sites/{self._segment(site_id)}/clients"
        response = self._request("GET", endpoint)
        return response.get("data", [])

    def get_client(self, mac: str) -> Dict[str, Any]:
        """
        Get client device by MAC address

        Args:
            mac: MAC address

        Returns:
            Client dictionary
        """
        endpoint = f"/api/v2/clients/{self._segment(mac)}"
        response = self._request("GET", endpoint)
        return response.get("data", {})


def main() -> None:
    """Example usage: log in, list sites and access points, log out."""
    # Initialize controller
    controller = OmadaController(
        host="omada.sankofa.nexus",
        username="admin",
        password="secure-password"
    )

    try:
        # Authenticate
        controller.login()
        print("Authenticated successfully")

        # Get sites
        sites = controller.get_sites()
        print(f"Found {len(sites)} sites")

        # Get access points for first site
        if sites:
            site_id = sites[0]["id"]
            aps = controller.get_access_points(site_id)
            print(f"Found {len(aps)} access points")
    except AuthenticationError as e:
        print(f"Authentication failed: {e}")
    except OmadaError as e:
        print(f"Error: {e}")
    finally:
        # Log out even when an API call failed after a successful login.
        if controller.is_authenticated():
            controller.logout()
            print("Logged out")
        controller.close()


# Example usage
if __name__ == "__main__":
    main()