IBR-DTN  1.0.0
ApiServer.cpp
Go to the documentation of this file.
1 /*
2  * ApiServer.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
22 #include "config.h"
23 #include "Configuration.h"
24 #include "api/ApiServer.h"
25 #include "core/EventDispatcher.h"
26 #include "core/BundleCore.h"
27 
28 #include <ibrcommon/net/vaddress.h>
29 #include <ibrcommon/Logger.h>
30 #include <ibrcommon/net/vsocket.h>
31 #include <ibrcommon/net/socketstream.h>
32 
33 #include <typeinfo>
34 #include <algorithm>
35 #include <sstream>
36 #include <unistd.h>
37 #include <list>
38 
39 namespace dtn
40 {
41  namespace api
42  {
43  const std::string ApiServer::TAG = "ApiServer";
44 
45  ApiServer::ApiServer(const ibrcommon::File &socketfile)
46  : _shutdown(false), _garbage_collector(*this)
47  {
48  _sockets.add(new ibrcommon::fileserversocket(socketfile));
49  }
50 
51  ApiServer::ApiServer(const ibrcommon::vinterface &net, int port)
52  : _shutdown(false), _garbage_collector(*this)
53  {
54  if (net.isLoopback()) {
55  if (ibrcommon::basesocket::hasSupport(AF_INET6)) {
56  ibrcommon::vaddress addr6(ibrcommon::vaddress::VADDR_LOCALHOST, port, AF_INET6);
57  _sockets.add(new ibrcommon::tcpserversocket(addr6, 5));
58  }
59 
60  ibrcommon::vaddress addr4(ibrcommon::vaddress::VADDR_LOCALHOST, port, AF_INET);
61  _sockets.add(new ibrcommon::tcpserversocket(addr4, 5));
62  }
63  else if (net.isAny()) {
64  ibrcommon::vaddress addr(ibrcommon::vaddress::VADDR_ANY, port);
65  _sockets.add(new ibrcommon::tcpserversocket(addr, 5));
66  }
67  else {
68  // add a socket for each address on the interface
69  std::list<ibrcommon::vaddress> addrs = net.getAddresses();
70 
71  // convert the port into a string
72  std::stringstream ss; ss << port;
73 
74  for (std::list<ibrcommon::vaddress>::iterator iter = addrs.begin(); iter != addrs.end(); ++iter) {
75  ibrcommon::vaddress &addr = (*iter);
76 
77  try {
78  // handle the addresses according to their family
79  switch (addr.family()) {
80  case AF_INET:
81  case AF_INET6:
82  addr.setService(ss.str());
83  _sockets.add(new ibrcommon::tcpserversocket(addr, 5), net);
84  break;
85 
86  default:
87  break;
88  }
89  } catch (const ibrcommon::vaddress::address_exception &ex) {
90  IBRCOMMON_LOGGER_TAG(ApiServer::TAG, warning) << ex.what() << IBRCOMMON_LOGGER_ENDL;
91  }
92  }
93  }
94  }
95 
// Body of the destructor. The signature line (listing line 96) is missing
// from this listing; the cross-reference index at the end of the page maps
// ApiServer.cpp:96 to `virtual ~ApiServer()`.
97  {
// stop the garbage collector and wait until it has finished
98  _garbage_collector.stop();
99  _garbage_collector.join();
// wait for this object's own thread - presumably the one executing
// componentRun(); TODO confirm against the base class
100  join();
101 
// finally destroy the server sockets that the constructors added to _sockets
102  _sockets.destroy();
103  }
104 
// Body of a member function whose signature line (listing line 105) is
// missing from this listing and is not named in the cross-reference index.
// NOTE(review): presumably a cancellation hook of the component thread -
// confirm against the original file.
106  {
107  // shut-down all server sockets
108  _sockets.down();
109  }
110 
// Bring the API server up: activate all server sockets and schedule the
// garbage collector. Declared throw(), so socket errors are logged here
// instead of being propagated.
111  void ApiServer::componentUp() throw ()
112  {
113  // routine checked for throw() on 15.02.2013
114  try {
115  // bring up all server sockets
116  _sockets.up();
117  } catch (const ibrcommon::socket_exception &ex) {
// a failure to bring up the sockets is only reported; start-up continues
118  IBRCOMMON_LOGGER_TAG(ApiServer::TAG, error) << ex.what() << IBRCOMMON_LOGGER_ENDL;
119  }
120 
// NOTE(review): listing line 121 is missing here (numbering jumps from 120
// to 122) - recover the statement from the original file.
// set the timer for the next registration expiry and start it if needed
122  startGarbageCollector();
123  }
124 
125  void ApiServer::componentRun() throw ()
126  {
127  try {
128  while (!_shutdown)
129  {
130  // create a socket set to do select on all sockets
131  ibrcommon::socketset fds;
132 
133  // do select on all socket and find all readable ones
134  _sockets.select(&fds, NULL, NULL, NULL);
135 
136  // iterate through all readable sockets
137  for (ibrcommon::socketset::iterator iter = fds.begin(); iter != fds.end(); ++iter)
138  {
139  // we assume all the sockets in _sockets are server sockets
140  // so cast this one to the right class
141  ibrcommon::serversocket &sock = dynamic_cast<ibrcommon::serversocket&>(**iter);
142 
143  // variable to store the peer address on the next accept call
144  ibrcommon::vaddress peeraddr;
145 
146  // accept the next client
147  ibrcommon::clientsocket *peersock = sock.accept(peeraddr);
148 
149  // set the no delay option for the new socket if configured
150  if ( dtn::daemon::Configuration::getInstance().getNetwork().getTCPOptionNoDelay() )
151  {
152  // check if the socket is a tcpsocket
153  if ( dynamic_cast<ibrcommon::tcpsocket*>(peersock) != NULL )
154  {
155  peersock->set(ibrcommon::clientsocket::NO_DELAY, true);
156  }
157  }
158 
159  // create a new socket stream using the new client socket
160  // now the socket stream is responsible for the client socket
161  // and will destroy the instance on its own destruction
162  ibrcommon::socketstream *conn = new ibrcommon::socketstream(peersock);
163 
164  // if we are already in shutdown state
165  if (_shutdown)
166  {
167  // close the new socket
168  conn->close();
169 
170  // and free the object
171  delete conn;
172  return;
173  }
174 
175  // generate some output
176  IBRCOMMON_LOGGER_DEBUG_TAG("ApiServer", 5) << "new connected client at the extended API server" << IBRCOMMON_LOGGER_ENDL;
177 
178  // send welcome banner
179  (*conn) << "IBR-DTN " << dtn::daemon::Configuration::getInstance().version() << " API 1.0" << std::endl;
180 
181  // the new client object will be hold here
182  ClientHandler *obj = NULL;
183 
184  // in locked state we create a new registration for the new connection
185  {
186  ibrcommon::MutexLock l1(_registration_lock);
187 
188  // create a new registration
189  Registration *reg = new Registration();
190  _registrations.push_back(reg);
191  IBRCOMMON_LOGGER_DEBUG_TAG("ApiServer", 5) << "new registration " << reg->getHandle() << IBRCOMMON_LOGGER_ENDL;
192 
193  // create a new clienthandler for the new registration
194  obj = new ClientHandler(*this, *_registrations.back(), conn);
195  }
196 
197  // one again in locked state we push the new connection in the connection list
198  {
199  ibrcommon::MutexLock l2(_connection_lock);
200  _connections.push_back(obj);
201  }
202 
203  // start the client handler
204  obj->start();
205  }
206 
207  // breakpoint
208  ibrcommon::Thread::yield();
209  }
210  } catch (const std::exception&) {
211  // ignore all errors
212  return;
213  }
214  }
215 
// Shut the API server down: flag the shutdown, close the listening sockets,
// pause the garbage collector, stop all client handlers and block until the
// connection list is empty.
216  void ApiServer::componentDown() throw ()
217  {
// NOTE(review): listing line 218 is missing here (numbering jumps from 217
// to 219) - recover the statement from the original file.
219 
220  // put the server into shutdown mode
221  _shutdown = true;
222 
223  // close the listen API socket
224  _sockets.down();
225 
226  // pause the garbage collection
227  _garbage_collector.pause();
228 
229  // stop/close all connections in locked state
230  {
231  ibrcommon::MutexLock l(_connection_lock);
232 
233  // shutdown all clients
234  for (client_list::iterator iter = _connections.begin(); iter != _connections.end(); ++iter)
235  {
236  (*iter)->stop();
237  }
238  }
239 
240  // wait until all clients are down
// The list is polled until it is empty; entries are removed by the block at
// listing lines 310-325 (connectionDown per the cross-reference index).
// NOTE(review): _connections.size() is read here without holding
// _connection_lock, while the removal happens under that lock - confirm
// that this unsynchronized read is intended.
241  while (_connections.size() > 0) ibrcommon::Thread::sleep(1000);
242  }
243 
244  Registration& ApiServer::getRegistration(const std::string &handle)
245  {
246  ibrcommon::MutexLock l(_registration_lock);
247  for (registration_list::iterator iter = _registrations.begin(); iter != _registrations.end(); ++iter)
248  {
249  Registration &reg = (**iter);
250 
251  if (reg == handle)
252  {
253  if (reg.isPersistent()){
254  reg.attach();
255  return reg;
256  }
257  break;
258  }
259  }
260 
261  throw Registration::NotFoundException("Registration not found");
262  }
263 
// Timeout handler taking an ibrcommon::Timer* - presumably invoked by
// _garbage_collector, which is constructed with *this; TODO confirm.
// Releases every registration that can be attached and is no longer
// persistent, then returns the value of nextRegistrationExpiry().
// nextRegistrationExpiry() throws ibrcommon::Timer::StopTimerException when
// it finds nothing to schedule; that exception is not caught here.
264  size_t ApiServer::timeout(ibrcommon::Timer*)
265  {
// inner scope: _registration_lock is released again before
// nextRegistrationExpiry() is called, which takes the same lock itself
266  {
267  ibrcommon::MutexLock l(_registration_lock);
268 
269  // remove non-persistent and detached registrations
// the iterator is advanced inside the loop body because entries may be erased
270  for (registration_list::iterator iter = _registrations.begin(); iter != _registrations.end();)
271  {
272  try
273  {
274  Registration *reg = (*iter);
275 
// attach before inspecting the registration; it is detached again below if it is kept
276  reg->attach();
277  if(!reg->isPersistent()){
278  IBRCOMMON_LOGGER_DEBUG_TAG("ApiServer", 5) << "release registration " << reg->getHandle() << IBRCOMMON_LOGGER_ENDL;
// post-increment: iter already points to the next entry when the old one is erased
279  _registrations.erase(iter++);
280  delete reg;
281  }
282  else
283  {
284  reg->detach();
285  ++iter;
286  }
287  }
// NOTE(review): listing line 288 is missing here (numbering jumps from 287
// to 289) - presumably the catch clause of the try above; the handler body
// below only skips to the next registration. Recover the clause from the
// original file.
289  {
290  ++iter;
291  }
292  }
293  }
294 
295  return nextRegistrationExpiry();
296  }
297 
298  const std::string ApiServer::getName() const
299  {
300  return "ApiServer";
301  }
302 
// Body of the connection-up notification. The signature line (listing line
// 303) is missing from this listing; the cross-reference index at the end of
// the page maps ApiServer.cpp:303 to `virtual void connectionUp(ClientHandler *conn)`.
// The body only writes a debug message.
304  {
305  // generate some output
306  IBRCOMMON_LOGGER_DEBUG_TAG("ApiServer", 5) << "api connection up" << IBRCOMMON_LOGGER_ENDL;
307  }
308 
// Body of the connection-down notification. The signature line (listing
// line 309) is missing from this listing; the cross-reference index at the
// end of the page maps ApiServer.cpp:309 to
// `virtual void connectionDown(ClientHandler *conn)`.
// NOTE(review): the body names the handler `obj`, so the parameter name in
// the definition presumably differs from the indexed declaration - confirm.
// Removes the given handler from _connections while holding _connection_lock.
310  {
311  // generate some output
312  IBRCOMMON_LOGGER_DEBUG_TAG("ApiServer", 5) << "api connection down" << IBRCOMMON_LOGGER_ENDL;
313 
314  ibrcommon::MutexLock l(_connection_lock);
315 
316  // remove this object out of the list
317  for (client_list::iterator iter = _connections.begin(); iter != _connections.end(); ++iter)
318  {
// entries are compared by pointer identity
319  if (obj == (*iter))
320  {
// leave the loop right after erasing, the iterator is not used again
321  _connections.erase(iter);
322  break;
323  }
324  }
325  }
326 
// Body of freeRegistration. The signature line (listing line 327) is missing
// from this listing; the cross-reference index at the end of the page maps
// ApiServer.cpp:327 to `void freeRegistration(Registration &reg)`.
// A persistent registration is only detached and left to the garbage
// collector; any other registration is removed from _registrations and
// deleted immediately.
328  {
329  if(reg.isPersistent())
330  {
// keep the registration, but make sure the garbage collector is scheduled
331  reg.detach();
332  startGarbageCollector();
333  }
334  else
335  {
336  ibrcommon::MutexLock l(_registration_lock);
337  // remove the registration
338  for (registration_list::iterator iter = _registrations.begin(); iter != _registrations.end(); ++iter)
339  {
340  Registration *r = (*iter);
341 
342  if (reg == (*r))
343  {
// log the handle before the object is deleted
344  IBRCOMMON_LOGGER_DEBUG_TAG("ApiServer", 5) << "release registration " << reg.getHandle() << IBRCOMMON_LOGGER_ENDL;
345  _registrations.erase(iter);
// NOTE(review): reg presumably refers to the object deleted here and must
// not be used by the caller afterwards - confirm.
346  delete r;
347  break;
348  }
349  }
350  }
351  }
352 
// Body of the event handler for queued bundles. The signature line (listing
// line 353) is missing from this listing; the cross-reference index at the
// end of the page maps ApiServer.cpp:353 to
// `void raiseEvent(const dtn::routing::QueueBundleEvent &evt)`.
// NOTE(review): the body names the event `queued`, so the parameter name in
// the definition presumably differs from the indexed declaration - confirm.
// Walks all client connections under _connection_lock and selects those
// whose registration has subscribed to the destination of the bundle.
354  {
355  // ignore fragments - we can not deliver them directly to the client
356  if (queued.bundle.isFragment()) return;
357 
358  ibrcommon::MutexLock l(_connection_lock);
359  for (client_list::iterator iter = _connections.begin(); iter != _connections.end(); ++iter)
360  {
361  ClientHandler &conn = **iter;
362  if (conn.getRegistration().hasSubscribed(queued.bundle.destination))
363  {
// NOTE(review): listing line 364 is missing here (numbering jumps from 363
// to 365) - the action taken for a subscribed connection is not visible in
// this listing; recover it from the original file.
365  }
366  }
367  }
368 
369  void ApiServer::startGarbageCollector()
370  {
371  try
372  {
373  /* set the timeout for the GarbageCollector */
374  size_t timeout = nextRegistrationExpiry();
375  _garbage_collector.set(timeout);
376 
377  /* start it, if it is not running yet */
378  if(!_garbage_collector.isRunning())
379  _garbage_collector.start();
380  }
381  catch(const ibrcommon::Timer::StopTimerException &ex)
382  {
383  }
384  }
385 
386  size_t ApiServer::nextRegistrationExpiry()
387  {
388  ibrcommon::MutexLock l(_registration_lock);
389  bool persistentFound = false;
390  size_t new_timeout = 0;
391  size_t current_time = ibrcommon::Timer::get_current_time();
392 
393  // find the registration that expires next
394  for (registration_list::iterator iter = _registrations.begin(); iter != _registrations.end(); ++iter)
395  {
396  try
397  {
398  Registration &reg = (**iter);
399 
400  reg.attach();
401  if(!reg.isPersistent()){
402  /* found an expired registration, trigger the timer */
403  reg.detach();
404  new_timeout = 0;
405  persistentFound = true;
406  break;
407  }
408  else
409  {
410  size_t expire_time = reg.getExpireTime();
411  /* we dont have to check if the expire time is smaller then the current_time
412  since isPersistent() would have returned false */
413  size_t expire_timeout = expire_time - current_time;
414  reg.detach();
415 
416  /* if persistentFound is false, no persistent registration was found yet */
417  if(!persistentFound)
418  {
419  persistentFound = true;
420  new_timeout = expire_timeout;
421  }
422  else if(expire_timeout < new_timeout)
423  {
424  new_timeout = expire_timeout;
425  }
426  }
427  }
428  catch(const Registration::AlreadyAttachedException &ex)
429  {
430  }
431  }
432 
433  if(!persistentFound) throw ibrcommon::Timer::StopTimerException();
434 
435  return new_timeout;
436  }
437  }
438 }
static Configuration & getInstance(bool reset=false)
virtual void connectionUp(ClientHandler *conn)
Definition: ApiServer.cpp:303
virtual const std::string getName() const
Definition: ApiServer.cpp:298
virtual void connectionDown(ClientHandler *conn)
Definition: ApiServer.cpp:309
static void add(EventReceiver< E > *receiver)
bool _shutdown
Definition: Main.cpp:71
const std::string & getHandle() const
virtual size_t timeout(ibrcommon::Timer *)
Definition: ApiServer.cpp:264
static void remove(const EventReceiver< E > *receiver)
ApiServer(const ibrcommon::File &socket)
Definition: ApiServer.cpp:45
bool hasSubscribed(const dtn::data::EID &endpoint)
Registration & getRegistration()
void notify(const NOTIFY_CALL)
virtual ~ApiServer()
Definition: ApiServer.cpp:96
std::string version() const
void raiseEvent(const dtn::routing::QueueBundleEvent &evt)
Definition: ApiServer.cpp:353
Registration & getRegistration(const std::string &handle)
Definition: ApiServer.cpp:244
void freeRegistration(Registration &reg)
Definition: ApiServer.cpp:327