Skip to content

Commit

Permalink
linux: wip - issue console read() calls from background thread
Browse files Browse the repository at this point in the history
Signed-off-by: Kevin O'Connor <[email protected]>
  • Loading branch information
KevinOConnor committed Jan 3, 2025
1 parent 01386c1 commit c25b06d
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 49 deletions.
147 changes: 105 additions & 42 deletions src/linux/console.c
Original file line number Diff line number Diff line change
@@ -1,28 +1,25 @@
// TTY based IO
//
// Copyright (C) 2017-2021 Kevin O'Connor <[email protected]>
// Copyright (C) 2017-2025 Kevin O'Connor <[email protected]>
//
// This file may be distributed under the terms of the GNU GPLv3 license.

#define _GNU_SOURCE
#include <errno.h> // errno
#include <fcntl.h> // fcntl
#include <poll.h> // ppoll
#include <poll.h> // poll
#include <pty.h> // openpty
#include <stdio.h> // fprintf
#include <string.h> // memmove
#include <sys/stat.h> // chmod
#include <time.h> // struct timespec
#include <unistd.h> // ttyname
#include <pthread.h> // pthread_create
#include "board/irq.h" // irq_wait
#include "board/misc.h" // console_sendf
#include "command.h" // command_find_block
#include "internal.h" // console_setup
#include "sched.h" // sched_wake_task

static struct pollfd main_pfd[1];
#define MP_TTY_IDX 0

// Report 'errno' in a message written to stderr
void
report_errno(char *where, int rc)
Expand All @@ -33,16 +30,93 @@ report_errno(char *where, int rc)


/****************************************************************
* Console handling
* Console reading background thread
****************************************************************/

// Global storage for input command reading
static struct {
struct task_wake console_wake;
uint8_t receive_buf[4096];
int receive_pos;

// Main input file
int fd;

// All variables below must be protected by lock
pthread_mutex_t lock;

int receive_pos, force_shutdown;
} ConsoleInfo;

// Sleep until a signal received (waking early for console input if needed)
static void *
console_thread(void *data)
{
int MP_TTY_IDX = 0;
struct pollfd main_pfd[1];
main_pfd[MP_TTY_IDX].fd = ConsoleInfo.fd;
main_pfd[MP_TTY_IDX].events = POLLIN;

uint8_t *receive_buf = ConsoleInfo.receive_buf;
for (;;) {
int ret = poll(main_pfd, ARRAY_SIZE(main_pfd), 100);
if (ret <= 0) {
if (errno != EINTR)
report_errno("poll main_pfd", ret);
return NULL;
}
if (!main_pfd[MP_TTY_IDX].revents)
continue;
sched_wake_task(&ConsoleInfo.console_wake);

// Read data
pthread_mutex_lock(&ConsoleInfo.lock);
int receive_pos = ConsoleInfo.receive_pos;
pthread_mutex_unlock(&ConsoleInfo.lock);
uint8_t readsize = sizeof(ConsoleInfo.receive_buf) - receive_pos;
if (readsize <= 0) {
usleep(10); // XXX
continue;
}
ret = read(main_pfd[MP_TTY_IDX].fd, &receive_buf[receive_pos]
, readsize);
if (ret < 0) {
if (errno == EWOULDBLOCK) {
continue;
} else {
report_errno("read", ret);
return NULL;
}
}

// Check for forced shutdown indicator
if (ret == 15 && receive_buf[receive_pos+14] == '\n'
&& memcmp(&receive_buf[receive_pos], "FORCE_SHUTDOWN\n", 15) == 0) {
pthread_mutex_lock(&ConsoleInfo.lock);
ConsoleInfo.force_shutdown = 1;
timer_wake_task_from_thread(&ConsoleInfo.console_wake);
pthread_mutex_unlock(&ConsoleInfo.lock);
continue;
}

// Add to buffer
pthread_mutex_lock(&ConsoleInfo.lock);
int new_receive_pos = ConsoleInfo.receive_pos;
if (new_receive_pos != receive_pos)
memmove(&receive_buf[new_receive_pos], &receive_buf[receive_pos]
, receive_pos - new_receive_pos);
ConsoleInfo.receive_pos = new_receive_pos + ret;

timer_wake_task_from_thread(&ConsoleInfo.console_wake);

pthread_mutex_unlock(&ConsoleInfo.lock);
}
}


/****************************************************************
* Console handling
****************************************************************/

void *
console_receive_buffer(void)
{
Expand All @@ -56,27 +130,18 @@ console_task(void)
if (!sched_check_wake(&ConsoleInfo.console_wake))
return;

// Read data
int receive_pos = ConsoleInfo.receive_pos;
uint8_t *receive_buf = ConsoleInfo.receive_buf;
int ret = read(main_pfd[MP_TTY_IDX].fd, &receive_buf[receive_pos]
, sizeof(ConsoleInfo.receive_buf) - receive_pos);
if (ret < 0) {
if (errno == EWOULDBLOCK) {
ret = 0;
} else {
report_errno("read", ret);
return;
}
}
if (ret == 15 && receive_buf[receive_pos+14] == '\n'
&& memcmp(&receive_buf[receive_pos], "FORCE_SHUTDOWN\n", 15) == 0)
pthread_mutex_lock(&ConsoleInfo.lock);
if (ConsoleInfo.force_shutdown) {
ConsoleInfo.force_shutdown = 0;
pthread_mutex_unlock(&ConsoleInfo.lock);
shutdown("Force shutdown command");
}

// Find and dispatch message blocks in the input
int len = receive_pos + ret;
uint8_t *receive_buf = ConsoleInfo.receive_buf;
int len = ConsoleInfo.receive_pos;
uint_fast8_t pop_count, msglen = len > MESSAGE_MAX ? MESSAGE_MAX : len;
ret = command_find_and_dispatch(receive_buf, msglen, &pop_count);
int ret = command_find_and_dispatch(receive_buf, msglen, &pop_count);
if (ret) {
len -= pop_count;
if (len) {
Expand All @@ -85,6 +150,7 @@ console_task(void)
}
}
ConsoleInfo.receive_pos = len;
pthread_mutex_unlock(&ConsoleInfo.lock);
}
DECL_TASK(console_task);

