/*----------------------------------------------------------------------------*
 *
 *  Copyright (c) 2004        Southeastern Universities Research Association,
 *                            Thomas Jefferson National Accelerator Facility
 *
 *    This software was developed under a United States Government license
 *    described in the NOTICE file included as part of this distribution.
 *
 *    E.Wolin, 14-Jul-2004, Jefferson Lab
 *
 *    Authors: Elliott Wolin
 *             wolin@jlab.org                    Jefferson Lab, MS-6B
 *             Phone: (757) 269-7365             12000 Jefferson Ave.
 *             Fax:   (757) 269-5800             Newport News, VA 23606
 *
 *             Carl Timmer
 *             timmer@jlab.org                   Jefferson Lab, MS-6B
 *             Phone: (757) 269-5130             12000 Jefferson Ave.
 *             Fax:   (757) 269-5800             Newport News, VA 23606
 *
 *  Description:
 *
 *    Defines cMsg API and return codes
 *
 *----------------------------------------------------------------------------*/

/**
 * @file
 * This is the one necessary header file for all cMsg C users. C++ users must
 * include cMsg.hxx instead, which includes this file.
 *
 *
 * INTRODUCTION
 *
 * cMsg is a simple, abstract API to an arbitrary underlying message service. It
 * is powerful enough to support synchronous and asynchronous point-to-point and
 * publish/subscribe communication, and network-accessible message queues. Note
 * that a given underlying implementation may not necessarily implement all these
 * features.
 *
 *
 * DOMAINS
 *
 * The abstraction relies on the important concept of a "domain", specified via a
 * "Universal Domain Locator" (UDL) of the form:
 *
 *     cMsg:domainType://domainInfo
 *
 * The domain type refers to an underlying messaging software implementation,
 * and the domain info is interpreted by the implementation. Generally domains with
 * different UDL's are isolated from each other, but this is not necessarily the
 * case. For example, users can easily create gateways between different domains,
 * or different domain servers may serve into the same messaging namespace.
 *
 * The full domain specifier for the full cMsg domain looks like:
 *
 *     cMsg:cMsg://node:port/cMsg/namespace?param1=val1(&param2=val2)
 *
 * where node:port correspond to the node and port of a cMsg nameserver, and
 * namespace allows for multiple namespaces on the same server. If the port is missing
 * a default port is used. Parameters are optional and not specified at this time.
 * Currently different cMsg domains are completely isolated from each other. A
 * process can connect to multiple domains if desired.
 *
 *
 * MESSAGES
 *
 * Messages are sent via cMsgSend() and related functions. Messages have a type and are
 * sent to a subject, and both are arbitrary strings. The payload consists of
 * a single text string. Users must call cMsgFlush() to initiate delivery of messages
 * in the outbound send queues, although the implementation may deliver messages
 * before cMsgFlush() is called. Additional message meta-data may be set by the user
 * (see below), although much of it is set by the system.
 *
 * Message consumers ask the system to deliver messages to them that match various
 * subject/type combinations (each may be NULL). The messages are delivered
 * asynchronously to callbacks (via cMsgSubscribe()). cMsgFreeMessage() must be
 * called when the user is done processing the message. Synchronous or RPC-like
 * messaging is also possible via cMsgSendAndGet().
 *
 * cMsgReceiveStart() must be called to start delivery of messages to callbacks.
 *
 * In the cMsg domain perl-like subject wildcard characters are supported, multiple
 * callbacks for the same subject/type are allowed, and each callback executes in
 * its own thread.
 *
 *
 * ADDITIONAL INFORMATION
 *
 * See the cMsg User's Guide and the cMsg Developer's Guide for more information.
 * See the cMsg Doxygen and Java docs for the full API specification.
 *
 */

#ifndef _cMsg_h
#define _cMsg_h

/* required includes */

/*
 * NOTE(review): the operands of the two system #include directives were lost
 * from this copy of the file. They are reconstructed here from the types the
 * prototypes below rely on; confirm against the canonical header.
 */

/* struct timespec (timeouts, message time stamps) and size_t */
#include <time.h>

#ifndef _cMsgConstants_h
#include "cMsgConstants.h"
#endif

