00001 #ifndef PROTON_MESSENGER_H 00002 #define PROTON_MESSENGER_H 1 00003 00004 /* 00005 * 00006 * Licensed to the Apache Software Foundation (ASF) under one 00007 * or more contributor license agreements. See the NOTICE file 00008 * distributed with this work for additional information 00009 * regarding copyright ownership. The ASF licenses this file 00010 * to you under the Apache License, Version 2.0 (the 00011 * "License"); you may not use this file except in compliance 00012 * with the License. You may obtain a copy of the License at 00013 * 00014 * http://www.apache.org/licenses/LICENSE-2.0 00015 * 00016 * Unless required by applicable law or agreed to in writing, 00017 * software distributed under the License is distributed on an 00018 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 00019 * KIND, either express or implied. See the License for the 00020 * specific language governing permissions and limitations 00021 * under the License. 00022 * 00023 */ 00024 00025 #include <proton/import_export.h> 00026 #include <proton/message.h> 00027 #include <proton/selectable.h> 00028 #include <proton/condition.h> 00029 #include <proton/terminus.h> 00030 #include <proton/link.h> 00031 #include <proton/transport.h> 00032 #include <proton/ssl.h> 00033 00034 #ifdef __cplusplus 00035 extern "C" { 00036 #endif 00037 00038 /** 00039 * @file 00040 * 00041 * The messenger API provides a high level interface for sending and 00042 * receiving AMQP messages. 00043 * 00044 * @defgroup messenger Messenger 00045 * @{ 00046 */ 00047 00048 /** 00049 * A ::pn_messenger_t provides a high level interface for sending and 00050 * receiving messages (See ::pn_message_t). 00051 * 00052 * Every messenger contains a single logical queue of incoming 00053 * messages and a single logical queue of outgoing messages. The 00054 * messages in these queues may be destined for, or originate from, a 00055 * variety of addresses. 00056 * 00057 * The messenger interface is single-threaded. All methods except one 00058 * (::pn_messenger_interrupt()) are intended to be used by one thread 00059 * at a time. 00060 * 00061 * 00062 * Address Syntax 00063 * ============== 00064 * 00065 * An address has the following form:: 00066 * 00067 * [ amqp[s]:// ] [user[:password]@] domain [/[name]] 00068 * 00069 * Where domain can be one of:: 00070 * 00071 * host | host:port | ip | ip:port | name 00072 * 00073 * The following are valid examples of addresses: 00074 * 00075 * - example.org 00076 * - example.org:1234 00077 * - amqp://example.org 00078 * - amqps://example.org 00079 * - example.org/incoming 00080 * - amqps://example.org/outgoing 00081 * - amqps://fred:trustno1@example.org 00082 * - 127.0.0.1:1234 00083 * - amqps://127.0.0.1:1234 00084 * 00085 * Sending & Receiving Messages 00086 * ============================ 00087 * 00088 * The messenger API works in conjuction with the ::pn_message_t API. 00089 * A ::pn_message_t is a mutable holder of message content. 00090 * 00091 * The ::pn_messenger_put() operation copies content from the supplied 00092 * ::pn_message_t to the outgoing queue, and may send queued messages 00093 * if it can do so without blocking. The ::pn_messenger_send() 00094 * operation blocks until it has sent the requested number of 00095 * messages, or until a timeout interrupts the attempt. 00096 * 00097 * 00098 * pn_messenger_t *messenger = pn_messenger(NULL); 00099 * pn_message_t *message = pn_message(); 00100 * char subject[1024]; 00101 * for (int i = 0; i < 3; i++) { 00102 * pn_message_set_address(message, "amqp://host/queue"); 00103 * sprintf(subject, "Hello World! %i", i); 00104 * pn_message_set_subject(message, subject); 00105 * pn_messenger_put(messenger, message) 00106 * pn_messenger_send(messenger); 00107 * 00108 * Similarly, the ::pn_messenger_recv() method receives messages into 00109 * the incoming queue, and may block as it attempts to receive up to 00110 * the requested number of messages, or until the timeout is reached. 00111 * It may receive fewer than the requested number. The 00112 * ::pn_messenger_get() method pops the eldest message off the 00113 * incoming queue and copies its content into the supplied 00114 * ::pn_message_t object. It will not block. 00115 * 00116 * 00117 * pn_messenger_t *messenger = pn_messenger(NULL); 00118 * pn_message_t *message = pn_message() 00119 * pn_messenger_recv(messenger): 00120 * while (pn_messenger_incoming(messenger) > 0) { 00121 * pn_messenger_get(messenger, message); 00122 * printf("%s", message.subject); 00123 * } 00124 * 00125 * Output: 00126 * Hello World 0 00127 * Hello World 1 00128 * Hello World 2 00129 * 00130 * The blocking flag allows you to turn off blocking behavior 00131 * entirely, in which case ::pn_messenger_send() and 00132 * ::pn_messenger_recv() will do whatever they can without blocking, 00133 * and then return. You can then look at the number of incoming and 00134 * outgoing messages to see how much outstanding work still remains. 00135 * 00136 * Authentication Mechanims 00137 * ======================== 00138 * 00139 * The messenger API authenticates using some specific mechanisms. In prior versions 00140 * of Proton the only authentication mechanism available was the PLAIN mechanism 00141 * which transports the user's password over the network unencrypted. The Proton versions 00142 * 0.10 and newer support other more secure mechanisms which avoid sending the users's 00143 * password over the network unencrypted. For backwards compatibility the 0.10 version 00144 * of the messenger API will also allow the use of the unencrypted PLAIN mechanism. From the 00145 * 0.11 version and onwards you will need to set the flag PN_FLAGS_ALLOW_INSECURE_MECHS to 00146 * carry on using the unencrypted PLAIN mechanism. 00147 * 00148 * The code for this looks like: 00149 * 00150 * ... 00151 * pn_messenger_set_flags(messenger, PN_FLAGS_ALLOW_INSECURE_MECHS); 00152 * ... 00153 * 00154 * Note that the use of the PLAIN mechanism over an SSL connection is allowed as the 00155 * password is not sent unencrypted. 00156 */ 00157 typedef struct pn_messenger_t pn_messenger_t; 00158 00159 /** 00160 * A subscription is a request for incoming messages. 00161 * 00162 * @todo currently the subscription API is under developed, this 00163 * should allow more explicit control over subscription properties and 00164 * behaviour 00165 */ 00166 typedef struct pn_subscription_t pn_subscription_t; 00167 00168 /** 00169 * Trackers provide a lightweight handle used to track the status of 00170 * incoming and outgoing deliveries. 00171 */ 00172 typedef int64_t pn_tracker_t; 00173 00174 /** 00175 * Describes all the possible states for a message associated with a 00176 * given tracker. 00177 */ 00178 typedef enum { 00179 PN_STATUS_UNKNOWN = 0, /**< The tracker is unknown. */ 00180 PN_STATUS_PENDING = 1, /**< The message is in flight. For outgoing 00181 messages, use ::pn_messenger_buffered to 00182 see if it has been sent or not. */ 00183 PN_STATUS_ACCEPTED = 2, /**< The message was accepted. */ 00184 PN_STATUS_REJECTED = 3, /**< The message was rejected. */ 00185 PN_STATUS_RELEASED = 4, /**< The message was released. */ 00186 PN_STATUS_MODIFIED = 5, /**< The message was modified. */ 00187 PN_STATUS_ABORTED = 6, /**< The message was aborted. */ 00188 PN_STATUS_SETTLED = 7 /**< The remote party has settled the message. */ 00189 } pn_status_t; 00190 00191 /** 00192 * Construct a new ::pn_messenger_t with the given name. The name is 00193 * global. If a NULL name is supplied, a UUID based name will be 00194 * chosen. 00195 * 00196 * @param[in] name the name of the messenger or NULL 00197 * 00198 * @return pointer to a new ::pn_messenger_t 00199 */ 00200 PN_EXTERN pn_messenger_t *pn_messenger(const char *name); 00201 00202 /** 00203 * Get the name of a messenger. 00204 * 00205 * @param[in] messenger a messenger object 00206 * @return the name of the messenger 00207 */ 00208 PN_EXTERN const char *pn_messenger_name(pn_messenger_t *messenger); 00209 00210 /** 00211 * Sets the path that will be used to get the certificate that will be 00212 * used to identify this messenger to its peers. The validity of the 00213 * path is not checked by this function. 00214 * 00215 * @param[in] messenger the messenger 00216 * @param[in] certificate a path to a certificate file 00217 * @return an error code of zero if there is no error 00218 */ 00219 PN_EXTERN int pn_messenger_set_certificate(pn_messenger_t *messenger, const char *certificate); 00220 00221 /** 00222 * Get the certificate path. This value may be set by 00223 * pn_messenger_set_certificate. The default certificate path is null. 00224 * 00225 * @param[in] messenger the messenger 00226 * @return the certificate file path 00227 */ 00228 PN_EXTERN const char *pn_messenger_get_certificate(pn_messenger_t *messenger); 00229 00230 /** 00231 * Set path to the private key that was used to sign the certificate. 00232 * See ::pn_messenger_set_certificate 00233 * 00234 * @param[in] messenger a messenger object 00235 * @param[in] private_key a path to a private key file 00236 * @return an error code of zero if there is no error 00237 */ 00238 PN_EXTERN int pn_messenger_set_private_key(pn_messenger_t *messenger, const char *private_key); 00239 00240 /** 00241 * Gets the private key file for a messenger. 00242 * 00243 * @param[in] messenger a messenger object 00244 * @return the messenger's private key file path 00245 */ 00246 PN_EXTERN const char *pn_messenger_get_private_key(pn_messenger_t *messenger); 00247 00248 /** 00249 * Sets the private key password for a messenger. 00250 * 00251 * @param[in] messenger a messenger object 00252 * @param[in] password the password for the private key file 00253 * 00254 * @return an error code of zero if there is no error 00255 */ 00256 PN_EXTERN int pn_messenger_set_password(pn_messenger_t *messenger, const char *password); 00257 00258 /** 00259 * Gets the private key file password for a messenger. 00260 * 00261 * @param[in] messenger a messenger object 00262 * @return password for the private key file 00263 */ 00264 PN_EXTERN const char *pn_messenger_get_password(pn_messenger_t *messenger); 00265 00266 /** 00267 * Sets the trusted certificates database for a messenger. 00268 * 00269 * The messenger will use this database to validate the certificate 00270 * provided by the peer. 00271 * 00272 * @param[in] messenger a messenger object 00273 * @param[in] cert_db a path to the certificates database 00274 * 00275 * @return an error code of zero if there is no error 00276 */ 00277 PN_EXTERN int pn_messenger_set_trusted_certificates(pn_messenger_t *messenger, const char *cert_db); 00278 00279 /** 00280 * Gets the trusted certificates database for a messenger. 00281 * 00282 * @param[in] messenger a messenger object 00283 * @return path to the trusted certificates database 00284 */ 00285 PN_EXTERN const char *pn_messenger_get_trusted_certificates(pn_messenger_t *messenger); 00286 00287 /** 00288 * Set the default timeout for a messenger. 00289 * 00290 * Any messenger call that blocks during execution will stop blocking 00291 * and return control when this timeout is reached, if you have set it 00292 * to a value greater than zero. The timeout is expressed in 00293 * milliseconds. 00294 * 00295 * @param[in] messenger a messenger object 00296 * @param[in] timeout a new timeout for the messenger, in milliseconds 00297 * @return an error code or zero if there is no error 00298 */ 00299 PN_EXTERN int pn_messenger_set_timeout(pn_messenger_t *messenger, int timeout); 00300 00301 /** 00302 * Gets the timeout for a messenger object. 00303 * 00304 * See ::pn_messenger_set_timeout() for details. 00305 * 00306 * @param[in] messenger a messenger object 00307 * @return the timeout for the messenger, in milliseconds 00308 */ 00309 PN_EXTERN int pn_messenger_get_timeout(pn_messenger_t *messenger); 00310 00311 /** 00312 * Check if a messenger is in blocking mode. 00313 * 00314 * @param[in] messenger a messenger object 00315 * @return true if blocking has been enabled, false otherwise 00316 */ 00317 PN_EXTERN bool pn_messenger_is_blocking(pn_messenger_t *messenger); 00318 00319 /** 00320 * Enable or disable blocking behavior for a messenger during calls to 00321 * ::pn_messenger_send and ::pn_messenger_recv. 00322 * 00323 * @param[in] messenger a messenger object 00324 * @param[in] blocking the value of the blocking flag 00325 * @return an error code or zero if there is no error 00326 */ 00327 PN_EXTERN int pn_messenger_set_blocking(pn_messenger_t *messenger, bool blocking); 00328 00329 /** 00330 * Check if a messenger is in passive mode. 00331 * 00332 * A messenger that is in passive mode will never attempt to perform 00333 * I/O internally, but instead will make all internal file descriptors 00334 * accessible through ::pn_messenger_selectable() to be serviced 00335 * externally. This can be useful for integrating messenger into an 00336 * external event loop. 00337 * 00338 * @param[in] messenger a messenger object 00339 * @return true if the messenger is in passive mode, false otherwise 00340 */ 00341 PN_EXTERN bool pn_messenger_is_passive(pn_messenger_t *messenger); 00342 00343 /** 00344 * Set the passive mode for a messenger. 00345 * 00346 * See ::pn_messenger_is_passive() for details on passive mode. 00347 * 00348 * @param[in] messenger a messenger object 00349 * @param[in] passive true to enable passive mode, false to disable 00350 * passive mode 00351 * @return an error code or zero on success 00352 */ 00353 PN_EXTERN int pn_messenger_set_passive(pn_messenger_t *messenger, bool passive); 00354 00355 /** Frees a Messenger. 00356 * 00357 * @param[in] messenger the messenger to free (or NULL), no longer 00358 * valid on return 00359 */ 00360 PN_EXTERN void pn_messenger_free(pn_messenger_t *messenger); 00361 00362 /** 00363 * Get the code for a messenger's most recent error. 00364 * 00365 * The error code is initialized to zero at messenger creation. The 00366 * error number is "sticky" i.e. error codes are not reset to 0 at the 00367 * end of successful API calls. You can use ::pn_messenger_error to 00368 * access the messenger's error object and clear explicitly if 00369 * desired. 00370 * 00371 * @param[in] messenger the messenger to check for errors 00372 * @return an error code or zero if there is no error 00373 * @see error.h 00374 */ 00375 PN_EXTERN int pn_messenger_errno(pn_messenger_t *messenger); 00376 00377 /** 00378 * Get a messenger's error object. 00379 * 00380 * Returns a pointer to a pn_error_t that is valid until the messenger 00381 * is freed. The pn_error_* API allows you to access the text, error 00382 * number, and lets you set or clear the error code explicitly. 00383 * 00384 * @param[in] messenger the messenger to check for errors 00385 * @return a pointer to the messenger's error descriptor 00386 * @see error.h 00387 */ 00388 PN_EXTERN pn_error_t *pn_messenger_error(pn_messenger_t *messenger); 00389 00390 /** 00391 * Get the size of a messenger's outgoing window. 00392 * 00393 * The size of the outgoing window limits the number of messages whose 00394 * status you can check with a tracker. A message enters this window 00395 * when you call pn_messenger_put on the message. For example, if your 00396 * outgoing window size is 10, and you call pn_messenger_put 12 times, 00397 * new status information will no longer be available for the first 2 00398 * messages. 00399 * 00400 * The default outgoing window size is 0. 00401 * 00402 * @param[in] messenger a messenger object 00403 * @return the outgoing window for the messenger 00404 */ 00405 PN_EXTERN int pn_messenger_get_outgoing_window(pn_messenger_t *messenger); 00406 00407 /** 00408 * Set the size of a messenger's outgoing window. 00409 * 00410 * See ::pn_messenger_get_outgoing_window() for details. 00411 * 00412 * @param[in] messenger a messenger object 00413 * @param[in] window the number of deliveries to track 00414 * @return an error or zero on success 00415 * @see error.h 00416 */ 00417 PN_EXTERN int pn_messenger_set_outgoing_window(pn_messenger_t *messenger, int window); 00418 00419 /** 00420 * Get the size of a messenger's incoming window. 00421 * 00422 * The size of a messenger's incoming window limits the number of 00423 * messages that can be accepted or rejected using trackers. Messages 00424 * *do not* enter this window when they have been received 00425 * (::pn_messenger_recv) onto you incoming queue. Messages only enter 00426 * this window only when you access them using pn_messenger_get. If 00427 * your incoming window size is N, and you get N+1 messages without 00428 * explicitly accepting or rejecting the oldest message, then it will 00429 * be implicitly accepted when it falls off the edge of the incoming 00430 * window. 00431 * 00432 * The default incoming window size is 0. 00433 * 00434 * @param[in] messenger a messenger object 00435 * @return the incoming window for the messenger 00436 */ 00437 PN_EXTERN int pn_messenger_get_incoming_window(pn_messenger_t *messenger); 00438 00439 /** 00440 * Set the size of a messenger's incoming window. 00441 * 00442 * See ::pn_messenger_get_incoming_window() for details. 00443 * 00444 * @param[in] messenger a messenger object 00445 * @param[in] window the number of deliveries to track 00446 * @return an error or zero on success 00447 * @see error.h 00448 */ 00449 PN_EXTERN int pn_messenger_set_incoming_window(pn_messenger_t *messenger, 00450 int window); 00451 00452 /** 00453 * Currently a no-op placeholder. For future compatibility, do not 00454 * send or receive messages before starting the messenger. 00455 * 00456 * @param[in] messenger the messenger to start 00457 * @return an error code or zero on success 00458 * @see error.h 00459 */ 00460 PN_EXTERN int pn_messenger_start(pn_messenger_t *messenger); 00461 00462 /** 00463 * Stops a messenger. 00464 * 00465 * Stopping a messenger will perform an orderly shutdown of all 00466 * underlying connections. This may require some time. If the 00467 * messenger is in non blocking mode (see ::pn_messenger_is_blocking), 00468 * this operation will return PN_INPROGRESS if it cannot finish 00469 * immediately. In that case, you can use ::pn_messenger_stopped() to 00470 * determine when the messenger has finished stopping. 00471 * 00472 * @param[in] messenger the messenger to stop 00473 * @return an error code or zero on success 00474 * @see error.h 00475 */ 00476 PN_EXTERN int pn_messenger_stop(pn_messenger_t *messenger); 00477 00478 /** 00479 * Returns true if a messenger is in the stopped state. This function 00480 * does not block. 00481 * 00482 * @param[in] messenger the messenger to stop 00483 * 00484 */ 00485 PN_EXTERN bool pn_messenger_stopped(pn_messenger_t *messenger); 00486 00487 /** 00488 * Subscribes a messenger to messages from the specified source. 00489 * 00490 * @param[in] messenger the messenger to subscribe 00491 * @param[in] source 00492 * @return a subscription 00493 */ 00494 PN_EXTERN pn_subscription_t *pn_messenger_subscribe(pn_messenger_t *messenger, const char *source); 00495 00496 /** 00497 * Subscribes a messenger to messages from the specified source with the given 00498 * timeout for the subscription's lifetime. 00499 * 00500 * @param[in] messenger the messenger to subscribe 00501 * @param[in] source 00502 * @param[in] timeout the maximum time to keep the subscription alive once the 00503 * link is closed. 00504 * @return a subscription 00505 */ 00506 PN_EXTERN pn_subscription_t * 00507 pn_messenger_subscribe_ttl(pn_messenger_t *messenger, const char *source, 00508 pn_seconds_t timeout); 00509 00510 /** 00511 * Get a link based on link name and whether the link is a sender or receiver 00512 * 00513 * @param[in] messenger the messenger to get the link from 00514 * @param[in] address the link address that identifies the link to receive 00515 * @param[in] sender true if the link is a sender, false if the link is a 00516 * receiver 00517 * @return a link, or NULL if no link matches the address / sender parameters 00518 */ 00519 PN_EXTERN pn_link_t *pn_messenger_get_link(pn_messenger_t *messenger, 00520 const char *address, bool sender); 00521 00522 /** 00523 * Get a subscription's application context. 00524 * 00525 * See ::pn_subscription_set_context(). 00526 * 00527 * @param[in] sub a subscription object 00528 * @return the subscription's application context 00529 */ 00530 PN_EXTERN void *pn_subscription_get_context(pn_subscription_t *sub); 00531 00532 /** 00533 * Set an application context for a subscription. 00534 * 00535 * @param[in] sub a subscription object 00536 * @param[in] context the application context for the subscription 00537 */ 00538 PN_EXTERN void pn_subscription_set_context(pn_subscription_t *sub, void *context); 00539 00540 /** 00541 * Get the source address of a subscription. 00542 * 00543 * @param[in] sub a subscription object 00544 * @return the subscription's source address 00545 */ 00546 PN_EXTERN const char *pn_subscription_address(pn_subscription_t *sub); 00547 00548 /** 00549 * Puts a message onto the messenger's outgoing queue. The message may 00550 * also be sent if transmission would not cause blocking. This call 00551 * will not block. 00552 * 00553 * @param[in] messenger a messenger object 00554 * @param[in] msg a message to put on the messenger's outgoing queue 00555 * @return an error code or zero on success 00556 * @see error.h 00557 */ 00558 PN_EXTERN int pn_messenger_put(pn_messenger_t *messenger, pn_message_t *msg); 00559 00560 /** 00561 * Track the status of a delivery. 00562 * 00563 * Get the current status of the delivery associated with the supplied 00564 * tracker. This may return PN_STATUS_UNKOWN if the tracker has fallen 00565 * outside the incoming/outgoing tracking windows of the messenger. 00566 * 00567 * @param[in] messenger the messenger 00568 * @param[in] tracker the tracker identifying the delivery 00569 * @return a status code for the delivery 00570 */ 00571 PN_EXTERN pn_status_t pn_messenger_status(pn_messenger_t *messenger, pn_tracker_t tracker); 00572 00573 /** 00574 * Get delivery information about a delivery. 00575 * 00576 * Returns the delivery information associated with the supplied tracker. 00577 * This may return NULL if the tracker has fallen outside the 00578 * incoming/outgoing tracking windows of the messenger. 00579 * 00580 * @param[in] messenger the messenger 00581 * @param[in] tracker the tracker identifying the delivery 00582 * @return a pn_delivery_t representing the delivery. 00583 */ 00584 PN_EXTERN pn_delivery_t *pn_messenger_delivery(pn_messenger_t *messenger, 00585 pn_tracker_t tracker); 00586 00587 /** 00588 * Check if the delivery associated with a given tracker is still 00589 * waiting to be sent. 00590 * 00591 * Note that returning false does not imply that the delivery was 00592 * actually sent over the wire. 00593 * 00594 * @param[in] messenger the messenger 00595 * @param[in] tracker the tracker identifying the delivery 00596 * 00597 * @return true if the delivery is still buffered 00598 */ 00599 PN_EXTERN bool pn_messenger_buffered(pn_messenger_t *messenger, pn_tracker_t tracker); 00600 00601 /** 00602 * Frees a Messenger from tracking the status associated with a given 00603 * tracker. Use the PN_CUMULATIVE flag to indicate everything up to 00604 * (and including) the given tracker. 00605 * 00606 * @param[in] messenger the Messenger 00607 * @param[in] tracker identifies a delivery 00608 * @param[in] flags 0 or PN_CUMULATIVE 00609 * 00610 * @return an error code or zero on success 00611 * @see error.h 00612 */ 00613 PN_EXTERN int pn_messenger_settle(pn_messenger_t *messenger, pn_tracker_t tracker, int flags); 00614 00615 /** 00616 * Get a tracker for the outgoing message most recently given to 00617 * pn_messenger_put. 00618 * 00619 * This tracker may be used with pn_messenger_status to determine the 00620 * delivery status of the message, as long as the message is still 00621 * within your outgoing window. 00622 * 00623 * @param[in] messenger the messenger 00624 * 00625 * @return a pn_tracker_t or an undefined value if pn_messenger_get 00626 * has never been called for the given messenger 00627 */ 00628 PN_EXTERN pn_tracker_t pn_messenger_outgoing_tracker(pn_messenger_t *messenger); 00629 00630 /** 00631 * Sends or receives any outstanding messages queued for a messenger. 00632 * This will block for the indicated timeout. 00633 * 00634 * @param[in] messenger the Messenger 00635 * @param[in] timeout the maximum time to block in milliseconds, -1 == 00636 * forever, 0 == do not block 00637 * 00638 * @return 0 if no work to do, < 0 if error, or 1 if work was done. 00639 */ 00640 PN_EXTERN int pn_messenger_work(pn_messenger_t *messenger, int timeout); 00641 00642 /** 00643 * Interrupt a messenger object that may be blocking in another 00644 * thread. 00645 * 00646 * The messenger interface is single-threaded. This is the only 00647 * messenger function intended to be concurrently called from another 00648 * thread. It will interrupt any messenger function which is currently 00649 * blocking and cause it to return with a status of ::PN_INTR. 00650 * 00651 * @param[in] messenger the Messenger to interrupt 00652 */ 00653 PN_EXTERN int pn_messenger_interrupt(pn_messenger_t *messenger); 00654 00655 /** 00656 * Send messages from a messenger's outgoing queue. 00657 * 00658 * If a messenger is in blocking mode (see 00659 * ::pn_messenger_is_blocking()), this operation will block until N 00660 * messages have been sent from the outgoing queue. A value of -1 for 00661 * N means "all messages in the outgoing queue". See below for a full 00662 * definition of what sent from the outgoing queue means. 00663 * 00664 * Any blocking will end once the messenger's configured timeout (if 00665 * any) has been reached. When this happens an error code of 00666 * ::PN_TIMEOUT is returned. 00667 * 00668 * If the messenger is in non blocking mode, this call will return an 00669 * error code of ::PN_INPROGRESS if it is unable to send the requested 00670 * number of messages without blocking. 00671 * 00672 * A message is considered to be sent from the outgoing queue when its 00673 * status has been fully determined. This does not necessarily mean 00674 * the message was successfully sent to the final recipient though, 00675 * for example of the receiver rejects the message, the final status 00676 * will be ::PN_STATUS_REJECTED. Similarly, if a message is sent to an 00677 * invalid address, it may be removed from the outgoing queue without 00678 * ever even being transmitted. In this case the final status will be 00679 * ::PN_STATUS_ABORTED. 00680 * 00681 * @param[in] messenger a messenger object 00682 * @param[in] n the number of messages to send 00683 * 00684 * @return an error code or zero on success 00685 * @see error.h 00686 */ 00687 PN_EXTERN int pn_messenger_send(pn_messenger_t *messenger, int n); 00688 00689 /** 00690 * Retrieve messages into a messenger's incoming queue. 00691 * 00692 * Instructs a messenger to receive up to @c limit messages into the 00693 * incoming message queue of a messenger. If @c limit is -1, the 00694 * messenger will receive as many messages as it can buffer 00695 * internally. If the messenger is in blocking mode, this call will 00696 * block until at least one message is available in the incoming 00697 * queue. 00698 * 00699 * Each call to pn_messenger_recv replaces the previous receive 00700 * operation, so pn_messenger_recv(messenger, 0) will cancel any 00701 * outstanding receive. 00702 * 00703 * After receiving messages onto your incoming queue use 00704 * ::pn_messenger_get() to access message content. 00705 * 00706 * @param[in] messenger the messenger 00707 * @param[in] limit the maximum number of messages to receive or -1 to 00708 * to receive as many messages as it can buffer 00709 * internally. 00710 * @return an error code or zero on success 00711 * @see error.h 00712 */ 00713 PN_EXTERN int pn_messenger_recv(pn_messenger_t *messenger, int limit); 00714 00715 /** 00716 * Get the capacity of the incoming message queue of a messenger. 00717 * 00718 * Note this count does not include those messages already available 00719 * on the incoming queue (@see pn_messenger_incoming()). Rather it 00720 * returns the number of incoming queue entries available for 00721 * receiving messages. 00722 * 00723 * @param[in] messenger the messenger 00724 */ 00725 PN_EXTERN int pn_messenger_receiving(pn_messenger_t *messenger); 00726 00727 /** 00728 * Get the next message from the head of a messenger's incoming queue. 00729 * 00730 * The get operation copies the message data from the head of the 00731 * messenger's incoming queue into the provided ::pn_message_t object. 00732 * If provided ::pn_message_t pointer is NULL, the head essage will be 00733 * discarded. This operation will return ::PN_EOS if there are no 00734 * messages left on the incoming queue. 00735 * 00736 * @param[in] messenger a messenger object 00737 * @param[out] message upon return contains the message from the head of the queue 00738 * @return an error code or zero on success 00739 * @see error.h 00740 */ 00741 PN_EXTERN int pn_messenger_get(pn_messenger_t *messenger, pn_message_t *message); 00742 00743 /** 00744 * Get a tracker for the message most recently retrieved by 00745 * ::pn_messenger_get(). 00746 * 00747 * A tracker for an incoming message allows you to accept or reject 00748 * the associated message. It can also be used for cumulative 00749 * accept/reject operations for the associated message and all prior 00750 * messages as well. 00751 * 00752 * @param[in] messenger a messenger object 00753 * @return a pn_tracker_t or an undefined value if pn_messenger_get 00754 * has never been called for the given messenger 00755 */ 00756 PN_EXTERN pn_tracker_t pn_messenger_incoming_tracker(pn_messenger_t *messenger); 00757 00758 /** 00759 * Get the subscription of the message most recently retrieved by ::pn_messenger_get(). 00760 * 00761 * This operation will return NULL if ::pn_messenger_get() has never 00762 * been succesfully called. 00763 * 00764 * @param[in] messenger a messenger object 00765 * @return a pn_subscription_t or NULL 00766 */ 00767 PN_EXTERN pn_subscription_t *pn_messenger_incoming_subscription(pn_messenger_t *messenger); 00768 00769 /** 00770 * Indicates that an accept or reject should operate cumulatively. 00771 */ 00772 #define PN_CUMULATIVE (0x1) 00773 00774 /** 00775 * Signal successful processing of message(s). 00776 * 00777 * With no flags this operation will signal the sender that the 00778 * message referenced by the tracker was accepted. If the 00779 * PN_CUMULATIVE flag is set, this operation will also reject all 00780 * pending messages prior to the message indicated by the tracker. 00781 * 00782 * Note that when a message is accepted or rejected multiple times, 00783 * either explicitly, or implicitly through use of the ::PN_CUMULATIVE 00784 * flag, only the first outcome applies. For example if a sequence of 00785 * three messages are received: M1, M2, M3, and M2 is rejected, and M3 00786 * is cumulatively accepted, M2 will remain rejected and only M1 and 00787 * M3 will be considered accepted. 00788 * 00789 * @param[in] messenger a messenger object 00790 * @param[in] tracker an incoming tracker 00791 * @param[in] flags 0 or PN_CUMULATIVE 00792 * @return an error code or zero on success 00793 * @see error.h 00794 */ 00795 PN_EXTERN int pn_messenger_accept(pn_messenger_t *messenger, pn_tracker_t tracker, int flags); 00796 00797 /** 00798 * Signal unsuccessful processing of message(s). 00799 * 00800 * With no flags this operation will signal the sender that the 00801 * message indicated by the tracker was rejected. If the PN_CUMULATIVE 00802 * flag is used this operation will also reject all pending messages 00803 * prior to the message indicated by the tracker. 00804 * 00805 * Note that when a message is accepted or rejected multiple times, 00806 * either explicitly, or implicitly through use of the ::PN_CUMULATIVE 00807 * flag, only the first outcome applies. For example if a sequence of 00808 * three messages are received: M1, M2, M3, and M2 is accepted, and M3 00809 * is cumulatively rejected, M2 will remain accepted and only M1 and 00810 * M3 will be considered rejected. 00811 * 00812 * @param[in] messenger a messenger object 00813 * @param[in] tracker an incoming tracker 00814 * @param[in] flags 0 or PN_CUMULATIVE 00815 * @return an error code or zero on success 00816 * @see error.h 00817 */ 00818 PN_EXTERN int pn_messenger_reject(pn_messenger_t *messenger, pn_tracker_t tracker, int flags); 00819 00820 /** 00821 * Get link for the message referenced by the given tracker. 00822 * 00823 * @param[in] messenger a messenger object 00824 * @param[in] tracker a tracker object 00825 * @return a pn_link_t or NULL if the link could not be determined. 00826 */ 00827 PN_EXTERN pn_link_t *pn_messenger_tracker_link(pn_messenger_t *messenger, 00828 pn_tracker_t tracker); 00829 00830 /** 00831 * Get the number of messages in the outgoing message queue of a 00832 * messenger. 00833 * 00834 * @param[in] messenger a messenger object 00835 * @return the outgoing queue depth 00836 */ 00837 PN_EXTERN int pn_messenger_outgoing(pn_messenger_t *messenger); 00838 00839 /** 00840 * Get the number of messages in the incoming message queue of a messenger. 00841 * 00842 * @param[in] messenger a messenger object 00843 * @return the incoming queue depth 00844 */ 00845 PN_EXTERN int pn_messenger_incoming(pn_messenger_t *messenger); 00846 00847 //! Adds a routing rule to a Messenger's internal routing table. 00848 //! 00849 //! The route procedure may be used to influence how a messenger will 00850 //! internally treat a given address or class of addresses. Every call 00851 //! to the route procedure will result in messenger appending a routing 00852 //! rule to its internal routing table. 00853 //! 00854 //! Whenever a message is presented to a messenger for delivery, it 00855 //! will match the address of this message against the set of routing 00856 //! rules in order. The first rule to match will be triggered, and 00857 //! instead of routing based on the address presented in the message, 00858 //! the messenger will route based on the address supplied in the rule. 00859 //! 00860 //! The pattern matching syntax supports two types of matches, a '%' 00861 //! will match any character except a '/', and a '*' will match any 00862 //! character including a '/'. 00863 //! 00864 //! A routing address is specified as a normal AMQP address, however it 00865 //! may additionally use substitution variables from the pattern match 00866 //! that triggered the rule. 00867 //! 00868 //! Any message sent to "foo" will be routed to "amqp://foo.com": 00869 //! 00870 //! pn_messenger_route("foo", "amqp://foo.com"); 00871 //! 00872 //! Any message sent to "foobar" will be routed to 00873 //! "amqp://foo.com/bar": 00874 //! 00875 //! pn_messenger_route("foobar", "amqp://foo.com/bar"); 00876 //! 00877 //! Any message sent to bar/<path> will be routed to the corresponding 00878 //! path within the amqp://bar.com domain: 00879 //! 00880 //! pn_messenger_route("bar/*", "amqp://bar.com/$1"); 00881 //! 00882 //! Route all messages over TLS: 00883 //! 00884 //! pn_messenger_route("amqp:*", "amqps:$1") 00885 //! 00886 //! Supply credentials for foo.com: 00887 //! 00888 //! pn_messenger_route("amqp://foo.com/*", "amqp://user:password@foo.com/$1"); 00889 //! 00890 //! Supply credentials for all domains: 00891 //! 00892 //! pn_messenger_route("amqp://*", "amqp://user:password@$1"); 00893 //! 00894 //! Route all addresses through a single proxy while preserving the 00895 //! original destination: 00896 //! 00897 //! pn_messenger_route("amqp://%/*", "amqp://user:password@proxy/$1/$2"); 00898 //! 00899 //! Route any address through a single broker: 00900 //! 00901 //! pn_messenger_route("*", "amqp://user:password@broker/$1"); 00902 //! 00903 //! @param[in] messenger the Messenger 00904 //! @param[in] pattern a glob pattern 00905 //! @param[in] address an address indicating alternative routing 00906 //! 00907 //! @return an error code or zero on success 00908 //! @see error.h 00909 PN_EXTERN int pn_messenger_route(pn_messenger_t *messenger, const char *pattern, 00910 const char *address); 00911 00912 /** 00913 * Rewrite message addresses prior to transmission. 00914 * 00915 * This operation is similar to pn_messenger_route, except that the 00916 * destination of the message is determined before the message address 00917 * is rewritten. 00918 * 00919 * The outgoing address is only rewritten after routing has been 00920 * finalized. If a message has an outgoing address of 00921 * "amqp://0.0.0.0:5678", and a rewriting rule that changes its 00922 * outgoing address to "foo", it will still arrive at the peer that 00923 * is listening on "amqp://0.0.0.0:5678", but when it arrives there, 00924 * the receiver will see its outgoing address as "foo". 00925 * 00926 * The default rewrite rule removes username and password from 00927 * addresses before they are transmitted. 00928 * 00929 * @param[in] messenger a messenger object 00930 * @param[in] pattern a glob pattern to select messages 00931 * @param[in] address an address indicating outgoing address rewrite 00932 * @return an error code or zero on success 00933 */ 00934 PN_EXTERN int pn_messenger_rewrite(pn_messenger_t *messenger, const char *pattern, 00935 const char *address); 00936 00937 /** 00938 * Extract @link pn_selectable_t selectables @endlink from a passive 00939 * messenger. 00940 * 00941 * A messenger that is in passive mode (see 00942 * ::pn_messenger_is_passive()) will never attempt to perform any I/O 00943 * internally, but instead make its internal file descriptors 00944 * available for external processing via the 00945 * ::pn_messenger_selectable() operation. 00946 * 00947 * An application wishing to perform I/O on behalf of a passive 00948 * messenger must extract all available selectables by calling this 00949 * operation until it returns NULL. The ::pn_selectable_t interface 00950 * may then be used by the application to perform I/O outside the 00951 * messenger. 00952 * 00953 * All selectables returned by this operation must be serviced until 00954 * they reach a terminal state and then freed. See 00955 * ::pn_selectable_is_terminal() for more details. 00956 * 00957 * By default any given selectable will only ever be returned once by 00958 * this operation, however if the selectable's registered flag is set 00959 * to true (see ::pn_selectable_set_registered()), then the selectable 00960 * will be returned whenever its interest set may have changed. 00961 * 00962 * @param[in] messenger a messenger object 00963 * @return the next selectable, or NULL if there are none left 00964 */ 00965 PN_EXTERN pn_selectable_t *pn_messenger_selectable(pn_messenger_t *messenger); 00966 00967 /** 00968 * Get the nearest deadline for selectables associated with a messenger. 00969 * 00970 * @param[in] messenger a messenger object 00971 * @return the nearest deadline 00972 */ 00973 PN_EXTERN pn_timestamp_t pn_messenger_deadline(pn_messenger_t *messenger); 00974 00975 /** 00976 * @} 00977 */ 00978 00979 #define PN_FLAGS_CHECK_ROUTES \ 00980 (0x1) /** Messenger flag to indicate that a call \ 00981 to pn_messenger_start should check that \ 00982 any defined routes are valid */ 00983 00984 #define PN_FLAGS_ALLOW_INSECURE_MECHS \ 00985 (0x2) /** Messenger flag to indicate that the PLAIN \ 00986 mechanism is allowed on an unencrypted \ 00987 connection */ 00988 00989 /** Sets control flags to enable additional function for the Messenger. 00990 * 00991 * @param[in] messenger the messenger 00992 * @param[in] flags 0 or PN_FLAGS_CHECK_ROUTES 00993 * 00994 * @return an error code of zero if there is no error 00995 */ 00996 PN_EXTERN int pn_messenger_set_flags(pn_messenger_t *messenger, 00997 const int flags); 00998 00999 /** Gets the flags for a Messenger. 01000 * 01001 * @param[in] messenger the messenger 01002 * @return The flags set for the messenger 01003 */ 01004 PN_EXTERN int pn_messenger_get_flags(pn_messenger_t *messenger); 01005 01006 /** 01007 * Set the local sender settle mode for the underlying link. 01008 * 01009 * @param[in] messenger the messenger 01010 * @param[in] mode the sender settle mode 01011 */ 01012 PN_EXTERN int pn_messenger_set_snd_settle_mode(pn_messenger_t *messenger, 01013 const pn_snd_settle_mode_t mode); 01014 01015 /** 01016 * Set the local receiver settle mode for the underlying link. 01017 * 01018 * @param[in] messenger the messenger 01019 * @param[in] mode the receiver settle mode 01020 */ 01021 PN_EXTERN int pn_messenger_set_rcv_settle_mode(pn_messenger_t *messenger, 01022 const pn_rcv_settle_mode_t mode); 01023 01024 /** 01025 * Set the tracer associated with a messenger. 01026 * 01027 * @param[in] messenger a messenger object 01028 * @param[in] tracer the tracer callback 01029 */ 01030 PN_EXTERN void pn_messenger_set_tracer(pn_messenger_t *messenger, 01031 pn_tracer_t tracer); 01032 01033 /** 01034 * Gets the remote idle timeout for the specified remote service address 01035 * 01036 * @param[in] messenger a messenger object 01037 * @param[in] address of remote service whose idle timeout is required 01038 * @return the timeout in milliseconds or -1 if an error occurs 01039 */ 01040 PN_EXTERN pn_millis_t 01041 pn_messenger_get_remote_idle_timeout(pn_messenger_t *messenger, 01042 const char *address); 01043 01044 /** 01045 * Sets the SSL peer authentiacation mode required when a trust 01046 * certificate is used. 01047 * 01048 * @param[in] messenger a messenger object 01049 * @param[in] mode the mode required (see pn_ssl_verify_mode_t 01050 * enum for valid values) 01051 * @return 0 if successful or -1 if an error occurs 01052 */ 01053 PN_EXTERN int 01054 pn_messenger_set_ssl_peer_authentication_mode(pn_messenger_t *messenger, 01055 const pn_ssl_verify_mode_t mode); 01056 01057 #ifdef __cplusplus 01058 } 01059 #endif 01060 01061 #endif /* messenger.h */