/* */ #include "SocketBuffer.h" #include #include #include "SocketCore.h" #include "DlAbortEx.h" #include "message.h" #include "fmt.h" #include "LogFactory.h" namespace aria2 { SocketBuffer::ByteArrayBufEntry::ByteArrayBufEntry (unsigned char* bytes, size_t length, ProgressUpdate* progressUpdate) : BufEntry(progressUpdate), bytes_(bytes), length_(length) {} SocketBuffer::ByteArrayBufEntry::~ByteArrayBufEntry() { delete [] bytes_; } ssize_t SocketBuffer::ByteArrayBufEntry::send (const std::shared_ptr& socket, size_t offset) { return socket->writeData(bytes_+offset, length_-offset); } bool SocketBuffer::ByteArrayBufEntry::final(size_t offset) const { return length_ <= offset; } size_t SocketBuffer::ByteArrayBufEntry::getLength() const { return length_; } const unsigned char* SocketBuffer::ByteArrayBufEntry::getData() const { return bytes_; } SocketBuffer::StringBufEntry::StringBufEntry(const std::string& s, ProgressUpdate* progressUpdate) : BufEntry(progressUpdate), str_(s) {} // SocketBuffer::StringBufEntry::StringBufEntry() {} ssize_t SocketBuffer::StringBufEntry::send (const std::shared_ptr& socket, size_t offset) { return socket->writeData(str_.data()+offset, str_.size()-offset); } bool SocketBuffer::StringBufEntry::final(size_t offset) const { return str_.size() <= offset; } size_t SocketBuffer::StringBufEntry::getLength() const { return str_.size(); } const unsigned char* SocketBuffer::StringBufEntry::getData() const { return reinterpret_cast(str_.c_str()); } void SocketBuffer::StringBufEntry::swap(std::string& s) { str_.swap(s); } SocketBuffer::SocketBuffer(const std::shared_ptr& socket): socket_(socket), offset_(0) {} SocketBuffer::~SocketBuffer() {} void SocketBuffer::pushBytes(unsigned char* bytes, size_t len, ProgressUpdate* progressUpdate) { if(len > 0) { bufq_.push_back(std::shared_ptr (new ByteArrayBufEntry(bytes, len, progressUpdate))); } } void SocketBuffer::pushStr(const std::string& data, ProgressUpdate* progressUpdate) { if(data.size() > 0) { bufq_.push_back(std::shared_ptr (new StringBufEntry(data, progressUpdate))); } } ssize_t SocketBuffer::send() { a2iovec iov[A2_IOV_MAX]; size_t totalslen = 0; while(!bufq_.empty()) { size_t num; ssize_t amount = 24*1024; ssize_t firstlen = bufq_[0]->getLength() - offset_; amount -= firstlen; iov[0].A2IOVEC_BASE = reinterpret_cast(const_cast (bufq_[0]->getData() + offset_)); iov[0].A2IOVEC_LEN = firstlen; for(num = 1; num < A2_IOV_MAX && num < bufq_.size() && amount > 0; ++num) { const std::shared_ptr& buf = bufq_[num]; ssize_t len = buf->getLength(); if(amount >= len) { amount -= len; iov[num].A2IOVEC_BASE = reinterpret_cast(const_cast(buf->getData())); iov[num].A2IOVEC_LEN = len; } else { break; } } ssize_t slen = socket_->writeVector(iov, num); if(slen == 0 && !socket_->wantRead() && !socket_->wantWrite()) { throw DL_ABORT_EX(fmt(EX_SOCKET_SEND, "Connection closed.")); } // A2_LOG_NOTICE(fmt("num=%zu, amount=%d, bufq.size()=%zu, SEND=%d", // num, amount, bufq_.size(), slen)); totalslen += slen; if(firstlen > slen) { offset_ += slen; bufq_[0]->progressUpdate(slen, false); goto fin; } else { slen -= firstlen; bufq_[0]->progressUpdate(firstlen, true); bufq_.pop_front(); offset_ = 0; for(size_t i = 1; i < num; ++i) { const std::shared_ptr& buf = bufq_[0]; ssize_t len = buf->getLength(); if(len > slen) { offset_ = slen; bufq_[0]->progressUpdate(slen, false); goto fin; break; } else { slen -= len; bufq_[0]->progressUpdate(len, true); bufq_.pop_front(); } } } } fin: return totalslen; } bool SocketBuffer::sendBufferIsEmpty() const { return bufq_.empty(); } } // namespace aria2