unit tcpthreadhelper;

{ Threaded TCP helper built on lNet.

  TMainThread owns the TLTCP connection and a list of TConnectionThread
  workers, one per socket.  Incoming bytes are pushed into the worker's
  TRoundBuffer cache by TMainThread.dataReady; the worker's Execute loop
  decodes framed messages from that cache and hands them to ProcessMessage.

  Wire format as implemented below:
    frame   := PacketStart (QWord) + payload length (DWORD) + payload
    header  := packetType, state, sender GUID, record number, code,
               QWord parameter, length-prefixed string
    parts   := part id (byte) + serialized TStrings / TParamArray / TStream
  All multi-byte integers are written least-significant byte first. }

{$mode ObjFPC}{$H+}

interface

uses
  Classes, SysUtils, lNet, lnetbase, syncobjs, extTypes;

type
  TMainThread = class;
  TConnectionThread = class;

  { TConnectionThread }

  { Worker thread bound to one socket.  Descendants implement ProcessMessage
    and Role. }
  TConnectionThread = class(TThread)
  private
    fSocket: TLSocket;
    fCache: TRoundBuffer;
    fOwner: TMainThread;
    class function BufferToString(const Buffer: TBuffer; const len: integer): string;
    // Raises EFormatException unless `need` bytes are available at `pos`.
    class procedure CheckAvailable(const Buffer: TBuffer; const pos: integer; const need: QWord);
    class procedure AddToBuffer(const Value: byte; var Buffer: TBuffer; var pos: integer); overload;
    class procedure AddToBuffer(const Value: word; var Buffer: TBuffer; var pos: integer); overload;
    class procedure AddToBuffer(const Value: dword; var Buffer: TBuffer; var pos: integer); overload;
    class procedure AddToBuffer(const Value: QWord; var Buffer: TBuffer; var pos: integer); overload;
    class procedure AddToBuffer(const Value: string; var Buffer: TBuffer; var pos: integer); overload;
    class procedure AddToBuffer(const Value: TGUID; var Buffer: TBuffer; var pos: integer); overload;
    class procedure AddToBuffer(const Value: TBuffer; var Buffer: TBuffer; var pos: integer); overload;
    class procedure AddToBuffer(const Value: TStream; var Buffer: TBuffer; var pos: integer); overload;
    class procedure AddToBuffer(const Value: TParamArray; var Buffer: TBuffer; var pos: integer); overload;
    class procedure ReadFromBuffer(var Value: byte; const Buffer: TBuffer; var pos: integer); overload;
    class procedure ReadFromBuffer(var Value: word; const Buffer: TBuffer; var pos: integer); overload;
    class procedure ReadFromBuffer(var Value: dword; const Buffer: TBuffer; var pos: integer); overload;
    class procedure ReadFromBuffer(var Value: QWord; const Buffer: TBuffer; var pos: integer); overload;
    class procedure ReadFromBuffer(var Value: string; const Buffer: TBuffer; var pos: integer); overload;
    class procedure ReadFromBuffer(var Value: TGUID; const Buffer: TBuffer; var pos: integer); overload;
    class procedure ReadFromBuffer(var Value: TBuffer; const Buffer: TBuffer; var pos: integer); overload;
    class procedure ReadFromBuffer(out Value: TStream; const Buffer: TBuffer; var pos: integer); overload;
    class procedure ReadFromBuffer(out Value: TStrings; const Buffer: TBuffer; var pos: integer); overload;
    class procedure ReadFromBuffer(out Value: TParamArray; const Buffer: TBuffer; var pos: integer); overload;
    class procedure InitBuffer(buffSize: dword; out Buffer: TBuffer; out pos: integer);
  public
    ID: TGUID;
    recNo: qword;
    property Owner: TMainThread read fOwner;
    property Socket: TLSocket read fSocket;
    property Cache: TRoundBuffer read fCache;
    procedure log(ALevel: TLogLevel; msg: string);
    procedure ProcessMessage(const mode: byte; const Code: DWORD; const Param: QWord;
      const ACommand: string; const Values: TStrings; const intData: TParamArray;
      const Data: TStream); virtual; abstract;
    class function Role: string; virtual; abstract;
    procedure SendBuffer(const Buffer: TBuffer; Len: dword);
    function ReceiveBuffer(var Buffer: TBuffer; out Len: dword): boolean;
    procedure SendHeader(packetType, state: byte; Code: DWORD; QP: QWORD; SP: string);
    function ReceiveHeader(out packetType, state: byte; out Sender: TGUID; out num: QWORD;
      out Code: DWORD; out QP: QWORD; out SP: string): boolean;
    procedure SendData(part: byte; const Data: TStream); overload;
    procedure SendData(part: byte; const Data: TBuffer); overload;
    procedure SendData(part: byte; const Data: TStrings); overload;
    procedure SendData(part: byte; const Data: TParamArray); overload;
    procedure SendMessage(const mode: byte; const Status: DWORD; const QParam: QWord;
      const sParam: string); overload;
    procedure SendMessage(const mode: byte; const Status: DWORD; const QParam: QWord;
      const AValue: string; const AKeys: TStrings); overload;
    procedure SendMessage(const mode: byte; const CommandID: DWORD; const QParam: QWord;
      const AValue: string; const AKeys: TStrings; const IntData: TParamArray;
      const AData: TStream); overload;
    function ReceiveMessage(out mode: byte; out Sender: TGUID; out rNum: QWord;
      out CommandID: DWORD; out QParam: QWord; out Value: string;
      out intData: TParamArray; out Keys: TStrings; out Data: TStream): boolean; virtual;
    constructor Create(Aowner: TMainThread; ASocket: TLSocket);
    destructor Destroy; override;
    procedure Execute; override;
    procedure TerminatedSet; override;
  end;

  TConnectionThreadClass = class of TConnectionThread;

  { TMainThread }

  { Owns the TLTCP connection and the list of per-socket worker threads.
    Execute is not implemented here; descendants provide it. }
  TMainThread = class(TThread)
  private
    fCon: TLTCP;
    fPort: integer;
    fclients: TList;
    flogger: TLogger;
    fThreadClass: TConnectionThreadClass;
    fStarted: boolean;
    fComplete: boolean;
    // Lookup only: returns nil when no worker is bound to aSocket.
    function findThread(aSocket: TLSocket): TConnectionThread;
    function getThread(index: TLSocket): TConnectionThread;
  protected
    procedure Log(ALevel: TLogLevel; Sender: TObject; msg: string);
    procedure doStart; virtual;
    procedure TerminateClients;
    property Complete: boolean read fComplete;
  public
    property Port: integer read fPort;
    property Connect: TLTCP read fCon;
    property Client[index: TLSocket]: TConnectionThread read getThread;
    procedure RemoveClient(clt: TConnectionThread);
    procedure dataReady(aSocket: TLSocket);
    procedure ProcessConnect(thread: TConnectionThread); virtual;
    procedure ProcessAccept(thread: TConnectionThread); virtual;
    procedure Accept(aSocket: TLSocket);
    procedure doDisconnect(aSocket: TLSocket);
    procedure doConnect(aSocket: TLSocket);
    procedure TerminatedSet; override;
    procedure NetError(const msg: string; aSocket: TLSocket);
    constructor Create(AThreadClass: TConnectionThreadClass; ALogger: TLogger; APort: integer);
    destructor Destroy; override;
    procedure SetComplete;
  end;

