Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(meta): add worker to cluster after able to resolve worker dns (#15748) #15775

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [
"time",
"signal",
] }
tokio-retry = "0.3"
toml = "0.8"
tracing = "0.1"
tracing-futures = { version = "0.2", features = ["futures-03"] }
Expand Down
33 changes: 32 additions & 1 deletion src/common/src/util/addr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::net::SocketAddr;
use std::net::{SocketAddr, ToSocketAddrs};
use std::str::FromStr;
use std::time::Duration;

use anyhow::Context;
use risingwave_pb::common::PbHostAddress;
use thiserror_ext::AsReport;
use tokio::time::sleep;
use tokio_retry::strategy::ExponentialBackoff;
use tracing::error;

/// General host address and port.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
Expand Down Expand Up @@ -90,6 +95,32 @@ pub fn is_local_address(server_addr: &HostAddr, peer_addr: &HostAddr) -> bool {
server_addr == peer_addr
}

pub async fn try_resolve_dns(host: &str, port: i32) -> Result<SocketAddr, String> {
let addr = format!("{}:{}", host, port);
let mut backoff = ExponentialBackoff::from_millis(100)
.max_delay(Duration::from_secs(3))
.factor(5);
const MAX_RETRY: usize = 20;
for i in 1..=MAX_RETRY {
let err = match addr.to_socket_addrs() {
Ok(mut addr_iter) => {
if let Some(addr) = addr_iter.next() {
return Ok(addr);
} else {
format!("{} resolved to no addr", addr)
}
}
Err(e) => e.to_report_string(),
};
// It may happen that the dns information of newly registered worker node
// has not been propagated to the meta node and cause error. Wait for a while and retry
let delay = backoff.next().unwrap();
error!(attempt = i, backoff_delay = ?delay, err, addr, "fail to resolve worker node address");
sleep(delay).await;
}
Err(format!("failed to resolve dns: {}", addr))
}

#[cfg(test)]
mod tests {
use crate::util::addr::{is_local_address, HostAddr};
Expand Down
13 changes: 12 additions & 1 deletion src/meta/service/src/cluster_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use risingwave_meta::manager::MetadataManager;
use risingwave_meta_model_v2::WorkerId;
use risingwave_pb::common::worker_node::State;
use risingwave_pb::common::HostAddress;
use risingwave_pb::meta::cluster_service_server::ClusterService;
use risingwave_pb::meta::{
ActivateWorkerNodeRequest, ActivateWorkerNodeResponse, AddWorkerNodeRequest,
Expand Down Expand Up @@ -46,7 +47,17 @@ impl ClusterService for ClusterServiceImpl {
) -> Result<Response<AddWorkerNodeResponse>, Status> {
let req = request.into_inner();
let worker_type = req.get_worker_type()?;
let host = req.get_host()?.clone();
let host: HostAddress = req.get_host()?.clone();
#[cfg(not(madsim))]
{
use risingwave_common::util::addr::try_resolve_dns;
use tracing::{error, info};
let socket_addr = try_resolve_dns(&host.host, host.port).await.map_err(|e| {
error!(e);
Status::internal(e)
})?;
info!(?socket_addr, ?host, "resolve host addr");
}
let property = req
.property
.ok_or_else(|| MetaError::invalid_parameter("worker node property is not provided"))?;
Expand Down
Loading