Pagina 1 di 2 1 2 ultimoultimo
Visualizzazione dei risultati da 1 a 10 su 16
  1. #1

    ConcurrentHashMap e memoria condivisa

    Sono in una applicazione client server in cui il server che ha come campo un mappa, accetta delle richieste dai client ed le gestisce costruendo per ogni client un thread per effettuare delle operazioni su di un mappa, leggere o modificare alcune chiavi,
    Ho usato la collezione thread-safe ConcurrentHashMap, ma se un thread modifica una chiave della mappa mi sono accorto che la modifica permane solo per la durata di vita del thread, credo sia un problema di memoria condivisa ma non sono riuscito a venirne a capo, mi potreste dare una mano per favore.
    Mi posto un codice di esempio per far capire meglio cosa dovrei fare...

    codice:
    //La mappa è un campo privato della classe Server
    private ConcurrentHashMap<Integer,Controllo> mappa = new ConcurrentHashMap<Integer,Controllo>();
    
    //Al server arriva una richiesta dal client che genera un thread per gestirla
    //il corpo del thread dovrebbe controllare il valore della chiave ed eventualmente settarla
    
    Controllo c = mappa.get(intero);
    if(!v.getFlag()){
    	v.setFlag(true);
                  //Il rinserimento con la put della chiave nella mappa non dovrebbe essere      
                  //fatto  ma comunque non funziona indipendentemente se viene fatto o meno
    	mappa.put(intero,v);
    }
    else
    {
    	System.out.println("Il valore della Flag : "+v.getFlag()); 
    
    }
    
    //Un nuovo thread cha accede alla mappa dopo la fine dell'esecuzione 
    //del thread percedente non trova le modifiche...
    
    ....
    Mi aiutate con qualke buon consiglio..Grazie

  2. #2

    Re: ConcurrentHashMap e memoria condivisa

    Originariamente inviato da fuoricorso
    Ho usato la collezione thread-safe ConcurrentHashMap, ma se un thread modifica una chiave della mappa mi sono accorto che la modifica permane solo per la durata di vita del thread
    E' decisamente insolito. Ho provato a scrivere un po' di codice (più o meno implementa la situazione da te descritta) per vedere se ciò si verifica, ma il tutto sembra funzionare;
    i vari thread vedono le modifiche anche in caso di disconnessione/terminazione da parte di altri thread.

    Ecco qui il codice che ho scritto:
    Server.java
    codice:
    /**
     * @(#)Server.java
     *
     *
     * @author Vincenzo
     * @version 1.00 2011/6/7
     */
    
    import java.io.*;
    import java.net.*;
    import java.util.concurrent.*;
    
    public class Server {
    	private ServerSocket ss;
    	private ConcurrentLinkedQueue<Client> process;
        private ConcurrentHashMap<Integer, String> map;
        
        /**
         * Creates a new instance of <code>Server</code>.
         */
        public Server(int port) {
        	try {
        		this.ss = new ServerSocket(port);
        		this.map = new ConcurrentHashMap<Integer, String>();
        		this.process = new ConcurrentLinkedQueue<Client>();
        	} catch ( IOException e ){
        		e.printStackTrace();
        	}
        }
        
        public void start() throws IOException{
        	System.out.println("Server started..\n");
        	while( true ){
        		Socket s = null;
        		if( (s = this.ss.accept()) != null )
        			new Client(s).start();
        	}
        }
        
        /**
         * Client
         * @author Vincenzo
         */
        private class Client extends Thread{
        	private ObjectOutputStream oos;    	
        	private ObjectInputStream ois;
        	
        	public Client(Socket s) throws IOException {
        		this.oos = new ObjectOutputStream( s.getOutputStream() );
        		this.ois = new ObjectInputStream( s.getInputStream() );
        		process.add(this);
        		System.out.println("New connection: " + s);
        	}
        	
        	@Override
        	public void run() {
        		try {
        			try {
        				String protocol = null;
        				while( (protocol = (String)this.ois.readObject()) != null ){
        					if( protocol.equals("SET") ){
        						int key = (Integer)this.ois.readObject();
        						String message = (String)this.ois.readObject();
        						map.put(key, message);
        						
        						// output map
        						System.out.println("\nUltima modifica by: " + this);
        						System.out.println("Map:");
        						for(Integer k : map.keySet())
        							System.out.println("- " + k + " : " + map.get(k));
        						System.out.println();
        					} else if ( protocol.equals("GET") ){
        						this.oos.writeObject( map.get((Integer)this.ois.readObject()) );
        						
        						// situazione dei processi
        						System.out.println("\nSituazione processi:");
        						for(Client p : process)
        							System.out.println("* " + p + ", alive = " + p.isAlive());
        						System.out.println();
        					}
        				}
        			} finally {
                                    this.oos.close();
        				this.ois.close();
        			}
        		} catch ( Exception e ){
        			System.out.println( e.getMessage() + ":" + this );
        		}
        	}
        }
        
        /**
         * @param args the command line arguments
         */
        public static void main(String[] args) {
            // TODO code application logic here
            try {
            	new Server(8000).start();
            } catch ( IOException e ){
            	e.printStackTrace();
            } 
        }
    }
    Client.java
    codice:
    /**
     * @(#)Client.java
     *
     *
     * @author Vincenzo
     * @version 1.00 2011/6/7
     */
    
    import java.io.*;
    import java.net.*;
    import java.util.Scanner;
    
    public class Client {
        
        /**
         * @param args the command line arguments
         */
        public static void main(String[] args) {
            // TODO code application logic here
            try {
            	Socket s = new Socket("localhost", 8000);
            	ObjectOutputStream oos = new ObjectOutputStream( s.getOutputStream() );
            	ObjectInputStream ois = new ObjectInputStream( s.getInputStream() );
            	System.out.println("Connesso con il server!\n");
            	
            	Scanner scan = new Scanner(System.in);
            	while( true ){
            		System.out.println("1 - Inserisci/modifica valore di una chiave");
            		System.out.println("2 - Ottieni il valore di una chiave");
            		System.out.print("Scelta: " );
            		int choice = Integer.parseInt( scan.nextLine() );
            		int key;
            		String message;
            		
            		switch ( choice ){
            			case 1:
            				System.out.print("\nKey: ");
            				key = Integer.parseInt( scan.nextLine() );
            				System.out.print("Message: ");
            				message = scan.nextLine();
            				oos.writeObject("SET");
            				oos.writeObject(key);
            				oos.writeObject(message);
            				System.out.println("Sended: " + key + ", " + message + "\n");
            				break;
            			case 2:
            				System.out.print("\nKey to read: ");
            				key = Integer.parseInt( scan.nextLine() );
            				oos.writeObject("GET");
            				oos.writeObject(key);
            				message = (String)ois.readObject();
            				System.out.println("Received: " + key + ", " + message + "\n");
            				break;
            		}
            	}
            	
            } catch ( Exception e ){
            	e.printStackTrace();
            }
        }
    }
    Magari provalo e fammi sapere.

  3. #3

    GRAZIE GRAZIE

    Ciao GRAZIE molto per la risposta, il codice funziona bene ed è un ottimo esempio. In realtà avevo fatto un errore grossolano nel mio codice, in quanto costruivo la mappa nel metodo run del server(la mappa viene creata all'avvio del server e riempita con dei dati e dopo devo gestire le modifiche) in questo modo essa veniva settata ogni volta che si collegava un nuovo client, mi erano sorti vari dubbi leggendo qualcosa su alcuni forum. Ora dovrei chiederti un grosso piacere, siccome a me interessa sincronizzare un eventuale modifica ad una stessa chiave della mappa alla quale ipoteticamente possono accedere più client(cioè come dal tuo codice i thread della Classe Server ) nello stesso tempo, avevo pensato di mettere nell'oggetto/chiave un campo ReentrantLock per gestire la sincronizzazione, cioè ogni chiave che ovviamente è un oggetto di una classe ad esempio Controllo ha un campo ReentrantLock ed un campo Boolean che più Client potrebbero voler modificare, eventualmente in modo concorrente, il Client effettua una richiesta per accedere ad una chiave della mappa, il thread client del server accede alla mappa e recupera la chiave a questo punto deve modificare un campo della chiave, avevo pensato che una volta che il client(cioè il thread del server) faceva la get(...) sulla mappa, usava il semaforo della chiave per sincronizzare una modifica concorrente, quindi dovrei gestire la sincronizzazione nella modifica di una stessa chiave, tale modifica può essere effetuata una sola volta , mi potresti dare un consiglio, grazie.

  4. #4

    Re: GRAZIE GRAZIE

    Originariamente inviato da fuoricorso
    avevo pensato che una volta che il client(cioè il thread del server) faceva la get(...) sulla mappa, usava il semaforo della chiave per sincronizzare una modifica concorrente, quindi dovrei gestire la sincronizzazione nella modifica di una stessa chiave
    Volendo si può usare il Lock sull'oggetto associato ad una determinata chiave.

    Nell'esempio ho introdotto tale classe (rimpiazza l'oggetto String):
    Message.java
    codice:
    /**
     * @(#)Message.java
     *
     *
     * @author Vincenzo
     * @version 1.00 2011/6/7
     */
    
    import java.util.concurrent.locks.*;
    
    public class Message {
        private String message;
        private final Lock lock;
        
        /**
         * Creates a new instance of <code>Message</code>.
         */
        public Message(String message) {
        	this.lock = new ReentrantLock();
        	this.message = message;
        }
        
        /*
         * @param message
         * @throws RuntimeException
         */
       	public void setMessage(String message) {
         	if( this.lock.tryLock() ) {
         		try {
         			// ad esempio il lavoro dura 10 sec
         			try{
         				Thread.sleep(10000);
           				this.message = message;
         			} catch (InterruptedException e){
         				e.printStackTrace();
         			}
         		} finally {
           			this.lock.unlock();
         		}
         	} else 
         		throw 
         			new RuntimeException("setMessage: Message already locked by another thread!");
       	}
       	
       	/*
       	 * @return message if not locked
       	 * @throws RuntimeException
       	 */
       	public String getMessage() {
       		if( ((ReentrantLock)this.lock).isLocked() )
       			throw new RuntimeException("getMessage: Message already locked by another thread!");
       		
       		return this.message;
       	}
       	
       	@Override
       	public String toString(){
       		return this.message;
       	}
    }
    che, come puoi vedere, utilizza un oggetto ReentrantLock.

    Quindi Server e Client li ho modificati come segue:
    Server.java
    codice:
    /**
     * @(#)Server.java
     *
     *
     * @author Vincenzo
     * @version 1.00 2011/6/7
     */
    
    import java.io.*;
    import java.net.*;
    import java.util.concurrent.*;
    
    public class Server {
    	private ServerSocket ss;
    	private ConcurrentLinkedQueue<Client> process;
        private ConcurrentHashMap<Integer, Message> map;
        
        /**
         * Creates a new instance of <code>Server</code>.
         */
        public Server(int port) {
        	try {
        		this.ss = new ServerSocket(port);
        		this.map = new ConcurrentHashMap<Integer, Message>();
        		this.process = new ConcurrentLinkedQueue<Client>();
        	} catch ( IOException e ){
        		e.printStackTrace();
        	}
        }
        
        public void start() throws IOException{
        	System.out.println("Server started..\n");
        	while( true ){
        		Socket s = null;
        		if( (s = this.ss.accept()) != null )
        			new Client(s).start();
        	}
        }
        
        /**
         * Client
         * @author Vincenzo
         */
        private class Client extends Thread{
        	private ObjectOutputStream oos;    	
        	private ObjectInputStream ois;
        	
        	public Client(Socket s) throws IOException {
        		this.oos = new ObjectOutputStream( s.getOutputStream() );
        		this.ois = new ObjectInputStream( s.getInputStream() );
        		process.add(this);
        		System.out.println("New connection: " + s);
        	}
        	
        	@Override
        	public void run() {
        		try {
        			try {
        				String protocol = null;
        				while( (protocol = (String)this.ois.readObject()) != null ){
        					if( protocol.equals("SET") ){
        						int key = (Integer)this.ois.readObject();
        						String message = (String)this.ois.readObject();
        						try {
        							Message m = null;
        							if( (m = map.get(key)) == null )
        								map.put(key, new Message(message));
        							else
        								m.setMessage(message);
        							
        							// notifica all'utente
        							this.oos.writeObject( (m != null ? "OK you have modified message with key " 
        						    									: "OK you have entered new message with key ") + key);
        							
        							// output map
        							System.out.println("\nUltima modifica by: " + this);
        							System.out.println("Map:");
        							for(Integer k : map.keySet())
        								System.out.println("- " + k + " : " + map.get(k));
        							System.out.println();
        							
        						} catch ( RuntimeException e ){
        							this.oos.writeObject( e.getMessage() );
        							System.out.println( "Access not allowed to " + this + "\n" + e.getMessage() + ", key: " + key );
        						}
        					} else if ( protocol.equals("GET") ){
        						try {
        							Message message = map.get((Integer)this.ois.readObject());
        							if( message != null )
        								this.oos.writeObject( message.getMessage() );
        							else
        								this.oos.writeObject("ERROR inexistent key.");
        							
        							// situazione dei processi
        							System.out.println("\nSituazione processi:");
        							for(Client p : process)
        								System.out.println("* " + p + ", alive = " + p.isAlive());
        							System.out.println();
        							
        						} catch ( RuntimeException e ){
        							this.oos.writeObject( "ERROR " + e.getMessage() );
        							System.out.println( "Access not allowed to " + this + "\n" + e.getMessage() );
        						}
        					}
        				}
        			} finally {
        				this.oos.close();
        				this.ois.close();
        			}
        		} catch ( Exception e ){
        			System.out.println( e.getMessage() + ":" + this );
        		}
        	}
        }
        
        /**
         * @param args the command line arguments
         */
        public static void main(String[] args) {
            // TODO code application logic here
            try {
            	new Server(8000).start();
            } catch ( IOException e ){
            	e.printStackTrace();
            } 
        }
    }
    Client.java
    codice:
    /**
     * @(#)Client.java
     *
     *
     * @author Vincenzo
     * @version 1.00 2011/6/7
     */
    
    import java.io.*;
    import java.net.*;
    import java.util.Scanner;
    
    public class Client {
        
        /**
         * @param args the command line arguments
         */
        public static void main(String[] args) {
            // TODO code application logic here
            try {
            	Socket s = new Socket("localhost", 8000);
            	ObjectOutputStream oos = new ObjectOutputStream( s.getOutputStream() );
            	ObjectInputStream ois = new ObjectInputStream( s.getInputStream() );
            	System.out.println("Connesso con il server!\n");
            	
            	Scanner scan = new Scanner(System.in);
            	while( true ){
            		System.out.println("1 - Inserisci/modifica valore di una chiave");
            		System.out.println("2 - Ottieni il valore di una chiave");
            		System.out.print("Scelta: " );
            		int choice = Integer.parseInt( scan.nextLine() );
            		int key;
            		String message;
            		
            		switch ( choice ){
            			case 1:
            				System.out.print("\nKey: ");
            				key = Integer.parseInt( scan.nextLine() );
            				System.out.print("Message: ");
            				message = scan.nextLine();
            				oos.writeObject("SET");
            				oos.writeObject(key);
            				oos.writeObject(message);
            				
            				message = (String)ois.readObject();
            				if( !message.contains("OK") )
            					System.out.println("Server after dispatch says: " + message);
            				else
            					System.out.println("YEAH! Message for key " + key + " accepted!");
            				
            				break;
            			case 2:
            				System.out.print("\nKey to read: ");
            				key = Integer.parseInt( scan.nextLine() );
            				oos.writeObject("GET");
            				oos.writeObject(key);
            				message = (String)ois.readObject();
            				if ( message.contains("ERROR") )
            					System.out.println("Server after request says: " + message);
            				else
            					System.out.println("YEAH! I took this message for key " + key + ": " + message);
            				break;
            		}
            		System.out.println();
            	}
            	
            } catch ( Exception e ){
            	e.printStackTrace();
            }
        }
    }
    In pratica se un thread sta settando un messaggio con chiave k e nello stesso istante t (nell'esempio varia in [0, 10] sec) un secondo thread tenta di accedere o settare il contenuto della medesima chiave k, non può farlo finchè il thread che lo precede non finisce.

  5. #5
    Grazie, alla fine ci sono arrivato anch'io il dubbio era se poteva essere una buona soluzione oppure si doveva gestire con un blocco synchronized senza mettere un campo Lock nella classe, comunque alla fine mi funziona e ti ringrazio.
    Ti volevo chiedere, se io volessi terminare il client come scelta dello swich senza chiudere il programma con "una chiusura forzata" come potrei gestire la situazione sia al lato client che lato server...

  6. #6
    Originariamente inviato da fuoricorso
    Ti volevo chiedere, se io volessi terminare il client come scelta dello swich senza chiudere il programma con "una chiusura forzata" come potrei gestire la situazione sia al lato client che lato server...
    Lato client lo chiedi esplicitamente all'utente, cioè gli metti a disposizione una opzione per effettuare la disconnessione/chiusura dell'applicazione.
    Invece lato server la soluzione è già nell'esempio:
    sicuramente avrai notato che quando fai disconnettere un client dal server, viene lanciata questa istruzione:
    codice:
    System.out.println( e.getMessage() + ":" + this );
    all'interno del blocco catch.

  7. #7

    GRAZIE

    Conosci una struttura dati che sia thread-safe ed in più mi permetta di fare un qualche ordinamento ai suoi oggetti, tipo un ConcurrentHashMap che però abbia anche un comparatore per avere un ordinamento in base ad una priorità data dalla richieste (cioè le get()) fatte sui suoi oggetti. Esiste la PriorityBlockingQueue ma per fare quello che mi serve potrei trovarmi nella situazione di voler aggiornare un oggetto che è stato richiesto più volte allora dovrei prima cercarlo, poi eliminarlo ed in inserirlo...oppure sbaglio? Mi servirebbe tipo una "ConcurrentTreeMap" per fare una lista dei preferiti senza ovviamnete duplicati e thread-safe esiste? Grazie per la pazienza..

  8. #8

    Re: GRAZIE

    Originariamente inviato da fuoricorso
    Esiste la PriorityBlockingQueue ma per fare quello che mi serve potrei trovarmi nella situazione di voler aggiornare un oggetto che è stato richiesto più volte allora dovrei prima cercarlo, poi eliminarlo ed in inserirlo...oppure sbaglio?
    Non c'è bisogno di eliminarlo e re-inserirlo, previa mutabilità dell'oggetto.
    Infatti nell'esempio quando un thread vuole settare il messaggio, non viene eseguita alcun put dell'oggetto, proprio perchè tale oggetto è mutabile.

  9. #9
    Grazie, ci provo....

  10. #10
    Ciao mi daresti una mano ho problemi con la coda la PriorityBlockingQueue, ho fatto il comparatore per inserire gli elementi in modo ordinato in base alla richiesta che ne viene fatta di questi, cioè io accedo alla ConcurrentHashMap per recuperare un oggetto e setto il campo preferenza di questo, ogni volta che ne viene fatta richiesta, inserisco l'elemento richiesto in una PriorityBlockingQueue SOLO la prima volta che viene richiesto, il comparatore usato mi permette di inserire gli elementi in ordine in base alle richieste, il codice sembra corretto infatti se lo eseguo a parte funziona bene ma inserito nel applicazione client server ho dei problemi in particolare, il campo preferenze mi viene settato ma la coda non viene ordinata in base alle preferrenze ma è come se utilizzassi una coda normale...è un problema di metodi usati io per inserire l'elemeto nella coda uso il metodo add() questa istruzione ovviamente viene fatta solo alla prima richiesta. Mi dai un mano

Permessi di invio

  • Non puoi inserire discussioni
  • Non puoi inserire repliche
  • Non puoi inserire allegati
  • Non puoi modificare i tuoi messaggi
  •  
Powered by vBulletin® Version 4.2.1
Copyright © 2025 vBulletin Solutions, Inc. All rights reserved.