| /* |
| * Copyright (c) 2011 The WebRTC project authors. All Rights Reserved. |
| * |
| * Use of this source code is governed by a BSD-style license |
| * that can be found in the LICENSE file in the root of the source |
| * tree. An additional intellectual property rights grant can be found |
| * in the file PATENTS. All contributing project authors may |
| * be found in the AUTHORS file in the root of the source tree. |
| */ |
| |
| #include "data_log.h" |
| |
| #include <assert.h> |
| |
| #include <algorithm> |
| #include <list> |
| |
| #include "critical_section_wrapper.h" |
| #include "event_wrapper.h" |
| #include "file_wrapper.h" |
| #include "rw_lock_wrapper.h" |
| #include "thread_wrapper.h" |
| |
| namespace webrtc { |
| |
| DataLogImpl::CritSectScopedPtr DataLogImpl::crit_sect_( |
| CriticalSectionWrapper::CreateCriticalSection()); |
| |
| DataLogImpl* DataLogImpl::instance_ = NULL; |
| |
| // A Row contains cells, which are indexed by the column names as std::string. |
| // The string index is treated in a case sensitive way. |
| class Row { |
| public: |
| Row(); |
| ~Row(); |
| |
| // Inserts a Container into the cell of the column specified with |
| // column_name. |
| // column_name is treated in a case sensitive way. |
| int InsertCell(const std::string& column_name, |
| const Container* value_container); |
| |
| // Converts the value at the column specified by column_name to a string |
| // stored in value_string. |
| // column_name is treated in a case sensitive way. |
| void ToString(const std::string& column_name, std::string* value_string); |
| |
| private: |
| // Collection of containers indexed by column name as std::string |
| typedef std::map<std::string, const Container*> CellMap; |
| |
| CellMap cells_; |
| CriticalSectionWrapper* cells_lock_; |
| }; |
| |
| // A LogTable contains multiple rows, where only the latest row is active for |
| // editing. The rows are defined by the ColumnMap, which contains the name of |
| // each column and the length of the column (1 for one-value-columns and greater |
| // than 1 for multi-value-columns). |
| class LogTable { |
| public: |
| LogTable(); |
| ~LogTable(); |
| |
| // Adds the column with name column_name to the table. The column will be a |
| // multi-value-column if multi_value_length is greater than 1. |
| // column_name is treated in a case sensitive way. |
| int AddColumn(const std::string& column_name, int multi_value_length); |
| |
| // Buffers the current row while it is waiting to be written to file, |
| // which is done by a call to Flush(). A new row is available when the |
| // function returns |
| void NextRow(); |
| |
| // Inserts a Container into the cell of the column specified with |
| // column_name. |
| // column_name is treated in a case sensitive way. |
| int InsertCell(const std::string& column_name, |
| const Container* value_container); |
| |
| // Creates a log file, named as specified in the string file_name, to |
| // where the table will be written when calling Flush(). |
| int CreateLogFile(const std::string& file_name); |
| |
| // Write all complete rows to file. |
| // May not be called by two threads simultaneously (doing so may result in |
| // a race condition). Will be called by the file_writer_thread_ when that |
| // thread is running. |
| void Flush(); |
| |
| private: |
| // Collection of multi_value_lengths indexed by column name as std::string |
| typedef std::map<std::string, int> ColumnMap; |
| typedef std::list<Row*> RowList; |
| |
| ColumnMap columns_; |
| RowList rows_[2]; |
| RowList* rows_history_; |
| RowList* rows_flush_; |
| Row* current_row_; |
| FileWrapper* file_; |
| bool write_header_; |
| CriticalSectionWrapper* table_lock_; |
| }; |
| |
| Row::Row() |
| : cells_(), |
| cells_lock_(CriticalSectionWrapper::CreateCriticalSection()) { |
| } |
| |
| Row::~Row() { |
| for (CellMap::iterator it = cells_.begin(); it != cells_.end();) { |
| delete it->second; |
| // For maps all iterators (except the erased) are valid after an erase |
| cells_.erase(it++); |
| } |
| delete cells_lock_; |
| } |
| |
| int Row::InsertCell(const std::string& column_name, |
| const Container* value_container) { |
| CriticalSectionScoped synchronize(cells_lock_); |
| assert(cells_.count(column_name) == 0); |
| if (cells_.count(column_name) > 0) |
| return -1; |
| cells_[column_name] = value_container; |
| return 0; |
| } |
| |
| void Row::ToString(const std::string& column_name, |
| std::string* value_string) { |
| CriticalSectionScoped synchronize(cells_lock_); |
| const Container* container = cells_[column_name]; |
| if (container == NULL) { |
| *value_string = "NaN,"; |
| return; |
| } |
| container->ToString(value_string); |
| } |
| |
| LogTable::LogTable() |
| : columns_(), |
| rows_(), |
| rows_history_(&rows_[0]), |
| rows_flush_(&rows_[1]), |
| current_row_(new Row), |
| file_(FileWrapper::Create()), |
| write_header_(true), |
| table_lock_(CriticalSectionWrapper::CreateCriticalSection()) { |
| } |
| |
| LogTable::~LogTable() { |
| for (RowList::iterator row_it = rows_history_->begin(); |
| row_it != rows_history_->end();) { |
| delete *row_it; |
| row_it = rows_history_->erase(row_it); |
| } |
| for (ColumnMap::iterator col_it = columns_.begin(); |
| col_it != columns_.end();) { |
| // For maps all iterators (except the erased) are valid after an erase |
| columns_.erase(col_it++); |
| } |
| if (file_ != NULL) { |
| file_->Flush(); |
| file_->CloseFile(); |
| delete file_; |
| } |
| delete current_row_; |
| delete table_lock_; |
| } |
| |
| int LogTable::AddColumn(const std::string& column_name, |
| int multi_value_length) { |
| assert(multi_value_length > 0); |
| if (!write_header_) { |
| // It's not allowed to add new columns after the header |
| // has been written. |
| assert(false); |
| return -1; |
| } else { |
| CriticalSectionScoped synchronize(table_lock_); |
| if (write_header_) |
| columns_[column_name] = multi_value_length; |
| else |
| return -1; |
| } |
| return 0; |
| } |
| |
| void LogTable::NextRow() { |
| CriticalSectionScoped sync_rows(table_lock_); |
| rows_history_->push_back(current_row_); |
| current_row_ = new Row; |
| } |
| |
| int LogTable::InsertCell(const std::string& column_name, |
| const Container* value_container) { |
| CriticalSectionScoped synchronize(table_lock_); |
| assert(columns_.count(column_name) > 0); |
| if (columns_.count(column_name) == 0) |
| return -1; |
| return current_row_->InsertCell(column_name, value_container); |
| } |
| |
| int LogTable::CreateLogFile(const std::string& file_name) { |
| if (file_name.length() == 0) |
| return -1; |
| if (file_->Open()) |
| return -1; |
| file_->OpenFile(file_name.c_str(), |
| false, // Open with read/write permissions |
| false, // Don't wraparound and write at the beginning when |
| // the file is full |
| true); // Open as a text file |
| if (file_ == NULL) |
| return -1; |
| return 0; |
| } |
| |
| void LogTable::Flush() { |
| ColumnMap::iterator column_it; |
| bool commit_header = false; |
| if (write_header_) { |
| CriticalSectionScoped synchronize(table_lock_); |
| if (write_header_) { |
| commit_header = true; |
| write_header_ = false; |
| } |
| } |
| if (commit_header) { |
| for (column_it = columns_.begin(); |
| column_it != columns_.end(); ++column_it) { |
| if (column_it->second > 1) { |
| file_->WriteText("%s[%u],", column_it->first.c_str(), |
| column_it->second); |
| for (int i = 1; i < column_it->second; ++i) |
| file_->WriteText(","); |
| } else { |
| file_->WriteText("%s,", column_it->first.c_str()); |
| } |
| } |
| if (columns_.size() > 0) |
| file_->WriteText("\n"); |
| } |
| |
| // Swap the list used for flushing with the list containing the row history |
| // and clear the history. We also create a local pointer to the new |
| // list used for flushing to avoid race conditions if another thread |
| // calls this function while we are writing. |
| // We don't want to block the list while we're writing to file. |
| { |
| CriticalSectionScoped synchronize(table_lock_); |
| RowList* tmp = rows_flush_; |
| rows_flush_ = rows_history_; |
| rows_history_ = tmp; |
| rows_history_->clear(); |
| } |
| |
| // Write all complete rows to file and delete them |
| for (RowList::iterator row_it = rows_flush_->begin(); |
| row_it != rows_flush_->end();) { |
| for (column_it = columns_.begin(); |
| column_it != columns_.end(); ++column_it) { |
| std::string row_string; |
| (*row_it)->ToString(column_it->first, &row_string); |
| file_->WriteText("%s", row_string.c_str()); |
| } |
| if (columns_.size() > 0) |
| file_->WriteText("\n"); |
| delete *row_it; |
| row_it = rows_flush_->erase(row_it); |
| } |
| } |
| |
| int DataLog::CreateLog() { |
| return DataLogImpl::CreateLog(); |
| } |
| |
| void DataLog::ReturnLog() { |
| return DataLogImpl::ReturnLog(); |
| } |
| |
| std::string DataLog::Combine(const std::string& table_name, int table_id) { |
| std::stringstream ss; |
| std::string combined_id = table_name; |
| std::string number_suffix; |
| ss << "_" << table_id; |
| ss >> number_suffix; |
| combined_id += number_suffix; |
| std::transform(combined_id.begin(), combined_id.end(), combined_id.begin(), |
| ::tolower); |
| return combined_id; |
| } |
| |
| int DataLog::AddTable(const std::string& table_name) { |
| DataLogImpl* data_log = DataLogImpl::StaticInstance(); |
| if (data_log == NULL) |
| return -1; |
| return data_log->AddTable(table_name); |
| } |
| |
| int DataLog::AddColumn(const std::string& table_name, |
| const std::string& column_name, |
| int multi_value_length) { |
| DataLogImpl* data_log = DataLogImpl::StaticInstance(); |
| if (data_log == NULL) |
| return -1; |
| return data_log->DataLogImpl::StaticInstance()->AddColumn(table_name, |
| column_name, |
| multi_value_length); |
| } |
| |
| int DataLog::NextRow(const std::string& table_name) { |
| DataLogImpl* data_log = DataLogImpl::StaticInstance(); |
| if (data_log == NULL) |
| return -1; |
| return data_log->DataLogImpl::StaticInstance()->NextRow(table_name); |
| } |
| |
| DataLogImpl::DataLogImpl() |
| : counter_(1), |
| tables_(), |
| flush_event_(EventWrapper::Create()), |
| file_writer_thread_(NULL), |
| tables_lock_(RWLockWrapper::CreateRWLock()) { |
| } |
| |
| DataLogImpl::~DataLogImpl() { |
| StopThread(); |
| Flush(); // Write any remaining rows |
| delete file_writer_thread_; |
| delete flush_event_; |
| for (TableMap::iterator it = tables_.begin(); it != tables_.end();) { |
| delete static_cast<LogTable*>(it->second); |
| // For maps all iterators (except the erased) are valid after an erase |
| tables_.erase(it++); |
| } |
| delete tables_lock_; |
| } |
| |
| int DataLogImpl::CreateLog() { |
| CriticalSectionScoped synchronize(crit_sect_.get()); |
| if (instance_ == NULL) { |
| instance_ = new DataLogImpl(); |
| return instance_->Init(); |
| } else { |
| ++instance_->counter_; |
| } |
| return 0; |
| } |
| |
| int DataLogImpl::Init() { |
| file_writer_thread_ = ThreadWrapper::CreateThread( |
| DataLogImpl::Run, |
| instance_, |
| kHighestPriority, |
| "DataLog"); |
| if (file_writer_thread_ == NULL) |
| return -1; |
| unsigned int thread_id = 0; |
| bool success = file_writer_thread_->Start(thread_id); |
| if (!success) |
| return -1; |
| return 0; |
| } |
| |
| DataLogImpl* DataLogImpl::StaticInstance() { |
| return instance_; |
| } |
| |
| void DataLogImpl::ReturnLog() { |
| CriticalSectionScoped synchronize(crit_sect_.get()); |
| if (instance_ && instance_->counter_ > 1) { |
| --instance_->counter_; |
| return; |
| } |
| delete instance_; |
| instance_ = NULL; |
| } |
| |
| int DataLogImpl::AddTable(const std::string& table_name) { |
| WriteLockScoped synchronize(*tables_lock_); |
| // Make sure we don't add a table which already exists |
| if (tables_.count(table_name) > 0) |
| return -1; |
| tables_[table_name] = new LogTable(); |
| if (tables_[table_name]->CreateLogFile(table_name + ".txt") == -1) |
| return -1; |
| return 0; |
| } |
| |
| int DataLogImpl::AddColumn(const std::string& table_name, |
| const std::string& column_name, |
| int multi_value_length) { |
| ReadLockScoped synchronize(*tables_lock_); |
| if (tables_.count(table_name) == 0) |
| return -1; |
| return tables_[table_name]->AddColumn(column_name, multi_value_length); |
| } |
| |
| int DataLogImpl::InsertCell(const std::string& table_name, |
| const std::string& column_name, |
| const Container* value_container) { |
| ReadLockScoped synchronize(*tables_lock_); |
| assert(tables_.count(table_name) > 0); |
| if (tables_.count(table_name) == 0) |
| return -1; |
| return tables_[table_name]->InsertCell(column_name, value_container); |
| } |
| |
| int DataLogImpl::NextRow(const std::string& table_name) { |
| ReadLockScoped synchronize(*tables_lock_); |
| if (tables_.count(table_name) == 0) |
| return -1; |
| tables_[table_name]->NextRow(); |
| if (file_writer_thread_ == NULL) { |
| // Write every row to file as they get complete. |
| tables_[table_name]->Flush(); |
| } else { |
| // Signal a complete row |
| flush_event_->Set(); |
| } |
| return 0; |
| } |
| |
| void DataLogImpl::Flush() { |
| ReadLockScoped synchronize(*tables_lock_); |
| for (TableMap::iterator it = tables_.begin(); it != tables_.end(); ++it) { |
| it->second->Flush(); |
| } |
| } |
| |
| bool DataLogImpl::Run(void* obj) { |
| static_cast<DataLogImpl*>(obj)->Process(); |
| return true; |
| } |
| |
| void DataLogImpl::Process() { |
| // Wait for a row to be complete |
| flush_event_->Wait(WEBRTC_EVENT_INFINITE); |
| Flush(); |
| } |
| |
| void DataLogImpl::StopThread() { |
| if (file_writer_thread_ != NULL) { |
| file_writer_thread_->SetNotAlive(); |
| flush_event_->Set(); |
| // Call Stop() repeatedly, waiting for the Flush() call in Process() to |
| // finish. |
| while (!file_writer_thread_->Stop()) continue; |
| } |
| } |
| |
| } // namespace webrtc |