129 lines
3.9 KiB
Python
129 lines
3.9 KiB
Python
|
|
import logging
|
|
import os
|
|
import time
|
|
from pydbus import SystemBus
|
|
from gi.repository import GLib
|
|
|
|
|
|
# Setup logging
|
|
logfile = "/opt/sms/sms.log"
|
|
FORMAT = '%(asctime)-15s %(message)s'
|
|
logging.basicConfig(format=FORMAT,filename=logfile,level=10)
|
|
logger = logging.getLogger('observiumsms')
|
|
|
|
class ModemManagerError(Exception):
|
|
def __init__(self, value):
|
|
self.value = value
|
|
def __str__(self):
|
|
return repr(self.value)
|
|
|
|
class SIMError(ModemManagerError):
|
|
def __init__(self, value):
|
|
self.value = value
|
|
def __str__(self):
|
|
return repr(self.value)
|
|
|
|
def get_modem():
|
|
# Retry for 5 seconds and give the modems time
|
|
retries = 0
|
|
while retries < 5:
|
|
retries += 1
|
|
# Contact ModemManager, unlock SIM and send SMS to emergency numbers
|
|
with SystemBus() as bus:
|
|
mm = bus.get('.ModemManager1')
|
|
modems = mm.GetManagedObjects()
|
|
if not modems or len(modems) == 0:
|
|
time.sleep(5)
|
|
else:
|
|
break
|
|
|
|
if not modems or len(modems) == 0:
|
|
raise ModemManagerError('No modems found')
|
|
modem_path = list(modems.keys())[0]
|
|
|
|
logger.info("Going with %s", modem_path)
|
|
modem = bus.get(".ModemManager1", modem_path)
|
|
return modem
|
|
|
|
def unlock_sim(modem):
|
|
sim_path = modem.Sim
|
|
if not sim_path:
|
|
raise SIMError('No SIM card found')
|
|
|
|
# unlock SIM, if not alreadY
|
|
props = modem.GetStatus()
|
|
print(props)
|
|
if props and props['state'] == 2:
|
|
# check if the modem_pin.txt file exists
|
|
with open('modem_pin.txt', 'r') as mp:
|
|
mypin = mp.readline().rstrip('\n')
|
|
|
|
logger.info("Trying to unlock %s", sim_path)
|
|
with SystemBus() as bus:
|
|
sim = bus.get('.ModemManager1', sim_path)
|
|
try:
|
|
sim.SendPin(mypin, timeout=10000)
|
|
except GLib.Error as exc:
|
|
remaining_attempts = modem.UnlockRetries
|
|
raise SIMError('Failed to unlock PIN, remaining attempts: {}'.format(remaining_attempts)) from exc
|
|
|
|
|
|
def send_sms(recipients, smsstring):
|
|
|
|
# Assemble SMS
|
|
logger.info("Sending SMS: %s", smsstring)
|
|
|
|
retries = 0
|
|
# Retry the whole block, because once pydbus fetches the properties
|
|
# it does not really refresh them, including modems
|
|
while True:
|
|
try:
|
|
# enable the modem
|
|
# Get modem, unlock SIM
|
|
modem = get_modem()
|
|
unlock_sim(modem)
|
|
modem.Enable(True)
|
|
break
|
|
except SIMError:
|
|
# cannot continue with this one (could be a wrong PIN
|
|
# and then we waste our attempts)
|
|
raise
|
|
except:
|
|
retries += 1
|
|
time.sleep(1)
|
|
if retries > 5:
|
|
raise
|
|
|
|
# test sending SMS
|
|
modem_msg = modem['org.freedesktop.ModemManager1.Modem.Messaging']
|
|
smses = modem_msg.List()
|
|
|
|
while not modem.GetStatus().get('signal-quality', None) or not modem.GetStatus()['signal-quality'][1]:
|
|
modem = get_modem()
|
|
print(modem.GetStatus().get('signal-quality', None))
|
|
time.sleep(2)
|
|
|
|
for line in recipients:
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
|
|
logger.info("Creating new SMS for %s", line)
|
|
smstext = GLib.Variant('s', smsstring)
|
|
smsnumber = GLib.Variant('s', line)
|
|
|
|
dictsms = { 'text': smstext, 'number': smsnumber }
|
|
|
|
msgpath = modem_msg.Create(dictsms)
|
|
logger.info("Created new SMS %s", msgpath)
|
|
|
|
logger.info("Sending SMS %s", msgpath)
|
|
with SystemBus() as bus:
|
|
sms = bus.get(".ModemManager1", msgpath)
|
|
sms.Send(timeout=30000)
|
|
|
|
logger.info("SMS %s state %s", msgpath, sms.State)
|
|
|
|
|