Expand All @@ -97,25 +163,11 @@ console_sendf(const struct command_encoder *ce, va_list args)
uint_fast8_t msglen = command_encode_and_frame(buf, ce, args);

// Transmit message
int ret = write(main_pfd[MP_TTY_IDX].fd, buf, msglen);
int ret = write(ConsoleInfo.fd, buf, msglen);
if (ret < 0)
report_errno("write", ret);
}

// Sleep until a signal received (waking early for console input if needed)
void
console_sleep(sigset_t *sigset)
{
int ret = ppoll(main_pfd, ARRAY_SIZE(main_pfd), NULL, sigset);
if (ret <= 0) {
if (errno != EINTR)
report_errno("ppoll main_pfd", ret);
return;
}
if (main_pfd[MP_TTY_IDX].revents)
sched_wake_task(&ConsoleInfo.console_wake);
}


/****************************************************************
* Setup
Expand Down Expand Up @@ -168,8 +220,7 @@ console_setup(char *name)
ret = set_close_on_exec(sfd);
if (ret)
return -1;
main_pfd[MP_TTY_IDX].fd = mfd;
main_pfd[MP_TTY_IDX].events = POLLIN;
ConsoleInfo.fd = mfd;

// Create symlink to tty
unlink(name);
Expand All @@ -194,5 +245,17 @@ console_setup(char *name)
if (ret)
return -1;

// Create background reading thread
ret = pthread_mutex_init(&ConsoleInfo.lock, NULL);
if (ret)
return -1;

pthread_t reader_tid; // Not used
timer_disable_signals();
ret = pthread_create(&reader_tid, NULL, console_thread, NULL);
timer_enable_signals();
if (ret)
return -1;

return 0;
}
3 changes: 2 additions & 1 deletion src/linux/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ void report_errno(char *where, int rc);
int set_non_blocking(int fd);
int set_close_on_exec(int fd);
int console_setup(char *name);
void console_sleep(sigset_t *sigset);

// timer.c
int timer_check_periodic(uint32_t *ts);
void timer_disable_signals(void);
void timer_enable_signals(void);
struct task_wake;
void timer_wake_task_from_thread(struct task_wake *w);

// watchdog.c
int watchdog_setup(void);
Expand Down
33 changes: 27 additions & 6 deletions src/linux/timer.c
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
// Handling of timers on linux systems
//
// Copyright (C) 2017-2021 Kevin O'Connor <[email protected]>
// Copyright (C) 2017-2025 Kevin O'Connor <[email protected]>
//
// This file may be distributed under the terms of the GNU GPLv3 license.

#include <pthread.h> // pthread_mutex_t
#include <time.h> // struct timespec
#include "autoconf.h" // CONFIG_CLOCK_FREQ
#include "board/io.h" // readl
#include "board/irq.h" // irq_disable
#include "board/misc.h" // timer_from_us
#include "command.h" // DECL_CONSTANT
#include "internal.h" // console_sleep
#include "internal.h" // timer_check_periodic
#include "sched.h" // DECL_INIT

// Global storage for timer handling
Expand All @@ -27,6 +28,10 @@ static struct {
// Unix signal tracking
timer_t t_alarm;
sigset_t ss_alarm, ss_sleep;
// Waking from background threads
pthread_mutex_t lock;
pthread_cond_t cond;
int sleeping;
} TimerInfo;


Expand Down Expand Up @@ -269,10 +274,12 @@ irq_wait(void)
{
// Must atomically sleep until signaled
if (!readl(&TimerInfo.must_wake_timers)) {
timer_disable_signals();
if (!TimerInfo.must_wake_timers)
console_sleep(&TimerInfo.ss_sleep);
timer_enable_signals();
pthread_mutex_lock(&TimerInfo.lock);
TimerInfo.sleeping = 1;
pthread_cond_timedwait(&TimerInfo.cond, &TimerInfo.lock
, &TimerInfo.next_wake);
TimerInfo.sleeping = 0;
pthread_mutex_unlock(&TimerInfo.lock);
}
irq_poll();
}
Expand All @@ -283,3 +290,17 @@ irq_poll(void)
if (readl(&TimerInfo.must_wake_timers))
timer_dispatch();
}

void
timer_wake_task_from_thread(struct task_wake *w)
{
pthread_mutex_lock(&TimerInfo.lock);

// XXX - may not be thread safe
sched_wake_task(w);

if (TimerInfo.sleeping)
pthread_cond_signal(&TimerInfo.cond);

pthread_mutex_unlock(&TimerInfo.lock);
}

0 comments on commit c25b06d

Please sign in to comment.