- cockringvib400.jpg (28.51 KiB) Viewed 9094 times
You can just use it like this and it works out of the box:
Code: Select all
import lovespouse as ls
ls.SHAKE(1,5) #selects mode 1 and vibrates for 5 seconds in the SHAKE class
ls.SINGLE_SHOCK_MODE1(5) #selects mode 5 in the SINGLE_SHOCK_MODE1 class
ls.SINGLE_SHOCK_MODE2(2) #selects mode 2 in the SINGLE_SHOCK_MODE2 class
ls.TELESCOPIC_MODE(9) #selects mode 9 in the TELESCOPIC_MODE class
ls.OFF() #completes the vibration
- Spoiler: show
- #!/usr/bin/python
#
# SPDX-License-Identifier: LGPL-2.1-or-later
# needs Python 3 to work
#
# little script to control toys used with the love spouse app
#
from __future__ import print_function
import argparse
import dbus
import dbus.exceptions
import dbus.mainloop.glib
import dbus.service
import time
import threading
#from gi.repository import GObject # python2
from gi.repository import GLib # python3
mainloop = None
BLUEZ_SERVICE_NAME = 'org.bluez'
LE_ADVERTISING_MANAGER_IFACE = 'org.bluez.LEAdvertisingManager1'
DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
LE_ADVERTISEMENT_IFACE = 'org.bluez.LEAdvertisement1'
# function to determine the hex command from mode and submode to send to the toy
#
def get_command(m, sm):
# set sm to 0 if out of range
if sm <=0 or sm >9:
sm = 0
# Single Shock mode1
if m == 'shock1':
shock1_arr = ["d5964c", "d41f5d", "d7846f", "d60d7e", "d1b20a", "d03b1b", "d3a029", "d22938", "dddec0", "dc57d1", ]
return shock1_arr[sm]
# Single Shock mode2
if m == 'shock2':
shock2_arr = ["a5113f", "a4982e", "a7031c", "a68a0d", "a13579", "a0bc68", "a3275a", "a2ae4b", "ad59b3", "acd0a2", ]
return shock2_arr[sm]
# Shake mode
if m == 'shake':
shake_arr = ["C5175C", "F41D7C", "F7864E", "F60F5F", "F1B02B", "F0393A", "F3A208", "F22B19", "FDDCE1", "FC55F0", ]
return shake_arr[sm]
# Telescopic mode
if m == 'telescope':
tele_arr = ["E5157D", "E49C6C", "E7075E", "E68E4F", "E1313B", "E0B82A", "E32318", "E2AA09", "ED5DF1", "ECD4E0", ]
return tele_arr[sm]
# else return OFF command
return "E5157D"
# define exception classes
class InvalidArgsException(dbus.exceptions.DBusException):
_dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs'
class NotSupportedException(dbus.exceptions.DBusException):
_dbus_error_name = 'org.bluez.Error.NotSupported'
class NotPermittedException(dbus.exceptions.DBusException):
_dbus_error_name = 'org.bluez.Error.NotPermitted'
class InvalidValueLengthException(dbus.exceptions.DBusException):
_dbus_error_name = 'org.bluez.Error.InvalidValueLength'
class FailedException(dbus.exceptions.DBusException):
_dbus_error_name = 'org.bluez.Error.Failed'
# class for the advertisement
class Advertisement(dbus.service.Object):
PATH_BASE = '/org/bluez/example/advertisement'
def __init__(self, bus, index, advertising_type):
self.path = self.PATH_BASE + str(index)
self.bus = bus
self.ad_type = advertising_type
self.service_uuids = None
self.manufacturer_data = None
self.solicit_uuids = None
self.service_data = None
self.local_name = None
self.include_tx_power = False
self.data = None
dbus.service.Object.__init__(self, bus, self.path)
def get_properties(self):
properties = dict()
properties['Type'] = self.ad_type
if self.service_uuids is not None:
properties['ServiceUUIDs'] = dbus.Array(self.service_uuids,
signature='s')
if self.solicit_uuids is not None:
properties['SolicitUUIDs'] = dbus.Array(self.solicit_uuids,
signature='s')
if self.manufacturer_data is not None:
properties['ManufacturerData'] = dbus.Dictionary(
self.manufacturer_data, signature='qv')
if self.service_data is not None:
properties['ServiceData'] = dbus.Dictionary(self.service_data,
signature='sv')
if self.local_name is not None:
properties['LocalName'] = dbus.String(self.local_name)
if self.include_tx_power:
properties['Includes'] = dbus.Array(["tx-power"], signature='s')
if self.data is not None:
properties['Data'] = dbus.Dictionary(
self.data, signature='yv')
return {LE_ADVERTISEMENT_IFACE: properties}
def get_path(self):
return dbus.ObjectPath(self.path)
def add_service_uuid(self, uuid):
if not self.service_uuids:
self.service_uuids = []
self.service_uuids.append(uuid)
def add_solicit_uuid(self, uuid):
if not self.solicit_uuids:
self.solicit_uuids = []
self.solicit_uuids.append(uuid)
def add_manufacturer_data(self, manuf_code, data):
if not self.manufacturer_data:
self.manufacturer_data = dbus.Dictionary({}, signature='qv')
self.manufacturer_data[manuf_code] = dbus.Array(data, signature='y')
def add_service_data(self, uuid, data):
if not self.service_data:
self.service_data = dbus.Dictionary({}, signature='sv')
self.service_data[uuid] = dbus.Array(data, signature='y')
def add_local_name(self, name):
if not self.local_name:
self.local_name = ""
self.local_name = dbus.String(name)
def add_data(self, ad_type, data):
if not self.data:
self.data = dbus.Dictionary({}, signature='yv')
self.data[ad_type] = dbus.Array(data, signature='y')
@dbus.service.method(DBUS_PROP_IFACE,
in_signature='s',
out_signature='a{sv}')
def GetAll(self, interface):
print('GetAll')
if interface != LE_ADVERTISEMENT_IFACE:
raise InvalidArgsException()
print('returning props')
return self.get_properties()[LE_ADVERTISEMENT_IFACE]
@dbus.service.method(LE_ADVERTISEMENT_IFACE,
in_signature='',
out_signature='')
def Release(self):
print('%s: Released!' % self.path)
# class for the lovespouse advertisement
class LoveSpouseAdvertisement(Advertisement):
def __init__(self, bus, index, command):
Advertisement.__init__(self, bus, index, 'peripheral')
self.add_manufacturer_data(0xff, bytearray.fromhex("0000006db643ce97fe427c"+command))
self.add_local_name('LSpouse')
self.include_tx_power = True
# Ad registered callback
def register_ad_cb():
print('Advertisement registered')
# Ad registered error callback
def register_ad_error_cb(error):
print('Failed to register advertisement: ' + str(error))
mainloop.quit()
# find adapter function
def find_adapter(bus):
remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'), DBUS_OM_IFACE)
objects = remote_om.GetManagedObjects()
for o, props in objects.items():
if LE_ADVERTISING_MANAGER_IFACE in props:
return o
return None
# shutdown callback
def shutdown(timeout):
print('Advertising for {} seconds...'.format(timeout))
time.sleep(timeout)
mainloop.quit()
#
# main function
#
# duration - time to send the signal, 0 = forever
# mode - either shock1, shock2, shake or telescope, which are the modes for the LoveSpouse toys
# submode - 0 to 9, represents the 10 submodes of each mode
def main(duration, mode, submode):
global mainloop
# get the hex command string from the selected modes
command = get_command(mode, submode)
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
bus = dbus.SystemBus()
adapter = find_adapter(bus)
if not adapter:
print('LEAdvertisingManager1 interface not found')
return
adapter_props = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter),
"org.freedesktop.DBus.Properties")
adapter_props.Set("org.bluez.Adapter1", "Powered", dbus.Boolean(1))
ad_manager = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter),
LE_ADVERTISING_MANAGER_IFACE)
ls_advertisement = LoveSpouseAdvertisement(bus, 0, command)
#mainloop = GObject.MainLoop() # python2
mainloop = GLib.MainLoop()
ad_manager.RegisterAdvertisement(ls_advertisement.get_path(), {},
reply_handler=register_ad_cb,
error_handler=register_ad_error_cb)
if duration > 0:
threading.Thread(target=shutdown, args=(duration,)).start()
else:
print('Advertising forever...')
mainloop.run() # blocks until mainloop.quit() is called
ad_manager.UnregisterAdvertisement(ls_advertisement)
print('Advertisement unregistered')
dbus.service.Object.remove_from_connection(ls_advertisement)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--duration', default=0, type=float, help="time in seconds to send the command for (default: 0)")
parser.add_argument('--mode', default='shock1', type=str, help="switch device to mode, must be one of shock1, shock2, shake, telescope, off (default: shock1)")
parser.add_argument('--submode', default='0', type=int, help="switch device to submode 0-9 (default: 0)")
args = parser.parse_args()
main(args.duration, args.mode, args.submode)

