(WebRTC : Example) PeerChannel class

Posted by : at

Category : OpenSource


Example Server PeerChannel

// PeerChannel manages the ChannelMember instances (the connected peers).
// It owns every pointer stored in |members_|: members are deleted when they
// are removed from the channel and when the channel itself is destroyed.
class PeerChannel {
 public:
  typedef std::vector<ChannelMember*> Members;

  PeerChannel() {}

  // Deletes every remaining member.
  ~PeerChannel() { DeleteAll(); }

  // Read-only access to the current list of members.
  const Members& members() const { return members_; }

  // Returns true if the request should be treated as a new ChannelMember
  // request.  Otherwise the request is not peerconnection related.
  // The decision is made from the state of the DataSocket (request method,
  // content length and request path).
  static bool IsPeerConnection(const DataSocket* ds);

  // Finds a connected peer that's associated with the |ds| socket.
  // Returns NULL when no member matches the request.
  ChannelMember* Lookup(DataSocket* ds) const;

  // Checks if the request has a "to" parameter and if so, looks up the
  // peer for which the request is targeted at.  Returns NULL otherwise.
  ChannelMember* IsTargetedRequest(const DataSocket* ds) const;

  // Adds a new ChannelMember instance to the list of connected peers and
  // associates it with the socket.
  bool AddMember(DataSocket* ds);

  // Closes all connections and sends a "shutting down" message to all
  // connected peers.
  void CloseAll();

  // Called when a socket was determined to be closing by the peer (or if the
  // connection went dead).
  void OnClosing(DataSocket* ds);

  // Removes (and announces the departure of) every member that timed out.
  void CheckForTimeout();

 protected:
  // Deletes all members and empties |members_|.
  void DeleteAll();

  // Notifies every other member about the state of |member|.  Members that
  // could not be notified are removed from |members_| and appended to
  // |delivery_failures|.
  void BroadcastChangedState(const ChannelMember& member,
                             Members* delivery_failures);

  // Announces and deletes every member in |failures|, leaving it empty.
  void HandleDeliveryFailures(Members* failures);

  // Builds a simple list of "name,id,connected\n" entries, one per member,
  // starting with |member| itself.  |content_type| receives the MIME type of
  // the returned body.
  std::string BuildResponseForNewMember(const ChannelMember& member,
                                        std::string* content_type);

  Members members_;

 private:
  // Not copyable: the channel owns the raw pointers in |members_|, so a
  // compiler-generated copy would delete every member twice.  Declared but
  // intentionally left undefined.
  PeerChannel(const PeerChannel&);
  PeerChannel& operator=(const PeerChannel&);
};
/*
 *  Copyright 2011 The WebRTC Project Authors. All rights reserved.
 *
 *  Use of this source code is governed by a BSD-style license
 *  that can be found in the LICENSE file in the root of the source
 *  tree. An additional intellectual property rights grant can be found
 *  in the file PATENTS.  All contributing project authors may
 *  be found in the AUTHORS file in the root of the source tree.
 */

#include "webrtc/examples/peerconnection/server/peer_channel.h"

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <algorithm>

#include "webrtc/examples/peerconnection/server/data_socket.h"
#include "webrtc/examples/peerconnection/server/utils.h"
#include "webrtc/base/stringutils.h"
#include "webrtc/base/urlencode.h"

using rtc::sprintfn;

// Prefix of the response header line that carries a peer id.
//
// Set to the peer id of the originator when messages are being
// exchanged between peers, but set to the id of the receiving peer
// itself when notifications are sent from the server about the state
// of other peers.
//
// WORKAROUND: Since support for CORS varies greatly from one browser to the
// next, we don't use a custom name for our peer-id header (originally it was
// "X-Peer-Id: ").  Instead, we use a "simple header", "Pragma" which should
// always be exposed to CORS requests.  There is a special CORS header devoted
// to exposing proprietary headers (Access-Control-Expose-Headers), however
// at this point it is not working correctly in some popular browsers.
static const char kPeerIdHeader[] = "Pragma: ";

