两个串口同时通讯报:Error removing epoll events for fd

发布时间 2023-07-05 10:37:23作者: 不及格的程序员-八神

android  MessageQueue流程分析

jeremy_fan 2018-09-21 15:47:06 276 收藏
展开
分析MessageQueue,我们先分析下它的头文件

class IDisplayEventConnection;
class EventThread;
class SurfaceFlinger;

// ---------------------------------------------------------------------------

class MessageBase : public MessageHandler
{
public:
MessageBase();

// return true if message has a handler
virtual bool handler() = 0;

// waits for the handler to be processed
void wait() const { barrier.wait(); }

protected:
virtual ~MessageBase();

private:
virtual void handleMessage(const Message& message);

mutable Barrier barrier; //实际为Condition类,用来实现休眠唤醒
};

/**
 * MessageBase implementation that wraps an arbitrary callable.
 * The callable is stored at construction and invoked each time handler() runs.
 */
class LambdaMessage : public MessageBase {
    // The wrapped work item; const, so it cannot be re-targeted after construction.
    const std::function<void()> mHandler;

public:
    // Takes the callable by value and moves it into the member.
    explicit LambdaMessage(std::function<void()> handler)
          : MessageBase(),
            mHandler(std::move(handler)) {
    }

    // Runs the wrapped callable.
    bool handler() override {
        mHandler();
        // This return value is no longer checked, so it's always safe to return true
        return true;
    }
};

// ---------------------------------------------------------------------------

// Message queue used by SurfaceFlinger: owns a Looper plus an internal Handler
// and exposes invalidate()/refresh() to request the corresponding messages.
class MessageQueue {
// Inner class; derives from MessageHandler so it can process messages.
class Handler : public MessageHandler {
// Bit flags kept in mEventMask. dispatchInvalidate()/dispatchRefresh() set the
// matching bit and handleMessage() clears it again. eventMaskTransaction is not
// used by any code visible in this excerpt.
enum {
eventMaskInvalidate = 0x1,
eventMaskRefresh = 0x2,
eventMaskTransaction = 0x4
};
MessageQueue& mQueue; // the owning queue
int32_t mEventMask; // updated through the android_atomic_* helpers
public:
explicit Handler(MessageQueue& queue) : mQueue(queue), mEventMask(0) { }
virtual void handleMessage(const Message& message);
void dispatchRefresh();
void dispatchInvalidate();
};

// Handler reaches into the queue's private members (mLooper, mFlinger).
friend class Handler;

sp<SurfaceFlinger> mFlinger; // back-pointer to the owning SurfaceFlinger
sp<Looper> mLooper; // created in init()
sp<EventThread> mEventThread; // presumably assigned by setEventThread() - definition not shown
sp<IDisplayEventConnection> mEvents; // usage not shown in this excerpt
gui::BitTube mEventTube; // usage not shown in this excerpt
sp<Handler> mHandler; // the Handler instance that processes messages


static int cb_eventReceiver(int fd, int events, void* data);
int eventReceiver(int fd, int events);

public:
enum {// the two message types: invalidate and refresh
INVALIDATE = 0,
REFRESH = 1,
};

MessageQueue();
~MessageQueue();
void init(const sp<SurfaceFlinger>& flinger);// stores the back-pointer to SurfaceFlinger
void setEventThread(const sp<EventThread>& events);

void waitMessage();// wait for a message
status_t postMessage(const sp<MessageBase>& message, nsecs_t reltime=0);// post a message

// sends INVALIDATE message at next VSYNC
void invalidate();
// sends REFRESH message at next VSYNC
void refresh();
};

// ---------------------------------------------------------------------------
core/include/utils/Looper.h
/**
* A message that can be posted to a Looper.
*/
struct Message {
    /* The message type. (interpretation is left up to the handler) */
    int what;

    /* Default message: type 0. */
    Message() : Message(0) { }

    /* Message of the given type; intentionally usable as an implicit conversion from int. */
    Message(int w) : what(w) { }
};


/**
* Interface for a Looper message handler.
*
* The Looper holds a strong reference to the message handler whenever it has
* a message to deliver to it. Make sure to call Looper::removeMessages
* to remove any pending messages destined for the handler so that the handler
* can be destroyed.
*/
class MessageHandler : public virtual RefBase {
public:
    /**
     * Handles a message.
     */
    virtual void handleMessage(const Message& message) = 0;

protected:
    // Not publicly destructible; lifetime is managed through the RefBase base.
    virtual ~MessageHandler();
};
从这段代码中可以看出,Message只有what一个成员变量,用来说明message的类型;MessageHandler中
提供了handleMessage函数,用来处理各种消息。
上面的代码给出了MessageQueue的大体架构:成员变量里有mEventThread,说明我们提供了一个线程专门处理消息。

接下来我们看下代码,分析下,MessageQueue到底是如何实现的

native/services/surfaceflinger/MessageQueue.cpp

// Nothing to set up here; the real initialization happens in init().
MessageQueue::MessageQueue() = default;

