Skip to content

Commit

Permalink
Propagation of file status
Browse files Browse the repository at this point in the history
  • Loading branch information
balazsgrill committed Sep 4, 2024
1 parent 0484198 commit 639b5e0
Show file tree
Hide file tree
Showing 11 changed files with 182 additions and 17 deletions.
34 changes: 28 additions & 6 deletions bindings/bindings.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (f closerFunc) Close() error {
return f()
}

func BindVirtualizationInstance(id string, localpath string, remotefs afero.Fs, logger zerolog.Logger, statecallback func(error)) (io.Closer, error) {
func BindVirtualizationInstance(id string, localpath string, remotefs afero.Fs, logger zerolog.Logger, statecallback func(win.ConnectionState)) (io.Closer, error) {
var closer win.Virtualization
var err error
if UseCFAPI {
Expand All @@ -132,14 +132,36 @@ func BindVirtualizationInstance(id string, localpath string, remotefs afero.Fs,
return nil, err
}

internalSynchronize := func() {
statecallback(win.ConnectionState{
ID: id,
SyncInProgress: true,
LastSyncError: nil,
})
err = closer.PerformSynchronization()
if err != nil {
logger.Err(err).Send()
statecallback(win.ConnectionState{
ID: id,
SyncInProgress: false,
LastSyncError: err,
})
} else {
statecallback(win.ConnectionState{
ID: id,
SyncInProgress: false,
LastSyncError: nil,
})
}
}

// initial sync
internalSynchronize()

t := time.NewTicker(30 * time.Second)
go func() {
for range t.C {
err = closer.PerformSynchronization()
if err != nil {
logger.Err(err).Send()
}
statecallback(err)
internalSynchronize()
}
}()

Expand Down
4 changes: 2 additions & 2 deletions cmd/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ func main() {
keys, _ := mgr.InstanceList()
for _, keyname := range keys {
go func(keyname string) {
err := mgr.StartInstance(keyname, ui.Logger, func(err error) {
if err != nil {
err := mgr.StartInstance(keyname, ui.Logger, func(state win.ConnectionState) {
if state.LastSyncError != nil {
ui.Logger.Err(err).Msgf("%s is offline %v", keyname, err)
}
})
Expand Down
5 changes: 3 additions & 2 deletions cmd/main/mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"path/filepath"

"github.com/balazsgrill/potatodrive/bindings"
"github.com/balazsgrill/potatodrive/win"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"golang.org/x/sys/windows/registry"
Expand Down Expand Up @@ -43,7 +44,7 @@ func initLogger() (string, zerolog.Logger, io.Closer) {
return logfilepath, log.Output(zerolog.MultiLevelWriter(logf, zerolog.NewConsoleWriter())).With().Timestamp().Logger(), logf
}

func startInstance(parentkey registry.Key, keyname string, logger zerolog.Logger, statecallback func(error)) (io.Closer, error) {
func startInstance(parentkey registry.Key, keyname string, logger zerolog.Logger, statecallback func(win.ConnectionState)) (io.Closer, error) {
key, err := registry.OpenKey(parentkey, keyname, registry.QUERY_VALUE)
if err != nil {
logger.Printf("Open key: %v", err)
Expand Down Expand Up @@ -111,7 +112,7 @@ func (m *Manager) InstanceList() ([]string, error) {
return m.keylist, nil
}

func (m *Manager) StartInstance(id string, logger zerolog.Logger, statecallback func(error)) error {
func (m *Manager) StartInstance(id string, logger zerolog.Logger, statecallback func(win.ConnectionState)) error {
instance, err := startInstance(m.parentkey, id, logger, statecallback)
if err != nil {
return err
Expand Down
61 changes: 61 additions & 0 deletions ui/statuswindow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package ui

import (
"log"

"github.com/balazsgrill/potatodrive/win"
"github.com/lxn/walk"
. "github.com/lxn/walk/declarative"
)

type StatusList struct {
walk.ReflectListModelBase
statuses []win.FileSyncState
pathToStatus map[string]int
}

func NewStatusList() *StatusList {
return &StatusList{
pathToStatus: make(map[string]int),
}
}

func (sl *StatusList) AddState(state win.FileSyncState) {
currentindex, exists := sl.pathToStatus[state.Path]
if exists {
sl.statuses[currentindex] = state
} else {
sl.statuses = append(sl.statuses, state)
sl.pathToStatus[state.Path] = len(sl.statuses) - 1
}
}

func StatusWindow() {
var mw *walk.MainWindow
var lb *walk.ListBox

if err := (MainWindow{
AssignTo: &mw,
Title: "PotatoDrive status",
MinSize: Size{200, 200},
Size: Size{800, 600},
Font: Font{Family: "Segoe UI", PointSize: 9},
Layout: VBox{},
Children: []Widget{
Composite{
DoubleBuffering: true,
Layout: VBox{},
Children: []Widget{
ListBox{
AssignTo: &lb,
MultiSelection: true,
Model: model,

Check failure on line 52 in ui/statuswindow.go

View workflow job for this annotation

GitHub Actions / build

undefined: model
ItemStyler: styler,

Check failure on line 53 in ui/statuswindow.go

View workflow job for this annotation

GitHub Actions / build

undefined: styler
},
},
},
},
}).Create(); err != nil {
log.Fatal(err)
}
}
17 changes: 15 additions & 2 deletions win/cfapi/filesystem/fetchdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,12 @@ import (

const BUFFER_SIZE int64 = 1024 * 1024

func (instance *VirtualizationInstance) callback_getFilePath(info *cfapi.CF_CALLBACK_INFO) string {
return win.GetString(info.VolumeDosName) + win.GetString(info.NormalizedPath)
}

func (instance *VirtualizationInstance) callback_getRemoteFilePath(info *cfapi.CF_CALLBACK_INFO) string {
return instance.path_localToRemote(win.GetString(info.VolumeDosName) + win.GetString(info.NormalizedPath))
return instance.path_localToRemote(instance.callback_getFilePath(info))
}

type transferBuffer struct {
Expand Down Expand Up @@ -50,12 +54,16 @@ func (tb *transferBuffer) send(updatehash hash.Hash) error {
func (instance *VirtualizationInstance) fetchData(info *cfapi.CF_CALLBACK_INFO, data *cfapi.CF_CALLBACK_PARAMETERS_FetchData) uintptr {
instance.lock.Lock()
defer instance.lock.Unlock()
filename := instance.callback_getRemoteFilePath(info)
localpath := instance.callback_getFilePath(info)
instance.NotifyFileState(localpath, win.FileSyncStateDownloading)

filename := instance.path_localToRemote(localpath)
length := data.RequiredLength
byteOffset := data.RequiredFileOffset
remoteinfo, err := instance.fs.Stat(filename)
if err != nil {
instance.Logger.Error().Msgf("Remote file is inaccessible %s: %s", filename, err)
instance.NotifyFileError(localpath, err)
return uintptr(syscall.EIO)
}
if length == 0 || length < 0 {
Expand All @@ -77,6 +85,7 @@ func (instance *VirtualizationInstance) fetchData(info *cfapi.CF_CALLBACK_INFO,
file, err := instance.fs.Open(filename)
if err != nil {
instance.Logger.Error().Msgf("Error opening file %s: %s", filename, err)
instance.NotifyFileError(localpath, err)
return uintptr(syscall.EIO)
}
defer file.Close()
Expand All @@ -101,6 +110,8 @@ func (instance *VirtualizationInstance) fetchData(info *cfapi.CF_CALLBACK_INFO,
if tb.count >= BUFFER_SIZE {
err = tb.send(updatehash)
if err != nil {
instance.Logger.Error().Msgf("Error computing file hash %s: %s", filename, err)
instance.NotifyFileError(localpath, err)
return uintptr(syscall.EIO)
}
}
Expand All @@ -109,6 +120,7 @@ func (instance *VirtualizationInstance) fetchData(info *cfapi.CF_CALLBACK_INFO,
instance.Logger.Debug().Msgf("Read %d bytes", count)
if err != nil {
instance.Logger.Error().Msgf("Error reading file %s: %s", filename, err)
instance.NotifyFileError(localpath, err)
return uintptr(syscall.EIO)
}
if updatehash != nil {
Expand All @@ -117,6 +129,7 @@ func (instance *VirtualizationInstance) fetchData(info *cfapi.CF_CALLBACK_INFO,
instance.Logger.Warn().Msgf("Error updating state cache %s: %s", filename, err)
}
}
instance.NotifyFileState(localpath, win.FileSyncStateDone)

return 0
}
24 changes: 24 additions & 0 deletions win/cfapi/filesystem/filestate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package filesystem

import "github.com/balazsgrill/potatodrive/win"

func (instance *VirtualizationInstance) NotifyFileState(path string, state win.FileSyncStateEnum) {
if instance.handler == nil {
return
}
instance.handler(win.FileSyncState{
Path: path,
State: state,
})
}

func (instance *VirtualizationInstance) NotifyFileError(path string, err error) {
if instance.handler == nil {
return
}
instance.handler(win.FileSyncState{
Path: path,
State: win.FileSyncStateError,
LastError: err,
})
}
9 changes: 5 additions & 4 deletions win/cfapi/filesystem/filesystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ type VirtualizationInstance struct {
connectionKey cfapi.CF_CONNECTION_KEY
lock sync.Mutex
watcher *fsnotify.Watcher
handler func(state win.FileSyncState)
}

func (instance *VirtualizationInstance) SetFileStateHandler(handler func(state win.FileSyncState)) {
instance.handler = handler
}

func StartProjecting(rootPath string, filesystem afero.Fs, logger zerolog.Logger) (win.Virtualization, error) {
Expand Down Expand Up @@ -70,10 +75,6 @@ func (instance *VirtualizationInstance) start() error {
instance.watcher.Add(instance.rootPath)
go instance.watch()

err = instance.PerformSynchronization()
if err != nil {
instance.Logger.Printf("Initial synchronization failed %v", err)
}
return nil
}

Expand Down
15 changes: 14 additions & 1 deletion win/cfapi/filesystem/synclocaltoremote.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"path/filepath"
"strings"

"github.com/balazsgrill/potatodrive/win"
"github.com/balazsgrill/potatodrive/win/cfapi"
)

Expand Down Expand Up @@ -77,10 +78,15 @@ func (instance *VirtualizationInstance) syncLocalToRemote() error {
localisnewer := os.IsNotExist(err) || (localinfo.ModTime().UTC().Unix() > remoteinfo.ModTime().UTC().Unix())

if localisnewer {
// TODO Add file to queue instead of doing it here
instance.NotifyFileState(localpath, win.FileSyncStateUploading)
instance.Logger.Info().Msgf("Updating remote file '%s'", path)
err = instance.streamLocalToRemote(path)
if err != nil {
instance.NotifyFileError(localpath, err)
return err
} else {
instance.NotifyFileState(localpath, win.FileSyncStateDone)
}
}
// mark file as in-sync
Expand All @@ -89,7 +95,14 @@ func (instance *VirtualizationInstance) syncLocalToRemote() error {
}

if deleted {
return os.Remove(localpath)
err := os.Remove(localpath)
if err != nil {
instance.NotifyFileError(localpath, err)
return err
} else {
instance.NotifyFileState(localpath, win.FileSyncStateDeleted)
}
return err
}
return nil
})
Expand Down
1 change: 1 addition & 0 deletions win/cfapi/filesystem/syncremotetolocal.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ func (instance *VirtualizationInstance) syncRemoteToLocal() error {
if hr != 0 {
return win.ErrorByCode(hr)
}
instance.NotifyFileState(localpath, win.FileSyncStateDirty)

}

Expand Down
3 changes: 3 additions & 0 deletions win/projfs/filesystem/filesystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ type enumerationSession struct {
wildcard bool
}

func (*VirtualizationInstance) SetFileStateHandler(handler func(state win.FileSyncState)) {}

func (instance *VirtualizationInstance) Close() error {
if instance._instanceHandle == 0 {
return errors.New("not started")
Expand Down Expand Up @@ -126,6 +128,7 @@ func (instance *VirtualizationInstance) path_getNameLocal(path string) string {
}

func (instance *VirtualizationInstance) PerformSynchronization() error {
// TODO propagate file sync state
err := instance.syncLocalToRemote()
if err != nil {
return err
Expand Down
26 changes: 26 additions & 0 deletions win/virtualization.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
type Virtualization interface {
io.Closer
PerformSynchronization() error
SetFileStateHandler(handler func(state FileSyncState))
}

func BytesToGuid(b []byte) *syscall.GUID {
Expand All @@ -19,3 +20,28 @@ func BytesToGuid(b []byte) *syscall.GUID {
Data4: ([8]byte)(b[8:16]),
}
}

type ConnectionState struct {
ID string
SyncInProgress bool
LastSyncError error
}

type FileSyncStateEnum int

const (
FileSyncStateUnknown FileSyncStateEnum = 0
FileSyncStatePending FileSyncStateEnum = 1
FileSyncStateUploading FileSyncStateEnum = 2
FileSyncStateDownloading FileSyncStateEnum = 3
FileSyncStateDone FileSyncStateEnum = 4
FileSyncStateDeleted FileSyncStateEnum = 5
FileSyncStateDirty FileSyncStateEnum = 6
FileSyncStateError FileSyncStateEnum = 7
)

type FileSyncState struct {
Path string
State FileSyncStateEnum
LastError error
}

0 comments on commit 639b5e0

Please sign in to comment.