なお、JSON-RPCは必ずしもHTTPやTCP上で通信を行う必要はなく、JSON-RPC 2.0ではどのプロトコル上で通信を行うべきかについては記述されていません。このサンプルでは、socketをnlib経由で使うことができない場合(NLIB_NO_SOCKET)、メモリ上でJSON-RPCのリクエストとレスポンスをやりとりして動作します。
#include <string>
#include <map>
#include "jsonrpc.h"
#ifndef DONT_USE_SOCKET
#include "nn/nlib/TcpInputStream.h"
#include "nn/nlib/TcpOutputStream.h"
#endif
using nlib_ns::threading::Future;
using nlib_ns::msgpack::MpObject;
using nlib_ns::msgpack::jsonrpc::JsonRpcResponse;
using nlib_ns::msgpack::jsonrpc::JsonRpcClient;
using nlib_ns::msgpack::jsonrpc::JsonRpcRequest;
using nlib_ns::msgpack::jsonrpc::JsonRpcRequestWriter;
using nlib_ns::msgpack::jsonrpc::JsonRpcResponseHandler;
// Channel the in-memory client endpoint writes requests into
// (ClientEndByMemory calls g_c2s.Send).
// NOTE(review): Tunnel is not declared in this file; presumably it comes from
// "jsonrpc.h" and the server side reads from this object — confirm.
Tunnel g_c2s;
// Channel the in-memory client endpoint reads responses from
// (ClientEndByMemory polls g_s2c.Receive).
Tunnel g_s2c;
namespace {
// Set to true by JsonRpc() when the client is being torn down; the receive
// paths test it to stop waiting for further responses.
// NOTE(review): volatile alone does not synchronize accesses between threads
// in C++; if this flag is read on a different thread than the one that sets
// it, confirm that the platform guarantees are sufficient.
volatile bool g_shutdown_client = false;
#ifdef DONT_USE_SOCKET
// Client-side transport used when DONT_USE_SOCKET is defined: requests are
// pushed into g_c2s and responses are pulled from g_s2c.
// NOTE(review): the member-function headers that should precede the two bodies
// below are not present in this listing. They look like the send and receive
// counterparts of ClientEndByTcp — confirm against the full source.
class ClientEndByMemory {
public:
// (presumably the send body) Hands the request bytes to the client-to-server channel.
g_c2s.Send(data, n);
return 0;
}
// (presumably the receive body) Keeps polling the server-to-client channel
// until Receive reports success.
while (!g_s2c.Receive(data, n)) {
// Shutdown was requested while waiting: report an empty message (*n == 0)
// so the caller can leave its loop instead of polling forever.
if (g_shutdown_client) {
*n = 0;
return 0;
}
}
return 0;
}
// This endpoint always reports itself as open.
bool IsOpen() { return true; }
};
#else
// Client-side transport over a TCP socket connected to the loopback address.
// The receive path below parses messages framed as
// "<decimal length>:<payload>,".
// NOTE(review): several member-function headers (the ones that should precede
// the connect, send and receive bodies) and some error checks are not present
// in this listing; the comments below mark where lines appear to be missing.
class ClientEndByTcp {
public:
// Starts without a socket; Init() creates it.
ClientEndByTcp() : socket_(NLIB_SOCKET_INVALID) {}
// Creates an AF_INET stream socket with NLIB_SOCK_NONBLOCK set and returns
// the error code from nlib_socket.
errno_t Init() {
return nlib_socket(&socket_, AF_INET, SOCK_STREAM | NLIB_SOCK_NONBLOCK, 0); }
// (presumably the connect body; its header and the declaration of `e` are not
// visible) Targets the sample port on the loopback interface.
nlib_sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = nlib_htons(NLIB_SOCKPORT_SAMPLE);
addr.sin_addr.s_addr = nlib_htonl(INADDR_LOOPBACK);
// NOTE(review): the last argument looks like a timeout; presumably it is in
// 100ns units (i.e. 500 ms) — confirm against the nlib_connect_for reference.
e = nlib_connect_for(socket_, reinterpret_cast<nlib_sockaddr*>(&addr), sizeof(addr),
0, 500 * 10000);
// Failure path (its guarding condition is not visible here): forget the
// socket handle and report the error.
// NOTE(review): no close of the socket is visible before the handle is
// overwritten — confirm it is not leaked.
socket_ = NLIB_SOCKET_INVALID;
return e;
}
// Attach the input stream used by the receive path to the connected socket.
return stream_.Init(socket_);
}
// (presumably the send body; its header is not visible) Serializes senders
// with send_lock_ and writes one framed message through a temporary output
// stream bound to the socket.
using nlib_ns::threading::SimpleCriticalSection;
using nlib_ns::threading::ScopedLock;
ScopedLock<SimpleCriticalSection> l(send_lock_);
nlib_ns::TcpOutputStream os;
os.Init(socket_);
// NOTE(review): this block is empty in the listing. The receive path expects
// a "<decimal length>:" prefix, so the code writing it was presumably here.
{
}
// Payload followed by the ',' terminator the receive path checks for.
os.Write(data->get(), n);
os.Write(',');
os.Flush();
// NOTE(review): the condition selecting between these two returns is missing
// from the listing.
return 0;
else
return os.GetErrorValue();
}
// (presumably the receive body; its header is not visible) Serializes
// receivers with receive_lock_ and reads one framed message into *data,
// storing its size in *n.
using nlib_ns::threading::SimpleCriticalSection;
using nlib_ns::threading::ScopedLock;
ScopedLock<SimpleCriticalSection> l(receive_lock_);
// Nothing readable: report an empty message (*n == 0) with a success code.
if (stream_.Peek() < 0) {
*n = 0;
return 0;
}
int c;
size_t nbytes = 0;
// Parse the decimal length prefix up to the ':' separator.
for (;;) {
c = stream_.Read();
if (c == ':') break;
// NOTE(review): c is not checked to be a digit (or non-negative) before it
// is folded into the length, and the comparison below only catches some
// wrap-arounds — confirm whether stricter validation is needed.
size_t new_val = nbytes * 10 + c - '0';
if (new_val < nbytes) return EILSEQ;
nbytes = new_val;
}
// Allocate a buffer of the announced size; *data takes it over via reset().
data->reset(reinterpret_cast<uint8_t*>(
nlib_malloc(nbytes)));
if (!*data) return ENOMEM;
// The payload must be complete and followed by the ',' terminator.
if (nbytes != stream_.Read(data->get(), nbytes)) return EILSEQ;
if (stream_.Read() != ',') return EILSEQ;
*n = nbytes;
return 0;
}
// NOTE(review): compares against -1 while the constructor uses
// NLIB_SOCKET_INVALID, and Close() below does not reset socket_, so this
// keeps returning true after Close() as far as the visible code shows.
bool IsOpen() { return socket_ != -1; }
// Shuts down both directions of the socket and returns the error code from
// nlib_shutdownsocket.
errno_t Close() {
return nlib_shutdownsocket(socket_, SHUT_RDWR); }
private:
// Socket handle created by Init().
nlib_sock socket_;
// Input stream bound to socket_, used by the receive path.
nlib_ns::TcpInputStream stream_;
// Separate locks so that sending and receiving do not block each other.
nlib_ns::threading::SimpleCriticalSection send_lock_;
nlib_ns::threading::SimpleCriticalSection receive_lock_;
};
#endif
// Select the transport at compile time: TCP by default, the in-memory
// endpoint when DONT_USE_SOCKET is defined.
#ifndef DONT_USE_SOCKET
typedef ClientEndByTcp ClientEnd;
#else
typedef ClientEndByMemory ClientEnd;
#endif
// Transport endpoint shared by the request senders and the response handler;
// created in JsonRpc() and released in SampleMain().
UniquePtr<ClientEnd> g_client_end;
// JSON-RPC client object; assigned from JsonRpcClient::OpenClient() in JsonRpc().
JsonRpcClient g_client;
// Serializes a single JSON-RPC request and sends it through g_client_end.
//
// method: name of the remote method.
// reqid:  request id stored in the request via SetId().
// params: request parameters; they are handed to the request with
//         MoveParamsFrom() (NOTE(review): presumably leaving params
//         moved-from — confirm).
// Returns ENOMEM when the request element cannot be created or the method
// name cannot be set; otherwise the result of g_client_end->Send().
// NOTE(review): the declaration of `e` and any checks of the writer's return
// values are not present in this listing.
errno_t SendRequest(
const char* method, uint32_t reqid, MpObject& params) {
// The writer consumes a list of requests; this sample always sends exactly one.
Nlist<JsonRpcRequest> req_list;
JsonRpcRequest* req = req_list.push_back();
if (!req) return ENOMEM;
req->SetId(reqid);
if (!req->SetMethod(method)) {
return ENOMEM;
}
req->MoveParamsFrom(params);
JsonRpcRequestWriter w(false);
e = w.BeginWriteRequest(static_cast<uint32_t>(req_list.size()));
Nlist<JsonRpcRequest>::const_iterator it = req_list.begin();
Nlist<JsonRpcRequest>::const_iterator end = req_list.end();
for (; it != end; ++it) {
e = w.WriteRequest(*it);
}
// Retrieve the serialized bytes and their length, then push them to the transport.
size_t n;
ReallocOutputStream::UniquePtrType req_bytes;
e = w.EndWriteRequest(&req_bytes, &n);
return g_client_end->Send(&req_bytes, n);
}
// Receive loop: pulls framed responses from g_client_end and passes each one
// to g_client.ResolveResponse() until the connection ends or shutdown is
// requested. The void* argument is unused.
// NOTE(review): the `if` that opens the first branch (presumably a check of
// the Receive() result) and the body of the check after ResolveResponse() are
// not present in this listing.
void ResponseHandlerThread(void*) {
ReallocOutputStream::UniquePtrType data;
for (;;) {
size_t n = 0;
errno_t e = g_client_end->Receive(&data, &n);
// Stop when the client is shutting down or the endpoint is no longer open.
if (g_shutdown_client || !g_client_end->IsOpen()) break;
} else if (n == 0) {
// An empty message is how the endpoints signal "nothing more to read".
break;
} else {
// Hand the raw response bytes to the client object.
e = g_client.ResolveResponse(data.get(), n);
}
}
}
}
}
namespace {
// Sends a "create" request carrying {"datetime", "text"} and returns the URI
// the server answers with.
//
// dt:  timestamp, formatted through DateTime::ToW3cDtf().
// msg: message text.
// Returns the URI unboxed from the response, or an empty string on the
// failure paths that abort the request.
// NOTE(review): the declaration of `e`, the conditions guarding the two
// failure paths, the wait on the future and the error branch before the
// `} else {` are not present in this listing.
std::string CreateMessage(DateTime dt, std::string msg) {
MpObject obj;
char dtcstr[32];
{
// Build the parameter map in a nested scope and box it into obj.
std::map<std::string, std::string> params;
e = dt.ToW3cDtf(dtcstr);
NLIB_ASSERT_NOERR(e);
params["datetime"] = dtcstr;
params["text"] = msg;
obj.Box(params);
}
NLIB_ASSERT(g_client);
// Obtain the future for this id before sending the request.
reqid_t reqid = g_client.GenerateId();
JsonRpcClient::FutureType future;
e = g_client.GetFutureForId(&future, reqid);
e = SendRequest("create", reqid, obj);
// Failure path: cancel the pending id and report "no URI".
g_client.Abort(reqid);
return std::string();
}
// Second failure path, handled the same way.
g_client.Abort(reqid);
return std::string();
}
JsonRpcResponse* response = future.Get();
std::string uri;
} else {
// Success: the result object unboxes to the URI string.
e = response->GetMpObject().Unbox(&uri);
NLIB_ASSERT_NOERR(e);
nlib_printf(
"===create('%s', '%s'): success. uri='%s'\n", dtcstr, msg.c_str(), uri.c_str());
}
return uri;
}
// Sends a "read" request and prints every message entry the server returns.
//
// obj: not referenced anywhere in the visible body.
// Returns true once all returned entries have been printed; false on the
// failure paths (the request is aborted, or the server-side error message is
// reported).
// NOTE(review): the declaration of `e`, the conditions guarding the failure
// paths and the start of the error-reporting call are not present in this
// listing; those lines are left exactly as found and need to be restored from
// the full source.
bool ReadMessage(MpObject* obj) {
NLIB_ASSERT(g_client);
// Obtain the future for this id before sending the request.
reqid_t reqid = g_client.GenerateId();
JsonRpcClient::FutureType future;
e = g_client.GetFutureForId(&future, reqid);
// "read" takes no parameters, so an empty object is sent.
MpObject dummy;
e = SendRequest("read", reqid, dummy);
// Failure path: cancel the pending id.
g_client.Abort(reqid);
return false;
}
// Bounded wait for the response.
e = future.WaitFor(TimeSpan(0, 3));
g_client.Abort(reqid);
return false;
}
JsonRpcResponse* response = future.Get();
response ? response->GetErrorMessage() : "response null");
return false;
} else {
// Success: the result is an array with one entry per stored message.
MpObject& o = response->GetMpObject();
NLIB_ASSERT(o.IsArray());
uint32_t n = o.GetSize();
nlib_printf(
"===read(): success messages = %" PRIu32
"\n", n);
for (uint32_t i = 0; i < n; ++i) {
MpObject* item = o.GetArrayItem(i);
std::map<std::string, std::string> params;
// Decode the entry into the string map declared just above.
e = item->Unbox(&params);
NLIB_ASSERT_NOERR(e);
nlib_printf(
"#%" PRIu32
": uri='%s', datetime='%s', text='%s'\n", i,
params["uri"].c_str(), params["datetime"].c_str(), params["text"].c_str());
}
return true;
}
}
// Sends an "update" request carrying {"datetime", "text", "uri"} to replace
// the message stored under uri.
//
// uri: identifier of the message to update.
// dt:  new timestamp, formatted through DateTime::ToW3cDtf().
// msg: new message text.
// Returns true when the success message is printed, false on the failure
// paths.
// NOTE(review): the declaration of `e`, the conditions guarding the failure
// paths, the wait on the future and the `if` that opens the error branch are
// not present in this listing.
bool UpdateMessage(std::string uri, DateTime dt, std::string msg) {
MpObject obj;
char dtcstr[32];
{
// Build the parameter map in a nested scope and box it into obj.
std::map<std::string, std::string> params;
e = dt.ToW3cDtf(dtcstr);
NLIB_ASSERT_NOERR(e);
params["datetime"] = dtcstr;
params["text"] = msg;
params["uri"] = uri;
obj.Box(params);
}
NLIB_ASSERT(g_client);
// Obtain the future for this id before sending the request.
reqid_t reqid = g_client.GenerateId();
JsonRpcClient::FutureType future;
e = g_client.GetFutureForId(&future, reqid);
e = SendRequest("update", reqid, obj);
// Failure path: cancel the pending id.
g_client.Abort(reqid);
return false;
}
// Second failure path, handled the same way.
g_client.Abort(reqid);
return false;
}
JsonRpcResponse* response = future.Get();
// Error branch: report the server's message, or a placeholder when no
// response object is available.
nlib_printf(
"===update('%s', '%s', '%s'): error '%s'\n", uri.c_str(), dtcstr, msg.c_str(),
response ? response->GetErrorMessage() : "response null");
return false;
} else {
nlib_printf(
"===update('%s', '%s', '%s'): success\n", uri.c_str(), dtcstr, msg.c_str());
return true;
}
}
// Sends a "delete" request carrying {"uri"} to remove the message stored
// under uri.
//
// uri: identifier of the message to delete.
// Returns true when the success message is printed, false on the failure
// paths.
// NOTE(review): the conditions guarding the failure paths, the wait on the
// future and the `if` that opens the error branch are not present in this
// listing.
bool DeleteMessage(std::string uri) {
MpObject obj;
{
// Build the parameter map in a nested scope and box it into obj.
std::map<std::string, std::string> params;
params["uri"] = uri;
obj.Box(params);
}
NLIB_ASSERT(g_client);
// Obtain the future for this id before sending the request.
reqid_t reqid = g_client.GenerateId();
JsonRpcClient::FutureType future;
errno_t e = g_client.GetFutureForId(&future, reqid);
e = SendRequest("delete", reqid, obj);
// Failure path: cancel the pending id.
g_client.Abort(reqid);
return false;
}
// Second failure path, handled the same way.
g_client.Abort(reqid);
return false;
}
JsonRpcResponse* response = future.Get();
// Error branch: report the server's message, or a placeholder when no
// response object is available.
nlib_printf(
"===delete('%s'): error '%s'\n", uri.c_str(),
response ? response->GetErrorMessage() : "response null");
return false;
} else {
nlib_printf(
"===delete('%s'): success\n", uri.c_str());
return true;
}
}
}
// Number of sample records sent to the server by JsonRpc().
const int kNumConsoleData = 6;

