ProsthesisInterface.py
Go to the documentation of this file.
1#!/usr/bin/env python3
2# Example of interaction with a BLE UART device using a UART service
3# implementation.
4# Author: Tony DiCola
5#import Adafruit_BluefruitLE
6#from Adafruit_BluefruitLE.services import UART
7
8import re
9import numpy as np
10
11# Get the BLE provider for the current platform.
12#ble = Adafruit_BluefruitLE.get_provider()
13
14import sys, traceback, Ice
15import IceStorm
16import time
17
18#from bmp280 import BMP280
19
20status = 0
21ic = None
22
def main():
    """Initialise Ice and create the IceStorm publisher proxy for motor values.

    Loads the Ice properties from ``./default.generated.cfg`` and
    ``./default.cfg``, initialises a communicator, loads
    ``ProsthesisInterface.ice``, imports ``armarx``, retrieves (or creates)
    the ``ProsthesisMotorValues`` topic and builds a oneway
    ``ProsthesisListenerInterfacePrx`` publisher proxy.

    The results are stored in module-level names because the code after
    ``main()`` uses them directly:

    * ``ic``         -- the Ice communicator
    * ``armarx``     -- the module imported after the Slice file is loaded
    * ``prosthesis`` -- the publisher proxy
    * ``status``     -- set to 1 if any step of the setup fails

    Any exception raised during setup is printed with its traceback and
    recorded in ``status``; it is not re-raised.  Returns ``None``.
    """
    # Without this declaration the assignments below (and ``import armarx``)
    # would only bind function locals, leaving the module-level state
    # untouched and ``armarx``/``prosthesis`` undefined for the caller.
    global status, ic, armarx, prosthesis

    ########################################################## ICE part
    try:
        properties = Ice.createProperties(sys.argv)
        properties.load("./default.generated.cfg")
        properties.load("./default.cfg")

        init_data = Ice.InitializationData()
        init_data.properties = properties

        ic = Ice.initialize(init_data)

        Ice.loadSlice('-I%s %s -DMININTERFACE' % (Ice.getSliceDir(), 'ProsthesisInterface.ice'))
        print("loadSlice")
        # Imported only after loadSlice(); bound globally via the
        # ``global`` declaration above.
        import armarx

        obj = ic.stringToProxy("IceStorm/TopicManager")
        print("stringToProxy")

        topicMan = IceStorm.TopicManagerPrx.checkedCast(obj)
        print("checkedCast")
        topicName = "ProsthesisMotorValues"
        print(topicName)

        try:
            topic = topicMan.retrieve(topicName)
        except IceStorm.NoSuchTopic:
            print("No such topic.")
            try:
                topic = topicMan.create(topicName)
                print("Try to create topic.")
            except IceStorm.TopicExists:
                # If the topic has been created in the meanwhile,
                # retry the retrieval.
                topic = topicMan.retrieve(topicName)
                print("Try again.")

        pub = topic.getPublisher().ice_oneway()
        prosthesis = armarx.ProsthesisListenerInterfacePrx.uncheckedCast(pub)

        if not prosthesis:
            raise RuntimeError("Invalid proxy")

    except Exception:
        # ``Exception`` rather than a bare ``except`` so that Ctrl-C and
        # SystemExit still propagate.
        traceback.print_exc()
        status = 1

    # The BLE part below is disabled: it is kept as an unused string literal.
    # It is a raw string so the backslash sequences inside it (e.g. the regex)
    # stay literal and cannot trigger invalid-escape warnings.
    r'''
    ############################################################## BLE part

    motorData = np.zeros((2,2))

    # Clear any cached data because both bluez and CoreBluetooth have issues with
    # caching data and it going stale.
    ble.clear_cached_data()

    # Get the first available BLE network adapter and make sure it's powered on.
    adapter = ble.get_default_adapter()
    adapter.power_on()
    print('Using adapter: {0}'.format(adapter.name))

    # Disconnect any currently connected UART devices.  Good for cleaning up and
    # starting from a fresh state.
    print('Disconnecting any connected UART devices...')
    UART.disconnect_devices()

    # Scan for UART devices.
    print('Searching for UART device...')
    try:
        adapter.start_scan()
        # Search for the first UART device found (will time out after 60 seconds
        # but you can specify an optional timeout_sec parameter to change it).
        device = UART.find_device()
        if device is None:
            raise RuntimeError('Failed to find UART device!')
    finally:
        # Make sure scanning is stopped before exiting.
        adapter.stop_scan()

    print('Connecting to device...')
    print(device)
    device.connect()  # Will time out after 60 seconds, specify timeout_sec parameter
                      # to change the timeout.

    # Once connected do everything else in a try/finally to make sure the device
    # is disconnected when done.
    try:
        # Wait for service discovery to complete for the UART service.  Will
        # time out after 60 seconds (specify timeout_sec parameter to override).
        print('Discovering services...')
        UART.discover(device, timeout_sec=10)

        # Once service discovery is complete create an instance of the service
        # and start interacting with it.
        uart = UART(device)
        print("registered")
        # Write a string to the TX characteristic.
        #uart.write('g1\r\n')
        #print("Sent 'Hello world!' to the device.")

        # Now wait up to one minute to receive data from the device.
        rawString = None
        while(1):
            #print('Waiting up to 60 seconds to receive data from the device...')
            received = uart.read(timeout_sec=60)
            if received is not None:
                # Received data, print it out.
                #print('Received: {0}'.format(received))
                rawString = (rawString if rawString else "") + received
            #else:
                # Timeout waiting for data, None is returned.
                #print('Received no data!')

            received = None
            lineSep = rawString.split('\n')
            while len(lineSep) > 1:
                fullLine = lineSep.pop(0)
                print(fullLine)
                dataList = [float(s) for s in re.findall(r'-?\d+\.?\d*', fullLine)]
                motorNo = dataList[0]
                motorPos = dataList[1]
                motorPwm = dataList[2]
                motorData[0,0] = motorPos
                motorData[0,1] = motorPwm
                #print('Motor No. %f at position %f with PWM %f.' %(motorNo, motorPos, motorPwm))

            rawString = lineSep[0]
            #sensor.readTemperature()
            values = armarx.ProsthesisMotorValues(name = "motorValues", position1 = motorData[0,0], pwm1 = motorData[0,1], position2=motorData[1,0], pwm2 = motorData[1,1])
            #values.name = "Test"
            prosthesis.reportMotorValues(values)
            #time.sleep(0.01)
            #val = val + 1
            #print(val)

    finally:
        # Make sure device is disconnected on exit.
        device.disconnect()

    if ic:
        #clean up
        try:
            ic.destroy()
        except:
            traceback.print_exc()
            status = 1

    sys.exit(status)
    '''
182# Initialize the BLE system. MUST be called before other BLE calls!
183#ble.initialize()
184
185# Start the mainloop to process BLE events, and run the provided function in
186# a background thread. When the provided main function stops running, returns
187# an integer status code, or throws an error the program will exit.
188#ble.run_mainloop_with(main)
# Run the Ice/IceStorm setup, then publish one fixed set of motor values.
main()
try:
    values = armarx.ProsthesisMotorValues(name="motorValues", position1=0, pwm1=1, position2=2, pwm2=3)
    prosthesis.reportMotorValues(values)
finally:
    # Release the Ice communicator if one is stored in the module-level
    # ``ic``; the only other cleanup code in this file sits inside a
    # disabled string literal and never runs.
    if ic:
        ic.destroy()
Ice::PropertiesPtr createProperties()