Skip to content

Commit

Permalink
Implement pipelines
Browse files Browse the repository at this point in the history
  • Loading branch information
mitchpaulus committed Sep 20, 2024
1 parent 88fd852 commit 8e74627
Show file tree
Hide file tree
Showing 3 changed files with 177 additions and 8 deletions.
181 changes: 175 additions & 6 deletions mshell-go/Evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os/exec"
"strconv"
"strings"
"sync"
)

type MShellStack []MShellObject
Expand Down Expand Up @@ -188,6 +189,8 @@ func (state *EvalState) Evaluate(tokens []Token, stack *MShellStack, context Exe
switch top.(type) {
case *MShellList:
result, exitCode = RunProcess(*top.(*MShellList), context)
case *MShellPipe:
result, exitCode = state.RunPipeline(*top.(*MShellPipe), context, stack)
default:
return FailWithMessage(fmt.Sprintf("%d:%d: Cannot execute a non-list object.\n", t.Line, t.Column))
}
Expand Down Expand Up @@ -492,7 +495,6 @@ func (state *EvalState) Evaluate(tokens []Token, stack *MShellStack, context Exe
return FailWithMessage(fmt.Sprintf("%d:%d: Loop quotation needs a minimum of one token.\n", t.Line, t.Column))
}


context := ExecuteContext {
StandardInput: nil,
StandardOutput: nil,
Expand Down Expand Up @@ -595,6 +597,11 @@ func (state *EvalState) Evaluate(tokens []Token, stack *MShellStack, context Exe
}
quoteContext.StandardInput = file
defer file.Close()
} else if context.StandardInput != nil {
quoteContext.StandardInput = context.StandardInput
} else {
// Default to stdin of this process itself
quoteContext.StandardInput = os.Stdin
}

if quotation.StandardOutputFile != "" {
Expand All @@ -604,6 +611,11 @@ func (state *EvalState) Evaluate(tokens []Token, stack *MShellStack, context Exe
}
quoteContext.StandardOutput = file
defer file.Close()
} else if context.StandardOutput != nil {
quoteContext.StandardOutput = context.StandardOutput
} else {
// Default to stdout of this process itself
quoteContext.StandardOutput = os.Stdout
}

result := state.Evaluate(quotation.Tokens, stack, quoteContext)
Expand Down Expand Up @@ -634,6 +646,19 @@ func (state *EvalState) Evaluate(tokens []Token, stack *MShellStack, context Exe
}

stack.Push(&MShellString { state.PositionalArgs[posIndex - 1] })
} else if t.Type == PIPE {
obj1, err := stack.Pop()
if err != nil {
return FailWithMessage(fmt.Sprintf("%d:%d: Cannot do '%s' operation on an empty stack.\n", t.Line, t.Column, t.Lexeme))
}

// obj1 should be a list
list, ok := obj1.(*MShellList)
if !ok {
return FailWithMessage(fmt.Sprintf("%d:%d: Cannot pipe a %s.\n", t.Line, t.Column, obj1.TypeName()))
}

stack.Push( &MShellPipe { *list } )
} else {
return FailWithMessage(fmt.Sprintf("%d:%d: We haven't implemented the token type '%s' yet.\n", t.Line, t.Column, t.Type))
}
Expand All @@ -642,17 +667,68 @@ func (state *EvalState) Evaluate(tokens []Token, stack *MShellStack, context Exe
return EvalResult { true, -1 }
}

type Executable interface {
Execute(state *EvalState, context ExecuteContext, stack *MShellStack) (EvalResult, int)
}


func (list *MShellList) Execute(state *EvalState, context ExecuteContext, stack *MShellStack) (EvalResult, int) {
return RunProcess(*list, context)
}

func (quotation *MShellQuotation) Execute(state *EvalState, context ExecuteContext, stack *MShellStack) (EvalResult, int) {
quotationContext := ExecuteContext {
StandardInput: nil,
StandardOutput: nil,
}

if quotation.StandardInputFile != "" {
file, err := os.Open(quotation.StandardInputFile)
if err != nil {
return FailWithMessage(fmt.Sprintf("Error opening file %s for reading: %s\n", quotation.StandardInputFile, err.Error())), 1
}
quotationContext.StandardInput = file
defer file.Close()
} else if context.StandardInput != nil {
quotationContext.StandardInput = context.StandardInput
} else {
// Default to stdin of this process itself
quotationContext.StandardInput = os.Stdin
}

if quotation.StandardOutputFile != "" {
file, err := os.Create(quotation.StandardOutputFile)
if err != nil {
return FailWithMessage(fmt.Sprintf("Error opening file %s for writing: %s\n", quotation.StandardOutputFile, err.Error())), 1
}
quotationContext.StandardOutput = file
defer file.Close()
} else if context.StandardOutput != nil {
quotationContext.StandardOutput = context.StandardOutput
} else {
// Default to stdout of this process itself
quotationContext.StandardOutput = os.Stdout
}

result := state.Evaluate(quotation.Tokens, stack, quotationContext)
if !result.Success {
return result, 1
} else {
return SimpleSuccess(), 0
}
}


