Utilize UnixListener for the worker process to accept reports. #26
Conversation
Please correct the title of the PR.
```rust
let stream = self
    .stream
    .get_or_try_init(|| UnixStream::connect(&self.worker_addr).map(Mutex::new))?
    .lock()
    .map_err(|_| anyhow!("Get Lock failed"))?
    .take()
    .context("The RECEIVER has been taken")?;

let receiver =
    UnixStream::from_std(receiver).context("try into tokio unix stream failed")?;
Ok(Self(receiver))
```
```rust
#[async_trait]
impl ColletcItemConsume for Consumer {
    async fn consume(&mut self) -> Result<Option<CollectItem>, Box<dyn Error + Send>> {
        match channel_receive(&mut self.0).await {
            Ok(item) => Ok(Some(item)),
            Err(e) => Err(e.into()),
        }
    }
}
```

```rust
channel_send(item, stream)
```
Should set the stream to non-blocking.
Non-blocking has its own problem as I already stated. See apache/skywalking#9831 (comment).
I've tried to set it to non-blocking and it just failed.
How about connect-write-close for every request?
We usually consider connection setup a costly operation. Of course, a Unix domain socket should be much cheaper. Since the fpm processes are somewhat permanent, keeping an open connection is expected to be a better choice than open-write-close every time. IMHO, the non-blocking problem cannot be solved by any trick: a non-blocking operation almost always has a chance of failure and is usually used with polling and retries. An open-write-close sequence may or may not solve the problem; even if it does, it just shifts the cost from 'send' to 'open' (connect).
If we'd like to ensure non-blocking behavior, we could delegate the send operation to another thread. If we were implementing an event loop ourselves, the standalone thread could be eliminated by multiplexing the running flow (as tokio does). With fpm, we have no such luck.
From the connection perspective, I think keeping the connection open makes perfect sense, and sending in a blocking way is not a blocker.
The key to resolving the concern is to not block the user process. This is why, usually, there is a non-blocking in-memory queue in front of sending the segments/spans through the network.
Ref from the Java agent
https://github.com/apache/skywalking-java/blob/1148ec3801179c005304ae96dd76d7eac1cbfafa/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java#L68
The DataCarrier with IF_POSSIBLE is a queue implementation that provides both a buffering mechanism and an auto-drop mechanism. Auto-drop automatically drops new messages when the queue is full, which could be caused by slow sending or a disconnected connection.
I do not know the PHP process model — is the agent code not running inside the user process?
Usually we don't recommend sharing a connection across processes. One gRPC connection per process is how all other agents do it.
I'm not sure whether the one-gRPC-connection-per-process pattern fits PHP. We usually have multiple fpm processes (at least 10+) running per server. Anyway, the current design starts a single worker process for all these processes, which is why IPC comes into play.
We can have a very simple test method: kill the grpc process, and then observe whether the request is blocked.
The grpc process is created here (Line 40 in 992da66):

```rust
let pid = libc::fork();
```
I've already tested it. It fails with err=Broken pipe (os error 32). Of course, it should fail, and since we are using a Unix domain socket here (unlike the INET domain), it fails fast.
OK. In this case, requests will block only when the UDS buffer is full. It should be rare.
src/worker.rs
Outdated
```rust
let (tx, rx) = mpsc::channel::<Result<CollectItem, Box<dyn Error + Send>>>(255);
let listener = match UnixListener::bind(worker_addr) {
    Ok(listener) => listener,
    Err(err) => {
        error!(?err, "Bind failed");
        return;
    }
};
debug!("Bind");

tokio::spawn(async move {
    loop {
        match listener.accept().await {
            Ok((mut stream, _addr)) => {
                debug!("Accept");

                let tx = tx.clone();
                tokio::spawn(async move {
                    loop {
                        let r = match channel::channel_receive(&mut stream).await {
                            Err(err) => match err.downcast_ref::<io::Error>() {
                                Some(e) if e.kind() == io::ErrorKind::UnexpectedEof => {
                                    return
                                }
                                _ => Err(err.into()),
                            },
                            Ok(i) => Ok(i),
                        };

                        if let Err(err) = tx.send(r).await {
                            error!(?err, "Send failed");
                            return;
                        }
                    }
                });
            }
```
It seems that `(tx, rx)` and the outer `tokio::spawn` can be removed, to avoid copying the data. In addition, please put more detail into the logs, like `debug!(server_addr, "Bind unix domain socket");`.
Without an outer `spawn`, the `accept` (dead) loop will block the code after it. Perhaps we could reorganize the code into another `Future` and avoid an explicit spawn (making it an arm of `select!` instead). I'm not sure whether this is necessary.
The `mpsc` facilitates `Consumer`. If we built the worker listener with a `select` structure, we might be able to replace the `mpsc` with a FIFO. Still, the `UnixListener` and the `GrpcReporter` seem to run in their own (tokio) threads. IMHO, (tokio) `mpsc` is proper here, at least with the current schema.
The channel is the foundation of the agent; it appears to be more stable now.
No description provided.