Some cleanup for base/logging and base/stream.h

* Fix race when calling UpdateMinLogSeverity
* Remove unused 'diagnostic mode'
* Remove LogToStream
* Fix ctor of StringStream
* Delete POpenStream
* Delete AsyncWriteStream
* Delete CircularFileStream
* Delete StreamSegment

BUG=
R=pbos@webrtc.org

Review URL: https://webrtc-codereview.appspot.com/57429004

Cr-Commit-Position: refs/heads/master@{#9273}
This commit is contained in:
Tommi
2015-05-25 11:25:59 +02:00
parent 23edcff7a9
commit 00aac5aacf
12 changed files with 18 additions and 700 deletions

View File

@ -68,7 +68,7 @@ public:
void set_agent(const std::string& agent) { agent_ = agent; } void set_agent(const std::string& agent) { agent_ = agent; }
const std::string& agent() const { return agent_; } const std::string& agent() const { return agent_; }
void set_proxy(const ProxyInfo& proxy) { proxy_ = proxy; } void set_proxy(const ProxyInfo& proxy) { proxy_ = proxy; }
const ProxyInfo& proxy() const { return proxy_; } const ProxyInfo& proxy() const { return proxy_; }
@ -99,22 +99,20 @@ public:
// reset clears the server, request, and response structures. It will also // reset clears the server, request, and response structures. It will also
// abort an active request. // abort an active request.
void reset(); void reset();
void set_server(const SocketAddress& address); void set_server(const SocketAddress& address);
const SocketAddress& server() const { return server_; } const SocketAddress& server() const { return server_; }
// Note: in order for HttpClient to retry a POST in response to // Note: in order for HttpClient to retry a POST in response to
// an authentication challenge, a redirect response, or socket disconnection, // an authentication challenge, a redirect response, or socket disconnection,
// the request document must support 'replaying' by calling Rewind() on it. // the request document must support 'replaying' by calling Rewind() on it.
// In the case where just a subset of a stream should be used as the request
// document, the stream may be wrapped with the StreamSegment adapter.
HttpTransaction* transaction() { return transaction_; } HttpTransaction* transaction() { return transaction_; }
const HttpTransaction* transaction() const { return transaction_; } const HttpTransaction* transaction() const { return transaction_; }
HttpRequestData& request() { return transaction_->request; } HttpRequestData& request() { return transaction_->request; }
const HttpRequestData& request() const { return transaction_->request; } const HttpRequestData& request() const { return transaction_->request; }
HttpResponseData& response() { return transaction_->response; } HttpResponseData& response() { return transaction_->response; }
const HttpResponseData& response() const { return transaction_->response; } const HttpResponseData& response() const { return transaction_->response; }
// convenience methods // convenience methods
void prepare_get(const std::string& url); void prepare_get(const std::string& url);
void prepare_post(const std::string& url, const std::string& content_type, void prepare_post(const std::string& url, const std::string& content_type,
@ -125,7 +123,7 @@ public:
// After you finish setting up your request, call start. // After you finish setting up your request, call start.
void start(); void start();
// Signalled when the header has finished downloading, before the document // Signalled when the header has finished downloading, before the document
// content is processed. You may change the response document in response // content is processed. You may change the response document in response
// to this signal. The second parameter indicates whether this is an // to this signal. The second parameter indicates whether this is an

View File

@ -258,63 +258,6 @@ static void ExpectEofFromStream(FileStream* stream) {
} }
} }
// For caching the lsb_release output (reading it invokes a sub-process and
// hence is somewhat expensive).
static std::string lsb_release_string;
static CriticalSection lsb_release_string_critsec;
std::string ReadLinuxLsbRelease() {
CritScope cs(&lsb_release_string_critsec);
if (!lsb_release_string.empty()) {
// Have cached result from previous call.
return lsb_release_string;
}
// No cached result. Run lsb_release and parse output.
POpenStream lsb_release_output;
if (!lsb_release_output.Open("lsb_release -idrcs", "r", NULL)) {
LOG_ERR(LS_ERROR) << "Can't run lsb_release";
return lsb_release_string; // empty
}
// Read in the command's output and build the string.
std::ostringstream sstr;
std::string line;
int wait_status;
if (!ExpectLineFromStream(&lsb_release_output, &line)) {
return lsb_release_string; // empty
}
sstr << "DISTRIB_ID=" << line;
if (!ExpectLineFromStream(&lsb_release_output, &line)) {
return lsb_release_string; // empty
}
sstr << " DISTRIB_DESCRIPTION=\"" << line << '"';
if (!ExpectLineFromStream(&lsb_release_output, &line)) {
return lsb_release_string; // empty
}
sstr << " DISTRIB_RELEASE=" << line;
if (!ExpectLineFromStream(&lsb_release_output, &line)) {
return lsb_release_string; // empty
}
sstr << " DISTRIB_CODENAME=" << line;
// Should not be anything left.
ExpectEofFromStream(&lsb_release_output);
lsb_release_output.Close();
wait_status = lsb_release_output.GetWaitStatus();
if (wait_status == -1 ||
!WIFEXITED(wait_status) ||
WEXITSTATUS(wait_status) != 0) {
LOG(LS_WARNING) << "Unexpected exit status from lsb_release";
}
lsb_release_string = sstr.str();
return lsb_release_string;
}
#endif #endif
std::string ReadLinuxUname() { std::string ReadLinuxUname() {

View File

@ -104,11 +104,6 @@ class ProcCpuInfo {
ConfigParser::MapVector sections_; ConfigParser::MapVector sections_;
}; };
#if !defined(WEBRTC_CHROMIUM_BUILD)
// Builds a string containing the info from lsb_release on a single line.
std::string ReadLinuxLsbRelease();
#endif
// Returns the output of "uname". // Returns the output of "uname".
std::string ReadLinuxUname(); std::string ReadLinuxUname();

View File

@ -88,14 +88,6 @@ TEST(ConfigParser, ParseConfig) {
EXPECT_EQ(true, parser.Parse(&key_val_pairs)); EXPECT_EQ(true, parser.Parse(&key_val_pairs));
} }
#if !defined(WEBRTC_CHROMIUM_BUILD)
TEST(ReadLinuxLsbRelease, ReturnsSomething) {
std::string str = ReadLinuxLsbRelease();
// ChromeOS don't have lsb_release
// EXPECT_FALSE(str.empty());
}
#endif
TEST(ReadLinuxUname, ReturnsSomething) { TEST(ReadLinuxUname, ReturnsSomething) {
std::string str = ReadLinuxUname(); std::string str = ReadLinuxUname();
EXPECT_FALSE(str.empty()); EXPECT_FALSE(str.empty());

View File

@ -91,14 +91,11 @@ CriticalSection LogMessage::crit_;
// Note: we explicitly do not clean this up, because of the uncertain ordering // Note: we explicitly do not clean this up, because of the uncertain ordering
// of destructors at program exit. Let the person who sets the stream trigger // of destructors at program exit. Let the person who sets the stream trigger
// cleanup by setting to NULL, or let it leak (safe at program exit). // cleanup by setting to NULL, or let it leak (safe at program exit).
LogMessage::StreamList LogMessage::streams_; LogMessage::StreamList LogMessage::streams_ GUARDED_BY(LogMessage::crit_);
// Boolean options default to false (0) // Boolean options default to false (0)
bool LogMessage::thread_, LogMessage::timestamp_; bool LogMessage::thread_, LogMessage::timestamp_;
// If we're in diagnostic mode, we'll be explicitly set that way; default=false.
bool LogMessage::is_diagnostic_mode_ = false;
LogMessage::LogMessage(const char* file, int line, LoggingSeverity sev, LogMessage::LogMessage(const char* file, int line, LoggingSeverity sev,
LogErrorContext err_ctx, int err, const char* module) LogErrorContext err_ctx, int err, const char* module)
: severity_(sev), : severity_(sev),
@ -214,20 +211,8 @@ void LogMessage::LogTimestamps(bool on) {
void LogMessage::LogToDebug(LoggingSeverity min_sev) { void LogMessage::LogToDebug(LoggingSeverity min_sev) {
dbg_sev_ = min_sev; dbg_sev_ = min_sev;
UpdateMinLogSeverity();
}
void LogMessage::LogToStream(LogSink* stream, LoggingSeverity min_sev) {
CritScope cs(&crit_); CritScope cs(&crit_);
// Discard and delete all previously installed streams UpdateMinLogSeverity();
for (StreamList::iterator it = streams_.begin(); it != streams_.end(); ++it) {
delete it->first;
}
streams_.clear();
// Install the new stream, if specified
if (stream) {
AddLogToStream(stream, min_sev);
}
} }
int LogMessage::GetLogToStream(LogSink* stream) { int LogMessage::GetLogToStream(LogSink* stream) {
@ -320,7 +305,7 @@ void LogMessage::ConfigureLogging(const char* params) {
LogToDebug(debug_level); LogToDebug(debug_level);
} }
void LogMessage::UpdateMinLogSeverity() { void LogMessage::UpdateMinLogSeverity() EXCLUSIVE_LOCKS_REQUIRED(crit_) {
LoggingSeverity min_sev = dbg_sev_; LoggingSeverity min_sev = dbg_sev_;
for (StreamList::iterator it = streams_.begin(); it != streams_.end(); ++it) { for (StreamList::iterator it = streams_.begin(); it != streams_.end(); ++it) {
min_sev = std::min(dbg_sev_, it->second); min_sev = std::min(dbg_sev_, it->second);

View File

@ -56,6 +56,7 @@
#include <utility> #include <utility>
#include "webrtc/base/basictypes.h" #include "webrtc/base/basictypes.h"
#include "webrtc/base/criticalsection.h" #include "webrtc/base/criticalsection.h"
#include "webrtc/base/thread_annotations.h"
namespace rtc { namespace rtc {
@ -153,6 +154,7 @@ class LogMessage {
// LogThreads: Display the thread identifier of the current thread // LogThreads: Display the thread identifier of the current thread
static void LogThreads(bool on = true); static void LogThreads(bool on = true);
// LogTimestamps: Display the elapsed time of the program // LogTimestamps: Display the elapsed time of the program
static void LogTimestamps(bool on = true); static void LogTimestamps(bool on = true);
@ -168,7 +170,6 @@ class LogMessage {
// GetLogToStream gets the severity for the specified stream, of if none // GetLogToStream gets the severity for the specified stream, of if none
// is specified, the minimum stream severity. // is specified, the minimum stream severity.
// RemoveLogToStream removes the specified stream, without destroying it. // RemoveLogToStream removes the specified stream, without destroying it.
static void LogToStream(LogSink* stream, LoggingSeverity min_sev);
static int GetLogToStream(LogSink* stream = NULL); static int GetLogToStream(LogSink* stream = NULL);
static void AddLogToStream(LogSink* stream, LoggingSeverity min_sev); static void AddLogToStream(LogSink* stream, LoggingSeverity min_sev);
static void RemoveLogToStream(LogSink* stream); static void RemoveLogToStream(LogSink* stream);
@ -177,9 +178,6 @@ class LogMessage {
// logging operations by pre-checking the logging level. // logging operations by pre-checking the logging level.
static int GetMinLogSeverity() { return min_sev_; } static int GetMinLogSeverity() { return min_sev_; }
static void SetDiagnosticMode(bool f) { is_diagnostic_mode_ = f; }
static bool IsDiagnosticMode() { return is_diagnostic_mode_; }
// Parses the provided parameter stream to configure the options above. // Parses the provided parameter stream to configure the options above.
// Useful for configuring logging from the command line. // Useful for configuring logging from the command line.
static void ConfigureLogging(const char* params); static void ConfigureLogging(const char* params);
@ -189,7 +187,7 @@ class LogMessage {
typedef std::list<StreamAndSeverity> StreamList; typedef std::list<StreamAndSeverity> StreamList;
// Updates min_sev_ appropriately when debug sinks change. // Updates min_sev_ appropriately when debug sinks change.
static void UpdateMinLogSeverity(); static void UpdateMinLogSeverity() EXCLUSIVE_LOCKS_REQUIRED(crit_);
// These write out the actual log messages. // These write out the actual log messages.
static void OutputToDebug(const std::string& msg, LoggingSeverity severity_); static void OutputToDebug(const std::string& msg, LoggingSeverity severity_);
@ -224,9 +222,6 @@ class LogMessage {
// Flags for formatting options // Flags for formatting options
static bool thread_, timestamp_; static bool thread_, timestamp_;
// are we in diagnostic mode (as defined by the app)?
static bool is_diagnostic_mode_;
DISALLOW_COPY_AND_ASSIGN(LogMessage); DISALLOW_COPY_AND_ASSIGN(LogMessage);
}; };
@ -293,6 +288,7 @@ class LogMessageVoidify {
rtc::LogCheckLevel(rtc::sev) rtc::LogCheckLevel(rtc::sev)
#define LOG_CHECK_LEVEL_V(sev) \ #define LOG_CHECK_LEVEL_V(sev) \
rtc::LogCheckLevel(sev) rtc::LogCheckLevel(sev)
inline bool LogCheckLevel(LoggingSeverity sev) { inline bool LogCheckLevel(LoggingSeverity sev) {
return (LogMessage::GetMinLogSeverity() <= sev); return (LogMessage::GetMinLogSeverity() <= sev);
} }

View File

@ -25,13 +25,8 @@ class LogSinkImpl
public: public:
LogSinkImpl() {} LogSinkImpl() {}
// The non-const reference constructor is required because of StringStream.
// TODO(tommi): Fix StringStream to accept a pointer for non-const.
template<typename P> template<typename P>
explicit LogSinkImpl(P& p) : Base(p) {} explicit LogSinkImpl(P* p) : Base(p) {}
template<typename P>
explicit LogSinkImpl(const P& p) : Base(p) {}
private: private:
void OnLogMessage(const std::string& message) override { void OnLogMessage(const std::string& message) override {
@ -46,7 +41,7 @@ TEST(LogTest, SingleStream) {
int sev = LogMessage::GetLogToStream(NULL); int sev = LogMessage::GetLogToStream(NULL);
std::string str; std::string str;
LogSinkImpl<StringStream> stream(str); LogSinkImpl<StringStream> stream(&str);
LogMessage::AddLogToStream(&stream, LS_INFO); LogMessage::AddLogToStream(&stream, LS_INFO);
EXPECT_EQ(LS_INFO, LogMessage::GetLogToStream(&stream)); EXPECT_EQ(LS_INFO, LogMessage::GetLogToStream(&stream));
@ -68,7 +63,7 @@ TEST(LogTest, MultipleStreams) {
int sev = LogMessage::GetLogToStream(NULL); int sev = LogMessage::GetLogToStream(NULL);
std::string str1, str2; std::string str1, str2;
LogSinkImpl<StringStream> stream1(str1), stream2(str2); LogSinkImpl<StringStream> stream1(&str1), stream2(&str2);
LogMessage::AddLogToStream(&stream1, LS_INFO); LogMessage::AddLogToStream(&stream1, LS_INFO);
LogMessage::AddLogToStream(&stream2, LS_VERBOSE); LogMessage::AddLogToStream(&stream2, LS_VERBOSE);
EXPECT_EQ(LS_INFO, LogMessage::GetLogToStream(&stream1)); EXPECT_EQ(LS_INFO, LogMessage::GetLogToStream(&stream1));

View File

@ -91,7 +91,7 @@ TEST(MultipartTest, TestAddAndRead) {
// Read the multipart stream into StringStream // Read the multipart stream into StringStream
std::string str; std::string str;
rtc::StringStream str_stream(str); rtc::StringStream str_stream(&str);
EXPECT_EQ(rtc::SR_SUCCESS, EXPECT_EQ(rtc::SR_SUCCESS,
Flow(&multipart, buffer, sizeof(buffer), &str_stream)); Flow(&multipart, buffer, sizeof(buffer), &str_stream));
EXPECT_EQ(size, str.length()); EXPECT_EQ(size, str.length());

View File

@ -291,89 +291,6 @@ StreamResult StreamTap::Write(const void* data, size_t data_len,
return res; return res;
} }
///////////////////////////////////////////////////////////////////////////////
// StreamSegment
///////////////////////////////////////////////////////////////////////////////
StreamSegment::StreamSegment(StreamInterface* stream)
: StreamAdapterInterface(stream), start_(SIZE_UNKNOWN), pos_(0),
length_(SIZE_UNKNOWN) {
// It's ok for this to fail, in which case start_ is left as SIZE_UNKNOWN.
stream->GetPosition(&start_);
}
StreamSegment::StreamSegment(StreamInterface* stream, size_t length)
: StreamAdapterInterface(stream), start_(SIZE_UNKNOWN), pos_(0),
length_(length) {
// It's ok for this to fail, in which case start_ is left as SIZE_UNKNOWN.
stream->GetPosition(&start_);
}
StreamResult StreamSegment::Read(void* buffer, size_t buffer_len,
size_t* read, int* error) {
if (SIZE_UNKNOWN != length_) {
if (pos_ >= length_)
return SR_EOS;
buffer_len = std::min(buffer_len, length_ - pos_);
}
size_t backup_read;
if (!read) {
read = &backup_read;
}
StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len,
read, error);
if (SR_SUCCESS == result) {
pos_ += *read;
}
return result;
}
bool StreamSegment::SetPosition(size_t position) {
if (SIZE_UNKNOWN == start_)
return false; // Not seekable
if ((SIZE_UNKNOWN != length_) && (position > length_))
return false; // Seek past end of segment
if (!StreamAdapterInterface::SetPosition(start_ + position))
return false;
pos_ = position;
return true;
}
bool StreamSegment::GetPosition(size_t* position) const {
if (SIZE_UNKNOWN == start_)
return false; // Not seekable
if (!StreamAdapterInterface::GetPosition(position))
return false;
if (position) {
ASSERT(*position >= start_);
*position -= start_;
}
return true;
}
bool StreamSegment::GetSize(size_t* size) const {
if (!StreamAdapterInterface::GetSize(size))
return false;
if (size) {
if (SIZE_UNKNOWN != start_) {
ASSERT(*size >= start_);
*size -= start_;
}
if (SIZE_UNKNOWN != length_) {
*size = std::min(*size, length_);
}
}
return true;
}
bool StreamSegment::GetAvailable(size_t* size) const {
if (!StreamAdapterInterface::GetAvailable(size))
return false;
if (size && (SIZE_UNKNOWN != length_))
*size = std::min(*size, length_ - pos_);
return true;
}
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
// NullStream // NullStream
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
@ -600,238 +517,6 @@ void FileStream::DoClose() {
fclose(file_); fclose(file_);
} }
CircularFileStream::CircularFileStream(size_t max_size)
: max_write_size_(max_size),
position_(0),
marked_position_(max_size / 2),
last_write_position_(0),
read_segment_(READ_LATEST),
read_segment_available_(0) {
}
bool CircularFileStream::Open(
const std::string& filename, const char* mode, int* error) {
if (!FileStream::Open(filename.c_str(), mode, error))
return false;
if (strchr(mode, "r") != NULL) { // Opened in read mode.
// Check if the buffer has been overwritten and determine how to read the
// log in time sequence.
size_t file_size;
GetSize(&file_size);
if (file_size == position_) {
// The buffer has not been overwritten yet. Read 0 .. file_size
read_segment_ = READ_LATEST;
read_segment_available_ = file_size;
} else {
// The buffer has been over written. There are three segments: The first
// one is 0 .. marked_position_, which is the marked earliest log. The
// second one is position_ .. file_size, which is the middle log. The
// last one is marked_position_ .. position_, which is the latest log.
read_segment_ = READ_MARKED;
read_segment_available_ = marked_position_;
last_write_position_ = position_;
}
// Read from the beginning.
position_ = 0;
SetPosition(position_);
}
return true;
}
StreamResult CircularFileStream::Read(void* buffer, size_t buffer_len,
size_t* read, int* error) {
if (read_segment_available_ == 0) {
size_t file_size;
switch (read_segment_) {
case READ_MARKED: // Finished READ_MARKED and start READ_MIDDLE.
read_segment_ = READ_MIDDLE;
position_ = last_write_position_;
SetPosition(position_);
GetSize(&file_size);
read_segment_available_ = file_size - position_;
break;
case READ_MIDDLE: // Finished READ_MIDDLE and start READ_LATEST.
read_segment_ = READ_LATEST;
position_ = marked_position_;
SetPosition(position_);
read_segment_available_ = last_write_position_ - position_;
break;
default: // Finished READ_LATEST and return EOS.
return rtc::SR_EOS;
}
}
size_t local_read;
if (!read) read = &local_read;
size_t to_read = std::min(buffer_len, read_segment_available_);
rtc::StreamResult result
= rtc::FileStream::Read(buffer, to_read, read, error);
if (result == rtc::SR_SUCCESS) {
read_segment_available_ -= *read;
position_ += *read;
}
return result;
}
StreamResult CircularFileStream::Write(const void* data, size_t data_len,
size_t* written, int* error) {
if (position_ >= max_write_size_) {
ASSERT(position_ == max_write_size_);
position_ = marked_position_;
SetPosition(position_);
}
size_t local_written;
if (!written) written = &local_written;
size_t to_eof = max_write_size_ - position_;
size_t to_write = std::min(data_len, to_eof);
rtc::StreamResult result
= rtc::FileStream::Write(data, to_write, written, error);
if (result == rtc::SR_SUCCESS) {
position_ += *written;
}
return result;
}
AsyncWriteStream::AsyncWriteStream(StreamInterface* stream,
rtc::Thread* write_thread)
: stream_(stream),
write_thread_(write_thread),
state_(stream ? stream->GetState() : SS_CLOSED) {
}
AsyncWriteStream::~AsyncWriteStream() {
write_thread_->Clear(this, 0, NULL);
ClearBufferAndWrite();
CritScope cs(&crit_stream_);
stream_.reset();
}
StreamState AsyncWriteStream::GetState() const {
return state_;
}
// This is needed by some stream writers, such as RtpDumpWriter.
bool AsyncWriteStream::GetPosition(size_t* position) const {
CritScope cs(&crit_stream_);
return stream_->GetPosition(position);
}
// This is needed by some stream writers, such as the plugin log writers.
StreamResult AsyncWriteStream::Read(void* buffer, size_t buffer_len,
size_t* read, int* error) {
CritScope cs(&crit_stream_);
return stream_->Read(buffer, buffer_len, read, error);
}
void AsyncWriteStream::Close() {
if (state_ == SS_CLOSED) {
return;
}
write_thread_->Clear(this, 0, NULL);
ClearBufferAndWrite();
CritScope cs(&crit_stream_);
stream_->Close();
state_ = SS_CLOSED;
}
StreamResult AsyncWriteStream::Write(const void* data, size_t data_len,
size_t* written, int* error) {
if (state_ == SS_CLOSED) {
return SR_ERROR;
}
size_t previous_buffer_length = 0;
{
CritScope cs(&crit_buffer_);
previous_buffer_length = buffer_.size();
buffer_.AppendData(reinterpret_cast<const uint8_t*>(data), data_len);
}
if (previous_buffer_length == 0) {
// If there's stuff already in the buffer, then we already called
// Post and the write_thread_ hasn't pulled it out yet, so we
// don't need to re-Post.
write_thread_->Post(this, 0, NULL);
}
// Return immediately, assuming that it works.
if (written) {
*written = data_len;
}
return SR_SUCCESS;
}
void AsyncWriteStream::OnMessage(rtc::Message* pmsg) {
ClearBufferAndWrite();
}
bool AsyncWriteStream::Flush() {
if (state_ == SS_CLOSED) {
return false;
}
ClearBufferAndWrite();
CritScope cs(&crit_stream_);
return stream_->Flush();
}
void AsyncWriteStream::ClearBufferAndWrite() {
Buffer to_write;
{
CritScope cs_buffer(&crit_buffer_);
to_write = buffer_.Pass();
buffer_.Clear();
}
if (to_write.size() > 0) {
CritScope cs(&crit_stream_);
stream_->WriteAll(to_write.data(), to_write.size(), NULL, NULL);
}
}
#if defined(WEBRTC_POSIX) && !defined(__native_client__)
// Have to identically rewrite the FileStream destructor or else it would call
// the base class's Close() instead of the sub-class's.
POpenStream::~POpenStream() {
POpenStream::Close();
}
bool POpenStream::Open(const std::string& subcommand,
const char* mode,
int* error) {
Close();
file_ = popen(subcommand.c_str(), mode);
if (file_ == NULL) {
if (error)
*error = errno;
return false;
}
return true;
}
bool POpenStream::OpenShare(const std::string& subcommand, const char* mode,
int shflag, int* error) {
return Open(subcommand, mode, error);
}
void POpenStream::DoClose() {
wait_status_ = pclose(file_);
}
#endif
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
// MemoryStream // MemoryStream
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
@ -1277,8 +962,8 @@ void LoggingAdapter::OnEvent(StreamInterface* stream, int events, int err) {
// StringStream - Reads/Writes to an external std::string // StringStream - Reads/Writes to an external std::string
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
StringStream::StringStream(std::string& str) StringStream::StringStream(std::string* str)
: str_(str), read_pos_(0), read_only_(false) { : str_(*str), read_pos_(0), read_only_(false) {
} }
StringStream::StringStream(const std::string& str) StringStream::StringStream(const std::string& str)

View File

@ -340,36 +340,6 @@ class StreamTap : public StreamAdapterInterface {
DISALLOW_COPY_AND_ASSIGN(StreamTap); DISALLOW_COPY_AND_ASSIGN(StreamTap);
}; };
///////////////////////////////////////////////////////////////////////////////
// StreamSegment adapts a read stream, to expose a subset of the adapted
// stream's data. This is useful for cases where a stream contains multiple
// documents concatenated together. StreamSegment can expose a subset of
// the data as an independent stream, including support for rewinding and
// seeking.
///////////////////////////////////////////////////////////////////////////////
class StreamSegment : public StreamAdapterInterface {
public:
// The current position of the adapted stream becomes the beginning of the
// segment. If a length is specified, it bounds the length of the segment.
explicit StreamSegment(StreamInterface* stream);
explicit StreamSegment(StreamInterface* stream, size_t length);
// StreamAdapterInterface Interface
StreamResult Read(void* buffer,
size_t buffer_len,
size_t* read,
int* error) override;
bool SetPosition(size_t position) override;
bool GetPosition(size_t* position) const override;
bool GetSize(size_t* size) const override;
bool GetAvailable(size_t* size) const override;
private:
size_t start_, pos_, length_;
DISALLOW_COPY_AND_ASSIGN(StreamSegment);
};
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
// NullStream gives errors on read, and silently discards all written data. // NullStream gives errors on read, and silently discards all written data.
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
@ -448,110 +418,6 @@ class FileStream : public StreamInterface {
DISALLOW_COPY_AND_ASSIGN(FileStream); DISALLOW_COPY_AND_ASSIGN(FileStream);
}; };
// A stream that caps the output at a certain size, dropping content from the
// middle of the logical stream and maintaining equal parts of the start/end of
// the logical stream.
class CircularFileStream : public FileStream {
public:
explicit CircularFileStream(size_t max_size);
bool Open(const std::string& filename, const char* mode, int* error) override;
StreamResult Read(void* buffer,
size_t buffer_len,
size_t* read,
int* error) override;
StreamResult Write(const void* data,
size_t data_len,
size_t* written,
int* error) override;
private:
enum ReadSegment {
READ_MARKED, // Read 0 .. marked_position_
READ_MIDDLE, // Read position_ .. file_size
READ_LATEST, // Read marked_position_ .. position_ if the buffer was
// overwritten or 0 .. position_ otherwise.
};
size_t max_write_size_;
size_t position_;
size_t marked_position_;
size_t last_write_position_;
ReadSegment read_segment_;
size_t read_segment_available_;
};
// A stream which pushes writes onto a separate thread and
// returns from the write call immediately.
class AsyncWriteStream : public StreamInterface {
public:
// Takes ownership of the stream, but not the thread.
AsyncWriteStream(StreamInterface* stream, rtc::Thread* write_thread);
~AsyncWriteStream() override;
// StreamInterface Interface
StreamState GetState() const override;
// This is needed by some stream writers, such as RtpDumpWriter.
bool GetPosition(size_t* position) const override;
StreamResult Read(void* buffer,
size_t buffer_len,
size_t* read,
int* error) override;
StreamResult Write(const void* data,
size_t data_len,
size_t* written,
int* error) override;
void Close() override;
bool Flush() override;
protected:
// From MessageHandler
void OnMessage(rtc::Message* pmsg) override;
virtual void ClearBufferAndWrite();
private:
rtc::scoped_ptr<StreamInterface> stream_;
Thread* write_thread_;
StreamState state_;
Buffer buffer_;
mutable CriticalSection crit_stream_;
CriticalSection crit_buffer_;
DISALLOW_COPY_AND_ASSIGN(AsyncWriteStream);
};
#if defined(WEBRTC_POSIX) && !defined(__native_client__)
// A FileStream that is actually not a file, but the output or input of a
// sub-command. See "man 3 popen" for documentation of the underlying OS popen()
// function.
class POpenStream : public FileStream {
public:
POpenStream() : wait_status_(-1) {}
~POpenStream() override;
bool Open(const std::string& subcommand,
const char* mode,
int* error) override;
// Same as Open(). shflag is ignored.
bool OpenShare(const std::string& subcommand,
const char* mode,
int shflag,
int* error) override;
// Returns the wait status from the last Close() of an Open()'ed stream, or
// -1 if no Open()+Close() has been done on this object. Meaning of the number
// is documented in "man 2 wait".
int GetWaitStatus() const { return wait_status_; }
protected:
void DoClose() override;
private:
int wait_status_;
};
#endif // WEBRTC_POSIX
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
// MemoryStream is a simple implementation of a StreamInterface over in-memory // MemoryStream is a simple implementation of a StreamInterface over in-memory
// data. Data is read and written at the current seek position. Reads return // data. Data is read and written at the current seek position. Reads return
@ -730,7 +596,7 @@ class LoggingAdapter : public StreamAdapterInterface {
class StringStream : public StreamInterface { class StringStream : public StreamInterface {
public: public:
explicit StringStream(std::string& str); explicit StringStream(std::string* str);
explicit StringStream(const std::string& str); explicit StringStream(const std::string& str);
StreamState GetState() const override; StreamState GetState() const override;

View File

@ -14,9 +14,6 @@
namespace rtc { namespace rtc {
namespace {
static const int kTimeoutMs = 10000;
} // namespace
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
// TestStream // TestStream
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
@ -96,71 +93,6 @@ void SeekTest(StreamInterface* stream, const unsigned char value) {
EXPECT_EQ(20U, bytes); EXPECT_EQ(20U, bytes);
} }
TEST(StreamSegment, TranslatesPosition) {
TestStream* test = new TestStream;
// Verify behavior of original stream
SeekTest(test, 0);
StreamSegment* segment = new StreamSegment(test);
// Verify behavior of adapted stream (all values offset by 20)
SeekTest(segment, 20);
delete segment;
}
TEST(StreamSegment, SupportsArtificialTermination) {
TestStream* test = new TestStream;
size_t bytes;
unsigned char buffer[5000] = { 0 };
const size_t kBufSize = sizeof(buffer);
{
StreamInterface* stream = test;
// Read a lot of bytes
EXPECT_EQ(stream->Read(buffer, kBufSize, &bytes, NULL), SR_SUCCESS);
EXPECT_EQ(bytes, kBufSize);
EXPECT_TRUE(VerifyTestBuffer(buffer, kBufSize, 0));
// Test seeking far ahead
EXPECT_TRUE(stream->SetPosition(12345));
// Read a bunch more bytes
EXPECT_EQ(stream->Read(buffer, kBufSize, &bytes, NULL), SR_SUCCESS);
EXPECT_EQ(bytes, kBufSize);
EXPECT_TRUE(VerifyTestBuffer(buffer, kBufSize, 12345 % 256));
}
// Create a segment of test stream in range [100,600)
EXPECT_TRUE(test->SetPosition(100));
StreamSegment* segment = new StreamSegment(test, 500);
{
StreamInterface* stream = segment;
EXPECT_EQ(stream->Read(buffer, kBufSize, &bytes, NULL), SR_SUCCESS);
EXPECT_EQ(500U, bytes);
EXPECT_TRUE(VerifyTestBuffer(buffer, 500, 100));
EXPECT_EQ(stream->Read(buffer, kBufSize, &bytes, NULL), SR_EOS);
// Test seeking past "end" of stream
EXPECT_FALSE(stream->SetPosition(12345));
EXPECT_FALSE(stream->SetPosition(501));
// Test seeking to end (edge case)
EXPECT_TRUE(stream->SetPosition(500));
EXPECT_EQ(stream->Read(buffer, kBufSize, &bytes, NULL), SR_EOS);
// Test seeking to start
EXPECT_TRUE(stream->SetPosition(0));
EXPECT_EQ(stream->Read(buffer, kBufSize, &bytes, NULL), SR_SUCCESS);
EXPECT_EQ(500U, bytes);
EXPECT_TRUE(VerifyTestBuffer(buffer, 500, 100));
EXPECT_EQ(stream->Read(buffer, kBufSize, &bytes, NULL), SR_EOS);
}
delete segment;
}
TEST(FifoBufferTest, TestAll) { TEST(FifoBufferTest, TestAll) {
const size_t kSize = 16; const size_t kSize = 16;
const char in[kSize * 2 + 1] = "0123456789ABCDEFGHIJKLMNOPQRSTUV"; const char in[kSize * 2 + 1] = "0123456789ABCDEFGHIJKLMNOPQRSTUV";
@ -438,60 +370,4 @@ TEST(FifoBufferTest, WriteOffsetAndReadOffset) {
EXPECT_EQ(SR_BLOCK, buf.ReadOffset(out, 10, 16, NULL)); EXPECT_EQ(SR_BLOCK, buf.ReadOffset(out, 10, 16, NULL));
} }
TEST(AsyncWriteTest, TestWrite) {
FifoBuffer* buf = new FifoBuffer(100);
AsyncWriteStream stream(buf, Thread::Current());
EXPECT_EQ(SS_OPEN, stream.GetState());
// Write "abc". Will go to the logging thread, which is the current
// thread.
stream.Write("abc", 3, NULL, NULL);
char bytes[100];
size_t count;
// Messages on the thread's queue haven't been processed, so "abc"
// hasn't been written yet.
EXPECT_NE(SR_SUCCESS, buf->ReadOffset(&bytes, 3, 0, &count));
// Now we process the messages on the thread's queue, so "abc" has
// been written.
EXPECT_TRUE_WAIT(SR_SUCCESS == buf->ReadOffset(&bytes, 3, 0, &count),
kTimeoutMs);
EXPECT_EQ(3u, count);
EXPECT_EQ(0, memcmp(bytes, "abc", 3));
// Write "def". Will go to the logging thread, which is the current
// thread.
stream.Write("d", 1, &count, NULL);
stream.Write("e", 1, &count, NULL);
stream.Write("f", 1, &count, NULL);
EXPECT_EQ(1u, count);
// Messages on the thread's queue haven't been processed, so "def"
// hasn't been written yet.
EXPECT_NE(SR_SUCCESS, buf->ReadOffset(&bytes, 3, 3, &count));
// Flush() causes the message to be processed, so "def" has now been
// written.
stream.Flush();
EXPECT_EQ(SR_SUCCESS, buf->ReadOffset(&bytes, 3, 3, &count));
EXPECT_EQ(3u, count);
EXPECT_EQ(0, memcmp(bytes, "def", 3));
// Write "xyz". Will go to the logging thread, which is the current
// thread.
stream.Write("xyz", 3, &count, NULL);
EXPECT_EQ(3u, count);
// Messages on the thread's queue haven't been processed, so "xyz"
// hasn't been written yet.
EXPECT_NE(SR_SUCCESS, buf->ReadOffset(&bytes, 3, 6, &count));
// Close() causes the message to be processed, so "xyz" has now been
// written.
stream.Close();
EXPECT_EQ(SR_SUCCESS, buf->ReadOffset(&bytes, 3, 6, &count));
EXPECT_EQ(3u, count);
EXPECT_EQ(0, memcmp(bytes, "xyz", 3));
EXPECT_EQ(SS_CLOSED, stream.GetState());
// Is't closed, so the writes should fail.
EXPECT_EQ(SR_ERROR, stream.Write("000", 3, NULL, NULL));
}
} // namespace rtc } // namespace rtc

View File

@ -248,7 +248,6 @@ int main(int argc, char **argv) {
DEFINE_string(datachannel, "", DEFINE_string(datachannel, "",
"Enable a data channel, and choose the type: rtp or sctp."); "Enable a data channel, and choose the type: rtp or sctp.");
DEFINE_bool(d, false, "Turn on debugging."); DEFINE_bool(d, false, "Turn on debugging.");
DEFINE_string(log, "", "Turn on debugging to a file.");
DEFINE_bool(debugsrtp, false, "Enable debugging for srtp."); DEFINE_bool(debugsrtp, false, "Enable debugging for srtp.");
DEFINE_bool(help, false, "Prints this message"); DEFINE_bool(help, false, "Prints this message");
DEFINE_bool(multisession, false, DEFINE_bool(multisession, false,
@ -265,7 +264,6 @@ int main(int argc, char **argv) {
bool auto_accept = FLAG_a; bool auto_accept = FLAG_a;
bool debug = FLAG_d; bool debug = FLAG_d;
std::string log = FLAG_log;
std::string signaling = FLAG_signaling; std::string signaling = FLAG_signaling;
std::string transport = FLAG_transport; std::string transport = FLAG_transport;
bool test_server = FLAG_testserver; bool test_server = FLAG_testserver;
@ -291,17 +289,6 @@ int main(int argc, char **argv) {
rtc::LogMessage::LogToDebug(rtc::LS_VERBOSE); rtc::LogMessage::LogToDebug(rtc::LS_VERBOSE);
} }
if (!log.empty()) {
rtc::StreamInterface* stream =
rtc::Filesystem::OpenFile(log, "a");
if (stream) {
rtc::LogMessage::LogToStream(stream, rtc::LS_VERBOSE);
} else {
Print(("Cannot open debug log " + log + "\n").c_str());
return 1;
}
}
if (debugsrtp) { if (debugsrtp) {
cricket::EnableSrtpDebugging(); cricket::EnableSrtpDebugging();
} }