Programmierung

Inter-Thread-Kommunikation

Anknüpfend an den Artikel Multithreading – Parallelisierung und Nebenläufigkeit – der die Hintergrundverarbeitung unter Verwendung von Threads erläutert und bereits grundsätzliche Mechanismen zur Datenübergabe und zur Steuerung von Threads vorstellt – gehe ich in diesem Artikel auf eine weiterführende Möglichkeiten zur Kommunikation mit als Thread oder Prozess gestarteten Jobs ein: Der Austausch von MSX-Nachrichten.

Der Austausch von MSX-Nachrichten kann sowohl uni- als auch bidirektional erfolgen. Das bedeutet, dass eine Seite entweder nur Nachrichten versenden, nur Nachrichten empfangen oder aber sowohl Nachrichten versenden als auch empfangen kann. MSX-Nachrichten können dabei nicht nur zwischen dem Starter eines Jobs und dem Job, sondern auch zwischen zwei verschiedenen Jobs ausgetauscht werden.

Der Transport der Nachrichten ist über sogenannte Pipes gelöst. Sie ermöglichen einen sehr schnellen Datenaustausch ohne Latenzen.

Job starten

Die folgende Funktion startet einen Job als Thread und initiert eine bidirektionale Kommunikation, indem sie je einen Kanal zum Senden und Emfpangen von Nachrichten öffnet.

sub JobStartMSX
(
  var vMsxWrite         : handle; // Sendekanal
  var vMsxRead          : handle; // Empfangskanal
)
: handle;                         // Kontrollobjekt/Fehler

  local
  {
    tJobID              : int;
    tJob                : handle;
    tErr                : int;
  }

{
  // Job starten
  tJobID # JobStart(_JobThread, 5, __PROC__ + ':' + 'JobMSX');
  if (tJobID > 0)
  {
    // Kontrollobjekt öffnen
    tJob # JobOpen(tJobID);
    if (tJob > 0)
    {
      // Sendekanal öffnen
      vMsxWrite # MsxOpen(_MsxThread | _MsxWrite, tJob);
      // Empfangskanal öffnen
      vMsxRead # MsxOpen(_MsxThread | _MsxRead, tJob);
    }
    else
      tErr # tJob;
  }
  else
    tErr # tJobID;

  // Kein Fehler aufgetreten
  if (tErr = _ErrOK)
    // Kontrollobjekt zurückgeben
    return(tJob);
  else
    // Fehler zurückgeben
    return(tErr);
}

Job ausführen

Die aufgerufene Job-Funktion öffnet ebenfalls einen Empfangs- und einen Sendekanal. Sie verarbeitet alle eingehenden Nachrichten bis sie gestoppt wird.

sub JobMSX
(
  aJob                  : handle; // Job
  aEvt                  : int;    // ohne Bedeutung
)

  local
  {
    tMsxRead            : handle;
    tMsxWrite           : handle;
    tID                 : int;
  }

{
  try
  {
    // Timeout-Fehler ignorieren
    ErrTryIgnore(_ErrTimeout);

    // Empfangskanal öffnen
    tMsxRead # MsxOpen(_MsxThread | _MsxRead, aJob);
    // Sendekanal öffnen
    tMsxWrite # MsxOpen(_MsxThread | _MsxWrite, aJob);

    // Solange nicht gestoppt
    while (!aJob->spStopRequest)
    {
      // Nachricht lesen
      if (tMsxRead->MsxRead(_MsxMessage, tID) = _ErrOK)
      {
        // Nachricht schreiben
        tMsxWrite->MsxWrite(_MsxMessage, tID);
        // Nachricht verarbeiten
        switch (tID)
        {
          // ...
        }
        // Nachrichtenende lesen
        tMsxRead->MsxRead(_MsxEnd, tID);
        // Nachrichtenende schreiben und Nachricht senden
        tMsxWrite->MsxWrite(_MsxEnd, 0);
      }
    }
  }

  // Empfangskanal schließen
  tMsxRead->MsxClose();
  // Sendekanal schließen
  tMsxWrite->MsxClose();
}

Entscheidend ist hier der Funktionsaufruf tMsxRead->MsxRead(_MsxMessage, tID). Die Funktion wartet bis eine Nachricht gelesen werden kann, oder ein Timeout auftritt. Im Falle eines Timeouts tritt der Fehler _ErrTimeout auf und die Anweisung wird wiederholt. Standardmäßig liegt der Timeout bei 5 Minuten. Er kann aber auch mit der Anweisung JobControl(_JobMsxTimeoutRead) angepasst werden.

Beendet werden kann der Job entweder mit der Funktion JobControl() mit den Kommandos _JobTerminate oder _JobStop oder durch Beendigung des Elternprozesses, also des Clients. Wird der Job beendet, während er auf Nachrichten wartet, liefert die Funktion MsxRead() den Fehler _ErrTerminated.

Job stoppen

Mit der nachfolgenden Funktion kann der Job nach dessen Start wieder gestoppt werden:

sub JobStopMSX
(
  aJob                  : handle; // Kontrollobjekt
  aMsxWrite             : handle; // Sendekanal
  aMsxRead              : handle; // Empfangskanal
)
{
  // Empfangskanal schließen
  aMsxRead->MsxClose();
  // Sendekanal schließen
  aMsxWrite->MsxClose();
  // Job beenden ohne auf Ende zu warten
  aJob->JobControl(_JobStop);
  // Kontrollobjekt schließen
  aJob->JobClose();
}

Mit Job kommunizieren

Eine Kommunikation mit einem solchen Job könnte beispielsweise wie folgend ablaufen:

// Job starten (zum Beispiel beim Starten der Applikation)
gJob # JobStartMSX(var gMsxWrite, var gMsxRead);

// ...

// Nachricht schreiben
gMsxWrite->MsxWrite(_MsxMessage, 1);
// ...
// Nachrichtenende schreiben und Nachricht senden
gMsxWrite->MsxWrite(_MsxEnd, 0);

// Nachricht lesen
gMsxRead->MsxRead(_MsxMessage, tID);
// ...
// Nachrichtenende lesen
gMsxRead->MsxRead(_MsxEnd, tID);

// ...

// Job stoppen (zum Beispiel beim Beenden der Applikation)
gJob->JobStopMSX(gMSXWrite, gMsxRead);

Über die Eigenschaften spJobMsxReadQ und spJobMsxWriteQ kann zusätzlich die Anzahl der noch zu empfangenden beziehungsweise noch zu sendenden Nachrichten ermittelt werden.

Keine Kommentare

Kommentar abgeben