// Request paths that are routed to an already signed-in member.  The table is
// read-only, so both the strings and the pointers to them are const.
// Keep the order in sync with RequestPathIndex below.
static const char* const kRequestPaths[] = {
  "/wait", "/sign_out", "/message",
};

// Indices into kRequestPaths; the enumerator order must match the array.
enum RequestPathIndex {
  kWait,
  kSignOut,
  kMessage,
};

// Upper bound, in characters, on a member's display name.
const size_t kMaxNameLength = 512;

//
// ChannelMember
//

// Counter used to hand out member ids; the constructor pre-increments it, so
// the first member created gets id 1.
int ChannelMember::s_member_id_ = 0;

// Creates a member from a "GET /sign_in" request.  The member's name is the
// URL-decoded request argument string, capped at kMaxNameLength characters;
// an empty name is replaced by "peer_<id>".  Commas are replaced with
// underscores because ',' separates the fields of a member entry.
ChannelMember::ChannelMember(DataSocket* socket)
    : waiting_socket_(NULL),
      id_(++s_member_id_),
      connected_(true),
      timestamp_(time(NULL)) {
  assert(socket);
  assert(socket->method() == DataSocket::GET);
  assert(socket->PathEquals("/sign_in"));

  name_ = rtc::UrlDecodeString(socket->request_arguments());
  if (name_.length() > kMaxNameLength) {
    name_.resize(kMaxNameLength);
  } else if (name_.empty()) {
    name_ = "peer_" + int2str(id_);
  }

  for (std::string::iterator ch = name_.begin(); ch != name_.end(); ++ch) {
    if (*ch == ',')
      *ch = '_';
  }
}

// Nothing to release here: the destructor body is intentionally empty.
ChannelMember::~ChannelMember() {}

// Returns true when |ds| is non-NULL and its request path is the "/wait"
// path.
bool ChannelMember::is_wait_request(DataSocket* ds) const {
  if (!ds)
    return false;
  return ds->PathEquals(kRequestPaths[kWait]);
}

// A member has timed out when it has no waiting socket and more than 30
// seconds have passed since |timestamp_| was last refreshed.
bool ChannelMember::TimedOut() {
  if (waiting_socket_ != NULL)
    return false;
  return (time(NULL) - timestamp_) > 30;
}

// Returns the complete peer-id header line for this member:
// kPeerIdHeader followed by the member id and "\r\n".
std::string ChannelMember::GetPeerIdHeader() const {
  std::string header(kPeerIdHeader);
  header += int2str(id_);
  header += "\r\n";
  return header;
}

// Sends (or queues) a "200 OK" text/plain response to this member whose body
// is |other|'s entry.  |other| must be a different member.  Always returns
// true.
bool ChannelMember::NotifyOfOtherMember(const ChannelMember& other) {
  assert(this != &other);
  const std::string entry = other.GetEntry();
  QueueResponse("200 OK", "text/plain", GetPeerIdHeader(), entry);
  return true;
}

// Returns a string in the form "name,id,connected\n".
std::string ChannelMember::GetEntry() const {
  assert(name_.length() <= kMaxNameLength);

  // name, 11-digit int, 1-digit bool, newline, null
  char entry[kMaxNameLength + 15];
  sprintfn(entry, sizeof(entry), "%s,%d,%d\n",
           name_.substr(0, kMaxNameLength).c_str(), id_, connected_);
  return entry;
}

