00001
00003 #include "ibrdtn/api/dtn_api.h"
00004 #include "ibrcommon/net/tcpclient.h"
00005 #include "ibrdtn/api/Client.h"
00006 #include "ibrdtn/api/StringBundle.h"
00007 #include <string.h>
00008 #include <cstdarg>
00009 #include <unistd.h>
00010
00011
00012
00013 class CAPIGateway : public dtn::api::Client
00014 {
00015 public:
00016 CAPIGateway(string app, void (*process_bundle)(const void * data, uint32_t size), void (*status_callback)(struct dtn_notification info) = NULL , string address = "127.0.0.1", int port = 4550)
00017 : dtn::api::Client(app, _tcpclient), _tcpclient(address, port)
00018 {
00019 cout << "Connecting daemon...";
00020 this->connect();
00021 cout << "Done" << endl;
00022 this->process_bundle=process_bundle;
00023 this->status_callback=status_callback;
00024
00025 tx_buffer_position=0;
00026 tx_buffer_size=KBYTE(64);
00027 tx_buffer=(char *)malloc(tx_buffer_size);
00028 dst_eid=NULL;
00029 if (tx_buffer == NULL) {
00030 cout << "C_API: Error allocating tx_buffer." << endl;
00031 tx_buffer_size=0;
00032 }
00033 if (this->process_bundle == NULL) {
00034 if ( pipe(sync_pipe) != 0) {
00035 perror("C_API: Error creating pipe for synchronous operation");
00036 }
00037 }
00038 };
00039
00043 virtual ~CAPIGateway()
00044 {
00045
00046 _tcpclient.close();
00047 };
00048
00049 void send(char *dst_uri, char *data, uint32_t length) {
00050 dtn::data::Bundle b;
00051 b._destination = dtn::data::EID(dst_uri);
00052 dtn::data::PayloadBlock &payload = b.push_back<dtn::data::PayloadBlock>();
00053
00054
00055 (*payload.getBLOB().iostream()).write(data, length);
00056
00057
00058 dtn::data::DefaultSerializer(*this) << b;
00059 Client::flush();
00060 }
00061
00062 void dtn_write(const void *data, uint32_t length) {
00063 uintptr_t offset=0;
00064 uint32_t tocopy=0;
00065 while (offset < length) {
00066 if ((tx_buffer_size-tx_buffer_position) >= (length-offset))
00067 tocopy=(length-offset);
00068 else
00069 tocopy=tx_buffer_size-tx_buffer_position;
00070
00071 memcpy(tx_buffer+tx_buffer_position,(uint8_t *)data+offset,tocopy);
00072 offset+=tocopy;
00073 tx_buffer_position+=tocopy;
00074 if (tx_buffer_position == tx_buffer_size) {
00075 cout << "TX !" << endl;
00076 transmit_buffer();
00077 }
00078 }
00079 }
00080
00081 int32_t dtn_read(void *buf, uint32_t length) {
00082 if (process_bundle != NULL) {
00083 cout << "C_API: Can't use dtn_read() for endpoint in asynchronous mode" << endl;
00084 return -1;
00085 }
00086 return ::read(sync_pipe[0],buf, length);
00087 }
00088
00089 void transmit_buffer() {
00090 if (dst_eid==NULL) {
00091 cout << "C_API: transmit request without destination: Clearing buffer " << endl;
00092 tx_buffer_position=0;
00093 return;
00094 }
00095 if (tx_buffer_position == 0)
00096 return;
00097 dtn::data::Bundle b;
00098 b._destination = dtn::data::EID(this->dst_eid);
00099 dtn::data::PayloadBlock &payload = b.push_back<dtn::data::PayloadBlock>();
00100
00101
00102 (*payload.getBLOB().iostream()).write(tx_buffer, tx_buffer_position);
00103
00104
00105 dtn::data::DefaultSerializer(*this) << b;
00106 Client::flush();
00107 tx_buffer_position=0;
00108 }
00109
00110 void set_dsteid(const char *dst) {
00111 if (this->dst_eid != NULL) {
00112 free(this->dst_eid);
00113 }
00114 this->dst_eid = (char *)malloc(strlen(dst)+1);
00115 if (this->dst_eid == NULL) {
00116 cout << "C_API: Set destination eid failed. No memory" << endl;
00117 return;
00118 }
00119 strcpy(this->dst_eid,dst);
00120 }
00121
00122 void set_txchunksize(uint32_t chunksize) {
00123 transmit_buffer();
00124 if (tx_buffer != NULL)
00125 free(tx_buffer);
00126 tx_buffer=(char *)malloc(chunksize);
00127 if (tx_buffer == NULL) {
00128 cout << "C_API: Set chunksize failed. No memory" << endl;
00129 tx_buffer_size=0;
00130 return;
00131 }
00132 cout << "C_API: New tx_buffer_size: " << chunksize << endl;
00133 tx_buffer_size=chunksize;
00134 }
00135
00136 private:
00137 void (*process_bundle)(const void * data, uint32_t size);
00138 void (*status_callback)(struct dtn_notification info);
00139 void *recvd;
00140 ibrcommon::Mutex recv_mutex;
00141 int sync_pipe[2];
00142
00143 char *tx_buffer;
00144 uint32_t tx_buffer_size;
00145 uint32_t tx_buffer_position;
00146 char *dst_eid;
00147
00148 void notify(uint16_t status, uint32_t data) {
00149 struct dtn_notification n;
00150 if (status_callback != NULL) {
00151 n.status=status;
00152 n.data=data;
00153 recv_mutex.enter();
00154 status_callback(n);
00155 recv_mutex.leave();
00156 }
00157 }
00158
00159 ibrcommon::tcpclient _tcpclient;
00160
00161
00162
00168 void received(dtn::api::Bundle &b)
00169 {
00170 if (process_bundle != NULL) {
00171 receive_async(b);
00172 }
00173 else {
00174 receive_sync(b);
00175 }
00176 }
00177
00178 void receive_async(dtn::api::Bundle &b) {
00179 int32_t len=b.getData().iostream().size();
00180
00181 if (len < 0 || len > 65536 ) {
00182 cout << "Ignoring bundle" << endl;
00183 notify(DTN_NOTIFY_TOOBIG,len);
00184 return;
00185 }
00186 recvd=malloc(len);
00187 (*b.getData().iostream()).read((char *)recvd, len);
00188 recv_mutex.enter();
00189 process_bundle(recvd,len);
00190 recv_mutex.leave();
00191 free(recvd);
00192 }
00193
00194 void receive_sync(dtn::api::Bundle &b) {
00195 uint32_t chunksize=KBYTE(64);
00196 char *buffer=(char *)malloc(chunksize);
00197 uint32_t len=b.getData().iostream().size();
00198 uint32_t offset=0;
00199 if (buffer == NULL) {
00200 cout << "C_API: receive_sync(): Can't allocate buffer " << endl;
00201 return;
00202 }
00203
00204 while(len != 0) {
00205 if (len <= chunksize) {
00206
00207 (*b.getData().iostream()).read(buffer, len);
00208 if ( ::write(sync_pipe[1],buffer,len) < 0 )
00209 {
00210 std::cerr << "error while writing" << std::endl;
00211 }
00212 break;
00213 }
00214 else {
00215
00216 (*b.getData().iostream()).read(buffer, chunksize);
00217 offset+=chunksize; len-=chunksize;
00218 if ( ::write(sync_pipe[1],buffer,chunksize) < 0 )
00219 {
00220 std::cerr << "error while writing" << std::endl;
00221 }
00222 }
00223 }
00224 }
00225 };
00226
00227
00228 struct dtn_fd {
00229 CAPIGateway *gate;
00230 };
00231
00232
00233 static struct dtn_fd dtn_fds[] = {{NULL},{NULL},{NULL},{NULL}};
00234
00235 extern "C" int32_t dtn_register_endpoint(char *ep, void (*process_bundle)(const void * data, uint32_t size), void (*status_callback)(struct dtn_notification info)) {
00236 int i;
00237 cout << "Register Endpoint " << ep << endl;
00238 for (i=0; i<MAX_DTN_FDS; i++) {
00239 if (dtn_fds[i].gate == NULL) {
00240 cout << "Alloc dtn_fd " << i << endl;
00241 dtn_fds[i].gate = new CAPIGateway(ep, process_bundle);
00242
00243 return i;
00244 }
00245 }
00246 cout << "C_API: DTN fds exhausted " << endl;
00247 return -1;
00248 }
00249
00250
00251 extern "C" void dtn_close_endpoint(DTN_EP ep) {
00252 if (ep < 0 || ep >= MAX_DTN_FDS) {
00253 cout << "C_API: Invalid ep descriptor in close(): " << ep << endl;
00254 return;
00255 }
00256 if (dtn_fds[ep].gate == NULL) {
00257 cout << "C_API: Trying to close non existing ep " << ep << endl;
00258 return;
00259 }
00260 dtn_fds[ep].gate->close();
00261 dtn_fds[ep].gate = NULL;
00262
00263 }
00264
00265 extern "C" void dtn_write(DTN_EP ep, const void *data, uint32_t length) {
00266 if (ep < 0 || ep >= MAX_DTN_FDS) {
00267 cout << "C_API: Invalid ep descriptor in dtn_write(): " << ep << endl;
00268 return;
00269 }
00270 if (dtn_fds[ep].gate == NULL) {
00271 cout << "C_API: Trying to dtn_write() to non existing ep " << ep << endl;
00272 return;
00273 }
00274 dtn_fds[ep].gate->dtn_write(data,length);
00275 }
00276
00277 extern "C" int32_t dtn_read(DTN_EP ep, void *buf, uint32_t length) {
00278 if (ep < 0 || ep >= MAX_DTN_FDS) {
00279 cout << "C_API: Invalid ep descriptor in dtn_read(): " << ep << endl;
00280 return -1;
00281 }
00282 if (dtn_fds[ep].gate == NULL) {
00283 cout << "C_API: Trying to dtn_read() from non existing ep " << ep << endl;
00284 return -1;
00285 }
00286 return dtn_fds[ep].gate->dtn_read(buf,length);
00287 }
00288
00289 extern "C" void dtn_endpoint_set_option(DTN_EP ep, uint16_t option, ...) {
00290 va_list ap;
00291 va_start(ap,option);
00292 if (ep < 0 || ep >= MAX_DTN_FDS) {
00293 cout << "C_API: Invalid ep descriptor in dtn_endpoint_set_option(): " << ep << endl;
00294 return;
00295 }
00296 if (dtn_fds[ep].gate == NULL) {
00297 cout << "C_API: Trying to dtn_endpoint_set_option() to non existing ep " << ep << endl;
00298 return;
00299 }
00300 switch (option) {
00301 case DTN_OPTION_DSTEID:
00302 dtn_fds[ep].gate->set_dsteid(va_arg(ap, char *));
00303 break;
00304 case DTN_OPTION_TXCHUNKSIZE:
00305 dtn_fds[ep].gate->set_txchunksize(va_arg(ap, uint32_t));
00306 break;
00307 case DTN_OPTION_FLUSH:
00308 dtn_fds[ep].gate->transmit_buffer();
00309 break;
00310 default:
00311 cout << "C_API: Unkown option " << option << " in dtn_endpoint_set_option()" << endl;
00312 }
00313 }
00314
00315 extern "C" void dtn_send_bundle(int32_t ep , char *dst_uri, char *data, uint32_t length) {
00316 if (ep < 0 || ep >= MAX_DTN_FDS || dtn_fds[ep].gate == NULL) {
00317 cout << "C_API: Invalid ep descriptor in sendBundle(): " << ep << endl;
00318 return;
00319 }
00320 dtn_fds[ep].gate->send(dst_uri, data, length);
00321 }
00322
00323 extern "C" void dtn_hithere() {
00324 cout << "Muahahaharrr" << endl;
00325 }
00326