Skip to content

Commit

Permalink
use thread pool und thread-id basis
Browse files Browse the repository at this point in the history
  • Loading branch information
catkira committed Nov 7, 2024
1 parent eea1f38 commit e167ea6
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 19 deletions.
2 changes: 2 additions & 0 deletions iiod-client.c
Original file line number Diff line number Diff line change
Expand Up @@ -1605,7 +1605,9 @@ void iiod_client_free_block(struct iio_block_pdata *block)
/* Cancel any I/O going on. This means we must send the block free
* command through the main I/O as the block's I/O stream is
* disrupted. */
printf("thread = %u, iiod_io_cancel...\n", pthread_self());
iiod_io_cancel(block->io);
printf("thread = %u, iiod_io_cancel ok\n", pthread_self());
iiod_io_unref(block->io);

io = iiod_responder_get_default_io(client->responder);
Expand Down
56 changes: 39 additions & 17 deletions iiod-responder.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ struct iiod_io {
/* Cond to sleep until I/O is done */
struct iio_cond *cond;
struct iio_mutex *lock;
struct iio_mutex *inuse_lock;

/* Set to true when the response has been read */
bool r_done;
Expand All @@ -71,6 +70,9 @@ struct iiod_responder {
void *d;

struct iiod_io *readers, *writers, *default_io;
struct iiod_io* default_io_pool[100];
unsigned int default_io_pool_thread_ids[100];
unsigned int default_io_pool_size;
uint16_t next_client_id;

struct iio_mutex *lock;
Expand Down Expand Up @@ -114,6 +116,15 @@ static void __iiod_io_cancel_unlocked(struct iiod_io *io)
struct iiod_responder *priv = io->responder;
struct iiod_io *tmp;

if (priv->readers) {
for (tmp = priv->readers; tmp; ) {
if (tmp == tmp->r_next) {
printf("loop detected!!!\n");
}
tmp = tmp->r_next;
}
}

/* Discard the entry from the readers list */
printf("thread = %u, discard reader %d\n", pthread_self(), io);
if (io == priv->readers) {
Expand All @@ -126,7 +137,6 @@ static void __iiod_io_cancel_unlocked(struct iiod_io *io)
}
}
}
iio_mutex_unlock(io->inuse_lock);
}

