from .xmlutils import text_to_etree as _text_to_xml
from requests import Timeout
from six.moves import range
from six.moves.urllib.parse import urlparse
import six
def locate(host=None, port=None, internal=True, timeout=5):
    """Return an iterator over the description resource URLs of a device.

    Dispatches to one of the two lookup strategies in this module:

    * ``internal`` true: discovery through ``locate_by_ssdp``, optionally
      restricted to ``host``.
    * ``internal`` false: probing ``host`` directly through
      ``locate_by_resource``.

    :param host: Host to look for.  Required when ``internal`` is false.
    :param port: Port used when probing ``host`` directly.  It is not passed
        to the SSDP lookup, so it has no effect when ``internal`` is true.
    :param internal: Whether to use SSDP discovery (true) or to probe the
        host directly (false).
    :param timeout: Number of seconds handed to the chosen strategy.
    :raises ValueError: If ``internal`` is false and no ``host`` is given.
    """
    if not (host or internal):
        raise ValueError('cannot make external connection with no explicit host given')
    if internal:
        return locate_by_ssdp(host=host, timeout=timeout)
    # Forward the caller's timeout; previously it was dropped here, so the
    # direct lookup always ran with locate_by_resource's default.
    return locate_by_resource(host, port, timeout=timeout)
def locate_by_ssdp(service_types=None, host=None, timeout=5):
    """Yield the location of one SSDP discovery per requested service type.

    :param service_types: Either a mapping of service type to a "required"
        flag, or a plain iterable of service types (all of which are then
        treated as required).  Defaults to ``pyinthesky.SERVICE_TYPES``.
    :param host: When given, only discoveries whose location URL has this
        value as its netloc or hostname are accepted.
    :param timeout: Passed to the SSDP search and quoted in the error message.
    :raises Timeout: If a required service type produced no matching
        discovery.  Service types that are not required are skipped silently.
    """
    from greyupnp.ssdp import search as ssdp_search

    if service_types is None:
        from pyinthesky import SERVICE_TYPES
        service_types = SERVICE_TYPES
    if not isinstance(service_types, dict):
        service_types = dict.fromkeys(service_types, True)

    # Every discovery received so far, kept so that a later service type can
    # be satisfied by something that arrived while looking for an earlier one.
    seen = set()
    incoming = ssdp_search(tuple(service_types), timeout)

    def is_wanted(candidate, wanted_type):
        # Same evaluation order as a chained "type and host" test: the
        # location is only parsed once the type is known to match.
        if candidate.type != wanted_type:
            return False
        parsed = urlparse(candidate.location)
        return (not host) or host in (parsed.netloc, parsed.hostname)

    for wanted_type, required in service_types.items():
        # First look through what has already been received...
        match = next(
            (candidate for candidate in seen if is_wanted(candidate, wanted_type)),
            None,
        )

        # ...then keep reading search results until one fits.
        if not match:
            for candidate in incoming:
                seen.add(candidate)
                if is_wanted(candidate, wanted_type):
                    match = candidate
                    break

        if match:
            yield match.location
        elif required:
            err = 'unable to find service of type "%s" within %s seconds'
            raise Timeout(err % (wanted_type, timeout))
