Skip to content

Commit

Permalink
feat(framework): add communication tools
Browse files Browse the repository at this point in the history
Signed-off-by: pingkai <pingkai010@gmail.com>
  • Loading branch information
pingkai committed Apr 26, 2020
1 parent 96837a7 commit f84c186
Show file tree
Hide file tree
Showing 11 changed files with 526 additions and 0 deletions.
1 change: 1 addition & 0 deletions framework/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ add_subdirectory(codec)
add_subdirectory(utils)
add_subdirectory(filter)
add_subdirectory(render)
add_subdirectory(communication)
if (ENABLE_CACHE_MODULE)
set(ENABLE_MUXER ON)
add_definitions(-DENABLE_CACHE_MODULE)
Expand Down
14 changes: 14 additions & 0 deletions framework/communication/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
cmake_minimum_required(VERSION 3.6)
project(communication)
set(CMAKE_CXX_STANDARD 11)
add_library(communication ${TARGET_LIBRARY_TYPE} "")
target_sources(communication PRIVATE
messageServer.cpp
messageServer.h
IProtocol.cpp
IProtocol.h
TCPProtocol.cpp
TCPProtocol.h)
target_include_directories(communication PRIVATE
${COMMON_INC_DIR}
)
39 changes: 39 additions & 0 deletions framework/communication/IProtocol.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
//
// Created by moqi on 2020/4/22.
//

#include "IProtocol.h"
#include <cstring>
#include <ifaddrs.h>
#include <netdb.h>
#include <sys/socket.h>
using namespace Cicada;

std::string IProtocolServer::getLocalIp()
{
struct ifaddrs *ifaddr, *ifa;
int family, s;
char host[NI_MAXHOST];

if (getifaddrs(&ifaddr) == -1) {
perror("getifaddrs");
return "";
}

for (ifa = ifaddr; ifa != nullptr; ifa = ifa->ifa_next) {
if (ifa->ifa_addr == nullptr) continue;

family = ifa->ifa_addr->sa_family;

if (!strncmp(ifa->ifa_name, "lo", 2)) continue;
if (family == AF_INET) {
s = getnameinfo(ifa->ifa_addr, sizeof(struct sockaddr_in), host, NI_MAXHOST, nullptr, 0, NI_NUMERICHOST);
if (s != 0) {
return "";
}
freeifaddrs(ifaddr);
return host;
}
}
return "";
}
40 changes: 40 additions & 0 deletions framework/communication/IProtocol.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
//
// Created by moqi on 2020/4/22.
//

#ifndef CICADAMEDIA_IPROTOCOL_H
#define CICADAMEDIA_IPROTOCOL_H


#include <string>
namespace Cicada {
class IProtocolServer {
public:
virtual ~IProtocolServer() = default;
virtual std::string getServerUri() = 0;

virtual int init() = 0;

virtual int write(const uint8_t *buffer, int size) = 0;

virtual int write_u32(uint32_t val) = 0;
virtual void flush() = 0;

// protected:
static std::string getLocalIp();
};

class IProtocolClient {
public:
virtual ~IProtocolClient() = default;

virtual int connect(const std::string &server) = 0;

virtual int read(uint8_t *buffer, int size) = 0;

virtual int read_u32(uint32_t* val) = 0;
};
}// namespace Cicada


#endif//CICADAMEDIA_IPROTOCOL_H
137 changes: 137 additions & 0 deletions framework/communication/TCPProtocol.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
//
// Created by moqi on 2020/4/22.
//

