Skip to content

Commit

Permalink
NotifyWatcher
Browse files Browse the repository at this point in the history
  • Loading branch information
utgheith committed Apr 4, 2021
1 parent 604f328 commit de32868
Show file tree
Hide file tree
Showing 16 changed files with 839 additions and 4 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ target/
.project
.cache
.sbtserver
.bsp/
project/.sbtserver
tags
nohup.out
out
lowered.hnir
random_data/
11 changes: 8 additions & 3 deletions build.sc
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ object os extends Module {
def platformSegment = "jvm"
def moduleDeps = super.moduleDeps :+ os.jvm()
def ivyDeps = Agg(
ivy"net.java.dev.jna:jna:5.0.0"
ivy"net.java.dev.jna:jna:5.0.0",
ivy"com.lihaoyi::sourcecode::0.2.5",
)
object test extends Tests with OsLibTestModule {
def platformSegment = "jvm"
Expand Down Expand Up @@ -107,7 +108,11 @@ trait OsLibModule extends CrossScalaModule with PublishModule{
trait OsLibTestModule extends ScalaModule with TestModule{
def ivyDeps = Agg(
ivy"com.lihaoyi::utest::0.7.8",
ivy"com.lihaoyi::sourcecode::0.2.5"
ivy"com.lihaoyi::sourcecode::0.2.5",
if (scalaVersion().startsWith("2.11"))
ivy"org.scalacheck::scalacheck::1.15.2"
else
ivy"org.scalacheck::scalacheck::1.15.3"
)

def platformSegment: String
Expand All @@ -116,7 +121,7 @@ trait OsLibTestModule extends ScalaModule with TestModule{
millSourcePath / s"src-$platformSegment"
)

def testFrameworks = Seq("utest.runner.Framework")
def testFrameworks = Seq("utest.runner.Framework", "org.scalacheck.ScalaCheckFramework")
// we check the textual output of system commands and expect it in english
override def forkEnv: Target[Map[String, String]] = super.forkEnv() ++ Map("LC_ALL" -> "C")
}
Expand Down
4 changes: 4 additions & 0 deletions os/src/Internals.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,8 @@ object Internals{
src,
dest.write(_, 0, _)
)

def linux() = {
System.getProperty("os.name") == "Linux"
}
}
12 changes: 12 additions & 0 deletions os/watch/src/inotify/Constants.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package os.watch.inotify

object Constants {

val EAGAIN: Int = 11
val ENOTDIR: Int = 20

val O_NONBLOCK: Int = 1 << 11

val POLLIN : Short = 1

}
31 changes: 31 additions & 0 deletions os/watch/src/inotify/Event.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package os.watch.inotify

import java.nio.ByteBuffer

case class Event(val wd: Int, val mask: Mask, val cookie: Int, val name: String) {



}

object Event {
def apply(buf: ByteBuffer): Event = {
val wd = buf.getInt
val mask = buf.getInt
val cookie = buf.getInt
val len = buf.getInt
val sb = new StringBuilder()

var i = 0

while (i < len) {
val b = buf.get()
if (b != 0) {
sb.append(b.toChar)
}
i += 1
}

Event(wd, Mask(mask), cookie, sb.toString)
}
}
60 changes: 60 additions & 0 deletions os/watch/src/inotify/Mask.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package os.watch.inotify

import scala.collection.mutable

case class Mask(mask: Int) {
def |(rhs: Mask): Mask = Mask(mask|rhs.mask)
def contains(rhs: Mask) : Boolean = (mask & rhs.mask) == rhs.mask

override def toString: String = {

val things = Mask.named_masks.toList.sortBy(_._1).flatMap { case(name,m) =>
if (this.contains(m)) Seq(name) else Seq()
}.mkString("+")

f"Mask($mask%08x = $things)"
}
}

object Mask {
val named_masks : mutable.Map[String,Mask] = mutable.Map()

private def named(bit: Int)(implicit name: sourcecode.Name): Mask = {
val a = Mask(1 << bit)
named_masks.put(name.value,a)
a
}

val access: Mask = named(0)
val modify: Mask = named(1)
val attrib: Mask = named(2)
val close_write: Mask = named(3)
val close_nowrite: Mask = named(4)
val open: Mask = named(5)
val move_from: Mask = named(6)
val move_to: Mask = named(7)
val create: Mask = named(8)
val delete: Mask = named(9)
val delete_self: Mask = named(10)

val unmount: Mask = named(13)
val overflow: Mask = named(14)
val ignored: Mask = named(15)

val only_dir: Mask = named(24)
val do_not_follow: Mask = named(25)
val exclude_unlink: Mask = named(26)

val mask_create: Mask = named(28)
val mask_add: Mask = named(29)
val is_dir: Mask = named(30)
val one_shot: Mask = named(31)


val close : Mask = close_write | close_nowrite
val move : Mask = move_from | move_to

val all_events = access | modify | attrib |
close | open | move | create |
delete | delete_self | unmount
}
122 changes: 122 additions & 0 deletions os/watch/src/inotify/Notify.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package os.watch.inotify

import com.sun.jna.{LastErrorException, Library, Native}
import geny.Generator

import java.nio.{ByteBuffer, ByteOrder}
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec
import scala.collection.mutable
import scala.util.Try

trait Notify extends Library {
//@throws[LastErrorException]
def inotify_init() : Int;

//@throws[LastErrorException]
def inotify_init1(flags: Int) : Int

//@throws[LastErrorException]
def inotify_add_watch(fd: Int, path: String, mask: Int): Int

//@throws[LastErrorException]
def inotify_rm_watch(fd: Int, wd: Int): Int

def poll(fds: Array[Byte], nfds: Int, timeout: Int): Int

//@throws[LastErrorException]
def read(fd: Int, buf: Array[Byte], count: Long): Long

//@throws[LastErrorException]
def close(fd: Int): Int
}

object Notify {
val it : Notify = Native.load("c",classOf[Notify])

import Generator._

// convenience
def add_watch(fd: Int, path : os.Path, actions: Mask) : Int = {
it.inotify_add_watch(fd,path.toString,actions.mask)
}

def poll_read(fd: Int, timeout: Int): Boolean = {
val data = ByteBuffer.allocate(8).order(ByteOrder.nativeOrder())
data.putInt(fd)
data.putShort(Constants.POLLIN)
data.putShort(0)
val rc = it.poll(data.array(),1,timeout)
if (rc == 0) {
false
} else if (rc == 1) {
data.rewind()
//println(s"fd : ${data.getInt()} $fd")
//println(f"events : ${data.getShort}%04x")
//val revent = data.getShort
//println(f"revent: ${revent}%04x")
return (data.getShort(6) & Constants.POLLIN) != 0
true
} else {
throw new NotifyException(s"poll error, fd:$fd, errno:${Native.getLastError}")
}
}

// Event processing

def events(buf: ByteBuffer): Generator[Event] = new Generator[Event]() {
override def generate(handleItem: Event => Action): Action = {
while (buf.hasRemaining) {
if (handleItem(Event(buf)) == End) return End
}
Continue
}
}

def buffers(fd: AtomicReference[Option[Int]]): Generator[ByteBuffer] = {
new Generator[ByteBuffer] {
override def generate(handleItem: ByteBuffer => Action): Action = {

val buffer = Array.fill[Byte](4096)(0)

while (true) {
fd.get match {
case Some(fd) =>
it.read(fd, buffer, buffer.length) match {
case 0 =>
return End
case n if n < 0 =>
val errno = Native.getLastError()
if (errno == Constants.EAGAIN) {
Thread.sleep(10)
} else {
throw new NotifyException(s"read error ${Native.getLastError()}, fd = $fd")
}
case n =>
val buf = ByteBuffer.wrap(buffer, 0, n.toInt).order(ByteOrder.nativeOrder())
if (handleItem(buf) == End) {
return End
}
}

case None =>
return End
}
}
End
}
}
}

//
// Produces a stream of events. Terminates when either
// - The 'fd' bcomes None
// - a handler returns End
// - a read error occurs
//
def events(fd: AtomicReference[Option[Int]]): Generator[Event] = for {
b <- buffers(fd)
e <- events(b)
} yield e
}

5 changes: 5 additions & 0 deletions os/watch/src/inotify/NotifyException.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package os.watch.inotify

class NotifyException(msg: String) extends Exception(msg) {

}
Loading

0 comments on commit de32868

Please sign in to comment.