First commit with working implementation
This commit is contained in:
commit
1feabffe2b
96
.gitignore
vendored
Normal file
96
.gitignore
vendored
Normal file
@ -0,0 +1,96 @@
|
||||
# Custom
|
||||
modem_pin.txt
|
||||
*.log
|
||||
*.pyc
|
||||
|
||||
# Byte-compiled / optimized / DLL files
|
||||
__pycache__/
|
||||
*.py[cod]
|
||||
*$py.class
|
||||
|
||||
# C extensions
|
||||
*.so
|
||||
|
||||
# Distribution / packaging
|
||||
.Python
|
||||
env/
|
||||
build/
|
||||
develop-eggs/
|
||||
dist/
|
||||
downloads/
|
||||
eggs/
|
||||
.eggs/
|
||||
lib/
|
||||
lib64/
|
||||
parts/
|
||||
sdist/
|
||||
var/
|
||||
wheels/
|
||||
*.egg-info/
|
||||
.installed.cfg
|
||||
*.egg
|
||||
|
||||
# PyInstaller
|
||||
# Usually these files are written by a python script from a template
|
||||
# before PyInstaller builds the exe, so as to inject date/other infos into it.
|
||||
*.manifest
|
||||
*.spec
|
||||
|
||||
# Installer logs
|
||||
pip-log.txt
|
||||
pip-delete-this-directory.txt
|
||||
|
||||
# Unit test / coverage reports
|
||||
htmlcov/
|
||||
.tox/
|
||||
.coverage
|
||||
.coverage.*
|
||||
.cache
|
||||
nosetests.xml
|
||||
coverage.xml
|
||||
*,cover
|
||||
.hypothesis/
|
||||
|
||||
# Translations
|
||||
*.mo
|
||||
*.pot
|
||||
|
||||
# Django stuff:
|
||||
*.log
|
||||
local_settings.py
|
||||
|
||||
# Flask stuff:
|
||||
instance/
|
||||
.webassets-cache
|
||||
|
||||
# Scrapy stuff:
|
||||
.scrapy
|
||||
|
||||
# Sphinx documentation
|
||||
docs/_build/
|
||||
|
||||
# PyBuilder
|
||||
target/
|
||||
|
||||
# Jupyter Notebook
|
||||
.ipynb_checkpoints
|
||||
|
||||
# pyenv
|
||||
.python-version
|
||||
|
||||
# celery beat schedule file
|
||||
celerybeat-schedule
|
||||
|
||||
# dotenv
|
||||
.env
|
||||
|
||||
# virtualenv
|
||||
.venv
|
||||
venv/
|
||||
ENV/
|
||||
|
||||
# Spyder project settings
|
||||
.spyderproject
|
||||
|
||||
# Rope project settings
|
||||
.ropeproject
|
15
README.md
Normal file
15
README.md
Normal file
@ -0,0 +1,15 @@
|
||||
# mbim-sms-sender
|
||||
|
||||
A tool for sending SMS through a modem connected to MBIM/AT/other standard
|
||||
interface, through ModemManager using D-Bus.
|
||||
|
||||
## Usage:
|
||||
|
||||
**IMPORTANT:** If your SIM card has a PIN set up, insert it into `modem_pin.txt` file.
|
||||
|
||||
```
|
||||
import sms_functions
|
||||
sms_functions.send_sms([ '+420608123456', '+420775123456' ], 'Testovaci
|
||||
textova zprava')
|
||||
```
|
||||
|
83
send_sms.py
Executable file
83
send_sms.py
Executable file
@ -0,0 +1,83 @@
|
||||
#!/usr/bin/python3
|
||||
|
||||
import logging
|
||||
import os
|
||||
from pydbus import SystemBus
|
||||
from gi.repository import GLib
|
||||
|
||||
def assemble_sms():
|
||||
str_list = []
|
||||
str_list.append("Observium {}\n".format(os.environ.get('OBSERVIUM_TITLE')))
|
||||
str_list.append("{} [{}]\n".format(os.environ.get('OBSERVIUM_ENTITY_NAME'), os.environ.get('OBSERVIUM_ENTITY_DESCRIPTION')))
|
||||
str_list.append("{}".format(os.environ.get('OBSERVIUM_DURATION')))
|
||||
return ''.join(str_list)
|
||||
|
||||
# Setup logging
|
||||
logfile = "/opt/sms/sms.log"
|
||||
FORMAT = '%(asctime)-15s %(message)s'
|
||||
logging.basicConfig(format=FORMAT,filename=logfile,level=10)
|
||||
logger = logging.getLogger('observiumsms')
|
||||
|
||||
# Path to file with SMS numbers
|
||||
emergency_file = "/opt/sms/emergency_numbers.txt"
|
||||
|
||||
# Assemble SMS
|
||||
smsstring = assemble_sms()
|
||||
logger.info("Sending SMS: %s", smsstring)
|
||||
|
||||
# Contact ModemManager, unlock SIM and send SMS to emergency numbers
|
||||
with SystemBus() as bus:
|
||||
mm = bus.get('.ModemManager1')
|
||||
modems = mm.GetManagedObjects()
|
||||
modem_path = list(modems.keys())[0]
|
||||
|
||||
logger.info("Going with %s", modem_path)
|
||||
modem = bus.get(".ModemManager1", modem_path)
|
||||
|
||||
sim_path = modem.Sim
|
||||
if not sim_path:
|
||||
logger.critical("No SIM")
|
||||
quit()
|
||||
|
||||
# unlock SIM, if not alreadY
|
||||
props = modem.GetStatus()
|
||||
print(props)
|
||||
if props and props['state'] == 2:
|
||||
logger.info("Trying to unlock %s", sim_path)
|
||||
sim = bus.get('.ModemManager1', sim_path)
|
||||
sim.SendPin("1234", timeout=10000)
|
||||
props = modem.GetStatus()
|
||||
if props and props['state'] == 2:
|
||||
logger.critical("Wrong PIN!")
|
||||
quit()
|
||||
|
||||
# enable the modem
|
||||
modem.Enable(True)
|
||||
|
||||
# test sending SMS
|
||||
modem_msg = modem['org.freedesktop.ModemManager1.Modem.Messaging']
|
||||
smses = modem_msg.List()
|
||||
|
||||
print(modem.GetStatus()['signal-quality'])
|
||||
|
||||
raise Exception('TODO phone numbers')
|
||||
|
||||
for line in fnums:
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
|
||||
logger.info("Creating new SMS for %s", line)
|
||||
smstext = GLib.Variant('s', smsstring)
|
||||
smsnumber = GLib.Variant('s', line)
|
||||
|
||||
dictsms = { 'text': smstext, 'number': smsnumber }
|
||||
|
||||
msgpath = modem_msg.Create(dictsms)
|
||||
logger.info("Created new SMS %s", msgpath)
|
||||
|
||||
logger.info("Sending SMS %s", msgpath)
|
||||
sms = bus.get(".ModemManager1", msgpath)
|
||||
sms.Send(timeout=30000)
|
||||
|
||||
logger.info("SMS %s state %s", msgpath, sms.State)
|
104
sms_functions.py
Normal file
104
sms_functions.py
Normal file
@ -0,0 +1,104 @@
|
||||
|
||||
import logging
|
||||
import os
|
||||
from pydbus import SystemBus
|
||||
from gi.repository import GLib
|
||||
|
||||
|
||||
# Setup logging
|
||||
logfile = "/opt/sms/sms.log"
|
||||
FORMAT = '%(asctime)-15s %(message)s'
|
||||
logging.basicConfig(format=FORMAT,filename=logfile,level=10)
|
||||
logger = logging.getLogger('observiumsms')
|
||||
|
||||
class ModemManagerError(Exception):
|
||||
def __init__(self, value):
|
||||
self.value = value
|
||||
def __str__(self):
|
||||
return repr(self.value)
|
||||
|
||||
class SIMError(ModemManagerError):
|
||||
def __init__(self, value):
|
||||
self.value = value
|
||||
def __str__(self):
|
||||
return repr(self.value)
|
||||
|
||||
def assemble_sms():
|
||||
str_list = []
|
||||
str_list.append("Observium {}\n".format(os.environ.get('OBSERVIUM_TITLE')))
|
||||
str_list.append("{} [{}]\n".format(os.environ.get('OBSERVIUM_ENTITY_NAME'), os.environ.get('OBSERVIUM_ENTITY_DESCRIPTION')))
|
||||
str_list.append("{}".format(os.environ.get('OBSERVIUM_DURATION')))
|
||||
return ''.join(str_list)
|
||||
|
||||
def get_modem():
|
||||
# Contact ModemManager, unlock SIM and send SMS to emergency numbers
|
||||
with SystemBus() as bus:
|
||||
mm = bus.get('.ModemManager1')
|
||||
modems = mm.GetManagedObjects()
|
||||
if not modems or len(modems) == 0:
|
||||
raise ModemManagerError('No modems found')
|
||||
modem_path = list(modems.keys())[0]
|
||||
|
||||
logger.info("Going with %s", modem_path)
|
||||
modem = bus.get(".ModemManager1", modem_path)
|
||||
return modem
|
||||
|
||||
def unlock_sim(modem):
|
||||
sim_path = modem.Sim
|
||||
if not sim_path:
|
||||
raise SIMError('No SIM card found')
|
||||
|
||||
# unlock SIM, if not alreadY
|
||||
props = modem.GetStatus()
|
||||
print(props)
|
||||
if props and props['state'] == 2:
|
||||
logger.info("Trying to unlock %s", sim_path)
|
||||
with SystemBus() as bus:
|
||||
sim = bus.get('.ModemManager1', sim_path)
|
||||
sim.SendPin("1234", timeout=10000)
|
||||
props = modem.GetStatus()
|
||||
if props and props['state'] == 2:
|
||||
remaining_attempts = modem.UnlockRetries
|
||||
print(remaining_attempts)
|
||||
raise SIMError('Wrong PIN!')
|
||||
|
||||
def send_sms(recipients, smsstring):
|
||||
|
||||
# Assemble SMS
|
||||
logger.info("Sending SMS: %s", smsstring)
|
||||
|
||||
# Get modem, unlock SIM
|
||||
modem = get_modem()
|
||||
unlock_sim(modem)
|
||||
|
||||
# enable the modem
|
||||
modem.Enable(True)
|
||||
|
||||
# test sending SMS
|
||||
modem_msg = modem['org.freedesktop.ModemManager1.Modem.Messaging']
|
||||
smses = modem_msg.List()
|
||||
|
||||
print(modem.GetStatus()['signal-quality'])
|
||||
|
||||
for line in recipients:
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
|
||||
logger.info("Creating new SMS for %s", line)
|
||||
smstext = GLib.Variant('s', smsstring)
|
||||
smsnumber = GLib.Variant('s', line)
|
||||
|
||||
dictsms = { 'text': smstext, 'number': smsnumber }
|
||||
|
||||
msgpath = modem_msg.Create(dictsms)
|
||||
logger.info("Created new SMS %s", msgpath)
|
||||
|
||||
logger.info("Sending SMS %s", msgpath)
|
||||
with SystemBus() as bus:
|
||||
sms = bus.get(".ModemManager1", msgpath)
|
||||
sms.Send(timeout=30000)
|
||||
|
||||
logger.info("SMS %s state %s", msgpath, sms.State)
|
||||
|
||||
|
10
test_sms.py
Executable file
10
test_sms.py
Executable file
@ -0,0 +1,10 @@
|
||||
#!/usr/bin/python3
|
||||
|
||||
import os
|
||||
|
||||
ftest = open("/opt/sms/sms.log", "w")
|
||||
ftest.write("Observium {}\n".format(os.environ.get('OBSERVIUM_TITLE')))
|
||||
ftest.write("{} [{}]\n".format(os.environ.get('OBSERVIUM_ENTITY_NAME'), os.environ.get('OBSERVIUM_ENTITY_DESCRIPTION')))
|
||||
ftest.write("{}".format(os.environ.get('OBSERVIUM_DURATION')))
|
||||
ftest.close()
|
||||
|
Loading…
Reference in New Issue
Block a user