forked from nortti/tea_cah
63 lines
1.9 KiB
Python
63 lines
1.9 KiB
Python
import collections
import select
import socket
import threading
|
|
|
|
class Channel:
    """An asynchronous communication channel that can send Python objects and can be poll()ed.

    Messages are kept in an in-process FIFO queue. A socket pair is used only
    as a signalling mechanism: one byte is written per queued message, so the
    read end is readable exactly when at least one message is waiting. This
    lets a Channel be registered with poll() via fileno().

    Any number of threads may call send(). recv() checks for readiness and
    then takes the lock in two separate steps, so it is intended to be called
    from a single receiving thread at a time.
    """

    def __init__(self):
        # We use a socket to enable polling and blocking reads
        self.write_socket, self.read_socket = socket.socketpair()
        self.poll = select.poll()
        self.poll.register(self.read_socket, select.POLLIN)

        # Store messages in a deque: FIFO removal from the left is O(1),
        # whereas list.pop(0) shifts every remaining element
        self.messages = collections.deque()
        self.messages_lock = threading.Lock()

    def send(self, message):
        """Queue *message* and signal the read socket that data is available.

        May block if the signalling socket's buffer is full, until the
        receiver has consumed some messages.
        """
        # Queue the message first, so that by the time the signal byte becomes
        # readable the matching message is guaranteed to be in the queue
        with self.messages_lock:
            self.messages.append(message)

        # Signal outside the lock. sendall() blocks when the socket buffer is
        # full; if we still held the lock, recv() could never acquire it to
        # drain the socket and both sides would deadlock.
        # NOTE: if this raises, the socket is presumably closed or broken and
        # the channel is no longer usable; the queued message is not removed.
        self.write_socket.sendall(b'!')

    def recv(self, blocking=True):
        """Return the oldest queued message.

        If *blocking* is true, wait until a message is available. Otherwise
        return None immediately when there is nothing to read.
        """
        # Timeout of -1 will make poll wait until data is available
        # Timeout of 0 will make poll exit immediately if there's no data
        timeout = -1 if blocking else 0

        # See if there is data to read / wait until there is
        results = self.poll.poll(timeout)

        # None of the sockets were ready. This can only happen if we weren't blocking
        # Return None to signal lack of data
        if not results:
            assert not blocking
            return None

        # Remove first message from the queue (FIFO principle), and read one byte from the socket
        # This keeps the number of signalled messages and the number of bytes readable in the socket in sync
        with self.messages_lock:
            message = self.messages.popleft()
            self.read_socket.recv(1)

        return message

    def fileno(self):
        """Return the read socket's file descriptor.

        Allows for a Channel object to be passed directly to poll().
        """
        return self.read_socket.fileno()

    def close(self):
        """Close both sockets, so that we aren't leaking file descriptors."""
        self.write_socket.close()
        self.read_socket.close()

    # Support with-statements
    def __enter__(self):
        """Return the channel itself for use as a context manager."""
        return self

    def __exit__(self, exception_type, exception_value, traceback):
        """Close the channel on leaving the with-block; exceptions propagate."""
        self.close()
|