[linux-minidisc] [PATCH 10/17] Add helper classes for asynchronous reads.
- From: Vincent Pelletier <plr.vincent@gmail.com>
- To: <linux-minidisc@lists.fu-berlin.de>
- Date: Wed, 27 Jan 2010 22:10:27 +0100
- Domainkey-signature: a=rsa-sha1; c=nofws; d=gmail.com; s=gamma; h=message-id:in-reply-to:references:from:date:subject:to:mime-version :content-type:x-bogosity:x-uid; b=Gdw3ijjg64f0iSI3IOwPwXhw/leSTFRvHr0ZKEirPYnLmHUW5frLdjJ1QnwlTqcMui mFOF0t3pyTE/LlOWj5u1w55xofW8gBZTwhIDuHtRRAtZobOYGbExwub9/YiQk2SGScT9 afHb6Cv7uWu2CRv+RJ0Fde3mUnziI54lcysNY=
- Subject: [linux-minidisc] [PATCH 10/17] Add helper classes for asynchronous reads.
---
netmd/usb1.py | 123 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
1 files changed, 123 insertions(+), 0 deletions(-)
diff --git a/netmd/usb1.py b/netmd/usb1.py
index 925adfc..cd0711b 100644
--- a/netmd/usb1.py
+++ b/netmd/usb1.py
@@ -8,6 +8,129 @@ __all__ = ['LibUSBContext']
# Default string length
STRING_LENGTH = 256
+EVENT_CALLBACK_SET = frozenset((
+ libusb1.LIBUSB_TRANSFER_COMPLETED,
+ libusb1.LIBUSB_TRANSFER_ERROR,
+ libusb1.LIBUSB_TRANSFER_TIMED_OUT,
+ libusb1.LIBUSB_TRANSFER_CANCELLED,
+ libusb1.LIBUSB_TRANSFER_STALL,
+ libusb1.LIBUSB_TRANSFER_NO_DEVICE,
+ libusb1.LIBUSB_TRANSFER_OVERFLOW,
+))
+
+DEFAULT_ASYNC_TRANSFER_ERROR_CALLBACK = lambda x, y: False
+
+class USBAsyncReaderBase(object):
+ _handle = None
+ _submited = False
+
+ def __init__(self, handle, endpoint, size, user_data=None, timeout=0):
+ data = create_string_buffer(size)
+ self._data = data
+ self._transfer = self._getTransfer(
+ handle,
+ endpoint,
+ data,
+ self._callbackDispatcher,
+ user_data,
+ timeout,
+ )
+ # XXX: set _handle *after* _transfer, so __del__ doesn't get an
+ # exception if called during constructor execution.
+ self._handle = handle
+ self._event_callback_dict = {}
+ self._errorCallback = DEFAULT_ASYNC_TRANSFER_ERROR_CALLBACK
+
+ def submit(self):
+ self._submited = True
+ self._handle.submitTransfer(self._transfer)
+
+ def cancel(self):
+ self._handle.cancelTransfer(self._transfer)
+ self._submited = False
+
+ def setEventCallback(self, event, callback):
+ if event not in EVENT_CALLBACK_SET:
+ raise ValueError, 'Unknown event %r.' % (event, )
+ self._event_callback_dict[event] = callback
+
+ def setDefaultCallback(self, callback):
+ self._errorCallback = callback
+
+ def getEventCallback(self, event, default=None):
+ return self._event_callback_dict.get(event, default)
+
+ def _callbackDispatcher(self, transfer_p):
+ transfer = self._transfer.contents #transfer_p.contents
+ if self.getEventCallback(transfer.status, self._errorCallback)(
+ transfer, self._data):
+ self.submit()
+ else:
+ self._submited = False
+
+ def isSubmited(self):
+ return self._submited
+
+ def __del__(self):
+ if self._handle is not None:
+ try:
+ self.cancel()
+ except libusb1.USBError, exception:
+ if exception.value != libusb1.LIBUSB_ERROR_NOT_FOUND:
+ raise
+
+class USBAsyncBulkReader(USBAsyncReaderBase):
+ def _getTransfer(self, handle, *args, **kw):
+ return handle.getBulkTransfer(*args, **kw)
+
+class USBAsyncInterruptReader(USBAsyncReaderBase):
+ def _getTransfer(self, handle, *args, **kw):
+ return handle.getInterruptTransfer(*args, **kw)
+
+class USBPoller(object):
+ def __init__(self, context, poller):
+ self.context = context
+ self.poller = poller
+ fd_set = set()
+ self.fd_set = fd_set
+ context.setPollFDNotifiers(self._registerFD, self._unregisterFD)
+ for fd, events in context.getPollFDList():
+ self._registerFD(fd, events)
+
+ def poll(self, timeout=None):
+ fd_set = self.fd_set
+ next_usb_timeout = self.context.getNextTimeout()
+ if next_usb_timeout == 0:
+ next_usb_timeout = None
+ if timeout is None:
+ usb_timeout = next_usb_timeout
+ else:
+ usb_timeout = min(next_usb_timeout or timeout, timeout)
+ event_list = self.poller.poll(usb_timeout)
+ event_list_len = len(event_list)
+ if event_list_len:
+ result = [(x, y) for x, y in event_list if x not in fd_set]
+ if len(result) != event_list_len:
+ self.context.handleEventsTimeout()
+ else:
+ result = event_list
+ self.context.handleEventsTimeout()
+ return result
+
+ def register(self, fd, events):
+ self.poller.register(fd, events)
+
+ def unregister(self, fd):
+ self.poller.unregister(fd)
+
+ def _registerFD(self, fd, events, user_data=None):
+ self.fd_set.add(fd)
+ self.register(fd, events)
+
+ def _unregisterFD(self, fd, user_data=None):
+ self.unregister(fd)
+ self.sd_set.discard(fd)
+
class USBDeviceHandle(object):
handle = None