Rebasing QxtWebContent on QxtFifo
Updated QxtFifo to permit "priming" data by a derived class Integrating changes into QxtAbstractConnector
This commit is contained in:
parent
335d2c66f9
commit
21d6e314ac
|
@ -68,6 +68,9 @@ struct QxtFifoNode {
|
|||
QxtFifoNode(const char* data, int size) : content(data, size) {
|
||||
next = NULL;
|
||||
}
|
||||
QxtFifoNode(const QByteArray &data) : content(data) {
|
||||
next = NULL;
|
||||
}
|
||||
|
||||
QByteArray content;
|
||||
QBasicAtomicPointer<QxtFifoNode> next;
|
||||
|
@ -94,6 +97,18 @@ QxtFifo::QxtFifo(QObject *parent) : QIODevice(parent)
|
|||
setOpenMode(QIODevice::ReadWrite);
|
||||
}
|
||||
|
||||
/*!
|
||||
Constructs a new QxtFifo with \a parent and initial content from \a prime.
|
||||
*/
|
||||
QxtFifo::QxtFifo(const QByteArray &prime, QObject *parent) : QIODevice(parent)
|
||||
{
|
||||
QXT_INIT_PRIVATE(QxtFifo);
|
||||
setOpenMode(QIODevice::ReadWrite);
|
||||
// Since we're being constructed, access to the internals is safe
|
||||
qxt_d().head->content = prime;
|
||||
qxt_d().available = prime.size();
|
||||
}
|
||||
|
||||
/*!
|
||||
\reimp
|
||||
*/
|
||||
|
|
|
@ -40,6 +40,7 @@ public:
|
|||
void clear();
|
||||
|
||||
protected:
|
||||
explicit QxtFifo(const QByteArray &prime, QObject * parent = 0);
|
||||
virtual qint64 readData(char * data, qint64 maxSize);
|
||||
virtual qint64 writeData(const char * data, qint64 maxSize);
|
||||
|
||||
|
|
|
@ -51,6 +51,7 @@ headers (by implementing writeHeaders(QIODevice*, const QHttpResponseHeader&)).
|
|||
#include <QHash>
|
||||
#include <QIODevice>
|
||||
#include <QByteArray>
|
||||
#include <QPointer>
|
||||
|
||||
#ifndef QXT_DOXYGEN_RUN
|
||||
class QxtAbstractHttpConnectorPrivate : public QxtPrivate<QxtAbstractHttpConnector>
|
||||
|
@ -59,6 +60,7 @@ public:
|
|||
QxtHttpSessionManager* manager;
|
||||
QReadWriteLock bufferLock, requestLock;
|
||||
QHash<QIODevice*, QByteArray> buffers; // connection->buffer
|
||||
QHash<QIODevice*, QPointer<QxtWebContent> > contents; // connection->content
|
||||
QHash<quint32, QIODevice*> requests; // requestID->connection
|
||||
quint32 nextRequestID;
|
||||
|
||||
|
@ -79,6 +81,7 @@ public:
|
|||
{
|
||||
QWriteLocker locker(&bufferLock);
|
||||
buffers.remove(device);
|
||||
contents.remove(device);
|
||||
}
|
||||
|
||||
inline void doneWithRequest(quint32 requestID)
|
||||
|
@ -162,25 +165,70 @@ void QxtAbstractHttpConnector::incomingData(QIODevice* device)
|
|||
device = qobject_cast<QIODevice*>(sender());
|
||||
if (!device) return;
|
||||
}
|
||||
QReadLocker locker(&qxt_d().bufferLock);
|
||||
QByteArray& buffer = qxt_d().buffers[device];
|
||||
buffer.append(device->readAll());
|
||||
if (!canParseRequest(buffer)) return;
|
||||
QHttpRequestHeader header = parseRequest(buffer);
|
||||
QxtWebContent* content = 0;
|
||||
QByteArray start;
|
||||
if (header.contentLength() > 0)
|
||||
// Scope things so we don't block access during incomingRequest()
|
||||
QHttpRequestHeader header;
|
||||
QxtWebContent *content = 0;
|
||||
{
|
||||
start = buffer.left(header.value("content-length").toInt());
|
||||
buffer = buffer.mid(header.value("content-length").toInt());
|
||||
content = new QxtWebContent(header.contentLength(), start, device);
|
||||
// Fetch the incoming data block
|
||||
QByteArray block = device->readAll();
|
||||
// Check for a current content "device"
|
||||
QReadLocker locker(&qxt_d().bufferLock);
|
||||
content = qxt_d().contents[device];
|
||||
if(content && (content->wantAll() || content->bytesNeeded() > 0)){
|
||||
// This block (or part of it) belongs to content device
|
||||
qint64 needed = block.size();
|
||||
if(!content->wantAll() && needed > content->bytesNeeded())
|
||||
needed = content->bytesNeeded();
|
||||
content->write(block.constData(), needed);
|
||||
if(block.size() <= needed)
|
||||
return; // Used it all ...
|
||||
block.remove(0, needed);
|
||||
}
|
||||
// The data received represents a new request (or start thereof)
|
||||
qxt_d().contents[device] = content = NULL;
|
||||
QByteArray& buffer = qxt_d().buffers[device];
|
||||
buffer.append(block);
|
||||
if (!canParseRequest(buffer)) return;
|
||||
// Have received all of the headers so we can start processing
|
||||
header = parseRequest(buffer);
|
||||
QByteArray start;
|
||||
int len = header.hasContentLength() ? int(header.contentLength()) : -1;
|
||||
if(len > 0)
|
||||
{
|
||||
if(len <= buffer.size()){
|
||||
// This request is fully-received & excess is another request
|
||||
// Leave in buffer & we'll fake a following "readyRead()"
|
||||
start = buffer.left(len);
|
||||
buffer = buffer.mid(len);
|
||||
content = new QxtWebContent(start, this);
|
||||
QMetaObject::invokeMethod(this, "incomingData",
|
||||
Qt::QueuedConnection, Q_ARG(QIODevice*, device));
|
||||
}
|
||||
else{
|
||||
// This request isn't finished yet but may still have one to
|
||||
// follow it. Remember the content device so we can append to
|
||||
// it until we've got it all.
|
||||
start = buffer;
|
||||
buffer.clear();
|
||||
qxt_d().contents[device] = content =
|
||||
new QxtWebContent(len, start, this, device);
|
||||
}
|
||||
}
|
||||
else if (header.hasKey("connection") && header.value("connection").toLower() == "close")
|
||||
{
|
||||
// Not pipelining so we want to pass all remaining data to the
|
||||
// content device. Although 'len' will be -1, we're using an
|
||||
// explict value for clarity. This causes the content device
|
||||
// to indicate it wants all remaining data.
|
||||
start = buffer;
|
||||
buffer.clear();
|
||||
qxt_d().contents[device] = content =
|
||||
new QxtWebContent(-1, start, this, device);
|
||||
} // else no content
|
||||
//
|
||||
// NOTE: Buffer lock goes out of scope after this point
|
||||
}
|
||||
else if (header.hasKey("connection") && header.value("connection").toLower() == "close")
|
||||
{
|
||||
start = buffer;
|
||||
buffer.clear();
|
||||
content = new QxtWebContent(header.contentLength(), start, device);
|
||||
} // else no content
|
||||
// Allocate request ID and process it
|
||||
quint32 requestID = qxt_d().getNextRequestID(device);
|
||||
sessionManager()->incomingRequest(requestID, header, content);
|
||||
}
|
||||
|
|
|
@ -43,39 +43,37 @@ QxtWeb uses QxtWebContent as an abstraction for streaming data.
|
|||
#include "qxtwebcontent.h"
|
||||
#include <string.h>
|
||||
#include <QUrl>
|
||||
#include <QCoreApplication>
|
||||
#include <QThread>
|
||||
|
||||
#ifndef QXT_DOXYGEN_RUN
|
||||
class QxtWebContentPrivate : public QxtPrivate<QxtWebContent>
|
||||
{
|
||||
public:
|
||||
QxtWebContentPrivate() : ignoreRemaining(false) {}
|
||||
QxtWebContentPrivate() : bytesNeeded(0), ignoreRemaining(false) {}
|
||||
QXT_DECLARE_PUBLIC(QxtWebContent)
|
||||
|
||||
void init(int contentLength, const QByteArray& start, QIODevice* device)
|
||||
void init(int contentLength, QIODevice* device)
|
||||
{
|
||||
this->start = start;
|
||||
this->device = device;
|
||||
if (contentLength <= 0)
|
||||
bytesRemaining = -1;
|
||||
else
|
||||
bytesRemaining = contentLength - start.length();
|
||||
if (device)
|
||||
{
|
||||
QObject::connect(device, SIGNAL(readyRead()), &qxt_p(), SIGNAL(readyRead()));
|
||||
// QObject::connect(device, SIGNAL(aboutToClose()), this, SIGNAL(aboutToClose()));
|
||||
// QObject::connect(device, SIGNAL(destroyed()), this, SIGNAL(aboutToClose()));
|
||||
// ask the object if it has an error signal
|
||||
if (device->metaObject()->indexOfSignal(QMetaObject::normalizedSignature(SIGNAL(error(QAbstractSocket::SocketError)))) >= 0)
|
||||
{
|
||||
QObject::connect(device, SIGNAL(error(QAbstractSocket::SocketError)), &qxt_p(), SLOT(errorReceived(QAbstractSocket::SocketError)));
|
||||
}
|
||||
}
|
||||
qxt_p().setOpenMode(QIODevice::ReadOnly);
|
||||
if (contentLength < 0)
|
||||
bytesNeeded = -1;
|
||||
else{
|
||||
bytesNeeded = contentLength - qxt_p().bytesAvailable();
|
||||
Q_ASSERT(bytesNeeded >= 0);
|
||||
}
|
||||
if(device){
|
||||
// Connect a disconnected signal if it has one
|
||||
if(device->metaObject()->indexOfSignal(QMetaObject::normalizedSignature(SIGNAL(disconnected()))) >= 0){
|
||||
QObject::connect(device, SIGNAL(disconnected()), &qxt_p(), SLOT(ignoreRemainingContent()));
|
||||
}
|
||||
// Likewise, connect an error signal if it has one
|
||||
if(device->metaObject()->indexOfSignal(QMetaObject::normalizedSignature(SIGNAL(error(QAbstractSocket::SocketError)))) >= 0){
|
||||
QObject::connect(device, SIGNAL(error(QAbstractSocket::SocketError)), &qxt_p(), SLOT(errorReceived(QAbstractSocket::SocketError)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
qint64 bytesRemaining;
|
||||
QByteArray start;
|
||||
QIODevice* device;
|
||||
qint64 bytesNeeded;
|
||||
bool ignoreRemaining;
|
||||
};
|
||||
#endif
|
||||
|
@ -83,30 +81,18 @@ public:
|
|||
/*!
|
||||
* Constructs a QxtWebContent object.
|
||||
*
|
||||
* The content provided by this constructor is the first \a contentLength bytes
|
||||
* read from the provided \a device.
|
||||
* The content provided by this constructor is the data contained in \a prime,
|
||||
* followed by whatever data is subsequently written to this object from the
|
||||
* source device up to the specified \a contentLength. Note that the provided
|
||||
* \a sourceDevice is used solely to detect socket errors and does not specify
|
||||
* parentage.
|
||||
*
|
||||
* The QxtWebContent object is parented to the \a device.
|
||||
*/
|
||||
QxtWebContent::QxtWebContent(int contentLength, QIODevice* device) : QIODevice(device)
|
||||
QxtWebContent::QxtWebContent(int contentLength, const QByteArray& prime,
|
||||
QObject *parent, QIODevice* sourceDevice) : QxtFifo(prime, parent)
|
||||
{
|
||||
QXT_INIT_PRIVATE(QxtWebContent);
|
||||
qxt_d().init(contentLength, QByteArray(), device);
|
||||
}
|
||||
|
||||
/*!
|
||||
* Constructs a QxtWebContent object.
|
||||
*
|
||||
* The content provided by this constructor is the data contained in \a start,
|
||||
* followed by enough data read from the provided \a device to fill the desired
|
||||
* \a contentLength.
|
||||
*
|
||||
* The QxtWebContent object is parented to the \a device.
|
||||
*/
|
||||
QxtWebContent::QxtWebContent(int contentLength, const QByteArray& start, QIODevice* device) : QIODevice(device)
|
||||
{
|
||||
QXT_INIT_PRIVATE(QxtWebContent);
|
||||
qxt_d().init(contentLength, start, device);
|
||||
qxt_d().init(contentLength, sourceDevice);
|
||||
}
|
||||
|
||||
/*!
|
||||
|
@ -115,21 +101,12 @@ QxtWebContent::QxtWebContent(int contentLength, const QByteArray& start, QIODevi
|
|||
* The content provided by this constructor is exactly the data contained in
|
||||
* \a content.
|
||||
*/
|
||||
QxtWebContent::QxtWebContent(const QByteArray& content, QObject* parent) : QIODevice(parent)
|
||||
QxtWebContent::QxtWebContent(const QByteArray& content, QObject* parent)
|
||||
: QxtFifo(content, parent)
|
||||
{
|
||||
QXT_INIT_PRIVATE(QxtWebContent);
|
||||
qxt_d().init(content.size(), content, 0);
|
||||
}
|
||||
|
||||
/*!
|
||||
* \reimp
|
||||
*/
|
||||
qint64 QxtWebContent::bytesAvailable() const
|
||||
{
|
||||
qint64 available = QIODevice::bytesAvailable() + (qxt_d().device ? qxt_d().device->bytesAvailable() : 0) + qxt_d().start.count();
|
||||
if (available > qxt_d().bytesRemaining)
|
||||
return qxt_d().bytesRemaining;
|
||||
return available;
|
||||
qxt_d().init(content.size(), 0);
|
||||
setOpenMode(ReadOnly);
|
||||
}
|
||||
|
||||
/*!
|
||||
|
@ -137,61 +114,60 @@ qint64 QxtWebContent::bytesAvailable() const
|
|||
*/
|
||||
qint64 QxtWebContent::readData(char* data, qint64 maxSize)
|
||||
{
|
||||
char* writePtr = data;
|
||||
// read more than 32k; TCP ideally handles 48k blocks but we need wiggle room
|
||||
if (maxSize > 32768) maxSize = 32768;
|
||||
|
||||
// don't read more than the content-length
|
||||
int sz = qxt_d().start.count();
|
||||
if (sz > 0 && maxSize > sz)
|
||||
{
|
||||
memcpy(writePtr, qxt_d().start.constData(), sz);
|
||||
writePtr += sz;
|
||||
maxSize -= sz;
|
||||
qxt_d().start.clear();
|
||||
}
|
||||
else if (sz > 0 && sz >= maxSize)
|
||||
{
|
||||
memcpy(writePtr, qxt_d().start.constData(), maxSize);
|
||||
qxt_d().start = qxt_d().start.mid(maxSize);
|
||||
return maxSize;
|
||||
}
|
||||
|
||||
if (qxt_d().device == 0)
|
||||
{
|
||||
return sz;
|
||||
}
|
||||
else if (qxt_d().bytesRemaining >= 0)
|
||||
{
|
||||
qint64 readBytes = qxt_d().device->read(writePtr, (maxSize > qxt_d().bytesRemaining) ? qxt_d().bytesRemaining : maxSize);
|
||||
qxt_d().bytesRemaining -= readBytes;
|
||||
if (qxt_d().bytesRemaining == 0) QMetaObject::invokeMethod(this, "aboutToClose", Qt::QueuedConnection);
|
||||
return sz + readBytes;
|
||||
}
|
||||
else
|
||||
{
|
||||
return sz + qxt_d().device->read(writePtr, maxSize);
|
||||
}
|
||||
int result = QxtFifo::readData(data, maxSize);
|
||||
if(bytesAvailable() == 0 && bytesNeeded() == 0)
|
||||
QMetaObject::invokeMethod(this, "aboutToClose", Qt::QueuedConnection);
|
||||
return result;
|
||||
}
|
||||
|
||||
/*!
|
||||
* Returns the number of bytes of content that have not yet been read.
|
||||
*
|
||||
* Note that not all of the remaining content may be immediately available for
|
||||
* reading. This function returns the content length, minus the number of
|
||||
* bytes that have already been read.
|
||||
* Returns true if the total content size is unknown and false otherwise.
|
||||
*/
|
||||
bool QxtWebContent::wantAll() const
|
||||
{
|
||||
return (qxt_d().bytesNeeded == -1);
|
||||
}
|
||||
|
||||
/*!
|
||||
* Returns the total number of bytes of content expected. This will be -1
|
||||
* if the total content size is unknown.
|
||||
*/
|
||||
qint64 QxtWebContent::unreadBytes() const
|
||||
{
|
||||
return qxt_d().start.size() + qxt_d().bytesRemaining;
|
||||
if(wantAll())
|
||||
return -1;
|
||||
return bytesAvailable() + bytesNeeded();
|
||||
}
|
||||
|
||||
/*!
|
||||
* Returns the number of bytes of content that have not yet been written
|
||||
* from the source device. This will be -1 if the total content size is
|
||||
* unknown.
|
||||
*/
|
||||
qint64 QxtWebContent::bytesNeeded() const
|
||||
{
|
||||
return qxt_d().bytesNeeded;
|
||||
}
|
||||
|
||||
/*!
|
||||
* \reimp
|
||||
*/
|
||||
qint64 QxtWebContent::writeData(const char*, qint64)
|
||||
qint64 QxtWebContent::writeData(const char *data, qint64 maxSize)
|
||||
{
|
||||
// always an error to write
|
||||
if(!(openMode() & WriteOnly))
|
||||
return -1; // Not accepting writes
|
||||
if(maxSize > 0) {
|
||||
// This must match the QxtFifo implementation for consistency
|
||||
if(maxSize > INT_MAX) maxSize = INT_MAX; // qint64 could easily exceed QAtomicInt, so let's play it safe
|
||||
if(qxt_d().bytesNeeded >= 0){
|
||||
qxt_d().bytesNeeded -= maxSize;
|
||||
Q_ASSERT(qxt_d().bytesNeeded >= 0);
|
||||
}
|
||||
if(qxt_d().ignoreRemaining)
|
||||
return maxSize;
|
||||
return QxtFifo::writeData(data, maxSize);
|
||||
}
|
||||
// Error
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -200,7 +176,9 @@ qint64 QxtWebContent::writeData(const char*, qint64)
|
|||
*/
|
||||
void QxtWebContent::errorReceived(QAbstractSocket::SocketError)
|
||||
{
|
||||
setErrorString(qxt_d().device->errorString());
|
||||
QIODevice *device = qobject_cast<QIODevice*>(sender());
|
||||
if(device)
|
||||
setErrorString(device->errorString());
|
||||
}
|
||||
|
||||
/*!
|
||||
|
@ -212,14 +190,12 @@ void QxtWebContent::errorReceived(QAbstractSocket::SocketError)
|
|||
*/
|
||||
void QxtWebContent::waitForAllContent()
|
||||
{
|
||||
if (!qxt_d().device) return;
|
||||
QByteArray buffer;
|
||||
while (qxt_d().device && qxt_d().bytesRemaining > 0)
|
||||
{
|
||||
buffer = qxt_d().device->readAll();
|
||||
qxt_d().start += buffer;
|
||||
qxt_d().bytesRemaining -= buffer.size();
|
||||
if (qxt_d().bytesRemaining > 0) qxt_d().device->waitForReadyRead(-1);
|
||||
while(qxt_d().bytesNeeded != 0 && !qxt_d().ignoreRemaining){
|
||||
// Still need data ... yield processing
|
||||
if(QCoreApplication::hasPendingEvents())
|
||||
QCoreApplication::processEvents();
|
||||
if(this->thread() != QThread::currentThread())
|
||||
QThread::yieldCurrentThread();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -231,11 +207,10 @@ void QxtWebContent::waitForAllContent()
|
|||
*/
|
||||
void QxtWebContent::ignoreRemainingContent()
|
||||
{
|
||||
if (qxt_d().bytesRemaining <= 0 || !qxt_d().device) return;
|
||||
if (!qxt_d().ignoreRemaining)
|
||||
{
|
||||
qxt_d().ignoreRemaining = true;
|
||||
QObject::connect(qxt_d().device, SIGNAL(readyRead()), this, SLOT(ignoreRemainingContent()));
|
||||
if (qxt_d().bytesNeeded <= 0) return;
|
||||
if(!qxt_d().ignoreRemaining){
|
||||
qxt_d().ignoreRemaining = true;
|
||||
qxt_d().bytesNeeded = 0;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -30,18 +30,20 @@
|
|||
#include <QByteArray>
|
||||
#include <QHash>
|
||||
#include <qxtglobal.h>
|
||||
#include <qxtfifo.h>
|
||||
|
||||
class QxtWebContentPrivate;
|
||||
class QXT_WEB_EXPORT QxtWebContent : public QIODevice
|
||||
class QXT_WEB_EXPORT QxtWebContent : public QxtFifo
|
||||
{
|
||||
Q_OBJECT
|
||||
public:
|
||||
QxtWebContent(int contentLength, const QByteArray& start, QIODevice* device);
|
||||
QxtWebContent(int contentLength, QIODevice* device);
|
||||
QxtWebContent(int contentLength, const QByteArray& start, QObject *parent,
|
||||
QIODevice* sourceDevice);
|
||||
explicit QxtWebContent(const QByteArray& content, QObject* parent = 0);
|
||||
static QHash<QString, QString> parseUrlEncodedQuery(const QString& data);
|
||||
|
||||
virtual qint64 bytesAvailable() const;
|
||||
bool wantAll() const;
|
||||
qint64 bytesNeeded() const;
|
||||
qint64 unreadBytes() const;
|
||||
|
||||
void waitForAllContent();
|
||||
|
|
Loading…
Reference in New Issue
Block a user