// No explicit teardown; members are released by their own destructors.
MessageQueue::~MessageQueue() = default;

// Wires the queue to its SurfaceFlinger and creates the Looper and the Handler.
//   flinger: stored in mFlinger as the back-pointer to SurfaceFlinger.
void MessageQueue::init(const sp<SurfaceFlinger>& flinger)
{
// First store the back-pointer to SurfaceFlinger, then create the Looper, and
// finally create the Handler that processes messages. The Handler's most
// important method is handleMessage.
mFlinger = flinger;
mLooper = new Looper(true); // true is the Looper's allowNonCallbacks argument
mHandler = new Handler(*this);
}

//我们先看下Handler的构造函数,利用外部类型构造自身,由于是友元,我们可以通过这个mQueue访问MessageQueue的私有变量
explicit Handler(MessageQueue& queue) : mQueue(queue), mEventMask(0) { }

// When an event is dispatched, the matching bit (eventMaskRefresh or
// eventMaskInvalidate) is set in mEventMask; handleMessage clears it again.
//
// Dispatches a REFRESH message, unless the refresh bit was already set.
void MessageQueue::Handler::dispatchRefresh() {
    // android_atomic_or yields the mask as it was before the OR (see cutils/atomic.h).
    const int32_t priorMask = android_atomic_or(eventMaskRefresh, &mEventMask);
    if ((priorMask & eventMaskRefresh) != 0) {
        // The bit was already set, so no second message is sent.
        return;
    }
    mQueue.mLooper->sendMessage(this, Message(MessageQueue::REFRESH));
}

// Dispatches an INVALIDATE message, unless the invalidate bit was already set.
void MessageQueue::Handler::dispatchInvalidate() {
    // android_atomic_or yields the mask as it was before the OR (see cutils/atomic.h).
    const int32_t priorMask = android_atomic_or(eventMaskInvalidate, &mEventMask);
    const bool alreadySet = (priorMask & eventMaskInvalidate) != 0;
    if (!alreadySet) {
        mQueue.mLooper->sendMessage(this, Message(MessageQueue::INVALIDATE));
    }
}

// This is where mFlinger is actually invoked to process the message.
// Clears the mEventMask bit that belongs to the message type, then forwards
// the type to SurfaceFlinger. Unknown message types are ignored.
void MessageQueue::Handler::handleMessage(const Message& message) {
    int32_t handledBit;
    switch (message.what) {
        case INVALIDATE:
            handledBit = eventMaskInvalidate;
            break;
        case REFRESH:
            handledBit = eventMaskRefresh;
            break;
        default:
            return;
    }

    // Clear the bit before calling out, so a dispatch during the callback sends a new message.
    android_atomic_and(~handledBit, &mEventMask);
    mQueue.mFlinger->onMessageReceived(message.what);
}
//
根据上面代码推测:mLooper的sendMessage只负责把消息放入队列,真正的handleMessage调用发生在Looper轮询时(见后文的pollInner)。接下来我们分析下Looper

Looper的逻辑不算太复杂,这里只分析我们用到的几个函数:构造函数、addFd、removeFd、pollOnce、sendMessage

system/core/libutils/Looper.cpp
// Constructs a Looper: creates the non-blocking, close-on-exec wake eventfd
// (aborting if that fails) and then builds the initial epoll set under mLock.
Looper::Looper(bool allowNonCallbacks)
      : mAllowNonCallbacks(allowNonCallbacks),
        mSendingMessage(false),
        mPolling(false),
        mEpollFd(-1),
        mEpollRebuildRequired(false),
        mNextRequestSeq(0),
        mResponseIndex(0),
        mNextMessageUptime(LLONG_MAX) {
    mWakeEventFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    LOG_ALWAYS_FATAL_IF(mWakeEventFd < 0, "Could not make wake event fd: %s",
            strerror(errno));

    // rebuildEpollLocked() is the interesting part; it is called with mLock held.
    AutoMutex lock(mLock);
    rebuildEpollLocked();
}

void Looper::rebuildEpollLocked() {
// Close old epoll instance if we have one.
if (mEpollFd >= 0) {
#if DEBUG_CALLBACKS
ALOGD("%p ~ rebuildEpollLocked - rebuilding epoll set", this);
#endif
close(mEpollFd);
}

// Allocate the new epoll instance and register the wake pipe.
mEpollFd = epoll_create(EPOLL_SIZE_HINT); //创建轮训的fd
LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance: %s", strerror(errno));

struct epoll_event eventItem;
memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
eventItem.events = EPOLLIN;
eventItem.data.fd = mWakeEventFd;
int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem);//添加监察的fd
LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance: %s",
strerror(errno));

for (size_t i = 0; i < mRequests.size(); i++) { //把mRequest中的fd都添加进mEpollFd
const Request& request = mRequests.valueAt(i);
struct epoll_event eventItem;
request.initEventItem(&eventItem);

int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, & eventItem);
if (epollResult < 0) {
ALOGE("Error adding epoll events for fd %d while rebuilding epoll set: %s",
request.fd, strerror(errno));
}
}
}

