diff --git a/kernel/pipe.cpp b/kernel/pipe.cpp index a59959fa..823f71c2 100644 --- a/kernel/pipe.cpp +++ b/kernel/pipe.cpp @@ -24,6 +24,7 @@ #include #include +#include #include #include @@ -43,6 +44,7 @@ #include #include #include +#include #include #include #include @@ -82,10 +84,14 @@ private: kthread_cond_t readcond; kthread_cond_t writecond; uint8_t* buffer; + uintptr_t sender_system_tid; + uintptr_t receiver_system_tid; size_t bufferoffset; size_t bufferused; size_t buffersize; size_t pretended_read_buffer_size; + size_t pledged_read; + size_t pledged_write; bool anyreading; bool anywriting; bool is_sigpipe_enabled; @@ -102,6 +108,10 @@ PipeChannel::PipeChannel(uint8_t* buffer, size_t buffersize) bufferoffset = bufferused = 0; anyreading = anywriting = true; is_sigpipe_enabled = true; + sender_system_tid = 0; + receiver_system_tid = 0; + pledged_read = 0; + pledged_write = 0; } PipeChannel::~PipeChannel() @@ -136,65 +146,125 @@ void PipeChannel::PerhapsShutdown() ssize_t PipeChannel::read(ioctx_t* ctx, uint8_t* buf, size_t count) { + if ( SSIZE_MAX < count ) + count = SSIZE_MAX; + Thread* this_thread = CurrentThread(); + this_thread->yield_to_tid = sender_system_tid; ScopedLockSignal lock(&pipelock); - if ( !lock.IsAcquired() ) { errno = EINTR; return -1; } - while ( anywriting && !bufferused ) + if ( !lock.IsAcquired() ) + return errno = EINTR, -1; + size_t so_far = 0; + while ( count ) { - if ( ctx->dflags & O_NONBLOCK ) - return errno = EWOULDBLOCK, -1; - if ( !kthread_cond_wait_signal(&readcond, &pipelock) ) + receiver_system_tid = this_thread->system_tid; + while ( anywriting && !bufferused ) { - errno = EINTR; - return -1; + this_thread->yield_to_tid = sender_system_tid; + if ( pledged_read ) + { + pledged_write++; + kthread_mutex_unlock(&pipelock); + kthread_yield(); + kthread_mutex_lock(&pipelock); + pledged_write--; + continue; + } + if ( so_far ) + return so_far; + if ( ctx->dflags & O_NONBLOCK ) + return errno = EWOULDBLOCK, -1; + pledged_write++; + bool interrupted = !kthread_cond_wait_signal(&readcond, &pipelock); + pledged_write--; + if ( interrupted ) + return errno = EINTR, -1; } + if ( !bufferused && !anywriting ) + return (ssize_t) so_far; + size_t amount = count; + if ( bufferused < amount ) + amount = bufferused; + size_t linear = buffersize - bufferoffset; + if ( linear < amount ) + amount = linear; + assert(amount); + if ( !ctx->copy_to_dest(buf, buffer + bufferoffset, amount) ) + return so_far ? (ssize_t) so_far : -1; + bufferoffset = (bufferoffset + amount) % buffersize; + bufferused -= amount; + buf += amount; + count -= amount; + so_far += amount; + kthread_cond_broadcast(&writecond); + read_poll_channel.Signal(ReadPollEventStatus()); + write_poll_channel.Signal(WritePollEventStatus()); } - if ( !bufferused && !anywriting ) { return 0; } - if ( bufferused < count ) { count = bufferused; } - size_t amount = count; - size_t linear = buffersize - bufferoffset; - if ( linear < amount ) { amount = linear; } - assert(amount); - ctx->copy_to_dest(buf, buffer + bufferoffset, amount); - bufferoffset = (bufferoffset + amount) % buffersize; - bufferused -= amount; - kthread_cond_broadcast(&writecond); - read_poll_channel.Signal(ReadPollEventStatus()); - write_poll_channel.Signal(WritePollEventStatus()); - return amount; + return (ssize_t) so_far; } ssize_t PipeChannel::write(ioctx_t* ctx, const uint8_t* buf, size_t count) { + if ( SSIZE_MAX < count ) + count = SSIZE_MAX; + Thread* this_thread = CurrentThread(); + this_thread->yield_to_tid = receiver_system_tid; ScopedLockSignal lock(&pipelock); - if ( !lock.IsAcquired() ) { errno = EINTR; return -1; } - while ( anyreading && bufferused == buffersize ) + if ( !lock.IsAcquired() ) + return errno = EINTR, -1; + sender_system_tid = this_thread->system_tid; + size_t so_far = 0; + while ( count ) { - if ( ctx->dflags & O_NONBLOCK ) - return errno = EWOULDBLOCK, -1; - if ( !kthread_cond_wait_signal(&writecond, &pipelock) ) + sender_system_tid = this_thread->system_tid; + while ( anyreading && bufferused == buffersize ) { - errno = EINTR; - return -1; + this_thread->yield_to_tid = receiver_system_tid; + if ( pledged_write ) + { + pledged_read++; + kthread_mutex_unlock(&pipelock); + kthread_yield(); + kthread_mutex_lock(&pipelock); + pledged_read--; + continue; + } + if ( so_far ) + return so_far; + if ( ctx->dflags & O_NONBLOCK ) + return errno = EWOULDBLOCK, -1; + pledged_read++; + bool interrupted = !kthread_cond_wait_signal(&writecond, &pipelock); + pledged_read--; + if ( interrupted ) + return errno = EINTR, -1; } + if ( !anyreading ) + { + if ( so_far ) + return (ssize_t) so_far; + if ( is_sigpipe_enabled ) + CurrentThread()->DeliverSignal(SIGPIPE); + return errno = EPIPE, -1; + } + size_t amount = count; + if ( buffersize - bufferused < amount ) + amount = buffersize - bufferused; + size_t writeoffset = (bufferoffset + bufferused) % buffersize; + size_t linear = buffersize - writeoffset; + if ( linear < amount ) + amount = linear; + assert(amount); + if ( !ctx->copy_from_src(buffer + writeoffset, buf, amount) ) + return so_far ? (ssize_t) so_far : -1; + bufferused += amount; + buf += amount; + count -= amount; + so_far += amount; + kthread_cond_broadcast(&readcond); + read_poll_channel.Signal(ReadPollEventStatus()); + write_poll_channel.Signal(WritePollEventStatus()); } - if ( !anyreading ) - { - if ( is_sigpipe_enabled ) - CurrentThread()->DeliverSignal(SIGPIPE); - return errno = EPIPE, -1; - } - if ( buffersize - bufferused < count ) { count = buffersize - bufferused; } - size_t writeoffset = (bufferoffset + bufferused) % buffersize; - size_t amount = count; - size_t linear = buffersize - writeoffset; - if ( linear < amount ) { amount = linear; } - assert(amount); - ctx->copy_from_src(buffer + writeoffset, buf, amount); - bufferused += amount; - kthread_cond_broadcast(&readcond); - read_poll_channel.Signal(ReadPollEventStatus()); - write_poll_channel.Signal(WritePollEventStatus()); - return amount; + return (ssize_t) so_far; } short PipeChannel::ReadPollEventStatus() @@ -341,14 +411,22 @@ void PipeEndpoint::Disconnect() ssize_t PipeEndpoint::read(ioctx_t* ctx, uint8_t* buf, size_t count) { - if ( !reading ) { errno = EBADF; return -1; } - return channel->read(ctx, buf, count); + if ( !reading ) + return errno = EBADF, -1; + ssize_t result = channel->read(ctx, buf, count); + CurrentThread()->yield_to_tid = 0; + Scheduler::ScheduleTrueThread(); + return result; } ssize_t PipeEndpoint::write(ioctx_t* ctx, const uint8_t* buf, size_t count) { - if ( reading ) { errno = EBADF; return -1; } - return channel->write(ctx, buf, count); + if ( reading ) + return errno = EBADF, -1; + ssize_t result = channel->write(ctx, buf, count); + CurrentThread()->yield_to_tid = 0; + Scheduler::ScheduleTrueThread(); + return result; } int PipeEndpoint::poll(ioctx_t* ctx, PollNode* node)