VirtualBox

source: vbox/trunk/src/VBox/ValidationKit/testdriver/txsclient.py@ 62159

Last change on this file since 62159 was 62159, checked in by vboxsync, 9 years ago

ValidationKit/TestExecService: Add STDINEOS opcode to mark the end of the stdin stream properly

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 79.1 KB
Line 
1# -*- coding: utf-8 -*-
2# $Id: txsclient.py 62159 2016-07-11 12:35:00Z vboxsync $
3# pylint: disable=C0302
4
5"""
6Test eXecution Service Client.
7"""
8__copyright__ = \
9"""
10Copyright (C) 2010-2015 Oracle Corporation
11
12This file is part of VirtualBox Open Source Edition (OSE), as
13available from http://www.virtualbox.org. This file is free software;
14you can redistribute it and/or modify it under the terms of the GNU
15General Public License (GPL) as published by the Free Software
16Foundation, in version 2 as it comes in the "COPYING" file of the
17VirtualBox OSE distribution. VirtualBox OSE is distributed in the
18hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
19
20The contents of this file may alternatively be used under the terms
21of the Common Development and Distribution License Version 1.0
22(CDDL) only, as it comes in the "COPYING.CDDL" file of the
23VirtualBox OSE distribution, in which case the provisions of the
24CDDL are applicable instead of those of the GPL.
25
26You may elect to license modified versions of this file under the
27terms and conditions of either the GPL or the CDDL or both.
28"""
29__version__ = "$Revision: 62159 $"
30
31# Standard Python imports.
32import array
33import errno
34import os
35import select
36import socket
37import threading
38import time
39import types
40import zlib
41import uuid
42
43# Validation Kit imports.
44from common import utils;
45from testdriver import base;
46from testdriver import reporter;
47from testdriver.base import TdTaskBase;
48
49#
50# Helpers for decoding data received from the TXS.
51# These are used both the Session and Transport classes.
52#
53
54def getU32(abData, off):
55 """Get a U32 field."""
56 return abData[off] \
57 + abData[off + 1] * 256 \
58 + abData[off + 2] * 65536 \
59 + abData[off + 3] * 16777216;
60
61def getSZ(abData, off, sDefault = None):
62 """
63 Get a zero-terminated string field.
64 Returns sDefault if the string is invalid.
65 """
66 cchStr = getSZLen(abData, off);
67 if cchStr >= 0:
68 abStr = abData[off:(off + cchStr)];
69 try:
70 return abStr.tostring().decode('utf_8');
71 except:
72 reporter.errorXcpt('getSZ(,%u)' % (off));
73 return sDefault;
74
75def getSZLen(abData, off):
76 """
77 Get the length of a zero-terminated string field, in bytes.
78 Returns -1 if off is beyond the data packet or not properly terminated.
79 """
80 cbData = len(abData);
81 if off >= cbData:
82 return -1;
83
84 offCur = off;
85 while abData[offCur] != 0:
86 offCur = offCur + 1;
87 if offCur >= cbData:
88 return -1;
89
90 return offCur - off;
91
92def isValidOpcodeEncoding(sOpcode):
93 """
94 Checks if the specified opcode is valid or not.
95 Returns True on success.
96 Returns False if it is invalid, details in the log.
97 """
98 sSet1 = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
99 sSet2 = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_ ";
100 if len(sOpcode) != 8:
101 reporter.error("invalid opcode length: %s" % (len(sOpcode)));
102 return False;
103 for i in range(0, 1):
104 if sSet1.find(sOpcode[i]) < 0:
105 reporter.error("invalid opcode char #%u: %s" % (i, sOpcode));
106 return False;
107 for i in range(2, 7):
108 if sSet2.find(sOpcode[i]) < 0:
109 reporter.error("invalid opcode char #%u: %s" % (i, sOpcode));
110 return False;
111 return True;
112
113#
114# Helper for encoding data sent to the TXS.
115#
116
117def u32ToByteArray(u32):
118 """Encodes the u32 value as a little endian byte (B) array."""
119 return array.array('B', \
120 ( u32 % 256, \
121 (u32 / 256) % 256, \
122 (u32 / 65536) % 256, \
123 (u32 / 16777216) % 256) );
124
125
126
127class TransportBase(object):
128 """
129 Base class for the transport layer.
130 """
131
132 def __init__(self, sCaller):
133 self.sDbgCreated = '%s: %s' % (utils.getTimePrefix(), sCaller);
134 self.fDummy = 0;
135 self.abReadAheadHdr = array.array('B');
136
137 def toString(self):
138 """
139 Stringify the instance for logging and debugging.
140 """
141 return '<%s: abReadAheadHdr=%s, sDbgCreated=%s>' % (type(self).__name__, self.abReadAheadHdr, self.sDbgCreated);
142
143 def __str__(self):
144 return self.toString();
145
146 def cancelConnect(self):
147 """
148 Cancels any pending connect() call.
149 Returns None;
150 """
151 return None;
152
153 def connect(self, cMsTimeout):
154 """
155 Quietly attempts to connect to the TXS.
156
157 Returns True on success.
158 Returns False on retryable errors (no logging).
159 Returns None on fatal errors with details in the log.
160
161 Override this method, don't call super.
162 """
163 _ = cMsTimeout;
164 return False;
165
166 def disconnect(self, fQuiet = False):
167 """
168 Disconnect from the TXS.
169
170 Returns True.
171
172 Override this method, don't call super.
173 """
174 _ = fQuiet;
175 return True;
176
177 def sendBytes(self, abBuf, cMsTimeout):
178 """
179 Sends the bytes in the buffer abBuf to the TXS.
180
181 Returns True on success.
182 Returns False on failure and error details in the log.
183
184 Override this method, don't call super.
185
186 Remarks: len(abBuf) is always a multiple of 16.
187 """
188 _ = abBuf; _ = cMsTimeout;
189 return False;
190
191 def recvBytes(self, cb, cMsTimeout, fNoDataOk):
192 """
193 Receive cb number of bytes from the TXS.
194
195 Returns the bytes (array('B')) on success.
196 Returns None on failure and error details in the log.
197
198 Override this method, don't call super.
199
200 Remarks: cb is always a multiple of 16.
201 """
202 _ = cb; _ = cMsTimeout; _ = fNoDataOk;
203 return None;
204
205 def isConnectionOk(self):
206 """
207 Checks if the connection is OK.
208
209 Returns True if it is.
210 Returns False if it isn't (caller should call diconnect).
211
212 Override this method, don't call super.
213 """
214 return True;
215
216 def isRecvPending(self, cMsTimeout = 0):
217 """
218 Checks if there is incoming bytes, optionally waiting cMsTimeout
219 milliseconds for something to arrive.
220
221 Returns True if there is, False if there isn't.
222
223 Override this method, don't call super.
224 """
225 _ = cMsTimeout;
226 return False;
227
228 def sendMsgInt(self, sOpcode, cMsTimeout, abPayload = array.array('B')):
229 """
230 Sends a message (opcode + encoded payload).
231
232 Returns True on success.
233 Returns False on failure and error details in the log.
234 """
235 # Fix + check the opcode.
236 if len(sOpcode) < 2:
237 reporter.fatal('sendMsgInt: invalid opcode length: %d (\"%s\")' % (len(sOpcode), sOpcode));
238 return False;
239 sOpcode = sOpcode.ljust(8);
240 if not isValidOpcodeEncoding(sOpcode):
241 reporter.fatal('sendMsgInt: invalid opcode encoding: \"%s\"' % (sOpcode));
242 return False;
243
244 # Start construct the message.
245 cbMsg = 16 + len(abPayload);
246 abMsg = array.array('B');
247 abMsg.extend(u32ToByteArray(cbMsg));
248 abMsg.extend((0, 0, 0, 0)); # uCrc32
249 try:
250 abMsg.extend(array.array('B', \
251 ( ord(sOpcode[0]), \
252 ord(sOpcode[1]), \
253 ord(sOpcode[2]), \
254 ord(sOpcode[3]), \
255 ord(sOpcode[4]), \
256 ord(sOpcode[5]), \
257 ord(sOpcode[6]), \
258 ord(sOpcode[7]) ) ) );
259 if len(abPayload) > 0:
260 abMsg.extend(abPayload);
261 except:
262 reporter.fatalXcpt('sendMsgInt: packing problem...');
263 return False;
264
265 # checksum it, padd it and send it off.
266 uCrc32 = zlib.crc32(abMsg[8:]);
267 abMsg[4:8] = u32ToByteArray(uCrc32);
268
269 while len(abMsg) % 16:
270 abMsg.append(0);
271
272 reporter.log2('sendMsgInt: op=%s len=%d to=%d' % (sOpcode, len(abMsg), cMsTimeout));
273 return self.sendBytes(abMsg, cMsTimeout);
274
275 def recvMsg(self, cMsTimeout, fNoDataOk = False):
276 """
277 Receives a message from the TXS.
278
279 Returns the message three-tuple: length, opcode, payload.
280 Returns (None, None, None) on failure and error details in the log.
281 """
282
283 # Read the header.
284 if len(self.abReadAheadHdr) > 0:
285 assert(len(self.abReadAheadHdr) == 16);
286 abHdr = self.abReadAheadHdr;
287 self.abReadAheadHdr = array.array('B');
288 else:
289 abHdr = self.recvBytes(16, cMsTimeout, fNoDataOk);
290 if abHdr is None:
291 return (None, None, None);
292 if len(abHdr) != 16:
293 reporter.fatal('recvBytes(16) returns %d bytes!' % (len(abHdr)));
294 return (None, None, None);
295
296 # Unpack and validate the header.
297 cbMsg = getU32(abHdr, 0);
298 uCrc32 = getU32(abHdr, 4);
299 sOpcode = abHdr[8:16].tostring().decode('ascii');
300
301 if cbMsg < 16:
302 reporter.fatal('recvMsg: message length is out of range: %s (min 16 bytes)' % (cbMsg));
303 return (None, None, None);
304 if cbMsg > 1024*1024:
305 reporter.fatal('recvMsg: message length is out of range: %s (max 1MB)' % (cbMsg));
306 return (None, None, None);
307 if not isValidOpcodeEncoding(sOpcode):
308 reporter.fatal('recvMsg: invalid opcode \"%s\"' % (sOpcode));
309 return (None, None, None);
310
311 # Get the payload (if any), dropping the padding.
312 abPayload = array.array('B');
313 if cbMsg > 16:
314 if cbMsg % 16:
315 cbPadding = 16 - (cbMsg % 16);
316 else:
317 cbPadding = 0;
318 abPayload = self.recvBytes(cbMsg - 16 + cbPadding, cMsTimeout, False);
319 if abPayload is None:
320 self.abReadAheadHdr = abHdr;
321 if not fNoDataOk :
322 reporter.log('recvMsg: failed to recv payload bytes!');
323 return (None, None, None);
324
325 while cbPadding > 0:
326 abPayload.pop();
327 cbPadding = cbPadding - 1;
328
329 # Check the CRC-32.
330 if uCrc32 != 0:
331 uActualCrc32 = zlib.crc32(abHdr[8:]);
332 if cbMsg > 16:
333 uActualCrc32 = zlib.crc32(abPayload, uActualCrc32);
334 uActualCrc32 = uActualCrc32 & 0xffffffff;
335 if uCrc32 != uActualCrc32:
336 reporter.fatal('recvMsg: crc error: expected %s, got %s' % (hex(uCrc32), hex(uActualCrc32)));
337 return (None, None, None);
338
339 reporter.log2('recvMsg: op=%s len=%d' % (sOpcode, len(abPayload)));
340 return (cbMsg, sOpcode, abPayload);
341
342 def sendMsg(self, sOpcode, cMsTimeout, aoPayload = ()):
343 """
344 Sends a message (opcode + payload tuple).
345
346 Returns True on success.
347 Returns False on failure and error details in the log.
348 Returns None if you pass the incorrectly typed parameters.
349 """
350 # Encode the payload.
351 abPayload = array.array('B');
352 for o in aoPayload:
353 try:
354 if isinstance(o, basestring):
355 # the primitive approach...
356 sUtf8 = o.encode('utf_8');
357 for i in range(0, len(sUtf8)):
358 abPayload.append(ord(sUtf8[i]))
359 abPayload.append(0);
360 elif isinstance(o, types.LongType):
361 if o < 0 or o > 0xffffffff:
362 reporter.fatal('sendMsg: uint32_t payload is out of range: %s' % (hex(o)));
363 return None;
364 abPayload.extend(u32ToByteArray(o));
365 elif isinstance(o, array.array):
366 abPayload.extend(o);
367 else:
368 reporter.fatal('sendMsg: unexpected payload type: %s (%s) (aoPayload=%s)' % (type(o), o, aoPayload));
369 return None;
370 except:
371 reporter.fatalXcpt('sendMsg: screwed up the encoding code...');
372 return None;
373 return self.sendMsgInt(sOpcode, cMsTimeout, abPayload);
374
375
376class Session(TdTaskBase):
377 """
378 A Test eXecution Service (TXS) client session.
379 """
380
381 def __init__(self, oTransport, cMsTimeout, cMsIdleFudge, fTryConnect = False):
382 """
383 Construct a TXS session.
384
385 This starts by connecting to the TXS and will enter the signalled state
386 when connected or the timeout has been reached.
387 """
388 TdTaskBase.__init__(self, utils.getCallerName());
389 self.oTransport = oTransport;
390 self.sStatus = "";
391 self.cMsTimeout = 0;
392 self.fErr = True; # Whether to report errors as error.
393 self.msStart = 0;
394 self.oThread = None;
395 self.fnTask = self.taskDummy;
396 self.aTaskArgs = None;
397 self.oTaskRc = None;
398 self.t3oReply = (None, None, None);
399 self.fScrewedUpMsgState = False;
400 self.fTryConnect = fTryConnect;
401
402 if not self.startTask(cMsTimeout, False, "connecting", self.taskConnect, (cMsIdleFudge,)):
403 raise base.GenError("startTask failed");
404
405 def __del__(self):
406 """Make sure to cancel the task when deleted."""
407 self.cancelTask();
408
409 def toString(self):
410 return '<%s fnTask=%s, aTaskArgs=%s, sStatus=%s, oTaskRc=%s, cMsTimeout=%s,' \
411 ' msStart=%s, fTryConnect=%s, fErr=%s, fScrewedUpMsgState=%s, t3oReply=%s oTransport=%s, oThread=%s>' \
412 % (TdTaskBase.toString(self), self.fnTask, self.aTaskArgs, self.sStatus, self.oTaskRc, self.cMsTimeout,
413 self.msStart, self.fTryConnect, self.fErr, self.fScrewedUpMsgState, self.t3oReply, self.oTransport, self.oThread);
414
415 def taskDummy(self):
416 """Place holder to catch broken state handling."""
417 raise Exception();
418
419 def startTask(self, cMsTimeout, fIgnoreErrors, sStatus, fnTask, aArgs = ()):
420 """
421 Kicks of a new task.
422
423 cMsTimeout: The task timeout in milliseconds. Values less than
424 500 ms will be adjusted to 500 ms. This means it is
425 OK to use negative value.
426 sStatus: The task status.
427 fnTask: The method that'll execute the task.
428 aArgs: Arguments to pass to fnTask.
429
430 Returns True on success, False + error in log on failure.
431 """
432 if not self.cancelTask():
433 reporter.maybeErr(not fIgnoreErrors, 'txsclient.Session.startTask: failed to cancel previous task.');
434 return False;
435
436 # Change status and make sure we're the
437 self.lockTask();
438 if self.sStatus != "":
439 self.unlockTask();
440 reporter.maybeErr(not fIgnoreErrors, 'txsclient.Session.startTask: race.');
441 return False;
442 self.sStatus = "setup";
443 self.oTaskRc = None;
444 self.t3oReply = (None, None, None);
445 self.resetTaskLocked();
446 self.unlockTask();
447
448 self.cMsTimeout = max(cMsTimeout, 500);
449 self.fErr = not fIgnoreErrors;
450 self.fnTask = fnTask;
451 self.aTaskArgs = aArgs;
452 self.oThread = threading.Thread(target=self.taskThread, args=(), name=('TXS-%s' % (sStatus)));
453 self.oThread.setDaemon(True);
454 self.msStart = base.timestampMilli();
455
456 self.lockTask();
457 self.sStatus = sStatus;
458 self.unlockTask();
459 self.oThread.start();
460
461 return True;
462
463 def cancelTask(self, fSync = True):
464 """
465 Attempts to cancel any pending tasks.
466 Returns success indicator (True/False).
467 """
468 self.lockTask();
469
470 if self.sStatus == "":
471 self.unlockTask();
472 return True;
473 if self.sStatus == "setup":
474 self.unlockTask();
475 return False;
476 if self.sStatus == "cancelled":
477 self.unlockTask();
478 return False;
479
480 reporter.log('txsclient: cancelling "%s"...' % (self.sStatus));
481 if self.sStatus == 'connecting':
482 self.oTransport.cancelConnect();
483
484 self.sStatus = "cancelled";
485 oThread = self.oThread;
486 self.unlockTask();
487
488 if not fSync:
489 return False;
490
491 oThread.join(61.0);
492 return oThread.isAlive();
493
494 def taskThread(self):
495 """
496 The task thread function.
497 This does some housekeeping activities around the real task method call.
498 """
499 if not self.isCancelled():
500 try:
501 fnTask = self.fnTask;
502 oTaskRc = fnTask(*self.aTaskArgs);
503 except:
504 reporter.fatalXcpt('taskThread', 15);
505 oTaskRc = None;
506 else:
507 reporter.log('taskThread: cancelled already');
508
509 self.lockTask();
510
511 reporter.log('taskThread: signalling task with status "%s", oTaskRc=%s' % (self.sStatus, oTaskRc));
512 self.oTaskRc = oTaskRc;
513 self.oThread = None;
514 self.sStatus = '';
515 self.signalTaskLocked();
516
517 self.unlockTask();
518 return None;
519
520 def isCancelled(self):
521 """Internal method for checking if the task has been cancelled."""
522 self.lockTask();
523 sStatus = self.sStatus;
524 self.unlockTask();
525 if sStatus == "cancelled":
526 return True;
527 return False;
528
529 def hasTimedOut(self):
530 """Internal method for checking if the task has timed out or not."""
531 cMsLeft = self.getMsLeft();
532 if cMsLeft <= 0:
533 return True;
534 return False;
535
536 def getMsLeft(self, cMsMin = 0, cMsMax = -1):
537 """Gets the time left until the timeout."""
538 cMsElapsed = base.timestampMilli() - self.msStart;
539 if cMsElapsed < 0:
540 return cMsMin;
541 cMsLeft = self.cMsTimeout - cMsElapsed;
542 if cMsLeft <= cMsMin:
543 return cMsMin;
544 if cMsLeft > cMsMax and cMsMax > 0:
545 return cMsMax
546 return cMsLeft;
547
548 def recvReply(self, cMsTimeout = None, fNoDataOk = False):
549 """
550 Wrapper for TransportBase.recvMsg that stashes the response away
551 so the client can inspect it later on.
552 """
553 if cMsTimeout is None:
554 cMsTimeout = self.getMsLeft(500);
555 cbMsg, sOpcode, abPayload = self.oTransport.recvMsg(cMsTimeout, fNoDataOk);
556 self.lockTask();
557 self.t3oReply = (cbMsg, sOpcode, abPayload);
558 self.unlockTask();
559 return (cbMsg, sOpcode, abPayload);
560
561 def recvAck(self, fNoDataOk = False):
562 """
563 Receives an ACK or error response from the TXS.
564
565 Returns True on success.
566 Returns False on timeout or transport error.
567 Returns (sOpcode, sDetails) tuple on failure. The opcode is stripped
568 and there are always details of some sort or another.
569 """
570 cbMsg, sOpcode, abPayload = self.recvReply(None, fNoDataOk);
571 if cbMsg is None:
572 return False;
573 sOpcode = sOpcode.strip()
574 if sOpcode == "ACK":
575 return True;
576 return (sOpcode, getSZ(abPayload, 0, sOpcode));
577
578 def recvAckLogged(self, sCommand, fNoDataOk = False):
579 """
580 Wrapper for recvAck and logging.
581 Returns True on success (ACK).
582 Returns False on time, transport error and errors signalled by TXS.
583 """
584 rc = self.recvAck(fNoDataOk);
585 if rc is not True and not fNoDataOk:
586 if rc is False:
587 reporter.maybeErr(self.fErr, 'recvAckLogged: %s transport error' % (sCommand));
588 else:
589 reporter.maybeErr(self.fErr, 'recvAckLogged: %s response was %s: %s' % (sCommand, rc[0], rc[1]));
590 rc = False;
591 return rc;
592
593 def recvTrueFalse(self, sCommand):
594 """
595 Receives a TRUE/FALSE response from the TXS.
596 Returns True on TRUE, False on FALSE and None on error/other (logged).
597 """
598 cbMsg, sOpcode, abPayload = self.recvReply();
599 if cbMsg is None:
600 reporter.maybeErr(self.fErr, 'recvAckLogged: %s transport error' % (sCommand));
601 return None;
602
603 sOpcode = sOpcode.strip()
604 if sOpcode == "TRUE":
605 return True;
606 if sOpcode == "FALSE":
607 return False;
608 reporter.maybeErr(self.fErr, 'recvAckLogged: %s response was %s: %s' % (sCommand, sOpcode, getSZ(abPayload, 0, sOpcode)));
609 return None;
610
611 def sendMsg(self, sOpcode, aoPayload = (), cMsTimeout = None):
612 """
613 Wrapper for TransportBase.sendMsg that inserts the correct timeout.
614 """
615 if cMsTimeout is None:
616 cMsTimeout = self.getMsLeft(500);
617 return self.oTransport.sendMsg(sOpcode, cMsTimeout, aoPayload);
618
619 def asyncToSync(self, fnAsync, *aArgs):
620 """
621 Wraps an asynchronous task into a synchronous operation.
622
623 Returns False on failure, task return status on success.
624 """
625 rc = fnAsync(*aArgs);
626 if rc is False:
627 reporter.log2('asyncToSync(%s): returns False (#1)' % (fnAsync));
628 return rc;
629
630 rc = self.waitForTask(self.cMsTimeout + 5000);
631 if rc is False:
632 reporter.maybeErrXcpt(self.fErr, 'asyncToSync: waitForTask failed...');
633 self.cancelTask();
634 #reporter.log2('asyncToSync(%s): returns False (#2)' % (fnAsync, rc));
635 return False;
636
637 rc = self.getResult();
638 #reporter.log2('asyncToSync(%s): returns %s' % (fnAsync, rc));
639 return rc;
640
641 #
642 # Connection tasks.
643 #
644
645 def taskConnect(self, cMsIdleFudge):
646 """Tries to connect to the TXS"""
647 while not self.isCancelled():
648 reporter.log2('taskConnect: connecting ...');
649 rc = self.oTransport.connect(self.getMsLeft(500));
650 if rc is True:
651 reporter.log('taskConnect: succeeded');
652 return self.taskGreet(cMsIdleFudge);
653 if rc is None:
654 reporter.log2('taskConnect: unable to connect');
655 return None;
656 if self.hasTimedOut():
657 reporter.log2('taskConnect: timed out');
658 if not self.fTryConnect:
659 reporter.maybeErr(self.fErr, 'taskConnect: timed out');
660 return False;
661 time.sleep(self.getMsLeft(1, 1000) / 1000.0);
662 if not self.fTryConnect:
663 reporter.maybeErr(self.fErr, 'taskConnect: cancelled');
664 return False;
665
666 def taskGreet(self, cMsIdleFudge):
667 """Greets the TXS"""
668 rc = self.sendMsg("HOWDY", ());
669 if rc is True:
670 rc = self.recvAckLogged("HOWDY", self.fTryConnect);
671 if rc is True:
672 while cMsIdleFudge > 0:
673 cMsIdleFudge -= 1000;
674 time.sleep(1);
675 else:
676 self.oTransport.disconnect(self.fTryConnect);
677 return rc;
678
679 def taskBye(self):
680 """Says goodbye to the TXS"""
681 rc = self.sendMsg("BYE");
682 if rc is True:
683 rc = self.recvAckLogged("BYE");
684 self.oTransport.disconnect();
685 return rc;
686
687 def taskUuid(self):
688 """Gets the TXS UUID"""
689 rc = self.sendMsg("UUID");
690 if rc is True:
691 rc = False;
692 cbMsg, sOpcode, abPayload = self.recvReply();
693 if cbMsg is not None:
694 sOpcode = sOpcode.strip()
695 if sOpcode == "ACK UUID":
696 sUuid = getSZ(abPayload, 0);
697 if sUuid is not None:
698 sUuid = '{%s}' % (sUuid,)
699 try:
700 _ = uuid.UUID(sUuid);
701 rc = sUuid;
702 except:
703 reporter.errorXcpt('taskUuid got an invalid UUID string %s' % (sUuid,));
704 else:
705 reporter.maybeErr(self.fErr, 'taskUuid did not get a UUID string.');
706 else:
707 reporter.maybeErr(self.fErr, 'taskUuid got a bad reply: %s' % (sOpcode,));
708 else:
709 reporter.maybeErr(self.fErr, 'taskUuid got 3xNone from recvReply.');
710 return rc;
711
712 #
713 # Process task
714 # pylint: disable=C0111
715 #
716
717 def taskExecEx(self, sExecName, fFlags, asArgs, asAddEnv, oStdIn, oStdOut, oStdErr, oTestPipe, sAsUser): # pylint: disable=R0913,R0914,R0915,C0301
718 # Construct the payload.
719 aoPayload = [long(fFlags), '%s' % (sExecName), long(len(asArgs))];
720 for sArg in asArgs:
721 aoPayload.append('%s' % (sArg));
722 aoPayload.append(long(len(asAddEnv)));
723 for sPutEnv in asAddEnv:
724 aoPayload.append('%s' % (sPutEnv));
725 for o in (oStdIn, oStdOut, oStdErr, oTestPipe):
726 if isinstance(o, basestring):
727 aoPayload.append(o);
728 elif o is not None:
729 aoPayload.append('|');
730 o.uTxsClientCrc32 = zlib.crc32('');
731 else:
732 aoPayload.append('');
733 aoPayload.append('%s' % (sAsUser));
734 aoPayload.append(long(self.cMsTimeout));
735
736 # Kick of the EXEC command.
737 rc = self.sendMsg('EXEC', aoPayload)
738 if rc is True:
739 rc = self.recvAckLogged('EXEC');
740 if rc is True:
741 # Loop till the process completes, feed input to the TXS and
742 # receive output from it.
743 sFailure = "";
744 msPendingInputReply = None;
745 cbMsg, sOpcode, abPayload = (None, None, None);
746 while True:
747 # Pending input?
748 if msPendingInputReply is None \
749 and oStdIn is not None \
750 and not isinstance(oStdIn, basestring):
751 try:
752 abInput = oStdIn.read(65536);
753 except:
754 reporter.errorXcpt('read standard in');
755 sFailure = 'exception reading stdin';
756 rc = None;
757 break;
758 if len(abInput) > 0:
759 oStdIn.uTxsClientCrc32 = zlib.crc32(abInput, oStdIn.uTxsClientCrc32);
760 rc = self.sendMsg('STDIN', (long(oStdIn.uTxsClientCrc32 & 0xffffffff), abInput));
761 if rc is not True:
762 sFailure = 'sendMsg failure';
763 break;
764 msPendingInputReply = base.timestampMilli();
765 continue;
766
767 rc = self.sendMsg('STDINEOS');
768 oStdIn = None;
769 if rc is not True:
770 sFailure = 'sendMsg failure';
771 break;
772 msPendingInputReply = base.timestampMilli();
773
774 # Wait for input (500 ms timeout).
775 if cbMsg is None:
776 cbMsg, sOpcode, abPayload = self.recvReply(cMsTimeout=500, fNoDataOk=True);
777 if cbMsg is None:
778 # Check for time out before restarting the loop.
779 # Note! Only doing timeout checking here does mean that
780 # the TXS may prevent us from timing out by
781 # flooding us with data. This is unlikely though.
782 if self.hasTimedOut() \
783 and ( msPendingInputReply is None \
784 or base.timestampMilli() - msPendingInputReply > 30000):
785 reporter.maybeErr(self.fErr, 'taskExecEx: timed out');
786 sFailure = 'timeout';
787 rc = None;
788 break;
789 # Check that the connection is OK.
790 if not self.oTransport.isConnectionOk():
791 self.oTransport.disconnect();
792 sFailure = 'disconnected';
793 rc = False;
794 break;
795 continue;
796
797 # Handle the response.
798 sOpcode = sOpcode.rstrip();
799 if sOpcode == 'STDOUT':
800 oOut = oStdOut;
801 elif sOpcode == 'STDERR':
802 oOut = oStdErr;
803 elif sOpcode == 'TESTPIPE':
804 oOut = oTestPipe;
805 else:
806 oOut = None;
807 if oOut is not None:
808 # Output from the process.
809 if len(abPayload) < 4:
810 sFailure = 'malformed output packet (%s, %u bytes)' % (sOpcode, cbMsg);
811 reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
812 rc = None;
813 break;
814 uStreamCrc32 = getU32(abPayload, 0);
815 oOut.uTxsClientCrc32 = zlib.crc32(abPayload[4:], oOut.uTxsClientCrc32);
816 if uStreamCrc32 != (oOut.uTxsClientCrc32 & 0xffffffff):
817 sFailure = 'crc error - mine=%#x their=%#x (%s, %u bytes)' \
818 % (oOut.uTxsClientCrc32 & 0xffffffff, uStreamCrc32, sOpcode, cbMsg);
819 reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
820 rc = None;
821 break;
822 try:
823 oOut.write(abPayload[4:]);
824 except:
825 sFailure = 'exception writing %s' % (sOpcode);
826 reporter.errorXcpt('taskExecEx: %s' % (sFailure));
827 rc = None;
828 break;
829 elif sOpcode == 'STDINIGN' and msPendingInputReply is not None:
830 # Standard input is ignored. Ignore this condition for now.
831 msPendingInputReply = None;
832 reporter.log('taskExecEx: Standard input is ignored... why?');
833 del oStdIn.uTxsClientCrc32;
834 oStdIn = '/dev/null';
835 elif (sOpcode == 'STDINMEM' or sOpcode == 'STDINBAD' or sOpcode == 'STDINCRC')\
836 and msPendingInputReply is not None:
837 # TXS STDIN error, abort.
838 # TODO: STDINMEM - consider undoing the previous stdin read and try resubmitt it.
839 msPendingInputReply = None;
840 sFailure = 'TXS is out of memory for std input buffering';
841 reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
842 rc = None;
843 break;
844 elif sOpcode == 'ACK' and msPendingInputReply is not None:
845 msPendingInputReply = None;
846 elif sOpcode.startswith('PROC '):
847 # Process status message, handle it outside the loop.
848 rc = True;
849 break;
850 else:
851 sFailure = 'Unexpected opcode %s' % (sOpcode);
852 reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
853 rc = None;
854 break;
855 # Clear the message.
856 cbMsg, sOpcode, abPayload = (None, None, None);
857
858 # If we sent an STDIN packet and didn't get a reply yet, we'll give
859 # TXS some 5 seconds to reply to this. If we don't wait here we'll
860 # get screwed later on if we mix it up with the reply to some other
861 # command. Hackish.
862 if msPendingInputReply is not None:
863 cbMsg2, sOpcode2, abPayload2 = self.oTransport.recvMsg(5000);
864 if cbMsg2 is not None:
865 reporter.log('taskExecEx: Out of order STDIN, got reply: %s, %s, %s [ignored]'
866 % (cbMsg2, sOpcode2, abPayload2));
867 msPendingInputReply = None;
868 else:
869 reporter.maybeErr(self.fErr, 'taskExecEx: Pending STDIN, no reply after 5 secs!');
870 self.fScrewedUpMsgState = True;
871
872 # Parse the exit status (True), abort (None) or do nothing (False).
873 if rc is True:
874 if sOpcode != 'PROC OK':
875 # Do proper parsing some other day if needed:
876 # PROC TOK, PROC TOA, PROC DWN, PROC DOO,
877 # PROC NOK + rc, PROC SIG + sig, PROC ABD, FAILED.
878 rc = False;
879 else:
880 if rc is None:
881 # Abort it.
882 reporter.log('taskExecEx: sending ABORT...');
883 rc = self.sendMsg('ABORT');
884 while rc is True:
885 cbMsg, sOpcode, abPayload = self.oTransport.recvMsg(30000);
886 if cbMsg is None:
887 reporter.maybeErr(self.fErr, 'taskExecEx: Pending ABORT, no reply after 30 secs!')
888 self.fScrewedUpMsgState = True;
889 break;
890 if sOpcode.startswith('PROC '):
891 reporter.log('taskExecEx: ABORT reply: %s, %s, %s [ignored]' % (cbMsg, sOpcode, abPayload));
892 break;
893 reporter.log('taskExecEx: ABORT in process, ignoring reply: %s, %s, %s' % (cbMsg, sOpcode, abPayload));
894 # Check that the connection is OK before looping.
895 if not self.oTransport.isConnectionOk():
896 self.oTransport.disconnect();
897 break;
898
899 # Fake response with the reason why we quit.
900 if sFailure is not None:
901 self.t3oReply = (0, 'EXECFAIL', sFailure);
902 rc = None;
903 else:
904 rc = None;
905
906 # Cleanup.
907 for o in (oStdIn, oStdOut, oStdErr, oTestPipe):
908 if o is not None and not isinstance(o, basestring):
909 del o.uTxsClientCrc32; # pylint: disable=E1103
910 # Make sure all files are closed
911 o.close(); # pylint: disable=E1103
912 reporter.log('taskExecEx: returns %s' % (rc));
913 return rc;
914
915 #
916 # Admin tasks
917 #
918
919 def hlpRebootShutdownWaitForAck(self, sCmd):
920 """Wait for reboot/shutodwn ACK."""
921 rc = self.recvAckLogged(sCmd);
922 if rc is True:
923 # poll a little while for server to disconnect.
924 uMsStart = base.timestampMilli();
925 while self.oTransport.isConnectionOk() \
926 and base.timestampMilli() - uMsStart >= 5000:
927 if self.oTransport.isRecvPending(min(500, self.getMsLeft())):
928 break;
929 self.oTransport.disconnect();
930 return rc;
931
932 def taskReboot(self):
933 rc = self.sendMsg('REBOOT');
934 if rc is True:
935 rc = self.hlpRebootShutdownWaitForAck('REBOOT');
936 return rc;
937
938 def taskShutdown(self):
939 rc = self.sendMsg('SHUTDOWN');
940 if rc is True:
941 rc = self.hlpRebootShutdownWaitForAck('SHUTDOWN');
942 return rc;
943
944 #
945 # CD/DVD control tasks.
946 #
947
948 ## TODO
949
950 #
951 # File system tasks
952 #
953
954 def taskMkDir(self, sRemoteDir, fMode):
955 rc = self.sendMsg('MKDIR', (fMode, sRemoteDir));
956 if rc is True:
957 rc = self.recvAckLogged('MKDIR');
958 return rc;
959
960 def taskMkDirPath(self, sRemoteDir, fMode):
961 rc = self.sendMsg('MKDRPATH', (fMode, sRemoteDir));
962 if rc is True:
963 rc = self.recvAckLogged('MKDRPATH');
964 return rc;
965
966 def taskMkSymlink(self, sLinkTarget, sLink):
967 rc = self.sendMsg('MKSYMLNK', (sLinkTarget, sLink));
968 if rc is True:
969 rc = self.recvAckLogged('MKSYMLNK');
970 return rc;
971
972 def taskRmDir(self, sRemoteDir):
973 rc = self.sendMsg('RMDIR', (sRemoteDir,));
974 if rc is True:
975 rc = self.recvAckLogged('RMDIR');
976 return rc;
977
978 def taskRmFile(self, sRemoteFile):
979 rc = self.sendMsg('RMFILE', (sRemoteFile,));
980 if rc is True:
981 rc = self.recvAckLogged('RMFILE');
982 return rc;
983
984 def taskRmSymlink(self, sRemoteSymlink):
985 rc = self.sendMsg('RMSYMLNK', (sRemoteSymlink,));
986 if rc is True:
987 rc = self.recvAckLogged('RMSYMLNK');
988 return rc;
989
990 def taskRmTree(self, sRemoteTree):
991 rc = self.sendMsg('RMTREE', (sRemoteTree,));
992 if rc is True:
993 rc = self.recvAckLogged('RMTREE');
994 return rc;
995
996 #def "CHMOD "
997 #def "CHOWN "
998 #def "CHGRP "
999
1000 def taskIsDir(self, sRemoteDir):
1001 rc = self.sendMsg('ISDIR', (sRemoteDir,));
1002 if rc is True:
1003 rc = self.recvTrueFalse('ISDIR');
1004 return rc;
1005
1006 def taskIsFile(self, sRemoteFile):
1007 rc = self.sendMsg('ISFILE', (sRemoteFile,));
1008 if rc is True:
1009 rc = self.recvTrueFalse('ISFILE');
1010 return rc;
1011
1012 def taskIsSymlink(self, sRemoteSymlink):
1013 rc = self.sendMsg('ISSYMLNK', (sRemoteSymlink,));
1014 if rc is True:
1015 rc = self.recvTrueFalse('ISSYMLNK');
1016 return rc;
1017
1018 #def "STAT "
1019 #def "LSTAT "
1020 #def "LIST "
1021
1022 def taskUploadFile(self, sLocalFile, sRemoteFile):
1023 #
1024 # Open the local file (make sure it exist before bothering TXS) and
1025 # tell TXS that we want to upload a file.
1026 #
1027 try:
1028 oLocalFile = utils.openNoInherit(sLocalFile, 'rb');
1029 except:
1030 reporter.errorXcpt('taskUpload: failed to open "%s"' % (sLocalFile));
1031 return False;
1032
1033 # Common cause with taskUploadStr
1034 rc = self.taskUploadCommon(oLocalFile, sRemoteFile);
1035
1036 # Cleanup.
1037 oLocalFile.close();
1038 return rc;
1039
1040 def taskUploadString(self, sContent, sRemoteFile):
1041 # Wrap sContent in a file like class.
1042 class InStringFile(object): # pylint: disable=R0903
1043 def __init__(self, sContent):
1044 self.sContent = sContent;
1045 self.off = 0;
1046
1047 def read(self, cbMax):
1048 cbLeft = len(self.sContent) - self.off;
1049 if cbLeft == 0:
1050 return "";
1051 if cbLeft <= cbMax:
1052 sRet = self.sContent[self.off:(self.off + cbLeft)];
1053 else:
1054 sRet = self.sContent[self.off:(self.off + cbMax)];
1055 self.off = self.off + len(sRet);
1056 return sRet;
1057
1058 oLocalString = InStringFile(sContent);
1059 return self.taskUploadCommon(oLocalString, sRemoteFile);
1060
1061 def taskUploadCommon(self, oLocalFile, sRemoteFile):
1062 """Common worker used by taskUploadFile and taskUploadString."""
1063 # Command + ACK.
1064 rc = self.sendMsg('PUT FILE', (sRemoteFile,));
1065 if rc is True:
1066 rc = self.recvAckLogged('PUT FILE');
1067 if rc is True:
1068 #
1069 # Push data packets until eof.
1070 #
1071 uMyCrc32 = zlib.crc32("");
1072 while True:
1073 # Read up to 64 KB of data.
1074 try:
1075 sRaw = oLocalFile.read(65536);
1076 except:
1077 rc = None;
1078 break;
1079
1080 # Convert to array - this is silly!
1081 abBuf = array.array('B');
1082 for i, _ in enumerate(sRaw):
1083 abBuf.append(ord(sRaw[i]));
1084 sRaw = None;
1085
1086 # Update the file stream CRC and send it off.
1087 uMyCrc32 = zlib.crc32(abBuf, uMyCrc32);
1088 if len(abBuf) == 0:
1089 rc = self.sendMsg('DATA EOF', (long(uMyCrc32 & 0xffffffff), ));
1090 else:
1091 rc = self.sendMsg('DATA ', (long(uMyCrc32 & 0xffffffff), abBuf));
1092 if rc is False:
1093 break;
1094
1095 # Wait for the reply.
1096 rc = self.recvAck();
1097 if rc is not True:
1098 if rc is False:
1099 reporter.maybeErr(self.fErr, 'taskUpload: transport error waiting for ACK');
1100 else:
1101 reporter.maybeErr(self.fErr, 'taskUpload: DATA response was %s: %s' % (rc[0], rc[1]));
1102 rc = False;
1103 break;
1104
1105 # EOF?
1106 if len(abBuf) == 0:
1107 break;
1108
1109 # Send ABORT on ACK and I/O errors.
1110 if rc is None:
1111 rc = self.sendMsg('ABORT');
1112 if rc is True:
1113 self.recvAckLogged('ABORT');
1114 rc = False;
1115 return rc;
1116
1117 def taskDownloadFile(self, sRemoteFile, sLocalFile):
1118 try:
1119 oLocalFile = utils.openNoInherit(sLocalFile, 'wb');
1120 except:
1121 reporter.errorXcpt('taskDownload: failed to open "%s"' % (sLocalFile));
1122 return False;
1123
1124 rc = self.taskDownloadCommon(sRemoteFile, oLocalFile);
1125
1126 oLocalFile.close();
1127 if rc is False:
1128 try:
1129 os.remove(sLocalFile);
1130 except:
1131 reporter.errorXcpt();
1132 return rc;
1133
1134 def taskDownloadString(self, sRemoteFile):
1135 # Wrap sContent in a file like class.
1136 class OutStringFile(object): # pylint: disable=R0903
1137 def __init__(self):
1138 self.asContent = [];
1139
1140 def write(self, sBuf):
1141 self.asContent.append(sBuf);
1142 return None;
1143
1144 oLocalString = OutStringFile();
1145 rc = self.taskDownloadCommon(sRemoteFile, oLocalString);
1146 if rc is True:
1147 if len(oLocalString.asContent) == 0:
1148 rc = '';
1149 else:
1150 rc = ''.join(oLocalString.asContent);
1151 return rc;
1152
1153 def taskDownloadCommon(self, sRemoteFile, oLocalFile):
1154 """Common worker for taskDownloadFile and taskDownloadString."""
1155 rc = self.sendMsg('GET FILE', (sRemoteFile,))
1156 if rc is True:
1157 #
1158 # Process data packets until eof.
1159 #
1160 uMyCrc32 = zlib.crc32("");
1161 while rc is True:
1162 cbMsg, sOpcode, abPayload = self.recvReply();
1163 if cbMsg is None:
1164 reporter.maybeErr(self.fErr, 'taskDownload got 3xNone from recvReply.');
1165 rc = None;
1166 break;
1167
1168 # Validate.
1169 sOpcode = sOpcode.rstrip();
1170 if sOpcode != 'DATA' and sOpcode != 'DATA EOF':
1171 reporter.maybeErr(self.fErr, 'taskDownload got a error reply: opcode="%s" details="%s"'
1172 % (sOpcode, getSZ(abPayload, 0, "None")));
1173 rc = False;
1174 break;
1175 if sOpcode == 'DATA' and len(abPayload) < 4:
1176 reporter.maybeErr(self.fErr, 'taskDownload got a bad DATA packet: len=%u' % (len(abPayload)));
1177 rc = None;
1178 break;
1179 if sOpcode == 'DATA EOF' and len(abPayload) != 4:
1180 reporter.maybeErr(self.fErr, 'taskDownload got a bad EOF packet: len=%u' % (len(abPayload)));
1181 rc = None;
1182 break;
1183
1184 # Check the CRC (common for both packets).
1185 uCrc32 = getU32(abPayload, 0);
1186 if sOpcode == 'DATA':
1187 uMyCrc32 = zlib.crc32(abPayload[4:], uMyCrc32);
1188 if uCrc32 != (uMyCrc32 & 0xffffffff):
1189 reporter.maybeErr(self.fErr, 'taskDownload got a bad CRC: mycrc=%s remotecrc=%s'
1190 % (hex(uMyCrc32), hex(uCrc32)));
1191 rc = None;
1192 break;
1193 if sOpcode == 'DATA EOF':
1194 rc = self.sendMsg('ACK');
1195 break;
1196
1197 # Finally, push the data to the file.
1198 try:
1199 oLocalFile.write(abPayload[4:].tostring());
1200 except:
1201 reporter.errorXcpt('I/O error writing to "%s"' % (sRemoteFile));
1202 rc = None;
1203 break;
1204 rc = self.sendMsg('ACK');
1205
1206 # Send NACK on validation and I/O errors.
1207 if rc is None:
1208 rc = self.sendMsg('NACK');
1209 rc = False;
1210 return rc;
1211
1212 def taskUnpackFile(self, sRemoteFile, sRemoteDir):
1213 rc = self.sendMsg('UNPKFILE', (sRemoteFile, sRemoteDir));
1214 if rc is True:
1215 rc = self.recvAckLogged('UNPKFILE');
1216 return rc;
1217
1218 # pylint: enable=C0111
1219
1220
1221 #
1222 # Public methods - generic task queries
1223 #
1224
1225 def isSuccess(self):
1226 """Returns True if the task completed successfully, otherwise False."""
1227 self.lockTask();
1228 sStatus = self.sStatus;
1229 oTaskRc = self.oTaskRc;
1230 self.unlockTask();
1231 if sStatus != "":
1232 return False;
1233 if oTaskRc is False or oTaskRc is None:
1234 return False;
1235 return True;
1236
1237 def getResult(self):
1238 """
1239 Returns the result of a completed task.
1240 Returns None if not completed yet or no previous task.
1241 """
1242 self.lockTask();
1243 sStatus = self.sStatus;
1244 oTaskRc = self.oTaskRc;
1245 self.unlockTask();
1246 if sStatus != "":
1247 return None;
1248 return oTaskRc;
1249
1250 def getLastReply(self):
1251 """
1252 Returns the last reply three-tuple: cbMsg, sOpcode, abPayload.
1253 Returns a None, None, None three-tuple if there was no last reply.
1254 """
1255 self.lockTask();
1256 t3oReply = self.t3oReply;
1257 self.unlockTask();
1258 return t3oReply;
1259
1260 #
1261 # Public methods - connection.
1262 #
1263
1264 def asyncDisconnect(self, cMsTimeout = 30000, fIgnoreErrors = False):
1265 """
1266 Initiates a disconnect task.
1267
1268 Returns True on success, False on failure (logged).
1269
1270 The task returns True on success and False on failure.
1271 """
1272 return self.startTask(cMsTimeout, fIgnoreErrors, "bye", self.taskBye);
1273
1274 def syncDisconnect(self, cMsTimeout = 30000, fIgnoreErrors = False):
1275 """Synchronous version."""
1276 return self.asyncToSync(self.asyncDisconnect, cMsTimeout, fIgnoreErrors);
1277
1278 def asyncUuid(self, cMsTimeout = 30000, fIgnoreErrors = False):
1279 """
1280 Initiates a task for getting the TXS UUID.
1281
1282 Returns True on success, False on failure (logged).
1283
1284 The task returns UUID string (in {}) on success and False on failure.
1285 """
1286 return self.startTask(cMsTimeout, fIgnoreErrors, "bye", self.taskUuid);
1287
1288 def syncUuid(self, cMsTimeout = 30000, fIgnoreErrors = False):
1289 """Synchronous version."""
1290 return self.asyncToSync(self.asyncUuid, cMsTimeout, fIgnoreErrors);
1291
1292 #
1293 # Public methods - execution.
1294 #
1295
1296 def asyncExecEx(self, sExecName, asArgs = (), asAddEnv = (), # pylint: disable=R0913
1297 oStdIn = None, oStdOut = None, oStdErr = None, oTestPipe = None,
1298 sAsUser = "", cMsTimeout = 3600000, fIgnoreErrors = False):
1299 """
1300 Initiates a exec process task.
1301
1302 Returns True on success, False on failure (logged).
1303
1304 The task returns True if the process exited normally with status code 0.
1305 The task returns None if on failure prior to executing the process, and
1306 False if the process exited with a different status or in an abnormal
1307 manner. Both None and False are logged of course and further info can
1308 also be obtained by getLastReply().
1309
1310 The oStdIn, oStdOut, oStdErr and oTestPipe specifiy how to deal with
1311 these streams. If None, no special action is taken and the output goes
1312 to where ever the TXS sends its output, and ditto for input.
1313 - To send to / read from the bitbucket, pass '/dev/null'.
1314 - To redirect to/from a file, just specify the remote filename.
1315 - To append to a file use '>>' followed by the remote filename.
1316 - To pipe the stream to/from the TXS, specify a file like
1317 object. For StdIn a non-blocking read() method is required. For
1318 the other a write() method is required. Watch out for deadlock
1319 conditions between StdIn and StdOut/StdErr/TestPipe piping.
1320 """
1321 return self.startTask(cMsTimeout, fIgnoreErrors, "exec", self.taskExecEx,
1322 (sExecName, long(0), asArgs, asAddEnv, oStdIn,
1323 oStdOut, oStdErr, oTestPipe, sAsUser));
1324
1325 def syncExecEx(self, sExecName, asArgs = (), asAddEnv = (), # pylint: disable=R0913
1326 oStdIn = '/dev/null', oStdOut = '/dev/null',
1327 oStdErr = '/dev/null', oTestPipe = '/dev/null',
1328 sAsUser = '', cMsTimeout = 3600000, fIgnoreErrors = False):
1329 """Synchronous version."""
1330 return self.asyncToSync(self.asyncExecEx, sExecName, asArgs, asAddEnv, oStdIn, oStdOut, \
1331 oStdErr, oTestPipe, sAsUser, cMsTimeout, fIgnoreErrors);
1332
1333 def asyncExec(self, sExecName, asArgs = (), asAddEnv = (), sAsUser = "", fWithTestPipe = True, sPrefix = '', \
1334 cMsTimeout = 3600000, fIgnoreErrors = False):
1335 """
1336 Initiates a exec process test task.
1337
1338 Returns True on success, False on failure (logged).
1339
1340 The task returns True if the process exited normally with status code 0.
1341 The task returns None if on failure prior to executing the process, and
1342 False if the process exited with a different status or in an abnormal
1343 manner. Both None and False are logged of course and further info can
1344 also be obtained by getLastReply().
1345
1346 Standard in is taken from /dev/null. While both standard output and
1347 standard error goes directly to reporter.log(). The testpipe is piped
1348 to reporter.xxxx.
1349 """
1350
1351 sStdIn = '/dev/null';
1352 oStdOut = reporter.FileWrapper('%sstdout' % sPrefix);
1353 oStdErr = reporter.FileWrapper('%sstderr' % sPrefix);
1354 if fWithTestPipe: oTestPipe = reporter.FileWrapperTestPipe();
1355 else: oTestPipe = '/dev/null'; # pylint: disable=redefined-variable-type
1356
1357 return self.startTask(cMsTimeout, fIgnoreErrors, "exec", self.taskExecEx,
1358 (sExecName, long(0), asArgs, asAddEnv, sStdIn, oStdOut, oStdErr, oTestPipe, sAsUser));
1359
1360 def syncExec(self, sExecName, asArgs = (), asAddEnv = (), sAsUser = '', fWithTestPipe = True, sPrefix = '',
1361 cMsTimeout = 3600000, fIgnoreErrors = False):
1362 """Synchronous version."""
1363 return self.asyncToSync(self.asyncExec, sExecName, asArgs, asAddEnv, sAsUser, fWithTestPipe, sPrefix, \
1364 cMsTimeout, fIgnoreErrors);
1365
1366 #
1367 # Public methods - file system
1368 #
1369
1370 def asyncReboot(self, cMsTimeout = 30000, fIgnoreErrors = False):
1371 """
1372 Initiates a reboot task.
1373
1374 Returns True on success, False on failure (logged).
1375
1376 The task returns True on success, False on failure (logged). The
1377 session will be disconnected on successful task completion.
1378 """
1379 return self.startTask(cMsTimeout, fIgnoreErrors, "reboot", self.taskReboot, ());
1380
1381 def syncReboot(self, cMsTimeout = 30000, fIgnoreErrors = False):
1382 """Synchronous version."""
1383 return self.asyncToSync(self.asyncReboot, cMsTimeout, fIgnoreErrors);
1384
1385 def asyncShutdown(self, cMsTimeout = 30000, fIgnoreErrors = False):
1386 """
1387 Initiates a shutdown task.
1388
1389 Returns True on success, False on failure (logged).
1390
1391 The task returns True on success, False on failure (logged).
1392 """
1393 return self.startTask(cMsTimeout, fIgnoreErrors, "shutdown", self.taskShutdown, ());
1394
1395 def syncShutdown(self, cMsTimeout = 30000, fIgnoreErrors = False):
1396 """Synchronous version."""
1397 return self.asyncToSync(self.asyncShutdown, cMsTimeout, fIgnoreErrors);
1398
1399
1400 #
1401 # Public methods - file system
1402 #
1403
1404 def asyncMkDir(self, sRemoteDir, fMode = 0700, cMsTimeout = 30000, fIgnoreErrors = False):
1405 """
1406 Initiates a mkdir task.
1407
1408 Returns True on success, False on failure (logged).
1409
1410 The task returns True on success, False on failure (logged).
1411 """
1412 return self.startTask(cMsTimeout, fIgnoreErrors, "mkDir", self.taskMkDir, (sRemoteDir, long(fMode)));
1413
1414 def syncMkDir(self, sRemoteDir, fMode = 0700, cMsTimeout = 30000, fIgnoreErrors = False):
1415 """Synchronous version."""
1416 return self.asyncToSync(self.asyncMkDir, sRemoteDir, long(fMode), cMsTimeout, fIgnoreErrors);
1417
1418 def asyncMkDirPath(self, sRemoteDir, fMode = 0700, cMsTimeout = 30000, fIgnoreErrors = False):
1419 """
1420 Initiates a mkdir -p task.
1421
1422 Returns True on success, False on failure (logged).
1423
1424 The task returns True on success, False on failure (logged).
1425 """
1426 return self.startTask(cMsTimeout, fIgnoreErrors, "mkDirPath", self.taskMkDirPath, (sRemoteDir, long(fMode)));
1427
1428 def syncMkDirPath(self, sRemoteDir, fMode = 0700, cMsTimeout = 30000, fIgnoreErrors = False):
1429 """Synchronous version."""
1430 return self.asyncToSync(self.asyncMkDirPath, sRemoteDir, long(fMode), cMsTimeout, fIgnoreErrors);
1431
1432 def asyncMkSymlink(self, sLinkTarget, sLink, cMsTimeout = 30000, fIgnoreErrors = False):
1433 """
1434 Initiates a symlink task.
1435
1436 Returns True on success, False on failure (logged).
1437
1438 The task returns True on success, False on failure (logged).
1439 """
1440 return self.startTask(cMsTimeout, fIgnoreErrors, "mkSymlink", self.taskMkSymlink, (sLinkTarget, sLink));
1441
1442 def syncMkSymlink(self, sLinkTarget, sLink, cMsTimeout = 30000, fIgnoreErrors = False):
1443 """Synchronous version."""
1444 return self.asyncToSync(self.asyncMkSymlink, sLinkTarget, sLink, cMsTimeout, fIgnoreErrors);
1445
1446 def asyncRmDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
1447 """
1448 Initiates a rmdir task.
1449
1450 Returns True on success, False on failure (logged).
1451
1452 The task returns True on success, False on failure (logged).
1453 """
1454 return self.startTask(cMsTimeout, fIgnoreErrors, "rmDir", self.taskRmDir, (sRemoteDir,));
1455
1456 def syncRmDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
1457 """Synchronous version."""
1458 return self.asyncToSync(self.asyncRmDir, sRemoteDir, cMsTimeout, fIgnoreErrors);
1459
1460 def asyncRmFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1461 """
1462 Initiates a rmfile task.
1463
1464 Returns True on success, False on failure (logged).
1465
1466 The task returns True on success, False on failure (logged).
1467 """
1468 return self.startTask(cMsTimeout, fIgnoreErrors, "rmFile", self.taskRmFile, (sRemoteFile,));
1469
1470 def syncRmFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1471 """Synchronous version."""
1472 return self.asyncToSync(self.asyncRmFile, sRemoteFile, cMsTimeout, fIgnoreErrors);
1473
1474 def asyncRmSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
1475 """
1476 Initiates a rmsymlink task.
1477
1478 Returns True on success, False on failure (logged).
1479
1480 The task returns True on success, False on failure (logged).
1481 """
1482 return self.startTask(cMsTimeout, fIgnoreErrors, "rmSymlink", self.taskRmSymlink, (sRemoteSymlink,));
1483
1484 def syncRmSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
1485 """Synchronous version."""
1486 return self.asyncToSync(self.asyncRmSymlink, sRemoteSymlink, cMsTimeout, fIgnoreErrors);
1487
1488 def asyncRmTree(self, sRemoteTree, cMsTimeout = 30000, fIgnoreErrors = False):
1489 """
1490 Initiates a rmtree task.
1491
1492 Returns True on success, False on failure (logged).
1493
1494 The task returns True on success, False on failure (logged).
1495 """
1496 return self.startTask(cMsTimeout, fIgnoreErrors, "rmTree", self.taskRmTree, (sRemoteTree,));
1497
1498 def syncRmTree(self, sRemoteTree, cMsTimeout = 30000, fIgnoreErrors = False):
1499 """Synchronous version."""
1500 return self.asyncToSync(self.asyncRmTree, sRemoteTree, cMsTimeout, fIgnoreErrors);
1501
1502 #def "CHMOD "
1503 #def "CHOWN "
1504 #def "CHGRP "
1505
1506 def asyncIsDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
1507 """
1508 Initiates a is-dir query task.
1509
1510 Returns True on success, False on failure (logged).
1511
1512 The task returns True if it's a directory, False if it isn't, and
1513 None on error (logged).
1514 """
1515 return self.startTask(cMsTimeout, fIgnoreErrors, "isDir", self.taskIsDir, (sRemoteDir,));
1516
1517 def syncIsDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
1518 """Synchronous version."""
1519 return self.asyncToSync(self.asyncIsDir, sRemoteDir, cMsTimeout, fIgnoreErrors);
1520
1521 def asyncIsFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1522 """
1523 Initiates a is-file query task.
1524
1525 Returns True on success, False on failure (logged).
1526
1527 The task returns True if it's a file, False if it isn't, and None on
1528 error (logged).
1529 """
1530 return self.startTask(cMsTimeout, fIgnoreErrors, "isFile", self.taskIsFile, (sRemoteFile,));
1531
1532 def syncIsFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1533 """Synchronous version."""
1534 return self.asyncToSync(self.asyncIsFile, sRemoteFile, cMsTimeout, fIgnoreErrors);
1535
1536 def asyncIsSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
1537 """
1538 Initiates a is-symbolic-link query task.
1539
1540 Returns True on success, False on failure (logged).
1541
1542 The task returns True if it's a symbolic linke, False if it isn't, and
1543 None on error (logged).
1544 """
1545 return self.startTask(cMsTimeout, fIgnoreErrors, "isSymlink", self.taskIsSymlink, (sRemoteSymlink,));
1546
1547 def syncIsSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
1548 """Synchronous version."""
1549 return self.asyncToSync(self.asyncIsSymlink, sRemoteSymlink, cMsTimeout, fIgnoreErrors);
1550
1551 #def "STAT "
1552 #def "LSTAT "
1553 #def "LIST "
1554
1555 def asyncUploadFile(self, sLocalFile, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1556 """
1557 Initiates a download query task.
1558
1559 Returns True on success, False on failure (logged).
1560
1561 The task returns True on success, False on failure (logged).
1562 """
1563 return self.startTask(cMsTimeout, fIgnoreErrors, "upload", self.taskUploadFile, (sLocalFile, sRemoteFile));
1564
1565 def syncUploadFile(self, sLocalFile, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1566 """Synchronous version."""
1567 return self.asyncToSync(self.asyncUploadFile, sLocalFile, sRemoteFile, cMsTimeout, fIgnoreErrors);
1568
1569 def asyncUploadString(self, sContent, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1570 """
1571 Initiates a upload string task.
1572
1573 Returns True on success, False on failure (logged).
1574
1575 The task returns True on success, False on failure (logged).
1576 """
1577 return self.startTask(cMsTimeout, fIgnoreErrors, "uploadString", self.taskUploadString, (sContent, sRemoteFile));
1578
1579 def syncUploadString(self, sContent, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1580 """Synchronous version."""
1581 return self.asyncToSync(self.asyncUploadString, sContent, sRemoteFile, cMsTimeout, fIgnoreErrors);
1582
1583 def asyncDownloadFile(self, sRemoteFile, sLocalFile, cMsTimeout = 30000, fIgnoreErrors = False):
1584 """
1585 Initiates a download file task.
1586
1587 Returns True on success, False on failure (logged).
1588
1589 The task returns True on success, False on failure (logged).
1590 """
1591 return self.startTask(cMsTimeout, fIgnoreErrors, "downloadFile", self.taskDownloadFile, (sRemoteFile, sLocalFile));
1592
1593 def syncDownloadFile(self, sRemoteFile, sLocalFile, cMsTimeout = 30000, fIgnoreErrors = False):
1594 """Synchronous version."""
1595 return self.asyncToSync(self.asyncDownloadFile, sRemoteFile, sLocalFile, cMsTimeout, fIgnoreErrors);
1596
1597 def asyncDownloadString(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1598 """
1599 Initiates a download string task.
1600
1601 Returns True on success, False on failure (logged).
1602
1603 The task returns a byte string on success, False on failure (logged).
1604 """
1605 return self.startTask(cMsTimeout, fIgnoreErrors, "downloadString", self.taskDownloadString, (sRemoteFile,));
1606
1607 def syncDownloadString(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1608 """Synchronous version."""
1609 return self.asyncToSync(self.asyncDownloadString, sRemoteFile, cMsTimeout, fIgnoreErrors);
1610
1611 def asyncUnpackFile(self, sRemoteFile, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
1612 """
1613 Initiates a unpack file task.
1614
1615 Returns True on success, False on failure (logged).
1616
1617 The task returns True on success, False on failure (logged).
1618 """
1619 return self.startTask(cMsTimeout, fIgnoreErrors, "unpackFile", self.taskUnpackFile, \
1620 (sRemoteFile, sRemoteDir));
1621
1622 def syncUnpackFile(self, sRemoteFile, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
1623 """Synchronous version."""
1624 return self.asyncToSync(self.asyncUnpackFile, sRemoteFile, sRemoteDir, cMsTimeout, fIgnoreErrors);
1625
1626
1627class TransportTcp(TransportBase):
1628 """
1629 TCP transport layer for the TXS client session class.
1630 """
1631
1632 def __init__(self, sHostname, uPort, fReversedSetup):
1633 """
1634 Save the parameters. The session will call us back to make the
1635 connection later on its worker thread.
1636 """
1637 TransportBase.__init__(self, utils.getCallerName());
1638 self.sHostname = sHostname;
1639 self.fReversedSetup = fReversedSetup;
1640 self.uPort = uPort if uPort is not None else 5042 if fReversedSetup is False else 5048;
1641 self.oSocket = None;
1642 self.oWakeupW = None;
1643 self.oWakeupR = None;
1644 self.fConnectCanceled = False;
1645 self.fIsConnecting = False;
1646 self.oCv = threading.Condition();
1647 self.abReadAhead = array.array('B');
1648
1649 def toString(self):
1650 return '<%s sHostname=%s, fReversedSetup=%s, uPort=%s, oSocket=%s,'\
1651 ' fConnectCanceled=%s, fIsConnecting=%s, oCv=%s, abReadAhead=%s>' \
1652 % (TransportBase.toString(self), self.sHostname, self.fReversedSetup, self.uPort, self.oSocket,
1653 self.fConnectCanceled, self.fIsConnecting, self.oCv, self.abReadAhead);
1654
1655 def __isInProgressXcpt(self, oXcpt):
1656 """ In progress exception? """
1657 try:
1658 if isinstance(oXcpt, socket.error):
1659 try:
1660 if oXcpt[0] == errno.EINPROGRESS:
1661 return True;
1662 except: pass;
1663 # Windows?
1664 try:
1665 if oXcpt[0] == errno.EWOULDBLOCK:
1666 return True;
1667 except: pass;
1668 except:
1669 pass;
1670 return False;
1671
1672 def __isWouldBlockXcpt(self, oXcpt):
1673 """ Would block exception? """
1674 try:
1675 if isinstance(oXcpt, socket.error):
1676 try:
1677 if oXcpt[0] == errno.EWOULDBLOCK:
1678 return True;
1679 except: pass;
1680 try:
1681 if oXcpt[0] == errno.EAGAIN:
1682 return True;
1683 except: pass;
1684 except:
1685 pass;
1686 return False;
1687
1688 def __isConnectionReset(self, oXcpt):
1689 """ Connection reset by Peer or others. """
1690 try:
1691 if isinstance(oXcpt, socket.error):
1692 try:
1693 if oXcpt[0] == errno.ECONNRESET:
1694 return True;
1695 except: pass;
1696 try:
1697 if oXcpt[0] == errno.ENETRESET:
1698 return True;
1699 except: pass;
1700 except:
1701 pass;
1702 return False;
1703
1704 def _closeWakeupSockets(self):
1705 """ Closes the wakup sockets. Caller should own the CV. """
1706 oWakeupR = self.oWakeupR;
1707 self.oWakeupR = None;
1708 if oWakeupR is not None:
1709 oWakeupR.close();
1710
1711 oWakeupW = self.oWakeupW;
1712 self.oWakeupW = None;
1713 if oWakeupW is not None:
1714 oWakeupW.close();
1715
1716 return None;
1717
1718 def cancelConnect(self):
1719 # This is bad stuff.
1720 self.oCv.acquire();
1721 reporter.log2('TransportTcp::cancelConnect: fIsConnecting=%s oSocket=%s' % (self.fIsConnecting, self.oSocket));
1722 self.fConnectCanceled = True;
1723 if self.fIsConnecting:
1724 oSocket = self.oSocket;
1725 self.oSocket = None;
1726 if oSocket is not None:
1727 reporter.log2('TransportTcp::cancelConnect: closing the socket');
1728 oSocket.close();
1729
1730 oWakeupW = self.oWakeupW;
1731 self.oWakeupW = None;
1732 if oWakeupW is not None:
1733 reporter.log2('TransportTcp::cancelConnect: wakeup call');
1734 try: oWakeupW.send('cancelled!\n');
1735 except: reporter.logXcpt();
1736 try: oWakeupW.shutdown(socket.SHUT_WR);
1737 except: reporter.logXcpt();
1738 oWakeupW.close();
1739 self.oCv.release();
1740
1741 def _connectAsServer(self, oSocket, oWakeupR, cMsTimeout):
1742 """ Connects to the TXS server as server, i.e. the reversed setup. """
1743 assert(self.fReversedSetup);
1744
1745 reporter.log2('TransportTcp::_connectAsServer: oSocket=%s, cMsTimeout=%u' % (oSocket, cMsTimeout));
1746
1747 # Workaround for bind() failure...
1748 try:
1749 oSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1);
1750 except:
1751 reporter.errorXcpt('socket.listen(1) failed');
1752 return None;
1753
1754 # Bind the socket and make it listen.
1755 try:
1756 oSocket.bind((self.sHostname, self.uPort));
1757 except:
1758 reporter.errorXcpt('socket.bind((%s,%s)) failed' % (self.sHostname, self.uPort));
1759 return None;
1760 try:
1761 oSocket.listen(1);
1762 except:
1763 reporter.errorXcpt('socket.listen(1) failed');
1764 return None;
1765
1766 # Accept connections.
1767 oClientSocket = None;
1768 tClientAddr = None;
1769 try:
1770 (oClientSocket, tClientAddr) = oSocket.accept();
1771 except socket.error, e:
1772 if not self.__isInProgressXcpt(e):
1773 raise;
1774
1775 # Do the actual waiting.
1776 reporter.log2('TransportTcp::accept: operation in progress (%s)...' % (e,));
1777 try:
1778 select.select([oSocket, oWakeupR], [], [oSocket, oWakeupR], cMsTimeout / 1000.0);
1779 except socket.error, e:
1780 if e[0] != errno.EBADF or not self.fConnectCanceled:
1781 raise;
1782 reporter.log('socket.select() on accept was canceled');
1783 return None;
1784 except:
1785 reporter.logXcpt('socket.select() on accept');
1786
1787 # Try accept again.
1788 try:
1789 (oClientSocket, tClientAddr) = oSocket.accept();
1790 except socket.error, e:
1791 if not self.__isInProgressXcpt(e):
1792 if e[0] != errno.EBADF or not self.fConnectCanceled:
1793 raise;
1794 reporter.log('socket.accept() was canceled');
1795 return None;
1796 reporter.log('socket.accept() timed out');
1797 return False;
1798 except:
1799 reporter.errorXcpt('socket.accept() failed');
1800 return None;
1801 except:
1802 reporter.errorXcpt('socket.accept() failed');
1803 return None;
1804
1805 # Store the connected socket and throw away the server socket.
1806 self.oCv.acquire();
1807 if not self.fConnectCanceled:
1808 self.oSocket.close();
1809 self.oSocket = oClientSocket;
1810 self.sHostname = "%s:%s" % (tClientAddr[0], tClientAddr[1]);
1811 self.oCv.release();
1812 return True;
1813
1814 def _connectAsClient(self, oSocket, oWakeupR, cMsTimeout):
1815 """ Connects to the TXS server as client. """
1816 assert(not self.fReversedSetup);
1817
1818 # Connect w/ timeouts.
1819 rc = None;
1820 try:
1821 oSocket.connect((self.sHostname, self.uPort));
1822 rc = True;
1823 except socket.error, e:
1824 iRc = e[0];
1825 if self.__isInProgressXcpt(e):
1826 # Do the actual waiting.
1827 reporter.log2('TransportTcp::connect: operation in progress (%s)...' % (e,));
1828 try:
1829 ttRc = select.select([oWakeupR], [oSocket], [oSocket, oWakeupR], cMsTimeout / 1000.0);
1830 if len(ttRc[1]) + len(ttRc[2]) == 0:
1831 raise socket.error(errno.ETIMEDOUT, 'select timed out');
1832 iRc = oSocket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR);
1833 rc = iRc == 0;
1834 except socket.error, e:
1835 iRc = e[0];
1836 except:
1837 iRc = -42;
1838 reporter.fatalXcpt('socket.select() on connect failed');
1839
1840 if rc is True:
1841 pass;
1842 elif iRc == errno.ECONNREFUSED \
1843 or iRc == errno.EHOSTUNREACH \
1844 or iRc == errno.EINTR \
1845 or iRc == errno.ENETDOWN \
1846 or iRc == errno.ENETUNREACH \
1847 or iRc == errno.ETIMEDOUT:
1848 rc = False; # try again.
1849 else:
1850 if iRc != errno.EBADF or not self.fConnectCanceled:
1851 reporter.fatalXcpt('socket.connect((%s,%s)) failed; iRc=%s' % (self.sHostname, self.uPort, iRc));
1852 reporter.log2('TransportTcp::connect: rc=%s iRc=%s' % (rc, iRc));
1853 except:
1854 reporter.fatalXcpt('socket.connect((%s,%s)) failed' % (self.sHostname, self.uPort));
1855 return rc;
1856
1857
1858 def connect(self, cMsTimeout):
1859 # Create a non-blocking socket.
1860 reporter.log2('TransportTcp::connect: cMsTimeout=%s sHostname=%s uPort=%s' % (cMsTimeout, self.sHostname, self.uPort));
1861 try:
1862 oSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0);
1863 except:
1864 reporter.fatalXcpt('socket.socket() failed');
1865 return None;
1866 try:
1867 oSocket.setblocking(0);
1868 except:
1869 oSocket.close();
1870 reporter.fatalXcpt('socket.socket() failed');
1871 return None;
1872
1873 # Create wakeup socket pair for unix (select doesn't wake up on socket close on Linux).
1874 oWakeupR = None;
1875 oWakeupW = None;
1876 if hasattr(socket, 'socketpair'):
1877 try: (oWakeupR, oWakeupW) = socket.socketpair(); # pylint: disable=E1101
1878 except: reporter.logXcpt('socket.socketpair() failed');
1879
1880 # Update the state.
1881 self.oCv.acquire();
1882 rc = None;
1883 if not self.fConnectCanceled:
1884 self.oSocket = oSocket;
1885 self.oWakeupW = oWakeupW;
1886 self.oWakeupR = oWakeupR;
1887 self.fIsConnecting = True;
1888 self.oCv.release();
1889
1890 # Try connect.
1891 if oWakeupR is None:
1892 oWakeupR = oSocket; # Avoid select failure.
1893 if self.fReversedSetup:
1894 rc = self._connectAsServer(oSocket, oWakeupR, cMsTimeout);
1895 else:
1896 rc = self._connectAsClient(oSocket, oWakeupR, cMsTimeout);
1897 oSocket = None;
1898
1899 # Update the state and cleanup on failure/cancel.
1900 self.oCv.acquire();
1901 if rc is True and self.fConnectCanceled:
1902 rc = False;
1903 self.fIsConnecting = False;
1904
1905 if rc is not True:
1906 if self.oSocket is not None:
1907 self.oSocket.close();
1908 self.oSocket = None;
1909 self._closeWakeupSockets();
1910 self.oCv.release();
1911
1912 reporter.log2('TransportTcp::connect: returning %s' % (rc,));
1913 return rc;
1914
1915 def disconnect(self, fQuiet = False):
1916 if self.oSocket is not None:
1917 self.abReadAhead = array.array('B');
1918
1919 # Try a shutting down the socket gracefully (draining it).
1920 try:
1921 self.oSocket.shutdown(socket.SHUT_WR);
1922 except:
1923 if not fQuiet:
1924 reporter.error('shutdown(SHUT_WR)');
1925 try:
1926 self.oSocket.setblocking(0); # just in case it's not set.
1927 sData = "1";
1928 while len(sData) > 0:
1929 sData = self.oSocket.recv(16384);
1930 except:
1931 pass;
1932
1933 # Close it.
1934 self.oCv.acquire();
1935 try: self.oSocket.setblocking(1);
1936 except: pass;
1937 self.oSocket.close();
1938 self.oSocket = None;
1939 else:
1940 self.oCv.acquire();
1941 self._closeWakeupSockets();
1942 self.oCv.release();
1943
1944 def sendBytes(self, abMsg, cMsTimeout):
1945 if self.oSocket is None:
1946 reporter.error('TransportTcp.sendBytes: No connection.');
1947 return False;
1948
1949 # Try send it all.
1950 try:
1951 cbSent = self.oSocket.send(abMsg);
1952 if cbSent == len(abMsg):
1953 return True;
1954 except Exception, oXcpt:
1955 if not self.__isWouldBlockXcpt(oXcpt):
1956 reporter.errorXcpt('TranportTcp.sendBytes: %s bytes' % (len(abMsg)));
1957 return False;
1958 cbSent = 0;
1959
1960 # Do a timed send.
1961 msStart = base.timestampMilli();
1962 while True:
1963 cMsElapsed = base.timestampMilli() - msStart;
1964 if cMsElapsed > cMsTimeout:
1965 reporter.error('TranportTcp.sendBytes: %s bytes timed out (1)' % (len(abMsg)));
1966 break;
1967
1968 # wait.
1969 try:
1970 ttRc = select.select([], [self.oSocket], [self.oSocket], (cMsTimeout - cMsElapsed) / 1000.0);
1971 if len(ttRc[2]) > 0 and len(ttRc[1]) == 0:
1972 reporter.error('TranportTcp.sendBytes: select returned with exception');
1973 break;
1974 if len(ttRc[1]) == 0:
1975 reporter.error('TranportTcp.sendBytes: %s bytes timed out (2)' % (len(abMsg)));
1976 break;
1977 except:
1978 reporter.errorXcpt('TranportTcp.sendBytes: select failed');
1979 break;
1980
1981 # Try send more.
1982 try:
1983 cbSent += self.oSocket.send(abMsg[cbSent:]);
1984 if cbSent == len(abMsg):
1985 return True;
1986 except Exception, oXcpt:
1987 if not self.__isWouldBlockXcpt(oXcpt):
1988 reporter.errorXcpt('TranportTcp.sendBytes: %s bytes' % (len(abMsg)));
1989 break;
1990
1991 return False;
1992
1993 def __returnReadAheadBytes(self, cb):
1994 """ Internal worker for recvBytes. """
1995 assert(len(self.abReadAhead) >= cb);
1996 abRet = self.abReadAhead[:cb];
1997 self.abReadAhead = self.abReadAhead[cb:];
1998 return abRet;
1999
2000 def recvBytes(self, cb, cMsTimeout, fNoDataOk):
2001 if self.oSocket is None:
2002 reporter.error('TransportTcp.recvBytes(%s,%s): No connection.' % (cb, cMsTimeout));
2003 return None;
2004
2005 # Try read in some more data without bothering with timeout handling first.
2006 if len(self.abReadAhead) < cb:
2007 try:
2008 abBuf = self.oSocket.recv(cb - len(self.abReadAhead));
2009 if len(abBuf) > 0:
2010 self.abReadAhead.extend(array.array('B', abBuf));
2011 except Exception, oXcpt:
2012 if not self.__isWouldBlockXcpt(oXcpt):
2013 reporter.errorXcpt('TranportTcp.recvBytes: 0/%s bytes' % (cb,));
2014 return None;
2015
2016 if len(self.abReadAhead) >= cb:
2017 return self.__returnReadAheadBytes(cb);
2018
2019 # Timeout loop.
2020 msStart = base.timestampMilli();
2021 while True:
2022 cMsElapsed = base.timestampMilli() - msStart;
2023 if cMsElapsed > cMsTimeout:
2024 if not fNoDataOk or len(self.abReadAhead) > 0:
2025 reporter.error('TranportTcp.recvBytes: %s/%s bytes timed out (1)' % (len(self.abReadAhead), cb));
2026 break;
2027
2028 # Wait.
2029 try:
2030 ttRc = select.select([self.oSocket], [], [self.oSocket], (cMsTimeout - cMsElapsed) / 1000.0);
2031 if len(ttRc[2]) > 0 and len(ttRc[0]) == 0:
2032 reporter.error('TranportTcp.recvBytes: select returned with exception');
2033 break;
2034 if len(ttRc[0]) == 0:
2035 if not fNoDataOk or len(self.abReadAhead) > 0:
2036 reporter.error('TranportTcp.recvBytes: %s/%s bytes timed out (2) fNoDataOk=%s'
2037 % (len(self.abReadAhead), cb, fNoDataOk));
2038 break;
2039 except:
2040 reporter.errorXcpt('TranportTcp.recvBytes: select failed');
2041 break;
2042
2043 # Try read more.
2044 try:
2045 abBuf = self.oSocket.recv(cb - len(self.abReadAhead));
2046 if len(abBuf) == 0:
2047 reporter.error('TranportTcp.recvBytes: %s/%s bytes (%s) - connection has been shut down'
2048 % (len(self.abReadAhead), cb, fNoDataOk));
2049 self.disconnect();
2050 return None;
2051
2052 self.abReadAhead.extend(array.array('B', abBuf));
2053
2054 except Exception, oXcpt:
2055 reporter.log('recv => exception %s' % (oXcpt,));
2056 if not self.__isWouldBlockXcpt(oXcpt):
2057 if not fNoDataOk or not self.__isConnectionReset(oXcpt) or len(self.abReadAhead) > 0:
2058 reporter.errorXcpt('TranportTcp.recvBytes: %s/%s bytes (%s)' % (len(self.abReadAhead), cb, fNoDataOk));
2059 break;
2060
2061 # Done?
2062 if len(self.abReadAhead) >= cb:
2063 return self.__returnReadAheadBytes(cb);
2064
2065 #reporter.log('recv => None len(self.abReadAhead) -> %d' % (len(self.abReadAhead), ));
2066 return None;
2067
2068 def isConnectionOk(self):
2069 if self.oSocket is None:
2070 return False;
2071 try:
2072 ttRc = select.select([], [], [self.oSocket], 0.0);
2073 if len(ttRc[2]) > 0:
2074 return False;
2075
2076 self.oSocket.send(array.array('B')); # send zero bytes.
2077 except:
2078 return False;
2079 return True;
2080
2081 def isRecvPending(self, cMsTimeout = 0):
2082 try:
2083 ttRc = select.select([self.oSocket], [], [], cMsTimeout / 1000.0);
2084 if len(ttRc[0]) == 0:
2085 return False;
2086 except:
2087 pass;
2088 return True;
2089
2090
2091def openTcpSession(cMsTimeout, sHostname, uPort = None, fReversedSetup = False, cMsIdleFudge = 0):
2092 """
2093 Opens a connection to a Test Execution Service via TCP, given its name.
2094 """
2095 reporter.log2('openTcpSession(%s, %s, %s, %s, %s)' % \
2096 (cMsTimeout, sHostname, uPort, fReversedSetup, cMsIdleFudge));
2097 try:
2098 oTransport = TransportTcp(sHostname, uPort, fReversedSetup);
2099 oSession = Session(oTransport, cMsTimeout, cMsIdleFudge);
2100 except:
2101 reporter.errorXcpt(None, 15);
2102 return None;
2103 return oSession;
2104
2105
2106def tryOpenTcpSession(cMsTimeout, sHostname, uPort = None, fReversedSetup = False, cMsIdleFudge = 0):
2107 """
2108 Tries to open a connection to a Test Execution Service via TCP, given its name.
2109
2110 This differs from openTcpSession in that it won't log a connection failure
2111 as an error.
2112 """
2113 try:
2114 oTransport = TransportTcp(sHostname, uPort, fReversedSetup);
2115 oSession = Session(oTransport, cMsTimeout, cMsIdleFudge, fTryConnect = True);
2116 except:
2117 reporter.errorXcpt(None, 15);
2118 return None;
2119 return oSession;
2120
Note: See TracBrowser for help on using the repository browser.

© 2025 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette