diff --git a/.github/workflows/tls.yml b/.github/workflows/tls.yml index 8c6fa276..8f27c9aa 100644 --- a/.github/workflows/tls.yml +++ b/.github/workflows/tls.yml @@ -33,4 +33,5 @@ jobs: - name: Run tests env: FAKTORY_URL_SECURE: tcp://:uredinales@localhost:17419 + FAKTORY_URL: tcp://:uredinales@localhost:7419 run: cargo test --locked --features native_tls,rustls --test tls diff --git a/Cargo.lock b/Cargo.lock index 16e127f8..da2c6d07 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -34,9 +34,9 @@ dependencies = [ [[package]] name = "anstream" -version = "0.6.14" +version = "0.6.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "418c75fa768af9c03be99d17643f93f79bbba589895012a80e3452a19ddda15b" +checksum = "64e15c1ab1f89faffbf04a634d5e1962e9074f2741eef6d97f3c4e322426d526" dependencies = [ "anstyle", "anstyle-parse", @@ -49,33 +49,33 @@ dependencies = [ [[package]] name = "anstyle" -version = "1.0.7" +version = "1.0.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "038dfcf04a5feb68e9c60b21c9625a54c2c0616e79b72b0fd87075a056ae1d1b" +checksum = "1bec1de6f59aedf83baf9ff929c98f2ad654b97c9510f4e70cf6f661d49fd5b1" [[package]] name = "anstyle-parse" -version = "0.2.4" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c03a11a9034d92058ceb6ee011ce58af4a9bf61491aa7e1e59ecd24bd40d22d4" +checksum = "eb47de1e80c2b463c735db5b217a0ddc39d612e7ac9e2e96a5aed1f57616c1cb" dependencies = [ "utf8parse", ] [[package]] name = "anstyle-query" -version = "1.1.0" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad186efb764318d35165f1758e7dcef3b10628e26d41a44bc5550652e6804391" +checksum = "6d36fc52c7f6c869915e99412912f22093507da8d9e942ceaf66fe4b7c14422a" dependencies = [ "windows-sys 0.52.0", ] [[package]] name = "anstyle-wincon" -version = "3.0.3" +version = "3.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61a38449feb7068f52bb06c12759005cf459ee52bb4adc1d5a7c4322d716fb19" +checksum = "5bf74e1b6e971609db8ca7a9ce79fd5768ab6ae46441c572e46cf596f59e57f8" dependencies = [ "anstyle", "windows-sys 0.52.0", @@ -139,7 +139,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2", "quote", - "syn 2.0.71", + "syn 2.0.72", ] [[package]] @@ -150,7 +150,7 @@ checksum = "6e0c28dcc82d7c8ead5cb13beb15405b57b8546e93215673ff8ca0349a028107" dependencies = [ "proc-macro2", "quote", - "syn 2.0.71", + "syn 2.0.72", ] [[package]] @@ -174,6 +174,12 @@ dependencies = [ "rustc-demangle", ] +[[package]] +name = "base64" +version = "0.22.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" + [[package]] name = "bitflags" version = "2.6.0" @@ -195,17 +201,23 @@ version = "3.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c" +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + [[package]] name = "bytes" -version = "1.6.1" +version = "1.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a12916984aab3fa6e39d655a33e09c0071eb36d6ab3aea5c2d78551f1df6d952" +checksum = "8318a53db07bb3f8dca91a600466bdb3f2eaadeedfdbcf02e1accbad9271ba50" [[package]] name = "cc" -version = "1.1.5" +version = "1.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "324c74f2155653c90b04f25b2a47a8a631360cb908f92a772695f430c7e31052" +checksum = "504bdec147f2cc13c8b57ed9401fd8a147cc66b67ad5cb241394244f2c947549" [[package]] name = "cfg-if" @@ -223,23 +235,23 @@ dependencies = [ "iana-time-zone", "num-traits", "serde", - "windows-targets 0.52.6", + "windows-targets", ] [[package]] name = "clap" -version = "4.5.9" +version = "4.5.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64acc1846d54c1fe936a78dc189c34e28d3f5afc348403f28ecf53660b9b8462" +checksum = "11d8838454fda655dafd3accb2b6e2bea645b9e4078abe84a22ceb947235c5cc" dependencies = [ "clap_builder", ] [[package]] name = "clap_builder" -version = "4.5.9" +version = "4.5.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6fb8393d67ba2e7bfaf28a23458e4e2b543cc73a99595511eb207fdb8aede942" +checksum = "216aec2b177652e3846684cbfe25c9964d18ec45234f0f5da5157b207ed1aab6" dependencies = [ "anstream", "anstyle", @@ -249,15 +261,15 @@ dependencies = [ [[package]] name = "clap_lex" -version = "0.7.1" +version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4b82cf0babdbd58558212896d1a4272303a57bdb245c2bf1147185fb45640e70" +checksum = "1462739cb27611015575c0c11df5df7601141071f07518d56fcc1be504cbec97" [[package]] name = "colorchoice" -version = "1.0.1" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b6a852b24ab71dffc585bcb46eaf7959d175cb865a7152e35b348d1b2960422" +checksum = "d3fd119d74b830634cea2a0f58bbd0d54540518a14397557951e79340abc28c0" [[package]] name = "core-foundation" @@ -407,7 +419,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.71", + "syn 2.0.72", ] [[package]] @@ -436,6 +448,7 @@ dependencies = [ "openssl", "pin-project", "rand", + "rustls-native-certs", "rustls-pki-types", "semver", "serde", @@ -587,9 +600,9 @@ dependencies = [ [[package]] name = "is_terminal_polyfill" -version = "1.70.0" +version = "1.70.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8478577c03552c21db0e2724ffb8986a5ce7af88107e6be5d2ee6e158c12800" +checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" [[package]] name = "itoa" @@ -659,13 +672,14 @@ dependencies = [ [[package]] name = "mio" -version = "0.8.11" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" +checksum = "4569e456d394deccd22ce1c1913e6ea0e54519f577285001215d33557431afe4" dependencies = [ + "hermit-abi", "libc", "wasi", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] @@ -739,21 +753,11 @@ dependencies = [ "autocfg", ] -[[package]] -name = "num_cpus" -version = "1.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" -dependencies = [ - "hermit-abi", - "libc", -] - [[package]] name = "object" -version = "0.36.1" +version = "0.36.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "081b846d1d56ddfc18fdf1a922e4f6e07a11768ea1b92dec44e42b72712ccfce" +checksum = "27b64972346851a39438c60b341ebc01bba47464ae329e55cf343eb93964efd9" dependencies = [ "memchr", ] @@ -775,9 +779,9 @@ checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" [[package]] name = "openssl" -version = "0.10.64" +version = "0.10.66" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95a0481286a310808298130d22dd1fef0fa571e05a8f44ec801801e84b216b1f" +checksum = "9529f4786b70a3e8c61e11179af17ab6188ad8d0ded78c5529441ed39d4bd9c1" dependencies = [ "bitflags", "cfg-if", @@ -796,7 +800,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.71", + "syn 2.0.72", ] [[package]] @@ -807,9 +811,9 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" [[package]] name = "openssl-sys" -version = "0.9.102" +version = "0.9.103" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c597637d56fbc83893a35eb0dd04b2b8e7a50c91e64e9493e398b5df4fb45fa2" +checksum = "7f9e8deee91df40a943c71b917e5874b951d32a802526c85721ce3b776c929d6" dependencies = [ "cc", "libc", @@ -846,7 +850,7 @@ checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" dependencies = [ "proc-macro2", "quote", - "syn 2.0.71", + "syn 2.0.72", ] [[package]] @@ -869,9 +873,12 @@ checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" [[package]] name = "ppv-lite86" -version = "0.2.17" +version = "0.2.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" +checksum = "77957b295656769bb8ad2b6a6b09d897d94f05c41b069aede1fcdaa675eaea04" +dependencies = [ + "zerocopy", +] [[package]] name = "proc-macro2" @@ -978,17 +985,40 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustls-native-certs" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a88d6d420651b496bdd98684116959239430022a115c1240e6c3993be0b15fba" +dependencies = [ + "openssl-probe", + "rustls-pemfile", + "rustls-pki-types", + "schannel", + "security-framework", +] + +[[package]] +name = "rustls-pemfile" +version = "2.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "196fe16b00e106300d3e45ecfcb764fa292a535d7326a29a5875c579c7417425" +dependencies = [ + "base64", + "rustls-pki-types", +] + [[package]] name = "rustls-pki-types" -version = "1.7.0" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "976295e77ce332211c0d24d92c0e83e50f5c5f046d11082cea19f3df13a3562d" +checksum = "fc0a2ce646f8655401bb81e7927b812614bd5d91dbc968696be50603510fcaf0" [[package]] name = "rustls-webpki" -version = "0.102.5" +version = "0.102.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9a6fccd794a42c2c105b513a2f62bc3fd8f3ba57a4593677ceb0bd035164d78" +checksum = "8e6b52d4fda176fd835fdc55a835d4a89b8499cad995885a21149d5ad62f852e" dependencies = [ "ring", "rustls-pki-types", @@ -1044,31 +1074,32 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.204" +version = "1.0.205" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc76f558e0cbb2a839d37354c575f1dc3fdc6546b5be373ba43d95f231bf7c12" +checksum = "e33aedb1a7135da52b7c21791455563facbbcc43d0f0f66165b42c21b3dfb150" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.204" +version = "1.0.205" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0cd7e117be63d3c3678776753929474f3b04a43a080c744d6b0ae2a8c28e222" +checksum = "692d6f5ac90220161d6774db30c662202721e64aed9058d2c394f451261420c1" dependencies = [ "proc-macro2", "quote", - "syn 2.0.71", + "syn 2.0.72", ] [[package]] name = "serde_json" -version = "1.0.120" +version = "1.0.122" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e0d21c9a8cae1235ad58a00c11cb40d4b1e5c784f1ef2c537876ed6ffd8b7c5" +checksum = "784b6203951c57ff748476b126ccb5e8e2959a5c19e5c617ab1956be3dbc68da" dependencies = [ "itoa", + "memchr", "ryu", "serde", ] @@ -1146,9 +1177,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.71" +version = "2.0.72" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b146dcf730474b4bcd16c311627b31ede9ab149045db4d6088b3becaea046462" +checksum = "dc4b9b9bf2add8093d3f2c0204471e951b2285580335de42f9d2534f3ae7a8af" dependencies = [ "proc-macro2", "quote", @@ -1169,34 +1200,35 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.10.1" +version = "3.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85b77fafb263dd9d05cbeac119526425676db3784113aa9295c88498cbf8bff1" +checksum = "04cbcdd0c794ebb0d4cf35e88edd2f7d2c4c3e9a5a6dab322839b321c6a87a64" dependencies = [ "cfg-if", "fastrand", + "once_cell", "rustix", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] name = "thiserror" -version = "1.0.62" +version = "1.0.63" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f2675633b1499176c2dff06b0856a27976a8f9d436737b4cf4f312d4d91d8bbb" +checksum = "c0342370b38b6a11b6cc11d6a805569958d54cfa061a29969c3b5ce2ea405724" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.62" +version = "1.0.63" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d20468752b09f49e909e55a5d338caa8bedf615594e9d80bc4c565d30faf798c" +checksum = "a4558b58466b9ad7ca0f102865eccc95938dca1a74a856f2b57b6629050da261" dependencies = [ "proc-macro2", "quote", - "syn 2.0.71", + "syn 2.0.72", ] [[package]] @@ -1257,30 +1289,29 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.38.0" +version = "1.39.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba4f4a02a7a80d6f274636f0aa95c7e383b912d41fe721a31f29e29698585a4a" +checksum = "daa4fb1bc778bd6f04cbfc4bb2d06a7396a8f299dc33ea1900cedaa316f467b1" dependencies = [ "backtrace", "bytes", "libc", "mio", - "num_cpus", "pin-project-lite", "socket2", "tokio-macros", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] name = "tokio-macros" -version = "2.3.0" +version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f5ae998a069d4b5aba8ee9dad856af7d520c3699e6159b185c2acd48155d39a" +checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", - "syn 2.0.71", + "syn 2.0.72", ] [[package]] @@ -1360,7 +1391,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.71", + "syn 2.0.72", ] [[package]] @@ -1468,9 +1499,9 @@ checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" [[package]] name = "version_check" -version = "0.9.4" +version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" [[package]] name = "wasi" @@ -1499,7 +1530,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.71", + "syn 2.0.72", "wasm-bindgen-shared", ] @@ -1521,7 +1552,7 @@ checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.71", + "syn 2.0.72", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -1560,16 +1591,7 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" dependencies = [ - "windows-targets 0.52.6", -] - -[[package]] -name = "windows-sys" -version = "0.48.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" -dependencies = [ - "windows-targets 0.48.5", + "windows-targets", ] [[package]] @@ -1578,22 +1600,16 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" dependencies = [ - "windows-targets 0.52.6", + "windows-targets", ] [[package]] -name = "windows-targets" -version = "0.48.5" +name = "windows-sys" +version = "0.59.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" +checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b" dependencies = [ - "windows_aarch64_gnullvm 0.48.5", - "windows_aarch64_msvc 0.48.5", - "windows_i686_gnu 0.48.5", - "windows_i686_msvc 0.48.5", - "windows_x86_64_gnu 0.48.5", - "windows_x86_64_gnullvm 0.48.5", - "windows_x86_64_msvc 0.48.5", + "windows-targets", ] [[package]] @@ -1602,46 +1618,28 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" dependencies = [ - "windows_aarch64_gnullvm 0.52.6", - "windows_aarch64_msvc 0.52.6", - "windows_i686_gnu 0.52.6", + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", "windows_i686_gnullvm", - "windows_i686_msvc 0.52.6", - "windows_x86_64_gnu 0.52.6", - "windows_x86_64_gnullvm 0.52.6", - "windows_x86_64_msvc 0.52.6", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", ] -[[package]] -name = "windows_aarch64_gnullvm" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" - [[package]] name = "windows_aarch64_gnullvm" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" -[[package]] -name = "windows_aarch64_msvc" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" - [[package]] name = "windows_aarch64_msvc" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" -[[package]] -name = "windows_i686_gnu" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" - [[package]] name = "windows_i686_gnu" version = "0.52.6" @@ -1654,48 +1652,24 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" -[[package]] -name = "windows_i686_msvc" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" - [[package]] name = "windows_i686_msvc" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" -[[package]] -name = "windows_x86_64_gnu" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" - [[package]] name = "windows_x86_64_gnu" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" -[[package]] -name = "windows_x86_64_gnullvm" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" - [[package]] name = "windows_x86_64_gnullvm" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" -[[package]] -name = "windows_x86_64_msvc" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" - [[package]] name = "windows_x86_64_msvc" version = "0.52.6" @@ -1719,6 +1693,27 @@ dependencies = [ "time", ] +[[package]] +name = "zerocopy" +version = "0.7.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0" +dependencies = [ + "byteorder", + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.7.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.72", +] + [[package]] name = "zeroize" version = "1.8.1" diff --git a/Cargo.toml b/Cargo.toml index cd9a88c7..996f559f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,7 +16,11 @@ exclude = [".github", "docker", ".gitignore", "Makefile"] [features] default = [] native_tls = ["dep:pin-project", "dep:tokio-native-tls"] -rustls = ["dep:pin-project", "dep:tokio-rustls"] +rustls = [ + "dep:pin-project", + "dep:tokio-rustls", + "dep:rustls-native-certs", +] binaries = ["dep:clap", "tokio/macros"] ent = [] @@ -47,6 +51,7 @@ tokio = { version = "1.35.1", features = [ ] } tokio-native-tls = { version = "0.3.1", optional = true } tokio-rustls = { version = "0.25.0", optional = true } +rustls-native-certs = { version = "0.7.1", optional = true } tracing = "0.1" url = "2" semver = { version = "1.0.23", features = ["serde"] } diff --git a/Makefile b/Makefile index 79e49cfe..968e667c 100644 --- a/Makefile +++ b/Makefile @@ -59,6 +59,7 @@ test/e2e: .PHONY: test/e2e/tls test/e2e/tls: FAKTORY_URL_SECURE=tcp://:${FAKTORY_PASSWORD}@${FAKTORY_HOST}:${FAKTORY_PORT_SECURE} \ + FAKTORY_URL=tcp://:${FAKTORY_PASSWORD}@${FAKTORY_HOST}:${FAKTORY_PORT} \ cargo test --locked --features native_tls,rustls --test tls -- --nocapture .PHONY: test/load diff --git a/README.md b/README.md index 80a3a61e..0fc5ae72 100644 --- a/README.md +++ b/README.md @@ -49,16 +49,54 @@ c.enqueue(Job::new("foobar", vec!["z"])).await.unwrap(); If you want to **accept** jobs from Faktory, use `Worker`. ```rust -use faktory::WorkerBuilder; +use async_trait::async_trait; +use faktory::{JobRunner, Worker}; use std::io; -let mut w = WorkerBuilder::default(); -w.register("foobar", |job| async move { - println!("{:?}", job); - Ok::<(), io::Error>(()) -}); -let mut w = w.connect(None).await.unwrap(); -if let Err(e) = w.run(&["default"]).await { - println!("worker failed: {}", e); + +struct DomainEntity(i32); + +impl DomainEntity { + fn new(buzz: i32) -> Self { + DomainEntity(buzz) + } +} + +#[async_trait] +impl JobRunner for DomainEntity { + type Error = io::Error; + + async fn run(&self, job: Job) -> Result<(), Self::Error> { + println!("{:?}, buzz={}", job, self.0); + Ok(()) + } +} + +let mut w = Worker::builder() + .register("fizz", DomainEntity::new(1)) + .register("jobtype", DomainEntity::new(100)) + .register_fn("foobar", |job| async move { + println!("{:?}", job); + Ok::<(), io::Error>(()) + }) + .register_blocking_fn("fibo", |job| { + std::thread::sleep(Duration::from_millis(1000)); + println!("{:?}", job); + Ok::<(), io::Error>(()) + }) + .with_rustls() // available on `rustls` feature only + .connect(None) + .await + .unwrap(); + +match w.run(&["default"]).await { + Err(e) => println!("worker failed: {}", e), + Ok(stop_details) => { + println!( + "Stop reason: {}, number of workers that were running: {}", + stop_details.reason, + stop_details.workers_still_running + ); + } } ``` diff --git a/docker/certs/README.md b/docker/certs/README.md new file mode 100644 index 00000000..074e19e3 --- /dev/null +++ b/docker/certs/README.md @@ -0,0 +1,11 @@ +## Important + +The certificate has been produced with [`minica`](https://github.com/jsha/minica): + +```sh +./minica -domains 'localhost' +``` + +The lib's version used was `1.1.0` and the default algorithm at the time of issuance was `ecdsa`. + +The certificate was issued on `August 13, 2024` and will be valid for `2 years and 30 days` (which is a limitation [imposed](https://github.com/jsha/minica/blob/c5ce70c9b524953b13628607abafd7a557c6f074/main.go#L277-L281) by certain platforms). diff --git a/docker/certs/faktory.local.crt b/docker/certs/faktory.local.crt index 1214e42b..677d2d0f 100644 --- a/docker/certs/faktory.local.crt +++ b/docker/certs/faktory.local.crt @@ -1,21 +1,12 @@ -----BEGIN CERTIFICATE----- -MIIDazCCAlMCFAxQwXkfT4M84/fevISct//qQskRMA0GCSqGSIb3DQEBCwUAMHEx -CzAJBgNVBAYTAlVaMREwDwYDVQQIDAhUYXNoa2VudDERMA8GA1UEBwwIVGFzaGtl -bnQxEzARBgNVBAoMCmZha3RvcnktcnMxEzARBgNVBAsMCmZha3RvcnktcnMxEjAQ -BgNVBAMMCWxvY2FsaG9zdDAgFw0yNDAyMDMyMDI1MDlaGA8zMDA0MDQwNjIwMjUw -OVowcTELMAkGA1UEBhMCVVoxETAPBgNVBAgMCFRhc2hrZW50MREwDwYDVQQHDAhU -YXNoa2VudDETMBEGA1UECgwKZmFrdG9yeS1yczETMBEGA1UECwwKZmFrdG9yeS1y -czESMBAGA1UEAwwJbG9jYWxob3N0MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIB -CgKCAQEA4ektheqTRy+eHn9j22AxGHqtg/elEiZC0UCLX51ysEkhnLLvFlVFtzd7 -q+nx1PNiHdH5i/TjdAYrXAZhKU/k2YfrgCyOjm/XxSw7ujXPP+cWOmdRYTexT9o7 -Yrg3ZYMniJbbTl8j37dieXHaO7FHAvpww1q/nbQkwD/1WqK1ggQY/OZ38wpUvsws -9LA7shuXdGnjAXunnRGEzZ2EG6T5hYw0PFL+2CHwr0lqNbCur8wu99t4ED9/vfLG -0TWRQwSnApyjHy89rn5Ze3vOiNzcBW778oZxwvzriEmbQQg6RxKE19AlaiV4+n5S -woAi8Ji69BKRUSlxRhW6eX4ABV2eOwIDAQABMA0GCSqGSIb3DQEBCwUAA4IBAQDS -EXuIvVx27LyWlIhfY6vwSWqeUoRXmMFpiBNTTvvHQKlJzLlDyn1b+CqHvMdE9RZh -FI5shZkiqtRRTUGVHB4o0ntwCQmWyV/5FQQ6EYs/bHXUcN2vt1XuU7WK4fRafPPu -snYDgg0TmpGvm+J8W64TfJogWqpPsnT4pOF+aNqW88TTs1JUnNFDBQmw2QKBK+AH -+V4zhpCjVXpKtVMTnDWHQfJh4whelD18lU1jPCbzQrRs2hQWQvtzKWi0YCYc1IXl -4E6eIOHRuiUl/mE3p3f2CGJIwxgrMuxN07ncnwVXBPCaVzSLWJHy0G61mFKH5R/7 -42EC7S/POk5GtzkMJ5Du +MIIB3DCCAWOgAwIBAgIIH7dYNg66/2UwCgYIKoZIzj0EAwMwIDEeMBwGA1UEAxMV +bWluaWNhIHJvb3QgY2EgMDI1MWZmMB4XDTI0MDgxMzA1NTQyOFoXDTI2MDkxMjA1 +NTQyOFowFDESMBAGA1UEAxMJbG9jYWxob3N0MHYwEAYHKoZIzj0CAQYFK4EEACID +YgAENZuBDDayhB5EzmRfErEoIbfE5IjWChNzjO4CLTrECemPqcJbjzsk8MBwB5cb +bHGMeg1nqkqof0ZkgrM4sWZsWNI1H/LODKdXBIqMpbU12iEs7S3eo5RaGlq9CtE6 +dYyAo3YwdDAOBgNVHQ8BAf8EBAMCBaAwHQYDVR0lBBYwFAYIKwYBBQUHAwEGCCsG +AQUFBwMCMAwGA1UdEwEB/wQCMAAwHwYDVR0jBBgwFoAUdieiiQwm+V0FBPIukYU/ +udp7ScwwFAYDVR0RBA0wC4IJbG9jYWxob3N0MAoGCCqGSM49BAMDA2cAMGQCMGL7 +ge3qiN2B0P0bQvf9DNCblvuC7rx6NcZraYpAj9HgO9iUTqyMVxB04uWiOOjE9wIw +D0ciU7opj7CqwaoC3EQbLleMoEuK8LLdHj/JfMxO2I9AlAxzT4ksIg/VSErlUEcv -----END CERTIFICATE----- diff --git a/docker/certs/faktory.local.key b/docker/certs/faktory.local.key index 08e1f32c..d8d59515 100644 --- a/docker/certs/faktory.local.key +++ b/docker/certs/faktory.local.key @@ -1,28 +1,6 @@ -----BEGIN PRIVATE KEY----- -MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQDh6S2F6pNHL54e -f2PbYDEYeq2D96USJkLRQItfnXKwSSGcsu8WVUW3N3ur6fHU82Id0fmL9ON0Bitc -BmEpT+TZh+uALI6Ob9fFLDu6Nc8/5xY6Z1FhN7FP2jtiuDdlgyeIlttOXyPft2J5 -cdo7sUcC+nDDWr+dtCTAP/VaorWCBBj85nfzClS+zCz0sDuyG5d0aeMBe6edEYTN -nYQbpPmFjDQ8Uv7YIfCvSWo1sK6vzC7323gQP3+98sbRNZFDBKcCnKMfLz2ufll7 -e86I3NwFbvvyhnHC/OuISZtBCDpHEoTX0CVqJXj6flLCgCLwmLr0EpFRKXFGFbp5 -fgAFXZ47AgMBAAECggEAJjyV4G86O1fDbw0HxUdMOAT3nnkJfv9r2sgObwISueS+ -5CtjDUgkkyS4cXoY3P7O0hZKoxYxc19h8mMACgKETQ9U3G5uOIyUnEJm35cg+4Ns -/ziijQ5knAvndkeQ1MU0qUlDWEoBI+oBqGWNVwIj70ydTmtrOFGX0NRiflNA3n7q -pJbdRZzKnTxXxRwIRuGA1y6SlBLQ740hVOm56iLtRJ+P0kNErSL8Uhws/X9/0MXH -W8r2JVikNumBZH18MK+wBGulwZBcLurFfv31hbeQ/FnckOJ1OE53rnV+tBrZN7Ap -6eR4IMcVPfunnGX+meEUnJfmC0HrdQXucDB8Ey/biQKBgQDygP0JeUKpSWX2uSfV -2c8N0opmC2uHswOhf+H9TOyA4DO5NmlbOqVv+uUwRQvIkoen8XNMCPOyoK7WZNAB -hfyU+ck3HDIBqHbGBisUXDNLgIQIhWVznYK0QC+YYr+rEmFun0sMriuhZsU1q2mW -VoAPSTJhaufRb0TKib9Tarzg4wKBgQDue8jk0tbK5xL9dcyn1CxHtDAbfyQfQnSd -G+GcQDDCamgbKI042A5lPSToYEOpSMTOn/n5CmezsSMFnwuwZAgQ1Pbd3YeknBCi -6jWzqYcC11u3EeX9YPJgEDZq0uSWNZg0phDBsu+PYq7vDAriCsMeQrLMvQb0Fs3n -Pp4vVzSEyQKBgQCb+h1G/6jBzAT6WYNmyE6mPFpqYkQKpzjZorCPxO+FwS9jnLzN -Qf5w9TZ/Apoeqyj3+5RGPqfIqBNssLEdmbmpdLRYbxk2+c1Td1o0IU2Y7ZN/C5YC -dDhCidpTMIjJluv2RBz4jfpgOQL1j0g9u2to6ZKvGBz9F41unITkOY49MwKBgEzk -1qqJHL6BcQsOT3WRoNFh1N0YyoHVwJnjooPp4o7dFkIjeh1o9INKCrtuRoKvtt1U -kZnt8+/pXnxygqdWKY+byxlQU2sM8wREdho+wAx3edf2Smy/NIcq0xDwfMm98ByR -qvd5hWp7DCKBhITLqYv5P4NqM3LCY5N7CjADcyiZAoGBALXXR5WSHLjtzaN4Eeti -pWur1VN30HiM2zRTXwTxx6X7y/FI5xzoCVAJb6tSpC/aXzFx05Xa/LyhDXI2sbhm -G3a4tjBRrief5z8XQ7gdBSiyRtLc1XFy3kmeN2HTPMWSIrbk56xyEOqbXov5S+41 -hWwNT3lodEZ2ymFWEZHHAvhb +MIG2AgEAMBAGByqGSM49AgEGBSuBBAAiBIGeMIGbAgEBBDCs4MYYR22MIL1rZitb +nZF25hbh9M1aI4uPdn+Vqzphuk+tjEMAmYbAZcSCCGGoUzKhZANiAAQ1m4EMNrKE +HkTOZF8SsSght8TkiNYKE3OM7gItOsQJ6Y+pwluPOyTwwHAHlxtscYx6DWeqSqh/ +RmSCszixZmxY0jUf8s4Mp1cEioyltTXaISztLd6jlFoaWr0K0Tp1jIA= -----END PRIVATE KEY----- diff --git a/src/lib.rs b/src/lib.rs index badd21af..30ed5c10 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -48,14 +48,43 @@ //! //! ```no_run //! # tokio_test::block_on(async { -//! use faktory::WorkerBuilder; +//! use async_trait::async_trait; +//! use faktory::{Job, JobRunner, Worker}; //! use std::io; -//! let mut w = WorkerBuilder::default() +//! +//! struct DomainEntity(i32); +//! +//! impl DomainEntity { +//! fn new(buzz: i32) -> Self { +//! DomainEntity(buzz) +//! } +//! } +//! +//! #[async_trait] +//! impl JobRunner for DomainEntity { +//! type Error = io::Error; +//! +//! async fn run(&self, job: Job) -> Result<(), Self::Error> { +//! println!("{:?}, buzz={}", job, self.0); +//! Ok(()) +//! } +//! } +//! +//! let mut w = Worker::builder() +//! .register("fizz", DomainEntity::new(1)) //! .register_fn("foobar", |job| async move { //! println!("{:?}", job); //! Ok::<(), io::Error>(()) //! }) -//! .connect(None).await.unwrap(); +//! .register_blocking_fn("fibo", |job| { +//! std::thread::sleep(std::time::Duration::from_millis(1000)); +//! println!("{:?}", job); +//! Ok::<(), io::Error>(()) +//! }) +//! .with_rustls() // available on `rustls` feature only +//! .connect(None) +//! .await +//! .unwrap(); //! //! if let Err(e) = w.run(&["default"]).await { //! println!("worker failed: {}", e); @@ -77,7 +106,8 @@ mod worker; pub use crate::error::Error; pub use crate::proto::{ - Client, DataSnapshot, FaktoryState, Job, JobBuilder, JobId, Reconnect, ServerSnapshot, WorkerId, + Client, Connection, DataSnapshot, FaktoryState, Job, JobBuilder, JobId, Reconnect, + ServerSnapshot, WorkerId, }; pub use crate::worker::{JobRunner, StopDetails, StopReason, Worker, WorkerBuilder}; diff --git a/src/proto/batch/handle.rs b/src/proto/batch/handle.rs index 5015510d..dcf2407d 100644 --- a/src/proto/batch/handle.rs +++ b/src/proto/batch/handle.rs @@ -1,27 +1,26 @@ use crate::error::Error; use crate::proto::{Batch, BatchId, Client, Job}; -use tokio::io::{AsyncBufRead, AsyncWrite}; /// Represents a newly started or re-opened batch of jobs. -pub struct BatchHandle<'a, S: AsyncWrite + Unpin + Send> { +pub struct BatchHandle<'a> { bid: BatchId, - c: &'a mut Client, + c: &'a mut Client, } -impl<'a, S: AsyncWrite + Unpin + Send> BatchHandle<'a, S> { - pub(crate) fn new(bid: BatchId, c: &mut Client) -> BatchHandle<'_, S> { +impl<'a> BatchHandle<'a> { + pub(crate) fn new(bid: BatchId, c: &mut Client) -> BatchHandle<'_> { BatchHandle { bid, c } } } -impl<'a, S: AsyncWrite + Unpin + Send> BatchHandle<'a, S> { +impl BatchHandle<'_> { /// ID issued by the Faktory server to this batch. pub fn id(&self) -> &BatchId { &self.bid } } -impl<'a, S: AsyncBufRead + AsyncWrite + Unpin + Send> BatchHandle<'a, S> { +impl BatchHandle<'_> { /// Add the given job to the batch. /// /// Should the submitted job - for whatever reason - already have a `bid` key present in its custom hash, @@ -33,7 +32,7 @@ impl<'a, S: AsyncBufRead + AsyncWrite + Unpin + Send> BatchHandle<'a, S> { } /// Initiate a child batch of jobs. - pub async fn start_batch(&mut self, mut batch: Batch) -> Result, Error> { + pub async fn start_batch(&mut self, mut batch: Batch) -> Result, Error> { batch.parent_bid = Some(self.bid.clone()); self.c.start_batch(batch).await } diff --git a/src/proto/batch/status.rs b/src/proto/batch/status.rs index b1c66332..71fad0db 100644 --- a/src/proto/batch/status.rs +++ b/src/proto/batch/status.rs @@ -5,7 +5,6 @@ use super::BatchHandle; use crate::error::Error; use crate::proto::{BatchId, Client}; use chrono::{DateTime, Utc}; -use tokio::io::{AsyncBufRead, AsyncWrite}; // Not documented, but existing de fakto and also mentioned in the official client // https://github.com/contribsys/faktory/blob/main/client/batch.go#L17-L19 @@ -83,10 +82,7 @@ impl<'a> BatchStatus { /// Open the batch for which this `BatchStatus` has been retrieved. /// /// See [`open_batch`](Client::open_batch). - pub async fn open( - &self, - prod: &'a mut Client, - ) -> Result>, Error> { + pub async fn open(&self, prod: &'a mut Client) -> Result>, Error> { prod.open_batch(&self.bid).await } } diff --git a/src/proto/client/conn.rs b/src/proto/client/conn.rs new file mode 100644 index 00000000..cdf2ad18 --- /dev/null +++ b/src/proto/client/conn.rs @@ -0,0 +1,10 @@ +use tokio::io::{AsyncBufRead, AsyncWrite}; + +use crate::Reconnect; + +/// A duplex buffered stream to the Faktory service. +pub trait Connection: AsyncWrite + AsyncBufRead + Unpin + Send + Reconnect {} + +impl Connection for T where T: AsyncWrite + AsyncBufRead + Unpin + Send + Reconnect {} + +pub type BoxedConnection = Box; diff --git a/src/proto/client/ent.rs b/src/proto/client/ent.rs index 2b54269a..6f063a7e 100644 --- a/src/proto/client/ent.rs +++ b/src/proto/client/ent.rs @@ -1,20 +1,25 @@ use super::super::batch::{CommitBatch, GetBatchStatus, OpenBatch}; -use super::super::{single, BatchStatus, JobId, Progress, ProgressUpdate, Track}; +use super::super::{single, BatchStatus, JobId, Progress, ProgressUpdate}; use super::{Client, ReadToken}; use crate::ent::{Batch, BatchHandle, BatchId}; use crate::error::{self, Error}; -use tokio::io::{AsyncBufRead, AsyncWrite}; +use crate::proto::FetchProgress; -impl Client { +impl Client { /// Send information on a job's execution progress to Faktory. - pub async fn set_progress(&mut self, upd: ProgressUpdate) -> Result<(), Error> { - let cmd = Track::Set(upd); - self.issue(&cmd).await?.read_ok().await + pub async fn set_progress