func RunProcess(list MShellList, context ExecuteContext) (EvalResult, int) {
// Check for empty list
if len(list.Items) == 0 {
return FailWithMessage("Cannot execute an empty list.\n"), -1
return FailWithMessage("Cannot execute an empty list.\n"), 1
}

// Check that all list items are commandlineable
for i, item := range list.Items {
if !item.IsCommandLineable() {
return FailWithMessage(fmt.Sprintf("Item %d (%s) cannot be used as a command line argument.\n", i, item.DebugString())), -1
return FailWithMessage(fmt.Sprintf("Item %d (%s) cannot be used as a command line argument.\n", i, item.DebugString())), 1
}
}

Expand All @@ -661,14 +737,13 @@ func RunProcess(list MShellList, context ExecuteContext) (EvalResult, int) {
commandLineArguments[i] = item.CommandLine()
}


cmd := exec.Command(commandLineArguments[0], commandLineArguments[1:]...)

if list.StandardOutputFile != "" {
// Open the file for writing
file, err := os.Create(list.StandardOutputFile)
if err != nil {
return FailWithMessage(fmt.Sprintf("Error opening file %s for writing: %s\n", list.StandardOutputFile, err.Error())), -1
return FailWithMessage(fmt.Sprintf("Error opening file %s for writing: %s\n", list.StandardOutputFile, err.Error())), 1
}
cmd.Stdout = file
defer file.Close()
Expand All @@ -683,7 +758,7 @@ func RunProcess(list MShellList, context ExecuteContext) (EvalResult, int) {
// Open the file for reading
file, err := os.Open(list.StandardInputFile)
if err != nil {
return FailWithMessage(fmt.Sprintf("Error opening file %s for reading: %s\n", list.StandardInputFile, err.Error())), -1
return FailWithMessage(fmt.Sprintf("Error opening file %s for reading: %s\n", list.StandardInputFile, err.Error())), 1
}
cmd.Stdin = file
defer file.Close()
Expand All @@ -697,8 +772,102 @@ func RunProcess(list MShellList, context ExecuteContext) (EvalResult, int) {
// No redirection for stderr currently, just use the stderr of this process
cmd.Stderr = os.Stderr

// fmt.Fprintf(os.Stderr, "Running command: %s\n", cmd.String())
cmd.Run() // Manually deal with the exit code upstream
// fmt.Fprintf(os.Stderr, "Command finished\n")
exitCode := cmd.ProcessState.ExitCode()

return SimpleSuccess(), exitCode
}

func (state *EvalState) RunPipeline(MShellPipe MShellPipe, context ExecuteContext, stack *MShellStack) (EvalResult, int) {
if len(MShellPipe.List.Items) == 0 {
return FailWithMessage("Cannot execute an empty pipe.\n"), 1
}

// Check that all list items are Executables
for i, item := range MShellPipe.List.Items {
if _, ok := item.(Executable); !ok {
return FailWithMessage(fmt.Sprintf("Item %d (%s) in pipe is not a list or a quotation.\n", i, item.DebugString())), 1
}
}

if len(MShellPipe.List.Items) == 1 {
// Just run the Execute on the first item
asExecutable, _ := MShellPipe.List.Items[0].(Executable)
return asExecutable.Execute(state, context, stack)
}

// Have at least 2 items here, create pipeline of Executables, set up list of contexts
contexts := make([]ExecuteContext, len(MShellPipe.List.Items))

pipeReaders := make([]io.Reader, len(MShellPipe.List.Items) - 1)
pipeWriters := make([]io.Writer, len(MShellPipe.List.Items) - 1)

// Set up pipes
for i := 0; i < len(MShellPipe.List.Items) - 1; i++ {
pipeReader, pipeWriter, err := os.Pipe()
if err != nil {
return FailWithMessage(fmt.Sprintf("Error creating pipe: %s\n", err.Error())), 1
}
pipeReaders[i] = pipeReader
pipeWriters[i] = pipeWriter
}

for i := 0; i < len(MShellPipe.List.Items); i++ {
newContext := ExecuteContext {
StandardInput: nil,
StandardOutput: nil,
}

if i == 0 {
// Stdin should use the context of this function
newContext.StandardInput = context.StandardInput
newContext.StandardOutput = pipeWriters[0]
} else if i == len(MShellPipe.List.Items) - 1 {
// Stdout should use the context of this function
newContext.StandardInput = pipeReaders[len(pipeReaders) - 1]
newContext.StandardOutput = context.StandardOutput
} else {
newContext.StandardInput = pipeReaders[i - 1]
newContext.StandardOutput = pipeWriters[i]
}

contexts[i] = newContext
}

// Run the executables concurrently
var wg sync.WaitGroup
results := make([]EvalResult, len(MShellPipe.List.Items))
exitCodes := make([]int, len(MShellPipe.List.Items))

for i, item := range MShellPipe.List.Items {
wg.Add(1)
go func(i int, item Executable) {
defer wg.Done()
// fmt.Fprintf(os.Stderr, "Running item %d\n", i)
results[i], exitCodes[i] = item.Execute(state, contexts[i], stack)

// Close pipe ends that are no longer needed
if i > 0 {
pipeReaders[i-1].(io.Closer).Close()
}
if i < len(MShellPipe.List.Items)-1 {
pipeWriters[i].(io.Closer).Close()
}
}(i, item.(Executable))
}

// Wait for all processes to complete
wg.Wait()

// Check for errors
for i, result := range results {
if !result.Success {
return result, exitCodes[i]
}
}

// Return the exit code of the last item
return SimpleSuccess(), exitCodes[len(exitCodes) - 1]
}
2 changes: 1 addition & 1 deletion mshell/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ public EvalResult Evaluate(List<TokenNew> tokens, Stack<MShellObject> stack, Exe
{
if (qList.Items.Count < 2)
{
return FailWithMessage("Quotation list for if should have a minimum of 2 elements.\n");
return FailWithMessage($"{t.Line}:{t.Column}: If statement requires at least two arguments. Found {qList.Items.Count}.\n");
}

if (qList.Items.Any(i => !i.IsQuotation))
Expand Down
2 changes: 1 addition & 1 deletion mshell/tests/if_fail.msh.stderr
Original file line number Diff line number Diff line change
@@ -1 +1 @@
Quotation list for if should have a minimum of 2 elements.
1:8: If statement requires at least two arguments. Found 1.

0 comments on commit 8e74627

Please sign in to comment.