IBR-DTNSuite 0.6

ibrdtn/ibrdtn/api/CApi.cpp

Go to the documentation of this file.
00001 
00003 #include "ibrdtn/api/dtn_api.h"
00004 #include "ibrcommon/net/tcpclient.h"
00005 #include "ibrdtn/api/Client.h"
00006 #include "ibrdtn/api/StringBundle.h"
00007 #include <string.h>
00008 #include <cstdarg>
00009 #include <unistd.h>
00010 
00011 
00012 
00013 class CAPIGateway : public dtn::api::Client
00014 {
00015         public:
00016                 CAPIGateway(string app, void (*process_bundle)(const void * data, uint32_t size), void (*status_callback)(struct dtn_notification info) = NULL , string address = "127.0.0.1", int port = 4550)
00017                 : dtn::api::Client(app, _tcpclient), _tcpclient(address, port)
00018                 {
00019                         cout << "Connecting daemon...";
00020                         this->connect(); //Rückgabewerte?!!?
00021                         cout << "Done" << endl;
00022                         this->process_bundle=process_bundle;
00023                         this->status_callback=status_callback;
00024 
00025                         tx_buffer_position=0;
00026                         tx_buffer_size=KBYTE(64);
00027                         tx_buffer=(char *)malloc(tx_buffer_size);
00028                         dst_eid=NULL;
00029                         if (tx_buffer == NULL) {
00030                                 cout << "C_API: Error allocating tx_buffer." << endl;
00031                                 tx_buffer_size=0;
00032                         }
00033                         if (this->process_bundle == NULL) { //enter synchronous mode
00034                                 if ( pipe(sync_pipe) != 0) {
00035                                         perror("C_API: Error creating pipe for synchronous operation");
00036                                 }
00037                         }
00038                 };
00039 
00043                 virtual ~CAPIGateway()
00044                 {
00045                         // Close the tcp connection.
00046                         _tcpclient.close();
00047                 };
00048 
00049                 void send(char *dst_uri, char *data, uint32_t length) {
00050                         dtn::data::Bundle b;
00051                         b._destination = dtn::data::EID(dst_uri);
00052                         dtn::data::PayloadBlock &payload = b.push_back<dtn::data::PayloadBlock>();
00053 
00054                         // add the data
00055                         (*payload.getBLOB().iostream()).write(data, length);
00056 
00057                         // transmit the packet
00058                         dtn::data::DefaultSerializer(*this) << b;
00059                         Client::flush();
00060                 }
00061 
00062                 void dtn_write(const void *data, uint32_t length) {
00063                         uintptr_t offset=0;
00064                         uint32_t tocopy=0;
00065                         while (offset < length) {
00066                                 if ((tx_buffer_size-tx_buffer_position) >= (length-offset))
00067                                         tocopy=(length-offset);
00068                                 else
00069                                         tocopy=tx_buffer_size-tx_buffer_position;
00070                                 //cout << "CPY " << tocopy << " because size is " << tx_buffer_size << " and position is " << tx_buffer_position << endl;
00071                                 memcpy(tx_buffer+tx_buffer_position,(uint8_t *)data+offset,tocopy);
00072                                 offset+=tocopy;
00073                                 tx_buffer_position+=tocopy;
00074                                 if (tx_buffer_position == tx_buffer_size) { //TX buffer full, transmit
00075                                         cout << "TX !" << endl;
00076                                         transmit_buffer();
00077                                 }
00078                         }
00079                 }
00080 
00081                 int32_t dtn_read(void *buf, uint32_t length) {
00082                         if (process_bundle != NULL) {
00083                                 cout << "C_API: Can't use dtn_read() for endpoint in asynchronous mode" << endl;
00084                                 return -1;
00085                         }
00086                         return ::read(sync_pipe[0],buf, length);
00087                 }
00088 
00089                 void transmit_buffer() {
00090                         if (dst_eid==NULL) {
00091                                 cout << "C_API: transmit request without destination: Clearing buffer " << endl;
00092                                 tx_buffer_position=0;
00093                                 return;
00094                         }
00095                         if (tx_buffer_position == 0) //is empty
00096                                 return;
00097                         dtn::data::Bundle b;
00098                         b._destination = dtn::data::EID(this->dst_eid);
00099                         dtn::data::PayloadBlock &payload = b.push_back<dtn::data::PayloadBlock>();
00100 
00101                         // add the data
00102                         (*payload.getBLOB().iostream()).write(tx_buffer, tx_buffer_position); //+1 is wrong =?!?
00103 
00104                         // transmit the packet
00105                         dtn::data::DefaultSerializer(*this) << b;
00106                         Client::flush();
00107                         tx_buffer_position=0;
00108                 }
00109 
00110                 void set_dsteid(const char *dst) {
00111                         if (this->dst_eid != NULL) {
00112                                 free(this->dst_eid);
00113                         }
00114                         this->dst_eid = (char *)malloc(strlen(dst)+1);
00115                         if (this->dst_eid == NULL) {
00116                                 cout << "C_API: Set destination eid failed. No memory" << endl;
00117                                 return;
00118                         }
00119                         strcpy(this->dst_eid,dst);
00120                 }
00121 
00122                 void set_txchunksize(uint32_t chunksize) {
00123                         transmit_buffer(); //flush old buffer
00124                         if (tx_buffer != NULL)
00125                                 free(tx_buffer);
00126                         tx_buffer=(char *)malloc(chunksize);
00127                         if (tx_buffer == NULL) {
00128                                 cout << "C_API: Set chunksize failed. No memory" << endl;
00129                                 tx_buffer_size=0;
00130                                 return;
00131                         }
00132                         cout << "C_API: New tx_buffer_size: " << chunksize << endl;
00133                         tx_buffer_size=chunksize;
00134                 }
00135 
00136         private:
00137                 void (*process_bundle)(const void * data, uint32_t size);
00138                 void (*status_callback)(struct dtn_notification info);
00139                 void *recvd;
00140                 ibrcommon::Mutex recv_mutex;
00141                 int sync_pipe[2];
00142 
00143                 char *tx_buffer;
00144                 uint32_t tx_buffer_size;
00145                 uint32_t tx_buffer_position;
00146                 char *dst_eid;
00147 
00148                 void notify(uint16_t status, uint32_t data) {
00149                         struct dtn_notification n;
00150                         if (status_callback != NULL) {
00151                                 n.status=status;
00152                                 n.data=data;
00153                                 recv_mutex.enter();
00154                                 status_callback(n);
00155                                 recv_mutex.leave();
00156                         }
00157                 }
00158 
00159                 ibrcommon::tcpclient _tcpclient;
00160 
00161 
00162 
00168                 void received(dtn::api::Bundle &b)
00169                 {
00170                         if (process_bundle != NULL) {
00171                                 receive_async(b);
00172                         }
00173                         else {
00174                                 receive_sync(b);
00175                         }
00176                 }
00177 
00178                 void receive_async(dtn::api::Bundle &b) {
00179                         int32_t len=b.getData().iostream().size();
00180                         //cout << "You probably wouldn't believe it but I received " << len << " bytes stuff " << endl;
00181                         if (len < 0 || len > 65536 ) {  //todo: set tx_buffer size
00182                                 cout << "Ignoring bundle" << endl;
00183                                 notify(DTN_NOTIFY_TOOBIG,len);
00184                                 return;
00185                         }
00186                         recvd=malloc(len);
00187                         (*b.getData().iostream()).read((char *)recvd, len);
00188                         recv_mutex.enter();
00189                         process_bundle(recvd,len);
00190                         recv_mutex.leave();
00191                         free(recvd);
00192                 }
00193 
00194                 void receive_sync(dtn::api::Bundle &b) {
00195                         uint32_t chunksize=KBYTE(64);
00196                         char *buffer=(char *)malloc(chunksize);
00197                         uint32_t len=b.getData().iostream().size();
00198                         uint32_t offset=0;
00199                         if (buffer == NULL) {
00200                                 cout << "C_API: receive_sync(): Can't allocate buffer " << endl;
00201                                 return;
00202                         }
00203                         //cout << "You probably wouldn't believe it but I received " << len << " bytes stuff " << endl;
00204                         while(len != 0) {
00205                                 if (len <= chunksize) {
00206                                         //cout << "Write " << len << endl;
00207                                         (*b.getData().iostream()).read(buffer, len);
00208                                         if ( ::write(sync_pipe[1],buffer,len) < 0 )
00209                                         {
00210                                                 std::cerr << "error while writing" << std::endl;
00211                                         }
00212                                         break;
00213                                 }
00214                                 else {
00215                                         //cout << "Write " << chunksize << endl;
00216                                         (*b.getData().iostream()).read(buffer, chunksize);
00217                                         offset+=chunksize; len-=chunksize;
00218                                         if ( ::write(sync_pipe[1],buffer,chunksize) < 0 )
00219                                         {
00220                                                 std::cerr << "error while writing" << std::endl;
00221                                         }
00222                                 }
00223                         }
00224                 }
00225 };
00226 
00227 
00228 struct dtn_fd {
00229         CAPIGateway *gate;
00230 };
00231 
00232 //TODO alloc with[MAX_DTN_FDS], init on DT dameon init
00233 static struct dtn_fd dtn_fds[] = {{NULL},{NULL},{NULL},{NULL}};
00234 
00235 extern "C" int32_t dtn_register_endpoint(char *ep, void (*process_bundle)(const void * data, uint32_t size), void (*status_callback)(struct dtn_notification info)) {
00236         int i;
00237         cout << "Register Endpoint " << ep << endl;
00238         for (i=0; i<MAX_DTN_FDS; i++) {
00239                 if (dtn_fds[i].gate == NULL) {
00240                         cout << "Alloc dtn_fd " << i << endl;
00241                         dtn_fds[i].gate = new CAPIGateway(ep, process_bundle);
00242 
00243                         return i;
00244                 }
00245         }
00246         cout << "C_API: DTN fds exhausted " << endl;
00247         return -1;
00248 }
00249 
00250 
00251 extern "C" void dtn_close_endpoint(DTN_EP ep) {
00252         if (ep < 0 || ep >= MAX_DTN_FDS) {
00253                 cout << "C_API: Invalid ep descriptor in close(): " << ep << endl;
00254                 return;
00255         }
00256         if (dtn_fds[ep].gate == NULL) {
00257                 cout << "C_API: Trying to close non existing ep " << ep << endl;
00258                 return;
00259         }
00260         dtn_fds[ep].gate->close();
00261         dtn_fds[ep].gate = NULL;
00262 
00263 }
00264 
00265 extern "C" void dtn_write(DTN_EP ep, const void *data, uint32_t length) {
00266         if (ep < 0 || ep >= MAX_DTN_FDS) {
00267                 cout << "C_API: Invalid ep descriptor in dtn_write(): " << ep << endl;
00268                 return;
00269         }
00270         if (dtn_fds[ep].gate == NULL) {
00271                 cout << "C_API: Trying to dtn_write() to non existing ep " << ep << endl;
00272                 return;
00273         }
00274         dtn_fds[ep].gate->dtn_write(data,length);
00275 }
00276 
00277 extern "C" int32_t dtn_read(DTN_EP ep, void *buf, uint32_t length) {
00278         if (ep < 0 || ep >= MAX_DTN_FDS) {
00279                 cout << "C_API: Invalid ep descriptor in dtn_read(): " << ep << endl;
00280                 return -1;
00281         }
00282         if (dtn_fds[ep].gate == NULL) {
00283                 cout << "C_API: Trying to dtn_read() from non existing ep " << ep << endl;
00284                 return -1;
00285         }
00286         return dtn_fds[ep].gate->dtn_read(buf,length);
00287 }
00288 
00289 extern "C" void dtn_endpoint_set_option(DTN_EP ep, uint16_t option, ...) {
00290         va_list ap;
00291         va_start(ap,option);
00292         if (ep < 0 || ep >= MAX_DTN_FDS) {
00293                 cout << "C_API: Invalid ep descriptor in dtn_endpoint_set_option(): " << ep << endl;
00294                 return;
00295         }
00296         if (dtn_fds[ep].gate == NULL) {
00297                 cout << "C_API: Trying to dtn_endpoint_set_option() to non existing ep " << ep << endl;
00298                 return;
00299         }
00300         switch (option) {
00301                 case DTN_OPTION_DSTEID:
00302                         dtn_fds[ep].gate->set_dsteid(va_arg(ap, char *));
00303                         break;
00304                 case DTN_OPTION_TXCHUNKSIZE:
00305                         dtn_fds[ep].gate->set_txchunksize(va_arg(ap, uint32_t));
00306                         break;
00307                 case DTN_OPTION_FLUSH:
00308                         dtn_fds[ep].gate->transmit_buffer();
00309                         break;
00310                 default:
00311                         cout << "C_API: Unkown option " << option << " in dtn_endpoint_set_option()" << endl;
00312         }
00313 }
00314 
00315 extern "C" void dtn_send_bundle(int32_t ep , char *dst_uri, char *data, uint32_t length) {
00316         if (ep < 0 || ep >= MAX_DTN_FDS || dtn_fds[ep].gate == NULL) {
00317                 cout << "C_API: Invalid ep descriptor in sendBundle(): " << ep << endl;
00318                 return;
00319         }
00320         dtn_fds[ep].gate->send(dst_uri, data, length);
00321 }
00322 
/** Print a fixed greeting line to standard output. */
extern "C" void dtn_hithere() {
        std::cout << "Muahahaharrr" << std::endl;
}
00326