implementation

uses
  lCommon;

{ TMainThread }

// Returns the worker whose Socket is aSocket, or nil.  Never creates one.
function TMainThread.findThread(aSocket: TLSocket): TConnectionThread;
var
  i: integer;
begin
  result := nil;
  for i := 0 to fclients.Count-1 do
    if TConnectionThread(fclients[i]).Socket = aSocket then
    begin
      result := TConnectionThread(fclients[i]);
      exit;
    end;
end;

// Getter of the Client[] property: returns the worker bound to `index`,
// creating (suspended) and registering a new one when none exists yet.
function TMainThread.getThread(index: TLSocket): TConnectionThread;
begin
  result := findThread(index);
  if assigned(result) then
  begin
    log(mtExtra,self,format('getThread(%d) %s',[index.Handle,guidToString(result.ID)]));
    exit;
  end;
  result := fThreadClass.Create(self,index);
  log(mtExtra,self,format('new Thread(%d) %s',[index.Handle,guidToString(result.ID)]));
  fclients.Add(result);
end;

// Terminates, joins and frees every registered worker, then empties the list.
// Failures for one worker are logged and do not stop the sweep.
procedure TMainThread.TerminateClients;
var
  i: integer;
  clt: TConnectionThread;
begin
  log(mtExtra,Self,'Terminate Clients');
  // Walk backwards: Terminate makes the worker remove itself from fclients
  // (see TConnectionThread.TerminatedSet), which only shifts higher indices.
  for i := fclients.Count-1 downto 0 do
  begin
    sleep(0);
    clt := TConnectionThread(fclients[i]);
    try
      log(mtExtra,self,GuidToString(clt.ID));
      clt.Terminate;
      clt.WaitFor;
      clt.Free;
    except
      on e: Exception do
        log(mtError,self, '!!ERROR Destroy ' + e.Message);
    end;
  end;
  fclients.Clear;
end;

// Forwards to the logger callback supplied at construction, if any.
procedure TMainThread.Log(ALevel: TLogLevel; Sender: TObject; msg: string);
begin
  if assigned(flogger) then
    flogger(ALevel, Sender, msg);
end;

procedure TMainThread.doStart;
begin
  fStarted := true;
  fComplete := false;
end;

// Unregisters a worker.  Does not free it.
procedure TMainThread.RemoveClient(clt: TConnectionThread);
begin
  // fclients is nil once our destructor has released it.
  if assigned(fclients) then
    fclients.Remove(clt);
end;

// OnReceive handler: moves pending socket data into the worker's cache.
procedure TMainThread.dataReady(aSocket: TLSocket);
var
  clt: TConnectionThread;
