This repository has been archived by the owner on Oct 13, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
reply_reader.go
112 lines (95 loc) · 3.16 KB
/
reply_reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package netconf
import (
"bytes"
"encoding/xml"
"io"
"time"
"unicode"
)
// ReplyReader reads exactly one RPC reply from the session and discards
// the message separator that terminates it. Once the NETCONF message
// separator has been encountered, io.EOF is returned on every subsequent
// read. This is how ReplyReader is able to satisfy the strict
// interpretation of the io.Reader interface.
//
// To read a further reply from the same session, either create another
// ReplyReader or call Reset on this one.
type ReplyReader struct {
session io.Reader // attached to stdout of netconf session
err error // sticky: once an error is generated, Read returns it on every subsequent call until Reset
}
// NewReplyReader wraps session, which is assumed to read from a NETCONF
// session's stdout, in a ReplyReader. The result behaves like a standard
// io.Reader, so it can be handed to standard library methods and functions.
//
// A ReplyReader is intended to read exactly one RPC reply; it can be
// reused for another reply after calling its Reset method.
func NewReplyReader(session io.Reader) *ReplyReader {
	rr := new(ReplyReader)
	rr.session = session
	return rr
}
// Read implements the io.Reader interface on top of the session stream.
//
// Each call performs one read from the session. If the bytes just read,
// ignoring trailing whitespace, end with the standard NETCONF message
// separator, the separator (and that whitespace) is excluded from the
// returned count and io.EOF is returned. Any error, including that io.EOF,
// is remembered and returned with n == 0 by every later call.
//
// NOTE: the separator is only recognised when it ends the chunk returned
// by a single session read; a separator split across two reads, or one
// followed by further non-whitespace data, is passed through undetected.
func (rr *ReplyReader) Read(p []byte) (n int, err error) {
	if rr.err != nil {
		return 0, rr.err
	}

	n, rr.err = rr.session.Read(p)

	payload := bytes.TrimRightFunc(p[:n], unicode.IsSpace)
	if !bytes.HasSuffix(payload, messageSeparatorBytes) {
		return n, rr.err
	}

	// End of message: report only the bytes that precede the separator.
	rr.err = io.EOF
	return len(payload) - len(messageSeparatorBytes), rr.err
}
// Reset clears the internal error field, allowing this reader to be
// reused. After Reset, the next call to Read no longer returns the
// remembered error and reads from the session again.
func (rr *ReplyReader) Reset() {
rr.err = nil
}
// WithDeadline decorates the ReplyReader with a DeadlineReader.
// The returned DeadlineReader reads through rr and applies the given
// deadline afresh to every call to its Read method.
func (rr *ReplyReader) WithDeadline(deadline time.Duration) *DeadlineReader {
	dr := new(DeadlineReader)
	dr.reader = rr
	dr.deadline = deadline
	return dr
}
// DeadlineReader is a decorator for an io.Reader that sets a deadline
// before every read. Its fields are unexported, so outside this package
// it can only be obtained from a ReplyReader's WithDeadline method.
type DeadlineReader struct {
reader io.Reader // reader being decorated; WithDeadline stores the ReplyReader here
deadline time.Duration // deadline to set before every call to Read
}
// Read performs one read from the decorated reader and gives up once the
// configured deadline has elapsed.
//
// If the read finishes in time, its byte count and error are returned
// unchanged. Otherwise Read returns 0 and a *DeadlineError describing the
// attempt (operation, start time, failure time and configured deadline).
//
// The decorated reader cannot be interrupted, so after a DeadlineError the
// abandoned read keeps running in the background and may still write into b
// once it completes. It is recommended that you stop using b and close the
// session upon receipt of a DeadlineError, otherwise a subsequent read will
// return whatever the NETCONF server wrote to its stdout stream after the
// deadline expired.
func (dr *DeadlineReader) Read(b []byte) (n int, err error) {
	type result struct {
		n   int
		err error
	}

	// Capture the start time on this goroutine so it is never written
	// concurrently with being read for the DeadlineError below.
	begin := time.Now()

	timer := time.NewTimer(dr.deadline)
	defer timer.Stop()

	// Buffered so the reading goroutine can always hand over its result and
	// exit, even when Read has already returned because of a timeout.
	done := make(chan result, 1)
	go func() {
		rn, rerr := dr.reader.Read(b)
		done <- result{n: rn, err: rerr}
	}()

	select {
	case res := <-done:
		return res.n, res.err
	case failTime := <-timer.C:
		// If the read finished at the same moment the timer fired, prefer
		// its result so bytes already consumed from the session are not lost.
		select {
		case res := <-done:
			return res.n, res.err
		default:
		}
		return 0, &DeadlineError{
			Op:        "read",
			BeginTime: begin,
			FailTime:  failTime,
			Deadline:  dr.deadline,
		}
	}
}
// AsDecoder returns a Decoder whose underlying XML decoder reads from dr,
// so every read the decoder performs is subject to dr's deadline.
func (dr *DeadlineReader) AsDecoder() *Decoder {
	xmlDecoder := xml.NewDecoder(dr)
	return &Decoder{Decoder: xmlDecoder}
}