|
IBR-DTNSuite 0.6
|
00001 00003 #include "ibrdtn/api/dtn_api.h" 00004 #include "ibrcommon/net/tcpclient.h" 00005 #include "ibrdtn/api/Client.h" 00006 #include "ibrdtn/api/StringBundle.h" 00007 #include <string.h> 00008 #include <cstdarg> 00009 #include <unistd.h> 00010 00011 00012 00013 class CAPIGateway : public dtn::api::Client 00014 { 00015 public: 00016 CAPIGateway(string app, void (*process_bundle)(const void * data, uint32_t size), void (*status_callback)(struct dtn_notification info) = NULL , string address = "127.0.0.1", int port = 4550) 00017 : dtn::api::Client(app, _tcpclient), _tcpclient(address, port) 00018 { 00019 cout << "Connecting daemon..."; 00020 this->connect(); //Rückgabewerte?!!? 00021 cout << "Done" << endl; 00022 this->process_bundle=process_bundle; 00023 this->status_callback=status_callback; 00024 00025 tx_buffer_position=0; 00026 tx_buffer_size=KBYTE(64); 00027 tx_buffer=(char *)malloc(tx_buffer_size); 00028 dst_eid=NULL; 00029 if (tx_buffer == NULL) { 00030 cout << "C_API: Error allocating tx_buffer." << endl; 00031 tx_buffer_size=0; 00032 } 00033 if (this->process_bundle == NULL) { //enter synchronous mode 00034 if ( pipe(sync_pipe) != 0) { 00035 perror("C_API: Error creating pipe for synchronous operation"); 00036 } 00037 } 00038 }; 00039 00043 virtual ~CAPIGateway() 00044 { 00045 // Close the tcp connection. 00046 _tcpclient.close(); 00047 }; 00048 00049 void send(char *dst_uri, char *data, uint32_t length) { 00050 dtn::data::Bundle b; 00051 b._destination = dtn::data::EID(dst_uri); 00052 dtn::data::PayloadBlock &payload = b.push_back<dtn::data::PayloadBlock>(); 00053 00054 // add the data 00055 (*payload.getBLOB().iostream()).write(data, length); 00056 00057 // transmit the packet 00058 dtn::data::DefaultSerializer(*this) << b; 00059 Client::flush(); 00060 } 00061 00062 void dtn_write(const void *data, uint32_t length) { 00063 uintptr_t offset=0; 00064 uint32_t tocopy=0; 00065 while (offset < length) { 00066 if ((tx_buffer_size-tx_buffer_position) >= (length-offset)) 00067 tocopy=(length-offset); 00068 else 00069 tocopy=tx_buffer_size-tx_buffer_position; 00070 //cout << "CPY " << tocopy << " because size is " << tx_buffer_size << " and position is " << tx_buffer_position << endl; 00071 memcpy(tx_buffer+tx_buffer_position,(uint8_t *)data+offset,tocopy); 00072 offset+=tocopy; 00073 tx_buffer_position+=tocopy; 00074 if (tx_buffer_position == tx_buffer_size) { //TX buffer full, transmit 00075 cout << "TX !" << endl; 00076 transmit_buffer(); 00077 } 00078 } 00079 } 00080 00081 int32_t dtn_read(void *buf, uint32_t length) { 00082 if (process_bundle != NULL) { 00083 cout << "C_API: Can't use dtn_read() for endpoint in asynchronous mode" << endl; 00084 return -1; 00085 } 00086 return ::read(sync_pipe[0],buf, length); 00087 } 00088 00089 void transmit_buffer() { 00090 if (dst_eid==NULL) { 00091 cout << "C_API: transmit request without destination: Clearing buffer " << endl; 00092 tx_buffer_position=0; 00093 return; 00094 } 00095 if (tx_buffer_position == 0) //is empty 00096 return; 00097 dtn::data::Bundle b; 00098 b._destination = dtn::data::EID(this->dst_eid); 00099 dtn::data::PayloadBlock &payload = b.push_back<dtn::data::PayloadBlock>(); 00100 00101 // add the data 00102 (*payload.getBLOB().iostream()).write(tx_buffer, tx_buffer_position); //+1 is wrong =?!? 00103 00104 // transmit the packet 00105 dtn::data::DefaultSerializer(*this) << b; 00106 Client::flush(); 00107 tx_buffer_position=0; 00108 } 00109 00110 void set_dsteid(const char *dst) { 00111 if (this->dst_eid != NULL) { 00112 free(this->dst_eid); 00113 } 00114 this->dst_eid = (char *)malloc(strlen(dst)+1); 00115 if (this->dst_eid == NULL) { 00116 cout << "C_API: Set destination eid failed. No memory" << endl; 00117 return; 00118 } 00119 strcpy(this->dst_eid,dst); 00120 } 00121 00122 void set_txchunksize(uint32_t chunksize) { 00123 transmit_buffer(); //flush old buffer 00124 if (tx_buffer != NULL) 00125 free(tx_buffer); 00126 tx_buffer=(char *)malloc(chunksize); 00127 if (tx_buffer == NULL) { 00128 cout << "C_API: Set chunksize failed. No memory" << endl; 00129 tx_buffer_size=0; 00130 return; 00131 } 00132 cout << "C_API: New tx_buffer_size: " << chunksize << endl; 00133 tx_buffer_size=chunksize; 00134 } 00135 00136 private: 00137 void (*process_bundle)(const void * data, uint32_t size); 00138 void (*status_callback)(struct dtn_notification info); 00139 void *recvd; 00140 ibrcommon::Mutex recv_mutex; 00141 int sync_pipe[2]; 00142 00143 char *tx_buffer; 00144 uint32_t tx_buffer_size; 00145 uint32_t tx_buffer_position; 00146 char *dst_eid; 00147 00148 void notify(uint16_t status, uint32_t data) { 00149 struct dtn_notification n; 00150 if (status_callback != NULL) { 00151 n.status=status; 00152 n.data=data; 00153 recv_mutex.enter(); 00154 status_callback(n); 00155 recv_mutex.leave(); 00156 } 00157 } 00158 00159 ibrcommon::tcpclient _tcpclient; 00160 00161 00162 00168 void received(dtn::api::Bundle &b) 00169 { 00170 if (process_bundle != NULL) { 00171 receive_async(b); 00172 } 00173 else { 00174 receive_sync(b); 00175 } 00176 } 00177 00178 void receive_async(dtn::api::Bundle &b) { 00179 int32_t len=b.getData().iostream().size(); 00180 //cout << "You probably wouldn't believe it but I received " << len << " bytes stuff " << endl; 00181 if (len < 0 || len > 65536 ) { //todo: set tx_buffer size 00182 cout << "Ignoring bundle" << endl; 00183 notify(DTN_NOTIFY_TOOBIG,len); 00184 return; 00185 } 00186 recvd=malloc(len); 00187 (*b.getData().iostream()).read((char *)recvd, len); 00188 recv_mutex.enter(); 00189 process_bundle(recvd,len); 00190 recv_mutex.leave(); 00191 free(recvd); 00192 } 00193 00194 void receive_sync(dtn::api::Bundle &b) { 00195 uint32_t chunksize=KBYTE(64); 00196 char *buffer=(char *)malloc(chunksize); 00197 uint32_t len=b.getData().iostream().size(); 00198 uint32_t offset=0; 00199 if (buffer == NULL) { 00200 cout << "C_API: receive_sync(): Can't allocate buffer " << endl; 00201 return; 00202 } 00203 //cout << "You probably wouldn't believe it but I received " << len << " bytes stuff " << endl; 00204 while(len != 0) { 00205 if (len <= chunksize) { 00206 //cout << "Write " << len << endl; 00207 (*b.getData().iostream()).read(buffer, len); 00208 if ( ::write(sync_pipe[1],buffer,len) < 0 ) 00209 { 00210 std::cerr << "error while writing" << std::endl; 00211 } 00212 break; 00213 } 00214 else { 00215 //cout << "Write " << chunksize << endl; 00216 (*b.getData().iostream()).read(buffer, chunksize); 00217 offset+=chunksize; len-=chunksize; 00218 if ( ::write(sync_pipe[1],buffer,chunksize) < 0 ) 00219 { 00220 std::cerr << "error while writing" << std::endl; 00221 } 00222 } 00223 } 00224 } 00225 }; 00226 00227 00228 struct dtn_fd { 00229 CAPIGateway *gate; 00230 }; 00231 00232 //TODO alloc with[MAX_DTN_FDS], init on DT dameon init 00233 static struct dtn_fd dtn_fds[] = {{NULL},{NULL},{NULL},{NULL}}; 00234 00235 extern "C" int32_t dtn_register_endpoint(char *ep, void (*process_bundle)(const void * data, uint32_t size), void (*status_callback)(struct dtn_notification info)) { 00236 int i; 00237 cout << "Register Endpoint " << ep << endl; 00238 for (i=0; i<MAX_DTN_FDS; i++) { 00239 if (dtn_fds[i].gate == NULL) { 00240 cout << "Alloc dtn_fd " << i << endl; 00241 dtn_fds[i].gate = new CAPIGateway(ep, process_bundle); 00242 00243 return i; 00244 } 00245 } 00246 cout << "C_API: DTN fds exhausted " << endl; 00247 return -1; 00248 } 00249 00250 00251 extern "C" void dtn_close_endpoint(DTN_EP ep) { 00252 if (ep < 0 || ep >= MAX_DTN_FDS) { 00253 cout << "C_API: Invalid ep descriptor in close(): " << ep << endl; 00254 return; 00255 } 00256 if (dtn_fds[ep].gate == NULL) { 00257 cout << "C_API: Trying to close non existing ep " << ep << endl; 00258 return; 00259 } 00260 dtn_fds[ep].gate->close(); 00261 dtn_fds[ep].gate = NULL; 00262 00263 } 00264 00265 extern "C" void dtn_write(DTN_EP ep, const void *data, uint32_t length) { 00266 if (ep < 0 || ep >= MAX_DTN_FDS) { 00267 cout << "C_API: Invalid ep descriptor in dtn_write(): " << ep << endl; 00268 return; 00269 } 00270 if (dtn_fds[ep].gate == NULL) { 00271 cout << "C_API: Trying to dtn_write() to non existing ep " << ep << endl; 00272 return; 00273 } 00274 dtn_fds[ep].gate->dtn_write(data,length); 00275 } 00276 00277 extern "C" int32_t dtn_read(DTN_EP ep, void *buf, uint32_t length) { 00278 if (ep < 0 || ep >= MAX_DTN_FDS) { 00279 cout << "C_API: Invalid ep descriptor in dtn_read(): " << ep << endl; 00280 return -1; 00281 } 00282 if (dtn_fds[ep].gate == NULL) { 00283 cout << "C_API: Trying to dtn_read() from non existing ep " << ep << endl; 00284 return -1; 00285 } 00286 return dtn_fds[ep].gate->dtn_read(buf,length); 00287 } 00288 00289 extern "C" void dtn_endpoint_set_option(DTN_EP ep, uint16_t option, ...) { 00290 va_list ap; 00291 va_start(ap,option); 00292 if (ep < 0 || ep >= MAX_DTN_FDS) { 00293 cout << "C_API: Invalid ep descriptor in dtn_endpoint_set_option(): " << ep << endl; 00294 return; 00295 } 00296 if (dtn_fds[ep].gate == NULL) { 00297 cout << "C_API: Trying to dtn_endpoint_set_option() to non existing ep " << ep << endl; 00298 return; 00299 } 00300 switch (option) { 00301 case DTN_OPTION_DSTEID: 00302 dtn_fds[ep].gate->set_dsteid(va_arg(ap, char *)); 00303 break; 00304 case DTN_OPTION_TXCHUNKSIZE: 00305 dtn_fds[ep].gate->set_txchunksize(va_arg(ap, uint32_t)); 00306 break; 00307 case DTN_OPTION_FLUSH: 00308 dtn_fds[ep].gate->transmit_buffer(); 00309 break; 00310 default: 00311 cout << "C_API: Unkown option " << option << " in dtn_endpoint_set_option()" << endl; 00312 } 00313 } 00314 00315 extern "C" void dtn_send_bundle(int32_t ep , char *dst_uri, char *data, uint32_t length) { 00316 if (ep < 0 || ep >= MAX_DTN_FDS || dtn_fds[ep].gate == NULL) { 00317 cout << "C_API: Invalid ep descriptor in sendBundle(): " << ep << endl; 00318 return; 00319 } 00320 dtn_fds[ep].gate->send(dst_uri, data, length); 00321 } 00322 00323 extern "C" void dtn_hithere() { 00324 cout << "Muahahaharrr" << endl; 00325 } 00326