14 import sys, traceback, Ice
28 properties.load(
"./default.generated.cfg")
29 properties.load(
"./default.cfg")
31 init_data = Ice.InitializationData()
32 init_data.properties = properties
34 ic = Ice.initialize(init_data)
36 Ice.loadSlice(
'-I%s %s -DMININTERFACE' % (Ice.getSliceDir(),
'ProsthesisInterface.ice'))
41 obj = ic.stringToProxy(
"IceStorm/TopicManager")
42 print(
"stringToProxy")
44 topicMan = IceStorm.TopicManagerPrx.checkedCast(obj)
46 topicName =
"ProsthesisMotorValues"
50 topic = topicMan.retrieve(topicName);
51 except IceStorm.NoSuchTopic:
52 print(
"No such topic.")
55 topic = topicMan.create(topicName)
56 print(
"Try to create topic.")
58 except IceStorm.TopicExists:
61 topic = topicMan.retrieve(topicName)
64 pub = topic.getPublisher().ice_oneway();
65 prosthesis = armarx.ProsthesisListenerInterfacePrx.uncheckedCast(pub);
72 raise RuntimeError(
"Invalid proxy")
81 ############################################################## BLE part
83 motorData = np.zeros((2,2))
85 # Clear any cached data first: both bluez and CoreBluetooth cache device data,
86 # and that cached data can go stale.
87 ble.clear_cached_data()
89 # Get the first available BLE network adapter and make sure it's powered on.
90 adapter = ble.get_default_adapter()
92 print('Using adapter: {0}'.format(adapter.name))
94 # Disconnect any currently connected UART devices. Good for cleaning up and
95 # starting from a fresh state.
96 print('Disconnecting any connected UART devices...')
97 UART.disconnect_devices()
99 # Scan for UART devices.
100 print('Searching for UART device...')
103 # Search for the first UART device found (will time out after 60 seconds
104 # but you can specify an optional timeout_sec parameter to change it).
105 device = UART.find_device()
107 raise RuntimeError('Failed to find UART device!')
109 # Make sure scanning is stopped before exiting.
112 print('Connecting to device...')
114 device.connect() # Will time out after 60 seconds, specify timeout_sec parameter
115 # to change the timeout.
117 # Once connected do everything else in a try/finally to make sure the device
118 # is disconnected when done.
120 # Wait for service discovery to complete for the UART service. The call
121 # below passes timeout_sec=10, so it waits at most 10 seconds.
122 print('Discovering services...')
123 UART.discover(device, timeout_sec=10)
125 # Once service discovery is complete create an instance of the service
126 # and start interacting with it.
129 # Write a string to the TX characteristic.
130 #uart.write('g1\r\n')
131 #print("Sent 'Hello world!' to the device.")
133 # Now wait up to one minute to receive data from the device.
136 #print('Waiting up to 60 seconds to receive data from the device...')
137 received = uart.read(timeout_sec=60)
138 if received is not None:
139 # Received data, print it out.
140 #print('Received: {0}'.format(received))
141 rawString = (rawString if rawString else "") + received
143 # Timeout waiting for data, None is returned.
144 #print('Received no data!')
147 lineSep = rawString.split('\n')
148 while len(lineSep) > 1:
149 fullLine = lineSep.pop(0)
151 dataList = [float(s) for s in re.findall(r'-?\d+\.?\d*', fullLine)]
152 motorNo = dataList[0]
153 motorPos = dataList[1]
154 motorPwm = dataList[2]
155 motorData[0,0] = motorPos
156 motorData[0,1] = motorPwm
157 #print('Motor No. %f at position %f with PWM %f.' %(motorNo, motorPos, motorPwm))
159 rawString = lineSep[0]
160 #sensor.readTemperature()
161 values = armarx.ProsthesisMotorValues(name = "motorValues", position1 = motorData[0,0], pwm1 = motorData[0,1], position2=motorData[1,0], pwm2 = motorData[1,1])
162 #values.name = "Test"
163 prosthesis.reportMotorValues(values)
169 # Make sure device is disconnected on exit.
177 traceback.print_exc()
192 prosthesis.reportMotorValues(values)