Utilize UnixListener for the worker process to accept reports. #26

Merged
merged 2 commits into apache:master on Oct 25, 2022

Conversation

phanalpha
Contributor

No description provided.

@wu-sheng
Member

Please correct the title of the PR.

@wu-sheng wu-sheng requested a review from jmjoy October 23, 2022 01:22
@wu-sheng wu-sheng added this to the 0.2.0 milestone Oct 23, 2022
@wu-sheng wu-sheng added the enhancement New feature or request label Oct 23, 2022
@phanalpha phanalpha changed the title (Unix) listener worker, a. Utilize UnixListener for the worker process to accept reports. Oct 23, 2022
src/module.rs Outdated
Comment on lines +64 to +70

```rust
let stream = self
    .stream
    .get_or_try_init(|| UnixStream::connect(&self.worker_addr).map(Mutex::new))?
    .lock()
    .map_err(|_| anyhow!("Get Lock failed"))?;

channel_send(item, stream)
```

Member

Should set the stream nonblocking.

Contributor Author

Non-blocking has its own problems, as I already stated; see apache/skywalking#9831 (comment).
I've tried setting it to non-blocking and it simply failed.
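
For context, a minimal sketch of what a non-blocking write on the std UnixStream would look like; this is illustrative only, not the PR's code, and the WouldBlock branch is exactly where it gets awkward inside an fpm request:

```rust
use std::io::{ErrorKind, Write};
use std::os::unix::net::UnixStream;

// Illustrative only: write a payload on a non-blocking std UnixStream.
fn send_nonblocking(stream: &mut UnixStream, payload: &[u8]) -> std::io::Result<()> {
    stream.set_nonblocking(true)?;
    let mut written = 0;
    while written < payload.len() {
        match stream.write(&payload[written..]) {
            Ok(n) => written += n,
            // Socket buffer full: without an event loop the only options are
            // to spin/poll here or to give up and drop the report.
            Err(e) if e.kind() == ErrorKind::WouldBlock => continue,
            Err(e) => return Err(e),
        }
    }
    Ok(())
}
```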

Member
@jmjoy jmjoy Oct 23, 2022

How about connect-write-close for every request?
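
A rough sketch of the connect-write-close idea, assuming the report is already serialized to bytes; the function name and signature are illustrative, not part of the PR:

```rust
use std::io::Write;
use std::os::unix::net::UnixStream;

// Illustrative only: open a fresh connection per report, write it,
// and let the socket close when `stream` is dropped.
fn send_once(worker_addr: &str, payload: &[u8]) -> std::io::Result<()> {
    let mut stream = UnixStream::connect(worker_addr)?;
    stream.write_all(payload)?;
    Ok(())
}
```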

Contributor Author

We usually treat connection setup as a costly operation. Of course, a Unix domain socket should be much cheaper. Since fpm is somewhat long-lived, keeping an open connection is expected to be a better choice than open-write-close every time. IMHO, the non-blocking problem cannot be solved by any trick: a non-blocking operation almost always has a chance of failure and is usually used with polling and retries. An open-write-close sequence may or may not solve the problem, and even if it does, it just shifts the cost from 'send' to 'open' (connect).
If we'd like to guarantee non-blocking behaviour, we could delegate the send operation to another thread. If we were implementing an event loop ourselves, the standalone thread could be eliminated by multiplexing the running flow (as tokio does). With fpm, we have no such luck.
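
A minimal sketch of the "delegate the send to another thread" idea, with a hypothetical `spawn_sender` helper; this is not what the PR implements, only what that alternative would roughly look like:

```rust
use std::io::Write;
use std::os::unix::net::UnixStream;
use std::sync::mpsc::{channel, Sender};
use std::thread;

// Hypothetical: one background thread per fpm process owns the connection
// and performs the (possibly blocking) writes off the request path.
fn spawn_sender(worker_addr: String) -> std::io::Result<Sender<Vec<u8>>> {
    let mut stream = UnixStream::connect(&worker_addr)?;
    let (tx, rx) = channel::<Vec<u8>>();
    thread::spawn(move || {
        for payload in rx {
            if stream.write_all(&payload).is_err() {
                // The worker is gone; stop the sender thread.
                return;
            }
        }
    });
    Ok(tx)
}
```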

Member

From the connection perspective, I think keeping the connection open makes perfect sense, and sending in a blocking way is not a blocker.
The key to resolving the concern is: let's not block the user process. This is why, usually, before sending the segments/spans through the network, there is a non-blocking in-memory queue.
Reference from the Java agent:
https://github.com/apache/skywalking-java/blob/1148ec3801179c005304ae96dd76d7eac1cbfafa/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java#L68
The DataCarrier with IF_POSSIBLE is a queue implementation that provides both a buffering mechanism and an auto-drop mechanism. Auto-drop automatically drops new messages when the queue is full, which can be caused by slow sending or a disconnected connection.
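
In Rust terms, the IF_POSSIBLE behaviour described above is roughly a bounded queue whose producer drops new items when the queue is full; a minimal sketch under that assumption (not the agent's actual queue, names are hypothetical):

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

// A bounded in-memory queue: the request path enqueues without blocking and
// new items are auto-dropped when the consumer is slow or disconnected.
fn bounded_queue(capacity: usize) -> (SyncSender<Vec<u8>>, Receiver<Vec<u8>>) {
    sync_channel(capacity)
}

fn offer(tx: &SyncSender<Vec<u8>>, item: Vec<u8>) -> bool {
    match tx.try_send(item) {
        Ok(()) => true,
        // Queue full or consumer gone: drop the new item.
        Err(TrySendError::Full(_)) | Err(TrySendError::Disconnected(_)) => false,
    }
}
```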

Member

I do not know the PHP process model; is the agent code not running inside the user process?
Usually we don't recommend sharing a connection across processes. One gRPC connection per process is what all the other agents do.

Contributor Author

I'm not sure whether the one-gRPC-connection-per-process pattern fits PHP. We usually have multiple fpm processes (at least 10+) running per server. Anyway, the current design starts one worker process for all of these processes, which is why the IPC comes in.

Member

We can have a very simple test method: kill the grpc process, and then observe whether the request is blocked.

The grpc process is created here:

let pid = libc::fork();
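
For readers unfamiliar with the process split being tested: only the libc::fork call itself comes from the real code, the rest of this sketch is a hypothetical illustration of which side does what:

```rust
// Simplified illustration of the fork-based process model.
fn start_worker() {
    let pid = unsafe { libc::fork() };
    if pid < 0 {
        // fork failed
    } else if pid == 0 {
        // Child: the worker process that binds the unix socket, accepts
        // reports, and forwards them over gRPC.
    } else {
        // Parent: the php-fpm process keeps serving requests and connects
        // to the worker to send reports.
    }
}
```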

Contributor Author

I've already tested it. It fails with err=Broken pipe (os error 32). Of course it should fail, and since we are using a Unix domain socket here (unlike the INET domain), it fails fast.

Member

OK. In this case, requests will block only when the UDS buffer is full, which should be rare.

src/worker.rs Outdated
Comment on lines 97 to 132
```rust
let (tx, rx) = mpsc::channel::<Result<CollectItem, Box<dyn Error + Send>>>(255);

let listener = match UnixListener::bind(worker_addr) {
    Ok(listener) => listener,
    Err(err) => {
        error!(?err, "Bind failed");
        return;
    }
};
debug!("Bind");

tokio::spawn(async move {
    loop {
        match listener.accept().await {
            Ok((mut stream, _addr)) => {
                debug!("Accept");

                let tx = tx.clone();
                tokio::spawn(async move {
                    loop {
                        let r = match channel::channel_receive(&mut stream).await {
                            Err(err) => match err.downcast_ref::<io::Error>() {
                                Some(e) if e.kind() == io::ErrorKind::UnexpectedEof => {
                                    return;
                                }
                                _ => Err(err.into()),
                            },
                            Ok(i) => Ok(i),
                        };

                        if let Err(err) = tx.send(r).await {
                            error!(?err, "Send failed");
                            return;
                        }
                    }
                });
            }
```

Member

It seems that (tx, rx) and the outer tokio::spawn can be removed, to avoid copying the data. In addition, please put more detail in the logs, e.g. debug!(server_addr, "Bind unix domain socket");.

Contributor Author

Without an outer spawn, the accept (infinite) loop would block the code after it. Perhaps we could reorganize the code into another Future and avoid the explicit spawn (making it an arm of select! instead); I'm not sure whether that is necessary. See the sketch below.
The mpsc is what feeds the Consumer. If we built the worker listener around a select! structure, we might be able to replace the mpsc with a plain FIFO. Still, the UnixListener and the GrpcReporter seem to run in their own (tokio) threads; IMHO, a (tokio) mpsc is appropriate here, at least with the current schema.
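
For what it's worth, a sketch of the "arm of select!" shape being discussed, where the accept loop and a shutdown signal are polled together instead of spawning a dedicated outer task; everything except the tokio APIs is hypothetical and the per-connection read loop is elided:

```rust
use tokio::net::UnixListener;
use tokio::sync::mpsc;

// Hypothetical shape: the accept loop as an arm of select! instead of an
// outer tokio::spawn; a shutdown channel forms the other arm.
async fn serve(listener: UnixListener, tx: mpsc::Sender<Vec<u8>>, mut shutdown: mpsc::Receiver<()>) {
    loop {
        tokio::select! {
            accepted = listener.accept() => match accepted {
                Ok((stream, _addr)) => {
                    let tx = tx.clone();
                    // Still one task per connection, reading reports and
                    // forwarding them to the consumer (read loop elided).
                    tokio::spawn(async move {
                        let _ = (stream, tx);
                    });
                }
                Err(_) => return,
            },
            _ = shutdown.recv() => return,
        }
    }
}
```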

@jmjoy jmjoy requested a review from heyanlong October 24, 2022 09:09
Member
@heyanlong heyanlong left a comment

The channel is the foundation of the agent, and it appears to be more stable now.

@heyanlong heyanlong merged commit e6b733b into apache:master Oct 25, 2022