| /** |
| * Copyright (C) 2010 The Android Open Source Project |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include "logging.h" |
| #include "status.h" |
| #include "worker.h" |
| |
| #include <time.h> |
| |
| //#define WORKER_DEBUG |
| #ifdef WORKER_DEBUG |
| |
| #define DBG(...) ALOGD(__VA_ARGS__) |
| |
| #else |
| |
| #define DBG(...) |
| |
| #endif |
| |
| void * WorkerThread::Work(void *param) { |
| WorkerThread *t = (WorkerThread *)param; |
| android_atomic_acquire_store(STATE_RUNNING, &t->state_); |
| void * v = t->Worker(t->workerParam_); |
| android_atomic_acquire_store(STATE_STOPPED, &t->state_); |
| return v; |
| } |
| |
| bool WorkerThread::isRunning() { |
| DBG("WorkerThread::isRunning E"); |
| bool ret_value = android_atomic_acquire_load(&state_) == STATE_RUNNING; |
| DBG("WorkerThread::isRunning X ret_value=%d", ret_value); |
| return ret_value; |
| } |
| |
| WorkerThread::WorkerThread() { |
| DBG("WorkerThread::WorkerThread E"); |
| state_ = STATE_INITIALIZED; |
| pthread_mutex_init(&mutex_, NULL); |
| pthread_cond_init(&cond_, NULL); |
| DBG("WorkerThread::WorkerThread X"); |
| } |
| |
| WorkerThread::~WorkerThread() { |
| DBG("WorkerThread::~WorkerThread E"); |
| Stop(); |
| pthread_mutex_destroy(&mutex_); |
| DBG("WorkerThread::~WorkerThread X"); |
| } |
| |
| // Return true if changed from STATE_RUNNING to STATE_STOPPING |
| bool WorkerThread::BeginStopping() { |
| DBG("WorkerThread::BeginStopping E"); |
| bool ret_value = (android_atomic_acquire_cas(STATE_RUNNING, STATE_STOPPING, &state_) == 0); |
| DBG("WorkerThread::BeginStopping X ret_value=%d", ret_value); |
| return ret_value; |
| } |
| |
| // Wait until state is not STATE_STOPPING |
| void WorkerThread::WaitUntilStopped() { |
| DBG("WorkerThread::WaitUntilStopped E"); |
| pthread_cond_signal(&cond_); |
| while(android_atomic_release_load(&state_) == STATE_STOPPING) { |
| usleep(200000); |
| } |
| DBG("WorkerThread::WaitUntilStopped X"); |
| } |
| |
| void WorkerThread::Stop() { |
| DBG("WorkerThread::Stop E"); |
| if (BeginStopping()) { |
| WaitUntilStopped(); |
| } |
| DBG("WorkerThread::Stop X"); |
| } |
| |
| int WorkerThread::Run(void *workerParam) { |
| DBG("WorkerThread::Run E workerParam=%p", workerParam); |
| int status; |
| int ret; |
| |
| workerParam_ = workerParam; |
| |
| ret = pthread_attr_init(&attr_); |
| if (ret != 0) { |
| ALOGE("RIL_Init X: pthread_attr_init failed err=%s", strerror(ret)); |
| return STATUS_ERR; |
| } |
| ret = pthread_attr_setdetachstate(&attr_, PTHREAD_CREATE_DETACHED); |
| if (ret != 0) { |
| ALOGE("RIL_Init X: pthread_attr_setdetachstate failed err=%s", |
| strerror(ret)); |
| return STATUS_ERR; |
| } |
| ret = pthread_create(&tid_, &attr_, |
| (void * (*)(void *))&WorkerThread::Work, this); |
| if (ret != 0) { |
| ALOGE("RIL_Init X: pthread_create failed err=%s", strerror(ret)); |
| return STATUS_ERR; |
| } |
| |
| // Wait until worker is running |
| while (android_atomic_acquire_load(&state_) == STATE_INITIALIZED) { |
| usleep(200000); |
| } |
| |
| DBG("WorkerThread::Run X workerParam=%p", workerParam); |
| return STATUS_OK; |
| } |
| |
| |
| class WorkerQueueThread : public WorkerThread { |
| private: |
| friend class WorkerQueue; |
| |
| public: |
| WorkerQueueThread() { |
| } |
| |
| virtual ~WorkerQueueThread() { |
| Stop(); |
| } |
| |
| void * Worker(void *param) { |
| DBG("WorkerQueueThread::Worker E"); |
| WorkerQueue *wq = (WorkerQueue *)param; |
| |
| // Do the work until we're told to stop |
| while (isRunning()) { |
| pthread_mutex_lock(&mutex_); |
| while (isRunning() && wq->q_.size() == 0) { |
| if (wq->delayed_q_.size() == 0) { |
| // Both queue's are empty so wait |
| pthread_cond_wait(&cond_, &mutex_); |
| } else { |
| // delayed_q_ is not empty, move any |
| // timed out records to q_. |
| int64_t now = android::elapsedRealtime(); |
| while((wq->delayed_q_.size() != 0) && |
| ((wq->delayed_q_.top()->time - now) <= 0)) { |
| struct WorkerQueue::Record *r = wq->delayed_q_.top(); |
| DBG("WorkerQueueThread::Worker move p=%p time=%lldms", |
| r->p, r->time); |
| wq->delayed_q_.pop(); |
| wq->q_.push_back(r); |
| } |
| |
| if ((wq->q_.size() == 0) && (wq->delayed_q_.size() != 0)) { |
| // We need to do a timed wait |
| struct timeval tv; |
| struct timespec ts; |
| struct WorkerQueue::Record *r = wq->delayed_q_.top(); |
| int64_t delay_ms = r->time - now; |
| DBG("WorkerQueueThread::Worker wait" |
| " p=%p time=%lldms delay_ms=%lldms", |
| r->p, r->time, delay_ms); |
| gettimeofday(&tv, NULL); |
| ts.tv_sec = tv.tv_sec + (delay_ms / 1000); |
| ts.tv_nsec = (tv.tv_usec + |
| ((delay_ms % 1000) * 1000)) * 1000; |
| pthread_cond_timedwait(&cond_, &mutex_, &ts); |
| } |
| } |
| } |
| if (isRunning()) { |
| struct WorkerQueue::Record *r = wq->q_.front(); |
| wq->q_.pop_front(); |
| void *p = r->p; |
| wq->release_record(r); |
| pthread_mutex_unlock(&mutex_); |
| wq->Process(r->p); |
| } else { |
| pthread_mutex_unlock(&mutex_); |
| } |
| } |
| DBG("WorkerQueueThread::Worker X"); |
| return NULL; |
| } |
| }; |
| |
| WorkerQueue::WorkerQueue() { |
| DBG("WorkerQueue::WorkerQueue E"); |
| wqt_ = new WorkerQueueThread(); |
| DBG("WorkerQueue::WorkerQueue X"); |
| } |
| |
| WorkerQueue::~WorkerQueue() { |
| DBG("WorkerQueue::~WorkerQueue E"); |
| Stop(); |
| |
| Record *r; |
| pthread_mutex_lock(&wqt_->mutex_); |
| while(free_list_.size() != 0) { |
| r = free_list_.front(); |
| free_list_.pop_front(); |
| DBG("WorkerQueue::~WorkerQueue delete free_list_ r=%p", r); |
| delete r; |
| } |
| while(delayed_q_.size() != 0) { |
| r = delayed_q_.top(); |
| delayed_q_.pop(); |
| DBG("WorkerQueue::~WorkerQueue delete delayed_q_ r=%p", r); |
| delete r; |
| } |
| pthread_mutex_unlock(&wqt_->mutex_); |
| |
| delete wqt_; |
| DBG("WorkerQueue::~WorkerQueue X"); |
| } |
| |
| int WorkerQueue::Run() { |
| return wqt_->Run(this); |
| } |
| |
| void WorkerQueue::Stop() { |
| wqt_->Stop(); |
| } |
| |
| /** |
| * Obtain a record from free_list if it is not empty, fill in the record with provided |
| * information: *p and delay_in_ms |
| */ |
| struct WorkerQueue::Record *WorkerQueue::obtain_record(void *p, int delay_in_ms) { |
| struct Record *r; |
| if (free_list_.size() == 0) { |
| r = new Record(); |
| DBG("WorkerQueue::obtain_record new r=%p", r); |
| } else { |
| r = free_list_.front(); |
| DBG("WorkerQueue::obtain_record reuse r=%p", r); |
| free_list_.pop_front(); |
| } |
| r->p = p; |
| if (delay_in_ms != 0) { |
| r->time = android::elapsedRealtime() + delay_in_ms; |
| } else { |
| r->time = 0; |
| } |
| return r; |
| } |
| |
| /** |
| * release a record and insert into the front of the free_list |
| */ |
| void WorkerQueue::release_record(struct Record *r) { |
| DBG("WorkerQueue::release_record r=%p", r); |
| free_list_.push_front(r); |
| } |
| |
| /** |
| * Add a record to processing queue q_ |
| */ |
| void WorkerQueue::Add(void *p) { |
| DBG("WorkerQueue::Add E:"); |
| pthread_mutex_lock(&wqt_->mutex_); |
| struct Record *r = obtain_record(p, 0); |
| q_.push_back(r); |
| if (q_.size() == 1) { |
| pthread_cond_signal(&wqt_->cond_); |
| } |
| pthread_mutex_unlock(&wqt_->mutex_); |
| DBG("WorkerQueue::Add X:"); |
| } |
| |
| void WorkerQueue::AddDelayed(void *p, int delay_in_ms) { |
| DBG("WorkerQueue::AddDelayed E:"); |
| if (delay_in_ms <= 0) { |
| Add(p); |
| } else { |
| pthread_mutex_lock(&wqt_->mutex_); |
| struct Record *r = obtain_record(p, delay_in_ms); |
| delayed_q_.push(r); |
| #ifdef WORKER_DEBUG |
| int64_t now = android::elapsedRealtime(); |
| DBG("WorkerQueue::AddDelayed" |
| " p=%p delay_in_ms=%d now=%lldms top->p=%p" |
| " top->time=%lldms diff=%lldms", |
| p, delay_in_ms, now, delayed_q_.top()->p, |
| delayed_q_.top()->time, delayed_q_.top()->time - now); |
| #endif |
| if ((q_.size() == 0) && (delayed_q_.top() == r)) { |
| // q_ is empty and the new record is at delayed_q_.top |
| // so we signal the waiting thread so it can readjust |
| // the wait time. |
| DBG("WorkerQueue::AddDelayed signal"); |
| pthread_cond_signal(&wqt_->cond_); |
| } |
| pthread_mutex_unlock(&wqt_->mutex_); |
| } |
| DBG("WorkerQueue::AddDelayed X:"); |
| } |
| |
| |
| class TestWorkerQueue : public WorkerQueue { |
| virtual void Process(void *p) { |
| ALOGD("TestWorkerQueue::Process: EX p=%p", p); |
| } |
| }; |
| |
| class TesterThread : public WorkerThread { |
| public: |
| void * Worker(void *param) |
| { |
| ALOGD("TesterThread::Worker E param=%p", param); |
| WorkerQueue *wq = (WorkerQueue *)param; |
| |
| // Test AddDelayed |
| wq->AddDelayed((void *)1000, 1000); |
| wq->Add((void *)0); |
| wq->Add((void *)0); |
| wq->Add((void *)0); |
| wq->Add((void *)0); |
| wq->AddDelayed((void *)100, 100); |
| wq->AddDelayed((void *)2000, 2000); |
| |
| for (int i = 1; isRunning(); i++) { |
| ALOGD("TesterThread: looping %d", i); |
| wq->Add((void *)i); |
| wq->Add((void *)i); |
| wq->Add((void *)i); |
| wq->Add((void *)i); |
| sleep(1); |
| } |
| |
| ALOGD("TesterThread::Worker X param=%p", param); |
| |
| return NULL; |
| } |
| }; |
| |
| void testWorker() { |
| ALOGD("testWorker E: ********"); |
| |
| // Test we can create a thread and delete it |
| TesterThread *tester = new TesterThread(); |
| delete tester; |
| |
| TestWorkerQueue *wq = new TestWorkerQueue(); |
| if (wq->Run() == STATUS_OK) { |
| ALOGD("testWorker WorkerQueue %p running", wq); |
| |
| // Test we can run a thread, stop it then delete it |
| tester = new TesterThread(); |
| tester->Run(wq); |
| ALOGD("testWorker tester %p running", tester); |
| sleep(10); |
| ALOGD("testWorker tester %p stopping", tester); |
| tester->Stop(); |
| ALOGD("testWorker tester %p stopped", tester); |
| wq->Stop(); |
| ALOGD("testWorker wq %p stopped", wq); |
| } |
| ALOGD("testWorker X: ********\n"); |
| } |