gcc/libgfortran/io/async.c
Nicolas Koenig 2b4c906561 re PR fortran/25829 ([F03] Asynchronous IO support)
2018-08-21  Nicolas Koenig  <koenigni@gcc.gnu.org>
	Thomas Koenig <tkoenig@gcc.gnu.org>

	PR fortran/25829
	* gfortran.texi: Add description of asynchronous I/O.
	* trans-decl.c (gfc_finish_var_decl): Treat asynchronous variables
	as volatile.
	* trans-io.c (gfc_build_io_library_fndecls): Rename st_wait to
	st_wait_async and change argument spec from ".X" to ".w".
	(gfc_trans_wait): Pass ID argument via reference.

2018-08-21  Nicolas Koenig  <koenigni@gcc.gnu.org>
	Thomas Koenig <tkoenig@gcc.gnu.org>

	PR fortran/25829
	* gfortran.dg/f2003_inquire_1.f03: Add write statement.
	* gfortran.dg/f2003_io_1.f03: Add wait statement.

2018-08-21  Nicolas Koenig  <koenigni@gcc.gnu.org>
	Thomas Koenig <tkoenig@gcc.gnu.org>

	PR fortran/25829
	* Makefile.am: Add async.c to gfor_io_src.
	Add async.h to gfor_io_headers.
	* Makefile.in: Regenerated.
	* gfortran.map: Add _gfortran_st_wait_async.
	* io/async.c: New file.
	* io/async.h: New file.
	* io/close.c: Include async.h.
	(st_close): Call async_wait for an asynchronous unit.
	* io/file_pos.c (st_backspace): Likewise.
	(st_endfile): Likewise.
	(st_rewind): Likewise.
	(st_flush): Likewise.
	* io/inquire.c: Add handling for asynchronous PENDING
	and ID arguments.
	* io/io.h (st_parameter_dt): Add async bit.
	(st_parameter_wait): Correct.
	(gfc_unit): Add au pointer.
	(st_wait_async): Add prototype.
	(transfer_array_inner): Likewise.
	(st_write_done_worker): Likewise.
	* io/open.c: Include async.h.
	(new_unit): Initialize asynchronous unit.
	* io/transfer.c (async_opt): New struct.
	(wrap_scalar_transfer): New function.
	(transfer_integer): Call wrap_scalar_transfer to do the work.
	(transfer_real): Likewise.
	(transfer_real_write): Likewise.
	(transfer_character): Likewise.
	(transfer_character_wide): Likewise.
	(transfer_complex): Likewise.
	(transfer_array_inner): New function.
	(transfer_array): Call transfer_array_inner.
	(transfer_derived): Call wrap_scalar_transfer.
	(data_transfer_init): Check for asynchronous I/O.
	Perform a wait operation on any pending asynchronous I/O
	if the data transfer is synchronous. Copy PDT and enqueue
	thread for data transfer.
	(st_read_done_worker): New function.
	(st_read_done): Enqueue transfer or call st_read_done_worker.
	(st_write_done_worker): New function.
	(st_write_done): Enqueue transfer or call st_read_done_worker.
	(st_wait): Document as no-op for compatibility reasons.
	(st_wait_async): New function.
	* io/unit.c (insert_unit): Use macros LOCK, UNLOCK and TRYLOCK;
	add NOTE where necessary.
	(get_gfc_unit): Likewise.
	(init_units): Likewise.
	(close_unit_1): Likewise. Call async_close if asynchronous.
	(close_unit): Use macros LOCK and UNLOCK.
	(finish_last_advance_record): Likewise.
	(newunit_alloc): Likewise.
	* io/unix.c (find_file): Likewise.
	(flush_all_units_1): Likewise.
	(flush_all_units): Likewise.
	* libgfortran.h (generate_error_common): Add prototype.
	* runtime/error.c: Include io.h and async.h.
	(generate_error_common): New function.

2018-08-21  Nicolas Koenig  <koenigni@gcc.gnu.org>
	Thomas Koenig <tkoenig@gcc.gnu.org>

	PR fortran/25829
	* testsuite/libgomp.fortran/async_io_1.f90: New test.
	* testsuite/libgomp.fortran/async_io_2.f90: New test.
	* testsuite/libgomp.fortran/async_io_3.f90: New test.
	* testsuite/libgomp.fortran/async_io_4.f90: New test.
	* testsuite/libgomp.fortran/async_io_5.f90: New test.
	* testsuite/libgomp.fortran/async_io_6.f90: New test.
	* testsuite/libgomp.fortran/async_io_7.f90: New test.


Co-Authored-By: Thomas Koenig <tkoenig@gcc.gnu.org>

From-SVN: r263750
2018-08-21 18:48:59 +00:00

570 lines
12 KiB
C

