From baa1d5d22c099d2911a25b67dad1b538f602323f Mon Sep 17 00:00:00 2001 From: Ahmed Gheith Date: Wed, 31 Mar 2021 18:00:04 -0500 Subject: [PATCH] NotifyWatcher --- .gitignore | 2 + build.sc | 11 +- os/src/Internals.scala | 4 + os/watch/src/inotify/Errno.scala | 8 + os/watch/src/inotify/Event.scala | 31 ++++ os/watch/src/inotify/Mask.scala | 60 +++++++ os/watch/src/inotify/Notify.scala | 101 +++++++++++ os/watch/src/inotify/NotifyException.scala | 5 + os/watch/src/inotify/NotifyWatcher.scala | 168 +++++++++++++++++++ os/watch/src/package.scala | 12 +- os/watch/test/src/Locked.scala | 8 + os/watch/test/src/RandomTests.scala | 175 ++++++++++++++++++++ os/watch/test/src/TestEntry.scala | 50 ++++++ os/watch/test/src/inotify/NotifyTests.scala | 59 +++++++ 14 files changed, 690 insertions(+), 4 deletions(-) create mode 100644 os/watch/src/inotify/Errno.scala create mode 100644 os/watch/src/inotify/Event.scala create mode 100644 os/watch/src/inotify/Mask.scala create mode 100644 os/watch/src/inotify/Notify.scala create mode 100644 os/watch/src/inotify/NotifyException.scala create mode 100644 os/watch/src/inotify/NotifyWatcher.scala create mode 100644 os/watch/test/src/Locked.scala create mode 100644 os/watch/test/src/RandomTests.scala create mode 100644 os/watch/test/src/TestEntry.scala create mode 100644 os/watch/test/src/inotify/NotifyTests.scala diff --git a/.gitignore b/.gitignore index 5771059b..a002e4ca 100644 --- a/.gitignore +++ b/.gitignore @@ -6,8 +6,10 @@ target/ .project .cache .sbtserver +.bsp/ project/.sbtserver tags nohup.out out lowered.hnir +random_data/ diff --git a/build.sc b/build.sc index 08417564..5359097e 100644 --- a/build.sc +++ b/build.sc @@ -50,7 +50,8 @@ object os extends Module { def platformSegment = "jvm" def moduleDeps = super.moduleDeps :+ os.jvm() def ivyDeps = Agg( - ivy"net.java.dev.jna:jna:5.0.0" + ivy"net.java.dev.jna:jna:5.0.0", + ivy"com.lihaoyi::sourcecode::0.2.5", ) object test extends Tests with OsLibTestModule { def platformSegment = "jvm" @@ -107,7 +108,11 @@ trait OsLibModule extends CrossScalaModule with PublishModule{ trait OsLibTestModule extends ScalaModule with TestModule{ def ivyDeps = Agg( ivy"com.lihaoyi::utest::0.7.8", - ivy"com.lihaoyi::sourcecode::0.2.5" + ivy"com.lihaoyi::sourcecode::0.2.5", + if (scalaVersion().startsWith("2.11")) + ivy"org.scalacheck::scalacheck::1.15.2" + else + ivy"org.scalacheck::scalacheck::1.15.3" ) def platformSegment: String @@ -116,7 +121,7 @@ trait OsLibTestModule extends ScalaModule with TestModule{ millSourcePath / s"src-$platformSegment" ) - def testFrameworks = Seq("utest.runner.Framework") + def testFrameworks = Seq("utest.runner.Framework", "org.scalacheck.ScalaCheckFramework") // we check the textual output of system commands and expect it in english override def forkEnv: Target[Map[String, String]] = super.forkEnv() ++ Map("LC_ALL" -> "C") } diff --git a/os/src/Internals.scala b/os/src/Internals.scala index 8a3b82ad..ab623890 100644 --- a/os/src/Internals.scala +++ b/os/src/Internals.scala @@ -22,4 +22,8 @@ object Internals{ src, dest.write(_, 0, _) ) + + def linux() = { + System.getProperty("os.name") == "Linux" + } } diff --git a/os/watch/src/inotify/Errno.scala b/os/watch/src/inotify/Errno.scala new file mode 100644 index 00000000..b5f9fb46 --- /dev/null +++ b/os/watch/src/inotify/Errno.scala @@ -0,0 +1,8 @@ +package os.watch.inotify + +object Errno { + + val EAGAIN: Int = 11 + val ENOTDIR: Int = 20 + +} diff --git a/os/watch/src/inotify/Event.scala b/os/watch/src/inotify/Event.scala new file mode 100644 index 00000000..beff6a8d --- /dev/null +++ b/os/watch/src/inotify/Event.scala @@ -0,0 +1,31 @@ +package os.watch.inotify + +import java.nio.ByteBuffer + +case class Event(val wd: Int, val mask: Mask, val cookie: Int, val name: String) { + + + +} + +object Event { + def apply(buf: ByteBuffer): Event = { + val wd = buf.getInt + val mask = buf.getInt + val cookie = buf.getInt + val len = buf.getInt + val sb = new StringBuilder() + + var i = 0 + + while (i < len) { + val b = buf.get() + if (b != 0) { + sb.append(b.toChar) + } + i += 1 + } + + Event(wd, Mask(mask), cookie, sb.toString) + } +} diff --git a/os/watch/src/inotify/Mask.scala b/os/watch/src/inotify/Mask.scala new file mode 100644 index 00000000..cb8c7391 --- /dev/null +++ b/os/watch/src/inotify/Mask.scala @@ -0,0 +1,60 @@ +package os.watch.inotify + +import scala.collection.mutable + +case class Mask(mask: Int) { + def |(rhs: Mask): Mask = Mask(mask|rhs.mask) + def contains(rhs: Mask) : Boolean = (mask & rhs.mask) == rhs.mask + + override def toString: String = { + + val things = Mask.named_masks.toList.sortBy(_._1).flatMap { case(name,m) => + if (this.contains(m)) Seq(name) else Seq() + }.mkString("+") + + f"Mask($mask%08x = $things)" + } +} + +object Mask { + val named_masks : mutable.Map[String,Mask] = mutable.Map() + + private def named(bit: Int)(implicit name: sourcecode.Name): Mask = { + val a = Mask(1 << bit) + named_masks.put(name.value,a) + a + } + + val access: Mask = named(0) + val modify: Mask = named(1) + val attrib: Mask = named(2) + val close_write: Mask = named(3) + val close_nowrite: Mask = named(4) + val open: Mask = named(5) + val move_from: Mask = named(6) + val move_to: Mask = named(7) + val create: Mask = named(8) + val delete: Mask = named(9) + val delete_self: Mask = named(10) + + val unmount: Mask = named(13) + val overflow: Mask = named(14) + val ignored: Mask = named(15) + + val only_dir: Mask = named(24) + val do_not_follow: Mask = named(25) + val exclude_unlink: Mask = named(26) + + val mask_create: Mask = named(28) + val mask_add: Mask = named(29) + val is_dir: Mask = named(30) + val one_shot: Mask = named(31) + + + val close : Mask = close_write | close_nowrite + val move : Mask = move_from | move_to + + val all_events = access | modify | attrib | + close | open | move | create | + delete | delete_self | unmount +} diff --git a/os/watch/src/inotify/Notify.scala b/os/watch/src/inotify/Notify.scala new file mode 100644 index 00000000..b2b40db3 --- /dev/null +++ b/os/watch/src/inotify/Notify.scala @@ -0,0 +1,101 @@ +package os.watch.inotify + +import com.sun.jna.{LastErrorException, Library, Native} +import geny.Generator + +import java.nio.{ByteBuffer, ByteOrder} +import java.util.concurrent.atomic.AtomicReference +import scala.annotation.tailrec +import scala.collection.mutable +import scala.util.Try + +trait Notify extends Library { + //@throws[LastErrorException] + def inotify_init() : Int; + + //@throws[LastErrorException] + def inotify_init1(flags: Int) : Int + + //@throws[LastErrorException] + def inotify_add_watch(fd: Int, path: String, mask: Int): Int + + //@throws[LastErrorException] + def inotify_rm_watch(fd: Int, wd: Int): Int + + //@throws[LastErrorException] + def read(fd: Int, buf: Array[Byte], count: Long): Long + + //@throws[LastErrorException] + def close(fd: Int): Int +} + +object Notify { + val it : Notify = Native.load("c",classOf[Notify]) + + val O_NONBLOCK: Int = 1 << 11 + + + import Generator._ + + // convenience + def add_watch(fd: Int, path : os.Path, actions: Mask) : Int = { + it.inotify_add_watch(fd,path.toString,actions.mask) + } + + def events(buf: ByteBuffer): Generator[Event] = new Generator[Event]() { + override def generate(handleItem: Event => Action): Action = { + while (buf.hasRemaining) { + if (handleItem(Event(buf)) == End) return End + } + Continue + } + } + + def buffers(fd: => AtomicReference[Option[Int]]): Generator[ByteBuffer] = { + new Generator[ByteBuffer] { + override def generate(handleItem: ByteBuffer => Action): Action = { + + @tailrec + def loop(arr: Array[Byte]): Action = fd.get match { + case Some(fd) => + it.read(fd, arr, arr.length) match { + case 0 => + Continue + case n if n < 0 => + val errno = Native.getLastError() + if (errno == Errno.EAGAIN) { + Thread.sleep(10) + loop(arr) + } else { + throw new NotifyException(s"read error ${Native.getLastError()}, fd = $fd") + } + //throw new Exception (s"n = $n") + case n => + val buf = ByteBuffer.wrap(arr, 0, n.toInt).order(ByteOrder.nativeOrder()) + if (handleItem(buf) == End) { + End + } else { + loop(arr) + } + } + case None => + End + } + + + loop(Array.fill[Byte](1000)(0)) + } + } + } + + def events(fd: AtomicReference[Option[Int]]): Generator[Event] = for { + b <- buffers(fd) + e <- events(b) + } yield e + + + + + +} + diff --git a/os/watch/src/inotify/NotifyException.scala b/os/watch/src/inotify/NotifyException.scala new file mode 100644 index 00000000..862f73f5 --- /dev/null +++ b/os/watch/src/inotify/NotifyException.scala @@ -0,0 +1,5 @@ +package os.watch.inotify + +class NotifyException(msg: String) extends Exception(msg) { + +} diff --git a/os/watch/src/inotify/NotifyWatcher.scala b/os/watch/src/inotify/NotifyWatcher.scala new file mode 100644 index 00000000..bccca51d --- /dev/null +++ b/os/watch/src/inotify/NotifyWatcher.scala @@ -0,0 +1,168 @@ +package os.watch.inotify + +import com.sun.jna.{LastErrorException, Native} +import com.sun.nio.file.SensitivityWatchEventModifier +import os.watch.Watcher + +import java.io.IOException +import java.nio.file.StandardWatchEventKinds.{ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY, OVERFLOW} +import java.nio.file.{ClosedWatchServiceException, FileSystems, WatchEvent, WatchKey} +import java.util.concurrent.atomic.AtomicReference +import scala.collection.mutable +import scala.util.control.NonFatal + +class WatchedPaths { + private val pathToId = mutable.Map[os.Path,Int]() + private val idToPath = mutable.Map[Int,os.Path]() + + def put(p: os.Path, id: Int) = synchronized { + pathToId.put(p,id) + idToPath.put(id,p) + } + + def get(path: os.Path) : Option[Int] = this.synchronized { + pathToId.get(path) + } + + def get(wd: Int) : Option[os.Path] = this.synchronized { + idToPath.get(wd) + } + + def remove(p: os.Path) : Option[Int] = this.synchronized { + val wd = pathToId.remove(p) + wd.foreach { wd => idToPath.remove(wd) } + wd + } +} + +class NotifyWatcher(roots: Seq[os.Path], + onEvent: Set[os.Path] => Unit, + logger: (String, Any) => Unit = (_, _) => ()) extends Watcher{ + + val fd = new AtomicReference(Option(Notify.it.inotify_init1(Notify.O_NONBLOCK))) + if (fd.get.get < 0) { + throw new NotifyException(s"inotify failed") + } + val currentlyWatchedPaths = new WatchedPaths() + val newlyWatchedPaths = mutable.Buffer.empty[os.Path] + val bufferedEvents = mutable.Set.empty[os.Path] + + roots.foreach(watchSinglePath) + recursiveWatches() + + bufferedEvents.clear() + + def fatal(msg: String): Unit = { + //fd = -1 + throw new NotifyException(msg) + } + + def watchSinglePath(p: os.Path) = fd.get.foreach { fd => + //pr + val wd = Notify.add_watch(fd, p, + Mask.create | + Mask.delete | + Mask.modify | + Mask.move_from | + Mask.move_to | + Mask.do_not_follow | + Mask.only_dir | + Mask.mask_add + ) + + bufferedEvents.add(p) + + if (wd < 0) { + val err = Native.getLastError() + if (err != Errno.ENOTDIR) { + fatal(s"error#$err watchSinglePath($p)") + } + } else { + logger("WATCH", (p, wd)) + currentlyWatchedPaths.put(p, wd) + newlyWatchedPaths.append(p) + + } + } + + def processEvent(event: Event) = { + //println(event) + currentlyWatchedPaths.get(event.wd).foreach { base => + + logger("WATCH PATH", base) + + //println(s" $base $event") + + val p = if (event.name == "") base else base / event.name + + // events that don't need to be reported + val ignored = event.mask.contains(Mask.ignored) + val folder_update = event.mask.contains(Mask.is_dir | Mask.modify) + val should_report = (!ignored) && (!folder_update) + + if (should_report) { + bufferedEvents.add(p) + } + + if (event.mask.contains(Mask.create) || event.mask.contains(Mask.move_to)) { + watchSinglePath(p) + } else if (event.mask.contains(Mask.delete) || + event.mask.contains(Mask.ignored) || + event.mask.contains(Mask.move_from)) { + //println(s" removing watch for $p") + currentlyWatchedPaths.remove(p).foreach { wd => + fd.get.foreach(fd => Notify.it.inotify_rm_watch(fd, wd)) + } + + } + } + } + + def recursiveWatches() = { + while(newlyWatchedPaths.nonEmpty){ + val top = newlyWatchedPaths.remove(newlyWatchedPaths.length - 1) + val listing = try os.list(top) catch {case e: java.nio.file.NotDirectoryException => Nil } + for(p <- listing) watchSinglePath(p) + bufferedEvents.add(top) + } + } + + def run(): Unit = { + try { + logger("WATCH CURRENT", currentlyWatchedPaths) + Notify.events(fd).foreach { event => + logger("WATCH KEY",event) + + if (event.mask.contains(Mask.overflow)) { + throw new Exception(s"scate events ${event.toString}") + } + + processEvent(event) + + recursiveWatches() + triggerListener() + } + } catch { + case e: InterruptedException => + close() + case e: NotifyException if fd.get.isEmpty => + /* ignore */ + case NonFatal(e) => + e.printStackTrace() + close() + } + } + + def close(): Unit = { + fd.get.foreach { it => + fd.set(None) + Notify.it.close(it) + } + } + + private def triggerListener(): Unit = { + logger("TRIGGER", bufferedEvents.toSet) + onEvent(bufferedEvents.toSet) + bufferedEvents.clear() + } +} \ No newline at end of file diff --git a/os/watch/src/package.scala b/os/watch/src/package.scala index 67a30e47..0a9e9fc0 100644 --- a/os/watch/src/package.scala +++ b/os/watch/src/package.scala @@ -1,5 +1,7 @@ package os +import os.watch.inotify.NotifyWatcher + package object watch{ /** * Efficiently watches the given `roots` folders for changes. Any time the @@ -31,7 +33,15 @@ package object watch{ onEvent: Set[os.Path] => Unit, logger: (String, Any) => Unit = (_, _) => ()): AutoCloseable = { val watcher = System.getProperty("os.name") match{ - case "Linux" => new os.watch.WatchServiceWatcher(roots, onEvent, logger) + case "Linux" => + if (sys.env.get("OS_WATCH_OLD") + .map(_.toLowerCase.trim.take(1)) + .toSet + .intersect(Set("1","y","t")).nonEmpty) { + new WatchServiceWatcher(roots, onEvent, logger) + } else { + new NotifyWatcher(roots, onEvent, logger) + } case "Mac OS X" => new os.watch.FSEventsWatcher(roots, onEvent, logger, 0.05) case osName => throw new Exception(s"watch not supported on operating system: $osName") } diff --git a/os/watch/test/src/Locked.scala b/os/watch/test/src/Locked.scala new file mode 100644 index 00000000..ce30f459 --- /dev/null +++ b/os/watch/test/src/Locked.scala @@ -0,0 +1,8 @@ +package os.watch + +case class Locked[A <: AnyRef](private val it : A) { + def apply[B](f: A => B): B = it.synchronized { + f(it) + } + +} diff --git a/os/watch/test/src/RandomTests.scala b/os/watch/test/src/RandomTests.scala new file mode 100644 index 00000000..f77c1ac2 --- /dev/null +++ b/os/watch/test/src/RandomTests.scala @@ -0,0 +1,175 @@ +package os.watch + +import java.util.concurrent.atomic.AtomicInteger +import org.scalacheck.Prop.forAll +import org.scalacheck.Properties + +import scala.util.control.NonFatal +import scala.util.Random +//import upickle.default._ + +import scala.collection.mutable + +object RandomTests extends Properties("os.watch") { + + val base_dir : os.Path = os.pwd / "random_data" + os.remove.all(base_dir) + os.makeDir(base_dir) + + def show[A : Ordering](what: String, in: Iterable[A]): Unit = { + println(what) + in.toList.sorted.foreach { p => + println(s" $p") + } + } + + val run_counter = new AtomicInteger(0) + + case class Comparison(expected: Set[os.Path], run: Run) { + def compare(): Boolean = { + val actual = run.actual(_.toSet) + val id = run.id + if (expected != actual) { + val ok_file = base_dir / f"$id%04d.expected" + val out_file = base_dir / f"$id%04d.actual" + System.err.println(s"look for differences in $ok_file and $out_file") + os.write(ok_file,expected.toList.sorted.mkString("\n")) + os.write(out_file,actual.toList.sorted.mkString("\n")) + } + + expected == actual + } + } + + val comparisons = mutable.Buffer[Comparison]() + + class Run() { + val actual = Locked(mutable.Set[os.Path]()) + val id = run_counter.getAndIncrement() + val dir = base_dir / f"run_$id%04d" + os.makeDir(dir) + + private var watch : AutoCloseable = null + + def observe[A](f: => A): A = { + watch = os.watch.watch(Seq(dir), { paths => + paths.foreach(p => actual(_.add(p))) + }) + f + } + + def check(expected: Set[os.Path]): Boolean = { + var i = 0 + + while ((expected.size > actual(_.size)) && (i < 10000)) { + Thread.sleep(1) + i += 1 + } + + watch.close() + + val c = Comparison(expected,this) + + val out = c.compare() + if (out) { + comparisons.appendAll(Seq(c)) + } + out + } + } + + //val runs = mutable.Buffer[(Run,Set[os.Path])]() + + if (os.Internals.linux()) { + property("create") = forAll(TestDir.gen(100)) { d => + val run = new Run() + //println(s"create#${run.id}") + val made = run.observe { + d.make(run.dir, 0) + } + run.check(made.toSet) + } && { + val out = comparisons.forall(_.compare()) + comparisons.clear() + out + } + + property("rm") = forAll(TestDir.gen(100)) { d => + val run = new Run() + //println(s"rm#${run.id}") + val expected = d.make(run.dir, 0).toSet + run.observe { + os.remove.all(run.dir) + } + run.check(expected) + } && { + val out = comparisons.forall(_.compare()) + comparisons.clear() + out + } + + property("update") = forAll(TestDir.gen(100)) { d => + val run = new Run() + //println(s"update#${run.id}") + val files = d.make(run.dir, 0).filter(p => os.isFile(p)) + val shuffled = Random.shuffle(files) + run.observe { + shuffled.foreach { p => os.write.over(p, "") } + } + run.check(files.toSet) + } && { + val out = comparisons.forall(_.compare()) + comparisons.clear() + out + } + + property("move_files") = forAll(TestDir.gen(100)) { d => + val run = new Run() + //println(s"move_files#${run.id}") + + val things = d.make(run.dir, 0) + val files = things.filter(p => os.isFile(p)) + + val targets = run.observe { + var i = 0 + files.map { p => + val target = run.dir / s"x$i" + i += 1 + os.move(p, target) + target + } + } + run.check(files.toSet ++ targets.toSet) + } && { + val out = comparisons.forall(_.compare()) + comparisons.clear() + out + } + + property("move_folder") = forAll(TestDir.gen(100)) { d => + val run = new Run() + //println(s"move_folder#${run.id}") + + val things = d.make(run.dir,0) + + val sources = os.list(run.dir).toList + val target = run.dir / "it" + os.makeDir(target) + + run.observe { + sources.foreach { p => + os.move.into(p,target) + } + } + + val new_paths = things.map(_.relativeTo(run.dir)).map(target / _) + + run.check(new_paths.toSet ++ sources.toSet) + } && { + val out = comparisons.forall(_.compare()) + comparisons.clear() + out + } + } + +} diff --git a/os/watch/test/src/TestEntry.scala b/os/watch/test/src/TestEntry.scala new file mode 100644 index 00000000..17140448 --- /dev/null +++ b/os/watch/test/src/TestEntry.scala @@ -0,0 +1,50 @@ +package os.watch + +import org.scalacheck._ +//import upickle.default._ + +sealed trait TestEntry { + //def name : String + def make(root: os.Path, index: Int) : Seq[os.Path] +} + +case object TestFile extends TestEntry { + def make(root: os.Path, index: Int): Seq[os.Path] = { + val p = root / f"f$index%04d" + os.write(p , index.toString) + Seq(p) + } +} + +case class TestDir(n_files: Int, dirs: Seq[TestEntry]) extends TestEntry { + def make(root: os.Path, index: Int): Seq[os.Path] = { + val me = root / f"d$index%04d" + os.makeDir(me) + + val child_files = (0 to n_files).flatMap(i => TestFile.make(me,i)) + + val child_dirs = dirs.zipWithIndex.flatMap { case(child,index) => + child.make(me,index) + } + + Seq(me) ++ child_files ++ child_dirs + } +} + +object TestDir { + + //implicit val rw : ReadWriter[TestDir] = macroRW + + def gen(limit: Int) : Gen[TestDir] = + for { + n_files <- Gen.choose(0,limit/3) + n_dirs <- Gen.choose(0, 0.max(limit/3 - n_files - 1)) + n_left = 0.max(limit - n_files - n_dirs) + dirs <- Gen.listOfN(n_dirs,Gen.delay(TestDir.gen(n_left/n_dirs+1))) + } yield TestDir(n_files,dirs) + + //implicit def shrink : Shrink[TestDir] = Shrink { (dir:TestDir) => + // dir.children.tails.map(t => TestDir(dir.name,t)).toStream + //} + +} diff --git a/os/watch/test/src/inotify/NotifyTests.scala b/os/watch/test/src/inotify/NotifyTests.scala new file mode 100644 index 00000000..408dcdc0 --- /dev/null +++ b/os/watch/test/src/inotify/NotifyTests.scala @@ -0,0 +1,59 @@ +package os.watch.inotify + +import utest.{TestSuite, Tests, test} +import java.util.concurrent.atomic.AtomicReference + + +object NotifyTests extends TestSuite { + val tests = Tests { + + test("Basic") { + if (System.getProperty("os.name") == "Linux") { + import Notify.it + + val temp = os.temp.dir(deleteOnExit = true) + val fd = it.inotify_init() + + val masks = Mask.create | Mask.delete | Mask.move | Mask.modify + + def watch(path: os.Path): os.Path = { + Notify.add_watch(fd, path, masks) + path + } + + watch(temp) + + def makeDir(path: os.Path): os.Path = { + os.makeDir(path) + watch(path) + } + + def makeFile(path: os.Path): os.Path = { + os.write(path, path.last) + path + } + + val r = makeDir(temp / "r") + + Seq("a", "b", "c").foreach { n => + val dir_path = makeDir(r / n) + val file_path = makeFile(dir_path / n) + } + + os.remove.all(temp) + + var i = 0 + + Notify.events(new AtomicReference(Option(fd))).generate { e => + println(s"[$i] $e") + i += 1 + if (i == 22) { + os.Generator.End + } else { + os.Generator.Continue + } + } + } + } + } +}