//NOTE(review): in this copy of the file the original line breaks are collapsed and
//spans that began with a less-than sign appear to have been dropped (generic type
//arguments, loop conditions and, with them, several method headers and bodies; see
//e.g. "for(int count=0;count hashesToUpload = new Vector();"). Everything after the
//first "//" on each physical line is therefore parsed as a comment. Restore the line
//breaks and the lost spans from a clean copy before compiling or changing the code.
//
//ClientThread, as far as the surviving text shows: a Thread that opens a socket to
//SERVER_ADDRESS:DOWNLOAD_PORT with a SOCKET_TIMEOUT read timeout, requests
//"/index.zip" and per-datafile "hashlist.zip" (falling back to the ".txt" variants
//when no ZIP data arrives), requests whole datafiles or single entries as ZIP, passes
//each received "%LBK%"-terminated line to data_manager.takeDataLine, and opens an
//upload socket on UPLOAD_PORTS[1] to write a ZIP entry named dataFileName+".txt".
//blankbytes is a 10000-byte template filled with 127 that is copied over the read
//buffer before every read.
//CYCLONE MATRIX HOST NODE APPLICATION //Hermann L. Johnson, 2019 //Free for unmodified distribution and non-commercial use. import java.io.*; import java.net.*; import java.util.*; import java.util.zip.*; public class ClientThread extends Thread { private int DOWNLOAD_PORT; private int[] UPLOAD_PORTS; private String SERVER_ADDRESS; private Socket downloadSocket, uploadSocket; private InputStream d_in; //Download input private BufferedWriter d_out, u_out; //Download and upload output private boolean running; private Vector indexLines; private Vector index_lines; private static int SOCKET_TIMEOUT = 4000; private DataManager data_manager; private byte[] blankbytes; //NOTE: Do not even look up mac addresses from IP if nothing is blacklisted //or whitelisted. This will waste a lot of time. //TEST LONG ENTRY DOWNLOADS public ClientThread(String serverAddr, int downloadPort, int[] uploadPorts, DataManager dataManager) { super("ClientThread"); setRunning(false); SERVER_ADDRESS = serverAddr; DOWNLOAD_PORT = downloadPort; UPLOAD_PORTS = uploadPorts; data_manager = dataManager; blankbytes = new byte[10000]; for(int count=0;count<10000;count++) { blankbytes[count] = 127; } } public synchronized boolean running() { return running; } public synchronized void setRunning(boolean newValue) { running = newValue; } public void killThread() { setRunning(false); } public void run() { setRunning(true); boolean shutdownNow = false; indexLines = new Vector(); index_lines = new Vector(); //Set up the network socket. try { downloadSocket = new Socket(SERVER_ADDRESS,DOWNLOAD_PORT); downloadSocket.setSoTimeout(SOCKET_TIMEOUT); } catch(Exception e) { System.err.println( "**Error setting up client download socket! Client failure."); shutdownNow = true; } if(shutdownNow) { try { downloadSocket.close(); } catch(Exception e) { System.err.println( "Error shutting down client download socket after socket setup failure!"); } setRunning(false); return; } //Acquire index from the server. 
//NOTE(review): if the Socket constructor above throws, downloadSocket has not been
//assigned when the shutdown branch calls close(), so that call fails as well and the
//second error message is printed too.
//NOTE(review): the "if(!running)" test below reads the field directly rather than
//through the synchronized running() accessor.
indexLines = acquireIndexOrHashes(true,""); if(!running) return; //Build the index as a string array vector. try { buildStringIndex(); } catch(Exception e) { System.err.println("Error building index string array!"); } Vector missing_datafiles = new Vector(); finalizeIndexForUse(index_lines, missing_datafiles); //Add the remote datafiles that are missing entirely to local data. int num_new_lines; for(int count=0;count hashesToUpload = new Vector(); Vector remoteHashes = new Vector(); Vector localHashes = new Vector(); Vector entriesToDownload = new Vector(); String currentDataFile = ""; for(int count=0;count downloadHashes, boolean entireFile) { //If there is nothing to do. if((downloadHashes.size()==0)&&!entireFile) return; boolean breakWhile = false; boolean shutdownNow = false; //Input and Output strings for communications. String inputString, outputString; TrueStringTokenizer sttok; //This tries to get either the first entry or the entire file as //a ZIP. If we were seeking the entire file and it succeeds, it is //done. If we were just seeking certain entries, it continues to try //to get those entries as ZIP, skipping the first one. //Failure to get ZIP causes a retry for TXT, whether for the entire //file or for specific entries, starting at the first one. try { //Set up the input and output streams. d_out = new BufferedWriter(new PrintWriter( downloadSocket.getOutputStream(), true)); d_in = downloadSocket.getInputStream(); byte[] readBytes = new byte[10000]; int bufferContentSize = 0; String entryLine = new String(""); BufferedInputStream bis = new BufferedInputStream(d_in); ZipInputStream zis = new ZipInputStream(bis); boolean zipcapable = false; //If entireFile, get the entire file. if(entireFile) outputString = "GET /"+currentDataFile+".zip\n"; //Or get the first hash member, to test for ZIP capabilities. 
//The request written below is "GET /{file}.zip" or "GET /{file}.zip/{first hash}";
//the reply is read through the ZipInputStream in 10000-byte chunks. A failure of
//getNextEntry() skips the ZIP read loop entirely.
else outputString = "GET /"+currentDataFile+".zip/"+ downloadHashes.elementAt(0)+"\n"; d_out.write(outputString,0,outputString.length()); d_out.flush(); breakWhile = false; try { zis.getNextEntry(); } catch(Exception e) { System.err.println("ZIP input stream error."); zipcapable = false; breakWhile = true; } //Reading with a buffer is more efficient than reading byte //by byte, but knowing where the data ends becomes an issue. //Thus the timeout is relied upon. The exception would //result in a lost data count, thus the precautions. //This section is for reading ZIP data. while(!breakWhile) { try { //Clear readBytes. System.arraycopy(blankbytes, 0, readBytes, 0, 10000); //Fill readBytes. bufferContentSize = zis.read(readBytes,0,10000); } catch(Exception e) { //Likely a port timeout. //No more reading. breakWhile = true; //If there is a port failure, close connection after work. shutdownNow = true; //Examine the buffer to see how many bytes are there. //The zis.read, with exception, will not tell us. bufferContentSize = 0; for(int count=0;count<10000;count++) { if(readBytes[count]==127) { bufferContentSize = count; count = 10000; } } } //If the buffer is empty at this point, ensure that there //will be no more reads. if(bufferContentSize<=0) breakWhile = true; //The buffer has data. Add it to entryLine. else { entryLine = entryLine + new String( readBytes,0,bufferContentSize); //If EOF, remove the marker from entryLine and quit reading. if(entryLine.contains("%EOF%")) { entryLine = entryLine.substring( 0,entryLine.indexOf("%EOF%")); breakWhile = true; } //If the buffer has data, ZIP reading is OK. zipcapable = true; System.err.println("ZIP download accepted: "+currentDataFile); sttok = new TrueStringTokenizer(entryLine,"%LBK%"); //The remainder is recycled in case we are reading //data longer than the buffer and the looping is necessary. //Do not use, individually, the last bit after tokens. 
//Each complete "%LBK%"-delimited token is handed to data_manager.takeDataLine with
//"%LBK%" re-appended; the unterminated tail (sttok.getRemainder()) is kept in
//entryLine so the next read can complete it.
while( sttok.hasMoreTokens() && sttok.getRemainder().contains("%LBK%") ) { entryLine = sttok.nextToken(); if(!entryLine.equals("")) { data_manager.takeDataLine(entryLine+"%LBK%"); } } entryLine = sttok.getRemainder(); } } //If it is zipcapable, do the other entries in the hashes. //This does nothing if in entireFile mode. if(zipcapable&&!shutdownNow) { //Entries counter is one because of zip testing. //This is skipped for entireFile. for(int entriescounter=1; (entriescounter=0) { entryLine = entryLine + new String( readBytes,0,bufferContentSize); if(entryLine.contains("%EOF%")) { entryLine = entryLine.substring( 0,entryLine.indexOf("%EOF%")); breakWhile = true; } sttok = new TrueStringTokenizer(entryLine, "%LBK%"); while(sttok.hasMoreTokens() && sttok.getRemainder().contains("%LBK%")) { entryLine = sttok.nextToken(); if(!entryLine.equals("")) { data_manager.takeDataLine(entryLine+ "%LBK%"); } } entryLine = sttok.getRemainder(); } } } } } catch(Exception e) { System.err.println( "Client error processing "+currentDataFile+ "! Client failure."); shutdownNow = true; } if(shutdownNow) { //If there is a major error found when working on data, shut //down the connection and thread. try { downloadSocket.close(); } catch(Exception e) { System.err.println( "Error closing client download socket after "+currentDataFile+ " failure!"); } setRunning(false); } } //To upload local entries identified by uploadHashes private void uploadData(String dataFileName, Vector uploadHashes) { if(uploadHashes.size()==0) return; boolean zipcapable = true; //ZIPs go to every other port. These ports should be tried first. //For this implementation, only one ZIP port will be tried. int uploadingPort = 0; boolean zipsuccess = true; String outputString = new String(""); boolean shutdownNow = false; //Try a ZIP port. if(UPLOAD_PORTS.length>1) { uploadingPort = UPLOAD_PORTS[1]; //Set up the network socket. 
//uploadData (continued): connects to UPLOAD_PORTS[1] and starts a ZIP entry named
//dataFileName+".txt". The rest of uploadData, a method taking (localHashes,
//remoteHashes, entriesToDownload, hashesToUpload), the method taking (index_lines,
//missing_datafiles) and the start of the index-line parser (presumably
//buildStringIndex - confirm against a clean copy) are missing from this copy.
try { uploadSocket = new Socket(SERVER_ADDRESS,uploadingPort); uploadSocket.setSoTimeout(SOCKET_TIMEOUT); ZipOutputStream zos = new ZipOutputStream( new BufferedOutputStream(uploadSocket.getOutputStream())); zos.putNextEntry(new ZipEntry(dataFileName+".txt")); for(int count=0;count localHashes, Vector remoteHashes, Vector entriesToDownload, Vector hashesToUpload) { String spanFirst = new String(""); String spanSecond = new String(""); String current = new String(""); //Set up downloads. for(int count=0;count index_lines, Vector missing_datafiles) { int datafilestatus; for(int count=0;count0) entriesDateStamp = entriesDateStamp.substring(1, entriesDateStamp.length()); } if(sttok.hasMoreTokens()) fileDateTimeStamp = sttok.nextToken(); if(sttok.hasMoreTokens()) entireFileSha = sttok.nextToken(); if(sttok.hasMoreTokens()) regularDataSha = sttok.nextToken(); if(sttok.hasMoreTokens()) txtSize = sttok.nextToken(); if(sttok.hasMoreTokens()) zipSize = sttok.nextToken(); abbrLocation.trim(); entriesDateStamp.trim(); fileDateTimeStamp.trim(); entireFileSha.trim(); regularDataSha.trim(); txtSize.trim(); zipSize.trim(); //Validate abbrLocation and set to uppercase. //Validate date stamps. //If the SHAs or file sizes are wrong, they will just trick us into //not taking their data, which is fine. Our own sizes and SHAs //will always be recalculated. 
//NOTE(review): the seven bare trim() calls at the end of the previous line discard
//their results (String.trim returns a new String), so the values validated and
//stored below are untrimmed.
//An entry is added to index_lines only when abbrLocation passes isLettersAndDigits
//and is non-empty, and both DataManager date checks succeed; it is a String[7] of
//{abbrLocation, entriesDateStamp, fileDateTimeStamp, entireFileSha, regularDataSha,
//txtSize, zipSize}.
if(isLettersAndDigits(abbrLocation)) { abbrLocation = abbrLocation.toUpperCase(); if(abbrLocation.length()>0) { if(DataManager.isValidDate(entriesDateStamp)) { if(DataManager.isValidDateTime(fileDateTimeStamp)) { new_entry = new String[7]; new_entry[0] = abbrLocation; new_entry[1] = entriesDateStamp; new_entry[2] = fileDateTimeStamp; new_entry[3] = entireFileSha; new_entry[4] = regularDataSha; new_entry[5] = txtSize; new_entry[6] = zipSize; index_lines.addElement(new_entry); } } } } } } private boolean isLettersAndDigits(String toTest) { for(int count=0;count acquireIndexOrHashes(boolean isIndex, String dataFileName) { //DO NOT FORGET TO SKIP DATES THAT ARE TOO OLD String inputString, outputString; boolean breakWhile = false; boolean zipcapable = false; boolean shutdownNow = false; Vector results = new Vector(); try { //Open the streams. d_out = new BufferedWriter(new PrintWriter( downloadSocket.getOutputStream(), true)); d_in = downloadSocket.getInputStream(); if(isIndex) outputString = "GET /index.zip\n"; else outputString = "GET /"+dataFileName+".txt/hashlist.zip\n"; d_out.write(outputString,0,outputString.length()); d_out.flush(); byte[] readBytes = new byte[10000]; int bufferContentSize = 0; String entryLine = new String(""); TrueStringTokenizer sttok; BufferedInputStream bis = new BufferedInputStream(d_in); ZipInputStream zis = new ZipInputStream(bis); breakWhile = false; try { zis.getNextEntry(); } catch(Exception e) { System.err.println("ZIP input stream error."); zipcapable = false; breakWhile = true; } while(!breakWhile) { try { System.arraycopy(blankbytes, 0, readBytes, 0, 10000); bufferContentSize = zis.read(readBytes,0,10000); } catch(Exception e) { //Likely a port timeout. breakWhile = true; //Port timeout means to dispose of connection. 
//In the catch path the number of bytes received is estimated by scanning for the
//first 127 left over from blankbytes. NOTE(review): a received byte equal to 127
//would be taken for that sentinel and cut the data short.
//NOTE(review): new String(byte[],int,int) decodes with the JVM default charset.
shutdownNow = true; bufferContentSize = 0; for(int count=0;count<10000;count++) { if(readBytes[count]==127) { bufferContentSize = count; count = 10000; } } } if(bufferContentSize<=0) breakWhile = true; else { System.err.println("ZIP index or hashes download accepted."); zipcapable = true; entryLine = entryLine + new String( readBytes,0,bufferContentSize); if(entryLine.contains("%EOF%")) { entryLine = entryLine.substring( 0,entryLine.indexOf("%EOF%")); breakWhile = true; } sttok = new TrueStringTokenizer(entryLine,"%LBK%"); //The remainder is recycled in case we are reading //data longer than the buffer and the looping is necessary. //Do not use, individually, the last bit after tokens. while( sttok.hasMoreTokens() && sttok.getRemainder().contains("%LBK%") ) { entryLine = sttok.nextToken(); if(!entryLine.equals("")) { results.addElement(entryLine); } } entryLine = sttok.getRemainder(); } } //The zip reading while statement has ceased. if(!zipcapable&&!shutdownNow) { System.err.println("ZIP Index or Hashes Download Rejected. Going TXT."); if(isIndex) outputString = "GET /index.txt\n"; else outputString = "GET /"+dataFileName+".txt/hashlist.txt\n"; d_out.write(outputString,0,outputString.length()); d_out.flush(); entryLine = ""; breakWhile = false; while(!breakWhile) { try { System.arraycopy(blankbytes, 0, readBytes, 0, 10000); bufferContentSize = bis.read(readBytes,0,10000); } catch(Exception e) { //Likely a port timeout. 
//TXT fallback (continued): the same read/tokenise loop as the ZIP path, but reading
//from the BufferedInputStream directly instead of the ZipInputStream. Whenever
//shutdownNow was set, the download socket is closed and setRunning(false) is called
//before results is returned.
breakWhile = true; shutdownNow = true; bufferContentSize = 0; for(int count=0;count<10000;count++) { if(readBytes[count]==127) { bufferContentSize = count; count = 10000; } } } if(bufferContentSize<=0) breakWhile = true; else { entryLine = entryLine + new String( readBytes,0,bufferContentSize); if(entryLine.contains("%EOF%")) { entryLine = entryLine.substring( 0,entryLine.indexOf("%EOF%")); breakWhile = true; } System.err.println("TXT index or hashes download accepted."); sttok = new TrueStringTokenizer(entryLine,"%LBK%"); //The remainder is recycled in case we are reading //data longer than the buffer and the looping is //necessary. //Do not use, individually, the last bit after tokens. while( sttok.hasMoreTokens() && sttok.getRemainder().contains("%LBK%") ) { entryLine = sttok.nextToken(); if(!entryLine.equals("")) { results.addElement(entryLine); } } entryLine = sttok.getRemainder(); } } } } catch(Exception e) { System.err.println( "Client error processing index or hashes! Client failure."); shutdownNow = true; } if(shutdownNow) { System.err.println("Shutting down client socket due to completion or error."); try { downloadSocket.close(); } catch(Exception e) { System.err.println( "Error closing client download socket after index "+ "or hashes processing failure!"); } setRunning(false); return results; } return results; } //The datafile name should not have the extension. private void acquireDataFile(String dataFileName) { requestDownloads(dataFileName, new Vector(), true); } }