# Iterate over this resource.
def locate_by_resource(host, port=None, timeout=5):
    """Yield the URLs of the ``descriptionN.xml`` resources found on a host.

    Resources ``description0.xml``, ``description1.xml``, ... are requested in
    order (at most 1000 of them) and the URL of every one answering with
    HTTP 200 is yielded.

    :param host: Host handed to the ``Transport``.
    :param port: Port handed to the ``Transport``.
    :param timeout: Overall budget in seconds for the whole scan; each request
        is given whatever remains of it.
    :raises RuntimeError: If the scan stops (two 404 responses in a row after
        at least one hit) before as many resources as there are required
        entries in ``pyinthesky.SERVICE_TYPES`` were found.

    The transport's ``Timeout`` is raised when the budget runs out, and any
    status other than 200 or 404 is passed to ``raise_for_status()``.
    """
    from .transport import Transport
    transport = Transport(host, port, default_timeout=timeout)

    # Lower bound: number of required service types.  Upper bound: number of
    # service types known at all.
    from pyinthesky import SERVICE_TYPES as ST
    needed = sum(1 for is_required in ST.values() if is_required)
    known = len(ST)

    import time
    deadline = time.time() + timeout

    found = 0
    # Not sure if there's a maximum number for descriptions.
    for index in range(1000):
        current = time.time()
        if current > deadline:
            raise transport.Timeout(
                'couldn\'t find all the resources for host "%s"' % host)

        response = transport.get_resource(
            'description%d.xml' % index, raw_resp=True,
            timeout=deadline - current)
        status = response.status_code

        if status == 200:
            found += 1
            previous_hit = True
            yield response.url
            # Nothing more to look for once every known type is covered.
            if found >= known:
                return
            continue

        if status != 404:
            # Unexpected status: let the response object decide whether it
            # is an error.
            response.raise_for_status()
            continue

        # A 404 from here on.
        #  * Before the first hit we simply keep scanning.
        #  * Numbering can have single gaps (e.g. 0,2 or 0,1,3), so one miss
        #    directly after a hit is tolerated.
        #  * A second miss in a row ends the scan, which is an error if fewer
        #    than the minimum number of resources were found.
        if found and not previous_hit:
            if found < needed:
                raise RuntimeError(
                    'did not find enough resources for host "%s" (found %s, needed %s)'
                    % (host, found, needed))
            return
        previous_hit = False
def default_validator(service_type, key, value):
    """Build a validator for one state variable of a service.

    This is the validator factory installed on ``Connection`` by default.

    :param service_type: Service type the state variable belongs to; accepted
        for interface compatibility but not used here.
    :param key: Passed as the second argument to ``create_validator``.
    :param value: Passed as the first argument to ``create_validator``.
    :returns: Whatever ``validators.create_validator`` returns.
    """
    from .validators import create_validator

    validator = create_validator(value, key)
    return validator
class Connection(object):
    """A set of description resources sharing a single transport.

    ``connect()`` downloads every resource and publishes the parsed devices
    in ``self.devices``, keyed by device type.
    """

    def __init__(self, resources, default_timeout=None):
        """Store the resource URLs and create the transport.

        :param resources: Iterable of resource URL strings (not a single
            string).
        :param default_timeout: Passed through to the ``Transport``.
        :raises ValueError: If ``resources`` is a string or is empty.
        """
        from pyinthesky.transport import Transport

        # A lone string would otherwise be silently split into characters.
        if isinstance(resources, six.string_types):
            raise ValueError('resources must be an iterator of strings, not a string type')

        resource_urls = tuple(resources)
        self.resources = resource_urls
        if not resource_urls:
            raise ValueError("at least one resource is needed")

        # Factory used by services to build validators; replaceable per
        # connection.
        self.create_validator = default_validator

        # Use a common root for all resources when there is one.
        from pyinthesky.utils import common_url_prefix
        shared_root = common_url_prefix(resource_urls)
        self.transport = Transport(
            root=shared_root,
            fixed_root=bool(shared_root),
            default_timeout=default_timeout,
        )

    def connect(self):
        """Fetch and parse every resource, then populate ``self.devices``."""
        from pyinthesky.miniupnp import parse_device_description

        fetch = self.transport.get_resource
        descriptions = [
            parse_device_description(_text_to_xml(fetch(url).text))
            for url in self.resources
        ]

        # Only expose devices once every description has been retrieved.
        self.devices = {}
        for description in descriptions:
            self.devices[description.devtype] = Device(self, description)