begin
  log(mtExtra,self,'dataReady');
  if Terminated then
    exit;
  clt := Client[aSocket];
  clt.Cache.WriteReady.WaitFor(INFINITE);
  while clt.Cache.ReadFromSocket(aSocket) <> 0 do
    sleep(0);
end;

// Hook for descendants; called before the worker of an outgoing connection starts.
procedure TMainThread.ProcessConnect(thread: TConnectionThread);
begin
end;

// Hook for descendants; called before the worker of an accepted connection starts.
procedure TMainThread.ProcessAccept(thread: TConnectionThread);
begin
end;

// Binds a worker to an accepted socket and starts it.
procedure TMainThread.Accept(aSocket: TLSocket);
var
  clt: TConnectionThread;
begin
  log(mtExtra,self,'connect');
  if Terminated then
    exit;
  clt := Client[aSocket];
  log(mtExtra,self,format('connected %s on %d ',[GUIDToString(clt.ID), aSocket.Handle]));
  ProcessAccept(clt);
  clt.Start;
end;

// OnDisconnect handler: terminates and unregisters the worker of aSocket.
procedure TMainThread.doDisconnect(aSocket: TLSocket);
var
  clt: TConnectionThread;
begin
  if Terminated then
    exit;
  log(mtExtra,self,'disconnect');
  try
    // Lookup only.  Going through Client[] here would create and register a
    // brand-new worker for an unknown socket just to terminate it again.
    clt := findThread(aSocket);
    if not assigned(clt) then
      exit;
    if clt.Terminated then
      exit;
    log(mtExtra,self,format('disconnected %s on %d ',[GUIDToString(clt.ID), aSocket.Handle]));
    clt.Terminate;
    fclients.Remove(clt);
    // NOTE(review): the worker is not freed here and FreeOnTerminate is not
    // set in its constructor; confirm who releases it after a disconnect.
  except
    on e: Exception do
    begin
      log(mtError,self,'!!ERROR doDisconnect '+e.Message);
      raise;
    end;
  end;
end;

// Binds a worker to an outgoing connection's socket and starts it.
procedure TMainThread.doConnect(aSocket: TLSocket);
var
  clt: TConnectionThread;
begin
  log(mtExtra,self,'doConnect');
  if Terminated then
    exit;
  clt := Client[aSocket];
  log(mtExtra,self,format('connected %s on %d ',[GUIDToString(clt.ID), aSocket.Handle]));
  ProcessConnect(clt);
  clt.Start;
end;

procedure TMainThread.TerminatedSet;
begin
  inherited TerminatedSet();
end;