#include "TCPProtocol.h"
#define LOG_TAG "TCPProtocol"
extern "C" {
#include <libavutil/bswap.h>
}
#include <utils/frame_work_log.h>
using namespace Cicada;
using namespace std;
InterruptAble::InterruptAble()
{
mInterruptCB.callback = check_interrupt;
mInterruptCB.opaque = this;
}
int InterruptAble::check_interrupt(void *pHandle)
{
auto *server = static_cast<InterruptAble *>(pHandle);
return server->mInterrupted.load();
}
TCPProtocolServer::TCPProtocolServer() = default;
TCPProtocolServer::~TCPProtocolServer()
{
interrupt(true);
mAcceptThread = nullptr;
avio_close(mServer);
for (auto item : mClients) {
avio_close(item);
}
}
std::string TCPProtocolServer::getServerUri()
{
return "tcp://" + IProtocolServer::getLocalIp() + ":8888";
}
int TCPProtocolServer::init()
{
AVDictionary *options = nullptr;
int ret;
ret = av_dict_set(&options, "listen", "2", 0);
if (ret < 0) {
AF_LOGE("set listen error\n");
return ret;
}
if ((ret = avio_open2(&mServer, "tcp://:8888", AVIO_FLAG_WRITE, &mInterruptCB, &options)) < 0) {
AF_LOGE("Failed to open server: %s\n", av_err2str(ret));
if (options) {
av_dict_free(&options);
}
return ret;
}
if (options) {
av_dict_free(&options);
}
mAcceptThread = unique_ptr<afThread>(NEW_AF_THREAD(accept_loop));
mAcceptThread->start();
return 0;
}
int TCPProtocolServer::accept_loop()
{
int ret;
AVIOContext *client;
ret = avio_accept(mServer, &client);
if (ret >= 0) {
AF_LOGD("get a client\n");
std::lock_guard<std::mutex> lock(mClientMutex);
mClients.push_back(client);
}
return ret;
}
int TCPProtocolServer::write(const uint8_t *buffer, int size)
{
int ret;
std::lock_guard<std::mutex> lock(mClientMutex);
if (mClients.empty()) {
return 0;
}
for (auto item : mClients) {
avio_write(item, buffer, size);
}

return 0;
}
void TCPProtocolServer::flush()
{
for (auto item : mClients) {
avio_flush(item);
}
}
int TCPProtocolServer::write_u32(uint32_t val)
{
int ret;
std::lock_guard<std::mutex> lock(mClientMutex);
if (mClients.empty()) {
return 0;
}
for (auto item : mClients) {
avio_wl32(item, val);
}

return 0;
}
TCPProtocolClient::TCPProtocolClient() = default;
TCPProtocolClient::~TCPProtocolClient()
{
interrupt(true);
avio_close(mClient);
}
int TCPProtocolClient::connect(const string &server)
{
AVDictionary *format_opts = nullptr;
av_dict_set_int(&format_opts, "rw_timeout", AV_TIME_BASE/100, 0);

int ret = avio_open2(&mClient, server.c_str(), AVIO_FLAG_READ, &mInterruptCB, &format_opts);
if (format_opts) {
av_dict_free(&format_opts);
}
if (ret < 0) {
AF_LOGE("connect to server error %s\n", av_err2str(ret));
avio_close(mClient);
mClient = nullptr;
}
return ret;
}
int TCPProtocolClient::read(uint8_t *buffer, int size)
{
avio_feof(mClient);
return avio_read(mClient, buffer, size);
}
int TCPProtocolClient::read_u32(uint32_t *val)
{
avio_feof(mClient);
*val = avio_rl32(mClient);
*val = av_le2ne32(*val);
return 0;
}
71 changes: 71 additions & 0 deletions framework/communication/TCPProtocol.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
//
// Created by moqi on 2020/4/22.
//

#ifndef CICADAMEDIA_TCPPROTOCOL_H
#define CICADAMEDIA_TCPPROTOCOL_H

#include "IProtocol.h"
#include <queue>
#include <utils/afThread.h>

extern "C" {
#include <libavformat/avio.h>
}
namespace Cicada {
class InterruptAble {
public:
InterruptAble();
virtual ~InterruptAble() = default;
void interrupt(bool inter)
{
mInterrupted = inter;
}

private:
static int check_interrupt(void *pHandle);

protected:
AVIOInterruptCB mInterruptCB{};
std::atomic_bool mInterrupted{false};
};
class TCPProtocolServer : public IProtocolServer, private InterruptAble {
public:
TCPProtocolServer();

~TCPProtocolServer() override;

std::string getServerUri() override;

int init() override;

int write(const uint8_t *buffer, int size) override;
int write_u32(uint32_t val) override;

void flush() override;

private:
int accept_loop();

private:
AVIOContext *mServer{nullptr};
std::mutex mClientMutex{};
std::vector<AVIOContext *> mClients{};
std::unique_ptr<afThread> mAcceptThread{};
};

class TCPProtocolClient : public IProtocolClient, private InterruptAble {
public:
TCPProtocolClient();
~TCPProtocolClient() override;
int connect(const std::string &server) override;
int read(uint8_t *buffer, int size) override;
int read_u32(uint32_t* val) override;

private:
AVIOContext *mClient{};
};
}// namespace Cicada


#endif//CICADAMEDIA_TCPPROTOCOL_H
72 changes: 72 additions & 0 deletions framework/communication/messageServer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
//
// Created by moqi on 2020/4/22.
//
#include "messageServer.h"
#include "TCPProtocol.h"
#define LOG_TAG "messageServer"
#include <utils/frame_work_log.h>
using namespace Cicada;
messageServer::messageServer()
{
mServer = static_cast<std::unique_ptr<IProtocolServer>>(new TCPProtocolServer());
}
messageServer::~messageServer()
{}
int messageServer::write(const std::string &msg)
{
if (!inited) {
return -1;
}

mServer->write_u32(msg.size());
mServer->write(reinterpret_cast<const uint8_t *>(msg.c_str()), msg.size());
mServer->flush();
return 0;
}
int messageServer::init()
{
if (!inited) {
int ret = mServer->init();
if (ret >= 0) {
inited = true;
return 0;
} else
return -1;
}
return 0;
}
std::string messageServer::getServerUri()
{
return mServer->getServerUri();
}

messageClient::messageClient()
{
mClient = static_cast<std::unique_ptr<IProtocolClient>>(new TCPProtocolClient());
mBuffer = static_cast<char *>(malloc(1024));
}
messageClient::~messageClient()
{
free(mBuffer);
}
int messageClient::connect(const std::string &server)
{
return mClient->connect(server);
}
std::string messageClient::readMessage()
{
uint32_t size = 0;
mClient->read_u32(&size);
mBuffer[size] = 0;
auto *p = reinterpret_cast<uint8_t *>(mBuffer);
while (size > 0) {
int ret = mClient->read(p, size);
if (ret > 0) {
p += ret;
size -= ret;
} else {
break;
}
}
return mBuffer;
}
Loading

0 comments on commit f84c186

Please sign in to comment.