From 6774c79ba6aff8e5f86108c81297721087639b41 Mon Sep 17 00:00:00 2001 From: Jonas 'Sortie' Termansen Date: Tue, 29 Apr 2014 22:16:07 +0200 Subject: [PATCH] Fix polling Unix sockets in both incoming and outgoing directions. --- kernel/include/sortix/kernel/poll.h | 8 +++- kernel/inode.cpp | 5 ++- kernel/logterminal.cpp | 2 +- kernel/net/fs.cpp | 16 +++++--- kernel/pipe.cpp | 60 +++++++++++++++++++---------- kernel/poll.cpp | 46 ++++++++++++++++------ 6 files changed, 96 insertions(+), 41 deletions(-) diff --git a/kernel/include/sortix/kernel/poll.h b/kernel/include/sortix/kernel/poll.h index 7da30245..793da5d6 100644 --- a/kernel/include/sortix/kernel/poll.h +++ b/kernel/include/sortix/kernel/poll.h @@ -1,6 +1,6 @@ /******************************************************************************* - Copyright(C) Jonas 'Sortie' Termansen 2012. + Copyright(C) Jonas 'Sortie' Termansen 2012, 2014. This file is part of Sortix. @@ -57,7 +57,8 @@ class PollNode friend class PollChannel; public: - PollNode() { next = NULL; prev = NULL; channel = NULL; } + PollNode() { next = NULL; prev = NULL; channel = NULL; master = this; slave = NULL; } + ~PollNode() { delete slave; } private: PollNode* next; @@ -65,6 +66,8 @@ private: public: PollChannel* channel; + PollNode* master; + PollNode* slave; public: kthread_mutex_t* wake_mutex; @@ -75,6 +78,7 @@ public: public: void Cancel(); + PollNode* CreateSlave(); }; diff --git a/kernel/inode.cpp b/kernel/inode.cpp index 4525a527..0e841372 100644 --- a/kernel/inode.cpp +++ b/kernel/inode.cpp @@ -332,8 +332,9 @@ int AbstractInode::poll(ioctx_t* /*ctx*/, PollNode* /*node*/) if ( inode_type == INODE_TYPE_FILE ) { // TODO: Correct bits? - node->revents |= (POLLIN | POLLOUT) & node->events; - // TODO: What if not listening on events (POLLIN | POLLOUT)? + if ( !((POLLIN | POLLOUT) & node->events) ) + return errno = EAGAIN, -1; + node->master->revents |= (POLLIN | POLLOUT) & node->events; return 0; } #endif diff --git a/kernel/logterminal.cpp b/kernel/logterminal.cpp index a8afd13e..1da3ae2c 100644 --- a/kernel/logterminal.cpp +++ b/kernel/logterminal.cpp @@ -445,7 +445,7 @@ int LogTerminal::poll(ioctx_t* /*ctx*/, PollNode* node) short ret_status = PollEventStatus() & node->events; if ( ret_status ) { - node->revents |= ret_status; + node->master->revents |= ret_status; return 0; } poll_channel.Register(node); diff --git a/kernel/net/fs.cpp b/kernel/net/fs.cpp index 252c299e..592a28d7 100644 --- a/kernel/net/fs.cpp +++ b/kernel/net/fs.cpp @@ -289,9 +289,14 @@ ssize_t StreamSocket::write(ioctx_t* ctx, const uint8_t* buf, size_t count) int StreamSocket::poll(ioctx_t* ctx, PollNode* node) { if ( is_connected ) - // TODO: The poll API is broken, can't provide multiple sources on a poll - // node. For now, polling the read channel should be most useful. - return incoming.poll(ctx, node); + { + PollNode* slave = node->CreateSlave(); + if ( !slave ) + return -1; + int incoming_result = incoming.poll(ctx, node); + int outgoing_result = outgoing.poll(ctx, slave); + return incoming_result == 0 || outgoing_result == 0 ? 0 : -1; + } if ( is_listening ) return manager->AcceptPoll(this, ctx, node); return errno = ENOTCONN, -1; @@ -360,8 +365,9 @@ void Manager::Unlisten(StreamSocket* socket) int Manager::AcceptPoll(StreamSocket* socket, ioctx_t* /*ctx*/, PollNode* node) { ScopedLock lock(&manager_lock); - if ( socket->first_pending ) - return (node->revents |= POLLIN | POLLRDNORM), 0; + if ( socket->first_pending && + ((POLLIN | POLLRDNORM) & node->events) ) + return node->master->revents |= ((POLLIN | POLLRDNORM) & node->events), 0; socket->accept_poll_channel.Register(node); return errno = EAGAIN, -1; } diff --git a/kernel/pipe.cpp b/kernel/pipe.cpp index 4d106ee1..fc7eb6ef 100644 --- a/kernel/pipe.cpp +++ b/kernel/pipe.cpp @@ -1,6 +1,6 @@ /******************************************************************************* - Copyright(C) Jonas 'Sortie' Termansen 2011, 2012, 2013. + Copyright(C) Jonas 'Sortie' Termansen 2011, 2012, 2013, 2014. This file is part of Sortix. @@ -62,13 +62,16 @@ public: void PerhapsShutdown(); ssize_t read(ioctx_t* ctx, uint8_t* buf, size_t count); ssize_t write(ioctx_t* ctx, const uint8_t* buf, size_t count); - int poll(ioctx_t* ctx, PollNode* node); + int read_poll(ioctx_t* ctx, PollNode* node); + int write_poll(ioctx_t* ctx, PollNode* node); private: - short PollEventStatus(); + short ReadPollEventStatus(); + short WritePollEventStatus(); private: - PollChannel poll_channel; + PollChannel read_poll_channel; + PollChannel write_poll_channel; kthread_mutex_t pipelock; kthread_cond_t readcond; kthread_cond_t writecond; @@ -114,7 +117,8 @@ void PipeChannel::CloseWriting() void PipeChannel::PerhapsShutdown() { kthread_mutex_lock(&pipelock); - poll_channel.Signal(PollEventStatus()); + read_poll_channel.Signal(ReadPollEventStatus()); + write_poll_channel.Signal(WritePollEventStatus()); bool deleteme = !anyreading & !anywriting; kthread_mutex_unlock(&pipelock); if ( deleteme ) @@ -145,7 +149,8 @@ ssize_t PipeChannel::read(ioctx_t* ctx, uint8_t* buf, size_t count) bufferoffset = (bufferoffset + amount) % buffersize; bufferused -= amount; kthread_cond_broadcast(&writecond); - poll_channel.Signal(PollEventStatus()); + read_poll_channel.Signal(ReadPollEventStatus()); + write_poll_channel.Signal(WritePollEventStatus()); return amount; } @@ -178,34 +183,48 @@ ssize_t PipeChannel::write(ioctx_t* ctx, const uint8_t* buf, size_t count) ctx->copy_from_src(buffer + writeoffset, buf, amount); bufferused += amount; kthread_cond_broadcast(&readcond); - poll_channel.Signal(PollEventStatus()); + read_poll_channel.Signal(ReadPollEventStatus()); + write_poll_channel.Signal(WritePollEventStatus()); return amount; } -short PipeChannel::PollEventStatus() +short PipeChannel::ReadPollEventStatus() { short status = 0; - if ( !anywriting ) + if ( !anywriting && !bufferused ) status |= POLLHUP; - if ( !anyreading ) - status |= POLLERR; if ( bufferused ) status |= POLLIN | POLLRDNORM; - if ( bufferused != buffersize ) + return status; +} + +short PipeChannel::WritePollEventStatus() +{ + short status = 0; + if ( !anyreading ) + status |= POLLERR; + if ( anyreading && bufferused != buffersize ) status |= POLLOUT | POLLWRNORM; return status; } -int PipeChannel::poll(ioctx_t* /*ctx*/, PollNode* node) +int PipeChannel::read_poll(ioctx_t* /*ctx*/, PollNode* node) { ScopedLockSignal lock(&pipelock); - short ret_status = PollEventStatus() & node->events; + short ret_status = ReadPollEventStatus() & node->events; if ( ret_status ) - { - node->revents |= ret_status; - return 0; - } - poll_channel.Register(node); + return node->master->revents |= ret_status, 0; + read_poll_channel.Register(node); + return errno = EAGAIN, -1; +} + +int PipeChannel::write_poll(ioctx_t* /*ctx*/, PollNode* node) +{ + ScopedLockSignal lock(&pipelock); + short ret_status = WritePollEventStatus() & node->events; + if ( ret_status ) + return node->master->revents |= ret_status, 0; + write_poll_channel.Register(node); return errno = EAGAIN, -1; } @@ -263,7 +282,8 @@ ssize_t PipeEndpoint::write(ioctx_t* ctx, const uint8_t* buf, size_t count) int PipeEndpoint::poll(ioctx_t* ctx, PollNode* node) { - return channel->poll(ctx, node); + return reading ? channel->read_poll(ctx, node) + : channel->write_poll(ctx, node); } class PipeNode : public AbstractInode diff --git a/kernel/poll.cpp b/kernel/poll.cpp index ec37ac4a..3df62c71 100644 --- a/kernel/poll.cpp +++ b/kernel/poll.cpp @@ -1,6 +1,6 @@ /******************************************************************************* - Copyright(C) Jonas 'Sortie' Termansen 2012. + Copyright(C) Jonas 'Sortie' Termansen 2012, 2014. This file is part of Sortix. @@ -79,20 +79,24 @@ void PollChannel::Signal(short events) void PollChannel::SignalUnlocked(short events) { for ( PollNode* node = first; node; node = node->next ) - if ( node->revents |= events & (node->events | POLL__ONLY_REVENTS) ) + { + PollNode* target = node->master; + if ( target->revents |= events & (target->events | POLL__ONLY_REVENTS) ) { - ScopedLock node_lock(node->wake_mutex); - if ( !*node->woken ) + ScopedLock target_lock(target->wake_mutex); + if ( !*target->woken ) { - *node->woken = true; - kthread_cond_signal(node->wake_cond); + *target->woken = true; + kthread_cond_signal(target->wake_cond); } } + } } void PollChannel::Register(PollNode* node) { ScopedLock lock(&channel_lock); + assert(!node->channel); node->channel = this; if ( !first ) first = last = node, @@ -124,6 +128,23 @@ void PollNode::Cancel() { if ( channel ) channel->Unregister(this); + if ( slave ) + slave->Cancel(); +} + +PollNode* PollNode::CreateSlave() +{ + PollNode* new_slave = new PollNode(); + if ( !new_slave ) + return NULL; + new_slave->wake_mutex = wake_mutex; + new_slave->wake_cond = wake_cond; + new_slave->events = events; + new_slave->revents = revents; + new_slave->woken = woken; + new_slave->master = master; + new_slave->slave = slave; + return slave = new_slave; } namespace Poll { @@ -191,7 +212,7 @@ static int sys_ppoll(struct pollfd* user_fds, nfds_t nfds, bool unexpected_error = false; nfds_t reqs = nfds; - for ( reqs = 0; !unexpected_error && reqs < nfds; reqs++ ) + for ( reqs = 0; !unexpected_error && reqs < nfds; ) { PollNode* node = nodes + reqs; if ( fds[reqs].fd < 0 ) @@ -202,16 +223,18 @@ static int sys_ppoll(struct pollfd* user_fds, nfds_t nfds, // user-space immediately? What if conditions are already true on // some of the file descriptors (those we have processed so far?)? node->revents = 0; + reqs++; continue; } Ref desc = process->GetDescriptor(fds[reqs].fd); - if ( !desc ) { unexpected_error = true; break; } - node->events = fds[reqs].events; + if ( !desc ) { self_woken = unexpected_error = true; break; } + node->events = fds[reqs].events | POLL__ONLY_REVENTS; node->revents = 0; node->wake_mutex = &wakeup_mutex; node->wake_cond = &wakeup_cond; node->woken = (bool*) &remote_woken; - // TODO: How should erors be handled? + reqs++; + // TODO: How should errors be handled? if ( desc->poll(&ctx, node) == 0 ) self_woken = true; else if ( errno != EAGAIN ) @@ -231,7 +254,8 @@ static int sys_ppoll(struct pollfd* user_fds, nfds_t nfds, kthread_mutex_unlock(&wakeup_mutex); for ( nfds_t i = 0; i < reqs; i++ ) - nodes[i].Cancel(); + if ( 0 <= fds[i].fd ) + nodes[i].Cancel(); if ( !unexpected_error ) {