Scripting Love spouse app toys

This is the place for all suggestions, releases and feedback regarding Milovana Hardware efforts.
ChrisHolm
Explorer
Posts: 48
Joined: Fri Apr 22, 2022 4:37 am
Gender: Male
Sexual Orientation: Straight
I am a: Submissive

Scripting Love spouse app toys

Post by ChrisHolm »

Cheap toys from China are always a challenge because there are so many different ways to control them. This includes toys controlled by the Love Spouse app, which are neither well supported nor well documented, like this little friend in the picture:
[Image: cockringvib400.jpg]
Searching the internet, I found a small Python package that controls these toys from a script. It works very well on Windows but not on Linux: https://snyk.io/advisor/python/pylovespouse

You can just use it like this and it works out of the box:

import lovespouse as ls
ls.SHAKE(1, 5)              # selects mode 1 in the SHAKE class and vibrates for 5 seconds
ls.SINGLE_SHOCK_MODE1(5)    # selects mode 5 in the SINGLE_SHOCK_MODE1 class
ls.SINGLE_SHOCK_MODE2(2)    # selects mode 2 in the SINGLE_SHOCK_MODE2 class
ls.TELESCOPIC_MODE(9)       # selects mode 9 in the TELESCOPIC_MODE class
ls.OFF()                    # stops the vibration

So I combined my knowledge of Bluetooth and Python under Linux with the Windows script and ended up with a small tool to control these toys from Linux, which I share here.

#!/usr/bin/python
#
# SPDX-License-Identifier: LGPL-2.1-or-later
# needs Python 3 to work
#
# little script to control toys used with the love spouse app
#

from __future__ import print_function

import argparse
import dbus
import dbus.exceptions
import dbus.mainloop.glib
import dbus.service
import time
import threading

#from gi.repository import GObject # python2
from gi.repository import GLib # python3

mainloop = None

BLUEZ_SERVICE_NAME = 'org.bluez'
LE_ADVERTISING_MANAGER_IFACE = 'org.bluez.LEAdvertisingManager1'
DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
LE_ADVERTISEMENT_IFACE = 'org.bluez.LEAdvertisement1'

# function to determine the hex command from mode and submode to send to the toy
#
def get_command(m, sm):
    # set sm to 0 if out of range
    if sm <= 0 or sm > 9:
        sm = 0

    # Single Shock mode1
    if m == 'shock1':
        shock1_arr = ["d5964c", "d41f5d", "d7846f", "d60d7e", "d1b20a", "d03b1b", "d3a029", "d22938", "dddec0", "dc57d1"]
        return shock1_arr[sm]
    # Single Shock mode2
    if m == 'shock2':
        shock2_arr = ["a5113f", "a4982e", "a7031c", "a68a0d", "a13579", "a0bc68", "a3275a", "a2ae4b", "ad59b3", "acd0a2"]
        return shock2_arr[sm]
    # Shake mode
    if m == 'shake':
        shake_arr = ["C5175C", "F41D7C", "F7864E", "F60F5F", "F1B02B", "F0393A", "F3A208", "F22B19", "FDDCE1", "FC55F0"]
        return shake_arr[sm]
    # Telescopic mode
    if m == 'telescope':
        tele_arr = ["E5157D", "E49C6C", "E7075E", "E68E4F", "E1313B", "E0B82A", "E32318", "E2AA09", "ED5DF1", "ECD4E0"]
        return tele_arr[sm]

    # else return OFF command
    return "E5157D"


# define exception classes
class InvalidArgsException(dbus.exceptions.DBusException):
    _dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs'
class NotSupportedException(dbus.exceptions.DBusException):
    _dbus_error_name = 'org.bluez.Error.NotSupported'
class NotPermittedException(dbus.exceptions.DBusException):
    _dbus_error_name = 'org.bluez.Error.NotPermitted'
class InvalidValueLengthException(dbus.exceptions.DBusException):
    _dbus_error_name = 'org.bluez.Error.InvalidValueLength'
class FailedException(dbus.exceptions.DBusException):
    _dbus_error_name = 'org.bluez.Error.Failed'

# class for the advertisement
class Advertisement(dbus.service.Object):
    PATH_BASE = '/org/bluez/example/advertisement'

    def __init__(self, bus, index, advertising_type):
        self.path = self.PATH_BASE + str(index)
        self.bus = bus
        self.ad_type = advertising_type
        self.service_uuids = None
        self.manufacturer_data = None
        self.solicit_uuids = None
        self.service_data = None
        self.local_name = None
        self.include_tx_power = False
        self.data = None
        dbus.service.Object.__init__(self, bus, self.path)

    def get_properties(self):
        properties = dict()
        properties['Type'] = self.ad_type
        if self.service_uuids is not None:
            properties['ServiceUUIDs'] = dbus.Array(self.service_uuids,
                                                    signature='s')
        if self.solicit_uuids is not None:
            properties['SolicitUUIDs'] = dbus.Array(self.solicit_uuids,
                                                    signature='s')
        if self.manufacturer_data is not None:
            properties['ManufacturerData'] = dbus.Dictionary(
                self.manufacturer_data, signature='qv')
        if self.service_data is not None:
            properties['ServiceData'] = dbus.Dictionary(self.service_data,
                                                        signature='sv')
        if self.local_name is not None:
            properties['LocalName'] = dbus.String(self.local_name)
        if self.include_tx_power:
            properties['Includes'] = dbus.Array(["tx-power"], signature='s')

        if self.data is not None:
            properties['Data'] = dbus.Dictionary(
                self.data, signature='yv')
        return {LE_ADVERTISEMENT_IFACE: properties}

    def get_path(self):
        return dbus.ObjectPath(self.path)

    def add_service_uuid(self, uuid):
        if not self.service_uuids:
            self.service_uuids = []
        self.service_uuids.append(uuid)

    def add_solicit_uuid(self, uuid):
        if not self.solicit_uuids:
            self.solicit_uuids = []
        self.solicit_uuids.append(uuid)

    def add_manufacturer_data(self, manuf_code, data):
        if not self.manufacturer_data:
            self.manufacturer_data = dbus.Dictionary({}, signature='qv')
        self.manufacturer_data[manuf_code] = dbus.Array(data, signature='y')

    def add_service_data(self, uuid, data):
        if not self.service_data:
            self.service_data = dbus.Dictionary({}, signature='sv')
        self.service_data[uuid] = dbus.Array(data, signature='y')

    def add_local_name(self, name):
        if not self.local_name:
            self.local_name = ""
        self.local_name = dbus.String(name)

    def add_data(self, ad_type, data):
        if not self.data:
            self.data = dbus.Dictionary({}, signature='yv')
        self.data[ad_type] = dbus.Array(data, signature='y')

    @dbus.service.method(DBUS_PROP_IFACE,
                         in_signature='s',
                         out_signature='a{sv}')
    def GetAll(self, interface):
        print('GetAll')
        if interface != LE_ADVERTISEMENT_IFACE:
            raise InvalidArgsException()
        print('returning props')
        return self.get_properties()[LE_ADVERTISEMENT_IFACE]

    @dbus.service.method(LE_ADVERTISEMENT_IFACE,
                         in_signature='',
                         out_signature='')
    def Release(self):
        print('%s: Released!' % self.path)

# class for the lovespouse advertisement
class LoveSpouseAdvertisement(Advertisement):

    def __init__(self, bus, index, command):
        Advertisement.__init__(self, bus, index, 'peripheral')

        self.add_manufacturer_data(0xff, bytearray.fromhex("0000006db643ce97fe427c"+command))
        self.add_local_name('LSpouse')
        self.include_tx_power = True

# Ad registered callback
def register_ad_cb():
    print('Advertisement registered')

# Ad registered error callback
def register_ad_error_cb(error):
    print('Failed to register advertisement: ' + str(error))
    mainloop.quit()

# find adapter function
def find_adapter(bus):
    remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'), DBUS_OM_IFACE)
    objects = remote_om.GetManagedObjects()

    for o, props in objects.items():
        if LE_ADVERTISING_MANAGER_IFACE in props:
            return o

    return None

# shutdown callback
def shutdown(timeout):
    print('Advertising for {} seconds...'.format(timeout))
    time.sleep(timeout)
    mainloop.quit()

#
# main function
#
# duration - time to send the signal, 0 = forever
# mode - either shock1, shock2, shake or telescope, which are the modes for the LoveSpouse toys
# submode - 0 to 9, represents the 10 submodes of each mode
def main(duration, mode, submode):
    global mainloop

    # get the hex command string from the selected modes
    command = get_command(mode, submode)

    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    bus = dbus.SystemBus()
    adapter = find_adapter(bus)
    if not adapter:
        print('LEAdvertisingManager1 interface not found')
        return

    adapter_props = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter),
                                   "org.freedesktop.DBus.Properties")

    adapter_props.Set("org.bluez.Adapter1", "Powered", dbus.Boolean(1))

    ad_manager = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter),
                                LE_ADVERTISING_MANAGER_IFACE)

    ls_advertisement = LoveSpouseAdvertisement(bus, 0, command)

    #mainloop = GObject.MainLoop() # python2
    mainloop = GLib.MainLoop()

    ad_manager.RegisterAdvertisement(ls_advertisement.get_path(), {},
                                     reply_handler=register_ad_cb,
                                     error_handler=register_ad_error_cb)

    if duration > 0:
        threading.Thread(target=shutdown, args=(duration,)).start()
    else:
        print('Advertising forever...')

    mainloop.run() # blocks until mainloop.quit() is called

    ad_manager.UnregisterAdvertisement(ls_advertisement)
    print('Advertisement unregistered')
    dbus.service.Object.remove_from_connection(ls_advertisement)


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--duration', default=0, type=float, help="time in seconds to send the command for (default: 0)")
    parser.add_argument('--mode', default='shock1', type=str, help="switch device to mode, must be one of shock1, shock2, shake, telescope, off (default: shock1)")
    parser.add_argument('--submode', default='0', type=int, help="switch device to submode 0-9 (default: 0)")
    args = parser.parse_args()

    main(args.duration, args.mode, args.submode)

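For anyone curious about what actually goes over the air: the script puts a fixed prefix plus the three-byte command into the BLE manufacturer data under ID 0xff. The sketch below reproduces only that step, so it runs without BlueZ or D-Bus. `PREFIX` and `build_payload` are names made up for this illustration; the prefix bytes and the example commands are taken from the script above.

```python
# Fixed prefix taken from LoveSpouseAdvertisement in the script above.
PREFIX = "0000006db643ce97fe427c"


def build_payload(command_hex):
    """Return the manufacturer-data bytes for a 3-byte command given as hex.

    build_payload is a hypothetical helper, not part of the original script.
    """
    command = bytes.fromhex(command_hex)
    if len(command) != 3:
        raise ValueError("command must be exactly 3 bytes (6 hex digits)")
    return bytes.fromhex(PREFIX) + command


if __name__ == "__main__":
    # "d41f5d" is entry 1 of shock1_arr in the script above
    payload = build_payload("d41f5d")
    print(payload.hex(), len(payload))
```

The payload is always 14 bytes long (11 prefix bytes plus 3 command bytes), and only the last three bytes change between modes.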
And now the question: is it possible to integrate this into EOS somehow to use it with a tease?
Ksoles
Explorer
Posts: 45
Joined: Tue Nov 14, 2023 7:48 pm

Re: Scripting Love spouse app toys

Post by Ksoles »

ChrisHolm wrote: Sat Dec 28, 2024 9:06 am [...] And now the question: is it possible to integrate this into EOS somehow to use it with a tease?

If you were to integrate this with Intiface Central, you could use the script made by cfs6t08p. However, you would need to produce your own teases or modify existing ones manually.

Re: Scripting Love spouse app toys

Post by ChrisHolm »

Thank you, great hint. I took a look at the script by cfs6t08p and think I will try to build something similar for the LoveSpouse toys. I am not sure whether I will be able to integrate it with buttplug.io, but it should at least be possible to make it work standalone.

I will describe my solution here if it works.

Thanks again. :-)

Re: Scripting Love spouse app toys

Post by ChrisHolm »

Here is my working solution for now. :-)

I created a monkey script based on the one from cfs6t08p. You will need the Violentmonkey add-on for Chrome or Firefox to use it.
Search for the LoveSpouseEOS script or visit https://sleazyfork.org/de/scripts/522319-lovespouseeos directly and install it. You will also need a small Python script that runs a small HTTP server on your local computer and controls your toys based on the HTTP requests made by the monkey script.

Here is the script:

#!/usr/bin/python
#
# SPDX-License-Identifier: LGPL-2.1-or-later
# needs Python 3 to work
#
# little script to control toys used with the love spouse app
#

from __future__ import print_function

import argparse
import dbus
import dbus.exceptions
import dbus.mainloop.glib
import dbus.service
import time
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer
import logging
import json
from gi.repository import GLib  # python3

mainloop = None

BLUEZ_SERVICE_NAME = 'org.bluez'
LE_ADVERTISING_MANAGER_IFACE = 'org.bluez.LEAdvertisingManager1'
DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
LE_ADVERTISEMENT_IFACE = 'org.bluez.LEAdvertisement1'

# function to determine the hex command from mode and submode to send to the toy
def get_command(m, sm):
	# set sm to 0 if out of range
	if sm <=0 or sm >9:
		sm = 0
		
	# Single Shock mode1
	if m == 'shock1':
		shock1_arr = ["d5964c", "d41f5d", "d7846f", "d60d7e", "d1b20a", "d03b1b", "d3a029", "d22938", "dddec0", "dc57d1", ]
		return shock1_arr[sm]
    # Single Shock mode2
	if m == 'shock2':
		shock2_arr = ["a5113f", "a4982e", "a7031c", "a68a0d", "a13579", "a0bc68", "a3275a", "a2ae4b", "ad59b3", "acd0a2", ]
		return shock2_arr[sm]
   # Shake mode
	if m == 'shake':
		shake_arr = ["C5175C", "F41D7C", "F7864E", "F60F5F", "F1B02B", "F0393A", "F3A208", "F22B19", "FDDCE1", "FC55F0", ]
		return shake_arr[sm]     
    # Telescopic mode
	if m == 'telescope':
		tele_arr = ["E5157D", "E49C6C", "E7075E", "E68E4F", "E1313B", "E0B82A", "E32318", "E2AA09", "ED5DF1", "ECD4E0", ]
		return tele_arr[sm]
     
    # else return OFF command
	return "E5157D"

# class to process incoming requests        
class S(BaseHTTPRequestHandler):
    def _set_response(self):
        self.send_response(200)
        self.send_header('Content-type', 'text/html')
        self.end_headers()

    def do_GET(self):
        self._set_response()

    def do_POST(self):
        content_length = int(self.headers['Content-Length']) # <--- Gets the size of data
        post_data = self.rfile.read(content_length) # <--- Gets the data itself

        self._set_response()
        if str(self.path) == "/lovespouse":
            parms = json.loads(post_data.decode('utf-8'))
            toy_control(parms)
			


# define exception classes
class InvalidArgsException(dbus.exceptions.DBusException):
    _dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs'
class NotSupportedException(dbus.exceptions.DBusException):
    _dbus_error_name = 'org.bluez.Error.NotSupported'
class NotPermittedException(dbus.exceptions.DBusException):
    _dbus_error_name = 'org.bluez.Error.NotPermitted'
class InvalidValueLengthException(dbus.exceptions.DBusException):
    _dbus_error_name = 'org.bluez.Error.InvalidValueLength'
class FailedException(dbus.exceptions.DBusException):
    _dbus_error_name = 'org.bluez.Error.Failed'

# class for the advertisement
class Advertisement(dbus.service.Object):
    PATH_BASE = '/org/bluez/example/advertisement'

    def __init__(self, bus, index, advertising_type):
        self.path = self.PATH_BASE + str(index)
        self.bus = bus
        self.ad_type = advertising_type
        self.service_uuids = None
        self.manufacturer_data = None
        self.solicit_uuids = None
        self.service_data = None
        self.local_name = None
        self.include_tx_power = False
        self.data = None
        dbus.service.Object.__init__(self, bus, self.path)

    def get_properties(self):
        properties = dict()
        properties['Type'] = self.ad_type
        if self.service_uuids is not None:
            properties['ServiceUUIDs'] = dbus.Array(self.service_uuids,
                                                    signature='s')
        if self.solicit_uuids is not None:
            properties['SolicitUUIDs'] = dbus.Array(self.solicit_uuids,
                                                    signature='s')
        if self.manufacturer_data is not None:
            properties['ManufacturerData'] = dbus.Dictionary(
                self.manufacturer_data, signature='qv')
        if self.service_data is not None:
            properties['ServiceData'] = dbus.Dictionary(self.service_data,
                                                        signature='sv')
        if self.local_name is not None:
            properties['LocalName'] = dbus.String(self.local_name)
        if self.include_tx_power:
            properties['Includes'] = dbus.Array(["tx-power"], signature='s')

        if self.data is not None:
            properties['Data'] = dbus.Dictionary(
                self.data, signature='yv')
        return {LE_ADVERTISEMENT_IFACE: properties}

    def get_path(self):
        return dbus.ObjectPath(self.path)

    def add_service_uuid(self, uuid):
        if not self.service_uuids:
            self.service_uuids = []
        self.service_uuids.append(uuid)

    def add_solicit_uuid(self, uuid):
        if not self.solicit_uuids:
            self.solicit_uuids = []
        self.solicit_uuids.append(uuid)

    def add_manufacturer_data(self, manuf_code, data):
        if not self.manufacturer_data:
            self.manufacturer_data = dbus.Dictionary({}, signature='qv')
        self.manufacturer_data[manuf_code] = dbus.Array(data, signature='y')

    def add_service_data(self, uuid, data):
        if not self.service_data:
            self.service_data = dbus.Dictionary({}, signature='sv')
        self.service_data[uuid] = dbus.Array(data, signature='y')

    def add_local_name(self, name):
        if not self.local_name:
            self.local_name = ""
        self.local_name = dbus.String(name)

    def add_data(self, ad_type, data):
        if not self.data:
            self.data = dbus.Dictionary({}, signature='yv')
        self.data[ad_type] = dbus.Array(data, signature='y')

    @dbus.service.method(DBUS_PROP_IFACE, in_signature='s', out_signature='a{sv}')
    def GetAll(self, interface):
        print('GetAll')
        if interface != LE_ADVERTISEMENT_IFACE:
            raise InvalidArgsException()
        print('returning props')
        return self.get_properties()[LE_ADVERTISEMENT_IFACE]

    @dbus.service.method(LE_ADVERTISEMENT_IFACE, in_signature='', out_signature='')
    def Release(self):
        print('%s: Released!' % self.path)

# class for the lovespouse advertisement
class LoveSpouseAdvertisement(Advertisement):

    def __init__(self, bus, index, command):
        Advertisement.__init__(self, bus, index, 'peripheral')

        self.add_manufacturer_data(0xff, bytearray.fromhex("0000006db643ce97fe427c"+command))
        self.add_local_name('LSpouse')
        self.include_tx_power = True

# Ad registered callback
def register_ad_cb():
    print('Advertisement registered')

# Ad registered error callback
def register_ad_error_cb(error):
    print('Failed to register advertisement: ' + str(error))
    mainloop.quit()

# find adapter function
def find_adapter(bus):
    remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'), DBUS_OM_IFACE)
    objects = remote_om.GetManagedObjects()

    for o, props in objects.items():
        if LE_ADVERTISING_MANAGER_IFACE in props:
            return o

    return None

# shutdown callback
def shutdown(timeout):
    print('Advertising for {} seconds...'.format(timeout))
    time.sleep(timeout)
    mainloop.quit()

#
# main function for sending the advertisement
#
# duration - time to send the signal, 0 = forever
# mode - either shock1, shock2, shake or telescope, which are the modes for the LoveSpouse toys
# submode - 0 to 9, represents the 10 submodes of each mode
def send_adv(duration, mode, submode):
	global mainloop
	
	# get the hex command string from the selected modes
	command = get_command(mode, submode)
	
	dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
	bus = dbus.SystemBus()
	adapter = find_adapter(bus)
	if not adapter:
		print('LEAdvertisingManager1 interface not found')
		return

	adapter_props = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter), "org.freedesktop.DBus.Properties")
	adapter_props.Set("org.bluez.Adapter1", "Powered", dbus.Boolean(1))
	ad_manager = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter), LE_ADVERTISING_MANAGER_IFACE)
	ls_advertisement = LoveSpouseAdvertisement(bus, 0, command)

	mainloop = GLib.MainLoop()

	ad_manager.RegisterAdvertisement(ls_advertisement.get_path(), {}, reply_handler=register_ad_cb, error_handler=register_ad_error_cb)

	if duration > 0:
		threading.Thread(target=shutdown, args=(duration,)).start()
	else:
		print('Advertising forever...')

	mainloop.run()  # blocks until mainloop.quit() is called

	ad_manager.UnregisterAdvertisement(ls_advertisement)
	print('Advertisement unregistered')
	dbus.service.Object.remove_from_connection(ls_advertisement)

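# thread that sends short 'shake' pulses in a steady rhythm
class SendBeat(threading.Thread):
    def __init__(self, duration, bpm):
        threading.Thread.__init__(self)
        self.duration = duration
        self.ppulse = 0.5  # length of one pulse in seconds
        # pause between pulses; clamped at 0 so time.sleep() never gets a
        # negative value when bpm is above 120
        self.ppause = max(0.0, (60 - (bpm * self.ppulse)) / bpm)
        self.do_run = True

    def run(self):
        # keep pulsing until toy_control sets do_run to False
        while getattr(self, 'do_run', True):
            send_adv(self.ppulse, 'shake', 3)
            time.sleep(self.ppause)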

# called from http server class to send command to toy.
# sb is the global threading object for beats            
sb = None            
def toy_control(parms):
    global sb
    duration = 0.05 # time to send advertisement
    mode = parms['mode']
    submode = parms['submode']

    if mode == 'bpm':
        bpm = submode

        # stop a beat thread that is still running before doing anything else
        if sb is not None:
            sb.do_run = False
            sb = None

        if bpm <= 0:
            # 'bpm' is not a toy mode, so get_command returns the OFF command
            send_adv(duration, mode, 0)
        else:
            sb = SendBeat(duration, bpm)
            sb.start()
    else:
        send_adv(duration, mode, submode)
    

# function to run http server
def httpd_run(server_class=HTTPServer, handler_class=S, port=8080):
    logging.basicConfig(level=logging.INFO)
    server_address = ('', port)
    httpd = server_class(server_address, handler_class)
    logging.info('Starting httpd...\n')
    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        pass
    httpd.server_close()
    logging.info('Stopping httpd...\n')
    
# MAIN
if __name__ == '__main__':
    httpd_run()
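
The beat mode in SendBeat comes down to a bit of arithmetic: every beat is a 0.5 second pulse followed by a pause, and pulse plus pause together should last 60/bpm seconds. The sketch below only computes that timing and does not talk to the toy. `beat_timing` is a hypothetical helper, and clamping the pause at zero is an assumption for rates above 120 bpm, where a 0.5 second pulse no longer fits into one beat.

```python
PULSE_SECONDS = 0.5  # pulse length used by SendBeat in the script above


def beat_timing(bpm, pulse=PULSE_SECONDS):
    """Return (pulse, pause) in seconds for one beat at the given bpm.

    Hypothetical helper for illustration; it mirrors the formula
    (60 - bpm * pulse) / bpm and clamps the pause at 0.
    """
    if bpm <= 0:
        raise ValueError("bpm must be positive")
    pause = max(0.0, (60 - bpm * pulse) / bpm)
    return pulse, pause


if __name__ == "__main__":
    for bpm in (30, 60, 120, 180):
        pulse, pause = beat_timing(bpm)
        print(bpm, "bpm ->", pulse, "s on,", pause, "s off")
```

The real rhythm will probably be a little slower than the numbers suggest, because registering and unregistering the advertisement takes some time on top of the pulse itself.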
      
Setup checklist:
  • Install Violentmonkey as a browser add-on
  • Install the LoveSpouseEOS monkey script (search for LoveSpouseEOS within Violentmonkey)
  • Save the Python script to your local computer, e.g. as LoveSpouseEOS_server.py
  • Start the Python script
  • Turn on your toy
Probably there will be some quirks at the beginning. Let me know and I will see if I can extend or fix it.
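
To check the server without the browser, you can send it the same kind of request by hand. This is a rough sketch that assumes the server script above is running on localhost port 8080 and that the JSON body carries the keys `mode` and `submode`, as read by `toy_control`. `build_request` and `send_command` are hypothetical helper names, and what the monkey script actually sends may differ in detail.

```python
import json
import urllib.request

# port and path as used in the server script above
SERVER_URL = "http://localhost:8080/lovespouse"


def build_request(mode, submode, url=SERVER_URL):
    """Build (but do not send) a POST request with a JSON body."""
    body = json.dumps({"mode": mode, "submode": submode}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send_command(mode, submode):
    """Send one command to the local server and return the HTTP status code."""
    with urllib.request.urlopen(build_request(mode, submode), timeout=5) as resp:
        return resp.status


if __name__ == "__main__":
    # Only builds the request, so this runs without the server.
    req = build_request("shake", 3)
    print(req.get_method(), req.full_url, req.data)
    # With the server running and the toy switched on:
    # print(send_command("shake", 3))
    # print(send_command("off", 0))  # unknown modes fall through to the OFF command
```

If the first `send_command` call makes the toy vibrate, the Python side is fine and any remaining problem is more likely in the browser setup.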

Use it in EOS teases:
There are two different ways to use the toys at the moment. Both need notifications turned on in the tease.
  • Add a notification for a given time with the label "Vibes: x", where x is a number from 1 to 9 that selects the vibration mode. Your toy will then vibrate in mode x for the given time.
  • Add a notification for a given time with the label "Beat: x", where x is the number of beats per minute. Your toy will then vibrate at x beats per minute for the given time.
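
The two label formats are simple enough to parse with a regular expression. The following is only a Python sketch of how such labels could be turned into the `mode`/`submode` pairs that `toy_control` expects; it is not the code of the LoveSpouseEOS monkey script. `parse_label` is a hypothetical name, and mapping "Vibes" to the `shake` mode is an assumption.

```python
import re

# Matches "Vibes: 3" or "Beat: 90", case-insensitive, spaces around the colon optional
LABEL_RE = re.compile(r"^\s*(vibes|beat)\s*:\s*(\d+)\s*$", re.IGNORECASE)


def parse_label(label):
    """Turn a notification label into a dict for the server, or None.

    Hypothetical helper; "Vibes" -> mode "shake" is an assumption.
    """
    match = LABEL_RE.match(label)
    if match is None:
        return None
    kind, value = match.group(1).lower(), int(match.group(2))
    if kind == "vibes":
        if not 1 <= value <= 9:
            return None  # the post only defines modes 1-9
        return {"mode": "shake", "submode": value}
    # toy_control reads the beats per minute from 'submode' when mode is 'bpm'
    return {"mode": "bpm", "submode": value}


if __name__ == "__main__":
    for text in ("Vibes: 3", "Beat: 90", "Vibes: 12", "hello"):
        print(repr(text), "->", parse_label(text))
```

Labels that do not match either pattern return None, so ordinary notifications in a tease would simply be ignored.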
Credits: Have fun. :-D
ComradeOohAah
Explorer
Posts: 7
Joined: Wed Dec 18, 2024 7:49 pm
Gender: Male
Sexual Orientation: Straight
I am a: Dom (Male)

Re: Scripting Love spouse app toys

Post by ComradeOohAah »

Howdy! Great work you've done here. :-D

So, I don't know if this complicates things or makes it easier for folks, but I've got an Android-based media player called BuzzyBody for using funscripts with the Love Spouse toys, and last month I released a feature that lets BuzzyBody connect to Intiface as an emulated Lovense Hush. You can see the tutorial I wrote for the setup here.

I've only tested it from the Intiface Device menu so far, but I would love for people to test it in real-world situations like this so I know it's working properly. You do lose some resolution, as it scales the Hush's 0-20 power scale to the Love Spouse 0-9 scale, but it should be usable with scripts that use the full spectrum.
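
To illustrate the loss of resolution: squeezing 21 Hush levels (0-20) into 10 Love Spouse levels (0-9) means that several input levels land on the same output level. The sketch below assumes a plain linear mapping with rounding; `hush_to_lovespouse` is a hypothetical function, and the mapping BuzzyBody really uses may be different.

```python
def hush_to_lovespouse(level):
    """Map a Hush power level (0-20) to a Love Spouse level (0-9).

    Hypothetical linear mapping; rounds half up using integer arithmetic.
    """
    if not 0 <= level <= 20:
        raise ValueError("level must be between 0 and 20")
    return (level * 9 + 10) // 20


if __name__ == "__main__":
    for level in range(21):
        print(level, "->", hush_to_lovespouse(level))
```

With this mapping every Love Spouse level is reached, but most of them are shared by two or three neighbouring Hush levels, which is where the resolution goes.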