// Relays the payload of request |ds|, sent by this member, to |peer|.
// When |peer| is this member, the payload is echoed straight back on |ds|.
// Otherwise the payload is handed to |peer| (tagged with this member's
// peer-id header) and |ds| is answered with an empty "200 OK".
void ChannelMember::ForwardRequestToPeer(DataSocket* ds, ChannelMember* peer) {
  assert(peer);
  assert(ds);

  const std::string sender_header = GetPeerIdHeader();

  if (peer != this) {
    printf("Client %s sending to %s\n",
        name_.c_str(), peer->name().c_str());
    peer->QueueResponse("200 OK", ds->content_type(), sender_header,
                        ds->data());
    ds->Send("200 OK", true, "text/plain", "", "");
    return;
  }

  ds->Send("200 OK", true, ds->content_type(), sender_header, ds->data());
}

// If |ds| is this member's waiting socket, forgets it and restarts the
// timeout clock.  Any other socket is ignored.
void ChannelMember::OnClosing(DataSocket* ds) {
  if (ds != waiting_socket_)
    return;

  waiting_socket_ = NULL;
  timestamp_ = time(NULL);
}

void ChannelMember::QueueResponse(const std::string& status,
                                  const std::string& content_type,
                                  const std::string& extra_headers,
                                  const std::string& data) {
  if (waiting_socket_) {
    assert(queue_.size() == 0);
    assert(waiting_socket_->method() == DataSocket::GET);
    bool ok = waiting_socket_->Send(status, true, content_type, extra_headers,
                                    data);
    if (!ok) {
      printf("Failed to deliver data to waiting socket\n");
    }
    waiting_socket_ = NULL;
    timestamp_ = time(NULL);
  } else {
    QueuedResponse qr;
    qr.status = status;
    qr.content_type = content_type;
    qr.extra_headers = extra_headers;
    qr.data = data;
    queue_.push(qr);
  }
}

// Offers |ds| as the socket on which this member waits for its next
// response.  If a response is already queued, the oldest one is sent on |ds|
// immediately and |ds| is not retained.  Otherwise |ds| (which may be NULL)
// becomes the waiting socket.
void ChannelMember::SetWaitingSocket(DataSocket* ds) {
  // The code below tolerates a NULL |ds|, so the assert must not dereference
  // it unconditionally.
  assert(ds == NULL || ds->method() == DataSocket::GET);
  if (ds && !queue_.empty()) {
    assert(waiting_socket_ == NULL);
    const QueuedResponse& response = queue_.front();
    ds->Send(response.status, true, response.content_type,
             response.extra_headers, response.data);
    queue_.pop();
  } else {
    waiting_socket_ = ds;
  }
}

//
// PeerChannel
//

// static
// Returns true for a POST request with a non-empty body or for a
// "GET /sign_in" request.
bool PeerChannel::IsPeerConnection(const DataSocket* ds) {
  assert(ds);
  if (ds->method() == DataSocket::POST && ds->content_length() > 0)
    return true;
  return ds->method() == DataSocket::GET && ds->PathEquals("/sign_in");
}

// Finds the member addressed by the "peer_id=" request argument of |ds|.
// Only GET/POST requests to one of kRequestPaths are considered.  As a side
// effect, a "/wait" request registers |ds| as the member's waiting socket and
// a "/sign_out" request marks the member as disconnected.
// Returns NULL when the request does not identify a known member.
ChannelMember* PeerChannel::Lookup(DataSocket* ds) const {
  assert(ds);

  if (!(ds->method() == DataSocket::GET || ds->method() == DataSocket::POST))
    return NULL;

  // Determine which of the known request paths (if any) is being used.
  const size_t path_count = ARRAYSIZE(kRequestPaths);
  size_t path_index = 0;
  while (path_index < path_count &&
         !ds->PathEquals(kRequestPaths[path_index])) {
    ++path_index;
  }
  if (path_index >= path_count)
    return NULL;

  // Extract the numeric id that follows "peer_id=".
  static const char kPeerId[] = "peer_id=";
  const std::string arguments(ds->request_arguments());
  const size_t key_pos = arguments.find(kPeerId);
  if (key_pos == std::string::npos)
    return NULL;
  const size_t value_pos = key_pos + ARRAYSIZE(kPeerId) - 1;
  const int wanted_id = atoi(arguments.c_str() + value_pos);

  for (size_t n = 0; n < members_.size(); ++n) {
    ChannelMember* candidate = members_[n];
    if (candidate->id() != wanted_id)
      continue;

    switch (path_index) {
      case kWait:
        candidate->SetWaitingSocket(ds);
        break;
      case kSignOut:
        candidate->set_disconnected();
        break;
      default:
        break;
    }
    return candidate;
  }

  return NULL;
}

