Skip to content

Commit

Permalink
Use queue name in tests. Clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
rustworthy committed Jan 17, 2024
1 parent 1257b56 commit 5ea2a8b
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 11 deletions.
7 changes: 4 additions & 3 deletions src/async/consumer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ impl<S: AsyncBufReadExt + AsyncWriteExt + Send + Unpin, E: StdError + 'static +
async fn force_fail_all_workers(&mut self) -> usize {
let mut running = 0;
for wstate in self.worker_states.iter() {
if let Some(jid) = wstate.lock().unwrap().running_job.take() {
let may_be_jid = wstate.lock().unwrap().running_job.take();
if let Some(jid) = may_be_jid {
running += 1;
let f = Fail::new(&*jid, "unknown", "terminated");
let _ = match self.c.issue(&f).await {
Expand Down Expand Up @@ -209,9 +210,9 @@ impl<
.collect();

let mut workers = Vec::with_capacity(workers_count);
for i in 0..workers_count {
for (worker, status) in statuses.iter().enumerate().take(workers_count) {
let handle = self
.spawn_worker(Arc::clone(&statuses[i]), i, queues)
.spawn_worker(Arc::clone(status), worker, queues)
.await?;
workers.push(handle)

Check warning on line 217 in src/async/consumer/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/async/consumer/mod.rs#L205-L217

Added lines #L205 - L217 were not covered by tests
}
Expand Down
4 changes: 2 additions & 2 deletions src/async/proto/single/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ impl AsyncFaktoryCommand for Push {
async fn issue<W: AsyncWriteExt + Unpin + Send>(&self, w: &mut W) -> Result<(), Error> {
w.write_all(b"PUSH ").await?;
let r = serde_json::to_vec(&**self).map_err(Error::Serialization)?;
w.write(&r).await?;
w.write_all(&r).await?;
Ok(w.write_all(b"\r\n").await?)
}
}
Expand Down Expand Up @@ -69,7 +69,7 @@ macro_rules! self_to_cmd {
let c = format!("{} ", stringify!($struct).to_uppercase());
w.write_all(c.as_bytes()).await?;
let r = serde_json::to_vec(self).map_err(Error::Serialization)?;
w.write(&r).await?;
w.write_all(&r).await?;
Ok(w.write_all(b"\r\n").await?)
}
}
Expand Down
10 changes: 7 additions & 3 deletions tests/real/async/enterprise.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ async fn print_job(j: Job) -> io::Result<()> {
#[tokio::test]
async fn async_ent_expiring_job() {
skip_if_not_enterprise!();

let url = learn_faktory_url();
let local = "async_ent_expiring_job";

// prepare a producer ("client" in Faktory terms) and consumer ("worker"):
let mut producer = AsyncProducer::connect(Some(&url)).await.unwrap();
Expand All @@ -38,28 +40,30 @@ async fn async_ent_expiring_job() {
let ttl = chrono::Duration::seconds(job_ttl_secs as i64);
let job1 = JobBuilder::new("AnExpiringJob")
.args(vec!["ISBN-13:9781718501850"])
.queue(local)
.expires_at(chrono::Utc::now() + ttl)
.build();

// enqueue and fetch immediately job1:
producer.enqueue(job1).await.unwrap();
let had_job = consumer.run_one(0, &["default"]).await.unwrap();
let had_job = consumer.run_one(0, &[local]).await.unwrap();
assert!(had_job);

// check that the queue is drained:
let had_job = consumer.run_one(0, &["default"]).await.unwrap();
let had_job = consumer.run_one(0, &[local]).await.unwrap();
assert!(!had_job);

// prepare another one:
let job2 = JobBuilder::new("AnExpiringJob")
.args(vec!["ISBN-13:9781718501850"])
.queue(local)
.expires_at(chrono::Utc::now() + ttl)
.build();

// enqueue and then fetch job2, but after ttl:
producer.enqueue(job2).await.unwrap();
tokio::time::sleep(time::Duration::from_secs(job_ttl_secs * 2)).await;
let had_job = consumer.run_one(0, &["default"]).await.unwrap();
let had_job = consumer.run_one(0, &[local]).await.unwrap();

// For the non-enterprise edition of Faktory, this assertion will
// fail, which should be taken into account when running the test suite on CI.
Expand Down
9 changes: 6 additions & 3 deletions tests/real/enterprise.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ fn ent_expiring_job() {
skip_if_not_enterprise!();

let url = learn_faktory_url();
let local = "ent_expiring_job";

// prepare a producer ("client" in Faktory terms) and consumer ("worker"):
let mut producer = Producer::connect(Some(&url)).unwrap();
Expand All @@ -30,28 +31,30 @@ fn ent_expiring_job() {
let ttl = chrono::Duration::seconds(job_ttl_secs as i64);
let job1 = JobBuilder::new("AnExpiringJob")
.args(vec!["ISBN-13:9781718501850"])
.queue(local)
.expires_at(chrono::Utc::now() + ttl)
.build();

// enqueue and fetch immediately job1:
producer.enqueue(job1).unwrap();
let had_job = consumer.run_one(0, &["default"]).unwrap();
let had_job = consumer.run_one(0, &[local]).unwrap();
assert!(had_job);

// check that the queue is drained:
let had_job = consumer.run_one(0, &["default"]).unwrap();
let had_job = consumer.run_one(0, &[local]).unwrap();
assert!(!had_job);

// prepare another one:
let job2 = JobBuilder::new("AnExpiringJob")
.args(vec!["ISBN-13:9781718501850"])
.queue(local)
.expires_at(chrono::Utc::now() + ttl)
.build();

// enqueue and then fetch job2, but after ttl:
producer.enqueue(job2).unwrap();
thread::sleep(time::Duration::from_secs(job_ttl_secs * 2));
let had_job = consumer.run_one(0, &["default"]).unwrap();
let had_job = consumer.run_one(0, &[local]).unwrap();

// For the non-enterprise edition of Faktory, this assertion will
// fail, which should be taken into account when running the test suite on CI.
Expand Down

0 comments on commit 5ea2a8b

Please sign in to comment.