(&mut self, upd: P) -> Result<(), Error> + where + P: AsRef + Sync, + { + self.issue(upd.as_ref()).await?.read_ok().await } /// Fetch information on a job's execution progress from Faktory. - pub async fn get_progress(&mut self, jid: JobId) -> Result, Error> { - let cmd = Track::Get(jid); + pub async fn get_progress(&mut self, jid: J) -> Result, Error> + where + J: AsRef + Sync, + { + let cmd = FetchProgress::new(jid); self.issue(&cmd).await?.read_json().await } @@ -28,7 +33,7 @@ impl Client { } /// Initiate a new batch of jobs. - pub async fn start_batch(&mut self, batch: Batch) -> Result, Error> { + pub async fn start_batch(&mut self, batch: Batch) -> Result, Error> { let bid = self.issue(&batch).await?.read_bid().await?; Ok(BatchHandle::new(bid, self)) } @@ -37,7 +42,7 @@ impl Client { /// /// This will not error if a batch with the provided `bid` does not exist, /// rather `Ok(None)` will be returned. - pub async fn open_batch(&mut self, bid: B) -> Result>, Error> + pub async fn open_batch(&mut self, bid: B) -> Result>, Error> where B: AsRef + Sync, { @@ -53,7 +58,7 @@ impl Client { } } -impl<'a, S: AsyncBufRead + AsyncWrite + Unpin + Send> ReadToken<'a, S> { +impl ReadToken<'_> { pub(crate) async fn read_bid(self) -> Result { single::read_bid(&mut self.0.stream).await } @@ -61,15 +66,13 @@ impl<'a, S: AsyncBufRead + AsyncWrite + Unpin + Send> ReadToken<'a, S> { pub(crate) async fn maybe_bid(self) -> Result, Error> { match single::read_bid(&mut self.0.stream).await { Ok(bid) => Ok(Some(bid)), - Err(err) => match err { - Error::Protocol(error::Protocol::Internal { msg }) => { - if msg.starts_with("No such batch") { - return Ok(None); - } - Err(error::Protocol::Internal { msg }.into()) + Err(Error::Protocol(error::Protocol::Internal { msg })) => { + if msg.starts_with("No such batch") { + return Ok(None); } - another => Err(another), - }, + Err(error::Protocol::Internal { msg }.into()) + } + Err(another) => Err(another), } } } diff --git a/src/proto/client/mod.rs b/src/proto/client/mod.rs index 162c9b5d..35e19caf 100644 --- a/src/proto/client/mod.rs +++ b/src/proto/client/mod.rs @@ -5,17 +5,21 @@ mod ent; #[cfg(doc)] use crate::proto::{BatchStatus, Progress, ProgressUpdate}; -use super::{single, Info, Push, QueueAction, QueueControl, Reconnect}; +use super::{single, Info, Push, QueueAction, QueueControl}; use super::{utils, PushBulk}; use crate::error::{self, Error}; -use crate::{Job, WorkerId}; +use crate::{Job, Reconnect, WorkerId}; use std::collections::HashMap; -use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite, BufStream}; +use tokio::io::{AsyncBufRead, AsyncWrite, BufStream}; use tokio::net::TcpStream as TokioStream; mod options; pub(crate) use options::ClientOptions; +mod conn; +pub(crate) use conn::BoxedConnection; +pub use conn::Connection; + pub(crate) const EXPECTED_PROTOCOL_VERSION: usize = 2; fn check_protocols_match(ver: usize) -> Result<(), Error> { @@ -120,14 +124,14 @@ fn check_protocols_match(ver: usize) -> Result<(), Error> { /// /// ```no_run /// # tokio_test::block_on(async { -/// use faktory::{Client, JobId, ent::ProgressUpdateBuilder}; +/// use faktory::{Client, JobId, ent::ProgressUpdate}; /// let jid = JobId::new("W8qyVle9vXzUWQOf"); /// let mut cl = Client::connect(None).await?; -/// let progress = ProgressUpdateBuilder::new(jid) +/// let progress = ProgressUpdate::builder(jid) /// .desc("Almost done...".to_owned()) /// .percent(99) /// .build(); -/// cl.set_progress(progress).await?; +/// cl.set_progress(&progress).await?; /// # Ok::<(), faktory::Error>(()) /// }); ///```` @@ -145,15 +149,11 @@ fn check_protocols_match(ver: usize) -> Result<(), Error> { /// # Ok::<(), faktory::Error>(()) /// }); /// ``` -pub struct Client { - stream: S, +pub struct Client { + stream: BoxedConnection, opts: ClientOptions, } - -impl Client -where - S: AsyncBufRead + AsyncWrite + Unpin + Send + Reconnect, -{ +impl Client { pub(crate) async fn connect_again(&mut self) -> Result { let s = self.stream.reconnect().await?; Client::new(s, self.opts.clone()).await @@ -165,10 +165,7 @@ where } } -impl Drop for Client -where - S: AsyncWrite + Unpin + Send, -{ +impl Drop for Client { fn drop(&mut self) { tokio::task::block_in_place(|| { tokio::runtime::Handle::current().block_on(async { @@ -186,22 +183,24 @@ pub(crate) enum HeartbeatStatus { Quiet, } -impl Client> { +impl Client { /// Create new [`Client`] and connect to a Faktory server with a non-standard stream. - pub async fn connect_with( - stream: S, - pwd: Option, - ) -> Result>, Error> { - let buffered = BufStream::new(stream); + /// + /// In case you've got a `stream` that doesn't already implement `AsyncBufRead`, you will + /// want to wrap it in `tokio::io::BufStream`. + pub async fn connect_with(stream: S, pwd: Option) -> Result + where + S: AsyncBufRead + AsyncWrite + Reconnect + Send + Sync + Unpin + 'static, + { let opts = ClientOptions { password: pwd, ..Default::default() }; - Client::new(buffered, opts).await + Client::new(Box::new(stream), opts).await } } -impl Client> { +impl Client { /// Create new [`Client`] and connect to a Faktory server. /// /// If `url` is not given, will use the standard Faktory environment variables. Specifically, @@ -213,17 +212,15 @@ impl Client> { /// ```text /// tcp://localhost:7419 /// ``` - pub async fn connect(url: Option<&str>) -> Result>, Error> { + pub async fn connect(url: Option<&str>) -> Result { let url = utils::parse_provided_or_from_env(url)?; let stream = TokioStream::connect(utils::host_from_url(&url)).await?; - Self::connect_with(stream, url.password().map(|p| p.to_string())).await + let buffered_stream = BufStream::new(stream); + Self::connect_with(buffered_stream, url.password().map(|p| p.to_string())).await } } -impl Client -where - S: AsyncBufRead + AsyncWrite + Unpin + Send, -{ +impl Client { async fn init(&mut self) -> Result<(), Error> { let hi = single::read_hi(&mut self.stream).await?; check_protocols_match(hi.version)?; @@ -264,7 +261,7 @@ where Ok(()) } - pub(crate) async fn new(stream: S, opts: ClientOptions) -> Result, Error> { + pub(crate) async fn new(stream: BoxedConnection, opts: ClientOptions) -> Result { let mut c = Client { stream, opts }; c.init().await?; Ok(c) @@ -273,7 +270,7 @@ where pub(crate) async fn issue( &mut self, c: &FC, - ) -> Result, Error> { + ) -> Result, Error> { single::write_command(&mut self.stream, c).await?; Ok(ReadToken(self)) } @@ -328,10 +325,7 @@ where } } -impl Client -where - S: AsyncBufRead + AsyncWrite + Unpin + Send, -{ +impl Client { /// Enqueue the given job on the Faktory server. /// /// Returns `Ok` if the job was successfully queued by the Faktory server. @@ -435,11 +429,9 @@ where } } -pub struct ReadToken<'a, S>(pub(crate) &'a mut Client) -where - S: AsyncWrite + Unpin + Send; +pub struct ReadToken<'a>(pub(crate) &'a mut Client); -impl<'a, S: AsyncBufRead + AsyncWrite + Unpin + Send> ReadToken<'a, S> { +impl ReadToken<'_> { pub(crate) async fn read_ok(self) -> Result<(), Error> { single::read_ok(&mut self.0.stream).await } diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 63f479cd..a0304ccb 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -4,8 +4,10 @@ use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::TcpStream as TokioStream; mod client; -pub use client::Client; -pub(crate) use client::{ClientOptions, HeartbeatStatus, EXPECTED_PROTOCOL_VERSION}; +pub(crate) use client::{ + BoxedConnection, ClientOptions, HeartbeatStatus, EXPECTED_PROTOCOL_VERSION, +}; +pub use client::{Client, Connection}; mod single; @@ -16,7 +18,10 @@ pub(crate) use single::{Ack, Fail, Info, Push, PushBulk, QueueAction, QueueContr pub(crate) mod utils; #[cfg(feature = "ent")] -pub use self::single::ent::{JobState, Progress, ProgressUpdate, ProgressUpdateBuilder, Track}; +pub use self::single::ent::{JobState, Progress, ProgressUpdate, ProgressUpdateBuilder}; + +#[cfg(feature = "ent")] +pub(crate) use self::single::ent::FetchProgress; #[cfg(feature = "ent")] pub use self::single::BatchId; @@ -28,16 +33,27 @@ pub use batch::{Batch, BatchBuilder, BatchHandle, BatchStatus, CallbackState}; /// A stream that can be re-established after failing. #[async_trait::async_trait] -pub trait Reconnect: Sized { +pub trait Reconnect { /// Re-establish the stream. - async fn reconnect(&mut self) -> io::Result; + async fn reconnect(&mut self) -> io::Result; +} + +#[async_trait::async_trait] +impl Reconnect for Box +where + S: Reconnect + Send, +{ + async fn reconnect(&mut self) -> io::Result { + (**self).reconnect().await + } } #[async_trait::async_trait] impl Reconnect for TokioStream { - async fn reconnect(&mut self) -> io::Result { + async fn reconnect(&mut self) -> io::Result { let addr = &self.peer_addr().expect("socket address"); - TokioStream::connect(addr).await + let stream = TokioStream::connect(addr).await?; + Ok(Box::new(BufStream::new(stream))) } } @@ -46,8 +62,7 @@ impl Reconnect for BufStream where S: AsyncRead + AsyncWrite + Reconnect + Send + Sync, { - async fn reconnect(&mut self) -> io::Result { - let stream = self.get_mut().reconnect().await?; - Ok(Self::new(stream)) + async fn reconnect(&mut self) -> io::Result { + self.get_mut().reconnect().await } } diff --git a/src/proto/single/ent/cmd.rs b/src/proto/single/ent/cmd.rs index 43881e44..8cfcfe3b 100644 --- a/src/proto/single/ent/cmd.rs +++ b/src/proto/single/ent/cmd.rs @@ -3,27 +3,33 @@ use crate::error::Error; use crate::proto::{single::FaktoryCommand, JobId}; use tokio::io::{AsyncWrite, AsyncWriteExt}; +#[async_trait::async_trait] +impl FaktoryCommand for ProgressUpdate { + async fn issue(&self, w: &mut W) -> Result<(), Error> { + w.write_all(b"TRACK SET ").await?; + let r = serde_json::to_vec(self).map_err(Error::Serialization)?; + w.write_all(&r).await?; + Ok(w.write_all(b"\r\n").await?) + } +} + #[derive(Debug, Clone)] -pub enum Track { - Set(ProgressUpdate), - Get(JobId), +pub(crate) struct FetchProgress(J); + +impl FetchProgress { + pub fn new(j: J) -> Self { + Self(j) + } } #[async_trait::async_trait] -impl FaktoryCommand for Track { +impl FaktoryCommand for FetchProgress +where + J: AsRef + Sync, +{ async fn issue(&self, w: &mut W) -> Result<(), Error> { - match self { - Self::Set(upd) => { - w.write_all(b"TRACK SET ").await?; - let r = serde_json::to_vec(upd).map_err(Error::Serialization)?; - w.write_all(&r).await?; - Ok(w.write_all(b"\r\n").await?) - } - Self::Get(jid) => { - w.write_all(b"TRACK GET ").await?; - w.write_all(jid.as_bytes()).await?; - Ok(w.write_all(b"\r\n").await?) - } - } + w.write_all(b"TRACK GET ").await?; + w.write_all(self.0.as_ref().as_bytes()).await?; + Ok(w.write_all(b"\r\n").await?) } } diff --git a/src/proto/single/ent/mod.rs b/src/proto/single/ent/mod.rs index e6f4f63e..2a017c49 100644 --- a/src/proto/single/ent/mod.rs +++ b/src/proto/single/ent/mod.rs @@ -5,7 +5,7 @@ mod cmd; mod progress; mod utils; -pub use cmd::Track; +pub(crate) use cmd::FetchProgress; pub use progress::{JobState, Progress, ProgressUpdate, ProgressUpdateBuilder}; impl JobBuilder { diff --git a/src/proto/single/ent/progress.rs b/src/proto/single/ent/progress.rs index be227d9b..20f36960 100644 --- a/src/proto/single/ent/progress.rs +++ b/src/proto/single/ent/progress.rs @@ -38,6 +38,12 @@ pub struct ProgressUpdate { pub reserve_until: Option>, } +impl AsRef for ProgressUpdate { + fn as_ref(&self) -> &Self { + self + } +} + impl ProgressUpdate { /// Create an instance of `ProgressUpdate` for the job with this ID specifying its completion percentage. pub fn set(jid: JobId, percent: u8) -> ProgressUpdate { diff --git a/src/tls/native_tls.rs b/src/tls/native_tls.rs index ae5c903b..45cc0bf1 100644 --- a/src/tls/native_tls.rs +++ b/src/tls/native_tls.rs @@ -2,11 +2,11 @@ use crate::{Client, WorkerBuilder}; use crate::error::{self, Error}; -use crate::proto::utils; +use crate::proto::{self, utils}; use crate::Reconnect; use std::io; use std::ops::{Deref, DerefMut}; -use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::io::{AsyncRead, AsyncWrite, BufStream}; use tokio::net::TcpStream as TokioTcpStream; use tokio_native_tls::TlsStream as NativeTlsStream; use tokio_native_tls::{native_tls::TlsConnector, TlsConnector as AsyncTlsConnector}; @@ -22,8 +22,11 @@ use tokio_native_tls::{native_tls::TlsConnector, TlsConnector as AsyncTlsConnect /// # tokio_test::block_on(async { /// use faktory::Client; /// use faktory::native_tls::TlsStream; -/// let tls = TlsStream::connect(None).await.unwrap(); -/// let cl = Client::connect_with(tls, None).await.unwrap(); +/// use tokio::io::BufStream; +/// +/// let stream = TlsStream::connect(None).await.unwrap(); +/// let buffered = BufStream::new(stream); +/// let cl = Client::connect_with(buffered, None).await.unwrap(); /// # drop(cl); /// # }); /// ``` @@ -111,19 +114,46 @@ where } #[async_trait::async_trait] -impl Reconnect for TlsStream -where - S: AsyncRead + AsyncWrite + Send + Unpin + Reconnect, -{ - async fn reconnect(&mut self) -> io::Result { +impl Reconnect for BufStream> { + async fn reconnect(&mut self) -> io::Result { + let stream = self + .get_mut() + .stream + .get_mut() + .get_mut() + .get_mut() + .reconnect() + .await?; + let res = TlsStream::new( + stream, + self.get_ref().connector.clone(), + self.get_ref().hostname.clone(), + ) + .await?; + let buffered = BufStream::new(res); + Ok(Box::new(buffered)) + } +} + +#[async_trait::async_trait] +impl Reconnect for BufStream> { + async fn reconnect(&mut self) -> io::Result { let stream = self + .get_mut() .stream .get_mut() .get_mut() .get_mut() .reconnect() .await?; - Self::new(stream, self.connector.clone(), self.hostname.clone()).await + let res = TlsStream::new( + stream, + self.get_ref().connector.clone(), + self.get_ref().hostname.clone(), + ) + .await?; + let buffered = BufStream::new(res); + Ok(Box::new(buffered)) } } diff --git a/src/tls/rustls.rs b/src/tls/rustls.rs index a8a59cb8..ac596a64 100644 --- a/src/tls/rustls.rs +++ b/src/tls/rustls.rs @@ -1,11 +1,12 @@ #[cfg(doc)] use crate::{Client, WorkerBuilder}; -use crate::{proto::utils, Error, Reconnect}; +use crate::proto::{self, utils}; +use crate::{Error, Reconnect}; use std::io; use std::ops::{Deref, DerefMut}; use std::sync::Arc; -use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::io::{AsyncRead, AsyncWrite, BufStream}; use tokio::net::TcpStream as TokioTcpStream; use tokio_rustls::client::TlsStream as RustlsStream; use tokio_rustls::rustls::{ClientConfig, RootCertStore}; @@ -22,8 +23,11 @@ use tokio_rustls::TlsConnector; /// # tokio_test::block_on(async { /// use faktory::Client; /// use faktory::rustls::TlsStream; -/// let tls = TlsStream::connect(None).await.unwrap(); -/// let cl = Client::connect_with(tls, None).await.unwrap(); +/// use tokio::io::BufStream; +/// +/// let stream = TlsStream::connect(None).await.unwrap(); +/// let buffered = BufStream::new(stream); +/// let cl = Client::connect_with(buffered, None).await.unwrap(); /// # drop(cl); /// # }); /// ``` @@ -51,8 +55,8 @@ impl TlsStream { /// /// If `url` is given, but does not specify a port, it defaults to 7419. /// - /// Internally creates a `ClientConfig` with an empty root certificates store and no client - /// authentication. Use [`with_client_config`](TlsStream::with_client_config) + /// Internally creates a `ClientConfig` with an _empty_ root certificates store and _no client + /// authentication_. Use [`with_client_config`](TlsStream::with_client_config) /// or [`with_connector`](TlsStream::with_connector) for customized /// `ClientConfig` and `TlsConnector` accordingly. pub async fn connect(url: Option<&str>) -> Result { @@ -63,6 +67,21 @@ impl TlsStream { TlsStream::with_connector(con, url).await } + /// Create a new TLS connection over TCP using native certificates. + /// + /// Unlike [`TlsStream::connect`], creates a root certificates store populated + /// with the certificates loaded from a platform-native certificate store. + pub async fn connect_with_native_certs(url: Option<&str>) -> Result { + let mut store = RootCertStore::empty(); + for cert in rustls_native_certs::load_native_certs()? { + store.add(cert).map_err(io::Error::other)?; + } + let config = ClientConfig::builder() + .with_root_certificates(store) + .with_no_client_auth(); + TlsStream::with_connector(TlsConnector::from(Arc::new(config)), url).await + } + /// Create a new TLS connection over TCP using a non-default TLS configuration. /// /// See `connect` for details about the `url` parameter. @@ -93,7 +112,9 @@ where /// Create a new TLS connection on an existing stream. /// /// Internally creates a `ClientConfig` with an empty root certificates store and no client - /// authentication. Use [`new`](TlsStream::new) for a customized `TlsConnector`. + /// authentication. + /// + /// Use [`new`](TlsStream::new) for a customized `TlsConnector`. pub async fn default(stream: S, hostname: String) -> io::Result { let conf = ClientConfig::builder() .with_root_certificates(RootCertStore::empty()) @@ -121,13 +142,32 @@ where } #[async_trait::async_trait] -impl Reconnect for TlsStream -where - S: AsyncRead + AsyncWrite + Send + Unpin + Reconnect, -{ - async fn reconnect(&mut self) -> io::Result { - let stream = self.stream.get_mut().0.reconnect().await?; - TlsStream::new(stream, self.connector.clone(), self.hostname.clone()).await +impl Reconnect for BufStream> { + async fn reconnect(&mut self) -> io::Result { + let stream = self.get_mut().stream.get_mut().0.reconnect().await?; + let tls_stream = TlsStream::new( + stream, + self.get_ref().connector.clone(), + self.get_ref().hostname.clone(), + ) + .await?; + let buffered = BufStream::new(tls_stream); + Ok(Box::new(buffered)) + } +} + +#[async_trait::async_trait] +impl Reconnect for BufStream> { + async fn reconnect(&mut self) -> io::Result { + let stream = self.get_mut().stream.get_mut().0.reconnect().await?; + let tls_stream = TlsStream::new( + stream, + self.get_ref().connector.clone(), + self.get_ref().hostname.clone(), + ) + .await?; + let buffered = BufStream::new(tls_stream); + Ok(Box::new(buffered)) } } diff --git a/src/worker/builder.rs b/src/worker/builder.rs index ea445306..7ff18767 100644 --- a/src/worker/builder.rs +++ b/src/worker/builder.rs @@ -1,14 +1,26 @@ use super::{runner::Closure, CallbacksRegistry, Client, ShutdownSignal, Worker}; use crate::{ proto::{utils, ClientOptions}, - Error, Job, JobRunner, WorkerId, + Error, Job, JobRunner, Reconnect, WorkerId, }; use std::future::Future; -use std::sync::Arc; use std::time::Duration; -use tokio::io::{AsyncRead, AsyncWrite, BufStream}; +use tokio::io::{AsyncBufRead, AsyncWrite, BufStream}; use tokio::net::TcpStream as TokioStream; +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) enum TlsKind { + None, + + #[cfg(feature = "native_tls")] + #[cfg_attr(docsrs, doc(cfg(feature = "native_tls")))] + Native, + + #[cfg(feature = "rustls")] + #[cfg_attr(docsrs, doc(cfg(feature = "rustls")))] + Rust, +} + /// Convenience wrapper for building a Faktory worker. /// /// See the [`Worker`] documentation for details. @@ -18,6 +30,7 @@ pub struct WorkerBuilder { callbacks: CallbacksRegistry, shutdown_timeout: Option, shutdown_signal: Option, + tls_kind: TlsKind, } impl Default for WorkerBuilder { @@ -38,6 +51,7 @@ impl Default for WorkerBuilder { callbacks: CallbacksRegistry::default(), shutdown_timeout: None, shutdown_signal: None, + tls_kind: TlsKind::None, } } } @@ -210,7 +224,7 @@ impl WorkerBuilder { H: Fn(Job) -> Result<(), E> + Send + Sync + 'static, { self.callbacks - .insert(kind.into(), super::Callback::Sync(Arc::new(handler))); + .insert(kind.into(), super::Callback::Sync(Box::new(handler))); self } @@ -231,24 +245,62 @@ impl WorkerBuilder { self } + /// Make the traffic between this worker and Faktory encrypted with native TLS. + /// + /// The underlying crate (`native-tls`) will use _SChannel_ on Windows, + /// _SecureTransport_ on OSX, and _OpenSSL_ on other platforms. + /// + /// Internally, will use [`TlsStream::connect`](crate::native_tls::TlsStream::connect) to establish + /// a TLS stream to the Faktory server. + /// + /// Note that if you use this method on the builder, but eventually use [`WorkerBuilder::connect_with`] + /// (rather than [`WorkerBuilder::connect`]) to create an instance of [`Worker`], this worker + /// will be connected to the Faktory server with the stream you've provided to `connect_with`. + #[cfg(feature = "native_tls")] + #[cfg_attr(docsrs, doc(cfg(feature = "native_tls")))] + pub fn with_native_tls(mut self) -> Self { + self.tls_kind = TlsKind::Native; + self + } + + /// Make the traffic between this worker and Faktory encrypted with [`rustls`](https://github.com/rustls/rustls). + /// + /// Internally, will use [`TlsStream::connect_with_native_certs`](crate::rustls::TlsStream::connect_with_native_certs) + /// to establish a TLS stream to the Faktory server. + /// + /// Note that if you use this method on the builder, but eventually use [`WorkerBuilder::connect_with`] + /// (rather than [`WorkerBuilder::connect`]) to create an instance of [`Worker`], this worker + /// will be connected to the Faktory server with the stream you've provided to `connect_with`. + #[cfg(feature = "rustls")] + #[cfg_attr(docsrs, doc(cfg(feature = "rustls")))] + pub fn with_rustls(mut self) -> Self { + self.tls_kind = TlsKind::Rust; + self + } + /// Connect to a Faktory server with a non-standard stream. - pub async fn connect_with( + /// + /// In case you've got a `stream` that doesn't already implement `AsyncBufRead`, you will + /// want to wrap it in `tokio::io::BufStream`. + pub async fn connect_with( mut self, stream: S, pwd: Option, - ) -> Result, E>, Error> { + ) -> Result, Error> + where + S: AsyncBufRead + AsyncWrite + Reconnect + Send + Sync + Unpin + 'static, + { self.opts.password = pwd; self.opts.is_worker = true; - let buffered = BufStream::new(stream); - let client = Client::new(buffered, self.opts).await?; - Ok(Worker::new( + let client = Client::new(Box::new(stream), self.opts).await?; + let worker = Worker::new( client, self.workers_count, self.callbacks, self.shutdown_timeout, self.shutdown_signal, - ) - .await) + ); + Ok(worker) } /// Connect to a Faktory server. @@ -264,13 +316,28 @@ impl WorkerBuilder { /// ``` /// /// If `url` is given, but does not specify a port, it defaults to 7419. - pub async fn connect( - self, - url: Option<&str>, - ) -> Result, E>, Error> { - let url = utils::parse_provided_or_from_env(url)?; - let stream = TokioStream::connect(utils::host_from_url(&url)).await?; - self.connect_with(stream, url.password().map(|p| p.to_string())) - .await + pub async fn connect(self, url: Option<&str>) -> Result, Error> { + let parsed_url = utils::parse_provided_or_from_env(url)?; + let password = parsed_url.password().map(|p| p.to_string()); + match self.tls_kind { + TlsKind::None => { + let addr = utils::host_from_url(&parsed_url); + let stream = TokioStream::connect(addr).await?; + let buffered = BufStream::new(stream); + self.connect_with(buffered, password).await + } + #[cfg(feature = "rustls")] + TlsKind::Rust => { + let stream = crate::rustls::TlsStream::connect_with_native_certs(url).await?; + let buffered = BufStream::new(stream); + self.connect_with(buffered, password).await + } + #[cfg(feature = "native_tls")] + TlsKind::Native => { + let stream = crate::native_tls::TlsStream::connect(url).await?; + let buffered = BufStream::new(stream); + self.connect_with(buffered, password).await + } + } } } diff --git a/src/worker/health.rs b/src/worker/health.rs index bdb446df..e7984258 100644 --- a/src/worker/health.rs +++ b/src/worker/health.rs @@ -5,15 +5,13 @@ use std::{ sync::{atomic, Arc}, time::{self, Duration}, }; -use tokio::io::{AsyncBufRead, AsyncWrite}; use tokio::time::sleep as tokio_sleep; const CHECK_STATE_INTERVAL: Duration = Duration::from_millis(100); const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); -impl Worker +impl Worker where - S: AsyncBufRead + AsyncWrite + Send + Unpin, E: StdError, { /// Send beats to Fakotry and quiet/terminate workers if signalled so. diff --git a/src/worker/mod.rs b/src/worker/mod.rs index 80efe0c4..0f734792 100644 --- a/src/worker/mod.rs +++ b/src/worker/mod.rs @@ -1,4 +1,4 @@ -use super::proto::{Client, Reconnect}; +use super::proto::Client; use crate::error::Error; use crate::proto::{Ack, Fail, Job}; use fnv::FnvHashMap; @@ -8,9 +8,7 @@ use std::process; use std::sync::{atomic, Arc}; use std::time::Duration; use std::{error::Error as StdError, sync::atomic::AtomicUsize}; -use tokio::io::{AsyncBufRead, AsyncWrite}; -use tokio::net::TcpStream; -use tokio::task::{spawn_blocking, AbortHandle, JoinError, JoinSet}; +use tokio::task::{spawn, spawn_blocking, AbortHandle, JoinError, JoinSet}; use tokio::time::sleep as tokio_sleep; mod builder; @@ -31,7 +29,7 @@ type ShutdownSignal = Pin + 'static + Send>>; pub(crate) enum Callback { Async(runner::BoxedJobRunner), - Sync(Arc Result<(), E> + Sync + Send + 'static>), + Sync(Box Result<(), E> + Sync + Send + 'static>), } type CallbacksRegistry = FnvHashMap>; @@ -131,8 +129,15 @@ type CallbacksRegistry = FnvHashMap>; /// .await /// .unwrap(); /// -/// if let Err(e) = w.run(&["default"]).await { -/// println!("worker failed: {}", e); +/// match w.run(&["default"]).await { +/// Err(e) => println!("worker failed: {}", e), +/// Ok(stop_details) => { +/// println!( +/// "Stop reason: {}, number of workers that were running: {}", +/// stop_details.reason, +/// stop_details.workers_still_running +/// ); +/// } /// } /// # }); /// ``` @@ -157,8 +162,8 @@ type CallbacksRegistry = FnvHashMap>; /// You can also register anything that implements [`JobRunner`] to handle jobs /// with [`register`](WorkerBuilder::register). /// -pub struct Worker { - c: Client, +pub struct Worker { + c: Client, worker_states: Arc, callbacks: Arc>, terminated: bool, @@ -170,7 +175,7 @@ pub struct Worker { shutdown_signal: Option, } -impl Worker { +impl Worker<()> { /// Creates an ergonomic constructor for a new [`Worker`]. /// /// Also equivalent to [`WorkerBuilder::default`]. @@ -179,15 +184,15 @@ impl Worker { } } -impl Worker { +impl Worker { async fn reconnect(&mut self) -> Result<(), Error> { self.c.reconnect().await } } -impl Worker { - async fn new( - c: Client, +impl Worker { + fn new( + c: Client, workers_count: usize, callbacks: CallbacksRegistry, shutdown_timeout: Option, @@ -213,21 +218,41 @@ enum Failed { BadJobType(String), } -impl Worker { +impl Worker { async fn run_job(&mut self, job: Job) -> Result<(), Failed> { let handler = self .callbacks - .get(job.kind()) + .get(&job.kind) .ok_or(Failed::BadJobType(job.kind().to_string()))?; - match handler { - Callback::Async(cb) => cb.run(job).await.map_err(Failed::Application), - Callback::Sync(cb) => { - let cb = Arc::clone(cb); - match spawn_blocking(move || cb(job)).await { - Err(join_error) => Err(Failed::HandlerPanic(join_error)), - Ok(processing_result) => processing_result.map_err(Failed::Application), - } + let spawning_result = match handler { + Callback::Async(_) => { + let callbacks = self.callbacks.clone(); + let processing_task = async move { + let callback = callbacks.get(&job.kind).unwrap(); + if let Callback::Async(cb) = callback { + cb.run(job).await + } else { + unreachable!() + } + }; + spawn(processing_task).await } + Callback::Sync(_) => { + let callbacks = self.callbacks.clone(); + let processing_task = move || { + let callback = callbacks.get(&job.kind).unwrap(); + if let Callback::Sync(cb) = callback { + cb(job) + } else { + unreachable!() + } + }; + spawn_blocking(processing_task).await + } + }; + match spawning_result { + Err(join_error) => Err(Failed::HandlerPanic(join_error)), + Ok(processing_result) => processing_result.map_err(Failed::Application), } } @@ -338,11 +363,7 @@ impl } } -impl< - S: AsyncBufRead + AsyncWrite + Reconnect + Send + Unpin + 'static, - E: StdError + 'static + Send, - > Worker -{ +impl Worker { async fn for_worker(&mut self) -> Result { Ok(Worker { // We actually only need: diff --git a/src/worker/stop.rs b/src/worker/stop.rs index 228a214f..ce84770a 100644 --- a/src/worker/stop.rs +++ b/src/worker/stop.rs @@ -1,3 +1,5 @@ +use std::fmt::{Debug, Display}; + #[cfg(doc)] use super::{Worker, WorkerBuilder}; @@ -21,6 +23,12 @@ pub enum StopReason { ServerInstruction, } +impl Display for StopReason { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + Debug::fmt(&self, f) + } +} + #[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)] /// Holds some details aroung a worker's run stoppage, such as the reason why this worker discontinued /// and the number of workers that might still be processing jobs at that instant. diff --git a/tests/consumer.rs b/tests/consumer.rs index d1824962..2d9037bf 100644 --- a/tests/consumer.rs +++ b/tests/consumer.rs @@ -58,13 +58,14 @@ mod mock; use faktory::*; use std::{io, sync::Arc, time::Duration}; +use tokio::io::BufStream; use tokio::{spawn, sync::Mutex, time::sleep}; use tokio_util::sync::CancellationToken; #[tokio::test(flavor = "multi_thread")] async fn hello() { let mut s = mock::Stream::default(); - let w: Worker<_, io::Error> = WorkerBuilder::default() + let w: Worker = WorkerBuilder::default() .hostname("host".to_string()) .wid(WorkerId::new("wid")) .labels([ @@ -76,7 +77,7 @@ async fn hello() { .add_to_labels(["will".to_string()]) .add_to_labels(["be".to_string(), "added".to_string()]) .register_fn("never_called", |_j: Job| async move { unreachable!() }) - .connect_with(s.clone(), None) + .connect_with(BufStream::new(s.clone()), None) .await .unwrap(); let written = s.pop_bytes_written(0); @@ -101,9 +102,9 @@ async fn hello() { #[tokio::test(flavor = "multi_thread")] async fn hello_pwd() { let mut s = mock::Stream::with_salt(1545, "55104dc76695721d"); - let w: Worker<_, io::Error> = WorkerBuilder::default() + let w: Worker = WorkerBuilder::default() .register_fn("never_called", |_j: Job| async move { unreachable!() }) - .connect_with(s.clone(), Some("foobar".to_string())) + .connect_with(BufStream::new(s.clone()), Some("foobar".to_string())) .await .unwrap(); let written = s.pop_bytes_written(0); @@ -125,7 +126,7 @@ async fn dequeue() { assert_eq!(job.args(), &["z"]); Ok::<(), io::Error>(()) }) - .connect_with(s.clone(), None) + .connect_with(BufStream::new(s.clone()), None) .await .unwrap(); s.ignore(0); @@ -166,7 +167,7 @@ async fn dequeue_first_empty() { assert_eq!(job.args(), &["z"]); Ok::<(), io::Error>(()) }) - .connect_with(s.clone(), None) + .connect_with(BufStream::new(s.clone()), None) .await .unwrap(); s.ignore(0); @@ -225,7 +226,7 @@ async fn well_behaved() { sleep(Duration::from_secs(7)).await; Ok::<(), io::Error>(()) }) - .connect_with(s.clone(), None) + .connect_with(BufStream::new(s.clone()), None) .await .unwrap(); s.ignore(0); @@ -294,7 +295,7 @@ async fn no_first_job() { sleep(Duration::from_secs(7)).await; Ok::<(), io::Error>(()) }) - .connect_with(s.clone(), None) + .connect_with(BufStream::new(s.clone()), None) .await .unwrap(); s.ignore(0); @@ -365,7 +366,7 @@ async fn well_behaved_many() { sleep(Duration::from_secs(7)).await; Ok::<(), io::Error>(()) }) - .connect_with(s.clone(), None) + .connect_with(BufStream::new(s.clone()), None) .await .unwrap(); s.ignore(0); @@ -442,7 +443,7 @@ async fn terminate() { let mut s = mock::Stream::new(2); // main plus worker // prepare a worker with only never (!) returning handler - let mut w: Worker<_, io::Error> = WorkerBuilder::default() + let mut w: Worker = WorkerBuilder::default() .hostname("machine".into()) .wid(WorkerId::new("wid")) .register_fn("foobar", |_| async move { @@ -450,7 +451,7 @@ async fn terminate() { sleep(Duration::from_secs(5)).await; } }) - .connect_with(s.clone(), None) + .connect_with(BufStream::new(s.clone()), None) .await .unwrap(); @@ -557,7 +558,7 @@ async fn heart_broken() { let signal = async move { child_token.cancelled().await }; // prepare a worker without any handlers - let w: Worker<_, io::Error> = Worker::builder() + let w: Worker = Worker::builder() .with_graceful_shutdown(signal) .shutdown_timeout(Duration::from_millis(500)) .register_fn("foobar", |_j| async move { @@ -568,7 +569,7 @@ async fn heart_broken() { sleep(Duration::from_secs(7)).await; Ok(()) }) - .connect_with(s.clone(), None) + .connect_with(BufStream::new(s.clone()), None) .await .unwrap(); diff --git a/tests/mock/mod.rs b/tests/mock/mod.rs index e1a940d4..547dec94 100644 --- a/tests/mock/mod.rs +++ b/tests/mock/mod.rs @@ -4,7 +4,7 @@ use std::{ pin::Pin, sync::{Arc, Mutex}, }; -use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::io::{AsyncRead, AsyncWrite, BufStream}; mod inner; @@ -23,18 +23,18 @@ impl Default for Stream { #[async_trait::async_trait] impl Reconnect for Stream { - async fn reconnect(&mut self) -> Result { + async fn reconnect(&mut self) -> io::Result> { let mine = self .all .lock() .unwrap() .take_stream() .expect("tried to make a new stream, but no more connections expected"); - Ok(Stream { + Ok(Box::new(BufStream::new(Stream { mine, all: Arc::clone(&self.all), check_count: self.check_count, - }) + }))) } } diff --git a/tests/producer.rs b/tests/producer.rs index ee50d815..e7308843 100644 --- a/tests/producer.rs +++ b/tests/producer.rs @@ -1,12 +1,13 @@ mod mock; use faktory::*; +use tokio::io::BufStream; #[tokio::test(flavor = "multi_thread")] async fn hello() { let mut s = mock::Stream::default(); - - let p = Client::connect_with(s.clone(), None).await.unwrap(); + let buffered = BufStream::new(s.clone()); + let p = Client::connect_with(buffered, None).await.unwrap(); let written = s.pop_bytes_written(0); assert!(written.starts_with(b"HELLO {")); let written: serde_json::Value = serde_json::from_slice(&written[b"HELLO ".len()..]).unwrap(); @@ -25,8 +26,8 @@ async fn hello() { #[tokio::test(flavor = "multi_thread")] async fn hello_pwd() { let mut s = mock::Stream::with_salt(1545, "55104dc76695721d"); - - let c = Client::connect_with(s.clone(), Some("foobar".to_string())) + let buffered = BufStream::new(s.clone()); + let c = Client::connect_with(buffered, Some("foobar".to_string())) .await .unwrap(); let written = s.pop_bytes_written(0); @@ -44,7 +45,8 @@ async fn hello_pwd() { #[tokio::test(flavor = "multi_thread")] async fn enqueue() { let mut s = mock::Stream::default(); - let mut p = Client::connect_with(s.clone(), None).await.unwrap(); + let buffered = BufStream::new(s.clone()); + let mut p = Client::connect_with(buffered, None).await.unwrap(); s.ignore(0); s.ok(0); @@ -85,7 +87,8 @@ async fn enqueue() { #[tokio::test(flavor = "multi_thread")] async fn queue_control() { let mut s = mock::Stream::default(); - let mut p = Client::connect_with(s.clone(), None).await.unwrap(); + let buffered = BufStream::new(s.clone()); + let mut p = Client::connect_with(buffered, None).await.unwrap(); s.ignore(0); s.ok(0); diff --git a/tests/real/community.rs b/tests/real/community.rs index e1d14457..5eb25063 100644 --- a/tests/real/community.rs +++ b/tests/real/community.rs @@ -729,22 +729,32 @@ async fn test_panic_in_handler() { let local = "test_panic_in_handler"; let mut w = Worker::builder::() - .register_blocking_fn("panic", |_j| { - panic!("Panic inside the handler..."); + .register_blocking_fn("panic_SYNC_handler", |_j| { + panic!("Panic inside sync the handler..."); + }) + .register_fn("panic_ASYNC_handler", |_j| async move { + panic!("Panic inside async handler..."); }) .connect(None) .await .unwrap(); - Client::connect(None) - .await - .unwrap() - .enqueue(Job::builder("panic").queue(local).build()) + let mut c = Client::connect(None).await.unwrap(); + + c.enqueue(Job::builder("panic_SYNC_handler").queue(local).build()) .await .unwrap(); // we _did_ consume and process the job, the processing result itself though // was a failure; however, a panic in the handler was "intercepted" and communicated - // to the Faktory server via the FAIL command + // to the Faktory server via the FAIL command; + // note how the test run is not interrupted with a panic + assert!(w.run_one(0, &[local]).await.unwrap()); + + c.enqueue(Job::builder("panic_ASYNC_handler").queue(local).build()) + .await + .unwrap(); + + // same for async handler, note how the test run is not interrupted with a panic assert!(w.run_one(0, &[local]).await.unwrap()); } diff --git a/tests/real/enterprise.rs b/tests/real/enterprise.rs index 94fe13b8..1aaf2995 100644 --- a/tests/real/enterprise.rs +++ b/tests/real/enterprise.rs @@ -456,7 +456,7 @@ async fn test_tracker_can_send_and_retrieve_job_execution_progress() { .is_ok()); // Let's update the progress once again, to check the 'set_progress' shortcut: assert!(t - .set_progress(ProgressUpdate::set(job_id.clone(), 33)) + .set_progress(&ProgressUpdate::set(job_id.clone(), 33)) .await .is_ok()); @@ -465,7 +465,7 @@ async fn test_tracker_can_send_and_retrieve_job_execution_progress() { // ... and read the progress info let result = t - .get_progress(job_id.clone()) + .get_progress(&job_id) .await .expect("Retrieved progress update over the wire"); @@ -490,7 +490,7 @@ async fn test_tracker_can_send_and_retrieve_job_execution_progress() { let progress = t .lock() .expect("lock acquired successfully") - .get_progress(job_id.clone()) + .get_progress(&job_id) .await .expect("Retrieved progress update over the wire once again") .expect("Some progress"); @@ -510,12 +510,12 @@ async fn test_tracker_can_send_and_retrieve_job_execution_progress() { .desc("Final stage.".to_string()) .percent(99) .build(); - assert!(t.lock().unwrap().set_progress(upd).await.is_ok()); + assert!(t.lock().unwrap().set_progress(&upd).await.is_ok()); let progress = t .lock() .unwrap() - .get_progress(job_id) + .get_progress(&job_id) .await .expect("Retrieved progress update over the wire once again") .expect("Some progress"); @@ -523,7 +523,7 @@ async fn test_tracker_can_send_and_retrieve_job_execution_progress() { if progress.percent != Some(100) { let upd = progress.update_percent(100); assert_eq!(upd.desc, progress.desc); - assert!(t.lock().unwrap().set_progress(upd).await.is_ok()) + assert!(t.lock().unwrap().set_progress(&upd).await.is_ok()) } // What about 'ordinary' job ? @@ -538,7 +538,7 @@ async fn test_tracker_can_send_and_retrieve_job_execution_progress() { let progress = t .lock() .expect("lock acquired") - .get_progress(job_id.clone()) + .get_progress(&job_id) .await .expect("Retrieved progress update over the wire once again") .expect("Some progress"); diff --git a/tests/tls/native_tls.rs b/tests/tls/native_tls.rs index 4f4c5c20..4b1a0963 100644 --- a/tests/tls/native_tls.rs +++ b/tests/tls/native_tls.rs @@ -1,7 +1,8 @@ use faktory::native_tls::TlsStream; -use faktory::{Client, Job, WorkerBuilder, WorkerId}; +use faktory::{Client, Job, Worker, WorkerId}; use serde_json::Value; use std::{env, sync}; +use tokio::io::BufStream; use url::Url; #[tokio::test(flavor = "multi_thread")] @@ -27,9 +28,11 @@ async fn roundtrip_tls() { .danger_accept_invalid_certs(true) .build() .unwrap(); - TlsStream::with_connector(connector, Some(&env::var("FAKTORY_URL_SECURE").unwrap())) - .await - .unwrap() + let stream = + TlsStream::with_connector(connector, Some(&env::var("FAKTORY_URL_SECURE").unwrap())) + .await + .unwrap(); + BufStream::new(stream) }; let password = Url::parse(&env::var("FAKTORY_URL_SECURE").expect("faktory url to be set...")) @@ -37,7 +40,7 @@ async fn roundtrip_tls() { .password() .map(|p| p.to_string()); - let mut worker = WorkerBuilder::default() + let mut worker = Worker::builder() .hostname("tester".to_string()) .wid(WorkerId::new(local)) .register(local, fixtures::JobHandler::new(tx)) diff --git a/tests/tls/rustls.rs b/tests/tls/rustls.rs index 093358a0..9119afbc 100644 --- a/tests/tls/rustls.rs +++ b/tests/tls/rustls.rs @@ -1,10 +1,11 @@ use faktory::rustls::TlsStream; -use faktory::{Client, Job, WorkerBuilder, WorkerId}; +use faktory::{Client, Job, Worker, WorkerId}; use serde_json::Value; use std::{ env, sync::{self, Arc}, }; +use tokio::io::BufStream; use tokio_rustls::rustls::{ClientConfig, SignatureScheme}; use url::Url; @@ -27,7 +28,7 @@ async fn roundtrip_tls() { let tls = || async { let verifier = fixtures::TestServerCertVerifier::new( - SignatureScheme::RSA_PSS_SHA512, + SignatureScheme::ECDSA_NISTP384_SHA384, env::current_dir() .unwrap() .join("docker") @@ -39,12 +40,13 @@ async fn roundtrip_tls() { .with_custom_certificate_verifier(Arc::new(verifier)) .with_no_client_auth(); - TlsStream::with_client_config( + let stream = TlsStream::with_client_config( client_config, Some(&env::var("FAKTORY_URL_SECURE").unwrap()), ) .await - .unwrap() + .unwrap(); + BufStream::new(stream) }; let password = Url::parse(&env::var("FAKTORY_URL_SECURE").expect("faktory url to be set...")) @@ -52,7 +54,7 @@ async fn roundtrip_tls() { .password() .map(|p| p.to_string()); - let mut worker = WorkerBuilder::default() + let mut worker = Worker::builder() .hostname("tester".to_string()) .wid(WorkerId::new(local)) .register(local, fixtures::JobHandler::new(tx))