// Checks whether the request path of |ds| carries a "to=<id>" query
// parameter and, if so, returns the member with that id.  Returns NULL when
// there is no query string, no "to" parameter, or no member with that id.
ChannelMember* PeerChannel::IsTargetedRequest(const DataSocket* ds) const {
  assert(ds);
  // Regardless of GET or POST, we look for the "to" parameter
  // only in the request_path.
  const std::string& path = ds->request_path();
  const size_t query_start = path.find('?');
  if (query_start == std::string::npos)
    return NULL;

  static const char kTargetPeerIdParam[] = "to=";
  const size_t param_length = ARRAYSIZE(kTargetPeerIdParam) - 1;

  // Scan for "to=" occurrences until one is found that is a whole parameter
  // name, i.e. it directly follows the '?' or a '&'.  The "directly follows
  // '?'" test is always made against |query_start|; comparing against the
  // position where the search resumed would wrongly accept names that merely
  // end in "to" (e.g. the "ato=" in "?xto=ato=5").
  size_t search_from = query_start + 1;
  size_t value_pos = std::string::npos;
  while (value_pos == std::string::npos) {
    const size_t found = path.find(kTargetPeerIdParam, search_from);
    if (found == std::string::npos)
      return NULL;
    // |found| is always > |query_start|, so |found - 1| is a valid index.
    if (found == query_start + 1 || path[found - 1] == '&') {
      value_pos = found + param_length;
    } else {
      search_from = found + param_length;
    }
  }

  const int id = atoi(path.c_str() + value_pos);
  Members::const_iterator i = members_.begin();
  for (; i != members_.end(); ++i) {
    if ((*i)->id() == id) {
      return *i;
    }
  }
  return NULL;
}

// Creates a ChannelMember for the sign-in request on |ds|, announces it to
// the existing members, adds it to the channel and finally answers |ds| with
// the list of members.  Always returns true.
bool PeerChannel::AddMember(DataSocket* ds) {
  assert(IsPeerConnection(ds));
  ChannelMember* member = new ChannelMember(ds);

  // Announce the newcomer before it is added to |members_|.
  Members undelivered;
  BroadcastChangedState(*member, &undelivered);
  HandleDeliveryFailures(&undelivered);
  members_.push_back(member);

  const std::string total = size_t2str(members_.size());
  printf("New member added (total=%s): %s\n",
      total.c_str(), member->name().c_str());

  // Let the newly connected peer know about other members of the channel.
  std::string content_type;
  const std::string body = BuildResponseForNewMember(*member, &content_type);
  ds->Send("200 Added", true, content_type, member->GetPeerIdHeader(), body);
  return true;
}

// Sends (or queues) a "Server shutting down" message to every member and
// then deletes all members.
void PeerChannel::CloseAll() {
  for (size_t n = 0; n < members_.size(); ++n) {
    members_[n]->QueueResponse("200 OK", "text/plain", "",
                               "Server shutting down");
  }
  DeleteAll();
}

// Tells every member that socket |ds| is closing.  Any member that is no
// longer connected afterwards is removed from the channel, its departure is
// broadcast to the remaining members, and it is deleted.
void PeerChannel::OnClosing(DataSocket* ds) {
  Members::iterator i = members_.begin();
  while (i != members_.end()) {
    ChannelMember* m = (*i);
    m->OnClosing(ds);
    if (m->connected()) {
      ++i;
      continue;
    }

    // erase() already returns the iterator to the next element, so it must
    // not be incremented again; doing so skipped the member that followed
    // each removed one.
    i = members_.erase(i);
    Members failures;
    // NOTE(review): BroadcastChangedState() can itself erase from |members_|
    // when a notification fails, which would invalidate |i|.  In the code
    // visible here NotifyOfOtherMember() always returns true, so that path is
    // not taken.
    BroadcastChangedState(*m, &failures);
    HandleDeliveryFailures(&failures);
    delete m;
  }
  printf("Total connected: %s\n", size_t2str(members_.size()).c_str());
}