// Logs a network error as a warning; aSocket may be nil.
procedure TMainThread.NetError(const msg: string; aSocket: TLSocket);
begin
  if assigned(aSocket) then
    log(mtWarning, self,'!!NETERROR on '+inttostr(aSocket.Handle)+#09+msg)
  else
    log(mtWarning, self,'!!NETERROR '+msg);
end;

// Creates the thread suspended and wires OnDisconnect/OnReceive of the
// connection.  Other lNet events are not hooked here.
constructor TMainThread.Create(AThreadClass: TConnectionThreadClass;
  ALogger: TLogger; APort: integer);
begin
  inherited Create(true);
  fStarted := false;
  fThreadClass := AThreadClass;
  fCon := TLTcp.Create(nil);
  fclients := TList.Create;
  flogger := ALogger;
  fPort := APort;
  Connect.OnDisconnect := @doDisconnect;
  Connect.OnReceive := @dataReady;
  Connect.Timeout := 100;
  log(mtExtra,self,'create main thread');
end;

destructor TMainThread.Destroy;
begin
  // TThread.Destroy terminates and joins the thread.  Release the objects
  // the running thread may still use only after that has happened; the
  // instance memory itself stays valid until this destructor returns.
  inherited Destroy;
  FreeAndNil(fclients);
  FreeAndNil(fCon);
end;

procedure TMainThread.SetComplete;
begin
  fComplete := true;
end;

{ TConnectionThread }

// Hex dump of the first `len` bytes, 64 bytes per line.
class function TConnectionThread.BufferToString(const Buffer: TBuffer;
  const len: integer): string;
var
  i: integer;
begin
  result := '';
  for i := 1 to len do
  begin
    result := result + IntToHex(Buffer[i-1],2)+' ';
    if (i mod 64) = 0 then
      result := result + #13#10;
  end;
end;

// Guards length-prefixed reads: the length comes from the peer, so it must
// be checked against what the buffer really holds before it is used.
class procedure TConnectionThread.CheckAvailable(const Buffer: TBuffer;
  const pos: integer; const need: QWord);
begin
  if (pos < 0) or (pos > Length(Buffer))
     or (need > QWord(Length(Buffer) - pos)) then
    raise EFormatException.Create('buffer underrun: need ' + IntToStr(need)
      + ' byte(s) at offset ' + IntToStr(pos) + ' of ' + IntToStr(Length(Buffer)));
end;

class procedure TConnectionThread.AddToBuffer(const Value: byte;
  var Buffer: TBuffer; var pos: integer);
begin
  Buffer[pos] := Value;
  inc(pos);
end;

// Writes 2 bytes, least-significant first.
class procedure TConnectionThread.AddToBuffer(const Value: word;
  var Buffer: TBuffer; var pos: integer);
var
  i: integer;
begin
  for i := 0 to 1 do
  begin
    Buffer[pos] := byte((Value shr (8*i)) and $FF);
    inc(pos);
  end;
end;

// Writes 4 bytes, least-significant first.
class procedure TConnectionThread.AddToBuffer(const Value: dword;
  var Buffer: TBuffer; var pos: integer);
var
  i: integer;
begin
  for i := 0 to 3 do
  begin
    Buffer[pos] := byte((Value shr (8*i)) and $FF);
    inc(pos);
  end;
end;

// Writes 8 bytes, least-significant first.
class procedure TConnectionThread.AddToBuffer(const Value: QWord;
  var Buffer: TBuffer; var pos: integer);
var
  i: integer;
begin
  for i := 0 to 7 do
  begin
    Buffer[pos] := byte((Value shr (8*i)) and $FF);
    inc(pos);
  end;
end;

// Writes a DWORD byte count followed by the string's bytes.
class procedure TConnectionThread.AddToBuffer(const Value: string;
  var Buffer: TBuffer; var pos: integer);
var
  len: DWORD;
  i: integer;
begin
  len := Length(Value);
  AddToBuffer(len,Buffer,pos);
  for i := 1 to Length(Value) do
    AddToBuffer(byte(Value[i]),Buffer,pos);
end;

// Writes the GUID field by field (D1, D2, D3, then the 8 bytes of D4).
class procedure TConnectionThread.AddToBuffer(const Value: TGUID;
  var Buffer: TBuffer; var pos: integer);
var
  i: integer;
begin
  AddToBuffer(Value.D1,Buffer,pos);
  AddToBuffer(Value.D2,Buffer,pos);
  AddToBuffer(Value.D3,Buffer,pos);
  for i := 0 to 7 do
    AddToBuffer(Value.D4[i],Buffer,pos);
end;

// Writes a DWORD byte count followed by the bytes of Value.
class procedure TConnectionThread.AddToBuffer(const Value: TBuffer;
  var Buffer: TBuffer; var pos: integer);
var
  len: DWORD;
  i: integer;
begin
  len := Length(Value);
  AddToBuffer(len,Buffer,pos);
  for i := 0 to High(Value) do
    AddToBuffer(Value[i],Buffer,pos);
end;

// Writes a QWord byte count followed by the whole stream, read from its start.
class procedure TConnectionThread.AddToBuffer(const Value: TStream;
  var Buffer: TBuffer; var pos: integer);
var
  len, remaining: QWORD;
  b: byte;
begin
  len := Value.Size;
  AddToBuffer(len,Buffer,pos);
  Value.Seek(0,soFromBeginning);
  // Count down instead of looping to len-1: len is unsigned, so len-1
  // underflows for an empty stream.
  remaining := len;
  while remaining > 0 do
  begin
    Value.Read(b,1);
    AddToBuffer(b,Buffer,pos);
    dec(remaining);
  end;
end;

// Writes a DWORD element count followed by each element.
class procedure TConnectionThread.AddToBuffer(const Value: TParamArray;
  var Buffer: TBuffer; var pos: integer);
var
  l: DWORD;
  i: integer;
begin
  l := Length(Value);
  AddToBuffer(l,Buffer,pos);
  // Signed index: High() of an empty array is -1, which an unsigned loop
  // variable cannot represent.
  for i := Low(Value) to High(Value) do
    AddToBuffer(Value[i],Buffer,pos);
end;

class procedure TConnectionThread.ReadFromBuffer(var Value: byte;
  const Buffer: TBuffer; var pos: integer);
begin
  Value := Buffer[pos];
  inc(pos);
end;

// Reads 2 bytes, least-significant first.
class procedure TConnectionThread.ReadFromBuffer(var Value: word;
  const Buffer: TBuffer; var pos: integer);
var
  i: integer;
begin
  Value := 0;
  for i := 0 to 1 do
  begin
    Value := Value or (Word(Buffer[pos]) shl (8*i));
    inc(pos);
  end;
end;

// Reads 4 bytes, least-significant first.
class procedure TConnectionThread.ReadFromBuffer(var Value: dword;
  const Buffer: TBuffer; var pos: integer);
var
  i: integer;
begin
  Value := 0;
  for i := 0 to 3 do
  begin
    // Widen to an unsigned 32-bit value before shifting so that a top byte
    // >= $80 cannot yield a negative intermediate result.
    Value := Value or (DWORD(Buffer[pos]) shl (8*i));
    inc(pos);
  end;
end;

// Reads 8 bytes, least-significant first.
class procedure TConnectionThread.ReadFromBuffer(var Value: QWord;
  const Buffer: TBuffer; var pos: integer);
var
  i: integer;
begin
  Value := 0;
  for i := 0 to 7 do
  begin
    // Widen to 64 bits before shifting; a byte operand is otherwise shifted
    // at a narrower width and bytes 4..7 can be lost.
    Value := Value or (QWord(Buffer[pos]) shl (8*i));
    inc(pos);
  end;
end;

// Reads a DWORD byte count followed by that many characters.
// Raises EFormatException when the buffer holds fewer bytes than announced.
class procedure TConnectionThread.ReadFromBuffer(var Value: string;
  const Buffer: TBuffer; var pos: integer);
var
  len: DWORD;
  i: integer;
begin
  ReadFromBuffer(len,Buffer,pos);
  CheckAvailable(Buffer,pos,len);
  SetLength(Value,len);
  for i := 1 to Length(Value) do
  begin
    Value[i] := chr(Buffer[pos]);
    inc(pos);
  end;
end;

// Reads the GUID field by field, mirroring AddToBuffer(TGUID).
class procedure TConnectionThread.ReadFromBuffer(var Value: TGUID;
  const Buffer: TBuffer; var pos: integer);
var
  i: integer;
begin
  ReadFromBuffer(Value.D1,Buffer,pos);
  ReadFromBuffer(Value.D2,Buffer,pos);
  ReadFromBuffer(Value.D3,Buffer,pos);
  for i := 0 to 7 do
    ReadFromBuffer(Value.D4[i],Buffer,pos);
end;

// Reads a DWORD byte count followed by that many bytes.
// Raises EFormatException when the buffer holds fewer bytes than announced.
class procedure TConnectionThread.ReadFromBuffer(var Value: TBuffer;
  const Buffer: TBuffer; var pos: integer);
var
  len: DWORD;
  i: integer;
begin
  ReadFromBuffer(len,Buffer,pos);
  CheckAvailable(Buffer,pos,len);
  SetLength(Value,len);
  for i := 0 to High(Value) do
    ReadFromBuffer(Value[i],Buffer,pos);
end;

// Creates a TMemoryStream (owned by the caller) and fills it from a QWord
// byte count followed by that many bytes.  The stream is rewound afterwards
// unless it is empty.
// Raises EFormatException when the buffer holds fewer bytes than announced.
class procedure TConnectionThread.ReadFromBuffer(out Value: TStream;
  const Buffer: TBuffer; var pos: integer);
var
  len, remaining: QWORD;
  b: byte;
begin
  Value := TMemoryStream.Create;
  ReadFromBuffer(len,Buffer,pos);
  if len = 0 then
    exit;
  CheckAvailable(Buffer,pos,len);
  remaining := len;
  while remaining > 0 do
  begin
    ReadFromBuffer(b,Buffer,pos);
    Value.Write(b,1);
    dec(remaining);
  end;
  Value.Seek(0,soFromBeginning);
end;

// Creates a TStringList (owned by the caller) from a DWORD entry count
// followed by (string, DWORD) pairs; the DWORD is stored in Objects[].
class procedure TConnectionThread.ReadFromBuffer(out Value: TStrings;
  const Buffer: TBuffer; var pos: integer);
var
  i, w, d: dword;
  s: string;
begin
  Value := TStringList.Create;
  ReadFromBuffer(w,Buffer,pos);
  if w = 0 then
    exit;
  for i := 0 to w-1 do
  begin
    ReadFromBuffer(s,Buffer,pos);
    ReadFromBuffer(d,Buffer,pos);
    Value.AddObject(s, TObject(PtrInt(d)));
  end;
end;

// Reads a DWORD element count followed by each element.
// Raises EFormatException when the count cannot fit in the remaining bytes.
class procedure TConnectionThread.ReadFromBuffer(out Value: TParamArray;
  const Buffer: TBuffer; var pos: integer);
var
  l: DWORD;
  i: integer;
begin
  ReadFromBuffer(l,Buffer,pos);
  // Lower bound only: every element occupies at least one byte.
  CheckAvailable(Buffer,pos,l);
  SetLength(Value,l);
  for i := 0 to High(Value) do
    ReadFromBuffer(Value[i],Buffer,pos);
end;

// Allocates Buffer with buffSize bytes and resets the write position.
class procedure TConnectionThread.InitBuffer(buffSize: dword;
  out Buffer: TBuffer; out pos: integer);
begin
  SetLength(Buffer,buffSize);
  pos := 0;
end;

// Logs through the owner, prefixing the message with Role and thread ID.
procedure TConnectionThread.log(ALevel: TLogLevel; msg: string);
begin
  if assigned(fOwner) then
    fOwner.log(ALevel, self, Role+#09+GuidToString(ID)+#09+msg);
end;

// Sends one frame: PacketStart, Len, then the payload.  Retries until all
// bytes are sent or the thread is terminated; sleeps 100 ms whenever the
// socket accepted nothing.
procedure TConnectionThread.SendBuffer(const Buffer: TBuffer; Len: dword);
var
  p, t: PByte;
  l, rem: integer;
begin
  log(mtExtra,'Send buffer '+inttostr(len));
  try
    rem := len+SizeOf(dword)+SizeOf(QWord);
    p := GetMem(rem);
    try
      t := p;
      CopyBytes(t,PacketStart);
      CopyBytes(t,len);
      // NOTE(review): the frame is sized from Len; this presumably copies
      // Length(Buffer) bytes, so callers must pass Len = Length(Buffer).
      // Confirm against CopyBytes.
      CopyBytes(t,Buffer);
      t := p;
      repeat
        l := Socket.Send(t^,rem);
        dec(rem,l);
        inc(t,l);
        if l = 0 then
          sleep(100);
      until Terminated or (rem <= 0);
    finally
      FreeMem(p);
    end;
  except
    on e: Exception do
    begin
      log(mtError,'!!ERROR SendBuffer '+e.message);
      raise;
    end;
  end;
end;

// Reads one frame from the cache into Buffer.
// Returns false (after logging) when the thread is terminated, the frame
// does not begin with PacketStart, or its length is zero.
function TConnectionThread.ReceiveBuffer(var Buffer: TBuffer; out Len: dword): boolean;
var
  p: PByte;
  l, rem: integer;
  part_id: QWORD;
begin
  result := false;
  if Terminated then
  begin
    log(mtExtra,'ReceiveBuffer terminated');
    exit;
  end;
  try
    Cache.Read(part_id);
    if part_id <> PacketStart then
    begin
      log(mtError,'ReceiveBuffer PacketStart '+inttohex(part_id,16));
      exit;
    end;
    Cache.Read(len);
    SetLength(Buffer,len);
    if len = 0 then
    begin
      log(mtError,'ReceiveBuffer Length=0');
      exit;
    end;
    rem := len;
    p := PByte(Buffer);
    repeat
      l := Cache.pop(p^,rem);
      dec(rem,l);
      inc(p,l);
      if Terminated then
        exit;
      // Termination is handled only by the exit above, so the loop can never
      // fall through to "result := true" with a partly filled buffer.
    until rem <= 0;
    log(mtExtra,'Receive buffer '+inttostr(len));
    result := true;
  except
    on e: Exception do
    begin
      log(mtError,'!!ERROR ReceiveBuffer '+e.message);
      raise;
    end;
  end;
end;

// Serializes and sends a message header as one frame.
procedure TConnectionThread.SendHeader(packetType, state: byte; Code: DWORD;
  QP: QWORD; SP: string);
var
  Buffer: TBuffer;
  pos: integer;
begin
  try
    log(mtExtra,format('SendHeader(%d) Code=%d, Param=%x, Command=%s',[packettype,Code,QP,SP]));
    // 2 bytes (type, state) + 16 (GUID) + 2 QWords (recNo, QP)
    // + 2 DWORDs (Code, string length) + the string bytes.
    InitBuffer(sizeof(BYTE)*(Length(SP)+2)+16+2*sizeof(QWord)+sizeof(DWORD)*2,Buffer,pos);
    AddToBuffer(packetType,Buffer,pos);
    AddToBuffer(state,Buffer,pos);
    AddToBuffer(ID,Buffer,pos);
    AddToBuffer(recNo,Buffer,pos);
    AddToBuffer(Code,Buffer,pos);
    AddToBuffer(QP,Buffer,pos);
    AddToBuffer(SP,Buffer,pos);
    SendBuffer(Buffer,pos);
  except
    on e: Exception do
    begin
      log(mtError,'!!ERROR SendHeader '+e.message);
      raise;
    end;
  end;
end;

// Receives one frame and decodes it as a message header.
// Returns false when terminated or when no valid frame could be read.
function TConnectionThread.ReceiveHeader(out packetType, state: byte;
  out Sender: TGUID; out num: QWORD; out Code: DWORD; out QP: QWORD;
  out SP: string): boolean;
var
  Buffer: TBuffer;
  len: dword;
  pos: integer;
begin
  result := false;
  if Terminated then
    exit;
  try
    if not ReceiveBuffer(Buffer,len) then
      exit;
    pos := 0;
    ReadFromBuffer(packetType,Buffer,pos);
    ReadFromBuffer(state,Buffer,pos);
    ReadFromBuffer(Sender,Buffer,pos);
    ReadFromBuffer(num,Buffer,pos);
    ReadFromBuffer(Code,Buffer,pos);
    ReadFromBuffer(QP,Buffer,pos);
    ReadFromBuffer(SP,Buffer,pos);
    log(mtExtra,format('ReceiveHeader(%d) Code=%d, Param=%x, Command=%s',[packettype,Code,QP,SP]));
    result := true;
  except
    on e: Exception do
    begin
      log(mtError,'!!ERROR ReceiveHeader '+e.message);
      raise;
    end;
  end;
end;

// Header-only message (packet type 0).
procedure TConnectionThread.SendMessage(const mode: byte; const Status: DWORD;
  const QParam: QWord; const sParam: string);
begin
  SendHeader(0,mode,Status,QParam,sParam);
end;

// Header plus an optional key list (packet type 1, or 0 when AKeys is nil).
procedure TConnectionThread.SendMessage(const mode: byte; const Status: DWORD;
  const QParam: QWord; const AValue: string; const AKeys: TStrings);
begin
  if assigned(AKeys) then
  begin
    SendHeader(1,mode,Status,QParam,AValue);
    SendData(1,AKeys);
  end
  else
    SendHeader(0,mode,Status,QParam,AValue);
end;

// Header plus any combination of key list, integer array and stream.
// The packet type announces which parts follow; the parts are sent in the
// order keys, integers, data and are numbered 1, 2, 3 as they go out.
procedure TConnectionThread.SendMessage(const mode: byte; const CommandID: DWORD;
  const QParam: QWord; const AValue: string; const AKeys: TStrings;
  const IntData: TParamArray; const AData: TStream);
const
  // Indexed [has keys, has integers, has data].
  HeaderType: array[boolean, boolean, boolean] of byte =
    (((0, 2), (3, 5)), ((1, 4), (6, 7)));
var
  hasKeys, hasInts, hasData: boolean;
  part: byte;
begin
  try
    hasKeys := assigned(AKeys);
    hasInts := length(IntData) > 0;
    hasData := assigned(AData);
    SendHeader(HeaderType[hasKeys, hasInts, hasData],mode,CommandID,QParam,AValue);
    part := 0;
    if hasKeys then
    begin
      inc(part);
      SendData(part,AKeys);
    end;
    if hasInts then
    begin
      inc(part);
      SendData(part,IntData);
    end;
    if hasData then
    begin
      // A data-only message now goes out as part 1, which is what
      // ReceiveMessage has always required for packet type 2.
      inc(part);
      SendData(part,AData);
    end;
  except
    on E: Exception do
    begin
      log(mtError,format('send error(%s) %s',[e.classname,e.message]));
      raise;
    end;
  end;
end;

// Sends one part frame carrying a stream: part id, QWord size, bytes.
procedure TConnectionThread.SendData(part: byte; const Data: TStream);
var
  Buffer: TBuffer;
  pos: integer;
begin
  log(mtExtra,'Send Stream '+inttostr(Data.Size));
  SetLength(Buffer,1+Data.Size+8);
  pos := 0;
  AddToBuffer(part,Buffer,pos);
  AddToBuffer(Data,Buffer,pos);
  SendBuffer(Buffer,pos);
end;

// Sends one part frame carrying a byte buffer: part id, DWORD size, bytes.
procedure TConnectionThread.SendData(part: byte; const Data: TBuffer);
var
  Buffer: TBuffer;
  pos: integer;
begin
  log(mtExtra,'Send Buffer '+inttostr(length(Data)));
  SetLength(Buffer,length(Data)+4+1);
  pos := 0;
  AddToBuffer(part,Buffer,pos);
  AddToBuffer(Data,Buffer,pos);
  SendBuffer(Buffer,pos);
end;

// Sends one part frame carrying a string list: part id, DWORD count, then
// per entry the string and the low 32 bits of its Objects[] value.
procedure TConnectionThread.SendData(part: byte; const Data: TStrings);
var
  Buffer: TBuffer;
  pos: integer;
  w: dWORD;
  len, i: integer;
begin
  log(mtExtra,'Send Strings');
  LogStrings(mtExtra,fOwner.flogger,self,'KEYS',Data);
  // 1 (part) + 4 (count) + per entry 4 (length) + 4 (object) + characters.
  len := 1+4+8*Data.Count;
  for i := 0 to Data.Count-1 do
    inc(len,length(Data[i]));
  SetLength(Buffer,len);
  pos := 0;
  AddToBuffer(part,Buffer,pos);
  w := Data.Count;
  AddToBuffer(w,Buffer,pos);
  for i := 0 to Data.Count-1 do
  begin
    AddToBuffer(Data[i],Buffer,pos);
    w := ptrInt(Pointer(Data.Objects[i])) and $FFFFFFFF;
    AddToBuffer(w,Buffer,pos);
  end;
  SendBuffer(Buffer,pos);
end;

// Sends one part frame carrying an integer array: part id, DWORD count,
// elements.  The buffer is sized for DWORD-sized elements.
procedure TConnectionThread.SendData(part: byte; const Data: TParamArray);
var
  i, pos: integer;
  len: DWORD;
  Buffer: TBuffer;
begin
  len := length(Data);
  log(mtExtra,'Send ParamArray '+inttostr(length(Data)));
  InitBuffer(SizeOf(byte)+(len+1)*SizeOf(DWORD),Buffer,pos);
  AddToBuffer(part,Buffer,pos);
  AddToBuffer(len,Buffer,pos);
  for i := Low(Data) to High(Data) do
    AddToBuffer(Data[i],Buffer,pos);
  SendBuffer(Buffer,pos);
end;

// Receives a header and the parts its packet type announces.
// Keys and Data are nil unless the message carried them; when created they
// belong to the caller, also when this function returns false or raises.
// Returns false when terminated or when a frame could not be read.
// Raises EFormatException when a part arrives with an unexpected part id.
function TConnectionThread.ReceiveMessage(out mode: byte; out Sender: TGUID;
  out rNum: QWord; out CommandID: DWORD; out QParam: QWord; out Value: string;
  out intData: TParamArray; out Keys: TStrings; out Data: TStream): boolean;
var
  Buffer: TBuffer;
  Len: dword;
  pos: integer;
  packetType: byte;

  // Receives the next frame, leaves pos just past its part id and checks
  // that id.  Returns false when no frame could be read.
  function NextPart(const expected: byte; const legacy: byte = 0): boolean;
  var
    partId: byte;
  begin
    result := ReceiveBuffer(Buffer,Len);
    if not result then
      exit;
    pos := 0;
    ReadFromBuffer(partId,Buffer,pos);
    if (partId <> expected) and ((legacy = 0) or (partId <> legacy)) then
      raise EFormatException.Create('');
  end;

begin
  result := false;
  // Class-typed out parameters are not cleared automatically.
  Keys := nil;
  Data := nil;
  if Terminated then
    exit;
  try
    log(mtExtra,'ReceiveMessage');
    if not ReceiveHeader(packetType,mode,Sender,rNum,CommandID,QParam,Value) then
      exit;
    if Terminated then
      exit;
    case packetType of
      1: begin // keys
           if not NextPart(1) then exit;
           ReadFromBuffer(Keys,Buffer,pos);
           LogStrings(mtExtra,fOwner.flogger,self,'KEYS',Keys);
         end;
      2: begin // data; part id 2 is what older senders used for this type
           if not NextPart(1,2) then exit;
           ReadFromBuffer(Data,Buffer,pos);
         end;
      3: begin // integers
           if not NextPart(1) then exit;
           ReadFromBuffer(intData,Buffer,pos);
         end;
      4: begin // keys + data
           if not NextPart(1) then exit;
           ReadFromBuffer(Keys,Buffer,pos);
           LogStrings(mtExtra,fOwner.flogger,self,'Values',Keys);
           if not NextPart(2) then exit;
           ReadFromBuffer(Data,Buffer,pos);
         end;
      5: begin // integers + data
           if not NextPart(1) then exit;
           ReadFromBuffer(intData,Buffer,pos);
           if not NextPart(2) then exit;
           ReadFromBuffer(Data,Buffer,pos);
         end;
      6: begin // keys + integers
           if not NextPart(1) then exit;
           ReadFromBuffer(Keys,Buffer,pos);
           if not NextPart(2) then exit;
           ReadFromBuffer(intData,Buffer,pos);
         end;
      7: begin // keys + integers + data
           if not NextPart(1) then exit;
           ReadFromBuffer(Keys,Buffer,pos);
           if not NextPart(2) then exit;
           ReadFromBuffer(intData,Buffer,pos);
           if not NextPart(3) then exit;
           ReadFromBuffer(Data,Buffer,pos);
         end;
    end;
    result := true;
  except
    on e: Exception do
    begin
      log(mtError,'!!ERROR ReceiveMessage '+e.message);
      raise;
    end;
  end;
end;

// Creates the worker suspended, with a 1 MiB cache and a fresh GUID.
constructor TConnectionThread.Create(Aowner: TMainThread; ASocket: TLSocket);
begin
  inherited Create(true);
  //FreeOnTerminate:=true;
  fCache := TRoundBuffer.Create(@AOwner.log, 1024*1024);
  fSocket := ASocket;
  fOwner := AOwner;
  CreateGuid(ID);
  recNo := 0;
  log(mtExtra,'Create');
end;

destructor TConnectionThread.Destroy;
begin
  if assigned(fOwner) then
    fOwner.RemoveClient(self);
  // TThread.Destroy terminates and joins the thread; TerminatedSet and
  // Execute both call Cache.Close, so the cache must outlive that call.
  // The instance memory stays valid until this destructor returns.
  inherited Destroy;
  FreeAndNil(fCache);
end;

// Worker loop: waits for cached data, decodes one message at a time and
// dispatches it to ProcessMessage.  Ends on termination or when the socket
// is no longer connected.
procedure TConnectionThread.Execute;
var
  Sender: TGUID;
  num, Param: QWORD;
  CommandID: DWORD;
  Value: string;
  intData: TParamArray;
  Keys: TStrings;
  Data: TStream;
  mode: byte;
begin
  log(mtExtra,'start thread');
  while not Terminated do
  begin
    if Cache.ReadReady.WaitFor(1000) <> wrSignaled then
    begin
      sleep(10);
      continue;
    end;
    if Terminated then
      break;
    if not Socket.Connected then
      break;
    Keys := nil;
    Data := nil;
    try
      if ReceiveMessage(mode,Sender,num,CommandID,Param,Value,intData,Keys,Data) then
        ProcessMessage(mode,CommandID,Param,Value,Keys,intData,Data);
    finally
      // The message parts belong to this loop, not to ProcessMessage.
      FreeAndNil(Keys);
      FreeAndNil(Data);
      SetLength(intData,0);
    end;
  end;
  Cache.Close;
  //Socket.Disconnect();
end;

// Runs in the thread that calls Terminate: closes the cache and unregisters
// this worker from its owner.
procedure TConnectionThread.TerminatedSet;
begin
  log(mtExtra,'terminate required');
  Cache.Close;
  fOwner.RemoveClient(self);
end;

end.