class Device(object):
    """Wrapper around a parsed UPnP device description.

    Attributes not defined on this class are looked up on the wrapped
    ``upnp_device`` object.  ``services`` maps each service name to a
    ``Service`` wrapper bound to this device.
    """

    def __init__(self, connection, upnp_device):
        """Wrap ``upnp_device`` and build a ``Service`` for each of its services.

        :param connection: The owning connection object.
        :param upnp_device: Parsed device description; its ``services``
            mapping is iterated to build the wrappers.
        """
        self.connection = connection
        self.upnp_device = upnp_device
        self.services = {
            s.name: Service(self, s)
            for s in upnp_device.services.values()
        }

    def __getattr__(self, name):
        """Delegate lookups that failed on this object to ``upnp_device``.

        :raises AttributeError: If the wrapped object lacks the attribute, or
            if ``upnp_device`` itself has not been set yet.
        """
        # __getattr__ only runs after normal lookup failed.  If the missing
        # name is ``upnp_device`` itself (e.g. on the bare instance created
        # by copy/pickle before its state is restored), reading
        # ``self.upnp_device`` below would re-enter this method forever.
        if name == 'upnp_device':
            raise AttributeError(name)
        return getattr(self.upnp_device, name)

    def __str__(self):
        return str(self.upnp_device)

    # XXX: Maybe not.
    def __repr__(self):
        return repr(self.upnp_device)