// Removes every member that has timed out: the member is marked as
// disconnected, taken out of the channel, its departure is broadcast to the
// remaining members, and it is deleted.
void PeerChannel::CheckForTimeout() {
  Members::iterator i = members_.begin();
  while (i != members_.end()) {
    ChannelMember* m = (*i);
    if (!m->TimedOut()) {
      ++i;
      continue;
    }

    printf("Timeout: %s\n", m->name().c_str());
    m->set_disconnected();
    // erase() already returns the iterator to the next element, so it must
    // not be incremented again; doing so skipped the member that followed
    // each timed-out one.
    i = members_.erase(i);
    Members failures;
    // NOTE(review): BroadcastChangedState() can itself erase from |members_|
    // when a notification fails, which would invalidate |i|.  In the code
    // visible here NotifyOfOtherMember() always returns true, so that path is
    // not taken.
    BroadcastChangedState(*m, &failures);
    HandleDeliveryFailures(&failures);
    delete m;
  }
}

// Deletes every member, in list order, and then empties |members_|.
void PeerChannel::DeleteAll() {
  for (size_t n = 0; n < members_.size(); ++n) {
    delete members_[n];
  }
  members_.clear();
}

void PeerChannel::BroadcastChangedState(const ChannelMember& member,
                                        Members* delivery_failures) {
  // This function should be called prior to DataSocket::Close().
  assert(delivery_failures);

  if (!member.connected()) {
    printf("Member disconnected: %s\n", member.name().c_str());
  }

  Members::iterator i = members_.begin();
  for (; i != members_.end(); ++i) {
    if (&member != (*i)) {
      if (!(*i)->NotifyOfOtherMember(member)) {
        (*i)->set_disconnected();
        delivery_failures->push_back(*i);
        i = members_.erase(i);
        if (i == members_.end())
          break;
      }
    }
  }
}

// Drains |failures|: each member is taken from the front of the list, its
// (disconnected) state is broadcast to the remaining members and it is
// deleted.  Broadcasting may append further failed members to |failures|;
// those are processed by the same loop.
void PeerChannel::HandleDeliveryFailures(Members* failures) {
  assert(failures);

  while (!failures->empty()) {
    ChannelMember* failed = failures->front();
    assert(!failed->connected());
    failures->erase(failures->begin());
    BroadcastChangedState(*failed, failures);
    delete failed;
  }
}

// Builds a simple list of "name,id\n" entries for each member.
std::string PeerChannel::BuildResponseForNewMember(const ChannelMember& member,
                                                   std::string* content_type) {
  assert(content_type);

  *content_type = "text/plain";
  // The peer itself will always be the first entry.
  std::string response(member.GetEntry());
  for (Members::iterator i = members_.begin(); i != members_.end(); ++i) {
    if (member.id() != (*i)->id()) {
      assert((*i)->connected());
      response += (*i)->GetEntry();
    }
  }

  return response;
}

About Taehyung Kim

안녕하세요? 8년차 현업 C++ 개발자 김태형이라고 합니다. 😁 C/C++을 사랑하며 다양한 사람과의 협업을 즐깁니다. ☕ 꾸준한 자기계발을 미덕이라 생각하며 노력 중이고, 제가 얻은 지식을 홈페이지에 정리하고 있습니다. 좀 더 상세한 제 이력서 혹은 Private 프로젝트 접근 권한을 원하신다면 메일 주세요. 😎

Star
Useful Links