Skip to content

Commit

Permalink
Merge pull request #191 from clowder-framework/release/1.15.1
Browse files Browse the repository at this point in the history
Release/1.15.1
  • Loading branch information
lmarini authored Mar 17, 2021
2 parents 3c16b47 + 69f4760 commit 7d03641
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 100 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,16 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).

## 1.15.1 - 2021-03-12

### Fixed
- Several views were throwing errors trying to access a None value in `EventSinkService` when a user was not logged in.
Replaced `get()` with `getOrElse()`.
- Consolidated field names sent by the EventSinkService to maximize reuse.
- Changed `EventSinkService` logging to debug to minimize chatter.
- Don't automatically create eventsink queue and bind it to eventsink exchange. Let clients do that so that we don't
have a queue for the eventsink filling up if there are no consumers.

## 1.15.0 - 2021-03-03

### Added
Expand Down
189 changes: 95 additions & 94 deletions app/services/EventSinkService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,33 +6,21 @@ import java.net.URI
import java.time.Instant
import play.api.{Logger, Play}
import play.api.Play.current
import play.api.libs.json.{JsValue, Json}
import play.api.libs.json.{JsObject, JsValue, Json}

object EventSinkService {
val EXCHANGE_NAME_CONFIG_KEY = "eventsink.exchangename"
val QUEUE_NAME_CONFIG_KEY = "eventsink.queuename"

val EXCHANGE_NAME_DEFAULT_VALUE = "clowder.metrics"
val QUEUE_NAME_DEFAULT_VALUE = "event.sink"

// TODO: Make sure these match the real config key names
val AMPLITUDE_CONFIG_KEY = "amplitude.apikey"
val GA_CONFIG_KEY = "google.analytics"
val INFLUX_AUTH_CONFIG_KEY = "influx.uri"
val MONGO_AUTH_CONFIG_KEY = "mongo.uri"
val QUEUE_NAME_DEFAULT_VALUE = ""
}

