RPC的基础:调研EOS插件http_plugin

区块链的应用是基于http服务,这种能力在EOS中是依靠http_plugin插件赋予的。

关键字:通讯模式,add_api,http server,https server,unix server,io_service,socket,connection

通讯模式

EOS中,一个插件的使用要先获取其实例,例如http_plugin获取实例的语句是:

auto& _http_plugin = app().get_plugin<http_plugin>();

其他插件的获取方式与此相同。目前为止,包括前文介绍到的method、channel、信号槽、信号量,跨模块的交互方式可以总结为五种:

  • method,插件之间的调用,一个插件A将其函数按key注册到method池中,其他任意数量的插件B、C、D均可通过key去method池中找到该函数并调用。这种通讯模式是一个由调用者主动发起的过程。
  • channel,插件之间的调用,一个插件A按key找到频道并向频道publish一个动作,其他任意数量的插件B、C、D,甚至在不同节点上的插件B、C、D,只要是按key订阅了该channel并绑定了他们各自本地的一个notify function,就会被触发执行。这种通讯模式是基于发布订阅模式,或者说是更高级的观察者模式,是由发布者的行为交由channel来触发所有订阅者绑定的本地通知函数的过程。
  • 信号槽,插件与controller的交互过程。controller下启基于chainbase的状态数据库,上承信号的管理,通过信号来与外部进行交互,controller会根据链的行为emit一个对应的信号出来,其他插件如果有处理该信号的需求会连接connect该信号并绑定函数实现。有时候一个信号会被多个插件所连接,例如accepted_block_header信号,是承认区块头的信号,会被net_plugin捕捉并处理,同时该信号也会被chain_plugin所捕捉,触发广播。
  • 信号量,一般是应用程序与操作系统发生的交互,在EOS中,应用程序的实例是application,它与操作系统发生的交互都是通过信号量来完成,首先声明一个信号,然后通过async_wait触发信号完成与操作系统的交互。
  • 实例调用,对比以上四种松散的方式,这种模式是强关联,正如我们刚刚学习编程时喜欢使用new/create而不考虑对象的垃圾处理以及实例管理,后来会采用解耦的松散的统一实例管理框架,或者采用单例而不是每次都要new/create。但这种方式并不是完全不被推荐的,当实例的某个成员直接被需要时,可以直接通过该方式获取到,而不是通过以上四种方式来使用。

目前总结出来的五种跨模块交互方式,前四种更注重通讯,最后一种更注重其他模块的内容。更注重通讯的前四种是基于同一底层通讯机制(socket),但适用于不同场景的设计实现。

add_api函数

从chain_api_plugin过来,http_plugin的使用方式是:

_http_plugin.add_api({
      CHAIN_RO_CALL(get_info, 200l),
      ...
   });

那么,就从add_api入手研究http_plugin。add_api函数声明在http_plugin头文件中,说明该函数的内容很少或很具备通用性。

void add_api(const api_description& api) {
   for (const auto& call : api)
      add_handler(call.first, call.second);
}

从前面的调用代码可以看出,add_api函数的参数是一个对象集合,它们总体是一个api_description类型的常量引用。

using api_description = std::map<string, url_handler>;

api_description根据源码可知是一个map,key为string类型的url路径地址,值为url_handler是具体实现API功能的处理函数。在add_api的调用部分,宏CHAIN_RO_CALL调用了另一个宏CALL,CALL组装了map的这两个数:

#define CALL(api_name, api_handle, api_namespace, call_name) \
{std::string("/v1/" #api_name "/" #call_name), \
   [api_handle](string, string body, url_response_callback cb) mutable { \
          try { \
             if (body.empty()) body = "{}"; \
             auto result = api_handle.call_name(fc::json::from_string(body).as<api_namespace::call_name ## _params>()); \
             cb(200, fc::json::to_string(result)); \
          } catch (...) { \
             http_plugin::handle_exception(#api_name, #call_name, body, cb); \
          } \
       }}

CALL宏体包含两个数据,以逗号隔开,前面部分为url路径地址,后面部分为api_handler,此处实际上是一个匿名内部函数。回到add_api函数的声明,遍历整个api,逐一执行add_handler为url和api处理函数添加相互绑定的关系。

add_handler函数

直接进入函数实现的代码:

void http_plugin::add_handler(const string& url, const url_handler& handler) {
  ilog( "add api url: ${c}", ("c",url) ); // 输出日志
  app().get_io_service().post([=](){
    my->url_handlers.insert(std::make_pair(url,handler));
  });
}

app()前文讲到了,是用来获取application实例的,其包含一个public权限的成员函数get_io_service:

boost::asio::io_service& get_io_service() { return *io_serv; }

返回的是基于boost::asio::io_service库的共享指针类型,application的私有成员io_serv的指针。

io_service是asio框架中的调度器,用来调度异步事件,application实例要保存一个io_service对象,用于保存当前实例的所有待调度的异步事件。

io_service的两个重要方法:

  • post,用于发布一个异步事件,依赖asio库进行自动调度,不需要显式调用函数。
  • run,显式调用,同步执行回调函数。

当appbase.exec()执行时,io_service会同步启动,如果一个插件需要IO或其他异步操作,可以通过以下方式进行分发:

app().get_io_service().post( lambda )

那么,这种分发方式,除了在http_plugin的add_handler函数中使用到,EOSIO/eos中在bnet_plugin插件中有大量使用到,缘于bnet_plugin对异步事件发布的需求。回到add_handler函数,post后面跟随的是lambda表达式,[=]代表捕获所有以值访问的局部名字。lambda体是将url和handler作为二元组插入到http_plugin_impl对象的唯一指针my的共有成员url_handlers集合中,数据类型与上面的api_description一致。

url_handlers集合

url_handlers集合的数据源是其他插件通过add_api函数传入组装好的url和handler的对象。该集合作为api的异步处理器集合,在http_plugin中消费该集合数据的是handle_http_request函数。该函数处理外部请求,根据请求url在url_handlers集合中查找数据,找到handler以后,传入外部参数数据并执行handler对应的处理函数。

handle_http_request函数

/**
 * 处理一个http请求(http_plugin)
 * @tparam T socket type
 * @param con 连接对象
 */
template<class T>
void handle_http_request(typename websocketpp::server<T>::connection_ptr con) {
    try {
       auto& req = con->get_request(); // 获得请求对象req。
       if(!allow_host<T>(req, con))// 检查host地址是否有效
          return;
       // 根据config.ini中http_plugin相关的连接配置项进行设置。
       if( !access_control_allow_origin.empty()) {
          con->append_header( "Access-Control-Allow-Origin", access_control_allow_origin );
       }
       if( !access_control_allow_headers.empty()) {
          con->append_header( "Access-Control-Allow-Headers", access_control_allow_headers );
       }
       if( !access_control_max_age.empty()) {
          con->append_header( "Access-Control-Max-Age", access_control_max_age );
       }
       if( access_control_allow_credentials ) {
          con->append_header( "Access-Control-Allow-Credentials", "true" );
       }
       if(req.get_method() == "OPTIONS") { // HTTP method包含:`GET` `HEAD` `POST` `OPTIONS` `PUT` `DELETE` `TRACE` `CONNECT`
          con->set_status(websocketpp::http::status_code::ok);
          return;// OPTIONS不能缓存,未能获取到请求的资源。
       }
    
       con->append_header( "Content-type", "application/json" );// 增加请求头。
       auto body = con->get_request_body(); // 获得请求体(请求参数)
       auto resource = con->get_uri()->get_resource(); // 获得请求的路径(url)
       auto handler_itr = url_handlers.find( resource ); // 在url_handlers集合中找到对应的handler
       if( handler_itr != url_handlers.end()) {
          con->defer_http_response();// 延时响应
          // 调用handler,传入参数、url,回调函数是lambda表达式,用于将接收到的结果code和响应body赋值给连接。
          handler_itr->second( resource, body, [con]( auto code, auto&& body ) {
             con->set_body( std::move( body )); // 接收到的响应body赋值给连接。
             con->set_status( websocketpp::http::status_code::value( code )); // 接收到的code赋值给连接。
             con->send_http_response();// 发送http响应
          } );
       } else {
          dlog( "404 - not found: ${ep}", ("ep", resource)); // 未在url_handlers集合中找到
          // 针对失败的情况,设置http的响应对象数据。
          error_results results{websocketpp::http::status_code::not_found,
                                "Not Found", error_results::error_info(fc::exception( FC_LOG_MESSAGE( error, "Unknown Endpoint" )), verbose_http_errors )};
          con->set_body( fc::json::to_string( results ));
          con->set_status( websocketpp::http::status_code::not_found );
       }
    } catch( ... ) {
       handle_exception<T>( con );
    }
}

下面来看该函数handle_http_request的使用位置。有两处,均在http_plugin内部:

  • create_server_for_endpoint函数,为websocket对象ws设置http处理函数,是一个lambda表达式,lambda体为handle_http_request函数的调用,传入连接对象con,由hdl转换而来。另外,create_server_for_endpoint函数在http_plugin::plugin_startup中也有两处调用。
  • http_plugin::plugin_startup,插件的启动阶段,下面将分析该插件的生命周期。

http_plugin的生命周期

正如研究其他的插件一样,学习路线离不开插件的生命周期。

插件一般都是在程序入口(例如nodeos,keosd)进行生命周期的控制的,一般不做区分,由于插件有共同基类,程序入口做统一控制。

下面依次介绍http_plugin的生命周期。

http_plugin::set_defaults

仅属于http_plugin插件的生命周期。设置默认值,默认值仅包含三项:

struct http_plugin_defaults {
  // 如果不为空,该项的值将在被监听的地址生效。作为不同配置项的前缀。
  string address_config_prefix;
  // 如果为空,unix socket支持将被完全禁用。如果不为空,值为data目录的相对路径,作为默认路径启用unix socket支持。
  string default_unix_socket_path;
  // 如果不是0,HTTP将被启用于默认给出的端口号。如果是0,HTTP将不被默认启用。
  uint16_t default_http_port{0};
};

nodeos的set_defaults语句为:

http_plugin::set_defaults({
    .address_config_prefix = "",
    .default_unix_socket_path = "",
    .default_http_port = 8888
});

keosd的set_defaults语句为:

http_plugin::set_defaults({
    .address_config_prefix = "",
    // key_store_executable_name = "keosd";
    .default_unix_socket_path = keosd::config::key_store_executable_name + ".sock", // 默认unix socket路径为keosd.sock
    .default_http_port = 0
});

http_plugin::set_program_options

设置http_plugin插件的参数,构建属于http_plugin的配置选项,将与其他插件的配置共同组成配置文件config.ini,在此基础上添加–help等参数构建程序(例如nodeos)的CLI命令行参数。同时设置参数被设置以后的处理方案。

/**
 * 生命周期 http_plugin::set_program_options
 * @param cfg 命令行和配置文件的手动配置项的并集,交集以命令行配置为准的配置对象。
 */
void http_plugin::set_program_options(options_description&, options_description& cfg) {
   // 处理默认set_defaults配置项。
  my->mangle_option_names();
  if(current_http_plugin_defaults.default_unix_socket_path.length())// 默认unix socket 路径
     cfg.add_options()
        (my->unix_socket_path_option_name.c_str(), bpo::value<string>()->default_value(current_http_plugin_defaults.default_unix_socket_path),
         "The filename (relative to data-dir) to create a unix socket for HTTP RPC; set blank to disable.");
  if(current_http_plugin_defaults.default_http_port)// 设置默认http端口
     cfg.add_options()
        (my->http_server_address_option_name.c_str(), bpo::value<string>()->default_value("127.0.0.1:" + std::to_string(current_http_plugin_defaults.default_http_port)),
         "The local IP and port to listen for incoming http connections; set blank to disable.");
  else
     cfg.add_options()
        (my->http_server_address_option_name.c_str(), bpo::value<string>(),
         "The local IP and port to listen for incoming http connections; leave blank to disable.");// 端口配置为空的话禁用http
  // 根据手动配置项来设置
  cfg.add_options()
        (my->https_server_address_option_name.c_str(), bpo::value<string>(),
         "The local IP and port to listen for incoming https connections; leave blank to disable.")// 端口配置为空的话禁用http
        ("https-certificate-chain-file", bpo::value<string>(),// https的配置,证书链文件
         "Filename with the certificate chain to present on https connections. PEM format. Required for https.")
        ("https-private-key-file", bpo::value<string>(),// https的配置,私钥文件
         "Filename with https private key in PEM format. Required for https")
        ("access-control-allow-origin", bpo::value<string>()->notifier([this](const string& v) {// 跨域问题,控制访问源
            my->access_control_allow_origin = v;
            ilog("configured http with Access-Control-Allow-Origin: ${o}", ("o", my->access_control_allow_origin));
         }),
         "Specify the Access-Control-Allow-Origin to be returned on each request.")
        ("access-control-allow-headers", bpo::value<string>()->notifier([this](const string& v) {// 控制允许访问的http头
            my->access_control_allow_headers = v;
            ilog("configured http with Access-Control-Allow-Headers : ${o}", ("o", my->access_control_allow_headers));
         }),
         "Specify the Access-Control-Allow-Headers to be returned on each request.")
        ("access-control-max-age", bpo::value<string>()->notifier([this](const string& v) {// 控制访问的最大缓存age
            my->access_control_max_age = v;
            ilog("configured http with Access-Control-Max-Age : ${o}", ("o", my->access_control_max_age));
         }),
         "Specify the Access-Control-Max-Age to be returned on each request.")
        ("access-control-allow-credentials",
         bpo::bool_switch()->notifier([this](bool v) {
            my->access_control_allow_credentials = v;
            if (v) ilog("configured http with Access-Control-Allow-Credentials: true");
         })->default_value(false), // 控制访问允许的证书
         "Specify if Access-Control-Allow-Credentials: true should be returned on each request.")
         // 最大请求体的大小,默认为1MB。
        ("max-body-size", bpo::value<uint32_t>()->default_value(1024*1024), "The maximum body size in bytes allowed for incoming RPC requests")
        // 打印http详细的错误信息到日志,默认为false,不打印。
        ("verbose-http-errors", bpo::bool_switch()->default_value(false), "Append the error log to HTTP responses")
        // 校验host,如果设置为false,任意host均为有效。默认为true,要校验host。
        ("http-validate-host", boost::program_options::value<bool>()->default_value(true), "If set to false, then any incoming \"Host\" header is considered valid")
        // 别名。另外可接受的host头
        ("http-alias", bpo::value<std::vector<string>>()->composing(), "Additionaly acceptable values for the \"Host\" header of incoming HTTP requests, can be specified multiple times.  Includes http/s_server_address by default.");
}

http_plugin::plugin_initialize

插件初始化的操作。读取配置并做出处理。

实际上,在set_option_program阶段也做了对配置值的读取及转储处理。原因是一些默认参数,即用户不经常配置的选项,就不需要读取用户配置的选项,可以在set_option_program阶段做出处理,而那些需要用户来配置的选项则需要在初始化阶段读入并处理。

初始化阶段读入的配置项包含:

  • validate_host,是否校验host,bool类型的值。
  • valid_hosts,添加alias别名作为有效host。
  • listen_endpoint,根据在set_option_program阶段赋值的my成员http_server_address_option_name,重组处理得到监听点,同时添加至valid_hosts。
  • unix_endpoint,同样根据my成员unix_socket_path_option_name处理,得到绝对路径赋值给unix_endpoint。
  • 对set_option_program阶段赋值的my成员https_server_address_option_name的值的处理,https的两个配置的处理,最终重组处理,分别赋值给my成员https_listen_endpoint,https_cert_chain,https_key,以及valid_hosts。
  • max_body_size,直接赋值。

当然在初始化阶段仍旧可以配置set_option_program阶段已做出处理的配置项,以用户配置为准。

http_plugin::plugin_startup

在插件中,启动阶段都是非常重要的生命周期。它往往代码很简单甚至简略,但功能性很强。下面来看http_plugin的启动阶段的内容,g共分为三部分:

  • listen_endpoint,本地节点的http监听路径,例如127.0.0.1:8888。
  • unix_endpoint,如果为空,unix socket支持将被完全禁用。如果不为空,值为data目录的相对路径,作为默认路径启用unix socket支持。
  • https_listen_endpoint,https版本的本地节点http监听路径,一般不设置,对应的是配置中的https_server_address选项。

对于以上三种情况,启动阶段分别做了三种对应的处理,首先来看最标准最常见的情况,就是基于http的本地监听路径listen_endpoint:

if(my->listen_endpoint) {
    try {
        my->create_server_for_endpoint(*my->listen_endpoint, my->server); // 创建http服务(上面介绍到的函数)。内部调用了http请求处理函数。
        ilog("start listening for http requests");
        my->server.listen(*my->listen_endpoint);// 手动监听设置端点。使用设置绑定内部接收器。
        my->server.start_accept();// 启动服务器的异步连接,开始监听:无限循环接收器。启动服务器连接无限循环接收器。监听后必须调用。在底层io_service开始运行之前,此方法不会有任何效果。它可以在io_service已经运行之后被调用。有关如何停止此验收循环的说明,请参阅传输策略的文档。
    } catch ( const fc::exception& e ){
        elog( "http service failed to start: ${e}", ("e",e.to_detail_string()));
        throw;
    } catch ( const std::exception& e ){
        elog( "http service failed to start: ${e}", ("e",e.what()));
        throw;
    } catch (...) {
        elog("error thrown from http io service");
        throw;
    }
}

主要是启动http服务的流程,包括客户端和服务端,endpoint和server_endpoint两个角色的启动。下面来看基于unix socket的情况unix_endpoint:

if(my->unix_endpoint) {
    try {
        my->unix_server.clear_access_channels(websocketpp::log::alevel::all);// 清除所有登陆的频道
        my->unix_server.init_asio(&app().get_io_service());// 初始化io_service对象,io_service就是上面分析过的application的io_service对象,传入asio初始化函数初始化asio传输策略。在使用asio transport之前必须要init asio。
        my->unix_server.set_max_http_body_size(my->max_body_size); // 设置HTTP消息体大小的最大值,该值决定了如果超过这个值的消息体将导致连接断开。
        my->unix_server.listen(*my->unix_endpoint); // 手动设置本地socket监听路径。
        my->unix_server.set_http_handler([&](connection_hdl hdl) {// 设置http请求处理函数(注意此处不再通过create_server_for_endpoint函数来调用,因为不再需要websocket的包装)。
           my->handle_http_request<detail::asio_local_with_stub_log>( my->unix_server.get_con_from_hdl(hdl));
        });
        my->unix_server.start_accept();// 同上,启动server端的无限循环接收器。
    } catch ( const fc::exception& e ){
        elog( "unix socket service failed to start: ${e}", ("e",e.to_detail_string()));
        throw;
    } catch ( const std::exception& e ){
        elog( "unix socket service failed to start: ${e}", ("e",e.what()));
        throw;
    } catch (...) {
        elog("error thrown from unix socket io service");
        throw;
    }
}

下面来看基于https的本地监听路径https_listen_endpointd的处理:

if(my->https_listen_endpoint) {
    try {
        my->create_server_for_endpoint(*my->https_listen_endpoint, my->https_server); // 同上http的原理,只是参数换为https的值。
        // 设置TLS初始化处理器。当请求一个TLS上下文使用时,将调用该TLS初始化处理器。该处理器必须返回一个有效TLS上下文,以支持当前端点能够初始化TLS连接。
        // connection_hdl,一个连接的唯一标识。它是实现了一个弱引用智能指针weak_ptr指向连接对象。线程安全。通过函数endpoint::get_con_from_hdl()可以转化为一个完整的共享指针。
        my->https_server.set_tls_init_handler([this](websocketpp::connection_hdl hdl) -> ssl_context_ptr{
           return my->on_tls_init(hdl); 
        });
        ilog("start listening for https requests");
        my->https_server.listen(*my->https_listen_endpoint);// 同上http的原理,监听地址。
        my->https_server.start_accept();// 同上http的原理,启动服务。
    } catch ( const fc::exception& e ){
        elog( "https service failed to start: ${e}", ("e",e.to_detail_string()));
        throw;
    } catch ( const std::exception& e ){
        elog( "https service failed to start: ${e}", ("e",e.what()));
        throw;
    } catch (...) {
        elog("error thrown from https io service");
        throw;
    }
}

unix server与server的底层实现是一致的,只是外部的包裹处理不同,https_server的类型再加上这个ssl上下文的类型指针ssl_context_ptr。他们的声明分别是:

using websocket_server_type = websocketpp::server<detail::asio_with_stub_log<websocketpp::transport::asio::basic_socket::endpoint>>; // http server
using websocket_local_server_type = websocketpp::server<detail::asio_local_with_stub_log>; // unix server
using websocket_server_tls_type =  websocketpp::server<detail::asio_with_stub_log<websocketpp::transport::asio::tls_socket::endpoint>>; // https server
using ssl_context_ptr =  websocketpp::lib::shared_ptr<websocketpp::lib::asio::ssl::context>; // https ssl_context_ptr

HTTPS = HTTP over TLS。TLS的前身是SSL。

从上面的声明可以看出,http和https最大的不同是,前者是basic_socket,后者是tls_socket,socket类型不同,http是基础socket,https是包裹了tls的socket。

http_plugin::plugin_shutdown

关闭是插件的最后一个生命周期,代码很少,主要执行的是资源释放工作。

void http_plugin::plugin_shutdown() {
  if(my->server.is_listening())
     my->server.stop_listening();
  if(my->https_server.is_listening())
     my->https_server.stop_listening();
}

此处没有unix_server的处理[#6393]。http和https都是socket,需要手动停止监听,启动无限循环接收器。unix server是通过io_service来异步处理,底层实现逻辑相同,也启动了无限循环接收器。

总结

本文首先以外部使用http_plugin的方式:add_api函数为研究入口,逐层深入分析。接着从整体上研究了http_plugin的生命周期,进一步加深了对http_plugin的http/https/unix三种server的认识。

更多文章请转到醒者呆的博客园