class Service(object):
    """Wrapper around one UPnP service of a device, able to invoke its actions.

    Attributes not defined on this class are looked up on the wrapped
    ``upnp_service`` object.  ``connect()`` must be called before ``invoke()``
    because it is what builds ``self.methods``.
    """

    def __init__(self, device, upnp_service):
        """Bind the service description to its owning device.

        :param device: Owning device; the transport and the validator factory
            are reached through ``device.connection``.
        :param upnp_service: Service description object being wrapped.
        """
        self.device = device
        self.name = upnp_service.name
        self.upnp_service = upnp_service

    def __getattr__(self, name):
        """Delegate lookups that failed on this object to ``upnp_service``.

        :raises AttributeError: If the wrapped object lacks the attribute, or
            if ``upnp_service`` itself has not been set yet.
        """
        # __getattr__ only runs after normal lookup failed.  If the missing
        # name is ``upnp_service`` itself (e.g. on the bare instance created
        # by copy/pickle before its state is restored), reading
        # ``self.upnp_service`` below would re-enter this method forever.
        if name == 'upnp_service':
            raise AttributeError(name)
        return getattr(self.upnp_service, name)

    def connect(self):
        """Download the service description and prepare per-action validators.

        Sets ``self.service_desc`` to the parsed description and
        ``self.methods`` to a mapping of action name to a two-item list
        ``[input_validator, output_validator]``.
        """
        from pyinthesky.miniupnp import parse_service_description
        # ``description_url`` is not defined on this class, so it is resolved
        # on the wrapped service through __getattr__.
        schema_resp = self.transport.get_resource(self.description_url)
        schema_xml = _text_to_xml(schema_resp.text)
        self.service_desc = parse_service_description(schema_xml)

        # Create validators for all value types (states).
        #
        # It's a map from statevar objects to validators.
        validators = {
            v: self.create_validator(self.upnp_service.servtype, k, v)  # pylint: disable=not-callable
            for (k, v) in self.service_desc.states.items()
        }

        # And now create input and output validators for methods.
        from pyinthesky.validators import create_multivalidator
        from collections import OrderedDict as OD
        self.methods = {}
        for name, action in self.service_desc.actions.items():
            in_vals = OD((k, validators[v]) for (k, v) in action.parameters.items())
            out_vals = OD((k, validators[v]) for (k, v) in action.returns.items())
            self.methods[name] = [
                create_multivalidator(in_vals, name),
                create_multivalidator(out_vals, name),
            ]

    def invoke(self, action_name, *args, **kwargs):
        """Call the named action on the service and return its result.

        :param action_name: Key into ``self.methods`` (built by ``connect()``).
        :param args: Positional arguments, mapped onto parameter names using
            the input validator's ``argument_order``.
        :param kwargs: Keyword arguments for the action.
        :returns: The converted result, or ``None`` when it is empty.
        :raises TypeError: If the arguments cannot be mapped onto the
            action's parameters (the action name is appended to the message).
        :raises ValueError: If the input validator rejects the arguments.
        :raises ActionValueError: If the device reports a UPnP error that
            ``miniupnp.is_action_value_error`` recognises; the UPnP error is
            stored on the ``cause`` attribute.
        :raises RuntimeError: If the response has more than one body part.

        Other UPnP errors are raised as returned by
        ``miniupnp.check_upnp_error``; SOAP errors that wrap no UPnP error
        are re-raised unchanged.  An unparseable response raises the HTTP
        error if there is one, otherwise the XML parse error.
        """
        from pyinthesky import minisoap, miniupnp, utils, xmlutils

        # Find the action definition and get the argument order.
        in_valid, out_valid = self.methods[action_name]

        # Then take the positional arguments and normalise them to
        # keyword-only arguments.
        try:
            kw = utils.args_to_kwargs(args, kwargs, in_valid.argument_order)
        except TypeError as te:
            raise TypeError('{} [{}]'.format(te, action_name))

        # Next, validate and convert the arguments.
        try:
            kw_to_use = in_valid.output(kw)
        except in_valid.Invalid as e:
            raise ValueError(e)

        # Build a UPNP request, then bundle that into SOAP.
        # NOTE(review): connect() reads ``servtype`` from the same object
        # while this reads ``service_type`` - confirm both attributes exist.
        schema = self.upnp_service.service_type
        upnp_req = miniupnp.encode_action_request(schema, action_name, kw_to_use)
        soap_req = minisoap.soap_encode([upnp_req])

        # Submit it to the control URL.
        respobj = self.transport.soap_request(
            self.upnp_service.control_url, schema, action_name,
            xmlutils.etree_to_text(soap_req), raw_resp=True
        )

        # Try to interpret it as a SOAP response - it may be an error
        # response too.
        try:
            resp_etree = xmlutils.text_to_etree(respobj.text)
        except xmlutils.ElementTree.ParseError:
            # Just raise the original HTTP error - but if there wasn't
            # one (curious), then raise an error complaining about the
            # inability to parse the XML.
            respobj.raise_for_status()
            raise

        # Decode the SOAP response.
        try:
            upnp_resp = minisoap.soap_decode(resp_etree)
        except minisoap.SoapError as se:
            # Check to see if it's a UPnPError wrapped inside a Soap error.
            ue = miniupnp.check_upnp_error(se)
            if ue is None:
                raise

            # Check to see if it specifically refers to a problem with
            # a given value.
            if not miniupnp.is_action_value_error(ue):
                raise ue  # pylint: disable=raising-bad-type
            ave = ActionValueError(ue.desc)
            ave.cause = ue
            raise ave

        # There should only be one response body.
        if len(upnp_resp) == 0:
            untyped_result = {}
        elif len(upnp_resp) == 1:
            untyped_result = miniupnp.decode_action_response(
                action_name, upnp_resp[0])
        else:
            raise RuntimeError('%s body parts inside UPnP response' % len(upnp_resp))

        # Break values out from their UPnP structure, and then convert
        # the value types appropriately.
        result = out_valid.input(untyped_result)

        # Return either the dictionary structure if populated, or None
        # if empty.
        return result or None

    @property
    def transport(self):
        """The transport of the connection that owns this service's device."""
        return self.device.connection.transport

    @property
    def create_validator(self):
        """The validator factory configured on the owning connection."""
        return self.device.connection.create_validator
class ActionValueError(ValueError):
    """A UPnP error that refers to a problem with a value given to an action.

    ``Service.invoke`` raises this with the UPnP error's description as the
    message and stores the underlying UPnP error on the ``cause`` attribute.
    """