/* */
#include "SocketBuffer.h"

// NOTE(review): the names of the two system headers included here were lost
// from this copy of the file; <cassert> and <algorithm> are the presumed
// originals -- confirm against version control.
#include <cassert>
#include <algorithm>

#include "SocketCore.h"
#include "DlAbortEx.h"
#include "message.h"
#include "fmt.h"
#include "LogFactory.h"
#include "a2functional.h"

namespace aria2 {

// NOTE(review): the template arguments in this file (ProgressUpdate,
// SocketCore, the cast target types) were reconstructed from usage; the
// declarations in SocketBuffer.h are authoritative -- confirm they match.

// Wraps a raw byte array of `length` bytes. The array is released with
// delete[] in the destructor, so `bytes` must have been allocated with
// new[] and must not be freed by the caller.
SocketBuffer::ByteArrayBufEntry::ByteArrayBufEntry
(unsigned char* bytes, size_t length,
 std::unique_ptr<ProgressUpdate> progressUpdate)
  : BufEntry(std::move(progressUpdate)),
    bytes_(bytes),
    length_(length)
{}

SocketBuffer::ByteArrayBufEntry::~ByteArrayBufEntry()
{
  delete [] bytes_;
}

// Writes the bytes from `offset` to the end of the array to `socket` and
// returns whatever SocketCore::writeData() returns.
ssize_t SocketBuffer::ByteArrayBufEntry::send
(const std::shared_ptr<SocketCore>& socket, size_t offset)
{
  return socket->writeData(bytes_+offset, length_-offset);
}

// Returns true when `offset` has reached (or passed) the end of the array.
bool SocketBuffer::ByteArrayBufEntry::final(size_t offset) const
{
  return length_ <= offset;
}

size_t SocketBuffer::ByteArrayBufEntry::getLength() const
{
  return length_;
}

const unsigned char* SocketBuffer::ByteArrayBufEntry::getData() const
{
  return bytes_;
}

// Wraps a std::string; the string is moved into the entry.
SocketBuffer::StringBufEntry::StringBufEntry
(std::string s, std::unique_ptr<ProgressUpdate> progressUpdate)
  : BufEntry(std::move(progressUpdate)),
    str_(std::move(s))
{}

// Writes the characters from `offset` to the end of the string to `socket`
// and returns whatever SocketCore::writeData() returns.
ssize_t SocketBuffer::StringBufEntry::send
(const std::shared_ptr<SocketCore>& socket, size_t offset)
{
  return socket->writeData(str_.data()+offset, str_.size()-offset);
}

// Returns true when `offset` has reached (or passed) the end of the string.
bool SocketBuffer::StringBufEntry::final(size_t offset) const
{
  return str_.size() <= offset;
}

size_t SocketBuffer::StringBufEntry::getLength() const
{
  return str_.size();
}

const unsigned char* SocketBuffer::StringBufEntry::getData() const
{
  return reinterpret_cast<const unsigned char*>(str_.c_str());
}

SocketBuffer::SocketBuffer(const std::shared_ptr<SocketCore>& socket)
  : socket_(socket),
    offset_(0)
{}

SocketBuffer::~SocketBuffer() {}

// Queues `len` bytes starting at `bytes` for a later send(). Ownership of
// `bytes` passes to this object: for len > 0 the queued entry releases it
// with delete[]; for len == 0 nothing is queued and the array is released
// here, so it is not leaked. Nothing is written to the socket by this call.
void SocketBuffer::pushBytes(unsigned char* bytes, size_t len,
                             std::unique_ptr<ProgressUpdate> progressUpdate)
{
  if(len > 0) {
    bufq_.push_back(make_unique<ByteArrayBufEntry>
                    (bytes, len, std::move(progressUpdate)));
  }
  else {
    // No entry takes over the array on this path; free it to keep the
    // ownership rule uniform (delete[] on a null pointer is a no-op).
    delete [] bytes;
  }
}

// Queues `data` for a later send(). Empty strings are ignored. Nothing is
// written to the socket by this call.
void SocketBuffer::pushStr(std::string data,
                           std::unique_ptr<ProgressUpdate> progressUpdate)
{
  if(!data.empty()) {
    bufq_.push_back(make_unique<StringBufEntry>
                    (std::move(data), std::move(progressUpdate)));
  }
}

// Writes queued entries to the socket with gathered writes until the queue
// is empty or a write consumes less than was offered.
//
// Each pass offers the unsent tail of the front entry plus as many complete
// following entries as fit in the remaining budget of roughly 24 KiB (the
// front entry is always offered in full, even if it alone exceeds the
// budget), limited to A2_IOV_MAX segments. Fully written entries are
// popped; a partially written entry stays at the front with offset_
// recording how much of it has gone out. progressUpdate() is invoked on
// every entry that was touched, with `true` when the entry completed.
//
// Returns the sum of the values returned by writeVector() during this call.
// Throws DL_ABORT_EX when writeVector() returns 0 while the socket wants
// neither read nor write.
ssize_t SocketBuffer::send()
{
  // Budget for the payload gathered into a single writeVector() call.
  const ssize_t maxGather = 24*1024;
  a2iovec iov[A2_IOV_MAX];
  size_t totalslen = 0;
  while(!bufq_.empty()) {
    ssize_t amount = maxGather;
    // Segment 0: the not-yet-sent remainder of the front entry.
    const ssize_t firstlen = bufq_.front()->getLength() - offset_;
    amount -= firstlen;
    iov[0].A2IOVEC_BASE =
      reinterpret_cast<char*>(const_cast<unsigned char*>
                              (bufq_.front()->getData() + offset_));
    iov[0].A2IOVEC_LEN = firstlen;
    size_t num = 1;
    // Append whole entries while they fit. `i` and `num` advance together,
    // so reaching the end of the queue also bounds `num`.
    for(auto i = std::begin(bufq_)+1, eoi = std::end(bufq_);
        i != eoi && num < A2_IOV_MAX && amount > 0; ++i, ++num) {
      const ssize_t len = (*i)->getLength();
      if(amount < len) {
        break;
      }
      amount -= len;
      iov[num].A2IOVEC_BASE =
        reinterpret_cast<char*>(const_cast<unsigned char*>((*i)->getData()));
      iov[num].A2IOVEC_LEN = len;
    }
    ssize_t slen = socket_->writeVector(iov, num);
    if(slen == 0 && !socket_->wantRead() && !socket_->wantWrite()) {
      throw DL_ABORT_EX(fmt(EX_SOCKET_SEND, "Connection closed."));
    }
    // A2_LOG_NOTICE(fmt("num=%zu, amount=%d, bufq.size()=%zu, SEND=%d",
    //                   num, amount, bufq_.size(), slen));
    totalslen += slen;

    if(firstlen > slen) {
      // The front entry was only partially written; resume from here on
      // the next call.
      offset_ += slen;
      bufq_.front()->progressUpdate(slen, false);
      return totalslen;
    }
    // The front entry went out completely.
    slen -= firstlen;
    bufq_.front()->progressUpdate(firstlen, true);
    bufq_.pop_front();
    offset_ = 0;
    // Spread the remaining written bytes over the other offered entries.
    for(size_t i = 1; i < num; ++i) {
      const auto& buf = bufq_.front();
      const ssize_t len = buf->getLength();
      if(len > slen) {
        // This entry was cut short; it becomes the new partially sent
        // front entry.
        offset_ = slen;
        buf->progressUpdate(slen, false);
        return totalslen;
      }
      slen -= len;
      buf->progressUpdate(len, true);
      bufq_.pop_front();
    }
  }
  return totalslen;
}

// Returns true when no entries are waiting to be sent.
bool SocketBuffer::sendBufferIsEmpty() const
{
  return bufq_.empty();
}

} // namespace aria2