"""Send SMS messages through a modem managed by ModemManager (via D-Bus).

The module locates the first modem ModemManager exposes, unlocks its SIM with
a PIN read from ``modem_pin.txt`` when the modem reports itself as locked,
enables the modem and then sends one SMS per recipient.
"""

import logging
import os
import time

from pydbus import SystemBus
from gi.repository import GLib

# Setup logging
logfile = "/opt/sms/sms.log"
FORMAT = '%(asctime)-15s %(message)s'
logging.basicConfig(format=FORMAT, filename=logfile, level=logging.DEBUG)
logger = logging.getLogger('observiumsms')

# Value of the 'state' entry in the modem status that makes unlock_sim() send
# the PIN (MM_MODEM_STATE_LOCKED in the ModemManager API).
_MM_MODEM_STATE_LOCKED = 2

# Modem discovery: number of polls of ModemManager and the pause between them.
_MODEM_DISCOVERY_ATTEMPTS = 5
_MODEM_DISCOVERY_DELAY = 5  # seconds

# unlock_and_enable_modem() gives up once more than this many attempts failed.
_ENABLE_MAX_RETRIES = 15


class ModemManagerError(Exception):
    """Base error for modem problems; ``value`` holds the message."""

    def __init__(self, value):
        # Pass the value on so that ``args`` is populated like for any other
        # exception (needed for a meaningful repr() and for pickling).
        super().__init__(value)
        self.value = value

    def __str__(self):
        return repr(self.value)


class SIMError(ModemManagerError):
    """SIM-specific failure (no SIM present, or the PIN was rejected)."""


def get_modem_pin():
    """Return the SIM PIN stored in the first ``modem_pin.txt`` that exists.

    Looked up, in order: the current working directory, the directory that
    contains this file, and ``/etc``.

    Returns:
        The first line of the file with surrounding whitespace removed.

    Raises:
        FileNotFoundError: if none of the candidate paths is a regular file.
    """
    candidates = [
        os.path.join(os.getcwd(), 'modem_pin.txt'),
        os.path.join(os.path.dirname(os.path.abspath(__file__)),
                     'modem_pin.txt'),
        '/etc/modem_pin.txt',
    ]
    for path in candidates:
        if os.path.isfile(path):
            with open(path, 'r') as pin_file:
                # strip() rather than rstrip('\n'): a stray '\r' or trailing
                # blank would otherwise be sent as part of the PIN and cost
                # one of the limited unlock attempts.
                return pin_file.readline().strip()
    raise FileNotFoundError('modem_pin.txt not found in any directory')


def get_modem():
    """Return a pydbus proxy for the first modem ModemManager reports.

    ModemManager is polled up to ``_MODEM_DISCOVERY_ATTEMPTS`` times, pausing
    ``_MODEM_DISCOVERY_DELAY`` seconds between polls, to give modems time to
    show up.

    Raises:
        ModemManagerError: if no modem was reported in any poll.
    """
    modems = None
    for attempt in range(1, _MODEM_DISCOVERY_ATTEMPTS + 1):
        with SystemBus() as bus:
            modems = bus.get('.ModemManager1').GetManagedObjects()
        if modems:
            break
        # No point in sleeping after the last poll: nothing is re-checked.
        if attempt < _MODEM_DISCOVERY_ATTEMPTS:
            time.sleep(_MODEM_DISCOVERY_DELAY)

    if not modems:
        raise ModemManagerError('No modems found')

    modem_path = next(iter(modems))
    logger.info("Going with %s", modem_path)
    with SystemBus() as bus:
        return bus.get(".ModemManager1", modem_path)