/* fixed-width integer types (int8_t ... uint64_t) used by the payload API */
#ifndef VXWORKS
#include <inttypes.h>
#endif


/**
 * Subscribe configuration.
 * Opaque to users: obtained from cMsgSubscribeConfigCreate(), modified with the
 * cMsgSubscribeSet...() routines and handed to cMsgSubscribe().
 */
typedef void *cMsgSubscribeConfig;

/**
 * Shutdown handler function.
 * Registered with cMsgSetShutdownHandler() together with the @c userArg
 * that is passed back to it.
 */
typedef void (cMsgShutdownHandler) (void *userArg);

/**
 * Callback function.
 * Registered with cMsgSubscribe(); receives the delivered message and the
 * @c userArg given at subscription time. cMsgFreeMessage() must be called
 * when the user is done processing the message.
 */
typedef void (cMsgCallbackFunc) (void *msg, void *userArg);


/* function prototypes */
#ifdef __cplusplus
extern "C" {
#endif

/*
 * Platform shims.
 * NOTE(review): the Darwin block unconditionally defines CLOCK_REALTIME and
 * declares clock_gettime() with an int first argument; this presumably targets
 * systems whose <time.h> lacks them -- confirm it does not clash with the
 * platform header on current toolchains.
 */
#ifdef Darwin
#define CLOCK_REALTIME 0
int clock_gettime(int dummy, struct timespec *t1);
#endif

#ifdef VXWORKS
char *strdup(const char *s1);
int   strcasecmp(const char *s1, const char *s2);
int   strncasecmp(const char *s1, const char *s2, size_t n);
#endif


/*
 * NOTE(review): routines returning int presumably return one of the status
 * codes from cMsgConstants.h, translatable to text with cMsgPerror() --
 * confirm against the implementation.
 */

/* basic functions */
int   cMsgConnect(const char *myUDL, const char *myName,
                  const char *myDescription, void **domainId);
int   cMsgReconnect(void *domainId);
int   cMsgSend(void *domainId, void *msg);
int   cMsgSyncSend(void *domainId, void *msg,
                   const struct timespec *timeout, int *response);
int   cMsgFlush(void *domainId, const struct timespec *timeout);
int   cMsgSubscribe(void *domainId, const char *subject, const char *type,
                    cMsgCallbackFunc *callback, void *userArg,
                    cMsgSubscribeConfig *config, void **handle);
int   cMsgUnSubscribe(void *domainId, void *handle);
int   cMsgSubscriptionPause(void *domainId, void *handle);
int   cMsgSubscriptionResume(void *domainId, void *handle);
int   cMsgSubscriptionQueueClear(void *domainId, void *handle);
int   cMsgSubscriptionQueueCount(void *domainId, void *handle, int *count);
int   cMsgSubscriptionQueueIsFull(void *domainId, void *handle, int *full);
int   cMsgSubscriptionMessagesTotal(void *domainId, void *handle, int *total);
int   cMsgSendAndGet(void *domainId, void *sendMsg,
                     const struct timespec *timeout, void **replyMsg);
int   cMsgSubscribeAndGet(void *domainId, const char *subject, const char *type,
                          const struct timespec *timeout, void **replyMsg);
int   cMsgMonitor(void *domainId, const char *command, void **replyMsg);
int   cMsgReceiveStart(void *domainId);
int   cMsgReceiveStop(void *domainId);
int   cMsgDisconnect(void **domainId);
int   cMsgSetShutdownHandler(void *domainId, cMsgShutdownHandler *handler,
                             void *userArg);
int   cMsgShutdownClients(void *domainId, const char *client, int flag);
int   cMsgShutdownServers(void *domainId, const char *server, int flag);
int   cMsgGetConnectState(void *domainId, int *connectState);
int   cMsgSetUDL(void *domainId, const char *udl);
int   cMsgGetCurrentUDL(void *domainId, const char **udl);
char *cMsgPerror(int errorCode);


/* message access functions */
int   cMsgFreeMessage(void **vmsg);
void *cMsgCreateMessage(void);
void *cMsgCreateNewMessage(const void *vmsg);
void *cMsgCopyMessage(const void *vmsg);
int   cMsgInitMessage(void *vmsg);
void *cMsgCreateResponseMessage(const void *vmsg);
void *cMsgCreateNullResponseMessage(const void *vmsg);

int   cMsgWasSent(const void *vmsg, int *hasBeenSent);
int   cMsgGetVersion(const void *vmsg, int *version);
int   cMsgGetGetRequest(const void *vmsg, int *getRequest);

int   cMsgSetGetResponse(void *vmsg, int getReponse);
int   cMsgGetGetResponse(const void *vmsg, int *getReponse);
int   cMsgSetNullGetResponse(void *vmsg, int nullGetResponse);
int   cMsgGetNullGetResponse(const void *vmsg, int *nullGetResponse);

int   cMsgGetDomain(const void *vmsg, const char **domain);
int   cMsgGetPayloadText(const void *vmsg, const char **payloadText);

int   cMsgSetSubject(void *vmsg, const char *subject);
int   cMsgGetSubject(const void *vmsg, const char **subject);
int   cMsgSetType(void *vmsg, const char *type);
int   cMsgGetType(const void *vmsg, const char **type);
int   cMsgSetText(void *vmsg, const char *text);
int   cMsgGetText(const void *vmsg, const char **text);
int   cMsgSetUserInt(void *vmsg, int userInt);
int   cMsgGetUserInt(const void *vmsg, int *userInt);
int   cMsgSetUserTime(void *vmsg, const struct timespec *userTime);
int   cMsgGetUserTime(const void *vmsg, struct timespec *userTime);

int   cMsgGetSender(const void *vmsg, const char **sender);
int   cMsgGetSenderHost(const void *vmsg, const char **senderHost);
int   cMsgGetSenderTime(const void *vmsg, struct timespec *senderTime);
int   cMsgGetReceiver(const void *vmsg, const char **receiver);
int   cMsgGetReceiverHost(const void *vmsg, const char **receiverHost);
int   cMsgGetReceiverTime(const void *vmsg, struct timespec *receiverTime);

int   cMsgSetByteArrayLength(void *vmsg, int length);
int   cMsgResetByteArrayLength(void *vmsg);
int   cMsgGetByteArrayLength(const void *vmsg, int *length);
int   cMsgGetByteArrayLengthFull(const void *vmsg, int *length);
int   cMsgSetByteArrayOffset(void *vmsg, int offset);
int   cMsgGetByteArrayOffset(const void *vmsg, int *offset);
int   cMsgSetByteArrayEndian(void *vmsg, int endian);
int   cMsgGetByteArrayEndian(const void *vmsg, int *endian);
int   cMsgNeedToSwap(const void *vmsg, int *swap);
int   cMsgSetByteArray(void *vmsg, char *array, int length);
int   cMsgSetByteArrayNoCopy(void *vmsg, char *array, int length);
int   cMsgGetByteArray(const void *vmsg, char **array);

int   cMsgSetReliableSend(void *vmsg, int boolean);
/* NOTE(review): unlike the other getters this one takes a non-const message;
 * left as declared so it keeps matching its definition. */
int   cMsgGetReliableSend(void *vmsg, int *boolean);


/* message context stuff */
int   cMsgGetSubscriptionDomain(const void *vmsg, const char **domain);
int   cMsgGetSubscriptionSubject(const void *vmsg, const char **subject);
int   cMsgGetSubscriptionType(const void *vmsg, const char **type);
int   cMsgGetSubscriptionUDL(const void *vmsg, const char **udl);
int   cMsgGetSubscriptionCueSize(const void *vmsg, int *size);


/* misc. */
int   cMsgToString(const void *vmsg, char **string);
int   cMsgToString2(const void *vmsg, char **string,
                    int binary, int compact, int noSystemFields);
int   cMsgPayloadToString(const void *vmsg, char **string,
                          int binary, int compact, int noSystemFields);


/* ***************************************** */
/* compound payload stuff - 66 user routines */
/* ***************************************** */
int   cMsgAddHistoryToPayloadText(void *vmsg, char *name, char *host,
                                  int64_t time, char **pTxt);
int   cMsgSetHistoryLengthMax(void *vmsg, int len);
int   cMsgGetHistoryLengthMax(const void *vmsg, int *len);

int   cMsgPayloadGet(const void *vmsg, char **names, int *types, int len);
int   cMsgPayloadGetInfo(const void *vmsg, char ***names, int **types, int *len);
int   cMsgPayloadGetCount(const void *vmsg, int *count);
int   cMsgPayloadContainsName(const void *vmsg, const char *name);
int   cMsgPayloadGetType(const void *vmsg, const char *name, int *type);
int   cMsgPayloadRemove(void *vmsg, const char *name);
int   cMsgPayloadCopy(const void *vmsgFrom, void *vmsgTo);
int   cMsgPayloadUpdateText(const void *vmsg);
int   cMsgPayloadGetFieldText(const void *vmsg, const char *name, const char **val);
void  cMsgPayloadPrint(const void *vmsg);
const char *cMsgPayloadFieldDescription(const void *vmsg, const char *name);

/* users should NOT have access to these 3 routines */
int   cMsgPayloadSetFromText(void *vmsg, const char *text);
int   cMsgPayloadSetSystemFieldsFromText(void *vmsg, const char *text);
int   cMsgPayloadSetAllFieldsFromText(void *vmsg, const char *text);

void  cMsgPayloadReset(void *vmsg);
void  cMsgPayloadClear(void *vmsg);
int   cMsgHasPayload(const void *vmsg, int *hasPayload);

/* payload field getters, looked up by field name */
int   cMsgGetBinary(const void *vmsg, const char *name, const char **val,
                    int *len, int *endian);
int   cMsgGetBinaryArray(const void *vmsg, const char *name, const char ***vals,
                         int **sizes, int **endians, int *count);

int   cMsgGetMessage(const void *vmsg, const char *name, const void **val);
int   cMsgGetMessageArray(const void *vmsg, const char *name, const void ***val, int *len);

int   cMsgGetString(const void *vmsg, const char *name, const char **val);
int   cMsgGetStringArray(const void *vmsg, const char *name, const char ***array, int *len);

int   cMsgGetFloat(const void *vmsg, const char *name, float *val);
int   cMsgGetFloatArray(const void *vmsg, const char *name, const float **vals, int *len);
int   cMsgGetDouble(const void *vmsg, const char *name, double *val);
int   cMsgGetDoubleArray(const void *vmsg, const char *name, const double **vals, int *len);

int   cMsgGetInt8(const void *vmsg, const char *name, int8_t *val);
int   cMsgGetInt16(const void *vmsg, const char *name, int16_t *val);
int   cMsgGetInt32(const void *vmsg, const char *name, int32_t *val);
int   cMsgGetInt64(const void *vmsg, const char *name, int64_t *val);
int   cMsgGetUint8(const void *vmsg, const char *name, uint8_t *val);
int   cMsgGetUint16(const void *vmsg, const char *name, uint16_t *val);
int   cMsgGetUint32(const void *vmsg, const char *name, uint32_t *val);
int   cMsgGetUint64(const void *vmsg, const char *name, uint64_t *val);

int   cMsgGetInt8Array(const void *vmsg, const char *name, const int8_t **vals, int *len);
int   cMsgGetInt16Array(const void *vmsg, const char *name, const int16_t **vals, int *len);
int   cMsgGetInt32Array(const void *vmsg, const char *name, const int32_t **vals, int *len);
int   cMsgGetInt64Array(const void *vmsg, const char *name, const int64_t **vals, int *len);
int   cMsgGetUint8Array(const void *vmsg, const char *name, const uint8_t **vals, int *len);
int   cMsgGetUint16Array(const void *vmsg, const char *name, const uint16_t **vals, int *len);
int   cMsgGetUint32Array(const void *vmsg, const char *name, const uint32_t **vals, int *len);
int   cMsgGetUint64Array(const void *vmsg, const char *name, const uint64_t **vals, int *len);

/* payload field adders, stored under the given field name */
int   cMsgAddInt8(void *vmsg, const char *name, int8_t val);
int   cMsgAddInt16(void *vmsg, const char *name, int16_t val);
int   cMsgAddInt32(void *vmsg, const char *name, int32_t val);
int   cMsgAddInt64(void *vmsg, const char *name, int64_t val);
int   cMsgAddUint8(void *vmsg, const char *name, uint8_t val);
int   cMsgAddUint16(void *vmsg, const char *name, uint16_t val);
int   cMsgAddUint32(void *vmsg, const char *name, uint32_t val);
int   cMsgAddUint64(void *vmsg, const char *name, uint64_t val);

int   cMsgAddInt8Array(void *vmsg, const char *name, const int8_t vals[], int len);
int   cMsgAddInt16Array(void *vmsg, const char *name, const int16_t vals[], int len);
int   cMsgAddInt32Array(void *vmsg, const char *name, const int32_t vals[], int len);
int   cMsgAddInt64Array(void *vmsg, const char *name, const int64_t vals[], int len);
int   cMsgAddUint8Array(void *vmsg, const char *name, const uint8_t vals[], int len);
int   cMsgAddUint16Array(void *vmsg, const char *name, const uint16_t vals[], int len);
int   cMsgAddUint32Array(void *vmsg, const char *name, const uint32_t vals[], int len);
int   cMsgAddUint64Array(void *vmsg, const char *name, const uint64_t vals[], int len);

int   cMsgAddString(void *vmsg, const char *name, const char *val);
int   cMsgAddStringArray(void *vmsg, const char *name, const char **vals, int len);

int   cMsgAddFloat(void *vmsg, const char *name, float val);
int   cMsgAddDouble(void *vmsg, const char *name, double val);
int   cMsgAddFloatArray(void *vmsg, const char *name, const float vals[], int len);
int   cMsgAddDoubleArray(void *vmsg, const char *name, const double vals[], int len);

int   cMsgAddBinary(void *vmsg, const char *name, const char *src,
                    int size, int endian);
int   cMsgAddBinaryArray(void *vmsg, const char *name, const char *src[],
                         int number, const int size[], const int endian[]);

int   cMsgAddMessage(void *vmsg, const char *name, const void *vmessage);
int   cMsgAddMessageArray(void *vmsg, const char *name, const void *vmessage[], int len);

char *cMsgFloatChars(float f);
char *cMsgDoubleChars(double d);
char *cMsgIntChars(uint32_t i);


/* system and domain info access functions */
int   cMsgGetUDL(void *domainId, char **udl);
int   cMsgGetName(void *domainId, char **name);
int   cMsgGetDescription(void *domainId, char **description);
int   cMsgGetReceiveState(void *domainId, int *receiveState);


/* subscribe configuration functions */
cMsgSubscribeConfig *cMsgSubscribeConfigCreate(void);
int   cMsgSubscribeConfigDestroy(cMsgSubscribeConfig *config);

int   cMsgSubscribeSetMaxCueSize(cMsgSubscribeConfig *config, int size);
int   cMsgSubscribeGetMaxCueSize(cMsgSubscribeConfig *config, int *size);
int   cMsgSubscribeSetSkipSize(cMsgSubscribeConfig *config, int size);
int   cMsgSubscribeGetSkipSize(cMsgSubscribeConfig *config, int *size);
int   cMsgSubscribeSetMaySkip(cMsgSubscribeConfig *config, int maySkip);
int   cMsgSubscribeGetMaySkip(cMsgSubscribeConfig *config, int *maySkip);
int   cMsgSubscribeSetMustSerialize(cMsgSubscribeConfig *config, int serialize);
int   cMsgSubscribeGetMustSerialize(cMsgSubscribeConfig *config, int *serialize);
int   cMsgSubscribeSetMaxThreads(cMsgSubscribeConfig *config, int threads);
int   cMsgSubscribeGetMaxThreads(cMsgSubscribeConfig *config, int *threads);
int   cMsgSubscribeSetMessagesPerThread(cMsgSubscribeConfig *config, int mpt);
int   cMsgSubscribeGetMessagesPerThread(cMsgSubscribeConfig *config, int *mpt);
int   cMsgSubscribeSetStackSize(cMsgSubscribeConfig *config, size_t size);
int   cMsgSubscribeGetStackSize(cMsgSubscribeConfig *config, size_t *size);


/* for debugging */
int   cMsgSetDebugLevel(int level);


#ifdef __cplusplus
}
#endif

#endif /* _cMsg_h */