Skip to content

Commit

Permalink
Support background command by sys-arg "%delay" (#82)
Browse files Browse the repository at this point in the history
* add sys arg %delay parsing and display it in flow and executing
* wait for all background commands finish
* display background commands' output after main thread finish
* display current background tasks in main thread executor
* change session-dir in background command executing
  • Loading branch information
innerr authored Nov 28, 2021
1 parent ba505cb commit 8ef377e
Show file tree
Hide file tree
Showing 25 changed files with 912 additions and 167 deletions.
10 changes: 7 additions & 3 deletions pkg/builtin/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ func RegisterCmds(cmds *core.CmdTree) {
RegisterTrivialCmds(cmds)
RegisterFlowCmds(cmds)
RegisterHubCmds(cmds)
RegisterDbgCmds(cmds.AddSub("dbg"))
RegisterSessionCmds(cmds.AddSub("session", "sessions").RegEmptyCmd("manage sessions").Owner())
RegisterDbgCmds(cmds.AddSub("dbg").RegEmptyCmd("debug related commands").Owner())
RegisterMiscCmds(cmds)
RegisterDisplayCmds(cmds.AddSub("display", "disp", "dis", "di"))
RegisterBuiltinCmds(cmds.AddSub("builtin", "b", "B").SetHidden())
RegisterDisplayCmds(cmds.AddSub("display", "disp", "dis", "di").RegEmptyCmd("display related commands").Owner())
RegisterBuiltinCmds(cmds.AddSub("builtin", "b", "B").RegEmptyCmd("internal commands, mostly for init loading").Owner().SetHidden())
}

func RegisterExecutorCmds(cmds *core.CmdTree) {
Expand Down Expand Up @@ -327,6 +328,9 @@ func RegisterVerbCmds(cmds *core.CmdTree) {
AddArg("volume", "1", "vol", "v", "V")
}

func RegisterSessionCmds(cmds *core.CmdTree) {
}

func RegisterHubCmds(cmds *core.CmdTree) {
listHubHelpStr := "list dir and repo info in hub"
hub := cmds.AddSub("hub", "h", "H").
Expand Down
18 changes: 13 additions & 5 deletions pkg/builtin/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,24 @@ func Sleep(

dur, err := time.ParseDuration(durStr)
if err != nil {
fmt.Printf("[Sleep] time string '%s' parse failed: %v\n", durStr, err)
return currCmdIdx, false
panic(fmt.Errorf("[Sleep] time string '%s' parse failed: %v\n", durStr, err))
}
fmt.Printf(".zzZZ ")
secs := int(dur.Seconds())
if secs == 0 {
return currCmdIdx, true
}

for i := 0; i < secs; i++ {
fmt.Printf(".")
if i%60 == 0 && i != 0 && i+1 != secs {
cc.Screen.Print("\n")
}
if i%60 == 0 && i+1 != secs {
cc.Screen.Print(".zzZZ ")
}
cc.Screen.Print(".")
time.Sleep(time.Second)
}
fmt.Printf(" *\\O/*\n")
cc.Screen.Print("\n")
return currCmdIdx, true
}

Expand Down
178 changes: 178 additions & 0 deletions pkg/cli/core/bg_tasks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package core

import (
"bytes"
"fmt"
"io"
"os/exec"
"sync"
)

type CmdIO struct {
CmdStdin io.Reader
CmdStdout io.Writer
CmdStderr io.Writer
}

func (self CmdIO) SetupCmd(cmd *exec.Cmd) {
if self.CmdStdin != nil {
cmd.Stdin = self.CmdStdin
}
if self.CmdStdout != nil {
cmd.Stdout = self.CmdStdout
}
if self.CmdStderr != nil {
cmd.Stderr = self.CmdStderr
}
}

type BgStdout struct {
bg *bytes.Buffer
fg io.Writer
lock sync.Mutex
}

func NewBgStdout() *BgStdout {
return &BgStdout{
bg: bytes.NewBuffer(nil),
}
}

func (self *BgStdout) BringToFront(fg io.Writer) {
self.lock.Lock()
defer self.lock.Unlock()
_, err := io.Copy(fg, self.bg)
if err != nil {
panic(err)
}
self.fg = fg
self.bg = nil
}

func (self *BgStdout) Write(p []byte) (n int, err error) {
self.lock.Lock()
defer self.lock.Unlock()
if self.fg != nil {
return self.fg.Write(p)
}
return self.bg.Write(p)
}

type BgTaskInfo struct {
Tid string
Cmd string
Started bool
Finished bool
}

type BgTask struct {
info BgTaskInfo
stdout *BgStdout
finishNotifier chan interface{}
lock sync.Mutex
}

func NewBgTask(tid string, cmd string, stdout *BgStdout) *BgTask {
return &BgTask{
info: BgTaskInfo{
Tid: tid,
Cmd: cmd,
},
stdout: stdout,
finishNotifier: make(chan interface{}),
}
}

func (self *BgTask) OnStart() {
self.lock.Lock()
defer self.lock.Unlock()
self.info.Started = true
}

func (self *BgTask) GetStat() BgTaskInfo {
self.lock.Lock()
defer self.lock.Unlock()
return self.info
}

func (self *BgTask) OnFinish() {
self.lock.Lock()
self.info.Finished = true
self.lock.Unlock()
self.finishNotifier <- nil
}

func (self *BgTask) WaitForFinish() {
<-self.finishNotifier
}

type BgTasks struct {
tids []string
tasks map[string]*BgTask
lock sync.Mutex
}

func NewBgTasks() *BgTasks {
return &BgTasks{
tids: []string{},
tasks: map[string]*BgTask{},
}
}

func (self *BgTasks) GetOrAddTask(tid string, cmd string, stdout *BgStdout) *BgTask {
self.lock.Lock()
defer self.lock.Unlock()
task, ok := self.tasks[tid]
if ok {
return task
}
self.tids = append(self.tids, tid)
task = NewBgTask(tid, cmd, stdout)
self.tasks[tid] = task
return task
}

func (self *BgTasks) GetEarliestTask() (tid string, task *BgTask, ok bool) {
self.lock.Lock()
defer self.lock.Unlock()
if len(self.tids) == 0 {
return
}
tid = self.tids[0]
task, ok = self.tasks[tid]
return
}

func (self *BgTasks) GetStat() []BgTaskInfo {
self.lock.Lock()
defer self.lock.Unlock()
infos := make([]BgTaskInfo, len(self.tids))
for i, tid := range self.tids {
infos[i] = self.tasks[tid].GetStat()
}
return infos
}

func (self *BgTasks) BringBgTaskToFront(tid string, stdout io.Writer) {
self.lock.Lock()
defer self.lock.Unlock()
task, ok := self.tasks[tid]
if !ok {
panic(fmt.Errorf("[BgTasks.BringBgTaskToFront] task '%s' not found", tid))
}
task.stdout.BringToFront(stdout)
}

func (self *BgTasks) RemoveTask(tid string) {
self.lock.Lock()
defer self.lock.Unlock()
_, ok := self.tasks[tid]
if len(self.tids) == 0 || !ok {
panic(fmt.Errorf("[BgTasks.RemoveTask] task '%s' not found", tid))
}
if self.tids[0] != tid {
panic(fmt.Errorf("[BgTasks.RemoveTask] removing task '%s' is not the earliest", tid))
}
self.tids = self.tids[1:]
delete(self.tasks, tid)
}
50 changes: 31 additions & 19 deletions pkg/cli/core/cli.go
Original file line number Diff line number Diff line change
@@ -1,28 +1,12 @@
package core

// TODO: use io.Write ?
type Screen interface {
Print(text string)
Error(text string)
// Same as line-number, but it's the count of 'Print'
OutputNum() int
}

type QuietScreen struct {
outN int
}

func (self *QuietScreen) Print(text string) {
self.outN += 1
}

func (self *QuietScreen) Error(text string) {
}

func (self *QuietScreen) OutputNum() int {
return self.outN
}

type Executor interface {
Execute(caller string, cc *Cli, input ...string) bool
}
Expand All @@ -36,9 +20,11 @@ type Cli struct {
TolerableErrs *TolerableErrs
Executor Executor
Helps *Helps
BgTasks *BgTasks
CmdIO CmdIO
}

func NewCli(env *Env, screen Screen, cmds *CmdTree, parser CliParser, abbrs *EnvAbbrs) *Cli {
func NewCli(env *Env, screen Screen, cmds *CmdTree, parser CliParser, abbrs *EnvAbbrs, cmdIO CmdIO) *Cli {
return &Cli{
env,
screen,
Expand All @@ -48,18 +34,44 @@ func NewCli(env *Env, screen Screen, cmds *CmdTree, parser CliParser, abbrs *Env
NewTolerableErrs(),
nil,
NewHelps(),
NewBgTasks(),
cmdIO,
}
}

func (self *Cli) Clone() *Cli {
func (self *Cli) Copy() *Cli {
return &Cli{
self.GlobalEnv,
self.Screen,
self.Cmds,
self.Parser,
self.EnvAbbrs,
self.TolerableErrs,
nil,
self.Executor,
self.Helps,
self.BgTasks,
self.CmdIO,
}
}

// TODO: fixme, do real clone for ready-only instances
func (self *Cli) CloneForAsyncExecuting(env *Env) *Cli {
screen := NewBgTaskScreen()
bgStdout := screen.GetBgStdout()
return &Cli{
env.GetLayer(EnvLayerSession),
screen,
self.Cmds,
self.Parser,
self.EnvAbbrs,
NewTolerableErrs(),
self.Executor,
self.Helps,
self.BgTasks,
CmdIO{
nil,
bgStdout,
bgStdout,
},
}
}
9 changes: 3 additions & 6 deletions pkg/cli/core/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package core

import (
"fmt"
"os"
"os/exec"
"path/filepath"
"reflect"
Expand Down Expand Up @@ -143,14 +142,14 @@ func (self *Cmd) Execute(
cc *Cli,
env *Env,
flow *ParsedCmds,
currCmdIdx int) (int, bool) {
currCmdIdx int) (newCurrCmdIdx int, ok bool) {

begin := time.Now()
if len(self.autoTimerKeys.Begin) != 0 {
env.GetLayer(EnvLayerSession).SetInt(self.autoTimerKeys.Begin, int(begin.Unix()))
}

newCurrCmdIdx, ok := self.execute(argv, cc, env, flow, currCmdIdx)
newCurrCmdIdx, ok = self.execute(argv, cc, env, flow, currCmdIdx)
if !ok {
// Normally the command should print info before return false, so no need to panic
// panic(NewCmdError(flow.Cmds[currCmdIdx], "command failed without detail info"))
Expand Down Expand Up @@ -549,9 +548,7 @@ func (self *Cmd) executeFile(argv ArgVals, cc *Cli, env *Env, parsedCmd ParsedCm
}
cmd := exec.Command(bin, args...)

cmd.Stdin = os.Stdin
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
cc.CmdIO.SetupCmd(cmd)

err := cmd.Run()
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/cli/core/cmds.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func NewCmdTree(strs *CmdTreeStrs) *CmdTree {

func (self *CmdTree) Execute(
argv ArgVals,
sysArgv SysArgVals,
cc *Cli,
env *Env,
flow *ParsedCmds,
Expand Down
2 changes: 1 addition & 1 deletion pkg/cli/core/dep.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TryExeEnvOpCmds(
if !cmd.IsTheSameFunc(it.Func) {
continue
}
newCC := cc.Clone()
newCC := cc.Copy()
newCC.Screen = &QuietScreen{}
_, succeeded := cmd.Execute(argv, newCC, env, flow, currCmdIdx)
if !succeeded {
Expand Down
Loading

0 comments on commit 8ef377e

Please sign in to comment.