--- netmd/usb1.py | 106 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 files changed, 105 insertions(+), 1 deletions(-)
diff --git a/netmd/usb1.py b/netmd/usb1.py index d22f52a..34731b3 100644 --- a/netmd/usb1.py +++ b/netmd/usb1.py @@ -1,6 +1,6 @@ # pyusb compatibility layer for libus-1.0 import libusb1 -from ctypes import byref, create_string_buffer, c_int +from ctypes import byref, create_string_buffer, c_int, sizeof, POINTER from cStringIO import StringIO __all__ = ['LibUSBContext'] @@ -162,6 +162,72 @@ class USBDeviceHandle(object): transferred = self._interruptTransfer(endpoint, data, length, timeout) return data.raw[:transferred] + def _getTransfer(self, iso_packets=0): + result = libusb1.libusb_alloc_transfer(iso_packets) + if not result: + raise libusb1.USBError, 'Unable to get a transfer object' + return result + + def fillBulkTransfer(self, transfer, endpoint, string_buffer, + callback, user_data, timeout): + libusb1.libusb_fill_bulk_transfer(transfer, self.handle, + endpoint, string_buffer, sizeof(string_buffer), + libusb1.libusb_transfer_cb_fn_p(callback), user_data, + timeout) + + def getBulkTransfer(self, endpoint, string_buffer, callback, + user_data=None, timeout=0): + result = self._getTransfer() + self.fillBulkTransfer(result, endpoint, string_buffer, callback, + user_data, timeout) + return result + + def fillInterruptTransfer(self, transfer, endpoint, string_buffer, + callback, user_data, timeout): + libusb1.libusb_fill_interrupt_transfer(transfer, self.handle, + endpoint, string_buffer, sizeof(string_buffer), + libusb1.libusb_transfer_cb_fn_p(callback), user_data, + timeout) + + def getInterruptTransfer(self, endpoint, string_buffer, callback, + user_data=None, timeout=0): + result = self._getTransfer() + self.fillInterruptTransfer(result, endpoint, string_buffer, + callback, user_data, timeout) + return result + + def fillControlSetup(self, string_buffer, request_type, request, value, + index, length): + libusb1.libusb_fill_control_setup(string_buffer, request_type, + request, value, index, length) + + def fillControlTransfer(self, transfer, setup, callback, + user_data, timeout): + libusb1.libusb_fill_control_transfer(transfer, self.handle, + setup, libusb1.libusb_transfer_cb_fn_p(callback), user_data, + timeout) + + def getControlTransfer(self, setup, callback, user_data=None, timeout=0): + result = self._getTransfer() + self.fillControlTransfer(result, setup, callback, user_data, timeout) + return result + + def fillISOTransfer(self, *args, **kw): + raise NotImplementedError + + def getISOTransfer(self, *args, **kw): + raise NotImplementedError + + def submitTransfer(self, transfer): + result = libusb1.libusb_submit_transfer(transfer) + if result: + raise libusb1.USBError, result + + def cancelTransfer(self, transfer): + result = libusb1.libusb_cancel_transfer(transfer) + if result: + raise libusb1.USBError, result + class USBDevice(object): configuration_descriptor_list = None @@ -347,3 +413,41 @@ class LibUSBContext(object): result = None return result + def getPollFDList(self): + pollfd_p_p = libusb1.libusb_get_pollfds(self.context_p) + result = [] + append = result.append + fd_index = 0 + while pollfd_p_p[fd_index]: + append((pollfd_p_p[fd_index].contents.fd, + pollfd_p_p[fd_index].contents.events)) + fd_index += 1 + return result + + def handleEvents(self): + result = libusb1.libusb_handle_events(self.context_p) + if result: + raise libusb1.USBError, result + + def handleEventsTimeout(self, tv=None): + assert tv is None, 'tv parameter is not supported yet' + tv = libusb1.timeval(0, 0) + result = libusb1.libusb_handle_events_timeout(self.context_p, byref(tv)) + if result: + raise libusb1.USBError, result + + def setPollFDNotifiers(self, added_cb=None, removed_cb=None, user_data=None): + if added_cb is None: + added_cb = POINTER(None) + else: + added_cb = libusb1.libusb_pollfd_added_cb_p(added_cb) + if removed_cb is None: + removed_cb = POINTER(None) + else: + removed_cb = libusb1.libusb_pollfd_removed_cb_p(removed_cb) + libusb1.libusb_set_pollfd_notifiers(self.context_p, added_cb, + removed_cb, user_data) + + def getNextTimeout(self): + return libusb1.libusb_get_next_timeout(self.context_p, None) +