From f84c186882c0d425609857776e32c852ba267a12 Mon Sep 17 00:00:00 2001 From: pingkai Date: Sun, 26 Apr 2020 15:26:20 +0800 Subject: [PATCH] feat(framework): add communication tools Signed-off-by: pingkai --- framework/CMakeLists.txt | 1 + framework/communication/CMakeLists.txt | 14 ++ framework/communication/IProtocol.cpp | 39 ++++++ framework/communication/IProtocol.h | 40 ++++++ framework/communication/TCPProtocol.cpp | 137 +++++++++++++++++++ framework/communication/TCPProtocol.h | 71 ++++++++++ framework/communication/messageServer.cpp | 72 ++++++++++ framework/communication/messageServer.h | 46 +++++++ framework/tests/CMakeLists.txt | 1 + framework/tests/communication/CMakeLists.txt | 43 ++++++ framework/tests/communication/test.cpp | 62 +++++++++ 11 files changed, 526 insertions(+) create mode 100644 framework/communication/CMakeLists.txt create mode 100644 framework/communication/IProtocol.cpp create mode 100644 framework/communication/IProtocol.h create mode 100644 framework/communication/TCPProtocol.cpp create mode 100644 framework/communication/TCPProtocol.h create mode 100644 framework/communication/messageServer.cpp create mode 100644 framework/communication/messageServer.h create mode 100644 framework/tests/communication/CMakeLists.txt create mode 100644 framework/tests/communication/test.cpp diff --git a/framework/CMakeLists.txt b/framework/CMakeLists.txt index 0e249a9ec..4f2adf27d 100644 --- a/framework/CMakeLists.txt +++ b/framework/CMakeLists.txt @@ -48,6 +48,7 @@ add_subdirectory(codec) add_subdirectory(utils) add_subdirectory(filter) add_subdirectory(render) +add_subdirectory(communication) if (ENABLE_CACHE_MODULE) set(ENABLE_MUXER ON) add_definitions(-DENABLE_CACHE_MODULE) diff --git a/framework/communication/CMakeLists.txt b/framework/communication/CMakeLists.txt new file mode 100644 index 000000000..5179dda4d --- /dev/null +++ b/framework/communication/CMakeLists.txt @@ -0,0 +1,14 @@ +cmake_minimum_required(VERSION 3.6) +project(communication) +set(CMAKE_CXX_STANDARD 11) +add_library(communication ${TARGET_LIBRARY_TYPE} "") +target_sources(communication PRIVATE + messageServer.cpp + messageServer.h + IProtocol.cpp + IProtocol.h + TCPProtocol.cpp + TCPProtocol.h) +target_include_directories(communication PRIVATE + ${COMMON_INC_DIR} + ) \ No newline at end of file diff --git a/framework/communication/IProtocol.cpp b/framework/communication/IProtocol.cpp new file mode 100644 index 000000000..0c9eccf3c --- /dev/null +++ b/framework/communication/IProtocol.cpp @@ -0,0 +1,39 @@ +// +// Created by moqi on 2020/4/22. +// + +#include "IProtocol.h" +#include +#include +#include +#include +using namespace Cicada; + +std::string IProtocolServer::getLocalIp() +{ + struct ifaddrs *ifaddr, *ifa; + int family, s; + char host[NI_MAXHOST]; + + if (getifaddrs(&ifaddr) == -1) { + perror("getifaddrs"); + return ""; + } + + for (ifa = ifaddr; ifa != nullptr; ifa = ifa->ifa_next) { + if (ifa->ifa_addr == nullptr) continue; + + family = ifa->ifa_addr->sa_family; + + if (!strncmp(ifa->ifa_name, "lo", 2)) continue; + if (family == AF_INET) { + s = getnameinfo(ifa->ifa_addr, sizeof(struct sockaddr_in), host, NI_MAXHOST, nullptr, 0, NI_NUMERICHOST); + if (s != 0) { + return ""; + } + freeifaddrs(ifaddr); + return host; + } + } + return ""; +} diff --git a/framework/communication/IProtocol.h b/framework/communication/IProtocol.h new file mode 100644 index 000000000..4cb4efd8a --- /dev/null +++ b/framework/communication/IProtocol.h @@ -0,0 +1,40 @@ +// +// Created by moqi on 2020/4/22. +// + +#ifndef CICADAMEDIA_IPROTOCOL_H +#define CICADAMEDIA_IPROTOCOL_H + + +#include +namespace Cicada { + class IProtocolServer { + public: + virtual ~IProtocolServer() = default; + virtual std::string getServerUri() = 0; + + virtual int init() = 0; + + virtual int write(const uint8_t *buffer, int size) = 0; + + virtual int write_u32(uint32_t val) = 0; + virtual void flush() = 0; + + // protected: + static std::string getLocalIp(); + }; + + class IProtocolClient { + public: + virtual ~IProtocolClient() = default; + + virtual int connect(const std::string &server) = 0; + + virtual int read(uint8_t *buffer, int size) = 0; + + virtual int read_u32(uint32_t* val) = 0; + }; +}// namespace Cicada + + +#endif//CICADAMEDIA_IPROTOCOL_H diff --git a/framework/communication/TCPProtocol.cpp b/framework/communication/TCPProtocol.cpp new file mode 100644 index 000000000..99b81c26b --- /dev/null +++ b/framework/communication/TCPProtocol.cpp @@ -0,0 +1,137 @@ +// +// Created by moqi on 2020/4/22. +// + +#include "TCPProtocol.h" +#define LOG_TAG "TCPProtocol" +extern "C" { +#include +} +#include +using namespace Cicada; +using namespace std; +InterruptAble::InterruptAble() +{ + mInterruptCB.callback = check_interrupt; + mInterruptCB.opaque = this; +} +int InterruptAble::check_interrupt(void *pHandle) +{ + auto *server = static_cast(pHandle); + return server->mInterrupted.load(); +} +TCPProtocolServer::TCPProtocolServer() = default; +TCPProtocolServer::~TCPProtocolServer() +{ + interrupt(true); + mAcceptThread = nullptr; + avio_close(mServer); + for (auto item : mClients) { + avio_close(item); + } +} +std::string TCPProtocolServer::getServerUri() +{ + return "tcp://" + IProtocolServer::getLocalIp() + ":8888"; +} +int TCPProtocolServer::init() +{ + AVDictionary *options = nullptr; + int ret; + ret = av_dict_set(&options, "listen", "2", 0); + if (ret < 0) { + AF_LOGE("set listen error\n"); + return ret; + } + if ((ret = avio_open2(&mServer, "tcp://:8888", AVIO_FLAG_WRITE, &mInterruptCB, &options)) < 0) { + AF_LOGE("Failed to open server: %s\n", av_err2str(ret)); + if (options) { + av_dict_free(&options); + } + return ret; + } + if (options) { + av_dict_free(&options); + } + mAcceptThread = unique_ptr(NEW_AF_THREAD(accept_loop)); + mAcceptThread->start(); + return 0; +} +int TCPProtocolServer::accept_loop() +{ + int ret; + AVIOContext *client; + ret = avio_accept(mServer, &client); + if (ret >= 0) { + AF_LOGD("get a client\n"); + std::lock_guard lock(mClientMutex); + mClients.push_back(client); + } + return ret; +} +int TCPProtocolServer::write(const uint8_t *buffer, int size) +{ + int ret; + std::lock_guard lock(mClientMutex); + if (mClients.empty()) { + return 0; + } + for (auto item : mClients) { + avio_write(item, buffer, size); + } + + return 0; +} +void TCPProtocolServer::flush() +{ + for (auto item : mClients) { + avio_flush(item); + } +} +int TCPProtocolServer::write_u32(uint32_t val) +{ + int ret; + std::lock_guard lock(mClientMutex); + if (mClients.empty()) { + return 0; + } + for (auto item : mClients) { + avio_wl32(item, val); + } + + return 0; +} +TCPProtocolClient::TCPProtocolClient() = default; +TCPProtocolClient::~TCPProtocolClient() +{ + interrupt(true); + avio_close(mClient); +} +int TCPProtocolClient::connect(const string &server) +{ + AVDictionary *format_opts = nullptr; + av_dict_set_int(&format_opts, "rw_timeout", AV_TIME_BASE/100, 0); + + int ret = avio_open2(&mClient, server.c_str(), AVIO_FLAG_READ, &mInterruptCB, &format_opts); + if (format_opts) { + av_dict_free(&format_opts); + } + if (ret < 0) { + AF_LOGE("connect to server error %s\n", av_err2str(ret)); + avio_close(mClient); + mClient = nullptr; + } + return ret; +} +int TCPProtocolClient::read(uint8_t *buffer, int size) +{ + avio_feof(mClient); + return avio_read(mClient, buffer, size); +} +int TCPProtocolClient::read_u32(uint32_t *val) +{ + avio_feof(mClient); + *val = avio_rl32(mClient); + *val = av_le2ne32(*val); + return 0; +} diff --git a/framework/communication/TCPProtocol.h b/framework/communication/TCPProtocol.h new file mode 100644 index 000000000..45942a555 --- /dev/null +++ b/framework/communication/TCPProtocol.h @@ -0,0 +1,71 @@ +// +// Created by moqi on 2020/4/22. +// + +#ifndef CICADAMEDIA_TCPPROTOCOL_H +#define CICADAMEDIA_TCPPROTOCOL_H + +#include "IProtocol.h" +#include +#include + +extern "C" { +#include +} +namespace Cicada { + class InterruptAble { + public: + InterruptAble(); + virtual ~InterruptAble() = default; + void interrupt(bool inter) + { + mInterrupted = inter; + } + + private: + static int check_interrupt(void *pHandle); + + protected: + AVIOInterruptCB mInterruptCB{}; + std::atomic_bool mInterrupted{false}; + }; + class TCPProtocolServer : public IProtocolServer, private InterruptAble { + public: + TCPProtocolServer(); + + ~TCPProtocolServer() override; + + std::string getServerUri() override; + + int init() override; + + int write(const uint8_t *buffer, int size) override; + int write_u32(uint32_t val) override; + + void flush() override; + + private: + int accept_loop(); + + private: + AVIOContext *mServer{nullptr}; + std::mutex mClientMutex{}; + std::vector mClients{}; + std::unique_ptr mAcceptThread{}; + }; + + class TCPProtocolClient : public IProtocolClient, private InterruptAble { + public: + TCPProtocolClient(); + ~TCPProtocolClient() override; + int connect(const std::string &server) override; + int read(uint8_t *buffer, int size) override; + int read_u32(uint32_t* val) override; + + private: + AVIOContext *mClient{}; + }; +}// namespace Cicada + + +#endif//CICADAMEDIA_TCPPROTOCOL_H diff --git a/framework/communication/messageServer.cpp b/framework/communication/messageServer.cpp new file mode 100644 index 000000000..7dad6a4d4 --- /dev/null +++ b/framework/communication/messageServer.cpp @@ -0,0 +1,72 @@ +// +// Created by moqi on 2020/4/22. +// +#include "messageServer.h" +#include "TCPProtocol.h" +#define LOG_TAG "messageServer" +#include +using namespace Cicada; +messageServer::messageServer() +{ + mServer = static_cast>(new TCPProtocolServer()); +} +messageServer::~messageServer() +{} +int messageServer::write(const std::string &msg) +{ + if (!inited) { + return -1; + } + + mServer->write_u32(msg.size()); + mServer->write(reinterpret_cast(msg.c_str()), msg.size()); + mServer->flush(); + return 0; +} +int messageServer::init() +{ + if (!inited) { + int ret = mServer->init(); + if (ret >= 0) { + inited = true; + return 0; + } else + return -1; + } + return 0; +} +std::string messageServer::getServerUri() +{ + return mServer->getServerUri(); +} + +messageClient::messageClient() +{ + mClient = static_cast>(new TCPProtocolClient()); + mBuffer = static_cast(malloc(1024)); +} +messageClient::~messageClient() +{ + free(mBuffer); +} +int messageClient::connect(const std::string &server) +{ + return mClient->connect(server); +} +std::string messageClient::readMessage() +{ + uint32_t size = 0; + mClient->read_u32(&size); + mBuffer[size] = 0; + auto *p = reinterpret_cast(mBuffer); + while (size > 0) { + int ret = mClient->read(p, size); + if (ret > 0) { + p += ret; + size -= ret; + } else { + break; + } + } + return mBuffer; +} diff --git a/framework/communication/messageServer.h b/framework/communication/messageServer.h new file mode 100644 index 000000000..7db1c8e4b --- /dev/null +++ b/framework/communication/messageServer.h @@ -0,0 +1,46 @@ +// +// Created by moqi on 2020/4/22. +// + +#ifndef CICADAMEDIA_MESSAGESERVER_H +#define CICADAMEDIA_MESSAGESERVER_H + +#include "IProtocol.h" +#include +#include + + +namespace Cicada { + class messageServer { + public: + messageServer(); + ~messageServer(); + + int init(); + + std::string getServerUri(); + + int write(const std::string &msg); + + private: + std::unique_ptr mServer{}; + bool inited{false}; + }; + + class messageClient { + public: + messageClient(); + ~messageClient(); + + int connect(const std::string &server); + + std::string readMessage(); + + private: + std::unique_ptr mClient{}; + char *mBuffer{}; + }; +}// namespace Cicada + + +#endif//CICADAMEDIA_MESSAGESERVER_H diff --git a/framework/tests/CMakeLists.txt b/framework/tests/CMakeLists.txt index 2d509be7d..47dedadc5 100644 --- a/framework/tests/CMakeLists.txt +++ b/framework/tests/CMakeLists.txt @@ -20,6 +20,7 @@ add_subdirectory(dataSource) #add_subdirectory(render) add_subdirectory(demuxer) add_subdirectory(decoder) +add_subdirectory(communication) enable_testing() diff --git a/framework/tests/communication/CMakeLists.txt b/framework/tests/communication/CMakeLists.txt new file mode 100644 index 000000000..141420c49 --- /dev/null +++ b/framework/tests/communication/CMakeLists.txt @@ -0,0 +1,43 @@ +cmake_minimum_required(VERSION 3.6) +project(dataSourceTest LANGUAGES CXX) + +# require C++11 +set(CMAKE_CXX_STANDARD 11) +set(CMAKE_CXX_EXTENSIONS OFF) +set(CMAKE_CXX_STANDARD_REQUIRED ON) + +if (APPLE) + include(../Apple.cmake) +endif () +include(../../${TARGET_PLATFORM}.cmake) + +add_executable(communicationTest test.cpp) +target_link_libraries(communicationTest PRIVATE + communication + framework_utils + avformat + avcodec + avutil + ) +target_include_directories(communicationTest PRIVATE + ../../ + ${COMMON_INC_DIR} + ) + +target_link_directories(communicationTest PRIVATE ${COMMON_LIB_DIR}) +if (APPLE) + target_link_libraries( + communicationTest PUBLIC + iconv + bz2 + ${FRAMEWORK_LIBS} + ) +else () + target_link_libraries( + communicationTest PUBLIC + dl + ssl + crypto + pthread + ) +endif () \ No newline at end of file diff --git a/framework/tests/communication/test.cpp b/framework/tests/communication/test.cpp new file mode 100644 index 000000000..6f3958076 --- /dev/null +++ b/framework/tests/communication/test.cpp @@ -0,0 +1,62 @@ +// +// Created by moqi on 2020/4/22. +// + +#include +#include +#include +#include +#include +using namespace Cicada; +using namespace std; +static void testProtocol() +{ + // string ip = IProtocolServer::getLocalIp(); + // AF_LOGD("ip is %s\n", ip.c_str()); + TCPProtocolServer server; + server.init(); + TCPProtocolClient client; + client.connect(server.getServerUri()); + TCPProtocolClient client1; + client1.connect(server.getServerUri()); + af_msleep(10); + uint8_t buffer[10] = "hello"; + server.write(reinterpret_cast("hello"), 6); + memset(buffer, 0, 10); + client.read(buffer, 10); + AF_LOGD("buffer is %s\n", buffer); + + memset(buffer, 0, 10); + client1.read(buffer, 10); + AF_LOGD("buffer is %s\n", buffer); +} + +static void testMessage() +{ + messageServer server; + messageClient client[1]{}; + + server.init(); + string serverUrl = server.getServerUri(); + for (auto &item : client) { + item.connect(serverUrl); + } + + af_msleep(10); + for (auto &item : client) { + AF_LOGD("client receive %s\n", item.readMessage().c_str()); + } + + server.write("12345"); + af_msleep(1000); + for (auto &item : client) { + AF_LOGD("client receive %s\n", item.readMessage().c_str()); + } +} +int main() +{ + // testProtocol(); + testMessage(); + + return 0; +}