// Sample records: a free-text description and a release date in YYYY-MM-DD
// form (parsed with DateTime::Parse in JsonRpc()).
//
// JsonRpc() refers to entries by index, so the order matters:
//   - entry 1 is the record that gets deleted;
//   - entry 5 ("Disk System") deliberately starts with wrong data, which
//     JsonRpc() then corrects through UpdateMessage().
struct ConsoleData {
    const char* description;
    const char* date;
} consoleData[kNumConsoleData] = {
    {"Cassette Vision(13500 yen)", "1981-07-30"},
    {"Pyu-ta(59800 yen)", "1982-08-20"},
    {"Family Computer(14800 yen)", "1983-07-15"},
    {"SG-3000(29800 yen)", "1983-07-15"},
    {"Mark III(15000 yen)", "1985-10-20"},
    {"Disk System", "1976-02-21"}
};
bool Listup() {
Future<bool> future;
MpObject* ptr = NULL;
Async(&future, ReadMessage, ptr);
return future.Get();
}
// Runs the whole sample scenario: start the server, connect the client,
// create kNumConsoleData messages, list them, fix one record, delete another
// one twice, list again, then shut everything down.
//
// Returns true when the scenario reaches its normal end or GENERAL_ERROR, and
// false on the early setup failures and at kInternalError.
// NOTE(review): the declaration of `e`, the conditions guarding the early
// failure paths and the bodies of the `if (!Listup())` checks are not present
// in this listing.
// NOTE(review): GENERAL_ERROR returns true, exactly like the success path —
// confirm that this is intended.
bool JsonRpc() {
e = InitServer();
std::string uri1, uri5;
// Create the transport endpoint and connect it to the server.
g_client_end.reset(new ClientEnd());
if (!g_client_end) return false;
e = g_client_end->Init();
e = g_client_end->Connect();
NLIB_ASSERT(!g_client);
g_client = JsonRpcClient::OpenClient("myclient");
if (!g_client) return false;
// Setup failure path (guarding condition not visible): flag shutdown and
// close the endpoint before giving up.
g_shutdown_client = true;
g_client_end->Close();
return false;
}
// Second setup failure path, handled the same way.
g_shutdown_client = true;
g_client_end->Close();
return false;
}
// Issue one "create" per sample record through Async; ft[i] receives the
// URI of record i.
Future<std::string> ft[kNumConsoleData];
for (int i = 0; i < kNumConsoleData; ++i) {
const ConsoleData& c = consoleData[i];
DateTime dt;
TimeSpan dummy;
e = DateTime::Parse(c.date, &dt, &dummy);
NLIB_ASSERT_NOERR(e);
NLIB_UNUSED(e);
e =
Async(&ft[i], CreateMessage, dt, std::string(c.description));
}
#if 1
// Wait for every create to finish before listing.
for (int i = 0; i < kNumConsoleData; ++i) {
ft[i].Get();
}
#endif
if (!Listup()) {
}
// Record 5 was created with wrong data on purpose; correct it now.
uri5 = ft[5].Get();
if (!uri5.empty()) {
nlib_printf(
"Update '%s' to fix data\n", uri5.c_str());
DateTime dt_up1;
dt_up1.Init(1986, 2, 21);
Future<bool> result;
e =
Async(&result, UpdateMessage, ft[5].Get(), dt_up1,
std::string("Disk System(15000 yen)"));
if (!result.Get()) {
goto GENERAL_ERROR;
}
} else {
goto GENERAL_ERROR;
}
if (!Listup()) {
}
// Delete record 1 twice through Async: exactly one of the two requests is
// expected to succeed, anything else is treated as an internal error.
uri1 = ft[1].Get();
if (!uri1.empty()) {
Future<bool> ft_del1, ft_del2;
e =
Async(&ft_del1, DeleteMessage, uri1);
e =
Async(&ft_del2, DeleteMessage, uri1);
if (ft_del1.Get()) {
if (ft_del2.Get()) goto kInternalError;
} else if (!ft_del2.Get()) {
goto kInternalError;
}
} else {
goto GENERAL_ERROR;
}
if (!Listup()) {
}
// Normal end: flag both sides for shutdown and close the endpoint.
g_shutdown_client = true;
g_shutdown_server = true;
g_client_end->Close();
return true;
GENERAL_ERROR:
// Same teardown as the normal end, and also reported as true.
g_shutdown_client = true;
g_shutdown_server = true;
g_client_end->Close();
return true;
kInternalError:
// Same teardown, reported as failure.
g_shutdown_client = true;
g_shutdown_server = true;
g_client_end->Close();
return false;
}
// Sample entry point: runs the JSON-RPC scenario, releases the global client
// endpoint, and returns the scenario's result. Both arguments are ignored.
bool SampleMain(int, char**) {
    const bool succeeded = JsonRpc();
    g_client_end.reset();
    return succeeded;
}
NLIB_MAINFUNC