Scheme-GNUnet requires the new operations 'wait-until-port-readable-operation' and 'wait-until-port-readable-operation' for communicating with services. This patch has been previously submitted at <https://github.com/wingo/fibers/pull/50>, on Sep 16, 2021. As of Feb 3, 2022, upstream has not responded yet. diff --git a/Makefile.am b/Makefile.am index e2db57e..0134255 100644 --- a/Makefile.am +++ b/Makefile.am @@ -33,6 +33,7 @@ SOURCES = \ fibers/deque.scm \ fibers/epoll.scm \ fibers/interrupts.scm \ + fibers/io-wakeup.scm \ fibers/nameset.scm \ fibers/operations.scm \ fibers/posix-clocks.scm \ @@ -67,6 +68,7 @@ TESTS = \ tests/conditions.scm \ tests/channels.scm \ tests/foreign.scm \ + tests/io-wakeup.scm \ tests/parameters.scm \ tests/preemption.scm \ tests/speedup.scm diff --git a/fibers.texi b/fibers.texi index 52f7177..0990c8f 100644 --- a/fibers.texi +++ b/fibers.texi @@ -12,6 +12,7 @@ This manual is for Fibers (version @value{VERSION}, updated @value{UPDATED}) Copyright 2016-2022 Andy Wingo +Copyright 2021 Maxime Devos @quotation @c For more information, see COPYING.docs in the fibers @@ -453,6 +454,7 @@ of operations for channels and timers, and an internals interface. * Channels:: Share memory by communicating. * Timers:: Operations on time. * Conditions:: Waiting for simple state changes. +* Port Readiness:: Waiting until a port is ready for I/O. * REPL Commands:: Experimenting with Fibers at the console. * Schedulers and Tasks:: Fibers are built from lower-level primitives. @end menu @@ -722,6 +724,28 @@ signalled. Equivalent to @code{(perform-operation (wait-operation cvar))}. @end defun +@node Port Readiness +@section Port Readiness + +These two operations can be used on file ports to wait until +they are readable or writable. Spurious wake-ups are possible. +This is complementary to Guile's suspendable ports. + +@example +(use-modules (fibers io-wakeup)) +@end example + +@defun wait-until-port-readable-operation port +Make an operation that will succeed with no values when the input +port @var{port} becomes readable. For passive sockets, this operation +succeeds when a connection becomes available. +@end defun + +@defun wait-until-port-writable-operation +Make an operation that will succeed with no values when the output +port @var{port} becomes writable. +@end defun + @node REPL Commands @section REPL Commands diff --git a/fibers/io-wakeup.scm b/fibers/io-wakeup.scm new file mode 100644 index 0000000..5df03f1 --- /dev/null +++ b/fibers/io-wakeup.scm @@ -0,0 +1,93 @@ +;; Fibers: cooperative, event-driven user-space threads. + +;;;; Copyright (C) 2016,2021 Free Software Foundation, Inc. +;;;; Copyright (C) 2021 Maxime Devos +;;;; +;;;; This library is free software; you can redistribute it and/or +;;;; modify it under the terms of the GNU Lesser General Public +;;;; License as published by the Free Software Foundation; either +;;;; version 3 of the License, or (at your option) any later version. +;;;; +;;;; This library is distributed in the hope that it will be useful, +;;;; but WITHOUT ANY WARRANTY; without even the implied warranty of +;;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +;;;; Lesser General Public License for more details. +;;;; +;;;; You should have received a copy of the GNU Lesser General Public +;;;; License along with this library; if not, write to the Free Software +;;;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +;;;; + +(define-module (fibers io-wakeup) + #:use-module (fibers scheduler) + #:use-module (fibers operations) + #:use-module (ice-9 atomic) + #:use-module (ice-9 match) + #:use-module (ice-9 threads) + #:use-module (ice-9 ports internal) + #:export (wait-until-port-readable-operation + wait-until-port-writable-operation)) + +(define *poll-sched* (make-atomic-box #f)) + +(define (poll-sched) + (or (atomic-box-ref *poll-sched*) + (let ((sched (make-scheduler))) + (cond + ((atomic-box-compare-and-swap! *poll-sched* #f sched)) + (else + ;; FIXME: Would be nice to clean up this thread at some point. + (call-with-new-thread + (lambda () + (define (finished?) #f) + (run-scheduler sched finished?))) + sched))))) + +;; These procedure are subject to spurious wakeups. + +(define (readable? port) + "Test if PORT is writable." + (match (select (vector port) #() #() 0) + ((#() #() #()) #f) + ((#(_) #() #()) #t))) + +(define (writable? port) + "Test if PORT is writable." + (match (select #() (vector port) #() 0) + ((#() #() #()) #f) + ((#() #(_) #()) #t))) + +(define (make-wait-operation ready? schedule-when-ready port port-ready-fd this-procedure) + (make-base-operation #f + (lambda _ + (and (ready? port) values)) + (lambda (flag sched resume) + (define (commit) + (match (atomic-box-compare-and-swap! flag 'W 'S) + ('W (resume values)) + ('C (commit)) + ('S #f))) + (if sched + (schedule-when-ready + sched (port-ready-fd port) commit) + (schedule-task + (poll-sched) + (lambda () + (perform-operation (this-procedure port)) + (commit))))))) + +(define (wait-until-port-readable-operation port) + "Make an operation that will succeed when PORT is readable." + (unless (input-port? port) + (error "refusing to wait forever for input on non-input port")) + (make-wait-operation readable? schedule-task-when-fd-readable port + port-read-wait-fd + wait-until-port-readable-operation)) + +(define (wait-until-port-writable-operation port) + "Make an operation that will succeed when PORT is writable." + (unless (output-port? port) + (error "refusing to wait forever for output on non-output port")) + (make-wait-operation writable? schedule-task-when-fd-writable port + port-write-wait-fd + wait-until-port-writable-operation)) diff --git a/tests/io-wakeup.scm b/tests/io-wakeup.scm new file mode 100644 index 0000000..c14fa81 --- /dev/null +++ b/tests/io-wakeup.scm @@ -0,0 +1,167 @@ +;; Fibers: cooperative, event-driven user-space threads. + +;;;; Copyright (C) 2016 Free Software Foundation, Inc. +;;;; Copyright (C) 2021 Maxime Devos +;;;; +;;;; This library is free software; you can redistribute it and/or +;;;; modify it under the terms of the GNU Lesser General Public +;;;; License as published by the Free Software Foundation; either +;;;; version 3 of the License, or (at your option) any later version. +;;;; +;;;; This library is distributed in the hope that it will be useful, +;;;; but WITHOUT ANY WARRANTY; without even the implied warranty of +;;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +;;;; Lesser General Public License for more details. +;;;; +;;;; You should have received a copy of the GNU Lesser General Public +;;;; License along with this library; if not, write to the Free Software +;;;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +;;;; + +(define-module (tests io-wakeup) + #:use-module (rnrs bytevectors) + #:use-module (ice-9 control) + #:use-module (ice-9 suspendable-ports) + #:use-module (ice-9 binary-ports) + #:use-module (fibers) + #:use-module (fibers io-wakeup) + #:use-module (fibers operations) + #:use-module (fibers timers)) + +(define failed? #f) + +(define-syntax-rule (assert-equal expected actual) + (let ((x expected)) + (format #t "assert ~s equal to ~s: " 'actual x) + (force-output) + (let ((y actual)) + (cond + ((equal? x y) (format #t "ok\n")) + (else + (format #t "no (got ~s)\n" y) + (set! failed? #t)))))) + +(define-syntax-rule (assert-run-fibers-terminates exp) + (begin + (format #t "assert run-fibers on ~s terminates: " 'exp) + (force-output) + (let ((start (get-internal-real-time))) + (call-with-values (lambda () (run-fibers (lambda () exp))) + (lambda vals + (format #t "ok (~a s)\n" (/ (- (get-internal-real-time) start) + 1.0 internal-time-units-per-second)) + (apply values vals)))))) + +(define-syntax-rule (assert-run-fibers-returns (expected ...) exp) + (begin + (call-with-values (lambda () (assert-run-fibers-terminates exp)) + (lambda run-fiber-return-vals + (assert-equal '(expected ...) run-fiber-return-vals))))) + + +;; Note that theoretically, on very slow systems, SECONDS might need +;; to be increased. However, readable/timeout? and writable/timeout? +;; call this 5 times in a loop anyways, so the effective timeout is +;; a fourth of a second, which should be plenty in practice. +(define* (with-timeout op #:key (seconds 0.05) (wrap values)) + (choice-operation op + (wrap-operation (sleep-operation seconds) wrap))) + +(define* (readable/timeout? port #:key (allowed-spurious 5)) + "Does waiting for readability time-out? +Allow @var{allowed-spurious} spurious wakeups." + (or (perform-operation + (with-timeout + (wrap-operation (wait-until-port-readable-operation port) + (lambda () #f)) + #:wrap (lambda () #t))) + (and (> allowed-spurious 0) + (readable/timeout? port #:allowed-spurious + (- allowed-spurious 1))))) + +(define* (writable/timeout? port #:key (allowed-spurious 5)) + "Does waiting for writability time-out? +Allow @var{allowed-spurious} spurious wakeups." + (or (perform-operation + (with-timeout + (wrap-operation (wait-until-port-writable-operation port) + (lambda () #f)) + #:wrap (lambda () #t))) + (and (> allowed-spurious 0) + (writable/timeout? port #:allowed-spurious + (- allowed-spurious 1))))) + +;; Tests: +;; * wait-until-port-readable-operaton / wait-until-port-writable-operation +;; blocks if the port isn't ready for input / output. +;; +;; This is tested with a pipe (read & write) +;; and a listening socket (read, or accept in this case). +;; +;; Due to the possibility of spurious wakeups, +;; a limited few spurious wakeups are tolerated. +;; +;; * these operations succeed if the port is ready for input / output. +;; +;; These are again tested with a pipe and a listening socket +;; +;; Blocking is detected with a small time-out. + +(define (make-listening-socket) + (let ((server (socket PF_INET SOCK_DGRAM 0))) + (bind server AF_INET INADDR_LOOPBACK 0) + server)) + +(let ((s (make-listening-socket))) + (assert-run-fibers-returns (#t) + (readable/timeout? s)) + (assert-equal #t (readable/timeout? s)) + (close s)) + +(define (set-nonblocking! sock) + (let ((flags (fcntl sock F_GETFL))) + (fcntl sock F_SETFL (logior O_NONBLOCK flags)))) + +(define-syntax-rule (with-pipes (A B) exp exp* ...) + (let* ((pipes (pipe)) + (A (car pipes)) + (B (cdr pipes))) + exp exp* ... + (close A) + (close B))) + +(with-pipes (A B) + (setvbuf A 'none) + (setvbuf B 'none) + (assert-run-fibers-returns (#t) + (readable/timeout? A)) + (assert-equal #t (readable/timeout? A)) + + ;; The buffer is empty, so writability is expected. + (assert-run-fibers-returns (#f) + (writable/timeout? B)) + (assert-equal #f (writable/timeout? B)) + + ;; Fill the buffer + (set-nonblocking! B) + (let ((bv (make-bytevector 1024))) + (let/ec k + (parameterize ((current-write-waiter k)) + (let loop () + (put-bytevector B bv) + (loop))))) + + ;; As the buffer is full, writable/timeout? should return + ;; #t. + (assert-run-fibers-returns (#t) + (writable/timeout? B)) + ;; There's plenty to read now, so readable/timeout? should + ;; return #f. + (assert-run-fibers-returns (#f) + (readable/timeout? A))) + +(exit (if failed? 1 0)) + +;; Local Variables: +;; eval: (put 'with-pipes 'scheme-indent-function 1) +;; End: