XRootD
Loading...
Searching...
No Matches
XrdXrootdGSReal.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d X r o o t d G S R e a l . c c */
4/* */
5/* (c) 2019 by the Board of Trustees of the Leland Stanford, Jr., University */
6/* All Rights Reserved */
7/* Produced by Andrew Hanushevsky for Stanford University under contract */
8/* DE-AC02-76-SFO0515 with the Department of Energy */
9/* */
10/* This file is part of the XRootD software suite. */
11/* */
12/* XRootD is free software: you can redistribute it and/or modify it under */
13/* the terms of the GNU Lesser General Public License as published by the */
14/* Free Software Foundation, either version 3 of the License, or (at your */
15/* option) any later version. */
16/* */
17/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20/* License for more details. */
21/* */
22/* You should have received a copy of the GNU Lesser General Public License */
23/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25/* */
26/* The copyright holder's institutional names and contributor's names may not */
27/* be used to endorse or promote products derived from this software without */
28/* specific prior written permission of the institution or contributor. */
29/******************************************************************************/
30
31#include <cstdio>
32#include <cstdlib>
33#include <cstring>
34#include <sys/uio.h>
35
36#include "Xrd/XrdScheduler.hh"
37#include "XrdNet/XrdNetMsg.hh"
40
41/******************************************************************************/
42/* G l o b a l s */
43/******************************************************************************/
44
// External monitoring state shared with this file. The line that opens this
// scope is not visible in this listing; presumably it is the declaration of
// namespace XrdXrootdMonInfo, which is imported right after this block.
46{
// Scheduler used to queue the autoflush job, and the error object that is
// handed to the network relay created by the constructor.
47extern XrdScheduler *Sched;
48extern XrdSysError *eDest;
// Host name used when building the registration identity.
49extern char *monHost;
// Server identification as text (inserted into CGI/JSON headers) and as a
// number (inserted into the binary header after ntohll()).
50extern char *kySID;
51extern long long mySID;
// Stored as is in the binary header and passed through ntohl() before being
// formatted into text headers.
52extern int startTime;
53
// Header extension strings; slots 0..3 are selected by hdrSite, hdrHost,
// hdrInst and hdrFull respectively.
54extern char *SidCGI[4];
55extern char *SidJSON[4];
56}
57
58using namespace XrdXrootdMonInfo;
59
60/******************************************************************************/
61/* C o n s t r u c t o r */
62/******************************************************************************/
63
65 bool &aOK)
66 : XrdJob("GStream"), XrdXrootdGStream(*this),
67 Hello(gsParms.Opt & XrdXrootdGSReal::optNoID
68 || gsParms.Hdr == XrdXrootdGSReal::hdrNone
69 ? 0 : gsParms.dest, gsParms.Fmt),
70 pSeq(0), pSeqID(0), pSeqDID(0), binHdr(0),
71 isCGI(false)
72{
// Constructor body (the first line of the signature is not visible in this
// listing). It sizes and allocates the UDP buffer, builds the packet header
// for the requested format, optionally creates a dedicated network relay,
// starts autoflush and registers the stream. aOK is set to true up front and
// to false when the buffer cannot be allocated; its address is also handed to
// XrdNetMsg, which presumably clears it on failure (not visible here).
// Note that the Hello base receives a null destination when ident records are
// suppressed (optNoID) or when no header is wanted (hdrNone).
73 static const int minSZ = 1024;
74 static const int dflSZ = 1024*32;
75 static const int maxSZ = 1024*64;
76 int flsT, maxL, hdrLen;
77
78// Do common initialization
79//
80 memset(&hInfo, 0, sizeof(hInfo));
81 aOK = true;
82
83// Compute the correct size of the UDP buffer
84//
// A non-positive request selects the default; anything else is clamped to the
// range minSZ..maxSZ and then rounded down to a multiple of 8.
85 if (gsParms.maxL <= 0) maxL = dflSZ;
86 else if (gsParms.maxL < minSZ) maxL = minSZ;
87 else if (gsParms.maxL > maxSZ) maxL = maxSZ;
88 else maxL = gsParms.maxL;
89 maxL &= ~7; // Doubleword lengths
90
91// Allocate the UDP buffer. Try to keep the data within a single page.
92//
// Since maxL is at least 1024 at this point, the final sizeof(void*) fallback
// below cannot be selected.
93 int align;
94 if (maxL >= getpagesize()) align = getpagesize();
95 else if (maxL >= 2048) align = 2048;
96 else if (maxL >= 1024) align = 1024;
97 else align = sizeof(void*);
98
// NOTE(review): on allocation failure we return immediately, so everything
// assigned further down (buffer pointers, udpDest, afTime, rsvbytes, ...) is
// never set by this constructor. Confirm callers discard the object when aOK
// comes back false.
99 if (posix_memalign((void **)&udpBuffer, align, maxL)) {aOK = false; return;}
100
101// Setup the header as needed
102//
103 if (gsParms.Hdr == hdrNone)
104 {hdrLen = 0;
105 binHdr = 0;
106 dictHdr = idntHdr0 = idntHdr1 = 0;
107 } else {
108 switch(gsParms.Fmt)
109 {case fmtBin: hdrLen = hdrBIN(gsParms);
110 break;
111 case fmtCgi: hdrLen = hdrCGI(gsParms, udpBuffer, maxL);
112 break;
113 case fmtJson: hdrLen = hdrJSN(gsParms, udpBuffer, maxL);
114 break;
115 default: hdrLen = 0;
116 }
// NOTE(review): idntHdr0/idntHdr1 (and dictHdr) are assigned by hdrCGI() and
// hdrJSN() but not by hdrBIN(), the default case, or the initializer list
// above. Unless they are initialized in the class declaration (not visible
// here), the tests and free() calls below act on indeterminate pointers for
// those paths -- please confirm.
117 if (gsParms.Opt & optNoID)
118 {if (idntHdr0) {free(idntHdr0); idntHdr0 = 0;}
119 if (idntHdr1) {free(idntHdr1); idntHdr1 = 0;}
120 }
121 }
122
123// Setup buffer pointers
124//
// Data starts right after the header; udpBEnd addresses the last byte of the
// allocation.
125 udpBFirst = udpBNext = udpBuffer + hdrLen;
126 udpBEnd = udpBuffer + maxL - 1;
127
128 tBeg = tEnd = afTime = 0;
129
130// Initialize remaining variables
131//
132 monType = gsParms.Mode;
133 rsvbytes = 0;
134
135// If we have a specific end-point, then create a network relay to it
136//
// When no destination is given udpDest stays null and Expel() routes packets
// through XrdXrootdMonitor::Send() instead.
137 if (gsParms.dest) udpDest = new XrdNetMsg(eDest, gsParms.dest, &aOK);
138 else udpDest = 0;
139
140// Setup autoflush (a negative value uses the default)
141//
// afRunning must be cleared before SetAutoFlush() because AutoFlush() tests it.
142 if (gsParms.flsT < 0) flsT = XrdXrootdMonitor::Flushing();
143 else flsT = gsParms.flsT;
144 afRunning = false;
145 SetAutoFlush(flsT);
146
147// Construct our user name as in <gNamePI>.0:0@<myhost>
148//
149 char idBuff[1024];
150 snprintf(idBuff, sizeof(idBuff), "%s.0:0@%s", gsParms.pin, monHost);
151
152// Register ourselves
153//
154 gMon.Register(idBuff, monHost, "xroot");
155}
156
157/******************************************************************************/
158/* A u t o F l u s h */
159/******************************************************************************/
160
161void XrdXrootdGSReal::AutoFlush() // gMutex is locked outside constructor
162{
163 if (afTime && !afRunning)
164 {Sched->Schedule((XrdJob *)this, time(0)+afTime);
165 afRunning = true;
166 }
167}
168
169/******************************************************************************/
170/* D o I t */
171/******************************************************************************/
172
// Body of the scheduled autoflush job (the signature line is not visible in
// this listing; per the section banner this is DoIt). Runs with gMutex held
// for its whole duration.
174{
175 XrdSysMutexHelper gHelp(gMutex);
176
177// Check if we need to do anything here
178//
// The pending run has now fired, so clear the flag first; AutoFlush() below
// only reschedules when the flag is off. The buffer is sent only when it
// holds data (tBeg is non-zero) that is at least afTime seconds old.
179 afRunning = false;
180 if (afTime)
181 {if (tBeg && time(0)-tBeg >= afTime) Expel(0);
182 AutoFlush();
183 }
184}
185
186/******************************************************************************/
187/* Private: E x p e l */
188/******************************************************************************/
189
190void XrdXrootdGSReal::Expel(int dlen) // gMutex is held
191{
192
193// Check if we need to flush this buffer.
194//
195 if (udpBFirst == udpBNext || (dlen && (udpBNext + dlen) < udpBEnd)) return;
196 int size = udpBNext-udpBuffer;
197
198// Complete the buffer header if may be binary of text
199//
200 if (binHdr)
201 {binHdr->hdr.pseq++;
202 binHdr->hdr.plen = htons(static_cast<uint16_t>(size));
203 binHdr->tBeg = htonl(tBeg);
204 binHdr->tEnd = htonl(tEnd);
205 } else {
206 if (hInfo.pseq)
207 {char tBuff[32];
208 if (pSeq >= 999) pSeq = 0;
209 else pSeq++;
210 snprintf(tBuff, sizeof(tBuff), "%3d%10u%10u", pSeq,
211 (unsigned int)tBeg, (unsigned int)tEnd);
212 if (isCGI)
213 {char *plus, *bP = tBuff;
214 while((plus = index(bP, ' '))) {*plus = '+'; bP = plus+1;}
215 }
216 memcpy(hInfo.pseq, tBuff, 3);
217 memcpy(hInfo.tbeg, tBuff+ 3, 10);
218 memcpy(hInfo.tend, tBuff+13, 10);
219 }
220 }
221
222// Make sure the whole thing ends with a null byte
223//
224 *(udpBNext-1) = 0;
225
226// Send off the packet
227//
228 if (udpDest) udpDest->Send(udpBuffer, size);
229 else XrdXrootdMonitor::Send(monType, udpBuffer, size, false);
230
231// Reset the buffer
232//
233 udpBNext = udpBFirst;
234 tBeg = tEnd = 0;
235}
236
237/******************************************************************************/
238/* F l u s h */
239/******************************************************************************/
240
// Body of the explicit flush request (the signature line is not visible in
// this listing; per the section banner this is Flush). Takes gMutex and sends
// whatever is buffered; Expel(0) is a no-op when the buffer is empty.
242{
243 XrdSysMutexHelper gHelp(gMutex);
244 Expel(0);
245}
246
247/******************************************************************************/
248/* G e t D i c t I D */
249/******************************************************************************/
250
251uint32_t XrdXrootdGSReal::GetDictID(const char *text, bool isPath)
252{
253// If this is binary encoded, the record the mapping and return it
254//
255 if (binHdr) return (isPath ? gMon.MapPath(text) : gMon.MapInfo(text));
256
257// If there are no headers then we can't produce this record
258//
259 uint32_t psq, did = XrdXrootdMonitor::GetDictID(true);
260 if (!dictHdr) return htonl(did);
261
262// We need to do some additional work to generate non-binary headers here
263//
264 struct iovec iov[3];
265 char dit = (isPath ? XROOTD_MON_MAPPATH : XROOTD_MON_MAPINFO);
266 char buff[1024];
267
268// Generate a new packet sequence number
269//
270 gMutex.Lock();
271 if (pSeqDID >= 999) pSeqDID = 0;
272 else pSeqDID++;
273 psq = pSeqDID;
274 gMutex.UnLock();
275
276// Generate the packet
277//
278 iov[0].iov_base = buff;
279 iov[0].iov_len = snprintf(buff, sizeof(buff), dictHdr, dit, psq, did);
280 iov[1].iov_base = (void *)text;
281 iov[1].iov_len = strlen(text);
282 iov[2].iov_base = (void *)"\"}";
283 iov[2].iov_len = 3;
284
285// Now send it off
286//
287 udpDest->Send(iov, (*dictHdr == '{' ? 3 : 2));
288 return htonl(did);
289}
290
291/******************************************************************************/
292/* H a s H d r */
293/******************************************************************************/
294
// Body of the header query (the signature line is not visible in this
// listing; per the section banner this is HasHdr). Reports true when either a
// binary header or a text dictionary header template has been set up.
296{
297 return binHdr != 0 || dictHdr != 0;
298}
299
300/******************************************************************************/
301/* Private: h d r B I N */
302/******************************************************************************/
303
304int XrdXrootdGSReal::hdrBIN(const XrdXrootdGSReal::GSParms &gs)
305{
306
307// Initialze the udp heaader in the buffer
308//
309 binHdr = (XrdXrootdMonGS*)udpBuffer;
310 memset(binHdr, 0, sizeof(XrdXrootdMonGS));
311 binHdr->hdr.code = XROOTD_MON_MAPGSTA;
312 binHdr->hdr.stod = startTime;
313
314 long long theSID = ntohll(mySID) & 0x00ffffffffffffff;
315 theSID = theSID | (static_cast<long long>(gs.Type) << XROOTD_MON_PIDSHFT);
316 binHdr->sID = htonll(theSID);
317
318 return (int)sizeof(XrdXrootdMonGS);
319}
320
321/******************************************************************************/
322/* Private: h d r C G I */
323/******************************************************************************/
324
325int XrdXrootdGSReal::hdrCGI(const XrdXrootdGSReal::GSParms &gs,
326 char *buff, int blen)
327{
328 const char *hdr, *plug = "\n";
329 char hBuff[2048];
330 int n;
331
332// Pick any needed extensions to this header
333//
334 switch(gs.Hdr)
335 {case hdrSite: plug = SidCGI[0]; break;
336 case hdrHost: plug = SidCGI[1]; break;
337 case hdrInst: plug = SidCGI[2]; break;
338 case hdrFull: plug = SidCGI[3]; break;
339 default: break;
340 }
341
342// Generate the header to use for 'd' or 'i' packets
343//
344 hdr = "code=%%c&pseq=%%u&stod=%u&sid=%s%s&gs.type=%c&did=%%u&data=";
345
346 snprintf(hBuff, sizeof(hBuff), hdr, ntohl(startTime), kySID, plug, gs.Type);
347 dictHdr = strdup(hBuff);
348
349// Generate the headers to use for '=' packets. These have a changeable part
350// and a non-changeable part.
351//
352 hdr = "code=%c&pseq=%%u";
353
354 snprintf(hBuff, sizeof(hBuff), hdr, XROOTD_MON_MAPIDNT);
355 idntHdr0 = strdup(hBuff);
356
357 hdr = "&stod=%u&sid=%s%s";
358
359 n = snprintf(hBuff, sizeof(hBuff), hdr, ntohl(startTime), kySID, SidCGI[3]);
360 idntHdr1 = strdup(hBuff);
361 idntHsz1 = n+1;
362
363// Format the header
364//
365 hdr = "code=%c&pseq=$12&stod=%u&sid=%s%s&gs.type=%c"
366 "&gs.tbeg=$123456789&gs.tend=$123456789%s\n";
367
368 n = snprintf(buff, blen, hdr, XROOTD_MON_MAPGSTA, ntohl(startTime),
369 kySID, plug, gs.Type);
370
371// Return all of the substitution addresses
372//
373 hInfo.pseq = index(buff, '$');
374 hInfo.tbeg = index(hInfo.pseq+1, '$');
375 hInfo.tend = index(hInfo.tbeg+1, '$');
376
377// Return the length
378//
379 isCGI = true;
380 return n;
381}
382
383/******************************************************************************/
384/* Private: h d r J S N */
385/******************************************************************************/
386
387int XrdXrootdGSReal::hdrJSN(const XrdXrootdGSReal::GSParms &gs,
388 char *buff, int blen)
389{
390 const char *hdr, *plug1 = "", *plug2 = "";
391 char hBuff[2048];
392 int n;
393
394// Add any needed extensions to this header
395//
396 if (gs.Hdr != hdrNorm)
397 {plug1 = ",";
398 switch(gs.Hdr)
399 {case hdrSite: plug2 = SidJSON[0]; break;
400 case hdrHost: plug2 = SidJSON[1]; break;
401 case hdrInst: plug2 = SidJSON[2]; break;
402 case hdrFull: plug2 = SidJSON[3]; break;
403 default: plug1 = ""; break;
404 }
405 }
406
407// Generate the header to use for 'd' or 'i' packets
408//
409 hdr = "{\"code\":\"%%c\",\"pseq\":%%u,\"stod\":%u,\"sid\":%s%s%s,"
410 "\"gs\":{\"type\":\"%c\"},\"did\":%%u,\"data\":\"";
411
412 snprintf(hBuff, sizeof(hBuff), hdr, ntohl(startTime), kySID,
413 plug1, plug2, gs.Type);
414 dictHdr = strdup(hBuff);
415
416// Generate the headers to use for '=' packets. These have a changeable part
417// and a non-changeable part.
418//
419 hdr = "{\"code\":\"%c\",\"pseq\":%%u,";
420
421 snprintf(hBuff, sizeof(hBuff), hdr, XROOTD_MON_MAPIDNT);
422 idntHdr0 = strdup(hBuff);
423
424 hdr = "\"stod\":%u,\"sid\":%s,%s}";
425
426 n = snprintf(hBuff, sizeof(hBuff), hdr, ntohl(startTime), kySID, SidJSON[3]);
427 idntHdr1 = strdup(hBuff);
428 idntHsz1 = n+1;
429
430// Generate the header of plugin output
431//
432 hdr = "{\"code\":\"%c\",\"pseq\":$12,\"stod\":%u,\"sid\":%s%s%s,"
433 "\"gs\":{\"type\":\"%c\",\"tbeg\":$123456789,\"tend\":$123456789}}\n";
434
435// Format the header (we are gauranteed to have at least 1024 bytes here)
436//
437 n = snprintf(buff, blen, hdr, XROOTD_MON_MAPGSTA, ntohl(startTime),
438 kySID, plug1, plug2, gs.Type);
439
440// Return all of the substitution addresses
441//
442 hInfo.pseq = index(buff, '$');
443 hInfo.tbeg = index(hInfo.pseq+1, '$');
444 hInfo.tend = index(hInfo.tbeg+1, '$');
445
446// Return the length
447//
448 return n;
449}
450
451/******************************************************************************/
452/* I d e n t */
453/******************************************************************************/
454
// Body of the identification sender (the signature line is not visible in
// this listing; per the section banner this is Ident). It sends one packet
// made of the per-packet header part followed by the fixed part.
456{
457 struct iovec iov[2];
458 char buff[40];
459 uint32_t psq;
460
461// If identification suppressed, then just return
462//
// This also covers streams without a dedicated end-point (udpDest is null).
463 if (!idntHdr0 || !udpDest) return;
464
465// Generate a new packet sequence number
466//
// The number cycles through 0..999 and is updated under gMutex.
467 gMutex.Lock();
468 if (pSeqID >= 999) pSeqID = 0;
469 else pSeqID++;
470 psq = pSeqID;
471 gMutex.UnLock();
472
473// Create header and iovec to send the header
474//
// idntHdr0 is a format template with one conversion for the sequence number;
// idntHsz1 was recorded as the length of idntHdr1 plus its null byte.
475 iov[0].iov_base = buff;
476 iov[0].iov_len = snprintf(buff, sizeof(buff), idntHdr0, psq);
477 iov[1].iov_base = idntHdr1;
478 iov[1].iov_len = idntHsz1;
479 udpDest->Send(iov, 2);
480}
481
482/******************************************************************************/
483/* I n s e r t */
484/******************************************************************************/
485
486bool XrdXrootdGSReal::Insert(const char *data, int dlen)
487{
488
489// Validate the length and message
490//
491 if (dlen < 8 || dlen > XrdXrootdGStream::MaxDataLen
492 || !data || data[dlen-1]) return false;
493
494// Reserve the storage and copy the message. It always will end with a newline
495//
496 gMutex.Lock();
497 Expel(dlen);
498 memcpy(udpBNext, data, dlen-1);
499 udpBNext[dlen-1] = '\n';
500
501// Timestamp the record and aAdjust buffer pointers
502//
503 tEnd = time(0);
504 if (udpBNext == udpBFirst) tBeg = tEnd;
505 udpBNext += dlen;
506
507// All done
508//
509 gMutex.UnLock();
510 return true;
511}
512
513/******************************************************************************/
514
// Body of the second-phase insert that completes a reservation (the
// signature line is not visible in this listing). The data is expected to be
// in place already at udpBNext, the address that Reserve() handed out. A dlen
// of zero cancels the reservation. Returns true on success or cancellation
// and false when there is no reservation or the length/data check fails.
516{
517 XrdSysMutexHelper gHelp(gMutex);
518
519// Make sure space is reserved
520//
521 if (!rsvbytes) return false;
522
523// We are now sure that the recursive lock is held twice by this thread. So,
524// make it a unitary lock so it gets fully unlocked upon return.
525//
// This relies on Reserve() returning with gMutex still locked; the unlock
// here undoes that one and the helper releases the other on return.
526 gMutex.UnLock();
527
528// Check for cancellation
529//
530 if (!dlen)
531 {rsvbytes = 0;
532 return true;
533 }
534
535// Length, it must be >= 8 and <= reserved amount and the data must end with a 0.
536//
// A failed check also drops the reservation.
537 if (dlen > rsvbytes || dlen < 8 || *(udpBNext+dlen-1))
538 {rsvbytes = 0;
539 return false;
540 }
541
542// Adjust the buffer space and time stamp the record
543//
// The first record in a buffer also sets the start time, and the record's
// final null byte is replaced by a newline.
544 tEnd = time(0);
545 if (udpBNext == udpBFirst) tBeg = tEnd;
546 udpBNext += dlen;
547 *(udpBNext-1) = '\n';
548 rsvbytes = 0;
549
550// All done
551
552 return true;
553}
554
555/******************************************************************************/
556/* R e s e r v e */
557/******************************************************************************/
558
// Body of the space reservation (the signature line is not visible in this
// listing; per the section banner this is Reserve). Returns the address where
// dlen bytes may be written, or null when the length is out of range or a
// reservation is already outstanding.
560{
561// Validate the length
562//
563 if (dlen < 8 || dlen > XrdXrootdGStream::MaxDataLen) return 0;
564
565// Make sure there is no reserve outstanding
566//
567 gMutex.Lock();
568 if (rsvbytes)
569 {gMutex.UnLock();
570 return 0;
571 }
572
573// Return the allocated space but keep the lock until Insert() is called.
574//
// Expel() sends the buffer first when dlen more bytes would not fit, so the
// address returned is the current insertion point afterwards. Note that on
// this path gMutex is intentionally left locked.
575 rsvbytes = dlen;
576 Expel(dlen);
577 return udpBNext;
578}
579
580/******************************************************************************/
581/* S e t A u t o F l u s h */
582/******************************************************************************/
583
// Body of the autoflush setter (the signature line is not visible in this
// listing; per the section banner this is SetAutoFlush). Installs a new
// autoflush interval in seconds and returns the previous one.
585{
586 XrdSysMutexHelper gHelp(gMutex);
587
588// Save the current setting and establish the new one and relaunch
589//
// Values of zero or less turn autoflush off (stored as 0). AutoFlush() only
// schedules a run when the interval is non-zero and none is pending.
590 int afNow = afTime;
591 afTime = (afsec > 0 ? afsec : 0);
592 AutoFlush();
593
594// All done
595//
596 return afNow;
597}
598
599/******************************************************************************/
600/* S p a c e */
601/******************************************************************************/
602
// Body of the free space query (the signature line is not visible in this
// listing; per the section banner this is Space). Taken under gMutex.
604{
605 XrdSysMutexHelper gHelp(gMutex);
606
607// Return amount of space left
608//
// This is the distance from the current insertion point to udpBEnd, which
// the constructor sets to the last byte of the buffer.
609 return udpBEnd - udpBNext;
610}
static XrdSysError eDest(0,"crypto_")
const long long XROOTD_MON_PIDSHFT
const kXR_char XROOTD_MON_MAPGSTA
const kXR_char XROOTD_MON_MAPINFO
const kXR_char XROOTD_MON_MAPIDNT
const kXR_char XROOTD_MON_MAPPATH
XrdXrootdMonHeader hdr
XrdJob(const char *desc="")
Definition XrdJob.hh:51
uint32_t GetDictID(const char *text, bool isPath=false)
char * Reserve(int dlen)
static const int hdrNone
Format as JSON info.
kXR_char Type
the specific G-Stream identifier
const char * dest
Destination for records.
static const int hdrInst
Include site, host, port, inst.
int SetAutoFlush(int afsec)
static const int fmtBin
Do not include info.
XrdXrootdGSReal(const GSParms &gsParms, bool &aOK)
static const int fmtJson
Format as CGI info.
int maxL
Maximum packet length (default 32K)
static const int hdrSite
Include site.
int flsT
Flush time (default from monitor)
const char * pin
the plugin name.
static const int hdrHost
Include site, host.
char Fmt
How to handle the records.
static const int optNoID
Don't send ident records.
bool Insert(const char *data, int dlen)
int Mode
the monitor type for send routing.
static const int hdrNorm
Include standard header.
static const int fmtCgi
Format as binary info.
static const int hdrFull
Include site, host, port, inst, pgm.
XrdXrootdGStream(XrdXrootdGSReal &gsRef)
static const int MaxDataLen
The larest amount of data that can be inserted in a single call to GStream.
Hello(const char *dest, char mode)
static int Send(int mmode, void *buff, int size, bool setseq=true)
static kXR_unt32 GetDictID(bool hbo=false)
XrdScheduler * Sched
XrdSysError * eDest