# -*- coding: utf-8 -*-
"""
Zenoss devicemanagement_router
"""
from datetime import datetime as dt
from zenossapi.apiclient import ZenossAPIClientError
from zenossapi.routers import ZenossRouter
__router__ = 'DeviceManagementRouter'
[docs]class DeviceManagementRouter(ZenossRouter):
"""
Class for interacting with the Zenoss devicemanagement router
"""
def __init__(self, url, headers, ssl_verify):
super(DeviceManagementRouter, self).__init__(url, headers, ssl_verify,
'devicemanagement_router',
'DeviceManagementRouter')
self.uid = None
def __repr__(self):
if self.uid:
identifier = self.uid
else:
identifier = hex(id(self))
return '<{0} object at {1}>'.format(
type(self).__name__, identifier
)
[docs] def timezone(self):
"""
Returns the configured timezone.
Returns:
str:
"""
tz_data = self._router_request(
self._make_request_data(
'getTimeZone',
data=dict()
)
)
return tz_data['data']
[docs] def list_maintenance_windows(self, uid):
"""
Returns the list of maintenance windows configured for a device
or device class.
Arguments:
uid (str): The UID of the device or device class
Returns:
list(dict):
"""
uid = self._check_uid(uid)
mw_data = self._router_request(
self._make_request_data(
'getMaintWindows',
data=dict(
uid=uid
)
)
)
for mw in mw_data['data']:
mw['uid'] = mw['uid'].replace('/zport/dmd/', '', 1)
return mw_data['data']
[docs] def get_maintenance_windows(self, uid):
"""
Returns a list of ZenossMaintenanceWindow objects for the
maintenance windows configured for a device or device class
Arguments:
uid (str): The UID of the device or device class
Returns:
list(ZenossMaintenanceWindow):
"""
mw_data = self.list_maintenance_windows(uid)
windows = []
for mw in mw_data:
windows.append(
ZenossMaintenanceWindow(
self.api_url,
self.api_headers,
self.ssl_verify,
mw,
parent=self._check_uid(uid),
)
)
return windows
[docs] def get_maintenance_window(self, uid, name):
"""
Get a maintenance window object for the named window.
Arguments:
uid (str): The UID of the device or device class
name (str): Name of the maintenance window
Returns:
ZenossMaintenanceWindow:
"""
mw_data = self.list_maintenance_windows(uid)
for mw in mw_data:
if mw['name'] == name:
return ZenossMaintenanceWindow(
self.api_url,
self.api_headers,
self.ssl_verify,
mw,
parent=self._check_uid(uid),
)
return None
[docs] def add_maintenance_window(self, uid, name, start, duration, enabled=False, start_state=300, repeat='Never', occurrence='1st', days='Sunday'):
"""
Add a new maintenance window for device or device class.
Arguments:
uid (str): The UID of the device or device class
start (str): Window start time in UNIX epoch timestamp format,
e.g. "1511290393"
duration (str): Duration of the window in HH:MM:SS format
start_state (int): Production state for the maintenance window,
default is 300 (Maintenance)
repeat (str): Maintenance window repeat interval, default is
'Never'. Other valid choices are: 'Daily', 'Every Weekday',
'Weekly', 'Monthly: day of month', 'Monthly: day of week'
occurrence (str): For 'Monthly: day of week' repeats, options are
'1st', '2nd', '3rd', '4th', '5th', 'Last'
days (str): For 'Monthly: day of week' repeats, options are
'Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday',
'Saturday', 'Sunday'
Returns:
ZenossMaintenanceWindow:
"""
if repeat not in ['Never', 'Daily', 'Every Weekday', 'Weekly',
'Monthly: day of month', 'Monthly: day of week']:
raise ZenossAPIClientError(
'Invalid maintenance window repetition: {0}'.format(repeat))
if occurrence not in ['1st', '2nd', '3rd', '4th', '5th', 'Last']:
raise ZenossAPIClientError(
'Invalid maintenance window occurrence: {0}'.format(occurrence))
if days not in ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday',
'Saturday', 'Sunday']:
raise ZenossAPIClientError(
'Invalid maintenance window days: {0}'.format(days))
self._router_request(
self._make_request_data(
'addMaintWindow',
data=dict(
params=dict(
uid=uid,
name=name,
start=start,
duration=duration,
enabled=enabled,
startState=start_state,
repeat=repeat,
occurrence=occurrence,
days=days,
)
)
)
)
return self.get_maintenance_window(uid, name)
[docs] def list_user_commands(self, uid):
"""
Get the list of user commands configured for a device or device class.
Arguments:
uid (str): The UID of the device or device class
Returns:
list(dict):
"""
uid = self._check_uid(uid)
uc_data = self._router_request(
self._make_request_data(
'getUserCommands',
data=dict(
uid=uid,
)
)
)
return uc_data['data']
[docs] def get_user_commands(self, uid):
"""
Get a list of user commands objects configured for a device
or device class.
Arguments:
uid (str): The UID of the device or device class
Returns:
list(ZenossUserCommand):
"""
uc_data = self.list_user_commands(uid)
user_commands = []
for uc in uc_data:
user_commands.append(
ZenossUserCommand(
self.api_url,
self.api_headers,
self.ssl_verify,
uc,
parent=self._check_uid(uid)
)
)
return user_commands
[docs] def get_user_command_by_id(self, uid, command_id):
"""
Get a configured user command by its id
Arguments:
uid (str): The UID of the device or device class
command_id (str): The ID of the user command
"""
uc_data = self.list_user_commands(uid)
for uc in uc_data:
if uc['id'] == command_id:
return ZenossUserCommand(
self.api_url,
self.api_headers,
self.ssl_verify,
uc,
parent=self._check_uid(uid)
)
return None
[docs] def get_user_command_by_name(self, uid, command_name):
"""
Get a configured user command by its id
Arguments:
uid (str): The UID of the device or device class
command_name (str): The name of the user command
"""
uc_data = self.list_user_commands(uid)
for uc in uc_data:
if uc['name'] == command_name:
return ZenossUserCommand(
self.api_url,
self.api_headers,
self.ssl_verify,
uc,
parent=self._check_uid(uid)
)
return None
[docs] def add_user_command(self, uid, name, description, command, password):
"""
Add a new user command to a device or device class.
Arguments:
uid (str): The UID of the device or device class
name (str): Name for the new command
description (str): Description of the new command
command (str): Command line of the new command, can include TALES
expressions
password (str): Password of the user adding the command.
"""
uid = self._check_uid(uid)
self._router_request(
self._make_request_data(
'addUserCommand',
data=dict(
uid=uid,
name=name,
description=description,
command=command,
password=password
)
)
)
return self.get_user_command_by_name(uid, name)
[docs] def list_users(self, uid):
"""
List the users available to associate with a device or device class.
Arguments:
uid (str): the UID of the device or device class
Returns:
list(str):
"""
uid = self._check_uid(uid)
user_data = self._router_request(
self._make_request_data(
'getUserList',
data=dict(
uid=uid
)
)
)
return user_data['data']
[docs] def list_available_roles(self, uid):
"""
List the admin roles available to associate with a device or device class.
Arguments:
uid (str): The UID of the device or device class
Returns:
list(str):
"""
uid = self._check_uid(uid)
role_data = self._router_request(
self._make_request_data(
'getRolesList',
data=dict(
uid=uid
)
)
)
return role_data['data']
[docs] def list_admin_roles(self, uid):
"""
List the admin roles associated with a device or device class.
Arguments:
uid (str): The UID of the device or device class
Returns:
list(dict):
"""
uid = self._check_uid(uid)
role_data = self._router_request(
self._make_request_data(
'getAdminRoles',
data=dict(
uid=uid
)
)
)
for r in role_data['data']:
r['uid'] = r['uid'].replace('/zport/dmd/', '', 1)
return role_data['data']
[docs] def get_admins(self, uid):
"""
Get ZenossDeviceManagementAdmin objects for the configured admin
users for a device or device class.
Arguments:
uid (str): The UID of the device or device class
Returns:
list(ZenossDeviceManagementAdmin):
"""
admin_data = self.list_admin_roles(uid)
admins = []
for admin in admin_data:
admins.append(
ZenossDeviceManagementAdmin(
self.api_url,
self.api_headers,
self.ssl_verify,
admin
)
)
return admins
[docs] def list_admins_by_role(self, uid, role):
"""
List configured admin users for a device or device class by role.
Arguments:
uid (str): The UID of the device or device class
role (str): The role to filter on
Returns:
list(dict):
"""
admin_data = self.list_admin_roles(uid)
admins = []
for admin in admin_data:
if admin['role'] == role:
admins.append(admin)
return admins
[docs] def get_admins_by_role(self, uid, role):
"""
Get ZenossDeviceManagementAdmin objects for the configured admin users
of a device or device class by role.
Arguments:
uid (str): The UID of the device or device class
role (str): The role to filter on
Returns:
list(ZenossDeviceManagementAdmin):
"""
admin_data = self.list_admin_roles(uid)
admins = []
for admin in admin_data:
if admin['role'] == role:
admins.append(
ZenossDeviceManagementAdmin(
self.api_url,
self.api_headers,
self.ssl_verify,
admin
)
)
return admins
[docs] def get_admin_by_name(self, uid, name):
"""
Get an admin user for a device or device class by name.
Arguments:
uid (str): The UID of the device or device class
name (str): The name of the admin user
Returns:
ZenossDeviceManagementAdmin:
"""
admin_data = self.list_admin_roles(uid)
for admin in admin_data:
if admin['name'] == name:
return ZenossDeviceManagementAdmin(
self.api_url,
self.api_headers,
self.ssl_verify,
admin
)
return None
[docs] def get_admin_by_id(self, uid, admin_id):
"""
Get and admin user for a device or device class by id.
Arguments:
uid (str): The UID of the device or device class
admin_id (str): The ID of the admin user
Returns:
ZenossDeviceManagementAdmin:
"""
admin_data = self.list_admin_roles(uid)
for admin in admin_data:
if admin['id'] == admin_id:
return ZenossDeviceManagementAdmin(
self.api_url,
self.api_headers,
self.ssl_verify,
admin
)
return None
[docs] def add_admin(self, uid, name, role=None):
"""
Add an admin user to a device or device class.
Arguments:
uid (str): The UID of the device or device class
name (str): The name of the user to add
role (str): The role to associate with the user for this
device or device class
Returns:
ZenossDeviceManagementAdmin:
"""
uid = self._check_uid(uid)
self._router_request(
self._make_request_data(
'addAdminRole',
data=dict(
params=dict(
uid=uid,
name=name,
role=role,
)
)
)
)
return self.get_admin_by_name(uid, name)
[docs]class ZenossMaintenanceWindow(DeviceManagementRouter):
"""
Class for Zenoss maintenance window objects
"""
def __init__(self, url, headers, ssl_verify, window_data, parent=None):
super(ZenossMaintenanceWindow, self).__init__(url, headers, ssl_verify)
self.uid = window_data['uid'].replace('/zport/dmd/', '', 1)
self.id = window_data['id']
self.name = window_data['name']
self.description = window_data['description']
self.meta_type = window_data['meta_type']
self.inspector_type = window_data['inspector_type']
self.startState = window_data['startState']
self.startProdState = window_data['startProdState']
self.enabled = window_data['enabled']
self.started = window_data['started']
self.start = window_data['start']
self.startTime = window_data['startTime']
self.duration = window_data['duration']
self.skip = window_data['skip']
self.repeat = window_data['repeat']
self.niceRepeat = window_data['niceRepeat']
self.occurrence = window_data['occurrence']
self.days = window_data['days']
self.parent = parent
self.startDate, start_ts, tz = self.startTime.split()
self.startHours, self.startMinutes = start_ts.split(':')[:2]
duration_parts = self.duration.split()
if len(duration_parts) > 1:
self.durationDays = duration_parts[0]
else:
self.durationDays = None
self.durationHours, self.durationMinutes = duration_parts[-1].split(':')[:2]
def _build_update_params(self, params):
"""
Build the params dict for updating a maintenance window
Arguments:
params (dict): The update parameters as a dict
Returns:
dict:
"""
[docs] def delete(self):
"""
Delete a maintenance window from a device or device class.
Returns:
dict:
"""
return self._router_request(
self._make_request_data(
'deleteMaintWindow',
data=dict(
uid=self.parent,
id=self.id,
)
)
)
[docs] def update(self, start_timestamp=None, start_datetime=None, start_date=None,
start_hours=None, start_minutes=None, duration_days=None,
duration_time=None, duration_hours=None, duration_minutes=None,
production_state=None, enabled=None, repeat=None,
occurrence=None, days=None):
"""
Update the settings for a maintenance window, with flexible options for
specifying the start date/time and duration.
Arguments:
start_timestamp (float): Start date and time in UNIX timestamp format
start_datetime (datetime): Start date and time as a datetime.datetime object
start_date (str): Start date as a string
start_hours (str): Start hours as a string
start_minutes (str): Start minutes as a string
duration_days (str): Duration days
duration_time (str): Duration time in "HH:MM" format
duration_hours (str): Duration hours
duration_minutes (str): Duration minutes
production_state (int): Production state for the window
enabled (bool): Enabled state of the window
occurrence (str): Repeat occurrence
days (str): Repeat days
Returns:
bool:
"""
new_params = dict()
if start_timestamp:
new_start = dt.fromtimestamp(start_timestamp)
new_params['startDate'] = str(new_start.date())
new_params['startHours'] = str(new_start.hour)
new_params['startMinutes'] = str(new_start.minute)
elif start_datetime:
new_params['startDate'] = str(start_datetime.date())
new_params['startHours'] = str(start_datetime.hour)
new_params['startMinutes'] = str(start_datetime.minute)
if start_date:
new_params['startDate'] = start_date
if start_hours:
new_params['startHours'] = start_hours
if start_minutes:
new_params['startMinutes'] = start_minutes
new_params['durationDays'] = duration_days
if duration_time:
new_params['durationHours'], new_params['durationMinutes'] = duration_time.split(':')[:2]
if duration_hours:
new_params['durationHours'] = duration_hours
if duration_minutes:
new_params['durationMinutes'] = duration_minutes
update_resp = self._router_request(
self._make_request_data(
'editMaintWindow',
data=dict(
params=dict(
uid=self.parent,
id=self.id,
startDate=new_params.get('startDate', self.startDate),
startHours=new_params.get('startHours', self.startHours),
startMinutes=new_params.get('startMinutes', self.startMinutes),
durationDays=new_params.get('durationDays', self.durationDays),
durationHours=new_params.get('durationHours', self.durationHours),
durationMinutes=new_params.get('durationMinutes', self.durationMinutes),
startProductionState=production_state if production_state else self.startProdState,
repeat=repeat if repeat else self.repeat,
enabled=enabled if enabled else self.enabled,
occurrence=occurrence if occurrence else self.occurrence,
days=days if days else self.days
)
)
)
)
self.__init__(self.api_url, self.api_headers, self.ssl_verify, update_resp['data'][0], parent=self.parent)
return True
[docs] def enable(self):
"""
Set maintenance window to enabled.
Returns:
bool:
"""
if not self.enabled:
self._router_request(
self._make_request_data(
'editMaintWindow',
data=dict(
params=dict(
uid=self.parent,
id=self.id,
startDate=self.startDate,
startHours=self.startHours,
startMinutes=self.startMinutes,
durationDays=self.durationDays,
durationHours=self.durationHours,
startProductionState=self.startProdState,
repeat=self.repeat,
enabled=True,
occurrence=self.occurrence,
days=self.days,
)
)
)
)
self.enabled = True
return True
[docs] def disable(self):
"""
Set maintenance window to disabled.
Returns:
bool:
"""
if self.enabled:
self._router_request(
self._make_request_data(
'editMaintWindow',
data=dict(
uid=self.parent,
id=self.id,
params=dict(
startDate=self.startDate,
startHours=self.startHours,
startMinutes=self.startMinutes,
durationDays=self.durationDays,
durationHours=self.durationHours,
startProductionState=self.startProdState,
repeat=self.repeat,
enabled=False,
occurrence=self.occurrence,
days=self.days,
)
)
)
)
self.enabled = False
return True
[docs]class ZenossUserCommand(DeviceManagementRouter):
"""
Class for Zenoss user command objects
"""
def __init__(self, url, headers, ssl_verify, command_data, parent=None):
super(ZenossUserCommand, self).__init__(url, headers, ssl_verify)
self.description = command_data['description']
self.name = command_data['name']
self.meta_type = command_data['meta_type']
self.command = command_data['command']
self.inspector_type = command_data['inspector_type']
self.id = command_data['id']
self.uid = command_data['uid'].replace('/zport/dmd/', '', 1)
self.parent = parent
[docs] def delete(self):
"""
Delete a user command for a device or device class.
Returns:
dict:
"""
return self._router_request(
self._make_request_data(
'deleteUserCommand',
data=dict(
uid=self.parent,
id=self.id
)
)
)
[docs] def update(self, description=None, command=None, password=None):
"""
Update a user command.
Arguments:
description (str): Description of the user command
command (str): Command line of the command
password (str): Password of the user updating the command.
"""
self._router_request(
self._make_request_data(
'updateUserCommand',
data=dict(
params=dict(
uid=self.parent,
id=self.id,
description=description if description else self.description,
command=command if command else self.command,
password=password,
)
)
)
)
uc_data = self.list_user_commands(self.parent)
for uc in uc_data:
if uc['id'] == self.id:
self.__init__(self.api_url, self.api_headers, self.ssl_verify, uc, parent=self.parent)
return True
[docs]class ZenossDeviceManagementAdmin(DeviceManagementRouter):
"""
Class for Zenoss user command objects
"""
def __init__(self, url, headers, ssl_verify, admin_data):
super(ZenossDeviceManagementAdmin, self).__init__(url, headers, ssl_verify)
self.uid = admin_data['uid'].replace('/zport/dmd/', '', 1)
self.name = admin_data['name']
self.id = admin_data['id']
self.meta_type = admin_data['meta_type']
self.role = admin_data['role']
self.inspector_type = admin_data['inspector_type']
self.pager = admin_data['pager']
self.email=admin_data['email']
[docs] def update(self, role):
"""
Update the admin user's role.
Arguments:
role (str): New role for the user
Returns:
bool:
"""
self._router_request(
self._make_request_data(
'updateAdminRole',
data=dict(
params=dict(
uid=self.uid,
name=self.name,
role=role
)
)
)
)
self.role = role
return True
[docs] def delete(self):
"""
Delete an admin user from a device or device class.
:return:
"""
return self._router_request(
self._make_request_data(
'removeAdmin',
data=dict(
uid=self.uid,
id=self.id,
)
)
)