/* Copyright (C) 2018 Free Software Foundation, Inc.
Contributed by Nicolas Koenig
This file is part of the GNU Fortran runtime library (libgfortran).
Libgfortran is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3, or (at your option)
any later version.
Libgfortran is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
Under Section 7 of GPL version 3, you are granted additional
permissions described in the GCC Runtime Library Exception, version
3.1, as published by the Free Software Foundation.
You should have received a copy of the GNU General Public License and
a copy of the GCC Runtime Library Exception along with this program;
see the files COPYING3 and COPYING.RUNTIME respectively. If not, see
<http://www.gnu.org/licenses/>. */
#include "libgfortran.h"
#define _GTHREAD_USE_COND_INIT_FUNC
#include "../../libgcc/gthr.h"
#include "io.h"
#include "fbuf.h"
#include "format.h"
#include "unix.h"
#include <string.h>
#include <assert.h>
#include <sys/types.h>
#include "async.h"
#if ASYNC_IO
DEBUG_LINE (__thread const char *aio_prefix = MPREFIX);
DEBUG_LINE (__gthread_mutex_t debug_queue_lock = __GTHREAD_MUTEX_INIT;)
DEBUG_LINE (aio_lock_debug *aio_debug_head = NULL;)
/* Current unit for asynchronous I/O. Needed for error reporting. */
__thread gfc_unit *thread_unit = NULL;
/* Queue entry for the asynchronous I/O entry. */
typedef struct transfer_queue
{
enum aio_do type;
struct transfer_queue *next;
struct st_parameter_dt *new_pdt;
transfer_args arg;
_Bool has_id;
int read_flag;
} transfer_queue;
struct error {
st_parameter_dt *dtp;
int id;
};
/* Helper function to exchange the old vs. a new PDT. */
static void
update_pdt (st_parameter_dt **old, st_parameter_dt *new) {
st_parameter_dt *temp;
NOTE ("Changing pdts, current_unit = %p", (void *) (new->u.p.current_unit));
temp = *old;
*old = new;
if (temp)
free (temp);
}
/* Destroy an adv_cond structure. */
static void
destroy_adv_cond (struct adv_cond *ac)
{
T_ERROR (__gthread_mutex_destroy, &ac->lock);
T_ERROR (__gthread_cond_destroy, &ac->signal);
}
/* Function invoked as start routine for a new asynchronous I/O unit.
Contains the main loop for accepting requests and handling them. */
static void *
async_io (void *arg)
{
DEBUG_LINE (aio_prefix = TPREFIX);
transfer_queue *ctq = NULL, *prev = NULL;
gfc_unit *u = (gfc_unit *) arg;
async_unit *au = u->au;
LOCK (&au->lock);
thread_unit = u;
au->thread = __gthread_self ();
while (true)
{
/* Main loop. At this point, au->lock is always held. */
WAIT_SIGNAL_MUTEX (&au->work, au->tail != NULL, &au->lock);
LOCK (&au->lock);
ctq = au->head;
prev = NULL;
/* Loop over the queue entries until they are finished. */
while (ctq)
{
if (prev)
free (prev);
prev = ctq;
if (!au->error.has_error)
{
UNLOCK (&au->lock);
switch (ctq->type)
{
case AIO_WRITE_DONE:
NOTE ("Finalizing write");
st_write_done_worker (au->pdt);
UNLOCK (&au->io_lock);
break;
case AIO_READ_DONE:
NOTE ("Finalizing read");
st_read_done_worker (au->pdt);
UNLOCK (&au->io_lock);
break;
case AIO_DATA_TRANSFER_INIT:
NOTE ("Data transfer init");
LOCK (&au->io_lock);
update_pdt (&au->pdt, ctq->new_pdt);
data_transfer_init_worker (au->pdt, ctq->read_flag);
break;
case AIO_TRANSFER_SCALAR:
NOTE ("Starting scalar transfer");
ctq->arg.scalar.transfer (au->pdt, ctq->arg.scalar.arg_bt,
ctq->arg.scalar.data,
ctq->arg.scalar.i,
ctq->arg.scalar.s1,
ctq->arg.scalar.s2);
break;
case AIO_TRANSFER_ARRAY:
NOTE ("Starting array transfer");
NOTE ("ctq->arg.array.desc = %p",
(void *) (ctq->arg.array.desc));
transfer_array_inner (au->pdt, ctq->arg.array.desc,
ctq->arg.array.kind,
ctq->arg.array.charlen);
free (ctq->arg.array.desc);
break;
case AIO_CLOSE:
NOTE ("Received AIO_CLOSE");
goto finish_thread;
default:
internal_error (NULL, "Invalid queue type");
break;
}
LOCK (&au->lock);
if (unlikely (au->error.has_error))
au->error.last_good_id = au->id.low - 1;
}
else
{
if (ctq->type == AIO_WRITE_DONE || ctq->type == AIO_READ_DONE)
{
UNLOCK (&au->io_lock);
}
else if (ctq->type == AIO_CLOSE)
{
NOTE ("Received AIO_CLOSE during error condition");
UNLOCK (&au->lock);
goto finish_thread;
}
}
NOTE ("Next ctq, current id: %d", au->id.low);
if (ctq->has_id && au->id.waiting == au->id.low++)
SIGNAL (&au->id.done);
ctq = ctq->next;
}
au->tail = NULL;
au->head = NULL;
au->empty = 1;
UNLOCK (&au->lock);
SIGNAL (&au->emptysignal);
LOCK (&au->lock);
}
finish_thread:
au->tail = NULL;
au->head = NULL;
au->empty = 1;
SIGNAL (&au->emptysignal);
free (ctq);
return NULL;
}
/* Free an asynchronous unit. */
static void
free_async_unit (async_unit *au)
{
if (au->tail)
internal_error (NULL, "Trying to free nonempty asynchronous unit");
destroy_adv_cond (&au->work);
destroy_adv_cond (&au->emptysignal);
destroy_adv_cond (&au->id.done);
T_ERROR (__gthread_mutex_destroy, &au->lock);
free (au);
}
/* Initialize an adv_cond structure. */
static void
init_adv_cond (struct adv_cond *ac)
{
ac->pending = 0;
__GTHREAD_MUTEX_INIT_FUNCTION (&ac->lock);
__gthread_cond_init_function (&ac->signal);
}
/* Initialize an asyncronous unit, returning zero on success,
nonzero on failure. It also sets u->au. */
void
init_async_unit (gfc_unit *u)
{
async_unit *au;
if (!__gthread_active_p ())
{
u->au = NULL;
return;
}
au = (async_unit *) xmalloc (sizeof (async_unit));
u->au = au;
init_adv_cond (&au->work);
init_adv_cond (&au->emptysignal);
__GTHREAD_MUTEX_INIT_FUNCTION (&au->lock);
__GTHREAD_MUTEX_INIT_FUNCTION (&au->io_lock);
LOCK (&au->lock);
T_ERROR (__gthread_create, &au->thread, &async_io, (void *) u);
au->pdt = NULL;
au->head = NULL;
au->tail = NULL;
au->empty = true;
au->id.waiting = -1;
au->id.low = 0;
au->id.high = 0;
au->error.fatal_error = 0;
au->error.has_error = 0;
au->error.last_good_id = 0;
init_adv_cond (&au->id.done);
UNLOCK (&au->lock);
}
/* Enqueue a transfer statement. */
void
enqueue_transfer (async_unit *au, transfer_args *arg, enum aio_do type)
{
transfer_queue *tq = calloc (sizeof (transfer_queue), 1);
tq->arg = *arg;
tq->type = type;
tq->has_id = 0;
LOCK (&au->lock);
if (!au->tail)
au->head = tq;
else
au->tail->next = tq;
au->tail = tq;
REVOKE_SIGNAL (&(au->emptysignal));
au->empty = false;
UNLOCK (&au->lock);
SIGNAL (&au->work);
}
/* Enqueue an st_write_done or st_read_done which contains an ID. */
int
enqueue_done_id (async_unit *au, enum aio_do type)
{
int ret;
transfer_queue *tq = calloc (sizeof (transfer_queue), 1);
tq->type = type;
tq->has_id = 1;
LOCK (&au->lock);
if (!au->tail)
au->head = tq;
else
au->tail->next = tq;
au->tail = tq;
REVOKE_SIGNAL (&(au->emptysignal));
au->empty = false;
ret = au->id.high++;
NOTE ("Enqueue id: %d", ret);
UNLOCK (&au->lock);
SIGNAL (&au->work);
return ret;
}
/* Enqueue an st_write_done or st_read_done without an ID. */
void
enqueue_done (async_unit *au, enum aio_do type)
{
transfer_queue *tq = calloc (sizeof (transfer_queue), 1);
tq->type = type;
tq->has_id = 0;
LOCK (&au->lock);
if (!au->tail)
au->head = tq;
else
au->tail->next = tq;
au->tail = tq;
REVOKE_SIGNAL (&(au->emptysignal));
au->empty = false;
UNLOCK (&au->lock);
SIGNAL (&au->work);
}
/* Enqueue a CLOSE statement. */
void
enqueue_close (async_unit *au)
{
transfer_queue *tq = calloc (sizeof (transfer_queue), 1);
tq->type = AIO_CLOSE;
LOCK (&au->lock);
if (!au->tail)
au->head = tq;
else
au->tail->next = tq;
au->tail = tq;
REVOKE_SIGNAL (&(au->emptysignal));
au->empty = false;
UNLOCK (&au->lock);
SIGNAL (&au->work);
}
/* The asynchronous unit keeps the currently active PDT around.
This function changes that to the current one. */
void
enqueue_data_transfer_init (async_unit *au, st_parameter_dt *dt, int read_flag)
{
st_parameter_dt *new = xmalloc (sizeof (st_parameter_dt));
transfer_queue *tq = xmalloc (sizeof (transfer_queue));
memcpy ((void *) new, (void *) dt, sizeof (st_parameter_dt));
NOTE ("dt->internal_unit_desc = %p", dt->internal_unit_desc);
NOTE ("common.flags & mask = %d", dt->common.flags & IOPARM_LIBRETURN_MASK);
tq->next = NULL;
tq->type = AIO_DATA_TRANSFER_INIT;
tq->read_flag = read_flag;
tq->has_id = 0;
tq->new_pdt = new;
LOCK (&au->lock);
if (!au->tail)
au->head = tq;
else
au->tail->next = tq;
au->tail = tq;
REVOKE_SIGNAL (&(au->emptysignal));
au->empty = 0;
UNLOCK (&au->lock);
SIGNAL (&au->work);
}
/* Collect the errors that may have happened asynchronously. Return true if
an error has been encountered. */
bool
collect_async_errors (st_parameter_common *cmp, async_unit *au)
{
bool has_error = au->error.has_error;
if (has_error)
{
if (generate_error_common (cmp, au->error.family, au->error.message))
{
au->error.has_error = 0;
au->error.cmp = NULL;
}
else
{
/* The program will exit later. */
au->error.fatal_error = true;
}
}
return has_error;
}
/* Perform a wait operation on an asynchronous unit with an ID specified,
which means collecting the errors that may have happened asynchronously.
Return true if an error has been encountered. */
bool
async_wait_id (st_parameter_common *cmp, async_unit *au, int i)
{
bool ret;
if (au == NULL)
return false;
if (cmp == NULL)
cmp = au->error.cmp;
if (au->error.has_error)
{
if (i <= au->error.last_good_id)
return false;
return collect_async_errors (cmp, au);
}
LOCK (&au->lock);
NOTE ("Waiting for id %d", i);
if (au->id.waiting < i)
au->id.waiting = i;
UNLOCK (&au->lock);
SIGNAL (&(au->work));
LOCK (&au->lock);
WAIT_SIGNAL_MUTEX (&(au->id.done),
(au->id.low >= au->id.waiting || au->empty), &au->lock);
LOCK (&au->lock);
ret = collect_async_errors (cmp, au);
UNLOCK (&au->lock);
return ret;
}
/* Perform a wait operation an an asynchronous unit without an ID. */
bool
async_wait (st_parameter_common *cmp, async_unit *au)
{
bool ret;
if (au == NULL)
return false;
if (cmp == NULL)
cmp = au->error.cmp;
SIGNAL (&(au->work));
LOCK (&(au->lock));
if (au->empty)
{
ret = collect_async_errors (cmp, au);
UNLOCK (&au->lock);
return ret;
}
WAIT_SIGNAL_MUTEX (&(au->emptysignal), (au->empty), &au->lock);
ret = collect_async_errors (cmp, au);
return ret;
}
/* Close an asynchronous unit. */
void
async_close (async_unit *au)
{
if (au == NULL)
return;
NOTE ("Closing async unit");
enqueue_close (au);
T_ERROR (__gthread_join, au->thread, NULL);
free_async_unit (au);
}
#else
/* Only set u->au to NULL so no async I/O will happen. */
void
init_async_unit (gfc_unit *u)
{
u->au = NULL;
return;
}
/* Do-nothing function, which will not be called. */
void
enqueue_transfer (async_unit *au, transfer_args *arg, enum aio_do type)
{
return;
}
/* Do-nothing function, which will not be called. */
int
enqueue_done_id (async_unit *au, enum aio_do type)
{
return 0;
}
/* Do-nothing function, which will not be called. */
void
enqueue_done (async_unit *au, enum aio_do type)
{
return;
}
/* Do-nothing function, which will not be called. */
void
enqueue_close (async_unit *au)
{
return;
}
/* Do-nothing function, which will not be called. */
void
enqueue_data_transfer_init (async_unit *au, st_parameter_dt *dt, int read_flag)
{
return;
}
/* Do-nothing function, which will not be called. */
bool
collect_async_errors (st_parameter_common *cmp, async_unit *au)
{
return false;
}
/* Do-nothing function, which will not be called. */
bool
async_wait_id (st_parameter_common *cmp, async_unit *au, int i)
{
return false;
}
/* Do-nothing function, which will not be called. */
bool
async_wait (st_parameter_common *cmp, async_unit *au)
{
return false;
}
/* Do-nothing function, which will not be called. */
void
async_close (async_unit *au)
{
return;
}
#endif