Prechádzať zdrojové kódy

Added JSON-RPC 2.0 batch call.

Tatsuhiro Tsujikawa 14 rokov pred
rodič
commit
8a9fa9a692

+ 135 - 41
src/HttpServerBodyCommand.cc

@@ -56,6 +56,7 @@
 #include "SocketRecvBuffer.h"
 #include "SocketRecvBuffer.h"
 #include "json.h"
 #include "json.h"
 #include "DlAbortEx.h"
 #include "DlAbortEx.h"
+#include "message.h"
 
 
 namespace aria2 {
 namespace aria2 {
 
 
@@ -82,6 +83,102 @@ HttpServerBodyCommand::~HttpServerBodyCommand()
   e_->deleteSocketForReadCheck(socket_, this);
   e_->deleteSocketForReadCheck(socket_, this);
 }
 }
 
 
+namespace {
+xmlrpc::XmlRpcResponse
+createJsonRpcErrorResponse
+(int code,
+ const std::string& msg,
+ const SharedHandle<ValueBase>& id)
+{
+  SharedHandle<Dict> params = Dict::g();
+  params->put("code", Integer::g(code));
+  params->put("message", msg);
+  xmlrpc::XmlRpcResponse res(code, params, id);
+  return res;
+}
+} // namespace
+
+void HttpServerBodyCommand::sendJsonRpcResponse
+(const xmlrpc::XmlRpcResponse& res,
+ const std::string& callback)
+{
+  bool gzip = httpServer_->supportsGZip();
+  std::string responseData = res.toJson(callback, gzip);
+  if(res.code == 0) {
+    httpServer_->feedResponse(responseData, "application/json-rpc");
+  } else {
+    httpServer_->disableKeepAlive();
+    std::string httpCode;
+    switch(res.code) {
+    case -32600:
+      httpCode = "400 Bad Request";
+      break;
+    case -32601:
+      httpCode = "404 Not Found";
+      break;
+    default:
+      httpCode = "500 Internal Server Error";
+    };
+    httpServer_->feedResponse(httpCode, A2STR::NIL,
+                              responseData, "application/json-rpc");
+  }
+  addHttpServerResponseCommand();
+}
+
+void HttpServerBodyCommand::sendJsonRpcBatchResponse
+(const std::vector<xmlrpc::XmlRpcResponse>& results,
+ const std::string& callback)
+{
+  bool gzip = httpServer_->supportsGZip();
+  std::string responseData = xmlrpc::toJsonBatch(results, callback, gzip);
+  httpServer_->feedResponse(responseData, "application/json-rpc");
+  addHttpServerResponseCommand();
+}
+
+void HttpServerBodyCommand::addHttpServerResponseCommand()
+{
+  Command* command =
+    new HttpServerResponseCommand(getCuid(), httpServer_, e_, socket_);
+  e_->addCommand(command);
+  e_->setNoWait(true);
+}
+
+xmlrpc::XmlRpcResponse
+HttpServerBodyCommand::processJsonRpcRequest(const Dict* jsondict)
+{
+
+  SharedHandle<ValueBase> id = jsondict->get("id");
+  if(!id) {
+    return createJsonRpcErrorResponse(-32600, "Invalid Request.",
+                                      SharedHandle<ValueBase>());
+  }
+  const String* methodName = asString(jsondict->get("method"));
+  if(!methodName) {
+    return createJsonRpcErrorResponse(-32600, "Invalid Request.", id);
+  }
+  SharedHandle<List> params;
+  const SharedHandle<ValueBase>& tempParams = jsondict->get("params");
+  if(asList(tempParams)) {
+    params = static_pointer_cast<List>(tempParams);
+  } else if(!tempParams) {
+    params = List::g();
+  } else {
+    // TODO No support for Named params
+    return createJsonRpcErrorResponse(-32602, "Invalid params.", id);
+  }
+  xmlrpc::XmlRpcRequest req(methodName->s(), params, id);
+  SharedHandle<xmlrpc::XmlRpcMethod> method;
+  try {
+    method = xmlrpc::XmlRpcMethodFactory::create(req.methodName);
+  } catch(RecoverableException& e) {
+    A2_LOG_INFO_EX(EX_EXCEPTION_CAUGHT, e);
+    return createJsonRpcErrorResponse(-32601, "Method not found.", id);
+  }
+  method->setJsonRpc(true);
+  xmlrpc::XmlRpcResponse res = method->execute(req, e_);
+  return res;
+}
+
 bool HttpServerBodyCommand::execute()
 bool HttpServerBodyCommand::execute()
 {
 {
   if(e_->getRequestGroupMan()->downloadFinished() || e_->isHaltRequested()) {
   if(e_->getRequestGroupMan()->downloadFinished() || e_->isHaltRequested()) {
@@ -113,54 +210,51 @@ bool HttpServerBodyCommand::execute()
           bool gzip = httpServer_->supportsGZip();
           bool gzip = httpServer_->supportsGZip();
           std::string responseData = res.toXml(gzip);
           std::string responseData = res.toXml(gzip);
           httpServer_->feedResponse(responseData, "text/xml");
           httpServer_->feedResponse(responseData, "text/xml");
-          Command* command =
-            new HttpServerResponseCommand(getCuid(), httpServer_, e_, socket_);
-          e_->addCommand(command);
-          e_->setNoWait(true);
+          addHttpServerResponseCommand();
           return true;
           return true;
         } else if(reqPath == "/jsonrpc") {
         } else if(reqPath == "/jsonrpc") {
           // TODO handle query parameter
           // TODO handle query parameter
-          std::string callback;// = "callback";
+          std::string callback;
 
 
-          SharedHandle<ValueBase> json = json::decode(httpServer_->getBody());
-          const Dict* jsondict = asDict(json);
-          if(!jsondict) {
-            // TODO code: -32600, Invalid Request
-            throw DL_ABORT_EX("JSON-RPC Invalid Request");
+          SharedHandle<ValueBase> json;
+          try {
+            json = json::decode(httpServer_->getBody());
+          } catch(RecoverableException& e) {
+            A2_LOG_INFO_EX
+              (fmt("CUID#%lld - Failed to parse JSON-RPC request",
+                   getCuid()),
+               e);
+            xmlrpc::XmlRpcResponse res
+              (createJsonRpcErrorResponse
+               (-32700, "Parse error.", SharedHandle<ValueBase>()));
+            sendJsonRpcResponse(res, callback);
+            return true;
           }
           }
-          const String* methodName = asString(jsondict->get("method"));
-          if(!methodName) {
-            // TODO Batch request does not have method
-            throw DL_ABORT_EX("JSON-RPC No Method Found");
-          }
-          SharedHandle<List> params;
-          const SharedHandle<ValueBase>& tempParams = jsondict->get("params");
-          if(asList(tempParams)) {
-            params = static_pointer_cast<List>(tempParams);
-          } else if(!tempParams) {
-            params = List::g();
+          const Dict* jsondict = asDict(json);
+          if(jsondict) {
+            xmlrpc::XmlRpcResponse res = processJsonRpcRequest(jsondict);
+            sendJsonRpcResponse(res, callback);
           } else {
           } else {
-            // TODO No support for Named params
-            throw DL_ABORT_EX("JSON-RPC Named params are not supported");
+            const List* jsonlist = asList(json);
+            if(jsonlist) {
+              // This is batch call
+              std::vector<xmlrpc::XmlRpcResponse> results;
+              for(List::ValueType::const_iterator i = jsonlist->begin(),
+                    eoi = jsonlist->end(); i != eoi; ++i) {
+                const Dict* jsondict = asDict(*i);
+                if(jsondict) {
+                  xmlrpc::XmlRpcResponse r = processJsonRpcRequest(jsondict);
+                  results.push_back(r);
+                }
+              }
+              sendJsonRpcBatchResponse(results, callback);
+            } else {
+              xmlrpc::XmlRpcResponse res
+                (createJsonRpcErrorResponse
+                 (-32600, "Invalid Request.", SharedHandle<ValueBase>()));
+              sendJsonRpcResponse(res, callback);
+            }
           }
           }
-          SharedHandle<ValueBase> id = jsondict->get("id");
-          if(!id) {
-            // TODO Batch request does not have id
-            throw DL_ABORT_EX("JSON-RPC NO id found");
-          }
-          xmlrpc::XmlRpcRequest req(methodName->s(), params, id);
-
-          SharedHandle<xmlrpc::XmlRpcMethod> method =
-            xmlrpc::XmlRpcMethodFactory::create(req.methodName);
-          method->setJsonRpc(true);
-          xmlrpc::XmlRpcResponse res = method->execute(req, e_);
-          bool gzip = httpServer_->supportsGZip();
-          std::string responseData = res.toJson(callback, gzip);
-          httpServer_->feedResponse(responseData, "application/json-rpc");
-          Command* command =
-            new HttpServerResponseCommand(getCuid(), httpServer_, e_, socket_);
-          e_->addCommand(command);
-          e_->setNoWait(true);
           return true;
           return true;
         } else {
         } else {
           return true;
           return true;

+ 16 - 0
src/HttpServerBodyCommand.h

@@ -38,6 +38,8 @@
 #include "Command.h"
 #include "Command.h"
 #include "SharedHandle.h"
 #include "SharedHandle.h"
 #include "TimerA2.h"
 #include "TimerA2.h"
+#include "ValueBase.h"
+#include "XmlRpcResponse.h"
 
 
 namespace aria2 {
 namespace aria2 {
 
 
@@ -51,6 +53,20 @@ private:
   SharedHandle<SocketCore> socket_;
   SharedHandle<SocketCore> socket_;
   SharedHandle<HttpServer> httpServer_;
   SharedHandle<HttpServer> httpServer_;
   Timer timeoutTimer_;
   Timer timeoutTimer_;
+  void sendJsonRpcErrorResponse
+  (const std::string& httpStatus,
+   int code,
+   const std::string& message,
+   const SharedHandle<ValueBase>& id,
+   const std::string& callback);
+  void sendJsonRpcResponse
+  (const xmlrpc::XmlRpcResponse& res,
+   const std::string& callback);
+  void sendJsonRpcBatchResponse
+  (const std::vector<xmlrpc::XmlRpcResponse>& results,
+   const std::string& callback);
+  xmlrpc::XmlRpcResponse processJsonRpcRequest(const Dict* jsondict);
+  void addHttpServerResponseCommand();
 public:
 public:
   HttpServerBodyCommand(cuid_t cuid,
   HttpServerBodyCommand(cuid_t cuid,
                         const SharedHandle<HttpServer>& httpServer,
                         const SharedHandle<HttpServer>& httpServer,

+ 40 - 0
src/XmlRpcResponse.cc

@@ -208,6 +208,46 @@ std::string XmlRpcResponse::toJson(const std::string& callback, bool gzip) const
   }
   }
 }
 }
 
 
+namespace {
+template<typename OutputStream>
+OutputStream& encodeJsonBatchAll
+(OutputStream& o,
+ const std::vector<XmlRpcResponse>& results,
+ const std::string& callback)
+{
+  o << '[';
+  if(!results.empty()) {
+    encodeJsonAll(o, results[0].code, results[0].param, results[0].id,callback);
+  }
+  for(std::vector<XmlRpcResponse>::const_iterator i = results.begin()+1,
+        eoi = results.end(); i != eoi; ++i) {
+    o << ',';
+    encodeJsonAll(o, (*i).code, (*i).param, (*i).id, callback);
+  }
+  o << ']';
+  return o;
+}
+} // namespace
+
+std::string toJsonBatch
+(const std::vector<XmlRpcResponse>& results,
+ const std::string& callback,
+ bool gzip)
+{
+  if(gzip) {
+#ifdef HAVE_ZLIB
+    GZipEncoder o;
+    o.init();
+    return encodeJsonBatchAll(o, results, callback).str();
+#else // !HAVE_ZLIB
+    abort();
+#endif // !HAVE_ZLIB
+  } else {
+    std::stringstream o;
+    return encodeJsonBatchAll(o, results, callback).str();
+  }
+}
+
 } // namespace xmlrpc
 } // namespace xmlrpc
 
 
 } // namespace aria2
 } // namespace aria2

+ 8 - 0
src/XmlRpcResponse.h

@@ -38,6 +38,7 @@
 #include "common.h"
 #include "common.h"
 
 
 #include <string>
 #include <string>
+#include <vector>
 
 
 #include "ValueBase.h"
 #include "ValueBase.h"
 
 
@@ -69,8 +70,15 @@ struct XmlRpcResponse {
   // Encodes RPC response in JSON. If callback is not empty, the
   // Encodes RPC response in JSON. If callback is not empty, the
   // resulting string is JSONP.
   // resulting string is JSONP.
   std::string toJson(const std::string& callback, bool gzip = false) const;
   std::string toJson(const std::string& callback, bool gzip = false) const;
+
+  std::string toJsonBatch(const std::string& callback, bool gzip = false) const;
 };
 };
 
 
+std::string toJsonBatch
+(const std::vector<XmlRpcResponse>& results,
+ const std::string& callback,
+ bool gzip);
+
 } // namespace xmlrpc
 } // namespace xmlrpc
 
 
 } // namespace aria2
 } // namespace aria2