class EventSinkService {
val messageService: MessageService = DI.injector.getInstance(classOf[MessageService])
val userService: UserService = DI.injector.getInstance(classOf[UserService])
val appConfig: AppConfigurationService = DI.injector.getInstance(classOf[AppConfigurationService])

// UNUSED: Fetch directly from config on demand
def getGoogleAnalytics(): String = Play.configuration.getString(EventSinkService.GA_CONFIG_KEY).getOrElse("")
def getAmplitudeApiKey(): String = Play.configuration.getString(EventSinkService.AMPLITUDE_CONFIG_KEY).getOrElse("")
def getMongoAuth(): String = Play.configuration.getString(EventSinkService.AMPLITUDE_CONFIG_KEY).getOrElse("")
def getInfluxAuth(): String = Play.configuration.getString(EventSinkService.INFLUX_AUTH_CONFIG_KEY).getOrElse("")

/** Event Sink exchange name in RabbitMQ */
val exchangeName = Play.configuration.getString(EventSinkService.EXCHANGE_NAME_CONFIG_KEY)
.getOrElse(EventSinkService.EXCHANGE_NAME_DEFAULT_VALUE)
Expand All @@ -41,20 +29,22 @@ class EventSinkService {
val queueName = Play.configuration.getString(EventSinkService.QUEUE_NAME_CONFIG_KEY)
.getOrElse(EventSinkService.QUEUE_NAME_DEFAULT_VALUE)

def logEvent(category: String, metadata: JsValue) = {
Logger.info("eventsink.exchangename=" + exchangeName)
Logger.info("eventsink.queueName=" + queueName)

Logger.info("Submitting message to event sink exchange: " + Json.stringify(metadata))

//val message = EventSinkMessage(Instant.now().getEpochSecond, category, metadata)
messageService.submit(exchangeName, queueName, metadata, "fanout")
def logEvent(message: JsValue) = {
// Inject timestamp before logging the event
val event = message.as[JsObject] + ("created" -> Json.toJson(java.util.Date.from(Instant.now())))
Logger.info("Submitting message to event sink exchange: " + Json.stringify(event))
try {
messageService.submit(exchangeName, queueName, event, "fanout")
} catch {
case e: Throwable => { Logger.error("Failed to submit event sink message", e) }
}
}

/** Log an event when user signs up */
def logUserSignupEvent(user: User) = {
Logger.info("New user signed up: " + user.id.stringify)
logEvent("user_activity", Json.obj(
Logger.debug("New user signed up: " + user.id.stringify)
logEvent(Json.obj(
"category" -> "user_activity",
"type" -> "signup",
"user_id" -> user.id,
"user_name" -> user.fullName
Expand All @@ -63,8 +53,9 @@ class EventSinkService {

/** Log an event when user logs in */
def logUserLoginEvent(user: User) = {
Logger.info("User logged in: " + user.id.stringify)
logEvent("user_activity", Json.obj(
Logger.debug("User logged in: " + user.id.stringify)
logEvent(Json.obj(
"category" -> "user_activity",
"type" -> "login",
"user_id" -> user.id,
"user_name" -> user.fullName
Expand All @@ -73,193 +64,203 @@ class EventSinkService {

/** Log an event when user views a dataset */
def logDatasetViewEvent(dataset: Dataset, viewer: Option[User]) = {
Logger.info("User viewed a dataset: " + dataset.id.stringify)
logEvent("view_resource", Json.obj(
Logger.debug("User viewed a dataset: " + dataset.id.stringify)
logEvent(Json.obj(
"category" -> "view_resource",
"type" -> "dataset",
"resource_id" -> dataset.id,
"resource_name" -> dataset.name,
"author_id" -> dataset.author.id,
"author_name" -> dataset.author.fullName,
"viewer_id" -> viewer.get.id,
"viewer_name" -> viewer.get.getMiniUser.fullName
"user_id" -> viewer.getOrElse(User.anonymous).id,
"user_name" -> viewer.getOrElse(User.anonymous).getMiniUser.fullName
))
}

/** Log an event when user views a file */
def logFileViewEvent(file: File, viewer: Option[User]) = {
Logger.info("User viewed a file: " + file.id.stringify)
logEvent("view_resource", Json.obj(
Logger.debug("User viewed a file: " + file.id.stringify)
logEvent(Json.obj(
"category" -> "view_resource",
"type" -> "file",
"resource_id" -> file.id,
"resource_name" -> file.filename,
"author_id" -> file.author.id,
"author_name" -> file.author.fullName,
"viewer_id" -> viewer.get.id,
"viewer_name" -> viewer.get.getMiniUser.fullName
"user_id" -> viewer.getOrElse(User.anonymous).id,
"user_name" -> viewer.getOrElse(User.anonymous).getMiniUser.fullName
))
}

/** Log an event when user views a collection */
def logCollectionViewEvent(collection: Collection, viewer: Option[User]) = {
Logger.info("User viewed a collection: " + collection.id.stringify)
logEvent("view_resource", Json.obj(
Logger.debug("User viewed a collection: " + collection.id.stringify)
logEvent(Json.obj(
"category" -> "view_resource",
"type" -> "collection",
"resource_id" -> collection.id,
"resource_name" -> collection.name,
"author_id" -> collection.author.id,
"author_name" -> collection.author.fullName,
"viewer_id" -> viewer.get.id,
"viewer_name" -> viewer.get.getMiniUser.fullName
"user_id" -> viewer.getOrElse(User.anonymous).id,
"user_name" -> viewer.getOrElse(User.anonymous).getMiniUser.fullName
))
}

/** Log an event when user views a space */
def logSpaceViewEvent(space: ProjectSpace, viewer: Option[User]) = {
Logger.info("User viewed a space: " + space.id.stringify)
Logger.debug("User viewed a space: " + space.id.stringify)
(viewer, userService.get(space.creator)) match {
case (Some(v), Some(author)) => {
logEvent("view_resource", Json.obj(
logEvent(Json.obj(
"category" -> "view_resource",
"type" -> "space",
"resource_id" -> space.id,
"resource_name" -> space.name,
"author_id" -> space.creator.stringify,
"author_name" -> author.fullName,
"viewer_id" -> v.id,
"viewer_name" -> v.getMiniUser.fullName
"user_id" -> v.id,
"user_name" -> v.getMiniUser.fullName
))
}
case (None, Some(author)) => {
logEvent("view_resource", Json.obj(
logEvent(Json.obj(
"category" -> "view_resource",
"type" -> "space",
"resource_id" -> space.id,
"resource_name" -> space.name,
"author_id" -> author.id,
"author_name" -> author.fullName,
"viewer_id" -> "",
"viewer_name" -> "Anonymous"
"user_id" -> User.anonymous.id,
"user_name" -> User.anonymous.fullName
))
}
case (Some(v), None) => {
// TODO: Is this a real case? Is this needed?
logEvent("view_resource", Json.obj(
logEvent(Json.obj(
"category" -> "view_resource",
"type" -> "space",
"resource_id" -> space.id,
"resource_name" -> space.name,
"author_id" -> space.creator.stringify,
"author_name" -> "",
"viewer_id" -> v.id,
"viewer_name" -> v.getMiniUser.fullName
"user_id" -> v.id,
"user_name" -> v.getMiniUser.fullName
))
}
case (None, None) => {
// TODO: Is this a real case? Is this needed?
logEvent("view_resource", Json.obj(
logEvent(Json.obj(
"category" -> "view_resource",
"type" -> "space",
"resource_id" -> space.id,
"resource_name" -> space.name,
"author_id" -> space.creator.stringify,
"author_name" -> "",
"viewer_id" -> "",
"viewer_name" -> "Anonymous"
"user_id" -> User.anonymous.id,
"user_name" -> User.anonymous.fullName
))
}
}
}

def logSubmitFileToExtractorEvent(file: File, extractorName: String, submitter: Option[User]) = {
logEvent("extraction", Json.obj(
logEvent(Json.obj(
"category" -> "extraction",
"type" -> "file",
"extractor_name" -> extractorName,
"resource_id" -> file.id,
"resource_name" -> file.filename,
"author_id" -> file.author.id,
"author_name" -> file.author.fullName,
"submitter_id" -> submitter.get.id,
"submitter_name" -> submitter.get.getMiniUser.fullName
"user_id" -> submitter.getOrElse(User.anonymous).id,
"user_name" -> submitter.getOrElse(User.anonymous).getMiniUser.fullName
))
}

def logSubmitDatasetToExtractorEvent(dataset: Dataset, extractorName: String, submitter: Option[User]) = {
logEvent("extraction", Json.obj(
logEvent(Json.obj(
"category" -> "extraction",
"type" -> "dataset",
"extractor_name" -> extractorName,
"resource_id" -> dataset.id,
"resource_name" -> dataset.name,
"author_id" -> dataset.author.id,
"author_name" -> dataset.author.fullName,
"submitter_id" -> submitter.get.id,
"submitter_name" -> submitter.get.getMiniUser.fullName
"user_id" -> submitter.getOrElse(User.anonymous).id,
"user_name" -> submitter.getOrElse(User.anonymous).getMiniUser.fullName
))
}

def logSubmitSelectionToExtractorEvent(dataset: Dataset, extractorName: String, submitter: Option[User]) = {
// TODO: Is this a real case? Is this needed?
logEvent("extraction", Json.obj(
logEvent(Json.obj(
"category" -> "extraction",
"type" -> "selection",
"extractor_name" -> extractorName,
"resource_id" -> dataset.id,
"resource_name" -> dataset.name,
"author_id" -> dataset.author.id,
"author_name" -> dataset.author.fullName,
"submitter_id" -> submitter.get.id,
"submitter_name" -> submitter.get.getMiniUser.fullName
"user_id" -> submitter.getOrElse(User.anonymous).id,
"user_name" -> submitter.getOrElse(User.anonymous).getMiniUser.fullName
))
}

def logFileUploadEvent(file: File, dataset: Option[Dataset], uploader: Option[User]) = {
dataset match {
case Some(d) => {
logEvent("upload", Json.obj(
logEvent(Json.obj(
"category" -> "upload",
"dataset_id" -> d.id,
"dataset_name" -> d.name,
"dataset_author_name" -> d.author.fullName,
"dataset_author_id" -> d.author.id,
"uploader_id" -> uploader.get.id,
"uploader_name" -> uploader.get.getMiniUser.fullName,
"filename" -> file.filename,
"length" -> file.length
"author_name" -> d.author.fullName,
"author_id" -> d.author.id,
"user_id" -> uploader.getOrElse(User.anonymous).id,
"user_name" -> uploader.getOrElse(User.anonymous).getMiniUser.fullName,
"resource_name" -> file.filename,
"size" -> file.length
))
}
case None => {
logEvent("upload", Json.obj(
"uploader_id" -> uploader.get.id,
"uploader_name" -> uploader.get.getMiniUser.fullName,
"filename" -> file.filename,
"length" -> file.length
logEvent(Json.obj(
"category" -> "upload",
"user_id" -> uploader.getOrElse(User.anonymous).id,
"user_name" -> uploader.getOrElse(User.anonymous).getMiniUser.fullName,
"resource_name" -> file.filename,
"size" -> file.length
))
}
}
}

def logFileDownloadEvent(file: File, /*dataset: Dataset,*/ downloader: Option[User]) = {
logEvent("download", Json.obj(
/*"dataset_id" -> dataset.id,
"dataset_name" -> dataset.name,
"dataset_author_name" -> dataset.author.fullName,
"dataset_author_id" -> dataset.author.id,*/
def logFileDownloadEvent(file: File, downloader: Option[User]) = {
logEvent(Json.obj(
"category" -> "download",
"type" -> "file",
"uploader_id" -> file.author.id,
"uploader_name" -> file.author.fullName,
"downloader_id" -> downloader.get.id,
"downloader_name" -> downloader.get.getMiniUser.fullName,
"filename" -> file.filename,
"length" -> file.length
"resource_id" -> file.id,
"resource_name" -> file.filename,
"author_id" -> file.author.id,
"author_name" -> file.author.fullName,
"user_id" -> downloader.getOrElse(User.anonymous).id,
"user_name" -> downloader.getOrElse(User.anonymous).getMiniUser.fullName,
"size" -> file.length
))
}

def logDatasetDownloadEvent(dataset: Dataset, downloader: Option[User]) = {
logEvent("download", Json.obj(
logEvent(Json.obj(
"category" -> "download",
"type" -> "dataset",
"dataset_id" -> dataset.id,
"dataset_name" -> dataset.name,
"dataset_author_name" -> dataset.author.fullName,
"dataset_author_id" -> dataset.author.id,
"downloader_id" -> downloader.get.id,
"downloader_name" -> downloader.get.getMiniUser.fullName,
"files_length" -> dataset.files.length,
"folder_length" -> dataset.folders.length
"resource_id" -> dataset.id,
"resource_name" -> dataset.name,
"author_name" -> dataset.author.fullName,
"author_id" -> dataset.author.id,
"user_id" -> downloader.getOrElse(User.anonymous).id,
"user_name" -> downloader.getOrElse(User.anonymous).getMiniUser.fullName,
"size" -> (dataset.files.length + dataset.folders.length)
))
}
}

//case class EventSinkMessage(created: Long, category: String, metadata: JsValue)
//case class EventSinkMessage(created: Long, category: String, metadata: JsValue)
10 changes: 7 additions & 3 deletions app/services/rabbitmq/RabbitMQMessageService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -230,11 +230,15 @@ class RabbitMQMessageService extends MessageService {

/** Submit a message to broker. */
override def submit(exchange: String, routing_key: String, message: JsValue, exchange_type: String = "topic") = {
// This probably isn't going to extract queue (use other submit() for that) so make a new broker
connect()
val tempChannel = connection.get.createChannel()
tempChannel.exchangeDeclare(exchange, exchange_type, true)
tempChannel.queueDeclare(routing_key, true, false, false, null)
tempChannel.queueBind(routing_key, exchange, routing_key)

// If a routing_key (queue name) was provided, ensure that the queue exists
if (routing_key != "") {
tempChannel.queueDeclare(routing_key, true, false, false, null)
tempChannel.queueBind(routing_key, exchange, routing_key)
}
tempChannel.basicPublish(exchange, routing_key, null, message.toString.getBytes)
}

Expand Down
2 changes: 1 addition & 1 deletion doc/src/sphinx/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
author = 'Luigi Marini'

# The full version, including alpha/beta/rc tags
release = '1.15.0'
release = '1.15.1'


# -- General configuration ---------------------------------------------------
Expand Down
Loading

0 comments on commit 7d03641

Please sign in to comment.