//相较而言,removeFd要简单得多:把fd从监察列表中移除
int Looper::removeFd(int fd, int seq) {
#if DEBUG_CALLBACKS
ALOGD("%p ~ removeFd - fd=%d, seq=%d", this, fd, seq);
#endif

{ // acquire lock
AutoMutex _l(mLock);
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex < 0) {
return 0;
}

// Check the sequence number if one was given.
if (seq != -1 && mRequests.valueAt(requestIndex).seq != seq) {
#if DEBUG_CALLBACKS
ALOGD("%p ~ removeFd - sequence number mismatch, oldSeq=%d",
this, mRequests.valueAt(requestIndex).seq);
#endif
return 0;
}

// Always remove the FD from the request map even if an error occurs while
// updating the epoll set so that we avoid accidentally leaking callbacks.
mRequests.removeItemsAt(requestIndex);

int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_DEL, fd, NULL);
if (epollResult < 0) {
if (seq != -1 && (errno == EBADF || errno == ENOENT)) {
// Tolerate EBADF or ENOENT when the sequence number is known because it
// means that the file descriptor was closed before its callback was
// unregistered. This error may occur naturally when a callback has the
// side-effect of closing the file descriptor before returning and
// unregistering itself.
//
// Unfortunately due to kernel limitations we need to rebuild the epoll
// set from scratch because it may contain an old file handle that we are
// now unable to remove since its file descriptor is no longer valid.
// No such problem would have occurred if we were using the poll system
// call instead, but that approach carries others disadvantages.
#if DEBUG_CALLBACKS
ALOGD("%p ~ removeFd - EPOLL_CTL_DEL failed due to file descriptor "
"being closed: %s", this, strerror(errno));
#endif
scheduleEpollRebuildLocked();
} else {
// Some other error occurred. This is really weird because it means
// our list of callbacks got out of sync with the epoll set somehow.
// We defensively rebuild the epoll set to avoid getting spurious
// notifications with nowhere to go.
ALOGE("Error removing epoll events for fd %d: %s", fd, strerror(errno));
scheduleEpollRebuildLocked();
return -1;
}
}
} // release lock
return 1;
}

// Queues message for handler, to be delivered at the given uptime.
// The envelope is inserted after every queued envelope whose uptime is not
// later than the new one, so the queue stays ordered by delivery time.
void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler,
        const Message& message) {
#if DEBUG_CALLBACKS
    ALOGD("%p ~ sendMessageAtTime - uptime=%" PRId64 ", handler=%p, what=%d",
            this, uptime, handler.get(), message.what);
#endif

    size_t slot = 0;
    { // acquire lock
        AutoMutex lock(mLock);

        // Find the first queued envelope that is due strictly later than the new one.
        const size_t queued = mMessageEnvelopes.size();
        for (; slot < queued; ++slot) {
            if (uptime < mMessageEnvelopes.itemAt(slot).uptime) {
                break;
            }
        }

        // Place the envelope at the position that matches its delivery time.
        MessageEnvelope envelope(uptime, handler, message);
        mMessageEnvelopes.insertAt(envelope, slot, 1);

        // Optimization: If the Looper is currently sending a message, then we can skip
        // the call to wake() because the next thing the Looper will do after processing
        // messages is to decide when the next wakeup time should be.  In fact, it does
        // not even matter whether this code is running on the Looper thread.
        if (mSendingMessage) {
            return;
        }
    } // release lock

    // Wake the poll loop only when we enqueue a new message at the head.
    if (slot == 0) {
        wake();
    }
}


pollOnce最终会调用到pollInner

// Excerpt from the interior of Looper::pollInner (the enclosing function is not
// shown here; mLock and result belong to it). Delivers every queued message that
// is already due, and records the uptime of the first message that is not.
while (mMessageEnvelopes.size() != 0) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
if (messageEnvelope.uptime <= now) { // the message is due: process it
// Remove the envelope from the list.
// We keep a strong reference to the handler until the call to handleMessage
// finishes. Then we drop it so that the handler can be deleted *before*
// we reacquire our lock.
{ // obtain handler
sp<MessageHandler> handler = messageEnvelope.handler;
Message message = messageEnvelope.message;
mMessageEnvelopes.removeAt(0);
mSendingMessage = true;
mLock.unlock();

#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d",
this, handler.get(), message.what);
#endif
handler->handleMessage(message); // invoke the handler's handleMessage, with mLock released
} // release handler

mLock.lock();
mSendingMessage = false;
result = POLL_CALLBACK;
} else {
// The last message left at the head of the queue determines the next wakeup time.
mNextMessageUptime = messageEnvelope.uptime;
break;
}
}

现在先做一个简单的分析,我对轮询还不是很懂,后续要进行学习
————————————————
版权声明:本文为CSDN博主「jeremy_fan」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/jeremy_fan/java/article/details/82799336