static ssize_t iiod_rw_all(struct iiod_responder *priv,
Expand Down Expand Up @@ -567,7 +577,6 @@ int iiod_io_get_response_async(struct iiod_io *io,
return priv->thrd_err_code;
}

iio_mutex_lock(io->inuse_lock);
if (nb)
memcpy(io->r_io.buf, buf, sizeof(*buf) * nb);
io->r_io.nb_buf = nb;
Expand Down Expand Up @@ -656,11 +665,6 @@ iiod_responder_create_io(struct iiod_responder *priv, uint16_t id)
if (err)
goto err_free_cond;

io->inuse_lock = iio_mutex_create();
err = iio_err(io->inuse_lock);
if (err)
goto err_free_cond;

io->client_id = id;

printf("thread = %u, created io %d\n", pthread_self(), io);
Expand All @@ -676,8 +680,8 @@ iiod_responder_create_io(struct iiod_responder *priv, uint16_t id)
void
iiod_responder_set_timeout(struct iiod_responder *priv, unsigned int timeout_ms)
{
priv->timeout_ms = timeout_ms;
priv->default_io->timeout_ms = timeout_ms;
// priv->timeout_ms = timeout_ms;
// priv->default_io->timeout_ms = timeout_ms;
}

void
Expand All @@ -704,10 +708,7 @@ iiod_responder_create(const struct iiod_responder_ops *ops, void *d)
if (err)
goto err_free_priv;

priv->default_io = iiod_responder_create_io(priv, 0);
err = iio_err(priv->default_io);
if (err)
goto err_free_lock;
priv->default_io_pool_size = 0;

priv->write_task = iio_task_create(iiod_responder_write, priv,
"iiod-responder-writer-task");
Expand Down Expand Up @@ -780,7 +781,8 @@ struct iiod_io * iiod_command_create_io(const struct iiod_command *cmd,
{
struct iiod_responder *priv = (struct iiod_responder *) data;

return iiod_responder_create_io(priv, cmd->client_id);
struct iiod_io* io = iiod_responder_create_io(priv, cmd->client_id);
return io;
}

void iiod_io_cancel(struct iiod_io *io)
Expand All @@ -802,11 +804,11 @@ void iiod_io_cancel(struct iiod_io *io)

/* Cancel any pending response request */
iiod_io_cancel_response(io);
iio_mutex_unlock(io->inuse_lock);
}

static void iiod_io_destroy(struct iiod_io *io)
{
printf("destroy %d\n", io);
iio_mutex_destroy(io->lock);
iio_cond_destroy(io->cond);
free(io);
Expand Down Expand Up @@ -846,7 +848,27 @@ void iiod_io_unref(struct iiod_io *io)
struct iiod_io *
iiod_responder_get_default_io(struct iiod_responder *priv)
{
return priv->default_io;
int idx = -1;
for (unsigned int i = 0; i < priv->default_io_pool_size; i++) {
if (priv->default_io_pool_thread_ids[i] == pthread_self()) {
idx = i;
break;
}
}
struct iiod_io *io;
if (idx != -1) {
printf("using existing io element for thread %u\n", pthread_self());
io = priv->default_io_pool[idx];
}
else {
printf("creating new io element for thread %u\n", pthread_self());
io = iiod_responder_create_io(priv, 0);
io->timeout_ms = priv->timeout_ms;
priv->default_io_pool_thread_ids[priv->default_io_pool_size] = pthread_self();
priv->default_io_pool[priv->default_io_pool_size] = io;
priv->default_io_pool_size++;
}
return io;
}

struct iiod_io *
Expand Down
5 changes: 4 additions & 1 deletion network-unix.c
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,13 @@ void do_cancel(struct iiod_client_pdata *io_ctx)
{
uint64_t event = 1;
int ret;

printf("thread = %u, cancel... cancel_fd = %d\n", pthread_self(), io_ctx->cancel_fd[CANCEL_WR_FD]);
ret = write(io_ctx->cancel_fd[CANCEL_WR_FD], &event, sizeof(event));
if (ret == -1) {
/* If this happens something went very seriously wrong */
prm_perror(io_ctx->params, -errno,
"Unable to signal cancellation event");
printf("do cancel error!\n");
}
}

Expand All @@ -117,11 +118,13 @@ int wait_cancellable(struct iiod_client_pdata *io_ctx,
pfd[0].events = POLLOUT;
pfd[1].fd = io_ctx->cancel_fd[0];
pfd[1].events = POLLIN;
printf("thread = %u, wait cancellable... cancel_fd = %d\n", pthread_self(), pfd[1].fd);

do {
do {
ret = poll(pfd, 2, timeout);
} while (ret == -1 && errno == EINTR);
printf("thread = %u, pfd[0].revents = %d, pfd[1].revents = %d, ret = %d\n",pthread_self(), pfd[0].revents,pfd[1].revents, ret);

if (ret == -1)
return -errno;
Expand Down
4 changes: 3 additions & 1 deletion network.c
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,13 @@ static ssize_t network_recv(struct iiod_client_pdata *io_ctx, void *data,
while (1) {
if (cancellable) {
ret = wait_cancellable(io_ctx, true, timeout_ms);
printf("thread = %u, wait cancellable ret = %d\n", pthread_self(), ret);
if (ret < 0)
return ret;
}

printf("thread = %u, recv...\n", pthread_self());
ret = recv(io_ctx->fd, data, (int) len, flags);
printf("thread = %u, recv ret = %d\n", pthread_self(), ret);
if (ret == 0)
return -EPIPE;
else if (ret > 0)
Expand Down

0 comments on commit e167ea6

Please sign in to comment.