Viewing file: PhysicalDevice.py (16.19 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
#!/usr/bin/python3
## Copyright (C) 2008, 2009, 2010, 2012, 2014 Red Hat, Inc. ## Authors: ## Tim Waugh <twaugh@redhat.com>
## This program is free software; you can redistribute it and/or modify ## it under the terms of the GNU General Public License as published by ## the Free Software Foundation; either version 2 of the License, or ## (at your option) any later version.
## This program is distributed in the hope that it will be useful, ## but WITHOUT ANY WARRANTY; without even the implied warranty of ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the ## GNU General Public License for more details.
## You should have received a copy of the GNU General Public License ## along with this program; if not, write to the Free Software ## Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import config import gettext gettext.install(domain=config.PACKAGE, localedir=config.localedir) import cupshelpers import urllib.parse import ppdippstr import socket from debug import *
class PhysicalDevice: def __init__(self, device): self.devices = None self._network_host = None self.dnssd_hostname = None self._cupsserver = False self.firsturi = None self.add_device (device) self._user_data = {} self._ppdippstr = ppdippstr.backends
def _canonical_id (self, device): if hasattr (device, "id_dict"): mfg = device.id_dict.get ('MFG', '') mdl = device.id_dict.get ('MDL', '')
if mfg == '' or mdl.lower ().startswith (mfg.lower ()): make_and_model = mdl else: make_and_model = "%s %s" % (mfg, mdl) else: make_and_model = device.make_and_model
return cupshelpers.ppds.ppdMakeModelSplit (make_and_model)
def _add_dot_local_if_needed(self, hostname): if (hostname is None): return None if ((not '.' in hostname) and (not ':' in hostname) and (hostname != 'localhost')): return hostname + '.local' else: return hostname
def _get_address (self, hostname): try: address = socket.getaddrinfo(hostname, 0, family=socket.AF_INET)[0][4][0] except: try: address = socket.getaddrinfo(hostname, 0, family=socket.AF_INET6)[0][4][0] except: address = None return address
def _get_host_from_uri (self, uri): hostport = None host = None dnssdhost = None (scheme, rest) = urllib.parse.splittype (uri) if scheme == 'hp' or scheme == 'hpfax': ipparam = None if rest.startswith ("/net/"): (rest, ipparam) = urllib.parse.splitquery (rest[5:])
if ipparam is not None: if ipparam.startswith("ip="): hostport = ipparam[3:] elif ipparam.startswith ("hostname="): hostport = ipparam[9:] elif ipparam.startswith("zc="): dnssdhost = ipparam[3:] else: return None, None else: return None, None elif scheme == 'dnssd' or scheme == 'mdns': # The URIs of the CUPS "dnssd" backend do not contain the host # name of the printer return None, None else: (hostport, rest) = urllib.parse.splithost (rest) if hostport is None: return None, None
if hostport: (host, port) = urllib.parse.splitport (hostport)
if (host): ip = None try: ip = self._get_address(host) if ip: host = ip except: pass elif (dnssdhost): try: host = self._get_address(dnssdhost) except: host = None
return self._add_dot_local_if_needed(host), \ self._add_dot_local_if_needed(dnssdhost)
def add_device (self, device): if self._network_host or self.dnssd_hostname: host, dnssdhost = self._get_host_from_uri (device.uri) if (hasattr (device, 'address')): host = self._add_dot_local_if_needed(device.address) if (hasattr (device, 'hostname') and dnssdhost is None): dnssdhost = self._add_dot_local_if_needed(device.hostname) if (host is None and dnssdhost is None) or \ not ((host and self._network_host and host == self._network_host) or (host and self.dnssd_hostname and host == self.dnssd_hostname) or (dnssdhost and self._network_host and dnssdhost == self._network_host) or (dnssdhost and self.dnssd_hostname and dnssdhost == self.dnssd_hostname)) or \ (host is None and self.dnssd_hostname is None) or \ (dnssdhost is None and self._network_host is None): raise ValueError else: (mfg, mdl) = self._canonical_id (device) if self.devices is None: self.mfg = mfg self.mdl = mdl self.mfg_lower = mfg.lower () self.mdl_lower = mdl.lower () self.sn = device.id_dict.get ('SN', '') self.devices = [] else: def nicest (a, b): def count_lower (s): l = s.lower () n = 0 for i in range (len (s)): if l[i] != s[i]: n += 1 return n if count_lower (b) < count_lower (a): return b return a
self.mfg = nicest (self.mfg, mfg) self.mdl = nicest (self.mdl, mdl)
sn = device.id_dict.get ('SN', '') if sn != '' and self.sn != '' and sn != self.sn: raise ValueError
if device.type == "socket": # Remove default port to more easily find duplicate URIs device.uri = device.uri.replace (":9100", "") if (device.uri.startswith('ipp:') and \ device.uri.find('/printers/') != -1) or \ ((device.uri.startswith('dnssd:') or \ device.uri.startswith('mdns:')) and \ device.uri.endswith('/cups')): # CUPS server self._cupsserver = True elif self._cupsserver: # Non-CUPS queue on a CUPS server, drop this one return for d in self.devices: if d.uri == device.uri: return
# Use the URI of the very first device added as a kind of identifier # for this physical device record, to make debugging easier if not self.firsturi: self.firsturi = device.uri;
self.devices.append (device) self.devices.sort ()
if (not self._network_host or not self.dnssd_hostname) and \ device.device_class == "network": # We just added a network device. self._network_host, dnssdhost = \ self._get_host_from_uri (device.uri) if dnssdhost: self.dnssd_hostname = dnssdhost;
if (hasattr (device, 'address') and self._network_host is None): address = device.address if address: self._network_host = self._add_dot_local_if_needed(address)
if (hasattr (device, 'hostname') and self.dnssd_hostname is None): hostname = device.hostname if hostname: self.dnssd_hostname = self._add_dot_local_if_needed(hostname)
if (self.dnssd_hostname and self._network_host is None): try: self._network_host = self._get_address(hostname); except: self._network_host = None
debugprint("Device %s added to physical device: %s" % (device.uri, repr(self)))
def get_devices (self): return self.devices
def get_info (self): # If the manufacturer/model is not known, or useless (in the # case of the hpfax backend or a dnssd URI pointing to a remote # CUPS queue), show the device-info field instead. if (self.devices[0].uri.startswith('ipp:') and \ self.devices[0].uri.find('/printers/') != -1) or \ ((self.devices[0].uri.startswith('dnssd:') or \ self.devices[0].uri.startswith('mdns:')) and \ self.devices[0].uri.endswith('/cups')): if not self.dnssd_hostname: info = "%s" % self._network_host elif not self._network_host or self._network_host.find(":") != -1: info = "%s" % self.dnssd_hostname else: if self._network_host != self.dnssd_hostname: info = "%s (%s)" % (self.dnssd_hostname, self._network_host) else: info = "%s" % self._network_host elif self.mfg == '' or \ (self.mfg == "HP" and self.mdl.startswith("Fax")): info = self._ppdippstr.get (self.devices[0].info) else: info = "%s %s" % (self.mfg, self.mdl) if ((self._network_host and len (self._network_host) > 0) or \ (self.dnssd_hostname and len (self.dnssd_hostname) > 0)) and not \ ((self.devices[0].uri.startswith('dnssd:') or \ self.devices[0].uri.startswith('mdns:')) and \ self.devices[0].uri.endswith('/cups')) and \ (not self._network_host or \ info.find(self._network_host) == -1) and \ (not self.dnssd_hostname or \ info.find(self.dnssd_hostname) == -1): if not self.dnssd_hostname: info += " (%s)" % self._network_host elif not self._network_host: info += " (%s)" % self.dnssd_hostname else: info += " (%s, %s)" % (self.dnssd_hostname, self._network_host) elif len (self.sn) > 0: info += " (%s)" % self.sn return info
# User data def set_data (self, key, value): self._user_data[key] = value
def get_data (self, key): return self._user_data.get (key)
def __str__ (self): return "(description: %s)" % self.__repr__ ()
def __repr__ (self): return ("<PhysicalDevice.PhysicalDevice (%s,%s,%s,%s,%s,%s)>" % (self.mfg, self.mdl, self.sn, self._network_host, self.dnssd_hostname, self.firsturi))
def __eq__(self, other): if type (other) != type (self): return False
if not (((not self._network_host or len (self._network_host) == 0) and (not other._network_host or len (other._network_host) == 0) and (not self.dnssd_hostname or len (self.dnssd_hostname) == 0) and (not other.dnssd_hostname or len (other.dnssd_hostname) == 0)) or (self._network_host and len (self._network_host) > 0 and other._network_host and len (other._network_host) > 0 and self._network_host == other._network_host) or (self.dnssd_hostname and len (self.dnssd_hostname) > 0 and other.dnssd_hostname and len (other.dnssd_hostname) > 0 and self.dnssd_hostname == other.dnssd_hostname) or (self._network_host and len (self._network_host) > 0 and other.dnssd_hostname and len (other.dnssd_hostname) > 0 and self._network_host == other.dnssd_hostname) or (self.dnssd_hostname and len (self.dnssd_hostname) > 0 and other._network_host and len (other._network_host) > 0 and self.dnssd_hostname == other._network_host)): return False
devs = other.get_devices() if devs: uris = [x.uri for x in self.devices] for dev in devs: if dev.uri in uris: # URI match return True
if ((other.mfg == '' and other.mdl == '') or (self.mfg == '' and self.mdl == '')): if other.mfg == '' and self.mfg == '': # Both just a backend, not a real physical device. return self.devices[0] == other.devices[0]
# One or other is just a backend, not a real physical device. return False
def split_make_and_model (dev): if dev.mfg == '' or dev.mdl.lower ().startswith (dev.mfg.lower ()): make_and_model = dev.mdl else: make_and_model = "%s %s" % (dev.mfg, dev.mdl) (mfg, mdl) = cupshelpers.ppds.ppdMakeModelSplit (make_and_model) return (cupshelpers.ppds.normalize (mfg), cupshelpers.ppds.normalize (mdl))
(our_mfg, our_mdl) = split_make_and_model (self) (other_mfg, other_mdl) = split_make_and_model (other)
if our_mfg != other_mfg: return False
if our_mfg == "hp" and self.sn != '' and self.sn == other.sn: return True
if our_mdl != other_mdl: return False
if self.sn == '' or other.sn == '': return True
return self.sn == other.sn
def __lt__(self, other): if type (other) != type (self): return False
if self == other: return False;
if self._network_host != other._network_host: if self._network_host is None: return True
if other._network_host is None: return False
return self._network_host < other._network_host
if self.dnssd_hostname != other.dnssd_hostname: if self.dnssd_hostname is None: return True
if other.dnssd_hostname is None: return False
return self.dnssd_hostname < other.dnssd_hostname
devs = other.get_devices() if devs: uris = [x.uri for x in self.devices] for dev in devs: if dev.uri in uris: # URI match, so compare equal. Not less than. return False
if ((other.mfg == '' and other.mdl == '') or (self.mfg == '' and self.mdl == '')): if other.mfg == '' and self.mfg == '': # Both just a backend, not a real physical device. return self.devices[0] < other.devices[0]
# One or other is just a backend, not a real physical device. return other.mfg == '' and other.mdl == ''
def split_make_and_model (dev): if dev.mfg == '' or dev.mdl.lower ().startswith (dev.mfg.lower ()): make_and_model = dev.mdl else: make_and_model = "%s %s" % (dev.mfg, dev.mdl) (mfg, mdl) = cupshelpers.ppds.ppdMakeModelSplit (make_and_model) return (cupshelpers.ppds.normalize (mfg), cupshelpers.ppds.normalize (mdl))
(our_mfg, our_mdl) = split_make_and_model (self) (other_mfg, other_mdl) = split_make_and_model (other)
if our_mfg != other_mfg: return our_mfg < other_mfg
if our_mdl != other_mdl: return our_mdl < other_mdl
if self.sn == '' or other.sn == '': return False
return self.sn < other.sn
if __name__ == '__main__': import authconn c = authconn.Connection () devices = cupshelpers.getDevices (c)
physicaldevices = [] for device in devices.values (): physicaldevice = PhysicalDevice (device) try: i = physicaldevices.index (physicaldevice) physicaldevices[i].add_device (device) except ValueError: physicaldevices.append (physicaldevice)
physicaldevices.sort () for physicaldevice in physicaldevices: print(physicaldevice.get_info ()) devices = physicaldevice.get_devices () for device in devices: print(" ", device)
|