def unlock_sim(modem):
    """Send the stored PIN to the modem's SIM if the modem reports 'locked'.

    Does nothing (beyond logging the status) when the modem status does not
    carry the locked state.

    Args:
        modem: modem proxy as returned by get_modem().

    Raises:
        SIMError: if the modem has no SIM, or if sending the PIN failed. The
            latter is chained to the underlying ``GLib.Error``.
    """
    sim_path = modem.Sim
    if not sim_path:
        raise SIMError('No SIM card found')

    # Unlock the SIM, if it is not unlocked already.
    props = modem.GetStatus()
    logger.debug("Modem status: %s", props)
    if not props or props.get('state') != _MM_MODEM_STATE_LOCKED:
        return

    mypin = get_modem_pin()
    logger.info("Trying to unlock %s", sim_path)
    with SystemBus() as bus:
        sim = bus.get('.ModemManager1', sim_path)
        try:
            # NOTE(review): pydbus appears to interpret ``timeout`` in
            # seconds, which would make this ~2.8 hours rather than 10 s --
            # confirm against the pydbus documentation before changing.
            sim.SendPin(mypin, timeout=10000)
        except GLib.Error as exc:
            remaining_attempts = modem.UnlockRetries
            raise SIMError(
                'Failed to unlock PIN, remaining attempts: {}'.format(
                    remaining_attempts)) from exc


def unlock_and_enable_modem():
    """Find a modem, unlock its SIM if needed, enable it and return it.

    The whole sequence is retried, because once pydbus has fetched the
    properties it does not really refresh them, including the modems.

    Returns:
        The enabled modem proxy.

    Raises:
        SIMError: immediately, without retrying -- it could be a wrong PIN and
            retrying would waste the remaining unlock attempts.
        Exception: the last error, once more than ``_ENABLE_MAX_RETRIES``
            attempts have failed.
    """
    retries = 0
    while True:
        try:
            modem = get_modem()
            logger.debug("Unlocking SIM (if needed)")
            unlock_sim(modem)
            logger.debug("Enabling modem")
            modem.Enable(True)
            return modem
        except SIMError:
            # Cannot continue with this one.
            raise
        except Exception:
            # ``Exception`` rather than a bare ``except`` so that
            # KeyboardInterrupt / SystemExit still stop the program.
            retries += 1
            if retries > _ENABLE_MAX_RETRIES:
                raise
            logger.debug("Modem not ready (attempt %d), retrying",
                         retries, exc_info=True)
            time.sleep(1)


def _has_recent_signal(status):
    """Return True if ``status`` has a 'signal-quality' entry whose second
    element is truthy.

    NOTE(review): per the ModemManager API the entry is presumably a
    (percentage, recently-measured) pair -- confirm.
    """
    quality = status.get('signal-quality', None)
    return bool(quality) and bool(quality[1])


def send_sms(recipients, smsstring):
    """Send ``smsstring`` as an SMS to every non-blank entry of ``recipients``.

    Blocks, polling every two seconds, until the modem reports a usable
    signal quality; there is no upper bound on that wait.

    Args:
        recipients: iterable of phone numbers as strings; entries are
            stripped and blank ones are skipped.
        smsstring: message text.

    Raises:
        SIMError, ModemManagerError: see unlock_and_enable_modem().
        Errors raised while creating or sending a message propagate and abort
        the remaining recipients.
    """
    logger.info("Sending SMS: %s", smsstring)

    modem = unlock_and_enable_modem()
    modem_msg = modem['org.freedesktop.ModemManager1.Modem.Messaging']

    # Wait for the modem to report signal; re-fetch the proxy on every round
    # because pydbus does not refresh an existing one.
    while not _has_recent_signal(modem.GetStatus()):
        modem = get_modem()
        logger.debug("Waiting for signal, signal-quality: %s",
                     modem.GetStatus().get('signal-quality', None))
        time.sleep(2)

    for line in recipients:
        line = line.strip()
        if not line:
            continue
        logger.info("Creating new SMS for %s", line)
        dictsms = {
            'text': GLib.Variant('s', smsstring),
            'number': GLib.Variant('s', line),
        }
        msgpath = modem_msg.Create(dictsms)
        logger.info("Created new SMS %s", msgpath)

        logger.info("Sending SMS %s", msgpath)
        with SystemBus() as bus:
            sms = bus.get(".ModemManager1", msgpath)
            # NOTE(review): see unlock_sim() -- pydbus timeouts are presumably
            # in seconds, not milliseconds; confirm.
            sms.Send(timeout=30000)
            logger.info("SMS %s state %s", msgpath, sms.State)