diff --git a/src/async/consumer/mod.rs b/src/async/consumer/mod.rs index 4359f458..d06a75d0 100644 --- a/src/async/consumer/mod.rs +++ b/src/async/consumer/mod.rs @@ -108,7 +108,8 @@ impl usize { let mut running = 0; for wstate in self.worker_states.iter() { - if let Some(jid) = wstate.lock().unwrap().running_job.take() { + let may_be_jid = wstate.lock().unwrap().running_job.take(); + if let Some(jid) = may_be_jid { running += 1; let f = Fail::new(&*jid, "unknown", "terminated"); let _ = match self.c.issue(&f).await { @@ -209,9 +210,9 @@ impl< .collect(); let mut workers = Vec::with_capacity(workers_count); - for i in 0..workers_count { + for (worker, status) in statuses.iter().enumerate().take(workers_count) { let handle = self - .spawn_worker(Arc::clone(&statuses[i]), i, queues) + .spawn_worker(Arc::clone(status), worker, queues) .await?; workers.push(handle) } diff --git a/src/async/proto/single/cmd.rs b/src/async/proto/single/cmd.rs index 6d07848e..430d443b 100644 --- a/src/async/proto/single/cmd.rs +++ b/src/async/proto/single/cmd.rs @@ -22,7 +22,7 @@ impl AsyncFaktoryCommand for Push { async fn issue(&self, w: &mut W) -> Result<(), Error> { w.write_all(b"PUSH ").await?; let r = serde_json::to_vec(&**self).map_err(Error::Serialization)?; - w.write(&r).await?; + w.write_all(&r).await?; Ok(w.write_all(b"\r\n").await?) } } @@ -69,7 +69,7 @@ macro_rules! self_to_cmd { let c = format!("{} ", stringify!($struct).to_uppercase()); w.write_all(c.as_bytes()).await?; let r = serde_json::to_vec(self).map_err(Error::Serialization)?; - w.write(&r).await?; + w.write_all(&r).await?; Ok(w.write_all(b"\r\n").await?) } } diff --git a/tests/real/async/enterprise.rs b/tests/real/async/enterprise.rs index 9b908962..cdd8d5fc 100644 --- a/tests/real/async/enterprise.rs +++ b/tests/real/async/enterprise.rs @@ -24,7 +24,9 @@ async fn print_job(j: Job) -> io::Result<()> { #[tokio::test] async fn async_ent_expiring_job() { skip_if_not_enterprise!(); + let url = learn_faktory_url(); + let local = "async_ent_expiring_job"; // prepare a producer ("client" in Faktory terms) and consumer ("worker"): let mut producer = AsyncProducer::connect(Some(&url)).await.unwrap(); @@ -38,28 +40,30 @@ async fn async_ent_expiring_job() { let ttl = chrono::Duration::seconds(job_ttl_secs as i64); let job1 = JobBuilder::new("AnExpiringJob") .args(vec!["ISBN-13:9781718501850"]) + .queue(local) .expires_at(chrono::Utc::now() + ttl) .build(); // enqueue and fetch immediately job1: producer.enqueue(job1).await.unwrap(); - let had_job = consumer.run_one(0, &["default"]).await.unwrap(); + let had_job = consumer.run_one(0, &[local]).await.unwrap(); assert!(had_job); // check that the queue is drained: - let had_job = consumer.run_one(0, &["default"]).await.unwrap(); + let had_job = consumer.run_one(0, &[local]).await.unwrap(); assert!(!had_job); // prepare another one: let job2 = JobBuilder::new("AnExpiringJob") .args(vec!["ISBN-13:9781718501850"]) + .queue(local) .expires_at(chrono::Utc::now() + ttl) .build(); // enqueue and then fetch job2, but after ttl: producer.enqueue(job2).await.unwrap(); tokio::time::sleep(time::Duration::from_secs(job_ttl_secs * 2)).await; - let had_job = consumer.run_one(0, &["default"]).await.unwrap(); + let had_job = consumer.run_one(0, &[local]).await.unwrap(); // For the non-enterprise edition of Faktory, this assertion will // fail, which should be taken into account when running the test suite on CI. diff --git a/tests/real/enterprise.rs b/tests/real/enterprise.rs index 5bc5b781..b12ed85f 100644 --- a/tests/real/enterprise.rs +++ b/tests/real/enterprise.rs @@ -15,6 +15,7 @@ fn ent_expiring_job() { skip_if_not_enterprise!(); let url = learn_faktory_url(); + let local = "ent_expiring_job"; // prepare a producer ("client" in Faktory terms) and consumer ("worker"): let mut producer = Producer::connect(Some(&url)).unwrap(); @@ -30,28 +31,30 @@ fn ent_expiring_job() { let ttl = chrono::Duration::seconds(job_ttl_secs as i64); let job1 = JobBuilder::new("AnExpiringJob") .args(vec!["ISBN-13:9781718501850"]) + .queue(local) .expires_at(chrono::Utc::now() + ttl) .build(); // enqueue and fetch immediately job1: producer.enqueue(job1).unwrap(); - let had_job = consumer.run_one(0, &["default"]).unwrap(); + let had_job = consumer.run_one(0, &[local]).unwrap(); assert!(had_job); // check that the queue is drained: - let had_job = consumer.run_one(0, &["default"]).unwrap(); + let had_job = consumer.run_one(0, &[local]).unwrap(); assert!(!had_job); // prepare another one: let job2 = JobBuilder::new("AnExpiringJob") .args(vec!["ISBN-13:9781718501850"]) + .queue(local) .expires_at(chrono::Utc::now() + ttl) .build(); // enqueue and then fetch job2, but after ttl: producer.enqueue(job2).unwrap(); thread::sleep(time::Duration::from_secs(job_ttl_secs * 2)); - let had_job = consumer.run_one(0, &["default"]).unwrap(); + let had_job = consumer.run_one(0, &[local]).unwrap(); // For the non-enterprise edition of Faktory, this assertion will // fail, which should be taken into account when running the test suite on CI.